Skip to content

Commit

Permalink
Merge pull request #5755 from filecoin-project/feat/kvlog
Browse files Browse the repository at this point in the history
Metadata datastore log
  • Loading branch information
magik6k authored Mar 10, 2021
2 parents cc490b9 + 3f1054d commit bd8864a
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 18 deletions.
5 changes: 4 additions & 1 deletion cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Comma
return xerrors.Errorf("getting metadata datastore: %w", err)
}

bds := backupds.Wrap(mds)
bds, err := backupds.Wrap(mds, backupds.NoLogdir)
if err != nil {
return err
}

fpath, err := homedir.Expand(cctx.Args().First())
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"fmt"
"os"

"github.com/filecoin-project/lotus/chain/market"

gen "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/paychmgr"
)
Expand Down Expand Up @@ -105,4 +105,12 @@ func main() {
fmt.Println(err)
os.Exit(1)
}

err = gen.WriteTupleEncodersToFile("./lib/backupds/cbor_gen.go", "backupds",
backupds.Entry{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
157 changes: 157 additions & 0 deletions lib/backupds/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 59 additions & 6 deletions lib/backupds/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,50 @@ import (
"crypto/sha256"
"io"
"sync"
"time"

logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
)

var log = logging.Logger("backupds")

const NoLogdir = ""

type Datastore struct {
child datastore.Batching

backupLk sync.RWMutex

log chan Entry
closing, closed chan struct{}
}

func Wrap(child datastore.Batching) *Datastore {
return &Datastore{
type Entry struct {
Key, Value []byte
Timestamp int64
}

func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
ds := &Datastore{
child: child,
}

if logdir != NoLogdir {
ds.closing, ds.closed = make(chan struct{}), make(chan struct{})
ds.log = make(chan Entry)

if err := ds.startLog(logdir); err != nil {
return nil, err
}
}

return ds, nil
}

// Writes a datastore dump into the provided writer as
Expand Down Expand Up @@ -129,6 +152,14 @@ func (d *Datastore) Put(key datastore.Key, value []byte) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()

if d.log != nil {
d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}

return d.child.Put(key, value)
}

Expand All @@ -146,11 +177,23 @@ func (d *Datastore) Sync(prefix datastore.Key) error {
return d.child.Sync(prefix)
}

func (d *Datastore) Close() error {
func (d *Datastore) CloseLog() error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()

return d.child.Close()
if d.closing != nil {
close(d.closing)
<-d.closed
}

return nil
}

func (d *Datastore) Close() error {
return multierr.Combine(
d.child.Close(),
d.CloseLog(),
)
}

func (d *Datastore) Batch() (datastore.Batch, error) {
Expand All @@ -160,17 +203,27 @@ func (d *Datastore) Batch() (datastore.Batch, error) {
}

return &bbatch{
d: d,
b: b,
rlk: d.backupLk.RLocker(),
}, nil
}

type bbatch struct {
d *Datastore
b datastore.Batch
rlk sync.Locker
}

func (b *bbatch) Put(key datastore.Key, value []byte) error {
if b.d.log != nil {
b.d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}

return b.b.Put(key, value)
}

Expand Down
Loading

0 comments on commit bd8864a

Please sign in to comment.