Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass implementation of NIO client #357

Merged
merged 34 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
056f8b5
First pass implementation of NIO client.
glbrntt Jan 14, 2019
30511f4
Renaming and removal of force unwrap/try
glbrntt Jan 23, 2019
6b2d8bc
Improve error handling in NIO server.
glbrntt Jan 29, 2019
d2be70d
Update error delegate
glbrntt Jan 30, 2019
fd76ad3
Renaming, tidy up HTTP1ToRawGRPCClientCodec, CallOptions
glbrntt Jan 30, 2019
bb0ce38
Client code-gen
glbrntt Jan 31, 2019
005c70d
Strongly hold errorDelegate in the server until shutdown
glbrntt Jan 31, 2019
80eada1
Timeoutes, tidying up, documentation
glbrntt Feb 1, 2019
66e8d6e
GRPCTimeout documentation
glbrntt Feb 5, 2019
d3727f6
Add timeout to request headers
glbrntt Feb 5, 2019
64da48a
Add client cancelling and timeout tests.
glbrntt Feb 6, 2019
86e1e3d
Fix typos, missing doc
glbrntt Feb 7, 2019
6c34cff
Add allCases to CompressionMehcnaism for swift < 4.2
glbrntt Feb 18, 2019
97326e3
Update LinuxMain
glbrntt Feb 18, 2019
f9e60b1
More errors to a dedicated enum, fix typos, etc.
glbrntt Feb 19, 2019
2c05703
PR feedback; docs, tidying
glbrntt Feb 21, 2019
aa0acdc
Renaming, typo fixes
glbrntt Feb 25, 2019
13485c9
Merge branch 'master' into error-handling
glbrntt Feb 25, 2019
2b82ae8
Split out GRPCChannelHandlerTests and HTTPToRawGRPCServerCodecTests
glbrntt Feb 26, 2019
344a6e3
Update LinuxMain
glbrntt Feb 26, 2019
0fe5dd2
Add missing commas to LinuxMain
glbrntt Feb 26, 2019
e39d0a2
Fix grpc-web testUnaryLotsOfRequests on Linux
glbrntt Feb 26, 2019
0128ed3
Disable broken Linux test
glbrntt Feb 26, 2019
5b201af
Merge branch 'error-handling' into client
glbrntt Feb 26, 2019
e7b0746
Split errors into server and client enums.
glbrntt Feb 26, 2019
79040a4
Add more client-specific errors
glbrntt Feb 27, 2019
605d033
Fixup comments, documentation
glbrntt Feb 27, 2019
7ed5f94
Merge commit '158c4ef' into client
glbrntt Feb 27, 2019
e0f275f
Merge branch 'master' into client
glbrntt Feb 27, 2019
4c85d77
Fix typos and clarify documentation
glbrntt Feb 28, 2019
918dc7c
Enqueue messages to be sent
glbrntt Feb 28, 2019
5e30ed3
Workaround compile error for swift<4.2
glbrntt Mar 1, 2019
941e15a
Fix documentation, add TODOs
glbrntt Mar 1, 2019
d58519a
Increase timeout for bidi tests
glbrntt Mar 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ test-plugin:

test-plugin-nio:
swift build $(CFLAGS) --product protoc-gen-swiftgrpc
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=Client=false,NIO=true
diff -u /tmp/echo.grpc.swift Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=NIO=true
diff -u /tmp/echo.grpc.swift Sources/Examples/EchoNIO/Generated/echo.grpc.swift

xcodebuild: project
xcodebuild -project SwiftGRPC.xcodeproj -configuration "Debug" -parallelizeTargets -target SwiftGRPC -target Echo -target Simple -target protoc-gen-swiftgrpc build
Expand Down
6 changes: 0 additions & 6 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ cgrpc_channel *cgrpc_channel_create_secure(const char *address,

c->channel = grpc_secure_channel_create(creds, address, &channel_args, NULL);
c->completion_queue = grpc_completion_queue_create_for_next(NULL);

grpc_channel_credentials_release(creds);

return c;
}

Expand All @@ -76,9 +73,6 @@ cgrpc_channel *cgrpc_channel_create_google(const char *address,

c->channel = grpc_secure_channel_create(google_creds, address, &channel_args, NULL);
c->completion_queue = grpc_completion_queue_create_for_next(NULL);

grpc_channel_credentials_release(google_creds);

return c;
}

Expand Down
144 changes: 144 additions & 0 deletions Sources/Examples/EchoNIO/Generated/echo.grpc.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
// DO NOT EDIT.
//
// Generated by the protocol buffer compiler.
// Source: echo.proto
//

