Skip to content

Commit

Permalink
ArchiveDb
Browse files Browse the repository at this point in the history
Creates a thin database layer on top of database.Database. ArchiveDb is an
append only database which stores all state changes happening at every block
height. Each record is stored in such way to perform both fast inserts and selects.

Currently its API is quite simple, it has two main functions, one to create a
Batch write with a given block height, inside this batch entries can be added
with a given value or they can be deleted. It also provides a Get function
that takes a given key and a height.

The way it works is as follows:
	- Height: 10
		Set(foo, "foo's value is bar")
		Set(bar, "bar's value is bar")
	- Height: 100
		Set(foo, "updatedfoo's value is bar")
	- Height: 1000
		Set(bar, "updated bar's value is bar")
		Delete(foo)

When requesting `Get(foo, 9)` it will return an errNotFound error because foo
was not defined at block height 9, it was defined later. When calling
`Get(foo, 99)` it will return a tuple `("foo's value is bar", 10)` returning
the value of `foo` at height 99 (which was set at height 10). If requesting
`Get(foo, 2000)` it will return an error because `foo` was deleted at height
  • Loading branch information
nytzuga committed Aug 24, 2023
1 parent cf14d79 commit 220b315
Show file tree
Hide file tree
Showing 5 changed files with 411 additions and 0 deletions.
32 changes: 32 additions & 0 deletions x/archivedb/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package archivedb

