Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial implementation of the dagstore. #20

Merged
merged 17 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions accessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dagstore

import (
"context"
"fmt"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-car/v2/index"
)

// ReadBlockstore is a read-only view of Blockstores. This will be implemented
// by the CARv2 indexed blockstore.
//
// The interface only declares lookup, enumeration and configuration methods;
// it declares no method for storing or deleting blocks.
//
// NOTE(review): the method set presumably mirrors the read half of the IPFS
// Blockstore interface -- confirm against the go-car implementation.
type ReadBlockstore interface {
Has(cid.Cid) (bool, error)
Get(cid.Cid) (blocks.Block, error)
GetSize(cid.Cid) (int, error)
AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
HashOnRead(enabled bool)
}

// ShardAccessor provides various means to access the data contained
// in a shard.
//
// Instances are built by NewShardAccessor and should be released with Close,
// which closes the underlying CAR reader.
type ShardAccessor struct {
// key is the key of the shard this accessor refers to; it is returned by Shard.
key shard.Key
// data is the CAR reader opened over the shard's mount reader.
data *car.Reader
// idx is the index passed to the read-only blockstore built by Blockstore.
idx index.Index
}

// NewShardAccessor builds a ShardAccessor for the shard identified by key.
//
// The supplied mount reader is handed to car.NewReader; if that call fails,
// the error is wrapped and returned and no accessor is created. On success
// the returned accessor holds the key, the resulting CAR reader and idx.
func NewShardAccessor(key shard.Key, data mount.Reader, idx index.Index) (*ShardAccessor, error) {
	carReader, err := car.NewReader(data)
	if err != nil {
		return nil, fmt.Errorf("failed to determine car version when opening shard accessor: %w", err)
	}

	accessor := new(ShardAccessor)
	accessor.key = key
	accessor.data = carReader
	accessor.idx = idx
	return accessor, nil
}

// Shard returns the key of the shard this accessor was created for.
func (sa *ShardAccessor) Shard() shard.Key {
return sa.key
}

// Blockstore returns a read-only blockstore over this shard, built from the
// CARv1 reader exposed by the underlying CAR reader together with the shard's
// index. The returned error is always nil.
func (sa *ShardAccessor) Blockstore() (ReadBlockstore, error) {
	return blockstore.ReadOnlyOf(sa.data.CarV1Reader(), sa.idx), nil
}

// Close terminates this shard accessor by closing the underlying CAR reader,
// and returns whatever error that close reports.
//
// NOTE(review): an earlier version of this comment also mentioned
// decrementing internal refcounts; nothing in this method does so directly --
// presumably that is delegated to the reader being closed. Confirm.
func (sa *ShardAccessor) Close() error {
return sa.data.Close()
}
270 changes: 270 additions & 0 deletions dagstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package dagstore

import (
"context"
"errors"
"fmt"
"os"
"sync"

"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
)

// log is the package-level logger, created under the "dagstore" name.
var log = logging.Logger("dagstore")

// Sentinel errors surfaced by the DAG store. Some call sites return them
// wrapped together with the shard key (via %w), so match them with errors.Is
// rather than with ==.
var (
// ErrShardUnknown is the error returned when the requested shard is
// not known to the DAG store.
ErrShardUnknown = errors.New("shard not found")

// ErrShardExists is the error returned upon registering a duplicate shard.
ErrShardExists = errors.New("shard already exists")

// ErrShardInitializationFailed is returned when shard initialization fails.
ErrShardInitializationFailed = errors.New("shard initialization failed")

// ErrShardInUse is returned when the user attempts to destroy a shard that
// is in use.
ErrShardInUse = errors.New("shard in use")
)

// DAGStore is the central object of the DAG store.
//
// It is constructed with NewDAGStore, which also starts the control and
// dispatcher goroutines, and it is shut down with Close.
type DAGStore struct {
// lk guards the shards map.
lk sync.RWMutex
// mounts is the registry holding the mount types supplied via Config.MountTypes.
mounts *mount.Registry
// shards is the catalogue of registered shards, indexed by shard key.
shards map[shard.Key]*Shard
// config is the configuration the store was created with, after
// NewDAGStore has filled in the default datastore.
config Config
// indices is the index repository: in-memory when Config.IndexDir is
// empty, filesystem-backed otherwise.
indices index.FullIndexRepo

// externalCh receives external tasks.
externalCh chan *task
// internalCh receives internal tasks to the event loop.
internalCh chan *task
// completionCh receives tasks queued up as a result of async completions.
completionCh chan *task
// dispatchCh is serviced by the dispatcher goroutine.
dispatchCh chan *dispatch

// ctx is cancelled by Close (through cancelFn); queueTask refuses new tasks
// once it is done.
ctx context.Context
cancelFn context.CancelFunc
// wg counts the control and dispatcher goroutines started by NewDAGStore;
// Close waits on it.
wg sync.WaitGroup
}