//
// Copyright 2018, gRPC Authors All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
import Foundation
import NIO
import NIOHTTP1
import SwiftGRPCNIO
import SwiftProtobuf


/// Usage: instantiate Echo_EchoService_NIOClient, then call methods of this protocol to make API calls.
internal protocol Echo_EchoService_NIO {
func get(_ request: Echo_EchoRequest, callOptions: CallOptions?) -> UnaryClientCall<Echo_EchoRequest, Echo_EchoResponse>
func expand(_ request: Echo_EchoRequest, callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
func collect(callOptions: CallOptions?) -> ClientStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
func update(callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse>
}

internal final class Echo_EchoService_NIOClient: GRPCServiceClient, Echo_EchoService_NIO {
internal let client: GRPCClient
internal let service = "echo.Echo"
internal var defaultCallOptions: CallOptions

/// Creates a client for the echo.Echo service.
///
/// - Parameters:
/// - client: `GRPCClient` with a connection to the service host.
/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them. Defaults to `client.defaultCallOptions`.
internal init(client: GRPCClient, defaultCallOptions: CallOptions? = nil) {
self.client = client
self.defaultCallOptions = defaultCallOptions ?? client.defaultCallOptions
}

/// Asynchronous unary call to Get.
///
/// - Parameters:
/// - request: Request to send to Get.
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - Returns: A `UnaryClientCall` with futures for the metadata, status and response.
internal func get(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil) -> UnaryClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return UnaryClientCall(client: client, path: path(forMethod: "Get"), request: request, callOptions: callOptions ?? self.defaultCallOptions)
}

/// Asynchronous server-streaming call to Expand.
///
/// - Parameters:
/// - request: Request to send to Expand.
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - handler: A closure called when each response is received from the server.
/// - Returns: A `ServerStreamingClientCall` with futures for the metadata and status.
internal func expand(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return ServerStreamingClientCall(client: client, path: path(forMethod: "Expand"), request: request, callOptions: callOptions ?? self.defaultCallOptions, handler: handler)
}

/// Asynchronous client-streaming call to Collect.
///
/// Callers should use the `send` method on the returned object to send messages
/// to the server. The caller should send an `.end` after the final message has been sent.
///
/// - Parameters:
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - Returns: A `ClientStreamingClientCall` with futures for the metadata, status and response.
internal func collect(callOptions: CallOptions? = nil) -> ClientStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return ClientStreamingClientCall(client: client, path: path(forMethod: "Collect"), callOptions: callOptions ?? self.defaultCallOptions)
}

/// Asynchronous bidirectional-streaming call to Update.
///
/// Callers should use the `send` method on the returned object to send messages
/// to the server. The caller should send an `.end` after the final message has been sent.
///
/// - Parameters:
/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.
/// - handler: A closure called when each response is received from the server.
/// - Returns: A `ClientStreamingClientCall` with futures for the metadata and status.
internal func update(callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall<Echo_EchoRequest, Echo_EchoResponse> {
return BidirectionalStreamingClientCall(client: client, path: path(forMethod: "Update"), callOptions: callOptions ?? self.defaultCallOptions, handler: handler)
}

}

/// To build a server, implement a class that conforms to this protocol.
internal protocol Echo_EchoProvider_NIO: CallHandlerProvider {
func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture<Echo_EchoResponse>
func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<GRPCStatus>
func collect(context: UnaryResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void>
func update(context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void>
}

extension Echo_EchoProvider_NIO {
internal var serviceName: String { return "echo.Echo" }

/// Determines, calls and returns the appropriate request handler, depending on the request's method.
/// Returns nil for methods not handled by this service.
internal func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel, errorDelegate: ServerErrorDelegate?) -> GRPCCallHandler? {
switch methodName {
case "Get":
return UnaryCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return { request in
self.get(request: request, context: context)
}
}

case "Expand":
return ServerStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return { request in
self.expand(request: request, context: context)
}
}

case "Collect":
return ClientStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return self.collect(context: context)
}

case "Update":
return BidirectionalStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in
return self.update(context: context)
}

default: return nil
}
}
}

2 changes: 1 addition & 1 deletion Sources/Examples/EchoNIO/Generated/echo.pb.swift
1 change: 0 additions & 1 deletion Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift

This file was deleted.

155 changes: 148 additions & 7 deletions Sources/Examples/EchoNIO/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
* Copyright 2019, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,23 @@ func addressOption(_ address: String) -> Option<String> {
return Option("address", default: address, description: "address of server")
}