import (
"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/codec/linearcodec"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const (
Version = uint16(0)
maxMessageSize = 1 * units.MiB
)

var (
Codec codec.Manager
)

func init() {

Check failure on line 19 in x/archivedb/codec.go

View workflow job for this annotation

GitHub Actions / Static analysis

empty-lines: extra empty line at the start of a block (revive)

Codec = codec.NewManager(maxMessageSize)
c := linearcodec.NewDefault()

errs := wrappers.Errs{}
errs.Add(
c.RegisterType(Key{}),
Codec.RegisterCodec(Version, c),
)
if errs.Errored() {
panic(errs.Err)
}
}
149 changes: 149 additions & 0 deletions x/archivedb/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package archivedb

import (
"bytes"
"context"
"sync"

"github.com/ava-labs/avalanchego/database"
)

// ArchiveDb
//
// Creates a thin database layer on top of database.Database. ArchiveDb is an
// append only database which stores all state changes happening at every block
// height. Each record is stored in such way to perform both fast inserts and selects.
//
// Currently its API is quite simple, it has two main functions, one to create a
// Batch write with a given block height, inside this batch entries can be added
// with a given value or they can be deleted. It also provides a Get function
// that takes a given key and a height.
//
// The way it works is as follows:
// - Height: 10
// Set(foo, "foo's value is bar")
// Set(bar, "bar's value is bar")
// - Height: 100
// Set(foo, "updatedfoo's value is bar")
// - Height: 1000
// Set(bar, "updated bar's value is bar")
// Delete(foo)
//
// When requesting `Get(foo, 9)` it will return an errNotFound error because foo
// was not defined at block height 9, it was defined later. When calling
// `Get(foo, 99)` it will return a tuple `("foo's value is bar", 10)` returning
// the value of `foo` at height 99 (which was set at height 10). If requesting
// `Get(foo, 2000)` it will return an error because `foo` was deleted at height
// 1000.
type archiveDB struct {
// Must be held when reading/writing fields.
lock sync.RWMutex

// Must be held when preparing work to be committed to the DB.
// Used to prevent editing of the trie without restricting read access
// until the full set of changes is ready to be written.
// Should be held before taking [db.lock]
commitLock sync.RWMutex

Check failure on line 46 in x/archivedb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

field `commitLock` is unused (unused)

rawDb database.Database

Check failure on line 48 in x/archivedb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

ST1003: struct field rawDb should be rawDB (stylecheck)
}

type batchWithHeight struct {
height uint64
batch database.Batch
}

func newDatabase(
ctx context.Context,

Check failure on line 57 in x/archivedb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
db database.Database,
) (*archiveDB, error) {
return &archiveDB{
rawDb: db,
}, nil
}

// Fetches the value of a given prefix at a given height.
//
// If the value does not exists or it was actually removed an error is returned.
// Otherwise a value does exists it will be returned, alongside with the height
// at which it was updated prior the requested height.
func (db *archiveDB) Get(key []byte, height uint64) ([]byte, uint64, error) {
db.lock.RLock()
defer db.lock.RUnlock()

internalKey, err := NewKey(key, height)
if err != nil {
return nil, 0, err
}
iterator := db.rawDb.NewIteratorWithStart(internalKey.Bytes)
if !iterator.Next() {
// There is no available key with the requested prefix
return nil, 0, database.ErrNotFound
}

internalKey, err = ParseKey(iterator.Key())
if err != nil {
return nil, 0, err
}

if bytes.Compare(internalKey.Prefix, key) != 0 || internalKey.IsDeleted {

Check failure on line 89 in x/archivedb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

S1004: should use !bytes.Equal(internalKey.Prefix, key) instead (gosimple)
// The current key has either a different prefix or the found key has a
// deleted flag.
//
// The previous key that was found does has another prefix, because the
// iterator is not aware of prefixes. If this happens it means the
// prefix at the requested height does not exists.
//
// The database is append only, so when removing a record creates a new
// record with an special flag is being created. Before returning the
// value we check if the deleted flag is present or not.
return nil, 0, database.ErrNotFound
}

return iterator.Value(), internalKey.Height, nil
}

// Creates a new batch to append database changes in a given height
func (db *archiveDB) NewBatch(height uint64) batchWithHeight {
return batchWithHeight{
height: height,
batch: db.rawDb.NewBatch(),
}
}

// Writes the changes to the database
func (c *batchWithHeight) Write() error {
return c.batch.Write()
}

// Delete any previous state that may be stored in the database
func (c *batchWithHeight) Delete(key []byte) error {
internalKey, err := NewKey(key, c.height)
if err != nil {
return err
}
if err = internalKey.SetDeleted(); err != nil {
return err
}
return c.batch.Put(internalKey.Bytes, []byte{})
}

// Queues an insert for a key with a given
func (c *batchWithHeight) Put(key []byte, value []byte) error {
internalKey, err := NewKey(key, c.height)
if err != nil {
return err
}

return c.batch.Put(internalKey.Bytes, value)
}

// Returns the sizes to be commited in the database

Check failure on line 141 in x/archivedb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

`commited` is a misspelling of `committed` (misspell)
func (c *batchWithHeight) Size() int {
return c.batch.Size()
}

// Removed all pending writes and deletes to the database
func (c *batchWithHeight) Reset() {
c.batch.Reset()
}
97 changes: 97 additions & 0 deletions x/archivedb/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package archivedb

import (
"context"
"testing"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/stretchr/testify/require"
)

func getBasicDB() (*archiveDB, error) {
return newDatabase(
context.Background(),
memdb.New(),
)
}

func TestDbEntries(t *testing.T) {
db, err := getBasicDB()
require.NoError(t, err)

writer := db.NewBatch(10)
require.NoError(t, writer.Put([]byte("key1"), []byte("value1@10")))
require.NoError(t, writer.Put([]byte("key2"), []byte("value2@10")))
require.NoError(t, writer.Write())

writer = db.NewBatch(100)
require.NoError(t, writer.Put([]byte("key1"), []byte("value1@100")))
require.NoError(t, writer.Write())

writer = db.NewBatch(1000)
require.NoError(t, writer.Put([]byte("key1"), []byte("value1@1000")))
require.NoError(t, writer.Put([]byte("key2"), []byte("value2@1000")))
require.NoError(t, writer.Write())

value, height, err := db.Get([]byte("key1"), 10000)
require.NoError(t, err)
require.Equal(t, uint64(1000), height)
require.Equal(t, []byte("value1@1000"), value)

value, height, err = db.Get([]byte("key1"), 999)
require.NoError(t, err)
require.Equal(t, uint64(100), height)
require.Equal(t, []byte("value1@100"), value)

value, height, err = db.Get([]byte("key2"), 1999)
require.NoError(t, err)
require.Equal(t, uint64(1000), height)
require.Equal(t, []byte("value2@1000"), value)

value, height, err = db.Get([]byte("key2"), 999)
require.NoError(t, err)
require.Equal(t, uint64(10), height)
require.Equal(t, []byte("value2@10"), value)

_, _, err = db.Get([]byte("key1"), 9)
require.ErrorIs(t, err, database.ErrNotFound)

_, _, err = db.Get([]byte("key3"), 1999)
require.ErrorIs(t, err, database.ErrNotFound)
}

func TestDelete(t *testing.T) {
db, err := getBasicDB()
require.NoError(t, err)

writer := db.NewBatch(10)
require.NoError(t, writer.Put([]byte("key1"), []byte("value1@10")))
require.NoError(t, writer.Put([]byte("key2"), []byte("value2@10")))
require.NoError(t, writer.Write())

writer = db.NewBatch(100)
require.NoError(t, writer.Put([]byte("key1"), []byte("value1@100")))
require.NoError(t, writer.Write())

writer = db.NewBatch(1000)
require.NoError(t, writer.Delete([]byte("key1")))
require.NoError(t, writer.Delete([]byte("key2")))
require.NoError(t, writer.Write())

value, height, err := db.Get([]byte("key1"), 999)
require.NoError(t, err)
require.Equal(t, uint64(100), height)
require.Equal(t, []byte("value1@100"), value)

value, height, err = db.Get([]byte("key2"), 999)
require.NoError(t, err)
require.Equal(t, uint64(10), height)
require.Equal(t, []byte("value2@10"), value)

_, _, err = db.Get([]byte("key2"), 1000)
require.Error(t, err)

Check failure on line 93 in x/archivedb/db_test.go

View workflow job for this annotation

GitHub Actions / Static analysis

use of `require.Error` forbidden because "ErrorIs should be used instead" (forbidigo)

_, _, err = db.Get([]byte("key1"), 10000)
require.Error(t, err)

Check failure on line 96 in x/archivedb/db_test.go

View workflow job for this annotation

GitHub Actions / Static analysis

use of `require.Error` forbidden because "ErrorIs should be used instead" (forbidigo)
}
77 changes: 77 additions & 0 deletions x/archivedb/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package archivedb

import (
"errors"
"math"
)

var errInvalidVersion = errors.New("Invalid version")

Check failure on line 8 in x/archivedb/key.go

View workflow job for this annotation

GitHub Actions / Static analysis

ST1005: error strings should not be capitalized (stylecheck)

// Key
//
// The keys contains a few extra information alongside the given key. The key is
// what is known outside of this internal scope as the key, but this struct has
// more information compacted inside the Key to take advantage of natural
// sorting. The inversed height is stored right after the Prefix, which is
// MaxUint64 minus the desired Height. An extra byte is also packed inside the
// key to indicate if the key is a deletion of any previous value (because
// archivedb is an append only database).
//
// Any other property are to not serialized but they are useful when parsing a
// Key struct from the database
type Key struct {
Prefix []byte `serialize:"true"`
HeightReversed uint64 `serialize:"true"`
IsDeleted bool `serialize:"true"`
Height uint64
Bytes []byte
}

// Creates a new Key struct with a given key and its height
func NewKey(key []byte, height uint64) (Key, error) {
internalKey := Key{
Prefix: key,
HeightReversed: math.MaxUint64 - height,
IsDeleted: false,
Height: height,
}

bytes, err := Codec.Marshal(Version, &internalKey)

if err != nil {
return Key{}, err
}

internalKey.Bytes = bytes

return internalKey, nil
}

// Flag the current Key sturct as a deletion. If this happens the entry Value is
// not important and it will usually an empty vector of bytes
func (k *Key) SetDeleted() error {
k.IsDeleted = true
bytes, err := Codec.Marshal(Version, &k)
if err != nil {
return err
}
k.Bytes = bytes
return nil
}

// Takes a vector of bytes and returns a Key struct
func ParseKey(keyBytes []byte) (Key, error) {
var key Key
version, err := Codec.Unmarshal(keyBytes, &key)
if err != nil {
return Key{}, err
}
if version != Version {
return Key{}, errInvalidVersion
}

key.Height = math.MaxUint64 - key.HeightReversed
key.Bytes = keyBytes

return key, nil
}
Loading

0 comments on commit 220b315

Please sign in to comment.