From 6984fa2d7d6bfb03713e570ac1cadab4e33f1605 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Mon, 26 Mar 2018 15:24:31 +0300 Subject: [PATCH 01/12] Fix data race with concurrent map read and write This fixes https://github.com/loadimpact/k6/issues/207 --- lib/models.go | 43 ++++++++++++++++-------------------------- lib/models_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 27 deletions(-) diff --git a/lib/models.go b/lib/models.go index 9802f1c9893..d09e87dae38 100644 --- a/lib/models.go +++ b/lib/models.go @@ -148,46 +148,35 @@ func NewGroup(name string, parent *Group) (*Group, error) { }, nil } -// Create a child group belonging to this group. +// Group creates a child group belonging to this group. // This is safe to call from multiple goroutines simultaneously. func (g *Group) Group(name string) (*Group, error) { - snapshot := g.Groups - group, ok := snapshot[name] + g.groupMutex.Lock() + defer g.groupMutex.Unlock() + group, ok := g.Groups[name] if !ok { - g.groupMutex.Lock() - defer g.groupMutex.Unlock() - - group, ok := g.Groups[name] - if !ok { - group, err := NewGroup(name, g) - if err != nil { - return nil, err - } - g.Groups[name] = group - return group, nil + group, err := NewGroup(name, g) + if err != nil { + return nil, err } + g.Groups[name] = group return group, nil } return group, nil } -// Create a check belonging to this group. +// Check creates a child check belonging to this group. // This is safe to call from multiple goroutines simultaneously. 
func (g *Group) Check(name string) (*Check, error) { - snapshot := g.Checks - check, ok := snapshot[name] + g.checkMutex.Lock() + defer g.checkMutex.Unlock() + check, ok := g.Checks[name] if !ok { - g.checkMutex.Lock() - defer g.checkMutex.Unlock() - check, ok := g.Checks[name] - if !ok { - check, err := NewCheck(name, g) - if err != nil { - return nil, err - } - g.Checks[name] = check - return check, nil + check, err := NewCheck(name, g) + if err != nil { + return nil, err } + g.Checks[name] = check return check, nil } return check, nil diff --git a/lib/models_test.go b/lib/models_test.go index e53e447abb2..85d020fd3b8 100644 --- a/lib/models_test.go +++ b/lib/models_test.go @@ -22,6 +22,7 @@ package lib import ( "encoding/json" + "sync" "testing" "time" @@ -41,3 +42,49 @@ func TestStageJSON(t *testing.T) { assert.NoError(t, json.Unmarshal(data, &s2)) assert.Equal(t, s, s2) } + +// Suggested by @nkovacs in https://github.com/loadimpact/k6/issues/207#issuecomment-330545467 +func TestDataRaces(t *testing.T) { + t.Run("Check race", func(t *testing.T) { + group, err := NewGroup("test", nil) + assert.Nil(t, err, "NewGroup") + wg := sync.WaitGroup{} + wg.Add(2) + var check1, check2 *Check + go func() { + var err error // using the outer err would result in a data race + check1, err = group.Check("race") + assert.Nil(t, err, "Check 1") + wg.Done() + }() + go func() { + var err error + check2, err = group.Check("race") + assert.Nil(t, err, "Check 2") + wg.Done() + }() + wg.Wait() + assert.Equal(t, check1, check2, "Checks are the same") + }) + t.Run("Group race", func(t *testing.T) { + group, err := NewGroup("test", nil) + assert.Nil(t, err, "NewGroup") + wg := sync.WaitGroup{} + wg.Add(2) + var group1, group2 *Group + go func() { + var err error + group1, err = group.Group("race") + assert.Nil(t, err, "Group 1") + wg.Done() + }() + go func() { + var err error + group2, err = group.Group("race") + assert.Nil(t, err, "Group 2") + wg.Done() + }() + wg.Wait() + 
assert.Equal(t, group1, group2, "Groups are the same") + }) +} From 1d76bdc5e8c07ea50dbb4755e9d9bd01a22e6ba0 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 28 Mar 2018 01:14:35 +0300 Subject: [PATCH 02/12] Fix data races in netext.Tracer --- lib/netext/tracer.go | 258 +++++++++++++++++++++++--------------- lib/netext/tracer_test.go | 3 + 2 files changed, 162 insertions(+), 99 deletions(-) diff --git a/lib/netext/tracer.go b/lib/netext/tracer.go index 90a74935899..46a1b7c1787 100644 --- a/lib/netext/tracer.go +++ b/lib/netext/tracer.go @@ -1,7 +1,7 @@ /* * * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact + * Copyright (C) 2018 Load Impact * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -24,6 +24,8 @@ import ( "crypto/tls" "net" "net/http/httptrace" + "sync" + "sync/atomic" "time" "github.com/loadimpact/k6/lib/metrics" @@ -41,16 +43,17 @@ type Trail struct { Blocked time.Duration // Waiting to acquire a connection. Connecting time.Duration // Connecting to remote host. + TLSHandshaking time.Duration // Executing TLS handshake. Sending time.Duration // Writing request. Waiting time.Duration // Waiting for first byte. Receiving time.Duration // Receiving response. - TLSHandshaking time.Duration // Executing TLS handshake. // Detailed connection information. ConnReused bool ConnRemoteAddr net.Addr } +// Samples returns a slice with all of the pre-calculated sample values for the request func (tr Trail) Samples(tags map[string]string) []stats.Sample { return []stats.Sample{ {Metric: metrics.HTTPReqs, Time: tr.EndTime, Tags: tags, Value: 1}, @@ -67,147 +70,204 @@ func (tr Trail) Samples(tags map[string]string) []stats.Sample { // A Tracer wraps "net/http/httptrace" to collect granular timings for HTTP requests. 
// Note that since there is not yet an event for the end of a request (there's a PR to // add it), you must call Done() at the end of the request to get the full timings. -// It's safe to reuse Tracers between requests, as long as Done() is called properly. +// It's NOT safe to reuse Tracers between requests. // Cheers, love, the cavalry's here. type Tracer struct { - getConn time.Time - gotConn time.Time - gotFirstResponseByte time.Time - connectStart time.Time - connectDone time.Time - wroteRequest time.Time - tlsHandshakeStart time.Time - tlsHandshakeDone time.Time + getConn int64 + connectStart int64 + connectDone int64 + tlsHandshakeStart int64 + tlsHandshakeDone int64 + gotConn int64 + wroteRequest int64 + gotFirstResponseByte int64 connReused bool connRemoteAddr net.Addr - protoError error + protoErrorsMutex sync.Mutex + protoErrors []error } -// Trace() returns a premade ClientTrace that calls all of the Tracer's hooks. +// Trace returns a premade ClientTrace that calls all of the Tracer's hooks. func (t *Tracer) Trace() *httptrace.ClientTrace { return &httptrace.ClientTrace{ GetConn: t.GetConn, - GotConn: t.GotConn, - GotFirstResponseByte: t.GotFirstResponseByte, ConnectStart: t.ConnectStart, ConnectDone: t.ConnectDone, - WroteRequest: t.WroteRequest, TLSHandshakeStart: t.TLSHandshakeStart, TLSHandshakeDone: t.TLSHandshakeDone, + GotConn: t.GotConn, + WroteRequest: t.WroteRequest, + GotFirstResponseByte: t.GotFirstResponseByte, } } -// Call when the request is finished. Calculates metrics and resets the tracer. 
-func (t *Tracer) Done() Trail { - done := time.Now() +// Add an error in a thread-safe way +func (t *Tracer) addError(err error) { + t.protoErrorsMutex.Lock() + defer t.protoErrorsMutex.Unlock() + t.protoErrors = append(t.protoErrors, err) +} - trail := Trail{ - ConnReused: t.connReused, - ConnRemoteAddr: t.connRemoteAddr, - } +func now() int64 { + return time.Now().UnixNano() +} - if !t.gotConn.IsZero() && !t.getConn.IsZero() { - trail.Blocked = t.gotConn.Sub(t.getConn) - } - if !t.connectDone.IsZero() && !t.connectStart.IsZero() { - trail.Connecting = t.connectDone.Sub(t.connectStart) - } - if !t.tlsHandshakeDone.IsZero() && !t.tlsHandshakeStart.IsZero() { - trail.TLSHandshaking = t.tlsHandshakeDone.Sub(t.tlsHandshakeStart) - } - if !t.wroteRequest.IsZero() { - trail.Sending = t.wroteRequest.Sub(t.connectDone) - // If the request was sent over TLS, we need to use - // TLS Handshake Done time to calculate sending duration - if !t.tlsHandshakeDone.IsZero() { - trail.Sending = t.wroteRequest.Sub(t.tlsHandshakeDone) - } +// GetConn is called before a connection is created or +// retrieved from an idle pool. The hostPort is the +// "host:port" of the target or proxy. GetConn is called even +// if there's already an idle cached connection available. +// +// Keep in mind that GetConn won't be called if a connection +// is reused though, for example when there's a redirect. +// If it's called, it will be called before all other hooks. +func (t *Tracer) GetConn(hostPort string) { + t.getConn = now() +} - if !t.gotFirstResponseByte.IsZero() { - trail.Waiting = t.gotFirstResponseByte.Sub(t.wroteRequest) - } - } - if !t.gotFirstResponseByte.IsZero() { - trail.Receiving = done.Sub(t.gotFirstResponseByte) - } +// ConnectStart is called when a new connection's Dial begins. +// If net.Dialer.DualStack (IPv6 "Happy Eyeballs") support is +// enabled, this may be called multiple times. +// +// If the connection is reused, this won't be called. 
Otherwise, +// it will be called after GetConn() and before ConnectDone(). +func (t *Tracer) ConnectStart(network, addr string) { + // If using dual-stack dialing, it's possible to get this + // multiple times, so the atomic compareAndSwap ensures + // that only the first call's time is recorded + atomic.CompareAndSwapInt64(&t.connectStart, 0, now()) +} - // Calculate total times using adjusted values. - trail.EndTime = done - trail.Duration = trail.Sending + trail.Waiting + trail.Receiving - trail.StartTime = trail.EndTime.Add(-trail.Duration) +// ConnectDone is called when a new connection's Dial +// completes. The provided err indicates whether the +// connection completed successfully. +// If net.Dialer.DualStack ("Happy Eyeballs") support is +// enabled, this may be called multiple times. +// +// If the connection is reused, this won't be called. Otherwise, +// it will be called after ConnectStart() and before either +// TLSHandshakeStart() (for TLS connections) or GotConn(). +func (t *Tracer) ConnectDone(network, addr string, err error) { + // If using dual-stack dialing, it's possible to get this + // multiple times, so the atomic compareAndSwap ensures + // that only the first call's time is recorded + atomic.CompareAndSwapInt64(&t.connectDone, 0, now()) - *t = Tracer{} - return trail + if err != nil { + t.addError(err) + } } -// GetConn event hook. -func (t *Tracer) GetConn(hostPort string) { - t.getConn = time.Now() +// TLSHandshakeStart is called when the TLS handshake is started. When +// connecting to a HTTPS site via a HTTP proxy, the handshake happens after +// the CONNECT request is processed by the proxy. +// +// If the connection is reused, this won't be called. Otherwise, +// it will be called after ConnectDone() and before TLSHandshakeDone(). +func (t *Tracer) TLSHandshakeStart() { + // This shouldn't be called multiple times so no synchronization here, + // it's better for the race detector to panic if we're wrong. 
+ t.tlsHandshakeStart = now() +} + +// TLSHandshakeDone is called after the TLS handshake with either the +// successful handshake's connection state, or a non-nil error on handshake +// failure. +// +// If the connection is reused, this won't be called. Otherwise, +// it will be called after TLSHandshakeStart() and before GotConn(). +func (t *Tracer) TLSHandshakeDone(state tls.ConnectionState, err error) { + // This shouldn't be called multiple times so no synchronization here, + // it's better for the race detector to panic if we're wrong. + t.tlsHandshakeDone = now() + + if err != nil { + t.addError(err) + } } -// GotConn event hook. +// GotConn is called after a successful connection is +// obtained. There is no hook for failure to obtain a +// connection; instead, use the error from Transport.RoundTrip. +// +// This is the first hook called for reused connections. For new +// connections, it's called either after TLSHandshakeDone() +// (for TLS connections) or after ConnectDone() func (t *Tracer) GotConn(info httptrace.GotConnInfo) { - t.gotConn = time.Now() + now := now() + + // This shouldn't be called multiple times so no synchronization here, + // it's better for the race detector to panic if we're wrong. + t.gotConn = now t.connReused = info.Reused t.connRemoteAddr = info.Conn.RemoteAddr() if t.connReused { - t.connectStart = t.gotConn - t.connectDone = t.gotConn + atomic.CompareAndSwapInt64(&t.connectStart, 0, now) + atomic.CompareAndSwapInt64(&t.connectDone, 0, now) } } -// GotFirstResponseByte hook. -func (t *Tracer) GotFirstResponseByte() { - t.gotFirstResponseByte = time.Now() -} +// WroteRequest is called with the result of writing the +// request and any body. It may be called multiple times +// in the case of retried requests. +// +// +func (t *Tracer) WroteRequest(info httptrace.WroteRequestInfo) { + atomic.CompareAndSwapInt64(&t.wroteRequest, 0, now()) -// ConnectStart hook. 
-func (t *Tracer) ConnectStart(network, addr string) { - // If using dual-stack dialing, it's possible to get this multiple times. - if !t.connectStart.IsZero() { - return + if info.Err != nil { + t.addError(info.Err) } - t.connectStart = time.Now() } -// ConnectDone hook. -func (t *Tracer) ConnectDone(network, addr string, err error) { - // If using dual-stack dialing, it's possible to get this multiple times. - if !t.connectDone.IsZero() { - return - } - - t.connectDone = time.Now() - if t.gotConn.IsZero() { - t.gotConn = t.connectDone - } - - if err != nil { - t.protoError = err - } +// GotFirstResponseByte is called when the first byte of the response +// headers is available. +func (t *Tracer) GotFirstResponseByte() { + // This shouldn't be called multiple times so no synchronization here, + // it's better for the race detector to panic if we're wrong. + t.gotFirstResponseByte = now() } -// TLSHandshakeStart hook. -func (t *Tracer) TLSHandshakeStart() { - t.tlsHandshakeStart = time.Now() -} +// Done calculates all metrics and should be called when the request is finished. +func (t *Tracer) Done() Trail { + done := time.Now() -// TLSHandshakeDone hook. 
-func (t *Tracer) TLSHandshakeDone(state tls.ConnectionState, err error) { - t.tlsHandshakeDone = time.Now() + trail := Trail{ + ConnReused: t.connReused, + ConnRemoteAddr: t.connRemoteAddr, + } - if err != nil { - t.protoError = err + if t.gotConn != 0 && t.getConn != 0 { + trail.Blocked = time.Duration(t.gotConn - t.getConn) } -} + if t.connectDone != 0 && t.connectStart != 0 { + trail.Connecting = time.Duration(t.connectDone - t.connectStart) + } + if t.tlsHandshakeDone != 0 && t.tlsHandshakeStart != 0 { + trail.TLSHandshaking = time.Duration(t.tlsHandshakeDone - t.tlsHandshakeStart) + } + if t.wroteRequest != 0 { + trail.Sending = time.Duration(t.wroteRequest - t.connectDone) + // If the request was sent over TLS, we need to use + // TLS Handshake Done time to calculate sending duration + if t.tlsHandshakeDone != 0 { + trail.Sending = time.Duration(t.wroteRequest - t.tlsHandshakeDone) + } -// WroteRequest hook. -func (t *Tracer) WroteRequest(info httptrace.WroteRequestInfo) { - t.wroteRequest = time.Now() - if info.Err != nil { - t.protoError = info.Err + if t.gotFirstResponseByte != 0 { + trail.Waiting = time.Duration(t.gotFirstResponseByte - t.wroteRequest) + } + } + if t.gotFirstResponseByte != 0 { + trail.Receiving = done.Sub(time.Unix(0, t.gotFirstResponseByte)) } + + // Calculate total times using adjusted values. 
+ trail.EndTime = done + trail.Duration = trail.Sending + trail.Waiting + trail.Receiving + trail.StartTime = trail.EndTime.Add(-trail.Duration) + + return trail } diff --git a/lib/netext/tracer_test.go b/lib/netext/tracer_test.go index 94dae5249db..4013f6d9e08 100644 --- a/lib/netext/tracer_test.go +++ b/lib/netext/tracer_test.go @@ -96,3 +96,6 @@ func TestTracer(t *testing.T) { }) } } + +//TODO: test what happens with redirects +//TODO: test what happens with HTTP/HTTPS proxies From e78459601b6db2b634556eba29f5d5591ea214d8 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 28 Mar 2018 01:17:18 +0300 Subject: [PATCH 03/12] Add -race when testing --- .circleci/config.yml | 2 +- CONTRIBUTING.md | 10 +++++----- Makefile | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e6ce4612aa7..518dead0b7b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -37,7 +37,7 @@ jobs: export PATH=$GOPATH/bin:$PATH echo "mode: set" > coverage.txt for pkg in $(go list ./... | grep -v vendor); do - go test -timeout 30s -coverprofile=$(echo $pkg | tr / -).coverage $pkg + go test -race -timeout 180s -coverprofile=$(echo $pkg | tr / -).coverage $pkg done grep -h -v "^mode:" *.coverage >> coverage.txt rm -f *.coverage diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 09649abaa40..2d8b4332d7e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -24,7 +24,7 @@ Contributing code If you'd like to contribute code to k6, this is the basic procedure. Make sure to follow the [style guide](#style-guide) described below! 1. Find an issue you'd like to fix. If there is none already, or you'd like to add a feature, please open one and we can talk about how to do it. - + Remember, there's more to software development than code; if it's not properly planned, stuff gets messy real fast. 2. 
Create a fork and open a feature branch - `feature/my-cool-feature` is the classic way to name these, but it really doesn't matter. @@ -74,22 +74,22 @@ vendorcheck ./... To exercise the entire test suite: ```bash -go test ./... +go test -race ./... ``` To run the tests of a specific package: ```bash -go test github.com/loadimpact/k6/core +go test -race github.com/loadimpact/k6/core ``` To run just a specific test case use `-run` and pass in a regex that matches the name of the test: ```bash -go test ./... -run ^TestEngineRun$ +go test -race ./... -run ^TestEngineRun$ ``` Combining the two above we can run a specific test case in a specific package: ```bash -go test github.com/loadimpact/k6/core -run ^TestEngineRun$ +go test -race github.com/loadimpact/k6/core -run ^TestEngineRun$ ``` Style guide diff --git a/Makefile b/Makefile index d78abb1f069..a281cfaa165 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ format: .PHONY: check check: gometalinter --deadline 10m --config gometalinter.json --vendor ./... - go test -timeout 30s ./... + go test -race -timeout 180s ./... .PHONY: docs docs: From b27d2174bceaed717064aa4e66ee5bba986fbbb5 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 28 Mar 2018 15:06:12 +0300 Subject: [PATCH 04/12] Improve netext.Tracer error handling and testing --- lib/netext/tracer.go | 2 + lib/netext/tracer_test.go | 78 +++++++++++++++++++++++++++------------ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/lib/netext/tracer.go b/lib/netext/tracer.go index 46a1b7c1787..e27dbdee389 100644 --- a/lib/netext/tracer.go +++ b/lib/netext/tracer.go @@ -51,6 +51,7 @@ type Trail struct { // Detailed connection information. 
ConnReused bool ConnRemoteAddr net.Addr + Errors []error } // Samples returns a slice with all of the pre-calculated sample values for the request @@ -237,6 +238,7 @@ func (t *Tracer) Done() Trail { trail := Trail{ ConnReused: t.connReused, ConnRemoteAddr: t.connRemoteAddr, + Errors: t.protoErrors, } if t.gotConn != 0 && t.getConn != 0 { diff --git a/lib/netext/tracer_test.go b/lib/netext/tracer_test.go index 4013f6d9e08..25715b88b0b 100644 --- a/lib/netext/tracer_test.go +++ b/lib/netext/tracer_test.go @@ -22,46 +22,70 @@ package netext import ( "context" + "fmt" "io" "io/ioutil" "net" "net/http" "net/http/httptest" + "runtime" "testing" "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/stats" "github.com/mccutchen/go-httpbin/httpbin" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTracer(t *testing.T) { + t.Parallel() srv := httptest.NewTLSServer(httpbin.NewHTTPBin().Handler()) defer srv.Close() - client := srv.Client() - client.Transport.(*http.Transport).DialContext = NewDialer(net.Dialer{}).DialContext + transport, ok := srv.Client().Transport.(*http.Transport) + assert.True(t, ok) + transport.DialContext = NewDialer(net.Dialer{}).DialContext - for _, isReuse := range []bool{false, true} { - name := "First" - if isReuse { - name = "Reuse" + var prev int64 + assertLaterOrZero := func(t *testing.T, val int64, canBeZero bool) { + if canBeZero && val == 0 { + return } - t.Run(name, func(t *testing.T) { + if prev > val { + _, file, line, _ := runtime.Caller(1) + t.Errorf("Expected %d to be greater or equal to %d (from %s:%d)", val, prev, file, line) + return + } + prev = val + } + + for tnum, isReuse := range []bool{false, true, true} { + t.Run(fmt.Sprintf("Test #%d", tnum), func(t *testing.T) { + // Do not enable parallel testing, test relies on sequential execution tracer := &Tracer{} req, err := http.NewRequest("GET", srv.URL+"/get", nil) - if !assert.NoError(t, err) { - return - } - res, err := 
client.Do(req.WithContext(WithTracer(context.Background(), tracer))) - if !assert.NoError(t, err) { - return - } + require.NoError(t, err) + + res, err := transport.RoundTrip(req.WithContext(WithTracer(context.Background(), tracer))) + require.NoError(t, err) + _, err = io.Copy(ioutil.Discard, res.Body) assert.NoError(t, err) assert.NoError(t, res.Body.Close()) samples := tracer.Done().Samples(map[string]string{"tag": "value"}) + assert.Empty(t, tracer.protoErrors) + assertLaterOrZero(t, tracer.getConn, isReuse) + assertLaterOrZero(t, tracer.connectStart, isReuse) + assertLaterOrZero(t, tracer.connectDone, isReuse) + assertLaterOrZero(t, tracer.tlsHandshakeStart, isReuse) + assertLaterOrZero(t, tracer.tlsHandshakeDone, isReuse) + assertLaterOrZero(t, tracer.gotConn, false) + assertLaterOrZero(t, tracer.wroteRequest, false) + assertLaterOrZero(t, tracer.gotFirstResponseByte, false) + assertLaterOrZero(t, now(), false) + assert.Len(t, samples, 8) seenMetrics := map[*stats.Metric]bool{} for i, s := range samples { @@ -75,7 +99,7 @@ func TestTracer(t *testing.T) { case metrics.HTTPReqs: assert.Equal(t, 1.0, s.Value) assert.Equal(t, 0, i, "`HTTPReqs` is reported before the other HTTP metrics") - case metrics.HTTPReqConnecting: + case metrics.HTTPReqConnecting, metrics.HTTPReqTLSHandshaking: if isReuse { assert.Equal(t, 0.0, s.Value) break @@ -83,12 +107,6 @@ func TestTracer(t *testing.T) { fallthrough case metrics.HTTPReqDuration, metrics.HTTPReqBlocked, metrics.HTTPReqSending, metrics.HTTPReqWaiting, metrics.HTTPReqReceiving: assert.True(t, s.Value > 0.0, "%s is <= 0", s.Metric.Name) - case metrics.HTTPReqTLSHandshaking: - if !isReuse { - assert.True(t, s.Value > 0.0, "%s is <= 0", s.Metric.Name) - continue - } - assert.True(t, s.Value == 0.0, "%s is <> 0", s.Metric.Name) default: t.Errorf("unexpected metric: %s", s.Metric.Name) } @@ -97,5 +115,19 @@ func TestTracer(t *testing.T) { } } -//TODO: test what happens with redirects -//TODO: test what happens with HTTP/HTTPS 
proxies +func TestTracerError(t *testing.T) { + t.Parallel() + srv := httptest.NewTLSServer(httpbin.NewHTTPBin().Handler()) + defer srv.Close() + + tracer := &Tracer{} + req, err := http.NewRequest("GET", srv.URL+"/get", nil) + require.NoError(t, err) + + _, err = http.DefaultTransport.RoundTrip(req.WithContext(WithTracer(context.Background(), tracer))) + assert.Error(t, err) + + assert.Len(t, tracer.protoErrors, 1) + assert.Error(t, tracer.protoErrors[0]) + assert.Equal(t, tracer.protoErrors, tracer.Done().Errors) +} From d74251123e7d2462d4b1505c382cff69fe6cd13f Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 28 Mar 2018 16:11:07 +0300 Subject: [PATCH 05/12] Fix data race from shared rand source that was not thread safe --- js/bundle.go | 2 +- js/common/randsource.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/js/bundle.go b/js/bundle.go index 433ccaee2e4..2eeaba43ee6 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -215,7 +215,7 @@ func (b *Bundle) Instantiate() (*BundleInstance, error) { // of other things, will potentially thrash data and makes a mess in it if the operation fails. 
func (b *Bundle) instantiate(rt *goja.Runtime, init *InitContext) error { rt.SetFieldNameMapper(common.FieldNameMapper{}) - rt.SetRandSource(common.DefaultRandSource) + rt.SetRandSource(common.NewRandSource()) if _, err := rt.RunProgram(jslib.CoreJS); err != nil { return err diff --git a/js/common/randsource.go b/js/common/randsource.go index 4728b841044..9ae9962d701 100644 --- a/js/common/randsource.go +++ b/js/common/randsource.go @@ -23,18 +23,20 @@ package common import ( crand "crypto/rand" "encoding/binary" + "fmt" "math/rand" "github.com/dop251/goja" - "github.com/pkg/errors" ) -var DefaultRandSource = NewRandSource() - +// NewRandSource is copied from goja's source code: +// https://github.com/dop251/goja/blob/master/goja/main.go#L44 +// The returned RandSource is NOT safe for concurrent use: +// https://golang.org/pkg/math/rand/#NewSource func NewRandSource() goja.RandSource { var seed int64 if err := binary.Read(crand.Reader, binary.LittleEndian, &seed); err != nil { - panic(errors.New("Couldn't read bytes for random seed")) + panic(fmt.Errorf("Could not read random bytes: %v", err)) } return rand.New(rand.NewSource(seed)).Float64 } From ea3af91fa76cb56339b5605d5a6f902a960d4b14 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 28 Mar 2018 17:01:49 +0300 Subject: [PATCH 06/12] Refactor TestSentReceivedMetrics() --- core/engine_test.go | 47 +++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/core/engine_test.go b/core/engine_test.go index 02e7d9b2d49..cccdcbb6ba5 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -530,7 +530,7 @@ func getMetricSum(samples []stats.Sample, name string) (result float64) { return } func TestSentReceivedMetrics(t *testing.T) { - //t.Parallel() + t.Parallel() srv := httptest.NewServer(httpbin.NewHTTPBin().Handler()) defer srv.Close() @@ -539,25 +539,28 @@ func TestSentReceivedMetrics(t *testing.T) { } expectedSingleData := 50000.0 - r, err := 
js.New(&lib.SourceData{ - Filename: "/script.js", - Data: []byte(` - import http from "k6/http"; - export default function() { - http.get(` + burl(10000) + `); - http.batch([` + burl(10000) + `,` + burl(20000) + `,` + burl(10000) + `]); - } - `), - }, afero.NewMemMapFs(), lib.RuntimeOptions{}) - require.NoError(t, err) - testCases := []struct{ Iterations, VUs int64 }{ - {1, 1}, {1, 2}, {2, 1}, {2, 2}, {3, 1}, {5, 2}, {10, 3}, {25, 2}, + type testCase struct{ Iterations, VUs int64 } + testCases := []testCase{ + {1, 1}, {1, 2}, {2, 1}, {2, 2}, {3, 1}, {5, 2}, {10, 3}, {25, 2}, {50, 5}, } - for testn, tc := range testCases { - t.Run(fmt.Sprintf("SentReceivedMetrics_%d", testn), func(t *testing.T) { + getTestCase := func(t *testing.T, tc testCase) func(t *testing.T) { + return func(t *testing.T) { + //TODO: figure out why it fails if we uncomment this: //t.Parallel() + r, err := js.New(&lib.SourceData{ + Filename: "/script.js", + Data: []byte(` + import http from "k6/http"; + export default function() { + http.get(` + burl(10000) + `); + http.batch([` + burl(10000) + `,` + burl(20000) + `,` + burl(10000) + `]); + } + `), + }, afero.NewMemMapFs(), lib.RuntimeOptions{}) + require.NoError(t, err) + options := lib.Options{ Iterations: null.IntFrom(tc.Iterations), VUs: null.IntFrom(tc.VUs), @@ -595,6 +598,16 @@ func TestSentReceivedMetrics(t *testing.T) { receivedData, ) } - }) + } } + + // This Run will not return until the parallel subtests complete. 
+ t.Run("group", func(t *testing.T) { + for testn, tc := range testCases { + t.Run( + fmt.Sprintf("SentReceivedMetrics_%d(%d, %d)", testn, tc.Iterations, tc.VUs), + getTestCase(t, tc), + ) + } + }) } From 67b6e8ce1bff4c30668d606a866a40ae26e4cf68 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 29 Mar 2018 13:33:41 +0300 Subject: [PATCH 07/12] Fix netext.Tracer data races for cancelled requests --- lib/netext/tracer.go | 62 ++++++++++++++++++++++++--------------- lib/netext/tracer_test.go | 35 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/lib/netext/tracer.go b/lib/netext/tracer.go index e27dbdee389..b925b4508cc 100644 --- a/lib/netext/tracer.go +++ b/lib/netext/tracer.go @@ -167,9 +167,7 @@ func (t *Tracer) ConnectDone(network, addr string, err error) { // If the connection is reused, this won't be called. Otherwise, // it will be called after ConnectDone() and before TLSHandshakeDone(). func (t *Tracer) TLSHandshakeStart() { - // This shouldn't be called multiple times so no synchronization here, - // it's better for the race detector to panic if we're wrong. - t.tlsHandshakeStart = now() + atomic.CompareAndSwapInt64(&t.tlsHandshakeStart, 0, now()) } // TLSHandshakeDone is called after the TLS handshake with either the @@ -178,10 +176,10 @@ func (t *Tracer) TLSHandshakeStart() { // // If the connection is reused, this won't be called. Otherwise, // it will be called after TLSHandshakeStart() and before GotConn(). +// If the request was cancelled, this could be called after the +// RoundTrip() method has returned. func (t *Tracer) TLSHandshakeDone(state tls.ConnectionState, err error) { - // This shouldn't be called multiple times so no synchronization here, - // it's better for the race detector to panic if we're wrong. 
- t.tlsHandshakeDone = now() + atomic.CompareAndSwapInt64(&t.tlsHandshakeDone, 0, now()) if err != nil { t.addError(err) @@ -213,8 +211,6 @@ func (t *Tracer) GotConn(info httptrace.GotConnInfo) { // WroteRequest is called with the result of writing the // request and any body. It may be called multiple times // in the case of retried requests. -// -// func (t *Tracer) WroteRequest(info httptrace.WroteRequestInfo) { atomic.CompareAndSwapInt64(&t.wroteRequest, 0, now()) @@ -225,10 +221,10 @@ func (t *Tracer) WroteRequest(info httptrace.WroteRequestInfo) { // GotFirstResponseByte is called when the first byte of the response // headers is available. +// If the request was cancelled, this could be called after the +// RoundTrip() method has returned. func (t *Tracer) GotFirstResponseByte() { - // This shouldn't be called multiple times so no synchronization here, - // it's better for the race detector to panic if we're wrong. - t.gotFirstResponseByte = now() + atomic.CompareAndSwapInt64(&t.gotFirstResponseByte, 0, now()) } // Done calculates all metrics and should be called when the request is finished. @@ -238,32 +234,44 @@ func (t *Tracer) Done() Trail { trail := Trail{ ConnReused: t.connReused, ConnRemoteAddr: t.connRemoteAddr, - Errors: t.protoErrors, } if t.gotConn != 0 && t.getConn != 0 { trail.Blocked = time.Duration(t.gotConn - t.getConn) } - if t.connectDone != 0 && t.connectStart != 0 { - trail.Connecting = time.Duration(t.connectDone - t.connectStart) + + // It's possible for some of the methods of httptrace.ClientTrace to + // actually be called after the http.Client or http.RoundTripper have + // already returned our result and we've called Done(). This happens + // mostly for cancelled requests, but we have to use atomics here as + // well (or use global Tracer locking) so we can avoid data races. 
+ connectStart := atomic.LoadInt64(&t.connectStart) + connectDone := atomic.LoadInt64(&t.connectDone) + tlsHandshakeStart := atomic.LoadInt64(&t.tlsHandshakeStart) + tlsHandshakeDone := atomic.LoadInt64(&t.tlsHandshakeDone) + wroteRequest := atomic.LoadInt64(&t.wroteRequest) + gotFirstResponseByte := atomic.LoadInt64(&t.gotFirstResponseByte) + + if connectDone != 0 && connectStart != 0 { + trail.Connecting = time.Duration(connectDone - connectStart) } - if t.tlsHandshakeDone != 0 && t.tlsHandshakeStart != 0 { - trail.TLSHandshaking = time.Duration(t.tlsHandshakeDone - t.tlsHandshakeStart) + if tlsHandshakeDone != 0 && tlsHandshakeStart != 0 { + trail.TLSHandshaking = time.Duration(tlsHandshakeDone - tlsHandshakeStart) } - if t.wroteRequest != 0 { - trail.Sending = time.Duration(t.wroteRequest - t.connectDone) + if wroteRequest != 0 { + trail.Sending = time.Duration(wroteRequest - connectDone) // If the request was sent over TLS, we need to use // TLS Handshake Done time to calculate sending duration - if t.tlsHandshakeDone != 0 { - trail.Sending = time.Duration(t.wroteRequest - t.tlsHandshakeDone) + if tlsHandshakeDone != 0 { + trail.Sending = time.Duration(wroteRequest - tlsHandshakeDone) } - if t.gotFirstResponseByte != 0 { - trail.Waiting = time.Duration(t.gotFirstResponseByte - t.wroteRequest) + if gotFirstResponseByte != 0 { + trail.Waiting = time.Duration(gotFirstResponseByte - wroteRequest) } } - if t.gotFirstResponseByte != 0 { - trail.Receiving = done.Sub(time.Unix(0, t.gotFirstResponseByte)) + if gotFirstResponseByte != 0 { + trail.Receiving = done.Sub(time.Unix(0, gotFirstResponseByte)) } // Calculate total times using adjusted values. @@ -271,5 +279,11 @@ func (t *Tracer) Done() Trail { trail.Duration = trail.Sending + trail.Waiting + trail.Receiving trail.StartTime = trail.EndTime.Add(-trail.Duration) + t.protoErrorsMutex.Lock() + defer t.protoErrorsMutex.Unlock() + if len(t.protoErrors) > 0 { + trail.Errors = append([]error{}, t.protoErrors...) 
+ } + return trail } diff --git a/lib/netext/tracer_test.go b/lib/netext/tracer_test.go index 25715b88b0b..4988b4d864d 100644 --- a/lib/netext/tracer_test.go +++ b/lib/netext/tracer_test.go @@ -25,11 +25,13 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net" "net/http" "net/http/httptest" "runtime" "testing" + "time" "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/stats" @@ -131,3 +133,36 @@ func TestTracerError(t *testing.T) { assert.Error(t, tracer.protoErrors[0]) assert.Equal(t, tracer.protoErrors, tracer.Done().Errors) } + +func TestCancelledRequest(t *testing.T) { + t.Parallel() + srv := httptest.NewTLSServer(httpbin.NewHTTPBin().Handler()) + defer srv.Close() + + cancelTest := func(t *testing.T) { + t.Parallel() + tracer := &Tracer{} + req, err := http.NewRequest("GET", srv.URL+"/delay/1", nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(WithTracer(req.Context(), tracer)) + req = req.WithContext(ctx) + go func() { + time.Sleep(time.Duration(rand.Int31n(50)) * time.Millisecond) + cancel() + }() + + resp, err := srv.Client().Transport.RoundTrip(req) + trail := tracer.Done() + if resp == nil && err == nil && len(trail.Errors) == 0 { + t.Errorf("Expected either a RoundTrip response, error or trail errors but got %#v, %#v and %#v", resp, err, trail.Errors) + } + } + + // This Run will not return until the parallel subtests complete. + t.Run("group", func(t *testing.T) { + for i := 0; i < 200; i++ { + t.Run(fmt.Sprintf("TestCancelledRequest_%d", i), cancelTest) + } + }) +} From c68ff5f597d6ffe62716b63776e5a09f9d552200 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 30 Mar 2018 10:22:35 +0300 Subject: [PATCH 08/12] Fix data races when using the shared DefaultCompiler This is just a temporary fix, we should probably remove the shared DefaultCompiler altogether if possible. 
If the babel parsing is taking the majority of the time, we could just do that in an init function and then create "compiler" runtimes based on it. --- js/compiler/compiler.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/js/compiler/compiler.go b/js/compiler/compiler.go index 6e1d838596c..e9894fb8af5 100644 --- a/js/compiler/compiler.go +++ b/js/compiler/compiler.go @@ -21,6 +21,7 @@ package compiler import ( + "sync" "time" "github.com/GeertJohan/go.rice" @@ -52,6 +53,7 @@ type Compiler struct { // JS pointers. this goja.Value transform goja.Callable + mutex sync.Mutex //TODO: cache goja.CompileAST() in an init() function? } // Constructs a new compiler. @@ -78,6 +80,9 @@ func (c *Compiler) Transform(src, filename string) (code string, srcmap SourceMa } opts["filename"] = filename + c.mutex.Lock() + defer c.mutex.Unlock() + startTime := time.Now() v, err := c.transform(c.this, c.vm.ToValue(src), c.vm.ToValue(opts)) if err != nil { From f157788f366a880249bbdf6a82d045d245565911 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Mon, 2 Apr 2018 14:42:09 +0300 Subject: [PATCH 09/12] Fix data races with shared global metrics I don't think this would have been a problem with real-world usage of k6 (for now), but the way metrics are defined should probably be refactored to avoid shared global variables. --- core/engine.go | 4 ++-- core/engine_test.go | 2 +- lib/metrics/metrics.go | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/engine.go b/core/engine.go index 8911f04c9aa..4aad6dcab0a 100644 --- a/core/engine.go +++ b/core/engine.go @@ -57,7 +57,7 @@ type Engine struct { logger *log.Logger Metrics map[string]*stats.Metric - MetricsLock sync.RWMutex + MetricsLock sync.Mutex // Assigned to metrics upon first received sample. 
thresholds map[string]stats.Thresholds @@ -319,7 +319,7 @@ func (e *Engine) processSamples(samples ...stats.Sample) { for i, sample := range samples { m, ok := e.Metrics[sample.Metric.Name] if !ok { - m = sample.Metric + m = stats.New(sample.Metric.Name, sample.Metric.Type, sample.Metric.Contains) m.Thresholds = e.thresholds[m.Name] m.Submetrics = e.submetrics[m.Name] e.Metrics[m.Name] = m diff --git a/core/engine_test.go b/core/engine_test.go index cccdcbb6ba5..9abe945d012 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -548,7 +548,7 @@ func TestSentReceivedMetrics(t *testing.T) { getTestCase := func(t *testing.T, tc testCase) func(t *testing.T) { return func(t *testing.T) { //TODO: figure out why it fails if we uncomment this: - //t.Parallel() + t.Parallel() r, err := js.New(&lib.SourceData{ Filename: "/script.js", Data: []byte(` diff --git a/lib/metrics/metrics.go b/lib/metrics/metrics.go index 72af67876b4..20fd28cbaf0 100644 --- a/lib/metrics/metrics.go +++ b/lib/metrics/metrics.go @@ -24,6 +24,8 @@ import ( "github.com/loadimpact/k6/stats" ) +//TODO: refactor this, using non thread-safe global variables seems like a bad idea for various reasons... + var ( // Engine-emitted. 
VUs = stats.New("vus", stats.Gauge) From 6083775b1b2b5938e75a864c4f96e005e53698e7 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Mon, 2 Apr 2018 16:51:16 +0300 Subject: [PATCH 10/12] Add HTTPS requests to sent/received data test --- core/engine_test.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/core/engine_test.go b/core/engine_test.go index 9abe945d012..bef2f77e0fc 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -23,17 +23,16 @@ package core import ( "context" "fmt" - "net/http/httptest" "testing" "time" "github.com/loadimpact/k6/core/local" "github.com/loadimpact/k6/js" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/dummy" - "github.com/mccutchen/go-httpbin/httpbin" log "github.com/sirupsen/logrus" logtest "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/afero" @@ -531,13 +530,17 @@ func getMetricSum(samples []stats.Sample, name string) (result float64) { } func TestSentReceivedMetrics(t *testing.T) { t.Parallel() - srv := httptest.NewServer(httpbin.NewHTTPBin().Handler()) - defer srv.Close() - - burl := func(bytecount uint32) string { - return fmt.Sprintf(`"%s/bytes/%d"`, srv.URL, bytecount) - } - + tb := testutils.NewHTTPMultiBin(t) + defer tb.Cleanup() + + script := []byte(tb.Replacer.Replace(` + import http from "k6/http"; + export default function() { + http.get("HTTPBIN_URL/bytes/5000"); + http.get("HTTPSBIN_URL/bytes/5000"); + http.batch(["HTTPBIN_URL/bytes/10000", "HTTPBIN_URL/bytes/20000", "HTTPSBIN_URL/bytes/10000"]); + } + `)) expectedSingleData := 50000.0 type testCase struct{ Iterations, VUs int64 } @@ -547,27 +550,24 @@ func TestSentReceivedMetrics(t *testing.T) { getTestCase := func(t *testing.T, tc testCase) func(t *testing.T) { return func(t *testing.T) { - //TODO: figure out why it fails if we uncomment this: t.Parallel() 
- r, err := js.New(&lib.SourceData{ - Filename: "/script.js", - Data: []byte(` - import http from "k6/http"; - export default function() { - http.get(` + burl(10000) + `); - http.batch([` + burl(10000) + `,` + burl(20000) + `,` + burl(10000) + `]); - } - `), - }, afero.NewMemMapFs(), lib.RuntimeOptions{}) + r, err := js.New( + &lib.SourceData{Filename: "/script.js", Data: script}, + afero.NewMemMapFs(), + lib.RuntimeOptions{}, + ) require.NoError(t, err) options := lib.Options{ Iterations: null.IntFrom(tc.Iterations), VUs: null.IntFrom(tc.VUs), VUsMax: null.IntFrom(tc.VUs), + Hosts: tb.Dialer.Hosts, + InsecureSkipTLSVerify: null.BoolFrom(true), } //TODO: test for differences with NoConnectionReuse enabled and disabled + r.SetOptions(options) engine, err := NewEngine(local.New(r), options) require.NoError(t, err) @@ -579,7 +579,7 @@ func TestSentReceivedMetrics(t *testing.T) { go func() { errC <- engine.Run(ctx) }() select { - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): cancel() t.Fatal("Test timed out") case err := <-errC: From 36dfe990d647b2b9bf0f934efcbaaf855f5f5f96 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 4 Apr 2018 13:00:19 +0300 Subject: [PATCH 11/12] Fix a typo --- lib/models.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/models.go b/lib/models.go index d09e87dae38..c187b5b2382 100644 --- a/lib/models.go +++ b/lib/models.go @@ -165,7 +165,7 @@ func (g *Group) Group(name string) (*Group, error) { return group, nil } -// Check creates a chold check belonging to this group. +// Check creates a child check belonging to this group. // This is safe to call from multiple goroutines simultaneously. 
func (g *Group) Check(name string) (*Check, error) { g.checkMutex.Lock() From 975dc9f01fb95a3392fae4886f6ce58228a80d41 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 4 Apr 2018 13:05:10 +0300 Subject: [PATCH 12/12] Update upcoming.md --- release notes/upcoming.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/release notes/upcoming.md b/release notes/upcoming.md index efb4ef944f8..0d6b145ffe8 100644 --- a/release notes/upcoming.md +++ b/release notes/upcoming.md @@ -34,4 +34,8 @@ export default function() { ## UX -* Clearer error message when using `open` function outside init context (#563) \ No newline at end of file +* Clearer error message when using `open` function outside init context (#563) + +## Internals + +* Fixed various data races and enabled automated testing with `-race` (#564) \ No newline at end of file