Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ class ConsoleLogRecordExporter(LogRecordExporter):
def __init__(
self,
out: IO = sys.stdout,
formatter: Callable[
[ReadableLogRecord], str
] = lambda record: record.to_json() + linesep,
formatter: Callable[[ReadableLogRecord], str] = lambda record: (
record.to_json() + linesep
),
):
self.out = out
self.formatter = formatter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ class ConsoleMetricExporter(MetricExporter):
def __init__(
self,
out: IO = stdout,
formatter: Callable[
[MetricsData], str
] = lambda metrics_data: metrics_data.to_json() + linesep,
formatter: Callable[[MetricsData], str] = lambda metrics_data: (
metrics_data.to_json() + linesep
),
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
preferred_aggregation: dict[
Expand Down Expand Up @@ -353,7 +353,7 @@ def _set_collect_callback(
"opentelemetry.sdk.metrics.export.MetricReader",
AggregationTemporality,
],
Iterable["opentelemetry.sdk.metrics.export.Metric"],
MetricsData,
],
) -> None:
"""This function is internal to the SDK. It should not be called or overridden by users"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping, Optional
from typing import List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
Expand All @@ -29,7 +29,7 @@
from opentelemetry.sdk.metrics._internal.metric_reader_storage import (
MetricReaderStorage,
)
from opentelemetry.sdk.metrics._internal.point import Metric
from opentelemetry.sdk.metrics._internal.point import MetricsData


class MeasurementConsumer(ABC):
Expand All @@ -51,7 +51,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Optional[Iterable[Metric]]:
) -> Optional[MetricsData]:
pass


Expand Down Expand Up @@ -104,7 +104,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Optional[Iterable[Metric]]:
) -> Optional[MetricsData]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ def __init__(
self,
service_name: str | None = None,
out: typing.IO = sys.stdout,
formatter: typing.Callable[
[ReadableSpan], str
] = lambda span: span.to_json() + linesep,
formatter: typing.Callable[[ReadableSpan], str] = lambda span: (
span.to_json() + linesep
),
):
self.out = out
self.formatter = formatter
Expand Down
40 changes: 34 additions & 6 deletions opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,32 @@

from opentelemetry.metrics import Observation
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics._internal.point import (
MetricsData,
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
InMemoryMetricReader,
Metric,
NumberDataPoint,
Sum,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope


class TestInMemoryMetricReader(TestCase):
def test_no_metrics(self):
mock_collect_callback = Mock(return_value=[])
mock_collect_callback = Mock(
return_value=MetricsData(resource_metrics=[])
)
reader = InMemoryMetricReader()
reader._set_collect_callback(mock_collect_callback)
self.assertEqual(reader.get_metrics_data(), [])
self.assertEqual(
reader.get_metrics_data(), MetricsData(resource_metrics=[])
)
mock_collect_callback.assert_called_once()

def test_converts_metrics_to_list(self):
Expand All @@ -55,15 +66,32 @@ def test_converts_metrics_to_list(self):
is_monotonic=True,
),
)
mock_collect_callback = Mock(return_value=(metric,))
metric_data = MetricsData(
resource_metrics=[
ResourceMetrics(
scope_metrics=[
ScopeMetrics(
metrics=[metric],
scope=InstrumentationScope(name="test"),
schema_url="",
)
],
resource=Resource.create(),
schema_url="",
)
]
)
mock_collect_callback = Mock(return_value=metric_data)
reader = InMemoryMetricReader()
reader._set_collect_callback(mock_collect_callback)

returned_metrics = reader.get_metrics_data()
mock_collect_callback.assert_called_once()
self.assertIsInstance(returned_metrics, tuple)
self.assertEqual(len(returned_metrics), 1)
self.assertIs(returned_metrics[0], metric)
self.assertIsNotNone(returned_metrics)
self.assertIs(
returned_metrics.resource_metrics[0].scope_metrics[0].metrics[0],
metric,
)

def test_shutdown(self):
# shutdown should always be successful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
import weakref
from logging import WARNING
from time import sleep, time_ns
from typing import Optional, Sequence
from typing import Optional
from unittest.mock import Mock

import pytest

from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError
from opentelemetry.sdk.metrics._internal import _Counter
from opentelemetry.sdk.metrics._internal.point import (
MetricsData,
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Gauge,
Expand All @@ -40,6 +45,8 @@
DefaultAggregation,
LastValueAggregation,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.test.concurrency_test import ConcurrencyTestBase


Expand All @@ -48,7 +55,7 @@ def __init__(
self, wait=0, preferred_temporality=None, preferred_aggregation=None
):
self.wait = wait
self.metrics = []
self.metrics: list[MetricsData] = []
self._shutdown = False
super().__init__(
preferred_temporality=preferred_temporality,
Expand All @@ -57,13 +64,13 @@ def __init__(

def export(
self,
metrics_data: Sequence[Metric],
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
sleep(self.wait)
self.metrics.extend(metrics_data)
return True
self.metrics.append(metrics_data)
return MetricExportResult.SUCCESS

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._shutdown = True
Expand Down Expand Up @@ -126,6 +133,21 @@ def collect(self, timeout_millis: float = 10_000) -> None:
),
),
]
metrics = MetricsData(
resource_metrics=[
ResourceMetrics(
scope_metrics=[
ScopeMetrics(
metrics=metrics_list,
scope=InstrumentationScope(name="test"),
schema_url="",
)
],
resource=Resource.create(),
schema_url="",
)
]
)


class TestPeriodicExportingMetricReader(ConcurrencyTestBase):
Expand All @@ -137,7 +159,12 @@ def test_defaults(self):
pmr.shutdown()

def _create_periodic_reader(
self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000
self,
metrics_data: MetricsData,
exporter,
collect_wait=0,
interval=60000,
timeout=30000,
):
pmr = PeriodicExportingMetricReader(
exporter,
Expand All @@ -147,7 +174,7 @@ def _create_periodic_reader(

def _collect(reader, timeout_millis):
sleep(collect_wait)
pmr._receive_metrics(metrics, timeout_millis)
return metrics_data

pmr._set_collect_callback(_collect)
return pmr
Expand Down Expand Up @@ -198,24 +225,26 @@ def test_ticker_value_exception_on_negative(self):
def test_ticker_collects_metrics(self):
exporter = FakeMetricsExporter()

pmr = self._create_periodic_reader(
metrics_list, exporter, interval=100
)
pmr = self._create_periodic_reader(metrics, exporter, interval=100)
sleep(0.15)
self.assertEqual(exporter.metrics, metrics_list)
self.assertEqual(exporter.metrics[0], metrics)
pmr.shutdown()

def test_shutdown(self):
exporter = FakeMetricsExporter()

pmr = self._create_periodic_reader([], exporter)
pmr = self._create_periodic_reader(
MetricsData(resource_metrics=[]), exporter
)
pmr.shutdown()
self.assertEqual(exporter.metrics, [])
self.assertEqual(exporter.metrics[0], MetricsData(resource_metrics=[]))
self.assertTrue(pmr._shutdown)
self.assertTrue(exporter._shutdown)

def test_shutdown_multiple_times(self):
pmr = self._create_periodic_reader([], FakeMetricsExporter())
pmr = self._create_periodic_reader(
MetricsData(resource_metrics=[]), FakeMetricsExporter()
)
with self.assertLogs(level="WARNING") as w:
self.run_with_many_threads(pmr.shutdown)
self.assertTrue("Can't shutdown multiple times" in w.output[0])
Expand Down
Loading