From 78a1291789195e8e064f87081fd071c1bcd2f111 Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Mon, 19 Mar 2018 16:23:54 +0100 Subject: [PATCH 1/5] Fix two huge memory leaks in `cgrpc_call` and `cgrpc_handler`. These would all GRPC call objects to never get released, which in turn caused their completion queues and associated file descriptors to never get released. This became apparent after ~3-5k requests on macOS. --- Sources/CgRPC/shim/call.c | 4 +++- Sources/CgRPC/shim/channel.c | 1 + Sources/CgRPC/shim/handler.c | 13 ++++++++++--- Sources/SwiftGRPC/Core/Handler.swift | 2 +- Sources/SwiftGRPC/Core/Metadata.swift | 6 +++--- Sources/SwiftGRPC/Runtime/ServiceServer.swift | 16 ++++++++++------ Tests/SwiftGRPCTests/EchoTests.swift | 15 +++++++++++++++ 7 files changed, 43 insertions(+), 14 deletions(-) diff --git a/Sources/CgRPC/shim/call.c b/Sources/CgRPC/shim/call.c index ee604bb2f..034042661 100644 --- a/Sources/CgRPC/shim/call.c +++ b/Sources/CgRPC/shim/call.c @@ -21,7 +21,9 @@ #include void cgrpc_call_destroy(cgrpc_call *call) { - //grpc_call_destroy(call->call); + if (call->call) { + grpc_call_unref(call->call); + } free(call); } diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index 619433e42..f791439a1 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -85,6 +85,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel, // create call host_slice = grpc_slice_from_copied_string(host); gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout); + // The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`. grpc_call *channel_call = grpc_channel_create_call(channel->channel, NULL, GRPC_PROPAGATE_DEFAULTS, diff --git a/Sources/CgRPC/shim/handler.c b/Sources/CgRPC/shim/handler.c index 0f54fe9f0..8a0f58a81 100644 --- a/Sources/CgRPC/shim/handler.c +++ b/Sources/CgRPC/shim/handler.c @@ -38,7 +38,7 @@ void cgrpc_handler_destroy(cgrpc_handler *h) { grpc_metadata_array_destroy(&(h->request_metadata_recv)); grpc_call_details_destroy(&(h->call_details)); if (h->server_call) { - //grpc_call_destroy(h->server_call); + grpc_call_unref(h->server_call); } free(h); } @@ -67,6 +67,10 @@ cgrpc_call *cgrpc_handler_get_call(cgrpc_handler *h) { cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call)); memset(call, 0, sizeof(cgrpc_call)); call->call = h->server_call; + if (call->call) { + // This retain will be balanced by `cgrpc_call_destroy()`. + grpc_call_ref(call->call); + } return call; } @@ -77,6 +81,11 @@ cgrpc_completion_queue *cgrpc_handler_get_completion_queue(cgrpc_handler *h) { grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h, cgrpc_metadata_array *metadata, long tag) { + if (h->server_call != NULL) { + return GRPC_CALL_OK; + } + // This fills `h->server_call` with a call with retain count of +1. + // We'll release that retain in `cgrpc_handler_destroy()`. return grpc_server_request_call(h->server->server, &(h->server_call), &(h->call_details), @@ -85,5 +94,3 @@ grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h, h->server->completion_queue, cgrpc_create_tag(tag)); } - - diff --git a/Sources/SwiftGRPC/Core/Handler.swift b/Sources/SwiftGRPC/Core/Handler.swift index 55d2f7390..fa18c4e8a 100644 --- a/Sources/SwiftGRPC/Core/Handler.swift +++ b/Sources/SwiftGRPC/Core/Handler.swift @@ -32,7 +32,7 @@ public class Handler { /// A Call object that can be used to respond to the request public private(set) lazy var call: Call = { Call(underlyingCall: cgrpc_handler_get_call(self.underlyingHandler), - owned: false, + owned: true, completionQueue: self.completionQueue) }() diff --git a/Sources/SwiftGRPC/Core/Metadata.swift b/Sources/SwiftGRPC/Core/Metadata.swift index 236230de9..5e01fa1fb 100644 --- a/Sources/SwiftGRPC/Core/Metadata.swift +++ b/Sources/SwiftGRPC/Core/Metadata.swift @@ -69,13 +69,13 @@ public class Metadata: CustomStringConvertible { } public var description: String { - var result = "" + var lines: [String] = [] for i in 0.. Metadata { diff --git a/Sources/SwiftGRPC/Runtime/ServiceServer.swift b/Sources/SwiftGRPC/Runtime/ServiceServer.swift index ba22dd48e..ce48a87a8 100644 --- a/Sources/SwiftGRPC/Runtime/ServiceServer.swift +++ b/Sources/SwiftGRPC/Runtime/ServiceServer.swift @@ -22,6 +22,8 @@ open class ServiceServer { public let address: String public let server: Server + public var shouldLogRequests = true + /// Create a server that accepts insecure connections. public init(address: String) { gRPC.initialize() @@ -58,13 +60,15 @@ open class ServiceServer { return } - let unwrappedHost = handler.host ?? "(nil)" let unwrappedMethod = handler.method ?? "(nil)" - let unwrappedCaller = handler.caller ?? "(nil)" - print("Server received request to " + unwrappedHost - + " calling " + unwrappedMethod - + " from " + unwrappedCaller - + " with " + handler.requestMetadata.description) + if strongSelf.shouldLogRequests == true { + let unwrappedHost = handler.host ?? "(nil)" + let unwrappedCaller = handler.caller ?? "(nil)" + print("Server received request to " + unwrappedHost + + " calling " + unwrappedMethod + + " from " + unwrappedCaller + + " with " + handler.requestMetadata.description) + } do { if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue)) { diff --git a/Tests/SwiftGRPCTests/EchoTests.swift b/Tests/SwiftGRPCTests/EchoTests.swift index a1a871dea..b63b40cd5 100644 --- a/Tests/SwiftGRPCTests/EchoTests.swift +++ b/Tests/SwiftGRPCTests/EchoTests.swift @@ -22,6 +22,7 @@ class EchoTests: BasicEchoTestCase { static var allTests: [(String, (EchoTests) -> () throws -> Void)] { return [ ("testUnary", testUnary), + ("testUnaryLotsOfRequests", testUnaryLotsOfRequests), ("testClientStreaming", testClientStreaming), ("testClientStreamingLotsOfMessages", testClientStreamingLotsOfMessages), ("testServerStreaming", testServerStreaming), @@ -48,6 +49,20 @@ extension EchoTests { XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text) XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text) } + + func testUnaryLotsOfRequests() { + // No need to spam the log with 50k lines. + server.shouldLogRequests = false + let clockStart = clock() + let numberOfRequests = 50_000 + for i in 0.. Date: Mon, 19 Mar 2018 16:25:03 +0100 Subject: [PATCH 2/5] Fix GRPCTests, hopefully once and for all (see 415307ee for an explanation). --- Tests/SwiftGRPCTests/GRPCTests.swift | 124 +++++++++++++++++---------- 1 file changed, 81 insertions(+), 43 deletions(-) diff --git a/Tests/SwiftGRPCTests/GRPCTests.swift b/Tests/SwiftGRPCTests/GRPCTests.swift index bbc43380a..3c190d033 100644 --- a/Tests/SwiftGRPCTests/GRPCTests.swift +++ b/Tests/SwiftGRPCTests/GRPCTests.swift @@ -19,12 +19,21 @@ import Foundation import XCTest class gRPCTests: XCTestCase { + // We have seen this test flake out in rare cases fairly often due to race conditions. + // To detect such rare errors, we run the tests several times. + // (By now, all known errors should have been fixed, but we'd still like to detect new ones.) + let testRepetitions = 10 + func testConnectivity() { - runTest(useSSL: false) + for _ in 0.. () throws -> Void)] { @@ -75,11 +84,8 @@ let helloServerStream = "/hello.server-stream" let helloBiDiStream = "/hello.bidi-stream" // Return code/message for unary test -let oddStatusCode = StatusCode.ok let oddStatusMessage = "OK" - -let evenStatusCode = StatusCode.notFound -let eventStatusMessage = "Not Found" +let evenStatusMessage = "some other status message" func runTest(useSSL: Bool) { gRPC.initialize() @@ -141,9 +147,12 @@ func runClient(useSSL: Bool) throws { } channel.host = host - try callUnary(channel: channel) - try callServerStream(channel: channel) - try callBiDiStream(channel: channel) + for _ in 0..<10 { + // Send several calls to each server we spin up, to ensure that each individual server can handle many requests. + try callUnary(channel: channel) + try callServerStream(channel: channel) + try callBiDiStream(channel: channel) + } } func callUnary(channel: Channel) throws { @@ -157,23 +166,32 @@ func callUnary(channel: Channel) throws { try call.start(.unary, metadata: metadata, message: message) { response in // verify the basic response from the server - XCTAssertEqual(response.statusCode, (i % 2 == 0) ? evenStatusCode : oddStatusCode) - XCTAssertEqual(response.statusMessage, (i % 2 == 0) ? eventStatusMessage : oddStatusMessage) + XCTAssertEqual(response.statusCode, .ok) + XCTAssertEqual(response.statusMessage, (i % 2 == 0) ? evenStatusMessage : oddStatusMessage) // verify the message from the server if (i % 2) == 0 { - let resultData = response.resultData! - let messageString = String(data: resultData, encoding: .utf8) - XCTAssertEqual(messageString, serverText) + if let resultData = response.resultData { + let messageString = String(data: resultData, encoding: .utf8) + XCTAssertEqual(messageString, serverText) + } else { + XCTFail("callUnary response missing") + } } - + // verify the initial metadata from the server - let initialMetadata = response.initialMetadata! - verify_metadata(initialMetadata, expected: initialServerMetadata) - + if let initialMetadata = response.initialMetadata { + verify_metadata(initialMetadata, expected: initialServerMetadata) + } else { + XCTFail("callUnary initial metadata missing") + } + // verify the trailing metadata from the server - let trailingMetadata = response.trailingMetadata! - verify_metadata(trailingMetadata, expected: trailingServerMetadata) + if let trailingMetadata = response.trailingMetadata { + verify_metadata(trailingMetadata, expected: trailingServerMetadata) + } else { + XCTFail("callUnary trailing metadata missing") + } // report completion sem.signal() @@ -197,8 +215,11 @@ func callServerStream(channel: Channel) throws { XCTAssertEqual(response.statusMessage, "Custom Status Message ServerStreaming") // verify the trailing metadata from the server - let trailingMetadata = response.trailingMetadata! - verify_metadata(trailingMetadata, expected: trailingServerMetadata) + if let trailingMetadata = response.trailingMetadata { + verify_metadata(trailingMetadata, expected: trailingServerMetadata) + } else { + XCTFail("callServerStream trailing metadata missing") + } sem.signal() // signal call is finished } @@ -224,29 +245,31 @@ let clientPing = "ping" let serverPong = "pong" func callBiDiStream(channel: Channel) throws { - let message = clientPing.data(using: .utf8) let metadata = Metadata(initialClientMetadata) let sem = DispatchSemaphore(value: 0) let method = helloBiDiStream let call = channel.makeCall(method) - try call.start(.bidiStreaming, metadata: metadata, message: message) { + try call.start(.bidiStreaming, metadata: metadata, message: nil) { response in XCTAssertEqual(response.statusCode, .ok) XCTAssertEqual(response.statusMessage, "Custom Status Message BiDi") // verify the trailing metadata from the server - let trailingMetadata = response.trailingMetadata! - verify_metadata(trailingMetadata, expected: trailingServerMetadata) + if let trailingMetadata = response.trailingMetadata { + verify_metadata(trailingMetadata, expected: trailingServerMetadata) + } else { + XCTFail("callBiDiStream trailing metadata missing") + } sem.signal() // signal call is finished } // Send pings + let message = clientPing.data(using: .utf8)! for _ in 0.. Date: Mon, 19 Mar 2018 17:42:38 +0100 Subject: [PATCH 3/5] Shift the responsibility for draining and destroying a completion queue to the queue itself. This is needed because it appears that otherwise, the underlying completion queue gets deallocated during its spinloop, which it doesn't like. --- Sources/CgRPC/shim/cgrpc.h | 3 + Sources/CgRPC/shim/channel.c | 2 - Sources/CgRPC/shim/handler.c | 2 - Sources/CgRPC/shim/server.c | 2 - Sources/SwiftGRPC/Core/CompletionQueue.swift | 5 + Sources/SwiftGRPC/Core/Server.swift | 10 +- Tests/SwiftGRPCTests/EchoTests.swift | 4 +- Tests/SwiftGRPCTests/GRPCTests.swift | 127 ++++++++++++------- 8 files changed, 94 insertions(+), 61 deletions(-) diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 57ef4424b..98c66a9b2 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -110,6 +110,9 @@ void grpc_shutdown(void); const char *grpc_version_string(void); const char *grpc_g_stands_for(void); +void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq); +void grpc_completion_queue_destroy(cgrpc_completion_queue *cq); + // helper void cgrpc_free_copied_string(char *string); diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index f791439a1..e04d0b3a9 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -71,8 +71,6 @@ void cgrpc_channel_destroy(cgrpc_channel *c) { c->channel = NULL; grpc_completion_queue_shutdown(c->completion_queue); - cgrpc_completion_queue_drain(c->completion_queue); - grpc_completion_queue_destroy(c->completion_queue); free(c); } diff --git a/Sources/CgRPC/shim/handler.c b/Sources/CgRPC/shim/handler.c index 8a0f58a81..9566da8e0 100644 --- a/Sources/CgRPC/shim/handler.c +++ b/Sources/CgRPC/shim/handler.c @@ -33,8 +33,6 @@ cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server) { void cgrpc_handler_destroy(cgrpc_handler *h) { grpc_completion_queue_shutdown(h->completion_queue); - cgrpc_completion_queue_drain(h->completion_queue); - grpc_completion_queue_destroy(h->completion_queue); grpc_metadata_array_destroy(&(h->request_metadata_recv)); grpc_call_details_destroy(&(h->call_details)); if (h->server_call) { diff --git a/Sources/CgRPC/shim/server.c b/Sources/CgRPC/shim/server.c index 184452802..6ee6f4381 100644 --- a/Sources/CgRPC/shim/server.c +++ b/Sources/CgRPC/shim/server.c @@ -77,8 +77,6 @@ void cgrpc_server_destroy(cgrpc_server *server) { server->server = NULL; grpc_completion_queue_shutdown(server->completion_queue); - cgrpc_completion_queue_drain(server->completion_queue); - grpc_completion_queue_destroy(server->completion_queue); } void cgrpc_server_start(cgrpc_server *server) { diff --git a/Sources/SwiftGRPC/Core/CompletionQueue.swift b/Sources/SwiftGRPC/Core/CompletionQueue.swift index 0a34fe636..9701de07c 100644 --- a/Sources/SwiftGRPC/Core/CompletionQueue.swift +++ b/Sources/SwiftGRPC/Core/CompletionQueue.swift @@ -77,6 +77,11 @@ class CompletionQueue { self.underlyingCompletionQueue = underlyingCompletionQueue self.name = name } + + deinit { + cgrpc_completion_queue_drain(underlyingCompletionQueue) + grpc_completion_queue_destroy(underlyingCompletionQueue) + } /// Waits for an operation group to complete /// diff --git a/Sources/SwiftGRPC/Core/Server.swift b/Sources/SwiftGRPC/Core/Server.swift index 80f78f786..28a05d69c 100644 --- a/Sources/SwiftGRPC/Core/Server.swift +++ b/Sources/SwiftGRPC/Core/Server.swift @@ -66,8 +66,7 @@ public class Server { cgrpc_server_start(underlyingServer) // run the server on a new background thread dispatchQueue.async { - var running = true - while running { + spinloop: while true { do { let handler = Handler(underlyingServer: self.underlyingServer) try handler.requestCall(tag: Server.handlerCallTag) @@ -97,16 +96,17 @@ public class Server { } } } else if event.tag == Server.stopTag || event.tag == Server.destroyTag { - running = false // exit the loop + break spinloop } } else if event.type == .queueTimeout { // everything is fine + continue } else if event.type == .queueShutdown { - running = false + break spinloop } } catch { print("server call error: \(error)") - running = false + break spinloop } } self.onCompletion?() diff --git a/Tests/SwiftGRPCTests/EchoTests.swift b/Tests/SwiftGRPCTests/EchoTests.swift index b63b40cd5..fbbc05461 100644 --- a/Tests/SwiftGRPCTests/EchoTests.swift +++ b/Tests/SwiftGRPCTests/EchoTests.swift @@ -53,10 +53,12 @@ extension EchoTests { func testUnaryLotsOfRequests() { // No need to spam the log with 50k lines. server.shouldLogRequests = false + // Sending that many requests at once can sometimes trip things up, it seems. + client.timeout = 5.0 let clockStart = clock() let numberOfRequests = 50_000 for i in 0.. 0 { print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))") } XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text) diff --git a/Tests/SwiftGRPCTests/GRPCTests.swift b/Tests/SwiftGRPCTests/GRPCTests.swift index 3c190d033..8b5b7d42b 100644 --- a/Tests/SwiftGRPCTests/GRPCTests.swift +++ b/Tests/SwiftGRPCTests/GRPCTests.swift @@ -46,7 +46,8 @@ class gRPCTests: XCTestCase { let address = "localhost:8085" let host = "example.com" -let clientText = "hello, server!" +let evenClientText = "hello, server!" +let oddClientText = "hello, server, please fail!" let serverText = "hello, client!" let initialClientMetadata = [ @@ -87,6 +88,10 @@ let helloBiDiStream = "/hello.bidi-stream" let oddStatusMessage = "OK" let evenStatusMessage = "some other status message" +// Parsing very large messages as String is very inefficient, +// so we avoid it anything above this threshold. +let sizeThresholdForReturningDataVerbatim = 10_000 + func runTest(useSSL: Bool) { gRPC.initialize() @@ -147,62 +152,78 @@ func runClient(useSSL: Bool) throws { } channel.host = host + let largeMessage = Data(repeating: 88 /* 'X' */, count: 4_000_000) for _ in 0..<10 { // Send several calls to each server we spin up, to ensure that each individual server can handle many requests. try callUnary(channel: channel) try callServerStream(channel: channel) try callBiDiStream(channel: channel) } + // Test sending a large message. + try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true) + try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true) } func callUnary(channel: Channel) throws { - let message = clientText.data(using: .utf8) - + let evenMessage = evenClientText.data(using: .utf8)! + let oddMessage = oddClientText.data(using: .utf8)! for i in 0..= sizeThresholdForReturningDataVerbatim { + XCTAssertEqual(message, resultData) + } else { let messageString = String(data: resultData, encoding: .utf8) XCTAssertEqual(messageString, serverText) - } else { - XCTFail("callUnary response missing") } - } - - // verify the initial metadata from the server - if let initialMetadata = response.initialMetadata { - verify_metadata(initialMetadata, expected: initialServerMetadata) - } else { - XCTFail("callUnary initial metadata missing") - } - - // verify the trailing metadata from the server - if let trailingMetadata = response.trailingMetadata { - verify_metadata(trailingMetadata, expected: trailingServerMetadata) } else { - XCTFail("callUnary trailing metadata missing") + XCTFail("callUnary response missing") } - - // report completion - sem.signal() } - // wait for the call to complete - _ = sem.wait() + + // verify the initial metadata from the server + if let initialMetadata = response.initialMetadata { + verify_metadata(initialMetadata, expected: initialServerMetadata) + } else { + XCTFail("callUnary initial metadata missing") + } + + // verify the trailing metadata from the server + if let trailingMetadata = response.trailingMetadata { + verify_metadata(trailingMetadata, expected: trailingServerMetadata) + } else { + XCTFail("callUnary trailing metadata missing") + } + + // report completion + sem.signal() } + // wait for the call to complete + _ = sem.wait() } func callServerStream(channel: Channel) throws { - let message = clientText.data(using: .utf8) + let message = evenClientText.data(using: .utf8) let metadata = Metadata(initialClientMetadata) let sem = DispatchSemaphore(value: 0) @@ -300,14 +321,13 @@ func callBiDiStream(channel: Channel) throws { } func runServer(server: Server) throws -> DispatchSemaphore { - var requestCount = 0 let sem = DispatchSemaphore(value: 0) server.run { requestHandler in do { if let method = requestHandler.method { switch method { case hello: - try handleUnary(requestHandler: requestHandler, requestCount: requestCount) + try handleUnary(requestHandler: requestHandler) case helloServerStream: try handleServerStream(requestHandler: requestHandler) case helloBiDiStream: @@ -316,8 +336,6 @@ func runServer(server: Server) throws -> DispatchSemaphore { XCTFail("Invalid method \(method)") } } - - requestCount += 1 } catch { XCTFail("error \(error)") } @@ -330,33 +348,44 @@ func runServer(server: Server) throws -> DispatchSemaphore { return sem } -func handleUnary(requestHandler: Handler, requestCount: Int) throws { +func handleUnary(requestHandler: Handler) throws { XCTAssertEqual(requestHandler.host, host) XCTAssertEqual(requestHandler.method, hello) let initialMetadata = requestHandler.requestMetadata verify_metadata(initialMetadata, expected: initialClientMetadata) let initialMetadataToSend = Metadata(initialServerMetadata) + let receiveSem = DispatchSemaphore(value: 0) + var inputMessage: Data? try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { if let messageData = $0 { - let messageString = String(data: messageData, encoding: .utf8) - XCTAssertEqual(messageString, clientText) + inputMessage = messageData + if messageData.count < sizeThresholdForReturningDataVerbatim { + let messageString = String(data: messageData, encoding: .utf8)! + XCTAssertTrue(messageString == evenClientText || messageString == oddClientText, + "handleUnary unexpected message string \(messageString)") + } } else { XCTFail("handleUnary message missing") } + receiveSem.signal() } + receiveSem.wait() // We need to return status OK in both cases, as it seems like the server might never send out the last few messages // once it has been asked to send a non-OK status. Alternatively, we could send a non-OK status here, but then we // would need to sleep for a few milliseconds before sending the non-OK status. - let replyMessage = serverText.data(using: .utf8)! - if (requestCount % 2) == 0 { - let trailingMetadataToSend = Metadata(trailingServerMetadata) + let replyMessage = (inputMessage == nil || inputMessage!.count < sizeThresholdForReturningDataVerbatim) + ? serverText.data(using: .utf8)! + : inputMessage! + let trailingMetadataToSend = Metadata(trailingServerMetadata) + if let inputMessage = inputMessage, + inputMessage.count >= sizeThresholdForReturningDataVerbatim + || inputMessage == evenClientText.data(using: .utf8)! { try requestHandler.sendResponse(message: replyMessage, status: ServerStatus(code: .ok, message: evenStatusMessage, trailingMetadata: trailingMetadataToSend)) } else { - let trailingMetadataToSend = Metadata(trailingServerMetadata) try requestHandler.sendStatus(ServerStatus(code: .ok, message: oddStatusMessage, trailingMetadata: trailingMetadataToSend)) @@ -373,7 +402,7 @@ func handleServerStream(requestHandler: Handler) throws { try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { if let messageData = $0 { let messageString = String(data: messageData, encoding: .utf8) - XCTAssertEqual(messageString, clientText) + XCTAssertEqual(messageString, evenClientText) } else { XCTFail("handleServerStream message missing") } From 9b9ba22eed706c2e37626552639f0f40df87bc57 Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Mon, 19 Mar 2018 17:57:38 +0100 Subject: [PATCH 4/5] Try increasing the default timeout for server-cancelling tests to 5 seconds? --- Tests/SwiftGRPCTests/ServerCancellingTests.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/SwiftGRPCTests/ServerCancellingTests.swift b/Tests/SwiftGRPCTests/ServerCancellingTests.swift index e355bd029..56985a37b 100644 --- a/Tests/SwiftGRPCTests/ServerCancellingTests.swift +++ b/Tests/SwiftGRPCTests/ServerCancellingTests.swift @@ -51,6 +51,8 @@ class ServerCancellingTests: BasicEchoTestCase { } override func makeProvider() -> Echo_EchoProvider { return CancellingProvider() } + + override var defaultTimeout: TimeInterval { return 5.0 } } extension ServerCancellingTests { From e68cec0db8b1cd90f319ff99c9a8417a711a0c0a Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Mon, 19 Mar 2018 18:51:47 +0100 Subject: [PATCH 5/5] Add a simple API to provide a channel's connectivity state. (See #186.) --- Sources/CgRPC/shim/cgrpc.h | 19 +++++++++++ Sources/CgRPC/shim/channel.c | 4 +++ Sources/SwiftGRPC/Core/Channel.swift | 4 +++ .../SwiftGRPC/Core/ConnectivityState.swift | 32 +++++++++++++++++++ 4 files changed, 59 insertions(+) create mode 100644 Sources/SwiftGRPC/Core/ConnectivityState.swift diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 98c66a9b2..87a82670e 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -91,6 +91,22 @@ typedef enum grpc_completion_type { GRPC_OP_COMPLETE } grpc_completion_type; +/** Connectivity state of a channel. */ +typedef enum grpc_connectivity_state { + /** channel has just been initialized */ + GRPC_CHANNEL_INIT = -1, + /** channel is idle */ + GRPC_CHANNEL_IDLE, + /** channel is connecting */ + GRPC_CHANNEL_CONNECTING, + /** channel is ready for work */ + GRPC_CHANNEL_READY, + /** channel has seen a failure but expects to recover */ + GRPC_CHANNEL_TRANSIENT_FAILURE, + /** channel has seen a failure that it cannot recover from */ + GRPC_CHANNEL_SHUTDOWN +} grpc_connectivity_state; + typedef struct grpc_event { /** The type of the completion. */ grpc_completion_type type; @@ -129,6 +145,9 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel, double timeout); cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); +grpc_connectivity_state cgrpc_channel_check_connectivity_state( + cgrpc_channel *channel, int try_to_connect); + // server support cgrpc_server *cgrpc_server_create(const char *address); cgrpc_server *cgrpc_server_create_secure(const char *address, diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index e04d0b3a9..6f5bf24e1 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -101,3 +101,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel, cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) { return channel->completion_queue; } + +grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) { + return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); +} diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index c8b3d0a17..19a19490b 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -31,6 +31,10 @@ public class Channel { /// Default host to use for new calls public var host: String + + public var connectivityState: ConnectivityState? { + return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0)) + } /// Initializes a gRPC channel /// diff --git a/Sources/SwiftGRPC/Core/ConnectivityState.swift b/Sources/SwiftGRPC/Core/ConnectivityState.swift new file mode 100644 index 000000000..58e6c01ca --- /dev/null +++ b/Sources/SwiftGRPC/Core/ConnectivityState.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#if SWIFT_PACKAGE + import CgRPC +#endif +import Foundation + +public enum ConnectivityState: Int32, Error { + case initializing = -1 + case idle + case connecting + case ready + case transient_failure + case shutdown + + static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? { + return ConnectivityState(rawValue: connectivityState.rawValue) + } +}