From 8a5092530d6647912e1c5dc34ffed85c74a81df7 Mon Sep 17 00:00:00 2001 From: Lev Zakharov Date: Mon, 1 Jul 2024 19:52:48 +0300 Subject: [PATCH 1/4] Add `discardResponseMessage` option for grpc client --- js/modules/k6/grpc/client.go | 13 +++--- js/modules/k6/grpc/client_test.go | 69 +++++++++++++++++++++++++++++++ js/modules/k6/grpc/params.go | 9 ++-- js/modules/k6/grpc/params_test.go | 41 ++++++++++++++++++ lib/netext/grpcext/conn.go | 15 +++---- lib/netext/grpcext/conn_test.go | 22 ++++++++++ 6 files changed, 153 insertions(+), 16 deletions(-) diff --git a/js/modules/k6/grpc/client.go b/js/modules/k6/grpc/client.go index fdcd80048d0..8ab8c42ce70 100644 --- a/js/modules/k6/grpc/client.go +++ b/js/modules/k6/grpc/client.go @@ -365,12 +365,13 @@ func (c *Client) buildInvokeRequest( p.SetSystemTags(state, c.addr, method) return grpcext.InvokeRequest{ - Method: method, - MethodDescriptor: methodDesc, - Timeout: p.Timeout, - Message: b, - TagsAndMeta: &p.TagsAndMeta, - Metadata: p.Metadata, + Method: method, + MethodDescriptor: methodDesc, + Timeout: p.Timeout, + DiscardResponseMessage: p.DiscardResponseMessage, + Message: b, + TagsAndMeta: &p.TagsAndMeta, + Metadata: p.Metadata, }, nil } diff --git a/js/modules/k6/grpc/client_test.go b/js/modules/k6/grpc/client_test.go index 11951986c3c..3f5bc71ae9f 100644 --- a/js/modules/k6/grpc/client_test.go +++ b/js/modules/k6/grpc/client_test.go @@ -310,6 +310,19 @@ func TestClient(t *testing.T) { client.invoke("grpc.testing.TestService/EmptyCall", {}, { timeout: 2000 })`, }, }, + { + name: "InvokeDiscardResponseMessage", + initString: codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`, + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true })`, + }, + }, { name: "Invoke", initString: codeBlock{code: ` @@ -333,6 +346,32 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "InvokeDiscardResponseMessage", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) { + return &grpc_testing.Empty{}, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + var resp = client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true }) + if (resp.status !== grpc.StatusOK) { + throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status) + } + if (resp.message !== null) { + throw new Error("unexpected message: " + JSON.stringify(resp.message)) + }`, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall")) + }, + }, + }, { name: "AsyncInvoke", initString: codeBlock{code: ` @@ -360,6 +399,36 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "AsyncInvokeDiscardResponseMessage", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) { + return &grpc_testing.Empty{}, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true }).then(function(resp) { + if (resp.status !== grpc.StatusOK) { + throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status) + } + if (resp.message !== null) { + throw new Error("unexpected message: " + JSON.stringify(resp.message)) + } + }, (err) => { + throw new Error("unexpected error: " + err) + }) + `, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall")) + }, + }, + }, { name: "InvokeAnyProto", initString: codeBlock{code: ` diff --git a/js/modules/k6/grpc/params.go b/js/modules/k6/grpc/params.go index f03d10bea13..25af351350c 100644 --- a/js/modules/k6/grpc/params.go +++ b/js/modules/k6/grpc/params.go @@ -18,9 +18,10 @@ import ( // callParams is the parameters that can be passed to a gRPC calls // like invoke or newStream. type callParams struct { - Metadata metadata.MD - TagsAndMeta metrics.TagsAndMeta - Timeout time.Duration + Metadata metadata.MD + TagsAndMeta metrics.TagsAndMeta + Timeout time.Duration + DiscardResponseMessage bool } // newCallParams constructs the call parameters from the input value. @@ -58,6 +59,8 @@ func newCallParams(vu modules.VU, input sobek.Value) (*callParams, error) { if err != nil { return result, fmt.Errorf("invalid timeout value: %w", err) } + case "discardResponseMessage": + result.DiscardResponseMessage = params.Get(k).ToBoolean() default: return result, fmt.Errorf("unknown param: %q", k) } diff --git a/js/modules/k6/grpc/params_test.go b/js/modules/k6/grpc/params_test.go index ef0c20c14ca..76dbcdd9f19 100644 --- a/js/modules/k6/grpc/params_test.go +++ b/js/modules/k6/grpc/params_test.go @@ -139,6 +139,47 @@ func TestCallParamsTimeOutParse(t *testing.T) { } } +func TestCallParamsDiscardResponseMessageParse(t *testing.T) { + t.Parallel() + + testCases := []struct { + Name string + JSON string + DiscardResponseMessage bool + }{ + { + Name: "Empty", + JSON: `{}`, + DiscardResponseMessage: false, + }, + { + Name: "DiscardResponseMessageFalse", + JSON: `{ discardResponseMessage: false }`, + DiscardResponseMessage: false, + }, + { + Name: "DiscardResponseMessageTrue", + JSON: `{ discardResponseMessage: true }`, + DiscardResponseMessage: true, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + + testRuntime, params := newParamsTestRuntime(t, tc.JSON) + + p, err := newCallParams(testRuntime.VU, params) + require.NoError(t, err) + + assert.Equal(t, tc.DiscardResponseMessage, p.DiscardResponseMessage) + }) + } +} + // newParamsTestRuntime creates a new test runtime // that could be used to test the params // it also moves to the VU context and creates the params diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index 386a6a110ba..d9d76ad0bc7 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -30,12 +30,13 @@ import ( // InvokeRequest represents a unary gRPC request. type InvokeRequest struct { - Method string - MethodDescriptor protoreflect.MethodDescriptor - Timeout time.Duration - TagsAndMeta *metrics.TagsAndMeta - Message []byte - Metadata metadata.MD + Method string + MethodDescriptor protoreflect.MethodDescriptor + Timeout time.Duration + TagsAndMeta *metrics.TagsAndMeta + DiscardResponseMessage bool + Message []byte + Metadata metadata.MD } // InvokeResponse represents a gRPC response. @@ -165,7 +166,7 @@ func (c *Conn) Invoke( response.Error = errMsg } - if resp != nil { + if resp != nil && !req.DiscardResponseMessage { msg, err := convert(marshaler, resp) if err != nil { return nil, fmt.Errorf("unable to convert response object to JSON: %w", err) diff --git a/lib/netext/grpcext/conn_test.go b/lib/netext/grpcext/conn_test.go index 42bbb829f92..ad6650f26bd 100644 --- a/lib/netext/grpcext/conn_test.go +++ b/lib/netext/grpcext/conn_test.go @@ -64,6 +64,28 @@ func TestInvokeWithCallOptions(t *testing.T) { assert.NotNil(t, res) } +func TestInvokeWithDiscardResponseMessage(t *testing.T) { + t.Parallel() + + reply := func(_, _ *dynamicpb.Message, opts ...grpc.CallOption) error { + assert.Len(t, opts, 3) // two by default plus one injected + return nil + } + + c := Conn{raw: invokemock(reply)} + r := InvokeRequest{ + Method: "/hello.HelloService/NoOp", + MethodDescriptor: methodFromProto("NoOp"), + DiscardResponseMessage: true, + Message: []byte(`{}`), + Metadata: metadata.New(nil), + } + res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone")) + require.NoError(t, err) + assert.NotNil(t, res) + assert.Nil(t, res.Message) +} + func TestInvokeReturnError(t *testing.T) { t.Parallel() From bf95696de32129c116769fd77bc0594a10ff1204 Mon Sep 17 00:00:00 2001 From: Lev Zakharov Date: Sat, 13 Jul 2024 18:01:09 +0300 Subject: [PATCH 2/4] Use emptypb.Empty for discarded response message --- lib/netext/grpcext/conn.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index d9d76ad0bc7..b476da1aa81 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -26,8 +26,11 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/dynamicpb" + "google.golang.org/protobuf/types/known/emptypb" ) +var emptyDescriptor = (&emptypb.Empty{}).ProtoReflect().Descriptor() + // InvokeRequest represents a unary gRPC request. type InvokeRequest struct { Method string @@ -134,7 +137,13 @@ func (c *Conn) Invoke( ctx = withRPCState(ctx, &rpcState{tagsAndMeta: req.TagsAndMeta}) - resp := dynamicpb.NewMessage(req.MethodDescriptor.Output()) + var resp *dynamicpb.Message + if req.DiscardResponseMessage { + resp = dynamicpb.NewMessage(emptyDescriptor) + } else { + resp = dynamicpb.NewMessage(req.MethodDescriptor.Output()) + } + header, trailer := metadata.New(nil), metadata.New(nil) copts := make([]grpc.CallOption, 0, len(opts)+2) From c104c945a1ce6734a1281a2800c239f31089cf04 Mon Sep 17 00:00:00 2001 From: Lev Zakharov Date: Wed, 17 Jul 2024 15:48:53 +0300 Subject: [PATCH 3/4] Add nolint for emptyDescriptor global variable --- lib/netext/grpcext/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index b476da1aa81..24b9afebec2 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -29,7 +29,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -var emptyDescriptor = (&emptypb.Empty{}).ProtoReflect().Descriptor() +var emptyDescriptor = (&emptypb.Empty{}).ProtoReflect().Descriptor() //nolint:gochecknoglobals // InvokeRequest represents a unary gRPC request. type InvokeRequest struct { From 591cd4c8cceb0a01b12be469d030df86f1f01a9b Mon Sep 17 00:00:00 2001 From: Lev Zakharov Date: Fri, 19 Jul 2024 17:55:44 +0300 Subject: [PATCH 4/4] Remove global variable for emptyDescriptor --- lib/netext/grpcext/conn.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index 24b9afebec2..849fc3408e3 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -29,8 +29,6 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -var emptyDescriptor = (&emptypb.Empty{}).ProtoReflect().Descriptor() //nolint:gochecknoglobals - // InvokeRequest represents a unary gRPC request. type InvokeRequest struct { Method string @@ -139,7 +137,7 @@ func (c *Conn) Invoke( var resp *dynamicpb.Message if req.DiscardResponseMessage { - resp = dynamicpb.NewMessage(emptyDescriptor) + resp = dynamicpb.NewMessage((&emptypb.Empty{}).ProtoReflect().Descriptor()) } else { resp = dynamicpb.NewMessage(req.MethodDescriptor.Output()) }