initial implementation of the dagstore. #20
Conversation
I suggest that the public API accept contexts, to facilitate request tracing, metrics, and logging.
```go
if stat, err := u.underlying.Stat(ctx); err != nil {
	return fmt.Errorf("underlying mount stat returned error: %w", err)
} else if !stat.Exists {
	return fmt.Errorf("underlying mount no longer exists")
}
```
Can we rely on Fetch to return these errors if the file doesn't exist?
What's the rationale?
```go
if u.transient != "" {
	if _, err := os.Stat(u.transient); err == nil {
		return os.Open(u.transient)
	}
}

// transient appears to be dead, refetch.
if err := u.refetch(ctx); err != nil {
	return nil, err
}

return os.Open(u.transient)
```
This feels a little racy - can we return a file handle from u.refetch()?
What's racy? The body of this method and refetch are both performed under a lock.
We now have three "queues":
1. internal: tasks emitted by the event loop, buffer=1.
2. completion: tasks emitted by asynchronous jobs.
3. external: intake of external tasks through the public API.

We now drain internal first; then move on to completion and external.
@raulk First review. Looking great! I think I'm convinced that having the caller pass in the (context, result channel) pair is the way to go.
dagstore.go
```go
case OpShardIndex:
	s.state = ShardStateIndexing
	go func(ctx context.Context, mnt mount.Mount) {
		reader, err := mnt.Fetch(ctx)
```
Use the WaitGroup to refcount all goroutines spawned by the DAG store.
I'm not sure we want to do this for async goroutines, as not all operations are cancellable (e.g. CARv2 stuff).
dagstore.go
```go
shards:       make(map[shard.Key]*Shard),
externalCh:   make(chan *Task, 128), // len=128, concurrent external tasks that can be queued up before putting backpressure.
internalCh:   make(chan *Task, 1),   // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *Task, 64),  // len=64, hitting this limit will just make async tasks wait.
```
Why do we need a separate completionCh to send requests produced by async computations to the control loop, when we could probably use the externalCh for it? Is it to make the code easier to reason about? I don't really see a difference between the two in terms of use.
We don't need it right now, but it separates concerns nicely and makes the code more future-proof in case we do need to process tasks differently going forward.
dagstore.go
```go
func (d *DAGStore) acquireAsync(acqCh chan ShardResult, s *Shard, mnt mount.Mount) {
	k := s.key
	reader, err := mnt.Fetch(d.ctx)
```
For posterity:
Even if the shard is remote, this will not always result in actually fetching the shard data from a remote location: the upgraded mount here serves the retrieval from a local transient copy if it can, before resorting to fetching from the remote location.
dagstore.go
```go
case OpShardFetchDone:
	s.state = ShardStateFetched
	if !s.indexed {
```
What purpose does this indexed field serve? Is the idea that indices will be kept around even when shards are destroyed, and therefore we wouldn't want to reindex on re-registration? If that is the case, until when will we keep those indices around?
I conceived the Fetching state as a general-purpose state for when the shard's contents are being fetched from the mount. That means that when the shard has been registered completely, but the local transient goes missing and a request is made, the shard would move to this state. Not sure if we'll end up using it this way after all. We can revisit later.
Force-pushed from bb451ec to 3c2f727.
We have enough feedback here. I'm going to merge this PR and we can iterate on it. There's plenty to do still.
🐉🐉🐉🐉🐉🐉 BEWARE OF DRAGONS!
This PR contributes an initial implementation of the DAG store.
Concurrency model
The DAG store operates on an event loop that performs all mutations to shard state. This is important because it allows shard operations to be non-blocking (although we can provide a blocking stub on top of this async plumbing). It means the user does not need to wait for a shard registration to return before asking to acquire that shard. I explored numerous alternatives and discarded them, such as:
Finite state machine
Shards go through a finite state machine. It would be worthwhile to formalise this FSM.
Public API
The public API is quite simple:
The caller supplies a channel where they want the result returned, when available. This is much more flexible than having the DAG store return a channel, since it enables the caller to send all results to a single channel with a single goroutine servicing it.
The caller can also pass a per-call channel if they so desire. Moreover, we can easily create sugar *Sync() method counterparts that do this behind the scenes, giving the appearance of a synchronous interface.
Mount upgrader
This PR contributes a "mount upgrader". This is in turn a Mount that acts as a locally cached copy of a remote mount (e.g. Lotus, HTTP, FTP, NFS, etc.)
Indices
I was able to remove the FullIndex abstraction, since the CAR library already provides an index.Index interface that we can use directly.
TODO
This is an initial, unstable, buggy implementation to get the ball rolling. There are many features missing here, such as:
- mount.Upgrader. Aka "scrap area".
- Proper backpressure management (backlogged task channel).