forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol.go
176 lines (156 loc) · 4.53 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package workerproto
import (
"log"
"sync"
)
type MessageCallback func(msg Message)
type Protocol struct {
// transport over which this protocol is running
transport Transport
// local and remote capabilities
localCapabilities *capabilities
remoteCapabilities *capabilities
// callbacks per message type
callbacks map[string][]MessageCallback
// tracking for whether Start has been called yet
started bool
startedMutex sync.Mutex
// tracking for whether this protocol is intialized
initialized bool
initializedCond sync.Cond
// tracking for EOF from the read side of the transport
eof bool
eofCond sync.Cond
}
func NewProtocol(transport Transport) *Protocol {
return &Protocol{
transport: transport,
localCapabilities: EmptyCapabilities(),
remoteCapabilities: EmptyCapabilities(),
callbacks: make(map[string][]MessageCallback),
initialized: false,
initializedCond: sync.Cond{
L: &sync.Mutex{},
},
eofCond: sync.Cond{
L: &sync.Mutex{},
},
}
}
// Register a callback for the given message type. This must occur before the
// protocol is started.
func (prot *Protocol) Register(messageType string, callback MessageCallback) {
callbacks := prot.callbacks[messageType]
callbacks = append(callbacks, callback)
prot.callbacks[messageType] = callbacks
}
// convert an anymous interface into a list of strings; useful for parsing lists
// out of messages
func listOfStrings(val interface{}) []string {
aslist := val.([]interface{})
rv := make([]string, 0, len(aslist))
for _, elt := range aslist {
rv = append(rv, elt.(string))
}
return rv
}
// Start the protocol and initiate the hello/welcome transaction.
func (prot *Protocol) Start(asWorker bool) {
if asWorker {
prot.Register("welcome", func(msg Message) {
prot.remoteCapabilities = FromCapabilitiesList(listOfStrings(msg.Properties["capabilities"]))
prot.Send(Message{
Type: "hello",
Properties: map[string]interface{}{
"capabilities": prot.localCapabilities.List(),
},
})
prot.SetInitialized()
})
} else {
prot.Register("hello", func(msg Message) {
prot.remoteCapabilities = FromCapabilitiesList(listOfStrings(msg.Properties["capabilities"]))
prot.SetInitialized()
})
// send a welcome message, but don't wait for it to complete
go func() {
prot.Send(Message{
Type: "welcome",
Properties: map[string]interface{}{
"capabilities": prot.localCapabilities.List(),
},
})
}()
}
prot.startedMutex.Lock()
prot.started = true
prot.startedMutex.Unlock()
go prot.recvLoop()
}
// Set this protocol as initialized. Ordinarily this happens automatically, but in cases
// where the worker does not support the protocol, this method can be used to indicate that
// the protocol is "initialized" with no capabilities.
func (prot *Protocol) SetInitialized() {
// announce that we are now initialized
prot.initializedCond.L.Lock()
defer prot.initializedCond.L.Unlock()
prot.initialized = true
prot.initializedCond.Broadcast()
}
// Wait until this protocol is initialized.
func (prot *Protocol) WaitUntilInitialized() {
prot.initializedCond.L.Lock()
defer prot.initializedCond.L.Unlock()
for !prot.initialized {
prot.initializedCond.Wait()
}
}
// Add the given capability to the local capabilities
func (prot *Protocol) AddCapability(c string) {
prot.startedMutex.Lock()
defer prot.startedMutex.Unlock()
if prot.started {
panic("Cannot AddCapability after protocol is started")
}
prot.localCapabilities.Add(c)
}
// Check if a capability is supported by both ends of the protocol, after
// waiting for initialization.
func (prot *Protocol) Capable(c string) bool {
prot.WaitUntilInitialized()
return prot.localCapabilities.Has(c) && prot.remoteCapabilities.Has(c)
}
// Wait until all message have been read from the transport.
func (prot *Protocol) WaitForEOF() {
prot.eofCond.L.Lock()
defer prot.eofCond.L.Unlock()
for !prot.eof {
prot.eofCond.Wait()
}
}
// Send a message. This happens without waiting for initialization; as the
// caller should already have used prot.Capable to determine whether the
// message was supported.
func (prot *Protocol) Send(msg Message) {
prot.transport.Send(msg)
}
func (prot *Protocol) recvLoop() {
for {
msg, ok := prot.transport.Recv()
if !ok {
prot.eofCond.L.Lock()
prot.eof = true
prot.eofCond.Broadcast()
prot.eofCond.L.Unlock()
return
}
callbacks, ok := prot.callbacks[msg.Type]
if ok {
for _, cb := range callbacks {
cb(msg)
}
} else {
log.Printf("No callback registered for message %s\n", msg.Type)
}
}
}