-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
185 lines (157 loc) · 5.26 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package kubenvoy
import (
"context"
"fmt"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"kubenvoy/utils"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/glog"
)
// XDSStream implements (and also wraps) EndpointDiscoveryService_StreamEndpointsServer.
// It has customized Recv() and Send() functions, aiming at simplifying the streaming of
// DiscoveryRequest & DiscoveryResponse.
type XDSStream struct {
// Underlying gRPC stream; RecvMsg/SendMsg are called on it directly.
grpc.ServerStream
// Responses queued by send and written to the stream by listen.
responseChan chan *envoy.DiscoveryResponse
// Response nonce to the channel on which the matching ACK/NACK request is delivered.
ackChan map[string]chan *envoy.DiscoveryRequest
// Guards ackChan.
mutex sync.Mutex
// Cancellable context derived from the wrapped stream's context; returned by Context.
ctx context.Context
// TypeURL/name to Version map
appliedVersion map[string]string
// Guards appliedVersion.
versionMutex sync.RWMutex
}
// NewXDSStream wraps EndpointDiscoveryService_StreamEndpointsServer.
//
// The returned stream carries its own cancellable context derived from
// stream.Context(); the cancel function is handed to utils.OnTerminate
// (presumably so the stream is cancelled on process termination — confirm
// against the utils package). The response channel is unbuffered and both
// bookkeeping maps start out empty.
func NewXDSStream(stream grpc.ServerStream) *XDSStream {
	streamCtx, stop := context.WithCancel(stream.Context())
	utils.OnTerminate(stop)

	xds := new(XDSStream)
	xds.ServerStream = stream
	xds.ctx = streamCtx
	xds.responseChan = make(chan *envoy.DiscoveryResponse)
	xds.ackChan = map[string]chan *envoy.DiscoveryRequest{}
	xds.appliedVersion = map[string]string{}
	return xds
}
// Context returns the context stored on the stream, taking precedence over the
// Context method of the embedded grpc.ServerStream.
func (s *XDSStream) Context() context.Context {
return s.ctx
}
// listen drains responseChan and writes every response to the underlying
// stream. gRPC does not allow several goroutines to call Send on the same
// EndpointDiscoveryService_StreamEndpointsServer, so all writers funnel their
// responses through the channel and this loop is the only caller of SendMsg.
//
// The loop ends when the stream context is done, when responseChan is closed,
// or after the first SendMsg failure (which is logged).
func (s *XDSStream) listen() {
	for {
		select {
		case <-s.Context().Done():
			return
		case pending, open := <-s.responseChan:
			if !open {
				return
			}
			err := s.ServerStream.SendMsg(pending)
			if err != nil {
				glog.Error(err)
				return
			}
		}
	}
}
// Recv implements one method of EndpointDiscoveryService_StreamEndpointsServer.
//
// It reads requests from the underlying stream until it finds one the caller
// has to handle, and returns that request:
//   - a request without a response nonce (the first request from envoy,
//     likely when the envoy server starts), or
//   - a request whose nonce is not registered on this stream, which is
//     processed as if it were new.
//
// A request whose nonce matches a pending response is an ACK/NACK. It is
// forwarded to the goroutine waiting in send and then skipped. For every
// request that carries a nonce the applied version is recorded first.
//
// Any error from RecvMsg is returned as is, with a nil request.
func (s *XDSStream) Recv() (*envoy.DiscoveryRequest, error) {
	// Loop instead of recursing: a long-lived stream sees an unbounded number
	// of ACKs and Go does not eliminate tail calls.
	for {
		r := new(envoy.DiscoveryRequest)
		if err := s.ServerStream.RecvMsg(r); err != nil {
			return nil, err
		}
		glog.V(1).Infof("Received request %v", r)

		nonce := r.GetResponseNonce()
		if nonce == "" {
			// first request from envoy, likely when envoy server starts
			return r, nil
		}

		s.updateAppliedVersion(r.GetTypeUrl(), strings.Join(r.GetResourceNames(), "|"), r.VersionInfo)

		ackChan, exist := s.ackChanForResponse(nonce)
		if !exist {
			// nonce not found in this server, we will then process it as if it's new
			return r, nil
		}

		// Unregister the channel right away so a later request carrying the
		// same nonce cannot obtain it again and send on (or close) a channel
		// that has already been closed. RecvMsg must not be called from more
		// than one goroutine, so this lookup-then-delete cannot interleave
		// with another Recv.
		s.deleteAckChanForResponse(nonce)

		// The request is an ACK/NACK for one of our responses: hand it to the
		// goroutine that sent the response and carry on with the next request.
		// Delivery is abandoned when the stream ends so this goroutine cannot
		// outlive the stream if nobody is waiting for the ACK any more.
		go func() {
			select {
			case ackChan <- r:
				close(ackChan)
			case <-s.Context().Done():
			}
		}()
	}
}
// ackChanForResponse looks up the ACK channel registered under nonce.
// exist reports whether such a channel is registered; when it is not,
// ackChan is nil.
func (s *XDSStream) ackChanForResponse(nonce string) (ackChan chan *envoy.DiscoveryRequest, exist bool) {
	s.mutex.Lock()
	ch, found := s.ackChan[nonce]
	s.mutex.Unlock()
	return ch, found
}
// getOrCreateAckChanForResponse returns the ACK channel for the response
// identified by nonce. If none is registered yet, an unbuffered channel is
// created, registered under nonce and returned. The nonce must be unique per
// response.
func (s *XDSStream) getOrCreateAckChanForResponse(nonce string) chan *envoy.DiscoveryRequest {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if ch, found := s.ackChan[nonce]; found {
		return ch
	}
	ch := make(chan *envoy.DiscoveryRequest)
	s.ackChan[nonce] = ch
	return ch
}
// deleteAckChanForResponse unregisters the ACK channel stored under nonce.
// It is a no-op when no channel is registered for that nonce.
func (s *XDSStream) deleteAckChanForResponse(nonce string) {
	s.mutex.Lock()
	delete(s.ackChan, nonce)
	s.mutex.Unlock()
}
// send hands resp to the listen goroutine and waits for the client's ACK/NACK.
//
// The reply is expected on the ACK channel registered under resp's nonce. If
// nothing arrives within 20 seconds the response is queued again, until 10
// attempts have been made in total. count is the number of attempts already
// used; callers pass 0.
//
// It returns nil when the client ACKs the response or when the stream context
// is done. It returns an error when the attempts are exhausted, when the reply
// carries a different type URL than resp, or when the reply's version differs
// from resp's version (treated as a NACK).
func (s *XDSStream) send(resp *envoy.DiscoveryResponse, count int) error {
	const (
		maximumRetrial = 10
		ackTimeout     = 20 * time.Second
	)

	nonce := resp.GetNonce()

	// Register the ACK channel before the response is queued; otherwise a fast
	// client could answer before the nonce is known and Recv would mistake the
	// ACK for a brand-new request.
	ack := s.getOrCreateAckChanForResponse(nonce)
	// Whatever the outcome, do not leave the nonce registered.
	defer s.deleteAckChanForResponse(nonce)

	for attempt := count; attempt < maximumRetrial; attempt++ {
		// listen may already have stopped; never block on it past the end of
		// the stream.
		select {
		case s.responseChan <- resp:
		case <-s.Context().Done():
			return nil
		}

		timer := time.NewTimer(ackTimeout)
		select {
		case <-s.Context().Done():
			timer.Stop()
			return nil
		case <-timer.C:
			glog.Warningf("failed to receive ACK/NAK from client for DiscoveryResponse %v, retry", resp.Nonce)
		case r := <-ack:
			timer.Stop()
			if r.GetTypeUrl() != resp.GetTypeUrl() {
				return fmt.Errorf("got unexpected type in ACK request %v", r)
			}
			// A reply that does not echo the version we sent means the client
			// rejected the new config.
			if r.GetVersionInfo() != resp.GetVersionInfo() {
				return fmt.Errorf("client %v NACKed new config %v: %v", r.GetNode().GetId(), resp.GetVersionInfo(), r.GetErrorDetail())
			}
			s.updateAppliedVersion(r.GetTypeUrl(), strings.Join(r.GetResourceNames(), "|"), r.VersionInfo)
			glog.V(0).Infof("Client %v ACKed new config %v for %v [type: %v]", r.GetNode().GetId(), resp.GetVersionInfo(), r.GetResourceNames(), r.GetTypeUrl())
			return nil
		}
	}

	return fmt.Errorf("failed to receive ACK/NAK from client for response %v, maximum retrials reached", resp)
}
// updateAppliedVersion records appliedVersion as the version in effect for the
// given type URL and resource names. The entry is stored under the key
// typeURL + "/" + resourceNames.
func (s *XDSStream) updateAppliedVersion(typeURL, resourceNames, appliedVersion string) {
	key := typeURL + "/" + resourceNames

	s.versionMutex.Lock()
	defer s.versionMutex.Unlock()
	s.appliedVersion[key] = appliedVersion
}
// AppliedVersion returns the version last recorded for the given type URL and
// resource names, or the empty string when nothing has been recorded for that
// combination.
func (s *XDSStream) AppliedVersion(typeURL, resourceNames string) string {
	key := typeURL + "/" + resourceNames

	s.versionMutex.RLock()
	version := s.appliedVersion[key]
	s.versionMutex.RUnlock()
	return version
}
// Send implements Send in the EndpointDiscoveryService_StreamEndpointsServer
// interface and overrides the original one. s.listen() must be called before
// any Send() calls.
//
// The response is delivered asynchronously: Send starts a goroutine that runs
// send and logs any error it reports, and itself always returns nil.
func (s *XDSStream) Send(resp *envoy.DiscoveryResponse) error {
	glog.V(2).Infof("Sending response %v", resp)

	deliver := func() {
		err := s.send(resp, 0)
		if err == nil {
			return
		}
		glog.Errorf("failed to send new config to client %v", err)
	}
	go deliver()

	return nil
}