Skip to content

Commit

Permalink
Merge pull request #52 from filecoin-project/raulk/implement-gc
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored Jul 12, 2021
2 parents 2629bb9 + 9d99412 commit 939c620
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 6 deletions.
54 changes: 54 additions & 0 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/hashicorp/go-multierror"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -331,6 +332,59 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
return ret
}

// GC attempts to reclaim the transient files of shards that are currently
// available but inactive.
//
// It is not strictly atomic for now, as it determines which shards to reclaim
// first, sends operations to the event loop, and waits for them to execute.
// In the meantime, there could be state transitions that change reclaimability
// of shards (some shards deemed reclaimable are no longer so, and vice versa).
//
// However, the event loop checks for safety prior to deletion, so it will skip
// over shards that are no longer safe to delete.
func (d *DAGStore) GC(ctx context.Context) (map[shard.Key]error, error) {
var (
merr *multierror.Error
reclaim []*Shard
)

d.lk.RLock()
for _, s := range d.shards {
s.lk.RLock()
if s.state == ShardStateAvailable || s.state == ShardStateErrored {
reclaim = append(reclaim, s)
}
s.lk.RUnlock()
}
d.lk.RUnlock()

var await int
ch := make(chan ShardResult, len(reclaim))
for _, s := range reclaim {
tsk := &task{op: OpShardGC, shard: s, waiter: &waiter{ctx: ctx, outCh: ch}}

err := d.queueTask(tsk, d.externalCh)
if err == nil {
await++
} else {
merr = multierror.Append(merr, fmt.Errorf("failed to enqueue GC task for shard %s: %w", s.key, err))
}
}

// collect all results.
results := make(map[shard.Key]error, await)
for i := 0; i < await; i++ {
select {
case res := <-ch:
results[res.Key] = res.Error
case <-ctx.Done():
return results, ctx.Err()
}
}

return results, nil
}

func (d *DAGStore) Close() error {
d.cancelFn()
d.wg.Wait()
Expand Down
14 changes: 13 additions & 1 deletion dagstore_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
OpShardAcquire
OpShardFail
OpShardRelease
OpShardGC
)

func (o OpType) String() string {
Expand All @@ -23,7 +24,8 @@ func (o OpType) String() string {
"OpShardDestroy",
"OpShardAcquire",
"OpShardFail",
"OpShardRelease"}[o]
"OpShardRelease",
"OpShardGC"}[o]
}

// control runs the DAG store's event loop.
Expand Down Expand Up @@ -132,6 +134,7 @@ func (d *DAGStore) control() {
Error: fmt.Errorf("failed to register shard: %w", tsk.err),
}
d.sendResult(res, s.wRegister)
s.wRegister = nil
}

// fail waiting acquirers.
Expand Down Expand Up @@ -171,6 +174,15 @@ func (d *DAGStore) control() {
d.lk.Unlock()
// TODO are we guaranteed that there are no queued items for this shard?

case OpShardGC:
var err error
if s.state == ShardStateAvailable || s.state == ShardStateErrored {
err = s.mount.DeleteTransient()
} else {
err = fmt.Errorf("ignored request to GC shard in state: %d", s.state)
}
res := &ShardResult{Key: s.key, Error: err}
d.sendResult(res, tsk.waiter)
}

// persist the current shard state.
Expand Down
45 changes: 40 additions & 5 deletions dagstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,46 @@ func TestRestartResumesRegistration(t *testing.T) {
require.Equal(t, ShardStateAvailable, traces[1].After.ShardState)
}

func TestGC(t *testing.T) {
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
})
require.NoError(t, err)

// register 100 shards
// acquire 25 with 5 acquirers, release 2 acquirers (refcount 3); non reclaimable
// acquire another 25, release them all, they're reclaimable
shards := registerShards(t, dagst, 100, carv2mnt)
for _, k := range shards[0:25] {
accessors := acquireShard(t, dagst, k, 5)
for _, acc := range accessors[:2] {
err := acc.Close()
require.NoError(t, err)
}
}
for _, k := range shards[25:50] {
accessors := acquireShard(t, dagst, k, 5)
releaseAll(t, dagst, k, accessors)
}

results, err := dagst.GC(context.Background())
require.NoError(t, err)
require.Len(t, results, 75) // all but the second batch of 25 have been reclaimed.

var keys []string
for k, err := range results {
keys = append(keys, k.String())
require.NoError(t, err)
}

var expect []string
for i := 25; i < 100; i++ {
expect = append(expect, fmt.Sprintf("shard-%d", i))
}
require.ElementsMatch(t, expect, keys)
}

// TestBlockCallback tests that blocking a callback blocks the dispatcher
// but not the event loop.
func TestBlockCallback(t *testing.T) {
Expand Down Expand Up @@ -479,11 +519,6 @@ func releaseAll(t *testing.T, dagst *DAGStore, k shard.Key, accs []*ShardAccesso
return err == nil && info.ShardState == ShardStateAvailable && info.refs == 0
}, 5*time.Second, 100*time.Millisecond)

// // refs should be zero now since shard accessors have been closed and transient file should be cleaned up.
// require.Zero(t, info.refs)
// _, err = os.Stat(abs)
// require.Error(t, err)
//
}

func testRegistry(t *testing.T) *mount.Registry {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/go-multierror v1.0.0
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.8-0.20210702173502-41f2377d9672
github.com/ipfs/go-datastore v0.4.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfm
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down

0 comments on commit 939c620

Please sign in to comment.