forked from jcelliott/turnpike
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbroker.go
95 lines (85 loc) · 2.73 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package turnpike
// Broker is implemented by anything that can route EVENT messages from
// publishing clients to subscribed clients.
type Broker interface {
	// Publish delivers msg to the subscribers of its topic.
	Publish(pub Sender, msg *Publish)
	// Subscribe registers sub for messages on the URI named in msg.
	Subscribe(sub Sender, msg *Subscribe)
	// Unsubscribe cancels the subscription identified in msg.
	Unsubscribe(sub Sender, msg *Unsubscribe)
}
// defaultBroker is a minimal Broker implementation that matches topic URIs to
// their Subscribers.
//
// NOTE(review): neither map is guarded by a lock inside this type — confirm
// that callers serialize access before sharing a broker between goroutines.
type defaultBroker struct {
// routes maps a topic URI to its subscribers, keyed by subscription ID.
routes map[URI]map[ID]Sender
// subscriptions maps a subscription ID back to the topic URI it belongs to,
// so that Unsubscribe can find the route from the ID alone.
subscriptions map[ID]URI
}
// NewDefaultBroker returns a ready-to-use simple broker that pairs topic URIs
// with their Subscribers. Both lookup tables start out allocated and empty.
func NewDefaultBroker() Broker {
	br := new(defaultBroker)
	br.routes = map[URI]map[ID]Sender{}
	br.subscriptions = map[ID]URI{}
	return br
}
// Publish fans msg out to every client subscribed to msg.Topic, skipping the
// publisher itself. All events produced by one call carry the same freshly
// generated publication ID and share one (initially empty) Details map.
//
// When msg.Options["acknowledge"] holds the boolean true, a Published message
// with that publication ID is sent back to the publisher after the fan-out
// loop has finished.
func (br *defaultBroker) Publish(pub Sender, msg *Publish) {
	publication := NewID()
	details := make(map[string]interface{})

	for subscription, receiver := range br.routes[msg.Topic] {
		// the publisher never receives its own event
		if receiver == pub {
			continue
		}
		receiver.Send(&Event{
			Publication:  publication,
			Subscription: subscription,
			Arguments:    msg.Arguments,
			ArgumentsKw:  msg.ArgumentsKw,
			Details:      details,
		})
	}

	// a missing or non-boolean "acknowledge" option counts as "no"
	ack, isBool := msg.Options["acknowledge"].(bool)
	if isBool && ack {
		pub.Send(&Published{Request: msg.Request, Publication: publication})
	}
}
// Subscribe registers sub as a receiver for msg.Topic under a newly generated
// subscription ID, records the ID-to-topic mapping, and confirms the
// registration to the client with a Subscribed message carrying that ID.
func (br *defaultBroker) Subscribe(sub Sender, msg *Subscribe) {
	topic := msg.Topic

	// create the per-topic table on first use
	subscribers, known := br.routes[topic]
	if !known {
		subscribers = make(map[ID]Sender)
		br.routes[topic] = subscribers
	}

	subscription := NewID()
	subscribers[subscription] = sub
	br.subscriptions[subscription] = topic

	sub.Send(&Subscribed{Request: msg.Request, Subscription: subscription})
}
// Unsubscribe cancels the subscription identified by msg.Subscription.
//
// If the ID is unknown, an Error message with ErrNoSuchSubscription is sent to
// sub, the failure is logged, and nothing else happens. Otherwise the ID is
// dropped from the subscription table and from its topic's route table (the
// topic entry itself is removed once its last subscriber is gone), and an
// Unsubscribed message is sent to sub. Inconsistencies between the two tables
// are only logged; the Unsubscribed confirmation is still sent.
func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
	subscription := msg.Subscription

	topic, found := br.subscriptions[subscription]
	if !found {
		sub.Send(&Error{
			Type:    msg.MessageType(),
			Request: msg.Request,
			Error:   ErrNoSuchSubscription,
		})
		log.Printf("Error unsubscribing: no such subscription %v", subscription)
		return
	}
	delete(br.subscriptions, subscription)

	// Looking up a key in a missing (nil) route table is safe and simply
	// reports "not present", so both checks can be done up front.
	subscribers, hasRoutes := br.routes[topic]
	_, hasEntry := subscribers[subscription]

	switch {
	case !hasRoutes:
		log.Printf("Error unsubscribing: unable to find routes for %s topic", topic)
	case !hasEntry:
		log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, subscription)
	default:
		delete(subscribers, subscription)
		// drop the topic once nobody is listening any more
		if len(subscribers) == 0 {
			delete(br.routes, topic)
		}
	}

	sub.Send(&Unsubscribed{Request: msg.Request})
}