From 14906747f5e8a19ea622d5f723b83798347c3f6c Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Thu, 23 Jan 2020 16:34:42 -0500 Subject: [PATCH] Wire up skipping response comparing for later --- main.go | 8 ++++++-- report.go | 5 +++++ transport.go | 17 ++++++++++++----- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index ae5b871..1ecfc24 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ type Options struct { Timeout string `long:"timeout" description:"Abort request after duration" default:"30s"` StopAfter string `long:"stop-after" description:"Stop after N requests per endpoint, N can be a number or duration."` Concurrency int `long:"concurrency" description:"Concurrent requests per endpoint" default:"1"` + //CompareResponse string `long:"compare-response" description:"Load all response bodies and compare between endpoints, will affect throughput." default:"on"` //Source string `long:"source" description:"Where requests come from (options: stdin-post, stdin-get)" default:"stdin-jsons"` // Someday: stdin-tcpdump, file://foo.json, ws://remote-endpoint @@ -135,8 +136,12 @@ func run(ctx context.Context, options Options) error { g, ctx := errgroup.WithContext(ctx) + respBuffer := options.Concurrency * 4 + if respBuffer < 50 { + respBuffer = 50 + } // responses is closed when clients are shut down - responses := make(chan Response, options.Concurrency*4) + responses := make(chan Response, respBuffer) // Launch clients clients, err := NewClients(options.Args.Endpoints, options.Concurrency, timeout) @@ -148,7 +153,6 @@ func run(ctx context.Context, options Options) error { g.Go(func() error { return r.Serve(ctx, responses) }) - if len(options.Verbose) > 0 { r.MismatchedResponse = func(resps []Response) { logger.Info().Int("id", int(resps[0].ID)).Msgf("mismatched responses: %s", Responses(resps).String()) diff --git a/report.go b/report.go index dcee721..84d8877 100644 --- a/report.go +++ b/report.go @@ -15,6 +15,7 @@ type report struct { // MismatchedResponse is called when a response set does not match across clients MismatchedResponse func([]Response) + skipCompare bool once sync.Once pendingResponses map[requestID][]Response @@ -106,6 +107,10 @@ func (r *report) compareResponses(resp Response) { func (r *report) handle(resp Response) error { r.count(resp.Err, resp.Elapsed) + if r.skipCompare { + return nil + } + r.compareResponses(resp) return nil } diff --git a/transport.go b/transport.go index 37f5fe6..a437ae4 100644 --- a/transport.go +++ b/transport.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -31,6 +32,10 @@ func NewTransport(endpoint string, timeout time.Duration) (Transport, error) { t = &httpTransport{ Client: http.Client{Timeout: timeout}, endpoint: url.String(), + bodyReader: func(body io.ReadCloser) ([]byte, error) { + defer body.Close() + return ioutil.ReadAll(body) + }, } case "ws", "wss": // TODO: Implement @@ -69,6 +74,8 @@ type httpTransport struct { getHost string getPath string + + bodyReader func(io.ReadCloser) ([]byte, error) } func (t *httpTransport) Mode(m string) error { @@ -107,12 +114,12 @@ func (t *httpTransport) Send(body []byte) ([]byte, error) { if resp.StatusCode >= 400 { return nil, fmt.Errorf("bad status code: %d", resp.StatusCode) } - // TODO: Avoid reading the entire body into memory - buf, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err + if t.bodyReader == nil { + resp.Body.Close() + return nil, nil } - return buf, nil + // TODO: Avoid reading the whole body into memory + return t.bodyReader(resp.Body) } type websocketTransport struct {