Skip to content

Commit

Permalink
Merge pull request cosmos#341 from celestiaorg/tzdybal/new_dalc_api
Browse files Browse the repository at this point in the history
* new DALC proto definitions and implementation
* add DAStartHeight configuration to block.Manager
  • Loading branch information
tzdybal authored Apr 10, 2022
2 parents 404e426 + 3e4a966 commit cf71b4d
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 292 deletions.
104 changes: 79 additions & 25 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package block
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"

"github.com/celestiaorg/optimint/config"
"github.com/celestiaorg/optimint/da"
Expand All @@ -22,6 +24,9 @@ import (
"github.com/celestiaorg/optimint/types"
)

// defaultDABlockTime is used only if DABlockTime is not configured for manager
const defaultDABlockTime = 30 * time.Second

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
lastState state.State
Expand All @@ -36,15 +41,21 @@ type Manager struct {

dalc da.DataAvailabilityLayerClient
retriever da.BlockRetriever
// daHeight is the height of the latest processed DA block
daHeight uint64

HeaderOutCh chan *types.Header
HeaderInCh chan *types.Header

syncTarget uint64
blockInCh chan *types.Block
retrieveCh chan uint64
syncCache map[uint64]*types.Block

// retrieveMtx is used by retrieveCond
retrieveMtx *sync.Mutex
// retrieveCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve data
retrieveCond *sync.Cond

logger log.Logger
}

Expand Down Expand Up @@ -72,12 +83,20 @@ func NewManager(
if err != nil {
return nil, err
}
if s.DAHeight < conf.DAStartHeight {
s.DAHeight = conf.DAStartHeight
}

proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
}

if conf.DABlockTime == 0 {
logger.Info("WARNING: using default DA block time", "DABlockTime", defaultDABlockTime)
conf.DABlockTime = defaultDABlockTime
}

exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
if s.LastBlockHeight+1 == genesis.InitialHeight {
res, err := exec.InitChain(genesis)
Expand All @@ -100,13 +119,16 @@ func NewManager(
executor: exec,
dalc: dalc,
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
HeaderOutCh: make(chan *types.Header),
HeaderInCh: make(chan *types.Header),
blockInCh: make(chan *types.Block),
retrieveCh: make(chan uint64),
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.Header, 100),
HeaderInCh: make(chan *types.Header, 100),
blockInCh: make(chan *types.Block, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
}
agg.retrieveCond = sync.NewCond(agg.retrieveMtx)

return agg, nil
}
Expand Down Expand Up @@ -142,8 +164,11 @@ func (m *Manager) AggregationLoop(ctx context.Context) {
}

