Skip to content

Commit

Permalink
First pass implementation of NIO client (#357)
Browse files Browse the repository at this point in the history
* First pass implementation of NIO client.

* Renaming and removal of force unwrap/try

* Improve error handling in NIO server.

- Adds a user-configurable error handler to the server
- Updates NIO server codegen to provide an optional error handler
- Errors are handled by GRPCChannelHandler or BaseCallHandler,
  depending on the pipeline state
- Adds some error handling tests
- Tidies some logic in HTTP1ToRawGRPCServerCodec
- Extends message handling logic in HTTP1ToRawGRPCServerCodec
  to handle messages split across multiple ByteBuffers (i.e. when a
  message exceeds a the size of a frame)

* Update error delegate

* Renaming, tidy up HTTP1ToRawGRPCClientCodec, CallOptions

* Client code-gen

* Strongly hold errorDelegate in the server until shutdown

* Timeoutes, tidying up, documentation

* GRPCTimeout documentation

* Add timeout to request headers

* Add client cancelling and timeout tests.

* Fix typos, missing doc

* Add allCases to CompressionMehcnaism for swift < 4.2

* Update LinuxMain

* More errors to a dedicated enum, fix typos, etc.

* PR feedback; docs, tidying

* Renaming, typo fixes

* Split out GRPCChannelHandlerTests and HTTPToRawGRPCServerCodecTests

* Update LinuxMain

* Add missing commas to LinuxMain

* Fix grpc-web testUnaryLotsOfRequests on Linux

* Disable broken Linux test

* Split errors into server and client enums.

* Add more client-specific errors

* Fixup comments, documentation

* Fix typos and clarify documentation

* Enqueue messages to be sent

* Workaround compile error for swift<4.2

* Fix documentation, add TODOs

* Increase timeout for bidi tests
  • Loading branch information
glbrntt authored and MrMage committed Mar 1, 2019
1 parent ee0f374 commit 97ff923
Show file tree
Hide file tree
Showing 52 changed files with 2,701 additions and 594 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ test-plugin:

test-plugin-nio:
swift build $(CFLAGS) --product protoc-gen-swiftgrpc
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=Client=false,NIO=true
diff -u /tmp/echo.grpc.swift Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=NIO=true
diff -u /tmp/echo.grpc.swift Sources/Examples/EchoNIO/Generated/echo.grpc.swift

xcodebuild: project
xcodebuild -project SwiftGRPC.xcodeproj -configuration "Debug" -parallelizeTargets -target SwiftGRPC -target Echo -target Simple -target protoc-gen-swiftgrpc build
Expand Down
6 changes: 0 additions & 6 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ cgrpc_channel *cgrpc_channel_create_secure(const char *address,

c->channel = grpc_secure_channel_create(creds, address, &channel_args, NULL);
c->completion_queue = grpc_completion_queue_create_for_next(NULL);

grpc_channel_credentials_release(creds);

return c;
}

Expand All @@ -76,9 +73,6 @@ cgrpc_channel *cgrpc_channel_create_google(const char *address,

c->channel = grpc_secure_channel_create(google_creds, address, &channel_args, NULL);
c->completion_queue = grpc_completion_queue_create_for_next(NULL);

grpc_channel_credentials_release(google_creds);

return c;
}

Expand Down
144 changes: 144 additions & 0 deletions Sources/Examples/EchoNIO/Generated/echo.grpc.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
// DO NOT EDIT.
//
// Generated by the protocol buffer compiler.
// Source: echo.proto
//

//
// Copyright 2018, gRPC Authors All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
import Foundation
import NIO
import NIOHTTP1
import SwiftGRPCNIO
import SwiftProtobuf


/// Usage: instantiate Echo_EchoService_NIOClient, then call methods of this protocol to make API calls.
internal protocol Echo_EchoService_NIO {
func get(_ request: Echo_EchoRequest, callOptions: CallOptions?) -> UnaryClientCall<Echo_EchoRequest, Echo_EchoResponse>
func expand(_ request: Echo_EchoRequest, callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
func collect(callOptions: CallOptions?) -> ClientStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
func update(callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
}

internal final class Echo_EchoService_NIOClient: GRPCServiceClient, Echo_EchoService_NIO {
internal let client: GRPCClient
internal let service = "echo.Echo"
internal var defaultCallOptions: CallOptions

/// Creates a client for the echo.Echo service.
///
/// - Parameters:
/// - client: `GRPCClient` with a connection to the service host.
/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them. Defaults to `client.defaultCallOptions`.
internal init(client: GRPCClient, defaultCallOptions: CallOptions? = nil) {
self.client = client
self.defaultCallOptions = defaultCallOptions ?? client.defaultCallOptions
}

/// Asynchronous unary call to Get.
///
/// - Parameters:
/// - request: Request to send to Get.
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - Returns: A `UnaryClientCall` with futures for the metadata, status and response.
internal func get(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil) -> UnaryClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return UnaryClientCall(client: client, path: path(forMethod: "Get"), request: request, callOptions: callOptions ?? self.defaultCallOptions)
}

/// Asynchronous server-streaming call to Expand.
///
/// - Parameters:
/// - request: Request to send to Expand.
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - handler: A closure called when each response is received from the server.
/// - Returns: A `ServerStreamingClientCall` with futures for the metadata and status.
internal func expand(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return ServerStreamingClientCall(client: client, path: path(forMethod: "Expand"), request: request, callOptions: callOptions ?? self.defaultCallOptions, handler: handler)
}

/// Asynchronous client-streaming call to Collect.
///
/// Callers should use the `send` method on the returned object to send messages
/// to the server. The caller should send an `.end` after the final message has been sent.
///
/// - Parameters:
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - Returns: A `ClientStreamingClientCall` with futures for the metadata, status and response.
internal func collect(callOptions: CallOptions? = nil) -> ClientStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return ClientStreamingClientCall(client: client, path: path(forMethod: "Collect"), callOptions: callOptions ?? self.defaultCallOptions)
}

/// Asynchronous bidirectional-streaming call to Update.
///
/// Callers should use the `send` method on the returned object to send messages
/// to the server. The caller should send an `.end` after the final message has been sent.
///
/// - Parameters:
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - handler: A closure called when each response is received from the server.
/// - Returns: A `ClientStreamingClientCall` with futures for the metadata and status.
internal func update(callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return BidirectionalStreamingClientCall(client: client, path: path(forMethod: "Update"), callOptions: callOptions ?? self.defaultCallOptions, handler: handler)
}

}

/// To build a server, implement a class that conforms to this protocol.
internal protocol Echo_EchoProvider_NIO: CallHandlerProvider {
func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture<Echo_EchoResponse>
func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<GRPCStatus>
func collect(context: UnaryResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void>
func update(context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void>
}

extension Echo_EchoProvider_NIO {
internal var serviceName: String { return "echo.Echo" }

/// Determines, calls and returns the appropriate request handler, depending on the request's method.
/// Returns nil for methods not handled by this service.
internal func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel, errorDelegate: ServerErrorDelegate?) -> GRPCCallHandler? {
switch methodName {
case "Get":
return UnaryCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return { request in
self.get(request: request, context: context)
}
}

case "Expand":
return ServerStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return { request in
self.expand(request: request, context: context)
}
}

case "Collect":
return ClientStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return self.collect(context: context)
}

case "Update":
return BidirectionalStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return self.update(context: context)
}

default: return nil
}
}
}

