package quic

import (
	"time"

	"github.com/lucas-clemente/quic-go/ackhandler"
	"github.com/lucas-clemente/quic-go/internal/protocol"
	"github.com/lucas-clemente/quic-go/internal/utils"
	"github.com/lucas-clemente/quic-go/internal/wire"
)
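
// scheduler selects, for each packet to be sent, the path of the
// session on which it should be sent.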
type scheduler struct {
	// XXX Currently round-robin based, inspired from MPTCP scheduler
	quotas map[protocol.PathID]uint
}
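
// setup initializes the scheduler's per-path quota map.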
func (sch *scheduler) setup() {
	sch.quotas = make(map[protocol.PathID]uint)
}
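
// getRetransmission checks all paths for packets queued for retransmission.
// A handshake retransmission is returned as-is to the caller; for
// forward-secure packets, the contained frames are re-queued for sending
// and the loop continues.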
func (sch *scheduler) getRetransmission(s *session) (hasRetransmission bool, retransmitPacket *ackhandler.Packet, pth *path) {
	// Check for retransmissions first
	for {
		// TODO add ability to reinject on another path
		// XXX We need to check on ALL paths whether any packet should be retransmitted first
		s.pathsLock.RLock()
	retransmitLoop:
		for _, pthTmp := range s.paths {
			retransmitPacket = pthTmp.sentPacketHandler.DequeuePacketForRetransmission()
			if retransmitPacket != nil {
				pth = pthTmp
				break retransmitLoop
			}
		}
		s.pathsLock.RUnlock()
		if retransmitPacket == nil {
			break
		}
		hasRetransmission = true
		if retransmitPacket.EncryptionLevel != protocol.EncryptionForwardSecure {
			if s.handshakeComplete {
				// Don't retransmit handshake packets when the handshake is complete
				continue
			}
			utils.Debugf("\tDequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber)
			return
		}
		utils.Debugf("\tDequeueing retransmission of packet 0x%x from path %d", retransmitPacket.PacketNumber, pth.pathID)
		// Resend the frames that were in the packet
		for _, frame := range retransmitPacket.GetFramesForRetransmission() {
			switch f := frame.(type) {
			case *wire.StreamFrame:
				s.streamFramer.AddFrameForRetransmission(f)
			case *wire.WindowUpdateFrame:
				// Only retransmit WindowUpdates if the stream is not yet closed and we haven't sent another WindowUpdate with a higher ByteOffset for the stream
				// XXX Should it be adapted to multiple paths?
				currentOffset, err := s.flowControlManager.GetReceiveWindow(f.StreamID)
				if err == nil && f.ByteOffset >= currentOffset {
					s.packer.QueueControlFrame(f, pth)
				}
			case *wire.PathsFrame:
				// Schedule a new PATHS frame to send
				s.schedulePathsFrame()
			default:
				s.packer.QueueControlFrame(frame, pth)
			}
		}
	}
	return
}
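
// selectPathRoundRobin returns the usable path with the lowest quota, i.e.,
// the one that sent the fewest packets so far.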
func (sch *scheduler) selectPathRoundRobin(s *session, hasRetransmission bool, hasStreamRetransmission bool, fromPth *path) *path {
	if sch.quotas == nil {
		sch.setup()
	}
	// XXX Avoid using PathID 0 if there is more than 1 path
	if len(s.paths) <= 1 {
		if !hasRetransmission && !s.paths[protocol.InitialPathID].SendingAllowed() {
			return nil
		}
		return s.paths[protocol.InitialPathID]
	}
	// TODO cope with decreasing number of paths (needed?)
	var selectedPath *path
	var lowerQuota, currentQuota uint
	var ok bool
	// Max possible value for lowerQuota at the beginning
	lowerQuota = ^uint(0)
pathLoop:
	for pathID, pth := range s.paths {
		// Don't block path usage if we retransmit, even on another path
		if !hasRetransmission && !pth.SendingAllowed() {
			continue pathLoop
		}
		// If this path is potentially failed, do not consider it for sending
		if pth.potentiallyFailed.Get() {
			continue pathLoop
		}
		// XXX Prevent using initial pathID if multiple paths
		if pathID == protocol.InitialPathID {
			continue pathLoop
		}
		currentQuota, ok = sch.quotas[pathID]
		if !ok {
			sch.quotas[pathID] = 0
			currentQuota = 0
		}
		if currentQuota < lowerQuota {
			selectedPath = pth
			lowerQuota = currentQuota
		}
	}
	return selectedPath
}
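
// selectPathLowLatency returns the usable path with the lowest smoothed RTT,
// using quotas as a tie-breaker for paths that have not been probed yet.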
func (sch *scheduler) selectPathLowLatency(s *session, hasRetransmission bool, hasStreamRetransmission bool, fromPth *path) *path {
	// XXX Avoid using PathID 0 if there is more than 1 path
	if len(s.paths) <= 1 {
		if !hasRetransmission && !s.paths[protocol.InitialPathID].SendingAllowed() {
			return nil
		}
		return s.paths[protocol.InitialPathID]
	}
	// FIXME Only works at the beginning... Cope with new paths appearing during the connection
	if hasRetransmission && hasStreamRetransmission && fromPth.rttStats.SmoothedRTT() == 0 {
		// Is there any other path with a lower number of packets sent?
		currentQuota := sch.quotas[fromPth.pathID]
		for pathID, pth := range s.paths {
			if pathID == protocol.InitialPathID || pathID == fromPth.pathID {
				continue
			}
			// The congestion window was checked when duplicating the packet
			if sch.quotas[pathID] < currentQuota {
				return pth
			}
		}
	}
	var selectedPath *path
	var lowerRTT time.Duration
	var currentRTT time.Duration
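	// 255 acts as a sentinel value: no path has been selected yet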
	selectedPathID := protocol.PathID(255)
pathLoop:
	for pathID, pth := range s.paths {
		// Don't block path usage if we retransmit, even on another path
		if !hasRetransmission && !pth.SendingAllowed() {
			continue pathLoop
		}
		// If this path is potentially failed, do not consider it for sending
		if pth.potentiallyFailed.Get() {
			continue pathLoop
		}
		// XXX Prevent using initial pathID if multiple paths
		if pathID == protocol.InitialPathID {
			continue pathLoop
		}
		currentRTT = pth.rttStats.SmoothedRTT()
		// Prefer staying single-path if not blocked by the current path
		// Don't consider this sample if the smoothed RTT is 0
		if lowerRTT != 0 && currentRTT == 0 {
			continue pathLoop
		}
		// Case where multiple paths are still unprobed: fall back to quotas
		if currentRTT == 0 {
			currentQuota, ok := sch.quotas[pathID]
			if !ok {
				sch.quotas[pathID] = 0
				currentQuota = 0
			}
			lowerQuota, _ := sch.quotas[selectedPathID]
			if selectedPath != nil && currentQuota > lowerQuota {
				continue pathLoop
			}
		}
		if currentRTT != 0 && lowerRTT != 0 && selectedPath != nil && currentRTT >= lowerRTT {
			continue pathLoop
		}
		// Update the current best path
		lowerRTT = currentRTT
		selectedPath = pth
		selectedPathID = pathID
	}
	return selectedPath
}
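
// selectPath chooses the path on which the next packet will be sent.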
// Lock of s.paths must be held
func (sch *scheduler) selectPath(s *session, hasRetransmission bool, hasStreamRetransmission bool, fromPth *path) *path {
	// XXX Currently hard-coded to the low-latency scheduler
	// TODO select the right scheduler dynamically
	return sch.selectPathLowLatency(s, hasRetransmission, hasStreamRetransmission, fromPth)
	// return sch.selectPathRoundRobin(s, hasRetransmission, hasStreamRetransmission, fromPth)
}
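
// performPacketSending packs and sends a single packet on the given path and
// updates the path's quota. It reports whether a packet was actually sent.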
// Lock of s.paths must be free (in case of log print)
func (sch *scheduler) performPacketSending(s *session, windowUpdateFrames []*wire.WindowUpdateFrame, pth *path) (*ackhandler.Packet, bool, error) {
	// Add a retransmittable frame
	if pth.sentPacketHandler.ShouldSendRetransmittablePacket() {
		s.packer.QueueControlFrame(&wire.PingFrame{}, pth)
	}
	packet, err := s.packer.PackPacket(pth)
	if err != nil || packet == nil {
		return nil, false, err
	}
	if err = s.sendPackedPacket(packet, pth); err != nil {
		return nil, false, err
	}
	// Send every window update twice
	for _, f := range windowUpdateFrames {
		s.packer.QueueControlFrame(f, pth)
	}
	// Packet sent, so update its quota
	sch.quotas[pth.pathID]++
	// Provide some logging if it is the last packet of a stream
	for _, frame := range packet.frames {
		switch frame := frame.(type) {
		case *wire.StreamFrame:
			if frame.FinBit {
				// Last packet to send on the stream, print stats
				s.pathsLock.RLock()
				utils.Infof("Info for stream %x of %x", frame.StreamID, s.connectionID)
				for pathID, pth := range s.paths {
					sntPkts, sntRetrans, sntLost := pth.sentPacketHandler.GetStatistics()
					rcvPkts := pth.receivedPacketHandler.GetStatistics()
					utils.Infof("Path %x: sent %d retrans %d lost %d; rcv %d rtt %v", pathID, sntPkts, sntRetrans, sntLost, rcvPkts, pth.rttStats.SmoothedRTT())
				}
				s.pathsLock.RUnlock()
			}
		default:
		}
	}
	pkt := &ackhandler.Packet{
		PacketNumber:    packet.number,
		Frames:          packet.frames,
		Length:          protocol.ByteCount(len(packet.raw)),
		EncryptionLevel: packet.encryptionLevel,
	}
	return pkt, true, nil
}
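
// ackRemainingPaths flushes pending ACKs and window updates on the session's
// paths once no more data can be sent.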
// Lock of s.paths must be free
func (sch *scheduler) ackRemainingPaths(s *session, totalWindowUpdateFrames []*wire.WindowUpdateFrame) error {
	// Either we ran out of data, or the CWINs of the usable paths are full
	// Send ACKs on paths not yet used, if needed. Either we have no data to send and
	// it will be a pure ACK, or we will have data in it, but the CWIN should then
	// not be an issue.
	s.pathsLock.RLock()
	defer s.pathsLock.RUnlock()
	// Get WindowUpdate frames
	// This call triggers the flow controller to increase the flow control windows, if necessary
	windowUpdateFrames := totalWindowUpdateFrames
	if len(windowUpdateFrames) == 0 {
		windowUpdateFrames = s.getWindowUpdateFrames(s.peerBlocked)
	}
	for _, pthTmp := range s.paths {
		ackTmp := pthTmp.GetAckFrame()
		for _, wuf := range windowUpdateFrames {
			s.packer.QueueControlFrame(wuf, pthTmp)
		}
		if ackTmp != nil || len(windowUpdateFrames) > 0 {
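			// On the initial path, only send a packet if it carries an ACK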
			if pthTmp.pathID == protocol.InitialPathID && ackTmp == nil {
				continue
			}
			swf := pthTmp.GetStopWaitingFrame(false)
			if swf != nil {
				s.packer.QueueControlFrame(swf, pthTmp)
			}
			if ackTmp != nil {
				s.packer.QueueControlFrame(ackTmp, pthTmp)
			}
			// XXX (QDC) should we instead call PackPacket to provide WUFs?
			var packet *packedPacket
			var err error
			if ackTmp != nil {
				// Avoid internal error bug
				packet, err = s.packer.PackAckPacket(pthTmp)
			} else {
				packet, err = s.packer.PackPacket(pthTmp)
			}
			if err != nil {
				return err
			}
			err = s.sendPackedPacket(packet, pthTmp)
			if err != nil {
				return err
			}
		}
	}
	s.peerBlocked = false
	return nil
}
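
// sendPacket is the scheduler's main sending loop: it first handles
// retransmissions, then selects a path and sends packets until there is no
// more data to send or the congestion windows of the usable paths are full.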
func (sch *scheduler) sendPacket(s *session) error {
	var pth *path
	// Update leastUnacked value of paths
	s.pathsLock.RLock()
	for _, pthTmp := range s.paths {
		pthTmp.SetLeastUnacked(pthTmp.sentPacketHandler.GetLeastUnacked())
	}
	s.pathsLock.RUnlock()
	// Get WindowUpdate frames
	// This call triggers the flow controller to increase the flow control windows, if necessary
	windowUpdateFrames := s.getWindowUpdateFrames(false)
	for _, wuf := range windowUpdateFrames {
		s.packer.QueueControlFrame(wuf, pth)
	}
	// Repeatedly try sending until we don't have any more data, or run out of the congestion window
	for {
		// We first check for retransmissions
		hasRetransmission, retransmitHandshakePacket, fromPth := sch.getRetransmission(s)
		// XXX There might still be some stream frames to be retransmitted
		hasStreamRetransmission := s.streamFramer.HasFramesForRetransmission()
		// Select the path here
		s.pathsLock.RLock()
		pth = sch.selectPath(s, hasRetransmission, hasStreamRetransmission, fromPth)
		s.pathsLock.RUnlock()
		// XXX No more paths available; should we have a new QUIC error message?
		if pth == nil {
			windowUpdateFrames := s.getWindowUpdateFrames(false)
			return sch.ackRemainingPaths(s, windowUpdateFrames)
		}
		// If we have a handshake packet retransmission, do it directly
		if hasRetransmission && retransmitHandshakePacket != nil {
			s.packer.QueueControlFrame(pth.sentPacketHandler.GetStopWaitingFrame(true), pth)
			packet, err := s.packer.PackHandshakeRetransmission(retransmitHandshakePacket, pth)
			if err != nil {
				return err
			}
			if err = s.sendPackedPacket(packet, pth); err != nil {
				return err
			}
			continue
		}
		// XXX Some automatic ACK generation should be done somehow
		ack := pth.GetAckFrame()
		if ack != nil {
			s.packer.QueueControlFrame(ack, pth)
		}
		if ack != nil || hasStreamRetransmission {
			swf := pth.sentPacketHandler.GetStopWaitingFrame(hasStreamRetransmission)
			if swf != nil {
				s.packer.QueueControlFrame(swf, pth)
			}
		}
		// Also add CLOSE_PATH frames, if any
		for cpf := s.streamFramer.PopClosePathFrame(); cpf != nil; cpf = s.streamFramer.PopClosePathFrame() {
			s.packer.QueueControlFrame(cpf, pth)
		}
		// Also add ADD_ADDRESS frames, if any
		for aaf := s.streamFramer.PopAddAddressFrame(); aaf != nil; aaf = s.streamFramer.PopAddAddressFrame() {
			s.packer.QueueControlFrame(aaf, pth)
		}
		// Also add PATHS frames, if any
		for pf := s.streamFramer.PopPathsFrame(); pf != nil; pf = s.streamFramer.PopPathsFrame() {
			s.packer.QueueControlFrame(pf, pth)
		}
		pkt, sent, err := sch.performPacketSending(s, windowUpdateFrames, pth)
		if err != nil {
			return err
		}
		windowUpdateFrames = nil
		if !sent {
			// Prevent sending empty packets
			return sch.ackRemainingPaths(s, windowUpdateFrames)
		}
		// Duplicate traffic when it was sent on a path with unknown performance
		// FIXME adapt for new paths coming during the connection
		if pth.rttStats.SmoothedRTT() == 0 {
			currentQuota := sch.quotas[pth.pathID]
			// Was the packet duplicated on all potential paths?
		duplicateLoop:
			for pathID, tmpPth := range s.paths {
				if pathID == protocol.InitialPathID || pathID == pth.pathID {
					continue
				}
				if sch.quotas[pathID] < currentQuota && tmpPth.sentPacketHandler.SendingAllowed() {
					// Duplicate it
					pth.sentPacketHandler.DuplicatePacket(pkt)
					break duplicateLoop
				}
			}
		}
		// And try pinging on potentially failed paths
		if fromPth != nil && fromPth.potentiallyFailed.Get() {
			err = s.sendPing(fromPth)
			if err != nil {
				return err
			}
		}
	}
}