Skip to content

Commit

Permalink
Polish
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Dec 17, 2024
1 parent 42c20ef commit dfc9de9
Showing 1 changed file with 57 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1631,21 +1631,20 @@ void testIssue632() throws Exception {
}

@Test
void testIssue3538() throws Exception {
void testIssue3538() {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);
req.receiveContent()
.switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
return res.sendObject(action);
}))
.bindNow();
assertThat(disposableServer).isNotNull();

final ByteBuf content = createHttpClientForContextWithPort()
Expand All @@ -1662,56 +1661,55 @@ void testIssue3538() throws Exception {
}

@Test
void testIssue3538GetWithPayload() throws Exception {
void testIssue3538GetWithPayload() {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);
req.receiveContent()
.switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
return res.sendObject(action);
}))
.bindNow();
assertThat(disposableServer).isNotNull();

// The H2C max content length is 0 by default (no content is expected),
// so the request is rejected with HTTP/413 Content Too Large
StepVerifier.create(createHttpClientForContextWithPort()
createHttpClientForContextWithPort()
.protocol(HttpProtocol.HTTP11)
.headers(h ->
h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
.add(HttpHeaderNames.UPGRADE, "TLS/1.2"))
.request(HttpMethod.GET)
.send((req, res) -> res.sendString(Mono.just("testIssue3538")))
.uri("/")
.response((r, buf) -> Mono.just(r.status().code())))
.expectNextMatches(status -> status == 413)
.expectComplete()
.verify(Duration.ofSeconds(30));
.response((r, buf) -> Mono.just(r.status().code()))
.as(StepVerifier::create)
.expectNextMatches(status -> status == 413)
.expectComplete()
.verify(Duration.ofSeconds(30));
}

@Test
void testIssue3538GetWithPayloadAndH2cMaxContentLength() throws Exception {
void testIssue3538GetWithPayloadAndH2cMaxContentLength() {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(100))
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(100))
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);
req.receiveContent()
.switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
return res.sendObject(action);
}))
.bindNow();
assertThat(disposableServer).isNotNull();

final ByteBuf content = createHttpClientForContextWithPort()
Expand Down Expand Up @@ -3574,28 +3572,28 @@ void testDeleteMethod(boolean chunked) {
.verify(Duration.ofSeconds(5));
}

private static class EchoAction implements Publisher<HttpContent>, Consumer<HttpContent> {
private final Publisher<HttpContent> sender;
private volatile FluxSink<HttpContent> emitter;
private static final class EchoAction implements Publisher<HttpContent>, Consumer<HttpContent> {
private final Publisher<HttpContent> sender;
private volatile FluxSink<HttpContent> emitter;

EchoAction() {
this.sender = Flux.create(emitter -> this.emitter = emitter);
}
EchoAction() {
this.sender = Flux.create(emitter -> this.emitter = emitter);
}

@Override
public void accept(HttpContent message) {
if (message.content().readableBytes() > 0) {
emitter.next(new DefaultHttpContent(message.content().retain()));
}
@Override
public void accept(HttpContent message) {
if (message.content().readableBytes() > 0) {
emitter.next(new DefaultHttpContent(message.content().retain()));
}

if (message instanceof LastHttpContent) {
emitter.complete();
}
}
if (message instanceof LastHttpContent) {
emitter.complete();
}
}

@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);
}
}
@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);
}
}
}

0 comments on commit dfc9de9

Please sign in to comment.