Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First implementation of gRPC based on SwiftNIO #281

Merged
merged 29 commits into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
68e0a09
First experiments with a NIO-based gRPC server.
MrMage Jun 9, 2018
d891c38
Make `ServerStreamingCallHandler.sendMessage` return a send future as…
MrMage Nov 12, 2018
0866ea0
Re-enable two more tests and suppress two warnings.
MrMage Nov 12, 2018
6374d88
Unify the interface across the different call handlers.
MrMage Nov 12, 2018
ae35416
Rename `...CallHandler.handler` to `.eventObserver`.
MrMage Nov 12, 2018
29ad4f6
Add support for returning custom statuses (e.g. with additional metad…
MrMage Nov 12, 2018
7b592f1
Minor argument reordering.
MrMage Nov 12, 2018
d4c6c0e
Avoid forcing unary call handlers to return an event loop future. Ins…
MrMage Nov 12, 2018
488d243
Add a TODO.
MrMage Nov 12, 2018
137a1c1
Add codegen support for non-TestStub NIO server code.
MrMage Nov 12, 2018
de5ec1c
Add more properties to GRPCCallHandler.
MrMage Nov 12, 2018
495cdf4
Store the full `HTTPRequestHead` alongside a gRPC call handler.
MrMage Nov 12, 2018
c4efbf7
Add support for having client-streaming request handlers return a fut…
MrMage Nov 13, 2018
367bd32
Make `StatusSendingHandler.statusPromise` public.
MrMage Nov 13, 2018
4915ce5
Convert a few non-blocking calls in tests to blocking ones to simplif…
MrMage Nov 15, 2018
0ec7137
Refactoring: pass special `ResponseHandler` objects to NIO server cal…
MrMage Nov 15, 2018
973618b
Code review fixes, interface improvements.
MrMage Nov 16, 2018
ddef365
Rename a few NIO tests.
MrMage Nov 16, 2018
036ea30
Add documentation.
MrMage Nov 16, 2018
0aa0af0
Rename "headers" to "request".
MrMage Nov 20, 2018
a66d449
Minor performance improvement by avoiding one copy.
MrMage Nov 20, 2018
b3bbc7d
Make unary calls take a `StatusOnlyCallContext` instead of `UnaryResp…
MrMage Nov 20, 2018
c7a9eef
Rename `sendOperationChain` in tests to `endOfSendOperationQueue`.
MrMage Nov 21, 2018
7a131a6
Review fixes.
MrMage Nov 21, 2018
b3537b9
Add one more comment to the README.
MrMage Nov 21, 2018
5503cb6
Oops, fix the tests.
MrMage Nov 21, 2018
a347855
Remove two unnecessary server channel options.
MrMage Nov 23, 2018
0a879a9
Add some more documentation.
MrMage Nov 26, 2018
7ab7a05
Pin `SwiftNIOHTTP2` for the time being.
MrMage Nov 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import PackageDescription

var packageDependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-protobuf.git", .upToNextMinor(from: "1.1.1")),
.package(url: "https://github.com/kylef/Commander.git", from: "0.8.0"),
.package(url: "https://github.com/apple/swift-nio-zlib-support.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "1.9.0"),
.package(url: "https://github.com/apple/swift-nio-nghttp2-support.git", from: "1.0.0"),
.package(url: "https://github.com/lukasa/swift-nio-http2.git", .branch("cb-issue-24"))
.package(url: "https://github.com/kylef/Commander.git", .upToNextMinor(from: "0.8.0")),
.package(url: "https://github.com/apple/swift-nio-zlib-support.git", .upToNextMinor(from: "1.0.0")),
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMinor(from: "1.11.0")),
.package(url: "https://github.com/apple/swift-nio-nghttp2-support.git", .upToNextMinor(from: "1.0.0")),
.package(url: "https://github.com/apple/swift-nio-http2.git", .branch("master"))
MrMage marked this conversation as resolved.
Show resolved Hide resolved
]

var cGRPCDependencies: [Target.Dependency] = []
Expand All @@ -45,7 +45,11 @@ let package = Package(
.target(name: "SwiftGRPC",
dependencies: ["CgRPC", "SwiftProtobuf"]),
.target(name: "SwiftGRPCNIO",
dependencies: ["SwiftProtobuf", "NIOHTTP1", "NIOFoundationCompat", "NIOHTTP2"]),
dependencies: [
"NIOFoundationCompat",
"NIOHTTP1",
"NIOHTTP2",
"SwiftProtobuf"]),
.target(name: "CgRPC",
dependencies: cGRPCDependencies),
.target(name: "RootsEncoder"),
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ testing with the following versions:
- Swift 4.0
- swift-protobuf 1.1.1

## `SwiftGRPCNIO` package

`SwiftGRPCNIO` is a clean-room implementation of the gRPC protocol on top of the [`SwiftNIO`](http://github.com/apple/swift-nio) library. This implementation is not yet production-ready as it lacks several things recommended for production use:

- Better test coverage
- Full error handling
- SSL support
- Client support
- Example projects
- iOS support
- Removal of the `libnghttp2` dependency from `SwiftNIOHTTP2`

However, if you are planning to implement a gRPC service based on `SwiftNIO` or the Vapor framework, you might find this package useful. In addition, once ready, this package should provide more predictable and reliable behavior in the future, combined with an improved API and better developer experience.

You may also want to have a look at [this presentation](https://docs.google.com/presentation/d/1Mnsaq4mkeagZSP4mK1k0vewZrJKynm_MCteRDyM3OX8/edit) for more details on the motivation for this package.

## License

grpc-swift is released under the same license as
Expand Down
27 changes: 18 additions & 9 deletions Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,26 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Provides a means for decoding incoming gRPC messages into protobuf objects.
// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
public class BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler, ChannelInboundHandler {
/// Provides a means for decoding incoming gRPC messages into protobuf objects.
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
public class BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler {
public func makeGRPCServerCodec() -> ChannelHandler { return GRPCServerCodec<RequestMessage, ResponseMessage>() }

/// Called whenever a message has been received.
///
/// Overridden by subclasses.
public func processMessage(_ message: RequestMessage) {
fatalError("needs to be overridden")
}

/// Called when the client has half-closed the stream, indicating that they won't send any further data.
///
/// Overridden by subclasses if the "end-of-stream" event is relevant.
public func endOfStreamReceived() { }
}

extension BaseCallHandler: ChannelInboundHandler {
public typealias InboundIn = GRPCServerRequestPart<RequestMessage>
public typealias OutboundOut = GRPCServerResponsePart<ResponseMessage>

Expand All @@ -18,10 +33,4 @@ public class BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>:
case .end: endOfStreamReceived()
}
}

public func processMessage(_ message: RequestMessage) {
fatalError("needs to be overridden")
}

public func endOfStreamReceived() { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
// The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
// To close the call and send the status, fulfill `context.statusPromise`.
/// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, fulfill `context.statusPromise`.
public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
fileprivate var eventObserver: EventLoopFuture<EventObserver>?
private var eventObserver: EventLoopFuture<EventObserver>?

//! FIXME: Do we need to keep the context around at all here?
public private(set) var context: StreamingResponseCallContext<ResponseMessage>?
private var context: StreamingResponseCallContext<ResponseMessage>?
MrMage marked this conversation as resolved.
Show resolved Hide resolved

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init()
self.context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.eventObserver = eventObserverFactory(context!)
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
let eventObserver = eventObserverFactory(context)
self.eventObserver = eventObserver
// Terminate the call if no observer is provided.
self.eventObserver?.cascadeFailure(promise: context!.statusPromise)
context!.statusPromise.futureResult.whenComplete {
eventObserver.cascadeFailure(promise: context.statusPromise)
context.statusPromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
Expand Down
20 changes: 11 additions & 9 deletions Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
// The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
/// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
///
/// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
fileprivate var eventObserver: EventLoopFuture<EventObserver>?
private var eventObserver: EventLoopFuture<EventObserver>?

//! FIXME: Do we need to keep the context around at all here?
public private(set) var context: UnaryResponseCallContext<ResponseMessage>?
private var context: UnaryResponseCallContext<ResponseMessage>?
MrMage marked this conversation as resolved.
Show resolved Hide resolved

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init()
self.context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.eventObserver = eventObserverFactory(context!)
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
let eventObserver = eventObserverFactory(context)
self.eventObserver = eventObserver
// Terminate the call if no observer is provided.
self.eventObserver!.cascadeFailure(promise: context!.responsePromise)
context!.responsePromise.futureResult.whenComplete {
eventObserver.cascadeFailure(promise: context.responsePromise)
context.responsePromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
Expand Down
34 changes: 17 additions & 17 deletions Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Handles server-streaming calls. Calls the observer block with the request message.
// The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
// To close the call and send the status, complete the status future returned by the observer block.
/// Handles server-streaming calls. Calls the observer block with the request message.
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, complete the status future returned by the observer block.
public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>
fileprivate var eventObserver: EventObserver?
private var eventObserver: EventObserver?

fileprivate var hasReceivedRequest = false

//! FIXME: Do we need to keep the context around at all here?
public private(set) var context: StreamingResponseCallContext<ResponseMessage>?
private var context: StreamingResponseCallContext<ResponseMessage>?

public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) {
super.init()
self.context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.eventObserver = eventObserverFactory(context!)
context!.statusPromise.futureResult.whenComplete {
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
self.eventObserver = eventObserverFactory(context)
context.statusPromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
self.eventObserver = nil
self.context = nil
Expand All @@ -28,16 +27,17 @@ public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage


public override func processMessage(_ message: RequestMessage) {
guard !hasReceivedRequest else {
//! FIXME: Better handle this error.
print("multiple messages received on server-streaming call")
guard let eventObserver = self.eventObserver,
let context = self.context else {
//! FIXME: Better handle this error?
print("multiple messages received on unary call")
return
}
hasReceivedRequest = true

let resultFuture = self.eventObserver!(message)
let resultFuture = eventObserver(message)
resultFuture
// Fulfill the status promise with whatever status the framework user has provided.
.cascade(promise: context!.statusPromise)
.cascade(promise: context.statusPromise)
self.eventObserver = nil
}
}
34 changes: 17 additions & 17 deletions Sources/SwiftGRPCNIO/CallHandlers/UnaryCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,41 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Handles unary calls. Calls the observer block with the request message.
// The observer block is implemented by the framework user and returns a future containing the call result.
// To return a response to the client, the framework user should complete that framework
// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
/// Handles unary calls. Calls the observer block with the request message.
///
/// - The observer block is implemented by the framework user and returns a future containing the call result.
/// - To return a response to the client, the framework user should complete that future
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
public class UnaryCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
fileprivate var eventObserver: EventObserver?
private var eventObserver: EventObserver?

fileprivate var hasReceivedRequest = false

//! FIXME: Do we need to keep the context around at all here?
public private(set) var context: UnaryResponseCallContext<ResponseMessage>?
private var context: UnaryResponseCallContext<ResponseMessage>?

public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver) {
super.init()
self.context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.eventObserver = eventObserverFactory(self.context!)
context!.responsePromise.futureResult.whenComplete {
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
self.eventObserver = eventObserverFactory(context)
context.responsePromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
}
}

public override func processMessage(_ message: RequestMessage) {
guard !hasReceivedRequest else {
//! FIXME: Better handle this error.
guard let eventObserver = self.eventObserver,
let context = self.context else {
//! FIXME: Better handle this error?
print("multiple messages received on unary call")
return
}
hasReceivedRequest = true

let resultFuture = self.eventObserver!(message)
let resultFuture = eventObserver(message)
resultFuture
// Fulfill the response promise with whatever response (or error) the framework user has provided.
.cascade(promise: context!.responsePromise)
.cascade(promise: context.responsePromise)
self.eventObserver = nil
}
}
31 changes: 20 additions & 11 deletions Sources/SwiftGRPCNIO/GRPCChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,47 @@ import SwiftProtobuf
import NIO
import NIOHTTP1

// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
/// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
public protocol GRPCCallHandler: ChannelHandler {
func makeGRPCServerCodec() -> ChannelHandler
MrMage marked this conversation as resolved.
Show resolved Hide resolved
}

// Provides `GRPCCallHandler` objects for the methods on a particular service name.
// Implemented by the generated code.
/// Provides `GRPCCallHandler` objects for the methods on a particular service name.
///
/// Implemented by the generated code.
public protocol CallHandlerProvider {
var serviceName: String { get }

// Looks up and returns the `GRPCCallHandler` for a particular method. Returns nil for unsupported methods.
func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler?
}

// Listens on a newly-opened HTTP2 subchannel and waits for the request headers to become available.
// Once those are available, asks the `CallHandlerProvider` corresponding to the request's service name for an
// `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
public final class GRPCChannelHandler: ChannelInboundHandler {
public typealias InboundIn = RawGRPCServerRequestPart

public typealias OutboundOut = RawGRPCServerResponsePart
/// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available.
///
/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
/// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
public final class GRPCChannelHandler {

fileprivate let servicesByName: [String: CallHandlerProvider]
private let servicesByName: [String: CallHandlerProvider]

public init(servicesByName: [String: CallHandlerProvider]) {
self.servicesByName = servicesByName
}
}

extension GRPCChannelHandler: ChannelInboundHandler {
public typealias InboundIn = RawGRPCServerRequestPart
public typealias OutboundOut = RawGRPCServerResponsePart

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
MrMage marked this conversation as resolved.
Show resolved Hide resolved
let requestPart = self.unwrapInboundIn(data)
switch requestPart {
case .head(let requestHead):
// URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
// - uriComponents[0]: empty
// - uriComponents[1]: service name (including the package name);
// `CallHandlerProvider`s should provide the service name including the package name.
// - uriComponents[2]: method name.
let uriComponents = requestHead.uri.components(separatedBy: "/")
guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
MrMage marked this conversation as resolved.
Show resolved Hide resolved
let providerForServiceName = servicesByName[uriComponents[1]],
Expand Down
14 changes: 8 additions & 6 deletions Sources/SwiftGRPCNIO/GRPCServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ import NIO
import NIOHTTP1
import NIOHTTP2

// Wrapper object to manage the lifecycle of a gRPC server.
/// Wrapper object to manage the lifecycle of a gRPC server.
public final class GRPCServer {
/// Starts up a server that serves the given providers.
///
/// - Returns: A future that is completed when the server has successfully started up.
public static func start(
MrMage marked this conversation as resolved.
Show resolved Hide resolved
hostname: String,
port: Int,
eventLoopGroup: EventLoopGroup,
serviceProviders: [CallHandlerProvider]) -> EventLoopFuture<GRPCServer> {
let servicesByName = Dictionary(uniqueKeysWithValues: serviceProviders.map { ($0.serviceName, $0) })
let bootstrap = ServerBootstrap(group: eventLoopGroup)
// Specify backlog and enable SO_REUSEADDR for the server itself
// Specify a backlog to avoid overloading the server.
.serverChannelOption(ChannelOptions.backlog, value: 256)
// Enable `SO_REUSEADDR` to avoid "address already in use" error.
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)

// Set the handlers that are applied to the accepted Channels
Expand All @@ -33,16 +37,14 @@ public final class GRPCServer {
// Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)

return bootstrap.bind(host: hostname, port: port)
.map { GRPCServer(channel: $0) }
}

fileprivate let channel: Channel
private let channel: Channel

fileprivate init(channel: Channel) {
private init(channel: Channel) {
self.channel = channel
}

Expand Down
Loading