Skip to content

Commit

Permalink
Merge branch 'main' into chandini/blockchain-info
Browse files Browse the repository at this point in the history
  • Loading branch information
chandiniv1 committed Nov 16, 2023
2 parents 4400483 + f96b077 commit e30afe2
Show file tree
Hide file tree
Showing 30 changed files with 174 additions and 52 deletions.
10 changes: 7 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ run:
- mempool
- state/indexer
- state/txindex
- third_party

linters:
enable:
- deadcode
- errcheck
- gofmt
- goimports
Expand All @@ -20,13 +20,14 @@ linters:
- misspell
- revive
- staticcheck
- structcheck
- typecheck
- unused
- varcheck

issues:
exclude-use-default: false
include:
- EXC0012 # EXC0012 revive: Annoying issue about not having a comment. The rare codebase has such comments
- EXC0014 # EXC0014 revive: Annoying issue about not having a comment. The rare codebase has such comments

linters-settings:
revive:
Expand All @@ -35,6 +36,9 @@ linters-settings:
disabled: true
- name: duplicated-imports
severity: warning
- name: exported
arguments:
- disableStutteringCheck

goimports:
local-prefixes: github.com/rollkit
2 changes: 2 additions & 0 deletions block/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"github.com/rollkit/rollkit/types"
)

// BlockCache maintains blocks that are seen and hard confirmed
type BlockCache struct {
blocks map[uint64]*types.Block
hashes map[string]bool
hardConfirmations map[string]bool
mtx *sync.RWMutex
}

