Skip to content

Commit

Permalink
Replace waitgroups with retries in Integration Test (cosmos#992)
Browse files Browse the repository at this point in the history
## Overview
This depends on rollkit/rollkit#963. As part of
Stage 2 of rollkit/rollkit#954, this PR
intends to replace all of the waitgroups in the helpers and tests with
retry checks.

## Checklist
- [ ] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing
- [ ] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [ ] Linked issues closed with keywords

---------

Co-authored-by: Connor O'Hara <connor@switchboard.xyz>
  • Loading branch information
S1nus and Connor O'Hara authored Jun 23, 2023
1 parent 1e1d8f9 commit e4f8bd4
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 81 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/celestiaorg/go-cnc v0.3.0
github.com/celestiaorg/go-fraud v0.1.0
github.com/celestiaorg/go-header v0.2.8
github.com/celestiaorg/utils v0.1.0
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/go-kit/kit v0.12.0
github.com/gogo/protobuf v1.3.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ github.com/celestiaorg/go-header v0.2.8 h1:NS6fOzfRoGqQF3RrstSjAxLArWyJWob+25T4G
github.com/celestiaorg/go-header v0.2.8/go.mod h1:i9OpY70+PJ1xPw1IgMfF0Pk6vBD6VWPmjY3bgubJBcU=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
github.com/celestiaorg/utils v0.1.0 h1:WsP3O8jF7jKRgLNFmlDCwdThwOFMFxg0MnqhkLFVxPo=
github.com/celestiaorg/utils v0.1.0/go.mod h1:vQTh7MHnvpIeCQZ2/Ph+w7K1R2UerDheZbgJEJD2hSU=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4/go.mod h1:fzuHnhzj1pUygGz+1ZkB3uQbEUL4htqCGJ4Qs2LwMZA=
github.com/celestiaorg/nmt v0.15.0 h1:ID9QlMIeP6WK/iiGcfnYLu2qqVIq0UYe/dc3TVPt6EA=
Expand Down
134 changes: 57 additions & 77 deletions node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"context"
"crypto/rand"
"errors"
"fmt"
mrand "math/rand"
"strconv"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/rollkit/rollkit/p2p"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"

testutils "github.com/celestiaorg/utils/test"
)

func TestAggregatorMode(t *testing.T) {
Expand Down Expand Up @@ -147,7 +150,7 @@ func TestLazyAggregator(t *testing.T) {
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
genesisValidators, signingKey := getGenesisValidatorSetWithSigner(1)
blockManagerConfig := config.BlockManagerConfig{
BlockTime: 1 * time.Second,
BlockTime: 1 * time.Millisecond,
NamespaceID: types.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8},
}

Expand All @@ -169,25 +172,22 @@ func TestLazyAggregator(t *testing.T) {

require.NoError(err)

// delay to ensure it waits for block 1 to be built
time.Sleep(1 * time.Second)
require.NoError(waitForFirstBlock(node.(*FullNode)))

client := node.GetClient()

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 1})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(2))
require.NoError(waitForAtLeastNBlocks(node, 2))

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 2})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(3))
require.NoError(waitForAtLeastNBlocks(node, 3))

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 3})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(4))

require.NoError(waitForAtLeastNBlocks(node, 4))

}

Expand All @@ -199,116 +199,94 @@ func TestHeaderExchange(t *testing.T) {
}

func testSingleAggreatorSingleFullNode(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 1
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, &wg, t)
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, t)

node1 := nodes[0]
node2 := nodes[1]

require.NoError(node1.Start())
time.Sleep(2 * time.Second) // wait for more than 1 blocktime for syncer to work

require.NoError(waitForFirstBlock(node1))
require.NoError(node2.Start())

time.Sleep(3 * time.Second)
require.NoError(waitForAtLeastNBlocks(node2, 2))

n1h := node1.hExService.headerStore.Height()
aggCancel()
require.NoError(node1.Stop())

time.Sleep(3 * time.Second)
require.NoError(verifyNodesSynced(node1, node2))

n2h := node2.hExService.headerStore.Height()
cancel()
require.NoError(node2.Stop())

assert.Equal(n1h, n2h, "heights must match")
}

func testSingleAggreatorTwoFullNode(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 2
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, &wg, t)
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, t)

node1 := nodes[0]
node2 := nodes[1]
node3 := nodes[2]

require.NoError(node1.Start())
time.Sleep(2 * time.Second) // wait for more than 1 blocktime for syncer to work
require.NoError(waitForFirstBlock(node1))
require.NoError(node2.Start())
require.NoError(node3.Start())

time.Sleep(3 * time.Second)
require.NoError(waitForAtLeastNBlocks(node2, 2))

n1h := node1.hExService.headerStore.Height()
aggCancel()
require.NoError(node1.Stop())

