From 290edd294a8422c54d6e9d64c8704864cd5879ff Mon Sep 17 00:00:00 2001 From: Matthias Fasching <5011972+fasmat@users.noreply.github.com> Date: Thu, 28 Nov 2024 15:59:38 +0000 Subject: [PATCH] Improve stability of system tests (#6486) ## Motivation This PR tries to improve unstable system tests --- api/grpcserver/globalstate_service.go | 2 +- fetch/mesh_data.go | 22 +++- go.mod | 2 +- go.sum | 4 +- miner/proposal_builder.go | 4 +- sql/layers/layers.go | 4 +- systest/Makefile | 13 ++- systest/cluster/nodes.go | 3 +- systest/testcontext/context.go | 12 +- systest/tests/checkpoint_test.go | 25 ++-- systest/tests/common.go | 162 ++++++++++++-------------- systest/tests/nodes_test.go | 13 +-- systest/tests/partition_test.go | 63 +++++++--- systest/tests/poets_test.go | 1 - systest/tests/smeshing_test.go | 4 +- systest/tests/steps_test.go | 3 +- systest/tests/timeskew_test.go | 44 +++---- systest/tests/transactions_test.go | 53 +++++---- 18 files changed, 237 insertions(+), 197 deletions(-) diff --git a/api/grpcserver/globalstate_service.go b/api/grpcserver/globalstate_service.go index a00c4f6ada..639f42f31a 100644 --- a/api/grpcserver/globalstate_service.go +++ b/api/grpcserver/globalstate_service.go @@ -453,7 +453,7 @@ func (s *GlobalStateService) GlobalStateStream( root, err := s.conState.GetLayerStateRoot(layer.LayerID) if err != nil { ctxzap.Warn(stream.Context(), "error retrieving layer data", zap.Error(err)) - root = types.Hash32{} + root = types.EmptyHash32 } resp := &pb.GlobalStateStreamResponse{Datum: &pb.GlobalStateData{Datum: &pb.GlobalStateData_GlobalState{ GlobalState: &pb.GlobalStateHash{ diff --git a/fetch/mesh_data.go b/fetch/mesh_data.go index ad271f307b..f07cede6f4 100644 --- a/fetch/mesh_data.go +++ b/fetch/mesh_data.go @@ -9,6 +9,7 @@ import ( "github.com/spacemeshos/go-scale" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/codec" @@ -177,12 +178,32 @@ func (f *Fetch) GetBlocks(ctx context.Context, ids []types.BlockID) error { // GetProposalTxs fetches the txs provided as IDs and validates them, returns an error if one TX failed to be fetched. func (f *Fetch) GetProposalTxs(ctx context.Context, ids []types.TransactionID) error { + f.logger.Debug("requesting proposal txs from peer", + log.ZContext(ctx), + zap.Int("num_txs", len(ids)), + zap.Array("txs", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, id := range ids { + enc.AppendString(id.ShortString()) + } + return nil + })), + ) return f.getTxs(ctx, ids, f.validators.txProposal.HandleMessage) } // GetBlockTxs fetches the txs provided as IDs and saves them, they will be validated // before block is applied. 
func (f *Fetch) GetBlockTxs(ctx context.Context, ids []types.TransactionID) error { + f.logger.Debug("requesting block txs from peer", + log.ZContext(ctx), + zap.Int("num_txs", len(ids)), + zap.Array("txs", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, id := range ids { + enc.AppendString(id.ShortString()) + } + return nil + })), + ) return f.getTxs(ctx, ids, f.validators.txBlock.HandleMessage) } @@ -190,7 +211,6 @@ func (f *Fetch) getTxs(ctx context.Context, ids []types.TransactionID, receiver if len(ids) == 0 { return nil } - f.logger.Debug("requesting txs from peer", log.ZContext(ctx), zap.Int("num_txs", len(ids))) hashes := types.TransactionIDsToHashes(ids) return f.getHashes(ctx, hashes, datastore.TXDB, receiver) } diff --git a/go.mod b/go.mod index f790ef7985..753e85904d 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.1 github.com/quic-go/quic-go v0.48.2 - github.com/rqlite/sql v0.0.0-20241105143344-71b14bed566c + github.com/rqlite/sql v0.0.0-20241111133259-a4122fabb196 github.com/rs/cors v1.11.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/seehuhn/mt19937 v1.0.0 diff --git a/go.sum b/go.sum index d31b3e5d0f..772f084c06 100644 --- a/go.sum +++ b/go.sum @@ -575,8 +575,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/rqlite/sql v0.0.0-20241105143344-71b14bed566c h1:2hHhSEvDfn6pkvLLUf3jUJcZQjfPL8aB9wG2NG7s2yU= -github.com/rqlite/sql v0.0.0-20241105143344-71b14bed566c/go.mod h1:ib9zVtNgRKiGuoMyUqqL5aNpk+r+++YlyiVIkclVqPg= +github.com/rqlite/sql v0.0.0-20241111133259-a4122fabb196 h1:SjRKMwKLTEE3STO6unJlz4VlMjMv5NZgIdI9HikBeAc= +github.com/rqlite/sql v0.0.0-20241111133259-a4122fabb196/go.mod h1:ib9zVtNgRKiGuoMyUqqL5aNpk+r+++YlyiVIkclVqPg= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= diff --git a/miner/proposal_builder.go b/miner/proposal_builder.go index f0ec54d16a..1bd96bc192 100644 --- a/miner/proposal_builder.go +++ b/miner/proposal_builder.go @@ -483,7 +483,9 @@ func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.Lay // // Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but // maybe we should revisit this when activesets are no longer bootstrapped. - return pb.db.WithTx(ctx, func(tx sql.Transaction) error { + // + // TODO(mafa): I'm still seeing SQL_BUSY errors in the logs, so for now I change this back to TxImmediate. 
+ return pb.db.WithTxImmediate(ctx, func(tx sql.Transaction) error { yes, err := activesets.Has(tx, pb.shared.active.id) if err != nil { return err diff --git a/sql/layers/layers.go b/sql/layers/layers.go index 5ba9116808..9945a35ce6 100644 --- a/sql/layers/layers.go +++ b/sql/layers/layers.go @@ -96,7 +96,7 @@ func GetLatestStateHash(db sql.Executor) (rst types.Hash32, err error) { }); err != nil { return rst, fmt.Errorf("failed to load latest state root %w", err) } else if rows == 0 { - return rst, fmt.Errorf("%w: state root doesnt exist", sql.ErrNotFound) + return rst, fmt.Errorf("%w: state root does not exist", sql.ErrNotFound) } return rst, err } @@ -117,7 +117,7 @@ func GetStateHash(db sql.Executor, lid types.LayerID) (rst types.Hash32, err err }); err != nil { return rst, fmt.Errorf("failed to load state root for %v: %w", lid, err) } else if rows == 0 { - return rst, fmt.Errorf("%w: %s doesnt exist", sql.ErrNotFound, lid) + return rst, fmt.Errorf("%w: %s does not exist", sql.ErrNotFound, lid) } return rst, err } diff --git a/systest/Makefile b/systest/Makefile index 105b77ba28..21d2c6ec7a 100644 --- a/systest/Makefile +++ b/systest/Makefile @@ -8,9 +8,9 @@ image_name ?= $(org)/systest:$(version_info) certifier_image ?= $(org)/certifier-service:v0.8.4 poet_image ?= $(org)/poet:v0.10.10 post_service_image ?= $(org)/post-service:v0.8.4 -post_init_image ?= $(org)/postcli:v0.12.5 +post_init_image ?= $(org)/postcli:v0.12.10 smesher_image ?= $(org)/go-spacemesh-dev:$(version_info) -old_smesher_image ?= $(org)/go-spacemesh-dev:7b9337a # Update this when new version is released +old_smesher_image ?= $(org)/go-spacemesh-dev:v1.7.7 bs_image ?= $(org)/go-spacemesh-dev-bs:$(version_info) test_id ?= systest-$(version_info) @@ -18,7 +18,7 @@ test_job_name ?= systest-$(version_info)-$(date) keep ?= false clusters ?= 1 size ?= 10 -poet_size ?= 3 +poet_size ?= 2 level ?= debug bootstrap ?= 5m storage ?= standard=1Gi @@ -38,8 +38,11 @@ ifeq ($(configname),$(test_job_name)) run_deps = config endif -command := gotestsum --raw-command -- test2json -t -p systest \ - /bin/tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \ +# command := gotestsum --raw-command -- test2json -t -p systest \ + /bin/tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \ + -test.failfast=$(failfast) -clusters=$(clusters) -level=$(level) -configname=$(configname) + +command := /bin/tests -test.v -test.count=$(count) -test.timeout=60m -test.run=$(test_name) -test.parallel=$(clusters) \ -test.failfast=$(failfast) -clusters=$(clusters) -level=$(level) -configname=$(configname) .PHONY: docker diff --git a/systest/cluster/nodes.go b/systest/cluster/nodes.go index 1984817606..685fe9c44a 100644 --- a/systest/cluster/nodes.go +++ b/systest/cluster/nodes.go @@ -878,8 +878,7 @@ func deployNode( corev1.Volume().WithName("config"). WithConfigMap(corev1.ConfigMapVolumeSource().WithName(spacemeshConfigMapName)), corev1.Volume().WithName("data"). - WithEmptyDir(corev1.EmptyDirVolumeSource(). - WithSizeLimit(resource.MustParse(ctx.Storage.Size))), + WithEmptyDir(corev1.EmptyDirVolumeSource().WithSizeLimit(resource.MustParse(ctx.Storage.Size))), ). 
WithDNSConfig(corev1.PodDNSConfig().WithOptions( corev1.PodDNSConfigOption().WithName("timeout").WithValue("1"), diff --git a/systest/testcontext/context.go b/systest/testcontext/context.go index 8eb41fb0af..32ef855875 100644 --- a/systest/testcontext/context.go +++ b/systest/testcontext/context.go @@ -115,7 +115,7 @@ var ( 10, ) poetSize = parameters.Int( - "poet-size", "size of the poet servers", 1, + "poet-size", "size of the poet servers", 2, ) bsSize = parameters.Int( "bs-size", "size of bootstrappers", 1, @@ -250,7 +250,8 @@ func updateContext(ctx *Context) error { keep, err := strconv.ParseBool(keepval) if err != nil { ctx.Log.Panicw("invalid state. keep label should be parsable as a boolean", - "keepval", keepval) + "keepval", keepval, + ) } ctx.Keep = ctx.Keep || keep @@ -261,7 +262,8 @@ func updateContext(ctx *Context) error { psize, err := strconv.Atoi(psizeval) if err != nil { ctx.Log.Panicw("invalid state. poet size label should be parsable as an integer", - "psizeval", psizeval) + "psizeval", psizeval, + ) } ctx.PoetSize = psize return nil @@ -360,9 +362,9 @@ func New(t *testing.T, opts ...Opt) *Context { Keep: keep.Get(p), ClusterSize: clSize, BootnodeSize: max(2, (clSize/1000)*2), - RemoteSize: 0, + RemoteSize: clSize / 2, // 50% of smeshers are remote PoetSize: poetSize.Get(p), - OldSize: 0, + OldSize: clSize / 4, // 25% of smeshers are old (use previous version of go-spacemesh) BootstrapperSize: bsSize.Get(p), Image: imageFlag.Get(p), OldImage: oldImageFlag.Get(p), diff --git a/systest/tests/checkpoint_test.go b/systest/tests/checkpoint_test.go index 239d8f7646..28d357507b 100644 --- a/systest/tests/checkpoint_test.go +++ b/systest/tests/checkpoint_test.go @@ -39,12 +39,9 @@ func TestCheckpoint(t *testing.T) { tctx := testcontext.New(t) addedLater := 2 - size := min(tctx.ClusterSize, 30) - oldSize := size - addedLater - if tctx.ClusterSize > oldSize { - tctx.Log.Info("cluster size changed to ", oldSize) - tctx.ClusterSize = oldSize - } + oldSize := tctx.ClusterSize - addedLater + tctx.Log.Info("cluster size changed to ", oldSize) + tctx.ClusterSize = oldSize // at the last layer of epoch 3, in the beginning of poet round 2. 
// it is important to avoid check-pointing in the middle of cycle gap @@ -63,14 +60,15 @@ func TestCheckpoint(t *testing.T) { require.EqualValues(t, 4, layersPerEpoch, "checkpoint layer require tuning as layersPerEpoch is changed") layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) - eg, ctx := errgroup.WithContext(tctx) first := layersPerEpoch * 2 - stop := first + 2 + stop := first + 5 receiver := types.GenerateAddress([]byte{11, 1, 1}) tctx.Log.Infow("sending transactions", "from", first, "to", stop-1) - require.NoError(t, sendTransactions(ctx, eg, tctx.Log, cl, first, stop, receiver, 1, 100)) - require.NoError(t, eg.Wait()) + deadline := cl.Genesis().Add(time.Duration(stop+1) * layerDuration) + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + require.NoError(t, sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, 1, 100)) require.NoError(t, waitLayer(tctx, cl.Client(0), snapshotLayer)) tctx.Log.Debugw("getting account balances") @@ -100,7 +98,8 @@ func TestCheckpoint(t *testing.T) { diffs = append(diffs, cl.Client(i).Name) tctx.Log.Errorw("diff checkpoint data", fmt.Sprintf("reference %v", cl.Client(0).Name), string(checkpoints[0]), - fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i])) + fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i]), + ) } } require.Empty(t, diffs) @@ -173,8 +172,8 @@ func TestCheckpoint(t *testing.T) { ensureSmeshing(t, tctx, cl, checkpointEpoch) // increase the cluster size to the original test size - tctx.Log.Info("cluster size changed to ", size) - tctx.ClusterSize = size + tctx.ClusterSize += addedLater + tctx.Log.Info("cluster size changed to ", tctx.ClusterSize) require.NoError(t, cl.AddSmeshers(tctx, addedLater)) tctx.Log.Infow("waiting for all miners to be smeshing", "last epoch", lastEpoch) diff --git a/systest/tests/common.go b/systest/tests/common.go index 86cd7e1092..b6b3d267c0 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "testing" "time" @@ -32,77 +33,74 @@ var retryBackoff = 10 * time.Second func sendTransactions( ctx context.Context, - eg *errgroup.Group, - logger *zap.SugaredLogger, + logger *zap.Logger, cl *cluster.Cluster, first, stop uint32, receiver types.Address, batch, amount int, ) error { + eg, ctx := errgroup.WithContext(ctx) for i := range cl.Accounts() { client := cl.Client(i % cl.Total()) nonce, err := getNonce(ctx, client, cl.Address(i)) if err != nil { - return fmt.Errorf("get nonce failed (%s:%s): %w", client.Name, cl.Address(i), err) + return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err) } - watchLayers(ctx, eg, client, logger.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number == stop { + spawnLayer := math.MinInt + watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { + if layer.Layer.Number.Number >= stop { return false, nil } - if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED || - layer.Layer.Number.Number < first { + if int(layer.Layer.Number.Number) < spawnLayer+2 { + // wait for the spawn transaction to be applied + return true, nil + } + if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED || layer.Layer.Number.Number < first { return true, nil } - // give some time for a previous layer to be applied - // TODO(dshulyak) introduce api that simply subscribes to internal clock - // and outputs events when the tick for the layer is available - 
time.Sleep(200 * time.Millisecond) if nonce == 0 { - logger.Infow("address needs to be spawned", "account", i) + logger.Info("address needs to be spawned", + zap.String("client", client.Name), + zap.Stringer("address", cl.Address(i)), + ) if err := submitSpawn(ctx, cl, i, client); err != nil { return false, fmt.Errorf("failed to spawn %w", err) } + spawnLayer = int(layer.Layer.Number.Number) nonce++ return true, nil } - logger.Debugw("submitting transactions", - "layer", layer.Layer.Number.Number, - "client", client.Name, - "account", i, - "nonce", nonce, - "batch", batch, + logger.Debug("submitting transactions", + zap.Uint32("layer", layer.Layer.Number.Number), + zap.String("client", client.Name), + zap.Stringer("address", cl.Address(i)), + zap.Uint64("nonce", nonce), + zap.Int("batch", batch), ) - for j := 0; j < batch; j++ { - // in case spawn isn't executed on this particular client - retries := 3 - spendClient := client - for k := 0; k < retries; k++ { - err = submitSpend(ctx, cl, i, receiver, uint64(amount), nonce+uint64(j), spendClient) + for j := range batch { + var err error + for range 3 { // retry on failure 3 times + err = submitSpend(ctx, cl, i, receiver, uint64(amount), nonce+uint64(j), client) if err == nil { break } - logger.Warnw( - "failed to spend", - "client", - spendClient.Name, - "account", - i, - "nonce", - nonce+uint64(j), - "err", - err.Error(), + logger.Warn("failed to spend", + zap.String("client", client.Name), + zap.Stringer("address", cl.Address(i)), + zap.Uint64("nonce", nonce+uint64(j)), + zap.Error(err), ) - spendClient = cl.Client((i + k + 1) % cl.Total()) + time.Sleep(1 * time.Second) // wait before retrying } if err != nil { - return false, fmt.Errorf("spend failed %s %w", spendClient.Name, err) + return false, fmt.Errorf("spend failed %s %w", client.Name, err) } } nonce += uint64(batch) return true, nil }) } - return nil + return eg.Wait() } func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient) ([]byte, error) { @@ -145,9 +143,7 @@ BACKOFF: if cont, err := collector(state); !cont { return err } - case codes.Canceled: - return nil - case codes.DeadlineExceeded: + case codes.Canceled, codes.DeadlineExceeded: return nil case codes.Unavailable: if retries == attempts { @@ -331,53 +327,51 @@ func waitTransaction(ctx context.Context, eg *errgroup.Group, client *cluster.No }) } -func watchTransactionResults(ctx context.Context, - eg *errgroup.Group, +func watchTransactionResults( + ctx context.Context, client *cluster.NodeClient, log *zap.Logger, collector func(*pb.TransactionResult) (bool, error), -) { - eg.Go(func() error { - retries := 0 - BACKOFF: - api := pb.NewTransactionServiceClient(client.PubConn()) - rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true}) - if err != nil { +) error { + retries := 0 +BACKOFF: + api := pb.NewTransactionServiceClient(client.PubConn()) + rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true}) + if err != nil { + return err + } + for { + rst, err := rsts.Recv() + s, ok := status.FromError(err) + if !ok { return err } - for { - rst, err := rsts.Recv() - s, ok := status.FromError(err) - if !ok { + switch s.Code() { + case codes.OK: + if cont, err := collector(rst); !cont { return err } - switch s.Code() { - case codes.OK: - if cont, err := collector(rst); !cont { - return err - } - case codes.Canceled: - return nil - case codes.DeadlineExceeded: - return nil - case codes.Unavailable: - if retries == attempts { - return errors.New("transaction 
results unavailable") - } - retries++ - time.Sleep(retryBackoff) - goto BACKOFF - default: - log.Warn( - "transactions stream error", - zap.String("client", client.Name), - zap.Error(err), - zap.Any("status", s), - ) - return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) + case codes.Canceled: + return nil + case codes.DeadlineExceeded: + return nil + case codes.Unavailable: + if retries == attempts { + return errors.New("transaction results unavailable") } + retries++ + time.Sleep(retryBackoff) + goto BACKOFF + default: + log.Warn( + "transactions stream error", + zap.String("client", client.Name), + zap.Error(err), + zap.Any("status", s), + ) + return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) } - }) + } } func watchProposals( @@ -508,7 +502,8 @@ func submitSpawn(ctx context.Context, cluster *cluster.Cluster, account int, cli defer cancel() _, err := submitTransaction(ctx, wallet.SelfSpawn(cluster.Private(account), 0, sdk.WithGenesisID(cluster.GenesisID())), - client) + client, + ) return err } @@ -522,13 +517,8 @@ func submitSpend( ) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - _, err := submitTransaction(ctx, - wallet.Spend( - cluster.Private(account), receiver, amount, - nonce, - sdk.WithGenesisID(cluster.GenesisID()), - ), - client) + tx := wallet.Spend(cluster.Private(account), receiver, amount, nonce, sdk.WithGenesisID(cluster.GenesisID())) + _, err := submitTransaction(ctx, tx, client) return err } diff --git a/systest/tests/nodes_test.go b/systest/tests/nodes_test.go index 35c061f6f1..cca3674d93 100644 --- a/systest/tests/nodes_test.go +++ b/systest/tests/nodes_test.go @@ -27,18 +27,15 @@ func TestAddNodes(t *testing.T) { ) tctx := testcontext.New(t) - size := min(tctx.ClusterSize, 30) - oldSize := size - addedLater - if tctx.ClusterSize > oldSize { - tctx.Log.Info("cluster size changed to ", oldSize) - tctx.ClusterSize = oldSize - } + oldSize := tctx.ClusterSize - addedLater + tctx.Log.Info("cluster size changed to ", oldSize) + tctx.ClusterSize = oldSize cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) // increase the cluster size to the original test size - tctx.Log.Info("cluster size changed to ", size) - tctx.ClusterSize = size + tctx.ClusterSize += addedLater + tctx.Log.Info("cluster size changed to ", tctx.ClusterSize) var eg errgroup.Group watchLayers(tctx, &eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { diff --git a/systest/tests/partition_test.go b/systest/tests/partition_test.go index 99cd3184af..6a9940ba39 100644 --- a/systest/tests/partition_test.go +++ b/systest/tests/partition_test.go @@ -2,8 +2,10 @@ package tests import ( "context" + "errors" "fmt" "testing" + "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/stretchr/testify/require" @@ -16,8 +18,8 @@ import ( "github.com/spacemeshos/go-spacemesh/systest/testcontext" ) -func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) { - require.Greater(t, cl.Bootnodes(), 1) +func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) { + require.Greater(tb, cl.Bootnodes(), 1) layersPerEpoch := uint32(testcontext.LayersPerEpoch.Get(tctx.Parameters)) var ( @@ -29,7 +31,11 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, ) tctx.Log.Debug("scheduling chaos...") - eg, ctx := errgroup.WithContext(tctx) 
+ layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) + deadline := cl.Genesis().Add(time.Duration(stop) * layerDuration) + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) // make sure the first boot node is in the 2nd partition so the poet proof can be broadcast to both splits split := pct*cl.Total()/100 + 1 scheduleChaos(ctx, eg, cl.Client(0), tctx.Log.Desugar(), startSplit, rejoin, @@ -60,9 +66,14 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, // start sending transactions tctx.Log.Debug("sending transactions...") - eg2, ctx2 := errgroup.WithContext(tctx) receiver := types.GenerateAddress([]byte{11, 1, 1}) - require.NoError(t, sendTransactions(ctx2, eg2, tctx.Log, cl, first, stop, receiver, 10, 100)) + + ctx2, cancel := context.WithDeadline(tctx, deadline) + eg2, ctx2 := errgroup.WithContext(ctx2) + defer cancel() + eg2.Go(func() error { + return sendTransactions(ctx2, tctx.Log.Desugar(), cl, first, stop, receiver, 10, 100) + }) type stateUpdate struct { layer uint32 @@ -80,7 +91,7 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, return stateHashStream(ctx, node, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) { data := state.Datum.Datum - require.IsType(t, &pb.GlobalStateData_GlobalState{}, data) + require.IsType(tb, &pb.GlobalStateData_GlobalState{}, data) resp := data.(*pb.GlobalStateData_GlobalState) layer := resp.GlobalState.Layer.Number @@ -92,11 +103,23 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, tctx.Log.Debugw("state hash collected", "client", node.Name, "layer", layer, - "state", stateHash.ShortString()) - stateCh <- &stateUpdate{ + "state", stateHash.ShortString(), + ) + select { + case stateCh <- &stateUpdate{ layer: layer, hash: stateHash, client: node.Name, + }: // continue + case <-ctx.Done(): + return false, ctx.Err() + default: + tctx.Log.Errorw("state hash channel is full", + "client", node.Name, + "layer", layer, + "state", stateHash.ShortString(), + ) + return false, errors.New("state hash channel is full") } return true, nil }, @@ -154,7 +177,8 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, "ref_client", cl.Client(0).Name, "layer", layer, "client_hash", clientState[layer], - "ref_hash", refState[layer]) + "ref_hash", refState[layer], + ) agree = false break } @@ -162,13 +186,14 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, if agree { tctx.Log.Debugw("client agreed with ref client on all layers", "client", cl.Client(i).Name, - "ref_client", cl.Client(0).Name) + "ref_client", cl.Client(0).Name, + ) } pass = pass && agree } - require.NoError(t, finalErr) - require.True(t, pass) - eg2.Wait() + require.NoError(tb, finalErr) + require.True(tb, pass) + require.NoError(tb, eg2.Wait()) } // TestPartition_30_70 tests the network partitioning with 30% and 70% of the nodes in each partition. 
@@ -176,9 +201,9 @@ func TestPartition_30_70(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - if tctx.ClusterSize > 30 { - tctx.Log.Info("cluster size changed to 30") - tctx.ClusterSize = 30 + if tctx.ClusterSize > 20 { + tctx.Log.Info("cluster size changed to 20") + tctx.ClusterSize = 20 } cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) @@ -191,9 +216,9 @@ func TestPartition_50_50(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - if tctx.ClusterSize > 30 { - tctx.Log.Info("cluster size changed to 30") - tctx.ClusterSize = 30 + if tctx.ClusterSize > 20 { + tctx.Log.Info("cluster size changed to 20") + tctx.ClusterSize = 20 } cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) diff --git a/systest/tests/poets_test.go b/systest/tests/poets_test.go index c2119e1b2c..ba04aa566b 100644 --- a/systest/tests/poets_test.go +++ b/systest/tests/poets_test.go @@ -213,7 +213,6 @@ func TestNodesUsingDifferentPoets(t *testing.T) { func TestRegisteringInPoetWithPowAndCert(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - tctx.PoetSize = 2 cl := cluster.New(tctx, cluster.WithKeys(10)) require.NoError(t, cl.AddBootnodes(tctx, 2)) diff --git a/systest/tests/smeshing_test.go b/systest/tests/smeshing_test.go index 4531ead859..6ce1b74992 100644 --- a/systest/tests/smeshing_test.go +++ b/systest/tests/smeshing_test.go @@ -43,8 +43,6 @@ func TestSmeshing(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - tctx.RemoteSize = tctx.ClusterSize / 4 // 25% of nodes are remote - tctx.OldSize = tctx.ClusterSize / 4 // 25% of nodes are old vests := vestingAccs{ prepareVesting(t, 3, 8, 20, 1e15, 10e15), prepareVesting(t, 5, 8, 20, 1e15, 10e15), @@ -57,7 +55,7 @@ func TestSmeshing(t *testing.T) { ) require.NoError(t, err) testSmeshing(t, tctx, cl) - testTransactions(t, tctx, cl, 8) + testTransactions(t, tctx, cl, 10) testVesting(t, tctx, cl, vests...) } diff --git a/systest/tests/steps_test.go b/systest/tests/steps_test.go index f2b463afef..8b525da77b 100644 --- a/systest/tests/steps_test.go +++ b/systest/tests/steps_test.go @@ -189,8 +189,7 @@ func TestStepReplaceNodes(t *testing.T) { require.NoError(t, err) var ( - max = cctx.ClusterSize * 2 / 10 - delete = rand.Intn(max) + 1 + delete = rand.Intn(cctx.ClusterSize*2/10) + 1 deleting []*cluster.NodeClient ) for i := cl.Bootnodes(); i < cl.Total() && len(deleting) < delete; i++ { diff --git a/systest/tests/timeskew_test.go b/systest/tests/timeskew_test.go index 80a6188e02..6a9f03d705 100644 --- a/systest/tests/timeskew_test.go +++ b/systest/tests/timeskew_test.go @@ -3,6 +3,7 @@ package tests import ( "context" "testing" + "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/stretchr/testify/require" @@ -13,8 +14,8 @@ import ( "github.com/spacemeshos/go-spacemesh/systest/testcontext" ) -// TestShortTimeskew runs a network where ~20% of nodes have their clocks skewed by 3 seconds. -func TestShortTimeskew(t *testing.T) { +// TestShortTimeSkew runs a network where ~20% of nodes have their clocks skewed by 3 seconds. 
+func TestShortTimeSkew(t *testing.T) { t.Parallel() tctx := testcontext.New(t) @@ -33,8 +34,13 @@ func TestShortTimeskew(t *testing.T) { "stop test", stopTest, ) + layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) + deadline := cl.Genesis().Add(time.Duration(stopTest) * layerDuration) + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) + failed := int(0.2 * float64(tctx.ClusterSize)) - eg, ctx := errgroup.WithContext(tctx) client := cl.Client(0) scheduleChaos( ctx, @@ -63,27 +69,21 @@ func TestShortTimeskew(t *testing.T) { // abstain on one or two layers. in such case longer delay might be necessary to confirm that layer var confirmed uint32 - watchLayers( - ctx, - eg, - client, - tctx.Log.Desugar(), - func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number == stopTest { + watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { + if layer.Layer.Number.Number >= stopTest { + return false, nil + } + if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { + tctx.Log.Debugw( + "layer applied", "layer", layer.Layer.Number.Number, "hash", prettyHex(layer.Layer.Hash), + ) + confirmed = layer.Layer.Number.Number + if confirmed >= stopSkew { return false, nil } - if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { - tctx.Log.Debugw( - "layer applied", "layer", layer.Layer.Number.Number, "hash", prettyHex(layer.Layer.Hash), - ) - confirmed = layer.Layer.Number.Number - if confirmed >= stopSkew { - return false, nil - } - } - return true, nil - }, - ) + } + return true, nil + }) require.NoError(t, eg.Wait()) require.LessOrEqual(t, int(stopSkew), int(confirmed)) } diff --git a/systest/tests/transactions_test.go b/systest/tests/transactions_test.go index 4b722bba01..8f4ee7fcbb 100644 --- a/systest/tests/transactions_test.go +++ b/systest/tests/transactions_test.go @@ -1,8 +1,10 @@ package tests import ( + "context" "encoding/hex" "testing" + "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/stretchr/testify/require" @@ -21,10 +23,10 @@ func testTransactions( ) { var ( // start sending transactions after two layers or after genesis - first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8) - stopSending = first + sendFor - batch = 10 - amount = 100 + first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8) + stop = first + sendFor + batch = 10 + amount = 100 // each account creates spawn transaction in the first layer // plus batch number of spend transactions in every layer after that @@ -32,7 +34,7 @@ func testTransactions( ) tctx.Log.Debugw("running transactions test", "from", first, - "stop sending", stopSending, + "stop sending", stop, "expected transactions", expectedCount, ) receiver := types.GenerateAddress([]byte{11, 1, 1}) @@ -44,28 +46,33 @@ func testTransactions( require.NoError(tb, err) before := response.AccountWrapper.StateCurrent.Balance - eg, ctx := errgroup.WithContext(tctx) - require.NoError( - tb, - sendTransactions(ctx, eg, tctx.Log, cl, first, stopSending, receiver, batch, amount), - ) + layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) + deadline := cl.Genesis().Add(time.Duration(stop+2) * layerDuration) // add some buffer for results to arrive + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, batch, amount) + }) txs := 
make([][]*pb.Transaction, cl.Total()) for i := range cl.Total() { client := cl.Client(i) - watchTransactionResults(tctx.Context, eg, client, tctx.Log.Desugar(), - func(rst *pb.TransactionResult) (bool, error) { - txs[i] = append(txs[i], rst.Tx) - count := len(txs[i]) - tctx.Log.Debugw("received transaction client", - "layer", rst.Layer, - "client", client.Name, - "tx", "0x"+hex.EncodeToString(rst.Tx.Id), - "count", count, - ) - return len(txs[i]) < expectedCount, nil - }, - ) + eg.Go(func() error { + return watchTransactionResults(ctx, client, tctx.Log.Desugar(), + func(rst *pb.TransactionResult) (bool, error) { + txs[i] = append(txs[i], rst.Tx) + count := len(txs[i]) + tctx.Log.Debugw("received transaction client", + "layer", rst.Layer, + "client", client.Name, + "tx", "0x"+hex.EncodeToString(rst.Tx.Id), + "count", count, + ) + return len(txs[i]) < expectedCount, nil + }, + ) + }) } require.NoError(tb, eg.Wait())
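
The recurring pattern in this patch (checkpoint, partition, timeskew and transaction tests) is to derive an absolute deadline from the cluster genesis time plus the last layer of interest, run the stream watchers inside an errgroup bound to that deadline, and treat DeadlineExceeded as a clean stop. The sketch below only illustrates that pattern with stand-in values; genesisTime, layerDuration and stopLayer are hypothetical placeholders for cl.Genesis(), testcontext.LayerDuration.Get(tctx.Parameters) and the per-test stop layer, and it does not use the systest packages.

```go
// Minimal sketch of the deadline-bounded watcher pattern used in this PR.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Stand-ins; in the system tests these come from the cluster and test parameters.
	genesisTime := time.Now()
	layerDuration := 100 * time.Millisecond
	stopLayer := uint32(5)

	// Deadline: genesis + (stopLayer+1) layers, mirroring e.g.
	// cl.Genesis().Add(time.Duration(stop+1) * layerDuration) in checkpoint_test.go.
	deadline := genesisTime.Add(time.Duration(stopLayer+1) * layerDuration)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		// Stand-in for a gRPC stream watcher: it exits cleanly once the deadline
		// passes, the same way the watchers in common.go treat codes.DeadlineExceeded
		// as normal termination.
		ticker := time.NewTicker(layerDuration)
		defer ticker.Stop()
		for layer := uint32(0); ; layer++ {
			select {
			case <-ctx.Done():
				if errors.Is(ctx.Err(), context.DeadlineExceeded) {
					return nil
				}
				return ctx.Err()
			case <-ticker.C:
				fmt.Println("observed layer", layer)
			}
		}
	})
	if err := eg.Wait(); err != nil {
		fmt.Println("watcher failed:", err)
	}
}
```

Mapping DeadlineExceeded to a nil return keeps eg.Wait() from reporting an error when a test simply runs out its observation window, so the assertions that follow decide pass or fail.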