Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs import ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv.attributes.exception_attributes import (
EXCEPTION_MESSAGE,
Expand Down Expand Up @@ -630,13 +630,13 @@ def _record_span(self, span: ReadableSpan) -> None:
except Exception: # pylint: disable=broad-except
_logger.exception("Exception occurred while recording span.") # pylint: disable=C4769

def _record_log_record(self, readable_log_record: ReadableLogRecord) -> None:
def _record_log_record(self, read_write_log_record: ReadWriteLogRecord) -> None:
try:
# pylint: disable=global-statement
global _EXCEPTIONS_COUNT
if readable_log_record.log_record:
if read_write_log_record.log_record:
exc_type = None
log_record = readable_log_record.log_record
log_record = read_write_log_record.log_record
if log_record.attributes:
exc_type = log_record.attributes.get(EXCEPTION_TYPE)
exc_message = log_record.attributes.get(EXCEPTION_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from opentelemetry.sdk._logs import ReadableLogRecord, LogRecordProcessor
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

from azure.monitor.opentelemetry.exporter._performance_counters._manager import _PerformanceCountersManager
Expand All @@ -13,18 +13,18 @@ def __init__(self):
super().__init__()
self.call_on_emit = hasattr(super(), "on_emit")

def on_emit(self, readable_log_record: ReadableLogRecord) -> None: # type: ignore # pylint: disable=arguments-renamed
def on_emit(self, log_record: ReadWriteLogRecord) -> None: # type: ignore # pylint: disable=arguments-renamed
pcm = _PerformanceCountersManager()
if pcm:
pcm._record_log_record(readable_log_record)
pcm._record_log_record(log_record)
if self.call_on_emit:
super().on_emit(readable_log_record) # type: ignore[safe-super]
super().on_emit(log_record) # type: ignore[safe-super]
else:
# this method was removed in opentelemetry-sdk and replaced with on_emit
super().emit(readable_log_record) # type: ignore[safe-super,misc] # pylint: disable=no-member
super().emit(log_record) # type: ignore[safe-super,misc] # pylint: disable=no-member

def emit(self, readable_log_record: ReadableLogRecord) -> None:
self.on_emit(readable_log_record)
def emit(self, log_record: ReadWriteLogRecord) -> None:
self.on_emit(log_record)

def shutdown(self):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import psutil

from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs import ReadWriteLogRecord
from opentelemetry.sdk.metrics import MeterProvider, Meter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
Expand Down Expand Up @@ -214,43 +214,43 @@ def _create_metric_instruments(self) -> None:
if not self._meter:
raise ValueError("Meter must be initialized before creating instruments")

self._request_duration = self._meter.create_histogram(
self._request_duration = self._meter.create_histogram( # type: ignore
_REQUEST_DURATION_NAME[0], "ms", "live metrics avg request duration in ms"
)
self._dependency_duration = self._meter.create_histogram(
self._dependency_duration = self._meter.create_histogram( # type: ignore
_DEPENDENCY_DURATION_NAME[0], "ms", "live metrics avg dependency duration in ms"
)
# We use a counter to represent rates per second because collection
# interval is one second so we simply need the number of requests
# within the collection interval
self._request_rate_counter = self._meter.create_counter(
self._request_rate_counter = self._meter.create_counter( # type: ignore
_REQUEST_RATE_NAME[0], "req/sec", "live metrics request rate per second"
)
self._request_failed_rate_counter = self._meter.create_counter(
self._request_failed_rate_counter = self._meter.create_counter( # type: ignore
_REQUEST_FAILURE_RATE_NAME[0], "req/sec", "live metrics request failed rate per second"
)
self._dependency_rate_counter = self._meter.create_counter(
self._dependency_rate_counter = self._meter.create_counter( # type: ignore
_DEPENDENCY_RATE_NAME[0], "dep/sec", "live metrics dependency rate per second"
)
self._dependency_failure_rate_counter = self._meter.create_counter(
self._dependency_failure_rate_counter = self._meter.create_counter( # type: ignore
_DEPENDENCY_FAILURE_RATE_NAME[0], "dep/sec", "live metrics dependency failure rate per second"
)
self._exception_rate_counter = self._meter.create_counter(
self._exception_rate_counter = self._meter.create_counter( # type: ignore
_EXCEPTION_RATE_NAME[0], "exc/sec", "live metrics exception rate per second"
)
self._process_memory_gauge_old = self._meter.create_observable_gauge(
self._process_memory_gauge_old = self._meter.create_observable_gauge( # type: ignore
_COMMITTED_BYTES_NAME[0],
[_get_process_memory],
)
self._process_memory_gauge = self._meter.create_observable_gauge(
self._process_memory_gauge = self._meter.create_observable_gauge( # type: ignore
_PROCESS_PHYSICAL_BYTES_NAME[0],
[_get_process_memory],
)
self._process_time_gauge_old = self._meter.create_observable_gauge(
self._process_time_gauge_old = self._meter.create_observable_gauge( # type: ignore
_PROCESSOR_TIME_NAME[0],
[_get_process_time_normalized_old],
)
self._process_time_gauge = self._meter.create_observable_gauge(
self._process_time_gauge = self._meter.create_observable_gauge( # type: ignore
_PROCESS_TIME_NORMALIZED_NAME[0],
[_get_process_time_normalized],
)
Expand Down Expand Up @@ -354,7 +354,7 @@ def _record_span(self, span: ReadableSpan) -> None:
except Exception as e: # pylint: disable=broad-except
_logger.exception("Exception occurred while recording span: %s", e) # pylint: disable=C4769

def _record_log_record(self, readable_log_record: ReadableLogRecord) -> None:
def _record_log_record(self, read_write_log_record: ReadWriteLogRecord) -> None:
# Only record if in post state and manager is initialized
if not (_is_post_state() and self.is_initialized()):
return
Expand All @@ -365,9 +365,9 @@ def _record_log_record(self, readable_log_record: ReadableLogRecord) -> None:
return

try:
if readable_log_record.log_record:
if read_write_log_record.log_record:
exc_type = None
log_record = readable_log_record.log_record
log_record = read_write_log_record.log_record
if log_record.attributes:
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from opentelemetry.sdk._logs import ReadableLogRecord, LogRecordProcessor
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

from azure.monitor.opentelemetry.exporter._quickpulse._state import get_quickpulse_manager
Expand All @@ -13,18 +13,18 @@ def __init__(self):
super().__init__()
self.call_on_emit = hasattr(super(), "on_emit")

def on_emit(self, readable_log_record: ReadableLogRecord) -> None: # type: ignore # pylint: disable=arguments-renamed
def on_emit(self, log_record: ReadWriteLogRecord) -> None: # type: ignore # pylint: disable=arguments-renamed
qpm = get_quickpulse_manager()
if qpm:
qpm._record_log_record(readable_log_record)
qpm._record_log_record(log_record)
if self.call_on_emit:
super().on_emit(readable_log_record) # type: ignore[safe-super]
super().on_emit(log_record) # type: ignore[safe-super]
else:
# this method was removed in opentelemetry-sdk and replaced with on_emit
super().emit(readable_log_record) # type: ignore[safe-super,misc] # pylint: disable=no-member
super().emit(log_record) # type: ignore[safe-super,misc] # pylint: disable=no-member

def emit(self, readable_log_record: ReadableLogRecord) -> None: # pylint: disable=arguments-renamed
self.on_emit(readable_log_record)
def emit(self, log_record: ReadWriteLogRecord) -> None: # pylint: disable=arguments-renamed
self.on_emit(log_record)

def shutdown(self):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
enduser_attributes as _enduser_attributes,
)
except ImportError:
_enduser_attributes = None
_enduser_attributes = None # type: ignore

from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._constants import (
Expand Down Expand Up @@ -85,7 +85,7 @@ def export(self, batch: Sequence[ReadableLogRecord], **kwargs: Any) -> LogRecord
:param batch: OpenTelemetry ReadableLogRecord(s) to export.
:type batch: ~typing.Sequence[~opentelemetry._logs.ReadableLogRecord]
:return: The result of the export.
:rtype: ~opentelemetry.sdk._logs.export.ReadableLogRecord
:rtype: ~opentelemetry.sdk._logs.export.LogRecordExportResult
"""
envelopes = [self._log_to_envelope(log) for log in batch]
try:
Expand Down Expand Up @@ -150,9 +150,9 @@ def _convert_log_to_envelope(readable_log_record: ReadableLogRecord) -> Telemetr
tags.update(_utils._populate_part_a_fields(readable_log_record.resource)) # type: ignore
tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format(log_record.trace_id or _DEFAULT_TRACE_ID) # type: ignore
if log_record.attributes and _ENDUSER_ID_ATTRIBUTE in log_record.attributes:
tags[ContextTagKeys.AI_USER_AUTH_USER_ID] = log_record.attributes[_ENDUSER_ID_ATTRIBUTE]
tags[ContextTagKeys.AI_USER_AUTH_USER_ID] = log_record.attributes[_ENDUSER_ID_ATTRIBUTE] # type: ignore
if log_record.attributes and _ENDUSER_PSEUDO_ID_ATTRIBUTE in log_record.attributes:
tags[ContextTagKeys.AI_USER_ID] = log_record.attributes[_ENDUSER_PSEUDO_ID_ATTRIBUTE]
tags[ContextTagKeys.AI_USER_ID] = log_record.attributes[_ENDUSER_PSEUDO_ID_ATTRIBUTE] # type: ignore

tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format( # type: ignore
log_record.span_id or _DEFAULT_SPAN_ID
Expand Down Expand Up @@ -228,9 +228,10 @@ def _convert_log_to_envelope(readable_log_record: ReadableLogRecord) -> Telemetr
severity_level=severity_level, # type: ignore
properties=properties,
)
data.message = data.message.strip()
if len(data.message) == 0:
data.message = _DEFAULT_LOG_MESSAGE
if hasattr(data, "message"):
data.message = data.message.strip()
if len(data.message) == 0:
data.message = _DEFAULT_LOG_MESSAGE
envelope.data = MonitorBase(base_data=data, base_type="MessageData")

return envelope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from typing import Optional, Dict, Any

from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.sdk._logs import ReadWriteLogRecord
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogRecordExporter
from opentelemetry.trace import get_current_span


Expand All @@ -13,40 +13,40 @@ class _AzureBatchLogRecordProcessor(BatchLogRecordProcessor):

def __init__(
self,
log_exporter: LogExporter,
log_record_exporter: LogRecordExporter,
options: Optional[Dict[str, Any]] = None,
):
"""Initialize the Azure Monitor Log Record Processor.

:param exporter: The LogRecordExporter to use for exporting logs.
:param log_record_exporter: The LogRecordExporter to use for exporting logs.
:param options: Optional configuration dictionary. Supported options:
- enable_trace_based_sampling_for_logs(bool): Enable trace-based sampling for logs.
"""
super().__init__(log_exporter)
super().__init__(log_record_exporter)
self._options = options or {}
self._enable_trace_based_sampling_for_logs = self._options.get("enable_trace_based_sampling_for_logs")

def on_emit(self, readable_log_record: ReadableLogRecord) -> None: # pylint: disable=arguments-renamed
def on_emit(self, log_record: ReadWriteLogRecord) -> None: # pylint: disable=arguments-renamed
# cspell: disable
"""Determines whether the logger should drop log records associated with unsampled traces.
If `trace_based_sampling` is `true`, log records associated with unsampled traces are dropped by the `Logger`.
If `enable_trace_based_sampling_for_logs` is `true`, log records associated with unsampled traces are
dropped by the `Logger`.
A log record is considered associated with an unsampled trace if it has a valid `SpanId` and its
`TraceFlags` indicate that the trace is unsampled. A log record that isn't associated with a trace
context is not affected by this parameter and therefore bypasses trace based sampling filtering.

:param readable_log_record: Contains the log record to be exported
:type readable_log_record: ReadableLogRecord
:param log_record: Contains the log record to be exported
:type log_record: ReadWriteLogRecord
"""

# cspell: enable
if self._enable_trace_based_sampling_for_logs:
if hasattr(readable_log_record, "log_record") and readable_log_record.log_record is not None:
if hasattr(log_record, "log_record") and log_record.log_record is not None:
if (
hasattr(readable_log_record.log_record, "context")
and readable_log_record.log_record.context is not None
hasattr(log_record.log_record, "context") and log_record.log_record.context is not None
): # pylint: disable=line-too-long
span = get_current_span(readable_log_record.log_record.context)
span = get_current_span(log_record.log_record.context)
span_context = span.get_span_context()
if span_context.is_valid and not span_context.trace_flags.sampled:
return
super().on_emit(readable_log_record)
super().on_emit(log_record)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from opentelemetry.trace import SpanKind
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs import ReadWriteLogRecord
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

Expand Down Expand Up @@ -570,15 +570,15 @@ def test_record_log_record_with_exception(self, mock_get_meter_provider):
mock_log_record = MagicMock()
mock_log_record.attributes = {EXCEPTION_TYPE: "ValueError", EXCEPTION_MESSAGE: "Test exception"}

mock_readable_log_record = MagicMock(spec=ReadableLogRecord)
mock_readable_log_record.log_record = mock_log_record
mock_read_write_log_record = MagicMock(spec=ReadWriteLogRecord)
mock_read_write_log_record.log_record = mock_log_record

# Import to access global counter
import azure.monitor.opentelemetry.exporter._performance_counters._manager as manager_module

initial_exceptions = manager_module._EXCEPTIONS_COUNT

manager._record_log_record(mock_readable_log_record)
manager._record_log_record(mock_read_write_log_record)

# Check that exception was counted
self.assertEqual(manager_module._EXCEPTIONS_COUNT, initial_exceptions + 1)
Expand All @@ -591,15 +591,15 @@ def test_record_log_record_without_exception(self):
mock_log_record = MagicMock()
mock_log_record.attributes = {"normal": "attribute"}

mock_readable_log_record = MagicMock(spec=ReadableLogRecord)
mock_readable_log_record.log_record = mock_log_record
mock_read_write_log_record = MagicMock(spec=ReadWriteLogRecord)
mock_read_write_log_record.log_record = mock_log_record

# Import to access global counter
import azure.monitor.opentelemetry.exporter._performance_counters._manager as manager_module

initial_exceptions = manager_module._EXCEPTIONS_COUNT

manager._record_log_record(mock_readable_log_record)
manager._record_log_record(mock_read_write_log_record)

# Exception count should not change
self.assertEqual(manager_module._EXCEPTIONS_COUNT, initial_exceptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from unittest import mock
from unittest.mock import MagicMock

from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs import ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan

from azure.monitor.opentelemetry.exporter._performance_counters._processor import (
Expand Down Expand Up @@ -42,13 +42,13 @@ def test_on_emit_with_manager(self, mock_manager_class):
processor = _PerformanceCountersLogRecordProcessor()

# Create mock log data
mock_readable_log_record = MagicMock(spec=ReadableLogRecord)
mock_read_write_log_record = MagicMock(spec=ReadWriteLogRecord)

processor.on_emit(mock_readable_log_record)
processor.on_emit(mock_read_write_log_record)

# Verify manager was called
mock_manager_class.assert_called_once()
mock_manager._record_log_record.assert_called_once_with(mock_readable_log_record)
mock_manager._record_log_record.assert_called_once_with(mock_read_write_log_record)

def test_emit_calls_on_emit(self):
"""Test emit method calls on_emit."""
Expand All @@ -58,12 +58,12 @@ def test_emit_calls_on_emit(self):
processor.on_emit = MagicMock()

# Create mock log data
mock_readable_log_record = MagicMock(spec=ReadableLogRecord)
mock_read_write_log_record = MagicMock(spec=ReadWriteLogRecord)

processor.emit(mock_readable_log_record)
processor.emit(mock_read_write_log_record)

# Verify on_emit was called
processor.on_emit.assert_called_once_with(mock_readable_log_record)
processor.on_emit.assert_called_once_with(mock_read_write_log_record)

def test_shutdown(self):
"""Test shutdown method."""
Expand Down Expand Up @@ -91,11 +91,11 @@ def test_exception_propagation_in_on_emit(self, mock_manager_class):
processor = _PerformanceCountersLogRecordProcessor()

# Create mock log data
mock_readable_log_record = MagicMock(spec=ReadableLogRecord)
mock_read_write_log_record = MagicMock(spec=ReadWriteLogRecord)

# Exception should be propagated
with self.assertRaises(Exception) as context:
processor.on_emit(mock_readable_log_record)
processor.on_emit(mock_read_write_log_record)
self.assertEqual(str(context.exception), "Test error")


Expand Down
Loading