-
Notifications
You must be signed in to change notification settings - Fork 177
/
Copy pathdiscovery.go
140 lines (120 loc) · 3.92 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package discovery
import (
"container/heap"
"context"
"math"
"math/rand"
"net/url"
"time"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/server"
"github.com/golang/glog"
)
var getOrchestratorsTimeoutLoop = 3 * time.Second
var serverGetOrchInfo = server.GetOrchestratorInfo
type orchestratorPool struct {
uris []*url.URL
pred func(info *net.OrchestratorInfo) bool
bcast common.Broadcaster
}
func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL) *orchestratorPool {
if len(uris) <= 0 {
// Should we return here?
glog.Error("Orchestrator pool does not have any URIs")
}
return &orchestratorPool{uris: uris, bcast: bcast}
}
func NewOrchestratorPoolWithPred(bcast common.Broadcaster, addresses []*url.URL, pred func(*net.OrchestratorInfo) bool) *orchestratorPool {
pool := NewOrchestratorPool(bcast, addresses)
pool.pred = pred
return pool
}
func (o *orchestratorPool) GetURLs() []*url.URL {
return o.uris
}
func (o *orchestratorPool) GetOrchestrators(numOrchestrators int, suspender common.Suspender, caps common.CapabilityComparator) ([]*net.OrchestratorInfo, error) {
numAvailableOrchs := len(o.uris)
numOrchestrators = int(math.Min(float64(numAvailableOrchs), float64(numOrchestrators)))
ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop)
infoCh := make(chan *net.OrchestratorInfo, numAvailableOrchs)
errCh := make(chan error, numAvailableOrchs)
// The following allows us to avoid capability check for jobs that only
// depend on "legacy" features, since older orchestrators support these
// features without capability discovery. This enables interop between
// older orchestrators and newer orchestrators as long as the job only
// requires the legacy feature set.
//
// When / if it's justified to completely break interop with older
// orchestrators, then we can probably remove this check and work with
// the assumption that all orchestrators support capability discovery.
legacyCapsOnly := caps.LegacyOnly()
isCompatible := func(info *net.OrchestratorInfo) bool {
if o.pred != nil && !o.pred(info) {
return false
}
// Legacy features already have support on the orchestrator.
// Capabilities can be omitted in this case for older orchestrators.
// Otherwise, capabilities are required to be present.
if info.Capabilities == nil {
if legacyCapsOnly {
return true
}
return false
}
return caps.CompatibleWith(info.Capabilities)
}
getOrchInfo := func(uri *url.URL) {
info, err := serverGetOrchInfo(ctx, o.bcast, uri)
if err == nil && isCompatible(info) {
infoCh <- info
return
}
if err != nil && monitor.Enabled {
monitor.LogDiscoveryError(err.Error())
}
errCh <- err
}
// Shuffle into new slice to avoid mutating underlying data
uris := make([]*url.URL, numAvailableOrchs)
for i, j := range rand.Perm(numAvailableOrchs) {
uris[i] = o.uris[j]
}
for _, uri := range uris {
go getOrchInfo(uri)
}
timeout := false
infos := []*net.OrchestratorInfo{}
suspendedInfos := newSuspensionQueue()
nbResp := 0
for i := 0; i < numAvailableOrchs && len(infos) < numOrchestrators && !timeout; i++ {
select {
case info := <-infoCh:
if penalty := suspender.Suspended(info.Transcoder); penalty == 0 {
infos = append(infos, info)
} else {
heap.Push(suspendedInfos, &suspension{info, penalty})
}
nbResp++
case <-errCh:
nbResp++
case <-ctx.Done():
timeout = true
}
}
cancel()
if len(infos) < numOrchestrators {
diff := numOrchestrators - len(infos)
for i := 0; i < diff && suspendedInfos.Len() > 0; i++ {
info := heap.Pop(suspendedInfos).(*suspension).orch
infos = append(infos, info)
}
}
glog.Infof("Done fetching orch info numOrch=%d responses=%d/%d timeout=%t",
len(infos), nbResp, len(uris), timeout)
return infos, nil
}
func (o *orchestratorPool) Size() int {
return len(o.uris)
}