// NewBlockCache returns a new BlockCache struct
func NewBlockCache() *BlockCache {
return &BlockCache{
blocks: make(map[uint64]*types.Block),
Expand Down
14 changes: 9 additions & 5 deletions block/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Sync Service for block that implements the go-header interface.
// Contains a block store where synced blocks are stored.
// BlockSyncService is the P2P Sync Service for block that implements the
// go-header interface. Contains a block store where synced blocks are stored.
// Uses the go-header library for handling all P2P logic.
type BlockSyncService struct {
conf config.NodeConfig
Expand All @@ -43,6 +43,7 @@ type BlockSyncService struct {
ctx context.Context
}

// NewBlockSyncService returns a new BlockSyncService.
func NewBlockSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
Expand Down Expand Up @@ -90,8 +91,10 @@ func (bSyncService *BlockSyncService) initBlockStoreAndStartSyncer(ctx context.C
return nil
}

// Initialize block store if needed and broadcasts provided block.
// Note: Only returns an error in case block store can't be initialized. Logs error if there's one while broadcasting.
// WriteToBlockStoreAndBroadcast initializes block store if needed and broadcasts
// provided block.
// Note: Only returns an error in case block store can't be initialized. Logs
// error if there's one while broadcasting.
func (bSyncService *BlockSyncService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
// For genesis block initialize the store and start the syncer
if int64(block.Height()) == bSyncService.genesis.InitialHeight {
Expand Down Expand Up @@ -205,7 +208,7 @@ func (bSyncService *BlockSyncService) Start() error {
return nil
}

// OnStop is a part of Service interface.
// Stop is a part of Service interface.
func (bSyncService *BlockSyncService) Stop() error {
err := bSyncService.blockStore.Stop(bSyncService.ctx)
err = multierr.Append(err, bSyncService.p2pServer.Stop(bSyncService.ctx))
Expand Down Expand Up @@ -254,6 +257,7 @@ func newBlockSyncer(
return goheadersync.NewSyncer[*types.Block](ex, store, sub, opts...)
}

// StartSyncer starts the BlockSyncService's syncer
func (bSyncService *BlockSyncService) StartSyncer() error {
bSyncService.syncerStatus.m.Lock()
defer bSyncService.syncerStatus.m.Unlock()
Expand Down
31 changes: 17 additions & 14 deletions block/header_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Sync Service for header that implements the go-header interface.
// HeaderSyncService is the P2P Sync Service for header that implements the
// go-header interface.
// Contains a header store where synced headers are stored.
// Uses the go-header library for handling all P2P logic.
type HeaderSynceService struct {
type HeaderSyncService struct {
conf config.NodeConfig
genesis *cmtypes.GenesisDoc
p2p *p2p.Client
Expand All @@ -43,7 +44,8 @@ type HeaderSynceService struct {
ctx context.Context
}

func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSynceService, error) {
// NewHeaderSyncService returns a new HeaderSyncService.
func NewHeaderSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSyncService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
}
Expand All @@ -61,7 +63,7 @@ func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf conf
return nil, fmt.Errorf("failed to initialize the header store: %w", err)
}

return &HeaderSynceService{
return &HeaderSyncService{
conf: conf,
genesis: genesis,
p2p: p2p,
Expand All @@ -73,11 +75,11 @@ func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf conf
}

// HeaderStore returns the headerstore of the HeaderSynceService
func (hSyncService *HeaderSynceService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] {
func (hSyncService *HeaderSyncService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] {
return hSyncService.headerStore
}

func (hSyncService *HeaderSynceService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
func (hSyncService *HeaderSyncService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
if initial == nil {
return fmt.Errorf("failed to initialize the headerstore and start syncer")
}
Expand All @@ -90,9 +92,9 @@ func (hSyncService *HeaderSynceService) initHeaderStoreAndStartSyncer(ctx contex
return nil
}

// Initialize header store if needed and broadcasts provided header.
// WriteToHeaderStoreAndBroadcast initializes header store if needed and broadcasts provided header.
// Note: Only returns an error in case header store can't be initialized. Logs error if there's one while broadcasting.
func (hSyncService *HeaderSynceService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
func (hSyncService *HeaderSyncService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
// For genesis header initialize the store and start the syncer
if int64(signedHeader.Height()) == hSyncService.genesis.InitialHeight {
if err := hSyncService.headerStore.Init(ctx, signedHeader); err != nil {
Expand All @@ -111,12 +113,12 @@ func (hSyncService *HeaderSynceService) WriteToHeaderStoreAndBroadcast(ctx conte
return nil
}

func (hSyncService *HeaderSynceService) isInitialized() bool {
func (hSyncService *HeaderSyncService) isInitialized() bool {
return hSyncService.headerStore.Height() > 0
}

// OnStart is a part of Service interface.
func (hSyncService *HeaderSynceService) Start() error {
// Start is a part of Service interface.
func (hSyncService *HeaderSyncService) Start() error {
// have to do the initializations here to utilize the p2p node which is created on start
ps := hSyncService.p2p.PubSub()

Expand Down Expand Up @@ -203,8 +205,8 @@ func (hSyncService *HeaderSynceService) Start() error {
return nil
}

// OnStop is a part of Service interface.
func (hSyncService *HeaderSynceService) Stop() error {
// Stop is a part of Service interface.
func (hSyncService *HeaderSyncService) Stop() error {
err := hSyncService.headerStore.Stop(hSyncService.ctx)
err = multierr.Append(err, hSyncService.p2pServer.Stop(hSyncService.ctx))
err = multierr.Append(err, hSyncService.ex.Stop(hSyncService.ctx))
Expand Down Expand Up @@ -253,7 +255,8 @@ func newSyncer(
return goheadersync.NewSyncer[*types.SignedHeader](ex, store, sub, opts...)
}

func (hSyncService *HeaderSynceService) StartSyncer() error {
// StartSyncer starts the HeaderSyncService's syncer
func (hSyncService *HeaderSyncService) StartSyncer() error {
hSyncService.syncerStatus.m.Lock()
defer hSyncService.syncerStatus.m.Unlock()
if hSyncService.syncerStatus.started {
Expand Down
1 change: 1 addition & 0 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) {
}, nil
}

// IsProposer returns whether or not the manager is a proposer
func (m *Manager) IsProposer() (bool, error) {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
Expand Down
3 changes: 2 additions & 1 deletion block/pending_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"github.com/rollkit/rollkit/types"
)

// Maintains blocks that need to be published to DA layer
// PendingBlocks maintains blocks that need to be published to DA layer
type PendingBlocks struct {
pendingBlocks []*types.Block
mtx *sync.RWMutex
}

// NewPendingBlocks returns a new PendingBlocks struct
func NewPendingBlocks() *PendingBlocks {
return &PendingBlocks{
pendingBlocks: make([]*types.Block, 0),
Expand Down
4 changes: 2 additions & 2 deletions block/syncer_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"sync"
)

// Used by header and block exchange service for keeping track of
// the status of the syncer in them.
// SyncerStatus is used by header and block exchange service for keeping track
// of the status of the syncer in them.
type SyncerStatus struct {
started bool
m sync.RWMutex
Expand Down
3 changes: 2 additions & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
const (
// DefaultListenAddress is a default listen address for P2P client.
DefaultListenAddress = "/ip4/0.0.0.0/tcp/7676"
Version = "0.4.0"
// Version is the current rollkit version and is used for checking RPC compatibility??
Version = "0.4.0"
)

// DefaultNodeConfig keeps default values of NodeConfig
Expand Down
2 changes: 2 additions & 0 deletions da/celestia/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rollkit/rollkit/types"
)

// ErrorCode represents RPC error code.
type ErrorCode int

type respError struct {
Expand All @@ -27,6 +28,7 @@ type respError struct {
Meta json.RawMessage `json:"meta,omitempty"`
}

// Error returns error message string.
func (e *respError) Error() string {
if e.Code >= -32768 && e.Code <= -32000 {
return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message)
Expand Down
6 changes: 4 additions & 2 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ var (
ErrDataNotFound = errors.New("data not found")
// ErrNamespaceNotFound is used to indicate that the block contains data, but not for the requested namespace.
ErrNamespaceNotFound = errors.New("namespace not found in data")
ErrBlobNotFound = errors.New("blob: not found")
ErrEDSNotFound = errors.New("eds not found")
// ErrBlobNotFound is used to indicate that the blob was not found.
ErrBlobNotFound = errors.New("blob: not found")
// ErrEDSNotFound is used to indicate that the EDS was not found.
ErrEDSNotFound = errors.New("eds not found")
)

// StatusCode is a type for DA layer return status.
Expand Down
2 changes: 1 addition & 1 deletion da/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/rollkit/rollkit/da/mock"
)

func TestRegistery(t *testing.T) {
func TestRegistry(t *testing.T) {
assert := assert.New(t)

expected := []string{"mock", "grpc", "celestia"}
Expand Down
4 changes: 0 additions & 4 deletions mempool/clist/clist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func TestSmall(t *testing.T) {

// This test is quite hacky because it relies on SetFinalizer
// which isn't guaranteed to run at all.
//
//nolint:unused,deadcode
func _TestGCFifo(t *testing.T) {
if runtime.GOARCH != "amd64" {
t.Skipf("Skipping on non-amd64 machine")
Expand Down Expand Up @@ -118,8 +116,6 @@ func _TestGCFifo(t *testing.T) {

// This test is quite hacky because it relies on SetFinalizer
// which isn't guaranteed to run at all.
//
//nolint:unused,deadcode
func _TestGCRandom(t *testing.T) {
if runtime.GOARCH != "amd64" {
t.Skipf("Skipping on non-amd64 machine")
Expand Down
4 changes: 2 additions & 2 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (txmp *TxMempool) allEntriesSorted() []*WrappedTx {
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
var totalGas, totalBytes int64

keep := make([]types.Tx, 0, len(txmp.allEntriesSorted()))
var keep []types.Tx
for _, w := range txmp.allEntriesSorted() {
// N.B. When computing byte size, we need to include the overhead for
// encoding as protobuf to send to the application. This actually overestimates it
Expand Down Expand Up @@ -369,7 +369,7 @@ func (txmp *TxMempool) TxsFront() *clist.CElement { return txmp.txs.Front() }
// The result may have fewer than max elements (possibly zero) if the mempool
// does not have that many transactions available.
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
var keep []types.Tx //nolint:prealloc
var keep []types.Tx

for _, w := range txmp.allEntriesSorted() {
if max >= 0 && len(keep) >= max {
Expand Down
6 changes: 3 additions & 3 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type FullNode struct {
eventBus *cmtypes.EventBus
dalc da.DataAvailabilityLayerClient
p2pClient *p2p.Client
hSyncService *block.HeaderSynceService
hSyncService *block.HeaderSyncService
bSyncService *block.BlockSyncService
// TODO(tzdybal): consider extracting "mempool reactor"
Mempool mempool.Mempool
Expand Down Expand Up @@ -217,8 +217,8 @@ func initMempool(logger log.Logger, proxyApp proxy.AppConns) *mempoolv1.TxMempoo
return mempool
}

func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSynceService, error) {
headerSyncService, err := block.NewHeaderSynceService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService"))
func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSyncService, error) {
headerSyncService, err := block.NewHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService"))
if err != nil {
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions node/full_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func NewFullClient(node *FullNode) *FullClient {
}
}

// GetClient returns a new RPC client for the full node.
//
// TODO: should this be NewRPPCClient? Or should we add the client as a field of
// the FullNode so that it is just created once?
func (n *FullNode) GetClient() rpcclient.Client {
return NewFullClient(n)
}
Expand Down Expand Up @@ -796,11 +800,13 @@ func (c *FullClient) CheckTx(ctx context.Context, tx cmtypes.Tx) (*ctypes.Result
return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil
}

// Header returns a cometbft ResultsHeader for the FullClient
func (c *FullClient) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) {
blockMeta := c.getBlockMeta(*height)
return &ctypes.ResultHeader{Header: &blockMeta.Header}, nil
}

// HeaderByHash loads the block for the provided hash and returns the header
func (c *FullClient) HeaderByHash(ctx context.Context, hash cmbytes.HexBytes) (*ctypes.ResultHeader, error) {
// N.B. The hash parameter is HexBytes so that the reflective parameter
// decoding logic in the HTTP service will correctly translate from JSON.
Expand Down
24 changes: 24 additions & 0 deletions node/full_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,3 +1189,27 @@ func TestHealth(t *testing.T) {
assert.Nil(err)
assert.Empty(resultHealth)
}

func TestNetInfo(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

mockApp, rpc := getRPC(t)
mockApp.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
mockApp.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
mockApp.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{})
mockApp.On("Commit", mock.Anything).Return(abci.ResponseCommit{})

err := rpc.node.Start()
require.NoError(err)
defer func() {
err := rpc.node.Stop()
require.NoError(err)
}()

netInfo, err := rpc.NetInfo(context.Background())
require.NoError(err)
assert.NotNil(netInfo)
assert.True(netInfo.Listening)
assert.Equal(0, len(netInfo.Peers))
}
Loading

0 comments on commit e30afe2

Please sign in to comment.