Skip to content

Commit

Permalink
wip: Sketching out components
Browse files Browse the repository at this point in the history
  • Loading branch information
shazow committed Nov 20, 2019
1 parent c6e4a8a commit ccc40ed
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# versus
Benchmark multiple API endpoints against each other

versus takes a stream of requests and runs them against N endpoints
simultaneously, comparing the output and timing.
93 changes: 93 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"sync"
"time"
)

func NewClient(endpoint string) (*Client, error) {
t, err := NewTransport(endpoint)
if err != nil {
return nil, err
}
return &Client{
Endpoint: endpoint,

t: t,
in: make(chan Request, chanBuffer),
out: make(chan Response, chanBuffer),
}, nil
}

type Client struct {
Endpoint string
Concurrency int // Number of goroutines to make requests with. Must be >=1.

t Transport
in chan Request
out chan Response
report report
}

// Serve starts the async request and response goroutine consumers.
func (client *Client) Serve(ctx context.Context) error {
wg := sync.WaitGroup{}

go func() {
// Consume responses
wg.Add(1)
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case resp := <-client.out:
client.report.Add(resp.Err, resp.Elapsed)
}
}
}()

if client.Concurrency < 1 {
client.Concurrency = 1
}

for i := 0; i < client.Concurrency; i++ {
go func() {
// Consume requests
wg.Add(1)
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case req := <-client.in:
req.Do()
}
}
}()
}

wg.Wait()
return nil
}

var id int

type Clients []Client

func (c Clients) Send(line []byte) error {
id += 1
for _, client := range c {
client.in <- Request{
client: &client,
id: id,

Line: line,
Timestamp: time.Now(),
}
}
return nil
}
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module github.com/INFURA/versus

go 1.12

require (
github.com/gorilla/websocket v1.4.1 // indirect
github.com/jessevdk/go-flags v1.4.0
github.com/valyala/fasthttp v1.4.0
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.4.0 h1:PuaTGZIw3mjYhhhbVbCQp8aciRZN9YdoB7MGX9Ko76A=
github.com/valyala/fasthttp v1.4.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
118 changes: 118 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main // import "github.com/INFURA/versus"

import (
"bufio"
"bytes"
"context"
"fmt"
"log"
"os"
"strings"
"time"

flags "github.com/jessevdk/go-flags"
)

const chanBuffer = 20

// Options contains the flag options
type Options struct {
Endpoints []string `positional-args:"yes"`
}

func exit(code int, format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, format, args...)
os.Exit(code)
}

func main() {
options := Options{}
p, err := flags.NewParser(&options, flags.Default).ParseArgs(os.Args[1:])
if err != nil {
if p == nil {
fmt.Println(err)
}
return
}

clients := make(Clients, 0, len(options.Endpoints))
for _, endpoint := range options.Endpoints {
c, err := NewClient(endpoint)
if err != nil {
exit(1, "failed to create client: %s", err)
}
clients = append(clients, *c)
}

ctx := context.Background()

scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
}

if err := clients.Send(scanner.Bytes()); err != nil {
log.Print(err)
return
}
}

if err := scanner.Err(); err != nil {
log.Print(err)
return
}
}

type Request struct {
client *Client
id int

Line []byte
Timestamp time.Time
}

func (req *Request) Do() {
body, err := req.client.t.Send(req.Line)
resp := Response{
req.client, body, err, time.Now().Sub(req.Timestamp),
}
req.client.out <- resp
}

type Response struct {
client *Client

Body []byte
Err error

Elapsed time.Duration
}

func (r *Response) Equal(other Response) bool {
return r.Err.Error() == other.Err.Error() && bytes.Equal(r.Body, other.Body)
}

type Responses []Response

func (r Responses) String() string {
var buf strings.Builder

// TODO: Sort before printing
last := r[0]
for i, resp := range r {
fmt.Fprintf(&buf, "\t%s", resp.Elapsed)

if resp.Err.Error() != last.Err.Error() {
fmt.Fprintf(&buf, "[%d: error mismatch: %s != %s]", i, resp.Err, last.Err)
}

if !bytes.Equal(resp.Body, last.Body) {
fmt.Fprintf(&buf, "[%d: body mismatch: %s != %s]", i, resp.Body[:20], last.Body[:20])
}
}

return buf.String()
}
18 changes: 18 additions & 0 deletions report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import "time"

type report struct {
numTotal int // Number of requests
numErrors int // Number of errors

timeTotal time.Duration // Total duration of requests
}

func (r *report) Add(err error, elapsed time.Duration) {
r.numTotal += 1
if err != nil {
r.numErrors += 1
}
r.timeTotal += elapsed
}
53 changes: 53 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"errors"
"fmt"
"net/url"

"github.com/valyala/fasthttp"
)

func NewTransport(endpoint string) (Transport, error) {
url, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
switch url.Scheme {
case "http", "https":
return &fasthttpTransport{
endpoint: endpoint,
}, nil
case "ws", "wss":
// TODO: Implement
return &websocketTransport{}, nil
}
return nil, fmt.Errorf("unsupported transport: %s", url.Scheme)
}

type Transport interface {
Send(body []byte) ([]byte, error)
}

type fasthttpTransport struct {
endpoint string
fasthttp.Client
}

func (t *fasthttpTransport) Send(body []byte) ([]byte, error) {
code, resp, err := t.Post(body, t.endpoint, nil)
if err != nil {
return nil, err
}
if code >= 400 {
return nil, fmt.Errorf("bad status code: %d", code)
}
return resp, nil
}

type websocketTransport struct {
}

func (t *websocketTransport) Send(body []byte) ([]byte, error) {
return nil, errors.New("websocketTransport: not implemented")
}

0 comments on commit ccc40ed

Please sign in to comment.