diff --git a/.travis.yml b/.travis.yml index e55c98f4c..f2463be49 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,6 +34,8 @@ sudo: false addons: apt: + sources: + - sourceline: 'ppa:ondrej/apache2' # for libnghttp2-dev packages: - clang-3.8 - lldb-3.8 @@ -46,6 +48,10 @@ addons: - uuid-dev - curl - unzip + - libnghttp2-dev + +before_install: + - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install nghttp2; fi install: ./.travis-install.sh diff --git a/Makefile b/Makefile index ecd342e91..b19dbccfd 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,11 @@ test-plugin: protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=TestStubs=true diff -u /tmp/echo.grpc.swift Sources/Examples/Echo/Generated/echo.grpc.swift +test-plugin-nio: + swift build $(CFLAGS) --product protoc-gen-swiftgrpc + protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=Client=false,NIO=true + diff -u /tmp/echo.grpc.swift Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift + xcodebuild: project xcodebuild -project SwiftGRPC.xcodeproj -configuration "Debug" -parallelizeTargets -target SwiftGRPC -target Echo -target Simple -target protoc-gen-swiftgrpc build diff --git a/Package.swift b/Package.swift index adae16b80..8cd141c22 100644 --- a/Package.swift +++ b/Package.swift @@ -18,8 +18,11 @@ import PackageDescription var packageDependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-protobuf.git", .upToNextMinor(from: "1.1.1")), - .package(url: "https://github.com/kylef/Commander.git", from: "0.8.0"), - .package(url: "https://github.com/apple/swift-nio-zlib-support.git", from: "1.0.0") + .package(url: "https://github.com/kylef/Commander.git", .upToNextMinor(from: "0.8.0")), + .package(url: "https://github.com/apple/swift-nio-zlib-support.git", .upToNextMinor(from: "1.0.0")), + .package(url: "https://github.com/apple/swift-nio.git", .upToNextMinor(from: "1.11.0")), + .package(url: "https://github.com/apple/swift-nio-nghttp2-support.git", .upToNextMinor(from: "1.0.0")), + .package(url: "https://github.com/apple/swift-nio-http2.git", .revision("38b8235868e1e6277c420b73ac5cfdfa66382a85")) ] var cGRPCDependencies: [Target.Dependency] = [] @@ -35,11 +38,18 @@ let package = Package( name: "SwiftGRPC", products: [ .library(name: "SwiftGRPC", targets: ["SwiftGRPC"]), + .library(name: "SwiftGRPCNIO", targets: ["SwiftGRPCNIO"]), ], dependencies: packageDependencies, targets: [ .target(name: "SwiftGRPC", dependencies: ["CgRPC", "SwiftProtobuf"]), + .target(name: "SwiftGRPCNIO", + dependencies: [ + "NIOFoundationCompat", + "NIOHTTP1", + "NIOHTTP2", + "SwiftProtobuf"]), .target(name: "CgRPC", dependencies: cGRPCDependencies), .target(name: "RootsEncoder"), @@ -58,7 +68,8 @@ let package = Package( .target(name: "Simple", dependencies: ["SwiftGRPC", "Commander"], path: "Sources/Examples/Simple"), - .testTarget(name: "SwiftGRPCTests", dependencies: ["SwiftGRPC"]) + .testTarget(name: "SwiftGRPCTests", dependencies: ["SwiftGRPC"]), + .testTarget(name: "SwiftGRPCNIOTests", dependencies: ["SwiftGRPC", "SwiftGRPCNIO"]) ], cLanguageStandard: .gnu11, cxxLanguageStandard: .cxx11) diff --git a/README.md b/README.md index 540002447..7fb4c45ec 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,22 @@ testing with the following versions: - Swift 4.0 - swift-protobuf 1.1.1 +## `SwiftGRPCNIO` package + +`SwiftGRPCNIO` is a clean-room implementation of the gRPC protocol on top of the [`SwiftNIO`](http://github.com/apple/swift-nio) library. This implementation is not yet production-ready as it lacks several things recommended for production use: + +- Better test coverage +- Full error handling +- SSL support +- Client support +- Example projects +- iOS support +- Removal of the `libnghttp2` dependency from `SwiftNIOHTTP2` + +However, if you are planning to implement a gRPC service based on `SwiftNIO` or the Vapor framework, you might find this package useful. In addition, once ready, this package should provide more predictable and reliable behavior in the future, combined with an improved API and better developer experience. + +You may also want to have a look at [this presentation](https://docs.google.com/presentation/d/1Mnsaq4mkeagZSP4mK1k0vewZrJKynm_MCteRDyM3OX8/edit) for more details on the motivation for this package. + ## License grpc-swift is released under the same license as diff --git a/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift new file mode 100644 index 000000000..33d18d5e6 --- /dev/null +++ b/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift @@ -0,0 +1,36 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Provides a means for decoding incoming gRPC messages into protobuf objects. +/// +/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses. +public class BaseCallHandler: GRPCCallHandler { + public func makeGRPCServerCodec() -> ChannelHandler { return GRPCServerCodec() } + + /// Called whenever a message has been received. + /// + /// Overridden by subclasses. + public func processMessage(_ message: RequestMessage) { + fatalError("needs to be overridden") + } + + /// Called when the client has half-closed the stream, indicating that they won't send any further data. + /// + /// Overridden by subclasses if the "end-of-stream" event is relevant. + public func endOfStreamReceived() { } +} + +extension BaseCallHandler: ChannelInboundHandler { + public typealias InboundIn = GRPCServerRequestPart + public typealias OutboundOut = GRPCServerResponsePart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head: preconditionFailure("should not have received headers") + case .message(let message): processMessage(message) + case .end: endOfStreamReceived() + } + } +} diff --git a/Sources/SwiftGRPCNIO/CallHandlers/BidirectionalStreamingCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/BidirectionalStreamingCallHandler.swift new file mode 100644 index 000000000..46f4b7622 --- /dev/null +++ b/Sources/SwiftGRPCNIO/CallHandlers/BidirectionalStreamingCallHandler.swift @@ -0,0 +1,44 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block. +/// +/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. +/// - To close the call and send the status, fulfill `context.statusPromise`. +public class BidirectionalStreamingCallHandler: BaseCallHandler { + public typealias EventObserver = (StreamEvent) -> Void + private var eventObserver: EventLoopFuture? + + private var context: StreamingResponseCallContext? + + // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. + // If authentication fails, they can simply fail the observer future, which causes the call to be terminated. + public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext) -> EventLoopFuture) { + super.init() + let context = StreamingResponseCallContextImpl(channel: channel, request: request) + self.context = context + let eventObserver = eventObserverFactory(context) + self.eventObserver = eventObserver + // Terminate the call if no observer is provided. + eventObserver.cascadeFailure(promise: context.statusPromise) + context.statusPromise.futureResult.whenComplete { + // When done, reset references to avoid retain cycles. + self.eventObserver = nil + self.context = nil + } + } + + public override func processMessage(_ message: RequestMessage) { + eventObserver?.whenSuccess { observer in + observer(.message(message)) + } + } + + public override func endOfStreamReceived() { + eventObserver?.whenSuccess { observer in + observer(.end) + } + } +} diff --git a/Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift new file mode 100644 index 000000000..bd03ae744 --- /dev/null +++ b/Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift @@ -0,0 +1,43 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block. +/// +/// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done. +public class ClientStreamingCallHandler: BaseCallHandler { + public typealias EventObserver = (StreamEvent) -> Void + private var eventObserver: EventLoopFuture? + + private var context: UnaryResponseCallContext? + + // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. + // If authentication fails, they can simply fail the observer future, which causes the call to be terminated. + public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext) -> EventLoopFuture) { + super.init() + let context = UnaryResponseCallContextImpl(channel: channel, request: request) + self.context = context + let eventObserver = eventObserverFactory(context) + self.eventObserver = eventObserver + // Terminate the call if no observer is provided. + eventObserver.cascadeFailure(promise: context.responsePromise) + context.responsePromise.futureResult.whenComplete { + // When done, reset references to avoid retain cycles. + self.eventObserver = nil + self.context = nil + } + } + + public override func processMessage(_ message: RequestMessage) { + eventObserver?.whenSuccess { observer in + observer(.message(message)) + } + } + + public override func endOfStreamReceived() { + eventObserver?.whenSuccess { observer in + observer(.end) + } + } +} diff --git a/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift new file mode 100644 index 000000000..893745c69 --- /dev/null +++ b/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift @@ -0,0 +1,43 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Handles server-streaming calls. Calls the observer block with the request message. +/// +/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. +/// - To close the call and send the status, complete the status future returned by the observer block. +public class ServerStreamingCallHandler: BaseCallHandler { + public typealias EventObserver = (RequestMessage) -> EventLoopFuture + private var eventObserver: EventObserver? + + private var context: StreamingResponseCallContext? + + public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext) -> EventObserver) { + super.init() + let context = StreamingResponseCallContextImpl(channel: channel, request: request) + self.context = context + self.eventObserver = eventObserverFactory(context) + context.statusPromise.futureResult.whenComplete { + // When done, reset references to avoid retain cycles. + self.eventObserver = nil + self.context = nil + } + } + + + public override func processMessage(_ message: RequestMessage) { + guard let eventObserver = self.eventObserver, + let context = self.context else { + //! FIXME: Better handle this error? + print("multiple messages received on unary call") + return + } + + let resultFuture = eventObserver(message) + resultFuture + // Fulfill the status promise with whatever status the framework user has provided. + .cascade(promise: context.statusPromise) + self.eventObserver = nil + } +} diff --git a/Sources/SwiftGRPCNIO/CallHandlers/UnaryCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/UnaryCallHandler.swift new file mode 100644 index 000000000..8971b2cca --- /dev/null +++ b/Sources/SwiftGRPCNIO/CallHandlers/UnaryCallHandler.swift @@ -0,0 +1,43 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Handles unary calls. Calls the observer block with the request message. +/// +/// - The observer block is implemented by the framework user and returns a future containing the call result. +/// - To return a response to the client, the framework user should complete that future +/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor). +public class UnaryCallHandler: BaseCallHandler { + public typealias EventObserver = (RequestMessage) -> EventLoopFuture + private var eventObserver: EventObserver? + + private var context: UnaryResponseCallContext? + + public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext) -> EventObserver) { + super.init() + let context = UnaryResponseCallContextImpl(channel: channel, request: request) + self.context = context + self.eventObserver = eventObserverFactory(context) + context.responsePromise.futureResult.whenComplete { + // When done, reset references to avoid retain cycles. + self.eventObserver = nil + self.context = nil + } + } + + public override func processMessage(_ message: RequestMessage) { + guard let eventObserver = self.eventObserver, + let context = self.context else { + //! FIXME: Better handle this error? + print("multiple messages received on unary call") + return + } + + let resultFuture = eventObserver(message) + resultFuture + // Fulfill the response promise with whatever response (or error) the framework user has provided. + .cascade(promise: context.responsePromise) + self.eventObserver = nil + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift b/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift new file mode 100644 index 000000000..e16879efe --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift @@ -0,0 +1,72 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Processes individual gRPC messages and stream-close events on a HTTP2 channel. +public protocol GRPCCallHandler: ChannelHandler { + func makeGRPCServerCodec() -> ChannelHandler +} + +/// Provides `GRPCCallHandler` objects for the methods on a particular service name. +/// +/// Implemented by the generated code. +public protocol CallHandlerProvider: class { + /// The name of the service this object is providing methods for, including the package path. + /// + /// - Example: "io.grpc.Echo.EchoService" + var serviceName: String { get } + + /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's + /// method. Returns nil for methods not handled by this service. + func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler? +} + +/// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available. +/// +/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name +/// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages. +public final class GRPCChannelHandler { + private let servicesByName: [String: CallHandlerProvider] + + public init(servicesByName: [String: CallHandlerProvider]) { + self.servicesByName = servicesByName + } +} + +extension GRPCChannelHandler: ChannelInboundHandler { + public typealias InboundIn = RawGRPCServerRequestPart + public typealias OutboundOut = RawGRPCServerResponsePart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + let requestPart = self.unwrapInboundIn(data) + switch requestPart { + case .head(let requestHead): + // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash: + // - uriComponents[0]: empty + // - uriComponents[1]: service name (including the package name); + // `CallHandlerProvider`s should provide the service name including the package name. + // - uriComponents[2]: method name. + let uriComponents = requestHead.uri.components(separatedBy: "/") + guard uriComponents.count >= 3 && uriComponents[0].isEmpty, + let providerForServiceName = servicesByName[uriComponents[1]], + let callHandler = providerForServiceName.handleMethod(uriComponents[2], request: requestHead, serverHandler: self, channel: ctx.channel) else { + ctx.writeAndFlush(self.wrapOutboundOut(.status(.unimplemented(method: requestHead.uri))), promise: nil) + return + } + + var responseHeaders = HTTPHeaders() + responseHeaders.add(name: "content-type", value: "application/grpc") + ctx.write(self.wrapOutboundOut(.headers(responseHeaders)), promise: nil) + + let codec = callHandler.makeGRPCServerCodec() + ctx.pipeline.add(handler: codec, after: self) + .then { ctx.pipeline.add(handler: callHandler, after: codec) } + //! FIXME(lukasa): Fix the ordering of this with NIO 1.12 and replace with `remove(, promise:)`. + .whenComplete { _ = ctx.pipeline.remove(handler: self) } + + case .message, .end: + preconditionFailure("received \(requestPart), should have been removed as a handler at this point") + } + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCServer.swift b/Sources/SwiftGRPCNIO/GRPCServer.swift new file mode 100644 index 000000000..f87a5166a --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCServer.swift @@ -0,0 +1,59 @@ +import Foundation +import NIO +import NIOHTTP1 +import NIOHTTP2 + +/// Wrapper object to manage the lifecycle of a gRPC server. +public final class GRPCServer { + /// Starts up a server that serves the given providers. + /// + /// - Returns: A future that is completed when the server has successfully started up. + public static func start( + hostname: String, + port: Int, + eventLoopGroup: EventLoopGroup, + serviceProviders: [CallHandlerProvider]) -> EventLoopFuture { + let servicesByName = Dictionary(uniqueKeysWithValues: serviceProviders.map { ($0.serviceName, $0) }) + let bootstrap = ServerBootstrap(group: eventLoopGroup) + // Specify a backlog to avoid overloading the server. + .serverChannelOption(ChannelOptions.backlog, value: 256) + // Enable `SO_REUSEADDR` to avoid "address already in use" error. + .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + + // Set the handlers that are applied to the accepted Channels + .childChannelInitializer { channel in + //! FIXME: Add an option for gRPC-via-HTTP1 (pPRC). + return channel.pipeline.add(handler: HTTP2Parser(mode: .server)).then { + let multiplexer = HTTP2StreamMultiplexer { (channel, streamID) -> EventLoopFuture in + return channel.pipeline.add(handler: HTTP2ToHTTP1ServerCodec(streamID: streamID)) + .then { channel.pipeline.add(handler: HTTP1ToRawGRPCServerCodec()) } + .then { channel.pipeline.add(handler: GRPCChannelHandler(servicesByName: servicesByName)) } + } + + return channel.pipeline.add(handler: multiplexer) + } + } + + // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels + .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) + .childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + + return bootstrap.bind(host: hostname, port: port) + .map { GRPCServer(channel: $0) } + } + + private let channel: Channel + + private init(channel: Channel) { + self.channel = channel + } + + /// Fired when the server shuts down. + public var onClose: EventLoopFuture { + return channel.closeFuture + } + + public func close() -> EventLoopFuture { + return channel.close(mode: .all) + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCServerCodec.swift b/Sources/SwiftGRPCNIO/GRPCServerCodec.swift new file mode 100644 index 000000000..21652e8bb --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCServerCodec.swift @@ -0,0 +1,71 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOFoundationCompat +import NIOHTTP1 + +/// Incoming gRPC package with a fixed message type. +public enum GRPCServerRequestPart { + case head(HTTPRequestHead) + case message(MessageType) + case end +} + +/// Outgoing gRPC package with a fixed message type. +public enum GRPCServerResponsePart { + case headers(HTTPHeaders) + case message(MessageType) + case status(GRPCStatus) +} + +/// A simple channel handler that translates raw gRPC packets into decoded protobuf messages, and vice versa. +public final class GRPCServerCodec { } + +extension GRPCServerCodec: ChannelInboundHandler { + public typealias InboundIn = RawGRPCServerRequestPart + public typealias InboundOut = GRPCServerRequestPart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head(let requestHead): + ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead))) + + case .message(var message): + let messageAsData = message.readData(length: message.readableBytes)! + do { + ctx.fireChannelRead(self.wrapInboundOut(.message(try RequestMessage(serializedData: messageAsData)))) + } catch { + //! FIXME: Ensure that the last handler in the pipeline returns `.dataLoss` here? + ctx.fireErrorCaught(error) + } + + case .end: + ctx.fireChannelRead(self.wrapInboundOut(.end)) + } + } +} + +extension GRPCServerCodec: ChannelOutboundHandler { + public typealias OutboundIn = GRPCServerResponsePart + public typealias OutboundOut = RawGRPCServerResponsePart + + public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let responsePart = self.unwrapOutboundIn(data) + switch responsePart { + case .headers(let headers): + ctx.write(self.wrapOutboundOut(.headers(headers)), promise: promise) + case .message(let message): + do { + let messageData = try message.serializedData() + var responseBuffer = ctx.channel.allocator.buffer(capacity: messageData.count) + responseBuffer.write(bytes: messageData) + ctx.write(self.wrapOutboundOut(.message(responseBuffer)), promise: promise) + } catch { + promise?.fail(error: error) + ctx.fireErrorCaught(error) + } + case .status(let status): + ctx.write(self.wrapOutboundOut(.status(status)), promise: promise) + } + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCStatus.swift b/Sources/SwiftGRPCNIO/GRPCStatus.swift new file mode 100644 index 000000000..9e4109b82 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCStatus.swift @@ -0,0 +1,30 @@ +import Foundation +import NIOHTTP1 + +/// Encapsulates the result of a gRPC call. +public struct GRPCStatus: Error { + /// The code to return in the `grpc-status` header. + public let code: StatusCode + /// The message to return in the `grpc-message` header. + public let message: String + /// Additional HTTP headers to return in the trailers. + public let trailingMetadata: HTTPHeaders + + public init(code: StatusCode, message: String, trailingMetadata: HTTPHeaders = HTTPHeaders()) { + self.code = code + self.message = message + self.trailingMetadata = trailingMetadata + } + + // Frequently used "default" statuses. + + /// The default status to return for succeeded calls. + public static let ok = GRPCStatus(code: .ok, message: "OK") + /// "Internal server error" status. + public static let processingError = GRPCStatus(code: .internalError, message: "unknown error processing request") + + /// Status indicating that the given method is not implemented. + public static func unimplemented(method: String) -> GRPCStatus { + return GRPCStatus(code: .unimplemented, message: "unknown method " + method) + } +} diff --git a/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift new file mode 100644 index 000000000..f8a84cfe3 --- /dev/null +++ b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift @@ -0,0 +1,137 @@ +import Foundation +import NIO +import NIOHTTP1 + +/// Incoming gRPC package with an unknown message type (represented by a byte buffer). +public enum RawGRPCServerRequestPart { + case head(HTTPRequestHead) + case message(ByteBuffer) + case end +} + +/// Outgoing gRPC package with an unknown message type (represented by a byte buffer). +public enum RawGRPCServerResponsePart { + case headers(HTTPHeaders) + case message(ByteBuffer) + case status(GRPCStatus) +} + +/// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa. +/// +/// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operationg the protocol +/// on a "higher" level. +/// +/// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2 +/// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1 +/// (sometimes also called pPRC) easier in the future. +/// +/// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`. +public final class HTTP1ToRawGRPCServerCodec { + private enum State { + case expectingHeaders + case expectingCompressedFlag + case expectingMessageLength + case receivedMessageLength(UInt32) + + var expectingBody: Bool { + switch self { + case .expectingHeaders: return false + case .expectingCompressedFlag, .expectingMessageLength, .receivedMessageLength: return true + } + } + } + + private var state = State.expectingHeaders + + private var buffer: NIO.ByteBuffer? +} + +extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler { + public typealias InboundIn = HTTPServerRequestPart + public typealias InboundOut = RawGRPCServerRequestPart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head(let requestHead): + guard case .expectingHeaders = state + else { preconditionFailure("received headers while in state \(state)") } + + state = .expectingCompressedFlag + buffer = ctx.channel.allocator.buffer(capacity: 5) + + ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead))) + + case .body(var body): + guard var buffer = buffer + else { preconditionFailure("buffer not initialized") } + assert(state.expectingBody, "received body while in state \(state)") + buffer.write(buffer: &body) + + // Iterate over all available incoming data, trying to read length-delimited messages. + // Each message has the following format: + // - 1 byte "compressed" flag (currently always zero, as we do not support for compression) + // - 4 byte signed-integer payload length (N) + // - N bytes payload (normally a valid wire-format protocol buffer) + requestProcessing: while true { + switch state { + case .expectingHeaders: preconditionFailure("unexpected state \(state)") + case .expectingCompressedFlag: + guard let compressionFlag: Int8 = buffer.readInteger() else { break requestProcessing } + //! FIXME: Avoid crashing here and instead drop the connection. + precondition(compressionFlag == 0, "unexpected compression flag \(compressionFlag); compression is not supported and we did not indicate support for it") + state = .expectingMessageLength + + case .expectingMessageLength: + guard let messageLength: UInt32 = buffer.readInteger() else { break requestProcessing } + state = .receivedMessageLength(messageLength) + + case .receivedMessageLength(let messageLength): + guard let messageBytes = buffer.readBytes(length: numericCast(messageLength)) else { break } + + //! FIXME: Use a slice of this buffer instead of copying to a new buffer. + var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.count) + responseBuffer.write(bytes: messageBytes) + ctx.fireChannelRead(self.wrapInboundOut(.message(responseBuffer))) + //! FIXME: Call buffer.discardReadBytes() here? + //! ALTERNATIVE: Check if the buffer has no further data right now, then clear it. + + state = .expectingCompressedFlag + } + } + + case .end(let trailers): + if let trailers = trailers { + //! FIXME: Better handle this error. + print("unexpected trailers received: \(trailers)") + return + } + ctx.fireChannelRead(self.wrapInboundOut(.end)) + } + } +} + +extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler { + public typealias OutboundIn = RawGRPCServerResponsePart + public typealias OutboundOut = HTTPServerResponsePart + + public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let responsePart = self.unwrapOutboundIn(data) + switch responsePart { + case .headers(let headers): + //! FIXME: Should return a different version if we want to support pPRC. + ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: .init(major: 2, minor: 0), status: .ok, headers: headers))), promise: promise) + case .message(var messageBytes): + // Write out a length-delimited message payload. See `channelRead` fpor the corresponding format. + var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.readableBytes + 5) + responseBuffer.write(integer: Int8(0)) // Compression flag: no compression + responseBuffer.write(integer: UInt32(messageBytes.readableBytes)) + responseBuffer.write(buffer: &messageBytes) + ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise) + case .status(let status): + var trailers = status.trailingMetadata + trailers.add(name: "grpc-status", value: String(describing: status.code.rawValue)) + trailers.add(name: "grpc-message", value: status.message) + ctx.write(self.wrapOutboundOut(.end(trailers)), promise: promise) + } + } +} diff --git a/Sources/SwiftGRPCNIO/ServerCallContexts/ServerCallContext.swift b/Sources/SwiftGRPCNIO/ServerCallContexts/ServerCallContext.swift new file mode 100644 index 000000000..35a80e40f --- /dev/null +++ b/Sources/SwiftGRPCNIO/ServerCallContexts/ServerCallContext.swift @@ -0,0 +1,17 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Base class providing data provided to the framework user for all server calls. +open class ServerCallContext { + /// The event loop this call is served on. + public let eventLoop: EventLoop + /// Generic metadata provided with this request. + public let request: HTTPRequestHead + + public init(eventLoop: EventLoop, request: HTTPRequestHead) { + self.eventLoop = eventLoop + self.request = request + } +} diff --git a/Sources/SwiftGRPCNIO/ServerCallContexts/StreamingResponseCallContext.swift b/Sources/SwiftGRPCNIO/ServerCallContexts/StreamingResponseCallContext.swift new file mode 100644 index 000000000..9b18f04dc --- /dev/null +++ b/Sources/SwiftGRPCNIO/ServerCallContexts/StreamingResponseCallContext.swift @@ -0,0 +1,63 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Abstract base class exposing a method to send multiple messages over the wire and a promise for the final RPC status. +/// +/// - When `statusPromise` is fulfilled, the call is closed and the provided status transmitted. +/// - If `statusPromise` is failed and the error is of type `GRPCStatus`, that error will be returned to the client. +/// - For other errors, `GRPCStatus.processingError` is returned to the client. +open class StreamingResponseCallContext: ServerCallContext { + public typealias WrappedResponse = GRPCServerResponsePart + + public let statusPromise: EventLoopPromise + + public override init(eventLoop: EventLoop, request: HTTPRequestHead) { + self.statusPromise = eventLoop.newPromise() + super.init(eventLoop: eventLoop, request: request) + } + + open func sendResponse(_ message: ResponseMessage) -> EventLoopFuture { + fatalError("needs to be overridden") + } +} + +/// Concrete implementation of `StreamingResponseCallContext` used by our generated code. +open class StreamingResponseCallContextImpl: StreamingResponseCallContext { + public let channel: Channel + + public init(channel: Channel, request: HTTPRequestHead) { + self.channel = channel + + super.init(eventLoop: channel.eventLoop, request: request) + + statusPromise.futureResult + // Ensure that any error provided is of type `GRPCStatus`, using "internal server error" as a fallback. + .mapIfError { error in + (error as? GRPCStatus) ?? .processingError + } + // Finish the call by returning the final status. + .whenSuccess { + self.channel.writeAndFlush(NIOAny(WrappedResponse.status($0)), promise: nil) + } + } + + open override func sendResponse(_ message: ResponseMessage) -> EventLoopFuture { + let promise: EventLoopPromise = eventLoop.newPromise() + channel.writeAndFlush(NIOAny(WrappedResponse.message(message)), promise: promise) + return promise.futureResult + } +} + +/// Concrete implementation of `StreamingResponseCallContext` used for testing. +/// +/// Simply records all sent messages. +open class StreamingResponseCallContextTestStub: StreamingResponseCallContext { + open var recordedResponses: [ResponseMessage] = [] + + open override func sendResponse(_ message: ResponseMessage) -> EventLoopFuture { + recordedResponses.append(message) + return eventLoop.newSucceededFuture(result: ()) + } +} diff --git a/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift b/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift new file mode 100644 index 000000000..2f329842e --- /dev/null +++ b/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift @@ -0,0 +1,74 @@ +import Foundation +import SwiftProtobuf +import NIO +import NIOHTTP1 + +/// Abstract base class exposing a method that exposes a promise fot the RPC response. +/// +/// - When `responsePromise` is fulfilled, the call is closed and the provided response transmitted with status `responseStatus` (`.ok` by default). +/// - If `statusPromise` is failed and the error is of type `GRPCStatus`, that error will be returned to the client. +/// - For other errors, `GRPCStatus.processingError` is returned to the client. +/// +/// For unary calls, the response is not actually provided by fulfilling `responsePromise`, but instead by completing +/// the future returned by `UnaryCallHandler.EventObserver`. +open class UnaryResponseCallContext: ServerCallContext, StatusOnlyCallContext { + public typealias WrappedResponse = GRPCServerResponsePart + + public let responsePromise: EventLoopPromise + public var responseStatus: GRPCStatus = .ok + + public override init(eventLoop: EventLoop, request: HTTPRequestHead) { + self.responsePromise = eventLoop.newPromise() + super.init(eventLoop: eventLoop, request: request) + } +} + +/// Protocol variant of `UnaryResponseCallContext` that only exposes the `responseStatus` field, but not +/// `responsePromise`. +/// +/// Motivation: `UnaryCallHandler` already asks the call handler return an `EventLoopFuture` which +/// is automatically cascaded into `UnaryResponseCallContext.responsePromise`, so that promise does not (and should not) +/// be fulfilled by the user. +/// +/// We can use a protocol (instead of an abstract base class) here because removing the generic `responsePromise` field +/// lets us avoid associated-type requirements on the protol. +public protocol StatusOnlyCallContext { + var eventLoop: EventLoop { get } + var request: HTTPRequestHead { get } + + var responseStatus: GRPCStatus { get set } +} + +/// Concrete implementation of `UnaryResponseCallContext` used by our generated code. +open class UnaryResponseCallContextImpl: UnaryResponseCallContext { + public let channel: Channel + + public init(channel: Channel, request: HTTPRequestHead) { + self.channel = channel + + super.init(eventLoop: channel.eventLoop, request: request) + + responsePromise.futureResult + .map { responseMessage in + // Send the response provided to the promise. + //! FIXME: It would be nicer to chain sending the status onto a successful write, but for some reason the + // "write message" future doesn't seem to get fulfilled? + self.channel.write(NIOAny(WrappedResponse.message(responseMessage)), promise: nil) + + return self.responseStatus + } + // Ensure that any error provided is of type `GRPCStatus`, using "internal server error" as a fallback. + .mapIfError { error in + (error as? GRPCStatus) ?? .processingError + } + // Finish the call by returning the final status. + .whenSuccess { status in + self.channel.writeAndFlush(NIOAny(WrappedResponse.status(status)), promise: nil) + } + } +} + +/// Concrete implementation of `UnaryResponseCallContext` used for testing. +/// +/// Only provided to make it clear in tests that no "real" implementation is used. +open class UnaryResponseCallContextTestStub: UnaryResponseCallContext { } diff --git a/Sources/SwiftGRPCNIO/StatusCode.swift b/Sources/SwiftGRPCNIO/StatusCode.swift new file mode 120000 index 000000000..d4e22bc0d --- /dev/null +++ b/Sources/SwiftGRPCNIO/StatusCode.swift @@ -0,0 +1 @@ +../SwiftGRPC/Core/StatusCode.swift \ No newline at end of file diff --git a/Sources/SwiftGRPCNIO/StreamEvent.swift b/Sources/SwiftGRPCNIO/StreamEvent.swift new file mode 100644 index 000000000..6e683da34 --- /dev/null +++ b/Sources/SwiftGRPCNIO/StreamEvent.swift @@ -0,0 +1,8 @@ +import SwiftProtobuf + +/// An event that can occur on a client-streaming RPC. Provided to the event observer registered for that call. +public enum StreamEvent { + case message(Message) + case end + //! FIXME: Also support errors in this type, to propagate them to the event handler. +} diff --git a/Sources/protoc-gen-swiftgrpc/Generator-Names.swift b/Sources/protoc-gen-swiftgrpc/Generator-Names.swift index cbd76b5ef..6fe3ddea1 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator-Names.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator-Names.swift @@ -48,7 +48,11 @@ extension Generator { } internal var providerName: String { - return nameForPackageService(file, service) + "Provider" + if options.generateNIOImplementation { + return nameForPackageService(file, service) + "Provider_NIO" + } else { + return nameForPackageService(file, service) + "Provider" + } } internal var callName: String { diff --git a/Sources/protoc-gen-swiftgrpc/Generator-Server.swift b/Sources/protoc-gen-swiftgrpc/Generator-Server.swift index 943a32296..45de736ff 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator-Server.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator-Server.swift @@ -18,9 +18,12 @@ import SwiftProtobuf import SwiftProtobufPluginLibrary extension Generator { - internal func printServer() { printServerProtocol() + + guard !options.generateNIOImplementation + else { return } + for method in service.methods { self.method = method switch streamingType(method) { @@ -39,21 +42,37 @@ extension Generator { private func printServerProtocol() { println("/// To build a server, implement a class that conforms to this protocol.") - println("/// If one of the methods returning `ServerStatus?` returns nil,") - println("/// it is expected that you have already returned a status to the client by means of `session.close`.") - println("\(access) protocol \(providerName): ServiceProvider {") + if !options.generateNIOImplementation { + println("/// If one of the methods returning `ServerStatus?` returns nil,") + println("/// it is expected that you have already returned a status to the client by means of `session.close`.") + } + println("\(access) protocol \(providerName): \(options.generateNIOImplementation ? "CallHandlerProvider" : "ServiceProvider") {") indent() for method in service.methods { self.method = method - switch streamingType(method) { - case .unary: - println("func \(methodFunctionName)(request: \(methodInputName), session: \(methodSessionName)) throws -> \(methodOutputName)") - case .serverStreaming: - println("func \(methodFunctionName)(request: \(methodInputName), session: \(methodSessionName)) throws -> ServerStatus?") - case .clientStreaming: - println("func \(methodFunctionName)(session: \(methodSessionName)) throws -> \(methodOutputName)?") - case .bidirectionalStreaming: - println("func \(methodFunctionName)(session: \(methodSessionName)) throws -> ServerStatus?") + + if options.generateNIOImplementation { + switch streamingType(method) { + case .unary: + println("func \(methodFunctionName)(request: \(methodInputName), context: StatusOnlyCallContext) -> EventLoopFuture<\(methodOutputName)>") + case .serverStreaming: + println("func \(methodFunctionName)(request: \(methodInputName), context: StreamingResponseCallContext<\(methodOutputName)>) -> EventLoopFuture") + case .clientStreaming: + println("func \(methodFunctionName)(context: UnaryResponseCallContext<\(methodOutputName)>) -> EventLoopFuture<(StreamEvent<\(methodInputName)>) -> Void>") + case .bidirectionalStreaming: + println("func \(methodFunctionName)(context: StreamingResponseCallContext<\(methodOutputName)>) -> EventLoopFuture<(StreamEvent<\(methodInputName)>) -> Void>") + } + } else { + switch streamingType(method) { + case .unary: + println("func \(methodFunctionName)(request: \(methodInputName), session: \(methodSessionName)) throws -> \(methodOutputName)") + case .serverStreaming: + println("func \(methodFunctionName)(request: \(methodInputName), session: \(methodSessionName)) throws -> ServerStatus?") + case .clientStreaming: + println("func \(methodFunctionName)(session: \(methodSessionName)) throws -> \(methodOutputName)?") + case .bidirectionalStreaming: + println("func \(methodFunctionName)(session: \(methodSessionName)) throws -> ServerStatus?") + } } } outdent() @@ -63,44 +82,84 @@ extension Generator { indent() println("\(access) var serviceName: String { return \"\(servicePath)\" }") println() - println("/// Determines and calls the appropriate request handler, depending on the request's method.") - println("/// Throws `HandleMethodError.unknownMethod` for methods not handled by this service.") - println("\(access) func handleMethod(_ method: String, handler: Handler) throws -> ServerStatus? {") - indent() - println("switch method {") - for method in service.methods { - self.method = method - println("case \(methodPath):") + if options.generateNIOImplementation { + println("/// Determines, calls and returns the appropriate request handler, depending on the request's method.") + println("/// Returns nil for methods not handled by this service.") + println("\(access) func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler? {") indent() - switch streamingType(method) { - case .unary, .serverStreaming: - println("return try \(methodSessionName)Base(") + println("switch methodName {") + for method in service.methods { + self.method = method + println("case \"\(method.name)\":") indent() - println("handler: handler,") - println("providerBlock: { try self.\(methodFunctionName)(request: $0, session: $1 as! \(methodSessionName)Base) })") + let callHandlerType: String + switch streamingType(method) { + case .unary: callHandlerType = "UnaryCallHandler" + case .serverStreaming: callHandlerType = "ServerStreamingCallHandler" + case .clientStreaming: callHandlerType = "ClientStreamingCallHandler" + case .bidirectionalStreaming: callHandlerType = "BidirectionalStreamingCallHandler" + } + println("return \(callHandlerType)(channel: channel, request: request) { context in") indent() - println(".run()") + switch streamingType(method) { + case .unary, .serverStreaming: + println("return { request in") + indent() + println("self.\(methodFunctionName)(request: request, context: context)") + outdent() + println("}") + case .clientStreaming, .bidirectionalStreaming: + println("return self.\(methodFunctionName)(context: context)") + } outdent() + println("}") outdent() - default: - println("return try \(methodSessionName)Base(") - indent() - println("handler: handler,") - println("providerBlock: { try self.\(methodFunctionName)(session: $0 as! \(methodSessionName)Base) })") + println() + } + println("default: return nil") + println("}") + outdent() + println("}") + } else { + println("/// Determines and calls the appropriate request handler, depending on the request's method.") + println("/// Throws `HandleMethodError.unknownMethod` for methods not handled by this service.") + println("\(access) func handleMethod(_ method: String, handler: Handler) throws -> ServerStatus? {") + indent() + println("switch method {") + for method in service.methods { + self.method = method + println("case \(methodPath):") indent() - println(".run()") - outdent() + switch streamingType(method) { + case .unary, .serverStreaming: + println("return try \(methodSessionName)Base(") + indent() + println("handler: handler,") + println("providerBlock: { try self.\(methodFunctionName)(request: $0, session: $1 as! \(methodSessionName)Base) })") + indent() + println(".run()") + outdent() + outdent() + default: + println("return try \(methodSessionName)Base(") + indent() + println("handler: handler,") + println("providerBlock: { try self.\(methodFunctionName)(session: $0 as! \(methodSessionName)Base) })") + indent() + println(".run()") + outdent() + outdent() + } outdent() } + println("default:") + indent() + println("throw HandleMethodError.unknownMethod") + outdent() + println("}") outdent() + println("}") } - println("default:") - indent() - println("throw HandleMethodError.unknownMethod") - outdent() - println("}") - outdent() - println("}") outdent() println("}") println() diff --git a/Sources/protoc-gen-swiftgrpc/Generator.swift b/Sources/protoc-gen-swiftgrpc/Generator.swift index fcc2790a7..44b9beb67 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator.swift @@ -73,12 +73,29 @@ class Generator { //\n """) - for moduleName in ["Foundation", "Dispatch", "SwiftGRPC", "SwiftProtobuf"] { + let moduleNames: [String] + if options.generateNIOImplementation { + moduleNames = [ + "Foundation", + "NIO", + "NIOHTTP1", + "SwiftGRPCNIO", + "SwiftProtobuf"] + } else { + moduleNames = [ + "Foundation", + "Dispatch", + "SwiftGRPC", + "SwiftProtobuf"] + } + for moduleName in moduleNames { println("import \(moduleName)") } println() if options.generateClient { + guard !options.generateNIOImplementation else { fatalError("Generating client code is not yet supported for SwiftGRPC-NIO.") } + for service in file.services { self.service = service printClient(asynchronousCode: options.generateAsynchronous, @@ -88,6 +105,10 @@ class Generator { println() if options.generateServer { + if options.generateTestStubs && options.generateNIOImplementation { + fatalError("Generating test stubs is not yet supported for SwiftGRPC-NIO.") + } + for service in file.services { self.service = service printServer() diff --git a/Sources/protoc-gen-swiftgrpc/options.swift b/Sources/protoc-gen-swiftgrpc/options.swift index 6070ca831..b94cd8baa 100644 --- a/Sources/protoc-gen-swiftgrpc/options.swift +++ b/Sources/protoc-gen-swiftgrpc/options.swift @@ -52,6 +52,7 @@ final class GeneratorOptions { private(set) var generateAsynchronous = true private(set) var generateSynchronous = true private(set) var generateTestStubs = false + private(set) var generateNIOImplementation = false init(parameter: String?) throws { for pair in GeneratorOptions.parseParameter(string: parameter) { @@ -90,12 +91,19 @@ final class GeneratorOptions { } else { throw GenerationError.invalidParameterValue(name: pair.key, value: pair.value) } - + case "TestStubs": if let value = Bool(pair.value) { - generateTestStubs = value + generateTestStubs = value + } else { + throw GenerationError.invalidParameterValue(name: pair.key, value: pair.value) + } + + case "NIO": + if let value = Bool(pair.value) { + generateNIOImplementation = value } else { - throw GenerationError.invalidParameterValue(name: pair.key, value: pair.value) + throw GenerationError.invalidParameterValue(name: pair.key, value: pair.value) } default: diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index ec2080c46..f8dac310b 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -15,8 +15,10 @@ */ import XCTest @testable import SwiftGRPCTests +@testable import SwiftGRPCNIOTests XCTMain([ + // SwiftGRPC testCase(gRPCTests.allTests), testCase(ChannelArgumentTests.allTests), testCase(ClientCancellingTests.allTests), @@ -31,5 +33,8 @@ XCTMain([ testCase(ServerCancellingTests.allTests), testCase(ServerTestExample.allTests), testCase(ServerThrowingTests.allTests), - testCase(ServerTimeoutTests.allTests) + testCase(ServerTimeoutTests.allTests), + + // SwiftGRPCNIO + testCase(NIOServerTests.allTests) ]) diff --git a/Tests/SwiftGRPCNIOTests/EchoProvider.swift b/Tests/SwiftGRPCNIOTests/EchoProvider.swift new file mode 120000 index 000000000..7d14bfe94 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/EchoProvider.swift @@ -0,0 +1 @@ +../../Sources/Examples/Echo/EchoProvider.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift b/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift new file mode 100644 index 000000000..cd0848214 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift @@ -0,0 +1,57 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Dispatch +import Foundation +@testable import SwiftGRPC +import XCTest + +extension Echo_EchoRequest { + init(text: String) { + self.text = text + } +} + +extension Echo_EchoResponse { + init(text: String) { + self.text = text + } +} + +class NIOServerTestCase: XCTestCase { + func makeProvider() -> Echo_EchoProvider { return EchoProvider() } + + var provider: Echo_EchoProvider! + var client: Echo_EchoServiceClient! + + var defaultTimeout: TimeInterval { return 1.0 } + var address: String { return "localhost:5050" } + + override func setUp() { + super.setUp() + + provider = makeProvider() + + client = Echo_EchoServiceClient(address: address, secure: false) + + client.timeout = defaultTimeout + } + + override func tearDown() { + client = nil + + super.tearDown() + } +} diff --git a/Tests/SwiftGRPCNIOTests/NIOServerTests.swift b/Tests/SwiftGRPCNIOTests/NIOServerTests.swift new file mode 100644 index 000000000..61801389d --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/NIOServerTests.swift @@ -0,0 +1,302 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Dispatch +import Foundation +import NIO +import NIOHTTP1 +import NIOHTTP2 +@testable import SwiftGRPC +@testable import SwiftGRPCNIO +import XCTest + +// This class is what the SwiftGRPC user would actually implement to provide their service. +final class EchoProvider_NIO: Echo_EchoProvider_NIO { + func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture { + var response = Echo_EchoResponse() + response.text = "Swift echo get: " + request.text + return context.eventLoop.newSucceededFuture(result: response) + } + + func collect(context: UnaryResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> { + var parts: [String] = [] + return context.eventLoop.newSucceededFuture(result: { event in + switch event { + case .message(let message): + parts.append(message.text) + + case .end: + var response = Echo_EchoResponse() + response.text = "Swift echo collect: " + parts.joined(separator: " ") + context.responsePromise.succeed(result: response) + } + }) + } + + func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext) -> EventLoopFuture { + var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ()) + let parts = request.text.components(separatedBy: " ") + for (i, part) in parts.enumerated() { + var response = Echo_EchoResponse() + response.text = "Swift echo expand (\(i)): \(part)" + endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) } + } + return endOfSendOperationQueue.map { GRPCStatus.ok } + } + + func update(context: StreamingResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> { + var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ()) + var count = 0 + return context.eventLoop.newSucceededFuture(result: { event in + switch event { + case .message(let message): + var response = Echo_EchoResponse() + response.text = "Swift echo update (\(count)): \(message.text)" + endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) } + count += 1 + + case .end: + endOfSendOperationQueue + .map { GRPCStatus.ok } + .cascade(promise: context.statusPromise) + } + }) + } +} + +class NIOServerTests: NIOServerTestCase { + static var allTests: [(String, (NIOServerTests) -> () throws -> Void)] { + return [ + ("testUnary", testUnary), + ("testUnaryLotsOfRequests", testUnaryLotsOfRequests), + ("testClientStreaming", testClientStreaming), + ("testClientStreamingLotsOfMessages", testClientStreamingLotsOfMessages), + ("testServerStreaming", testServerStreaming), + ("testServerStreamingLotsOfMessages", testServerStreamingLotsOfMessages), + ("testBidirectionalStreamingBatched", testBidirectionalStreamingBatched), + ("testBidirectionalStreamingPingPong", testBidirectionalStreamingPingPong), + ("testBidirectionalStreamingLotsOfMessagesBatched", testBidirectionalStreamingLotsOfMessagesBatched), + ("testBidirectionalStreamingLotsOfMessagesPingPong", testBidirectionalStreamingLotsOfMessagesPingPong) + ] + } + + static let lotsOfStrings = (0..<1000).map { String(describing: $0) } + + var eventLoopGroup: MultiThreadedEventLoopGroup! + var server: GRPCServer! + + override func setUp() { + super.setUp() + + // This is how a GRPC server would actually be set up. + eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + server = try! GRPCServer.start( + hostname: "localhost", port: 5050, eventLoopGroup: eventLoopGroup, serviceProviders: [EchoProvider_NIO()]) + .wait() + } + + override func tearDown() { + XCTAssertNoThrow(try server.close().wait()) + + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + eventLoopGroup = nil + + super.tearDown() + } +} + +extension NIOServerTests { + func testUnary() { + XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text) + } + + func testUnaryLotsOfRequests() { + // Sending that many requests at once can sometimes trip things up, it seems. + client.timeout = 5.0 + let clockStart = clock() + let numberOfRequests = 2_000 + for i in 0.. 0 { + print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))") + } + XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text) + } + print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))") + } +} + +extension NIOServerTests { + func testClientStreaming() { + let completionHandlerExpectation = expectation(description: "final completion handler called") + let call = try! client.collect { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + completionHandlerExpectation.fulfill() + } + + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) + call.waitForSendOperationsToFinish() + + let response = try! call.closeAndReceive() + XCTAssertEqual("Swift echo collect: foo bar baz", response.text) + + waitForExpectations(timeout: defaultTimeout) + } + + func testClientStreamingLotsOfMessages() { + let completionHandlerExpectation = expectation(description: "completion handler called") + let call = try! client.collect { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + completionHandlerExpectation.fulfill() + } + + for string in NIOServerTests.lotsOfStrings { + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) + } + call.waitForSendOperationsToFinish() + + let response = try! call.closeAndReceive() + XCTAssertEqual("Swift echo collect: " + NIOServerTests.lotsOfStrings.joined(separator: " "), response.text) + + waitForExpectations(timeout: defaultTimeout) + } +} + +extension NIOServerTests { + func testServerStreaming() { + let completionHandlerExpectation = expectation(description: "completion handler called") + let call = try! client.expand(Echo_EchoRequest(text: "foo bar baz")) { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + completionHandlerExpectation.fulfill() + } + + XCTAssertEqual("Swift echo expand (0): foo", try! call.receive()!.text) + XCTAssertEqual("Swift echo expand (1): bar", try! call.receive()!.text) + XCTAssertEqual("Swift echo expand (2): baz", try! call.receive()!.text) + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } + + func testServerStreamingLotsOfMessages() { + let completionHandlerExpectation = expectation(description: "completion handler called") + let call = try! client.expand(Echo_EchoRequest(text: NIOServerTests.lotsOfStrings.joined(separator: " "))) { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + completionHandlerExpectation.fulfill() + } + + for string in NIOServerTests.lotsOfStrings { + XCTAssertEqual("Swift echo expand (\(string)): \(string)", try! call.receive()!.text) + } + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } +} + +extension NIOServerTests { + func testBidirectionalStreamingBatched() { + let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") + let call = try! client.update { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + finalCompletionHandlerExpectation.fulfill() + } + + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) + + call.waitForSendOperationsToFinish() + + XCTAssertNoThrow(try call.closeSend()) + + XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text) + XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text) + XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text) + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } + + func testBidirectionalStreamingPingPong() { + let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") + let call = try! client.update { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + finalCompletionHandlerExpectation.fulfill() + } + + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) + XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text) + + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) + XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text) + + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) + XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text) + + call.waitForSendOperationsToFinish() + + XCTAssertNoThrow(try call.closeSend()) + + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } + + func testBidirectionalStreamingLotsOfMessagesBatched() { + let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") + let call = try! client.update { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + finalCompletionHandlerExpectation.fulfill() + } + + for string in NIOServerTests.lotsOfStrings { + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) + } + + call.waitForSendOperationsToFinish() + + XCTAssertNoThrow(try call.closeSend()) + + for string in NIOServerTests.lotsOfStrings { + XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text) + } + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } + + func testBidirectionalStreamingLotsOfMessagesPingPong() { + let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") + let call = try! client.update { callResult in + XCTAssertEqual(.ok, callResult.statusCode) + finalCompletionHandlerExpectation.fulfill() + } + + for string in NIOServerTests.lotsOfStrings { + XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) + XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text) + } + + call.waitForSendOperationsToFinish() + + XCTAssertNoThrow(try call.closeSend()) + + XCTAssertNil(try! call.receive()) + + waitForExpectations(timeout: defaultTimeout) + } +} diff --git a/Tests/SwiftGRPCNIOTests/echo.grpc.swift b/Tests/SwiftGRPCNIOTests/echo.grpc.swift new file mode 120000 index 000000000..9fef35792 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/echo.grpc.swift @@ -0,0 +1 @@ +../../Sources/Examples/Echo/Generated/echo.grpc.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/echo.pb.swift b/Tests/SwiftGRPCNIOTests/echo.pb.swift new file mode 120000 index 000000000..9dbe28075 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/echo.pb.swift @@ -0,0 +1 @@ +../../Sources/Examples/Echo/Generated/echo.pb.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift b/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift new file mode 100644 index 000000000..e3cfbfb44 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift @@ -0,0 +1,73 @@ +// +// DO NOT EDIT. +// +// Generated by the protocol buffer compiler. +// Source: echo.proto +// + +// +// Copyright 2018, gRPC Authors All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +import Foundation +import NIO +import NIOHTTP1 +import SwiftGRPCNIO +import SwiftProtobuf + + +/// To build a server, implement a class that conforms to this protocol. +internal protocol Echo_EchoProvider_NIO: CallHandlerProvider { + func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture + func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext) -> EventLoopFuture + func collect(context: UnaryResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> + func update(context: StreamingResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> +} + +extension Echo_EchoProvider_NIO { + internal var serviceName: String { return "echo.Echo" } + + /// Determines, calls and returns the appropriate request handler, depending on the request's method. + /// Returns nil for methods not handled by this service. + internal func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler? { + switch methodName { + case "Get": + return UnaryCallHandler(channel: channel, request: request) { context in + return { request in + self.get(request: request, context: context) + } + } + + case "Expand": + return ServerStreamingCallHandler(channel: channel, request: request) { context in + return { request in + self.expand(request: request, context: context) + } + } + + case "Collect": + return ClientStreamingCallHandler(channel: channel, request: request) { context in + return self.collect(context: context) + } + + case "Update": + return BidirectionalStreamingCallHandler(channel: channel, request: request) { context in + return self.update(context: context) + } + + default: return nil + } + } +} +