Skip to content

Commit

Permalink
Merge pull request #41 from filecoin-project/raulk/persist
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Jul 7, 2021
2 parents 265dfb1 + 31b64b8 commit 649a9e7
Show file tree
Hide file tree
Showing 16 changed files with 971 additions and 277 deletions.
85 changes: 65 additions & 20 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import (
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
)

var (
	// StoreNamespace is the datastore key namespace under which shard state
	// will be persisted (the configured Datastore is wrapped with it via
	// namespace.Wrap in NewDAGStore).
	StoreNamespace = ds.NewKey("dagstore")
)

// log is the package-level logger for the dagstore subsystem.
var log = logging.Logger("dagstore")

var (
Expand All @@ -39,6 +47,7 @@ type DAGStore struct {
shards map[shard.Key]*Shard
config Config
indices index.FullIndexRepo
store ds.Datastore

// externalCh receives external tasks.
externalCh chan *task
Expand Down Expand Up @@ -75,28 +84,27 @@ type ShardResult struct {
}

// Config holds the configuration options used to construct a DAGStore.
type Config struct {
	// ScratchSpaceDir is the path to the scratch space, where local copies of
	// remote mounts are saved.
	//
	// NOTE(review): the visible constructor reads TransientsDir, not this
	// field — confirm whether ScratchSpaceDir is still used anywhere.
	ScratchSpaceDir string
	// TransientsDir is the path to directory where local transient files will
	// be created for remote mounts.
	TransientsDir string

	// IndexDir is the path where indices are stored.
	IndexDir string

	// Datastore is the datastore where shard state will be persisted.
	// If nil, an in-memory datastore is used and state will not survive restarts.
	Datastore ds.Datastore

	// MountTypes are the recognized mount types, bound to their corresponding
	// URL schemes.
	//
	// NOTE(review): the visible constructor uses MountRegistry, not this
	// field — confirm whether MountTypes is still consumed.
	MountTypes map[string]mount.Type
	// MountRegistry contains the set of recognized mount types.
	// If nil, an empty registry is created by NewDAGStore.
	MountRegistry *mount.Registry
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
func NewDAGStore(cfg Config) (*DAGStore, error) {
// validate and manage scratch root directory.
if cfg.ScratchSpaceDir == "" {
if cfg.TransientsDir == "" {
return nil, fmt.Errorf("missing scratch area root path")
}
if err := ensureDir(cfg.ScratchSpaceDir); err != nil {
if err := ensureDir(cfg.TransientsDir); err != nil {
return nil, fmt.Errorf("failed to create scratch root dir: %w", err)
}

Expand All @@ -119,25 +127,23 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
// handle the datastore.
if cfg.Datastore == nil {
log.Warnf("no datastore provided; falling back to in-mem datastore; shard state will not survive restarts")
cfg.Datastore = ds.NewMapDatastore()
cfg.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) // TODO can probably remove mutex wrap, since access is single-threaded
}

// create the registry and register all mount types.
mounts := mount.NewRegistry()
for scheme, typ := range cfg.MountTypes {
if err := mounts.Register(scheme, typ); err != nil {
return nil, fmt.Errorf("failed to register mount factory: %w", err)
}
}
// namespace all store operations.
cfg.Datastore = namespace.Wrap(cfg.Datastore, StoreNamespace)

// TODO: recover persisted shard state from the Datastore.
if cfg.MountRegistry == nil {
cfg.MountRegistry = mount.NewRegistry()
}

ctx, cancel := context.WithCancel(context.Background())
dagst := &DAGStore{
mounts: mounts,
mounts: cfg.MountRegistry,
config: cfg,
indices: indices,
shards: make(map[shard.Key]*Shard),
store: cfg.Datastore,
externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure.
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
Expand All @@ -146,6 +152,24 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
cancelFn: cancel,
}

if err := dagst.restoreState(); err != nil {
// TODO add a lenient mode.
return nil, fmt.Errorf("failed to restore dagstore state: %w", err)
}

// reset in-progress states.
for _, s := range dagst.shards {
if s.state == ShardStateServing {
// no active acquirers at start.
s.state = ShardStateAvailable
}
if s.state == ShardStateInitializing {
// restart the registration.
s.state = ShardStateNew
_ = dagst.queueTask(&task{op: OpShardRegister, shard: s}, dagst.externalCh)
}
}

dagst.wg.Add(1)
go dagst.control()

Expand All @@ -157,7 +181,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {

// RegisterOpts carries optional parameters for the RegisterShard operation.
type RegisterOpts struct {
	// ExistingTransient can be supplied when registering a shard to indicate that
	// there's already an existing local transient copy that can be used for
	// indexing.
	ExistingTransient string
}
Expand All @@ -174,7 +198,7 @@ func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.M
return fmt.Errorf("%s: %w", key.String(), ErrShardExists)
}

upgraded, err := mount.Upgrade(mnt, opts.ExistingTransient)
upgraded, err := mount.Upgrade(mnt, d.config.TransientsDir, opts.ExistingTransient)
if err != nil {
d.lk.Unlock()
return err
Expand All @@ -184,6 +208,7 @@ func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.M

// add the shard to the shard catalogue, and drop the lock.
s := &Shard{
d: d,
key: key,
state: ShardStateNew,
mount: upgraded,
Expand Down Expand Up @@ -264,6 +289,7 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
// Close stops the event loop, waits for in-flight goroutines to finish, and
// flushes outstanding shard state to the datastore.
func (d *DAGStore) Close() error {
	d.cancelFn()
	d.wg.Wait()
	// Surface datastore flush failures instead of silently discarding them;
	// a failed sync means persisted shard state may be incomplete.
	if err := d.store.Sync(ds.Key{}); err != nil {
		return fmt.Errorf("failed to sync datastore on close: %w", err)
	}
	return nil
}

Expand All @@ -276,6 +302,25 @@ func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
}
}

// restoreState rehydrates the in-memory shard catalogue from the (namespaced)
// datastore. Entries that fail to deserialize, and results carrying per-entry
// errors, are logged and skipped so a single bad record does not abort the
// whole restore.
func (d *DAGStore) restoreState() error {
	results, err := d.store.Query(query.Query{})
	if err != nil {
		return fmt.Errorf("failed to recover dagstore state from store: %w", err)
	}
	// Close the result iterator on every exit path to release its resources;
	// previously it was never closed.
	defer results.Close()
	for {
		res, ok := results.NextSync()
		if !ok {
			// iterator exhausted; all recoverable shards have been loaded.
			return nil
		}
		// Per-result errors are reported inline by the query API; skip such
		// entries in the same best-effort spirit as unmarshalling failures.
		if res.Error != nil {
			log.Warnf("error while restoring shard state: %s; skipping", res.Error)
			continue
		}
		s := &Shard{d: d}
		if err := s.UnmarshalJSON(res.Value); err != nil {
			log.Warnf("failed to recover state of shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
			continue
		}
		d.shards[s.key] = s
	}
}

// ensureDir checks whether the specified path is a directory, and if not it
// attempts to create it.
func ensureDir(path string) error {
Expand Down
21 changes: 17 additions & 4 deletions dagstore_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ func (o OpType) String() string {
func (d *DAGStore) control() {
defer d.wg.Done()

tsk, err := d.consumeNext()
for ; err == nil; tsk, err = d.consumeNext() {
var (
tsk *task
err error
)

for {
if tsk, err = d.consumeNext(); err != nil {
break
}

log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)

s := tsk.shard
Expand Down Expand Up @@ -76,6 +84,7 @@ func (d *DAGStore) control() {

s.state = ShardStateServing
s.refs++

go d.acquireAsync(tsk.ctx, w, s, s.mount)

case OpShardRelease:
Expand Down Expand Up @@ -128,8 +137,12 @@ func (d *DAGStore) control() {

}

s.lk.Unlock()
// persist the current shard state.
if err := s.persist(d.config.Datastore); err != nil { // TODO maybe fail shard?
log.Warnw("failed to persist shard", "shard", s.key, "error", err)
}

s.lk.Unlock()
}

if err != context.Canceled {
Expand All @@ -152,6 +165,6 @@ func (d *DAGStore) consumeNext() (tsk *task, error error) {
case tsk = <-d.completionCh:
return tsk, nil
case <-d.ctx.Done():
return // TODO drain and process before returning?
return nil, d.ctx.Err() // TODO drain and process before returning?
}
}
Loading

0 comments on commit 649a9e7

Please sign in to comment.