Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage sectors by size instead of proof type. #4511

Merged
merged 1 commit into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,18 @@ type StorageMinerStruct struct {

SealingSchedDiag func(context.Context) (interface{}, error) `perm:"admin"`

StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"`
StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"`
StorageLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error `perm:"admin"`
StorageTryLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, abi.SectorSize, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, ssize abi.SectorSize, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"`
Comment on lines +309 to +311
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking.

StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"`
StorageLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error `perm:"admin"`
StorageTryLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) `perm:"admin"`

DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]api.MarketDeal, error) `perm:"read"`
Expand Down Expand Up @@ -1203,8 +1203,8 @@ func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId st
return c.Internal.StorageDropSector(ctx, storageId, s, ft)
}

func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]stores.SectorStorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types, spt, allowFetch)
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]stores.SectorStorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types, ssize, allowFetch)
}

func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) {
Expand All @@ -1223,8 +1223,8 @@ func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (sto
return c.Internal.StorageInfo(ctx, id)
}

func (c *StorageMinerStruct) StorageBestAlloc(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, pt stores.PathType) ([]stores.StorageInfo, error) {
return c.Internal.StorageBestAlloc(ctx, allocate, spt, pt)
func (c *StorageMinerStruct) StorageBestAlloc(ctx context.Context, allocate stores.SectorFileType, ssize abi.SectorSize, pt stores.PathType) ([]stores.StorageInfo, error) {
return c.Internal.StorageBestAlloc(ctx, allocate, ssize, pt)
}

func (c *StorageMinerStruct) StorageReportHealth(ctx context.Context, id stores.ID, report stores.HealthReport) error {
Expand Down
2 changes: 1 addition & 1 deletion build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func VersionForType(nodeType NodeType) (Version, error) {
// semver versions of the rpc api exposed
var (
FullAPIVersion = newVer(0, 17, 0)
MinerAPIVersion = newVer(0, 15, 0)
MinerAPIVersion = newVer(0, 16, 0)
WorkerAPIVersion = newVer(0, 15, 0)
)

Expand Down
8 changes: 4 additions & 4 deletions extern/sector-storage/faults.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (

// FaultTracker TODO: Track things more actively
type FaultTracker interface {
CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, sectors []abi.SectorID) ([]abi.SectorID, error)
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error)
}

// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, sectors []abi.SectorID) ([]abi.SectorID, error) {
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error) {
var bad []abi.SectorID

ssize, err := spt.SectorSize()
ssize, err := pp.SectorSize()
if err != nil {
return nil, err
}
Expand All @@ -43,7 +43,7 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof
return nil
}

lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, stores.PathStorage, stores.AcquireMove)
lp, _, err := m.localStore.AcquireSector(ctx, sector, ssize, stores.FTSealed|stores.FTCache, stores.FTNone, stores.PathStorage, stores.AcquireMove)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad = append(bad, sector)
Expand Down
14 changes: 11 additions & 3 deletions extern/sector-storage/localworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ type localWorkerPathProvider struct {

func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {

paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
ssize, err := l.w.scfg.SealProofType.SectorSize()
if err != nil {
return stores.SectorPaths{}, nil, err
}
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, ssize, existing, allocate, sealing, l.op)
if err != nil {
return stores.SectorPaths{}, nil, err
}

releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal)
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, ssize, allocate, storageIDs, stores.FSOverheadSeal)
if err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
Expand Down Expand Up @@ -212,7 +216,11 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
}

func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) error {
if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types); err != nil {
ssize, err := l.scfg.SealProofType.SectorSize()
if err != nil {
return err
}
if err := l.storage.MoveStorage(ctx, sector, ssize, types); err != nil {
return xerrors.Errorf("moving sealed data to storage: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
return nil, err
}

prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, spt: cfg.SealProofType}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st
lstor, err := stores.NewLocal(ctx, st, si, nil)
require.NoError(t, err)

prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, spt: cfg.SealProofType}, cfg)
require.NoError(t, err)

stor := stores.NewRemote(lstor, si, nil, 6000)
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error {
return nil
}

