From 80507233975185b377079934aaedda9acbf5174a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20JACQUES?= Date: Mon, 1 Apr 2024 17:39:33 +0200 Subject: [PATCH] OkHttpServer: support maxConcurrentCallsPerConnection (Fixes #11062). (#11063) * Add option in OkHttpServerBuilder * Add value as MAX_CONCURRENT_STREAM setting in settings frame sent by the server to the client per connection * Enforce limit by sending a RST frame with REFUSED_STREAM error --- .../io/grpc/okhttp/OkHttpServerBuilder.java | 14 +++++++++ .../io/grpc/okhttp/OkHttpServerTransport.java | 11 +++++++ .../okhttp/OkHttpServerTransportTest.java | 29 +++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java index 8269a8ddf0f..068474d70bc 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java @@ -74,6 +74,7 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder 0, + "max must be positive: %s", maxConcurrentCallsPerConnection); + this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection; + return this; + } + /** * Sets the maximum message size allowed to be received on the server. If not called, defaults to * defaults to 4 MiB. The default provides protection to servers who haven't considered the diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index 8fb74d3f1b5..3ef8b613394 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -219,6 +219,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow); OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize); + if (config.maxConcurrentStreams != Integer.MAX_VALUE) { + OkHttpSettingsUtil.set(settings, + OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams); + } frameWriter.settings(settings); if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) { frameWriter.windowUpdate( @@ -520,6 +524,7 @@ static final class Config { final long permitKeepAliveTimeInNanos; final long maxConnectionAgeInNanos; final long maxConnectionAgeGraceInNanos; + final int maxConcurrentStreams; public Config( OkHttpServerBuilder builder, @@ -544,6 +549,7 @@ public Config( permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos; maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos; maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos; + maxConcurrentStreams = builder.maxConcurrentCallsPerConnection; } } @@ -638,6 +644,11 @@ public void headers(boolean outFinished, newStream = streamId > lastStreamId; if (newStream) { lastStreamId = streamId; + if (config.maxConcurrentStreams <= streams.size()) { + streamError(streamId, ErrorCode.REFUSED_STREAM, + "Max concurrent stream reached. RFC7540 section 5.1.2"); + return; + } } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index 3f88c35e017..6438cf83a1d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -1264,6 +1264,35 @@ public void keepAliveEnforcer_noticesActive() throws Exception { eq(ByteString.encodeString("too_many_pings", GrpcUtil.US_ASCII))); } + @Test + public void maxConcurrentCallsPerConnection_failsWithRst() throws Exception { + int maxConcurrentCallsPerConnection = 1; + serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection); + initTransport(); + handshake(); + + ArgumentCaptor settingsCaptor = ArgumentCaptor.forClass(Settings.class); + verify(clientFramesRead).settings(eq(false), settingsCaptor.capture()); + final Settings settings = settingsCaptor.getValue(); + assertThat(OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) + .isEqualTo(maxConcurrentCallsPerConnection); + + final List
headers = Arrays.asList( + HTTP_SCHEME_HEADER, + METHOD_HEADER, + new Header(Header.TARGET_AUTHORITY, "example.com:80"), + new Header(Header.TARGET_PATH, "/com.example/SimpleService/doit"), + CONTENT_TYPE_HEADER, + TE_HEADER); + + clientFrameWriter.headers(1, headers); + clientFrameWriter.headers(3, headers); + clientFrameWriter.flush(); + + assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); + verify(clientFramesRead).rstStream(3, ErrorCode.REFUSED_STREAM); + } + private void initTransport() throws Exception { serverTransport = new OkHttpServerTransport( new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),