-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patheventserver.go
519 lines (468 loc) · 16.8 KB
/
eventserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
// Command eventserver is a socket server which reads events from an event
// source and forwards them to the user clients when appropriate.
package main
import (
"bufio"
"errors"
"flag"
"io"
"log"
"net"
"os"
"os/signal"
"strconv"
"strings"
"github.com/sahildua2305/go-eventserver/config"
)
var (
logInfo *log.Logger
logErr *log.Logger
)
// eventServer represents the server state.
type eventServer struct {
// Listener object listening for event source on EventListenerPort.
esListener net.Listener
// Listener object listening for user clients on ClientListenerPort.
ucListener net.Listener
// Boolean to keep track of the running state of the server.
hasStopped bool
// Channel to support communication with go routines while stopping
// the server gracefully.
quit chan struct{}
currentEventSequence int
}
// event represents an event struct as received by the event source.
type event struct {
payload string // Raw event message which will be sent to user clients
sequence int // Sequence number of the event
eventType string // Type of the event among valid types - F, U, B, P, S
fromUserID int // From User Id, meaning depends on the event type
toUserID int // To User Id, meaning depends on the event type
}
// userClient represents a user client that connects to the server.
type userClient struct {
userID int // User Id that the user client represents
conn net.Conn // Connection object for the user client
}
// Initialize the two log handlers for INFO and ERROR level.
func init() {
logInfo = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
logErr = log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile)
}
// Function that acts as the starting of the server.
// Returns *eventServer pointer reference with all fields set.
func startServer(cfg *config.EventServerConfig) (*eventServer, error) {
quit := make(chan struct{})
es := &eventServer{
currentEventSequence: 1,
quit: quit,
hasStopped: false,
}
// Creates a background worker for handling events processing and new
// user client connections.
eventsChan, usersChan, err := es.backgroundWorkerInit(quit)
if err != nil {
return nil, err
}
// Start listening on EventListenerPort.
esl, err := net.Listen("tcp", ":"+strconv.Itoa((*cfg).EventListenerPort))
if err != nil {
recover()
return nil, err
}
logInfo.Println("Listening for event source on port:", (*cfg).EventListenerPort)
// Start listening on ClientListenerPort.
ucl, err := net.Listen("tcp", ":"+strconv.Itoa((*cfg).ClientListenerPort))
if err != nil {
recover()
return nil, err
}
logInfo.Println("Listening for user clients on port:", (*cfg).ClientListenerPort)
// Go routine to handle event source connections.
go es.listenForEventSource(esl, eventsChan, quit)
// Go routine to handle user client connections.
go es.listenForUserClients(ucl, usersChan, quit)
es.esListener = esl
es.ucListener = ucl
return es, nil
}
// Function to stop the running event server **gracefully**.
// In case, the event server has already stopped, this will throw an error.
// If server is running, it will close the quit channel, hence signalling
// to all the running go routines to return and also close the listeners for
// the event source and the user clients.
func (e *eventServer) gracefulStop() error {
if e == nil {
return errors.New("invalid event server passed")
}
if e.hasStopped {
return errors.New("event server has already been stopped")
}
close(e.quit) // close the quit channel
e.hasStopped = true
e.esListener.Close() // close the event source listener
e.ucListener.Close() // close the user clients listener
return nil
}
// backgroundWorkerInit function which initializes a new go routine to run in
// background. The go routine is mainly used to separate the two operations:
// - processing the received events
// - storing new user client connections
//
// We could have done this thing in two separate functions keeping the event
// processing part coupled with the event receiving function and keeping the
// user handling part coupled with the function receiving user connections.
// However, that would be vulnerable to race conditions.
//
// We need this function to avoid the race conditions between processing
// the incoming events and the storing new user clients.
func (es *eventServer)backgroundWorkerInit(quit chan struct{}) (chan<- event, chan<- userClient, error) {
// Channel to keep the incoming events.
eventsChan := make(chan event)
// Channel to keep the incoming user clients.
usersChan := make(chan userClient)
// Map used to store the event channels assigned for each userID.
userEventChannels := make(map[int]chan event)
// Map used to store the events against their sequence number. This map
// is basically used as a queue to make sure we keep adding the new events
// to the queue at proper indices and keep processing as and when we can.
// We could have used an actual priority queue here to keep events in
// sequence, however, using a map seems to be a reasonably good choice
// because we can easily check whether we have an event for a sequence
// number or not.
eventsMap := make(map[int]event)
// Map used to store the followers for every user. Basically, we want to
// maintain a list of followers for every user, but then it will be
// computationally complex to delete any follower when we get 'Unfollow' event.
// Hence, we use a map which stores a map of followers for every user.
// A map allows us to (computationally) easily add or delete any new follower
// as well as iterate over all followers of a user.
followersMap := make(map[int]map[int]bool)
// Run a goroutine in background to handle any incoming events as well as
// incoming new user client connections.
// This is done in a different routine to make sure we don't
// block the event source or the user client routine. Events source
// routine can keep reading the events from the event source and sending them
// to the events channel to process in correct sequence. Similarly, the user
// client routine can keep accepting the new user connections and sending
// them to the users channel.
go func() {
for {
select {
// For handling the new connecting users.
case newUser := <-usersChan:
// Create a new channel for sending events to this user.
// This channel will be used whenever we want to send any
// event to this user client.
userEventChan := make(chan event, 1)
// Store the newly created event channel against the userID.
// This will eventually be used to decide which all channels
// should an event be sent to.
userEventChannels[newUser.userID] = userEventChan
// Wait for incoming events on this user channel.
go waitForEvent(newUser, userEventChan, quit)
// For handling the new incoming events from event source.
case ev := <-eventsChan:
// Add that event to the map against its sequence number.
eventsMap[ev.sequence] = ev
// Process as many events we can process after arrival of this event.
es.processEventsInOrder(eventsMap, userEventChannels, followersMap)
// For returning from the go routine.
case <-quit:
return
}
}
}()
return eventsChan, usersChan, nil
}
// Wait for a given user client to receive events on its assigned events channel.
func waitForEvent(uc userClient, userEventChan <-chan event, quit <-chan struct{}) {
for {
// Listen for either an event on user's assigned event channel
// or something on the quit channel.
select {
// For handling the incoming event message.
case ev := <-userEventChan:
writeEvent(uc, ev)
// For returning from the go routine.
case <-quit:
return
}
}
}
// Writes a given event's payload to a given user's connection.
// TODO: It's possible that the given user has disconnected.
func writeEvent(uc userClient, ev event) {
_, err := uc.conn.Write([]byte(ev.payload + "\r\n"))
if err != nil {
logErr.Println("Unable to write to user:", uc.userID)
}
}
// Keep processing the events as long as we can. That basically means
// that we will keep processing the events in sequence order as long
// as we have already received the events. We will stop as we find some
// sequence for which event is missing.
func (es *eventServer)processEventsInOrder(eventsMap map[int]event, userEventChannels map[int]chan event, followersMap map[int]map[int]bool) {
for {
e, ok := eventsMap[es.currentEventSequence]
if !ok {
break
}
delete(eventsMap, e.sequence)
processSingleEvent(e, userEventChannels, followersMap)
es.currentEventSequence++
}
}
// Accepts connection for event source and starts listening for events
// in a go routine. The read event is then sent to the events channel
// created by backgroundWorkerInit() to process in correct order of sequence number.
func (es *eventServer)listenForEventSource(listener net.Listener, eventsChan chan<- event, quit <-chan struct{}) {
for {
connChan := make(chan net.Conn, 1)
// We have to accept the connections in a different go routine now,
// because otherwise we won't be able to quit gracefully from this
// routine. To be able to support quit channel functioning, we need
// to make everything else in this go routine non-blocking.
go acceptConnAndSendToChan(listener, connChan)
// Now since we need to end the go routines when the server is quit,
// we need to have this blocking listener for one of the two channels:
// - quit: when server is about to quit, close the listener.
// - connChan: when a new connection is accepted.
select {
case <-quit:
listener.Close()
return
case conn := <-connChan:
// Once the event source has connected, start listening to the events
// in a go routine and perform these actions:
// - Read message sent from the event source.
// - Parse the event message to form event struct.
// - Send the parsed event to eventsChan.
go func() {
r := bufio.NewReader(conn)
for {
// Read new event message from the event source
message, err := r.ReadString('\n')
if err != nil {
if err == io.EOF {
logInfo.Println("End of messages from event source, got EOF")
return
}
logErr.Println("Unable to read from event source, got error:", err)
return
}
// Clean/trim the message.
message = strings.Trim(message, "\n")
message = strings.Trim(message, "\r")
// Parse the message to form event struct
event, err := parseEvent(message)
if err != nil {
logErr.Println("Error while parsing the event message, got error:", err)
continue
}
// Send the parsed event to the eventsChan.
eventsChan <- *event
}
}()
}
}
}
// Accepts the connections on given listener and sends the connections to
// the given channel if the connection was successful.
func acceptConnAndSendToChan(listener net.Listener, connChan chan<- net.Conn) {
conn, err := listener.Accept()
if err != nil {
return
}
connChan <- conn
}
// Reads the event message string and parses it into event struct.
// Parse the payload and sequence first and then parse the fromUserID and
// toUserID depending on the type of the event (F|U|B|P|S).
func parseEvent(message string) (*event, error) {
var e event
var err error
e.payload = message
ev := strings.Split(message, "|")
if len(ev) < 2 || len(ev) > 4 {
return nil, errors.New("invalid event message format")
}
e.sequence, err = strconv.Atoi(ev[0])
if err != nil {
return nil, err
}
e.eventType = ev[1]
switch e.eventType {
case "F", "U", "P":
// Follow / Unfollow / Private event
finalEvent, err := fillUserIds(e, ev)
if err != nil {
return nil, err
}
return finalEvent, nil
case "B":
// Broadcast message event
if len(ev) != 2 {
return nil, errors.New("invalid event message format")
}
return &e, nil
case "S":
// Status update event
if len(ev) != 3 {
return nil, errors.New("invalid event message format")
}
e.fromUserID, err = strconv.Atoi(ev[2])
if err != nil {
return nil, err
}
e.toUserID = 0
return &e, nil
}
return nil, errors.New("invalid event type")
}
// Converts the splitted event message and converts the User Ids from it.
// Then fills them into the event struct and returns the final event.
func fillUserIds(e event, s []string) (*event, error) {
if len(s) != 4 {
return nil, errors.New("invalid event message format")
}
fromUserID, err := strconv.Atoi(s[2])
if err != nil {
return nil, err
}
toUserID, err := strconv.Atoi(s[3])
if err != nil {
return nil, err
}
e.fromUserID = fromUserID
e.toUserID = toUserID
return &e, nil
}
// Accepts the parsed event and depending on the type of the event, sends event
// to the user channels associated with the user clients which should be notified
// for a particular event.
func processSingleEvent(e event, userEventChannels map[int]chan event, followersMap map[int]map[int]bool) {
switch e.eventType {
case "F":
// Follow event
// Get the existing followers of the user.
f, exists := followersMap[e.toUserID]
if !exists {
f = make(map[int]bool)
}
// Add a new follower to the list of followers and save it in followersMap.
f[e.fromUserID] = true
followersMap[e.toUserID] = f
// Send event to the channel assigned for user event.toUserID.
if uec, exists := userEventChannels[e.toUserID]; exists {
uec <- e
}
case "U":
// Unfollow event
// Get all the followers of the user and delete entry for toUserID.
if f, exists := followersMap[e.toUserID]; exists {
delete(f, e.fromUserID)
}
case "B":
// Broadcast message event
// Send message to channels for all connected user clients.
for _, uec := range userEventChannels {
uec <- e
}
case "P":
// Private message event
// Send event to the channel assigned for user event.toUserID.
if uec, exists := userEventChannels[e.toUserID]; exists {
uec <- e
}
case "S":
// Status update event
for u := range followersMap[e.fromUserID] {
// Send event to the channel assigned for every follower
if uec, exists := userEventChannels[u]; exists {
uec <- e
}
}
}
}
// Accepts connection for new user clients and starts listening for their
// first message in a go routine. The read message contains the userID of
// the user a client represents. After a userID is read, the userClient
// is sent to the users channel which was created using backgroundWorkerInit().
func (es *eventServer)listenForUserClients(listener net.Listener, usersChan chan<- userClient, quit <-chan struct{}) {
for {
connChan := make(chan net.Conn, 1)
// We have to accept the connections in a different go routine now,
// because otherwise we won't be able to quit gracefully from this
// routine. To be able to support quit channel functioning, we need
// to make everything else in this go routine non-blocking.
go acceptConnAndSendToChan(listener, connChan)
select {
case <-quit:
listener.Close()
return
case conn := <-connChan:
// Once a user client has connected, we go into a go routine to
// read the message from the client which will contain the userID
// associated with the client.
// We also need to handle the case when we don't receive any message
// from the connected client.
go func() {
userID, err := readAndParseUserID(conn)
if err != nil {
logErr.Println("Unable to receive user id from the client, got error:", err)
return
}
// Send this user client to usersChan which we created in the
// background worker.
usersChan <- userClient{
userID: *userID,
conn: conn,
}
}()
}
}
}
// Reads the message from user client connection and parses the message
// to fetch the userID that connection represents.
// Returns the parsed userID if successful, otherwise error.
func readAndParseUserID(conn net.Conn) (*int, error) {
m, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
return nil, err
}
m = strings.Trim(m, "\n")
m = strings.Trim(m, "\r")
userID, err := strconv.Atoi(m)
if err != nil {
return nil, err
}
return &userID, nil
}
func main() {
cfgFile := flag.String("config", "./config/config.json", "path of config file to load")
flag.Parse()
// Read server configuration from local config.json
cfg, err := config.LoadEventServerConfig(*cfgFile)
if err != nil {
logErr.Fatalln("Unable to load server config, got error:", err)
}
logInfo.Println("Loaded the event server config from:", *cfgFile)
es, err := startServer(cfg)
if err != nil {
logErr.Fatalln("Unable to start the server, got error:", err)
}
logInfo.Println("Started the event server")
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
// Wait for a signal on this channel.
<-signalChan
// Once a signal is received on signalChan, stop the server gracefully.
logInfo.Println("Stopping the event server gracefully")
err = es.gracefulStop()
if err != nil {
logErr.Fatalln("Unable to stop the server gracefully, got error:", err)
}
signal.Stop(signalChan)
logInfo.Println("Exiting event server!")
}