// dispatch is the element type of dispatchCh: a waiter paired with a
// ShardResult.
//
// NOTE(review): presumably the dispatcher goroutine delivers res to w --
// confirm against dispatcher().
type dispatch struct {
w *waiter
res *ShardResult
}

// task represents an operation to be performed on a shard or the DAG store.
// Tasks travel over the externalCh, internalCh and completionCh channels.
type task struct {
// waiter is embedded; the public entry points populate it with the
// caller's context and result channel.
*waiter

// op is the kind of operation to perform (e.g. OpShardRegister,
// OpShardAcquire, OpShardDestroy).
op OpType
// shard is the shard the operation targets.
shard *Shard
// err carries an error associated with the task.
// NOTE(review): presumably set by async completions -- confirm against the event loop.
err error
}

// ShardResult encapsulates a result from an asynchronous operation. Values of
// this type are delivered on the channel the caller passes to RegisterShard,
// AcquireShard or DestroyShard.
type ShardResult struct {
// Key is the key of the shard the result refers to.
Key shard.Key
// Error is the error produced by the operation, if any.
Error error
// Accessor gives access to the shard's data.
// NOTE(review): presumably only populated for successful acquisitions -- confirm.
Accessor *ShardAccessor
}

// Config holds the parameters NewDAGStore uses to construct a DAGStore.
type Config struct {
// ScratchSpaceDir is the path to the scratch space, where local copies of
// remote mounts are saved. It is required; NewDAGStore creates the
// directory if it does not exist.
ScratchSpaceDir string

// IndexDir is the path where indices are stored. When empty, an in-memory
// index repo is used instead; otherwise the directory is created if it
// does not exist.
IndexDir string

// Datastore is the datastore where shard state will be persisted. When
// nil, NewDAGStore falls back to an in-memory map datastore and shard
// state will not survive restarts.
Datastore ds.Datastore

// MountTypes are the recognized mount types, bound to their corresponding
// URL schemes.
MountTypes map[string]mount.Type
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
//
// It validates and creates the scratch directory, selects the index repo
// (in-memory when cfg.IndexDir is empty, filesystem-backed otherwise),
// defaults the datastore to an in-memory one when none is supplied, registers
// every configured mount type, and finally starts the control and dispatcher
// goroutines. Any failure along the way is returned wrapped, and no store is
// created.
func NewDAGStore(cfg Config) (*DAGStore, error) {
	// The scratch area is mandatory; create it on demand.
	if cfg.ScratchSpaceDir == "" {
		return nil, fmt.Errorf("missing scratch area root path")
	}
	if err := ensureDir(cfg.ScratchSpaceDir); err != nil {
		return nil, fmt.Errorf("failed to create scratch root dir: %w", err)
	}

	// Choose where indices live.
	var repo index.FullIndexRepo
	switch cfg.IndexDir {
	case "":
		log.Info("using in-memory index store")
		repo = index.NewMemoryRepo()
	default:
		if err := ensureDir(cfg.IndexDir); err != nil {
			return nil, fmt.Errorf("failed to create index root dir: %w", err)
		}
		fsRepo, err := index.NewFSRepo(cfg.IndexDir)
		if err != nil {
			return nil, fmt.Errorf("failed to instantiate full index repo: %w", err)
		}
		repo = fsRepo
	}

	// Without a datastore, shard state only lives in memory.
	if cfg.Datastore == nil {
		log.Warnf("no datastore provided; falling back to in-mem datastore; shard state will not survive restarts")
		cfg.Datastore = ds.NewMapDatastore()
	}

	// Bind every recognized mount type to its URL scheme.
	registry := mount.NewRegistry()
	for scheme, mountType := range cfg.MountTypes {
		if err := registry.Register(scheme, mountType); err != nil {
			return nil, fmt.Errorf("failed to register mount factory: %w", err)
		}
	}

	// TODO: recover persisted shard state from the Datastore.

	storeCtx, cancel := context.WithCancel(context.Background())
	store := &DAGStore{
		ctx:      storeCtx,
		cancelFn: cancel,
		config:   cfg,
		mounts:   registry,
		indices:  repo,
		shards:   make(map[shard.Key]*Shard),

		// len=128, concurrent external tasks that can be queued up before
		// exercising backpressure.
		externalCh: make(chan *task, 128),
		// len=1, because eventloop will only ever stage another internal event.
		internalCh: make(chan *task, 1),
		// len=64, hitting this limit will just make async tasks wait.
		completionCh: make(chan *task, 64),
		// len=128, same as externalCh (input channel).
		dispatchCh: make(chan *dispatch, 128),
	}

	// One slot for the control loop, one for the dispatcher.
	store.wg.Add(2)
	go store.control()
	go store.dispatcher()

	return store, nil
}

// RegisterOpts carries the optional parameters accepted by RegisterShard.
type RegisterOpts struct {
// ExistingTransient can be supplied when registering a shard to indicate
// that there's already an existing local transient copy that can be used
// for indexing. RegisterShard passes it on to mount.Upgrade.
ExistingTransient string
}

// RegisterShard initiates the registration of a new shard.
//
// This method returns an error synchronously if preliminary validation fails:
// an error wrapping ErrShardExists when key is already registered, or the
// error reported by mount.Upgrade. Otherwise, it adds the shard to the
// catalogue in ShardStateNew and queues it for registration. The caller should
// monitor the supplied channel for a result.
//
// If the task cannot be queued (queueTask only fails once the DAG store has
// been closed), the catalogue entry is removed again, so that an error return
// never leaves a half-registered shard behind.
func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts) error {
	d.lk.Lock()
	if _, ok := d.shards[key]; ok {
		d.lk.Unlock()
		return fmt.Errorf("%s: %w", key.String(), ErrShardExists)
	}

	upgraded, err := mount.Upgrade(mnt, opts.ExistingTransient)
	if err != nil {
		d.lk.Unlock()
		return err
	}

	w := &waiter{outCh: out, ctx: ctx}

	// add the shard to the shard catalogue, and drop the lock.
	s := &Shard{
		key:       key,
		state:     ShardStateNew,
		mount:     upgraded,
		wRegister: w,
	}
	d.shards[key] = s
	d.lk.Unlock()

	tsk := &task{op: OpShardRegister, shard: s, waiter: w}
	if err := d.queueTask(tsk, d.externalCh); err != nil {
		// Nobody will ever process this shard; undo the catalogue insertion,
		// but only if the entry is still the one we added.
		d.lk.Lock()
		if d.shards[key] == s {
			delete(d.shards, key)
		}
		d.lk.Unlock()
		return err
	}
	return nil
}

