Skip to content

Commit

Permalink
Merge pull request #6474 from filecoin-project/feat/splitstore-redux
Browse files Browse the repository at this point in the history
Splitstore Enhancements
  • Loading branch information
magik6k authored Jul 13, 2021
2 parents 5c3e79e + af39952 commit c37401a
Show file tree
Hide file tree
Showing 20 changed files with 2,058 additions and 1,131 deletions.
157 changes: 123 additions & 34 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"io"
"runtime"
"sync/atomic"
"sync"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
Expand Down Expand Up @@ -73,20 +73,16 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
}

// Lifecycle states of a Blockstore. Operations are only admitted while the
// state is stateOpen; Close moves the state to stateClosing while it waits for
// in-flight operations, and to stateClosed once it returns.
//
// NOTE(review): the two stateOpen lines below look like the old (int64) and
// new (untyped) sides of the diff shown together — only one exists in the
// actual file.
const (
stateOpen int64 = iota
stateOpen = iota
stateClosing
stateClosed
)

// Blockstore is a badger-backed IPLD blockstore.
//
// NOTE: once Close() is called, methods will try their best to return
// ErrBlockstoreClosed. This is guaranteed to happen for all subsequent
// operation calls after Close() has returned, but it may not happen for
// operations in progress. Those are likely to fail with a different error.
type Blockstore struct {
// state is accessed atomically
state int64
stateLk sync.RWMutex
state int
viewers sync.WaitGroup

DB *badger.DB

Expand All @@ -97,6 +93,7 @@ type Blockstore struct {

var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil)
var _ blockstore.BlockstoreIterator = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil)

