-
Notifications
You must be signed in to change notification settings - Fork 5
/
subreactor.go
98 lines (81 loc) · 2.67 KB
/
subreactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package znet
import (
"github.com/ebar-go/ego/utils/runtime"
"github.com/ebar-go/ego/utils/structure"
)
type SubReactor interface {
RegisterConnection(conn *Connection)
UnregisterConnection(conn *Connection)
GetConnection(fd int) *Connection
Offer(fds ...int)
Polling(stopCh <-chan struct{}, callback func(int))
}
// SingleSubReactor represents sub reactor
type SingleSubReactor struct {
// buffer manage active file descriptors
buffer *structure.Queue[int]
// container manage all connections
container *structure.ConcurrentMap[int, *Connection]
}
// RegisterConnection registers a new connection to the epoll listener
func (sub *SingleSubReactor) RegisterConnection(conn *Connection) {
sub.container.Set(conn.fd, conn)
}
// UnregisterConnection removes the connection from the epoll listener
func (sub *SingleSubReactor) UnregisterConnection(conn *Connection) {
sub.container.Del(conn.fd)
}
// GetConnection returns a connection by fd
func (sub *SingleSubReactor) GetConnection(fd int) *Connection {
conn, _ := sub.container.Get(fd)
return conn
}
// Offer push the active connections fd to the queue
func (sub *SingleSubReactor) Offer(fds ...int) {
sub.buffer.Offer(fds...)
}
// Polling poll with callback function
func (sub *SingleSubReactor) Polling(stopCh <-chan struct{}, callback func(int)) {
sub.buffer.Polling(stopCh, func(active int) {
callback(active)
})
}
// NewSingleSubReactor creates an instance of a SingleSubReactor
func NewSingleSubReactor(bufferSize int) *SingleSubReactor {
return &SingleSubReactor{
buffer: structure.NewQueue[int](bufferSize),
container: structure.NewConcurrentMap[int, *Connection](),
}
}
type ShardSubReactor struct {
container structure.Sharding[*SingleSubReactor]
}
func (shard *ShardSubReactor) RegisterConnection(conn *Connection) {
shard.container.GetShard(conn.fd).RegisterConnection(conn)
}
func (shard *ShardSubReactor) UnregisterConnection(conn *Connection) {
shard.container.GetShard(conn.fd).UnregisterConnection(conn)
}
func (shard *ShardSubReactor) GetConnection(fd int) *Connection {
return shard.container.GetShard(fd).GetConnection(fd)
}
func (shard *ShardSubReactor) Offer(fds ...int) {
for _, fd := range fds {
shard.container.GetShard(fd).Offer(fd)
}
}
func (shard *ShardSubReactor) Polling(stopCh <-chan struct{}, callback func(int)) {
shard.container.Iterator(func(sub *SingleSubReactor) {
go func() {
defer runtime.HandleCrash()
sub.Polling(stopCh, callback)
}()
})
}
func NewShardSubReactor(shardCount, bufferSize int) *ShardSubReactor {
return &ShardSubReactor{
container: structure.NewSharding[*SingleSubReactor](shardCount, func() *SingleSubReactor {
return NewSingleSubReactor(bufferSize)
}),
}
}