From d5cd1010ecb2381376188c8a47ab861cf8b5dc3d Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 10:15:50 +0100 Subject: [PATCH 1/8] sse: support Server Sent Event Close #746 --- js/modules/k6/experimental/sse/sse.go | 543 +++++++++++++++++++++ js/modules/k6/experimental/sse/sse_test.go | 346 +++++++++++++ lib/testutils/httpmultibin/httpmultibin.go | 32 ++ metrics/builtin.go | 7 + 4 files changed, 928 insertions(+) create mode 100644 js/modules/k6/experimental/sse/sse.go create mode 100644 js/modules/k6/experimental/sse/sse_test.go diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go new file mode 100644 index 00000000000..4ea346122aa --- /dev/null +++ b/js/modules/k6/experimental/sse/sse.go @@ -0,0 +1,543 @@ +// Package sse implements a k6/sse for k6. It provides basic functionality to handle Server-Sent Event over http +// that *blocks* the event loop while the http connection is opened. +package sse + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/http/cookiejar" + "net/http/httptrace" + "strconv" + "strings" + "sync" + "time" + + "github.com/dop251/goja" + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" + httpModule "go.k6.io/k6/js/modules/k6/http" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" +) + +type ( + // RootModule is the global module instance that will create module + // instances for each VU. + RootModule struct{} + + // sse represents a module instance of the sse module. + sse struct { + vu modules.VU + obj *goja.Object + } +) + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &sse{} +) + +// New returns a pointer to a new RootModule instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (*RootModule) NewModuleInstance(m modules.VU) modules.Instance { + rt := m.Runtime() + mi := &sse{ + vu: m, + } + obj := rt.NewObject() + if err := obj.Set("open", mi.Open); err != nil { + common.Throw(rt, err) + } + + mi.obj = obj + return mi +} + +// ErrSSEInInitContext is returned when sse are using in the init context +var ErrSSEInInitContext = common.NewInitContextError("using sse in the init context is not supported") + +// Client is the representation of the sse returned to the js. +type Client struct { + rt *goja.Runtime + ctx context.Context //nolint:containedctx + resp *http.Response + eventHandlers map[string][]goja.Callable + done chan struct{} + shutdownOnce sync.Once + + tagsAndMeta *metrics.TagsAndMeta + samplesOutput chan<- metrics.SampleContainer + builtinMetrics *metrics.BuiltinMetrics +} + +// HTTPResponse is the http response returned by sse.open. +type HTTPResponse struct { + URL string `json:"url"` + Status int `json:"status"` + Headers map[string]string `json:"headers"` + Error string `json:"error"` +} + +// Event represents a Server-Sent Event +type Event struct { + ID string + Name string + Data string +} + +type sseOpenArgs struct { + setupFn goja.Callable + headers http.Header + method string + body string + cookieJar *cookiejar.Jar + tagsAndMeta *metrics.TagsAndMeta +} + +// Exports returns the exports of the sse module. +func (mi *sse) Exports() modules.Exports { + return modules.Exports{Default: mi.obj} +} + +// Open establishes a client connection based on the parameters provided. +// +//nolint:funlen +func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { + ctx := mi.vu.Context() + rt := mi.vu.Runtime() + state := mi.vu.State() + if state == nil { + return nil, ErrSSEInInitContext + } + + parsedArgs, err := parseConnectArgs(state, rt, args...) + if err != nil { + return nil, err + } + + parsedArgs.tagsAndMeta.SetSystemTagOrMetaIfEnabled(state.Options.SystemTags, metrics.TagURL, url) + + //nolint:bodyclose // as it's deferred closed in closeResponseBody + client, httpResponse, connEndHook, err := mi.open(ctx, state, rt, url, parsedArgs) + defer connEndHook() + if err != nil { + // Pass the error to the user script before exiting immediately + client.handleEvent("error", rt.ToValue(err)) + if state.Options.Throw.Bool { + return nil, err + } + if httpResponse != nil { + return wrapHTTPResponse(httpResponse) + } + return &HTTPResponse{Error: err.Error()}, nil + } + + // Run the user-provided set up function + if _, err := parsedArgs.setupFn(goja.Undefined(), rt.ToValue(&client)); err != nil { + _ = client.closeResponseBody() + return nil, err + } + + // The connection is now open, emit the event + client.handleEvent("open") + + readEventChan := make(chan Event) + readErrChan := make(chan error) + readCloseChan := make(chan int) + + reader := bufio.NewReader(httpResponse.Body) + + // Wraps a couple of channels + go client.readEvents(reader, readEventChan, readErrChan, readCloseChan) + + // This is the main control loop. All JS code (including error handlers) + // should only be executed by this thread to avoid race conditions + for { + select { + case event := <-readEventChan: + metrics.PushIfNotDone(ctx, client.samplesOutput, metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: client.builtinMetrics.SSEEventReceived, + Tags: client.tagsAndMeta.Tags, + }, + Time: time.Now(), + Metadata: client.tagsAndMeta.Metadata, + Value: 1, + }) + + client.handleEvent("event", rt.ToValue(event)) + + case readErr := <-readErrChan: + client.handleEvent("error", rt.ToValue(readErr)) + + case <-ctx.Done(): + // VU is shutting down during an interrupt + // client events will not be forwarded to the VU + _ = client.closeResponseBody() + + case <-readCloseChan: + _ = client.closeResponseBody() + + case <-client.done: + // This is the final exit point normally triggered by closeResponseBody + sseResponse, sseRespErr := wrapHTTPResponse(httpResponse) + if sseRespErr != nil { + return nil, sseRespErr + } + sseResponse.URL = url + return sseResponse, nil + } + } +} + +func (mi *sse) open( + ctx context.Context, state *lib.State, rt *goja.Runtime, url string, + args *sseOpenArgs, +) (*Client, *http.Response, func(), error) { + client := Client{ + ctx: ctx, + rt: rt, + eventHandlers: make(map[string][]goja.Callable), + done: make(chan struct{}), + samplesOutput: state.Samples, + tagsAndMeta: args.tagsAndMeta, + builtinMetrics: state.BuiltinMetrics, + } + + // Overriding the NextProtos to avoid talking http2 + var tlsConfig *tls.Config + if state.TLSConfig != nil { + tlsConfig = state.TLSConfig.Clone() + tlsConfig.NextProtos = []string{"http/1.1"} + } + + httpClient := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConfig, + }, + } + // this is needed because of how interfaces work and that ssed.Jar is http.Cookiejar + if args.cookieJar != nil { + httpClient.Jar = args.cookieJar + } + + httpMethod := http.MethodGet + if args.method != "" { + httpMethod = args.method + } + + req, err := http.NewRequestWithContext(ctx, httpMethod, url, strings.NewReader(args.body)) + if err != nil { + return &client, nil, nil, err + } + + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + + trace := &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + if state.Options.SystemTags.Has(metrics.TagIP) { + if ip, _, err2 := net.SplitHostPort(connInfo.Conn.RemoteAddr().String()); err2 == nil { + args.tagsAndMeta.SetSystemTagOrMeta(metrics.TagIP, ip) + } + } + }, + } + + //nolint:contextcheck // as it's passed in the request + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + + connStart := time.Now() + resp, err := httpClient.Do(req) + connEnd := time.Now() + + if resp != nil { + client.resp = resp + if state.Options.SystemTags.Has(metrics.TagStatus) { + args.tagsAndMeta.SetSystemTagOrMeta( + metrics.TagStatus, strconv.Itoa(resp.StatusCode)) + } + } + + connEndHook := client.pushSessionMetrics(connStart, connEnd) + + return &client, resp, connEndHook, err +} + +// On is used to configure what the client should do on each event. +func (s *Client) On(event string, handler goja.Value) { + if handler, ok := goja.AssertFunction(handler); ok { + s.eventHandlers[event] = append(s.eventHandlers[event], handler) + } +} + +func (s *Client) handleEvent(event string, args ...goja.Value) { + if handlers, ok := s.eventHandlers[event]; ok { + for _, handler := range handlers { + if _, err := handler(goja.Undefined(), args...); err != nil { + common.Throw(s.rt, err) + } + } + } +} + +// closeResponseBody cleanly closes the response body. +// Returns an error if sending the response body cannot be closed. +func (s *Client) closeResponseBody() error { + var err error + + s.shutdownOnce.Do(func() { + err = s.resp.Body.Close() + if err != nil { + // Call the user-defined error handler + s.handleEvent("error", s.rt.ToValue(err)) + } + close(s.done) + }) + + return err +} + +func (s *Client) pushSessionMetrics(connStart, connEnd time.Time) func() { + connDuration := metrics.D(connEnd.Sub(connStart)) + + metrics.PushIfNotDone(s.ctx, s.samplesOutput, metrics.ConnectedSamples{ + Samples: []metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: s.builtinMetrics.HTTPReqSending, + Tags: s.tagsAndMeta.Tags, + }, + Time: connStart, + Metadata: s.tagsAndMeta.Metadata, + Value: connDuration, + }, + }, + Tags: s.tagsAndMeta.Tags, + Time: connStart, + }) + + return func() { + end := time.Now() + requestDuration := metrics.D(end.Sub(connStart)) + + metrics.PushIfNotDone(s.ctx, s.samplesOutput, metrics.ConnectedSamples{ + Samples: []metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: s.builtinMetrics.HTTPReqs, + Tags: s.tagsAndMeta.Tags, + }, + Time: end, + Metadata: s.tagsAndMeta.Metadata, + Value: 1, + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: s.builtinMetrics.HTTPReqSending, + Tags: s.tagsAndMeta.Tags, + }, + Time: end, + Metadata: s.tagsAndMeta.Metadata, + Value: connDuration, + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: s.builtinMetrics.HTTPReqDuration, + Tags: s.tagsAndMeta.Tags, + }, + Time: end, + Metadata: s.tagsAndMeta.Metadata, + Value: requestDuration, + }, + }, + Tags: s.tagsAndMeta.Tags, + Time: end, + }) + } +} + +// Wraps SSE in a channel +func (s *Client) readEvents(reader *bufio.Reader, readChan chan Event, errorChan chan error, closeChan chan int) { + ev := Event{} + + var buf bytes.Buffer + + sendEvent := false + for { + line, err := reader.ReadBytes('\n') + if err != nil { + if errors.Is(err, io.EOF) { + select { + case closeChan <- -1: + return + case <-s.done: + return + } + } else { + select { + case errorChan <- err: + return + case <-s.done: + return + } + } + } + + switch { + case hasPrefix(line, ":"): + // Comment, do nothing + + case hasPrefix(line, "retry:"): + // Retry, do nothing for now + + // id of event + case hasPrefix(line, "id: "): + ev.ID = stripPrefix(line, 4) + case hasPrefix(line, "id:"): + ev.ID = stripPrefix(line, 3) + + // name of event + case hasPrefix(line, "event: "): + ev.Name = stripPrefix(line, 7) + case hasPrefix(line, "event:"): + ev.Name = stripPrefix(line, 6) + + // event data + case hasPrefix(line, "data: "): + buf.Write(line[6:]) + sendEvent = true + case hasPrefix(line, "data:"): + buf.Write(line[5:]) + sendEvent = true + + // end of event + case bytes.Equal(line, []byte("\n")): + if sendEvent { + // Report an unexpected closure + ev.Data = buf.String() + select { + case readChan <- ev: + sendEvent = false + buf.Reset() + ev = Event{} + case <-s.done: + return + } + } + default: + select { + case errorChan <- errors.New("unknown event: " + string(line)): + case <-s.done: + return + } + } + } +} + +// Wrap the raw HTTPResponse we received to a sseHTTPResponse we can pass to the user +func wrapHTTPResponse(httpResponse *http.Response) (*HTTPResponse, error) { + sseResponse := HTTPResponse{ + Status: httpResponse.StatusCode, + } + + sseResponse.Headers = make(map[string]string, len(httpResponse.Header)) + for k, vs := range httpResponse.Header { + sseResponse.Headers[k] = strings.Join(vs, ", ") + } + + return &sseResponse, nil +} + +func parseConnectArgs(state *lib.State, rt *goja.Runtime, args ...goja.Value) (*sseOpenArgs, error) { + // The params argument is optional + var callableV, paramsV goja.Value + switch len(args) { + case 2: + paramsV = args[0] + callableV = args[1] + case 1: + paramsV = goja.Undefined() + callableV = args[0] + default: + return nil, errors.New("invalid number of arguments to sse.open") + } + // Get the callable (required) + setupFn, isFunc := goja.AssertFunction(callableV) + if !isFunc { + return nil, errors.New("last argument to sse.open must be a function") + } + + headers := make(http.Header) + headers.Set("User-Agent", state.Options.UserAgent.String) + tagsAndMeta := state.Tags.GetCurrentValues() + parsedArgs := &sseOpenArgs{ + setupFn: setupFn, + headers: headers, + cookieJar: state.CookieJar, + tagsAndMeta: &tagsAndMeta, + } + + if goja.IsUndefined(paramsV) || goja.IsNull(paramsV) { + return parsedArgs, nil + } + + // Parse the optional second argument (params) + params := paramsV.ToObject(rt) + for _, k := range params.Keys() { + switch k { + case "headers": + headersV := params.Get(k) + if goja.IsUndefined(headersV) || goja.IsNull(headersV) { + continue + } + headersObj := headersV.ToObject(rt) + if headersObj == nil { + continue + } + for _, key := range headersObj.Keys() { + parsedArgs.headers.Set(key, headersObj.Get(key).String()) + } + case "tags": + if err := common.ApplyCustomUserTags(rt, parsedArgs.tagsAndMeta, params.Get(k)); err != nil { + return nil, fmt.Errorf("invalid sse.open() metric tags: %w", err) + } + case "jar": + jarV := params.Get(k) + if goja.IsUndefined(jarV) || goja.IsNull(jarV) { + continue + } + if v, ok := jarV.Export().(*httpModule.CookieJar); ok { + parsedArgs.cookieJar = v.Jar + } + case "method": + parsedArgs.method = strings.TrimSpace(params.Get(k).ToString().String()) + case "body": + parsedArgs.body = strings.TrimSpace(params.Get(k).ToString().String()) + } + } + + return parsedArgs, nil +} + +func hasPrefix(s []byte, prefix string) bool { + return bytes.HasPrefix(s, []byte(prefix)) +} + +func stripPrefix(line []byte, start int) string { + return string(line[start : len(line)-1]) +} diff --git a/js/modules/k6/experimental/sse/sse_test.go b/js/modules/k6/experimental/sse/sse_test.go new file mode 100644 index 00000000000..e83ccc19599 --- /dev/null +++ b/js/modules/k6/experimental/sse/sse_test.go @@ -0,0 +1,346 @@ +package sse + +import ( + "crypto/tls" + "net/http" + "net/http/cookiejar" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + httpModule "go.k6.io/k6/js/modules/k6/http" + "go.k6.io/k6/js/modulestest" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/testutils/httpmultibin" + "go.k6.io/k6/metrics" + "gopkg.in/guregu/null.v3" +) + +func assertSSEMetricsEmitted(t *testing.T, sampleContainers []metrics.SampleContainer, subprotocol, url string, status int, group string) { //nolint:unparam + seenEvents := false + seenRequestDuration := false + seenHTTPReq := false + + for _, sampleContainer := range sampleContainers { + for _, sample := range sampleContainer.GetSamples() { + tags := sample.Tags.Map() + if tags["url"] == url { + switch sample.Metric.Name { + case metrics.HTTPReqsName: + seenHTTPReq = true + case metrics.HTTPReqDurationName: + seenRequestDuration = true + case metrics.SSEName: + seenEvents = true + } + + assert.Equal(t, strconv.Itoa(status), tags["status"]) + assert.Equal(t, subprotocol, tags["subproto"]) + assert.Equal(t, group, tags["group"]) + } + } + } + assert.True(t, seenEvents, "url %s didn't emit SSE events", url) + assert.True(t, seenRequestDuration, "url %s didn't emit seenRequestDuration", url) + assert.True(t, seenHTTPReq, "url %s didn't emit seenHTTPReq", url) +} + +func assertMetricEmittedCount(t *testing.T, metricName string, sampleContainers []metrics.SampleContainer, url string, count int) { + t.Helper() + actualCount := 0 + + for _, sampleContainer := range sampleContainers { + for _, sample := range sampleContainer.GetSamples() { + surl, ok := sample.Tags.Get("url") + assert.True(t, ok) + if surl == url && sample.Metric.Name == metricName { + actualCount++ + } + } + } + assert.Equal(t, count, actualCount, "url %s emitted %s %d times, expected was %d times", url, metricName, actualCount, count) +} + +type testState struct { + *modulestest.Runtime + tb *httpmultibin.HTTPMultiBin + samples chan metrics.SampleContainer +} + +func newTestState(t testing.TB) testState { + tb := httpmultibin.NewHTTPMultiBin(t) + + testRuntime := modulestest.NewRuntime(t) + samples := make(chan metrics.SampleContainer, 1000) + + root, err := lib.NewGroup("", nil) + require.NoError(t, err) + registry := metrics.NewRegistry() + state := &lib.State{ + Group: root, + Dialer: tb.Dialer, + Options: lib.Options{ + SystemTags: metrics.NewSystemTagSet( + metrics.TagURL, + metrics.TagProto, + metrics.TagStatus, + metrics.TagSubproto, + ), + UserAgent: null.StringFrom("TestUserAgent"), + Throw: null.BoolFrom(true), + }, + Samples: samples, + TLSConfig: tb.TLSClientConfig, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + Tags: lib.NewVUStateTags(registry.RootTagSet()), + } + + m := New().NewModuleInstance(testRuntime.VU) + require.NoError(t, testRuntime.VU.RuntimeField.Set("sse", m.Exports().Default)) + testRuntime.MoveToVUContext(state) + + return testState{ + Runtime: testRuntime, + tb: tb, + samples: samples, + } +} + +func TestOpen(t *testing.T) { + t.Parallel() + + t.Run("nominal", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var error = false; + var events = []; + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){ + client.on("error", function(err) { + error = true + }); + client.on("event", function(event) { + events.push(event); + }); + }); + if (error) { + throw new Error("error raised"); + } + for (let i = 0; i < events.length; i++) { + let event = events[i]; + switch(i) { + case 0: + if (event.id !== "ABCD") { + throw new Error("unexpected event id: " + event.id); + } + if (event.data !== '{"ping": "pong"}\n{"hello": "sse"}\n') { + throw new Error("unexpected event data: " + event.data); + } + break; + case 1: + if (event.id !== "") { + throw new Error("unexpected event id: " + event.id); + } + if (event.name !== "EFGH") { + throw new Error("unexpected event name: " + event.name); + } + if (event.data !== '{"hello": "sse"}\n') { + throw new Error("unexpected event data: " + event.data); + } + break; + default: + throw new Error("unexpected event"); + } + } + `)) + require.NoError(t, err) + samplesBuf := metrics.GetBufferedSamples(test.samples) + assertMetricEmittedCount(t, metrics.SSEName, samplesBuf, sr("HTTPBIN_IP_URL/sse"), 2) + }) +} + +func TestErrors(t *testing.T) { + t.Parallel() + + t.Run("invalid_url", func(t *testing.T) { + t.Parallel() + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(` + var res = sse.open("INVALID", function(client){ + client.on("open", function(client){}); + }); + `) + assert.Error(t, err) + }) + + t.Run("error_in_setup", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_URL/sse-echo", function(client){ + throw new Error("error in setup"); + }); + `)) + assert.Error(t, err) + }) + + t.Run("error_in_response", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var error = false; + var res = sse.open("HTTPBIN_IP_URL/sse-invalid", function(client){ + client.on("error", function(err) { + error = true + }); + }); + if (!error) { + throw new Error("no error raised"); + } + `)) + require.NoError(t, err) + }) +} + +func TestOpenWrongStatusCode(t *testing.T) { + t.Parallel() + test := newTestState(t) + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + test.VU.StateField.Options.Throw = null.BoolFrom(false) + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/status/404", function(client){}); + if (res.status != 404) { + throw new Error ("no status code set for invalid response"); + } + `)) + assert.NoError(t, err) +} + +func TestUserAgent(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + tb.Mux.HandleFunc("/sse-echo-useragent", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + // Echo back User-Agent header if it exists + responseHeaders := w.Header() + if ua := req.Header.Get("User-Agent"); ua != "" { + responseHeaders.Add("X-Echo-User-Agent", req.Header.Get("User-Agent")) + } + _, err := w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) + require.NoError(t, err) + })) + + test := newTestState(t) + + // client handler should echo back User-Agent as Echo-User-Agent for this test to work + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse-echo-useragent", function(client){}) + var userAgent = res.headers["X-Echo-User-Agent"]; + if (userAgent == undefined) { + throw new Error("user agent is not echoed back by test server"); + } + if (userAgent != "Go-http-client/1.1") { + throw new Error("incorrect user agent: " + userAgent); + } + `)) + require.NoError(t, err) + + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse-echo-useragent"), http.StatusOK, "") +} + +func TestCookieJar(t *testing.T) { + t.Parallel() + ts := newTestState(t) + sr := ts.tb.Replacer.Replace + + ts.tb.Mux.HandleFunc("/sse-echo-someheader", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + responseHeaders := w.Header() + if sh, err := req.Cookie("someheader"); err == nil { + responseHeaders.Add("Echo-Someheader", sh.Value) + } + _, err := w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) + require.NoError(t, err) + })) + + err := ts.VU.Runtime().Set("http", httpModule.New().NewModuleInstance(ts.VU).Exports().Default) + require.NoError(t, err) + ts.VU.State().CookieJar, _ = cookiejar.New(nil) + + _, err = ts.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", function(client){}) + var someheader = res.headers["Echo-Someheader"]; + if (someheader !== undefined) { + throw new Error("someheader is echoed back by test server even though it doesn't exist"); + } + + http.cookieJar().set("HTTPBIN_IP_URL/sse-echo-someheader", "someheader", "defaultjar") + res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", function(client){}) + someheader = res.headers["Echo-Someheader"]; + if (someheader != "defaultjar") { + throw new Error("someheader has wrong value "+ someheader + " instead of defaultjar"); + } + + var jar = new http.CookieJar(); + jar.set("HTTPBIN_IP_URL/sse-echo-someheader", "someheader", "customjar") + res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", {jar: jar}, function(client){}) + someheader = res.headers["Echo-Someheader"]; + if (someheader != "customjar") { + throw new Error("someheader has wrong value "+ someheader + " instead of customjar"); + } + `)) + require.NoError(t, err) + + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(ts.samples), "", sr("HTTPBIN_IP_URL/sse-echo-someheader"), http.StatusOK, "") +} + +func TestTLSConfig(t *testing.T) { + t.Parallel() + t.Run("insecure skip verify", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + test.VU.StateField.TLSConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + } + + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){}); + if (res.status != 200) { throw new Error("TLS connection failed with status: " + res.status); } + `)) + require.NoError(t, err) + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse"), http.StatusOK, "") + }) + + t.Run("custom certificates", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + test.VU.StateField.TLSConfig = tb.TLSClientConfig + + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){}); + if (res.status != 200) { + throw new Error("TLS connection failed with status: " + res.status); + } + `)) + require.NoError(t, err) + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse"), http.StatusOK, "") + }) +} diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index 36e4ad37387..538eba7acef 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -160,6 +160,36 @@ func echoHandler(t testing.TB, closePrematurely bool) http.Handler { }) } +// sseHandler handles sse requests and generates some events. +// If generateErrors is true then it generates junk +// without respecting the protocol. +func sseHandler(t testing.TB, generateErrors bool) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + if generateErrors { + _, _ = w.Write([]byte("junk\n")) + } else { + _, err := w.Write([]byte(": hello\n")) // comment + require.NoError(t, err) + + _, err = w.Write([]byte("retry: 10000\n")) // retry + require.NoError(t, err) + + _, err = w.Write([]byte("id: ABCD\n")) // id + require.NoError(t, err) + + _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n")) // data 1 event 1 + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // data 2 event 1 + require.NoError(t, err) + + _, err = w.Write([]byte("event: EFGH\n")) // event name + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // data event 2 + require.NoError(t, err) + } + }) +} + func writeJSON(w io.Writer, v interface{}) error { e := json.NewEncoder(w) e.SetIndent("", " ") @@ -294,6 +324,8 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { mux.Handle("/ws-echo-invalid", echoHandler(t, true)) mux.Handle("/ws-close", autocloseHandler(t)) mux.Handle("/ws-close-invalid", echoHandler(t, true)) + mux.Handle("/sse", sseHandler(t, false)) + mux.Handle("/sse-invalid", sseHandler(t, true)) mux.Handle("/zstd", getEncodedHandler(t, "zstd")) mux.Handle("/zstd-br", getZstdBrHandler(t)) mux.Handle("/", httpbin.New().Handler()) diff --git a/metrics/builtin.go b/metrics/builtin.go index d05f2baf002..0842632c089 100644 --- a/metrics/builtin.go +++ b/metrics/builtin.go @@ -33,6 +33,8 @@ const ( DataSentName = "data_sent" DataReceivedName = "data_received" + + SSEName = "sse_event" ) // BuiltinMetrics represent all the builtin metrics of k6 @@ -72,6 +74,9 @@ type BuiltinMetrics struct { // Network-related; used for future protocols as well. DataSent *Metric DataReceived *Metric + + // SSE-related + SSEEventReceived *Metric } // RegisterBuiltinMetrics register and returns the builtin metrics in the provided registry @@ -107,5 +112,7 @@ func RegisterBuiltinMetrics(registry *Registry) *BuiltinMetrics { DataSent: registry.MustNewMetric(DataSentName, Counter, Data), DataReceived: registry.MustNewMetric(DataReceivedName, Counter, Data), + + SSEEventReceived: registry.MustNewMetric(SSEName, Counter), } } From be23f193cd72b9fa702ed794851fbdef734f3880 Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 13:43:08 +0100 Subject: [PATCH 2/8] sse: linter, clearer code and comments --- js/modules/k6/experimental/sse/sse.go | 134 +++++++++++++------------- 1 file changed, 65 insertions(+), 69 deletions(-) diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go index 4ea346122aa..29d64c325fd 100644 --- a/js/modules/k6/experimental/sse/sse.go +++ b/js/modules/k6/experimental/sse/sse.go @@ -1,4 +1,5 @@ -// Package sse implements a k6/sse for k6. It provides basic functionality to handle Server-Sent Event over http +// Package sse implements a k6/sse javascript module extension for k6. +// It provides basic functionality to handle Server-Sent Event over http // that *blocks* the event loop while the http connection is opened. package sse @@ -71,7 +72,8 @@ var ErrSSEInInitContext = common.NewInitContextError("using sse in the init cont // Client is the representation of the sse returned to the js. type Client struct { rt *goja.Runtime - ctx context.Context //nolint:containedctx + ctx context.Context + url string resp *http.Response eventHandlers map[string][]goja.Callable done chan struct{} @@ -111,9 +113,7 @@ func (mi *sse) Exports() modules.Exports { return modules.Exports{Default: mi.obj} } -// Open establishes a client connection based on the parameters provided. -// -//nolint:funlen +// Open establishes a http client connection based on the parameters provided. func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { ctx := mi.vu.Context() rt := mi.vu.Runtime() @@ -129,8 +129,7 @@ func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { parsedArgs.tagsAndMeta.SetSystemTagOrMetaIfEnabled(state.Options.SystemTags, metrics.TagURL, url) - //nolint:bodyclose // as it's deferred closed in closeResponseBody - client, httpResponse, connEndHook, err := mi.open(ctx, state, rt, url, parsedArgs) + client, connEndHook, err := mi.open(ctx, state, rt, url, parsedArgs) defer connEndHook() if err != nil { // Pass the error to the user script before exiting immediately @@ -138,10 +137,7 @@ func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { if state.Options.Throw.Bool { return nil, err } - if httpResponse != nil { - return wrapHTTPResponse(httpResponse) - } - return &HTTPResponse{Error: err.Error()}, nil + return client.wrapHTTPResponse(err.Error()) } // Run the user-provided set up function @@ -157,10 +153,8 @@ func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { readErrChan := make(chan error) readCloseChan := make(chan int) - reader := bufio.NewReader(httpResponse.Body) - // Wraps a couple of channels - go client.readEvents(reader, readEventChan, readErrChan, readCloseChan) + go client.readEvents(readEventChan, readErrChan, readCloseChan) // This is the main control loop. All JS code (including error handlers) // should only be executed by this thread to avoid race conditions @@ -192,23 +186,18 @@ func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { case <-client.done: // This is the final exit point normally triggered by closeResponseBody - sseResponse, sseRespErr := wrapHTTPResponse(httpResponse) - if sseRespErr != nil { - return nil, sseRespErr - } - sseResponse.URL = url - return sseResponse, nil + return client.wrapHTTPResponse("") } } } -func (mi *sse) open( - ctx context.Context, state *lib.State, rt *goja.Runtime, url string, - args *sseOpenArgs, -) (*Client, *http.Response, func(), error) { - client := Client{ +func (mi *sse) open(ctx context.Context, state *lib.State, + rt *goja.Runtime, url string, args *sseOpenArgs, +) (*Client, func(), error) { + sseClient := Client{ ctx: ctx, rt: rt, + url: url, eventHandlers: make(map[string][]goja.Callable), done: make(chan struct{}), samplesOutput: state.Samples, @@ -229,7 +218,8 @@ func (mi *sse) open( TLSClientConfig: tlsConfig, }, } - // this is needed because of how interfaces work and that ssed.Jar is http.Cookiejar + + // httpClient.Jar must never be nil if args.cookieJar != nil { httpClient.Jar = args.cookieJar } @@ -241,13 +231,14 @@ func (mi *sse) open( req, err := http.NewRequestWithContext(ctx, httpMethod, url, strings.NewReader(args.body)) if err != nil { - return &client, nil, nil, err + return &sseClient, nil, err } req.Header.Set("Cache-Control", "no-cache") req.Header.Set("Accept", "text/event-stream") req.Header.Set("Connection", "keep-alive") + // Wrap the request to retrieve the server IP tag trace := &httptrace.ClientTrace{ GotConn: func(connInfo httptrace.GotConnInfo) { if state.Options.SystemTags.Has(metrics.TagIP) { @@ -258,38 +249,39 @@ func (mi *sse) open( }, } - //nolint:contextcheck // as it's passed in the request + //nolint:contextcheck // parent context already passed in the request req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) connStart := time.Now() + //nolint:bodyclose // Body is deferred closed in closeResponseBody resp, err := httpClient.Do(req) connEnd := time.Now() if resp != nil { - client.resp = resp + sseClient.resp = resp if state.Options.SystemTags.Has(metrics.TagStatus) { args.tagsAndMeta.SetSystemTagOrMeta( metrics.TagStatus, strconv.Itoa(resp.StatusCode)) } } - connEndHook := client.pushSessionMetrics(connStart, connEnd) + connEndHook := sseClient.pushSSEMetrics(connStart, connEnd) - return &client, resp, connEndHook, err + return &sseClient, connEndHook, err } // On is used to configure what the client should do on each event. -func (s *Client) On(event string, handler goja.Value) { +func (c *Client) On(event string, handler goja.Value) { if handler, ok := goja.AssertFunction(handler); ok { - s.eventHandlers[event] = append(s.eventHandlers[event], handler) + c.eventHandlers[event] = append(c.eventHandlers[event], handler) } } -func (s *Client) handleEvent(event string, args ...goja.Value) { - if handlers, ok := s.eventHandlers[event]; ok { +func (c *Client) handleEvent(event string, args ...goja.Value) { + if handlers, ok := c.eventHandlers[event]; ok { for _, handler := range handlers { if _, err := handler(goja.Undefined(), args...); err != nil { - common.Throw(s.rt, err) + common.Throw(c.rt, err) } } } @@ -297,37 +289,37 @@ func (s *Client) handleEvent(event string, args ...goja.Value) { // closeResponseBody cleanly closes the response body. // Returns an error if sending the response body cannot be closed. -func (s *Client) closeResponseBody() error { +func (c *Client) closeResponseBody() error { var err error - s.shutdownOnce.Do(func() { - err = s.resp.Body.Close() + c.shutdownOnce.Do(func() { + err = c.resp.Body.Close() if err != nil { // Call the user-defined error handler - s.handleEvent("error", s.rt.ToValue(err)) + c.handleEvent("error", c.rt.ToValue(err)) } - close(s.done) + close(c.done) }) return err } -func (s *Client) pushSessionMetrics(connStart, connEnd time.Time) func() { +func (c *Client) pushSSEMetrics(connStart, connEnd time.Time) func() { connDuration := metrics.D(connEnd.Sub(connStart)) - metrics.PushIfNotDone(s.ctx, s.samplesOutput, metrics.ConnectedSamples{ + metrics.PushIfNotDone(c.ctx, c.samplesOutput, metrics.ConnectedSamples{ Samples: []metrics.Sample{ { TimeSeries: metrics.TimeSeries{ - Metric: s.builtinMetrics.HTTPReqSending, - Tags: s.tagsAndMeta.Tags, + Metric: c.builtinMetrics.HTTPReqSending, + Tags: c.tagsAndMeta.Tags, }, Time: connStart, - Metadata: s.tagsAndMeta.Metadata, + Metadata: c.tagsAndMeta.Metadata, Value: connDuration, }, }, - Tags: s.tagsAndMeta.Tags, + Tags: c.tagsAndMeta.Tags, Time: connStart, }) @@ -335,46 +327,46 @@ func (s *Client) pushSessionMetrics(connStart, connEnd time.Time) func() { end := time.Now() requestDuration := metrics.D(end.Sub(connStart)) - metrics.PushIfNotDone(s.ctx, s.samplesOutput, metrics.ConnectedSamples{ + metrics.PushIfNotDone(c.ctx, c.samplesOutput, metrics.ConnectedSamples{ Samples: []metrics.Sample{ { TimeSeries: metrics.TimeSeries{ - Metric: s.builtinMetrics.HTTPReqs, - Tags: s.tagsAndMeta.Tags, + Metric: c.builtinMetrics.HTTPReqs, + Tags: c.tagsAndMeta.Tags, }, Time: end, - Metadata: s.tagsAndMeta.Metadata, + Metadata: c.tagsAndMeta.Metadata, Value: 1, }, { TimeSeries: metrics.TimeSeries{ - Metric: s.builtinMetrics.HTTPReqSending, - Tags: s.tagsAndMeta.Tags, + Metric: c.builtinMetrics.HTTPReqSending, + Tags: c.tagsAndMeta.Tags, }, Time: end, - Metadata: s.tagsAndMeta.Metadata, + Metadata: c.tagsAndMeta.Metadata, Value: connDuration, }, { TimeSeries: metrics.TimeSeries{ - Metric: s.builtinMetrics.HTTPReqDuration, - Tags: s.tagsAndMeta.Tags, + Metric: c.builtinMetrics.HTTPReqDuration, + Tags: c.tagsAndMeta.Tags, }, Time: end, - Metadata: s.tagsAndMeta.Metadata, + Metadata: c.tagsAndMeta.Metadata, Value: requestDuration, }, }, - Tags: s.tagsAndMeta.Tags, + Tags: c.tagsAndMeta.Tags, Time: end, }) } } // Wraps SSE in a channel -func (s *Client) readEvents(reader *bufio.Reader, readChan chan Event, errorChan chan error, closeChan chan int) { +func (c *Client) readEvents(readChan chan Event, errorChan chan error, closeChan chan int) { + reader := bufio.NewReader(c.resp.Body) ev := Event{} - var buf bytes.Buffer sendEvent := false @@ -385,14 +377,14 @@ func (s *Client) readEvents(reader *bufio.Reader, readChan chan Event, errorChan select { case closeChan <- -1: return - case <-s.done: + case <-c.done: return } } else { select { case errorChan <- err: return - case <-s.done: + case <-c.done: return } } @@ -435,28 +427,32 @@ func (s *Client) readEvents(reader *bufio.Reader, readChan chan Event, errorChan sendEvent = false buf.Reset() ev = Event{} - case <-s.done: + case <-c.done: return } } default: select { case errorChan <- errors.New("unknown event: " + string(line)): - case <-s.done: + case <-c.done: return } } } } -// Wrap the raw HTTPResponse we received to a sseHTTPResponse we can pass to the user -func wrapHTTPResponse(httpResponse *http.Response) (*HTTPResponse, error) { +// Wrap the raw HTTPResponse we received to a sse.HTTPResponse we can pass to the user +func (c *Client) wrapHTTPResponse(errMessage string) (*HTTPResponse, error) { + if errMessage != "" { + return &HTTPResponse{Error: errMessage}, nil + } sseResponse := HTTPResponse{ - Status: httpResponse.StatusCode, + URL: c.url, + Status: c.resp.StatusCode, } - sseResponse.Headers = make(map[string]string, len(httpResponse.Header)) - for k, vs := range httpResponse.Header { + sseResponse.Headers = make(map[string]string, len(c.resp.Header)) + for k, vs := range c.resp.Header { sseResponse.Headers[k] = strings.Join(vs, ", ") } From d1c57a30eea8571ab1802576989e8de40c5fd206 Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 14:25:08 +0100 Subject: [PATCH 3/8] sse: Register the experimental sse module and add example, and api design proposal --- docs/design/021-sse-api.md | 59 +++++++++++++++++++++++++++ examples/sse.js | 27 ++++++++++++ js/jsmodules.go | 2 + js/modules/k6/experimental/sse/sse.go | 1 + js/runner_test.go | 14 +++++++ 5 files changed, 103 insertions(+) create mode 100644 docs/design/021-sse-api.md create mode 100644 examples/sse.js diff --git a/docs/design/021-sse-api.md b/docs/design/021-sse-api.md new file mode 100644 index 00000000000..15089a20704 --- /dev/null +++ b/docs/design/021-sse-api.md @@ -0,0 +1,59 @@ +# Introduce an SSE API module for k6 + +| | | +|:---------------------|:-------------------------------------------------------------| +| **author** | @phymbert | +| **status** | 🔧 proposal | +| **revisions** | [initial](https://github.com/grafana/k6/pull/3639) | +| **Proof of concept** | [branch](https://github.com/phymbert/k6/tree/hp/feature/sse) | +| **references** | [#746](https://github.com/grafana/k6/issues/746) | + +## Problem definition + +The current version of k6 reads the full http response body before returning to the client, +which make impossible testing [Server-Sent Event](https://fr.wikipedia.org/wiki/Server-sent_events). + +We propose to introduce a new `sse` module. +This module is intended to offer an intuitive and user-friendly API for SSE interactions within k6 scripts. + +## Proposed solution + +We suggest to implement a minimalist, experimental (`sse`) module based on the go http client. +The new module will allow users to interact with SSE. +The module will provide an `open` which will allow the user to pass a setup function to configure an `event` callback as it is done in the `ws` module with `message`. + +### Limitation +The module will not support async io and the javascript main loop will be blocked during the http request duration. + +### Example usage + +```javascript +import sse from 'k6/experimental/sse'; + +var url = "https://echo.websocket.org/.sse"; +var params = {"tags": {"my_tag": "hello"}}; + +var response = sse.open(url, params, function (client) { + client.on('open', function open() { + console.log('connected'); + }); + + client.on('event', function (event) { + console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); + }); + + client.on('close', function close() { + console.log('disconnected'); + }); + + client.on('error', function (e) { + console.log('An unexpected error occurred: ', e.error()); + }); +}); + +check(response, {"status is 200": (r) => r && r.status === 200}); +``` + +### Conclusion + +We believe the [proof of concept](https://github.com/grafana/k6/blob/d5cd1010ecb2381376188c8a47ab861cf8b5dc3d/js/modules/k6/experimental/sse/sse.go) developed with this proposal illustrates the feasibility and benefits of developing such an API. diff --git a/examples/sse.js b/examples/sse.js new file mode 100644 index 00000000000..e8a5bf20b30 --- /dev/null +++ b/examples/sse.js @@ -0,0 +1,27 @@ +import sse from "k6/experimental/sse"; +import {check} from "k6"; + +export default function () { + var url = "https://echo.websocket.org/.sse"; + var params = {"tags": {"my_tag": "hello"}}; + + var response = sse.open(url, params, function (client) { + client.on('open', function open() { + console.log('connected'); + }); + + client.on('event', function (event) { + console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); + }); + + client.on('close', function close() { + console.log('disconnected'); + }); + + client.on('error', function (e) { + console.log('An unexpected error occurred: ', e.error()); + }); + }); + + check(response, {"status is 200": (r) => r && r.status === 200}); +}; diff --git a/js/jsmodules.go b/js/jsmodules.go index ea0da9a58c7..534783d48f3 100644 --- a/js/jsmodules.go +++ b/js/jsmodules.go @@ -1,6 +1,7 @@ package js import ( + "go.k6.io/k6/js/modules/k6/experimental/sse" "sync" "go.k6.io/k6/ext" @@ -45,6 +46,7 @@ func getInternalJSModules() map[string]interface{} { "k6/experimental/tracing": tracing.New(), "k6/experimental/browser": browser.New(), "k6/experimental/fs": fs.New(), + "k6/experimental/sse": sse.New(), "k6/net/grpc": grpc.New(), "k6/html": html.New(), "k6/http": http.New(), diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go index 29d64c325fd..b78a4114e4a 100644 --- a/js/modules/k6/experimental/sse/sse.go +++ b/js/modules/k6/experimental/sse/sse.go @@ -1,6 +1,7 @@ // Package sse implements a k6/sse javascript module extension for k6. // It provides basic functionality to handle Server-Sent Event over http // that *blocks* the event loop while the http connection is opened. +// [File API design document]: https://github.com/grafana/k6/blob/master/docs/design/021-sse-api.md#proposed-solution package sse import ( diff --git a/js/runner_test.go b/js/runner_test.go index f8a0273f0a1..ed0409e622b 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -10,6 +10,7 @@ import ( "crypto/x509/pkix" "encoding/pem" "fmt" + "go.k6.io/k6/js/modules/k6/experimental/sse" "go/build" "io" "io/fs" @@ -1913,6 +1914,19 @@ func TestInitContextForbidden(t *testing.T) { exports.default = function() { console.log("p"); }`, ws.ErrWSInInitContext.Error(), }, + { + "sse", + `var sse = require("k6/experimental/sse"); + var url = "https://echo.websocket.org/.sse"; + var response = sse.open(url, function (client) { + client.on('open', function open() { + console.log('connected'); + }) + }); + + exports.default = function() { console.log("p"); }`, + sse.ErrSSEInInitContext.Error(), + }, { "metric", `var Counter = require("k6/metrics").Counter; From e122ad71666461b5eb17e5ab0d97e20565c7e11b Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 14:28:24 +0100 Subject: [PATCH 4/8] sse: test the open event --- docs/design/021-sse-api.md | 4 ---- examples/sse.js | 4 ---- js/modules/k6/experimental/sse/sse_test.go | 10 ++++++++++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/design/021-sse-api.md b/docs/design/021-sse-api.md index 15089a20704..a1b7e33983f 100644 --- a/docs/design/021-sse-api.md +++ b/docs/design/021-sse-api.md @@ -42,10 +42,6 @@ var response = sse.open(url, params, function (client) { console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); }); - client.on('close', function close() { - console.log('disconnected'); - }); - client.on('error', function (e) { console.log('An unexpected error occurred: ', e.error()); }); diff --git a/examples/sse.js b/examples/sse.js index e8a5bf20b30..edb48efdd85 100644 --- a/examples/sse.js +++ b/examples/sse.js @@ -14,10 +14,6 @@ export default function () { console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); }); - client.on('close', function close() { - console.log('disconnected'); - }); - client.on('error', function (e) { console.log('An unexpected error occurred: ', e.error()); }); diff --git a/js/modules/k6/experimental/sse/sse_test.go b/js/modules/k6/experimental/sse/sse_test.go index e83ccc19599..7b32c31ceb7 100644 --- a/js/modules/k6/experimental/sse/sse_test.go +++ b/js/modules/k6/experimental/sse/sse_test.go @@ -117,9 +117,16 @@ func TestOpen(t *testing.T) { test := newTestState(t) _, err := test.VU.Runtime().RunString(sr(` + var open = false; var error = false; var events = []; var res = sse.open("HTTPBIN_IP_URL/sse", function(client){ + client.on("error", function(err) { + error = true + }); + client.on("open", function(err) { + open = true + }); client.on("error", function(err) { error = true }); @@ -127,6 +134,9 @@ func TestOpen(t *testing.T) { events.push(event); }); }); + if (!open) { + throw new Error("opened is not called"); + } if (error) { throw new Error("error raised"); } From d7112231c6960d22d25f43a07c9ec70bc70148bf Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 14:39:27 +0100 Subject: [PATCH 5/8] sse: test the method post with body --- js/modules/k6/experimental/sse/sse_test.go | 36 +++++++++++++++- lib/testutils/httpmultibin/httpmultibin.go | 50 ++++++++++++++-------- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/js/modules/k6/experimental/sse/sse_test.go b/js/modules/k6/experimental/sse/sse_test.go index 7b32c31ceb7..02a49ce1945 100644 --- a/js/modules/k6/experimental/sse/sse_test.go +++ b/js/modules/k6/experimental/sse/sse_test.go @@ -110,7 +110,7 @@ func newTestState(t testing.TB) testState { func TestOpen(t *testing.T) { t.Parallel() - t.Run("nominal", func(t *testing.T) { + t.Run("nominal get", func(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -171,6 +171,40 @@ func TestOpen(t *testing.T) { samplesBuf := metrics.GetBufferedSamples(test.samples) assertMetricEmittedCount(t, metrics.SSEName, samplesBuf, sr("HTTPBIN_IP_URL/sse"), 2) }) + + t.Run("post method", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var events = []; + var res = sse.open("HTTPBIN_IP_URL/sse", {method: 'POST', body: '{"ping": true}'}, function(client){ + client.on("event", function(event) { + events.push(event); + }); + }); + for (let i = 0; i < events.length; i++) { + let event = events[i]; + switch(i) { + case 0: + if (event.id !== "pong") { + throw new Error("unexpected event id: " + event.id); + } + if (event.data !== '{"ping": "pong"}\n') { + throw new Error("unexpected event data: " + event.data); + } + break; + default: + throw new Error("unexpected event"); + } + } + `)) + require.NoError(t, err) + samplesBuf := metrics.GetBufferedSamples(test.samples) + assertMetricEmittedCount(t, metrics.SSEName, samplesBuf, sr("HTTPBIN_IP_URL/sse"), 1) + }) } func TestErrors(t *testing.T) { diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index 538eba7acef..4be351683ba 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -7,6 +7,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "github.com/stretchr/testify/assert" "io" "net" "net/http" @@ -164,28 +165,39 @@ func echoHandler(t testing.TB, closePrematurely bool) http.Handler { // If generateErrors is true then it generates junk // without respecting the protocol. func sseHandler(t testing.TB, generateErrors bool) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if generateErrors { _, _ = w.Write([]byte("junk\n")) } else { - _, err := w.Write([]byte(": hello\n")) // comment - require.NoError(t, err) - - _, err = w.Write([]byte("retry: 10000\n")) // retry - require.NoError(t, err) - - _, err = w.Write([]byte("id: ABCD\n")) // id - require.NoError(t, err) - - _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n")) // data 1 event 1 - require.NoError(t, err) - _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // data 2 event 1 - require.NoError(t, err) - - _, err = w.Write([]byte("event: EFGH\n")) // event name - require.NoError(t, err) - _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // data event 2 - require.NoError(t, err) + if req.Method == http.MethodPost { + body, err := io.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, `{"ping": true}`, string(body)) + + _, err = w.Write([]byte("id: pong\n")) // event id + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) // event data + require.NoError(t, err) + } else { + _, err := w.Write([]byte(": hello\n")) // comment + require.NoError(t, err) + + _, err = w.Write([]byte("retry: 10000\n")) // retry + require.NoError(t, err) + + _, err = w.Write([]byte("id: ABCD\n")) // id + require.NoError(t, err) + + _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n")) // event 1 data 1 + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // event 1 data 2 + require.NoError(t, err) + + _, err = w.Write([]byte("event: EFGH\n")) // event name + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // event 2 data + require.NoError(t, err) + } } }) } From e9dec064a72b1676cdbe17bff37afadc2177102f Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 14:40:28 +0100 Subject: [PATCH 6/8] sse: fix documentation --- js/modules/k6/experimental/sse/sse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go index b78a4114e4a..0f2fdfef815 100644 --- a/js/modules/k6/experimental/sse/sse.go +++ b/js/modules/k6/experimental/sse/sse.go @@ -1,7 +1,7 @@ // Package sse implements a k6/sse javascript module extension for k6. // It provides basic functionality to handle Server-Sent Event over http // that *blocks* the event loop while the http connection is opened. -// [File API design document]: https://github.com/grafana/k6/blob/master/docs/design/021-sse-api.md#proposed-solution +// [SSE API design document]: https://github.com/grafana/k6/blob/master/docs/design/021-sse-api.md#proposed-solution package sse import ( From 65506f29565ed2a383a5bed4b9d824ae9af2f961 Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Wed, 13 Mar 2024 14:42:51 +0100 Subject: [PATCH 7/8] sse: tests remove import to assert in httpmultibin --- lib/testutils/httpmultibin/httpmultibin.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index 4be351683ba..e04eaa733b9 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -7,7 +7,6 @@ import ( "crypto/x509" "encoding/json" "fmt" - "github.com/stretchr/testify/assert" "io" "net" "net/http" @@ -172,7 +171,9 @@ func sseHandler(t testing.TB, generateErrors bool) http.Handler { if req.Method == http.MethodPost { body, err := io.ReadAll(req.Body) require.NoError(t, err) - assert.Equal(t, `{"ping": true}`, string(body)) + if `{"ping": true}` != string(body) { + t.Fail() + } _, err = w.Write([]byte("id: pong\n")) // event id require.NoError(t, err) From ee60d0fff9759fe0acde63c70f47ead9351563ea Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Thu, 14 Mar 2024 19:50:49 +0100 Subject: [PATCH 8/8] sse: PR feedback: support event comment. Simplify the event loop parsing, add spec link. Trim trailing spaces --- js/modules/k6/experimental/sse/sse.go | 51 ++++++++++++---------- js/modules/k6/experimental/sse/sse_test.go | 9 ++-- lib/testutils/httpmultibin/httpmultibin.go | 4 +- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go index 0f2fdfef815..f01bd53679c 100644 --- a/js/modules/k6/experimental/sse/sse.go +++ b/js/modules/k6/experimental/sse/sse.go @@ -95,9 +95,10 @@ type HTTPResponse struct { // Event represents a Server-Sent Event type Event struct { - ID string - Name string - Data string + ID string + Comment string + Name string + Data string } type sseOpenArgs struct { @@ -364,13 +365,13 @@ func (c *Client) pushSSEMetrics(connStart, connEnd time.Time) func() { } } -// Wraps SSE in a channel +// Wraps SSE in a channel, follow the SSE format described in: +// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events func (c *Client) readEvents(readChan chan Event, errorChan chan error, closeChan chan int) { reader := bufio.NewReader(c.resp.Body) ev := Event{} var buf bytes.Buffer - sendEvent := false for { line, err := reader.ReadBytes('\n') if err != nil { @@ -392,18 +393,18 @@ func (c *Client) readEvents(readChan chan Event, errorChan chan error, closeChan } switch { - case hasPrefix(line, ":"): - // Comment, do nothing - - case hasPrefix(line, "retry:"): - // Retry, do nothing for now - // id of event case hasPrefix(line, "id: "): ev.ID = stripPrefix(line, 4) case hasPrefix(line, "id:"): ev.ID = stripPrefix(line, 3) + // Comment + case hasPrefix(line, ": "): + ev.Comment = stripPrefix(line, 2) + case hasPrefix(line, ":"): + ev.Comment = stripPrefix(line, 1) + // name of event case hasPrefix(line, "event: "): ev.Name = stripPrefix(line, 7) @@ -413,24 +414,26 @@ func (c *Client) readEvents(readChan chan Event, errorChan chan error, closeChan // event data case hasPrefix(line, "data: "): buf.Write(line[6:]) - sendEvent = true + case hasPrefix(line, "data:"): buf.Write(line[5:]) - sendEvent = true + + case hasPrefix(line, "retry:"): + // Retry, do nothing for now // end of event case bytes.Equal(line, []byte("\n")): - if sendEvent { - // Report an unexpected closure - ev.Data = buf.String() - select { - case readChan <- ev: - sendEvent = false - buf.Reset() - ev = Event{} - case <-c.done: - return - } + // Trailing newlines are removed. + ev.Data = strings.TrimRightFunc(buf.String(), func(r rune) bool { + return r == '\r' || r == '\n' + }) + + select { + case readChan <- ev: + buf.Reset() + ev = Event{} + case <-c.done: + return } default: select { diff --git a/js/modules/k6/experimental/sse/sse_test.go b/js/modules/k6/experimental/sse/sse_test.go index 02a49ce1945..31acf015ba8 100644 --- a/js/modules/k6/experimental/sse/sse_test.go +++ b/js/modules/k6/experimental/sse/sse_test.go @@ -147,7 +147,10 @@ func TestOpen(t *testing.T) { if (event.id !== "ABCD") { throw new Error("unexpected event id: " + event.id); } - if (event.data !== '{"ping": "pong"}\n{"hello": "sse"}\n') { + if (event.comment !== 'hello') { + throw new Error("unexpected event comment: " + event.comment); + } + if (event.data !== '{"ping": "pong"}\n{"hello": "sse"}') { throw new Error("unexpected event data: " + event.data); } break; @@ -158,7 +161,7 @@ func TestOpen(t *testing.T) { if (event.name !== "EFGH") { throw new Error("unexpected event name: " + event.name); } - if (event.data !== '{"hello": "sse"}\n') { + if (event.data !== '{"hello": "sse"}') { throw new Error("unexpected event data: " + event.data); } break; @@ -192,7 +195,7 @@ func TestOpen(t *testing.T) { if (event.id !== "pong") { throw new Error("unexpected event id: " + event.id); } - if (event.data !== '{"ping": "pong"}\n') { + if (event.data !== '{"ping": "pong"}') { throw new Error("unexpected event data: " + event.data); } break; diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index e04eaa733b9..eec4a3a04d5 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -180,10 +180,10 @@ func sseHandler(t testing.TB, generateErrors bool) http.Handler { _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) // event data require.NoError(t, err) } else { - _, err := w.Write([]byte(": hello\n")) // comment + _, err := w.Write([]byte("retry: 10000\n")) // retry require.NoError(t, err) - _, err = w.Write([]byte("retry: 10000\n")) // retry + _, err = w.Write([]byte(": hello\n")) // comment require.NoError(t, err) _, err = w.Write([]byte("id: ABCD\n")) // id