func (mgr *SectorMgr) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, ids []abi.SectorID) ([]abi.SectorID, error) {
func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, ids []abi.SectorID) ([]abi.SectorID, error) {
var bad []abi.SectorID

for _, sid := range ids {
Expand Down
7 changes: 6 additions & 1 deletion extern/sector-storage/roprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
}

ssize, err := l.spt.SectorSize()
if err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("failed to determine sector size: %w", err)
}

ctx, cancel := context.WithCancel(ctx)

// use TryLock to avoid blocking
Expand All @@ -34,7 +39,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
}

p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing, stores.AcquireMove)
p, _, err := l.stor.AcquireSector(ctx, id, ssize, existing, allocate, sealing, stores.AcquireMove)

return p, cancel, err
}
7 changes: 6 additions & 1 deletion extern/sector-storage/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
have[path.ID] = struct{}{}
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, s.ptype)
ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion extern/sector-storage/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
have[path.ID] = struct{}{}
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, spt, s.allowFetch)
ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
}
Expand Down
7 changes: 1 addition & 6 deletions extern/sector-storage/stores/filetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ func (t SectorFileType) Has(singleType SectorFileType) bool {
return t&singleType == singleType
}

func (t SectorFileType) SealSpaceUse(spt abi.RegisteredSealProof) (uint64, error) {
ssize, err := spt.SectorSize()
if err != nil {
return 0, xerrors.Errorf("getting sector size: %w", err)
}

func (t SectorFileType) SealSpaceUse(ssize abi.SectorSize) (uint64, error) {
var need uint64
for _, pathType := range PathTypes {
if !t.Has(pathType) {
Expand Down
12 changes: 6 additions & 6 deletions extern/sector-storage/stores/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type SectorIndex interface { // part of storage-miner api

StorageDeclareSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType, primary bool) error
StorageDropSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error)
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]SectorStorageInfo, error)

StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, ssize abi.SectorSize, pathType PathType) ([]StorageInfo, error)

// atomically acquire locks on all sector file types. close ctx to unlock
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
Expand Down Expand Up @@ -246,7 +246,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID ID, s abi.Secto
return nil
}

func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) {
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]SectorStorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()

Expand Down Expand Up @@ -297,7 +297,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
}

if allowFetch {
spaceReq, err := ft.SealSpaceUse(spt)
spaceReq, err := ft.SealSpaceUse(ssize)
if err != nil {
return nil, xerrors.Errorf("estimating required space: %w", err)
}
Expand Down Expand Up @@ -365,13 +365,13 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
return *si.info, nil
}

func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error) {
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, ssize abi.SectorSize, pathType PathType) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()

var candidates []storageEntry

spaceReq, err := allocate.SealSpaceUse(spt)
spaceReq, err := allocate.SealSpaceUse(ssize)
if err != nil {
return nil, xerrors.Errorf("estimating required space: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions extern/sector-storage/stores/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ const (
)

type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error)
AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error

// like remove, but doesn't remove the primary sector copy, nor the last
// non-primary copy if there no primary copies
RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error

// move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error
MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types SectorFileType) error

FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
}
19 changes: 7 additions & 12 deletions extern/sector-storage/stores/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,7 @@ func (st *Local) reportHealth(ctx context.Context) {
}
}

func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) {
ssize, err := spt.SectorSize()
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
}

func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) {
st.localLk.Lock()

done := func() {}
Expand Down Expand Up @@ -324,7 +319,7 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register
return done, nil
}

func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
Expand All @@ -340,7 +335,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue
}

si, err := st.index.StorageFindSector(ctx, sid, fileType, spt, false)
si, err := st.index.StorageFindSector(ctx, sid, fileType, ssize, false)
if err != nil {
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
continue
Expand Down Expand Up @@ -370,7 +365,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue
}

sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
sis, err := st.index.StorageBestAlloc(ctx, fileType, ssize, pathType)
if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("finding best storage for allocating : %w", err)
}
Expand Down Expand Up @@ -525,13 +520,13 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
return nil
}

func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error {
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, PathStorage, AcquireMove)
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types SectorFileType) error {
dest, destIds, err := st.AcquireSector(ctx, s, ssize, FTNone, types, PathStorage, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err)
}

src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
src, srcIds, err := st.AcquireSector(ctx, s, ssize, types, FTNone, PathStorage, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage: %w", err)
}
Expand Down
Loading