Skip to content

Commit

Permalink
Wire up skipping response comparing for later
Browse files Browse the repository at this point in the history
  • Loading branch information
shazow committed Jan 23, 2020
1 parent e1a8964 commit 1490674
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Options struct {
Timeout string `long:"timeout" description:"Abort request after duration" default:"30s"`
StopAfter string `long:"stop-after" description:"Stop after N requests per endpoint, N can be a number or duration."`
Concurrency int `long:"concurrency" description:"Concurrent requests per endpoint" default:"1"`
//CompareResponse string `long:"compare-response" description:"Load all response bodies and compare between endpoints, will affect throughput." default:"on"`

//Source string `long:"source" description:"Where requests come from (options: stdin-post, stdin-get)" default:"stdin-jsons"` // Someday: stdin-tcpdump, file://foo.json, ws://remote-endpoint

Expand Down Expand Up @@ -135,8 +136,12 @@ func run(ctx context.Context, options Options) error {

g, ctx := errgroup.WithContext(ctx)

respBuffer := options.Concurrency * 4
if respBuffer < 50 {
respBuffer = 50
}
// responses is closed when clients are shut down
responses := make(chan Response, options.Concurrency*4)
responses := make(chan Response, respBuffer)

// Launch clients
clients, err := NewClients(options.Args.Endpoints, options.Concurrency, timeout)
Expand All @@ -148,7 +153,6 @@ func run(ctx context.Context, options Options) error {
g.Go(func() error {
return r.Serve(ctx, responses)
})

if len(options.Verbose) > 0 {
r.MismatchedResponse = func(resps []Response) {
logger.Info().Int("id", int(resps[0].ID)).Msgf("mismatched responses: %s", Responses(resps).String())
Expand Down
5 changes: 5 additions & 0 deletions report.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type report struct {
// MismatchedResponse is called when a response set does not match across clients
MismatchedResponse func([]Response)

skipCompare bool
once sync.Once
pendingResponses map[requestID][]Response

Expand Down Expand Up @@ -106,6 +107,10 @@ func (r *report) compareResponses(resp Response) {

func (r *report) handle(resp Response) error {
r.count(resp.Err, resp.Elapsed)
if r.skipCompare {
return nil
}

r.compareResponses(resp)
return nil
}
Expand Down
17 changes: 12 additions & 5 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -31,6 +32,10 @@ func NewTransport(endpoint string, timeout time.Duration) (Transport, error) {
t = &httpTransport{
Client: http.Client{Timeout: timeout},
endpoint: url.String(),
bodyReader: func(body io.ReadCloser) ([]byte, error) {
defer body.Close()
return ioutil.ReadAll(body)
},
}
case "ws", "wss":
// TODO: Implement
Expand Down Expand Up @@ -69,6 +74,8 @@ type httpTransport struct {

getHost string
getPath string

bodyReader func(io.ReadCloser) ([]byte, error)
}

func (t *httpTransport) Mode(m string) error {
Expand Down Expand Up @@ -107,12 +114,12 @@ func (t *httpTransport) Send(body []byte) ([]byte, error) {
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("bad status code: %d", resp.StatusCode)
}
// TODO: Avoid reading the entire body into memory
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
if t.bodyReader == nil {
resp.Body.Close()
return nil, nil
}
return buf, nil
// TODO: Avoid reading the whole body into memory
return t.bodyReader(resp.Body)
}

type websocketTransport struct {
Expand Down

0 comments on commit 1490674

Please sign in to comment.