Merge branch 'main' into chandini/load-store
chandiniv1 authored Nov 20, 2023
2 parents 456b429 + 58b3152 commit 6e8d52b
Showing 41 changed files with 404 additions and 152 deletions.
10 changes: 7 additions & 3 deletions .golangci.yml
@@ -6,10 +6,10 @@ run:
- mempool
- state/indexer
- state/txindex
- third_party

linters:
enable:
- deadcode
- errcheck
- gofmt
- goimports
@@ -20,13 +20,14 @@ linters:
- misspell
- revive
- staticcheck
- structcheck
- typecheck
- unused
- varcheck

issues:
exclude-use-default: false
include:
- EXC0012 # EXC0012 revive: Annoying issue about not having a comment. The rare codebase has such comments
- EXC0014 # EXC0014 revive: Annoying issue about not having a comment. The rare codebase has such comments

linters-settings:
revive:
@@ -35,6 +36,9 @@ linters-settings:
disabled: true
- name: duplicated-imports
severity: warning
- name: exported
arguments:
- disableStutteringCheck

goimports:
local-prefixes: github.com/rollkit
18 changes: 9 additions & 9 deletions block/block-manager.md
@@ -30,16 +30,16 @@ sequenceDiagram
Sequencer->>Full Node 2: Gossip Block
Full Node 1->>Full Node 1: Verify Block
Full Node 1->>Full Node 2: Gossip Block
Full Node 1->>Full Node 1: Mark Block Soft-Confirmed
Full Node 1->>Full Node 1: Mark Block Soft Confirmed
Full Node 2->>Full Node 2: Verify Block
Full Node 2->>Full Node 2: Mark Block Soft-Confirmed
Full Node 2->>Full Node 2: Mark Block Soft Confirmed
DA Layer->>Full Node 1: Retrieve Block
Full Node 1->>Full Node 1: Mark Block Hard-Confirmed
Full Node 1->>Full Node 1: Mark Block DA Included
DA Layer->>Full Node 2: Retrieve Block
Full Node 2->>Full Node 2: Mark Block Hard-Confirmed
Full Node 2->>Full Node 2: Mark Block DA Included
```

## Protocol/Component Description
@@ -90,7 +90,7 @@ The block manager of the sequencer full nodes regularly publishes the produced b

### Block Retrieval from DA Network

The block manager of the full nodes regularly pulls blocks from the DA network at `DABlockTime` intervals, starting from a DA height read from the last state stored in the local store or from the `DAStartHeight` configuration parameter, whichever is later. The block manager also actively maintains and increments the `daHeight` counter after every DA pull. The pull happens by making the `RetrieveBlocks(daHeight)` request using the Data Availability Light Client (DALC) retriever, which can return `Success`, `NotFound`, or `Error`. In the event of an error, retry logic kicks in with a delay of 100 milliseconds between retries; after 10 retries, an error is logged and the `daHeight` counter is not incremented, which intentionally stalls the block retrieval logic. In the block `NotFound` scenario, there is no error, as it is acceptable to have no rollup block at a given DA height; the retrieval still increments the `daHeight` counter in this case. Finally, in the `Success` scenario, blocks that are successfully retrieved are first marked as hard confirmed and sent to be applied (state update). A successful state update triggers fresh DA and block store pulls without waiting for the `DABlockTime` and `BlockTime` intervals.
The block manager of the full nodes regularly pulls blocks from the DA network at `DABlockTime` intervals, starting from a DA height read from the last state stored in the local store or from the `DAStartHeight` configuration parameter, whichever is later. The block manager also actively maintains and increments the `daHeight` counter after every DA pull. The pull happens by making the `RetrieveBlocks(daHeight)` request using the Data Availability Light Client (DALC) retriever, which can return `Success`, `NotFound`, or `Error`. In the event of an error, retry logic kicks in with a delay of 100 milliseconds between retries; after 10 retries, an error is logged and the `daHeight` counter is not incremented, which intentionally stalls the block retrieval logic. In the block `NotFound` scenario, there is no error, as it is acceptable to have no rollup block at a given DA height; the retrieval still increments the `daHeight` counter in this case. Finally, in the `Success` scenario, blocks that are successfully retrieved are first marked as DA included and sent to be applied (state update). A successful state update triggers fresh DA and block store pulls without waiting for the `DABlockTime` and `BlockTime` intervals.
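
A minimal sketch of this retrieval flow, assuming stand-in types and helpers (`retrieveCode`, `retrieveResult`, `fetch`) rather than the actual DALC API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative result codes mirroring the Success/NotFound/Error
// outcomes described above; these are stand-ins, not the DALC types.
type retrieveCode int

const (
	codeSuccess retrieveCode = iota
	codeNotFound
	codeError
)

type retrieveResult struct {
	code   retrieveCode
	blocks []string // stand-in for retrieved rollup blocks
}

// retrieveWithRetry mirrors the retry policy: 100 ms between attempts,
// and after 10 failed retries daHeight is left unchanged, intentionally
// stalling retrieval.
func retrieveWithRetry(fetch func(daHeight uint64) retrieveResult, daHeight *uint64) error {
	for attempt := 0; attempt < 10; attempt++ {
		res := fetch(*daHeight)
		switch res.code {
		case codeSuccess:
			// Mark retrieved blocks as DA included, hand them off to
			// the state-update path, then advance the counter.
			for _, b := range res.blocks {
				fmt.Println("DA included:", b)
			}
			*daHeight++
			return nil
		case codeNotFound:
			// No rollup block at this DA height is acceptable; the
			// counter still advances.
			*daHeight++
			return nil
		case codeError:
			time.Sleep(100 * time.Millisecond)
		}
	}
	return errors.New("DA retrieval failed after 10 retries; daHeight not incremented")
}

func main() {
	daHeight := uint64(1)
	_ = retrieveWithRetry(func(uint64) retrieveResult {
		return retrieveResult{code: codeNotFound}
	}, &daHeight)
	fmt.Println("next daHeight:", daHeight) // 2
}
```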

### Block Sync Service

@@ -109,13 +109,13 @@ For non-sequencer full nodes, Blocks gossiped through the P2P network are retrie
Starting off with a block store height of zero, for every `blockTime` unit of time, a signal is sent to the `blockStoreCh` channel in the block manager; when this signal is received, the `BlockStoreRetrieveLoop` retrieves blocks from the block store.
It keeps track of the last retrieved block's height, and whenever the current block store's height is greater than the last retrieved block's height, it retrieves all blocks from the block store that are between these two heights.
For each retrieved block, it sends a new block event to the `blockInCh` channel, the same channel to which blocks retrieved from the DA layer are sent.
This block is marked as soft-confirmed by the validating full node until the same block is seen on the DA layer and then marked hard-confirmed.
This block is marked as soft confirmed by the validating full node until the same block is seen on the DA layer, at which point it is marked DA included.
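
A minimal sketch of this loop, with the store accessors, block type, and channel as illustrative stand-ins for the block manager's internals:

```go
package sketch

// block is an illustrative stand-in for the rollup block type.
type block struct{ height uint64 }

// blockStoreRetrieveLoop sketches the loop described above: on every
// blockTime signal it drains the block store from the last retrieved
// height up to the current store height.
func blockStoreRetrieveLoop(
	signal <-chan struct{}, // fires every blockTime via blockStoreCh
	storeHeight func() uint64, // current block store height
	getBlocks func(from, to uint64) []block, // inclusive range read
	blockInCh chan<- block, // shared with DA-retrieved blocks
) {
	lastHeight := uint64(0) // starts off at zero
	for range signal {
		h := storeHeight()
		if h <= lastHeight {
			continue
		}
		// Retrieve every block between the last retrieved height and
		// the current store height, forwarding each one on the same
		// channel used for blocks retrieved from the DA layer.
		for _, b := range getBlocks(lastHeight+1, h) {
			blockInCh <- b
		}
		lastHeight = h
	}
}
```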

Although a sequencer does not need to retrieve blocks from the P2P network, it still runs the `BlockStoreRetrieveLoop`.

#### About Soft/Hard Confirmations
#### About Soft Confirmations and DA Inclusions

The block manager retrieves blocks from both the P2P network and the underlying DA network because the blocks are available in the P2P network faster and DA retrieval is slower (e.g., 1 second vs 15 seconds). The blocks retrieved from the P2P network are only marked as soft confirmed until the DA retrieval succeeds on those blocks and they are marked hard confirmed. The hard confirmations can be considered to have a higher level of finality.
The block manager retrieves blocks from both the P2P network and the underlying DA network because blocks are available on the P2P network sooner, while DA retrieval is slower (e.g., 1 second vs. 15 seconds). Blocks retrieved from the P2P network are only marked as soft confirmed until DA retrieval succeeds on those blocks, at which point they are marked DA included. DA-included blocks can be considered to have a higher level of finality.
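
As a hedged illustration of the resulting two-tier status (the real bookkeeping lives in `BlockCache`, shown in the `block/block_cache.go` diff below):

```go
package sketch

// confirmationStatus is an illustrative two-tier check: a block first
// seen over P2P is soft confirmed; once the same block is retrieved
// from the DA layer it is upgraded to DA included.
func confirmationStatus(seenOnP2P, daIncluded bool) string {
	switch {
	case daIncluded:
		return "DA included" // higher level of finality
	case seenOnP2P:
		return "soft confirmed"
	default:
		return "unknown"
	}
}
```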

### State Update after Block Retrieval

@@ -145,7 +145,7 @@ The communication between the full node and block manager:
* The default mode for sequencer nodes is normal (not lazy).
* The sequencer can produce empty blocks.
* The block manager uses persistent storage (disk) when the `root_dir` and `db_path` configuration parameters are specified in `config.toml` file under the app directory. If these configuration parameters are not specified, the in-memory storage is used, which will not be persistent if the node stops.
* The block manager does not re-apply the block again (in other words, create a new updated state and persist it) when a block was initially applied using P2P block sync, but later was hard confirmed by DA retrieval. The block is only set hard confirmed in this case.
The block manager does not re-apply a block (in other words, create a new updated state and persist it) when the block was initially applied using P2P block sync but was later DA included during DA retrieval; the block is only marked DA included in this case.
* The block sync store is created by prefixing `blockSync` on the main data store.
* The genesis `ChainID` is used to create the `PubSubTopID` in go-header with the string `-block` appended to it. This append is because the full node also has a P2P header sync running with a different P2P network. Refer to go-header specs for more details.
* Block sync over the P2P network works only when a full node is connected to the P2P network by specifying the initial seeds to connect to via `P2PConfig.Seeds` configuration parameter when starting the full node.
26 changes: 14 additions & 12 deletions block/block_cache.go
@@ -6,19 +6,21 @@ import (
"github.com/rollkit/rollkit/types"
)

// BlockCache maintains blocks that are seen and hard confirmed
type BlockCache struct {
blocks map[uint64]*types.Block
hashes map[string]bool
hardConfirmations map[string]bool
mtx *sync.RWMutex
blocks map[uint64]*types.Block
hashes map[string]bool
daIncluded map[string]bool
mtx *sync.RWMutex
}

// NewBlockCache returns a new BlockCache struct
func NewBlockCache() *BlockCache {
return &BlockCache{
blocks: make(map[uint64]*types.Block),
hashes: make(map[string]bool),
hardConfirmations: make(map[string]bool),
mtx: new(sync.RWMutex),
blocks: make(map[uint64]*types.Block),
hashes: make(map[string]bool),
daIncluded: make(map[string]bool),
mtx: new(sync.RWMutex),
}
}

@@ -53,14 +55,14 @@ func (bc *BlockCache) setSeen(hash string) {
bc.hashes[hash] = true
}

func (bc *BlockCache) isHardConfirmed(hash string) bool {
func (bc *BlockCache) isDAIncluded(hash string) bool {
bc.mtx.RLock()
defer bc.mtx.RUnlock()
return bc.hardConfirmations[hash]
return bc.daIncluded[hash]
}

func (bc *BlockCache) setHardConfirmed(hash string) {
func (bc *BlockCache) setDAIncluded(hash string) {
bc.mtx.Lock()
defer bc.mtx.Unlock()
bc.hardConfirmations[hash] = true
bc.daIncluded[hash] = true
}
8 changes: 4 additions & 4 deletions block/block_cache_test.go
@@ -39,8 +39,8 @@ func TestBlockCache(t *testing.T) {
bc.setSeen("hash")
require.True(bc.isSeen("hash"), "isSeen should return true for seen hash")

// Test setHardConfirmed
require.False(bc.isHardConfirmed("hash"), "hardConfirmations should be false for unseen hash")
bc.setHardConfirmed("hash")
require.True(bc.isHardConfirmed("hash"), "hardConfirmations should be true for seen hash")
// Test setDAIncluded
require.False(bc.isDAIncluded("hash"), "DAIncluded should be false for unseen hash")
bc.setDAIncluded("hash")
require.True(bc.isDAIncluded("hash"), "DAIncluded should be true for seen hash")
}
14 changes: 9 additions & 5 deletions block/block_sync.go
@@ -24,8 +24,8 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Sync Service for block that implements the go-header interface.
// Contains a block store where synced blocks are stored.
// BlockSyncService is the P2P Sync Service for block that implements the
// go-header interface. Contains a block store where synced blocks are stored.
// Uses the go-header library for handling all P2P logic.
type BlockSyncService struct {
conf config.NodeConfig
@@ -43,6 +43,7 @@ type BlockSyncService struct {
ctx context.Context
}

// NewBlockSyncService returns a new BlockSyncService.
func NewBlockSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
@@ -90,8 +91,10 @@ func (bSyncService *BlockSyncService) initBlockStoreAndStartSyncer(ctx context.C
return nil
}

// Initialize block store if needed and broadcasts provided block.
// Note: Only returns an error in case block store can't be initialized. Logs error if there's one while broadcasting.
// WriteToBlockStoreAndBroadcast initializes block store if needed and broadcasts
// provided block.
// Note: Only returns an error in case block store can't be initialized. Logs
// error if there's one while broadcasting.
func (bSyncService *BlockSyncService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
// For genesis block initialize the store and start the syncer
if int64(block.Height()) == bSyncService.genesis.InitialHeight {
@@ -205,7 +208,7 @@ func (bSyncService *BlockSyncService) Start() error {
return nil
}

// OnStop is a part of Service interface.
// Stop is a part of Service interface.
func (bSyncService *BlockSyncService) Stop() error {
err := bSyncService.blockStore.Stop(bSyncService.ctx)
err = multierr.Append(err, bSyncService.p2pServer.Stop(bSyncService.ctx))
@@ -254,6 +257,7 @@ func newBlockSyncer(
return goheadersync.NewSyncer[*types.Block](ex, store, sub, opts...)
}

// StartSyncer starts the BlockSyncService's syncer
func (bSyncService *BlockSyncService) StartSyncer() error {
bSyncService.syncerStatus.m.Lock()
defer bSyncService.syncerStatus.m.Unlock()
31 changes: 17 additions & 14 deletions block/header_sync.go
@@ -24,10 +24,11 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Sync Service for header that implements the go-header interface.
// HeaderSyncService is the P2P Sync Service for header that implements the
// go-header interface.
// Contains a header store where synced headers are stored.
// Uses the go-header library for handling all P2P logic.
type HeaderSynceService struct {
type HeaderSyncService struct {
conf config.NodeConfig
genesis *cmtypes.GenesisDoc
p2p *p2p.Client
@@ -43,7 +44,8 @@ type HeaderSynceService struct {
ctx context.Context
}

func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSynceService, error) {
// NewHeaderSyncService returns a new HeaderSyncService.
func NewHeaderSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSyncService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
}
@@ -61,7 +63,7 @@ func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf conf
return nil, fmt.Errorf("failed to initialize the header store: %w", err)
}

return &HeaderSynceService{
return &HeaderSyncService{
conf: conf,
genesis: genesis,
p2p: p2p,
@@ -73,11 +75,11 @@ func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf conf
}

// HeaderStore returns the headerstore of the HeaderSynceService
func (hSyncService *HeaderSynceService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] {
func (hSyncService *HeaderSyncService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] {
return hSyncService.headerStore
}

func (hSyncService *HeaderSynceService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
func (hSyncService *HeaderSyncService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
if initial == nil {
return fmt.Errorf("failed to initialize the headerstore and start syncer")
}
@@ -90,9 +92,9 @@ func (hSyncService *HeaderSynceService) initHeaderStoreAndStartSyncer(ctx contex
return nil
}

// Initialize header store if needed and broadcasts provided header.
// WriteToHeaderStoreAndBroadcast initializes header store if needed and broadcasts provided header.
// Note: Only returns an error in case header store can't be initialized. Logs error if there's one while broadcasting.
func (hSyncService *HeaderSynceService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
func (hSyncService *HeaderSyncService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
// For genesis header initialize the store and start the syncer
if int64(signedHeader.Height()) == hSyncService.genesis.InitialHeight {
if err := hSyncService.headerStore.Init(ctx, signedHeader); err != nil {
@@ -111,12 +113,12 @@ func (hSyncService *HeaderSynceService) WriteToHeaderStoreAndBroadcast(ctx conte
return nil
}

func (hSyncService *HeaderSynceService) isInitialized() bool {
func (hSyncService *HeaderSyncService) isInitialized() bool {
return hSyncService.headerStore.Height() > 0
}

// OnStart is a part of Service interface.
func (hSyncService *HeaderSynceService) Start() error {
// Start is a part of Service interface.
func (hSyncService *HeaderSyncService) Start() error {
// have to do the initializations here to utilize the p2p node which is created on start
ps := hSyncService.p2p.PubSub()

@@ -203,8 +205,8 @@ func (hSyncService *HeaderSynceService) Start() error {
return nil
}

// OnStop is a part of Service interface.
func (hSyncService *HeaderSynceService) Stop() error {
// Stop is a part of Service interface.
func (hSyncService *HeaderSyncService) Stop() error {
err := hSyncService.headerStore.Stop(hSyncService.ctx)
err = multierr.Append(err, hSyncService.p2pServer.Stop(hSyncService.ctx))
err = multierr.Append(err, hSyncService.ex.Stop(hSyncService.ctx))
@@ -253,7 +255,8 @@ func newSyncer(
return goheadersync.NewSyncer[*types.SignedHeader](ex, store, sub, opts...)
}

func (hSyncService *HeaderSynceService) StartSyncer() error {
// StartSyncer starts the HeaderSyncService's syncer
func (hSyncService *HeaderSyncService) StartSyncer() error {
hSyncService.syncerStatus.m.Lock()
defer hSyncService.syncerStatus.m.Unlock()
if hSyncService.syncerStatus.started {
11 changes: 6 additions & 5 deletions block/manager.go
@@ -207,9 +207,9 @@ func (m *Manager) GetStoreHeight() uint64 {
return m.store.Height()
}

// GetHardConfirmation returns true if the block is hard confirmed
func (m *Manager) GetHardConfirmation(hash types.Hash) bool {
return m.blockCache.isHardConfirmed(hash.String())
// IsDAIncluded returns true if the block with the given hash has been seen on DA.
func (m *Manager) IsDAIncluded(hash types.Hash) bool {
return m.blockCache.isDAIncluded(hash.String())
}

// AggregationLoop is responsible for aggregating transactions into rollup-blocks.
@@ -508,8 +508,8 @@ func (m *Manager) processNextDABlock(ctx context.Context) error {
m.logger.Debug("retrieved potential blocks", "n", len(blockResp.Blocks), "daHeight", daHeight)
for _, block := range blockResp.Blocks {
blockHash := block.Hash().String()
m.blockCache.setHardConfirmed(blockHash)
m.logger.Info("block marked as hard confirmed", "blockHeight", block.Height(), "blockHash", blockHash)
m.blockCache.setDAIncluded(blockHash)
m.logger.Info("block marked as DA included", "blockHeight", block.Height(), "blockHash", blockHash)
if !m.blockCache.isSeen(blockHash) {
m.blockInCh <- newBlockEvent{block, daHeight}
}
@@ -562,6 +562,7 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) {
}, nil
}

// IsProposer returns whether or not the manager is a proposer
func (m *Manager) IsProposer() (bool, error) {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
13 changes: 6 additions & 7 deletions block/manager_test.go
@@ -103,7 +103,7 @@ func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient {
return dalc
}

func TestGetHardConfirmation(t *testing.T) {
func TestIsDAIncluded(t *testing.T) {
require := require.New(t)

// Create a minimalistic block manager
@@ -112,11 +112,10 @@ func TestGetHardConfirmation(t *testing.T) {
}
hash := types.Hash([]byte("hash"))

// GetHardConfirmation should return false for unseen hash
require.False(m.GetHardConfirmation(hash))
// IsDAIncluded should return false for unseen hash
require.False(m.IsDAIncluded(hash))

// Set the hash as hard confirmed and verify GetHardConfirmation returns
// true
m.blockCache.setHardConfirmed(hash.String())
require.True(m.GetHardConfirmation(hash))
// Set the hash as DAIncluded and verify IsDAIncluded returns true
m.blockCache.setDAIncluded(hash.String())
require.True(m.IsDAIncluded(hash))
}
3 changes: 2 additions & 1 deletion block/pending_blocks.go
@@ -6,12 +6,13 @@ import (
"github.com/rollkit/rollkit/types"
)

// Maintains blocks that need to be published to DA layer
// PendingBlocks maintains blocks that need to be published to DA layer
type PendingBlocks struct {
pendingBlocks []*types.Block
mtx *sync.RWMutex
}

// NewPendingBlocks returns a new PendingBlocks struct
func NewPendingBlocks() *PendingBlocks {
return &PendingBlocks{
pendingBlocks: make([]*types.Block, 0),
4 changes: 2 additions & 2 deletions block/syncer_status.go
@@ -4,8 +4,8 @@ import (
"sync"
)

// Used by header and block exchange service for keeping track of
// the status of the syncer in them.
// SyncerStatus is used by header and block exchange service for keeping track
// of the status of the syncer in them.
type SyncerStatus struct {
started bool
m sync.RWMutex