Skip to content

Commit

Permalink
okhttp: Add server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ejona86 committed Jul 1, 2022
1 parent 91fcc33 commit 4b57d40
Show file tree
Hide file tree
Showing 26 changed files with 3,620 additions and 139 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/io/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public T handshakeTimeout(long timeout, TimeUnit unit) {
/**
* Sets the time without read activity before sending a keepalive ping. An unreasonably small
* value might be increased, and {@code Long.MAX_VALUE} nano seconds or an unreasonably large
* value will disable keepalive. The typical default is infinite when supported.
* value will disable keepalive. The typical default is two hours when supported.
*
* @throws IllegalArgumentException if time is not positive
* @throws UnsupportedOperationException if unsupported
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected interface Sink {
* @param flush {@code true} if more data may not be arriving soon
* @param numMessages the number of messages this frame represents
*/
void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages);
void writeFrame(WritableBuffer frame, boolean flush, int numMessages);

/**
* Sends trailers to the remote end point. This call implies end of stream.
Expand Down Expand Up @@ -108,7 +108,14 @@ public final void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
// Since endOfStream is triggered by the sending of trailers, avoid flush here and just flush
// after the trailers.
abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush, numMessages);
if (frame == null) {
assert endOfStream;
return;
}
if (endOfStream) {
flush = false;
}
abstractServerStreamSink().writeFrame(frame, flush, numMessages);
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ public static void closeQuietly(@Nullable Closeable message) {
}
}

/** Reads {@code in} until end of stream. */
public static void exhaust(InputStream in) throws IOException {
byte[] buf = new byte[256];
while (in.read(buf) != -1) {}
}

/**
* Checks whether the given item exists in the iterable. This is copied from Guava Collect's
* {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,13 @@ public void serverStartInterrupted() throws Exception {
}
assumeTrue("transport is not using InetSocketAddress", port != -1);
server.shutdown();
assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));

server = newServer(port, Arrays.asList(serverStreamTracerFactory));
boolean success;
Thread.currentThread().interrupt();
try {
server.start(serverListener);
server.start(serverListener = new MockServerListener());
success = true;
} catch (Exception ex) {
success = false;
Expand Down
4 changes: 4 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -854,6 +855,9 @@ public void onHeadersRead(ChannelHandlerContext ctx,
keepAliveManager.onDataReceived();
}
NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
if (endStream) {
NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
}
}

@Override
Expand Down
4 changes: 0 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ public void writeHeaders(Metadata headers) {

private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
if (frame == null) {
writeQueue.scheduleFlush();
return;
}
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf().touch();
final int numBytes = bytebuf.readableBytes();
// Add the bytes to outbound flow control.
Expand Down
2 changes: 1 addition & 1 deletion okhttp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies {
testImplementation project(':grpc-core').sourceSets.test.output,
project(':grpc-api').sourceSets.test.output,
project(':grpc-testing'),
project(':grpc-netty'),
libraries.netty.codec.http2,
libraries.okhttp
signature "org.codehaus.mojo.signature:java17:1.0@signature"
signature "net.sf.androidscents.signature:android-api-level-14:4.0_r4@signature"
Expand Down
7 changes: 7 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/AsyncSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ public void close() {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
try {
if (buffer.size() > 0) {
sink.write(buffer, buffer.size());
}
} catch (IOException e) {
transportExceptionHandler.onException(e);
}
buffer.close();
try {
if (sink != null) {
Expand Down
42 changes: 42 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/HandshakerSocketFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.okhttp;

import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.InternalChannelz;
import java.io.IOException;
import java.net.Socket;

/** Handshakes new connections. */
interface HandshakerSocketFactory {
/** When the returned socket is closed, {@code socket} must be closed. */
HandshakeResult handshake(Socket socket, Attributes attributes) throws IOException;

static final class HandshakeResult {
public final Socket socket;
public final Attributes attributes;
public final InternalChannelz.Security securityInfo;

public HandshakeResult(
Socket socket, Attributes attributes, InternalChannelz.Security securityInfo) {
this.socket = Preconditions.checkNotNull(socket, "socket");
this.attributes = Preconditions.checkNotNull(attributes, "attributes");
this.securityInfo = securityInfo;
}
}
}
85 changes: 62 additions & 23 deletions okhttp/src/main/java/io/grpc/okhttp/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package io.grpc.okhttp;