func (m *Manager) SyncLoop(ctx context.Context) {
daTicker := time.NewTicker(m.conf.DABlockTime)
for {
select {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash())
newHeight := header.Height
Expand All @@ -153,14 +178,15 @@ func (m *Manager) SyncLoop(ctx context.Context) {
// it's handled gently in RetrieveLoop
if newHeight > currentHeight {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCh <- newHeight
m.retrieveCond.Signal()
}
case block := <-m.blockInCh:
m.logger.Debug("block body retrieved from DALC",
"height", block.Header.Height,
"hash", block.Hash(),
)
m.syncCache[block.Header.Height] = block
m.retrieveCond.Signal()
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory
b1, ok1 := m.syncCache[currentHeight+1]
b2, ok2 := m.syncCache[currentHeight+2]
Expand All @@ -181,6 +207,7 @@ func (m *Manager) SyncLoop(ctx context.Context) {
continue
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
Expand All @@ -195,50 +222,75 @@ func (m *Manager) SyncLoop(ctx context.Context) {
}
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop, that it should process next blocks
// retrieveCond can be signalled in completely async manner, and goroutine below
// works as some kind of "buffer" for those signals
waitCh := make(chan interface{})
go func() {
for {
m.retrieveMtx.Lock()
m.retrieveCond.Wait()
waitCh <- nil
m.retrieveMtx.Unlock()
if ctx.Err() != nil {
return
}
}
}()

for {
select {
case <-m.retrieveCh:
target := atomic.LoadUint64(&m.syncTarget)
for h := m.store.Height() + 1; h <= target; h++ {
m.logger.Debug("trying to retrieve block from DALC", "height", h)
m.mustRetrieveBlock(ctx, h)
case <-waitCh:
for {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("retrieve", "daHeight", daHeight)
err := m.processNextDABlock()
if err != nil {
m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error())
break
}
atomic.AddUint64(&m.daHeight, 1)
}
case <-ctx.Done():
return
}
}
}

func (m *Manager) mustRetrieveBlock(ctx context.Context, height uint64) {
func (m *Manager) processNextDABlock() error {
// TODO(tzdybal): extract configuration option
maxRetries := 10
daHeight := atomic.LoadUint64(&m.daHeight)

var err error
m.logger.Debug("trying to retrieve block from DA", "daHeight", daHeight)
for r := 0; r < maxRetries; r++ {
err := m.fetchBlock(ctx, height)
if err == nil {
return
blockResp, fetchErr := m.fetchBlock(daHeight)
if fetchErr != nil {
err = multierr.Append(err, fetchErr)
time.Sleep(100 * time.Millisecond)
} else {
for _, block := range blockResp.Blocks {
m.blockInCh <- block
}
return nil
}
// TODO(tzdybal): configuration option
// TODO(tzdybal): exponential backoff
time.Sleep(100 * time.Millisecond)
}
// TODO(tzdybal): this is only temporary solution, for MVP
panic("failed to retrieve block with DALC")
return err
}

func (m *Manager) fetchBlock(ctx context.Context, height uint64) error {
func (m *Manager) fetchBlock(daHeight uint64) (da.ResultRetrieveBlocks, error) {
var err error
blockRes := m.retriever.RetrieveBlock(height)
blockRes := m.retriever.RetrieveBlocks(daHeight)
switch blockRes.Code {
case da.StatusSuccess:
m.blockInCh <- blockRes.Block
case da.StatusError:
err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message)
case da.StatusTimeout:
err = fmt.Errorf("timeout during retrieve block: %s", blockRes.Message)
}
return err
return blockRes, err
}

func (m *Manager) getRemainingSleep(start time.Time) time.Duration {
Expand Down Expand Up @@ -305,6 +357,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
Expand All @@ -320,6 +373,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

func (m *Manager) broadcastBlock(ctx context.Context, block *types.Block) error {
m.logger.Debug("submitting block to DA layer", "height", block.Header.Height)
res := m.dalc.SubmitBlock(block)
if res.Code != da.StatusSuccess {
return fmt.Errorf("DA layer submission failed: %s", res.Message)
Expand Down
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ type NodeConfig struct {

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
type BlockManagerConfig struct {
BlockTime time.Duration `mapstructure:"block_time"`
NamespaceID [8]byte `mapstructure:"namespace_id"`
// BlockTime defines how often new blocks are produced
BlockTime time.Duration `mapstructure:"block_time"`
// DABlockTime informs about block time of underlying data availability layer
DABlockTime time.Duration `mapstructure:"da_block_time"`
// DAStartHeight allows skipping first DAStartHeight-1 blocks when querying for blocks.
DAStartHeight uint64 `mapstructure:"da_start_height"`
NamespaceID [8]byte `mapstructure:"namespace_id"`
}

func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
Expand Down
12 changes: 7 additions & 5 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type DAResult struct {
Code StatusCode
// Message may contain DA layer specific information (like DA block height/hash, detailed error message, etc)
Message string
// DAHeight informs about a height on Data Availability Layer for given result.
DAHeight uint64
}

// ResultSubmitBlock contains information returned from DA layer after block submission.
Expand All @@ -43,11 +45,11 @@ type ResultCheckBlock struct {
DataAvailable bool
}

type ResultRetrieveBlock struct {
type ResultRetrieveBlocks struct {
DAResult
// Block is the full block retrieved from Data Availability Layer.
// If Code is not equal to StatusSuccess, it has to be nil.
Block *types.Block
Blocks []*types.Block
}

// DataAvailabilityLayerClient defines generic interface for DA layer block submission.
Expand All @@ -65,12 +67,12 @@ type DataAvailabilityLayerClient interface {
SubmitBlock(block *types.Block) ResultSubmitBlock

// CheckBlockAvailability queries DA layer to check data availability of block corresponding to given header.
CheckBlockAvailability(header *types.Header) ResultCheckBlock
CheckBlockAvailability(dataLayerHeight uint64) ResultCheckBlock
}

// BlockRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
// block data from DA layer. This gives the ability to use it for block synchronization.
type BlockRetriever interface {
// RetrieveBlock returns block at given height from data availability layer.
RetrieveBlock(height uint64) ResultRetrieveBlock
// RetrieveBlocks returns blocks at given data layer height from data availability layer.
RetrieveBlocks(dataLayerHeight uint64) ResultRetrieveBlocks
}
38 changes: 25 additions & 13 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ func (d *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}
}
return da.ResultSubmitBlock{
DAResult: da.DAResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
DAResult: da.DAResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
DAHeight: resp.Result.DataLayerHeight,
},
}
}

func (d *DataAvailabilityLayerClient) CheckBlockAvailability(header *types.Header) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(context.TODO(), &dalc.CheckBlockAvailabilityRequest{Header: header.ToProto()})
func (d *DataAvailabilityLayerClient) CheckBlockAvailability(dataLayerHeight uint64) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(context.TODO(), &dalc.CheckBlockAvailabilityRequest{DataLayerHeight: dataLayerHeight})
if err != nil {
return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}
Expand All @@ -90,19 +94,27 @@ func (d *DataAvailabilityLayerClient) CheckBlockAvailability(header *types.Heade
}
}

func (d *DataAvailabilityLayerClient) RetrieveBlock(height uint64) da.ResultRetrieveBlock {
resp, err := d.client.RetrieveBlock(context.TODO(), &dalc.RetrieveBlockRequest{Height: height})
func (d *DataAvailabilityLayerClient) RetrieveBlocks(dataLayerHeight uint64) da.ResultRetrieveBlocks {
resp, err := d.client.RetrieveBlocks(context.TODO(), &dalc.RetrieveBlocksRequest{DataLayerHeight: dataLayerHeight})
if err != nil {
return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
return da.ResultRetrieveBlocks{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}

var b types.Block
err = b.FromProto(resp.Block)
if err != nil {
return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
blocks := make([]*types.Block, len(resp.Blocks))
for i, block := range resp.Blocks {
var b types.Block
err = b.FromProto(block)
if err != nil {
return da.ResultRetrieveBlocks{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}
blocks[i] = &b
}
return da.ResultRetrieveBlock{
DAResult: da.DAResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
Block: &b,
return da.ResultRetrieveBlocks{
DAResult: da.DAResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
DAHeight: dataLayerHeight,
},
Blocks: blocks,
}
}
2 changes: 1 addition & 1 deletion da/grpc/mockserv/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func main() {
log.Panic(err)
}
log.Println("Listening on:", lis.Addr())
srv := mockserv.GetServer(kv, conf)
srv := mockserv.GetServer(kv, conf, nil)
if err := srv.Serve(lis); err != nil {
log.Println("error while serving:", err)
}
Expand Down
Loading

0 comments on commit cf71b4d

Please sign in to comment.