Skip to content

Commit

Permalink
add deadlock regression test
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 10, 2023
1 parent 5f8244c commit 628f799
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions internal/coord/query_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package coord

import (
"context"
"sync"
"testing"

"github.com/benbjohnson/clock"
"github.com/plprobelab/zikade/internal/coord/coordt"
"github.com/plprobelab/zikade/internal/kadtest"
"github.com/plprobelab/zikade/internal/nettest"
"github.com/plprobelab/zikade/kadt"
"github.com/plprobelab/zikade/pb"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -68,3 +77,101 @@ func TestPooledQueryConfigValidate(t *testing.T) {
require.Error(t, cfg.Validate())
})
}

func TestPooledQuery_deadlock_regression(t *testing.T) {
ctx := kadtest.CtxShort(t)
msg := &pb.Message{}
queryID := coordt.QueryID("test")

_, nodes, err := nettest.LinearTopology(3, clock.New())
require.NoError(t, err)

c, err := NewCoordinator(nodes[0].NodeID, nodes[0].Router, nodes[0].RoutingTable, nil)
require.NoError(t, err)
require.NoError(t, c.Close()) // close immediately so that we control the state machine progression

// define a function that produces success messages
successMsg := func(to kadt.PeerID, closer ...kadt.PeerID) *EventSendMessageSuccess {
return &EventSendMessageSuccess{
QueryID: queryID,
Request: msg,
To: to,
Response: nil,
CloserNodes: closer,
}
}

// start query
waiter := NewWaiter[BehaviourEvent]()

waiterDone := make(chan struct{})
waiterMsg := make(chan struct{})
go func() {
defer close(waiterDone)
defer close(waiterMsg)
_, _, err = c.waitForQuery(ctx, queryID, waiter, func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error {
waiterMsg <- struct{}{}
return coordt.ErrSkipRemaining
})
}()

// start the message query
c.queryBehaviour.Notify(ctx, &EventStartMessageQuery{
QueryID: queryID,
Target: msg.Target(),
Message: msg,
KnownClosestNodes: []kadt.PeerID{nodes[1].NodeID},
Notify: waiter,
NumResults: 0,
})

// advance state machines and assert that the state machine
// wants to send an outbound message to another peer
ev, _ := c.queryBehaviour.Perform(ctx)
require.IsType(t, &EventOutboundSendMessage{}, ev)

// simulate a successful response from another node that returns one new node
// This should result in a message for the waiter
c.queryBehaviour.Notify(ctx, successMsg(nodes[1].NodeID, nodes[2].NodeID))

// Because we're blocking on the waiterMsg channel in the waitForQuery
// method above, we simulate a slow receiving waiter.

// Advance the query pool state machine. Because we returned a new node
// above, the query pool state machine wants to send another outbound query
ev, _ = c.queryBehaviour.Perform(ctx)
require.IsType(t, &EventAddNode{}, ev) // event to notify the routing table
ev, _ = c.queryBehaviour.Perform(ctx)
require.IsType(t, &EventOutboundSendMessage{}, ev)

// Simulate a successful response from the new node. This node didn't return
// any new nodes to contact. This means the query pool behaviour will notify
// the waiter about a query progression and afterward about a finished
// query. Because (at the time of writing) the waiter has a channel buffer
// of 1, the channel cannot hold both events. At the same time, the waiter
// doesn't consume the messages because it's busy processing the previous
// query event (because we haven't released the blocking waiterMsg call above).
var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
c.queryBehaviour.Notify(ctx, successMsg(nodes[2].NodeID))
}()

wg.Wait()
<-waiterMsg

// At this point, the waitForQuery QueryFunc callback returned a
// coordt.ErrSkipRemaining. This instructs the waitForQuery method to notify
// the query behaviour with an EventStopQuery event. However, because the
// query behaviour is busy sending a message to the waiter it is holding the
// lock on the pending events to process. Therefore, this notify call will
// also block. At the same time, the waiter cannot read the new messages
// from the query behaviour because it tries to notify it.

select {
case <-waiterDone:
case <-ctx.Done():
t.Fatalf("tiemout")
}
}

0 comments on commit 628f799

Please sign in to comment.