Skip to content

Commit 67d02d3

Browse files
wiggzzclaude
andcommitted
Fix stateless HTTP task accumulation causing memory leak (#756)
In stateless mode, each request spawned a `run_stateless_server` task into the manager's global `_task_group`. After `handle_request()` completed and `terminate()` was called, the task continued running inside `app.run()`, blocked on `async for message in session.incoming_messages`. These zombie tasks accumulated indefinitely, leaking memory. Replace the global task group spawn with a request-scoped task group so that server tasks are automatically cancelled when their request completes. Add a regression test that simulates the real blocking behavior of `app.run()` using `anyio.sleep_forever()` and verifies no tasks linger in the global task group after requests finish. Closes #756 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0fe16dd commit 67d02d3

File tree

2 files changed

+112
-9
lines changed

2 files changed

+112
-9
lines changed

src/mcp/server/streamable_http_manager.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
151151
await self._handle_stateful_request(scope, receive, send)
152152

153153
async def _handle_stateless_request(self, scope: Scope, receive: Receive, send: Send) -> None:
154-
"""Process request in stateless mode - creating a new transport for each request."""
154+
"""Process request in stateless mode - creating a new transport for each request.
155+
156+
Uses a request-scoped task group so the server task is automatically
157+
cancelled when the request completes, preventing task accumulation in
158+
the manager's global task group.
159+
"""
155160
logger.debug("Stateless mode: Creating new transport for this request")
156161
# No session ID needed in stateless mode
157162
http_transport = StreamableHTTPServerTransport(
@@ -176,16 +181,23 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
176181
except Exception: # pragma: no cover
177182
logger.exception("Stateless session crashed")
178183

179-
# Assert task group is not None for type checking
180-
assert self._task_group is not None
181-
# Start the server task
182-
await self._task_group.start(run_stateless_server)
184+
# Use a request-scoped task group instead of the global one.
185+
# This ensures the server task is cancelled when the request
186+
# finishes, preventing zombie tasks from accumulating.
187+
# See: https://github.com/modelcontextprotocol/python-sdk/issues/756
188+
async with anyio.create_task_group() as request_tg:
183189

184-
# Handle the HTTP request and return the response
185-
await http_transport.handle_request(scope, receive, send)
190+
async def run_request_handler(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED):
191+
task_status.started()
192+
# Handle the HTTP request and return the response
193+
await http_transport.handle_request(scope, receive, send)
194+
# Terminate the transport after the request is handled
195+
await http_transport.terminate()
196+
# Cancel the request-scoped task group to stop the server task
197+
request_tg.cancel_scope.cancel()
186198

187-
# Terminate the transport after the request is handled
188-
await http_transport.terminate()
199+
await request_tg.start(run_stateless_server)
200+
await request_tg.start(run_request_handler)
189201

190202
async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: Send) -> None:
191203
"""Process request in stateful mode - maintaining session state between requests."""

tests/server/test_streamable_http_manager.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,97 @@ async def mock_receive():
268268
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
269269

270270

271+
@pytest.mark.anyio
272+
async def test_stateless_requests_task_leak_on_client_disconnect():
273+
"""Test that stateless tasks don't leak when clients disconnect mid-request.
274+
275+
Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/756
276+
277+
Reproduces the production memory leak: a client sends a tool call, the tool
278+
handler takes some time, and the client disconnects before the response is
279+
delivered. The SSE response pipeline detects the disconnect but app.run()
280+
continues in the background. After the tool finishes, the response has
281+
nowhere to go, and app.run() blocks on ``async for message in
282+
session.incoming_messages`` forever — leaking the task in the global
283+
task group.
284+
285+
The test uses real Server.run() with a real tool handler, real SSE streaming
286+
via httpx.ASGITransport, and simulates client disconnect by cancelling the
287+
request task.
288+
"""
289+
from mcp.types import CallToolResult, TextContent, Tool
290+
291+
tool_started = anyio.Event()
292+
tool_gate = anyio.Event()
293+
294+
async def handle_list_tools(
295+
ctx: ServerRequestContext, params: PaginatedRequestParams | None
296+
) -> ListToolsResult:
297+
return ListToolsResult(
298+
tools=[Tool(name="slow_tool", description="A slow tool", inputSchema={"type": "object"})]
299+
)
300+
301+
async def handle_call_tool(
302+
ctx: ServerRequestContext, params: Any
303+
) -> CallToolResult:
304+
tool_started.set()
305+
# Simulate a slow tool (e.g., API call to Discovery/Snowflake)
306+
await tool_gate.wait()
307+
return CallToolResult(content=[TextContent(type="text", text="done")])
308+
309+
app = Server(
310+
"test-stateless-leak",
311+
on_list_tools=handle_list_tools,
312+
on_call_tool=handle_call_tool,
313+
)
314+
315+
host = "testserver"
316+
mcp_app = app.streamable_http_app(host=host, stateless_http=True)
317+
318+
async with (
319+
mcp_app.router.lifespan_context(mcp_app),
320+
httpx.ASGITransport(mcp_app) as transport,
321+
):
322+
session_manager = app._session_manager
323+
324+
async def make_and_abandon_tool_call():
325+
async with httpx.AsyncClient(
326+
transport=transport, base_url=f"http://{host}", timeout=30.0
327+
) as http_client:
328+
async with Client(
329+
streamable_http_client(f"http://{host}/mcp", http_client=http_client)
330+
) as client:
331+
# Start tool call — this will block until tool completes
332+
# We'll cancel it from outside to simulate disconnect
333+
await client.call_tool("slow_tool", {})
334+
335+
num_requests = 3
336+
for _ in range(num_requests):
337+
async with anyio.create_task_group() as tg:
338+
tg.start_soon(make_and_abandon_tool_call)
339+
# Wait for the tool handler to actually start
340+
await tool_started.wait()
341+
tool_started = anyio.Event() # Reset for next iteration
342+
# Simulate client disconnect by cancelling the request
343+
tg.cancel_scope.cancel()
344+
345+
# Let the tool finish now (response has nowhere to go)
346+
tool_gate.set()
347+
tool_gate = anyio.Event() # Reset for next iteration
348+
349+
# Give tasks a chance to settle
350+
await anyio.sleep(0.1)
351+
352+
# Check for leaked tasks in the session manager's global task group
353+
await anyio.sleep(0.1)
354+
leaked = len(session_manager._task_group._tasks)
355+
356+
assert leaked == 0, (
357+
f"Expected 0 lingering tasks but found {leaked}. "
358+
f"Stateless request tasks are leaking after client disconnect."
359+
)
360+
361+
271362
@pytest.mark.anyio
272363
async def test_unknown_session_id_returns_404():
273364
"""Test that requests with unknown session IDs return HTTP 404 per MCP spec."""

0 commit comments

Comments
 (0)