diff --git a/.github/mergify.yml b/.github/mergify.yml new file mode 100644 index 000000000..5db55cc51 --- /dev/null +++ b/.github/mergify.yml @@ -0,0 +1,17 @@ +pull_request_rules: + - name: backport patches to v0.11.x branch + conditions: + - base=main + - label=backport:v0.11.x + actions: + backport: + branches: + - v0.11.x + - name: backport patches to v0.10.x branch + conditions: + - base=main + - label=backport:v0.10.x + actions: + backport: + branches: + - v0.10.x-lts diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index e6a397a01..ea973ed97 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -28,7 +28,7 @@ jobs: **/**.go go.mod go.sum - - uses: golangci/golangci-lint-action@v3.6.0 + - uses: golangci/golangci-lint-action@v3.7.0 with: version: latest args: --timeout 10m diff --git a/README.md b/README.md index 963135803..7a2f22137 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,8 @@ Please join our [Community Discord](https://discord.com/invite/YsnTPcSfWQ) to as ## Dependency Graph -To see our progress and a possible future of Rollkit visit our [Dependency Graph](./docs/specification/rollkit-dependency-graph.md). +To see our progress and a possible future of Rollkit visit our [Dependency +Graph](./docs/specification/rollkit-dependency-graph.md). ## Code of Conduct diff --git a/block/block_cache.go b/block/block_cache.go new file mode 100644 index 000000000..dcb9a9990 --- /dev/null +++ b/block/block_cache.go @@ -0,0 +1,62 @@ +package block + +import ( + "sync" + + "github.com/rollkit/rollkit/types" +) + +// BlockCache tracks blocks seen on the P2P network and blocks hard confirmed by the DA layer +type BlockCache struct { + blocks map[uint64]*types.Block + hashes map[string]bool + hardConfirmations map[string]bool + mtx *sync.RWMutex +} + +// NewBlockCache returns an empty, ready-to-use BlockCache +func NewBlockCache() *BlockCache { + return &BlockCache{ + blocks: make(map[uint64]*types.Block), + hashes: make(map[string]bool), + hardConfirmations: make(map[string]bool), + mtx: new(sync.RWMutex), + } +} + +func (bc *BlockCache) getBlock(height uint64) (*types.Block, bool) { + bc.mtx.RLock() + defer bc.mtx.RUnlock() + block, ok := bc.blocks[height] + return block, ok +} + +func (bc *BlockCache) setBlock(height uint64, block *types.Block) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.blocks[height] = block +} + +func (bc *BlockCache) deleteBlock(height uint64) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + delete(bc.blocks, height) +} + +func (bc *BlockCache) isSeen(hash string) bool { + bc.mtx.RLock() + defer bc.mtx.RUnlock() + return bc.hashes[hash] +} + +func (bc *BlockCache) setSeen(hash string) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.hashes[hash] = true +} + +func (bc *BlockCache) setHardConfirmed(hash string) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.hardConfirmations[hash] = true +}
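A usage sketch for reviewers (illustrative only, not part of the patch): the sync path in block/manager.go below is expected to drive this cache roughly as follows, assuming a Manager m and an incoming block:

	hash := block.Hash().String()
	if !m.blockCache.isSeen(hash) {
		m.blockCache.setBlock(uint64(block.Height()), block)
		// ...and once the block at this height is applied and committed:
		m.blockCache.setSeen(hash)
		m.blockCache.deleteBlock(uint64(block.Height()))
	}

diff --git a/block/manager.go b/block/manager.go index c17121d1b..8ea4939e6 100644 --- a/block/manager.go +++ b/block/manager.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "time" + goheaderstore "github.com/celestiaorg/go-header/store" abci "github.com/cometbft/cometbft/abci/types" cmcrypto "github.com/cometbft/cometbft/crypto" "github.com/cometbft/cometbft/crypto/merkle" @@ -30,10 +31,19 @@ import ( // defaultDABlockTime is used only if DABlockTime is not configured for manager const defaultDABlockTime = 30 * time.Second +// defaultBlockTime is used only if BlockTime is not configured for manager +const defaultBlockTime = 1 * time.Second + // maxSubmitAttempts defines how many times Rollkit will re-try to publish block to DA layer. // This is a temporary solution. It will be removed in future versions.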
const maxSubmitAttempts = 30 +// channelLength is the buffer size for most channels; 100 is large enough to avoid blocking +const channelLength = 100 + +// blockInChLength is the buffer size for blockInCh; 10000 is large enough to cover the blocks expected in a single DA block +const blockInChLength = 10000 + // initialBackoff defines initial value for block submission backoff var initialBackoff = 100 * time.Millisecond @@ -61,10 +71,18 @@ type Manager struct { // daHeight is the height of the latest processed DA block daHeight uint64 - HeaderCh chan *types.SignedHeader - BlockCh chan *types.Block - blockInCh chan newBlockEvent - syncCache map[uint64]*types.Block + HeaderCh chan *types.SignedHeader + BlockCh chan *types.Block + + blockInCh chan newBlockEvent + blockStore *goheaderstore.Store[*types.Block] + + blockCache *BlockCache + + // blockStoreMtx is used by blockStoreCond + blockStoreMtx *sync.Mutex + // blockStoreCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve blocks from blockStore + blockStoreCond *sync.Cond // retrieveMtx is used by retrieveCond retrieveMtx *sync.Mutex @@ -77,6 +95,10 @@ buildingBlock bool txsAvailable <-chan struct{} doneBuildingBlock chan struct{} + + // Maintains blocks that need to be published to DA layer + pendingBlocks []*types.Block + pendingBlocksMtx *sync.RWMutex } // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc. @@ -100,6 +122,7 @@ func NewManager( eventBus *cmtypes.EventBus, logger log.Logger, doneBuildingCh chan struct{}, + blockStore *goheaderstore.Store[*types.Block], ) (*Manager, error) { s, err := getInitialState(store, genesis) if err != nil { @@ -115,10 +138,15 @@ } if conf.DABlockTime == 0 { - logger.Info("WARNING: using default DA block time", "DABlockTime", defaultDABlockTime) + logger.Info("Using default DA block time", "DABlockTime", defaultDABlockTime) conf.DABlockTime = defaultDABlockTime } + if conf.BlockTime == 0 { + logger.Info("Using default block time", "BlockTime", defaultBlockTime) + conf.BlockTime = defaultBlockTime + } + exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger) if s.LastBlockHeight+1 == genesis.InitialHeight { res, err := exec.InitChain(genesis) @@ -150,18 +178,23 @@ retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) daHeight: s.DAHeight, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderCh: make(chan *types.SignedHeader, 100), - BlockCh: make(chan *types.Block, 100), - blockInCh: make(chan newBlockEvent, 100), + HeaderCh: make(chan *types.SignedHeader, channelLength), + BlockCh: make(chan *types.Block, channelLength), + blockInCh: make(chan newBlockEvent, blockInChLength), + blockStoreMtx: new(sync.Mutex), + blockStore: blockStore, retrieveMtx: new(sync.Mutex), lastStateMtx: new(sync.RWMutex), - syncCache: make(map[uint64]*types.Block), + blockCache: NewBlockCache(), logger: logger, txsAvailable: txsAvailableCh, doneBuildingBlock: doneBuildingCh, buildingBlock: false, + pendingBlocks: make([]*types.Block, 0), + pendingBlocksMtx: new(sync.RWMutex), } agg.retrieveCond = sync.NewCond(agg.retrieveMtx) + agg.blockStoreCond = sync.NewCond(agg.blockStoreMtx) return agg, nil } @@ -180,6 +213,11 @@ func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) { m.retriever = dalc.(da.BlockRetriever) }
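A wiring sketch (illustrative only; it mirrors the node/full.go change later in this diff): NewManager now takes the block exchange service's block store as its final argument:

	doneBuildingChannel := make(chan struct{})
	blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig,
		genesis, s, mp, proxyApp.Consensus(), dalc, eventBus,
		logger.With("module", "BlockManager"), doneBuildingChannel,
		blockExchangeService.blockStore)

+// GetStoreHeight returns the manager's store height +func (m *Manager)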
GetStoreHeight() uint64 { + return m.store.Height() +} + // AggregationLoop is responsible for aggregating transactions into rollup-blocks. func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { initialHeight := uint64(m.genesis.InitialHeight) @@ -199,7 +237,6 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { time.Sleep(delay) } - //var timer *time.Timer timer := time.NewTimer(0) if !lazy { @@ -208,13 +245,13 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { case <-ctx.Done(): return case <-timer.C: - start := time.Now() - err := m.publishBlock(ctx) - if err != nil { - m.logger.Error("error while publishing block", "error", err) - } - timer.Reset(m.getRemainingSleep(start)) } + start := time.Now() + err := m.publishBlock(ctx) + if err != nil { + m.logger.Error("error while publishing block", "error", err) + } + timer.Reset(m.getRemainingSleep(start)) } } else { for { @@ -245,31 +282,60 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { } } +// BlockSubmissionLoop is responsible for submitting blocks to the DA layer. +func (m *Manager) BlockSubmissionLoop(ctx context.Context) { + timer := time.NewTicker(m.conf.DABlockTime) + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + } + err := m.submitBlocksToDA(ctx) + if err != nil { + m.logger.Error("error while submitting block to DA", "error", err) + } + } +} + // SyncLoop is responsible for syncing blocks. // -// SyncLoop processes headers gossiped in P2p network to know what's the latest block height, +// SyncLoop processes headers gossiped in P2P network to know what's the latest block height, // block data is retrieved from DA layer. func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) { daTicker := time.NewTicker(m.conf.DABlockTime) + blockTicker := time.NewTicker(m.conf.BlockTime) for { select { case <-daTicker.C: m.retrieveCond.Signal() + case <-blockTicker.C: + m.blockStoreCond.Signal() case blockEvent := <-m.blockInCh: block := blockEvent.block daHeight := blockEvent.daHeight - m.logger.Debug("block body retrieved from DALC", - "height", block.SignedHeader.Header.Height(), + blockHash := block.Hash().String() + blockHeight := uint64(block.Height()) + m.logger.Debug("block body retrieved", + "height", blockHeight, "daHeight", daHeight, - "hash", block.Hash(), + "hash", blockHash, ) - m.syncCache[block.SignedHeader.Header.BaseHeader.Height] = block + if m.blockCache.isSeen(blockHash) { + m.logger.Debug("block already seen", "height", blockHeight, "block hash", blockHash) + continue + } + m.blockCache.setBlock(blockHeight, block) + + m.blockStoreCond.Signal() m.retrieveCond.Signal() err := m.trySyncNextBlock(ctx, daHeight) if err != nil { m.logger.Info("failed to sync next block", "error", err) + continue } + m.blockCache.setSeen(blockHash) case <-ctx.Done(): return } @@ -285,7 +351,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { var commit *types.Commit currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory - b, ok := m.syncCache[currentHeight+1] + b, ok := m.blockCache.getBlock(currentHeight + 1) if !ok { return nil } @@ -296,7 +362,8 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { } if b != nil && commit != nil { - m.logger.Info("Syncing block", "height", b.SignedHeader.Header.Height()) + bHeight := uint64(b.Height()) + m.logger.Info("Syncing block", "height", bHeight) newState, responses, err := m.applyBlock(ctx, b) if err != 
nil { return fmt.Errorf("failed to ApplyBlock: %w", err) @@ -310,18 +377,18 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { return fmt.Errorf("failed to Commit: %w", err) } - err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses) + err = m.store.SaveBlockResponses(bHeight, responses) if err != nil { return fmt.Errorf("failed to save block responses: %w", err) } // SaveValidators commits the DB tx - err = m.saveValidatorsToStore(uint64(b.SignedHeader.Header.Height())) + err = m.saveValidatorsToStore(bHeight) if err != nil { return err } - m.store.SetHeight(uint64(b.SignedHeader.Header.Height())) + m.store.SetHeight(bHeight) if daHeight > newState.DAHeight { newState.DAHeight = daHeight @@ -330,12 +397,74 @@ if err != nil { m.logger.Error("failed to save updated state", "error", err) } - delete(m.syncCache, currentHeight+1) + m.blockCache.deleteBlock(currentHeight + 1) } return nil } +// BlockStoreRetrieveLoop is responsible for retrieving blocks from the Block Store. +func (m *Manager) BlockStoreRetrieveLoop(ctx context.Context) { + // waitCh is used to signal the block store retrieve loop that it should check the block store for new blocks + // blockStoreCond can be signalled in a completely async manner, and the goroutine below + // acts as a buffer for those signals + waitCh := make(chan interface{}) + lastBlockStoreHeight := uint64(0) + go func() { + for { + // This goroutine exits on the first signal after the context is + // cancelled (ctx.Err() != nil). This is OK because it waits on a + // condition variable that is only signalled periodically. + m.blockStoreMtx.Lock() + m.blockStoreCond.Wait() + waitCh <- nil + m.blockStoreMtx.Unlock() + if ctx.Err() != nil { + return + } + } + }() + for { + select { + case <-ctx.Done(): + return + case <-waitCh: + } + blockStoreHeight := m.blockStore.Height() + if blockStoreHeight > lastBlockStoreHeight { + blocks, err := m.getBlocksFromBlockStore(ctx, lastBlockStoreHeight+1, blockStoreHeight) + if err != nil { + m.logger.Error("failed to get blocks from Block Store", "lastBlockHeight", lastBlockStoreHeight, "blockStoreHeight", blockStoreHeight, "errors", err.Error()) + continue + } + daHeight := atomic.LoadUint64(&m.daHeight) + for _, block := range blocks { + m.logger.Debug("block retrieved from p2p block sync", "blockHeight", block.Height(), "daHeight", daHeight) + m.blockInCh <- newBlockEvent{block, daHeight} + } + } + lastBlockStoreHeight = blockStoreHeight + } +} + +func (m *Manager) getBlocksFromBlockStore(ctx context.Context, startHeight, endHeight uint64) ([]*types.Block, error) { + if startHeight > endHeight { + return nil, fmt.Errorf("startHeight (%d) is greater than endHeight (%d)", startHeight, endHeight) + } + if startHeight == 0 { + startHeight++ + } + blocks := make([]*types.Block, endHeight-startHeight+1) + for i := startHeight; i <= endHeight; i++ { + block, err := m.blockStore.GetByHeight(ctx, i) + if err != nil { + return nil, err + } + blocks[i-startHeight] = block + } + return blocks, nil +} +
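The cond/waitCh pattern above (also used by RetrieveLoop below) turns arbitrarily frequent Signal() calls into events a select loop can consume. A minimal standalone sketch (illustrative only, not part of the patch; assumes it runs inside a function with "sync" imported):

	mtx := new(sync.Mutex)
	cond := sync.NewCond(mtx)
	waitCh := make(chan interface{})
	go func() {
		for {
			mtx.Lock()
			cond.Wait()   // releases mtx while waiting, re-acquires when signalled
			waitCh <- nil // forward the signal to the consuming select loop
			mtx.Unlock()
		}
	}()
	// any producer (a ticker, a new-block event, ...) may now call cond.Signal()

// RetrieveLoop is responsible for interacting with DA layer.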
func (m *Manager) RetrieveLoop(ctx context.Context) { // waitCh is used to signal the retrieve loop that it should process next blocks waitCh := make(chan interface{}) go func() { for { + // This goroutine exits on the first signal after the context is + // cancelled (ctx.Err() != nil). This is OK because it waits on a + // condition variable that is only signalled periodically. m.retrieveMtx.Lock() m.retrieveCond.Wait() waitCh <- nil m.retrieveMtx.Unlock() @@ -356,25 +488,17 @@ for { select { - case <-waitCh: - for { - select { - case <-ctx.Done(): - return - default: - } - daHeight := atomic.LoadUint64(&m.daHeight) - m.logger.Debug("retrieve", "daHeight", daHeight) - err := m.processNextDABlock(ctx) - if err != nil { - m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error()) - break - } - atomic.AddUint64(&m.daHeight, 1) - } case <-ctx.Done(): return + case <-waitCh: } + daHeight := atomic.LoadUint64(&m.daHeight) + err := m.processNextDABlock(ctx) + if err != nil { + m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error()) + continue + } + atomic.AddUint64(&m.daHeight, 1) } } @@ -394,7 +518,12 @@ } m.logger.Debug("retrieved potential blocks", "n", len(blockResp.Blocks), "daHeight", daHeight) for _, block := range blockResp.Blocks { - m.blockInCh <- newBlockEvent{block, daHeight} + blockHash := block.Hash().String() + m.blockCache.setHardConfirmed(blockHash) + m.logger.Info("block marked as hard confirmed", "blockHeight", block.Height(), "blockHash", blockHash) + if !m.blockCache.isSeen(blockHash) { + m.blockInCh <- newBlockEvent{block, daHeight} + } } return nil } @@ -527,6 +656,13 @@ func (m *Manager) publishBlock(ctx context.Context) error { return err } + blockHeight := uint64(block.Height()) + // Update the stored height before submitting to the DA layer and committing to the DB + m.store.SetHeight(blockHeight) + + blockHash := block.Hash().String() + m.blockCache.setSeen(blockHash) + if commit == nil { commit, err = m.getCommit(block.SignedHeader.Header) if err != nil { @@ -540,13 +676,10 @@ return err } - err = m.submitBlockToDA(ctx, block) - if err != nil { - m.logger.Error("Failed to submit block to DA Layer") - return err - } - - blockHeight := uint64(block.SignedHeader.Header.Height()) + // Submit block to be published to the DA layer + m.pendingBlocksMtx.Lock() + m.pendingBlocks = append(m.pendingBlocks, block) + m.pendingBlocksMtx.Unlock() // Commit the new state and block which writes to disk on the proxy app _, _, err = m.executor.Commit(ctx, newState, block, responses) @@ -566,9 +699,6 @@ return err } - // Only update the stored height after successfully submitting to DA layer and committing to the DB - m.store.SetHeight(blockHeight) - newState.DAHeight = atomic.LoadUint64(&m.daHeight) // After this call m.lastState is the NEW state returned from ApplyBlock // updateState also commits the DB tx @@ -583,20 +713,20 @@ // Publish block to channel so that block exchange service can broadcast m.BlockCh <- block - m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress),
"height", block.SignedHeader.Height()) + m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress), "height", blockHeight) return nil } -func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error { - m.logger.Info("submitting block to DA layer", "height", block.SignedHeader.Header.Height()) - +func (m *Manager) submitBlocksToDA(ctx context.Context) error { + m.pendingBlocksMtx.Lock() + defer m.pendingBlocksMtx.Unlock() submitted := false backoff := initialBackoff for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ { - res := m.dalc.SubmitBlocks(ctx, []*types.Block{block}) + res := m.dalc.SubmitBlocks(ctx, m.pendingBlocks) if res.Code == da.StatusSuccess { - m.logger.Info("successfully submitted Rollkit block to DA layer", "rollkitHeight", block.SignedHeader.Header.Height(), "daHeight", res.DAHeight) + m.logger.Info("successfully submitted Rollkit block to DA layer", "daHeight", res.DAHeight) submitted = true } else { m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt) @@ -606,9 +736,9 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error } if !submitted { - return fmt.Errorf("Failed to submit block to DA layer after %d attempts", maxSubmitAttempts) + return fmt.Errorf("failed to submit block to DA layer after %d attempts", maxSubmitAttempts) } - + m.pendingBlocks = make([]*types.Block, 0) return nil } diff --git a/block/manager_test.go b/block/manager_test.go index 9b271b8f0..50083ff6c 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -85,7 +85,7 @@ func TestInitialState(t *testing.T) { require.NoError(t, dalc.Stop()) }() dumbChan := make(chan struct{}) - agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan) + agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan, nil) assert.NoError(err) assert.NotNil(agg) agg.lastStateMtx.RLock() diff --git a/config/defaults.go b/config/defaults.go index 2a218cb25..eeb0a9dbd 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -21,7 +21,8 @@ var DefaultNodeConfig = NodeConfig{ Aggregator: false, LazyAggregator: false, BlockManagerConfig: BlockManagerConfig{ - BlockTime: 30 * time.Second, + BlockTime: 1 * time.Second, + DABlockTime: 30 * time.Second, NamespaceID: types.NamespaceID{}, }, DALayer: "mock", diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index 81cfd83b8..7162a0510 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -4,11 +4,9 @@ import ( "context" "encoding/hex" "encoding/json" - "fmt" "strings" "time" - "cosmossdk.io/math" "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" @@ -99,7 +97,7 @@ func (c *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks [ blobs[blockIndex] = blockBlob } - txResponse, err := c.rpc.State.SubmitPayForBlob(ctx, math.NewInt(c.config.Fee), c.config.GasLimit, blobs) + dataLayerHeight, err := c.rpc.Blob.Submit(ctx, blobs) if err != nil { return da.ResultSubmitBlocks{ BaseResult: da.BaseResult{ @@ -109,24 +107,12 @@ func (c *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks [ } } - c.logger.Debug("successfully submitted PayForBlob transaction", - "fee", c.config.Fee, "gasLimit", c.config.GasLimit, - "daHeight", txResponse.Height, "daTxHash", txResponse.TxHash) - - if txResponse.Code != 0 { - return da.ResultSubmitBlocks{ - BaseResult: da.BaseResult{ - Code: 
da.StatusError, - Message: fmt.Sprintf("Codespace: '%s', Code: %d, Message: %s", txResponse.Codespace, txResponse.Code, txResponse.RawLog), - }, - } - } + c.logger.Debug("successfully submitted blobs", "daHeight", dataLayerHeight) return da.ResultSubmitBlocks{ BaseResult: da.BaseResult{ Code: da.StatusSuccess, - Message: "tx hash: " + txResponse.TxHash, - DAHeight: uint64(txResponse.Height), + DAHeight: uint64(dataLayerHeight), }, } } diff --git a/da/celestia/mock/messages.go b/da/celestia/mock/messages.go deleted file mode 100644 index 961476e8f..000000000 --- a/da/celestia/mock/messages.go +++ /dev/null @@ -1,13 +0,0 @@ -package mock - -// This code is extracted from celestia-app. It's here to build shares from messages (serialized blocks). -// TODO(tzdybal): if we stop using `/namespaced_shares` we can get rid of this file. - -// Share contains the raw share data without the corresponding namespace. -type Share []byte - -// NamespacedShare extends a Share with the corresponding namespace. -type NamespacedShare struct { - Share - ID []byte -} diff --git a/da/celestia/mock/server.go b/da/celestia/mock/server.go index 6bc088d66..329b84518 100644 --- a/da/celestia/mock/server.go +++ b/da/celestia/mock/server.go @@ -1,20 +1,18 @@ package mock import ( - "context" "encoding/base64" "encoding/json" "errors" "fmt" - "net" "net/http" + "net/http/httptest" "time" mux2 "github.com/gorilla/mux" "github.com/rollkit/celestia-openrpc/types/blob" "github.com/rollkit/celestia-openrpc/types/header" - "github.com/rollkit/celestia-openrpc/types/sdk" mockda "github.com/rollkit/rollkit/da/mock" "github.com/rollkit/rollkit/log" "github.com/rollkit/rollkit/store" @@ -55,7 +53,7 @@ type response struct { type Server struct { mock *mockda.DataAvailabilityLayerClient blockTime time.Duration - server *http.Server + server *httptest.Server logger log.Logger } @@ -69,33 +67,27 @@ func NewServer(blockTime time.Duration, logger log.Logger) *Server { } // Start starts HTTP server with given listener. -func (s *Server) Start(listener net.Listener) error { +func (s *Server) Start() (string, error) { kvStore, err := store.NewDefaultInMemoryKVStore() if err != nil { - return err + return "", err } err = s.mock.Init([8]byte{}, []byte(s.blockTime.String()), kvStore, s.logger) if err != nil { - return err + return "", err } err = s.mock.Start() if err != nil { - return err + return "", err } - go func() { - s.server = new(http.Server) - s.server.Handler = s.getHandler() - err := s.server.Serve(listener) - s.logger.Debug("http server exited with", "error", err) - }() - return nil + s.server = httptest.NewServer(s.getHandler()) + s.logger.Debug("http server exited with", "error", err) + return s.server.URL, nil } // Stop shuts down the Server. 
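With Start now returning the httptest URL, callers no longer bind a fixed local port. A usage sketch (illustrative only; it mirrors the da/test/da_test.go change later in this diff):

	httpSrv := cmock.NewServer(mockDaBlockTime, cmlog.NewTMLogger(os.Stdout))
	url, err := httpSrv.Start() // httptest.NewServer picks a free port
	if err != nil {
		panic(err)
	}
	defer httpSrv.Stop()
	testConfig.BaseURL = url // point the Celestia DA client at the mock

// Stop shuts down the Server.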
func (s *Server) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - _ = s.server.Shutdown(ctx) + s.server.Close() } func (s *Server) getHandler() http.Handler { @@ -197,20 +189,20 @@ func (s *Server) rpc(w http.ResponseWriter, r *http.Request) { return } s.writeResponse(w, bytes) - case "state.SubmitPayForBlob": + case "blob.Submit": var params []interface{} err := json.Unmarshal(req.Params, ¶ms) if err != nil { s.writeError(w, err) return } - if len(params) != 3 { - s.writeError(w, errors.New("expected 3 params: fee (uint64), gaslimit (uint64), data (base64 string)")) + if len(params) != 1 { + s.writeError(w, errors.New("expected 1 param: data (base64 string)")) return } - blocks := make([]*types.Block, len(params[2].([]interface{}))) - for i, data := range params[2].([]interface{}) { + blocks := make([]*types.Block, len(params[0].([]interface{}))) + for i, data := range params[0].([]interface{}) { blockBase64 := data.(map[string]interface{})["data"].(string) blockData, err := base64.StdEncoding.DecodeString(blockBase64) if err != nil { @@ -228,11 +220,9 @@ func (s *Server) rpc(w http.ResponseWriter, r *http.Request) { res := s.mock.SubmitBlocks(r.Context(), blocks) resp := &response{ Jsonrpc: "2.0", - Result: &sdk.TxResponse{ - Height: int64(res.DAHeight), - }, - ID: req.ID, - Error: nil, + Result: int64(res.DAHeight), + ID: req.ID, + Error: nil, } bytes, err := json.Marshal(resp) if err != nil { diff --git a/da/mock/mock.go b/da/mock/mock.go index 83e29ac0c..083b219db 100644 --- a/da/mock/mock.go +++ b/da/mock/mock.go @@ -52,7 +52,10 @@ func (m *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, d if err != nil { return err } - dah := core.NewDataAvailabilityHeader(eds) + dah, err := core.NewDataAvailabilityHeader(eds) + if err != nil { + return err + } m.daHeadersLock.Lock() m.daHeaders[m.daHeight] = &dah m.daHeadersLock.Unlock() @@ -215,7 +218,11 @@ func (m *DataAvailabilityLayerClient) updateDAHeight() { fmt.Println(err) return } - dah := core.NewDataAvailabilityHeader(eds) + dah, err := core.NewDataAvailabilityHeader(eds) + if err != nil { + fmt.Println(err) + return + } m.daHeadersLock.Lock() m.daHeaders[atomic.LoadUint64(&m.daHeight)] = &dah defer m.daHeadersLock.Unlock() diff --git a/da/mock/util.go b/da/mock/util.go index 9e21e4407..e2a454ae5 100644 --- a/da/mock/util.go +++ b/da/mock/util.go @@ -142,7 +142,7 @@ func RandEDS(size int) (*rsmt2d.ExtendedDataSquare, error) { return nil, err } // recompute the eds - return rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), NewConstructor(uint64(size))) + return rsmt2d.ComputeExtendedDataSquare(share.ToBytes(shares), share.DefaultRSMT2DCodec(), NewConstructor(uint64(size))) } // RandShares generate 'total' amount of shares filled with random data. 
It uses require.TestingT @@ -152,7 +152,7 @@ func RandShares(total int) ([]share.Share, error) { return nil, fmt.Errorf("total must be power of 2: %d", total) } - var r = rand.New(rand.NewSource(time.Now().Unix())) //nolint:gosec + var r = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec shares := make([]share.Share, total) for i := range shares { shr := make([]byte, appconsts.ShareSize) @@ -161,16 +161,20 @@ func RandShares(total int) ([]share.Share, error) { if err != nil { return nil, err } - shares[i] = shr + share, err := share.NewShare(shr) + if err != nil { + return nil, err + } + shares[i] = *share } - sort.Slice(shares, func(i, j int) bool { return bytes.Compare(shares[i], shares[j]) < 0 }) + sort.Slice(shares, func(i, j int) bool { return bytes.Compare(shares[i].ToBytes(), shares[j].ToBytes()) < 0 }) return shares, nil } // RandNamespace generates random valid data namespace for testing purposes. func RandNamespace() share.Namespace { - var r = rand.New(rand.NewSource(time.Now().Unix())) //nolint:gosec + var r = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec rb := make([]byte, namespace.NamespaceVersionZeroIDSize) r.Read(rb) // nolint:gosec for { diff --git a/da/test/da_test.go b/da/test/da_test.go index 34fac350f..269e46965 100644 --- a/da/test/da_test.go +++ b/da/test/da_test.go @@ -33,7 +33,6 @@ var ( testNamespaceID = types.NamespaceID{0, 1, 2, 3, 4, 5, 6, 7} testConfig = celestia.Config{ - BaseURL: "http://localhost:26658", Timeout: 30 * time.Second, GasLimit: 3000000, } @@ -119,16 +118,12 @@ func startMockGRPCServ() *grpc.Server { func startMockCelestiaNodeServer() *cmock.Server { httpSrv := cmock.NewServer(mockDaBlockTime, cmlog.NewTMLogger(os.Stdout)) - l, err := net.Listen("tcp4", "127.0.0.1:26658") - if err != nil { - fmt.Println("failed to create listener for mock celestia-node RPC server, error: %w", err) - return nil - } - err = httpSrv.Start(l) + url, err := httpSrv.Start() if err != nil { fmt.Println("can't start mock celestia-node RPC server") return nil } + testConfig.BaseURL = url return httpSrv } diff --git a/docker/mockserv.Dockerfile b/docker/mockserv.Dockerfile index 213dd5135..e263cb1e2 100644 --- a/docker/mockserv.Dockerfile +++ b/docker/mockserv.Dockerfile @@ -9,7 +9,7 @@ RUN go mod tidy -compat=1.19 && \ go build /src/da/grpc/mockserv/cmd/main.go # Final image -FROM alpine:3.18.2 +FROM alpine:3.18.3 WORKDIR /root diff --git a/go.mod b/go.mod index b0a9938aa..6f13dd7b8 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,9 @@ module github.com/rollkit/rollkit go 1.20 require ( - cosmossdk.io/math v1.0.1 - github.com/celestiaorg/go-header v0.2.12 - github.com/celestiaorg/nmt v0.17.0 - github.com/celestiaorg/rsmt2d v0.9.0 + github.com/celestiaorg/go-header v0.2.13 + github.com/celestiaorg/nmt v0.18.1 + github.com/celestiaorg/rsmt2d v0.10.0 github.com/celestiaorg/utils v0.1.0 github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12 github.com/cometbft/cometbft v0.37.2 @@ -25,7 +24,7 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/multiformats/go-multiaddr v0.10.1 github.com/prometheus/client_golang v1.16.0 - github.com/rollkit/celestia-openrpc v0.1.1 + github.com/rollkit/celestia-openrpc v0.1.2 github.com/rs/cors v1.9.0 github.com/spf13/cobra v1.7.0 github.com/spf13/viper v1.16.0 @@ -33,12 +32,13 @@ require ( github.com/tendermint/tendermint v0.35.9 go.uber.org/multierr v1.11.0 golang.org/x/net v0.14.0 - google.golang.org/grpc v1.55.0 + google.golang.org/grpc v1.56.1 google.golang.org/protobuf v1.31.0 // 
github.com/chandiniv1/go-substrate-rpc-client/v4 dfd1152c8eca19e7090112c105770db21149e3cf ) require ( + cosmossdk.io/math v1.0.1 // indirect github.com/ChainSafe/go-schnorrkel v1.0.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -53,7 +53,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect - github.com/cosmos/gogoproto v1.4.1 // indirect + github.com/cosmos/gogoproto v1.4.10 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set v1.8.0 // indirect @@ -85,6 +85,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/flatbuffers v1.12.1 // indirect + github.com/google/go-cmp v0.5.9 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b // indirect github.com/google/uuid v1.3.0 // indirect @@ -148,7 +149,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect + github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect github.com/pierrec/xxHash v0.1.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index fafc75e09..4035f126c 100644 --- a/go.sum +++ b/go.sum @@ -194,16 +194,16 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n github.com/casbin/casbin/v2 v2.37.0/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg= github.com/celestiaorg/go-fraud v0.1.2 h1:Bf7yIN3lZ4IR/Vlu5OtmcVCVNESBKEJ/xwu28rRKGA8= github.com/celestiaorg/go-fraud v0.1.2/go.mod h1:kHZXQY+6gd1kYkoWRFFKgWyrLPWRgDN3vd1Ll9gE/oo= -github.com/celestiaorg/go-header v0.2.12 h1:3H9nir20+MTY1vXbLxOUOV05ZspotR6JOiZGKxACHCQ= -github.com/celestiaorg/go-header v0.2.12/go.mod h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0= +github.com/celestiaorg/go-header v0.2.13 h1:sUJLXYs8ViPpxLXyIIaW3h4tPFgtVYMhzsLC4GHfS8I= +github.com/celestiaorg/go-header v0.2.13/go.mod h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4/go.mod h1:fzuHnhzj1pUygGz+1ZkB3uQbEUL4htqCGJ4Qs2LwMZA= -github.com/celestiaorg/nmt v0.17.0 h1:/k8YLwJvuHgT/jQ435zXKaDX811+sYEMXL4B/vYdSLU= -github.com/celestiaorg/nmt v0.17.0/go.mod h1:ZndCeAR4l9lxm7W51ouoyTo1cxhtFgK+4DpEIkxRA3A= -github.com/celestiaorg/rsmt2d v0.9.0 h1:kon78I748ZqjNzI8OAqPN+2EImuZuanj/6gTh8brX3o= -github.com/celestiaorg/rsmt2d v0.9.0/go.mod h1:E06nDxfoeBDltWRvTR9dLviiUZI5/6mLXAuhSJzz3Iw= +github.com/celestiaorg/nmt v0.18.1 h1:zU3apzW4y0fs0ilQA74XnEYW8FvRv0CUK2LXK66L3rA= +github.com/celestiaorg/nmt v0.18.1/go.mod h1:0l8q6UYRju1xNrxtvV6NwPdW3lfsN6KuZ0htRnModdc= +github.com/celestiaorg/rsmt2d v0.10.0 h1:8dprr6CW5mCk5YPnbiLdirojw9YsJOE+XB+GORb8sT0= +github.com/celestiaorg/rsmt2d v0.10.0/go.mod 
h1:BiCZkCJfhDHUEOJKXUeu+CudjluecKvRTqHcuxKvodc= github.com/celestiaorg/utils v0.1.0 h1:WsP3O8jF7jKRgLNFmlDCwdThwOFMFxg0MnqhkLFVxPo= github.com/celestiaorg/utils v0.1.0/go.mod h1:vQTh7MHnvpIeCQZ2/Ph+w7K1R2UerDheZbgJEJD2hSU= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= @@ -270,8 +270,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw= -github.com/cosmos/gogoproto v1.4.1 h1:WoyH+0/jbCTzpKNvyav5FL1ZTWsp1im1MxEpJEzKUB8= -github.com/cosmos/gogoproto v1.4.1/go.mod h1:Ac9lzL4vFpBMcptJROQ6dQ4M3pOEK5Z/l0Q9p+LoCr4= +github.com/cosmos/gogoproto v1.4.10 h1:QH/yT8X+c0F4ZDacDv3z+xE3WU1P1Z3wQoLMBRJoKuI= +github.com/cosmos/gogoproto v1.4.10/go.mod h1:3aAZzeRWpAwr+SS/LLkICX2/kDFyaYVzckBDzygIxek= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -542,6 +542,7 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -1271,8 +1272,9 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/performancecopilot/speed/v4 v4.0.0/go.mod h1:qxrSyuDGrTOWfV+uKRFhfxw6h/4HXRGUiZiufxo49BM= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= -github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= +github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 h1:hDSdbBuw3Lefr6R18ax0tZ2BJeNB3NehB3trOwYBsdU= +github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d/go.mod h1:3OzsM7FXDQlpCiw2j81fOmAwQLnZnLGXVKUzeKQXIAw= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= @@ -1387,8 +1389,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1/go.mod 
h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rollkit/celestia-openrpc v0.1.1 h1:Ub8ydB1c0qDQJmYL+s7i5u9YiQyINxw88AL1S8A+Lig= -github.com/rollkit/celestia-openrpc v0.1.1/go.mod h1:Or8y0vuAzJu3SLOahIOt96+y4Vxaf74vrPFkUJl0oj0= +github.com/rollkit/celestia-openrpc v0.1.2 h1:EglDzgTLVF8JT+szvvRJ3cyZIliRQWf0+OJZjdksr1s= +github.com/rollkit/celestia-openrpc v0.1.2/go.mod h1:rAnRxt1XXLPBCIcZ5M1RWbXLlg8IrYFOXFmyNgn2148= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= diff --git a/mempool/clist/clist.go b/mempool/clist/clist.go index 29f7f5379..e565cc309 100644 --- a/mempool/clist/clist.go +++ b/mempool/clist/clist.go @@ -14,8 +14,6 @@ to ensure garbage collection of removed elements. import ( "fmt" "sync" - - cmsync "github.com/cometbft/cometbft/libs/sync" ) // MaxLength is the max allowed number of elements a linked list is @@ -42,7 +40,7 @@ and there's no reason to serialize that condition for goroutines waiting on NextWait() (since it's just a read operation). */ type CElement struct { - mtx cmsync.RWMutex + mtx sync.RWMutex prev *CElement prevWg *sync.WaitGroup prevWaitCh chan struct{} @@ -218,7 +216,7 @@ func (e *CElement) SetRemoved() { // Operations are goroutine-safe. // Panics if length grows beyond the max. type CList struct { - mtx cmsync.RWMutex + mtx sync.RWMutex wg *sync.WaitGroup waitCh chan struct{} head *CElement // first element diff --git a/node/full.go b/node/full.go index e097fd541..f36949161 100644 --- a/node/full.go +++ b/node/full.go @@ -151,12 +151,6 @@ func newFullNode( mpIDs := newMempoolIDs() mp.EnableTxsAvailable() - doneBuildingChannel := make(chan struct{}) - blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel) - if err != nil { - return nil, fmt.Errorf("BlockManager initialization error: %w", err) - } - headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService")) if err != nil { return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) @@ -167,6 +161,12 @@ func newFullNode( return nil, fmt.Errorf("BlockExchangeService initialization error: %w", err) } + doneBuildingChannel := make(chan struct{}) + blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel, blockExchangeService.blockStore) + if err != nil { + return nil, fmt.Errorf("BlockManager initialization error: %w", err) + } + ctx, cancel := context.WithCancel(ctx) node := &FullNode{ @@ -283,10 +283,12 @@ func (n *FullNode) OnStart() error { if n.conf.Aggregator { n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime) go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator) + go n.blockManager.BlockSubmissionLoop(n.ctx) go n.headerPublishLoop(n.ctx) go n.blockPublishLoop(n.ctx) } go n.blockManager.RetrieveLoop(n.ctx) + go n.blockManager.BlockStoreRetrieveLoop(n.ctx) go n.blockManager.SyncLoop(n.ctx, n.cancel) return nil } diff --git a/node/full_client_test.go b/node/full_client_test.go index 6a0ede851..4fcafeaae 100644 --- 
a/node/full_client_test.go +++ b/node/full_client_test.go @@ -970,7 +970,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(node2.Stop()) }() - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 3*time.Second) defer timeoutCancel() diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index 32558f9f9..3a35160c7 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -169,68 +169,120 @@ func TestLazyAggregator(t *testing.T) { }, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) assert.False(node.IsRunning()) assert.NoError(err) - err = node.Start() - assert.NoError(err) + + assert.NoError(node.Start()) defer func() { require.NoError(node.Stop()) }() assert.True(node.IsRunning()) - require.NoError(err) - - require.NoError(waitForFirstBlock(node.(*FullNode), false)) + require.NoError(waitForFirstBlock(node.(*FullNode), Header)) client := node.GetClient() _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 1}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 2, false)) + require.NoError(waitForAtLeastNBlocks(node, 2, Header)) _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 2}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 3, false)) + require.NoError(waitForAtLeastNBlocks(node, 3, Header)) _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 3}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 4, false)) +} + +// TestSingleAggregatorTwoFullNodesBlockSyncSpeed tests the scenario where the chain's block time is much faster than the DA's block time. In this case, the full nodes should be able to use block sync to sync blocks much faster than syncing from the DA layer, and the test should conclude before the DA block time elapses. +func TestSingleAggregatorTwoFullNodesBlockSyncSpeed(t *testing.T) { + require := require.New(t) + aggCtx, aggCancel := context.WithCancel(context.Background()) + defer aggCancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + clientNodes := 3 + bmConfig := getBMConfig() + bmConfig.BlockTime = 1 * time.Second + bmConfig.DABlockTime = 10 * time.Second + const numberOfBlocksToSyncTill = 5 + + ch := make(chan struct{}) + defer close(ch) + timer := time.NewTimer(numberOfBlocksToSyncTill * bmConfig.BlockTime) + + go func() { + select { + case <-ch: + // Channel closed before timer expired. + return + case <-timer.C: + // Timer expired before channel closed. + t.Error("nodes did not sync before DA block time") + return + } + }() + nodes, _ := createNodes(aggCtx, ctx, clientNodes, bmConfig, t) + + node1 := nodes[0] + node2 := nodes[1] + node3 := nodes[2] + require.NoError(node1.Start()) + defer func() { + require.NoError(node1.Stop()) + }() + require.NoError(waitForFirstBlock(node1, Store)) + require.NoError(node2.Start()) + defer func() { + require.NoError(node2.Stop()) + }() + require.NoError(node3.Start()) + defer func() { + require.NoError(node3.Stop()) + }() + + require.NoError(waitForAtLeastNBlocks(node2, numberOfBlocksToSyncTill, Store)) + require.NoError(waitForAtLeastNBlocks(node3, numberOfBlocksToSyncTill, Store)) + + require.NoError(verifyNodesSynced(node1, node2, Store)) + require.NoError(verifyNodesSynced(node1, node3, Store)) } func TestBlockExchange(t *testing.T) { t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNode(t, true) + testSingleAggregatorSingleFullNode(t, Block) }) t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorTwoFullNode(t, true) + testSingleAggregatorTwoFullNode(t, Block) }) t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNodeTrustedHash(t, true) + testSingleAggregatorSingleFullNodeTrustedHash(t, Block) }) } func TestHeaderExchange(t *testing.T) { t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNode(t, false) + testSingleAggregatorSingleFullNode(t, Header) }) t.Run("SingleAggregatorTwoFullNode", func(t *testing.T) { - testSingleAggregatorTwoFullNode(t, false) + testSingleAggregatorTwoFullNode(t, Header) }) t.Run("SingleAggregatorSingleFullNodeTrustedHash", func(t *testing.T) { - testSingleAggregatorSingleFullNodeTrustedHash(t, false) + testSingleAggregatorSingleFullNodeTrustedHash(t, Header) }) t.Run("SingleAggregatorSingleFullNodeSingleLightNode", testSingleAggregatorSingleFullNodeSingleLightNode) } -func testSingleAggregatorSingleFullNode(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorSingleFullNode(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 1 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 2 + nodes, _ := createNodes(aggCtx, ctx, clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -240,26 +292,26 @@ func testSingleAggregatorSingleFullNode(t *testing.T, useBlockExchange bool) { require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) require.NoError(node2.Start()) defer func() { require.NoError(node2.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node2, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node2, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) } -func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorTwoFullNode(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 2 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 3 + nodes, _ := createNodes(aggCtx, ctx,
clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -269,7 +321,7 @@ func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { defer func() { require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) require.NoError(node2.Start()) defer func() { require.NoError(node2.Stop()) @@ -279,19 +331,21 @@ func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { require.NoError(node3.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node2, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node2, 2, source)) + require.NoError(waitForAtLeastNBlocks(node3, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) + require.NoError(verifyNodesSynced(node1, node3, source)) } -func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 1 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 2 + nodes, _ := createNodes(aggCtx, ctx, clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -301,7 +355,7 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchang require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) // Get the trusted hash from node1 and pass it to node2 config trustedHash, err := node1.hExService.headerStore.GetByHeight(aggCtx, 1) @@ -312,8 +366,8 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchang require.NoError(node2.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node1, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node1, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) } func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { @@ -335,15 +389,16 @@ func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { defer func() { require.NoError(dalc.Stop()) }() - sequencer, _ := createNode(aggCtx, 0, true, false, keys, t) - fullNode, _ := createNode(ctx, 1, false, false, keys, t) + bmConfig := getBMConfig() + sequencer, _ := createNode(aggCtx, 0, true, false, keys, bmConfig, t) + fullNode, _ := createNode(ctx, 1, false, false, keys, bmConfig, t) sequencer.(*FullNode).dalc = dalc sequencer.(*FullNode).blockManager.SetDALC(dalc) fullNode.(*FullNode).dalc = dalc fullNode.(*FullNode).blockManager.SetDALC(dalc) - lightNode, _ := createNode(ctx, 2, false, true, keys, t) + lightNode, _ := createNode(ctx, 2, false, true, keys, bmConfig, t) require.NoError(sequencer.Start()) defer func() { @@ -358,8 +413,9 @@ func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { require.NoError(lightNode.Stop()) }() - require.NoError(waitForAtLeastNBlocks(sequencer.(*FullNode), 2, false)) - require.NoError(verifyNodesSynced(fullNode, lightNode, false)) + require.NoError(waitForAtLeastNBlocks(sequencer.(*FullNode), 2, Header)) + require.NoError(verifyNodesSynced(sequencer, fullNode, Header)) + require.NoError(verifyNodesSynced(fullNode, lightNode, 
Header)) } // Creates and starts the given number of client nodes along with an aggregator node. Uses the given flag to decide whether to have the aggregator produce malicious blocks. @@ -368,7 +424,7 @@ func createAndStartNodes(clientNodes int, t *testing.T) ([]*FullNode, []*mocks.A defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, t) + nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, getBMConfig(), t) startNodes(nodes, apps, t) defer func() { for _, n := range nodes { @@ -384,13 +440,15 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) { // Wait for aggregator node to publish the first block for full nodes to initialize header exchange service require.NoError(t, nodes[0].Start()) - require.NoError(t, waitForFirstBlock(nodes[0], false)) + require.NoError(t, waitForFirstBlock(nodes[0], Header)) for i := 1; i < len(nodes); i++ { require.NoError(t, nodes[i].Start()) } // wait for nodes to start up and establish connections; 1 second ensures that tests pass even on CI. - require.NoError(t, waitForAtLeastNBlocks(nodes[1], 2, false)) + for i := 1; i < len(nodes); i++ { + require.NoError(t, waitForAtLeastNBlocks(nodes[i], 2, Header)) + } for i := 1; i < len(nodes); i++ { data := strconv.Itoa(i) + time.Now().String() @@ -403,7 +461,7 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) { defer close(doneChan) // create a MockTester, to catch the Failed asserts from the Mock package m := MockTester{t: t} - // We don't nedd to check any specific arguments to DeliverTx + // We don't need to check any specific arguments to DeliverTx // so just use a function that returns "true" for matching the args matcher := mock.MatchedBy(func(i interface{}) bool { return true }) err := testutils.Retry(300, 100*time.Millisecond, func() error { @@ -425,7 +483,7 @@ } // Creates the given number of nodes using the given wait group to synchronize them -func createNodes(aggCtx, ctx context.Context, num int, t *testing.T) ([]*FullNode, []*mocks.Application) { +func createNodes(aggCtx, ctx context.Context, num int, bmConfig config.BlockManagerConfig, t *testing.T) ([]*FullNode, []*mocks.Application) { t.Helper() if aggCtx == nil { @@ -447,14 +505,14 @@ ds, _ := store.NewDefaultInMemoryKVStore() _ = dalc.Init([8]byte{}, nil, ds, test.NewFileLoggerCustom(t, test.TempLogFileName(t, "dalc"))) _ = dalc.Start() - node, app := createNode(aggCtx, 0, true, false, keys, t) + node, app := createNode(aggCtx, 0, true, false, keys, bmConfig, t) apps[0] = app nodes[0] = node.(*FullNode) // use same, common DALC, so nodes can share data nodes[0].dalc = dalc nodes[0].blockManager.SetDALC(dalc) for i := 1; i < num; i++ { - node, apps[i] = createNode(ctx, i, false, false, keys, t) + node, apps[i] = createNode(ctx, i, false, false, keys, bmConfig, t) nodes[i] = node.(*FullNode) nodes[i].dalc = dalc nodes[i].blockManager.SetDALC(dalc) @@ -463,7 +521,7 @@ return nodes, apps } -func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys []crypto.PrivKey, t *testing.T) (Node, *mocks.Application) { +func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys []crypto.PrivKey, bmConfig
config.BlockManagerConfig, t *testing.T) (Node, *mocks.Application) { t.Helper() require := require.New(t) // nodes will listen on consecutive ports on local interface @@ -472,11 +530,6 @@ func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys p2pConfig := config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n), } - bmConfig := config.BlockManagerConfig{ - DABlockTime: 100 * time.Millisecond, - BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly - NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1}, - } for i := 0; i < len(keys); i++ { if i == n { continue diff --git a/node/helpers_test.go b/node/helpers_test.go index 8f515fa8f..f4972a11d 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -40,8 +40,9 @@ func TestGetNodeHeight(t *testing.T) { for i := 0; i < num; i++ { keys[i], _, _ = crypto.GenerateEd25519Key(rand.Reader) } - fullNode, _ := createNode(ctx, 0, true, false, keys, t) - lightNode, _ := createNode(ctx, 1, true, true, keys, t) + bmConfig := getBMConfig() + fullNode, _ := createNode(ctx, 0, true, false, keys, bmConfig, t) + lightNode, _ := createNode(ctx, 1, true, true, keys, bmConfig, t) fullNode.(*FullNode).dalc = dalc fullNode.(*FullNode).blockManager.SetDALC(dalc) require.NoError(fullNode.Start()) @@ -55,7 +56,7 @@ func TestGetNodeHeight(t *testing.T) { }() require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { - num, err := getNodeHeight(fullNode, false) + num, err := getNodeHeight(fullNode, Header) if err != nil { return err } @@ -65,7 +66,27 @@ func TestGetNodeHeight(t *testing.T) { return errors.New("expected height > 0") })) require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { - num, err := getNodeHeight(lightNode, false) + num, err := getNodeHeight(fullNode, Block) + if err != nil { + return err + } + if num > 0 { + return nil + } + return errors.New("expected height > 0") + })) + require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { + num, err := getNodeHeight(fullNode, Store) + if err != nil { + return err + } + if num > 0 { + return nil + } + return errors.New("expected height > 0") + })) + require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { + num, err := getNodeHeight(lightNode, Header) if err != nil { return err } diff --git a/node/mempool.go b/node/mempool.go index 1884b130f..214e8c185 100644 --- a/node/mempool.go +++ b/node/mempool.go @@ -3,8 +3,8 @@ package node import ( "fmt" "math" + "sync" - cmsync "github.com/cometbft/cometbft/libs/sync" "github.com/libp2p/go-libp2p/core/peer" ) @@ -13,7 +13,7 @@ const ( ) type mempoolIDs struct { - mtx cmsync.RWMutex + mtx sync.RWMutex peerMap map[peer.ID]uint16 nextID uint16 // assumes that a node will never have over 65536 active peers activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter diff --git a/node/test_helpers.go b/node/test_helpers.go index 4185a49f0..e2e19d946 100644 --- a/node/test_helpers.go +++ b/node/test_helpers.go @@ -12,7 +12,17 @@ import ( cmtypes "github.com/cometbft/cometbft/types" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rollkit/rollkit/config" "github.com/rollkit/rollkit/conv" + "github.com/rollkit/rollkit/types" +) + +type Source int + +const ( + Header Source = iota + Block + Store ) var genesisValidatorKey = ed25519.GenPrivKey() @@ -29,14 +39,32 @@ func (m MockTester) Logf(format string, args ...interface{}) {} func (m 
diff --git a/node/test_helpers.go b/node/test_helpers.go
index 4185a49f0..e2e19d946 100644
--- a/node/test_helpers.go
+++ b/node/test_helpers.go
@@ -12,7 +12,17 @@ import (
 	cmtypes "github.com/cometbft/cometbft/types"
 	"github.com/libp2p/go-libp2p/core/crypto"

+	"github.com/rollkit/rollkit/config"
 	"github.com/rollkit/rollkit/conv"
+	"github.com/rollkit/rollkit/types"
+)
+
+type Source int
+
+const (
+	Header Source = iota
+	Block
+	Store
 )

 var genesisValidatorKey = ed25519.GenPrivKey()
@@ -29,14 +39,32 @@ func (m MockTester) Logf(format string, args ...interface{}) {}
 func (m MockTester) Errorf(format string, args ...interface{}) {}

-func waitForFirstBlock(node *FullNode, useBlockExchange bool) error {
-	return waitForAtLeastNBlocks(node, 1, useBlockExchange)
+func waitForFirstBlock(node Node, source Source) error {
+	return waitForAtLeastNBlocks(node, 1, source)
 }

-func getNodeHeight(node Node, useBlockExchange bool) (uint64, error) {
-	if useBlockExchange {
+func getBMConfig() config.BlockManagerConfig {
+	return config.BlockManagerConfig{
+		DABlockTime: 100 * time.Millisecond,
+		BlockTime:   1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
+		NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1},
+	}
+}
+
+func getNodeHeight(node Node, source Source) (uint64, error) {
+	switch source {
+	case Header:
+		return getNodeHeightFromHeader(node)
+	case Block:
 		return getNodeHeightFromBlock(node)
+	case Store:
+		return getNodeHeightFromStore(node)
+	default:
+		return 0, errors.New("invalid source")
 	}
+}
+
+func getNodeHeightFromHeader(node Node) (uint64, error) {
 	if fn, ok := node.(*FullNode); ok {
 		return fn.hExService.headerStore.Height(), nil
 	}
@@ -53,13 +81,20 @@ func getNodeHeightFromBlock(node Node) (uint64, error) {
 	return 0, errors.New("not a full node")
 }

-func verifyNodesSynced(node1, node2 Node, useBlockExchange bool) error {
+func getNodeHeightFromStore(node Node) (uint64, error) {
+	if fn, ok := node.(*FullNode); ok {
+		return fn.blockManager.GetStoreHeight(), nil
+	}
+	return 0, errors.New("not a full node")
+}
+
+func verifyNodesSynced(node1, node2 Node, source Source) error {
 	return testutils.Retry(300, 100*time.Millisecond, func() error {
-		n1Height, err := getNodeHeight(node1, useBlockExchange)
+		n1Height, err := getNodeHeight(node1, source)
 		if err != nil {
 			return err
 		}
-		n2Height, err := getNodeHeight(node2, useBlockExchange)
+		n2Height, err := getNodeHeight(node2, source)
 		if err != nil {
 			return err
 		}
@@ -70,9 +105,9 @@ func verifyNodesSynced(node1, node2 Node, useBlockExchange bool) error {
 	})
 }

-func waitForAtLeastNBlocks(node Node, n int, useBlockExchange bool) error {
+func waitForAtLeastNBlocks(node Node, n int, source Source) error {
 	return testutils.Retry(300, 100*time.Millisecond, func() error {
-		nHeight, err := getNodeHeight(node, useBlockExchange)
+		nHeight, err := getNodeHeight(node, source)
 		if err != nil {
 			return err
 		}
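The test_helpers.go hunks above replace the useBlockExchange boolean with a three-valued Source enum, so tests can read a node's height from the header exchange store (Header), the block exchange store (Block), or the block manager's store (Store). As a usage sketch, assuming the Source type and waitForAtLeastNBlocks helper from this patch, here is a hypothetical helper (not part of the change) that waits until every source reports a minimum height:

```go
// waitForHeightFromAllSources is a hypothetical helper built on this patch's
// API: it succeeds only once the header store, block store, and block manager
// store each report at least n blocks for the given node.
func waitForHeightFromAllSources(node Node, n int) error {
	for _, src := range []Source{Header, Block, Store} {
		if err := waitForAtLeastNBlocks(node, n, src); err != nil {
			return err
		}
	}
	return nil
}
```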
diff --git a/state/executor.go b/state/executor.go
index 5cb6f38c8..5d8de0556 100644
--- a/state/executor.go
+++ b/state/executor.go
@@ -104,7 +104,7 @@ func (e *BlockExecutor) CreateBlock(height uint64, lastCommit *types.Commit, las
 			BaseHeader: types.BaseHeader{
 				ChainID: e.chainID,
 				Height:  height,
-				Time:    uint64(time.Now().Unix()), // TODO(tzdybal): how to get TAI64?
+				Time:    uint64(time.Now().UnixNano()), // TODO(tzdybal): how to get TAI64?
 			},
 			//LastHeaderHash: lastHeaderHash,
 			//LastCommitHash: lastCommitHash,
diff --git a/state/indexer/query_range.go b/state/indexer/query_range.go
index 69907afe7..57cc4eaf1 100644
--- a/state/indexer/query_range.go
+++ b/state/indexer/query_range.go
@@ -48,7 +48,7 @@ func (qr QueryRange) LowerBoundValue() interface{} {
 		tmp := new(big.Int)
 		return tmp.Add(t, big.NewInt(1))
 	case time.Time:
-		return t.Unix() + 1
+		return t.UnixNano() + 1

 	default:
 		panic("not implemented")
@@ -73,7 +73,7 @@ func (qr QueryRange) UpperBoundValue() interface{} {
 		tmp := new(big.Int)
 		return tmp.Sub(t, big.NewInt(1))
 	case time.Time:
-		return t.Unix() - 1
+		return t.UnixNano() - 1

 	default:
 		panic("not implemented")
diff --git a/types/header.go b/types/header.go
index 70c362a0c..61ce13121 100644
--- a/types/header.go
+++ b/types/header.go
@@ -15,7 +15,7 @@ type Hash = header.Hash
 type BaseHeader struct {
 	// Height represents the block height (aka block number) of a given header
 	Height uint64
-	// Time contains Unix time of a block
+	// Time contains the Unix time of a block, in nanoseconds
 	Time uint64
 	// The Chain ID
 	ChainID string
@@ -71,8 +71,9 @@ func (h *Header) LastHeader() Hash {
 	return h.LastHeaderHash[:]
 }

+// Time returns the Unix time of the header with nanosecond precision
 func (h *Header) Time() time.Time {
-	return time.Unix(int64(h.BaseHeader.Time), 0)
+	return time.Unix(0, int64(h.BaseHeader.Time))
 }

 func (h *Header) Verify(untrst header.Header) error {
diff --git a/types/signed_header.go b/types/signed_header.go
index b8226dc07..575ed7157 100644
--- a/types/signed_header.go
+++ b/types/signed_header.go
@@ -30,6 +30,17 @@ func (sH *SignedHeader) Verify(untrst header.Header) error {
 			Reason: err,
 		}
 	}
+	if err := sH.Header.Verify(&untrstH.Header); err != nil {
+		return &header.VerifyError{
+			Reason: err,
+		}
+	}
+
+	// TODO: Accept non-adjacent headers until go-header implements a feature to accept non-adjacent headers
+	if sH.Height()+1 < untrst.Height() {
+		return nil
+	}
+
 	sHHash := sH.Header.Hash()
 	if !bytes.Equal(untrstH.LastHeaderHash[:], sHHash) {
 		return &header.VerifyError{
@@ -42,7 +53,7 @@ func (sH *SignedHeader) Verify(untrst header.Header) error {
 			Reason: fmt.Errorf("last commit hash %v does not match hash of previous header %v", untrstH.LastCommitHash[:], sHHash),
 		}
 	}
-	return sH.Header.Verify(&untrstH.Header)
+	return nil
 }

 var _ header.Header = &SignedHeader{}
diff --git a/types/signed_header_test.go b/types/signed_header_test.go
index da377fd08..5f7012195 100644
--- a/types/signed_header_test.go
+++ b/types/signed_header_test.go
@@ -56,14 +56,14 @@ func TestVerify(t *testing.T) {
 		},
 		{
 			prepare: func() *SignedHeader {
-				untrustedAdj.Header.BaseHeader.Time = uint64(untrustedAdj.Header.Time().Truncate(time.Hour).Unix())
+				untrustedAdj.Header.BaseHeader.Time = uint64(untrustedAdj.Header.Time().Truncate(time.Hour).UnixNano())
 				return untrustedAdj
 			},
 			err: true,
 		},
 		{
 			prepare: func() *SignedHeader {
-				untrustedAdj.Header.BaseHeader.Time = uint64(untrustedAdj.Header.Time().Add(time.Minute).Unix())
+				untrustedAdj.Header.BaseHeader.Time = uint64(untrustedAdj.Header.Time().Add(time.Minute).UnixNano())
 				return untrustedAdj
 			},
 			err: true,
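Every Unix-to-UnixNano hunk above serves the same end: BaseHeader.Time now stores Unix nanoseconds, Header.Time() rebuilds the timestamp with time.Unix(0, ns) instead of time.Unix(sec, 0), and the indexer's exclusive range bounds shift by one nanosecond rather than one second. A standalone sketch of the round trip, showing why second precision was lossy for blocks produced less than a second apart:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()

	// Store the timestamp as BaseHeader.Time now does: Unix nanoseconds in a uint64.
	stored := uint64(now.UnixNano())

	// Recover it as Header.Time() now does; nothing below one second is lost.
	recovered := time.Unix(0, int64(stored))
	fmt.Println(now.Equal(recovered)) // true

	// The old second-precision round trip truncated sub-second detail, so two
	// blocks produced within the same second could carry identical times.
	coarse := time.Unix(now.Unix(), 0)
	fmt.Println(now.Equal(coarse)) // almost always false
}
```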
diff --git a/types/test_utils.go b/types/test_utils.go
index 2afa6f02a..7469cf037 100644
--- a/types/test_utils.go
+++ b/types/test_utils.go
@@ -37,7 +37,7 @@ func GetRandomSignedHeader() (*SignedHeader, ed25519.PrivKey, error) {
 		BaseHeader: BaseHeader{
 			ChainID: "test",
 			Height:  rand.Uint64(), //nolint:gosec,
-			Time:    uint64(time.Now().Unix()),
+			Time:    uint64(time.Now().UnixNano()),
 		},
 		LastHeaderHash: GetRandomBytes(32),
 		LastCommitHash: GetRandomBytes(32),
@@ -70,7 +70,7 @@ func GetNextRandomHeader(signedHeader *SignedHeader, privKey ed25519.PrivKey) (*
 		BaseHeader: BaseHeader{
 			ChainID: "test",
 			Height:  uint64(signedHeader.Height() + 1),
-			Time:    uint64(time.Now().Unix()),
+			Time:    uint64(time.Now().UnixNano()),
 		},
 		LastHeaderHash: signedHeader.Hash(),
 		DataHash:       GetRandomBytes(32),