From 17f2a62be8e4b6d8e2f12fb38a1c37945a4b8f7e Mon Sep 17 00:00:00 2001 From: karthik Date: Fri, 23 Jan 2026 10:38:52 -0500 Subject: [PATCH] fix: notify session when SSE stream closes without event ID Fixes #1811 When an SSE stream closes without having received any events with IDs, the client would silently return without notifying the session layer. This left `call_tool()` hanging indefinitely since the session never received a response or error. Now when the stream closes without a `last_event_id` (meaning we can't reconnect), we send a JSONRPCError with CONNECTION_CLOSED code back to the session so the request properly fails instead of hanging. Co-Authored-By: Claude Opus 4.5 --- src/mcp/client/streamable_http.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 555dd1290..d6a418b23 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -17,6 +17,7 @@ from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( + CONNECTION_CLOSED, ErrorData, InitializeResult, JSONRPCError, @@ -337,9 +338,25 @@ async def _handle_sse_response( logger.debug(f"SSE stream ended: {e}") # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch + if last_event_id is not None: logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + else: + # No event ID received - cannot reconnect, notify session of failure + request_id: RequestId | None = None + if isinstance(ctx.session_message.message, JSONRPCRequest): + request_id = ctx.session_message.message.id + + if request_id is not None: + jsonrpc_error = JSONRPCError( + jsonrpc="2.0", + id=request_id, + error=ErrorData( + code=CONNECTION_CLOSED, + message="SSE stream closed unexpectedly without response", + ), + ) + await ctx.read_stream_writer.send(SessionMessage(jsonrpc_error)) async def _handle_reconnection( self,