import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;

import com.google.common.base.Preconditions;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
Expand All @@ -39,7 +36,7 @@ class Headers {
public static final Header METHOD_HEADER = new Header(Header.TARGET_METHOD, GrpcUtil.HTTP_METHOD);
public static final Header METHOD_GET_HEADER = new Header(Header.TARGET_METHOD, "GET");
public static final Header CONTENT_TYPE_HEADER =
new Header(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
new Header(GrpcUtil.CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
public static final Header TE_HEADER = new Header("te", GrpcUtil.TE_TRAILERS);

/**
Expand All @@ -58,10 +55,7 @@ public static List<Header> createRequestHeaders(
Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(authority, "authority");

// Discard any application supplied duplicates of the reserved headers
headers.discardAll(GrpcUtil.CONTENT_TYPE_KEY);
headers.discardAll(GrpcUtil.TE_HEADER);
headers.discardAll(GrpcUtil.USER_AGENT_KEY);
stripNonApplicationHeaders(headers);

// 7 is the number of explicit add calls below.
List<Header> okhttpHeaders = new ArrayList<>(7 + InternalMetadata.headerCount(headers));
Expand Down Expand Up @@ -89,27 +83,72 @@ public static List<Header> createRequestHeaders(
okhttpHeaders.add(TE_HEADER);

// Now add any application-provided headers.
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
return addMetadata(okhttpHeaders, headers);
}

/**
* Serializes the given headers and creates a list of OkHttp {@link Header}s to be used when
* starting a response. Since this serializes the headers, this method should be called in the
* application thread context.
*/
public static List<Header> createResponseHeaders(Metadata headers) {
stripNonApplicationHeaders(headers);

// 2 is the number of explicit add calls below.
List<Header> okhttpHeaders = new ArrayList<>(2 + InternalMetadata.headerCount(headers));
okhttpHeaders.add(new Header(Header.RESPONSE_STATUS, "200"));
// All non-pseudo headers must come after pseudo headers.
okhttpHeaders.add(CONTENT_TYPE_HEADER);
return addMetadata(okhttpHeaders, headers);
}

/**
* Serializes the given headers and creates a list of OkHttp {@link Header}s to be used when
* finishing a response. Since this serializes the headers, this method should be called in the
* application thread context.
*/
public static List<Header> createResponseTrailers(Metadata trailers, boolean headersSent) {
if (!headersSent) {
return createResponseHeaders(trailers);
}
stripNonApplicationHeaders(trailers);

List<Header> okhttpTrailers = new ArrayList<>(InternalMetadata.headerCount(trailers));
return addMetadata(okhttpTrailers, trailers);
}

/**
* Serializes the given headers and creates a list of OkHttp {@link Header}s to be used when
* failing with an HTTP response.
*/
public static List<Header> createHttpResponseHeaders(
int httpCode, String contentType, Metadata headers) {
// 2 is the number of explicit add calls below.
List<Header> okhttpHeaders = new ArrayList<>(2 + InternalMetadata.headerCount(headers));
okhttpHeaders.add(new Header(Header.RESPONSE_STATUS, "" + httpCode));
// All non-pseudo headers must come after pseudo headers.
okhttpHeaders.add(new Header(GrpcUtil.CONTENT_TYPE_KEY.name(), contentType));
return addMetadata(okhttpHeaders, headers);
}

private static List<Header> addMetadata(List<Header> okhttpHeaders, Metadata toAdd) {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(toAdd);
for (int i = 0; i < serializedHeaders.length; i += 2) {
ByteString key = ByteString.of(serializedHeaders[i]);
String keyString = key.utf8();
if (isApplicationHeader(keyString)) {
ByteString value = ByteString.of(serializedHeaders[i + 1]);
okhttpHeaders.add(new Header(key, value));
// Don't allow HTTP/2 pseudo headers to be added by the application.
if (key.size() == 0 || key.getByte(0) == ':') {
continue;
}
ByteString value = ByteString.of(serializedHeaders[i + 1]);
okhttpHeaders.add(new Header(key, value));
}

return okhttpHeaders;
}

/**
* Returns {@code true} if the given header is an application-provided header. Otherwise, returns
* {@code false} if the header is reserved by GRPC.
*/
private static boolean isApplicationHeader(String key) {
// Don't allow HTTP/2 pseudo headers or content-type to be added by the application.
return (!key.startsWith(":")
&& !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key))
&& !USER_AGENT_KEY.name().equalsIgnoreCase(key);
/** Strips all non-pseudo headers reserved by gRPC, to avoid duplicates and misinterpretation. */
private static void stripNonApplicationHeaders(Metadata headers) {
headers.discardAll(GrpcUtil.CONTENT_TYPE_KEY);
headers.discardAll(GrpcUtil.TE_HEADER);
headers.discardAll(GrpcUtil.USER_AGENT_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void close(Executor executor) {
((ExecutorService) executor).shutdown();
}
};
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
SharedResourcePool.forResource(SHARED_EXECUTOR);

/** Creates a new builder for the given server host and port. */
Expand Down
33 changes: 18 additions & 15 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class OkHttpClientStream extends AbstractClientStream {
private final String userAgent;
private final StatsTraceContext statsTraceCtx;
private String authority;
private Object outboundFlowState;
private volatile int id = ABSENT_ID;
private final TransportState state;
private final Sink sink = new Sink();
private final Attributes attributes;
Expand Down Expand Up @@ -120,10 +118,6 @@ public MethodDescriptor.MethodType getType() {
return method.getType();
}

public int id() {
return id;
}

/**
* Returns whether the stream uses GET. This is not known until after {@link Sink#writeHeaders} is
* invoked.
Expand Down Expand Up @@ -198,7 +192,8 @@ public void cancel(Status reason) {
}
}

class TransportState extends Http2ClientStreamTransportState {
class TransportState extends Http2ClientStreamTransportState
implements OutboundFlowController.Stream {
private final int initialWindowSize;
private final Object lock;
@GuardedBy("lock")
Expand All @@ -223,6 +218,9 @@ class TransportState extends Http2ClientStreamTransportState {
@GuardedBy("lock")
private boolean canStart = true;
private final Tag tag;
@GuardedBy("lock")
private OutboundFlowController.StreamState outboundFlowState;
private int id = ABSENT_ID;

public TransportState(
int maxMessageSize,
Expand All @@ -249,6 +247,7 @@ public TransportState(
public void start(int streamId) {
checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId);
id = streamId;
outboundFlowState = outboundFlow.createState(this, streamId);
// TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock';
// instead found: 'this.lock'
state.onStreamAllocated();
Expand All @@ -260,7 +259,9 @@ public void start(int streamId) {
requestHeaders = null;

if (pendingData.size() > 0) {
outboundFlow.data(pendingDataHasEndOfStream, id, pendingData, flushPendingData);
outboundFlow.data(
pendingDataHasEndOfStream, outboundFlowState, pendingData, flushPendingData);

}
canStart = false;
}
Expand Down Expand Up @@ -396,7 +397,7 @@ private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) {
checkState(id() != ABSENT_ID, "streamId should be set");
// If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
// properly chunked.
outboundFlow.data(endOfStream, id(), buffer, flush);
outboundFlow.data(endOfStream, outboundFlowState, buffer, flush);
}
}

Expand All @@ -419,13 +420,15 @@ private void streamReady(Metadata metadata, String path) {
Tag tag() {
return tag;
}
}

void setOutboundFlowState(Object outboundFlowState) {
this.outboundFlowState = outboundFlowState;
}
int id() {
return id;
}

Object getOutboundFlowState() {
return outboundFlowState;
OutboundFlowController.StreamState getOutboundFlowState() {
synchronized (lock) {
return outboundFlowState;
}
}
}
}
Loading

0 comments on commit 4b57d40

Please sign in to comment.