Skip to content

Commit

Permalink
main: Properly converge reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
shazow committed Dec 12, 2019
1 parent 4ef82e7 commit ebf7fa6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
22 changes: 19 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewClients(endpoints []string, concurrency int) (Clients, error) {
if err != nil {
return nil, err
}
clients = append(clients, *c)
clients = append(clients, c)
}
return clients, nil
}
Expand All @@ -63,6 +63,7 @@ func (client *Client) Serve(ctx context.Context) error {
defer close(respCh)

g, ctx := errgroup.WithContext(ctx)
ctx, finalized := context.WithCancel(ctx)

g.Go(func() error {
for {
Expand Down Expand Up @@ -95,6 +96,11 @@ func (client *Client) Serve(ctx context.Context) error {
logger.Debug().Str("endpoint", client.Endpoint).Msg("shutting down client")
return nil
case req := <-client.In:
if req.ID == -1 {
// Final request received, shutdown
finalized()
return nil
}
respCh <- req.Do(t)
}
}
Expand All @@ -106,13 +112,23 @@ func (client *Client) Serve(ctx context.Context) error {

var id requestID

type Clients []Client
type Clients []*Client

// Finalize sends a request with ID -1 which signals the end of the stream, so
// serving will end cleanly.
func (c Clients) Finalize() {
for _, client := range c {
client.In <- Request{
ID: -1,
}
}
}

func (c Clients) Send(line []byte) error {
id += 1
for _, client := range c {
client.In <- Request{
client: &client,
client: client,
ID: id,

Line: line,
Expand Down
25 changes: 20 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func main() {
panic("aborted")
}(abort)

if err := run(ctx, options); err != nil {
exit(2, "error during run: %s", err)
}
}

func run(ctx context.Context, options Options) error {
if options.Duration != "" {
d, err := time.ParseDuration(options.Duration)
if err != nil {
Expand All @@ -89,22 +95,29 @@ func main() {
}
}

ctx, done := context.WithCancel(ctx)
defer done()

g, ctx := errgroup.WithContext(ctx)

// Launch clients
clients, err := NewClients(options.Args.Endpoints, options.Concurrency)
if err != nil {
exit(3, "failed to create clients: %s", err)
return fmt.Errorf("failed to create clients: %w", err)
}

r, err := Report(clients)
if err != nil {
exit(3, "failed to create report: %s", err)
return fmt.Errorf("failed to create report: %w", err)
}

g.Go(func() error {
return r.Serve(ctx)
})

if len(options.Verbose) > 0 {
r.MismatchedResponse = func(resps []Response) {
logger.Info().Msgf("mismatched responses: %s", resps)
logger.Info().Msgf("mismatched responses: %v", resps)
}
}

Expand All @@ -115,14 +128,13 @@ func main() {
logger.Info().Int("clients", len(clients)).Msg("started endpoint clients, pumping stdin")

g.Go(func() error {
defer abort()
return pump(ctx, os.Stdin, clients)
})

if err := g.Wait(); err == context.Canceled {
// Shutting down
} else if err != nil {
exit(3, "failed: %s", err)
return fmt.Errorf("failed to serve: %w", err)
}

// TODO: Make a report
Expand All @@ -131,6 +143,7 @@ func main() {
}

fmt.Printf("report: %+v\n", r)
return nil
}

// pump takes lines from a reader and pumps them into the clients
Expand All @@ -153,6 +166,8 @@ func pump(ctx context.Context, r io.Reader, c Clients) error {
}
}

c.Finalize()

if err := scanner.Err(); err != nil {
return err
}
Expand Down
19 changes: 15 additions & 4 deletions report.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func (r *report) compareResponses(resp Response) {
delete(r.pendingResponses, resp.ID) // TODO: Reuse these arrays
r.completed += 1

durations := make([]time.Duration, 0, len(r.clients))
durations = append(durations, resp.Elapsed)

for _, other := range otherResponses {
durations = append(durations, other.Elapsed)

if !other.Equal(resp) {
// Mismatch found, report the whole response set
r.mismatched += 1
Expand All @@ -84,6 +89,8 @@ func (r *report) compareResponses(resp Response) {
}
}
}

logger.Debug().Int("id", int(resp.ID)).Msgf("converged responses, durations: %v", durations)
}

func (r *report) handle(resp Response) error {
Expand All @@ -93,10 +100,14 @@ func (r *report) handle(resp Response) error {
}

func (r *report) Serve(ctx context.Context) error {
for resp := range r.respCh {
if err := r.handle(resp); err != nil {
return err
for {
select {
case <-ctx.Done():
return ctx.Err()
case resp := <-r.respCh:
if err := r.handle(resp); err != nil {
return err
}
}
}
return nil
}

0 comments on commit ebf7fa6

Please sign in to comment.