// Open creates a new badger-backed blockstore, with the supplied options.
Expand Down Expand Up @@ -124,19 +121,51 @@ func Open(opts Options) (*Blockstore, error) {
// Close closes the store. It marks the store as closing, waits for all
// in-flight operations registered on b.viewers to finish, and then closes the
// underlying badger DB, returning that call's error. The state is set to
// stateClosed when Close returns.
//
// If the store is not in the open state (it is already closing or closed),
// this is a no-op that returns nil, even if the first closure resulted in
// error.
//
// NOTE(review): the atomic.* lines below appear to be the removed side of the
// diff rendered alongside the new lock-based implementation.
func (b *Blockstore) Close() error {
if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) {
b.stateLk.Lock()
if b.state != stateOpen {
b.stateLk.Unlock()
return nil
}
// flip to closing under the write lock, then release it before waiting
b.state = stateClosing
b.stateLk.Unlock()

// record the final state once DB.Close has run, whatever its result
defer func() {
b.stateLk.Lock()
b.state = stateClosed
b.stateLk.Unlock()
}()

// wait for all accesses to complete
b.viewers.Wait()

defer atomic.StoreInt64(&b.state, stateClosed)
return b.DB.Close()
}

// access admits one operation against the store. While holding the state read
// lock it checks that the store is open and, if so, adds one to b.viewers.
// On a nil return the caller must call b.viewers.Done() when the operation
// ends; otherwise ErrBlockstoreClosed is returned and nothing is registered.
func (b *Blockstore) access() error {
	b.stateLk.RLock()
	defer b.stateLk.RUnlock()

	switch b.state {
	case stateOpen:
		// registered under the read lock, so Close cannot flip the state
		// in between the check and the Add.
		b.viewers.Add(1)
		return nil
	default:
		return ErrBlockstoreClosed
	}
}

// isOpen reports whether the store is currently in the open state. The state
// is read under the read lock.
func (b *Blockstore) isOpen() bool {
	b.stateLk.RLock()
	open := b.state == stateOpen
	b.stateLk.RUnlock()

	return open
}

// CollectGarbage runs garbage collection on the value log
func (b *Blockstore) CollectGarbage() error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

var err error
for err == nil {
Expand All @@ -153,9 +182,10 @@ func (b *Blockstore) CollectGarbage() error {

// Compact runs a synchronous compaction
func (b *Blockstore) Compact() error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

nworkers := runtime.NumCPU() / 2
if nworkers < 2 {
Expand All @@ -168,9 +198,10 @@ func (b *Blockstore) Compact() error {
// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(cid)
if pooled {
Expand All @@ -191,9 +222,10 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {

// Has implements Blockstore.Has.
func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return false, ErrBlockstoreClosed
if err := b.access(); err != nil {
return false, err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(cid)
if pooled {
Expand Down Expand Up @@ -221,9 +253,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
return nil, blockstore.ErrNotFound
}

if atomic.LoadInt64(&b.state) != stateOpen {
return nil, ErrBlockstoreClosed
if err := b.access(); err != nil {
return nil, err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(cid)
if pooled {
Expand All @@ -250,9 +283,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {

// GetSize implements Blockstore.GetSize.
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return -1, ErrBlockstoreClosed
if err := b.access(); err != nil {
return 0, err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(cid)
if pooled {
Expand All @@ -279,9 +313,10 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {

// Put implements Blockstore.Put.
func (b *Blockstore) Put(block blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
Expand All @@ -299,9 +334,10 @@ func (b *Blockstore) Put(block blocks.Block) error {

// PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
Expand Down Expand Up @@ -338,9 +374,10 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {

// DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

k, pooled := b.PooledStorageKey(cid)
if pooled {
Expand All @@ -353,9 +390,10 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
}

func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
Expand Down Expand Up @@ -392,8 +430,8 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {

// AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return nil, ErrBlockstoreClosed
if err := b.access(); err != nil {
return nil, err
}

txn := b.DB.NewTransaction(false)
Expand All @@ -405,6 +443,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

ch := make(chan cid.Cid)
go func() {
defer b.viewers.Done()
defer close(ch)
defer iter.Close()

Expand All @@ -415,7 +454,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if ctx.Err() != nil {
return // context has fired.
}
if atomic.LoadInt64(&b.state) != stateOpen {
if !b.isOpen() {
// open iterators will run even after the database is closed...
return // closing, yield.
}
Expand All @@ -442,6 +481,56 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return ch, nil
}

// ForEachKey implements blockstore.BlockstoreIterator. It walks the keys of
// the store inside a read-only transaction and calls f once per key with a
// raw-codec CIDv1 built from the decoded key bytes.
//
// Iteration stops at the first error: a key that fails to base32-decode, a
// non-nil error from f, or ErrBlockstoreClosed if the store stops being open
// while the walk is in progress.
func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
	if err := b.access(); err != nil {
		return err
	}
	defer b.viewers.Done()

	txn := b.DB.NewTransaction(false)
	defer txn.Discard()

	var iterOpts badger.IteratorOptions
	iterOpts.PrefetchSize = 100
	if b.prefixing {
		iterOpts.Prefix = b.prefix
	}

	it := txn.NewIterator(iterOpts)
	defer it.Close()

	// scratch is the decode buffer, reused across keys and only grown when a
	// key needs more room than it currently has.
	var scratch []byte

	it.Rewind()
	for ; it.Valid(); it.Next() {
		if !b.isOpen() {
			return ErrBlockstoreClosed
		}

		key := it.Item().Key()
		if b.prefixing {
			key = key[b.prefixLen:]
		}

		if need := base32.RawStdEncoding.DecodedLen(len(key)); need > len(scratch) {
			scratch = make([]byte, need)
		}

		n, err := base32.RawStdEncoding.Decode(scratch, key)
		if err != nil {
			return err
		}

		if err := f(cid.NewCidV1(cid.Raw, scratch[:n])); err != nil {
			return err
		}
	}

	return nil
}

// HashOnRead implements Blockstore.HashOnRead. It is not supported by this
// blockstore.
func (b *Blockstore) HashOnRead(_ bool) {
Expand Down
5 changes: 5 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type BatchDeleter interface {
DeleteMany(cids []cid.Cid) error
}

// BlockstoreIterator is a trait for efficient iteration over the keys of a
// blockstore.
type BlockstoreIterator interface {
// ForEachKey takes a callback that receives a cid.Cid and returns an error.
// NOTE(review): presumably the callback is invoked once per stored key and a
// non-nil callback error aborts the walk — confirm per implementation.
ForEachKey(func(cid.Cid) error) error
}

// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity
Expand Down
66 changes: 66 additions & 0 deletions blockstore/discard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package blockstore

import (
"context"
"io"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)

var _ Blockstore = (*discardstore)(nil)

// discardstore wraps a Blockstore. Its read-side methods are forwarded to bs,
// while Put, PutMany, DeleteBlock and DeleteMany do nothing and return nil.
type discardstore struct {
// bs is the wrapped blockstore that calls are forwarded to.
bs Blockstore
}

// NewDiscardStore returns a Blockstore backed by a discardstore that wraps bs.
func NewDiscardStore(bs Blockstore) Blockstore {
	store := new(discardstore)
	store.bs = bs
	return store
}

// Has forwards the call to the wrapped blockstore and returns its result
// unchanged.
func (b *discardstore) Has(cid cid.Cid) (bool, error) {
	found, err := b.bs.Has(cid)
	return found, err
}

// HashOnRead forwards the setting to the wrapped blockstore.
func (b *discardstore) HashOnRead(hor bool) {
b.bs.HashOnRead(hor)
}

// Get forwards the call to the wrapped blockstore and returns its result
// unchanged.
func (b *discardstore) Get(cid cid.Cid) (blocks.Block, error) {
	blk, err := b.bs.Get(cid)
	return blk, err
}

// GetSize forwards the call to the wrapped blockstore and returns its result
// unchanged.
func (b *discardstore) GetSize(cid cid.Cid) (int, error) {
	size, err := b.bs.GetSize(cid)
	return size, err
}

// View forwards the call, including the callback f, to the wrapped blockstore
// and returns its error unchanged.
func (b *discardstore) View(cid cid.Cid, f func([]byte) error) error {
	err := b.bs.View(cid, f)
	return err
}

// Put ignores blk and returns nil; nothing is forwarded to the wrapped
// blockstore.
func (b *discardstore) Put(blk blocks.Block) error {
return nil
}

// PutMany ignores blks and returns nil; nothing is forwarded to the wrapped
// blockstore.
func (b *discardstore) PutMany(blks []blocks.Block) error {
return nil
}

// DeleteBlock ignores cid and returns nil; nothing is forwarded to the wrapped
// blockstore.
func (b *discardstore) DeleteBlock(cid cid.Cid) error {
return nil
}

// DeleteMany ignores cids and returns nil; nothing is forwarded to the wrapped
// blockstore.
func (b *discardstore) DeleteMany(cids []cid.Cid) error {
return nil
}

// AllKeysChan forwards the call to the wrapped blockstore and returns its
// channel and error unchanged.
func (b *discardstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	keys, err := b.bs.AllKeysChan(ctx)
	return keys, err
}

// Close closes the wrapped blockstore if it implements io.Closer and returns
// that call's error; otherwise it does nothing and returns nil.
func (b *discardstore) Close() error {
	closer, closable := b.bs.(io.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}
Loading

0 comments on commit c37401a

Please sign in to comment.