-
Notifications
You must be signed in to change notification settings - Fork 3
/
delegate.go
160 lines (138 loc) · 3.76 KB
/
delegate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"slices"
"sync"
"time"
)
// SyncQueue is the JSON form of a single queue exchanged between nodes during
// state synchronisation. LocalState builds one per local queue and
// MergeRemoteState decodes them from the remote buffer.
type SyncQueue struct {
// Name is the queue name; it is the key used for the queue and mutex maps.
Name string `json:"Name"`
// ExpireMessages is copied verbatim from / to the queue's ExpireMessages setting.
ExpireMessages bool `json:"ExpireMessages"`
// ExpiryTime is copied verbatim from / to the queue's ExpiryTime setting.
// NOTE(review): the unit is not visible in this file — confirm against Queue.
ExpiryTime uint `json:"ExpiryTime"`
// Messages holds the queue's messages at the time the snapshot was taken.
Messages []SyncMessage `json:"Messages"`
}
// SyncMessage is the JSON form of a single queued message exchanged between
// nodes. Only the key, payload and timestamp are carried; MergeRemoteState
// gives every imported message an empty acknowledged-consumer list.
type SyncMessage struct {
// Key is the message key.
Key string `json:"Key"`
// Data is the raw message payload (encoding/json writes []byte as base64).
Data []byte `json:"Data"`
// Timestamp is the message timestamp; together with Key and Data it is used
// to decide whether a remote message is already present locally.
Timestamp time.Time `json:"Timestamp"`
}
// NodeMeta is the metadata a node publishes about itself. Delegate.NodeMeta
// fills it from the node configuration and JSON-encodes it.
type NodeMeta struct {
// Name is built by Delegate.NodeMeta as "<BindAddress>:<MemberlistPort>".
Name string
// BindAddress is the configured bind address of the node.
BindAddress string
// MemberlistPort is the configured memberlist port of the node.
MemberlistPort uint
}
// Delegate provides the node-metadata and state-synchronisation callbacks
// defined in this file (NodeMeta, NotifyMsg, GetBroadcasts, LocalState,
// MergeRemoteState).
// NOTE(review): the method set matches a memberlist-style delegate interface —
// confirm against the code that registers it.
type Delegate struct {
// fairyMQ is the instance whose configuration NodeMeta reads. LocalState and
// MergeRemoteState use the package-level fairyMQ variable instead.
fairyMQ *FairyMQ
}
// NodeMeta returns the JSON-encoded metadata this node advertises: its bind
// address, its memberlist port, and a name of the form "address:port" built
// from the two.
//
// limit is the maximum number of bytes the caller accepts. If encoding fails,
// or the encoded metadata is longer than limit, the problem is logged and an
// empty slice is returned so the caller never receives oversized metadata.
// NOTE(review): memberlist-style callers treat metadata longer than limit as a
// fatal error — confirm against the library version in use.
func (delegate *Delegate) NodeMeta(limit int) []byte {
	meta := NodeMeta{
		Name:           fmt.Sprintf("%s:%d", delegate.fairyMQ.Config.BindAddress, delegate.fairyMQ.Config.MemberlistPort),
		BindAddress:    delegate.fairyMQ.Config.BindAddress,
		MemberlistPort: delegate.fairyMQ.Config.MemberlistPort,
	}

	// json.Marshal allocates its own buffer, so no pre-sized slice is needed.
	mb, err := json.Marshal(meta)
	if err != nil {
		log.Println("Error: ", err)
		return []byte{}
	}

	// Enforce the caller's size budget instead of silently exceeding it.
	if len(mb) > limit {
		log.Printf("Error: node metadata is %d bytes, exceeding the limit of %d bytes", len(mb), limit)
		return []byte{}
	}

	return mb
}
// NotifyMsg intentionally ignores the message it is handed; this delegate does
// not process user messages.
func (delegate *Delegate) NotifyMsg(_ []byte) {}
// GetBroadcasts never has anything to broadcast: it ignores overhead and limit
// and always returns an empty, non-nil slice.
func (delegate *Delegate) GetBroadcasts(overhead, limit int) [][]byte {
	none := make([][]byte, 0)
	return none
}
// LocalState returns a JSON snapshot of every local queue (name, expiry
// settings and messages) for transfer to another node. Each queue is copied
// while its own mutex is held. The join argument is not used.
//
// If the snapshot cannot be encoded the error is logged and an empty slice is
// returned.
func (delegate *Delegate) LocalState(join bool) []byte {
	var queues []SyncQueue
	for queueName, mut := range fairyMQ.QueueMutexes {
		if sq, ok := snapshotQueue(queueName, mut); ok {
			queues = append(queues, sq)
		}
	}

	b, err := json.Marshal(queues)
	if err != nil {
		log.Println("Could not encode state for sync: ", err.Error())
		return []byte{}
	}

	return b
}

