From b4f365b8330c03a771db0738158903b987abbc9b Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Wed, 10 Jan 2024 11:14:32 +0200 Subject: [PATCH 1/6] Fix connection reaper deadlock. Added an explicitly run test to detect the deadlock. Use ```go test -tags=TestConnReaperDeadlockLoop -run ^TestConnReaperDeadlockLoop$```. Be aware that the test generates a lot of sockets in TIME_WAIT. Without the patch to socket.go it breaks pretty quickly on both fast and slow machines. With the patch it runs way longer but gets slower when there are plenty of sockets in TIME_WAIT. --- socket.go | 11 +++++++++-- socket_loop_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 socket_loop_test.go diff --git a/socket.go b/socket.go index ca198a6..ced9760 100644 --- a/socket.go +++ b/socket.go @@ -11,6 +11,7 @@ import ( "log" "net" "os" + "slices" "sort" "strings" "sync" @@ -384,10 +385,16 @@ func (sck *socket) connReaper() { return } - for _, c := range sck.closedConns { + // Clone the known closed connections to avoid data race + // and remove those under reaper unlocked. + // That would be resoling deadlock from #149 simpler way. 
+ cc := slices.Clone(sck.closedConns) + sck.closedConns = nil + sck.reaperCond.L.Unlock() + for _, c := range cc { sck.rmConn(c) } - sck.closedConns = nil + sck.reaperCond.L.Lock() } } diff --git a/socket_loop_test.go b/socket_loop_test.go new file mode 100644 index 0000000..c0478a6 --- /dev/null +++ b/socket_loop_test.go @@ -0,0 +1,27 @@ +//go:build TestConnReaperDeadlockLoop + +package zmq4_test + +import ( + "log" + "testing" + "time" +) + +// Use ```go test -tags=TestConnReaperDeadlockLoop -run ^TestConnReaperDeadlockLoop$``` +func TestConnReaperDeadlockLoop(t *testing.T) { + n := uint64(0) + for { + n++ + if n%100 == 0 { + log.Printf("%d ...", n) + } + timer := time.AfterFunc(30*time.Second, func() { + log.Fatalf("failed at %d!!!", n) + }) + + TestConnReaperDeadlock(t) + + timer.Stop() + } +} From 4df9a62fab11d54dd06548968d86d294d1f47d47 Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Wed, 10 Jan 2024 11:39:10 +0200 Subject: [PATCH 2/6] Remove needs of package slices --- socket.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/socket.go b/socket.go index ced9760..48060a1 100644 --- a/socket.go +++ b/socket.go @@ -11,7 +11,6 @@ import ( "log" "net" "os" - "slices" "sort" "strings" "sync" @@ -388,7 +387,7 @@ func (sck *socket) connReaper() { // Clone the known closed connections to avoid data race // and remove those under reaper unlocked. // That would be resoling deadlock from #149 simpler way. - cc := slices.Clone(sck.closedConns) + cc := append([]*Conn{}, sck.closedConns...) 
// clone sck.closedConns = nil sck.reaperCond.L.Unlock() for _, c := range cc { From d02690e820a7b2e683c5bc52e3c62feba3de6a06 Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Tue, 16 Jan 2024 19:03:05 +0200 Subject: [PATCH 3/6] Apply suggestions from code review Co-authored-by: Sebastien Binet --- socket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/socket.go b/socket.go index 48060a1..7a41c71 100644 --- a/socket.go +++ b/socket.go @@ -386,9 +386,9 @@ func (sck *socket) connReaper() { // Clone the known closed connections to avoid data race // and remove those under reaper unlocked. - // That would be resoling deadlock from #149 simpler way. + // That should fix the deadlock reported in #149. cc := append([]*Conn{}, sck.closedConns...) // clone - sck.closedConns = nil + sck.closedConns = sck.closedConns[:0] sck.reaperCond.L.Unlock() for _, c := range cc { sck.rmConn(c) From 899e12c869b104ed3d0da9ddad9013d0dd51a967 Mon Sep 17 00:00:00 2001 From: egorse Date: Sun, 21 Jan 2024 12:39:47 +0200 Subject: [PATCH 4/6] Add improved connection reaper deadlock test The test TestConnReaperDeadlock2 is designed after TestConnReaperDeadlock but uses internals to ensure the problematic sequence, which in the case of TestConnReaperDeadlock requires a number of runs. --- reaper_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++ socket_loop_test.go | 27 ------------ 2 files changed, 104 insertions(+), 27 deletions(-) create mode 100644 reaper_test.go delete mode 100644 socket_loop_test.go diff --git a/reaper_test.go b/reaper_test.go new file mode 100644 index 0000000..cce9e33 --- /dev/null +++ b/reaper_test.go @@ -0,0 +1,104 @@ +// Copyright 2024 The go-zeromq Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package zmq4 + +import ( + "context" + "fmt" + "io" + "net" + "runtime" + "testing" + "time" +) + +func TestConnReaperDeadlock2(t *testing.T) { + ep := must(EndPoint("tcp")) + defer cleanUp(ep) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Bind the server. + srv := NewRouter(ctx, WithLogger(Devnull)).(*routerSocket) + if err := srv.Listen(ep); err != nil { + t.Fatalf("could not listen on %q: %+v", ep, err) + } + defer srv.Close() + + // Connect clients. + // Use same client ID so the srv.Send will hold mutex + // for longer time + var clients []Socket + id := "client-x" + for i := 0; i < 10; i++ { + c := NewReq(ctx, WithLogger(Devnull), WithID(SocketIdentity(id))) + if err := c.Dial(ep); err != nil { + t.Fatalf("could not dial %q: %+v", ep, err) + } + clients = append(clients, c) + } + + // Modify clients connection sockets in server + // so any send to client will trigger context switch + // and be failing. + // Idea is that while srv.Send is progresing, + // the connection will be closed and assigned + // for connection reaper. + rmw := srv.sck.w.(*routerMWriter) + for i := range rmw.ws { + rmw.ws[i].rw = &sockSendEof{rmw.ws[i].rw} + } + + // Now try to send a message from the server to all clients. 
+ msg := NewMsgFrom(nil, nil, []byte("payload")) + msg.Frames[0] = []byte(id) + if err := srv.Send(msg); err != nil { + t.Logf("Send to %s failed: %+v\n", id, err) + } + + for i := range clients { + clients[i].Close() + } +} + +type sockSendEof struct { + net.Conn +} + +func (r *sockSendEof) Write(b []byte) (n int, err error) { + runtime.Gosched() + time.Sleep(1 * time.Second) + return 0, io.EOF +} + +func must(str string, err error) string { + if err != nil { + panic(err) + } + return str +} + +func EndPoint(transport string) (string, error) { + switch transport { + case "tcp": + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return "", err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return "", err + } + defer l.Close() + return fmt.Sprintf("tcp://%s", l.Addr()), nil + case "ipc": + return "ipc://tmp-" + newUUID(), nil + case "inproc": + return "inproc://tmp-" + newUUID(), nil + default: + panic("invalid transport: [" + transport + "]") + } +} diff --git a/socket_loop_test.go b/socket_loop_test.go deleted file mode 100644 index c0478a6..0000000 --- a/socket_loop_test.go +++ /dev/null @@ -1,27 +0,0 @@ -//go:build TestConnReaperDeadlockLoop - -package zmq4_test - -import ( - "log" - "testing" - "time" -) - -// Use ```go test -tags=TestConnReaperDeadlockLoop -run ^TestConnReaperDeadlockLoop$``` -func TestConnReaperDeadlockLoop(t *testing.T) { - n := uint64(0) - for { - n++ - if n%100 == 0 { - log.Printf("%d ...", n) - } - timer := time.AfterFunc(30*time.Second, func() { - log.Fatalf("failed at %d!!!", n) - }) - - TestConnReaperDeadlock(t) - - timer.Stop() - } -} From 9fcf8e40be2377e697bc091be425af03f3ba2876 Mon Sep 17 00:00:00 2001 From: egorse Date: Sun, 21 Jan 2024 13:52:44 +0200 Subject: [PATCH 5/6] Fixed data race during TestConnReaperDeadlock2 --- reaper_test.go | 66 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/reaper_test.go b/reaper_test.go index 
cce9e33..83e21fc 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -28,29 +28,26 @@ func TestConnReaperDeadlock2(t *testing.T) { } defer srv.Close() - // Connect clients. - // Use same client ID so the srv.Send will hold mutex - // for longer time - var clients []Socket - id := "client-x" - for i := 0; i < 10; i++ { - c := NewReq(ctx, WithLogger(Devnull), WithID(SocketIdentity(id))) - if err := c.Dial(ep); err != nil { - t.Fatalf("could not dial %q: %+v", ep, err) - } - clients = append(clients, c) - } - - // Modify clients connection sockets in server + // Add modified clients connection to server // so any send to client will trigger context switch // and be failing. - // Idea is that while srv.Send is progresing, + // Idea is that while srv.Send is progressing, // the connection will be closed and assigned - // for connection reaper. + // for connection reaper, and reaper will try to remove those + id := "client-x" + srv.sck.mu.Lock() rmw := srv.sck.w.(*routerMWriter) - for i := range rmw.ws { - rmw.ws[i].rw = &sockSendEof{rmw.ws[i].rw} + for i := 0; i < 10; i++ { + w := &Conn{} + w.Peer.Meta = make(Metadata) + w.Peer.Meta[sysSockID] = id + w.rw = &sockSendEof{} + w.onCloseErrorCB = srv.sck.scheduleRmConn + // Do not to call srv.addConn as we dont want to have listener on this fake socket + rmw.addConn(w) + srv.sck.conns = append(srv.sck.conns, w) } + srv.sck.mu.Unlock() // Now try to send a message from the server to all clients. 
msg := NewMsgFrom(nil, nil, []byte("payload")) @@ -58,14 +55,9 @@ func TestConnReaperDeadlock2(t *testing.T) { if err := srv.Send(msg); err != nil { t.Logf("Send to %s failed: %+v\n", id, err) } - - for i := range clients { - clients[i].Close() - } } type sockSendEof struct { - net.Conn } func (r *sockSendEof) Write(b []byte) (n int, err error) { @@ -74,6 +66,34 @@ func (r *sockSendEof) Write(b []byte) (n int, err error) { return 0, io.EOF } +func (r *sockSendEof) Read(b []byte) (int, error) { + return 0, nil +} + +func (r *sockSendEof) Close() error { + return nil +} + +func (r *sockSendEof) LocalAddr() net.Addr { + return nil +} + +func (r *sockSendEof) RemoteAddr() net.Addr { + return nil +} + +func (r *sockSendEof) SetDeadline(t time.Time) error { + return nil +} + +func (r *sockSendEof) SetReadDeadline(t time.Time) error { + return nil +} + +func (r *sockSendEof) SetWriteDeadline(t time.Time) error { + return nil +} + func must(str string, err error) string { if err != nil { panic(err) From 66d39ee5d9b571b14ca92a3f078d0e7a1eb4dbeb Mon Sep 17 00:00:00 2001 From: egorse Date: Mon, 22 Jan 2024 20:08:06 +0200 Subject: [PATCH 6/6] Minor correction after code review. Additional test simplification. 
Now it needs only two connections, where the 1st fails immediately and the 2nd fails after a delay, giving the connection reaper time to hit the deadlock --- reaper_test.go | 66 +++++++++++++++++--------------------------------- zall_test.go | 31 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/reaper_test.go b/reaper_test.go index 83e21fc..4b756c5 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -6,10 +6,9 @@ package zmq4 import ( "context" - "fmt" "io" "net" - "runtime" + "sync/atomic" "testing" "time" ) @@ -37,11 +36,11 @@ func TestConnReaperDeadlock2(t *testing.T) { id := "client-x" srv.sck.mu.Lock() rmw := srv.sck.w.(*routerMWriter) - for i := 0; i < 10; i++ { + for i := 0; i < 2; i++ { w := &Conn{} w.Peer.Meta = make(Metadata) w.Peer.Meta[sysSockID] = id - w.rw = &sockSendEof{} + w.rw = &sockSendEOF{} w.onCloseErrorCB = srv.sck.scheduleRmConn // Do not to call srv.addConn as we dont want to have listener on this fake socket rmw.addConn(w) @@ -57,68 +56,47 @@ func TestConnReaperDeadlock2(t *testing.T) { } } -type sockSendEof struct { +type sockSendEOF struct { } -func (r *sockSendEof) Write(b []byte) (n int, err error) { - runtime.Gosched() - time.Sleep(1 * time.Second) +var a atomic.Int32 + +func (r *sockSendEOF) Write(b []byte) (n int, err error) { + // Each odd write fails asap. + // Each even write fails after sleep. + // This way we ensure the short write failure + // will cause the socket to be assigned to the connection reaper + // while srv.Send is still in progress due to long writes. 
+ if x := a.Add(1); x&1 == 0 { + time.Sleep(1 * time.Second) + } return 0, io.EOF } -func (r *sockSendEof) Read(b []byte) (int, error) { +func (r *sockSendEOF) Read(b []byte) (int, error) { return 0, nil } -func (r *sockSendEof) Close() error { +func (r *sockSendEOF) Close() error { return nil } -func (r *sockSendEof) LocalAddr() net.Addr { +func (r *sockSendEOF) LocalAddr() net.Addr { return nil } -func (r *sockSendEof) RemoteAddr() net.Addr { +func (r *sockSendEOF) RemoteAddr() net.Addr { return nil } -func (r *sockSendEof) SetDeadline(t time.Time) error { +func (r *sockSendEOF) SetDeadline(t time.Time) error { return nil } -func (r *sockSendEof) SetReadDeadline(t time.Time) error { +func (r *sockSendEOF) SetReadDeadline(t time.Time) error { return nil } -func (r *sockSendEof) SetWriteDeadline(t time.Time) error { +func (r *sockSendEOF) SetWriteDeadline(t time.Time) error { return nil } - -func must(str string, err error) string { - if err != nil { - panic(err) - } - return str -} - -func EndPoint(transport string) (string, error) { - switch transport { - case "tcp": - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return "", err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return "", err - } - defer l.Close() - return fmt.Sprintf("tcp://%s", l.Addr()), nil - case "ipc": - return "ipc://tmp-" + newUUID(), nil - case "inproc": - return "inproc://tmp-" + newUUID(), nil - default: - panic("invalid transport: [" + transport + "]") - } -} diff --git a/zall_test.go b/zall_test.go index c3cd079..90fc514 100644 --- a/zall_test.go +++ b/zall_test.go @@ -5,10 +5,41 @@ package zmq4 import ( + "fmt" "io" "log" + "net" ) var ( Devnull = log.New(io.Discard, "zmq4: ", 0) ) + +func must(str string, err error) string { + if err != nil { + panic(err) + } + return str +} + +func EndPoint(transport string) (string, error) { + switch transport { + case "tcp": + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + 
return "", err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return "", err + } + defer l.Close() + return fmt.Sprintf("tcp://%s", l.Addr()), nil + case "ipc": + return "ipc://tmp-" + newUUID(), nil + case "inproc": + return "inproc://tmp-" + newUUID(), nil + default: + panic("invalid transport: [" + transport + "]") + } +}