diff --git a/spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java b/spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java index f145855..6efb23c 100644 --- a/spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java +++ b/spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java @@ -48,6 +48,15 @@ public class DefaultMcpSession implements McpSession { private final McpTransport transport; + private final ConcurrentHashMap requestHandlers = new ConcurrentHashMap<>(); + + @FunctionalInterface + public interface RequestHandler { + + Mono handle(Object params); + + } + public DefaultMcpSession(Duration requestTimeout, ObjectMapper objectMapper, McpTransport transport) { Assert.notNull(objectMapper, "The ObjectMapper can not be null"); @@ -70,7 +79,13 @@ public DefaultMcpSession(Duration requestTimeout, ObjectMapper objectMapper, Mcp } } else if (message instanceof McpSchema.JSONRPCRequest request) { - logger.info("Client does not yet support server requests"); + handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(), + error -> { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), + null, new McpSchema.JSONRPCResponse.JSONRPCError( + McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null)); + transport.sendMessage(errorResponse).subscribe(); + }); } else if (message instanceof McpSchema.JSONRPCNotification notification) { logger.info("Notifications not yet supported"); @@ -82,6 +97,27 @@ else if (message instanceof McpSchema.JSONRPCNotification notification) { this.transport.start(); } + private Mono handleIncomingRequest(McpSchema.JSONRPCRequest request) { + return Mono.defer(() -> { + var handler = requestHandlers.get(request.method()); + if (handler == null) { + return Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, + new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND, + "Method not found: " + request.method(), null))); + } + + return handler.handle(request.params()) + .map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), result, null)) + .onErrorResume(error -> Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), + null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, + error.getMessage(), null)))); + }); + } + + public void registerRequestHandler(String method, RequestHandler handler) { + requestHandlers.put(method, handler); + } + @Override public Mono sendRequest(String method, Object requestParams, TypeReference typeRef) { // TODO: UUID API is blocking. Consider non-blocking alternatives to generate diff --git a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpAsyncClientTests.java b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpAsyncClientTests.java index 71860dc..dd8f04f 100644 --- a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpAsyncClientTests.java +++ b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpAsyncClientTests.java @@ -38,7 +38,7 @@ class SseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { // Uses the https://github.com/tzolov/mcp-everything-server-docker-image @SuppressWarnings("resource") - static GenericContainer container = new GenericContainer<>("tzolov/mcp-everything-server:v1") + static GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) .withExposedPorts(3001) .waitingFor(Wait.forHttp("/").forStatusCode(404)); diff --git a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpSyncClientTests.java b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpSyncClientTests.java index 7c7c32e..20d717f 100644 --- a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpSyncClientTests.java +++ b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseMcpSyncClientTests.java @@ -38,7 +38,7 @@ class SseMcpSyncClientTests extends AbstractMcpSyncClientTests { // Uses the https://github.com/tzolov/mcp-everything-server-docker-image @SuppressWarnings("resource") - static GenericContainer container = new GenericContainer<>("tzolov/mcp-everything-server:v1") + static GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) .withExposedPorts(3001) .waitingFor(Wait.forHttp("/").forStatusCode(404)); diff --git a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseServerTransportTests.java b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseServerTransportTests.java index 453fa93..5758430 100644 --- a/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseServerTransportTests.java +++ b/spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/sse/SseServerTransportTests.java @@ -53,7 +53,7 @@ class SseServerTransportTests { static String host = "http://localhost:3001"; @SuppressWarnings("resource") - static GenericContainer container = new GenericContainer<>("tzolov/mcp-everything-server:v1") + static GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) .withExposedPorts(3001) .waitingFor(Wait.forHttp("/").forStatusCode(404));