Skip to content

Commit

Permalink
fix races in requester
Browse files Browse the repository at this point in the history
  • Loading branch information
SaveTheRbtz committed Apr 12, 2022
1 parent bb24d8f commit 05a87c7
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions engine/common/requester/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package requester

import (
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -141,11 +142,16 @@ func TestDispatchRequestVarious(t *testing.T) {

con.AssertExpectations(t)

request.unit.Lock()
assert.Contains(t, request.requests, nonce)
request.unit.Unlock()

// TODO: racy/slow test
time.Sleep(2 * cfg.RetryInitial)

request.unit.Lock()
assert.NotContains(t, request.requests, nonce)
request.unit.Unlock()
}

func TestDispatchRequestBatchSize(t *testing.T) {
Expand Down Expand Up @@ -269,7 +275,8 @@ func TestOnEntityResponseValid(t *testing.T) {
EntityIDs: []flow.Identifier{wanted1.ID(), wanted2.ID(), unavailable.ID()},
}

called := 0
done := make(chan struct{})
called := int64(0)
request := Engine{
unit: engine.NewUnit(),
metrics: metrics.NewNoopCollector(),
Expand All @@ -278,7 +285,11 @@ func TestOnEntityResponseValid(t *testing.T) {
requests: make(map[uint64]*messages.EntityRequest),
selector: filter.HasNodeID(targetID),
create: func() flow.Entity { return &flow.Collection{} },
handle: func(flow.Identifier, flow.Entity) { called++ },
handle: func(flow.Identifier, flow.Entity) {
if atomic.AddInt64(&called, 1) >= 2 {
close(done)
}
},
}

request.items[iwanted1.EntityID] = iwanted1
Expand All @@ -300,10 +311,8 @@ func TestOnEntityResponseValid(t *testing.T) {
// check that the missing item is still there
assert.Contains(t, request.items, unavailable.ID())

time.Sleep(100 * time.Millisecond)

// make sure we processed two items
assert.Equal(t, called, 2)
unittest.AssertClosesBefore(t, done, time.Second)

// check that the missing items timestamp was reset
assert.Equal(t, iunavailable.LastRequested, time.Time{})
Expand Down Expand Up @@ -354,7 +363,7 @@ func TestOnEntityIntegrityCheck(t *testing.T) {
EntityIDs: []flow.Identifier{wanted.ID()},
}

called := 0
called := make(chan struct{})
request := Engine{
unit: engine.NewUnit(),
metrics: metrics.NewNoopCollector(),
Expand All @@ -363,7 +372,7 @@ func TestOnEntityIntegrityCheck(t *testing.T) {
requests: make(map[uint64]*messages.EntityRequest),
selector: filter.HasNodeID(targetID),
create: func() flow.Entity { return &flow.Collection{} },
handle: func(flow.Identifier, flow.Entity) { called++ },
handle: func(flow.Identifier, flow.Entity) { close(called) },
}

request.items[iwanted.EntityID] = iwanted
Expand All @@ -379,20 +388,15 @@ func TestOnEntityIntegrityCheck(t *testing.T) {
// check that the provided item wasn't removed
assert.Contains(t, request.items, wanted.ID())

// make sure we didn't process items
assert.Equal(t, 0, called)

iwanted.checkIntegrity = false
request.items[iwanted.EntityID] = iwanted
request.requests[req.Nonce] = req

err = request.onEntityResponse(targetID, res)
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)

// make sure we process item without checking integrity
assert.Equal(t, 1, called)
unittest.AssertClosesBefore(t, called, time.Second)
}

// Verify that the origin should not be checked when ValidateStaking config is set to false
Expand Down Expand Up @@ -459,12 +463,12 @@ func TestOriginValidation(t *testing.T) {
)
assert.NoError(t, err)

called := false
called := make(chan struct{})

e.WithHandle(func(origin flow.Identifier, _ flow.Entity) {
// we expect wrong origin to propagate here with validation disabled
assert.Equal(t, wrongID, origin)
called = true
close(called)
})

e.items[iwanted.EntityID] = iwanted
Expand All @@ -480,5 +484,5 @@ func TestOriginValidation(t *testing.T) {
assert.NoError(t, err)

// handler are called async, but this should be extremely quick
require.Eventually(t, func() bool { return called }, 100*time.Millisecond, 10*time.Millisecond)
unittest.AssertClosesBefore(t, called, time.Second)
}

0 comments on commit 05a87c7

Please sign in to comment.