time.Sleep(3 * time.Second)

n2h := node2.hExService.headerStore.Height()
require.NoError(verifyNodesSynced(node1, node2))
cancel()
require.NoError(node2.Stop())

n3h := node3.hExService.headerStore.Height()
require.NoError(node3.Stop())

assert.Equal(n1h, n2h, "heights must match")
assert.Equal(n1h, n3h, "heights must match")
}

func testSingleAggreatorSingleFullNodeTrustedHash(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 1
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, &wg, t)
nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, false, t)

node1 := nodes[0]
node2 := nodes[1]

require.NoError(node1.Start())
time.Sleep(2 * time.Second) // wait for more than 1 blocktime for syncer to work

require.NoError(waitForFirstBlock(node1))

// Get the trusted hash from node1 and pass it to node2 config
trustedHash, err := node1.hExService.headerStore.GetByHeight(aggCtx, 1)
require.NoError(err)
node2.conf.TrustedHash = trustedHash.Hash().String()
require.NoError(node2.Start())

time.Sleep(3 * time.Second)
require.NoError(waitForAtLeastNBlocks(node1, 2))

n1h := node1.hExService.headerStore.Height()
aggCancel()
require.NoError(node1.Stop())

time.Sleep(3 * time.Second)

n2h := node2.hExService.headerStore.Height()
require.NoError(verifyNodesSynced(node1, node2))
cancel()
require.NoError(node2.Stop())

assert.Equal(n1h, n2h, "heights must match")
}

