Skip to content

Commit

Permalink
Merge pull request #5703 from filecoin-project/feat/batch-delete
Browse files Browse the repository at this point in the history
add DeleteMany to Blockstore interface
  • Loading branch information
vyzo authored Mar 2, 2021
2 parents bb026de + e960885 commit 5c13901
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 2 deletions.
38 changes: 38 additions & 0 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
})
}

func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}

batch := b.DB.NewWriteBatch()
defer batch.Cancel()

// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
var toReturn [][]byte
if b.prefixing {
toReturn = make([][]byte, 0, len(cids))
defer func() {
for _, b := range toReturn {
KeyPool.Put(b)
}
}()
}

for _, cid := range cids {
k, pooled := b.PooledStorageKey(cid)
if pooled {
toReturn = append(toReturn, k)
}
if err := batch.Delete(k); err != nil {
return err
}
}

err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
}
return err
}

// AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
Expand Down
33 changes: 31 additions & 2 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package blockstore

import (
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"

Expand All @@ -18,20 +18,38 @@ var ErrNotFound = blockstore.ErrNotFound
type Blockstore interface {
blockstore.Blockstore
blockstore.Viewer
BatchDeleter
}

// BasicBlockstore is an alias to the original IPFS Blockstore.
type BasicBlockstore = blockstore.Blockstore

type Viewer = blockstore.Viewer

type BatchDeleter interface {
DeleteMany(cids []cid.Cid) error
}

// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity
// hash function and returns them on get/has, ignoring the contents of the
// blockstore.
func WrapIDStore(bstore blockstore.Blockstore) Blockstore {
return blockstore.NewIdStore(bstore).(Blockstore)
if is, ok := bstore.(*idstore); ok {
// already wrapped
return is
}

if bs, ok := bstore.(Blockstore); ok {
// we need to wrap our own because we don't want to neuter the DeleteMany method
// the underlying blockstore has implemented an (efficient) DeleteMany
return NewIDStore(bs)
}

// The underlying blockstore does not implement DeleteMany, so we need to shim it.
// This is less efficient as it'll iterate and perform single deletes.
return NewIDStore(Adapt(bstore))
}

// FromDatastore creates a new blockstore backed by the given datastore.
Expand All @@ -53,6 +71,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error
return callback(blk.RawData())
}

func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error {
for _, cid := range cids {
err := a.DeleteBlock(cid)
if err != nil {
return err
}
}

return nil
}

// Adapt adapts a standard blockstore to a Lotus blockstore by
// enriching it with the extra methods that Lotus requires (e.g. View, Sync).
//
Expand Down
8 changes: 8 additions & 0 deletions blockstore/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
return bs.write.DeleteBlock(c)
}

func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
if err := bs.read.DeleteMany(cids); err != nil {
return err
}

return bs.write.DeleteMany(cids)
}

func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
// both stores are viewable.
if err := bs.write.View(c, callback); err == ErrNotFound {
Expand Down
174 changes: 174 additions & 0 deletions blockstore/idstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package blockstore

import (
"context"
"io"

"golang.org/x/xerrors"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)

var _ Blockstore = (*idstore)(nil)

type idstore struct {
bs Blockstore
}

func NewIDStore(bs Blockstore) Blockstore {
return &idstore{bs: bs}
}

func decodeCid(cid cid.Cid) (inline bool, data []byte, err error) {
if cid.Prefix().MhType != mh.IDENTITY {
return false, nil, nil
}

dmh, err := mh.Decode(cid.Hash())
if err != nil {
return false, nil, err
}

if dmh.Code == mh.IDENTITY {
return true, dmh.Digest, nil
}

return false, nil, err
}

func (b *idstore) Has(cid cid.Cid) (bool, error) {
inline, _, err := decodeCid(cid)
if err != nil {
return false, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return true, nil
}

return b.bs.Has(cid)
}

func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return nil, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return blocks.NewBlockWithCid(data, cid)
}

return b.bs.Get(cid)
}

func (b *idstore) GetSize(cid cid.Cid) (int, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return 0, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return len(data), err
}

return b.bs.GetSize(cid)
}

func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error {
inline, data, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return cb(data)
}

return b.bs.View(cid, cb)
}

func (b *idstore) Put(blk blocks.Block) error {
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return nil
}

return b.bs.Put(blk)
}

func (b *idstore) PutMany(blks []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(blks))
for _, blk := range blks {
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
continue
}
toPut = append(toPut, blk)
}

if len(toPut) > 0 {
return b.bs.PutMany(toPut)
}

return nil
}

func (b *idstore) DeleteBlock(cid cid.Cid) error {
inline, _, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return nil
}

return b.bs.DeleteBlock(cid)
}

func (b *idstore) DeleteMany(cids []cid.Cid) error {
toDelete := make([]cid.Cid, 0, len(cids))
for _, cid := range cids {
inline, _, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
continue
}
toDelete = append(toDelete, cid)
}

if len(toDelete) > 0 {
return b.bs.DeleteMany(toDelete)
}

return nil
}

func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.bs.AllKeysChan(ctx)
}

func (b *idstore) HashOnRead(enabled bool) {
b.bs.HashOnRead(enabled)
}

func (b *idstore) Close() error {
if c, ok := b.bs.(io.Closer); ok {
return c.Close()
}
return nil
}
7 changes: 7 additions & 0 deletions blockstore/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error {
return nil
}

func (m MemBlockstore) DeleteMany(ks []cid.Cid) error {
for _, k := range ks {
delete(m, k)
}
return nil
}

func (m MemBlockstore) Has(k cid.Cid) (bool, error) {
_, ok := m[k]
return ok, nil
Expand Down
5 changes: 5 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (s *SplitStore) DeleteBlock(_ cid.Cid) error {
return errors.New("DeleteBlock not implemented on SplitStore; don't do this Luke!") //nolint
}

func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
// afaict we don't seem to be using this method, so it's not implemented
return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint
}

func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
has, err := s.hot.Has(cid)

Expand Down
6 changes: 6 additions & 0 deletions blockstore/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error {
return m.bs.DeleteBlock(k)
}

func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.DeleteMany(ks)
}

func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions blockstore/timed.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
}

func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks))
}

func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
t.mu.RLock()
defer t.mu.RUnlock()
Expand Down

0 comments on commit 5c13901

Please sign in to comment.