-
Notifications
You must be signed in to change notification settings - Fork 3
/
valve.go
156 lines (134 loc) · 3.7 KB
/
valve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package valve
import (
"context"
"errors"
"net/http"
"sync"
"time"
)
// Package-level context key and sentinel errors.
var (
// ValveCtxKey is the context key under which Context stores the valve's
// LeverControl and from which Lever reads it back.
ValveCtxKey = &contextKey{"Valve"}
// ErrTimedout is returned by Shutdown when the timeout elapses before the
// wait group has drained.
ErrTimedout = errors.New("valve: shutdown timed out")
// ErrShuttingdown is returned by Add (and therefore Open) once the stop
// channel has been closed.
ErrShuttingdown = errors.New("valve: shutdown in progress")
// ErrOff is returned by Shutdown when the valve has already been shut down.
ErrOff = errors.New("valve: valve already shutdown")
)
// Valve coordinates a graceful shutdown: it pairs a stop channel that signals
// shutdown with a wait group that tracks work which must finish first.
type Valve struct {
// stopCh is closed by Shutdown and handed out by Stop.
stopCh chan struct{}
// wg counts in-flight work registered through Add/Open and released through
// Done/Close; Shutdown waits on it.
wg sync.WaitGroup
// shutdown records that Shutdown has already run, so a repeat call returns
// ErrOff instead of closing stopCh a second time.
shutdown bool
// mu is held for the duration of Shutdown and guards the shutdown flag.
mu sync.Mutex
}
// LeverControl is the set of valve operations available to code that only
// holds a context. Context stores a *Valve under ValveCtxKey as this interface
// and Lever retrieves it. The method notes below describe the *Valve
// implementation.
type LeverControl interface {
// Stop returns the channel that is closed when shutdown begins.
Stop() <-chan struct{}
// Add registers delta units of work, or returns ErrShuttingdown once
// shutdown has begun.
Add(delta int) error
// Done releases one unit of work.
Done()
// Open is shorthand for Add(1).
Open() error
// Close is shorthand for Done().
Close()
}
// New constructs a Valve that is ready for use: its stop channel is allocated
// (unbuffered) and not yet closed.
func New() *Valve {
	valve := new(Valve)
	valve.stopCh = make(chan struct{})
	return valve
}
// Context returns a new root context, derived from context.Background, that
// carries this valve's lever controls under ValveCtxKey.
//
// Use it as the base context of a server so that every context derived from
// it can reach the shutdown signal through Lever.
func (v *Valve) Context() context.Context {
	// Store the valve as a LeverControl, the type Lever asserts on retrieval.
	var controls LeverControl = v
	root := context.Background()
	return context.WithValue(root, ValveCtxKey, controls)
}
// Lever extracts the lever controls stored on ctx under ValveCtxKey.
//
// It panics when ctx holds no LeverControl under that key, for example when
// the context was not derived from the one returned by Valve.Context.
func Lever(ctx context.Context) LeverControl {
	if controls, found := ctx.Value(ValveCtxKey).(LeverControl); found {
		return controls
	}
	panic("valve: ValveCtxKey has not been set on the context.")
}
// Shutdown closes the stop channel to signal that processing should stop, and
// then waits for all open valves (work registered via Add/Open) to be closed.
//
// timeout is the grace period to wait. If timeout is 0, Shutdown waits
// indefinitely. It returns nil once all registered work has finished,
// ErrTimedout if the grace period elapses first, and ErrOff if the valve has
// already been shut down.
//
// The mutex is held for the whole call, so a concurrent second call blocks
// until the first one returns and then receives ErrOff.
func (v *Valve) Shutdown(timeout time.Duration) error {
	v.mu.Lock()
	defer v.mu.Unlock()

	if v.shutdown {
		return ErrOff
	}
	// Closing stopCh is the shutdown signal. The flag prevents a second
	// close, which would panic.
	close(v.stopCh)
	v.shutdown = true

	if timeout == 0 {
		v.wg.Wait()
		return nil
	}

	// WaitGroup.Wait cannot take part in a select, so bridge it to a channel.
	// If the timeout wins, this goroutine stays blocked until the remaining
	// work calls Done.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		v.wg.Wait()
	}()

	// Use an explicit timer rather than time.After so it can be released as
	// soon as the work drains instead of lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-drained:
		return nil
	case <-timer.C:
		return ErrTimedout
	}
}
// Stop returns the valve's stop channel. Shutdown closes this channel, so a
// receive from it unblocks once the system is supposed to stop. It mimics the
// behaviour of the ctx.Done() method in "context".
func (v *Valve) Stop() <-chan struct{} {
return v.stopCh
}
// Add registers delta units of in-flight work (delta should be 1) on the
// valve's wait group, marking a block of code that must complete before the
// system exits.
//
// It returns ErrShuttingdown, leaving the wait group untouched, when the stop
// channel is already closed; otherwise it returns nil.
//
// NOTE(review): the stop check and the increment are not performed under
// v.mu, so they are not atomic with respect to a concurrent Shutdown.
func (v *Valve) Add(delta int) error {
	select {
	case <-v.stopCh:
		// Shutdown has begun: refuse new work.
		return ErrShuttingdown
	default:
	}

	v.wg.Add(delta)
	return nil
}
// Done decrements the valve's wait group by one, informing Shutdown that a
// piece of non-preemptive app code registered via Add/Open has finished.
func (v *Valve) Done() {
v.wg.Done()
}
// Open is an alias for Add(1), intended to read better when opening a valve.
// It returns whatever Add returns: ErrShuttingdown once the stop channel has
// been closed, nil otherwise. When Open returns an error nothing was added to
// the wait group, so Close must not be called for that attempt.
func (v *Valve) Open() error {
return v.Add(1)
}
// Close is an alias for Done(), intended to read better when closing a valve.
// It should be called once for each Open that returned nil.
func (v *Valve) Close() {
v.Done()
}
// ShutdownHandler is an optional HTTP middleware that stops accepting new
// requests while the server is in a shutting-down state, answering them with
// 503 Service Unavailable instead of calling next.
//
// Each accepted request opens the valve for its duration, so Shutdown waits
// for it to finish. The lever is taken from the request context (see
// Valve.Context), not from the receiver; Lever panics if the context does not
// carry one.
//
// If you're using something like github.com/tylerb/graceful, which stops
// accepting new connections on the socket anyway, then this handler
// wouldn't be necessary, but it is handy otherwise.
func (v *Valve) ShutdownHandler(next http.Handler) http.Handler {
	fn := func(w http.ResponseWriter, r *http.Request) {
		lever := Lever(r.Context())

		// A failed Open registered nothing on the wait group, so it must not
		// be paired with Close: an unmatched Done would either panic with a
		// negative counter or release another request's slot.
		if err := lever.Open(); err != nil {
			http.Error(w, ErrShuttingdown.Error(), http.StatusServiceUnavailable)
			return
		}
		defer lever.Close()

		select {
		case <-lever.Stop():
			// Shutdown began right after Open succeeded - don't accept the
			// request.
			http.Error(w, ErrShuttingdown.Error(), http.StatusServiceUnavailable)
		default:
			next.ServeHTTP(w, r)
		}
	}
	return http.HandlerFunc(fn)
}
// contextKey is the key type this package uses with context.WithValue. Keys
// are used as pointers so they fit in an interface{} without allocation. The
// technique was copied from Go 1.7's use of context in net/http.
type contextKey struct {
	name string // label included in the String output
}

// String returns a human-readable description of the key.
func (k *contextKey) String() string {
	const prefix = "valve context value "
	return prefix + k.name
}