Skip to content

Commit

Permalink
refactor: merge codec related input params
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Aug 21, 2024
1 parent 26ba82b commit e142ec4
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 105 deletions.
4 changes: 2 additions & 2 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (r *Layer2Relayer) initializeGenesis() error {

err = r.db.Transaction(func(dbTX *gorm.DB) error {
var dbChunk *orm.Chunk
dbChunk, err = r.chunkOrm.InsertChunk(r.ctx, chunk, encoding.CodecV0, false, rutils.ChunkMetrics{}, dbTX)
dbChunk, err = r.chunkOrm.InsertChunk(r.ctx, chunk, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.ChunkMetrics{}, dbTX)
if err != nil {
return fmt.Errorf("failed to insert chunk: %v", err)
}
Expand All @@ -218,7 +218,7 @@ func (r *Layer2Relayer) initializeGenesis() error {
}

var dbBatch *orm.Batch
dbBatch, err = r.batchOrm.InsertBatch(r.ctx, batch, encoding.CodecV0, false, rutils.BatchMetrics{}, dbTX)
dbBatch, err = r.batchOrm.InsertBatch(r.ctx, batch, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{}, dbTX)
if err != nil {
return fmt.Errorf("failed to insert batch: %v", err)
}
Expand Down
42 changes: 21 additions & 21 deletions rollup/internal/controller/relayer/l2_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func testL2RelayerProcessPendingBatches(t *testing.T) {
err = l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, codecVersion, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, codecVersion, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)

batch := &encoding.Batch{
Expand All @@ -92,7 +92,7 @@ func testL2RelayerProcessPendingBatches(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, codecVersion, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: codecVersion}, rutils.BatchMetrics{})
assert.NoError(t, err)

relayer.ProcessPendingBatches()
Expand Down Expand Up @@ -128,9 +128,9 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
err = l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, codecVersion, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, codecVersion, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)

batch := &encoding.Batch{
Expand All @@ -141,7 +141,7 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, codecVersion, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: codecVersion}, rutils.BatchMetrics{})
assert.NoError(t, err)

err = batchOrm.UpdateRollupStatus(context.Background(), dbBatch.Hash, types.RollupCommitted)
Expand Down Expand Up @@ -197,7 +197,7 @@ func testL2RelayerProcessPendingBundles(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, codecVersion, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: codecVersion}, rutils.BatchMetrics{})
assert.NoError(t, err)

bundleOrm := orm.NewBundle(db)
Expand Down Expand Up @@ -259,9 +259,9 @@ func testL2RelayerFinalizeTimeoutBatches(t *testing.T) {
err = l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
chunkDB1, err := chunkOrm.InsertChunk(context.Background(), chunk1, codecVersion, false, rutils.ChunkMetrics{})
chunkDB1, err := chunkOrm.InsertChunk(context.Background(), chunk1, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)
chunkDB2, err := chunkOrm.InsertChunk(context.Background(), chunk2, codecVersion, false, rutils.ChunkMetrics{})
chunkDB2, err := chunkOrm.InsertChunk(context.Background(), chunk2, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)

batch := &encoding.Batch{
Expand All @@ -272,7 +272,7 @@ func testL2RelayerFinalizeTimeoutBatches(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, codecVersion, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: codecVersion}, rutils.BatchMetrics{})
assert.NoError(t, err)

err = batchOrm.UpdateRollupStatus(context.Background(), dbBatch.Hash, types.RollupCommitted)
Expand Down Expand Up @@ -326,9 +326,9 @@ func testL2RelayerFinalizeTimeoutBundles(t *testing.T) {
err = l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
chunkDB1, err := chunkOrm.InsertChunk(context.Background(), chunk1, codecVersion, false, rutils.ChunkMetrics{})
chunkDB1, err := chunkOrm.InsertChunk(context.Background(), chunk1, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)
chunkDB2, err := chunkOrm.InsertChunk(context.Background(), chunk2, codecVersion, false, rutils.ChunkMetrics{})
chunkDB2, err := chunkOrm.InsertChunk(context.Background(), chunk2, rutils.CodecConfig{Version: codecVersion}, rutils.ChunkMetrics{})
assert.NoError(t, err)

batch := &encoding.Batch{
Expand All @@ -339,7 +339,7 @@ func testL2RelayerFinalizeTimeoutBundles(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, codecVersion, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: codecVersion}, rutils.BatchMetrics{})
assert.NoError(t, err)

err = batchOrm.UpdateRollupStatus(context.Background(), dbBatch.Hash, types.RollupCommitted)
Expand Down Expand Up @@ -411,7 +411,7 @@ func testL2RelayerCommitConfirm(t *testing.T) {
Chunks: []*encoding.Chunk{chunk1, chunk2},
}

dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)
batchHashes[i] = dbBatch.Hash
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func testL2RelayerFinalizeBatchConfirm(t *testing.T) {
Chunks: []*encoding.Chunk{chunk1, chunk2},
}

dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)
batchHashes[i] = dbBatch.Hash
}
Expand Down Expand Up @@ -525,7 +525,7 @@ func testL2RelayerFinalizeBundleConfirm(t *testing.T) {
Chunks: []*encoding.Chunk{chunk1, chunk2},
}

dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)
batchHashes[i] = dbBatch.Hash

Expand Down Expand Up @@ -580,7 +580,7 @@ func testL2RelayerGasOracleConfirm(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch1, err := batchOrm.InsertBatch(context.Background(), batch1, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch1, err := batchOrm.InsertBatch(context.Background(), batch1, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)

batch2 := &encoding.Batch{
Expand All @@ -590,7 +590,7 @@ func testL2RelayerGasOracleConfirm(t *testing.T) {
Chunks: []*encoding.Chunk{chunk2},
}

dbBatch2, err := batchOrm.InsertBatch(context.Background(), batch2, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch2, err := batchOrm.InsertBatch(context.Background(), batch2, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)

// Create and set up the Layer2 Relayer.
Expand Down Expand Up @@ -742,9 +742,9 @@ func testGetBatchStatusByIndex(t *testing.T) {
err = l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, encoding.CodecV0, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk1, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.ChunkMetrics{})
assert.NoError(t, err)
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, encoding.CodecV0, false, rutils.ChunkMetrics{})
_, err = chunkOrm.InsertChunk(context.Background(), chunk2, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.ChunkMetrics{})
assert.NoError(t, err)

batch := &encoding.Batch{
Expand All @@ -755,7 +755,7 @@ func testGetBatchStatusByIndex(t *testing.T) {
}

batchOrm := orm.NewBatch(db)
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, encoding.CodecV0, false, rutils.BatchMetrics{})
dbBatch, err := batchOrm.InsertBatch(context.Background(), batch, rutils.CodecConfig{Version: encoding.CodecV0}, rutils.BatchMetrics{})
assert.NoError(t, err)

status, err := relayer.getBatchStatusByIndex(dbBatch)
Expand Down
32 changes: 19 additions & 13 deletions rollup/internal/controller/watcher/batch_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ func (p *BatchProposer) TryProposeBatch() {

func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics *utils.BatchMetrics) error {
compatibilityBreachOccurred := false
enableCompress := true
codecConfig := utils.CodecConfig{
Version: codecVersion,
EnableCompress: true, // codecv4 is the only version that supports conditional compression, default to enable compression
}

for {
compatible, err := utils.CheckBatchCompressedDataCompatibility(batch, codecVersion)
Expand All @@ -169,7 +172,7 @@ func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion en
if len(batch.Chunks) == 1 {
log.Warn("Disable compression: cannot truncate batch with only 1 chunk for compatibility", "start block number", batch.Chunks[0].Blocks[0].Header.Number.Uint64(),
"end block number", batch.Chunks[0].Blocks[len(batch.Chunks[0].Blocks)-1].Header.Number.Uint64())
enableCompress = false
codecConfig.EnableCompress = false
break
}

Expand All @@ -183,7 +186,7 @@ func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion en

// recalculate batch metrics after truncation
var calcErr error
metrics, calcErr = utils.CalculateBatchMetrics(batch, codecVersion, enableCompress)
metrics, calcErr = utils.CalculateBatchMetrics(batch, codecConfig)
if calcErr != nil {
return fmt.Errorf("failed to calculate batch metrics, batch index: %v, error: %w", batch.Index, calcErr)
}
Expand All @@ -194,9 +197,9 @@ func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion en

p.proposeBatchUpdateInfoTotal.Inc()
err := p.db.Transaction(func(dbTX *gorm.DB) error {
dbBatch, dbErr := p.batchOrm.InsertBatch(p.ctx, batch, codecVersion, enableCompress, *metrics, dbTX)
dbBatch, dbErr := p.batchOrm.InsertBatch(p.ctx, batch, codecConfig, *metrics, dbTX)
if dbErr != nil {
log.Warn("BatchProposer.updateBatchInfoInDB insert batch failure", "index", batch.Index, "parent hash", batch.ParentBatchHash.Hex(), "codec version", codecVersion, "enable compress", enableCompress, "error", dbErr)
log.Warn("BatchProposer.updateBatchInfoInDB insert batch failure", "index", batch.Index, "parent hash", batch.ParentBatchHash.Hex(), "codec version", codecVersion, "enable compress", codecConfig.EnableCompress, "error", dbErr)
return dbErr
}
if dbErr = p.chunkOrm.UpdateBatchHashInRange(p.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex, dbBatch.Hash, dbTX); dbErr != nil {
Expand Down Expand Up @@ -235,8 +238,8 @@ func (p *BatchProposer) proposeBatch() error {
return nil
}

// Ensure all blocks in the same chunk use the same hardfork name
// If a different hardfork name is found, truncate the blocks slice at that point
// Ensure all chunks in the same batch use the same hardfork name
// If a different hardfork name is found, truncate the chunks slice at that point
hardforkName := forks.GetHardforkName(p.chainCfg, dbChunks[0].StartBlockNumber, dbChunks[0].StartBlockTime)
for i := 1; i < len(dbChunks); i++ {
currentHardfork := forks.GetHardforkName(p.chainCfg, dbChunks[i].StartBlockNumber, dbChunks[i].StartBlockTime)
Expand All @@ -257,7 +260,10 @@ func (p *BatchProposer) proposeBatch() error {
return err
}

codecVersion := forks.GetCodecVersion(p.chainCfg, firstUnbatchedChunk.StartBlockNumber, firstUnbatchedChunk.StartBlockTime)
codecConfig := utils.CodecConfig{
Version: forks.GetCodecVersion(p.chainCfg, firstUnbatchedChunk.StartBlockNumber, firstUnbatchedChunk.StartBlockTime),
EnableCompress: true, // codecv4 is the only version that supports conditional compression, default to enable compression
}

var batch encoding.Batch
batch.Index = dbParentBatch.Index + 1
Expand All @@ -266,7 +272,7 @@ func (p *BatchProposer) proposeBatch() error {

for i, chunk := range daChunks {
batch.Chunks = append(batch.Chunks, chunk)
metrics, calcErr := utils.CalculateBatchMetrics(&batch, codecVersion, true /* enable compress for codecv4 */)
metrics, calcErr := utils.CalculateBatchMetrics(&batch, codecConfig)
if calcErr != nil {
return fmt.Errorf("failed to calculate batch metrics: %w", calcErr)
}
Expand Down Expand Up @@ -295,17 +301,17 @@ func (p *BatchProposer) proposeBatch() error {

batch.Chunks = batch.Chunks[:len(batch.Chunks)-1]

metrics, err := utils.CalculateBatchMetrics(&batch, codecVersion, true /* enable compress for codecv4 */)
metrics, err := utils.CalculateBatchMetrics(&batch, codecConfig)
if err != nil {
return fmt.Errorf("failed to calculate batch metrics: %w", err)
}

p.recordAllBatchMetrics(metrics)
return p.updateDBBatchInfo(&batch, codecVersion, metrics)
return p.updateDBBatchInfo(&batch, codecConfig.Version, metrics)
}
}

metrics, calcErr := utils.CalculateBatchMetrics(&batch, codecVersion, true /* enable compress for codecv4 */)
metrics, calcErr := utils.CalculateBatchMetrics(&batch, codecConfig)
if calcErr != nil {
return fmt.Errorf("failed to calculate batch metrics: %w", calcErr)
}
Expand All @@ -319,7 +325,7 @@ func (p *BatchProposer) proposeBatch() error {

p.batchFirstBlockTimeoutReached.Inc()
p.recordAllBatchMetrics(metrics)
return p.updateDBBatchInfo(&batch, codecVersion, metrics)
return p.updateDBBatchInfo(&batch, codecConfig.Version, metrics)
}

log.Debug("pending chunks do not reach one of the constraints or contain a timeout block")
Expand Down
Loading

0 comments on commit e142ec4

Please sign in to comment.