forked from mtfelian/golang-socketio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent.go
139 lines (113 loc) · 3.42 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package gosocketio
import (
"encoding/json"
"reflect"
"sync"
"github.com/mtfelian/golang-socketio/logging"
"github.com/mtfelian/golang-socketio/protocol"
)
const (
OnConnection = "connection"
OnDisconnection = "disconnection"
OnError = "error"
)
// systemEventHandler function for internal handler processing
type systemEventHandler func(c *Channel)
// event abstracts a mapping of a handler names to handler functions
type event struct {
handlers map[string]*handler // maps handler name to handler function representation
handlersMu sync.RWMutex
onConnection systemEventHandler
onDisconnection systemEventHandler
}
// init initializes events mapping
func (e *event) init() { e.handlers = make(map[string]*handler) }
// On registers message processing function and binds it to the given event name
func (e *event) On(name string, f interface{}) error {
c, err := newHandler(f)
if err != nil {
return err
}
e.handlersMu.Lock()
e.handlers[name] = c
e.handlersMu.Unlock()
return nil
}
// findHandler returns a handler representation for the given event name
// the second parameter is true if such event found.
func (e *event) findHandler(name string) (*handler, bool) {
e.handlersMu.RLock()
f, ok := e.handlers[name]
e.handlersMu.RUnlock()
return f, ok
}
// callHandler for the given channel c and event name
func (e *event) callHandler(c *Channel, name string) {
if e.onConnection != nil && name == OnConnection {
logging.Log().Debug("event.callHandler(): OnConnection handler")
e.onConnection(c)
}
if e.onDisconnection != nil && name == OnDisconnection {
e.onDisconnection(c)
}
f, ok := e.findHandler(name)
if !ok {
logging.Log().Debug("event.callHandler(): handler not found")
return
}
f.call(c, &struct{}{})
}
// processIncoming checks incoming message m on channel c
func (e *event) processIncoming(c *Channel, m *protocol.Message) {
logging.Log().Debug("event.processIncoming() fired with:", m)
switch m.Type {
case protocol.MessageTypeEmit:
logging.Log().Debug("event.processIncoming() is finding handler for msg.Event:", m.EventName)
f, ok := e.findHandler(m.EventName)
if !ok {
logging.Log().Debug("event.processIncoming(): handler not found")
return
}
logging.Log().Debug("event.processIncoming() found handler:", f)
if !f.hasArgs {
f.call(c, &struct{}{})
return
}
data := f.arguments()
logging.Log().Debug("event.processIncoming(), f.arguments() returned:", data)
if err := json.Unmarshal([]byte(m.Args), &data); err != nil {
logging.Log().Infof("event.processIncoming() failed to json.Unmaeshal(). msg.Args: %s, data: %v, err: %v",
m.Args, data, err)
return
}
f.call(c, data)
case protocol.MessageTypeAckRequest:
logging.Log().Debug("event.processIncoming() ack request")
f, ok := e.findHandler(m.EventName)
if !ok || !f.out {
return
}
var result []reflect.Value
if f.hasArgs {
// data type should be defined for Unmarshal()
data := f.arguments()
if err := json.Unmarshal([]byte(m.Args), &data); err != nil {
return
}
result = f.call(c, data)
} else {
result = f.call(c, &struct{}{})
}
ackResponse := &protocol.Message{
Type: protocol.MessageTypeAckResponse,
AckID: m.AckID,
}
c.send(ackResponse, result[0].Interface())
case protocol.MessageTypeAckResponse:
logging.Log().Debug("event.processIncoming() ack response")
ackC, err := c.ack.obtain(m.AckID)
if err == nil {
ackC <- m.Args
}
}
}