Skip to content

Commit

Permalink
Hardcode centralized sequencer behavior (rollkit#1301)
Browse files Browse the repository at this point in the history
## Overview
Goal is to close rollkit#1273,
rollkit#1272, and possibly
rollkit#1254 and
rollkit#1270.

## Checklist
- [ ] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing
- [ ] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [ ] Linked issues closed with keywords


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Added a search functionality to the app.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Connor O'Hara <connor@switchboard.xyz>
Co-authored-by: Manav Aggarwal <manavaggarwal1234@gmail.com>
Co-authored-by: Ganesha Upadhyaya <gupadhyaya@Ganeshas-MacBook-Pro-2.local>
Co-authored-by: Ganesha Upadhyaya <ganeshrvce@gmail.com>
  • Loading branch information
5 people authored Nov 23, 2023
1 parent 2437b3d commit 4892ede
Show file tree
Hide file tree
Showing 32 changed files with 355 additions and 1,091 deletions.
61 changes: 5 additions & 56 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) {
}
start := time.Now()
err := m.publishBlock(ctx)
if err != nil {
if err != nil && ctx.Err() == nil {
m.logger.Error("error while publishing block", "error", err)
}
timer.Reset(m.getRemainingSleep(start))
Expand All @@ -263,7 +263,7 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) {
case <-timer.C:
// build a block with all the transactions received in the last 1 second
err := m.publishBlock(ctx)
if err != nil {
if err != nil && ctx.Err() == nil {
m.logger.Error("error while publishing block", "error", err)
}
// this can be used to notify multiple subscribers when a block has been built
Expand Down Expand Up @@ -397,12 +397,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return fmt.Errorf("failed to save block responses: %w", err)
}

// SaveValidators commits the DB tx
err = m.saveValidatorsToStore(bHeight)
if err != nil {
return err
}

m.store.SetHeight(bHeight)

if daHeight > newState.DAHeight {
Expand Down Expand Up @@ -564,19 +558,12 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) {

// IsProposer returns whether or not the manager is a proposer
func (m *Manager) IsProposer() (bool, error) {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
// if proposer is not set, assume self proposer
if m.lastState.Validators.Proposer == nil {
return true, nil
}

signerPubBytes, err := m.proposerKey.GetPublic().Raw()
if err != nil {
return false, err
}

return bytes.Equal(m.lastState.Validators.Proposer.PubKey.Bytes(), signerPubBytes), nil
return bytes.Equal(m.genesis.Validators[0].PubKey.Bytes(), signerPubBytes), nil
}

func (m *Manager) publishBlock(ctx context.Context) error {
Expand Down Expand Up @@ -627,7 +614,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
if err != nil {
return nil
}
block.SignedHeader.Header.NextAggregatorsHash = m.getNextAggregatorsHash()

commit, err = m.getCommit(block.SignedHeader.Header)
if err != nil {
return err
Expand All @@ -636,8 +623,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// set the commit to current block's signed header
block.SignedHeader.Commit = *commit

block.SignedHeader.Validators = m.getLastStateValidators()

// SaveBlock commits the DB tx
err = m.store.SaveBlock(block, commit)
if err != nil {
Expand All @@ -657,8 +642,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

block.SignedHeader.Header.NextAggregatorsHash = newState.NextValidators.Hash()

commit, err = m.getCommit(block.SignedHeader.Header)
if err != nil {
return err
Expand All @@ -667,8 +650,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// set the commit to current block's signed header
block.SignedHeader.Commit = *commit

block.SignedHeader.Validators = m.getLastStateValidators()

// Validate the created block before storing
if err := m.executor.Validate(m.lastState, block); err != nil {
return fmt.Errorf("failed to validate block: %w", err)
Expand Down Expand Up @@ -702,12 +683,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

// SaveValidators commits the DB tx
err = m.saveValidatorsToStore(blockHeight)
if err != nil {
return err
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
// After this call m.lastState is the NEW state returned from ApplyBlock
// updateState also commits the DB tx
Expand Down Expand Up @@ -776,24 +751,6 @@ func (m *Manager) updateState(s types.State) error {
return nil
}

func (m *Manager) saveValidatorsToStore(height uint64) error {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
return m.store.SaveValidators(height, m.lastState.Validators)
}

func (m *Manager) getLastStateValidators() *cmtypes.ValidatorSet {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
return m.lastState.Validators
}

func (m *Manager) getNextAggregatorsHash() types.Hash {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
return m.lastState.NextValidators.Hash()
}

func (m *Manager) getLastBlockTime() time.Time {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
Expand All @@ -811,6 +768,7 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block) (types.Sta
defer m.lastStateMtx.RUnlock()
return m.executor.ApplyBlock(ctx, m.lastState, block)
}

func updateState(s *types.State, res *abci.ResponseInitChain) {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
Expand Down Expand Up @@ -843,13 +801,4 @@ func updateState(s *types.State, res *abci.ResponseInitChain) {
// We update the last results hash with the empty hash, to conform with RFC-6962.
s.LastResultsHash = merkle.HashFromByteSlices(nil)

if len(res.Validators) > 0 {
vals, err := cmtypes.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
// TODO(tzdybal): handle error properly
panic(err)
}
s.Validators = cmtypes.NewValidatorSet(vals)
s.NextValidators = cmtypes.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
}
}
5 changes: 2 additions & 3 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ import (
)

func TestInitialState(t *testing.T) {
genesisValidators, _ := types.GetGenesisValidatorSetWithSigner()
genesis := &cmtypes.GenesisDoc{
ChainID: "genesis id",
InitialHeight: 100,
Validators: genesisValidators,
}
sampleState := types.State{
ChainID: "state id",
InitialHeight: 123,
LastBlockHeight: 128,
LastValidators: types.GetRandomValidatorSet(),
Validators: types.GetRandomValidatorSet(),
NextValidators: types.GetRandomValidatorSet(),
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
1 change: 0 additions & 1 deletion da/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (m *DataAvailabilityLayerClient) SubmitBlocks(ctx context.Context, blocks [
for _, block := range blocks {
blockHeight := uint64(block.Height())
m.logger.Debug("Submitting blocks to DA layer!", "height", blockHeight, "dataLayerHeight", daHeight)

hash := block.Hash()
blob, err := block.MarshalBinary()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/multiformats/go-multiaddr v0.12.0
github.com/prometheus/client_golang v1.17.0
github.com/rollkit/celestia-openrpc v0.3.0
github.com/rollkit/go-da v0.0.0-20231024133951-57bc36006772
github.com/rollkit/go-da v0.0.0-20231117151938-ee3b613d7a3a
github.com/rs/cors v1.10.1
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.17.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1377,8 +1377,8 @@ github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rollkit/celestia-openrpc v0.3.0 h1:jMLsdLNQ7T20yiNDlisBhlyurFOpN1gZ6vC068FPrQA=
github.com/rollkit/celestia-openrpc v0.3.0/go.mod h1:2ZhU01YF2hsHIROWzxfMZOYM09Kgyy4roH5JWoNJzp0=
github.com/rollkit/go-da v0.0.0-20231024133951-57bc36006772 h1:0qbVvvxy++RIjwoI2GmqgZDNP5yShBMA+swWjKt7mQE=
github.com/rollkit/go-da v0.0.0-20231024133951-57bc36006772/go.mod h1:cy1LA9kCyCJHgszKkTh9hJn816l5Oa87GMA2c1imfqA=
github.com/rollkit/go-da v0.0.0-20231117151938-ee3b613d7a3a h1:d/2491oTlCydpZepyxG66D8s5tT9QG9n4YuemL0eCmQ=
github.com/rollkit/go-da v0.0.0-20231117151938-ee3b613d7a3a/go.mod h1:cy1LA9kCyCJHgszKkTh9hJn816l5Oa87GMA2c1imfqA=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
Expand Down
31 changes: 0 additions & 31 deletions node/crypto.go

This file was deleted.

52 changes: 0 additions & 52 deletions node/crypto_test.go

This file was deleted.

48 changes: 30 additions & 18 deletions node/full_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,25 +498,28 @@ func (c *FullClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultC
// Validators returns paginated list of validators at given height.
func (c *FullClient) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) {
height := c.normalizeHeight(heightPtr)
validators, err := c.node.Store.GetValidators(height)
if err != nil {
return nil, fmt.Errorf("failed to load validators for height %d: %w", height, err)
}
genesisValidators := c.node.GetGenesis().Validators

totalCount := len(validators.Validators)
perPage := validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
if len(genesisValidators) != 1 {
return nil, fmt.Errorf("there should be exactly one validator in genesis")
}
// Since it's a centralized sequencer
// changed behavior to get this from genesis
genesisValidator := genesisValidators[0]
validator := cmtypes.Validator{
Address: genesisValidator.Address,
PubKey: genesisValidator.PubKey,
VotingPower: int64(1),
ProposerPriority: int64(1),
}

skipCount := validateSkipCount(page, perPage)
v := validators.Validators[skipCount : skipCount+cmmath.MinInt(perPage, totalCount-skipCount)]
return &ctypes.ResultValidators{
BlockHeight: int64(height),
Validators: v,
Count: len(v),
Total: totalCount,
Validators: []*cmtypes.Validator{
&validator,
},
Count: 1,
Total: 1,
}, nil
}

Expand Down Expand Up @@ -696,11 +699,20 @@ func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
return nil, fmt.Errorf("failed to find earliest block: %w", err)
}

validators, err := c.node.Store.GetValidators(latest.Height())
if err != nil {
return nil, fmt.Errorf("failed to fetch the validator info at latest block: %w", err)
genesisValidators := c.node.GetGenesis().Validators

if len(genesisValidators) != 1 {
return nil, fmt.Errorf("there should be exactly one validator in genesis")
}

// Changed behavior to get this from genesis
genesisValidator := genesisValidators[0]
validator := cmtypes.Validator{
Address: genesisValidator.Address,
PubKey: genesisValidator.PubKey,
VotingPower: int64(1),
ProposerPriority: int64(1),
}
_, validator := validators.GetByAddress(latest.SignedHeader.ProposerAddress)

state, err := c.node.Store.GetState()
if err != nil {
Expand Down
Loading

0 comments on commit 4892ede

Please sign in to comment.