Skip to content

Commit

Permalink
client add async send support
Browse files Browse the repository at this point in the history
  • Loading branch information
xiemalin committed Aug 6, 2021
1 parent 03fe48f commit 3522459
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 21 deletions.
41 changes: 41 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,44 @@ func (c *RpcClient) SendRpcRequestWithTimeout(timeout time.Duration, rpcInvocati
return r, nil

}

// RpcResult Rpc response result from client request api under asynchronous way
type RpcResult struct {
rpcData *RpcDataPackage
err error
message proto.Message
}

func (rr *RpcResult) Get() proto.Message {
return rr.message
}

func (rr *RpcResult) GetRpcDataPackage() *RpcDataPackage {
return rr.rpcData
}

func (rr *RpcResult) GetErr() error {
return rr.err
}

// SendRpcRequestAsyc send rpc request to remote server in asynchronous way
func (c *RpcClient) SendRpcRequestAsyc(rpcInvocation *RpcInvocation, responseMessage proto.Message) <-chan *RpcResult {
ch := make(chan *RpcResult, 1)

go func() {
defer func() {
if p := recover(); p != nil {
if err, ok := p.(error); ok {
r := &RpcResult{nil, err, responseMessage}
ch <- r
}
}
}()

resp, err := c.SendRpcRequest(rpcInvocation, responseMessage)
result := &RpcResult{resp, err, responseMessage}
ch <- result
}()

return ch
}
68 changes: 47 additions & 21 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func TestSingleTcpConnectionClient(t *testing.T) {
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, false, 0)
testSendRpc("Client send rpc request(async)", client, true, false, 0)
testSendRpc("Client send rpc request", client, false, false, 0, false)
testSendRpc("Client send rpc request(async)", client, true, false, 0, false)
})
}

Expand All @@ -72,8 +72,8 @@ func TestSingleTcpConnectionClientWithAuthenticate(t *testing.T) {
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, true, 0)
testSendRpc("Client send rpc request(async)", client, true, true, 0)
testSendRpc("Client send rpc request", client, false, true, 0, false)
testSendRpc("Client send rpc request(async)", client, true, true, 0, false)
})
}

Expand All @@ -91,8 +91,8 @@ func TestSingleTcpConnectionClientWithChunk(t *testing.T) {
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, true, 20)
testSendRpc("Client send rpc request(async)", client, true, true, 20)
testSendRpc("Client send rpc request", client, false, true, 20, false)
testSendRpc("Client send rpc request(async)", client, true, true, 20, false)
})
}

Expand All @@ -110,8 +110,8 @@ func TestSingleTcpConnectionClientAndServerWithChunk(t *testing.T) {
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, true, 20)
testSendRpc("Client send rpc request(async)", client, true, true, 20)
testSendRpc("Client send rpc request", client, false, true, 20, false)
testSendRpc("Client send rpc request(async)", client, true, true, 20, false)
})
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestSingleTcpConnectionClientWithBadChunkCase(t *testing.T) {
client.Session.SendReceive(dataPackage) // send bad chunk data package server will block unitl timeout
}()
time.Sleep(1 * time.Second)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, false, false, false, 0)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, false, false, false, 0, false)
})
}

Expand All @@ -163,27 +163,46 @@ func TestPooledTcpConnectionClient(t *testing.T) {
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, true, 0)
testSendRpc("Client send rpc request(async)", client, true, true, 0)
testSendRpc("Client send rpc request", client, false, true, 0, false)
testSendRpc("Client send rpc request(async)", client, true, true, 0, false)
})
}

func testSendRpc(testName string, client *baidurpc.RpcClient, async, auth bool, chunksize uint32) {
// TestSingleTcpConnectionClientByAsync
func TestSingleTcpConnectionClientByAsync(t *testing.T) {
Convey("TestSingleTcpConnectionClientByAsync", t, func() {
tcpServer := startRpcServer(0)
tcpServer.SetAuthService(new(StringMatchAuthService))
defer tcpServer.Stop()

conn, client, err := createClient()
So(err, ShouldBeNil)
So(conn, ShouldNotBeNil)
So(client, ShouldNotBeNil)
defer client.Close()
defer conn.Close()

testSendRpc("Client send rpc request", client, false, true, 0, true)
testSendRpc("Client send rpc request(async)", client, true, true, 0, true)
})
}

func testSendRpc(testName string, client *baidurpc.RpcClient, timeout, auth bool, chunksize uint32, async bool) {
Convey(testName, func() {
Convey("Test send request EchoService!echo", func() {
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, async, false, auth, chunksize)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echo", false, false, timeout, false, auth, chunksize, async)
})
Convey("Test send request EchoService!echoWithAttchement", func() {
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithAttchement", true, false, async, false, auth, chunksize)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithAttchement", true, false, timeout, false, auth, chunksize, async)
})
Convey("Test send request EchoService!echoWithCustomizedError", func() {
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithCustomizedError", false, true, async, false, auth, chunksize)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithCustomizedError", false, true, timeout, false, auth, chunksize, async)
})
Convey("Test send request EchoService!echoWithoutContext", func() {
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithoutContext", false, false, async, false, auth, chunksize)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "echoWithoutContext", false, false, timeout, false, auth, chunksize, async)
})
Convey("Test send request EchoService!EchoSlowTest", func() {
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "EchoSlowTest", false, false, async, true, auth, chunksize)
doSimpleRPCInvokeWithSignatureWithConvey(client, "EchoService", "EchoSlowTest", false, false, timeout, true, auth, chunksize, async)
})
})
}
Expand Down Expand Up @@ -262,7 +281,7 @@ func createPooledConnectionClient() (baidurpc.Connection, *baidurpc.RpcClient, e

// doSimpleRPCInvokeWithSignatureWithConvey send rpc request
func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, serviceName, methodName string,
withAttachement, withCustomErr, async, timeout, auth bool, chunkSize uint32) {
withAttachement, withCustomErr, timeout, timeoutCheck, auth bool, chunkSize uint32, async bool) {
Convey("Test Client send rpc request", func() {
rpcInvocation := baidurpc.NewRpcInvocation(&serviceName, &methodName)

Expand All @@ -284,14 +303,21 @@ func doSimpleRPCInvokeWithSignatureWithConvey(rpcClient *baidurpc.RpcClient, ser
parameterOut := EchoMessage{}
var response *baidurpc.RpcDataPackage
var err error
if async {
if timeout {
response, err = rpcClient.SendRpcRequestWithTimeout(1*time.Second, rpcInvocation, &parameterOut)
if timeout {
if timeoutCheck {
So(err, ShouldNotBeNil)
return
}
} else {
response, err = rpcClient.SendRpcRequest(rpcInvocation, &parameterOut)
if async {
ch := rpcClient.SendRpcRequestAsyc(rpcInvocation, &parameterOut)
rpcResult := <-ch
response = rpcResult.GetRpcDataPackage()
err = rpcResult.GetErr()
} else {
response, err = rpcClient.SendRpcRequest(rpcInvocation, &parameterOut)
}
}
if withCustomErr {
So(err, ShouldNotBeNil)
Expand Down

0 comments on commit 3522459

Please sign in to comment.