forked from ebar-go/znet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread.go
64 lines (55 loc) · 1.32 KB
/
thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package znet
import (
"github.com/ebar-go/ego/utils/pool"
"github.com/ebar-go/ego/utils/runtime"
"github.com/ebar-go/znet/codec"
"log"
)
// Thread represents context manager
type Thread struct {
options ThreadOptions
codec codec.Codec
worker pool.GoroutinePool
engine *Engine
}
// NewThread returns a new Thread instance
func NewThread(options ThreadOptions) *Thread {
return &Thread{
options: options,
codec: options.NewCodec(),
worker: options.NewWorkerPool(),
engine: NewEngine(),
}
}
// Use registers middleware
func (thread *Thread) Use(handlers ...HandleFunc) {
thread.engine.Use(handlers...)
}
// HandleRequest handle new request for connection
func (thread *Thread) HandleRequest(conn *Connection) {
// read message from connection
var (
n = 0
bytes = pool.GetByte(thread.options.MaxReadBufferSize)
packet = codec.NewPacket(thread.codec)
)
err := runtime.Call(func() (lastErr error) {
n, lastErr = conn.Read(bytes)
return
}, func() error {
return packet.Unpack(bytes[:n])
})
if err != nil {
log.Printf("[%s] read failed: %v\n", conn.ID(), err)
// put back immediately when decode failed
pool.PutByte(bytes)
conn.Close()
return
}
// compute
thread.worker.Schedule(func() {
defer runtime.HandleCrash()
defer pool.PutByte(bytes)
thread.engine.compute(conn, packet)
})
}