Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dont reuse buffer if conn is closed by poller #325

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (c *connection) onClose() error {

// closeBuffer recycle input & output LinkBuffer.
func (c *connection) closeBuffer() {
// if c is not closed by user, we shouldn't reuse buffer because user may still hold the buffer
if !c.isCloseBy(user) {
return
}
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
// if client close the connection, we cannot ensure that the poller is not process the buffer,
Expand Down
46 changes: 46 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"syscall"
"testing"
"time"
"unsafe"
)

func TestConnectionWrite(t *testing.T) {
Expand Down Expand Up @@ -182,6 +183,51 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
wg.Wait()
}

func TestConnectionReadRepeatedlyAfterClosed(t *testing.T) {
buffers := map[uintptr][]byte{}
for i := 0; i < 1000; i++ {
msg1 := []byte(fmt.Sprintf("%5d", i))
msg2 := []byte(fmt.Sprintf("%5d", i+1))

r1, w1 := GetSysFdPairs()
r2, w2 := GetSysFdPairs()
rconn1, rconn2 := new(connection), new(connection)
rconn1.init(&netFD{fd: r1}, nil)
rconn2.init(&netFD{fd: r2}, nil)
rconn1.SetOnConnect(func(ctx context.Context, connection Connection) context.Context {
return ctx
})
rconn2.SetOnConnect(func(ctx context.Context, connection Connection) context.Context {
return ctx
})
trigger := make(chan struct{})
go func() {
syscall.Write(w1, msg1)
trigger <- struct{}{}
<-trigger // wait read msg1

syscall.Close(w1)
syscall.Write(w2, msg2)
trigger <- struct{}{}
//syscall.Close(w2)
}()

<-trigger // wait write msg1
buf1, _ := rconn1.Reader().Next(5)
Equal(t, string(buf1), string(msg1))
trigger <- struct{}{}

<-trigger // wait write msg2
buf2, _ := rconn2.Reader().Next(5)
Equal(t, string(buf2), string(msg2))
Equal(t, string(buf1), string(msg1))
Assert(t, buffers[uintptr(unsafe.Pointer(&buf1[0]))] == nil)
Assert(t, buffers[uintptr(unsafe.Pointer(&buf2[0]))] == nil)
buffers[uintptr(unsafe.Pointer(&buf1[0]))] = buf1
buffers[uintptr(unsafe.Pointer(&buf2[0]))] = buf2
}
}

func TestReadTimer(t *testing.T) {
read := time.NewTimer(time.Second)
MustTrue(t, read.Stop())
Expand Down
Loading