// snapshotQueue copies the named queue into its sync representation while
// holding mut. It reports false when a mutex is registered for the name but no
// queue is, so the caller skips the entry instead of dereferencing nil.
func snapshotQueue(name string, mut sync.Locker) (SyncQueue, bool) {
	mut.Lock()
	// Deferred so the lock is released even if copying panics.
	defer mut.Unlock()

	queue := fairyMQ.Queues[name]
	if queue == nil {
		return SyncQueue{}, false
	}

	// Left nil for an empty queue so the encoded form ("Messages":null) is
	// unchanged; otherwise sized once up front.
	var messages []SyncMessage
	if n := len(queue.Messages); n > 0 {
		messages = make([]SyncMessage, 0, n)
	}
	for _, m := range queue.Messages {
		messages = append(messages, SyncMessage{
			Key:       m.Key,
			Data:      m.Data,
			Timestamp: m.Timestamp,
		})
	}

	return SyncQueue{
		Name:           name,
		ExpireMessages: queue.ExpireMessages,
		ExpiryTime:     queue.ExpiryTime,
		Messages:       messages,
	}, true
}
// MergeRemoteState folds a remote node's queue snapshot (as produced by
// LocalState) into the local queues. The join argument is not used.
//
//   - A queue that is unknown locally is created with the remote settings and
//     messages, and an empty consumer list.
//   - For a queue that already exists, every remote message not yet present
//     (same key, same instant, same payload) is appended, after which the
//     queue's messages are re-sorted newest first.
//
// If buf cannot be decoded the error is logged and nothing is changed.
func (delegate *Delegate) MergeRemoteState(buf []byte, join bool) {
	var queues []SyncQueue
	if err := json.Unmarshal(buf, &queues); err != nil {
		log.Println("Could not decode state for merge: ", err.Error())
		return
	}

	for _, q := range queues {
		mut, ok := fairyMQ.QueueMutexes[q.Name]
		if !ok { // If queue does not exist, add it.
			var messages []Message
			for _, m := range q.Messages {
				messages = append(messages, Message{
					Key:                   m.Key,
					Data:                  m.Data,
					Timestamp:             m.Timestamp,
					AcknowledgedConsumers: []Consumer{},
				})
			}

			fairyMQ.QueueMutexes[q.Name] = &sync.Mutex{}
			fairyMQ.QueueMutexes[q.Name].Lock()
			fairyMQ.Queues[q.Name] = &Queue{
				ExpireMessages: q.ExpireMessages,
				ExpiryTime:     q.ExpiryTime,
				Messages:       messages,
				Consumers:      []string{},
			}
			fairyMQ.QueueMutexes[q.Name].Unlock()
			continue
		}

		// If queue exists, merge the messages.
		mergeSyncedMessages(mut, q)
	}
}

// mergeSyncedMessages appends to the local queue every message of remote that
// is not already present and then re-sorts the queue newest first, all while
// holding mut.
func mergeSyncedMessages(mut sync.Locker, remote SyncQueue) {
	mut.Lock()
	// Deferred so the lock is released even if merging panics.
	defer mut.Unlock()

	queue := fairyMQ.Queues[remote.Name]
	if queue == nil {
		// A mutex is registered but the queue itself is missing: create it
		// from the remote settings rather than dereferencing nil.
		queue = &Queue{
			ExpireMessages: remote.ExpireMessages,
			ExpiryTime:     remote.ExpiryTime,
			Consumers:      []string{},
		}
		fairyMQ.Queues[remote.Name] = queue
	}

	for _, m := range remote.Messages {
		// Timestamps must be compared with Equal: a timestamp decoded from
		// JSON never compares == to a local time.Now() value (monotonic clock
		// reading and location differ), which would make every message look
		// new and duplicate it on each sync.
		known := slices.ContainsFunc(queue.Messages, func(message Message) bool {
			return m.Key == message.Key &&
				m.Timestamp.Equal(message.Timestamp) &&
				bytes.Equal(m.Data, message.Data)
		})
		if !known {
			// Current message is not contained in the messages, add the message
			queue.Messages = append(queue.Messages, Message{
				Key:                   m.Key,
				Data:                  m.Data,
				Timestamp:             m.Timestamp,
				AcknowledgedConsumers: []Consumer{},
			})
		}
	}

	// Sort newest first (descending timestamp), as before. The stable variant
	// keeps messages that share a timestamp in their existing relative order.
	slices.SortStableFunc(queue.Messages, func(a, b Message) int {
		return b.Timestamp.Compare(a.Timestamp)
	})
}