let portOption = Option("port",
default: "8080",
description: "port of server")
let portOption = Option("port", default: 8080)
let messageOption = Option("message",
default: "Testing 1 2 3",
description: "message to send")

/// Create en `EchoClient` and wait for it to initialize. Returns nil if initialisation fails.
func makeEchoClient(address: String, port: Int) -> Echo_EchoService_NIOClient? {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
do {
return try GRPCClient.start(host: address, port: port, eventLoopGroup: eventLoopGroup)
.map { client in Echo_EchoService_NIOClient(client: client) }
.wait()
} catch {
print("Unable to create an EchoClient: \(error)")
return nil
}
}

Group {
$0.command("serve",
Expand All @@ -38,14 +52,141 @@ Group {

print("starting insecure server")
_ = try! GRPCServer.start(hostname: address,
port: Int(port)!,
eventLoopGroup: eventLoopGroup,
serviceProviders: [EchoProviderNIO()])
port: port,
eventLoopGroup: eventLoopGroup,
serviceProviders: [EchoProviderNIO()])
.wait()

// This blocks to keep the main thread from finishing while the server runs,
// but the server never exits. Kill the process to stop it.
_ = sem.wait()
}

$0.command(
"get",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a unary get()."
) { address, port, message in
print("calling get")
guard let echo = makeEchoClient(address: address, port: port) else { return }

var requestMessage = Echo_EchoRequest()
requestMessage.text = message

print("get sending: \(requestMessage.text)")
let get = echo.get(requestMessage)
get.response.whenSuccess { response in
print("get received: \(response.text)")
}

get.response.whenFailure { error in
print("get response failed with error: \(error)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try get.status.wait()
print("get completed with status: \(status)")
} catch {
print("get status failed with error: \(error)")
}
}

$0.command(
"expand",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a server-streaming expand()."
) { address, port, message in
print("calling expand")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let requestMessage = Echo_EchoRequest.with { $0.text = message }

print("expand sending: \(requestMessage.text)")
let expand = echo.expand(requestMessage) { response in
print("expand received: \(response.text)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try expand.status.wait()
print("expand completed with status: \(status)")
} catch {
print("expand status failed with error: \(error)")
}
}

$0.command(
"collect",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a client-streaming collect()."
) { address, port, message in
print("calling collect")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let collect = echo.collect()

var queue = collect.newMessageQueue()
for part in message.components(separatedBy: " ") {
var requestMessage = Echo_EchoRequest()
requestMessage.text = part
print("collect sending: \(requestMessage.text)")
queue = queue.then { collect.sendMessage(requestMessage) }
}
queue.whenSuccess { collect.sendEnd(promise: nil) }

collect.response.whenSuccess { respone in
print("collect received: \(respone.text)")
}

collect.response.whenFailure { error in
print("collect response failed with error: \(error)")
}

// wait() on the status to stop the program from exiting.
do {
let status = try collect.status.wait()
print("collect completed with status: \(status)")
} catch {
print("collect status failed with error: \(error)")
}
}

$0.command(
"update",
addressOption("localhost"),
portOption,
messageOption,
description: "Perform a bidirectional-streaming update()."
) { address, port, message in
print("calling update")
guard let echo = makeEchoClient(address: address, port: port) else { return }

let update = echo.update { response in
print("update received: \(response.text)")
}

var queue = update.newMessageQueue()
for part in message.components(separatedBy: " ") {
var requestMessage = Echo_EchoRequest()
requestMessage.text = part
print("update sending: \(requestMessage.text)")
queue = queue.then { update.sendMessage(requestMessage) }
}
queue.whenSuccess { update.sendEnd(promise: nil) }

// wait() on the status to stop the program from exiting.
do {
let status = try update.status.wait()
print("update completed with status: \(status)")
} catch {
print("update status failed with error: \(error)")
}
}
}.run()
4 changes: 2 additions & 2 deletions Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ extension BaseCallHandler: ChannelInboundHandler {
switch self.unwrapInboundIn(data) {
case .head(let requestHead):
// Head should have been handled by `GRPCChannelHandler`.
self.errorCaught(ctx: ctx, error: GRPCServerError.invalidState("unexpected request head received \(requestHead)"))
self.errorCaught(ctx: ctx, error: GRPCError.server(.invalidState("unexpected request head received \(requestHead)")))

case .message(let message):
do {
Expand All @@ -71,7 +71,7 @@ extension BaseCallHandler: ChannelOutboundHandler {

public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard serverCanWrite else {
promise?.fail(error: GRPCServerError.serverNotWritable)
promise?.fail(error: GRPCError.server(.serverNotWritable))
return
}

Expand Down
Loading