diff --git a/temporalio/_log_utils.py b/temporalio/_log_utils.py new file mode 100644 index 000000000..854d00428 --- /dev/null +++ b/temporalio/_log_utils.py @@ -0,0 +1,80 @@ +"""Internal utilities for Temporal logging. + +This module is internal and may change at any time. +""" + +from __future__ import annotations + +from collections.abc import Mapping, MutableMapping +from typing import Any, Literal + +TemporalLogExtraMode = Literal["dict", "flatten"] +"""Mode controlling how Temporal context is added to log record extra. + +Values: + dict: (default) Add context as a nested dictionary under a single key. + This is the original behavior. Suitable for logging handlers that + support nested structures. + flatten: Add each context field as a separate top-level key with a + namespaced prefix. Values that are not primitives (str/int/float/bool) + are converted to strings. This mode is recommended for OpenTelemetry + and other logging pipelines that require flat, scalar attributes. +""" + + +def _apply_temporal_context_to_extra( + extra: MutableMapping[str, Any], + *, + key: str, + ctx: Mapping[str, Any], + mode: TemporalLogExtraMode, +) -> None: + """Apply temporal context to log record extra based on the configured mode. + + Args: + extra: The mutable extra dict to update. + key: The base key (e.g., "temporal_workflow"). In dict mode this is + used directly. In flatten mode the prefix is derived by replacing + underscores with dots (e.g., "temporal.workflow"). + ctx: The context mapping containing temporal fields. + mode: The mode controlling how context is added. + """ + if mode == "flatten": + prefix = key.replace("_", ".") + for k, v in ctx.items(): + # Ensure value is a primitive type safe for OTel attributes + if not isinstance(v, (str, int, float, bool, type(None))): + v = str(v) + extra[f"{prefix}.{k}"] = v + else: + extra[key] = dict(ctx) + + +def _update_temporal_context_in_extra( + extra: MutableMapping[str, Any], + *, + key: str, + update_ctx: Mapping[str, Any], + mode: TemporalLogExtraMode, +) -> None: + """Update existing temporal context in extra with additional fields. + + This is used when adding update info to existing workflow context. + + Args: + extra: The mutable extra dict to update. + key: The base key (e.g., "temporal_workflow"). In dict mode this is + used directly. In flatten mode the prefix is derived by replacing + underscores with dots (e.g., "temporal.workflow"). + update_ctx: Additional context fields to add/update. + mode: The mode controlling how context is added. + """ + if mode == "flatten": + prefix = key.replace("_", ".") + for k, v in update_ctx.items(): + # Ensure value is a primitive type safe for OTel attributes + if not isinstance(v, (str, int, float, bool, type(None))): + v = str(v) + extra[f"{prefix}.{k}"] = v + else: + extra.setdefault(key, {}).update(update_ctx) diff --git a/temporalio/activity.py b/temporalio/activity.py index 5e973a540..3acc2705f 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -30,6 +30,7 @@ import temporalio.common import temporalio.converter +from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra from .types import CallableType if TYPE_CHECKING: @@ -474,6 +475,10 @@ class LoggerAdapter(logging.LoggerAdapter): value will be added to the ``extra`` dictionary with the entire activity info, making it present on the ``LogRecord.__dict__`` for use by others. Default is False. + temporal_extra_mode: Controls how activity context is added to log + ``extra``. Default is ``"dict"`` (current behavior). Set to + ``"flatten"`` for OpenTelemetry compatibility (scalar attributes + with ``temporal.activity.`` prefix). """ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None: @@ -482,6 +487,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N self.activity_info_on_message = True self.activity_info_on_extra = True self.full_activity_info_on_extra = False + self.temporal_extra_mode: TemporalLogExtraMode = "dict" def process( self, msg: Any, kwargs: MutableMapping[str, Any] @@ -499,7 +505,12 @@ def process( if self.activity_info_on_extra: # Extra can be absent or None, this handles both extra = kwargs.get("extra", None) or {} - extra["temporal_activity"] = context.logger_details + _apply_temporal_context_to_extra( + extra, + key="temporal_activity", + ctx=context.logger_details, + mode=self.temporal_extra_mode, + ) kwargs["extra"] = extra if self.full_activity_info_on_extra: # Extra can be absent or None, this handles both diff --git a/temporalio/workflow.py b/temporalio/workflow.py index a1b3cb918..51370e79b 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -61,6 +61,11 @@ import temporalio.workflow from temporalio.nexus._util import ServiceHandlerT +from ._log_utils import ( + TemporalLogExtraMode, + _apply_temporal_context_to_extra, + _update_temporal_context_in_extra, +) from .types import ( AnyType, CallableAsyncNoParam, @@ -1566,6 +1571,10 @@ class LoggerAdapter(logging.LoggerAdapter): use by others. Default is False. log_during_replay: Boolean for whether logs should occur during replay. Default is False. + temporal_extra_mode: Controls how workflow context is added to log + ``extra``. Default is ``"dict"`` (current behavior). Set to + ``"flatten"`` for OpenTelemetry compatibility (scalar attributes + with ``temporal.workflow.`` prefix). Values added to ``extra`` are merged with the ``extra`` dictionary from a logging call, with values from the logging call taking precedence. I.e. the @@ -1579,6 +1588,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N self.workflow_info_on_extra = True self.full_workflow_info_on_extra = False self.log_during_replay = False + self.temporal_extra_mode: TemporalLogExtraMode = "dict" self.disable_sandbox = False def process( @@ -1599,7 +1609,12 @@ def process( if self.workflow_info_on_message: msg_extra.update(workflow_details) if self.workflow_info_on_extra: - extra["temporal_workflow"] = workflow_details + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + ctx=workflow_details, + mode=self.temporal_extra_mode, + ) if self.full_workflow_info_on_extra: extra["workflow_info"] = runtime.workflow_info() update_info = current_update_info() @@ -1608,7 +1623,12 @@ def process( if self.workflow_info_on_message: msg_extra.update(update_details) if self.workflow_info_on_extra: - extra.setdefault("temporal_workflow", {}).update(update_details) + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + update_ctx=update_details, + mode=self.temporal_extra_mode, + ) kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} if msg_extra: diff --git a/tests/test_log_utils.py b/tests/test_log_utils.py new file mode 100644 index 000000000..2504d4992 --- /dev/null +++ b/tests/test_log_utils.py @@ -0,0 +1,244 @@ +"""Tests for Temporal logging utilities and LoggerAdapter modes.""" + +from __future__ import annotations + +import logging +from typing import Any + +import pytest + +from temporalio._log_utils import ( + _apply_temporal_context_to_extra, + _update_temporal_context_in_extra, +) + + +@pytest.fixture +def sample_context() -> dict[str, Any]: + return { + "attempt": 1, + "namespace": "default", + "run_id": "abc123", + "task_queue": "test-queue", + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + } + + +class TestApplyTemporalContextToExtra: + """Tests for _apply_temporal_context_to_extra helper.""" + + def test_dict_mode_adds_nested_dict(self, sample_context: dict[str, Any]) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_workflow", ctx=sample_context, mode="dict" + ) + + assert "temporal_workflow" in extra + assert extra["temporal_workflow"] == sample_context + # Verify it's a copy, not the same object + assert extra["temporal_workflow"] is not sample_context + + def test_flatten_mode_adds_prefixed_keys( + self, sample_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_workflow", ctx=sample_context, mode="flatten" + ) + + assert "temporal_workflow" not in extra + assert extra["temporal.workflow.attempt"] == 1 + assert extra["temporal.workflow.namespace"] == "default" + assert extra["temporal.workflow.run_id"] == "abc123" + assert extra["temporal.workflow.task_queue"] == "test-queue" + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.workflow_type"] == "TestWorkflow" + + def test_flatten_mode_converts_non_primitives_to_string(self) -> None: + ctx = { + "string_val": "hello", + "int_val": 42, + "float_val": 3.14, + "bool_val": True, + "none_val": None, + "list_val": [1, 2, 3], + "dict_val": {"nested": "value"}, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_test", ctx=ctx, mode="flatten" + ) + + assert extra["temporal.test.string_val"] == "hello" + assert extra["temporal.test.int_val"] == 42 + assert extra["temporal.test.float_val"] == 3.14 + assert extra["temporal.test.bool_val"] is True + assert extra["temporal.test.none_val"] is None + assert extra["temporal.test.list_val"] == "[1, 2, 3]" + assert extra["temporal.test.dict_val"] == "{'nested': 'value'}" + + +class TestUpdateTemporalContextInExtra: + """Tests for _update_temporal_context_in_extra helper.""" + + @pytest.fixture + def initial_context(self) -> dict[str, Any]: + return {"workflow_id": "wf-001", "workflow_type": "TestWorkflow"} + + @pytest.fixture + def update_context(self) -> dict[str, Any]: + return {"update_id": "upd-001", "update_name": "my_update"} + + @pytest.mark.parametrize("mode", ["dict", "flatten"]) + def test_update_merges_context( + self, + initial_context: dict[str, Any], + update_context: dict[str, Any], + mode: str, + ) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_workflow", ctx=initial_context, mode=mode + ) + _update_temporal_context_in_extra( + extra, key="temporal_workflow", update_ctx=update_context, mode=mode + ) + + if mode == "dict": + assert extra["temporal_workflow"]["workflow_id"] == "wf-001" + assert extra["temporal_workflow"]["workflow_type"] == "TestWorkflow" + assert extra["temporal_workflow"]["update_id"] == "upd-001" + assert extra["temporal_workflow"]["update_name"] == "my_update" + elif mode == "flatten": + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.workflow_type"] == "TestWorkflow" + assert extra["temporal.workflow.update_id"] == "upd-001" + assert extra["temporal.workflow.update_name"] == "my_update" + + +class TestActivityPrefixes: + """Tests for activity-specific key derivation.""" + + def test_activity_flatten_uses_activity_prefix(self) -> None: + ctx = { + "activity_id": "act-001", + "activity_type": "TestActivity", + "attempt": 1, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_activity", ctx=ctx, mode="flatten" + ) + + assert "temporal_activity" not in extra + assert extra["temporal.activity.activity_id"] == "act-001" + assert extra["temporal.activity.activity_type"] == "TestActivity" + assert extra["temporal.activity.attempt"] == 1 + + +class TestFlattenModeOTelSafety: + """Critical tests to verify flatten mode is fully OTel-safe.""" + + @pytest.mark.parametrize("key", ["temporal_workflow", "temporal_activity"]) + def test_flatten_mode_produces_no_dicts(self, key: str) -> None: + """Verify flatten mode has zero dict values for any temporal keys.""" + ctx = { + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + "run_id": "run-001", + "namespace": "default", + "task_queue": "test-queue", + "attempt": 1, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra(extra, key=key, ctx=ctx, mode="flatten") + + for k, v in extra.items(): + assert not isinstance(v, dict), ( + f"Flatten mode violation: {k}={type(v).__name__} " + f"(expected primitive, got dict)" + ) + assert key not in extra + + def test_flatten_mode_with_update_produces_no_dicts(self) -> None: + """Verify flatten mode with update info still produces zero dicts.""" + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + ctx={"workflow_id": "wf-001", "workflow_type": "TestWorkflow"}, + mode="flatten", + ) + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + update_ctx={"update_id": "upd-001", "update_name": "my_update"}, + mode="flatten", + ) + + for k, v in extra.items(): + assert not isinstance(v, dict), ( + f"Flatten mode violation after update: {k}={type(v).__name__}" + ) + assert "temporal_workflow" not in extra + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.update_id"] == "upd-001" + + def test_flattened_keys_do_not_conflict_with_logrecord_attrs(self) -> None: + """Verify temporal prefixed keys won't conflict with LogRecord attributes.""" + logrecord_attrs = { + "name", "msg", "args", "created", "filename", "funcName", + "levelname", "levelno", "lineno", "module", "msecs", "pathname", + "process", "processName", "relativeCreated", "stack_info", + "exc_info", "exc_text", "thread", "threadName", "message", + } + ctx = { + "msecs": 999, + "name": "workflow-name", + "workflow_id": "wf-001", + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_workflow", ctx=ctx, mode="flatten" + ) + + for key in extra: + assert key not in logrecord_attrs, ( + f"Key {key} conflicts with LogRecord attribute" + ) + assert key.startswith("temporal."), ( + f"Key {key} doesn't have temporal prefix" + ) + + +class TestLogRecordAccessibility: + """Tests to verify flattened attributes are accessible on LogRecord.__dict__.""" + + def test_flattened_attrs_accessible_via_record_dict(self) -> None: + ctx = {"workflow_id": "wf-001", "attempt": 1} + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, key="temporal_workflow", ctx=ctx, mode="flatten" + ) + + record = logging.LogRecord( + name="test", + level=logging.INFO, + pathname="", + lineno=0, + msg="test message", + args=(), + exc_info=None, + ) + for key, value in extra.items(): + setattr(record, key, value) + + assert record.__dict__["temporal.workflow.workflow_id"] == "wf-001" + assert record.__dict__["temporal.workflow.attempt"] == 1 + + for key in extra: + value = record.__dict__[key] + assert isinstance(value, (str, int, float, bool, type(None))), ( + f"Value for {key} is not primitive: {type(value)}" + ) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index d3bc3e8be..fdc830070 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1008,34 +1008,63 @@ async def some_activity(): ) +@pytest.mark.parametrize("temporal_extra_mode", ["dict", "flatten"]) async def test_activity_logging( client: Client, worker: ExternalWorker, shared_state_manager: SharedStateManager, + temporal_extra_mode: str, ): + """Test that activity logger produces correct log records for each extra mode.""" + @activity.defn async def say_hello(name: str) -> str: activity.logger.info(f"Called with arg: {name}") return f"Hello, {name}!" - # Create a queue, add handler to logger, call normal activity, then check + original_mode = activity.logger.temporal_extra_mode + activity.logger.temporal_extra_mode = temporal_extra_mode + handler = logging.handlers.QueueHandler(queue.Queue()) - with LogHandler.apply(activity.logger.base_logger, handler): - activity.logger.base_logger.setLevel(logging.INFO) - result = await _execute_workflow_with_activity( - client, - worker, - say_hello, - "Temporal", - shared_state_manager=shared_state_manager, - ) + try: + with LogHandler.apply(activity.logger.base_logger, handler): + activity.logger.base_logger.setLevel(logging.INFO) + result = await _execute_workflow_with_activity( + client, + worker, + say_hello, + "Temporal", + shared_state_manager=shared_state_manager, + ) + finally: + activity.logger.temporal_extra_mode = original_mode + assert result.result == "Hello, Temporal!" records: list[logging.LogRecord] = list(handler.queue.queue) # type: ignore assert len(records) > 0 - assert records[-1].message.startswith( - "Called with arg: Temporal ({'activity_id': '" - ) - assert records[-1].__dict__["temporal_activity"]["activity_type"] == "say_hello" + record = records[-1] + + if temporal_extra_mode == "dict": + # Dict mode appends context to message and uses nested dict + assert record.message.startswith("Called with arg: Temporal ({'activity_id': '") + assert record.__dict__["temporal_activity"]["activity_type"] == "say_hello" + else: + # Flatten mode uses OTel-safe scalar attributes + assert "temporal_activity" not in record.__dict__ + assert record.__dict__["temporal.activity.activity_type"] == "say_hello" + assert "temporal.activity.activity_id" in record.__dict__ + assert "temporal.activity.workflow_id" in record.__dict__ + assert "temporal.activity.workflow_run_id" in record.__dict__ + assert "temporal.activity.namespace" in record.__dict__ + assert "temporal.activity.task_queue" in record.__dict__ + assert record.__dict__["temporal.activity.attempt"] == 1 + + # Verify all temporal.activity.* values are primitives (OTel-safe) + for key, value in record.__dict__.items(): + if key.startswith("temporal.activity."): + assert isinstance( + value, (str, int, float, bool, type(None)) + ), f"Key {key} has non-primitive value: {type(value)}" async def test_activity_worker_shutdown( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index b597a85ab..0d1f6b71f 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1991,78 +1991,96 @@ def last_signal(self) -> str: return self._last_signal -async def test_workflow_logging(client: Client): +@pytest.mark.parametrize("temporal_extra_mode", ["dict", "flatten"]) +async def test_workflow_logging(client: Client, temporal_extra_mode: str): + """Test workflow logging: extra mode formatting, replay suppression, and full_workflow_info.""" + original_mode = workflow.logger.temporal_extra_mode + original_full_info = workflow.logger.full_workflow_info_on_extra + workflow.logger.temporal_extra_mode = temporal_extra_mode workflow.logger.full_workflow_info_on_extra = True - with LogCapturer().logs_captured( - workflow.logger.base_logger, activity.logger.base_logger - ) as capturer: - # Log two signals and kill worker before completing. Need to disable - # workflow cache since we restart the worker and don't want to pay the - # sticky queue penalty. - async with new_worker( - client, LoggingWorkflow, max_cached_workflows=0 - ) as worker: - handle = await client.start_workflow( - LoggingWorkflow.run, - id=f"workflow-{uuid.uuid4()}", - task_queue=worker.task_queue, - ) - # Send some signals and updates - await handle.signal(LoggingWorkflow.my_signal, "signal 1") - await handle.signal(LoggingWorkflow.my_signal, "signal 2") - await handle.execute_update( - LoggingWorkflow.my_update, "update 1", id="update-1" - ) - await handle.execute_update( - LoggingWorkflow.my_update, "update 2", id="update-2" - ) - assert "signal 2" == await handle.query(LoggingWorkflow.last_signal) - - # Confirm logs were produced - assert capturer.find_log("Signal: signal 1 ({'attempt':") - assert capturer.find_log("Signal: signal 2") - assert capturer.find_log("Update: update 1") - assert capturer.find_log("Update: update 2") - assert capturer.find_log("Query called") - assert not capturer.find_log("Signal: signal 3") - # Also make sure it has some workflow info and correct funcName - record = capturer.find_log("Signal: signal 1") - assert ( - record - and record.__dict__["temporal_workflow"]["workflow_type"] - == "LoggingWorkflow" - and record.funcName == "my_signal" - ) - # Since we enabled full info, make sure it's there - assert isinstance(record.__dict__["workflow_info"], workflow.Info) - # Check the log emitted by the update execution. - record = capturer.find_log("Update: update 1") - assert ( - record - and record.__dict__["temporal_workflow"]["update_id"] == "update-1" - and record.__dict__["temporal_workflow"]["update_name"] == "my_update" - and "'update_id': 'update-1'" in record.message - and "'update_name': 'my_update'" in record.message - ) - # Clear queue and start a new one with more signals - capturer.log_queue.queue.clear() - async with new_worker( - client, - LoggingWorkflow, - task_queue=worker.task_queue, - max_cached_workflows=0, - ) as worker: - # Send signals and updates - await handle.signal(LoggingWorkflow.my_signal, "signal 3") - await handle.signal(LoggingWorkflow.my_signal, "finish") - await handle.result() - - # Confirm replayed logs are not present but new ones are - assert not capturer.find_log("Signal: signal 1") - assert not capturer.find_log("Signal: signal 2") - assert capturer.find_log("Signal: signal 3") - assert capturer.find_log("Signal: finish") + try: + with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer: + # --- First execution: logs should appear --- + # Disable workflow cache so worker restart triggers replay + async with new_worker( + client, LoggingWorkflow, max_cached_workflows=0 + ) as worker: + handle = await client.start_workflow( + LoggingWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + await handle.signal(LoggingWorkflow.my_signal, "signal 1") + await handle.signal(LoggingWorkflow.my_signal, "signal 2") + await handle.execute_update( + LoggingWorkflow.my_update, "update 1", id="update-1" + ) + assert "signal 2" == await handle.query(LoggingWorkflow.last_signal) + + # Verify logs from first execution + record = capturer.find_log("Signal: signal 1") + assert record is not None + assert record.funcName == "my_signal" + assert capturer.find_log("Signal: signal 2") + assert capturer.find_log("Query called") + + update_record = capturer.find_log("Update: update 1") + assert update_record is not None + + # Verify full_workflow_info_on_extra + assert isinstance(record.__dict__["workflow_info"], workflow.Info) + + # Verify extra mode formatting + if temporal_extra_mode == "dict": + # Dict mode appends context to message and uses nested dict + assert "({'attempt':" in record.message + assert record.__dict__["temporal_workflow"]["workflow_type"] == "LoggingWorkflow" + assert update_record.__dict__["temporal_workflow"]["update_id"] == "update-1" + assert update_record.__dict__["temporal_workflow"]["update_name"] == "my_update" + assert "'update_id': 'update-1'" in update_record.message + else: + # Flatten mode uses OTel-safe scalar attributes + assert "temporal_workflow" not in record.__dict__ + assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow" + assert "temporal.workflow.workflow_id" in record.__dict__ + assert "temporal.workflow.run_id" in record.__dict__ + assert "temporal.workflow.namespace" in record.__dict__ + assert "temporal.workflow.task_queue" in record.__dict__ + assert record.__dict__["temporal.workflow.attempt"] == 1 + assert update_record.__dict__["temporal.workflow.update_id"] == "update-1" + assert update_record.__dict__["temporal.workflow.update_name"] == "my_update" + + # Verify all temporal.workflow.* values are primitives (OTel-safe) + for key, value in record.__dict__.items(): + if key.startswith("temporal.workflow."): + assert isinstance( + value, (str, int, float, bool, type(None)) + ), f"Key {key} has non-primitive value: {type(value)}" + + # --- Clear logs and continue execution (replay path) --- + # When the new worker starts, it replays the workflow history (signals 1 & 2). + # Replay suppression should prevent those logs from appearing again. + capturer.log_queue.queue.clear() + + async with new_worker( + client, + LoggingWorkflow, + task_queue=worker.task_queue, + max_cached_workflows=0, + ) as worker: + await handle.signal(LoggingWorkflow.my_signal, "signal 3") + await handle.signal(LoggingWorkflow.my_signal, "finish") + await handle.result() + + # --- Replay execution: no duplicate logs --- + assert not capturer.find_log("Signal: signal 1") + assert not capturer.find_log("Signal: signal 2") + assert capturer.find_log("Signal: signal 3") + assert capturer.find_log("Signal: finish") + finally: + workflow.logger.temporal_extra_mode = original_mode + workflow.logger.full_workflow_info_on_extra = original_full_info @activity.defn @@ -2126,6 +2144,18 @@ async def test_workflow_logging_task_fail(client: Client): == "task_fail_once_activity" ) + def workflow_failure_with_identifier(l: logging.LogRecord): + if ( + hasattr(l, "__temporal_error_identifier") + and getattr(l, "__temporal_error_identifier") + == "WorkflowTaskFailure" + ): + assert l.msg.startswith("Failed activation on workflow") + return True + return False + + assert capturer.find(workflow_failure_with_identifier) is not None + @workflow.defn class StackTraceWorkflow: @@ -7985,33 +8015,6 @@ async def test_quick_activity_swallows_cancellation(client: Client): assert cause.message == "Workflow cancelled" -async def test_workflow_logging_trace_identifier(client: Client): - with LogCapturer().logs_captured( - temporalio.worker._workflow_instance.logger - ) as capturer: - async with new_worker( - client, - TaskFailOnceWorkflow, - activities=[task_fail_once_activity], - ) as worker: - await client.execute_workflow( - TaskFailOnceWorkflow.run, - id="workflow_failure_trace_identifier", - task_queue=worker.task_queue, - ) - - def workflow_failure(l: logging.LogRecord): - if ( - hasattr(l, "__temporal_error_identifier") - and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure" - ): - assert l.msg.startswith("Failed activation on workflow") - return True - return False - - assert capturer.find(workflow_failure) is not None - - @activity.defn def use_in_workflow() -> bool: return workflow.in_workflow()