Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 188 additions & 28 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Omit = None

from anthropic.resources import AsyncMessages, Messages
from anthropic.lib.streaming._messages import MessageStreamManager

from anthropic.types import (
MessageStartEvent,
Expand All @@ -59,7 +60,13 @@
from sentry_sdk._types import TextPart

from anthropic import AsyncStream
from anthropic.types import RawMessageStreamEvent
from anthropic.types import (
RawMessageStreamEvent,
MessageParam,
ModelParam,
TextBlockParam,
ToolUnionParam,
)


class _RecordedUsage:
Expand All @@ -84,6 +91,11 @@
Messages.create = _wrap_message_create(Messages.create)
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

Messages.stream = _wrap_message_stream(Messages.stream)
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
MessageStreamManager.__enter__
)


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -253,27 +265,32 @@
]


def _set_input_data(
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
def _common_set_input_data(
span: "Span",
integration: "AnthropicIntegration",
max_tokens: "int",
messages: "Iterable[MessageParam]",
model: "ModelParam",
system: "Optional[Union[str, Iterable[TextBlockParam]]]",
temperature: "Optional[float]",
top_k: "Optional[int]",
top_p: "Optional[float]",
tools: "Optional[Iterable[ToolUnionParam]]",
) -> None:
"""
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
"""
span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
system_instructions: "Union[str, Iterable[TextBlockParam]]" = kwargs.get("system") # type: ignore
messages = kwargs.get("messages")
if (
messages is not None
and len(messages) > 0
and len(messages) > 0 # type: ignore
and should_send_default_pii()
and integration.include_prompts
):
if isinstance(system_instructions, str) or isinstance(
system_instructions, Iterable
):
if isinstance(system, str) or isinstance(system, Iterable):
span.set_data(
SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
json.dumps(_transform_system_instructions(system_instructions)),
json.dumps(_transform_system_instructions(system)),
)

normalized_messages = []
Expand Down Expand Up @@ -329,25 +346,67 @@
span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
)

if max_tokens is not None and _is_given(max_tokens):
span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
if model is not None and _is_given(model):
span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
if temperature is not None and _is_given(temperature):
span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
if top_k is not None and _is_given(top_k):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
if top_p is not None and _is_given(top_p):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)

if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _set_create_input_data(
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
) -> None:
"""
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
"""
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))

kwargs_keys_to_attributes = {
"max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
"model": SPANDATA.GEN_AI_REQUEST_MODEL,
"temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
"top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
"top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
}
for key, attribute in kwargs_keys_to_attributes.items():
value = kwargs.get(key)

if value is not None and _is_given(value):
span.set_data(attribute, value)

# Input attributes: Tools
tools = kwargs.get("tools")
if tools is not None and _is_given(tools) and len(tools) > 0:
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
_common_set_input_data(
span=span,
integration=integration,
max_tokens=kwargs.get("max_tokens"), # type: ignore
messages=kwargs.get("messages"), # type: ignore
model=kwargs.get("model"),
system=kwargs.get("system"),
temperature=kwargs.get("temperature"),
top_k=kwargs.get("top_k"),
top_p=kwargs.get("top_p"),
tools=kwargs.get("tools"),
)


def _set_stream_input_data(
    span: "Span",
    integration: "AnthropicIntegration",
    max_tokens: "int",
    messages: "Iterable[MessageParam]",
    model: "ModelParam",
    system: "Optional[Union[str, Iterable[TextBlockParam]]]",
    temperature: "Optional[float]",
    top_k: "Optional[int]",
    top_p: "Optional[float]",
    tools: "Optional[Iterable[ToolUnionParam]]",
) -> None:
    """
    Record request attributes on *span* for a streaming message call.

    Thin adapter around `_common_set_input_data`; it exists so streaming and
    non-streaming call sites share one attribute-setting implementation.
    """
    request_params = {
        "max_tokens": max_tokens,
        "messages": messages,
        "model": model,
        "system": system,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p,
        "tools": tools,
    }
    _common_set_input_data(span=span, integration=integration, **request_params)


def _set_output_data(
Expand Down Expand Up @@ -543,7 +602,7 @@
)
span.__enter__()

_set_input_data(span, kwargs, integration)
_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs

Expand Down Expand Up @@ -664,6 +723,107 @@
return _sentry_patched_create_async


