From 704dc13345a7e54c341ae0b2362532241b37f627 Mon Sep 17 00:00:00 2001 From: Christopher Hertel Date: Mon, 2 Feb 2026 00:06:55 +0100 Subject: [PATCH] Wire up notifications for changing prompt, resource & tools lists --- composer.json | 1 + .../change-events/ListChangingHandlers.php | 66 +++++++++++ examples/server/change-events/server.php | 46 +++++++ src/Event/Dispatcher.php | 34 ++++++ src/Event/ListenerProvider.php | 44 +++++++ src/Schema/Tool.php | 4 +- src/Server/Builder.php | 27 ++++- src/Server/ChangeListener.php | 42 +++++++ src/Server/Protocol.php | 112 +++++++++--------- tests/Unit/Capability/RegistryTest.php | 2 +- 10 files changed, 318 insertions(+), 60 deletions(-) create mode 100644 examples/server/change-events/ListChangingHandlers.php create mode 100644 examples/server/change-events/server.php create mode 100644 src/Event/Dispatcher.php create mode 100644 src/Event/ListenerProvider.php create mode 100644 src/Server/ChangeListener.php diff --git a/composer.json b/composer.json index 8803947c..5c82ee13 100644 --- a/composer.json +++ b/composer.json @@ -54,6 +54,7 @@ "autoload-dev": { "psr-4": { "Mcp\\Example\\Server\\CachedDiscovery\\": "examples/server/cached-discovery/", + "Mcp\\Example\\Server\\ChangeEvents\\": "examples/server/change-events/", "Mcp\\Example\\Server\\ClientCommunication\\": "examples/server/client-communication/", "Mcp\\Example\\Server\\ClientLogging\\": "examples/server/client-logging/", "Mcp\\Example\\Server\\CombinedRegistration\\": "examples/server/combined-registration/", diff --git a/examples/server/change-events/ListChangingHandlers.php b/examples/server/change-events/ListChangingHandlers.php new file mode 100644 index 00000000..6736a841 --- /dev/null +++ b/examples/server/change-events/ListChangingHandlers.php @@ -0,0 +1,66 @@ +registry->registerPrompt( + new Prompt($name), + static fn () => [new PromptMessage(Role::User, new TextContent($content))], + isManual: true, + ); + + return \sprintf('Prompt "%s" registered.', $name); + } + + public function addResource(string $uri, string $name): string + { + $this->registry->registerResource( + new Resource($uri, $name), + static fn () => \sprintf('This is the content of the dynamically added resource "%s" at URI "%s".', $name, $uri), + true, + ); + + return \sprintf('Resource "%s" registered.', $name); + } + + public function addTool(string $name): string + { + $this->registry->registerTool( + new Tool( + $name, + ['type' => 'object', 'properties' => new \stdClass(), 'required' => []], + 'Dynamically added tool', + null + ), + static fn () => \sprintf('This is the output of the dynamically added tool "%s".', $name), + true, + ); + + return \sprintf('Tool "%s" registered.', $name); + } +} diff --git a/examples/server/change-events/server.php b/examples/server/change-events/server.php new file mode 100644 index 00000000..5bb8f064 --- /dev/null +++ b/examples/server/change-events/server.php @@ -0,0 +1,46 @@ +info('Starting MCP Change Events Server...'); + +$listenerProvider = new ListenerProvider(); +$dispatcher = new Dispatcher($listenerProvider); +$registry = new Registry($dispatcher, logger()); +$container = container(); +$container->set(RegistryInterface::class, $registry); + +$server = Server::builder() + ->setServerInfo('Server with Changing Lists', '1.0.0') + ->setLogger(logger()) + ->setContainer($container) + ->setRegistry($registry) + ->setEventDispatcher($dispatcher) + ->setEventListenerProvider($listenerProvider) + ->addTool([ListChangingHandlers::class, 'addPrompt'], 'add_prompt', 'Tool that adds a new prompt to the registry with the given name and content.') + ->addTool([ListChangingHandlers::class, 'addResource'], 'add_resource', 'Tool that adds a new resource to the registry with the given name and URL.') + ->addTool([ListChangingHandlers::class, 'addTool'], 'add_tool', 'Tool that adds a new tool to the registry with the given name.') + ->build(); + +$result = $server->run(transport()); + +logger()->info('Server listener stopped gracefully.', ['result' => $result]); + +shutdown($result); diff --git a/src/Event/Dispatcher.php b/src/Event/Dispatcher.php new file mode 100644 index 00000000..8b4cbbbb --- /dev/null +++ b/src/Event/Dispatcher.php @@ -0,0 +1,34 @@ + + */ +final class Dispatcher implements EventDispatcherInterface +{ + public function __construct( + private readonly ListenerProvider $listenerProvider, + ) { + } + + public function dispatch(object $event): object + { + foreach ($this->listenerProvider->getListenersForEvent($event) as $listener) { + $listener($event); + } + + return $event; + } +} diff --git a/src/Event/ListenerProvider.php b/src/Event/ListenerProvider.php new file mode 100644 index 00000000..fd275672 --- /dev/null +++ b/src/Event/ListenerProvider.php @@ -0,0 +1,44 @@ + + */ +final class ListenerProvider implements ListenerProviderInterface +{ + /** + * @var array + */ + private array $listeners = []; + + public function addListener(string $eventClass, callable $listener): void + { + $this->listeners[$eventClass][] = $listener; + } + + /** + * @return iterable + */ + public function getListenersForEvent(object $event): iterable + { + if (isset($this->listeners[$event::class])) { + foreach ($this->listeners[$event::class] as $listener) { + yield $listener; + } + } + } +} diff --git a/src/Schema/Tool.php b/src/Schema/Tool.php index 6255701d..7892821e 100644 --- a/src/Schema/Tool.php +++ b/src/Schema/Tool.php @@ -21,8 +21,8 @@ * * @phpstan-type ToolInputSchema array{ * type: 'object', - * properties: array, - * required: string[]|null + * properties: array|object, + * required: string[] * } * @phpstan-type ToolOutputSchema array{ * type: 'object', diff --git a/src/Server/Builder.php b/src/Server/Builder.php index 9e9b6b2f..15a1c1fd 100644 --- a/src/Server/Builder.php +++ b/src/Server/Builder.php @@ -23,6 +23,11 @@ use Mcp\Capability\Registry\Loader\LoaderInterface; use Mcp\Capability\Registry\ReferenceHandler; use Mcp\Capability\RegistryInterface; +use Mcp\Event\Dispatcher; +use Mcp\Event\ListenerProvider; +use Mcp\Event\PromptListChangedEvent; +use Mcp\Event\ResourceListChangedEvent; +use Mcp\Event\ToolListChangedEvent; use Mcp\JsonRpc\MessageFactory; use Mcp\Schema\Annotations; use Mcp\Schema\Enum\ProtocolVersion; @@ -60,6 +65,8 @@ final class Builder private ?EventDispatcherInterface $eventDispatcher = null; + private ?ListenerProvider $eventListenerProvider = null; + private ?ContainerInterface $container = null; private ?SchemaGeneratorInterface $schemaGenerator = null; @@ -284,6 +291,13 @@ public function setEventDispatcher(EventDispatcherInterface $eventDispatcher): s return $this; } + public function setEventListenerProvider(ListenerProvider $listenerProvider): self + { + $this->eventListenerProvider = $listenerProvider; + + return $this; + } + /** * Provides a PSR-11 DI container, primarily for resolving user-defined handler classes. * Defaults to a basic internal container. @@ -488,6 +502,8 @@ public function build(): Server { $logger = $this->logger ?? new NullLogger(); $container = $this->container ?? new Container(); + $this->eventListenerProvider ??= new ListenerProvider(); + $this->eventDispatcher ??= new Dispatcher($this->eventListenerProvider); $registry = $this->registry ?? new Registry($this->eventDispatcher, $logger); $loaders = [ @@ -511,12 +527,12 @@ public function build(): Server $capabilities = $this->serverCapabilities ?? new ServerCapabilities( tools: $registry->hasTools(), - toolsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, + toolsListChanged: true, resources: $registry->hasResources() || $registry->hasResourceTemplates(), resourcesSubscribe: false, - resourcesListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, + resourcesListChanged: true, prompts: $registry->hasPrompts(), - promptsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, + promptsListChanged: true, logging: true, completions: true, ); @@ -552,6 +568,11 @@ public function build(): Server logger: $logger, ); + $changeListener = new ChangeListener($protocol); + $this->eventListenerProvider->addListener(PromptListChangedEvent::class, $changeListener->onPromptListChange(...)); + $this->eventListenerProvider->addListener(ResourceListChangedEvent::class, $changeListener->onResourceListChange(...)); + $this->eventListenerProvider->addListener(ToolListChangedEvent::class, $changeListener->onToolListChange(...)); + return new Server($protocol, $logger); } diff --git a/src/Server/ChangeListener.php b/src/Server/ChangeListener.php new file mode 100644 index 00000000..c0cd0d2e --- /dev/null +++ b/src/Server/ChangeListener.php @@ -0,0 +1,42 @@ + + */ +final class ChangeListener +{ + public function __construct( + private readonly Protocol $protocol, + ) { + } + + public function onPromptListChange(): void + { + $this->protocol->sendNotification(new PromptListChangedNotification()); + } + + public function onResourceListChange(): void + { + $this->protocol->sendNotification(new ResourceListChangedNotification()); + } + + public function onToolListChange(): void + { + $this->protocol->sendNotification(new ToolListChangedNotification()); + } +} diff --git a/src/Server/Protocol.php b/src/Server/Protocol.php index feedae3b..4496ac31 100644 --- a/src/Server/Protocol.php +++ b/src/Server/Protocol.php @@ -57,6 +57,8 @@ class Protocol public const SESSION_LOGGING_LEVEL = '_mcp.logging_level'; + private ?SessionInterface $session = null; + /** * @param array>> $requestHandlers * @param array $notificationHandlers @@ -113,29 +115,30 @@ public function processInput(TransportInterface $transport, string $input, ?Uuid } catch (\JsonException $e) { $this->logger->warning('Failed to decode json message.', ['exception' => $e]); $error = Error::forParseError($e->getMessage()); - $this->sendResponse($transport, $error, null); + $this->sendResponse($transport, $error); return; } - $session = $this->resolveSession($transport, $sessionId, $messages); - if (null === $session) { + $this->resolveSession($transport, $sessionId, $messages); + if (null === $this->session) { return; } foreach ($messages as $message) { if ($message instanceof InvalidInputMessageException) { - $this->handleInvalidMessage($transport, $message, $session); + $this->handleInvalidMessage($transport, $message); } elseif ($message instanceof Request) { - $this->handleRequest($transport, $message, $session); + $this->handleRequest($transport, $message); } elseif ($message instanceof Response || $message instanceof Error) { - $this->handleResponse($message, $session); + $this->handleResponse($message); } elseif ($message instanceof Notification) { - $this->handleNotification($message, $session); + $this->handleNotification($message); } } - $session->save(); + $this->session->save(); + $this->session = null; } /** @@ -143,12 +146,12 @@ public function processInput(TransportInterface $transport, string $input, ?Uuid * * @param TransportInterface $transport */ - private function handleInvalidMessage(TransportInterface $transport, InvalidInputMessageException $exception, SessionInterface $session): void + private function handleInvalidMessage(TransportInterface $transport, InvalidInputMessageException $exception): void { $this->logger->warning('Failed to create message.', ['exception' => $exception]); $error = Error::forInvalidRequest($exception->getMessage()); - $this->sendResponse($transport, $error, $session); + $this->sendResponse($transport, $error); } /** @@ -156,11 +159,11 @@ private function handleInvalidMessage(TransportInterface $transport, InvalidInpu * * @param TransportInterface $transport */ - private function handleRequest(TransportInterface $transport, Request $request, SessionInterface $session): void + private function handleRequest(TransportInterface $transport, Request $request): void { $this->logger->info('Handling request.', ['request' => $request]); - $session->set(self::SESSION_ACTIVE_REQUEST_META, $request->getMeta()); + $this->session->set(self::SESSION_ACTIVE_REQUEST_META, $request->getMeta()); $handlerFound = false; @@ -172,6 +175,7 @@ private function handleRequest(TransportInterface $transport, Request $request, $handlerFound = true; try { + $session = $this->session; /** @var McpFiber $fiber */ $fiber = new \Fiber(static fn () => $handler->handle($request, $session)); @@ -181,31 +185,31 @@ private function handleRequest(TransportInterface $transport, Request $request, if (\is_array($result) && isset($result['type'])) { if ('notification' === $result['type']) { $notification = $result['notification']; - $this->sendNotification($notification, $session); + $this->sendNotification($notification); } elseif ('request' === $result['type']) { $request = $result['request']; $timeout = $result['timeout'] ?? 120; - $this->sendRequest($request, $timeout, $session); + $this->sendRequest($request, $timeout); } } - $transport->attachFiberToSession($fiber, $session->getId()); + $transport->attachFiberToSession($fiber, $this->session->getId()); return; } $finalResult = $fiber->getReturn(); - $this->sendResponse($transport, $finalResult, $session); + $this->sendResponse($transport, $finalResult); } catch (\InvalidArgumentException $e) { $this->logger->warning(\sprintf('Invalid argument: %s', $e->getMessage()), ['exception' => $e]); $error = Error::forInvalidParams($e->getMessage(), $request->getId()); - $this->sendResponse($transport, $error, $session); + $this->sendResponse($transport, $error); } catch (\Throwable $e) { $this->logger->error(\sprintf('Uncaught exception: %s', $e->getMessage()), ['exception' => $e]); $error = Error::forInternalError($e->getMessage(), $request->getId()); - $this->sendResponse($transport, $error, $session); + $this->sendResponse($transport, $error); } break; @@ -213,28 +217,28 @@ private function handleRequest(TransportInterface $transport, Request $request, if (!$handlerFound) { $error = Error::forMethodNotFound(\sprintf('No handler found for method "%s".', $request::getMethod()), $request->getId()); - $this->sendResponse($transport, $error, $session); + $this->sendResponse($transport, $error); } } /** * @param Response>|Error $response */ - private function handleResponse(Response|Error $response, SessionInterface $session): void + private function handleResponse(Response|Error $response): void { $this->logger->info('Handling response from client.', ['response' => $response]); $messageId = $response->getId(); - $session->set(self::SESSION_RESPONSES.".{$messageId}", $response->jsonSerialize()); - $session->forget(self::SESSION_ACTIVE_REQUEST_META); + $this->session->set(self::SESSION_RESPONSES.".{$messageId}", $response->jsonSerialize()); + $this->session->forget(self::SESSION_ACTIVE_REQUEST_META); $this->logger->info('Client response stored in session', [ 'message_id' => $messageId, ]); } - private function handleNotification(Notification $notification, SessionInterface $session): void + private function handleNotification(Notification $notification): void { $this->logger->info('Handling notification.', ['notification' => $notification]); @@ -244,7 +248,7 @@ private function handleNotification(Notification $notification, SessionInterface } try { - $handler->handle($notification, $session); + $handler->handle($notification, $this->session); } catch (\Throwable $e) { $this->logger->error(\sprintf('Error while handling notification: %s', $e->getMessage()), ['exception' => $e]); } @@ -254,11 +258,11 @@ private function handleNotification(Notification $notification, SessionInterface /** * Sends a request to the client and returns the request ID. */ - public function sendRequest(Request $request, int $timeout, SessionInterface $session): int + public function sendRequest(Request $request, int $timeout): int { - $counter = $session->get(self::SESSION_REQUEST_ID_COUNTER, 1000); + $counter = $this->session->get(self::SESSION_REQUEST_ID_COUNTER, 1000); $requestId = $counter++; - $session->set(self::SESSION_REQUEST_ID_COUNTER, $counter); + $this->session->set(self::SESSION_REQUEST_ID_COUNTER, $counter); $requestWithId = $request->withId($requestId); @@ -267,15 +271,15 @@ public function sendRequest(Request $request, int $timeout, SessionInterface $se 'method' => $request::getMethod(), ]); - $pending = $session->get(self::SESSION_PENDING_REQUESTS, []); + $pending = $this->session->get(self::SESSION_PENDING_REQUESTS, []); $pending[$requestId] = [ 'request_id' => $requestId, 'timeout' => $timeout, 'timestamp' => time(), ]; - $session->set(self::SESSION_PENDING_REQUESTS, $pending); + $this->session->set(self::SESSION_PENDING_REQUESTS, $pending); - $this->queueOutgoing($requestWithId, ['type' => 'request'], $session); + $this->queueOutgoing($requestWithId, ['type' => 'request']); return $requestId; } @@ -283,13 +287,13 @@ public function sendRequest(Request $request, int $timeout, SessionInterface $se /** * Queues a notification for later delivery. */ - public function sendNotification(Notification $notification, SessionInterface $session): void + public function sendNotification(Notification $notification): void { $this->logger->info('Queueing server notification to client', [ 'method' => $notification::getMethod(), ]); - $this->queueOutgoing($notification, ['type' => 'notification'], $session); + $this->queueOutgoing($notification, ['type' => 'notification']); } /** @@ -299,9 +303,9 @@ public function sendNotification(Notification $notification, SessionInterface $s * @param Response>|Error $response * @param array $context */ - private function sendResponse(TransportInterface $transport, Response|Error $response, ?SessionInterface $session, array $context = []): void + private function sendResponse(TransportInterface $transport, Response|Error $response, array $context = []): void { - if (null === $session) { + if (null === $this->session) { $this->logger->info('Sending immediate response', [ 'response_id' => $response->getId(), ]); @@ -330,7 +334,7 @@ private function sendResponse(TransportInterface $transport, Response|Error $res 'response_id' => $response->getId(), ]); - $this->queueOutgoing($response, ['type' => 'response'], $session); + $this->queueOutgoing($response, ['type' => 'response']); } } @@ -340,7 +344,7 @@ private function sendResponse(TransportInterface $transport, Response|Error $res * @param Request|Notification|Response>|Error $message * @param array $context */ - private function queueOutgoing(Request|Notification|Response|Error $message, array $context, SessionInterface $session): void + private function queueOutgoing(Request|Notification|Response|Error $message, array $context): void { try { $encoded = json_encode($message, \JSON_THROW_ON_ERROR); @@ -352,12 +356,12 @@ private function queueOutgoing(Request|Notification|Response|Error $message, arr return; } - $queue = $session->get(self::SESSION_OUTGOING_QUEUE, []); + $queue = $this->session->get(self::SESSION_OUTGOING_QUEUE, []); $queue[] = [ 'message' => $encoded, 'context' => $context, ]; - $session->set(self::SESSION_OUTGOING_QUEUE, $queue); + $this->session->set(self::SESSION_OUTGOING_QUEUE, $queue); } /** @@ -476,7 +480,7 @@ public function handleFiberYield(mixed $yieldedValue, ?Uuid $sessionId): void return; } - $this->sendNotification($notification, $session); + $this->sendNotification($notification); } elseif ('request' === $yieldedValue['type']) { $request = $yieldedValue['request'] ?? null; if (!$request instanceof Request) { @@ -488,7 +492,7 @@ public function handleFiberYield(mixed $yieldedValue, ?Uuid $sessionId): void } $timeout = isset($yieldedValue['timeout']) ? (int) $yieldedValue['timeout'] : 120; - $this->sendRequest($request, $timeout, $session); + $this->sendRequest($request, $timeout); } else { $this->logger->warning('Fiber yielded unknown operation type.', [ 'type' => $yieldedValue['type'], @@ -520,50 +524,50 @@ private function hasInitializeRequest(array $messages): bool * @param Uuid|null $sessionId The session ID from the transport * @param array $messages The parsed messages */ - private function resolveSession(TransportInterface $transport, ?Uuid $sessionId, array $messages): ?SessionInterface + private function resolveSession(TransportInterface $transport, ?Uuid $sessionId, array $messages): void { if ($this->hasInitializeRequest($messages)) { // Spec: An initialize request must not be part of a batch. if (\count($messages) > 1) { $error = Error::forInvalidRequest('The "initialize" request MUST NOT be part of a batch.'); - $this->sendResponse($transport, $error, null); + $this->sendResponse($transport, $error); - return null; + return; } // Spec: An initialize request must not have a session ID. if ($sessionId) { $error = Error::forInvalidRequest('A session ID MUST NOT be sent with an "initialize" request.'); - $this->sendResponse($transport, $error, null); + $this->sendResponse($transport, $error); - return null; + return; } - $session = $this->sessionFactory->create($this->sessionStore); + $this->session = $this->sessionFactory->create($this->sessionStore); $this->logger->debug('Created new session for initialize', [ - 'session_id' => $session->getId()->toRfc4122(), + 'session_id' => $this->session->getId()->toRfc4122(), ]); - $transport->setSessionId($session->getId()); + $transport->setSessionId($this->session->getId()); - return $session; + return; } if (!$sessionId) { $error = Error::forInvalidRequest('A valid session id is REQUIRED for non-initialize requests.'); - $this->sendResponse($transport, $error, null, ['status_code' => 400]); + $this->sendResponse($transport, $error, ['status_code' => 400]); - return null; + return; } if (!$this->sessionStore->exists($sessionId)) { $error = Error::forInvalidRequest('Session not found or has expired.'); - $this->sendResponse($transport, $error, null, ['status_code' => 404]); + $this->sendResponse($transport, $error, ['status_code' => 404]); - return null; + return; } - return $this->sessionFactory->createWithId($sessionId, $this->sessionStore); + $this->session = $this->sessionFactory->createWithId($sessionId, $this->sessionStore); } /** diff --git a/tests/Unit/Capability/RegistryTest.php b/tests/Unit/Capability/RegistryTest.php index e8b19585..e97b6495 100644 --- a/tests/Unit/Capability/RegistryTest.php +++ b/tests/Unit/Capability/RegistryTest.php @@ -620,7 +620,7 @@ private function createValidTool(string $name, ?array $outputSchema = null): Too 'properties' => [ 'param' => ['type' => 'string'], ], - 'required' => null, + 'required' => [], ], description: "Test tool: {$name}", annotations: null,