Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/strands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from .event_loop._retry import ModelRetryStrategy
from .plugins import Plugin
from .tools.decorator import tool
from .types.cancellation import CancellationToken
from .types.tools import ToolContext

__all__ = [
"Agent",
"AgentBase",
"agent",
"CancellationToken",
"models",
"ModelRetryStrategy",
"Plugin",
Expand Down
15 changes: 15 additions & 0 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..event_loop._retry import ModelRetryStrategy
from ..event_loop.event_loop import INITIAL_DELAY, MAX_ATTEMPTS, MAX_DELAY, event_loop_cycle
from ..tools._tool_helpers import generate_missing_tool_result_content
from ..types.cancellation import CancellationToken

if TYPE_CHECKING:
from ..tools import ToolProvider
Expand Down Expand Up @@ -135,6 +136,7 @@ def __init__(
tool_executor: ToolExecutor | None = None,
retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY,
concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW,
cancellation_token: CancellationToken | None = None,
):
"""Initialize the Agent with the specified configuration.

Expand Down Expand Up @@ -201,6 +203,12 @@ def __init__(
Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations.
Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided
only for advanced use cases where the caller understands the risks.
cancellation_token: Optional token for cancelling agent execution.
When provided, the agent will check this token at strategic checkpoints during execution
(before model calls, during streaming, before tool execution) and stop gracefully if
cancellation is requested. The token can be cancelled from external contexts (other threads,
web requests, etc.) by calling token.cancel().
Defaults to None (no cancellation support).

Raises:
ValueError: If agent id contains path separators.
Expand Down Expand Up @@ -240,6 +248,9 @@ def __init__(
self.record_direct_tool_call = record_direct_tool_call
self.load_tools_from_directory = load_tools_from_directory

# Store cancellation token for graceful termination
self.cancellation_token = cancellation_token

self.tool_registry = ToolRegistry()

# Process tool list if provided
Expand Down Expand Up @@ -724,6 +735,10 @@ async def stream_async(
if invocation_state is not None:
merged_state = invocation_state

# Add cancellation token to invocation state if provided
if self.cancellation_token is not None:
merged_state["cancellation_token"] = self.cancellation_token

callback_handler = self.callback_handler
if kwargs:
callback_handler = kwargs.get("callback_handler", self.callback_handler)
Expand Down
67 changes: 67 additions & 0 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TypedEvent,
)
from ..types.content import Message, Messages
from ..types.event_loop import Metrics, Usage
from ..types.exceptions import (
ContextWindowOverflowException,
EventLoopException,
Expand All @@ -55,6 +56,23 @@
MAX_DELAY = 240 # 4 minutes


def _should_cancel(invocation_state: dict[str, Any]) -> bool:
    """Report whether the current invocation has been asked to cancel.

    The event loop calls this helper at its cancellation checkpoints. It looks up
    the optional ``"cancellation_token"`` entry of the invocation state and, when
    one is present, asks the token whether cancellation was requested.

    Args:
        invocation_state: Invocation state that may carry a token under the
            ``"cancellation_token"`` key.

    Returns:
        The result of the token's ``is_cancelled()`` call, or False when the entry
        is missing or falsy.
    """
    cancel_token = invocation_state.get("cancellation_token")
    if not cancel_token:
        # No token registered for this invocation: nothing can request cancellation.
        return False
    return cancel_token.is_cancelled()


def _has_tool_use_in_latest_message(messages: "Messages") -> bool:
"""Check if the latest message contains any ToolUse content blocks.

Expand Down Expand Up @@ -129,6 +147,22 @@ async def event_loop_cycle(
yield StartEvent()
yield StartEventLoopEvent()

# CHECKPOINT 1: Check for cancellation at start of event loop cycle
# This allows cancellation before any model or tool execution begins
if _should_cancel(invocation_state):
logger.debug(
"event_loop_cycle_id=<%s> | cancellation detected at cycle start",
invocation_state.get("event_loop_cycle_id"),
)
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
yield EventLoopStopEvent(
"cancelled",
{"role": "assistant", "content": []},
agent.event_loop_metrics,
invocation_state["request_state"],
)
return

# Create tracer span for this event loop cycle
tracer = get_tracer()
cycle_span = tracer.start_event_loop_cycle_span(
Expand Down Expand Up @@ -307,6 +341,23 @@ async def _handle_model_execution(
# Retry loop - actual retry logic is handled by retry_strategy hook
# Hooks control when to stop retrying via the event.retry flag
while True:
# CHECKPOINT 2: Check for cancellation before model call
# This prevents unnecessary model invocations when cancellation is requested
if _should_cancel(invocation_state):
logger.debug(
"model_id=<%s> | cancellation detected before model call",
agent.model.config.get("model_id") if hasattr(agent.model, "config") else None,
)
# Return cancelled stop reason with empty message and zero usage/metrics
# since no model execution occurred
yield ModelStopReason(
stop_reason="cancelled",
message={"role": "assistant", "content": []},
usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0),
metrics=Metrics(latencyMs=0),
)
return

model_id = agent.model.config.get("model_id") if hasattr(agent.model, "config") else None
model_invoke_span = tracer.start_model_invoke_span(
messages=agent.messages,
Expand Down Expand Up @@ -465,6 +516,22 @@ async def _handle_tool_execution(
tool_uses = [tool_use for tool_use in tool_uses if tool_use["toolUseId"] not in tool_use_ids]

interrupts = []

# CHECKPOINT 4: Check for cancellation before tool execution
# This prevents tool execution when cancellation is requested
if _should_cancel(invocation_state):
logger.debug("tool_count=<%d> | cancellation detected before tool execution", len(tool_uses))
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
yield EventLoopStopEvent(
"cancelled",
message,
agent.event_loop_metrics,
invocation_state["request_state"],
)
if cycle_span:
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
return

tool_events = agent.tool_executor._execute(
agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context
)
Expand Down
17 changes: 15 additions & 2 deletions src/strands/event_loop/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ToolUseStreamEvent,
TypedEvent,
)
from ..types.cancellation import CancellationToken
from ..types.citations import CitationsContentBlock
from ..types.content import ContentBlock, Message, Messages, SystemContentBlock
from ..types.streaming import (
Expand Down Expand Up @@ -368,13 +369,16 @@ def extract_usage_metrics(event: MetadataEvent, time_to_first_byte_ms: int | Non


async def process_stream(
chunks: AsyncIterable[StreamEvent], start_time: float | None = None
chunks: AsyncIterable[StreamEvent],
start_time: float | None = None,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[TypedEvent, None]:
"""Processes the response stream from the API, constructing the final message and extracting usage metrics.

Args:
chunks: The chunks of the response stream from the model.
start_time: Time when the model request is initiated
cancellation_token: Optional token to check for cancellation during streaming.

Yields:
The reason for stopping, the constructed message, and the usage metrics.
Expand All @@ -395,6 +399,14 @@ async def process_stream(
metrics: Metrics = Metrics(latencyMs=0, timeToFirstByteMs=0)

async for chunk in chunks:
# CHECKPOINT 3: Check for cancellation before processing stream chunks
# This allows cancellation during model response streaming
if cancellation_token and cancellation_token.is_cancelled():
logger.debug("cancellation detected during stream processing")
# Return cancelled stop reason with current state
yield ModelStopReason(stop_reason="cancelled", message=state["message"], usage=usage, metrics=metrics)
return

# Track first byte time when we get first content
if first_byte_time is None and ("contentBlockDelta" in chunk or "contentBlockStart" in chunk):
first_byte_time = time.time()
Expand Down Expand Up @@ -463,5 +475,6 @@ async def stream_messages(
invocation_state=invocation_state,
)

async for event in process_stream(chunks, start_time):
cancellation_token = invocation_state.get("cancellation_token") if invocation_state else None
async for event in process_stream(chunks, start_time, cancellation_token):
yield event
2 changes: 1 addition & 1 deletion src/strands/plugins/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Plugin(ABC):
"""Base class for objects that extend agent functionality.

Plugins provide a composable way to add behavior changes to agents.
They can register hooks, modify agent attributes, or perform other
They can register hooks, modify agent attributes, or perform other
setup tasks on an agent instance.

Attributes:
Expand Down
3 changes: 2 additions & 1 deletion src/strands/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""SDK type definitions."""

from .cancellation import CancellationToken
from .collections import PaginatedList

__all__ = ["PaginatedList"]
__all__ = ["CancellationToken", "PaginatedList"]
55 changes: 55 additions & 0 deletions src/strands/types/cancellation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Cancellation token types for graceful agent termination."""

import threading


class CancellationToken:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During design review, I would recommend bringing up the naming of this. Throughout the SDK we now use stop, interrupt, and cancel. In my mind, they have the following meanings:

  • stop: Stop the agent loop and exit out.
  • interrupt: Interrupt the agent loop to get a user response. Once received, resume from where agent loop was interrupted.
  • cancel: We use this specifically on components. For example, we can cancel a tool call that is about to happen. So it isn't so much stopping as it is about preventing something from getting started.

I am thinking cancel is the right word here because we are checking it at certain points in the loop to essentially say, don't go forward with this action. stop as a verb is probably something we should avoid using going forward. So we could still have a "stop event" or "stop reason", but the underlying reason is because of things like "interrupted", "cancelled", etc.

So with all that said, I would probably recommend calling this CancelSignal. It is also in line with the terminology of Python's asyncio Task.cancel().

"""Thread-safe cancellation token for graceful agent termination.

This token can be used to signal cancellation requests from any thread
and checked synchronously during agent execution. When cancelled, the
agent stops at its next checkpoint and yields a stop event whose stop reason is "cancelled".

Example:
```python
token = CancellationToken()

# In another thread or external system
token.cancel()

# In agent execution
if token.is_cancelled():
# Stop processing
pass
```

Note:
This is a minimal implementation focused on cancellation signaling.
Callback registration for resource cleanup can be added in a future
phase if resource cleanup use cases emerge.
"""

def __init__(self) -> None:
    """Create a token in the not-cancelled state.

    The cancellation flag starts out False and is paired with a dedicated lock
    that the other methods hold while touching the flag.
    """
    self._lock = threading.Lock()
    self._cancelled = False

def cancel(self) -> None:
    """Request cancellation.

    The flag is set while holding the token's lock, so this may be invoked from
    any thread. Calling it again just sets the flag to True once more, so repeated
    calls are harmless.
    """
    self._lock.acquire()
    try:
        self._cancelled = True
    finally:
        # Always hand the lock back, mirroring what a ``with`` block would do.
        self._lock.release()

def is_cancelled(self) -> bool:
    """Tell whether cancellation has been requested on this token.

    The flag is read while holding the token's lock, so the call is safe from
    any thread.

    Returns:
        True once cancellation has been requested, otherwise False.
    """
    self._lock.acquire()
    try:
        return self._cancelled
    finally:
        # Always hand the lock back, mirroring what a ``with`` block would do.
        self._lock.release()
2 changes: 2 additions & 0 deletions src/strands/types/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Metrics(TypedDict, total=False):


StopReason = Literal[
"cancelled",
"content_filtered",
"end_turn",
"guardrail_intervened",
Expand All @@ -47,6 +48,7 @@ class Metrics(TypedDict, total=False):
]
"""Reason for the model ending its response generation.
- "cancelled": Agent execution was cancelled via CancellationToken
- "content_filtered": Content was filtered due to policy violation
- "end_turn": Normal completion of the response
- "guardrail_intervened": Guardrail system intervened
Expand Down
Loading