Skip to content

Commit

Permalink
Merge pull request #90 from filecoin-project/feat/fetch-reserve-space
Browse files Browse the repository at this point in the history
remote: Fetch storage reservation
  • Loading branch information
magik6k committed Aug 5, 2020
2 parents 7c3197d + 77e4adb commit b7dca7f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 38 deletions.
6 changes: 5 additions & 1 deletion fsutil/filesize_unix.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fsutil

import (
"os"
"syscall"

"golang.org/x/xerrors"
Expand All @@ -14,12 +15,15 @@ type SizeInfo struct {
func FileSize(path string) (SizeInfo, error) {
var stat syscall.Stat_t
if err := syscall.Stat(path, &stat); err != nil {
if err == syscall.ENOENT {
return SizeInfo{}, os.ErrNotExist
}
return SizeInfo{}, xerrors.Errorf("stat: %w", err)
}

// NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize
// See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html
return SizeInfo{
int64(stat.Blocks) * 512,
int64(stat.Blocks) * 512, // NOTE: int64 cast is needed on osx
}, nil
}
12 changes: 6 additions & 6 deletions selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
)

type existingSelector struct {
index stores.SectorIndex
sector abi.SectorID
alloc stores.SectorFileType
index stores.SectorIndex
sector abi.SectorID
alloc stores.SectorFileType
allowFetch bool
}

func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector {
return &existingSelector{
index: index,
sector: sector,
alloc: alloc,
index: index,
sector: sector,
alloc: alloc,
allowFetch: allowFetch,
}
}
Expand Down
8 changes: 4 additions & 4 deletions stores/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
type PathType string

const (
PathStorage = "storage"
PathSealing = "sealing"
PathStorage PathType = "storage"
PathSealing PathType = "sealing"
)

type AcquireMode string

const (
AcquireMove = "move"
AcquireCopy = "copy"
AcquireMove AcquireMode = "move"
AcquireCopy AcquireMode = "copy"
)

type Store interface {
Expand Down
21 changes: 17 additions & 4 deletions stores/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ type LocalStorage interface {
SetStorage(func(*StorageConfig)) error

Stat(path string) (fsutil.FsStat, error)
DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory

// returns real disk usage for a file/directory
// os.ErrNotExit when file doesn't exist
DiskUsage(path string) (int64, error)
}

const MetaFile = "sectorstore.json"
Expand All @@ -77,7 +80,7 @@ type path struct {
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
stat, err := ls.Stat(p.local)
if err != nil {
return fsutil.FsStat{}, err
return fsutil.FsStat{}, xerrors.Errorf("stat %s: %w", p.local, err)
}

stat.Reserved = p.reserved
Expand All @@ -88,7 +91,17 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
continue
}

used, err := ls.DiskUsage(p.sectorPath(id, fileType))
sp := p.sectorPath(id, fileType)

used, err := ls.DiskUsage(sp)
if err == os.ErrNotExist {
p, ferr := tempFetchDest(sp, false)
if ferr != nil {
return fsutil.FsStat{}, ferr
}

used, err = ls.DiskUsage(p)
}
if err != nil {
log.Errorf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
continue
Expand Down Expand Up @@ -279,7 +292,7 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register

stat, err := p.stat(st.localStorage)
if err != nil {
return nil, err
return nil, xerrors.Errorf("getting local storage stat: %w", err)
}

overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen
Expand Down
71 changes: 48 additions & 23 deletions stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
}

var toFetch SectorFileType
for _, fileType := range PathTypes {
if fileType&existing == 0 {
continue
}

if PathByType(paths, fileType) == "" {
toFetch |= fileType
}
}

apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, toFetch, pathType, op)
if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
}

odt := FSOverheadSeal
if pathType == PathStorage {
odt = FsOverheadFinalized
}

releaseStorage, err := r.local.Reserve(ctx, s, spt, toFetch, ids, odt)
if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
}
defer releaseStorage()

for _, fileType := range PathTypes {
if fileType&existing == 0 {
continue
Expand All @@ -104,15 +131,18 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
continue
}

ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
dest := PathByType(apaths, fileType)
storageID := PathByType(ids, fileType)

url, err := r.acquireFromRemote(ctx, s, fileType, dest)
if err != nil {
return SectorPaths{}, SectorPaths{}, err
}

SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID))
SetPathByType(&paths, fileType, dest)
SetPathByType(&stores, fileType, storageID)

if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil {
if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == AcquireMove); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue
}
Expand All @@ -127,49 +157,44 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
return paths, stores, nil
}

func tempDest(spath string) (string, error) {
func tempFetchDest(spath string, create bool) (string, error) {
st, b := filepath.Split(spath)
tempdir := filepath.Join(st, FetchTempSubdir)
if err := os.MkdirAll(tempdir, 755); err != nil {
return "", xerrors.Errorf("creating temp fetch dir: %w", err)
if create {
if err := os.MkdirAll(tempdir, 0755); err != nil {
return "", xerrors.Errorf("creating temp fetch dir: %w", err)
}
}

return filepath.Join(tempdir, b), nil
}

func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) {
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", "", err
return "", err
}

if len(si) == 0 {
return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
return "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
}

sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})

apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
if err != nil {
return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err)
}
dest := PathByType(apaths, fileType)
storageID := PathByType(ids, fileType)

var merr error
for _, info := range si {
// TODO: see what we have local, prefer that

for _, url := range info.URLs {
tempDest, err := tempDest(dest)
tempDest, err := tempFetchDest(dest, true)
if err != nil {
return "", "", "", err
return "", err
}

if err := os.RemoveAll(dest); err != nil {
return "", "", "", xerrors.Errorf("removing dest: %w", err)
return "", xerrors.Errorf("removing dest: %w", err)
}

err = r.fetch(ctx, url, tempDest)
Expand All @@ -179,17 +204,17 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
}

if err := move(tempDest, dest); err != nil {
return "", "", "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err)
return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err)
}

if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return dest, ID(storageID), url, nil
return url, nil
}
}

return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}

func (r *Remote) fetch(ctx context.Context, url, outname string) error {
Expand Down

0 comments on commit b7dca7f

Please sign in to comment.