2 changes: 1 addition & 1 deletion Sources/Examples/EchoNIO/Generated/echo.pb.swift
1 change: 0 additions & 1 deletion Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift

This file was deleted.

155 changes: 148 additions & 7 deletions Sources/Examples/EchoNIO/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
* Copyright 2019, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,23 @@ func addressOption(_ address: String) -> Option<String> {
return Option("address", default: address, description: "address of server")
}

let portOption = Option("port",
default: "8080",
description: "port of server")
let portOption = Option("port", default: 8080)
let messageOption = Option("message",
default: "Testing 1 2 3",
description: "message to send")

/// Create en `EchoClient` and wait for it to initialize. Returns nil if initialisation fails.
func makeEchoClient(address: String, port: Int) -> Echo_EchoService_NIOClient? {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
do {
return try GRPCClient.start(host: address, port: port, eventLoopGroup: eventLoopGroup)
.map { client in Echo_EchoService_NIOClient(client: client) }
.wait()
} catch {
print("Unable to create an EchoClient: \(error)")
return nil
}
}

Group {
$0.command("serve",
Expand All @@ -38,14 +52,141 @@ Group {

print("starting insecure server")
_ = try! GRPCServer.start(hostname: address,
port: Int(port)!,
eventLoopGroup: eventLoopGroup,
serviceProviders: [EchoProviderNIO()])
port: port,
eventLoopGroup: eventLoopGroup,
serviceProviders: [EchoProviderNIO()])
.wait()

// This blocks to keep the main thread from finishing while the server runs,
// but the server never exits. Kill the process to stop it.
_ = sem.wait()
}

$0.command(
"get",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a unary get()."
) { address, port, message in
print("calling get")
guard let echo = makeEchoClient(address: address, port: port) else { return }

var requestMessage = Echo_EchoRequest()
requestMessage.text = message

print("get sending: \(requestMessage.text)")
let get = echo.get(requestMessage)
get.response.whenSuccess { response in
print("get received: \(response.text)")
}

get.response.whenFailure { error in
print("get response failed with error: \(error)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try get.status.wait()
print("get completed with status: \(status)")
} catch {
print("get status failed with error: \(error)")
}
}

$0.command(
"expand",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a server-streaming expand()."
) { address, port, message in
print("calling expand")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let requestMessage = Echo_EchoRequest.with { $0.text = message }

print("expand sending: \(requestMessage.text)")
let expand = echo.expand(requestMessage) { response in
print("expand received: \(response.text)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try expand.status.wait()
print("expand completed with status: \(status)")
} catch {
print("expand status failed with error: \(error)")
}
}

$0.command(
"collect",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a client-streaming collect()."
) { address, port, message in
print("calling collect")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let collect = echo.collect()

var queue = collect.newMessageQueue()
for part in message.components(separatedBy: " ") {
var requestMessage = Echo_EchoRequest()
requestMessage.text = part
print("collect sending: \(requestMessage.text)")
queue = queue.then { collect.sendMessage(requestMessage) }
}
queue.whenSuccess { collect.sendEnd(promise: nil) }

collect.response.whenSuccess { respone in
print("collect received: \(respone.text)")
}

collect.response.whenFailure { error in
print("collect response failed with error: \(error)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try collect.status.wait()
print("collect completed with status: \(status)")
} catch {
print("collect status failed with error: \(error)")
}
}

$0.command(
"update",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a bidirectional-streaming update()."
) { address, port, message in
print("calling update")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let update = echo.update { response in
print("update received: \(response.text)")
}

var queue = update.newMessageQueue()
for part in message.components(separatedBy: " ") {
var requestMessage = Echo_EchoRequest()
requestMessage.text = part
print("update sending: \(requestMessage.text)")
queue = queue.then { update.sendMessage(requestMessage) }
}
queue.whenSuccess { update.sendEnd(promise: nil) }

// wait() on the status to stop the program from exiting.
do {
let status = try update.status.wait()
print("update completed with status: \(status)")
} catch {
print("update status failed with error: \(error)")
}
}
}.run()
4 changes: 2 additions & 2 deletions Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ extension BaseCallHandler: ChannelInboundHandler {
switch self.unwrapInboundIn(data) {
case .head(let requestHead):
// Head should have been handled by `GRPCChannelHandler`.
self.errorCaught(ctx: ctx, error: GRPCServerError.invalidState("unexpected request head received \(requestHead)"))
self.errorCaught(ctx: ctx, error: GRPCError.server(.invalidState("unexpected request head received \(requestHead)")))

case .message(let message):
do {
Expand All @@ -71,7 +71,7 @@ extension BaseCallHandler: ChannelOutboundHandler {

public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard serverCanWrite else {
promise?.fail(error: GRPCServerError.serverNotWritable)
promise?.fail(error: GRPCError.server(.serverNotWritable))
return
}

Expand Down
Loading

0 comments on commit 97ff923

Please sign in to comment.