Merge pull request #413 from lesismal/ws_parse_with_mux
Ws parse with mux
lesismal authored Apr 2, 2024
2 parents 327f576 + 0c5fcc1 commit a2b2d7a
Showing 13 changed files with 274 additions and 183 deletions.
2 changes: 1 addition & 1 deletion conn_unix.go
@@ -143,7 +143,7 @@ func (c *Conn) Hash() int {
return c.fd
}

// AsyncRead is used for reading data async.
// AsyncReadInPoller is used for reading data async.
func (c *Conn) AsyncRead() {
g := c.p.g

26 changes: 25 additions & 1 deletion engine.go
@@ -86,7 +86,13 @@ type Config struct {
// ListenUDP is used to create udp listener for Engine.
ListenUDP func(network string, laddr *net.UDPAddr) (*net.UDPConn, error)

AsyncRead bool
// AsyncReadInPoller represents how the reading events and reading are handled
// by epoll goroutine:
// true : epoll goroutine handles the reading events only, another goroutine
// pool will handle the reading.
// false: epoll goroutine handles both the reading events and the reading.
AsyncReadInPoller bool
// IOExecute is used to handle the async reading; users can customize it.
IOExecute func(f func([]byte))
}

@@ -146,6 +152,24 @@ type Engine struct {
ioTaskPool *taskpool.IOTaskPool
}

// SetETAsyncRead .
func (e *Engine) SetETAsyncRead() {
if e.NPoller <= 0 {
e.NPoller = 1
}
e.EpollMod = EPOLLET
e.AsyncReadInPoller = true
}

// SetLTSyncRead .
func (e *Engine) SetLTSyncRead() {
if e.NPoller <= 0 {
e.NPoller = runtime.NumCPU()
}
e.EpollMod = EPOLLLT
e.AsyncReadInPoller = false
}

// Stop closes listeners/pollers/conns/timer.
func (g *Engine) Stop() {
for _, l := range g.listeners {
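For orientation, a hedged sketch of how the new knob might be enabled on a plain nbio engine. Only EpollMod, AsyncReadInPoller and SetETAsyncRead come from this change; Network, Addrs, OnData and the echo handler are assumed from the library's existing API.

```go
package main

import (
	"log"

	"github.com/lesismal/nbio"
)

func main() {
	engine := nbio.NewEngine(nbio.Config{
		// Network/Addrs are pre-existing Config fields (assumed, not part of this diff).
		Network: "tcp",
		Addrs:   []string{"localhost:8888"},
		// New in this PR: edge-triggered epoll with the reads themselves
		// executed off the poller goroutine.
		EpollMod:          nbio.EPOLLET,
		AsyncReadInPoller: true,
	})
	// Equivalently, the new helper sets EpollMod, AsyncReadInPoller and a
	// single-poller default in one call:
	// engine.SetETAsyncRead()

	engine.OnData(func(c *nbio.Conn, data []byte) {
		c.Write(append([]byte{}, data...)) // simple echo
	})

	if err := engine.Start(); err != nil {
		log.Fatalf("nbio.Start failed: %v", err)
	}
	defer engine.Stop()

	select {} // block forever in this sketch
}
```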
5 changes: 4 additions & 1 deletion engine_unix.go
@@ -112,7 +112,7 @@ func (g *Engine) Start() error {
g.Timer.Start()
g.isOneshot = (g.EpollMod == EPOLLET && g.EPOLLONESHOT == EPOLLONESHOT)

if g.AsyncRead {
if g.AsyncReadInPoller {
if g.IOExecute == nil {
g.ioTaskPool = taskpool.NewIO(0, 0, 0)
g.IOExecute = g.ioTaskPool.Go
@@ -145,6 +145,9 @@ func NewEngine(conf Config) *Engine {
}
if conf.NPoller <= 0 {
conf.NPoller = runtime.NumCPU() / 4
if conf.AsyncReadInPoller && conf.EpollMod == EPOLLET {
conf.NPoller = 1
}
if conf.NPoller == 0 {
conf.NPoller = 1
}
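To make the poller-count defaulting above concrete, here is a small standalone sketch (illustrative only, not library code) of how NPoller resolves when left at zero: ET mode with in-poller async read pins it to a single poller, otherwise it defaults to a quarter of the CPUs with a floor of one.

```go
package main

import "fmt"

// resolveNPoller mirrors the defaulting in NewEngine above; it is a
// hypothetical helper for illustration, not an nbio API.
func resolveNPoller(numCPU int, asyncReadInPoller, epollET bool) int {
	n := numCPU / 4
	if asyncReadInPoller && epollET {
		n = 1
	}
	if n == 0 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println(resolveNPoller(16, false, false)) // 4 pollers
	fmt.Println(resolveNPoller(16, true, true))   // 1 poller (ET + async read)
	fmt.Println(resolveNPoller(2, false, false))  // 1 poller (floor)
}
```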
81 changes: 58 additions & 23 deletions nbhttp/engine.go
@@ -201,6 +201,16 @@ type Config struct {

// WebsocketDecompressor .
WebsocketDecompressor func(r io.Reader) io.ReadCloser

// AsyncReadInPoller represents how the reading events and reading are handled
// by epoll goroutine:
// true : epoll goroutine handles the reading events only, another goroutine
// pool will handle the reading.
// false: epoll goroutine handles both the reading events and the reading.
// false is the default.
AsyncReadInPoller bool
// IOExecute is used to handle the async reading; users can customize it.
IOExecute func(f func([]byte))
}

// Engine .
@@ -416,6 +426,26 @@ func (e *Engine) stopListeners() {
}
}

// SetETAsyncRead .
func (e *Engine) SetETAsyncRead() {
if e.NPoller <= 0 {
e.NPoller = 1
}
e.EpollMod = nbio.EPOLLET
e.AsyncReadInPoller = true
e.Engine.SetETAsyncRead()
}

// SetLTSyncRead .
func (e *Engine) SetLTSyncRead() {
if e.NPoller <= 0 {
e.NPoller = runtime.NumCPU()
}
e.EpollMod = nbio.EPOLLLT
e.AsyncReadInPoller = false
e.Engine.SetLTSyncRead()
}

// Start .
func (e *Engine) Start() error {
modNames := map[int]string{
@@ -493,17 +523,17 @@ func (e *Engine) DataHandler(c *nbio.Conn, data []byte) {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("execute ReadCloser failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
logging.Error("execute ParserCloser failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
}
}()
readerCloser := c.Session().(ReadCloser)
readerCloser := c.Session().(ParserCloser)
if readerCloser == nil {
logging.Error("nil ReadCloser")
logging.Error("nil ParserCloser")
return
}
err := readerCloser.Read(data)
err := readerCloser.Parse(data)
if err != nil {
logging.Debug("ReadCloser.Read failed: %v", err)
logging.Debug("ParserCloser.Read failed: %v", err)
c.CloseWithError(err)
}
}
@@ -515,16 +545,16 @@ func (e *Engine) TLSDataHandler(c *nbio.Conn, data []byte) {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("execute ReadCloser failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
logging.Error("execute ParserCloser failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
}
}()
readCloser := c.Session().(ReadCloser)
if readCloser == nil {
logging.Error("nil ReadCloser")
parserCloser := c.Session().(ParserCloser)
if parserCloser == nil {
logging.Error("nil ParserCloser")
c.Close()
return
}
nbhttpConn, ok := readCloser.UnderlayerConn().(*Conn)
nbhttpConn, ok := parserCloser.UnderlayerConn().(*Conn)
if ok {
if tlsConn, ok := nbhttpConn.Conn.(*tls.Conn); ok {
defer tlsConn.ResetOrFreeBuffer()
@@ -539,9 +569,9 @@ func (e *Engine) TLSDataHandler(c *nbio.Conn, data []byte) {
return
}
if nread > 0 {
err := readCloser.Read(buffer[:nread])
err := parserCloser.Parse(buffer[:nread])
if err != nil {
logging.Debug("ReadCloser.Read failed: %v", err)
logging.Debug("ParserCloser.Read failed: %v", err)
c.CloseWithError(err)
return
}
@@ -776,11 +806,11 @@ func (engine *Engine) readConnBlocking(conn *Conn, parser *Parser, decrease func
}

buffer := readBufferPool.Malloc(engine.BlockingReadBufferSize)
var readCloser ReadCloser = parser
var parserCloser ParserCloser = parser
defer func() {
readBufferPool.Free(buffer)
if !conn.Trasfered {
readCloser.CloseAndClean(err)
parserCloser.CloseAndClean(err)
}
engine.mux.Lock()
switch vt := conn.Conn.(type) {
@@ -799,14 +829,14 @@
if err != nil {
return
}
readCloser.Read(buffer[:n])
parserCloser.Parse(buffer[:n])
if conn.Trasfered {
parser.onClose = nil
parser.CloseAndClean(nil)
return
}
if parser != nil && parser.ReadCloser != nil {
readCloser = parser.ReadCloser
if parser != nil && parser.ParserCloser != nil {
parserCloser = parser.ParserCloser
parser.onClose = nil
parser.CloseAndClean(nil)
parser = nil
@@ -825,11 +855,11 @@ func (engine *Engine) readTLSConnBlocking(conn *Conn, rconn net.Conn, tlsConn *t
readBufferPool = getReadBufferPool(engine.BlockingReadBufferSize)
}
buffer := readBufferPool.Malloc(engine.BlockingReadBufferSize)
var readCloser ReadCloser = parser
var parserCloser ParserCloser = parser
defer func() {
readBufferPool.Free(buffer)
if !conn.Trasfered {
readCloser.CloseAndClean(err)
parserCloser.CloseAndClean(err)
tlsConn.Close()
}
engine.mux.Lock()
@@ -857,7 +887,7 @@ func (engine *Engine) readTLSConnBlocking(conn *Conn, rconn net.Conn, tlsConn *t
return
}
if nread > 0 {
err = readCloser.Read(buffer[:nread])
err = parserCloser.Parse(buffer[:nread])
if err != nil {
logging.Debug("parser.Read failed: %v", err)
return
@@ -867,8 +897,8 @@ func (engine *Engine) readTLSConnBlocking(conn *Conn, rconn net.Conn, tlsConn *t
parser.CloseAndClean(nil)
return
}
if parser != nil && parser.ReadCloser != nil {
readCloser = parser.ReadCloser
if parser != nil && parser.ParserCloser != nil {
parserCloser = parser.ParserCloser
parser.onClose = nil
parser.CloseAndClean(nil)
parser = nil
@@ -891,6 +921,9 @@ func NewEngine(conf Config) *Engine {
}
if conf.NPoller <= 0 {
conf.NPoller = runtime.NumCPU() / 4
if conf.AsyncReadInPoller && conf.EpollMod == nbio.EPOLLET {
conf.NPoller = 1
}
if conf.NPoller == 0 {
conf.NPoller = 1
}
@@ -989,6 +1022,8 @@ func NewEngine(conf Config) *Engine {
LockListener: conf.LockListener,
EpollMod: conf.EpollMod,
EPOLLONESHOT: conf.EPOLLONESHOT,
AsyncReadInPoller: conf.AsyncReadInPoller,
IOExecute: conf.IOExecute,
}
g := nbio.NewEngine(gopherConf)
g.Execute = serverExecutor
@@ -1038,7 +1073,7 @@ func NewEngine(conf Config) *Engine {
g.OnClose(func(c *nbio.Conn, err error) {
c.MustExecute(func() {
switch vt := c.Session().(type) {
case ReadCloser:
case ParserCloser:
vt.CloseAndClean(err)
default:
}
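A hedged sketch of the same option at the HTTP layer. SetETAsyncRead and AsyncReadInPoller are introduced by this diff; Network, Addrs and Handler are assumed pre-existing nbhttp.Config fields.

```go
package main

import (
	"log"
	"net/http"

	"github.com/lesismal/nbio/nbhttp"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("hello"))
	})

	engine := nbhttp.NewEngine(nbhttp.Config{
		// Pre-existing fields (assumed, not part of this diff).
		Network: "tcp",
		Addrs:   []string{"localhost:8080"},
		Handler: mux,
	})
	// New helper from this PR: edge-triggered epoll, with reads handled by
	// the IO goroutine pool instead of the poller goroutine.
	engine.SetETAsyncRead()

	if err := engine.Start(); err != nil {
		log.Fatalf("nbhttp.Start failed: %v", err)
	}

	select {} // block forever in this sketch; a real server would shut down on a signal
}
```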
14 changes: 7 additions & 7 deletions nbhttp/parser.go
@@ -27,9 +27,9 @@ const (
MaxInt = int64(int(MaxUint >> 1))
)

type ReadCloser interface {
type ParserCloser interface {
UnderlayerConn() net.Conn
Read(data []byte) error
Parse(data []byte) error
CloseAndClean(err error)
}

@@ -44,7 +44,7 @@ type Parser struct {

onClose func(p *Parser, err error)

ReadCloser ReadCloser
ParserCloser ParserCloser

Engine *Engine

@@ -132,7 +132,7 @@ func parseAndValidateChunkSize(originalStr string) (int, error) {
}

// Read .
func (p *Parser) Read(data []byte) error {
func (p *Parser) Parse(data []byte) error {
p.mux.Lock()
defer p.mux.Unlock()

@@ -155,12 +155,12 @@ func (p *Parser) Read(data []byte) error {
}

UPGRADER:
if p.ReadCloser != nil {
if p.ParserCloser != nil {
udata := data
if start > 0 {
udata = data[start:]
}
err := p.ReadCloser.Read(udata)
err := p.ParserCloser.Parse(udata)
if p.cache != nil {
mempool.Free(p.cache)
p.cache = nil
@@ -170,7 +170,7 @@

var c byte
for i := offset; i < len(data); i++ {
if p.ReadCloser != nil {
if p.ParserCloser != nil {
p.Processor.Clean(p)
goto UPGRADER
}
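Since the ReadCloser interface is renamed to ParserCloser (and Read to Parse), a minimal hypothetical implementation shows the shape an upgrader such as the websocket connection has to satisfy; the loggingSession type here is invented for illustration and is not part of nbio.

```go
package main

import (
	"fmt"
	"net"
)

// ParserCloser mirrors the interface defined in nbhttp/parser.go above.
type ParserCloser interface {
	UnderlayerConn() net.Conn
	Parse(data []byte) error
	CloseAndClean(err error)
}

// loggingSession is a toy ParserCloser: it only reports how many bytes it
// was asked to parse.
type loggingSession struct {
	conn net.Conn
}

func (s *loggingSession) UnderlayerConn() net.Conn { return s.conn }

func (s *loggingSession) Parse(data []byte) error {
	// A real upgrader would parse protocol frames here (e.g. websocket).
	fmt.Printf("parsed %d bytes\n", len(data))
	return nil
}

func (s *loggingSession) CloseAndClean(err error) {
	if s.conn != nil {
		s.conn.Close()
	}
}

var _ ParserCloser = (*loggingSession)(nil) // compile-time interface check

func main() {
	var pc ParserCloser = &loggingSession{}
	_ = pc.Parse([]byte("GET / HTTP/1.1\r\n\r\n"))
}
```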
10 changes: 5 additions & 5 deletions nbhttp/parser_test.go
@@ -109,18 +109,18 @@ func testParser(t *testing.T, isClient bool, data []byte) error {
parser.Conn.Close()
}
}()
err := parser.Read(data)
err := parser.Parse(data)
if err != nil {
t.Fatal(err)
}

for i := 0; i < len(data)-1; i++ {
err = parser.Read(append([]byte{}, data[i:i+1]...))
err = parser.Parse(append([]byte{}, data[i:i+1]...))
if err != nil {
t.Fatal(err)
}
}
err = parser.Read(append([]byte{}, data[len(data)-1:]...))
err = parser.Parse(append([]byte{}, data[len(data)-1:]...))
if err != nil {
t.Fatal(err)
}
@@ -155,7 +155,7 @@ func testParser(t *testing.T, isClient bool, data []byte) error {
readBuf := append([]byte{}, tmp[:nRead]...)
reads = append(reads, readBuf)
tmp = tmp[nRead:]
err = parser.Read(readBuf)
err = parser.Parse(readBuf)
if err != nil {
t.Fatalf("nRead: %v, numOne: %v, reads: %v, error: %v", len(data)-len(tmp), len(data), reads, err)
}
@@ -273,7 +273,7 @@ func BenchmarkServerProcessor(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 5; j++ {
err := parser.Read(benchData)
err := parser.Parse(benchData)
if err != nil {
b.Fatal(err)
}
2 changes: 1 addition & 1 deletion nbhttp/processor.go
@@ -277,7 +277,7 @@ func (p *ServerProcessor) flushResponse(parser *Parser, res *Response) {
if req.Close {
// the data may still in the send queue
conn.Close()
} else if parser.ReadCloser == nil {
} else if parser.ParserCloser == nil {
conn.SetReadDeadline(time.Now().Add(engine.KeepaliveTime))
}
}
