From 35224591283cb5c08852f8fc2d6a184a06b2010b Mon Sep 17 00:00:00 2001 From: xiemalin Date: Fri, 6 Aug 2021 17:09:08 +0800 Subject: [PATCH] client add async send support --- client.go | 41 ++++++++++++++++++++++++++++++ client_test.go | 68 ++++++++++++++++++++++++++++++++++---------------- 2 files changed, 88 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 5911660..e561892 100644 --- a/client.go +++ b/client.go @@ -357,3 +357,44 @@ func (c *RpcClient) SendRpcRequestWithTimeout(timeout time.Duration, rpcInvocati return r, nil } + +// RpcResult Rpc response result from client request api under asynchronous way +type RpcResult struct { + rpcData *RpcDataPackage + err error + message proto.Message +} + +func (rr *RpcResult) Get() proto.Message { + return rr.message +} + +func (rr *RpcResult) GetRpcDataPackage() *RpcDataPackage { + return rr.rpcData +} + +func (rr *RpcResult) GetErr() error { + return rr.err +} + +// SendRpcRequestAsyc send rpc request to remote server in asynchronous way +func (c *RpcClient) SendRpcRequestAsyc(rpcInvocation *RpcInvocation, responseMessage proto.Message) <-chan *RpcResult { + ch := make(chan *RpcResult, 1) + + go func() { + defer func() { + if p := recover(); p != nil { + if err, ok := p.(error); ok { + r := &RpcResult{nil, err, responseMessage} + ch <- r + } + } + }() + + resp, err := c.SendRpcRequest(rpcInvocation, responseMessage) + result := &RpcResult{resp, err, responseMessage} + ch <- result + }() + + return ch +} diff --git a/client_test.go b/client_test.go index af69899..9a1a76c 100644 --- a/client_test.go +++ b/client_test.go @@ -53,8 +53,8 @@ func TestSingleTcpConnectionClient(t *testing.T) { defer client.Close() defer conn.Close() - testSendRpc("Client send rpc request", client, false, false, 0) - testSendRpc("Client send rpc request(async)", client, true, false, 0) + testSendRpc("Client send rpc request", client, false, false, 0, false) + testSendRpc("Client send rpc request(async)", client, true, false, 0, false) }) } @@ -72,8 +72,8 @@ func TestSingleTcpConnectionClientWithAuthenticate(t *testing.T) { defer client.Close() defer conn.Close() - testSendRpc("Client send rpc request", client, false, true, 0) - testSendRpc("Client send rpc request(async)", client, true, true, 0) + testSendRpc("Client send rpc request", client, false, true, 0, false) + testSendRpc("Client send rpc request(async)", client, true, true, 0, false) }) } @@ -91,8 +91,8 @@ func TestSingleTcpConnectionClientWithChunk(t *testing.T) { defer client.Close() defer conn.Close() - testSendRpc("Client send rpc request", client, false, true, 20) - testSendRpc("Client send rpc request(async)", client, true, true, 20) + testSendRpc("Client send rpc request", client, false, true, 20, false) + testSendRpc("Client send rpc request(async)", client, true, true, 20, false) }) } @@ -110,8 +110,8 @@ func TestSingleTcpConnectionClientAndServerWithChunk(t *testing.T) { defer client.Close() defer conn.Close() - testSendRpc("Client send rpc request", client, false, true, 20) - testSendRpc("Client send rpc request(async)", client, true, true, 20) + testSendRpc("Client send rpc request", client, false, true, 20, false) + testSendRpc("Client send rpc request(async)", client, true, true, 20, false) }) } @@ -146,7 +146,7 @@ func TestSingleTcpConnectionClientWithBadChunkCase(t *testing.T) { client.Session.SendReceive(dataPackage) // send bad chunk data package server will block unitl timeout }() time.Sleep(1 * time.Second) - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, false, false, false, 0) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, false, false, false, 0, false) }) } @@ -163,27 +163,46 @@ func TestPooledTcpConnectionClient(t *testing.T) { defer client.Close() defer conn.Close() - testSendRpc("Client send rpc request", client, false, true, 0) - testSendRpc("Client send rpc request(async)", client, true, true, 0) + testSendRpc("Client send rpc request", client, false, true, 0, false) + testSendRpc("Client send rpc request(async)", client, true, true, 0, false) }) } -func testSendRpc(testName string, client *baidurpc.RpcClient, async, auth bool, chunksize uint32) { +// TestSingleTcpConnectionClientByAsync +func TestSingleTcpConnectionClientByAsync(t *testing.T) { + Convey("TestSingleTcpConnectionClientByAsync", t, func() { + tcpServer := startRpcServer(0) + tcpServer.SetAuthService(new(StringMatchAuthService)) + defer tcpServer.Stop() + + conn, client, err := createClient() + So(err, ShouldBeNil) + So(conn, ShouldNotBeNil) + So(client, ShouldNotBeNil) + defer client.Close() + defer conn.Close() + + testSendRpc("Client send rpc request", client, false, true, 0, true) + testSendRpc("Client send rpc request(async)", client, true, true, 0, true) + }) +} + +func testSendRpc(testName string, client *baidurpc.RpcClient, timeout, auth bool, chunksize uint32, async bool) { Convey(testName, func() { Convey("Test send request EchoService!echo", func() { - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, async, false, auth, chunksize) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, timeout, false, auth, chunksize, async) }) Convey("Test send request EchoService!echoWithAttchement", func() { - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithAttchement", true, false, async, false, auth, chunksize) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithAttchement", true, false, timeout, false, auth, chunksize, async) }) Convey("Test send request EchoService!echoWithCustomizedError", func() { - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithCustomizedError", false, true, async, false, auth, chunksize) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithCustomizedError", false, true, timeout, false, auth, chunksize, async) }) Convey("Test send request EchoService!echoWithoutContext", func() { - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithoutContext", false, false, async, false, auth, chunksize) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithoutContext", false, false, timeout, false, auth, chunksize, async) }) Convey("Test send request EchoService!EchoSlowTest", func() { - doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "EchoSlowTest", false, false, async, true, auth, chunksize) + doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "EchoSlowTest", false, false, timeout, true, auth, chunksize, async) }) }) } @@ -262,7 +281,7 @@ func createPooledConnectionClient() (baidurpc.Connection, *baidurpc.RpcClient, e // doSimpleRPCInvokeWithSignatureWithConvey send rpc request func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, serviceName, methodName string, - withAttachement, withCustomErr, async, timeout, auth bool, chunkSize uint32) { + withAttachement, withCustomErr, timeout, timeoutCheck, auth bool, chunkSize uint32, async bool) { Convey("Test Client send rpc request", func() { rpcInvocation := baidurpc.NewRpcInvocation(&serviceName, &methodName) @@ -284,14 +303,21 @@ func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, ser parameterOut := EchoMessage{} var response *baidurpc.RpcDataPackage var err error - if async { + if timeout { response, err = rpcClient.SendRpcRequestWithTimeout(1*time.Second, rpcInvocation, ¶meterOut) - if timeout { + if timeoutCheck { So(err, ShouldNotBeNil) return } } else { - response, err = rpcClient.SendRpcRequest(rpcInvocation, ¶meterOut) + if async { + ch := rpcClient.SendRpcRequestAsyc(rpcInvocation, ¶meterOut) + rpcResult := <-ch + response = rpcResult.GetRpcDataPackage() + err = rpcResult.GetErr() + } else { + response, err = rpcClient.SendRpcRequest(rpcInvocation, ¶meterOut) + } } if withCustomErr { So(err, ShouldNotBeNil)