From b465d47afbc007b54720dc5f36f4d99e56714f54 Mon Sep 17 00:00:00 2001 From: rachid Date: Wed, 14 Jun 2023 00:50:41 +0100 Subject: [PATCH] feat: add support for orchestrator catchup on data commitments --- orchestrator/orchestrator.go | 26 +++--- orchestrator/orchestrator_test.go | 51 ++++++++++++ orchestrator/suite_test.go | 13 ++- testing/celestia_network.go | 132 +++++++++++++++++++++++++++++- testing/testnode.go | 4 +- 5 files changed, 209 insertions(+), 17 deletions(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 260b389c..7efdef94 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -172,18 +172,20 @@ func (orch Orchestrator) StartNewEventsListener( // we only want to handle the attestation when the block is committed continue } - attestationEvent := mustGetEvent(result, attestationEventName) - nonce, err := strconv.Atoi(attestationEvent[0]) - if err != nil { - return err - } - orch.Logger.Info("enqueueing new attestation nonce", "nonce", nonce) - select { - case <-ctx.Done(): - return ctx.Err() - case <-signalChan: - return ErrSignalChanNotif - case queue <- uint64(nonce): + attestationEvents := mustGetEvent(result, attestationEventName) + for _, attEvent := range attestationEvents { + nonce, err := strconv.Atoi(attEvent) + if err != nil { + return err + } + orch.Logger.Info("enqueueing new attestation nonce", "nonce", nonce) + select { + case <-ctx.Done(): + return ctx.Err() + case <-signalChan: + return ErrSignalChanNotif + case queue <- uint64(nonce): + } } } } diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index 3e05af4b..e8566d73 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -1,10 +1,16 @@ package orchestrator_test import ( + "context" "math/big" "testing" "time" + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/orchestrator-relayer/rpc" + tmlog "github.com/tendermint/tendermint/libs/log" + "github.com/celestiaorg/orchestrator-relayer/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -112,3 +118,48 @@ func TestValidatorPartOfValset(t *testing.T) { }) } } + +func (s *OrchestratorTestSuite) TestEnqueuingAttestationNonces() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeight(10) + require.NoError(t, err) + + // nonces queue will be closed below + noncesQueue := make(chan uint64, 100) + signalChan := make(chan struct{}) + defer close(signalChan) + + go func() { + _ = s.Orchestrator.StartNewEventsListener(ctx, noncesQueue, signalChan) + }() + go func() { + _ = s.Orchestrator.EnqueueMissingEvents(ctx, noncesQueue, signalChan) + }() + + // set the data commitment window to a high value + s.Node.CelestiaNetwork.SetDataCommitmentWindow(t, 1000) + _, err = s.Node.CelestiaNetwork.WaitForHeightWithTimeout(1500, time.Minute) + assert.NoError(t, err) + + // set the data commitment window to a low value + s.Node.CelestiaNetwork.SetDataCommitmentWindow(t, 100) + + ecfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + appQuerier := rpc.NewAppQuerier( + tmlog.NewNopLogger(), + s.Node.CelestiaNetwork.GRPCAddr, + ecfg, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + latestNonce, err := appQuerier.QueryLatestAttestationNonce(ctx) + s.NoError(err) + + cancel() + close(noncesQueue) + assert.GreaterOrEqual(t, len(noncesQueue), int(latestNonce)) +} diff --git a/orchestrator/suite_test.go b/orchestrator/suite_test.go index 79ed33a7..95c0c1c7 100644 --- a/orchestrator/suite_test.go +++ b/orchestrator/suite_test.go @@ -4,6 +4,10 @@ import ( "context" "testing" + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/celestiaorg/celestia-app/x/qgb/types" "github.com/celestiaorg/orchestrator-relayer/orchestrator" qgbtesting "github.com/celestiaorg/orchestrator-relayer/testing" "github.com/stretchr/testify/suite" @@ -18,7 +22,14 @@ type OrchestratorTestSuite struct { func (s *OrchestratorTestSuite) SetupSuite() { t := s.T() ctx := context.Background() - s.Node = qgbtesting.NewTestNode(ctx, t) + codec := encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec + s.Node = qgbtesting.NewTestNode( + ctx, + t, + testnode.ImmediateProposals(codec), + qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), + // qgbtesting.SetVotingParams(codec, v1beta1.VotingParams{VotingPeriod: 100 * time.Hour}), + ) s.Orchestrator = qgbtesting.NewOrchestrator(t, s.Node) } diff --git a/testing/celestia_network.go b/testing/celestia_network.go index 2bb8ca83..433eadf8 100644 --- a/testing/celestia_network.go +++ b/testing/celestia_network.go @@ -2,11 +2,26 @@ package testing import ( "context" + "encoding/json" + "fmt" "testing" + "time" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdk "github.com/cosmos/cosmos-sdk/types" + v1 "github.com/cosmos/cosmos-sdk/x/gov/types/v1" + "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" + "github.com/cosmos/cosmos-sdk/x/params/types/proposal" "github.com/stretchr/testify/require" celestiatestnode "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/cosmos/cosmos-sdk/codec" + abci "github.com/tendermint/tendermint/abci/types" + tmrand "github.com/tendermint/tendermint/libs/rand" ) // CelestiaNetwork is a Celestia-app validator running in-process. @@ -22,7 +37,7 @@ type CelestiaNetwork struct { // NewCelestiaNetwork creates a new CelestiaNetwork. // Uses `testing.T` to fail if an error happens. // Only supports the creation of a single validator currently. -func NewCelestiaNetwork(ctx context.Context, t *testing.T) *CelestiaNetwork { +func NewCelestiaNetwork(ctx context.Context, t *testing.T, genesisOpts ...celestiatestnode.GenesisOption) *CelestiaNetwork { if testing.Short() { // The main reason for skipping these tests in short mode is to avoid detecting unrelated // race conditions. @@ -31,7 +46,26 @@ func NewCelestiaNetwork(ctx context.Context, t *testing.T) *CelestiaNetwork { // Thus, we can skip them as the races detected are not related to this repo. t.Skip("skipping tests in short mode.") } - accounts, clientContext := celestiatestnode.DefaultNetwork(t) + + // we create an arbitrary number of funded accounts + accounts := make([]string, 300) + for i := 0; i < 300; i++ { + accounts[i] = tmrand.Str(9) + } + + tmCfg := celestiatestnode.DefaultTendermintConfig() + tmCfg.Consensus.TargetHeightDuration = time.Millisecond * 5 + appConf := celestiatestnode.DefaultAppConfig() + + clientContext, _, _ := celestiatestnode.NewNetwork( + t, + celestiatestnode.DefaultParams(), + tmCfg, + appConf, + accounts, + genesisOpts..., + ) + appRPC := clientContext.GRPCClient.Target() status, err := clientContext.Client.Status(ctx) require.NoError(t, err) @@ -42,3 +76,97 @@ func NewCelestiaNetwork(ctx context.Context, t *testing.T) *CelestiaNetwork { RPCAddr: status.NodeInfo.ListenAddr, } } + +// SetDataCommitmentWindowParams will set the provided data commitment window as genesis state. +func SetDataCommitmentWindowParams(codec codec.Codec, params types.Params) celestiatestnode.GenesisOption { + return func(state map[string]json.RawMessage) map[string]json.RawMessage { + qgbGenState := types.DefaultGenesis() + qgbGenState.Params = ¶ms + state[types.ModuleName] = codec.MustMarshalJSON(qgbGenState) + return state + } +} + +// SetDataCommitmentWindow will use the validator account to set the data commitment +// window param. It assumes that the governance params have been set to +// allow for fast acceptance of proposals, and will fail the test if the +// parameters are not set as expected. +func (cn *CelestiaNetwork) SetDataCommitmentWindow(t *testing.T, window uint64) { + account := "validator" + + // create and submit a new param change proposal for the data commitment window + change := proposal.NewParamChange( + types.ModuleName, + string(types.ParamsStoreKeyDataCommitmentWindow), + fmt.Sprintf("\"%d\"", window), + ) + content := proposal.NewParameterChangeProposal( + "data commitment window update", + "description", + []proposal.ParamChange{change}, + ) + addr := getAddress(account, cn.Context.Keyring) + + msg, err := v1beta1.NewMsgSubmitProposal( + content, + sdk.NewCoins( + sdk.NewCoin(app.BondDenom, sdk.NewInt(1000000000000))), + addr, + ) + require.NoError(t, err) + + ecfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + res, err := celestiatestnode.SignAndBroadcastTx(ecfg, cn.Context.Context, account, msg) + require.Equal(t, res.Code, abci.CodeTypeOK, res.RawLog) + require.NoError(t, err) + resp, err := cn.Context.WaitForTx(res.TxHash, 10) + require.NoError(t, err) + require.Equal(t, abci.CodeTypeOK, resp.TxResult.Code) + + require.NoError(t, cn.Context.WaitForNextBlock()) + + // query the proposal to get the id + gqc := v1.NewQueryClient(cn.Context.GRPCClient) + gresp, err := gqc.Proposals( + cn.Context.GoContext(), + &v1.QueryProposalsRequest{ + ProposalStatus: v1.ProposalStatus_PROPOSAL_STATUS_VOTING_PERIOD, + }, + ) + require.NoError(t, err) + require.Len(t, gresp.Proposals, 1) + + // create and submit a new vote + vote := v1.NewMsgVote( + getAddress(account, cn.Context.Keyring), + gresp.Proposals[0].Id, + v1.VoteOption_VOTE_OPTION_YES, + "", + ) + res, err = celestiatestnode.SignAndBroadcastTx(ecfg, cn.Context.Context, account, vote) + require.NoError(t, err) + resp, err = cn.Context.WaitForTx(res.TxHash, 10) + require.NoError(t, err) + require.Equal(t, abci.CodeTypeOK, resp.TxResult.Code) + + // wait for the voting period to complete + time.Sleep(time.Second * 5) + + // check that the parameters got updated as expected + bqc := types.NewQueryClient(cn.Context.GRPCClient) + presp, err := bqc.Params(cn.Context.GoContext(), &types.QueryParamsRequest{}) + require.NoError(t, err) + require.Equal(t, window, presp.Params.DataCommitmentWindow) +} + +func getAddress(account string, kr keyring.Keyring) sdk.AccAddress { + rec, err := kr.Key(account) + if err != nil { + panic(err) + } + addr, err := rec.GetAddress() + if err != nil { + panic(err) + } + return addr +} diff --git a/testing/testnode.go b/testing/testnode.go index 8dd367d1..6ccace5b 100644 --- a/testing/testnode.go +++ b/testing/testnode.go @@ -15,8 +15,8 @@ type TestNode struct { EVMChain *EVMChain } -func NewTestNode(ctx context.Context, t *testing.T) *TestNode { - celestiaNetwork := NewCelestiaNetwork(ctx, t) +func NewTestNode(ctx context.Context, t *testing.T, genesisOpts ...celestiatestnode.GenesisOption) *TestNode { + celestiaNetwork := NewCelestiaNetwork(ctx, t, genesisOpts...) dhtNetwork := NewDHTNetwork(ctx, 2) evmChain := NewEVMChain(celestiatestnode.NodeEVMPrivateKey)