pebble based chunk data pack
zhangchiqing committed Jun 14, 2024
1 parent 42d7ae7 commit e3cb0a9
Showing 15 changed files with 658 additions and 64 deletions.
10 changes: 6 additions & 4 deletions cmd/execution_builder.go
@@ -668,7 +668,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
return nil
}

- func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
+ func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
log := sutil.NewLogger(logger)

opts := badgerDB.
@@ -699,17 +699,19 @@ func (exeNode *ExecutionNode) LoadExecutionState(
error,
) {

- 	chunkDataPackDB, err := openChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger)
+ 	chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(exeNode.exeConf.chunkDataPackDir)
	if err != nil {
- 		return nil, err
+ 		return nil, fmt.Errorf("could not open chunk data pack database: %w", err)
}

exeNode.builder.ShutdownFunc(func() error {
if err := chunkDataPackDB.Close(); err != nil {
return fmt.Errorf("error closing chunk data pack database: %w", err)
}
return nil
})
- 	chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
+ 	// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
+ 	chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
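
The helper storagepebble.OpenDefaultPebbleDB that replaces the badger-specific open path is not shown in this diff. A minimal sketch of what such a helper could look like, assuming it simply opens a pebble database at the given directory with default options (the committed version may tune logging, caching, or comparer settings):

package pebble

import (
	"fmt"

	"github.com/cockroachdb/pebble"
)

// OpenDefaultPebbleDB is a hypothetical sketch of the helper called by the
// execution builder above; the committed implementation may differ.
func OpenDefaultPebbleDB(dir string) (*pebble.DB, error) {
	db, err := pebble.Open(dir, &pebble.Options{})
	if err != nil {
		return nil, fmt.Errorf("could not open pebble db at %s: %w", dir, err)
	}
	return db, nil
}
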
51 changes: 51 additions & 0 deletions model/flow/chunk.go
@@ -4,6 +4,7 @@ import (
"log"

"github.com/ipfs/go-cid"
"github.com/vmihailenco/msgpack/v4"
)

var EmptyEventCollectionID Identifier
@@ -192,3 +193,53 @@ type BlockExecutionDataRoot struct {
// associated with this block.
ChunkExecutionDataIDs []cid.Cid
}

// MarshalMsgpack implements the msgpack.Marshaler interface
func (b BlockExecutionDataRoot) MarshalMsgpack() ([]byte, error) {
return msgpack.Marshal(struct {
BlockID Identifier
ChunkExecutionDataIDs []string
}{
BlockID: b.BlockID,
ChunkExecutionDataIDs: cidsToStrings(b.ChunkExecutionDataIDs),
})
}

// UnmarshalMsgpack implements the msgpack.Unmarshaler interface
func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error {
var temp struct {
BlockID Identifier
ChunkExecutionDataIDs []string
}

if err := msgpack.Unmarshal(data, &temp); err != nil {
return err
}

b.BlockID = temp.BlockID
b.ChunkExecutionDataIDs = stringsToCids(temp.ChunkExecutionDataIDs)

return nil
}

// cidsToStrings converts a slice of cid.Cid to a slice of strings
func cidsToStrings(cids []cid.Cid) []string {
strs := make([]string, len(cids))
for i, c := range cids {
strs[i] = c.String()
}
return strs
}

// stringsToCids converts a slice of strings to a slice of cid.Cid
func stringsToCids(strs []string) []cid.Cid {
cids := make([]cid.Cid, len(strs))
for i, s := range strs {
c, err := cid.Decode(s)
if err != nil {
panic(err) // should not happen: these strings were produced by cid.String() on valid CIDs; a failure indicates corrupted data
}
cids[i] = c
}
return cids
}
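
The custom codec stores CIDs as their string form. A quick round-trip sketch of how the codec behaves; the block ID and CID below are arbitrary example values, not data from this commit:

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	"github.com/vmihailenco/msgpack/v4"

	"github.com/onflow/flow-go/model/flow"
)

func main() {
	// Any valid CID string works; this well-known example CID is used
	// purely as placeholder data.
	c, err := cid.Decode("QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")
	if err != nil {
		panic(err)
	}

	root := flow.BlockExecutionDataRoot{
		BlockID:               flow.Identifier{0x01},
		ChunkExecutionDataIDs: []cid.Cid{c},
	}

	// Marshal dispatches to MarshalMsgpack, encoding the CIDs as strings.
	data, err := msgpack.Marshal(root)
	if err != nil {
		panic(err)
	}

	var decoded flow.BlockExecutionDataRoot
	if err := msgpack.Unmarshal(data, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.BlockID == root.BlockID) // true
}
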
40 changes: 10 additions & 30 deletions storage/badger/chunkDataPacks.go
@@ -9,33 +9,32 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
badgermodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)

type ChunkDataPacks struct {
db *badger.DB
collections storage.Collections
- 	byChunkIDCache *Cache[flow.Identifier, *badgermodel.StoredChunkDataPack]
+ 	byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack]
}

func NewChunkDataPacks(collector module.CacheMetrics, db *badger.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks {

- 	store := func(key flow.Identifier, val *badgermodel.StoredChunkDataPack) func(*transaction.Tx) error {
+ 	store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val)))
}

- 	retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
- 		return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
- 			var c badgermodel.StoredChunkDataPack
+ 	retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
+ 		return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
+ 			var c storage.StoredChunkDataPack
err := operation.RetrieveChunkDataPack(key, &c)(tx)
return &c, err
}
}

cache := newCache(collector, metrics.ResourceChunkDataPack,
- 		withLimit[flow.Identifier, *badgermodel.StoredChunkDataPack](byChunkIDCacheSize),
+ 		withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize),
withStore(store),
withRetrieve(retrieve),
)
@@ -71,7 +70,7 @@ func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error {
// No errors are expected during normal operation, but it may return a generic error
// if the entity is not serializable or Badger unexpectedly fails to process the request
func (ch *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error {
- 	sc := toStoredChunkDataPack(c)
+ 	sc := storage.ToStoredChunkDataPack(c)
writeBatch := batch.GetWriter()
batch.OnSucceed(func() {
ch.byChunkIDCache.Insert(sc.ChunkID, sc)
@@ -133,7 +132,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
return chdp, nil
}

- func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.StoredChunkDataPack, error) {
+ func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) {
tx := ch.db.NewTransaction(false)
defer tx.Discard()

@@ -145,31 +144,12 @@ func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.StoredChunkDataPack, error) {
return schdp, nil
}

- func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
- 	return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
+ func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*storage.StoredChunkDataPack, error) {
+ 	return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
val, err := ch.byChunkIDCache.Get(chunkID)(tx)
if err != nil {
return nil, err
}
return val, nil
}
}

- func toStoredChunkDataPack(c *flow.ChunkDataPack) *badgermodel.StoredChunkDataPack {
- 	sc := &badgermodel.StoredChunkDataPack{
- 		ChunkID: c.ChunkID,
- 		StartState: c.StartState,
- 		Proof: c.Proof,
- 		SystemChunk: false,
- 		ExecutionDataRoot: c.ExecutionDataRoot,
- 	}
-
- 	if c.Collection != nil {
- 		// non system chunk
- 		sc.CollectionID = c.Collection.ID()
- 	} else {
- 		sc.SystemChunk = true
- 	}
-
- 	return sc
- }
17 changes: 0 additions & 17 deletions storage/badger/model/storedChunkDataPack.go

This file was deleted.

8 changes: 4 additions & 4 deletions storage/badger/operation/chunkDataPacks.go
@@ -4,16 +4,16 @@ import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/model/flow"
badgermodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage"
)

// InsertChunkDataPack inserts a chunk data pack keyed by chunk ID.
- func InsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
+ func InsertChunkDataPack(c *storage.StoredChunkDataPack) func(*badger.Txn) error {
return insert(makePrefix(codeChunkDataPack, c.ChunkID), c)
}

// BatchInsertChunkDataPack inserts a chunk data pack keyed by chunk ID into a batch
- func BatchInsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
+ func BatchInsertChunkDataPack(c *storage.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
return batchWrite(makePrefix(codeChunkDataPack, c.ChunkID), c)
}

@@ -25,7 +25,7 @@ func BatchRemoveChunkDataPack(chunkID flow.Identifier) func(batch *badger.WriteBatch) error {
}

// RetrieveChunkDataPack retrieves a chunk data pack by chunk ID.
- func RetrieveChunkDataPack(chunkID flow.Identifier, c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
+ func RetrieveChunkDataPack(chunkID flow.Identifier, c *storage.StoredChunkDataPack) func(*badger.Txn) error {
return retrieve(makePrefix(codeChunkDataPack, chunkID), c)
}

10 changes: 5 additions & 5 deletions storage/badger/operation/chunkDataPacks_test.go
@@ -7,22 +7,22 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

storagemodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/unittest"
)

func TestChunkDataPack(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
collectionID := unittest.IdentifierFixture()
- 		expected := &storagemodel.StoredChunkDataPack{
+ 		expected := &storage.StoredChunkDataPack{
ChunkID: unittest.IdentifierFixture(),
StartState: unittest.StateCommitmentFixture(),
Proof: []byte{'p'},
CollectionID: collectionID,
}

t.Run("Retrieve non-existent", func(t *testing.T) {
- 			var actual storagemodel.StoredChunkDataPack
+ 			var actual storage.StoredChunkDataPack
err := db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.Error(t, err)
})
@@ -31,7 +31,7 @@ func TestChunkDataPack(t *testing.T) {
err := db.Update(InsertChunkDataPack(expected))
require.NoError(t, err)

- 			var actual storagemodel.StoredChunkDataPack
+ 			var actual storage.StoredChunkDataPack
err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.NoError(t, err)

@@ -42,7 +42,7 @@ func TestChunkDataPack(t *testing.T) {
err := db.Update(RemoveChunkDataPack(expected.ChunkID))
require.NoError(t, err)

- 			var actual storagemodel.StoredChunkDataPack
+ 			var actual storage.StoredChunkDataPack
err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.Error(t, err)
})
34 changes: 31 additions & 3 deletions storage/chunkDataPacks.go
@@ -15,9 +15,6 @@ type ChunkDataPacks interface {
// No errors are expected during normal operation, but it may return generic error
Remove(cs []flow.Identifier) error

- 	// BatchStore inserts the chunk header, keyed by chunk ID into a given batch
- 	BatchStore(c *flow.ChunkDataPack, batch BatchStorage) error

// ByChunkID returns the chunk data pack for the given chunk ID.
ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error)

@@ -26,3 +23,34 @@ type ChunkDataPacks interface {
// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.
BatchRemove(chunkID flow.Identifier, batch BatchStorage) error
}

// StoredChunkDataPack is the in-storage representation of a chunk data pack.
// Its prime difference is that instead of an actual collection it stores only the
// collection ID, relying on the collection being maintained in a secondary store.
type StoredChunkDataPack struct {
ChunkID flow.Identifier
StartState flow.StateCommitment
Proof flow.StorageProof
CollectionID flow.Identifier
SystemChunk bool
ExecutionDataRoot flow.BlockExecutionDataRoot
}

func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
sc := &StoredChunkDataPack{
ChunkID: c.ChunkID,
StartState: c.StartState,
Proof: c.Proof,
SystemChunk: false,
ExecutionDataRoot: c.ExecutionDataRoot,
}

if c.Collection != nil {
// non system chunk
sc.CollectionID = c.Collection.ID()
} else {
sc.SystemChunk = true
}

return sc
}
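
The inverse direction, which the badger ByChunkID path above performs, has to resolve the stored collection ID back into a full collection from the secondary store. A hedged sketch of that reconstruction, sitting in the same storage package (the helper name and error wording are illustrative, and fmt is assumed to be imported; the committed conversion lives inside the store implementations):

// toChunkDataPack is an illustrative inverse of ToStoredChunkDataPack.
// For non-system chunks, the collection must be fetched from the
// collections store before the full chunk data pack can be rebuilt.
func toChunkDataPack(sc *StoredChunkDataPack, collections Collections) (*flow.ChunkDataPack, error) {
	chdp := &flow.ChunkDataPack{
		ChunkID:           sc.ChunkID,
		StartState:        sc.StartState,
		Proof:             sc.Proof,
		ExecutionDataRoot: sc.ExecutionDataRoot,
	}
	if !sc.SystemChunk {
		// system chunks carry no collection, so only look one up otherwise
		collection, err := collections.ByID(sc.CollectionID)
		if err != nil {
			return nil, fmt.Errorf("could not retrieve collection %v: %w", sc.CollectionID, err)
		}
		chdp.Collection = collection
	}
	return chdp, nil
}
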
57 changes: 57 additions & 0 deletions storage/pebble/batch.go
@@ -0,0 +1,57 @@
package pebble

import (
"sync"

"github.com/cockroachdb/pebble"
)

type Batch struct {
writer *pebble.Batch

lock sync.RWMutex
callbacks []func()
}

func NewBatch(db *pebble.DB) *Batch {
batch := db.NewBatch()
return &Batch{
writer: batch,
callbacks: make([]func(), 0),
}
}

func (b *Batch) GetWriter() *pebble.Batch {
return b.writer
}

// OnSucceed adds a callback to execute after the batch has
// been successfully flushed.
// This is useful for implementing caches that should only be
// populated after the batch has been successfully flushed.
func (b *Batch) OnSucceed(callback func()) {
b.lock.Lock()
defer b.lock.Unlock()
b.callbacks = append(b.callbacks, callback)
}

// Flush commits the pebble batch and, in addition,
// runs the callbacks added by OnSucceed.
func (b *Batch) Flush() error {
err := b.writer.Commit(nil)
if err != nil {
return err
}

b.lock.RLock()
defer b.lock.RUnlock()
for _, callback := range b.callbacks {
callback()
}
return nil
}

func (b *Batch) Close() error {
return b.writer.Close()
}
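
A usage sketch for the Batch type: stage writes through GetWriter, register cache population via OnSucceed, and commit with Flush. The function name, key, value, and callback here are illustrative, not part of the commit:

// storeWithCallback illustrates the intended Batch lifecycle: callbacks
// registered via OnSucceed run only if the commit succeeds.
func storeWithCallback(db *pebble.DB, key, value []byte, onStored func()) error {
	batch := NewBatch(db)
	defer batch.Close()

	// stage the write in the underlying pebble batch
	if err := batch.GetWriter().Set(key, value, nil); err != nil {
		return err
	}

	// e.g. insert into an in-memory cache once the write is durable
	batch.OnSucceed(onStored)

	return batch.Flush()
}
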