func testSingleAggreatorSingleFullNodeSingleLightNode(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -321,36 +299,29 @@ func testSingleAggreatorSingleFullNodeSingleLightNode(t *testing.T) {
ds, _ := store.NewDefaultInMemoryKVStore()
_ = dalc.Init([8]byte{}, nil, ds, log.TestingLogger())
_ = dalc.Start()
sequencer, _ := createNode(aggCtx, 0, false, true, false, keys, &wg, t)
fullNode, _ := createNode(ctx, 1, false, false, false, keys, &wg, t)
sequencer, _ := createNode(aggCtx, 0, false, true, false, keys, t)
fullNode, _ := createNode(ctx, 1, false, false, false, keys, t)

sequencer.(*FullNode).dalc = dalc
sequencer.(*FullNode).blockManager.SetDALC(dalc)
fullNode.(*FullNode).dalc = dalc
fullNode.(*FullNode).blockManager.SetDALC(dalc)

lightNode, _ := createNode(ctx, 2, false, false, true, keys, &wg, t)
lightNode, _ := createNode(ctx, 2, false, false, true, keys, t)

require.NoError(sequencer.Start())
require.NoError(fullNode.Start())
require.NoError(lightNode.Start())

time.Sleep(3 * time.Second)
require.NoError(waitForAtLeastNBlocks(sequencer.(*FullNode), 2))

n1h := sequencer.(*FullNode).hExService.headerStore.Height()
aggCancel()
require.NoError(sequencer.Stop())

time.Sleep(3 * time.Second)

n2h := fullNode.(*FullNode).hExService.headerStore.Height()
n3h := lightNode.(*LightNode).hExService.headerStore.Height()
require.NoError(verifyNodesSynced(fullNode, lightNode))
cancel()
require.NoError(fullNode.Stop())
require.NoError(lightNode.Stop())

assert.Equal(n1h, n2h, "heights must match")
assert.Equal(n1h, n3h, "heights must match")
}

func testSingleAggreatorSingleFullNodeFraudProofGossip(t *testing.T) {
Expand All @@ -361,7 +332,7 @@ func testSingleAggreatorSingleFullNodeFraudProofGossip(t *testing.T) {
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 1
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, &wg, t)
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, t)

for _, app := range apps {
app.On("VerifyFraudProof", mock.Anything).Return(abci.ResponseVerifyFraudProof{Success: true}).Run(func(args mock.Arguments) {
Expand All @@ -374,7 +345,7 @@ func testSingleAggreatorSingleFullNodeFraudProofGossip(t *testing.T) {

wg.Add(clientNodes + 1)
require.NoError(aggNode.Start())
time.Sleep(2 * time.Second)
require.NoError(waitForAtLeastNBlocks(aggNode, 2))
require.NoError(fullNode.Start())

wg.Wait()
Expand Down Expand Up @@ -408,7 +379,7 @@ func testSingleAggreatorTwoFullNodeFraudProofSync(t *testing.T) {
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 2
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, &wg, t)
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, t)

for _, app := range apps {
app.On("VerifyFraudProof", mock.Anything).Return(abci.ResponseVerifyFraudProof{Success: true}).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -525,11 +496,10 @@ func TestFraudProofService(t *testing.T) {

// Creates a starts the given number of client nodes along with an aggregator node. Uses the given flag to decide whether to have the aggregator produce malicious blocks.
func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*FullNode, []*mocks.Application) {
var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, isMalicious, &wg, t)
startNodes(nodes, &wg, t)
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, isMalicious, t)
startNodes(nodes, apps, t)
aggCancel()
time.Sleep(100 * time.Millisecond)
for _, n := range nodes {
Expand All @@ -542,19 +512,17 @@ func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*Fu

// Starts the given nodes using the given wait group to synchronize them
// and wait for them to gossip transactions
func startNodes(nodes []*FullNode, wg *sync.WaitGroup, t *testing.T) {
numNodes := len(nodes)
wg.Add((numNodes) * (numNodes - 1))
func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) {

// Wait for aggregator node to publish the first block for full nodes to initialize header exchange service
require.NoError(t, nodes[0].Start())
time.Sleep(1 * time.Second)
require.NoError(t, waitForFirstBlock(nodes[0]))
for i := 1; i < len(nodes); i++ {
require.NoError(t, nodes[i].Start())
}

// wait for nodes to start up and establish connections; 1 second ensures that test pass even on CI.
time.Sleep(1 * time.Second)
require.NoError(t, waitForAtLeastNBlocks(nodes[1], 2))

for i := 1; i < len(nodes); i++ {
data := strconv.Itoa(i) + time.Now().String()
Expand All @@ -565,7 +533,21 @@ func startNodes(nodes []*FullNode, wg *sync.WaitGroup, t *testing.T) {
doneChan := make(chan struct{})
go func() {
defer close(doneChan)
wg.Wait()
// create a MockTester, to catch the Failed asserts from the Mock package
m := MockTester{t: t}
// We don't nedd to check any specific arguments to DeliverTx
// so just use a function that returns "true" for matching the args
matcher := mock.MatchedBy(func(i interface{}) bool { return true })
err := testutils.Retry(300, 100*time.Millisecond, func() error {
for i := 0; i < len(apps); i++ {
if !apps[i].AssertCalled(m, "DeliverTx", matcher) {
return errors.New("DeliverTx hasn't been called yet")
}
}
return nil
})
assert := assert.New(t)
assert.NoError(err)
}()
select {
case <-doneChan:
Expand All @@ -575,7 +557,7 @@ func startNodes(nodes []*FullNode, wg *sync.WaitGroup, t *testing.T) {
}

// Creates the given number of nodes the given nodes using the given wait group to synchornize them
func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *sync.WaitGroup, t *testing.T) ([]*FullNode, []*mocks.Application) {
func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, t *testing.T) ([]*FullNode, []*mocks.Application) {
t.Helper()

if aggCtx == nil {
Expand All @@ -597,14 +579,14 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn
ds, _ := store.NewDefaultInMemoryKVStore()
_ = dalc.Init([8]byte{}, nil, ds, log.TestingLogger())
_ = dalc.Start()
node, app := createNode(aggCtx, 0, isMalicious, true, false, keys, wg, t)
node, app := createNode(aggCtx, 0, isMalicious, true, false, keys, t)
apps[0] = app
nodes[0] = node.(*FullNode)
// use same, common DALC, so nodes can share data
nodes[0].dalc = dalc
nodes[0].blockManager.SetDALC(dalc)
for i := 1; i < num; i++ {
node, apps[i] = createNode(ctx, i, isMalicious, false, false, keys, wg, t)
node, apps[i] = createNode(ctx, i, isMalicious, false, false, keys, t)
nodes[i] = node.(*FullNode)
nodes[i].dalc = dalc
nodes[i].blockManager.SetDALC(dalc)
Expand All @@ -613,7 +595,7 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn
return nodes, apps
}

func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, isLight bool, keys []crypto.PrivKey, wg *sync.WaitGroup, t *testing.T) (Node, *mocks.Application) {
func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, isLight bool, keys []crypto.PrivKey, t *testing.T) (Node, *mocks.Application) {
t.Helper()
require := require.New(t)
// nodes will listen on consecutive ports on local interface
Expand Down Expand Up @@ -645,6 +627,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, i
app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{})
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{})
maliciousAppHash := []byte{9, 8, 7, 6}
nonMaliciousAppHash := []byte{1, 2, 3, 4}
if isMalicious && aggregator {
Expand All @@ -656,9 +639,6 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, i
if isMalicious && !aggregator {
app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{FraudProof: &abci.FraudProof{BlockHeight: 1, FraudulentBeginBlock: &abci.RequestBeginBlock{Hash: []byte("123")}, ExpectedValidAppHash: nonMaliciousAppHash}})
}
app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}).Run(func(args mock.Arguments) {
wg.Done()
})
if ctx == nil {
ctx = context.Background()
}
Expand Down
Loading

0 comments on commit e4f8bd4

Please sign in to comment.