Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist and recover shard states. #41

Merged
merged 7 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import (
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
)

var (
// StoreNamespace is the namespace under which shard state will be persisted.
StoreNamespace = ds.NewKey("dagstore")
)

var log = logging.Logger("dagstore")

var (
Expand All @@ -39,6 +47,7 @@ type DAGStore struct {
shards map[shard.Key]*Shard
config Config
indices index.FullIndexRepo
store ds.Datastore

// externalCh receives external tasks.
externalCh chan *task
Expand Down Expand Up @@ -75,28 +84,27 @@ type ShardResult struct {
}

type Config struct {
// ScrapRoot is the path to the scratch space, where local copies of
// remote mounts are saved.
ScratchSpaceDir string
// TransientsDir is the path to directory where local transient files will
// be created for remote mounts.
TransientsDir string

// IndexDir is the path where indices are stored.
IndexDir string

// Datastore is the datastore where shard state will be persisted.
Datastore ds.Datastore

// MountTypes are the recognized mount types, bound to their corresponding
// URL schemes.
MountTypes map[string]mount.Type
// MountRegistry contains the set of recognized mount types.
MountRegistry *mount.Registry
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
func NewDAGStore(cfg Config) (*DAGStore, error) {
// validate and manage scratch root directory.
if cfg.ScratchSpaceDir == "" {
if cfg.TransientsDir == "" {
return nil, fmt.Errorf("missing scratch area root path")
}
if err := ensureDir(cfg.ScratchSpaceDir); err != nil {
if err := ensureDir(cfg.TransientsDir); err != nil {
return nil, fmt.Errorf("failed to create scratch root dir: %w", err)
}

Expand All @@ -119,25 +127,23 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
// handle the datastore.
if cfg.Datastore == nil {
log.Warnf("no datastore provided; falling back to in-mem datastore; shard state will not survive restarts")
cfg.Datastore = ds.NewMapDatastore()
cfg.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) // TODO can probably remove mutex wrap, since access is single-threaded
}

// create the registry and register all mount types.
mounts := mount.NewRegistry()
for scheme, typ := range cfg.MountTypes {
if err := mounts.Register(scheme, typ); err != nil {
return nil, fmt.Errorf("failed to register mount factory: %w", err)
}
}
// namespace all store operations.
cfg.Datastore = namespace.Wrap(cfg.Datastore, StoreNamespace)

// TODO: recover persisted shard state from the Datastore.
if cfg.MountRegistry == nil {
cfg.MountRegistry = mount.NewRegistry()
}

ctx, cancel := context.WithCancel(context.Background())
dagst := &DAGStore{
mounts: mounts,
mounts: cfg.MountRegistry,
config: cfg,
indices: indices,
shards: make(map[shard.Key]*Shard),
store: cfg.Datastore,
externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure.
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
Expand All @@ -146,6 +152,24 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
cancelFn: cancel,
}

if err := dagst.restoreState(); err != nil {
// TODO add a lenient mode.
return nil, fmt.Errorf("failed to restore dagstore state: %w", err)
}

// reset in-progress states.
for _, s := range dagst.shards {
if s.state == ShardStateServing {
// no active acquirers at start.
s.state = ShardStateAvailable
}
if s.state == ShardStateInitializing {
// restart the registration.
s.state = ShardStateNew
_ = dagst.queueTask(&task{op: OpShardRegister, shard: s}, dagst.externalCh)
}
}

dagst.wg.Add(1)
go dagst.control()

Expand All @@ -157,7 +181,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {

type RegisterOpts struct {
// ExistingTransient can be supplied when registering a shard to indicate that
// there's already an existing local transient local that can be used for
// there's already an existing local transient copy that can be used for
// indexing.
ExistingTransient string
}
Expand All @@ -174,7 +198,7 @@ func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.M
return fmt.Errorf("%s: %w", key.String(), ErrShardExists)
}

upgraded, err := mount.Upgrade(mnt, opts.ExistingTransient)
upgraded, err := mount.Upgrade(mnt, d.config.TransientsDir, opts.ExistingTransient)
if err != nil {
d.lk.Unlock()
return err
Expand All @@ -184,6 +208,7 @@ func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.M

// add the shard to the shard catalogue, and drop the lock.
s := &Shard{
d: d,
key: key,
state: ShardStateNew,
mount: upgraded,
Expand Down Expand Up @@ -264,6 +289,7 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
// Close shuts down the DAG store: it cancels the event loop, waits for it
// to wind down, and flushes any outstanding datastore writes so that
// persisted shard state survives a restart.
func (d *DAGStore) Close() error {
	d.cancelFn()
	d.wg.Wait()
	// Propagate sync failures instead of discarding them: if the final
	// flush fails, shard state may not be recoverable on the next start
	// and the caller should know.
	if err := d.store.Sync(ds.Key{}); err != nil {
		return fmt.Errorf("failed to sync datastore on close: %w", err)
	}
	return nil
}

Expand All @@ -276,6 +302,25 @@ func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
}
}

// restoreState rehydrates the in-memory shard catalogue from the entries
// persisted in the (namespaced) datastore. Individual entries that fail to
// unmarshal are logged and skipped, so one corrupt record does not abort
// the whole recovery; a failure to query the store at all is returned.
func (d *DAGStore) restoreState() error {
	results, err := d.store.Query(query.Query{})
	if err != nil {
		return fmt.Errorf("failed to recover dagstore state from store: %w", err)
	}
	// Release the result iterator's resources; best-effort, error ignored
	// deliberately since we only read from it.
	defer results.Close() //nolint:errcheck
	for {
		res, ok := results.NextSync()
		if !ok {
			return nil
		}
		s := &Shard{d: d}
		if err := s.UnmarshalJSON(res.Value); err != nil {
			log.Warnf("failed to recover state of shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
			continue
		}
		d.shards[s.key] = s
	}
}

// ensureDir checks whether the specified path is a directory, and if not it
// attempts to create it.
func ensureDir(path string) error {
Expand Down
21 changes: 17 additions & 4 deletions dagstore_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ func (o OpType) String() string {
func (d *DAGStore) control() {
defer d.wg.Done()

tsk, err := d.consumeNext()
for ; err == nil; tsk, err = d.consumeNext() {
var (
tsk *task
err error
)

for {
if tsk, err = d.consumeNext(); err != nil {
break
}

log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)

s := tsk.shard
Expand Down Expand Up @@ -76,6 +84,7 @@ func (d *DAGStore) control() {

s.state = ShardStateServing
s.refs++

go d.acquireAsync(tsk.ctx, w, s, s.mount)

case OpShardRelease:
Expand Down Expand Up @@ -128,8 +137,12 @@ func (d *DAGStore) control() {

}

s.lk.Unlock()
// persist the current shard state.
if err := s.persist(d.config.Datastore); err != nil { // TODO maybe fail shard?
log.Warnw("failed to persist shard", "shard", s.key, "error", err)
}

s.lk.Unlock()
}

if err != context.Canceled {
Expand All @@ -152,6 +165,6 @@ func (d *DAGStore) consumeNext() (tsk *task, error error) {
case tsk = <-d.completionCh:
return tsk, nil
case <-d.ctx.Done():
return // TODO drain and process before returning?
return nil, d.ctx.Err() // TODO drain and process before returning?
}
}
Loading