diff --git a/docs/design/021-sse-api.md b/docs/design/021-sse-api.md new file mode 100644 index 00000000000..a1b7e33983f --- /dev/null +++ b/docs/design/021-sse-api.md @@ -0,0 +1,55 @@ +# Introduce an SSE API module for k6 + +| | | +|:---------------------|:-------------------------------------------------------------| +| **author** | @phymbert | +| **status** | 🔧 proposal | +| **revisions** | [initial](https://github.com/grafana/k6/pull/3639) | +| **Proof of concept** | [branch](https://github.com/phymbert/k6/tree/hp/feature/sse) | +| **references** | [#746](https://github.com/grafana/k6/issues/746) | + +## Problem definition + +The current version of k6 reads the full http response body before returning to the client, +which make impossible testing [Server-Sent Event](https://fr.wikipedia.org/wiki/Server-sent_events). + +We propose to introduce a new `sse` module. +This module is intended to offer an intuitive and user-friendly API for SSE interactions within k6 scripts. + +## Proposed solution + +We suggest to implement a minimalist, experimental (`sse`) module based on the go http client. +The new module will allow users to interact with SSE. +The module will provide an `open` which will allow the user to pass a setup function to configure an `event` callback as it is done in the `ws` module with `message`. + +### Limitation +The module will not support async io and the javascript main loop will be blocked during the http request duration. + +### Example usage + +```javascript +import sse from 'k6/experimental/sse'; + +var url = "https://echo.websocket.org/.sse"; +var params = {"tags": {"my_tag": "hello"}}; + +var response = sse.open(url, params, function (client) { + client.on('open', function open() { + console.log('connected'); + }); + + client.on('event', function (event) { + console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); + }); + + client.on('error', function (e) { + console.log('An unexpected error occurred: ', e.error()); + }); +}); + +check(response, {"status is 200": (r) => r && r.status === 200}); +``` + +### Conclusion + +We believe the [proof of concept](https://github.com/grafana/k6/blob/d5cd1010ecb2381376188c8a47ab861cf8b5dc3d/js/modules/k6/experimental/sse/sse.go) developed with this proposal illustrates the feasibility and benefits of developing such an API. diff --git a/examples/sse.js b/examples/sse.js new file mode 100644 index 00000000000..edb48efdd85 --- /dev/null +++ b/examples/sse.js @@ -0,0 +1,23 @@ +import sse from "k6/experimental/sse"; +import {check} from "k6"; + +export default function () { + var url = "https://echo.websocket.org/.sse"; + var params = {"tags": {"my_tag": "hello"}}; + + var response = sse.open(url, params, function (client) { + client.on('open', function open() { + console.log('connected'); + }); + + client.on('event', function (event) { + console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`); + }); + + client.on('error', function (e) { + console.log('An unexpected error occurred: ', e.error()); + }); + }); + + check(response, {"status is 200": (r) => r && r.status === 200}); +}; diff --git a/js/jsmodules.go b/js/jsmodules.go index ea0da9a58c7..534783d48f3 100644 --- a/js/jsmodules.go +++ b/js/jsmodules.go @@ -1,6 +1,7 @@ package js import ( + "go.k6.io/k6/js/modules/k6/experimental/sse" "sync" "go.k6.io/k6/ext" @@ -45,6 +46,7 @@ func getInternalJSModules() map[string]interface{} { "k6/experimental/tracing": tracing.New(), "k6/experimental/browser": browser.New(), "k6/experimental/fs": fs.New(), + "k6/experimental/sse": sse.New(), "k6/net/grpc": grpc.New(), "k6/html": html.New(), "k6/http": http.New(), diff --git a/js/modules/k6/experimental/sse/sse.go b/js/modules/k6/experimental/sse/sse.go new file mode 100644 index 00000000000..f01bd53679c --- /dev/null +++ b/js/modules/k6/experimental/sse/sse.go @@ -0,0 +1,543 @@ +// Package sse implements a k6/sse javascript module extension for k6. +// It provides basic functionality to handle Server-Sent Event over http +// that *blocks* the event loop while the http connection is opened. +// [SSE API design document]: https://github.com/grafana/k6/blob/master/docs/design/021-sse-api.md#proposed-solution +package sse + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/http/cookiejar" + "net/http/httptrace" + "strconv" + "strings" + "sync" + "time" + + "github.com/dop251/goja" + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" + httpModule "go.k6.io/k6/js/modules/k6/http" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" +) + +type ( + // RootModule is the global module instance that will create module + // instances for each VU. + RootModule struct{} + + // sse represents a module instance of the sse module. + sse struct { + vu modules.VU + obj *goja.Object + } +) + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &sse{} +) + +// New returns a pointer to a new RootModule instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (*RootModule) NewModuleInstance(m modules.VU) modules.Instance { + rt := m.Runtime() + mi := &sse{ + vu: m, + } + obj := rt.NewObject() + if err := obj.Set("open", mi.Open); err != nil { + common.Throw(rt, err) + } + + mi.obj = obj + return mi +} + +// ErrSSEInInitContext is returned when sse are using in the init context +var ErrSSEInInitContext = common.NewInitContextError("using sse in the init context is not supported") + +// Client is the representation of the sse returned to the js. +type Client struct { + rt *goja.Runtime + ctx context.Context + url string + resp *http.Response + eventHandlers map[string][]goja.Callable + done chan struct{} + shutdownOnce sync.Once + + tagsAndMeta *metrics.TagsAndMeta + samplesOutput chan<- metrics.SampleContainer + builtinMetrics *metrics.BuiltinMetrics +} + +// HTTPResponse is the http response returned by sse.open. +type HTTPResponse struct { + URL string `json:"url"` + Status int `json:"status"` + Headers map[string]string `json:"headers"` + Error string `json:"error"` +} + +// Event represents a Server-Sent Event +type Event struct { + ID string + Comment string + Name string + Data string +} + +type sseOpenArgs struct { + setupFn goja.Callable + headers http.Header + method string + body string + cookieJar *cookiejar.Jar + tagsAndMeta *metrics.TagsAndMeta +} + +// Exports returns the exports of the sse module. +func (mi *sse) Exports() modules.Exports { + return modules.Exports{Default: mi.obj} +} + +// Open establishes a http client connection based on the parameters provided. +func (mi *sse) Open(url string, args ...goja.Value) (*HTTPResponse, error) { + ctx := mi.vu.Context() + rt := mi.vu.Runtime() + state := mi.vu.State() + if state == nil { + return nil, ErrSSEInInitContext + } + + parsedArgs, err := parseConnectArgs(state, rt, args...) + if err != nil { + return nil, err + } + + parsedArgs.tagsAndMeta.SetSystemTagOrMetaIfEnabled(state.Options.SystemTags, metrics.TagURL, url) + + client, connEndHook, err := mi.open(ctx, state, rt, url, parsedArgs) + defer connEndHook() + if err != nil { + // Pass the error to the user script before exiting immediately + client.handleEvent("error", rt.ToValue(err)) + if state.Options.Throw.Bool { + return nil, err + } + return client.wrapHTTPResponse(err.Error()) + } + + // Run the user-provided set up function + if _, err := parsedArgs.setupFn(goja.Undefined(), rt.ToValue(&client)); err != nil { + _ = client.closeResponseBody() + return nil, err + } + + // The connection is now open, emit the event + client.handleEvent("open") + + readEventChan := make(chan Event) + readErrChan := make(chan error) + readCloseChan := make(chan int) + + // Wraps a couple of channels + go client.readEvents(readEventChan, readErrChan, readCloseChan) + + // This is the main control loop. All JS code (including error handlers) + // should only be executed by this thread to avoid race conditions + for { + select { + case event := <-readEventChan: + metrics.PushIfNotDone(ctx, client.samplesOutput, metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: client.builtinMetrics.SSEEventReceived, + Tags: client.tagsAndMeta.Tags, + }, + Time: time.Now(), + Metadata: client.tagsAndMeta.Metadata, + Value: 1, + }) + + client.handleEvent("event", rt.ToValue(event)) + + case readErr := <-readErrChan: + client.handleEvent("error", rt.ToValue(readErr)) + + case <-ctx.Done(): + // VU is shutting down during an interrupt + // client events will not be forwarded to the VU + _ = client.closeResponseBody() + + case <-readCloseChan: + _ = client.closeResponseBody() + + case <-client.done: + // This is the final exit point normally triggered by closeResponseBody + return client.wrapHTTPResponse("") + } + } +} + +func (mi *sse) open(ctx context.Context, state *lib.State, + rt *goja.Runtime, url string, args *sseOpenArgs, +) (*Client, func(), error) { + sseClient := Client{ + ctx: ctx, + rt: rt, + url: url, + eventHandlers: make(map[string][]goja.Callable), + done: make(chan struct{}), + samplesOutput: state.Samples, + tagsAndMeta: args.tagsAndMeta, + builtinMetrics: state.BuiltinMetrics, + } + + // Overriding the NextProtos to avoid talking http2 + var tlsConfig *tls.Config + if state.TLSConfig != nil { + tlsConfig = state.TLSConfig.Clone() + tlsConfig.NextProtos = []string{"http/1.1"} + } + + httpClient := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConfig, + }, + } + + // httpClient.Jar must never be nil + if args.cookieJar != nil { + httpClient.Jar = args.cookieJar + } + + httpMethod := http.MethodGet + if args.method != "" { + httpMethod = args.method + } + + req, err := http.NewRequestWithContext(ctx, httpMethod, url, strings.NewReader(args.body)) + if err != nil { + return &sseClient, nil, err + } + + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + + // Wrap the request to retrieve the server IP tag + trace := &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + if state.Options.SystemTags.Has(metrics.TagIP) { + if ip, _, err2 := net.SplitHostPort(connInfo.Conn.RemoteAddr().String()); err2 == nil { + args.tagsAndMeta.SetSystemTagOrMeta(metrics.TagIP, ip) + } + } + }, + } + + //nolint:contextcheck // parent context already passed in the request + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + + connStart := time.Now() + //nolint:bodyclose // Body is deferred closed in closeResponseBody + resp, err := httpClient.Do(req) + connEnd := time.Now() + + if resp != nil { + sseClient.resp = resp + if state.Options.SystemTags.Has(metrics.TagStatus) { + args.tagsAndMeta.SetSystemTagOrMeta( + metrics.TagStatus, strconv.Itoa(resp.StatusCode)) + } + } + + connEndHook := sseClient.pushSSEMetrics(connStart, connEnd) + + return &sseClient, connEndHook, err +} + +// On is used to configure what the client should do on each event. +func (c *Client) On(event string, handler goja.Value) { + if handler, ok := goja.AssertFunction(handler); ok { + c.eventHandlers[event] = append(c.eventHandlers[event], handler) + } +} + +func (c *Client) handleEvent(event string, args ...goja.Value) { + if handlers, ok := c.eventHandlers[event]; ok { + for _, handler := range handlers { + if _, err := handler(goja.Undefined(), args...); err != nil { + common.Throw(c.rt, err) + } + } + } +} + +// closeResponseBody cleanly closes the response body. +// Returns an error if sending the response body cannot be closed. +func (c *Client) closeResponseBody() error { + var err error + + c.shutdownOnce.Do(func() { + err = c.resp.Body.Close() + if err != nil { + // Call the user-defined error handler + c.handleEvent("error", c.rt.ToValue(err)) + } + close(c.done) + }) + + return err +} + +func (c *Client) pushSSEMetrics(connStart, connEnd time.Time) func() { + connDuration := metrics.D(connEnd.Sub(connStart)) + + metrics.PushIfNotDone(c.ctx, c.samplesOutput, metrics.ConnectedSamples{ + Samples: []metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: c.builtinMetrics.HTTPReqSending, + Tags: c.tagsAndMeta.Tags, + }, + Time: connStart, + Metadata: c.tagsAndMeta.Metadata, + Value: connDuration, + }, + }, + Tags: c.tagsAndMeta.Tags, + Time: connStart, + }) + + return func() { + end := time.Now() + requestDuration := metrics.D(end.Sub(connStart)) + + metrics.PushIfNotDone(c.ctx, c.samplesOutput, metrics.ConnectedSamples{ + Samples: []metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: c.builtinMetrics.HTTPReqs, + Tags: c.tagsAndMeta.Tags, + }, + Time: end, + Metadata: c.tagsAndMeta.Metadata, + Value: 1, + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: c.builtinMetrics.HTTPReqSending, + Tags: c.tagsAndMeta.Tags, + }, + Time: end, + Metadata: c.tagsAndMeta.Metadata, + Value: connDuration, + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: c.builtinMetrics.HTTPReqDuration, + Tags: c.tagsAndMeta.Tags, + }, + Time: end, + Metadata: c.tagsAndMeta.Metadata, + Value: requestDuration, + }, + }, + Tags: c.tagsAndMeta.Tags, + Time: end, + }) + } +} + +// Wraps SSE in a channel, follow the SSE format described in: +// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events +func (c *Client) readEvents(readChan chan Event, errorChan chan error, closeChan chan int) { + reader := bufio.NewReader(c.resp.Body) + ev := Event{} + var buf bytes.Buffer + + for { + line, err := reader.ReadBytes('\n') + if err != nil { + if errors.Is(err, io.EOF) { + select { + case closeChan <- -1: + return + case <-c.done: + return + } + } else { + select { + case errorChan <- err: + return + case <-c.done: + return + } + } + } + + switch { + // id of event + case hasPrefix(line, "id: "): + ev.ID = stripPrefix(line, 4) + case hasPrefix(line, "id:"): + ev.ID = stripPrefix(line, 3) + + // Comment + case hasPrefix(line, ": "): + ev.Comment = stripPrefix(line, 2) + case hasPrefix(line, ":"): + ev.Comment = stripPrefix(line, 1) + + // name of event + case hasPrefix(line, "event: "): + ev.Name = stripPrefix(line, 7) + case hasPrefix(line, "event:"): + ev.Name = stripPrefix(line, 6) + + // event data + case hasPrefix(line, "data: "): + buf.Write(line[6:]) + + case hasPrefix(line, "data:"): + buf.Write(line[5:]) + + case hasPrefix(line, "retry:"): + // Retry, do nothing for now + + // end of event + case bytes.Equal(line, []byte("\n")): + // Trailing newlines are removed. + ev.Data = strings.TrimRightFunc(buf.String(), func(r rune) bool { + return r == '\r' || r == '\n' + }) + + select { + case readChan <- ev: + buf.Reset() + ev = Event{} + case <-c.done: + return + } + default: + select { + case errorChan <- errors.New("unknown event: " + string(line)): + case <-c.done: + return + } + } + } +} + +// Wrap the raw HTTPResponse we received to a sse.HTTPResponse we can pass to the user +func (c *Client) wrapHTTPResponse(errMessage string) (*HTTPResponse, error) { + if errMessage != "" { + return &HTTPResponse{Error: errMessage}, nil + } + sseResponse := HTTPResponse{ + URL: c.url, + Status: c.resp.StatusCode, + } + + sseResponse.Headers = make(map[string]string, len(c.resp.Header)) + for k, vs := range c.resp.Header { + sseResponse.Headers[k] = strings.Join(vs, ", ") + } + + return &sseResponse, nil +} + +func parseConnectArgs(state *lib.State, rt *goja.Runtime, args ...goja.Value) (*sseOpenArgs, error) { + // The params argument is optional + var callableV, paramsV goja.Value + switch len(args) { + case 2: + paramsV = args[0] + callableV = args[1] + case 1: + paramsV = goja.Undefined() + callableV = args[0] + default: + return nil, errors.New("invalid number of arguments to sse.open") + } + // Get the callable (required) + setupFn, isFunc := goja.AssertFunction(callableV) + if !isFunc { + return nil, errors.New("last argument to sse.open must be a function") + } + + headers := make(http.Header) + headers.Set("User-Agent", state.Options.UserAgent.String) + tagsAndMeta := state.Tags.GetCurrentValues() + parsedArgs := &sseOpenArgs{ + setupFn: setupFn, + headers: headers, + cookieJar: state.CookieJar, + tagsAndMeta: &tagsAndMeta, + } + + if goja.IsUndefined(paramsV) || goja.IsNull(paramsV) { + return parsedArgs, nil + } + + // Parse the optional second argument (params) + params := paramsV.ToObject(rt) + for _, k := range params.Keys() { + switch k { + case "headers": + headersV := params.Get(k) + if goja.IsUndefined(headersV) || goja.IsNull(headersV) { + continue + } + headersObj := headersV.ToObject(rt) + if headersObj == nil { + continue + } + for _, key := range headersObj.Keys() { + parsedArgs.headers.Set(key, headersObj.Get(key).String()) + } + case "tags": + if err := common.ApplyCustomUserTags(rt, parsedArgs.tagsAndMeta, params.Get(k)); err != nil { + return nil, fmt.Errorf("invalid sse.open() metric tags: %w", err) + } + case "jar": + jarV := params.Get(k) + if goja.IsUndefined(jarV) || goja.IsNull(jarV) { + continue + } + if v, ok := jarV.Export().(*httpModule.CookieJar); ok { + parsedArgs.cookieJar = v.Jar + } + case "method": + parsedArgs.method = strings.TrimSpace(params.Get(k).ToString().String()) + case "body": + parsedArgs.body = strings.TrimSpace(params.Get(k).ToString().String()) + } + } + + return parsedArgs, nil +} + +func hasPrefix(s []byte, prefix string) bool { + return bytes.HasPrefix(s, []byte(prefix)) +} + +func stripPrefix(line []byte, start int) string { + return string(line[start : len(line)-1]) +} diff --git a/js/modules/k6/experimental/sse/sse_test.go b/js/modules/k6/experimental/sse/sse_test.go new file mode 100644 index 00000000000..31acf015ba8 --- /dev/null +++ b/js/modules/k6/experimental/sse/sse_test.go @@ -0,0 +1,393 @@ +package sse + +import ( + "crypto/tls" + "net/http" + "net/http/cookiejar" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + httpModule "go.k6.io/k6/js/modules/k6/http" + "go.k6.io/k6/js/modulestest" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/testutils/httpmultibin" + "go.k6.io/k6/metrics" + "gopkg.in/guregu/null.v3" +) + +func assertSSEMetricsEmitted(t *testing.T, sampleContainers []metrics.SampleContainer, subprotocol, url string, status int, group string) { //nolint:unparam + seenEvents := false + seenRequestDuration := false + seenHTTPReq := false + + for _, sampleContainer := range sampleContainers { + for _, sample := range sampleContainer.GetSamples() { + tags := sample.Tags.Map() + if tags["url"] == url { + switch sample.Metric.Name { + case metrics.HTTPReqsName: + seenHTTPReq = true + case metrics.HTTPReqDurationName: + seenRequestDuration = true + case metrics.SSEName: + seenEvents = true + } + + assert.Equal(t, strconv.Itoa(status), tags["status"]) + assert.Equal(t, subprotocol, tags["subproto"]) + assert.Equal(t, group, tags["group"]) + } + } + } + assert.True(t, seenEvents, "url %s didn't emit SSE events", url) + assert.True(t, seenRequestDuration, "url %s didn't emit seenRequestDuration", url) + assert.True(t, seenHTTPReq, "url %s didn't emit seenHTTPReq", url) +} + +func assertMetricEmittedCount(t *testing.T, metricName string, sampleContainers []metrics.SampleContainer, url string, count int) { + t.Helper() + actualCount := 0 + + for _, sampleContainer := range sampleContainers { + for _, sample := range sampleContainer.GetSamples() { + surl, ok := sample.Tags.Get("url") + assert.True(t, ok) + if surl == url && sample.Metric.Name == metricName { + actualCount++ + } + } + } + assert.Equal(t, count, actualCount, "url %s emitted %s %d times, expected was %d times", url, metricName, actualCount, count) +} + +type testState struct { + *modulestest.Runtime + tb *httpmultibin.HTTPMultiBin + samples chan metrics.SampleContainer +} + +func newTestState(t testing.TB) testState { + tb := httpmultibin.NewHTTPMultiBin(t) + + testRuntime := modulestest.NewRuntime(t) + samples := make(chan metrics.SampleContainer, 1000) + + root, err := lib.NewGroup("", nil) + require.NoError(t, err) + registry := metrics.NewRegistry() + state := &lib.State{ + Group: root, + Dialer: tb.Dialer, + Options: lib.Options{ + SystemTags: metrics.NewSystemTagSet( + metrics.TagURL, + metrics.TagProto, + metrics.TagStatus, + metrics.TagSubproto, + ), + UserAgent: null.StringFrom("TestUserAgent"), + Throw: null.BoolFrom(true), + }, + Samples: samples, + TLSConfig: tb.TLSClientConfig, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + Tags: lib.NewVUStateTags(registry.RootTagSet()), + } + + m := New().NewModuleInstance(testRuntime.VU) + require.NoError(t, testRuntime.VU.RuntimeField.Set("sse", m.Exports().Default)) + testRuntime.MoveToVUContext(state) + + return testState{ + Runtime: testRuntime, + tb: tb, + samples: samples, + } +} + +func TestOpen(t *testing.T) { + t.Parallel() + + t.Run("nominal get", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var open = false; + var error = false; + var events = []; + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){ + client.on("error", function(err) { + error = true + }); + client.on("open", function(err) { + open = true + }); + client.on("error", function(err) { + error = true + }); + client.on("event", function(event) { + events.push(event); + }); + }); + if (!open) { + throw new Error("opened is not called"); + } + if (error) { + throw new Error("error raised"); + } + for (let i = 0; i < events.length; i++) { + let event = events[i]; + switch(i) { + case 0: + if (event.id !== "ABCD") { + throw new Error("unexpected event id: " + event.id); + } + if (event.comment !== 'hello') { + throw new Error("unexpected event comment: " + event.comment); + } + if (event.data !== '{"ping": "pong"}\n{"hello": "sse"}') { + throw new Error("unexpected event data: " + event.data); + } + break; + case 1: + if (event.id !== "") { + throw new Error("unexpected event id: " + event.id); + } + if (event.name !== "EFGH") { + throw new Error("unexpected event name: " + event.name); + } + if (event.data !== '{"hello": "sse"}') { + throw new Error("unexpected event data: " + event.data); + } + break; + default: + throw new Error("unexpected event"); + } + } + `)) + require.NoError(t, err) + samplesBuf := metrics.GetBufferedSamples(test.samples) + assertMetricEmittedCount(t, metrics.SSEName, samplesBuf, sr("HTTPBIN_IP_URL/sse"), 2) + }) + + t.Run("post method", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var events = []; + var res = sse.open("HTTPBIN_IP_URL/sse", {method: 'POST', body: '{"ping": true}'}, function(client){ + client.on("event", function(event) { + events.push(event); + }); + }); + for (let i = 0; i < events.length; i++) { + let event = events[i]; + switch(i) { + case 0: + if (event.id !== "pong") { + throw new Error("unexpected event id: " + event.id); + } + if (event.data !== '{"ping": "pong"}') { + throw new Error("unexpected event data: " + event.data); + } + break; + default: + throw new Error("unexpected event"); + } + } + `)) + require.NoError(t, err) + samplesBuf := metrics.GetBufferedSamples(test.samples) + assertMetricEmittedCount(t, metrics.SSEName, samplesBuf, sr("HTTPBIN_IP_URL/sse"), 1) + }) +} + +func TestErrors(t *testing.T) { + t.Parallel() + + t.Run("invalid_url", func(t *testing.T) { + t.Parallel() + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(` + var res = sse.open("INVALID", function(client){ + client.on("open", function(client){}); + }); + `) + assert.Error(t, err) + }) + + t.Run("error_in_setup", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_URL/sse-echo", function(client){ + throw new Error("error in setup"); + }); + `)) + assert.Error(t, err) + }) + + t.Run("error_in_response", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + _, err := test.VU.Runtime().RunString(sr(` + var error = false; + var res = sse.open("HTTPBIN_IP_URL/sse-invalid", function(client){ + client.on("error", function(err) { + error = true + }); + }); + if (!error) { + throw new Error("no error raised"); + } + `)) + require.NoError(t, err) + }) +} + +func TestOpenWrongStatusCode(t *testing.T) { + t.Parallel() + test := newTestState(t) + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + test.VU.StateField.Options.Throw = null.BoolFrom(false) + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/status/404", function(client){}); + if (res.status != 404) { + throw new Error ("no status code set for invalid response"); + } + `)) + assert.NoError(t, err) +} + +func TestUserAgent(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + tb.Mux.HandleFunc("/sse-echo-useragent", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + // Echo back User-Agent header if it exists + responseHeaders := w.Header() + if ua := req.Header.Get("User-Agent"); ua != "" { + responseHeaders.Add("X-Echo-User-Agent", req.Header.Get("User-Agent")) + } + _, err := w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) + require.NoError(t, err) + })) + + test := newTestState(t) + + // client handler should echo back User-Agent as Echo-User-Agent for this test to work + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse-echo-useragent", function(client){}) + var userAgent = res.headers["X-Echo-User-Agent"]; + if (userAgent == undefined) { + throw new Error("user agent is not echoed back by test server"); + } + if (userAgent != "Go-http-client/1.1") { + throw new Error("incorrect user agent: " + userAgent); + } + `)) + require.NoError(t, err) + + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse-echo-useragent"), http.StatusOK, "") +} + +func TestCookieJar(t *testing.T) { + t.Parallel() + ts := newTestState(t) + sr := ts.tb.Replacer.Replace + + ts.tb.Mux.HandleFunc("/sse-echo-someheader", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + responseHeaders := w.Header() + if sh, err := req.Cookie("someheader"); err == nil { + responseHeaders.Add("Echo-Someheader", sh.Value) + } + _, err := w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) + require.NoError(t, err) + })) + + err := ts.VU.Runtime().Set("http", httpModule.New().NewModuleInstance(ts.VU).Exports().Default) + require.NoError(t, err) + ts.VU.State().CookieJar, _ = cookiejar.New(nil) + + _, err = ts.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", function(client){}) + var someheader = res.headers["Echo-Someheader"]; + if (someheader !== undefined) { + throw new Error("someheader is echoed back by test server even though it doesn't exist"); + } + + http.cookieJar().set("HTTPBIN_IP_URL/sse-echo-someheader", "someheader", "defaultjar") + res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", function(client){}) + someheader = res.headers["Echo-Someheader"]; + if (someheader != "defaultjar") { + throw new Error("someheader has wrong value "+ someheader + " instead of defaultjar"); + } + + var jar = new http.CookieJar(); + jar.set("HTTPBIN_IP_URL/sse-echo-someheader", "someheader", "customjar") + res = sse.open("HTTPBIN_IP_URL/sse-echo-someheader", {jar: jar}, function(client){}) + someheader = res.headers["Echo-Someheader"]; + if (someheader != "customjar") { + throw new Error("someheader has wrong value "+ someheader + " instead of customjar"); + } + `)) + require.NoError(t, err) + + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(ts.samples), "", sr("HTTPBIN_IP_URL/sse-echo-someheader"), http.StatusOK, "") +} + +func TestTLSConfig(t *testing.T) { + t.Parallel() + t.Run("insecure skip verify", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + test.VU.StateField.TLSConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + } + + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){}); + if (res.status != 200) { throw new Error("TLS connection failed with status: " + res.status); } + `)) + require.NoError(t, err) + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse"), http.StatusOK, "") + }) + + t.Run("custom certificates", func(t *testing.T) { + t.Parallel() + tb := httpmultibin.NewHTTPMultiBin(t) + sr := tb.Replacer.Replace + + test := newTestState(t) + test.VU.StateField.TLSConfig = tb.TLSClientConfig + + _, err := test.VU.Runtime().RunString(sr(` + var res = sse.open("HTTPBIN_IP_URL/sse", function(client){}); + if (res.status != 200) { + throw new Error("TLS connection failed with status: " + res.status); + } + `)) + require.NoError(t, err) + assertSSEMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("HTTPBIN_IP_URL/sse"), http.StatusOK, "") + }) +} diff --git a/js/runner_test.go b/js/runner_test.go index f8a0273f0a1..ed0409e622b 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -10,6 +10,7 @@ import ( "crypto/x509/pkix" "encoding/pem" "fmt" + "go.k6.io/k6/js/modules/k6/experimental/sse" "go/build" "io" "io/fs" @@ -1913,6 +1914,19 @@ func TestInitContextForbidden(t *testing.T) { exports.default = function() { console.log("p"); }`, ws.ErrWSInInitContext.Error(), }, + { + "sse", + `var sse = require("k6/experimental/sse"); + var url = "https://echo.websocket.org/.sse"; + var response = sse.open(url, function (client) { + client.on('open', function open() { + console.log('connected'); + }) + }); + + exports.default = function() { console.log("p"); }`, + sse.ErrSSEInInitContext.Error(), + }, { "metric", `var Counter = require("k6/metrics").Counter; diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index 36e4ad37387..eec4a3a04d5 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -160,6 +160,49 @@ func echoHandler(t testing.TB, closePrematurely bool) http.Handler { }) } +// sseHandler handles sse requests and generates some events. +// If generateErrors is true then it generates junk +// without respecting the protocol. +func sseHandler(t testing.TB, generateErrors bool) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if generateErrors { + _, _ = w.Write([]byte("junk\n")) + } else { + if req.Method == http.MethodPost { + body, err := io.ReadAll(req.Body) + require.NoError(t, err) + if `{"ping": true}` != string(body) { + t.Fail() + } + + _, err = w.Write([]byte("id: pong\n")) // event id + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n\n")) // event data + require.NoError(t, err) + } else { + _, err := w.Write([]byte("retry: 10000\n")) // retry + require.NoError(t, err) + + _, err = w.Write([]byte(": hello\n")) // comment + require.NoError(t, err) + + _, err = w.Write([]byte("id: ABCD\n")) // id + require.NoError(t, err) + + _, err = w.Write([]byte(`data: {"ping": "pong"}` + "\n")) // event 1 data 1 + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // event 1 data 2 + require.NoError(t, err) + + _, err = w.Write([]byte("event: EFGH\n")) // event name + require.NoError(t, err) + _, err = w.Write([]byte(`data: {"hello": "sse"}` + "\n\n")) // event 2 data + require.NoError(t, err) + } + } + }) +} + func writeJSON(w io.Writer, v interface{}) error { e := json.NewEncoder(w) e.SetIndent("", " ") @@ -294,6 +337,8 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { mux.Handle("/ws-echo-invalid", echoHandler(t, true)) mux.Handle("/ws-close", autocloseHandler(t)) mux.Handle("/ws-close-invalid", echoHandler(t, true)) + mux.Handle("/sse", sseHandler(t, false)) + mux.Handle("/sse-invalid", sseHandler(t, true)) mux.Handle("/zstd", getEncodedHandler(t, "zstd")) mux.Handle("/zstd-br", getZstdBrHandler(t)) mux.Handle("/", httpbin.New().Handler()) diff --git a/metrics/builtin.go b/metrics/builtin.go index d05f2baf002..0842632c089 100644 --- a/metrics/builtin.go +++ b/metrics/builtin.go @@ -33,6 +33,8 @@ const ( DataSentName = "data_sent" DataReceivedName = "data_received" + + SSEName = "sse_event" ) // BuiltinMetrics represent all the builtin metrics of k6 @@ -72,6 +74,9 @@ type BuiltinMetrics struct { // Network-related; used for future protocols as well. DataSent *Metric DataReceived *Metric + + // SSE-related + SSEEventReceived *Metric } // RegisterBuiltinMetrics register and returns the builtin metrics in the provided registry @@ -107,5 +112,7 @@ func RegisterBuiltinMetrics(registry *Registry) *BuiltinMetrics { DataSent: registry.MustNewMetric(DataSentName, Counter, Data), DataReceived: registry.MustNewMetric(DataReceivedName, Counter, Data), + + SSEEventReceived: registry.MustNewMetric(SSEName, Counter), } }