def _sentry_patched_stream_common(
    stream_manager: "MessageStreamManager",
    max_tokens: "int",
    messages: "Iterable[MessageParam]",
    model: "ModelParam",
    system: "Union[str, Iterable[TextBlockParam]]",
    temperature: "float",
    top_k: "int",
    top_p: "float",
    tools: "Iterable[ToolUnionParam]",
) -> None:
    """
    Start a `gen_ai.chat` span for a streaming message request and attach the
    request attributes to it.

    Bails out without creating a span when the integration is not enabled, or
    when `messages` is missing or not iterable. On failure while populating
    the span, the span is closed before re-raising so it is never orphaned.
    """
    integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

    if integration is None:
        return stream_manager

    if messages is None:
        return stream_manager

    # The span-data helpers below iterate `messages`; refuse non-iterables
    # up front instead of failing mid-instrumentation.
    try:
        iter(messages)
    except TypeError:
        return stream_manager

    if model is None:
        model = ""

    span = get_start_span_function()(
        op=OP.GEN_AI_CHAT,
        name=f"chat {model}".strip(),
        origin=AnthropicIntegration.origin,
    )
    span.__enter__()

    # If anything after __enter__() raises, close the span so it is not left
    # open on Sentry's span stack (see warden warnings on this function),
    # then re-raise so the caller sees the original error.
    try:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
        _set_stream_input_data(
            span,
            integration,
            max_tokens=max_tokens,
            messages=messages,
            model=model,
            system=system,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            tools=tools,
        )
        _patch_streaming_response_iterator(stream_manager, span, integration)
    except Exception as exc:
        _capture_exception(exc)
        span.__exit__(type(exc), exc, exc.__traceback__)
        raise

Check warning on line 773 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

Span opened but not closed if setup fails after span.__enter__()

In `_sentry_patched_stream_common`, a span is entered at line 758 (`span.__enter__()`), but if `_set_stream_input_data` or `_patch_streaming_response_iterator` raise an exception, the span is never closed. This could lead to orphaned spans in the trace. The similar `_sentry_patched_create_common` function has error handling in its callers, but `_wrap_message_stream_manager_enter` lacks equivalent exception handling.

Check warning on line 773 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

Span entered but never exited if exception occurs before iterator patching

In `_sentry_patched_stream_common`, the span is entered via `span.__enter__()` on line 758, but there's no try/except/finally to ensure `span.__exit__()` is called if `_set_stream_input_data()` or `_patch_streaming_response_iterator()` raises an exception. This would leave an unclosed span in Sentry's span stack, potentially corrupting tracing state. Other functions in the same file (lines 680-683, 718-721) properly handle this with `finally` blocks.


def _wrap_message_stream(f: "Any") -> "Any":
"""
Attaches user-provided arguments to the returned context manager.
The attributes are set on `gen_ai.chat` spans in the patch for the context manager.
"""

@wraps(f)
def _sentry_patched_stream(*args: "Any", **kwargs: "Any") -> "MessageStreamManager":
stream_manager = f(*args, **kwargs)

stream_manager._max_tokens = kwargs.get("max_tokens")
stream_manager._messages = kwargs.get("messages")
stream_manager._model = kwargs.get("model")
stream_manager._system = kwargs.get("system")
stream_manager._temperature = kwargs.get("temperature")
stream_manager._top_k = kwargs.get("top_k")
stream_manager._top_p = kwargs.get("top_p")
stream_manager._tools = kwargs.get("tools")

return stream_manager

return _sentry_patched_stream


def _wrap_message_stream_manager_enter(f: "Any") -> "Any":
"""
Creates and manages `gen_ai.chat` spans.
"""

@wraps(f)
def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStreamManager":
stream_manager = f(self)
if not hasattr(self, "_max_tokens"):
return stream_manager

_sentry_patched_stream_common(
stream_manager=stream_manager,
max_tokens=self._max_tokens,
messages=self._messages,
model=self._model,
system=self._system,
temperature=self._temperature,
top_k=self._top_k,
top_p=self._top_p,
tools=self._tools,
)
return stream_manager

return _sentry_patched_enter


def _is_given(obj: "Any") -> bool:
"""
Check for givenness safely across different anthropic versions.
Expand Down
Loading
Loading