From dc517ba9db35b059e06f2a9c7cfd3a8ab0beeeb3 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 2 Mar 2026 09:53:14 +0100 Subject: [PATCH] test(anthropic): Stop mocking response iterator --- .../integrations/anthropic/test_anthropic.py | 699 ++++++++++-------- 1 file changed, 398 insertions(+), 301 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 4361ba9629..8688c326a6 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +import httpx try: from unittest.mock import AsyncMock @@ -12,7 +13,7 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic, AsyncStream, Stream -from anthropic.types import MessageDeltaUsage, TextDelta, Usage +from anthropic.types import MessageDeltaUsage, TextDelta, Usage, MessageStreamEvent from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent from anthropic.types.content_block_stop_event import ContentBlockStopEvent @@ -67,6 +68,13 @@ async def __call__(self, *args, **kwargs): ) +def sse_chunks(events): + for event in events: + payload = event.model_dump() + chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" + yield chunk.encode("utf-8") + + async def async_iterator(values): for value in values: yield value @@ -224,39 +232,49 @@ def test_streaming_create_message( sentry_init, capture_events, send_default_pii, include_prompts ): client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) ), - ] + ) + returned_stream = Stream( + cast_to=MessageStreamEvent, response=response, client=client + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -327,40 +345,48 @@ async def test_streaming_create_message_async( sentry_init, capture_events, send_default_pii, include_prompts ): client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init( @@ -435,65 +461,85 @@ def test_streaming_create_message_with_input_json_delta( sentry_init, capture_events, send_default_pii, include_prompts ): client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=Message( - id="msg_0", - content=[], - model="claude-3-5-sonnet-20240620", - role="assistant", - stop_reason=None, - stop_sequence=None, - type="message", - usage=Usage(input_tokens=366, output_tokens=10), - ), - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=ToolUseBlock( - id="toolu_0", input={}, name="get_weather", type="tool_use" - ), - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="{'location':", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="Francisco, C", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="tool_use", stop_sequence=None), - usage=MessageDeltaUsage(output_tokens=41), - type="message_delta", + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="{'location':", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json=" 'S", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="an ", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="A'}", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) ), - ] + ) + returned_stream = Stream( + cast_to=MessageStreamEvent, response=response, client=client + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -570,70 +616,83 @@ async def test_streaming_create_message_with_input_json_delta_async( sentry_init, capture_events, send_default_pii, include_prompts ): client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=Message( - id="msg_0", - content=[], - model="claude-3-5-sonnet-20240620", - role="assistant", - stop_reason=None, - stop_sequence=None, - type="message", - usage=Usage(input_tokens=366, output_tokens=10), - ), - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=ToolUseBlock( - id="toolu_0", input={}, name="get_weather", type="tool_use" - ), - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta( - partial_json="{'location':", type="input_json_delta" - ), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta( - partial_json="Francisco, C", type="input_json_delta" - ), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="tool_use", stop_sequence=None), - usage=MessageDeltaUsage(output_tokens=41), - type="message_delta", - ), - ] + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="{'location':", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json=" 'S", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="an ", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="A'}", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init( @@ -1247,39 +1306,49 @@ def test_streaming_create_message_with_system_prompt( ): """Test that system prompts are properly captured in streaming mode.""" client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) ), - ] + ) + returned_stream = Stream( + cast_to=MessageStreamEvent, response=response, client=client + ) sentry_init( integrations=[AnthropicIntegration(include_prompts=include_prompts)], @@ -1365,40 +1434,48 @@ async def test_streaming_create_message_with_system_prompt_async( ): """Test that system prompts are properly captured in streaming mode (async).""" client = AsyncAnthropic(api_key="z") - returned_stream = AsyncStream(cast_to=None, response=None, client=client) - returned_stream._iterator = async_iterator( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init( @@ -2372,30 +2449,40 @@ def test_input_tokens_include_cache_read_streaming(sentry_init, capture_events): Same cache-hit scenario as non-streaming, using realistic streaming events. """ client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - type="message_start", - message=Message( - id="id", - model="claude-sonnet-4-20250514", - role="assistant", - content=[], - type="message", - usage=Usage( - input_tokens=19, - output_tokens=0, - cache_read_input_tokens=2846, - cache_creation_input_tokens=0, - ), - ), - ), - MessageDeltaEvent( - type="message_delta", - delta=Delta(stop_reason="end_turn"), - usage=MessageDeltaUsage(output_tokens=14), + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-sonnet-4-20250514", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=19, + output_tokens=0, + cache_read_input_tokens=2846, + cache_creation_input_tokens=0, + ), + ), + ), + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=14), + ), + ] + ) ), - ] + ) + returned_stream = Stream( + cast_to=MessageStreamEvent, response=response, client=client + ) sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events() @@ -2460,30 +2547,40 @@ def test_input_tokens_unchanged_without_caching(sentry_init, capture_events): def test_cache_tokens_streaming(sentry_init, capture_events): """Test cache tokens are tracked for streaming responses.""" client = Anthropic(api_key="z") - returned_stream = Stream(cast_to=None, response=None, client=client) - returned_stream._iterator = [ - MessageStartEvent( - type="message_start", - message=Message( - id="id", - model="claude-3-5-sonnet-20241022", - role="assistant", - content=[], - type="message", - usage=Usage( - input_tokens=100, - output_tokens=0, - cache_read_input_tokens=80, - cache_creation_input_tokens=20, - ), - ), - ), - MessageDeltaEvent( - type="message_delta", - delta=Delta(stop_reason="end_turn"), - usage=MessageDeltaUsage(output_tokens=10), + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + type="message_start", + message=Message( + id="id", + model="claude-3-5-sonnet-20241022", + role="assistant", + content=[], + type="message", + usage=Usage( + input_tokens=100, + output_tokens=0, + cache_read_input_tokens=80, + cache_creation_input_tokens=20, + ), + ), + ), + MessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn"), + usage=MessageDeltaUsage(output_tokens=10), + ), + ] + ) ), - ] + ) + returned_stream = Stream( + cast_to=MessageStreamEvent, response=response, client=client + ) sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events()