Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First implementation of gRPC based on SwiftNIO #281

Merged
merged 29 commits into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
68e0a09
First experiments with a NIO-based gRPC server.
MrMage Jun 9, 2018
d891c38
Make `ServerStreamingCallHandler.sendMessage` return a send future as…
MrMage Nov 12, 2018
0866ea0
Re-enable two more tests and suppress two warnings.
MrMage Nov 12, 2018
6374d88
Unify the interface across the different call handlers.
MrMage Nov 12, 2018
ae35416
Rename `...CallHandler.handler` to `.eventObserver`.
MrMage Nov 12, 2018
29ad4f6
Add support for returning custom statuses (e.g. with additional metad…
MrMage Nov 12, 2018
7b592f1
Minor argument reordering.
MrMage Nov 12, 2018
d4c6c0e
Avoid forcing unary call handlers to return an event loop future. Ins…
MrMage Nov 12, 2018
488d243
Add a TODO.
MrMage Nov 12, 2018
137a1c1
Add codegen support for non-TestStub NIO server code.
MrMage Nov 12, 2018
de5ec1c
Add more properties to GRPCCallHandler.
MrMage Nov 12, 2018
495cdf4
Store the full `HTTPRequestHead` alongside a gRPC call handler.
MrMage Nov 12, 2018
c4efbf7
Add support for having client-streaming request handlers return a fut…
MrMage Nov 13, 2018
367bd32
Make `StatusSendingHandler.statusPromise` public.
MrMage Nov 13, 2018
4915ce5
Convert a few non-blocking calls in tests to blocking ones to simplif…
MrMage Nov 15, 2018
0ec7137
Refactoring: pass special `ResponseHandler` objects to NIO server cal…
MrMage Nov 15, 2018
973618b
Code review fixes, interface improvements.
MrMage Nov 16, 2018
ddef365
Rename a few NIO tests.
MrMage Nov 16, 2018
036ea30
Add documentation.
MrMage Nov 16, 2018
0aa0af0
Rename "headers" to "request".
MrMage Nov 20, 2018
a66d449
Minor performance improvement by avoiding one copy.
MrMage Nov 20, 2018
b3bbc7d
Make unary calls take a `StatusOnlyCallContext` instead of `UnaryResp…
MrMage Nov 20, 2018
c7a9eef
Rename `sendOperationChain` in tests to `endOfSendOperationQueue`.
MrMage Nov 21, 2018
7a131a6
Review fixes.
MrMage Nov 21, 2018
b3537b9
Add one more comment to the README.
MrMage Nov 21, 2018
5503cb6
Oops, fix the tests.
MrMage Nov 21, 2018
a347855
Remove two unnecessary server channel options.
MrMage Nov 23, 2018
0a879a9
Add some more documentation.
MrMage Nov 26, 2018
7ab7a05
Pin `SwiftNIOHTTP2` for the time being.
MrMage Nov 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ sudo: false

addons:
apt:
sources:
- sourceline: 'ppa:ondrej/apache2' # for libnghttp2-dev
packages:
- clang-3.8
- lldb-3.8
Expand All @@ -46,6 +48,10 @@ addons:
- uuid-dev
- curl
- unzip
- libnghttp2-dev

before_install:
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install nghttp2; fi

install: ./.travis-install.sh

Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ test-plugin:
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=TestStubs=true
diff -u /tmp/echo.grpc.swift Sources/Examples/Echo/Generated/echo.grpc.swift

test-plugin-nio:
swift build $(CFLAGS) --product protoc-gen-swiftgrpc
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=Client=false,NIO=true
diff -u /tmp/echo.grpc.swift Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift

xcodebuild: project
xcodebuild -project SwiftGRPC.xcodeproj -configuration "Debug" -parallelizeTargets -target SwiftGRPC -target Echo -target Simple -target protoc-gen-swiftgrpc build

Expand Down
17 changes: 14 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import PackageDescription

var packageDependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-protobuf.git", .upToNextMinor(from: "1.1.1")),
.package(url: "https://github.com/kylef/Commander.git", from: "0.8.0"),
.package(url: "https://github.com/apple/swift-nio-zlib-support.git", from: "1.0.0")
.package(url: "https://github.com/kylef/Commander.git", .upToNextMinor(from: "0.8.0")),
.package(url: "https://github.com/apple/swift-nio-zlib-support.git", .upToNextMinor(from: "1.0.0")),
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMinor(from: "1.11.0")),
.package(url: "https://github.com/apple/swift-nio-nghttp2-support.git", .upToNextMinor(from: "1.0.0")),
.package(url: "https://github.com/apple/swift-nio-http2.git", .revision("38b8235868e1e6277c420b73ac5cfdfa66382a85"))
]

var cGRPCDependencies: [Target.Dependency] = []
Expand All @@ -35,11 +38,18 @@ let package = Package(
name: "SwiftGRPC",
products: [
.library(name: "SwiftGRPC", targets: ["SwiftGRPC"]),
.library(name: "SwiftGRPCNIO", targets: ["SwiftGRPCNIO"]),
],
dependencies: packageDependencies,
targets: [
.target(name: "SwiftGRPC",
dependencies: ["CgRPC", "SwiftProtobuf"]),
.target(name: "SwiftGRPCNIO",
dependencies: [
"NIOFoundationCompat",
"NIOHTTP1",
"NIOHTTP2",
"SwiftProtobuf"]),
.target(name: "CgRPC",
dependencies: cGRPCDependencies),
.target(name: "RootsEncoder"),
Expand All @@ -58,7 +68,8 @@ let package = Package(
.target(name: "Simple",
dependencies: ["SwiftGRPC", "Commander"],
path: "Sources/Examples/Simple"),
.testTarget(name: "SwiftGRPCTests", dependencies: ["SwiftGRPC"])
.testTarget(name: "SwiftGRPCTests", dependencies: ["SwiftGRPC"]),
.testTarget(name: "SwiftGRPCNIOTests", dependencies: ["SwiftGRPC", "SwiftGRPCNIO"])
],
cLanguageStandard: .gnu11,
cxxLanguageStandard: .cxx11)
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ testing with the following versions:
- Swift 4.0
- swift-protobuf 1.1.1

## `SwiftGRPCNIO` package

`SwiftGRPCNIO` is a clean-room implementation of the gRPC protocol on top of the [`SwiftNIO`](http://github.com/apple/swift-nio) library. This implementation is not yet production-ready as it lacks several things recommended for production use:

- Better test coverage
- Full error handling
- SSL support
- Client support
- Example projects
- iOS support
- Removal of the `libnghttp2` dependency from `SwiftNIOHTTP2`

However, if you are planning to implement a gRPC service based on `SwiftNIO` or the Vapor framework, you might find this package useful. In addition, once ready, this package should provide more predictable and reliable behavior in the future, combined with an improved API and better developer experience.

You may also want to have a look at [this presentation](https://docs.google.com/presentation/d/1Mnsaq4mkeagZSP4mK1k0vewZrJKynm_MCteRDyM3OX8/edit) for more details on the motivation for this package.

## License

grpc-swift is released under the same license as
Expand Down
36 changes: 36 additions & 0 deletions Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Provides a means for decoding incoming gRPC messages into protobuf objects.
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
public class BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler {
public func makeGRPCServerCodec() -> ChannelHandler { return GRPCServerCodec<RequestMessage, ResponseMessage>() }

/// Called whenever a message has been received.
///
/// Overridden by subclasses.
public func processMessage(_ message: RequestMessage) {
fatalError("needs to be overridden")
}

/// Called when the client has half-closed the stream, indicating that they won't send any further data.
///
/// Overridden by subclasses if the "end-of-stream" event is relevant.
public func endOfStreamReceived() { }
}

extension BaseCallHandler: ChannelInboundHandler {
public typealias InboundIn = GRPCServerRequestPart<RequestMessage>
public typealias OutboundOut = GRPCServerResponsePart<ResponseMessage>

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
switch self.unwrapInboundIn(data) {
case .head: preconditionFailure("should not have received headers")
case .message(let message): processMessage(message)
case .end: endOfStreamReceived()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, fulfill `context.statusPromise`.
public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
private var eventObserver: EventLoopFuture<EventObserver>?

private var context: StreamingResponseCallContext<ResponseMessage>?
MrMage marked this conversation as resolved.
Show resolved Hide resolved

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init()
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
let eventObserver = eventObserverFactory(context)
self.eventObserver = eventObserver
// Terminate the call if no observer is provided.
eventObserver.cascadeFailure(promise: context.statusPromise)
context.statusPromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
}
}

public override func processMessage(_ message: RequestMessage) {
eventObserver?.whenSuccess { observer in
observer(.message(message))
}
}

public override func endOfStreamReceived() {
eventObserver?.whenSuccess { observer in
observer(.end)
}
}
}
43 changes: 43 additions & 0 deletions Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
///
/// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
private var eventObserver: EventLoopFuture<EventObserver>?

private var context: UnaryResponseCallContext<ResponseMessage>?
MrMage marked this conversation as resolved.
Show resolved Hide resolved

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init()
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
let eventObserver = eventObserverFactory(context)
self.eventObserver = eventObserver
// Terminate the call if no observer is provided.
eventObserver.cascadeFailure(promise: context.responsePromise)
context.responsePromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
}
}

public override func processMessage(_ message: RequestMessage) {
eventObserver?.whenSuccess { observer in
observer(.message(message))
}
}

public override func endOfStreamReceived() {
eventObserver?.whenSuccess { observer in
observer(.end)
}
}
}
43 changes: 43 additions & 0 deletions Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Handles server-streaming calls. Calls the observer block with the request message.
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, complete the status future returned by the observer block.
public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>
private var eventObserver: EventObserver?

private var context: StreamingResponseCallContext<ResponseMessage>?

public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) {
super.init()
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
self.eventObserver = eventObserverFactory(context)
context.statusPromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
self.eventObserver = nil
self.context = nil
}
}


public override func processMessage(_ message: RequestMessage) {
guard let eventObserver = self.eventObserver,
let context = self.context else {
//! FIXME: Better handle this error?
print("multiple messages received on unary call")
return
}

let resultFuture = eventObserver(message)
resultFuture
// Fulfill the status promise with whatever status the framework user has provided.
.cascade(promise: context.statusPromise)
self.eventObserver = nil
}
}
43 changes: 43 additions & 0 deletions Sources/SwiftGRPCNIO/CallHandlers/UnaryCallHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Handles unary calls. Calls the observer block with the request message.
///
/// - The observer block is implemented by the framework user and returns a future containing the call result.
/// - To return a response to the client, the framework user should complete that future
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
public class UnaryCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
private var eventObserver: EventObserver?

private var context: UnaryResponseCallContext<ResponseMessage>?

public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver) {
super.init()
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
self.context = context
self.eventObserver = eventObserverFactory(context)
context.responsePromise.futureResult.whenComplete {
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
self.context = nil
}
}

public override func processMessage(_ message: RequestMessage) {
guard let eventObserver = self.eventObserver,
let context = self.context else {
//! FIXME: Better handle this error?
print("multiple messages received on unary call")
return
}

let resultFuture = eventObserver(message)
resultFuture
// Fulfill the response promise with whatever response (or error) the framework user has provided.
.cascade(promise: context.responsePromise)
self.eventObserver = nil
}
}
72 changes: 72 additions & 0 deletions Sources/SwiftGRPCNIO/GRPCChannelHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
public protocol GRPCCallHandler: ChannelHandler {
func makeGRPCServerCodec() -> ChannelHandler
MrMage marked this conversation as resolved.
Show resolved Hide resolved
}

/// Provides `GRPCCallHandler` objects for the methods on a particular service name.
///
/// Implemented by the generated code.
public protocol CallHandlerProvider: class {
/// The name of the service this object is providing methods for, including the package path.
///
/// - Example: "io.grpc.Echo.EchoService"
var serviceName: String { get }

/// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's
/// method. Returns nil for methods not handled by this service.
func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler?
}

/// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available.
///
/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
/// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
public final class GRPCChannelHandler {
private let servicesByName: [String: CallHandlerProvider]

public init(servicesByName: [String: CallHandlerProvider]) {
self.servicesByName = servicesByName
}
}

extension GRPCChannelHandler: ChannelInboundHandler {
public typealias InboundIn = RawGRPCServerRequestPart
public typealias OutboundOut = RawGRPCServerResponsePart

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
MrMage marked this conversation as resolved.
Show resolved Hide resolved
let requestPart = self.unwrapInboundIn(data)
switch requestPart {
case .head(let requestHead):
// URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
// - uriComponents[0]: empty
// - uriComponents[1]: service name (including the package name);
// `CallHandlerProvider`s should provide the service name including the package name.
// - uriComponents[2]: method name.
let uriComponents = requestHead.uri.components(separatedBy: "/")
guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
MrMage marked this conversation as resolved.
Show resolved Hide resolved
let providerForServiceName = servicesByName[uriComponents[1]],
let callHandler = providerForServiceName.handleMethod(uriComponents[2], request: requestHead, serverHandler: self, channel: ctx.channel) else {
ctx.writeAndFlush(self.wrapOutboundOut(.status(.unimplemented(method: requestHead.uri))), promise: nil)
return
}

var responseHeaders = HTTPHeaders()
responseHeaders.add(name: "content-type", value: "application/grpc")
ctx.write(self.wrapOutboundOut(.headers(responseHeaders)), promise: nil)

let codec = callHandler.makeGRPCServerCodec()
ctx.pipeline.add(handler: codec, after: self)
.then { ctx.pipeline.add(handler: callHandler, after: codec) }
//! FIXME(lukasa): Fix the ordering of this with NIO 1.12 and replace with `remove(, promise:)`.
.whenComplete { _ = ctx.pipeline.remove(handler: self) }
MrMage marked this conversation as resolved.
Show resolved Hide resolved

case .message, .end:
preconditionFailure("received \(requestPart), should have been removed as a handler at this point")
}
}
}
Loading