// DestroyOpts carries the optional parameters accepted by DestroyShard. It
// currently declares no fields.
type DestroyOpts struct {
}

// DestroyShard initiates the destruction of the shard identified by key.
//
// This method returns an error synchronously if the shard is not registered;
// the error wraps ErrShardUnknown and includes the shard key, so callers
// should match it with errors.Is. Otherwise, it queues the shard for
// destruction. The caller should monitor the supplied channel for a result.
func (d *DAGStore) DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error {
	// The lookup only reads the catalogue, so a shared lock is sufficient.
	d.lk.RLock()
	s, ok := d.shards[key]
	d.lk.RUnlock()
	if !ok {
		return fmt.Errorf("%s: %w", key.String(), ErrShardUnknown)
	}

	tsk := &task{op: OpShardDestroy, shard: s, waiter: &waiter{ctx: ctx, outCh: out}}
	return d.queueTask(tsk, d.externalCh)
}

// AcquireOpts carries the optional parameters accepted by AcquireShard. It
// currently declares no fields.
type AcquireOpts struct {
}

// AcquireShard acquires access to the specified shard, and returns a
// ShardAccessor, an object that enables various patterns of access to the data
// contained within the shard.
//
// This operation may resolve near-instantaneously if the shard is available
// locally. If not, the shard data may be fetched from its mount.
//
// This method returns an error synchronously if preliminary validation fails
// (an error wrapping ErrShardUnknown when key is not registered). Otherwise,
// it queues the shard for acquisition. The caller should monitor the supplied
// channel for a result.
func (d *DAGStore) AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error {
	// The lookup only reads the catalogue, so a shared lock is sufficient and
	// concurrent acquirers do not serialize on each other.
	d.lk.RLock()
	s, ok := d.shards[key]
	d.lk.RUnlock()
	if !ok {
		return fmt.Errorf("%s: %w", key.String(), ErrShardUnknown)
	}

	tsk := &task{op: OpShardAcquire, shard: s, waiter: &waiter{ctx: ctx, outCh: out}}
	return d.queueTask(tsk, d.externalCh)
}

// Close shuts down the DAG store: it cancels the store's context and then
// blocks until every goroutine counted by the internal wait group has
// returned. It always returns nil.
func (d *DAGStore) Close() error {
d.cancelFn()
d.wg.Wait()
return nil
}

// queueTask places tsk on ch, blocking until there is room or the DAG store
// is closed. It returns nil once the task has been queued, and a
// "dag store closed" error if the store's context is done.
func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
	// When several select cases are ready, Go picks one at random. Without
	// this up-front check a task could be accepted into a buffered channel
	// after shutdown, where nothing would ever service it.
	if d.ctx.Err() != nil {
		return fmt.Errorf("dag store closed")
	}

	select {
	case <-d.ctx.Done():
		return fmt.Errorf("dag store closed")
	case ch <- tsk:
		return nil
	}
}

// ensureDir checks whether the specified path is a directory, and if not it
// attempts to create it.
func ensureDir(path string) error {
fi, err := os.Stat(path)
if err != nil {
// We need to create the directory.
return os.MkdirAll(path, os.ModePerm)
}

if !fi.IsDir() {
return fmt.Errorf("path %s exists, and it is not a directory", path)
}
return nil
}
Loading