Remove sequencer gossip (cosmos#786)
Fixes rollkit/rollkit#781

---------

Co-authored-by: Ganesha Upadhyaya <gupadhyaya@Ganeshas-MacBook-Pro-2.local>
gupadhyaya and Ganesha Upadhyaya authored Mar 16, 2023
1 parent 8af02b6 commit 4624443
Showing 6 changed files with 65 additions and 127 deletions.
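In short: the block manager's gossip-era channels (HeaderOutCh, HeaderInCh, SyncedHeadersCh) and the gossiped lastCommit are removed, and a single buffered HeaderCh now carries signed headers from the manager to the header exchange service. A minimal, self-contained sketch of the producer side under that reading — SignedHeader, Manager, and publishBlock here are simplified stand-ins, not the actual rollkit types:

package main

import "fmt"

// SignedHeader stands in for rollkit's types.SignedHeader.
type SignedHeader struct{ Height uint64 }

// Manager keeps only the single header channel that survives this commit;
// the former HeaderOutCh/HeaderInCh/SyncedHeadersCh trio is gone.
type Manager struct {
    HeaderCh chan *SignedHeader
}

// publishBlock mirrors the tail of Manager.publishBlock after the change:
// once a block is committed, its signed header goes onto HeaderCh for the
// header exchange service to broadcast.
func (m *Manager) publishBlock(h *SignedHeader) {
    m.HeaderCh <- h
}

func main() {
    m := &Manager{HeaderCh: make(chan *SignedHeader, 100)} // buffered, as in the diff
    m.publishBlock(&SignedHeader{Height: 1})
    fmt.Println("published header at height", (<-m.HeaderCh).Height)
}

The 100-element buffer mirrors the arbitrary buffer size noted in the diff; it keeps publishBlock from blocking if the publish loop is momentarily behind.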
85 changes: 19 additions & 66 deletions block/manager.go
@@ -56,17 +56,12 @@ type Manager struct {
// daHeight is the height of the latest processed DA block
daHeight uint64

HeaderOutCh chan *types.SignedHeader
HeaderInCh chan *types.SignedHeader
SyncedHeadersCh chan *types.SignedHeader

lastCommit atomic.Value
HeaderCh chan *types.SignedHeader

FraudProofInCh chan *abci.FraudProof

syncTarget uint64
blockInCh chan newBlockEvent
syncCache map[uint64]*types.Block
blockInCh chan newBlockEvent
syncCache map[uint64]*types.Block

// retrieveMtx is used by retrieveCond
retrieveMtx *sync.Mutex
@@ -152,9 +147,7 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
SyncedHeadersCh: make(chan *types.SignedHeader, 100),
HeaderCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
@@ -262,27 +255,6 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
select {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Header.Height(), "hash", header.Header.Hash())
m.SyncedHeadersCh <- header
newHeight := header.Header.BaseHeader.Height
currentHeight := m.store.Height()
// in case of client reconnecting after being offline
// newHeight may be significantly larger than currentHeight
// it's handled gently in RetrieveLoop
if newHeight > currentHeight {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCond.Signal()
}
commit := &header.Commit
// TODO(tzdybal): check if it's from right aggregator
m.lastCommit.Store(commit)
err := m.trySyncNextBlock(ctx, 0)
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
} else {
m.logger.Debug("synced using signed header", "height", commit.Height)
}
case blockEvent := <-m.blockInCh:
block := blockEvent.block
daHeight := blockEvent.daHeight
@@ -335,43 +307,36 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
// If block at height h+1 is not available, value of last gossiped commit is checked.
// If commit for block h is available, we proceed with sync process, and remove synced block from sync cache.
func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
var b1 *types.Block
var commit *types.Commit
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory

b1, ok1 := m.syncCache[currentHeight+1]
if !ok1 {
b, ok := m.syncCache[currentHeight+1]
if !ok {
return nil
}
b2, ok2 := m.syncCache[currentHeight+2]
if ok2 {
m.logger.Debug("using last commit from next block")
commit = &b2.SignedHeader.Commit
} else {
lastCommit := m.getLastCommit()
if lastCommit != nil && lastCommit.Height == currentHeight+1 {
m.logger.Debug("using gossiped commit")
commit = lastCommit
}

signedHeader := &b.SignedHeader
if signedHeader != nil {
commit = &b.SignedHeader.Commit
}

if b1 != nil && commit != nil {
m.logger.Info("Syncing block", "height", b1.SignedHeader.Header.Height())
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if b != nil && commit != nil {
m.logger.Info("Syncing block", "height", b.SignedHeader.Header.Height())
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
}
err = m.store.SaveBlock(b1, commit)
err = m.store.SaveBlock(b, commit)
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
_, _, err = m.executor.Commit(ctx, newState, b, responses)
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(uint64(b1.SignedHeader.Header.Height()))
m.store.SetHeight(uint64(b.SignedHeader.Header.Height()))

err = m.store.SaveBlockResponses(uint64(b1.SignedHeader.Header.Height()), responses)
err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses)
if err != nil {
return fmt.Errorf("failed to save block responses: %w", err)
}
@@ -390,14 +355,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return nil
}

func (m *Manager) getLastCommit() *types.Commit {
ptr := m.lastCommit.Load()
if ptr == nil {
return nil
}
return ptr.(*types.Commit)
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop, that it should process next blocks
@@ -602,7 +559,8 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(uint64(block.SignedHeader.Header.Height()))

m.publishSignedHeader(block, commit)
// Publish header to channel so that header exchange service can broadcast
m.HeaderCh <- &block.SignedHeader

return nil
}
@@ -639,11 +597,6 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {
return backoff
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) {
m.HeaderOutCh <- &types.SignedHeader{Header: block.SignedHeader.Header, Commit: *commit}
}

func updateState(s *types.State, res *abci.ResponseInitChain) {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
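The sync path loses the gossiped-commit fallback: trySyncNextBlock now takes the commit for block h+1 straight from that cached block's own signed header. A compilable sketch of just that selection step, with simplified stand-in types (no real store, executor, or DA layer):

package main

import "fmt"

// Simplified stand-ins for the types used in block/manager.go.
type Commit struct{ Height uint64 }
type SignedHeader struct {
    Height uint64
    Commit Commit
}
type Block struct{ SignedHeader SignedHeader }

// nextBlockAndCommit mirrors the new lookup in trySyncNextBlock: fetch the
// cached block at currentHeight+1 and take the commit from its own header,
// instead of consulting a gossiped lastCommit or the block at h+2.
func nextBlockAndCommit(syncCache map[uint64]*Block, currentHeight uint64) (*Block, *Commit) {
    b, ok := syncCache[currentHeight+1]
    if !ok {
        return nil, nil
    }
    return b, &b.SignedHeader.Commit
}

func main() {
    cache := map[uint64]*Block{
        6: {SignedHeader: SignedHeader{Height: 6, Commit: Commit{Height: 6}}},
    }
    if b, c := nextBlockAndCommit(cache, 5); b != nil {
        fmt.Println("syncing block", b.SignedHeader.Height, "with commit for height", c.Height)
    }
}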
35 changes: 2 additions & 33 deletions node/full.go
@@ -32,7 +32,6 @@ import (
"github.com/rollkit/rollkit/state/txindex"
"github.com/rollkit/rollkit/state/txindex/kv"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

// prefixes used in KV store to separate main node data from DALC data
@@ -151,7 +150,7 @@ func newFullNode(
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}

headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, blockManager.SyncedHeadersCh, logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
@@ -182,7 +181,6 @@ func newFullNode(
node.BaseService = *service.NewBaseService(logger, "Node", node)

node.P2P.SetTxValidator(node.newTxValidator())
node.P2P.SetHeaderValidator(node.newHeaderValidator())
node.P2P.SetFraudProofValidator(node.newFraudProofValidator())

return node, nil
@@ -220,16 +218,8 @@ func (n *FullNode) initGenesisChunks() error {
func (n *FullNode) headerPublishLoop(ctx context.Context) {
for {
select {
case signedHeader := <-n.blockManager.HeaderOutCh:
case signedHeader := <-n.blockManager.HeaderCh:
n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader)
headerBytes, err := signedHeader.MarshalBinary()
if err != nil {
n.Logger.Error("failed to serialize signed block header", "error", err)
}
err = n.P2P.GossipSignedHeader(ctx, headerBytes)
if err != nil {
n.Logger.Error("failed to gossip signed block header", "error", err)
}
case <-ctx.Done():
return
}
@@ -363,27 +353,6 @@ func (n *FullNode) newTxValidator() p2p.GossipValidator {
}
}

// newHeaderValidator returns a pubsub validator that runs basic checks and forwards
// the deserialized header for further processing
func (n *FullNode) newHeaderValidator() p2p.GossipValidator {
return func(headerMsg *p2p.GossipMessage) bool {
n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data))
var header types.SignedHeader
err := header.UnmarshalBinary(headerMsg.Data)
if err != nil {
n.Logger.Error("failed to deserialize header", "error", err)
return false
}
err = header.ValidateBasic()
if err != nil {
n.Logger.Error("failed to validate header", "error", err)
return false
}
n.blockManager.HeaderInCh <- &header
return true
}
}

// newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards
// it to be verified
func (n *FullNode) newFraudProofValidator() p2p.GossipValidator {
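On the node side, the pubsub header validator and the manual gossip call are gone; the single remaining consumer of the manager's headers is headerPublishLoop, which forwards each header to the header exchange service. A trimmed-down version of that select loop — the broadcast callback stands in for writeToHeaderStoreAndBroadcast and the types are simplified:

package main

import (
    "context"
    "fmt"
    "time"
)

type SignedHeader struct{ Height uint64 }

// headerPublishLoop mirrors the shape of FullNode.headerPublishLoop after
// this change: drain the manager's HeaderCh and hand each header to the
// header exchange service until the context is cancelled.
func headerPublishLoop(ctx context.Context, headerCh <-chan *SignedHeader, broadcast func(context.Context, *SignedHeader)) {
    for {
        select {
        case h := <-headerCh:
            broadcast(ctx, h)
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    ch := make(chan *SignedHeader, 1)
    ch <- &SignedHeader{Height: 1}
    headerPublishLoop(ctx, ch, func(_ context.Context, h *SignedHeader) {
        fmt.Println("broadcasting header at height", h.Height)
    })
}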
1 change: 1 addition & 0 deletions node/full_node_integration_test.go
@@ -493,6 +493,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d
ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n),
}
bmConfig := config.BlockManagerConfig{
DABlockTime: 100 * time.Millisecond,
BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1},
FraudProofs: true,
41 changes: 16 additions & 25 deletions node/header_exchange.go
@@ -26,22 +26,21 @@ import (
)

type HeaderExchangeService struct {
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
syncer *sync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStarted bool
syncedHeadersCh chan *types.SignedHeader
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
syncer *sync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStarted bool

logger log.Logger
ctx context.Context
}

func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, syncedHeadersCh chan *types.SignedHeader, logger log.Logger) (*HeaderExchangeService, error) {
func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderExchangeService, error) {
// store is TxnDatastore, but we require Batching, hence the type assertion
// note, the badger datastore impl that is used in the background implements both
storeBatch, ok := store.(ds.Batching)
@@ -54,13 +53,12 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
}

return &HeaderExchangeService{
conf: conf,
genesis: genesis,
p2p: p2p,
ctx: ctx,
headerStore: ss,
syncedHeadersCh: syncedHeadersCh,
logger: logger,
conf: conf,
genesis: genesis,
p2p: p2p,
ctx: ctx,
headerStore: ss,
logger: logger,
}, nil
}

@@ -92,13 +90,6 @@ func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx co
if err := hExService.initHeaderStoreAndStartSyncer(ctx, trustedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
} else {
signedHeader := <-hExService.syncedHeadersCh
if signedHeader.Header.Height() == hExService.genesis.InitialHeight {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
}
}
}

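The header exchange service drops its syncedHeadersCh: the constructor no longer takes the channel, and the non-aggregator fallback that waited on it for the genesis header is removed, leaving the trusted-header path as the way the syncer gets started here. A compact sketch of that simplified guard, using illustrative names rather than the real service API:

package main

import "fmt"

type SignedHeader struct{ Height uint64 }

// tryInitAndStartSyncer mirrors the shape of the method after this change:
// the syncer is only started from a trusted header; the former branch that
// blocked on syncedHeadersCh waiting for the genesis header is gone.
func tryInitAndStartSyncer(trusted *SignedHeader, start func(*SignedHeader) error) {
    if trusted == nil {
        return // nothing to do; no channel to wait on anymore
    }
    if err := start(trusted); err != nil {
        fmt.Println("failed to initialize the header store and start syncer:", err)
    }
}

func main() {
    tryInitAndStartSyncer(&SignedHeader{Height: 1}, func(h *SignedHeader) error {
        fmt.Println("syncer started from trusted header at height", h.Height)
        return nil
    })
}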
3 changes: 1 addition & 2 deletions node/light.go
@@ -17,7 +17,6 @@ import (
"github.com/rollkit/rollkit/config"
"github.com/rollkit/rollkit/p2p"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

var _ Node = &LightNode{}
@@ -56,7 +55,7 @@ func newLightNode(
return nil, err
}

headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, make(chan *types.SignedHeader), logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
27 changes: 26 additions & 1 deletion types/signed_header.go
@@ -1,6 +1,10 @@
package types

import "github.com/celestiaorg/go-header"
import (
"fmt"

"github.com/celestiaorg/go-header"
)

func (sH *SignedHeader) New() header.Header {
return new(SignedHeader)
@@ -9,3 +13,24 @@ func (sH *SignedHeader) New() header.Header {
func (sH *SignedHeader) IsZero() bool {
return sH == nil
}

func (sH *SignedHeader) VerifyAdjacent(untrst header.Header) error {
// Explicit type checks are required due to embedded Header which also does the explicit type check
untrstH, ok := untrst.(*SignedHeader)
if !ok {
return &header.VerifyError{
Reason: fmt.Errorf("%T is not of type %T", untrst, sH),
}
}
return sH.Header.VerifyAdjacent(&untrstH.Header)
}

func (sH *SignedHeader) VerifyNonAdjacent(untrst header.Header) error {
untrstH, ok := untrst.(*SignedHeader)
if !ok {
return &header.VerifyError{
Reason: fmt.Errorf("%T is not of type %T", untrst, sH),
}
}
return sH.Header.VerifyNonAdjacent(&untrstH.Header)
}
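With header gossip validation gone from the full node, adjacency checks live on the go-header interface instead: SignedHeader type-asserts the untrusted header and delegates to the embedded Header. A standalone sketch of that assert-then-delegate pattern — the Header type and its height check below are simplified assumptions, not go-header's actual API:

package main

import (
    "errors"
    "fmt"
)

// Header stands in for the embedded rollkit Header with its own verification.
type Header struct{ Height uint64 }

func (h *Header) VerifyAdjacent(untrusted *Header) error {
    if untrusted.Height != h.Height+1 {
        return fmt.Errorf("non-adjacent: have %d, got %d", h.Height, untrusted.Height)
    }
    return nil
}

// SignedHeader mirrors the pattern added in types/signed_header.go: check the
// concrete type of the untrusted header first, then delegate to the embedded
// Header's verification.
type SignedHeader struct{ Header Header }

func (sh *SignedHeader) VerifyAdjacent(untrusted interface{}) error {
    u, ok := untrusted.(*SignedHeader)
    if !ok {
        return errors.New("untrusted header is not a *SignedHeader")
    }
    return sh.Header.VerifyAdjacent(&u.Header)
}

func main() {
    trusted := &SignedHeader{Header: Header{Height: 1}}
    next := &SignedHeader{Header: Header{Height: 2}}
    fmt.Println("adjacent verify error:", trusted.VerifyAdjacent(next))
}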
