diff --git a/py/plugins/firebase/README.md b/py/plugins/firebase/README.md index c371a66497..856daf4004 100644 --- a/py/plugins/firebase/README.md +++ b/py/plugins/firebase/README.md @@ -3,16 +3,25 @@ This Genkit plugin provides a set of tools and utilities for working with Firebase. -## Install +## Telemetry -Firestore integrations (no telemetry): +The Firebase plugin provides easy integration with Google Cloud Observability (Cloud Trace, Cloud Monitoring, and Cloud Logging). -```bash -pip install genkit-plugin-firebase -``` +To enable telemetry: -Telemetry export to Google Cloud Observability (Cloud Trace + Cloud Monitoring): +```python +from genkit.plugins.firebase import add_firebase_telemetry -```bash -pip install "genkit-plugin-firebase[telemetry]" +# Enable telemetry (defaults to production-only export) +add_firebase_telemetry() ``` + +### Configuration + +`add_firebase_telemetry` supports the following options: + +- `project_id`: Firebase project ID (optional; auto-detected from the environment if omitted). +- `force_dev_export`: Set to `True` to export telemetry in the dev environment, `GENKIT_ENV=dev` (defaults to `False`). +- `log_input_and_output`: Set to `True` to log model inputs and outputs (defaults to `False`, i.e. redacted). +- `disable_metrics`: Set to `True` to disable metrics export. +- `disable_traces`: Set to `True` to disable trace export. 
diff --git a/py/plugins/firebase/pyproject.toml b/py/plugins/firebase/pyproject.toml index cbc520c646..307f516345 100644 --- a/py/plugins/firebase/pyproject.toml +++ b/py/plugins/firebase/pyproject.toml @@ -44,6 +44,7 @@ classifiers = [ ] dependencies = [ "genkit", + "genkit-plugin-google-cloud", "google-cloud-firestore", "strenum>=0.4.15; python_version < '3.11'", ] diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py b/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py index dc5bb7159b..1cd1a84508 100644 --- a/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py +++ b/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py @@ -175,6 +175,11 @@ - Genkit documentation: https://genkit.dev/ """ +from typing import Any + +from opentelemetry.sdk.trace.sampling import Sampler + +from .constant import FirebaseTelemetryConfig from .firestore import define_firestore_vector_store @@ -187,26 +192,76 @@ def package_name() -> str: return 'genkit.plugins.firebase' -def add_firebase_telemetry() -> None: +def add_firebase_telemetry( + config: FirebaseTelemetryConfig | None = None, + *, + project_id: str | None = None, + credentials: dict[str, Any] | None = None, + sampler: Sampler | None = None, + log_input_and_output: bool = False, + force_dev_export: bool = False, + disable_metrics: bool = False, + disable_traces: bool = False, + metric_export_interval_ms: int | None = None, + metric_export_timeout_ms: int | None = None, +) -> None: """Add Firebase telemetry export to Google Cloud Observability. - Exports traces to Cloud Trace and metrics to Cloud Monitoring. - In development (GENKIT_ENV=dev), telemetry is disabled by default. + Exports traces to Cloud Trace, metrics to Cloud Monitoring, and logs to + Cloud Logging. In development (GENKIT_ENV=dev), telemetry is disabled by + default unless force_dev_export is True. + + Args: + config: FirebaseTelemetryConfig object. If provided, kwargs are ignored. + project_id: Firebase project ID. 
Auto-detected from environment if None. + credentials: Service account credentials dictionary. + sampler: OpenTelemetry trace sampler. + log_input_and_output: If True, logs feature inputs/outputs. WARNING: May log PII. + force_dev_export: If True, exports in dev mode. + disable_metrics: If True, disables metrics export. + disable_traces: If True, disables trace export. + metric_export_interval_ms: Metrics export interval in ms. Minimum: 1000ms. + metric_export_timeout_ms: Metrics export timeout in ms. + + Example:: + + # Using kwargs + add_firebase_telemetry(project_id='my-project', log_input_and_output=True) + + # Using config object + config = FirebaseTelemetryConfig(project_id='my-project') + add_firebase_telemetry(config) """ try: # Imported lazily so Firestore-only users don't need telemetry deps. - from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry + from .telemetry import add_firebase_telemetry as _add_firebase_telemetry except ImportError as e: raise ImportError( 'Firebase telemetry requires the Google Cloud telemetry exporter. 
' 'Install it with: pip install "genkit-plugin-firebase[telemetry]"' ) from e - add_gcp_telemetry(force_export=False) + if config is not None: + _add_firebase_telemetry(config) + else: + _add_firebase_telemetry( + FirebaseTelemetryConfig( + project_id=project_id, + credentials=credentials, + sampler=sampler, + log_input_and_output=log_input_and_output, + force_dev_export=force_dev_export, + disable_metrics=disable_metrics, + disable_traces=disable_traces, + metric_export_interval_ms=metric_export_interval_ms, + metric_export_timeout_ms=metric_export_timeout_ms, + ) + ) __all__ = [ 'add_firebase_telemetry', 'define_firestore_vector_store', + 'FirebaseTelemetryConfig', 'package_name', ] diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/constant.py b/py/plugins/firebase/src/genkit/plugins/firebase/constant.py index d62758ef5b..aa9929e06a 100644 --- a/py/plugins/firebase/src/genkit/plugins/firebase/constant.py +++ b/py/plugins/firebase/src/genkit/plugins/firebase/constant.py @@ -14,11 +14,40 @@ # # SPDX-License-Identifier: Apache-2.0 -"""Firebase constants.""" +"""Firebase constants and configuration models.""" from collections.abc import Callable -from typing import Any +from typing import Annotated, Any from google.cloud.firestore_v1 import DocumentSnapshot +from opentelemetry.sdk.trace.sampling import Sampler +from pydantic import BaseModel, Field MetadataTransformFn = Callable[[DocumentSnapshot], dict[str, Any]] + + +class FirebaseTelemetryConfig(BaseModel): + """Configuration for Firebase telemetry export to Google Cloud Observability. + + Args: + project_id: Firebase project ID. Auto-detected from environment if None. + credentials: Service account credentials dictionary. + sampler: OpenTelemetry trace sampler. + log_input_and_output: If True, logs feature inputs/outputs. WARNING: May log PII. + force_dev_export: If True, exports telemetry in dev mode (GENKIT_ENV=dev). + disable_metrics: If True, disables metrics export. 
+ disable_traces: If True, disables trace export. + metric_export_interval_ms: Metrics export interval in ms (>= 1000; GcpTelemetry raises values below 5000 to 5000). + metric_export_timeout_ms: Metrics export timeout in ms. + """ + + project_id: str | None = None + credentials: dict[str, Any] | None = None + sampler: Sampler | None = None + log_input_and_output: bool = False + force_dev_export: bool = False + disable_metrics: bool = False + disable_traces: bool = False + metric_export_interval_ms: Annotated[int, Field(ge=1000)] | None = None + metric_export_timeout_ms: int | None = None + model_config = {'arbitrary_types_allowed': True} diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/telemetry.py b/py/plugins/firebase/src/genkit/plugins/firebase/telemetry.py new file mode 100644 index 0000000000..a2bd26220f --- /dev/null +++ b/py/plugins/firebase/src/genkit/plugins/firebase/telemetry.py @@ -0,0 +1,55 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Firebase telemetry integration.""" + +from genkit.core.logging import get_logger +from genkit.plugins.google_cloud.telemetry.config import GcpTelemetry + +from .constant import FirebaseTelemetryConfig + +logger = get_logger(__name__) + + +def add_firebase_telemetry(config: FirebaseTelemetryConfig) -> None: + """Add Firebase telemetry export to Google Cloud Observability. + + Exports traces to Cloud Trace, metrics to Cloud Monitoring, and logs to + Cloud Logging. 
In development (GENKIT_ENV=dev), telemetry is disabled by + default unless force_dev_export is set to True. + + Args: + config: FirebaseTelemetryConfig object with telemetry configuration. + """ + manager = GcpTelemetry( + project_id=config.project_id, + credentials=config.credentials, + sampler=config.sampler, + log_input_and_output=config.log_input_and_output, + force_dev_export=config.force_dev_export, + disable_metrics=config.disable_metrics, + disable_traces=config.disable_traces, + metric_export_interval_ms=config.metric_export_interval_ms, + metric_export_timeout_ms=config.metric_export_timeout_ms, + ) + + if not manager.project_id: + logger.warning( + 'Firebase project ID not found. Set FIREBASE_PROJECT_ID, GOOGLE_CLOUD_PROJECT, ' + 'or GCLOUD_PROJECT environment variable, or pass project_id parameter.' + ) + + manager.initialize() diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/tests/firebase_telemetry_test.py b/py/plugins/firebase/src/genkit/plugins/firebase/tests/firebase_telemetry_test.py index e03a08914f..dbda63ccb4 100644 --- a/py/plugins/firebase/src/genkit/plugins/firebase/tests/firebase_telemetry_test.py +++ b/py/plugins/firebase/src/genkit/plugins/firebase/tests/firebase_telemetry_test.py @@ -57,11 +57,35 @@ def _create_model_span( return mock_span -@patch('genkit.plugins.google_cloud.telemetry.tracing.add_gcp_telemetry') -def test_firebase_telemetry_delegates_to_gcp(mock_add_gcp_telemetry: MagicMock) -> None: - """Test that Firebase telemetry delegates to GCP telemetry.""" +@patch('genkit.plugins.firebase.telemetry.GcpTelemetry') +def test_firebase_telemetry_initializes_gcp_telemetry(mock_gcp_telemetry_cls: MagicMock) -> None: + """Test that Firebase telemetry initializes GcpTelemetry with correct defaults.""" + mock_manager = MagicMock() + mock_gcp_telemetry_cls.return_value = mock_manager + add_firebase_telemetry() - mock_add_gcp_telemetry.assert_called_once_with(force_export=False) + + mock_gcp_telemetry_cls.assert_called_once() + 
kwargs = mock_gcp_telemetry_cls.call_args.kwargs + assert kwargs['force_dev_export'] is False + mock_manager.initialize.assert_called_once() + + +def test_firebase_telemetry_passes_configuration() -> None: + """Test that configuration options are passed to GcpTelemetry.""" + with patch('genkit.plugins.firebase.telemetry.GcpTelemetry') as mock_gcp_telemetry_cls: + add_firebase_telemetry( + project_id='test-project', + log_input_and_output=True, + force_dev_export=True, + disable_metrics=True, + ) + + kwargs = mock_gcp_telemetry_cls.call_args.kwargs + assert kwargs['project_id'] == 'test-project' + assert kwargs['log_input_and_output'] is True + assert kwargs['force_dev_export'] is True + assert kwargs['disable_metrics'] is True @patch('genkit.plugins.google_cloud.telemetry.metrics._output_tokens') diff --git a/py/plugins/firebase/tests/firebase_plugin_test.py b/py/plugins/firebase/tests/firebase_plugin_test.py index 4168414a69..afc9f44303 100644 --- a/py/plugins/firebase/tests/firebase_plugin_test.py +++ b/py/plugins/firebase/tests/firebase_plugin_test.py @@ -22,6 +22,7 @@ import pytest from genkit.plugins.firebase import ( + FirebaseTelemetryConfig, add_firebase_telemetry, define_firestore_vector_store, package_name, @@ -33,11 +34,43 @@ def test_package_name() -> None: assert package_name() == 'genkit.plugins.firebase' -@patch('genkit.plugins.google_cloud.telemetry.tracing.add_gcp_telemetry') -def test_add_firebase_telemetry_calls_gcp_telemetry(mock_add_gcp: MagicMock) -> None: +@patch('genkit.plugins.firebase.telemetry.GcpTelemetry') +def test_add_firebase_telemetry_calls_gcp_telemetry(mock_gcp_telemetry_cls: MagicMock) -> None: """Test add_firebase_telemetry delegates to GCP telemetry.""" + mock_manager = MagicMock() + mock_gcp_telemetry_cls.return_value = mock_manager + add_firebase_telemetry() - mock_add_gcp.assert_called_once_with(force_export=False) + + mock_gcp_telemetry_cls.assert_called_once() + mock_manager.initialize.assert_called_once() + + 
+@patch('genkit.plugins.firebase.telemetry.GcpTelemetry') +def test_add_firebase_telemetry_with_config(mock_gcp_telemetry_cls: MagicMock) -> None: + """Test add_firebase_telemetry accepts config object.""" + mock_manager = MagicMock() + mock_gcp_telemetry_cls.return_value = mock_manager + + config = FirebaseTelemetryConfig( + project_id='test-project', + log_input_and_output=True, + force_dev_export=True, + ) + add_firebase_telemetry(config) + + mock_gcp_telemetry_cls.assert_called_once_with( + project_id='test-project', + credentials=None, + sampler=None, + log_input_and_output=True, + force_dev_export=True, + disable_metrics=False, + disable_traces=False, + metric_export_interval_ms=None, + metric_export_timeout_ms=None, + ) + mock_manager.initialize.assert_called_once() def test_define_firestore_vector_store_exported() -> None: @@ -49,6 +82,18 @@ def test_define_firestore_vector_store_exported() -> None: def test_add_firebase_telemetry_raises_on_missing_deps() -> None: """Test that an informative ImportError is raised if telemetry deps are missing.""" # Temporarily remove the module from sys.modules to simulate it not being installed. 
- with patch.dict(sys.modules, {'genkit.plugins.google_cloud.telemetry.tracing': None}): + with patch.dict(sys.modules, {'genkit.plugins.firebase.telemetry': None}): with pytest.raises(ImportError, match='Firebase telemetry requires the Google Cloud telemetry exporter'): add_firebase_telemetry() + + +def test_firebase_telemetry_config_validation() -> None: + """Test Pydantic validation on FirebaseTelemetryConfig.""" + # Valid config + config = FirebaseTelemetryConfig(project_id='test-project') + assert config.project_id == 'test-project' + assert config.log_input_and_output is False + + # Invalid metric interval (< 1000ms) should raise ValidationError + with pytest.raises(ValueError): + FirebaseTelemetryConfig(metric_export_interval_ms=500) diff --git a/py/plugins/google-cloud/pyproject.toml b/py/plugins/google-cloud/pyproject.toml index f7b01ff9fe..5ca3c4a6cf 100644 --- a/py/plugins/google-cloud/pyproject.toml +++ b/py/plugins/google-cloud/pyproject.toml @@ -44,6 +44,7 @@ classifiers = [ ] dependencies = [ "genkit", + "google-cloud-logging>=3.10.0", "opentelemetry-exporter-gcp-trace>=1.9.0", "opentelemetry-exporter-gcp-monitoring>=1.9.0", "strenum>=0.4.15; python_version < '3.11'", diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/action.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/action.py index 8ae45a1673..a88684c72c 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/action.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/action.py @@ -37,6 +37,7 @@ import structlog from opentelemetry.sdk.trace import ReadableSpan +from .gcp_logger import gcp_logger from .utils import ( create_common_log_attributes, extract_outer_feature_name_from_path, @@ -103,7 +104,7 @@ def _write_log( session_id: str | None, thread_name: str | None, ) -> None: - """Write a structured log entry.""" + """Write a structured log entry to Cloud Logging.""" path = 
truncate_path(to_display_path(qualified_path)) metadata = { **create_common_log_attributes(span, project_id), @@ -117,7 +118,7 @@ def _write_log( if thread_name: metadata['threadName'] = thread_name - logger.info(f'{tag}[{path}, {feature_name}]', **metadata) + gcp_logger.log_structured(f'{tag}[{path}, {feature_name}]', metadata) # Singleton instance diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/config.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/config.py new file mode 100644 index 0000000000..5b7181b4ca --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/config.py @@ -0,0 +1,320 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Configuration management for GCP telemetry. + +This module handles project ID resolution, telemetry configuration, +and initialization of tracing, metrics, and logging. 
+""" + +import logging +import os +import uuid +from collections.abc import Mapping +from typing import Any + +import structlog +from opentelemetry import metrics +from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter +from opentelemetry.resourcedetector.gcp_resource_detector import GoogleCloudResourceDetector +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource +from opentelemetry.sdk.trace.sampling import Sampler +from opentelemetry.trace import get_current_span, span as trace_span + +from genkit.core.environment import is_dev_environment +from genkit.core.logging import get_logger +from genkit.core.tracing import add_custom_exporter + +from .constants import ( + DEFAULT_METRIC_EXPORT_INTERVAL_MS, + DEV_METRIC_EXPORT_INTERVAL_MS, + MIN_METRIC_EXPORT_INTERVAL_MS, + PROJECT_ID_ENV_VARS, +) +from .exporters import handle_metric_error, handle_tracing_error +from .metrics_exporter import GenkitMetricExporter +from .trace_exporter import GcpAdjustingTraceExporter, GenkitGCPExporter + +logger = get_logger(__name__) + + +def resolve_project_id( + project_id: str | None = None, + credentials: dict[str, Any] | None = None, +) -> str | None: + """Resolve the GCP project ID from multiple sources. + + Resolution order (matching JS/Go): + 1. Explicit project_id parameter + 2. FIREBASE_PROJECT_ID environment variable + 3. GOOGLE_CLOUD_PROJECT environment variable + 4. GCLOUD_PROJECT environment variable + 5. Project ID from credentials + + Args: + project_id: Explicitly provided project ID. + credentials: Optional credentials dict with project_id. + + Returns: + The resolved project ID or None. 
+ """ + if project_id: + return project_id + + # Check environment variables in order of priority + for env_var in PROJECT_ID_ENV_VARS: + env_value = os.environ.get(env_var) + if env_value: + return env_value + + # Check credentials for project_id + if credentials and 'project_id' in credentials: + return credentials['project_id'] + + return None + + +class GcpTelemetry: + """Central manager for GCP Telemetry configuration. + + Encapsulates configuration and manages the lifecycle of Tracing, Metrics, + and Logging setup, ensuring consistent state (like project_id) across all + telemetry components. + """ + + def __init__( + self, + project_id: str | None = None, + credentials: dict[str, Any] | None = None, + sampler: Sampler | None = None, + log_input_and_output: bool = False, + force_dev_export: bool = False, + disable_metrics: bool = False, + disable_traces: bool = False, + metric_export_interval_ms: int | None = None, + metric_export_timeout_ms: int | None = None, + ) -> None: + """Initialize the GCP Telemetry manager. + + Args: + project_id: GCP project ID; resolved from env vars or credentials if omitted. + credentials: Optional credentials dict. + sampler: Trace sampler. + log_input_and_output: If False, hides sensitive data. + force_dev_export: If True, export even in the dev environment. + disable_metrics: If True, metrics are not exported. + disable_traces: If True, traces are not exported. + metric_export_interval_ms: Export interval in ms; values below the minimum are raised to it. + metric_export_timeout_ms: Export timeout in ms; defaults to the export interval. 
+ """ + self.credentials = credentials + self.sampler = sampler + self.log_input_and_output = log_input_and_output + self.force_dev_export = force_dev_export + self.disable_metrics = disable_metrics + self.disable_traces = disable_traces + + # Resolve project ID immediately + self.project_id = resolve_project_id(project_id, credentials) + + # Determine metric export settings + is_dev = is_dev_environment() + + default_interval = DEV_METRIC_EXPORT_INTERVAL_MS if is_dev else DEFAULT_METRIC_EXPORT_INTERVAL_MS + self.metric_export_interval_ms = metric_export_interval_ms or default_interval + + if self.metric_export_interval_ms < MIN_METRIC_EXPORT_INTERVAL_MS: + logger.warning( + f'metric_export_interval_ms ({self.metric_export_interval_ms}) is below minimum ' + f'({MIN_METRIC_EXPORT_INTERVAL_MS}), using minimum' + ) + self.metric_export_interval_ms = MIN_METRIC_EXPORT_INTERVAL_MS + + self.metric_export_timeout_ms = metric_export_timeout_ms or self.metric_export_interval_ms + + def _build_exporter_kwargs(self) -> dict[str, Any]: + """Build kwargs dict for exporters with project_id and credentials. + + Returns: + A dict with project_id and/or credentials if available, empty dict otherwise. + """ + kwargs: dict[str, Any] = {} + if self.project_id: + kwargs['project_id'] = self.project_id + if self.credentials: + kwargs['credentials'] = self.credentials + return kwargs + + def initialize(self) -> None: + """Actuates the telemetry configuration. + + CRITICAL: This method MUST be called to initialize telemetry handlers + even in dev mode. The 'export' flag controls whether data is sent to + GCP, but initialization is ALWAYS required for proper operation. 
+ """ + is_dev = is_dev_environment() + should_export = self.force_dev_export or not is_dev + + # ALWAYS configure logging (required for telemetry handlers) + # The export flag is passed down to control Cloud Logging export + self._configure_logging() + + # Only configure tracing/metrics if exporting (performance optimization) + if should_export: + self._configure_tracing() + self._configure_metrics() + logger.info( + 'Telemetry fully initialized', + project_id=self.project_id, + export_enabled=True, + environment='dev' if is_dev else 'prod', + force_dev_export=self.force_dev_export, + ) + else: + logger.debug( + 'Telemetry initialized in local-only mode', + export_enabled=False, + environment='dev', + note='Use force_dev_export=True for full AIM visibility in dev', + ) + + def _configure_logging(self) -> None: + """Configure structlog with Cloud Logging export and trace correlation.""" + from .gcp_logger import gcp_logger + + is_dev = is_dev_environment() + should_export = self.force_dev_export or not is_dev + + # Initialize the GCP logger for telemetry modules + gcp_logger.initialize( + project_id=self.project_id, + credentials=self.credentials, + export=should_export, + ) + + # Configure structlog processors for trace correlation + try: + current_config = structlog.get_config() + processors = list(current_config.get('processors', [])) + + # Early return if already configured + if any(getattr(p, '__name__', '') == '_genkit_inject_trace_context' for p in processors): + return + + # Define processor function that captures self + def _genkit_inject_trace_context( + logger_instance: logging.Logger, + method_name: str, + event_dict: dict[str, Any], + ) -> Mapping[str, Any]: + return self._inject_trace_context(logger_instance, method_name, event_dict) + + # Append processor to chain + processors.append(_genkit_inject_trace_context) + structlog.configure(processors=processors) + logger.debug('Configured structlog for GCP trace correlation') + + except Exception as e: + 
logger.warning('Failed to configure structlog for trace correlation', error=str(e)) + + def _configure_tracing(self) -> None: + if self.disable_traces: + return + + try: + exporter_kwargs = self._build_exporter_kwargs() + base_exporter = GenkitGCPExporter(**exporter_kwargs) if exporter_kwargs else GenkitGCPExporter() + + trace_exporter = GcpAdjustingTraceExporter( + exporter=base_exporter, + log_input_and_output=self.log_input_and_output, + project_id=self.project_id, + error_handler=handle_tracing_error, + ) + + add_custom_exporter(trace_exporter, 'gcp_telemetry_server') + except Exception as e: + handle_tracing_error(e) + + def _configure_metrics(self) -> None: + if self.disable_metrics: + return + + try: + resource = Resource.create({ + SERVICE_NAME: 'genkit', + SERVICE_INSTANCE_ID: str(uuid.uuid4()), + }) + + # Suppress detector warnings during GCP resource detection + detector_logger = logging.getLogger('opentelemetry.resourcedetector.gcp_resource_detector') + original_level = detector_logger.level + detector_logger.setLevel(logging.ERROR) + + try: + gcp_resource = GoogleCloudResourceDetector(raise_on_error=True).detect() + resource = resource.merge(gcp_resource) + except Exception as e: + # For detection failure log the exception and use the default resource + detector_logger.warning(f'Google Cloud resource detection failed: {e}') + finally: + detector_logger.setLevel(original_level) + + exporter_kwargs = self._build_exporter_kwargs() + cloud_monitoring_exporter = CloudMonitoringMetricsExporter(**exporter_kwargs) + + metrics_exporter = GenkitMetricExporter( + exporter=cloud_monitoring_exporter, + error_handler=handle_metric_error, + ) + + reader = PeriodicExportingMetricReader( + metrics_exporter, + export_interval_millis=self.metric_export_interval_ms, + export_timeout_millis=self.metric_export_timeout_ms, + ) + + provider = MeterProvider(metric_readers=[reader], resource=resource) + metrics.set_meter_provider(provider) + + except Exception as e: + 
handle_metric_error(e) + + def _inject_trace_context( + self, logger: logging.Logger, method_name: str, event_dict: dict[str, Any] + ) -> dict[str, Any]: + """Structlog processor to inject GCP-compatible trace context.""" + # Only inject if event_dict is a dict or mapping + if not isinstance(event_dict, dict) and not hasattr(event_dict, '__setitem__'): + return event_dict + + span = get_current_span() + if span == trace_span.INVALID_SPAN: + return event_dict + + ctx = span.get_span_context() + if not ctx.is_valid: + return event_dict + + if self.project_id: + event_dict['logging.googleapis.com/trace'] = f'projects/{self.project_id}/traces/{ctx.trace_id:032x}' + + event_dict['logging.googleapis.com/spanId'] = f'{ctx.span_id:016x}' + event_dict['logging.googleapis.com/trace_sampled'] = '1' if ctx.trace_flags.sampled else '0' + + return event_dict diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/constants.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/constants.py new file mode 100644 index 0000000000..54e5baba23 --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/constants.py @@ -0,0 +1,50 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Constants for GCP telemetry. + +This module centralizes all constants used across the GCP telemetry +implementation, matching the pattern from JS/Go implementations. 
+""" + +# Metric export intervals (matching JS/Go implementations) +MIN_METRIC_EXPORT_INTERVAL_MS = 5000 +DEFAULT_METRIC_EXPORT_INTERVAL_MS = 300000 +DEV_METRIC_EXPORT_INTERVAL_MS = 5000 +PROD_METRIC_EXPORT_INTERVAL_MS = 300000 + +# Project ID environment variables (resolution order) +# Priority: FIREBASE_PROJECT_ID > GOOGLE_CLOUD_PROJECT > GCLOUD_PROJECT +PROJECT_ID_ENV_VARS = ( + 'FIREBASE_PROJECT_ID', + 'GOOGLE_CLOUD_PROJECT', + 'GCLOUD_PROJECT', +) + +# Retry configuration for trace export to Cloud Trace +TRACE_RETRY_INITIAL = 0.1 +TRACE_RETRY_MAXIMUM = 30.0 +TRACE_RETRY_MULTIPLIER = 2 +TRACE_RETRY_DEADLINE = 120.0 + +# Time adjustment for GCP span requirements +# GCP requires end_time > start_time, so we add 1 microsecond minimum duration +MIN_SPAN_DURATION_NS = 1000 # 1 microsecond in nanoseconds + +# Start time adjustment for metrics to prevent DELTA->CUMULATIVE overlap +# Cloud Monitoring converts DELTA to CUMULATIVE, causing overlap issues +# We add 1 millisecond to ensure discrete export timeframes +METRIC_START_TIME_ADJUSTMENT_NS = 1_000_000 # 1 millisecond in nanoseconds diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/engagement.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/engagement.py index fdded69683..c56e5f3054 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/engagement.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/engagement.py @@ -42,6 +42,7 @@ from genkit.core import GENKIT_VERSION +from .gcp_logger import gcp_logger from .utils import create_common_log_attributes, truncate logger = structlog.get_logger(__name__) @@ -130,7 +131,7 @@ def _write_user_feedback( if text_feedback: metadata['textFeedback'] = truncate(str(text_feedback)) - logger.info(f'UserFeedback[{name}]', **metadata) + gcp_logger.log_structured(f'UserFeedback[{name}]', metadata) def _write_user_acceptance( self, @@ -154,7 +155,7 @@ def _write_user_acceptance( 
**create_common_log_attributes(span, project_id), 'acceptanceValue': acceptance_value, } - logger.info(f'UserAcceptance[{name}]', **metadata) + gcp_logger.log_structured(f'UserAcceptance[{name}]', metadata) def _extract_trace_name(self, attrs: dict[str, Any]) -> str: """Extract the trace name from span attributes.""" diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/exporters.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/exporters.py new file mode 100644 index 0000000000..fa435bf2da --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/exporters.py @@ -0,0 +1,112 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Base exporter utilities for GCP telemetry. + +This module provides reusable error handling and base utilities for +trace and metrics exporters, eliminating code duplication. +""" + +import structlog + +logger = structlog.get_logger(__name__) + + +class ErrorHandler: + """Manages error handling for telemetry exports. + + Logs the detailed help text only on the first error to avoid spam; later + errors are still logged, but without the detailed help text. + + This replaces the previous pattern of module-level boolean flags and + separate error handler functions for tracing and metrics. 
+ """ + + def __init__(self) -> None: + """Initialize the error handler.""" + self._logged = False + + def handle( + self, + error: Exception, + error_message: str, + help_text: str, + ) -> None: + """Handle export error with one-time detailed logging. + + Args: + error: The exception that occurred. + error_message: Brief description of what failed. + help_text: Detailed help text shown only on first error. + """ + if not self._logged: + self._logged = True + logger.error(f'{error_message}\n{help_text}\nError: {error}') + else: + logger.error(f'{error_message}: {error}') + + +# Singleton error handlers for tracing and metrics +_tracing_error_handler = ErrorHandler() +_metrics_error_handler = ErrorHandler() + +# Help text for tracing errors (GCP IAM requirements) +TRACING_HELP_TEXT = 'Ensure the service account has the "Cloud Trace Agent" (roles/cloudtrace.agent) role.' + +# Help text for metrics errors (GCP IAM requirements) +METRICS_HELP_TEXT = ( + 'Ensure the service account has the "Monitoring Metric Writer" ' + '(roles/monitoring.metricWriter) or "Cloud Telemetry Metrics Writer" ' + '(roles/telemetry.metricsWriter) role.' +) + + +def handle_tracing_error(error: Exception) -> None: + """Handle trace export errors with helpful messages. + + Only logs detailed instructions once to avoid spam. + + Args: + error: The export error. + """ + error_str = str(error).lower() + if 'permission' in error_str or 'denied' in error_str or '403' in error_str: + _tracing_error_handler.handle( + error, + 'Unable to send traces to Google Cloud.', + TRACING_HELP_TEXT, + ) + else: + logger.error('Error exporting traces to GCP', error=str(error)) + + +def handle_metric_error(error: Exception) -> None: + """Handle metrics export errors with helpful messages. + + Only logs detailed instructions once to avoid spam. + + Args: + error: The export error. 
+ """ + error_str = str(error).lower() + if 'permission' in error_str or 'denied' in error_str or '403' in error_str: + _metrics_error_handler.handle( + error, + 'Unable to send metrics to Google Cloud.', + METRICS_HELP_TEXT, + ) + else: + logger.error('Error exporting metrics to GCP', error=str(error)) diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/feature.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/feature.py index eee41639de..0f51329ee6 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/feature.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/feature.py @@ -39,6 +39,7 @@ from genkit.core import GENKIT_VERSION +from .gcp_logger import gcp_logger from .utils import ( create_common_log_attributes, extract_error_name, @@ -165,7 +166,7 @@ def _write_log( session_id: str | None, thread_name: str | None, ) -> None: - """Write a structured log entry.""" + """Write a structured log entry to Cloud Logging.""" path = truncate_path(to_display_path(qualified_path)) metadata = { **create_common_log_attributes(span, project_id), @@ -179,7 +180,7 @@ def _write_log( if thread_name: metadata['threadName'] = thread_name - logger.info(f'{tag}[{path}, {feature_name}]', **metadata) + gcp_logger.log_structured(f'{tag}[{path}, {feature_name}]', metadata) # Singleton instance diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/gcp_logger.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/gcp_logger.py new file mode 100644 index 0000000000..f2345b973f --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/gcp_logger.py @@ -0,0 +1,246 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
class GcpLogger:
    """Writes Genkit's structured telemetry logs to Cloud Logging.

    When export is enabled and a Cloud Logging client could be created,
    entries are sent with ``log_struct``; in every other case (local mode,
    missing project, client failure, write failure) the same message and
    payload are emitted through the module's structlog logger instead.
    """

    # Structured-logging keys that carry trace correlation in the payload.
    _TRACE_KEY = 'logging.googleapis.com/trace'
    _SPAN_ID_KEY = 'logging.googleapis.com/spanId'
    _TRACE_SAMPLED_KEY = 'logging.googleapis.com/trace_sampled'

    def __init__(self) -> None:
        """Initialize logger state (not yet initialized, no export)."""
        self._initialized = False
        self._export = False
        self._project_id: str | None = None
        self._cloud_logger: CloudLogger | None = None
        # Set once log_structured() has reported a call before initialize().
        self._logged_init_error = False

    def initialize(
        self,
        *,
        project_id: str | None = None,
        credentials: Credentials | dict[str, Any] | None = None,
        export: bool = False,
    ) -> None:
        """Initialize the GCP logger.

        This method MUST be called before any log_structured() calls.
        Repeated calls after the first are ignored.

        Args:
            project_id: GCP project ID (required if export=True).
            credentials: GCP credentials. Only a ``Credentials`` object is
                passed to the client; any other value (such as a dict) is
                ignored with a warning and Application Default Credentials
                are used.
            export: Whether to export logs to Cloud Logging (GCP).

        Behavior:
            - export=False: Local logging only (console output)
            - export=True: Logs sent to Cloud Logging for AIM visibility
        """
        if self._initialized:
            logger.debug('GcpLogger already initialized, skipping re-initialization')
            return

        self._export = export
        self._project_id = project_id

        if not export:
            logger.info(
                'GcpLogger initialized in LOCAL mode',
                export=False,
                project_id=project_id or '',
                note='Logs written to console only, not exported to GCP',
            )
            self._initialized = True
            return

        # Export mode: validate required configuration
        if not project_id:
            logger.error(
                'GcpLogger initialization FAILED: project_id required for export=True',
                export=True,
                project_id=None,
                consequence='Telemetry logs will NOT appear in Cloud Logging or AIM dashboard',
                fix='Provide project_id when calling add_firebase_telemetry()',
            )
            self._initialized = True  # Mark initialized to prevent repeated errors
            return

        try:
            # Imported lazily so the module loads without google-cloud-logging.
            from google.cloud import logging as cloud_logging

            if credentials is None or isinstance(credentials, Credentials):
                creds = credentials
            else:
                # Say so instead of silently dropping what the caller supplied.
                logger.warning(
                    'GcpLogger ignoring credentials that are not a Credentials object',
                    credentials_type=type(credentials).__name__,
                    consequence='Cloud Logging client will use Application Default Credentials',
                )
                creds = None

            client = cloud_logging.Client(
                project=project_id,
                credentials=creds,
            )
            self._cloud_logger = client.logger('genkit_log')
            logger.info(
                'GcpLogger initialized for CLOUD LOGGING export',
                export=True,
                project_id=project_id,
                log_name='genkit_log',
                consequence='Logs will appear in Cloud Logging and AIM dashboard',
            )
        except ImportError:
            logger.error(
                'GcpLogger initialization FAILED: google-cloud-logging not installed',
                export=True,
                project_id=project_id,
                consequence='Telemetry logs will NOT be exported to Cloud Logging',
                fix='Install with: pip install google-cloud-logging>=3.10.0',
            )
        except Exception as e:
            # Best effort: a broken client must not break the application;
            # _write() falls back to console while _cloud_logger stays None.
            logger.error(
                'GcpLogger initialization FAILED: Cloud Logging client error',
                export=True,
                project_id=project_id,
                error=str(e),
                error_type=type(e).__name__,
                consequence='Telemetry logs will NOT be exported to Cloud Logging',
                fix='Check credentials and project_id, ensure GCP access is configured',
            )

        self._initialized = True

    def _get_trace_context(self) -> dict[str, str]:
        """Extract trace context from the current span if available.

        Returns:
            Dictionary with trace fields for Cloud Logging, empty if there is
            no recording span or the span has no trace id.
        """
        span = trace.get_current_span()
        if not (span and span.is_recording()):
            return {}

        ctx = span.get_span_context()
        if not (ctx and ctx.trace_id):
            return {}

        trace_id = format(ctx.trace_id, '032x')
        span_id = format(ctx.span_id, '016x')

        return {
            self._TRACE_KEY: (f'projects/{self._project_id}/traces/{trace_id}' if self._project_id else trace_id),
            self._SPAN_ID_KEY: span_id,
            self._TRACE_SAMPLED_KEY: str(ctx.trace_flags.sampled),
        }

    def _entry_fields(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Map payload trace keys to LogEntry keyword fields.

        Entries written through the API carry trace correlation in the
        entry's own ``trace`` / ``span_id`` / ``trace_sampled`` fields; keys
        that only live inside the JSON payload are not promoted.

        Args:
            payload: Structured payload that may contain the trace keys.

        Returns:
            Keyword arguments for ``log_struct``; empty when the payload has
            no usable trace information.
        """
        fields: dict[str, Any] = {}

        trace_name = payload.get(self._TRACE_KEY)
        # Only a full resource name is forwarded; a bare id (written when no
        # project id is known) is left in the payload only.
        if isinstance(trace_name, str) and trace_name.startswith('projects/'):
            fields['trace'] = trace_name

        span_id = payload.get(self._SPAN_ID_KEY)
        if span_id:
            fields['span_id'] = str(span_id)

        sampled = payload.get(self._TRACE_SAMPLED_KEY)
        if sampled is not None:
            fields['trace_sampled'] = str(sampled).lower() in ('true', '1')

        return fields

    def _write(self, message: str, payload: dict[str, Any], severity: str) -> None:
        """Write log to Cloud Logging or fallback to console.

        Args:
            message: Log message for fallback logging.
            payload: Structured payload with all metadata.
            severity: Cloud Logging severity (INFO, ERROR).
        """
        if self._export and self._cloud_logger:
            try:
                self._cloud_logger.log_struct(
                    payload,
                    severity=severity,
                    labels={'module': 'genkit'},
                    **self._entry_fields(payload),
                )
                return
            except Exception as e:
                # Never let a logging failure propagate into the caller;
                # report it and fall through to the console below.
                logger.error('Failed to write to Cloud Logging', error=str(e), message=message)

        console = logger.error if severity == 'ERROR' else logger.info
        console(message, **payload)

    def log_structured(self, message: str, metadata: dict[str, Any] | None = None) -> None:
        """Write a structured log entry at INFO severity.

        This method is called by telemetry handlers to write structured logs.
        If not initialized, logs an error once and falls back to console logging.

        Args:
            message: Log message.
            metadata: Additional structured metadata.
        """
        if not self._initialized:
            # Log error ONCE to avoid spam
            if not self._logged_init_error:
                logger.error(
                    'gcp_logger.log_structured() called before initialization',
                    message=message,
                    hint='Ensure GcpTelemetry.initialize() was called',
                )
                self._logged_init_error = True

            # Fall back to console logging for debugging
            logger.warning(f'[FALLBACK] {message}', **(metadata or {}))
            return

        # Copy so the caller's dict is never mutated.
        payload = metadata.copy() if metadata else {}
        payload['message'] = message
        payload.update(self._get_trace_context())
        self._write(message, payload, 'INFO')

    def log_structured_error(self, message: str, metadata: dict[str, Any] | None = None) -> None:
        """Write a structured log entry at ERROR severity.

        Args:
            message: Log message.
            metadata: Additional structured metadata.
        """
        payload = metadata.copy() if metadata else {}
        payload['message'] = message
        payload.update(self._get_trace_context())
        self._write(message, payload, 'ERROR')


# Singleton instance
gcp_logger = GcpLogger()
class GenkitMetricExporter(MetricExporter):
    """Metric exporter wrapper that adjusts start times for GCP compatibility.

    Cloud Monitoring does not support delta metrics for custom metrics and will
    convert any DELTA aggregations to CUMULATIVE ones on export. There is implicit
    overlap in the start/end times that the Metric reader sends -- the end_time
    of the previous export becomes the start_time of the current export.

    This wrapper shifts every data point's start time forward by
    METRIC_START_TIME_ADJUSTMENT_NS nanoseconds to ensure discrete export
    timeframes and prevent data being overwritten.

    This matches the JavaScript MetricExporterWrapper in gcpOpenTelemetry.ts.
    """

    def __init__(
        self,
        exporter: CloudMonitoringMetricsExporter,
        error_handler: Callable[[Exception], None] | None = None,
    ) -> None:
        """Initialize the metric exporter wrapper.

        Args:
            exporter: The underlying CloudMonitoringMetricsExporter.
            error_handler: Optional callback for export errors.
        """
        # Force DELTA temporality for all instrument types to match JS implementation.
        delta = AggregationTemporality.DELTA
        # Hand the preferences to the base class, which stores them for the
        # reader; aggregation is inherited from the wrapped exporter.
        super().__init__(
            preferred_temporality={
                Counter: delta,
                UpDownCounter: delta,
                Histogram: delta,
                ObservableCounter: delta,
                ObservableUpDownCounter: delta,
                ObservableGauge: delta,
            },
            preferred_aggregation=getattr(exporter, '_preferred_aggregation', None),
        )

        self._exporter = exporter
        self._error_handler = error_handler

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs: object,
    ) -> MetricExportResult:
        """Export metrics after adjusting start times.

        Modifies start times of each data point to ensure no overlap with
        previous exports when GCP converts DELTA to CUMULATIVE.

        Args:
            metrics_data: The metrics data to export.
            timeout_millis: Export timeout in milliseconds.
            **kwargs: Additional arguments for base class compatibility.

        Returns:
            The export result from the wrapped exporter.

        Raises:
            Exception: Whatever the wrapped exporter raises; the error
                handler (if any) is notified first and the error re-raised.
        """
        # Modify start times before export
        self._modify_start_times(metrics_data)

        try:
            return self._exporter.export(metrics_data, timeout_millis, **kwargs)
        except Exception as e:
            if self._error_handler:
                self._error_handler(e)
            raise

    def _modify_start_times(self, metrics_data: MetricsData) -> None:
        """Shift start times forward to prevent overlap between exports.

        Each data point with a start time has
        METRIC_START_TIME_ADJUSTMENT_NS added to it. Points without a start
        time (attribute missing or ``None``) are left untouched.

        Args:
            metrics_data: The metrics data to modify in-place.
        """
        # NOTE(review): earlier comments gave the offset as both "a
        # microsecond" and "1 ms" - confirm against the constant's value.
        for resource_metrics in metrics_data.resource_metrics:
            for scope_metrics in resource_metrics.scope_metrics:
                for metric in scope_metrics.metrics:
                    for data_point in metric.data.data_points:
                        start = getattr(data_point, 'start_time_unix_nano', None)
                        if start is None:
                            # Nothing to shift; adding to None would raise
                            # TypeError and abort the whole export.
                            continue

                        # Modifying frozen dataclass via workaround
                        object.__setattr__(
                            data_point,
                            'start_time_unix_nano',
                            start + METRIC_START_TIME_ADJUSTMENT_NS,
                        )

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Delegate force flush to wrapped exporter.

        Args:
            timeout_millis: Timeout in milliseconds.

        Returns:
            The wrapped exporter's result, or True when it has no
            ``force_flush`` method.
        """
        if hasattr(self._exporter, 'force_flush'):
            return self._exporter.force_flush(timeout_millis)
        return True

    def shutdown(self, timeout_millis: float = 30_000, **kwargs: object) -> None:
        """Delegate shutdown to wrapped exporter.

        Args:
            timeout_millis: Timeout in milliseconds.
            **kwargs: Additional arguments for base class compatibility.
        """
        self._exporter.shutdown(timeout_millis, **kwargs)
class GenkitGCPExporter(CloudTraceSpanExporter):
    """Exports spans to Google Cloud Trace with retry logic.

    This exporter extends the base CloudTraceSpanExporter and overrides
    ``export`` so that the batch write is issued with an explicit retry
    policy built from the TRACE_RETRY_* constants.

    Note:
        The parent class uses google.auth.default() to get the project ID.
    """

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Export the spans to Cloud Trace with retry logic.

        Translates the spans with the parent class's converter and sends
        them in a single ``batch_write_spans`` request.

        Note:
            Leverages span transformation and formatting from opentelemetry-exporter-gcp-trace.
            See: https://cloud.google.com/python/docs/reference/cloudtrace/latest

        Args:
            spans: A sequence of OpenTelemetry ReadableSpan objects to export.

        Returns:
            SpanExportResult.SUCCESS upon successful processing (does not guarantee
            server-side success), or SpanExportResult.FAILURE if an error occurs.
        """
        try:
            self.client.batch_write_spans(
                request=BatchWriteSpansRequest(
                    name=f'projects/{self.project_id}',
                    spans=self._translate_to_cloud_trace(spans),
                ),
                # Only DeadlineExceeded is retried; any other error goes
                # straight to the handler below.
                retry=retries.Retry(
                    initial=TRACE_RETRY_INITIAL,
                    maximum=TRACE_RETRY_MAXIMUM,
                    multiplier=TRACE_RETRY_MULTIPLIER,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                    ),
                    deadline=TRACE_RETRY_DEADLINE,
                ),
            )
        except Exception as ex:
            # Export errors are logged and reported as FAILURE, never raised.
            logger.error('Error while writing to Cloud Trace', exc_info=ex)
            return SpanExportResult.FAILURE

        return SpanExportResult.SUCCESS


class TimeAdjustedSpan(RedactedSpan):
    """Wraps a span to ensure non-zero duration for GCP requirements.

    Google Cloud Trace requires end_time > start_time. This wrapper
    ensures that all spans meet this requirement by adding a minimum
    duration if needed.
    """

    @property
    def end_time(self) -> int | None:
        """Return the span end time, adjusted to meet GCP requirements.

        Returns:
            ``start_time + MIN_SPAN_DURATION_NS`` when the wrapped span has a
            start time but its end time is missing or not later than the
            start; otherwise the wrapped span's own end time (which may be
            None when the start time is also None).
        """
        start = self._span.start_time
        end = self._span.end_time

        # GCP requires end_time > start_time.
        # If the span is unfinished (end_time is None) or has zero duration,
        # we provide a minimum duration.
        if start is not None:
            if end is None or end <= start:
                return start + MIN_SPAN_DURATION_NS

        return end


class GcpAdjustingTraceExporter(AdjustingTraceExporter):
    """GCP-specific span exporter that adds telemetry recording.

    This extends the base AdjustingTraceExporter to add GCP-specific telemetry
    recording (metrics and logs) for each span, matching the JavaScript
    implementation in gcpOpenTelemetry.ts.

    The telemetry handlers record:
    - Feature metrics (requests, latency) for root spans
    - Path metrics for failure tracking
    - Generate metrics (tokens, latency) for model actions
    - Action logs for tools and generate
    - Engagement metrics for user feedback

    Example:
        ```python
        exporter = GcpAdjustingTraceExporter(
            exporter=GenkitGCPExporter(),
            log_input_and_output=False,
            project_id='my-project',
        )
        ```
    """

    def __init__(
        self,
        exporter: SpanExporter,
        log_input_and_output: bool = False,
        project_id: str | None = None,
        error_handler: Callable[[Exception], None] | None = None,
    ) -> None:
        """Initialize the GCP adjusting trace exporter.

        All arguments are forwarded unchanged to AdjustingTraceExporter.

        Args:
            exporter: The underlying SpanExporter to wrap.
            log_input_and_output: If True, preserve input/output in spans and logs.
                Defaults to False (redact for privacy).
            project_id: Optional GCP project ID for log correlation.
            error_handler: Optional callback invoked when export errors occur.
        """
        super().__init__(
            exporter=exporter,
            log_input_and_output=log_input_and_output,
            project_id=project_id,
            error_handler=error_handler,
        )

    def _adjust(self, span: ReadableSpan) -> ReadableSpan:
        """Apply all adjustments to a span including telemetry.

        This overrides the base method to add telemetry recording before
        the standard adjustments (redaction, marking, normalization).

        Args:
            span: The span to adjust.

        Returns:
            The adjusted span with telemetry recorded and time adjusted.
        """
        # Record telemetry before adjustments (uses original attributes)
        span = self._tick_telemetry(span)

        # Apply standard adjustments from base class
        span = super()._adjust(span)

        # Fix start/end times for GCP (must be end > start)
        return TimeAdjustedSpan(span, dict(span.attributes) if span.attributes else {})

    def _tick_telemetry(self, span: ReadableSpan) -> ReadableSpan:
        """Record telemetry for a span and apply root state marking.

        This matches the JavaScript tickTelemetry method in gcpOpenTelemetry.ts.
        It calls the appropriate telemetry handlers based on span type.
        Spans without a ``genkit:type`` attribute are returned untouched, and
        any exception raised by a handler is logged as a warning rather than
        propagated.

        Args:
            span: The span to record telemetry for.

        Returns:
            The span, potentially with genkit:rootState added for root spans.
        """
        attrs = span.attributes or {}
        if 'genkit:type' not in attrs:
            return span

        span_type = attrs.get('genkit:type', '')
        subtype = attrs.get('genkit:metadata:subtype', '')
        is_root = bool(attrs.get('genkit:isRoot'))

        try:
            # Always record path telemetry for error tracking
            paths_telemetry.tick(span, self._log_input_and_output, self._project_id)

            if is_root:
                # Report top level feature request and latency only for root spans
                features_telemetry.tick(span, self._log_input_and_output, self._project_id)

                # Set root state explicitly
                # (matches JS: span.attributes['genkit:rootState'] = span.attributes['genkit:state'])
                state = attrs.get('genkit:state')
                if state:
                    # Wrap the span with a copied attribute dict instead of
                    # writing to the original span's attributes.
                    new_attrs = dict(attrs)
                    new_attrs['genkit:rootState'] = state
                    span = RedactedSpan(span, new_attrs)
            else:
                if span_type == 'action' and subtype == 'model':
                    # Report generate metrics for all model actions
                    generate_telemetry.tick(span, self._log_input_and_output, self._project_id)

                if span_type == 'action' and subtype == 'tool':
                    # TODO(#4359): Report input and output for tool actions (matching JS comment)
                    pass

                if span_type in ('action', 'flow', 'flowStep', 'util'):
                    # Report request and latency metrics for all actions
                    action_telemetry.tick(span, self._log_input_and_output, self._project_id)

            if span_type == 'userEngagement':
                # Report user acceptance and feedback metrics
                engagement_telemetry.tick(span, self._log_input_and_output, self._project_id)

        except Exception as e:
            # Telemetry is best effort: a handler failure must not stop the
            # span itself from being exported.
            logger.warning('Error recording telemetry', error=str(e))

        return span
b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py index c95ca99479..617a2067f5 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py @@ -208,615 +208,22 @@ - Same project ID resolution order """ -import logging -import os -import uuid -from collections.abc import Callable, Mapping, MutableMapping, Sequence -from typing import Any, cast +from typing import Any import structlog -from google.api_core import exceptions as core_exceptions, retry as retries -from google.cloud.trace_v2 import BatchWriteSpansRequest -from opentelemetry import metrics, trace -from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter -from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter -from opentelemetry.resourcedetector.gcp_resource_detector import ( - GoogleCloudResourceDetector, -) -from opentelemetry.sdk.metrics import ( - Counter, - Histogram, - MeterProvider, - ObservableCounter, - ObservableGauge, - ObservableUpDownCounter, - UpDownCounter, -) -from opentelemetry.sdk.metrics.export import ( - AggregationTemporality, - MetricExporter, - MetricExportResult, - MetricsData, - PeriodicExportingMetricReader, -) -from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource -from opentelemetry.sdk.trace import ReadableSpan -from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.sampling import Sampler -from genkit.core.environment import is_dev_environment -from genkit.core.trace.adjusting_exporter import AdjustingTraceExporter, RedactedSpan -from genkit.core.tracing import add_custom_exporter - -from .action import action_telemetry -from .engagement import engagement_telemetry -from .feature import features_telemetry -from .generate import generate_telemetry -from .path import paths_telemetry +from .config import GcpTelemetry logger 
= structlog.get_logger(__name__) -# Constants matching JS/Go implementations -MIN_METRIC_EXPORT_INTERVAL_MS = 5000 -DEFAULT_METRIC_EXPORT_INTERVAL_MS = 300000 -DEV_METRIC_EXPORT_INTERVAL_MS = 5000 -PROD_METRIC_EXPORT_INTERVAL_MS = 300000 - - -def _resolve_project_id( - project_id: str | None = None, - credentials: dict[str, Any] | None = None, -) -> str | None: - """Resolve the GCP project ID from various sources. - - Resolution order (matching JS/Go): - 1. Explicit project_id parameter - 2. FIREBASE_PROJECT_ID environment variable - 3. GOOGLE_CLOUD_PROJECT environment variable - 4. GCLOUD_PROJECT environment variable - 5. Project ID from credentials - - Args: - project_id: Explicitly provided project ID. - credentials: Optional credentials dict with project_id. - - Returns: - The resolved project ID or None. - """ - if project_id: - return project_id - - # Check environment variables in order of priority - for env_var in ('FIREBASE_PROJECT_ID', 'GOOGLE_CLOUD_PROJECT', 'GCLOUD_PROJECT'): - env_value = os.environ.get(env_var) - if env_value: - return env_value - - # Check credentials for project_id - if credentials and 'project_id' in credentials: - return credentials['project_id'] - - return None - - -class GcpTelemetry: - """Central manager for GCP Telemetry configuration. - - Encapsulates configuration and manages the lifecycle of Tracing, Metrics, - and Logging setup, ensuring consistent state (like project_id) across all - telemetry components. - """ - - def __init__( - self, - project_id: str | None = None, - credentials: dict[str, Any] | None = None, - sampler: Sampler | None = None, - log_input_and_output: bool = False, - force_dev_export: bool = True, - disable_metrics: bool = False, - disable_traces: bool = False, - metric_export_interval_ms: int | None = None, - metric_export_timeout_ms: int | None = None, - ) -> None: - """Initialize the GCP Telemetry manager. - - Args: - project_id: GCP project ID. - credentials: Optional credentials dict. 
- sampler: Trace sampler. - log_input_and_output: If False, hides sensitive data. - force_dev_export: Check to force export in dev environment. - disable_metrics: If True, metrics are not exported. - disable_traces: If True, traces are not exported. - metric_export_interval_ms: Export interval in ms. - metric_export_timeout_ms: Export timeout in ms. - """ - self.credentials = credentials - self.sampler = sampler - self.log_input_and_output = log_input_and_output - self.force_dev_export = force_dev_export - self.disable_metrics = disable_metrics - self.disable_traces = disable_traces - - # Resolve project ID immediately - self.project_id = _resolve_project_id(project_id, credentials) - - # Determine metric export settings - is_dev = is_dev_environment() - - default_interval = DEV_METRIC_EXPORT_INTERVAL_MS if is_dev else DEFAULT_METRIC_EXPORT_INTERVAL_MS - self.metric_export_interval_ms = metric_export_interval_ms or default_interval - - if self.metric_export_interval_ms < MIN_METRIC_EXPORT_INTERVAL_MS: - logger.warning( - f'metric_export_interval_ms ({self.metric_export_interval_ms}) is below minimum ' - f'({MIN_METRIC_EXPORT_INTERVAL_MS}), using minimum' - ) - self.metric_export_interval_ms = MIN_METRIC_EXPORT_INTERVAL_MS - - self.metric_export_timeout_ms = metric_export_timeout_ms or self.metric_export_interval_ms - - def initialize(self) -> None: - """Actuates the telemetry configuration.""" - is_dev = is_dev_environment() - should_export = self.force_dev_export or not is_dev - - if not should_export: - logger.debug('Telemetry export disabled in dev environment') - return - - self._configure_logging() - self._configure_tracing() - self._configure_metrics() - - def _configure_logging(self) -> None: - """Configures structlog with trace correlation.""" - try: - current_config = structlog.get_config() - processors = current_config.get('processors', []) - - # Check if our bound method is already registered (by name or other heuristic if needed) - # Since methods are 
bound, simple equality check might fail if new instance. - # However, for simplicity and common usage, we'll append. - # A better check would be to see if any processor matches our signature/name. - - # Simple deduplication: Check for function name in processors - if not any(getattr(p, '__name__', '') == 'inject_trace_context' for p in processors): - - def inject_trace_context( - logger: Any, # noqa: ANN401 - method_name: str, - event_dict: MutableMapping[str, Any], - ) -> Mapping[str, Any]: - return self._inject_trace_context( - cast(logging.Logger, logger), method_name, cast(dict[str, Any], event_dict) - ) - - new_processors = list(processors) - new_processors.insert(max(0, len(new_processors) - 1), inject_trace_context) - cfg = structlog.get_config() - structlog.configure( - processors=new_processors, - wrapper_class=cfg.get('wrapper_class'), - context_class=cfg.get('context_class'), - logger_factory=cfg.get('logger_factory'), - cache_logger_on_first_use=cfg.get('cache_logger_on_first_use'), - ) - logger.debug('Configured structlog for GCP trace correlation') - - except Exception as e: - logger.warning('Failed to configure structlog for trace correlation', error=str(e)) - - def _configure_tracing(self) -> None: - if self.disable_traces: - return - - exporter_kwargs: dict[str, Any] = {} - if self.project_id: - exporter_kwargs['project_id'] = self.project_id - if self.credentials: - exporter_kwargs['credentials'] = self.credentials - - base_exporter = GenkitGCPExporter(**exporter_kwargs) if exporter_kwargs else GenkitGCPExporter() - - trace_exporter = GcpAdjustingTraceExporter( - exporter=base_exporter, - log_input_and_output=self.log_input_and_output, - project_id=self.project_id, - error_handler=lambda e: _handle_tracing_error(e), - ) - - add_custom_exporter(trace_exporter, 'gcp_telemetry_server') - - def _configure_metrics(self) -> None: - if self.disable_metrics: - return - - try: - resource = Resource.create({ - SERVICE_NAME: 'genkit', - SERVICE_INSTANCE_ID: 
str(uuid.uuid4()), - }) - - # Suppress detector warnings during GCP resource detection - detector_logger = logging.getLogger('opentelemetry.resourcedetector.gcp_resource_detector') - original_level = detector_logger.level - detector_logger.setLevel(logging.ERROR) - - try: - gcp_resource = GoogleCloudResourceDetector(raise_on_error=True).detect() - resource = resource.merge(gcp_resource) - except Exception as e: - # For detection failure log the exception and use the default resource - detector_logger.warning(f'Google Cloud resource detection failed: {e}') - finally: - detector_logger.setLevel(original_level) - - exporter_kwargs: dict[str, Any] = {} - if self.project_id: - exporter_kwargs['project_id'] = self.project_id - if self.credentials: - exporter_kwargs['credentials'] = self.credentials - - metrics_exporter = GenkitMetricExporter( - exporter=CloudMonitoringMetricsExporter(**exporter_kwargs), - error_handler=lambda e: _handle_metric_error(e), - ) - - reader = PeriodicExportingMetricReader( - metrics_exporter, - export_interval_millis=self.metric_export_interval_ms, - export_timeout_millis=self.metric_export_timeout_ms, - ) - - provider = MeterProvider(metric_readers=[reader], resource=resource) - metrics.set_meter_provider(provider) - - except Exception as e: - _handle_metric_error(e) - - def _inject_trace_context( - self, logger: logging.Logger, method_name: str, event_dict: dict[str, Any] - ) -> dict[str, Any]: - """Structlog processor to inject GCP-compatible trace context.""" - span = trace.get_current_span() - if span == trace.INVALID_SPAN: - return event_dict - - ctx = span.get_span_context() - if not ctx.is_valid: - return event_dict - - if self.project_id: - event_dict['logging.googleapis.com/trace'] = f'projects/{self.project_id}/traces/{ctx.trace_id:032x}' - - event_dict['logging.googleapis.com/spanId'] = f'{ctx.span_id:016x}' - event_dict['logging.googleapis.com/trace_sampled'] = '1' if ctx.trace_flags.sampled else '0' - - return event_dict - - 
-class GenkitGCPExporter(CloudTraceSpanExporter): - """Exports spans to a GCP telemetry server. - - This exporter sends span data in a specific format to a GCP telemetry server, - for visualization and debugging. - - Super class will use google.auth.default() to get the project id. - """ - - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - """Export the spans to Cloud Trace. - - Iterates through the provided spans, and adds an attribute to the spans. - - Note: - Leverages span transformation and formatting to opentelemetry-exporter-gcp-trace. - See: https://cloud.google.com/python/docs/reference/cloudtrace/latest - - Args: - spans: A sequence of OpenTelemetry ReadableSpan objects to export. - - Returns: - SpanExportResult.SUCCESS upon successful processing (does not guarantee - server-side success). - """ - try: - self.client.batch_write_spans( - request=BatchWriteSpansRequest( - name=f'projects/{self.project_id}', - spans=self._translate_to_cloud_trace(spans), - ), - retry=retries.Retry( - initial=0.1, - maximum=30.0, - multiplier=2, - predicate=retries.if_exception_type( - core_exceptions.DeadlineExceeded, - ), - deadline=120.0, - ), - ) - except Exception as ex: - logger.error('Error while writing to Cloud Trace', exc_info=ex) - return SpanExportResult.FAILURE - - return SpanExportResult.SUCCESS - - -class GenkitMetricExporter(MetricExporter): - """Metric exporter wrapper that adjusts start times for GCP compatibility. - - Cloud Monitoring does not support delta metrics for custom metrics and will - convert any DELTA aggregations to CUMULATIVE ones on export. There is implicit - overlap in the start/end times that the Metric reader sends -- the end_time - of the previous export becomes the start_time of the current export. - - This wrapper adds a microsecond to start times to ensure discrete export - timeframes and prevent data being overwritten. - - This matches the JavaScript MetricExporterWrapper in gcpOpenTelemetry.ts. 
- - Args: - exporter: The underlying CloudMonitoringMetricsExporter. - error_handler: Optional callback for export errors. - """ - - def __init__( - self, - exporter: CloudMonitoringMetricsExporter, - error_handler: Callable[[Exception], None] | None = None, - ) -> None: - """Initialize the metric exporter wrapper. - - Args: - exporter: The underlying CloudMonitoringMetricsExporter. - error_handler: Optional callback for export errors. - """ - self._exporter = exporter - self._error_handler = error_handler - - # Force DELTA temporality for all instrument types to match JS implementation. - delta = AggregationTemporality.DELTA - self._preferred_temporality = { - Counter: delta, - UpDownCounter: delta, - Histogram: delta, - ObservableCounter: delta, - ObservableUpDownCounter: delta, - ObservableGauge: delta, - } - - self._preferred_aggregation = getattr(exporter, '_preferred_aggregation', None) - - def export( - self, - metrics_data: MetricsData, - timeout_millis: float = 10_000, - **kwargs: object, - ) -> MetricExportResult: - """Export metrics after adjusting start times. - - Modifies start times of each data point to ensure no overlap with - previous exports when GCP converts DELTA to CUMULATIVE. - - Args: - metrics_data: The metrics data to export. - timeout_millis: Export timeout in milliseconds. - **kwargs: Additional arguments for base class compatibility. - - Returns: - The export result from the wrapped exporter. - """ - # Modify start times before export - self._modify_start_times(metrics_data) - - try: - return self._exporter.export(metrics_data, timeout_millis, **kwargs) - except Exception as e: - if self._error_handler: - self._error_handler(e) - raise - - def _modify_start_times(self, metrics_data: MetricsData) -> None: - """Add 1ms to start times to prevent overlap. - - Args: - metrics_data: The metrics data to modify in-place. 
- """ - for resource_metrics in metrics_data.resource_metrics: - for scope_metrics in resource_metrics.scope_metrics: - for metric in scope_metrics.metrics: - for data_point in metric.data.data_points: - # Add 1 millisecond (1_000_000 nanoseconds) to start time - if hasattr(data_point, 'start_time_unix_nano'): - # Modifying frozen dataclass via workaround - object.__setattr__( - data_point, - 'start_time_unix_nano', - data_point.start_time_unix_nano + 1_000_000, - ) - - def force_flush(self, timeout_millis: float = 10_000) -> bool: - """Delegate force flush to wrapped exporter. - - Args: - timeout_millis: Timeout in milliseconds. - - Returns: - True if flush succeeded. - """ - if hasattr(self._exporter, 'force_flush'): - return self._exporter.force_flush(timeout_millis) - return True - - def shutdown(self, timeout_millis: float = 30_000, **kwargs: object) -> None: - """Delegate shutdown to wrapped exporter. - - Args: - timeout_millis: Timeout in milliseconds. - **kwargs: Additional arguments for base class compatibility. - """ - self._exporter.shutdown(timeout_millis, **kwargs) - - -class TimeAdjustedSpan(RedactedSpan): - """Wraps a span to ensure non-zero duration for GCP. - - GCP Trace requires end_time > start_time. - """ - - @property - def end_time(self) -> int | None: - """Return the span end time, adjusted to be > start_time.""" - start = self._span.start_time - end = self._span.end_time - - # GCP requires end_time > start_time. - # If the span is unfinished (end_time is None) or has zero duration, - # we provide a minimum 1 microsecond duration. - if start is not None: - if end is None or end <= start: - return start + 1000 - - return end - - -class GcpAdjustingTraceExporter(AdjustingTraceExporter): - """GCP-specific span exporter that adds telemetry recording. - - This extends the base AdjustingTraceExporter to add GCP-specific telemetry - recording (metrics and logs) for each span, matching the JavaScript - implementation in gcpOpenTelemetry.ts. 
- - The telemetry handlers record: - - Feature metrics (requests, latency) for root spans - - Path metrics for failure tracking - - Generate metrics (tokens, latency) for model actions - - Action logs for tools and generate - - Engagement metrics for user feedback - - Example: - ```python - exporter = GcpAdjustingTraceExporter( - exporter=GenkitGCPExporter(), - log_input_and_output=False, - project_id='my-project', - ) - ``` - """ - - def __init__( - self, - exporter: SpanExporter, - log_input_and_output: bool = False, - project_id: str | None = None, - error_handler: Callable[[Exception], None] | None = None, - ) -> None: - """Initialize the GCP adjusting trace exporter. - - Args: - exporter: The underlying SpanExporter to wrap. - log_input_and_output: If True, preserve input/output in spans and logs. - Defaults to False (redact for privacy). - project_id: Optional GCP project ID for log correlation. - error_handler: Optional callback invoked when export errors occur. - """ - super().__init__( - exporter=exporter, - log_input_and_output=log_input_and_output, - project_id=project_id, - error_handler=error_handler, - ) - self._log_input_and_output = log_input_and_output - self._project_id = project_id - - def _adjust(self, span: ReadableSpan) -> ReadableSpan: - """Apply all adjustments to a span including telemetry. - - This overrides the base method to add telemetry recording before - the standard adjustments (redaction, marking, normalization). - - Args: - span: The span to adjust. - - Returns: - The adjusted span. - """ - # Record telemetry before adjustments (uses original attributes) - span = self._tick_telemetry(span) - - # Apply standard adjustments from base class - span = super()._adjust(span) - - # Fix start/end times for GCP (must be end > start) - return TimeAdjustedSpan(span, dict(span.attributes) if span.attributes else {}) - - def _tick_telemetry(self, span: ReadableSpan) -> ReadableSpan: - """Record telemetry for a span and apply root state marking. 
- - This matches the JavaScript tickTelemetry method in gcpOpenTelemetry.ts. - It calls the appropriate telemetry handlers based on span type. - - Args: - span: The span to record telemetry for. - - Returns: - The span, potentially with genkit:rootState added for root spans. - """ - attrs = span.attributes or {} - if 'genkit:type' not in attrs: - return span - - span_type = str(attrs.get('genkit:type', '')) - subtype = str(attrs.get('genkit:metadata:subtype', '')) - is_root = bool(attrs.get('genkit:isRoot')) - - try: - # Always record path telemetry for error tracking - paths_telemetry.tick(span, self._log_input_and_output, self._project_id) - - if is_root: - # Report top level feature request and latency only for root spans - features_telemetry.tick(span, self._log_input_and_output, self._project_id) - - # Set root state explicitly - # (matches JS: span.attributes['genkit:rootState'] = span.attributes['genkit:state']) - state = attrs.get('genkit:state') - if state: - new_attrs = dict(attrs) - new_attrs['genkit:rootState'] = state - span = RedactedSpan(span, new_attrs) - else: - if span_type == 'action' and subtype == 'model': - # Report generate metrics for all model actions - generate_telemetry.tick(span, self._log_input_and_output, self._project_id) - - if span_type == 'action' and subtype == 'tool': - # TODO(#4359): Report input and output for tool actions (matching JS comment) - pass - - if span_type in ('action', 'flow', 'flowStep', 'util'): - # Report request and latency metrics for all actions - action_telemetry.tick(span, self._log_input_and_output, self._project_id) - - if span_type == 'userEngagement': - # Report user acceptance and feedback metrics - engagement_telemetry.tick(span, self._log_input_and_output, self._project_id) - - except Exception as e: - logger.warning('Error recording telemetry', error=str(e)) - - return span - def add_gcp_telemetry( project_id: str | None = None, credentials: dict[str, Any] | None = None, sampler: Sampler | None = 
None, log_input_and_output: bool = False, - force_dev_export: bool = True, + force_dev_export: bool = False, disable_metrics: bool = False, disable_traces: bool = False, metric_export_interval_ms: int | None = None, @@ -919,58 +326,3 @@ def add_gcp_telemetry( ) manager.initialize() - - -# Error handling helpers (matches JS getErrorHandler pattern) -_tracing_error_logged = False -_metrics_error_logged = False - - -def _handle_tracing_error(error: Exception) -> None: - """Handle trace export errors with helpful messages. - - Only logs detailed instructions once to avoid spam. - - Args: - error: The export error. - """ - global _tracing_error_logged - if _tracing_error_logged: - return - - error_str = str(error).lower() - if 'permission' in error_str or 'denied' in error_str or '403' in error_str: - _tracing_error_logged = True - logger.error( - 'Unable to send traces to Google Cloud. ' - 'Ensure the service account has the "Cloud Trace Agent" (roles/cloudtrace.agent) role. ' - f'Error: {error}' - ) - else: - logger.error('Error exporting traces to GCP', error=str(error)) - - -def _handle_metric_error(error: Exception) -> None: - """Handle metrics export errors with helpful messages. - - Only logs detailed instructions once to avoid spam. - - Args: - error: The export error. - """ - global _metrics_error_logged - if _metrics_error_logged: - return - - error_str = str(error).lower() - if 'permission' in error_str or 'denied' in error_str or '403' in error_str: - _metrics_error_logged = True - logger.error( - 'Unable to send metrics to Google Cloud. ' - 'Ensure the service account has the "Monitoring Metric Writer" ' - '(roles/monitoring.metricWriter) or "Cloud Telemetry Metrics Writer" ' - '(roles/telemetry.metricsWriter) role. 
' - f'Error: {error}' - ) - else: - logger.error('Error exporting metrics to GCP', error=str(error)) diff --git a/py/plugins/google-cloud/tests/tracing_test.py b/py/plugins/google-cloud/tests/tracing_test.py index 3e37fc8c92..1bc80984c5 100644 --- a/py/plugins/google-cloud/tests/tracing_test.py +++ b/py/plugins/google-cloud/tests/tracing_test.py @@ -39,14 +39,14 @@ def test_add_gcp_telemetry_wraps_with_gcp_adjusting_exporter() -> None: # Set production environment and clear project-related env vars to ensure project_id is None with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}, clear=False), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter') as mock_gcp_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter') as mock_adjusting, - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter') as mock_add_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter') as mock_gcp_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter') as mock_adjusting, + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter') as mock_add_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + 
patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): # Remove project env vars to ensure project_id is None in the test for key in ['FIREBASE_PROJECT_ID', 'GOOGLE_CLOUD_PROJECT', 'GCLOUD_PROJECT']: @@ -82,14 +82,14 @@ def test_add_gcp_telemetry_with_log_input_and_output_enabled() -> None: """Test that log_input_and_output=True disables PII redaction (JS parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter') as mock_adjusting, - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter') as mock_adjusting, + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -105,14 +105,14 @@ def test_add_gcp_telemetry_with_project_id() -> None: """Test that project_id is passed to GcpAdjustingTraceExporter 
(JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter') as mock_adjusting, - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter') as mock_adjusting, + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -128,8 +128,8 @@ def test_add_gcp_telemetry_skips_in_dev_without_force() -> None: """Test that telemetry is skipped in dev environment without force_dev_export (JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.DEV}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter') as mock_gcp_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter') as mock_add_exporter, + 
patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter') as mock_gcp_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter') as mock_add_exporter, ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -145,14 +145,14 @@ def test_add_gcp_telemetry_exports_in_dev_with_force() -> None: """Test that telemetry is exported in dev environment with force_dev_export=True (JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.DEV}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter') as mock_gcp_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter') as mock_add_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter') as mock_gcp_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter') as mock_add_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -168,13 
+168,13 @@ def test_add_gcp_telemetry_disable_traces() -> None: """Test that disable_traces=True skips trace export (JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter') as mock_gcp_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter') as mock_add_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter') as mock_gcp_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter') as mock_add_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -190,14 +190,14 @@ def test_add_gcp_telemetry_disable_metrics() -> None: """Test that disable_metrics=True skips metrics export (JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - 
patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector') as mock_detector, - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter') as mock_metric_exp, - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter') as mock_genkit_metric, - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader') as mock_reader, - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector') as mock_detector, + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter') as mock_metric_exp, + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter') as mock_genkit_metric, + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader') as mock_reader, + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -215,14 +215,14 @@ def test_add_gcp_telemetry_custom_metric_interval() -> None: """Test that metric_export_interval_ms is passed correctly (JS/Go parity).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - 
patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader') as mock_reader, - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader') as mock_reader, + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -240,14 +240,14 @@ def test_add_gcp_telemetry_enforces_minimum_interval() -> None: """Test that metric_export_interval_ms enforces minimum 5000ms (GCP requirement).""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader') as mock_reader, - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'), + 
patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader') as mock_reader, + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -262,7 +262,7 @@ def test_add_gcp_telemetry_enforces_minimum_interval() -> None: def test_resolve_project_id_from_env_vars() -> None: """Test project ID resolution from environment variables (JS/Go parity).""" - from genkit.plugins.google_cloud.telemetry.tracing import _resolve_project_id + from genkit.plugins.google_cloud.telemetry.config import resolve_project_id # Test FIREBASE_PROJECT_ID has highest priority with mock.patch.dict( @@ -273,7 +273,7 @@ def test_resolve_project_id_from_env_vars() -> None: 'GCLOUD_PROJECT': 'gcloud-project', }, ): - assert _resolve_project_id() == 'firebase-project' + assert resolve_project_id() == 'firebase-project' # Test GOOGLE_CLOUD_PROJECT is second priority with mock.patch.dict( @@ -284,47 +284,47 @@ def test_resolve_project_id_from_env_vars() -> None: }, clear=True, ): - assert _resolve_project_id() == 'gcp-project' + assert resolve_project_id() == 'gcp-project' # Test GCLOUD_PROJECT is fallback with mock.patch.dict(os.environ, {'GCLOUD_PROJECT': 'gcloud-project'}, clear=True): - assert _resolve_project_id() == 'gcloud-project' + assert resolve_project_id() == 'gcloud-project' def test_resolve_project_id_explicit_takes_precedence() -> None: """Test that explicit project_id parameter takes precedence over env vars.""" - from genkit.plugins.google_cloud.telemetry.tracing import 
_resolve_project_id + from genkit.plugins.google_cloud.telemetry.config import resolve_project_id with mock.patch.dict( os.environ, {'FIREBASE_PROJECT_ID': 'firebase-project'}, ): # Explicit project_id should override env var - assert _resolve_project_id(project_id='explicit-project') == 'explicit-project' + assert resolve_project_id(project_id='explicit-project') == 'explicit-project' def test_resolve_project_id_from_credentials() -> None: """Test project ID resolution from credentials dict (Go parity).""" - from genkit.plugins.google_cloud.telemetry.tracing import _resolve_project_id + from genkit.plugins.google_cloud.telemetry.config import resolve_project_id with mock.patch.dict(os.environ, {}, clear=True): # Project ID from credentials credentials = {'project_id': 'creds-project'} - assert _resolve_project_id(credentials=credentials) == 'creds-project' + assert resolve_project_id(credentials=credentials) == 'creds-project' def test_legacy_force_export_parameter() -> None: """Test that legacy force_export parameter still works but shows warning.""" with ( mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.DEV}), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitGCPExporter') as mock_gcp_exporter, - patch('genkit.plugins.google_cloud.telemetry.tracing.GcpAdjustingTraceExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.add_custom_exporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GoogleCloudResourceDetector'), - patch('genkit.plugins.google_cloud.telemetry.tracing.CloudMonitoringMetricsExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.GenkitMetricExporter'), - patch('genkit.plugins.google_cloud.telemetry.tracing.PeriodicExportingMetricReader'), - patch('genkit.plugins.google_cloud.telemetry.tracing.metrics'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter') as mock_gcp_exporter, + patch('genkit.plugins.google_cloud.telemetry.config.GcpAdjustingTraceExporter'), + 
patch('genkit.plugins.google_cloud.telemetry.config.add_custom_exporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GoogleCloudResourceDetector'), + patch('genkit.plugins.google_cloud.telemetry.config.CloudMonitoringMetricsExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.GenkitMetricExporter'), + patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader'), + patch('genkit.plugins.google_cloud.telemetry.config.metrics'), patch('genkit.plugins.google_cloud.telemetry.tracing.logger') as mock_logger, ): from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry @@ -339,3 +339,25 @@ def test_legacy_force_export_parameter() -> None: # Verify exporter was still created mock_gcp_exporter.assert_called_once() + + +def test_add_gcp_telemetry_is_fail_safe() -> None: + """Test that add_gcp_telemetry does not crash if initialization fails.""" + with ( + mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}), + patch( + 'genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter', + side_effect=Exception('Auth failed'), + ), + patch('genkit.plugins.google_cloud.telemetry.config.handle_tracing_error') as mock_handler, + ): + from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry + + # This should NOT raise an exception + try: + add_gcp_telemetry() + except Exception as e: + raise AssertionError(f'add_gcp_telemetry raised an exception: {e}') from e + + # Verify error handler was called + mock_handler.assert_called_once() diff --git a/py/plugins/observability/src/genkit/plugins/observability/__init__.py b/py/plugins/observability/src/genkit/plugins/observability/__init__.py index 4231d067aa..004979f65a 100644 --- a/py/plugins/observability/src/genkit/plugins/observability/__init__.py +++ b/py/plugins/observability/src/genkit/plugins/observability/__init__.py @@ -390,6 +390,10 @@ def _inject_trace_context(event_dict: MutableMapping[str, Any]) -> MutableMappin Returns: 
The event dictionary with trace context added. """ + # Only inject if event_dict is a dict or mapping + if not isinstance(event_dict, dict) and not hasattr(event_dict, '__setitem__'): + return event_dict + span = trace.get_current_span() if span == trace.INVALID_SPAN: return event_dict diff --git a/py/pyproject.toml b/py/pyproject.toml index 70ac5a2bf4..ad8609db24 100644 --- a/py/pyproject.toml +++ b/py/pyproject.toml @@ -563,11 +563,13 @@ project_includes = [ # - plugins/mcp/tests: has a local `fakes` module for test mocks # - samples/framework-evaluator-demo: has `evaluator_demo` package with internal imports # - samples/framework-restaurant-demo/src: has internal imports (menu_ai, menu_schemas) +# - samples/sample-test: has `model_performance_test` module for imports search-path = [ ".", "plugins/mcp/tests", "samples/framework-evaluator-demo", "samples/framework-restaurant-demo/src", + "samples/sample-test", "samples/web-endpoints-hello", # Tools "tools/releasekit/src", diff --git a/py/samples/provider-xai-hello/pyproject.toml b/py/samples/provider-xai-hello/pyproject.toml index 9e36575382..27070fc78d 100644 --- a/py/samples/provider-xai-hello/pyproject.toml +++ b/py/samples/provider-xai-hello/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "genkit", "genkit-plugin-xai", "genkit-plugin-google-cloud", + "genkit-plugin-firebase", "pydantic>=2.0.0", "structlog>=24.0.0", "uvloop>=0.21.0", @@ -57,6 +58,7 @@ dev = ["watchdog>=6.0.0"] [tool.uv.sources] genkit-plugin-google-cloud = { workspace = true } genkit-plugin-xai = { workspace = true } +genkit-plugin-firebase = { workspace = true } [build-system] build-backend = "hatchling.build" diff --git a/py/samples/provider-xai-hello/src/main.py b/py/samples/provider-xai-hello/src/main.py index b238631e43..7075bf3d05 100755 --- a/py/samples/provider-xai-hello/src/main.py +++ b/py/samples/provider-xai-hello/src/main.py @@ -66,7 +66,7 @@ from genkit.ai import Genkit from genkit.core.action import ActionRunContext from 
genkit.core.logging import get_logger -from genkit.plugins.google_cloud import add_gcp_telemetry +from genkit.plugins.firebase import add_firebase_telemetry from genkit.plugins.xai import XAI, xai_name from samples.shared import ( CalculatorInput, @@ -109,11 +109,7 @@ logger = get_logger(__name__) -# Enable GCP telemetry -add_gcp_telemetry( - project_id=os.environ.get('GCP_PROJECT_ID'), - log_input_and_output=False, -) +add_firebase_telemetry(force_dev_export=True, log_input_and_output=True) ai = Genkit( plugins=[XAI()],