Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt in PanicHandler #646

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ type Server struct {
// * ErrBrokenChunks
ErrorHandler func(ctx *RequestCtx, err error)

// PanicHandler for reacting on a panic. Called with the result of calling recover() if it is not nil.
//
// To be on the safe side, implementors are advised to re-panic so the stack would continue to unwind.
// This would make sure you're not left in some inconsistent state due to the original panic.
PanicHandler func(r interface{})

// HeaderReceived is called after receiving the header
//
// non zero RequestConfig field values will overwrite the default configs
Expand Down Expand Up @@ -1578,6 +1584,7 @@ func (s *Server) Serve(ln net.Listener) error {
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
PanicHandler: s.PanicHandler,
Logger: s.logger(),
connState: s.setState,
}
Expand Down Expand Up @@ -2147,14 +2154,20 @@ func (s *Server) setState(nc net.Conn, state ConnState) {

func hijackConnHandler(r io.Reader, c net.Conn, s *Server, h HijackHandler) {
hjc := s.acquireHijackConn(r, c)
h(hjc)

if br, ok := r.(*bufio.Reader); ok {
releaseReader(s, br)
if s.PanicHandler != nil {
defer func() {
s.cleanAfterHijackConn(r, c, hjc)
if r := recover(); r != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this r shadows io.Reader defined above. I think we should rename this variable

s.PanicHandler(r)
}
}()
}
if !s.KeepHijackedConns {
c.Close()
s.releaseHijackConn(hjc)

h(hjc)

if s.PanicHandler == nil {
s.cleanAfterHijackConn(r, c, hjc)
}
}

Expand All @@ -2180,6 +2193,16 @@ func (s *Server) releaseHijackConn(hjc *hijackConn) {
s.hijackConnPool.Put(hjc)
}

func (s *Server) cleanAfterHijackConn(r io.Reader, c net.Conn, hjc *hijackConn) {
if br, ok := r.(*bufio.Reader); ok {
releaseReader(s, br)
}
if !s.KeepHijackedConns {
c.Close()
s.releaseHijackConn(hjc)
}
}

type hijackConn struct {
net.Conn
r io.Reader
Expand Down
26 changes: 23 additions & 3 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type workerPool struct {

LogAllErrors bool

PanicHandler func(r interface{})

MaxIdleWorkerDuration time.Duration

Logger Logger
Expand Down Expand Up @@ -200,9 +202,27 @@ func (wp *workerPool) release(ch *workerChan) bool {
return true
}

func (wp *workerPool) workerDone() {
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}

func (wp *workerPool) workerFunc(ch *workerChan) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like that this change will silently affect users with PanicHandler == nil as well.

if wp.PanicHandler != nil for == nil would at least execute redundant JZ, and the if wp.PanicHandler == nil below implies something like

JNZ FUNCRET;
JMP WORKER_DONE;
FUNCRET:

and that's all while the user doesn't even use this feature.

while I understand what (*wp).workerDone() is easier to read in the context of this feature, I'm not sure that this feature is really required, or at least that this implementation is optimal for both users who want the PanicHandler and who don't.

Again, I use fasthttp for several years now and I still didn't find a case when fasthttp would panic.

I'd like to hear some more thoughts about this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think it's better if we just handle panics from Response.SetBodyStream and Response.SetBodyStreamWriter and use s.logger().Printf("... and return without writing anything more in the response as we don't know what's written already.

For people who want more control it's not that hard to wrap the reader and handle these panics themselves.

This way we don't expose any extra API again.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the above.

Moreover, #687 is definitely a step in the right direction for us. So closing this one.

var c net.Conn

if wp.PanicHandler != nil {
defer func() {
wp.workerDone()
if r := recover(); r != nil {
if c != nil {
c.Close()
}
wp.PanicHandler(r)
}
}()
}

var err error
for c = range ch.ch {
if c == nil {
Expand Down Expand Up @@ -231,7 +251,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
}
}

wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
if wp.PanicHandler == nil {
wp.workerDone()
}
}
110 changes: 110 additions & 0 deletions workerpool_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fasthttp

import (
"fmt"
"io/ioutil"
"net"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -167,3 +169,111 @@ func testWorkerPoolMaxWorkersCount(t *testing.T) {
}
wp.Stop()
}

func TestWorkerPoolPanicErrorSerial(t *testing.T) {
testWorkerPoolPanicErrorMulti(t)
}

func TestWorkerPoolPanicErrorConcurrent(t *testing.T) {
concurrency := 10
ch := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
testWorkerPoolPanicErrorMulti(t)
ch <- struct{}{}
}()
}
for i := 0; i < concurrency; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
}

func testWorkerPoolPanicErrorMulti(t *testing.T) {
var globalCount uint64
var recoverCount uint64
wp := &workerPool{
WorkerFunc: func(conn net.Conn) error {
count := atomic.AddUint64(&globalCount, 1)
switch count % 3 {
case 0:
panic("foobar")
case 1:
return fmt.Errorf("fake error")
}
return nil
},
MaxWorkersCount: 1000,
MaxIdleWorkerDuration: time.Millisecond,
Logger: &customLogger{},
PanicHandler: func(r interface{}) {
if r == nil {
t.Fatalf("PanicHandler got nil")
}
atomic.AddUint64(&recoverCount, 1)
},
}

for i := 0; i < 10; i++ {
testWorkerPoolPanicError(t, wp)
}

if recoverCount == 0 {
t.Fatalf("PanicHandler was not called")
}
}

func testWorkerPoolPanicError(t *testing.T, wp *workerPool) {
wp.Start()

ln := fasthttputil.NewInmemoryListener()

clientsCount := 10
clientCh := make(chan struct{}, clientsCount)
for i := 0; i < clientsCount; i++ {
go func() {
conn, err := ln.Dial()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
data, err := ioutil.ReadAll(conn)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(data) > 0 {
t.Fatalf("unexpected data read: %q. Expecting empty data", data)
}
if err = conn.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
clientCh <- struct{}{}
}()
}

for i := 0; i < clientsCount; i++ {
conn, err := ln.Accept()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !wp.Serve(conn) {
t.Fatalf("worker pool mustn't be full")
}
}

for i := 0; i < clientsCount; i++ {
select {
case <-clientCh:
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}

if err := ln.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}

wp.Stop()
}