diff --git a/CHANGELOG.md b/CHANGELOG.md index 9dd502e2b0..70f088cc32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: fix type annotations on `MetricReader` and related types + ([#4938](https://github.com/open-telemetry/opentelemetry-python/pull/4938/)) + ## Version 1.40.0/0.61b0 (2026-03-04) - `opentelemetry-sdk`: deprecate `LoggingHandler` in favor of `opentelemetry-instrumentation-logging`, see `opentelemetry-instrumentation-logging` documentation diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 972317193d..52ce2d6e6f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -134,9 +134,9 @@ class ConsoleLogRecordExporter(LogRecordExporter): def __init__( self, out: IO = sys.stdout, - formatter: Callable[ - [ReadableLogRecord], str - ] = lambda record: record.to_json() + linesep, + formatter: Callable[[ReadableLogRecord], str] = lambda record: ( + record.to_json() + linesep + ), ): self.out = out self.formatter = formatter diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index cdbad3e343..f878755be8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -144,9 +144,9 @@ class ConsoleMetricExporter(MetricExporter): def __init__( self, out: IO = stdout, - formatter: Callable[ - [MetricsData], str - ] = lambda metrics_data: metrics_data.to_json() + linesep, + formatter: Callable[[MetricsData], str] = lambda metrics_data: ( + metrics_data.to_json() + linesep + ), preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[ @@ -353,7 +353,7 @@ def _set_collect_callback( "opentelemetry.sdk.metrics.export.MetricReader", AggregationTemporality, ], - Iterable["opentelemetry.sdk.metrics.export.Metric"], + MetricsData, ], ) -> None: """This function is internal to the SDK. It should not be called or overridden by users""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051..43ebc3d345 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -17,7 +17,7 @@ from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import Iterable, List, Mapping, Optional +from typing import List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics @@ -29,7 +29,7 @@ from opentelemetry.sdk.metrics._internal.metric_reader_storage import ( MetricReaderStorage, ) -from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics._internal.point import MetricsData class MeasurementConsumer(ABC): @@ -51,7 +51,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Optional[Iterable[Metric]]: + ) -> Optional[MetricsData]: pass @@ -104,7 +104,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Optional[Iterable[Metric]]: + ) -> Optional[MetricsData]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] # for now, just use the defaults diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index a9108b7337..d853dfd6c4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -300,9 +300,9 @@ def __init__( self, service_name: str | None = None, out: typing.IO = sys.stdout, - formatter: typing.Callable[ - [ReadableSpan], str - ] = lambda span: span.to_json() + linesep, + formatter: typing.Callable[[ReadableSpan], str] = lambda span: ( + span.to_json() + linesep + ), ): self.out = out self.formatter = formatter diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index bd70d18d20..ff2b743697 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -20,6 +20,11 @@ from opentelemetry.metrics import Observation from opentelemetry.sdk.metrics import Counter, MeterProvider +from opentelemetry.sdk.metrics._internal.point import ( + MetricsData, + ResourceMetrics, + ScopeMetrics, +) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, InMemoryMetricReader, @@ -27,14 +32,20 @@ NumberDataPoint, Sum, ) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope class TestInMemoryMetricReader(TestCase): def test_no_metrics(self): - mock_collect_callback = Mock(return_value=[]) + mock_collect_callback = Mock( + return_value=MetricsData(resource_metrics=[]) + ) reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) - self.assertEqual(reader.get_metrics_data(), []) + self.assertEqual( + reader.get_metrics_data(), MetricsData(resource_metrics=[]) + ) mock_collect_callback.assert_called_once() def test_converts_metrics_to_list(self): @@ -55,15 +66,32 @@ def test_converts_metrics_to_list(self): is_monotonic=True, ), ) - mock_collect_callback = Mock(return_value=(metric,)) + metric_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + scope_metrics=[ + ScopeMetrics( + metrics=[metric], + scope=InstrumentationScope(name="test"), + schema_url="", + ) + ], + resource=Resource.create(), + schema_url="", + ) + ] + ) + mock_collect_callback = Mock(return_value=metric_data) reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) returned_metrics = reader.get_metrics_data() mock_collect_callback.assert_called_once() - self.assertIsInstance(returned_metrics, tuple) - self.assertEqual(len(returned_metrics), 1) - self.assertIs(returned_metrics[0], metric) + self.assertIsNotNone(returned_metrics) + self.assertIs( + returned_metrics.resource_metrics[0].scope_metrics[0].metrics[0], + metric, + ) def test_shutdown(self): # shutdown should always be successful diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 8722effe38..2caa2fad11 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -19,13 +19,18 @@ import weakref from logging import WARNING from time import sleep, time_ns -from typing import Optional, Sequence +from typing import Optional from unittest.mock import Mock import pytest from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError from opentelemetry.sdk.metrics._internal import _Counter +from opentelemetry.sdk.metrics._internal.point import ( + MetricsData, + ResourceMetrics, + ScopeMetrics, +) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, Gauge, @@ -40,6 +45,8 @@ DefaultAggregation, LastValueAggregation, ) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.concurrency_test import ConcurrencyTestBase @@ -48,7 +55,7 @@ def __init__( self, wait=0, preferred_temporality=None, preferred_aggregation=None ): self.wait = wait - self.metrics = [] + self.metrics: list[MetricsData] = [] self._shutdown = False super().__init__( preferred_temporality=preferred_temporality, @@ -57,13 +64,13 @@ def __init__( def export( self, - metrics_data: Sequence[Metric], + metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: sleep(self.wait) - self.metrics.extend(metrics_data) - return True + self.metrics.append(metrics_data) + return MetricExportResult.SUCCESS def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._shutdown = True @@ -126,6 +133,21 @@ def collect(self, timeout_millis: float = 10_000) -> None: ), ), ] +metrics = MetricsData( + resource_metrics=[ + ResourceMetrics( + scope_metrics=[ + ScopeMetrics( + metrics=metrics_list, + scope=InstrumentationScope(name="test"), + schema_url="", + ) + ], + resource=Resource.create(), + schema_url="", + ) + ] +) class TestPeriodicExportingMetricReader(ConcurrencyTestBase): @@ -137,7 +159,12 @@ def test_defaults(self): pmr.shutdown() def _create_periodic_reader( - self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000 + self, + metrics_data: MetricsData, + exporter, + collect_wait=0, + interval=60000, + timeout=30000, ): pmr = PeriodicExportingMetricReader( exporter, @@ -147,7 +174,7 @@ def _create_periodic_reader( def _collect(reader, timeout_millis): sleep(collect_wait) - pmr._receive_metrics(metrics, timeout_millis) + return metrics_data pmr._set_collect_callback(_collect) return pmr @@ -198,24 +225,26 @@ def test_ticker_value_exception_on_negative(self): def test_ticker_collects_metrics(self): exporter = FakeMetricsExporter() - pmr = self._create_periodic_reader( - metrics_list, exporter, interval=100 - ) + pmr = self._create_periodic_reader(metrics, exporter, interval=100) sleep(0.15) - self.assertEqual(exporter.metrics, metrics_list) + self.assertEqual(exporter.metrics[0], metrics) pmr.shutdown() def test_shutdown(self): exporter = FakeMetricsExporter() - pmr = self._create_periodic_reader([], exporter) + pmr = self._create_periodic_reader( + MetricsData(resource_metrics=[]), exporter + ) pmr.shutdown() - self.assertEqual(exporter.metrics, []) + self.assertEqual(exporter.metrics[0], MetricsData(resource_metrics=[])) self.assertTrue(pmr._shutdown) self.assertTrue(exporter._shutdown) def test_shutdown_multiple_times(self): - pmr = self._create_periodic_reader([], FakeMetricsExporter()) + pmr = self._create_periodic_reader( + MetricsData(resource_metrics=[]), FakeMetricsExporter() + ) with self.assertLogs(level="WARNING") as w: self.run_with_many_threads(pmr.shutdown) self.assertTrue("Can't shutdown multiple times" in w.output[0])