Skip to content

Commit

Permalink
Add a second argument to callbacks for grpc streams with metadata (#3801
Browse files Browse the repository at this point in the history
)

Change updates the grpc stream callbacks to all receivers to be passed
a meta data object that includes the timestamp of the original event.
This change facilitates being able to accurately check the time when a
message was recevied by a client stream.

Co-authored-by: Joan López de la Franca Beltran <5459617+joanlopez@users.noreply.github.com>
  • Loading branch information
cchamplin and joanlopez authored Sep 12, 2024
1 parent 910eb0d commit f53d1c6
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 10 deletions.
10 changes: 5 additions & 5 deletions js/modules/k6/grpc/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type eventListener struct {

// this return sobek.value *and* error in order to return error on exception instead of panic
// https://pkg.go.dev/github.com/grafana/sobek#hdr-Functions
list []func(sobek.Value) (sobek.Value, error)
list []func(sobek.Value, sobek.Value) (sobek.Value, error)
}

// newListener creates a new listener of a certain type
Expand All @@ -38,7 +38,7 @@ func newListener(eventType string) *eventListener {
}

// add adds a listener to the listener list
func (l *eventListener) add(fn func(sobek.Value) (sobek.Value, error)) {
func (l *eventListener) add(fn func(sobek.Value, sobek.Value) (sobek.Value, error)) {
l.list = append(l.list, fn)
}

Expand All @@ -59,7 +59,7 @@ func (l *eventListeners) getType(t string) *eventListener {
}

// add adds a listener to the listeners
func (l *eventListeners) add(t string, f func(sobek.Value) (sobek.Value, error)) error {
func (l *eventListeners) add(t string, f func(sobek.Value, sobek.Value) (sobek.Value, error)) error {
list := l.getType(t)

if list == nil {
Expand All @@ -72,11 +72,11 @@ func (l *eventListeners) add(t string, f func(sobek.Value) (sobek.Value, error))
}

// all returns all possible listeners for a certain event type or an empty array
func (l *eventListeners) all(t string) []func(sobek.Value) (sobek.Value, error) {
func (l *eventListeners) all(t string) []func(sobek.Value, sobek.Value) (sobek.Value, error) {
list := l.getType(t)

if list == nil {
return []func(sobek.Value) (sobek.Value, error){}
return []func(sobek.Value, sobek.Value) (sobek.Value, error){}
}

return list.list
Expand Down
34 changes: 29 additions & 5 deletions js/modules/k6/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
closed
)

const (
timestampMetadata = "ts"
)

type stream struct {
vu modules.VU
client *Client
Expand Down Expand Up @@ -143,12 +147,13 @@ func (s *stream) loop() {
}

func (s *stream) queueMessage(msg interface{}) {
now := time.Now()
metrics.PushIfNotDone(s.vu.Context(), s.vu.State().Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: s.instanceMetrics.StreamsMessagesReceived,
Tags: s.tagsAndMeta.Tags,
},
Time: time.Now(),
Time: now,
Metadata: s.tagsAndMeta.Metadata,
Value: 1,
})
Expand All @@ -157,8 +162,14 @@ func (s *stream) queueMessage(msg interface{}) {
rt := s.vu.Runtime()
listeners := s.eventListeners.all(eventData)

metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}

for _, messageListener := range listeners {
if _, err := messageListener(rt.ToValue(msg)); err != nil {
if _, err := messageListener(rt.ToValue(msg), metadataObj); err != nil {
// TODO(olegbespalov) consider logging the error
_ = s.closeWithError(err)

Expand Down Expand Up @@ -294,7 +305,7 @@ func (s *stream) processSendError(err error) {
}

// on registers a handler for a certain event type
func (s *stream) on(event string, handler func(sobek.Value) (sobek.Value, error)) {
func (s *stream) on(event string, handler func(sobek.Value, sobek.Value) (sobek.Value, error)) {
if handler == nil {
common.Throw(s.vu.Runtime(), fmt.Errorf("handler for %q event isn't a callable function", event))
}
Expand Down Expand Up @@ -384,8 +395,15 @@ func (s *stream) callErrorListeners(e error) error {
s.logger.Warnf("no handlers for error registered, but an error happened: %s", e)
}

now := time.Now()
metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}

for _, errorListener := range list {
if _, err := errorListener(rt.ToValue(obj)); err != nil {
if _, err := errorListener(rt.ToValue(obj), metadataObj); err != nil {
return err
}
}
Expand Down Expand Up @@ -426,10 +444,16 @@ func extractError(e error) grpcError {
}

func (s *stream) callEventListeners(eventType string) error {
now := time.Now()
rt := s.vu.Runtime()

metadataObj := rt.NewObject()
err := metadataObj.Set(timestampMetadata, rt.ToValue(now.Unix()))
if err != nil {
return err
}
for _, listener := range s.eventListeners.all(eventType) {
if _, err := listener(rt.ToValue(struct{}{})); err != nil {
if _, err := listener(rt.ToValue(struct{}{}), metadataObj); err != nil {
return err
}
}
Expand Down
86 changes: 86 additions & 0 deletions js/modules/k6/grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -357,6 +358,91 @@ func TestStream_ReceiveAllServerResponsesAfterEndWithDiscardedMessages(t *testin
}, ts.callRecorder.Recorded())
}

func TestStream_ReceiveMetadata(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := &featureExplorerStub{}

savedFeatures := []*grpcservice.Feature{
{
Name: "foo",
Location: &grpcservice.Point{
Latitude: 1,
Longitude: 2,
},
},
{
Name: "bar",
Location: &grpcservice.Point{
Latitude: 3,
Longitude: 4,
},
},
}

stub.listFeatures = func(_ *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error {
for _, feature := range savedFeatures {
// adding a delay to make server response "slower"
time.Sleep(200 * time.Millisecond)

if err := stream.Send(feature); err != nil {
return err
}
}

return nil
}

grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub)

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/grpcservice/route_guide.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures")
stream.on('data', function (data, meta) {
call(meta.ts);
});
stream.on('end', function (_, meta) {
call(meta.ts);
});
stream.write({
lo: {
latitude: 1,
longitude: 2,
},
hi: {
latitude: 1,
longitude: 2,
},
});
stream.end();
`,
}

val, err := ts.Run(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = ts.RunOnEventLoop(vuString.code)

assertResponse(t, vuString, err, val, ts)

for _, call := range ts.callRecorder.Recorded() {
seconds, err := strconv.ParseInt(call, 10, 64)
assert.NoError(t, err)
metaTS := time.Unix(seconds, 0)
assert.WithinDuration(t, time.Now(), metaTS, 1*time.Minute)
}
}

// featureExplorerStub is a stub for FeatureExplorerServer
// it has ability to override methods
type featureExplorerStub struct {
Expand Down

0 comments on commit f53d1c6

Please sign in to comment.