-
Notifications
You must be signed in to change notification settings - Fork 0
/
transport.go
99 lines (92 loc) · 2.71 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package hedgehog
import (
"context"
"net/http"
"golang.org/x/sync/errgroup"
)
// NewHTTPClient wraps the transport of the provided http client with a hedged transport
// (see NewRoundTripper) and returns that client.
// A non-nil client is modified in place and returned.
// If a nil client is provided, a shallow copy of http.DefaultClient is used, so the
// process-wide default client itself is never modified.
// If the client has a nil transport, http.DefaultTransport is the transport that gets wrapped.
func NewHTTPClient(client *http.Client, calls uint64, resources ...Resource) *http.Client {
	if client == nil {
		// Copy instead of aliasing http.DefaultClient: assigning Transport on the shared
		// default would hedge every other user of it and would stack one more wrapper
		// on top of the previous one on each call.
		def := *http.DefaultClient
		client = &def
	}
	base := client.Transport
	if base == nil {
		base = http.DefaultTransport
	}
	client.Transport = NewRoundTripper(base, calls, resources...)
	return client
}
// transport is the http.RoundTripper returned by NewRoundTripper.
type transport struct {
	// internal is the underlying round tripper that performs every actual http call.
	internal http.RoundTripper
	// resources are checked in order against each request; the first one that matches is used.
	resources []Resource
	// calls is the number of additional hedged calls started on top of the original call.
	calls uint64
}
// NewRoundTripper returns a new hedged http transport built on top of internal with the provided resources.
// For a request that matches one of the resources the returned transport makes up to calls+1 http calls:
// the original call starts right away and the hedged calls are started together after the delay
// specified by the matched resource.
// The first successful http response is returned and the remaining calls in flight are canceled;
// if every call fails, the first error that occurred is returned.
// If no resource matches the request, the transport simply calls the underlying transport.
// If internal is nil, http.DefaultTransport is used as the underlying transport.
func NewRoundTripper(internal http.RoundTripper, calls uint64, resources ...Resource) http.RoundTripper {
	if internal == nil {
		// A nil round tripper would otherwise panic on the first request.
		internal = http.DefaultTransport
	}
	return transport{internal: internal, calls: calls, resources: resources}
}
// RoundTrip implements http.RoundTripper.
// The request is hedged according to the first resource that matches it;
// a request that matches no resource goes straight to the underlying transport.
func (t transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
	for i := range t.resources {
		matched := t.resources[i]
		if !matched.Match(req) {
			continue
		}
		return t.multiRoundTrip(req, matched)
	}
	return t.internal.RoundTrip(req)
}
// multiRoundTrip performs a hedged round trip of req according to rs.
// The original call starts immediately. If no result has been picked by the time rs.After() fires,
// t.calls additional calls are started together. The first response that passes rs.Check is returned;
// if every call fails, the first error that occurred is returned; if the request context is
// canceled first, the context error is returned.
// Responses that are not handed to the caller (rejected by rs.Check, or delivered after a result
// was already picked) have their bodies closed here.
//
// NOTE(review): every attempt, including the winning one, runs with the errgroup context, which
// errgroup cancels as soon as a result is picked. net/http documents that the request context also
// covers reading the response body, so reading a large body of the returned response may fail with
// a context error - confirm against the transports in use.
func (t transport) multiRoundTrip(req *http.Request, rs Resource) (resp *http.Response, err error) {
	g, ctx := errgroup.WithContext(req.Context())
	// Buffered for every possible attempt so that senders never block,
	// even after the collector below has stopped receiving.
	res := make(chan interface{}, t.calls+1)

	// discard releases a response that will never reach the caller.
	discard := func(r *http.Response) {
		if r != nil && r.Body != nil {
			_ = r.Body.Close()
		}
	}

	// Collector: picks the first response, or remembers the first error.
	// resp and err are only read by the caller after g.Wait() below.
	g.Go(func() error {
		for i := uint64(0); i < t.calls+1; i++ {
			select {
			case r := <-res:
				switch tr := r.(type) {
				case *http.Response:
					resp, err = tr, nil
					// A non-nil error cancels the group context and stops the other attempts.
					return context.Canceled
				case error:
					// keep only first occurred error.
					if err == nil {
						err = tr
					}
				}
			case <-ctx.Done():
				err = ctx.Err()
				// if group was canceled hard stop execution.
				return context.Canceled
			}
		}
		return nil
	})

	// attempt runs a single call and reports its outcome to the collector.
	attempt := func() error {
		hreq := req.Clone(ctx)
		hook := rs.Hook(hreq)
		hresp, herr := t.internal.RoundTrip(hreq)
		if herr != nil {
			res <- herr
			return nil
		}
		if cerr := rs.Check(hresp); cerr != nil {
			// The rejected response is dropped here, so its body must be closed
			// to release the underlying connection.
			discard(hresp)
			res <- cerr
			return nil
		}
		hook(hresp)
		res <- hresp
		return nil
	}

	g.Go(attempt)
	select {
	case <-rs.After():
		for i := uint64(0); i < t.calls; i++ {
			g.Go(attempt)
		}
	case <-ctx.Done():
		// A result was already picked or the request was canceled:
		// do not wait for the hedge delay and do not start hedged calls.
	}

	// The group error is only the internal stop signal; the outcome is in resp/err.
	_ = g.Wait()

	// All senders have finished. Whatever is still buffered was never picked by the collector.
	close(res)
	for r := range res {
		if extra, ok := r.(*http.Response); ok {
			discard(extra)
		}
	}
	return resp, err
}