Skip to content

Commit

Permalink
wip: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
shazow committed Nov 20, 2019
1 parent 990d7e2 commit 6c8edda
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 15 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# versus
# Versus

versus takes a stream of requests and runs them against N endpoints
Versus takes a stream of requests and runs them against N endpoints
simultaneously, comparing the output and timing.


## Design


28 changes: 22 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,28 @@ import (
"golang.org/x/sync/errgroup"
)

type clientStats struct {
NumTotal int // Number of requests
NumErrors int // Number of errors
TimeTotal time.Duration // Total duration of requests
}

func (stats *clientStats) Add(err error, elapsed time.Duration) {
stats.NumTotal += 1
if err != nil {
stats.NumErrors += 1
}
stats.TimeTotal += elapsed
}

const chanBuffer = 20

type Client struct {
Endpoint string
Concurrency int // Number of goroutines to make requests with. Must be >=1.
In chan Request
Report report
Out chan Response
Stats clientStats
}

// Serve starts the async request and response goroutine consumers.
Expand All @@ -22,13 +37,13 @@ func (client *Client) Serve(ctx context.Context) error {

g, ctx := errgroup.WithContext(ctx)

go func() {
// Consume responses
rep := report{}
g.Go(func() error {
for resp := range respCh {
rep.Add(resp.Err, resp.Elapsed)
client.Stats.Add(resp.Err, resp.Elapsed)
// TODO: Relay response to client.Out
}
}()
return nil
})

if client.Concurrency < 1 {
client.Concurrency = 1
Expand All @@ -53,6 +68,7 @@ func (client *Client) Serve(ctx context.Context) error {
}

err := g.Wait()

close(respCh)
return err
}
Expand Down
7 changes: 3 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
Endpoint: endpoint,
Concurrency: options.Concurrency,
In: make(chan Request, chanBuffer),
Out: make(chan Response, chanBuffer),
}
clients = append(clients, c)
g.Go(func() error {
Expand All @@ -94,12 +95,10 @@ func main() {
exit(3, "failed: %s", err)
}

// TODO: Combine reports
finalReport := report{}
// TODO: Make a report
for _, client := range clients {
client.Report.MergeInto(&finalReport)
fmt.Println(client.Stats)
}
fmt.Println(finalReport)
}

// pump takes lines from a reader and pumps them into the clients
Expand Down
13 changes: 10 additions & 3 deletions report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package main
import "time"

type report struct {
numTotal int // Number of requests
numErrors int // Number of errors

numTotal int // Number of requests
numErrors int // Number of errors
timeTotal time.Duration // Total duration of requests
}

Expand All @@ -27,3 +26,11 @@ func (r *report) MergeInto(into *report) *report {
into.timeTotal += r.timeTotal
return into
}

// TODO: Need a separate service to compare returned bodies
func (r *report) Serve(out <-chan Response) error {
for resp := range out {
r.Add(resp.Err, resp.Elapsed)
}
return nil
}

0 comments on commit 6c8edda

Please sign in to comment.