Skip to content

Commit

Permalink
Merge pull request #6175 from filecoin-project/feat/dynamic-retreival…
Browse files Browse the repository at this point in the history
…-pricing

Dynamic Retrieval pricing
  • Loading branch information
magik6k authored Jun 17, 2021
2 parents d6dc081 + 9002fcb commit 44de67c
Show file tree
Hide file tree
Showing 21 changed files with 983 additions and 33 deletions.
9 changes: 8 additions & 1 deletion cmd/lotus-storage-miner/allinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ func TestMinerAllInfo(t *testing.T) {
t.Run("pre-info-all", run)

dh := kit.NewDealHarness(t, client, miner)
dh.MakeFullDeal(context.Background(), 6, false, false, 0)
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
Ctx: context.Background(),
Rseed: 6,
CarExport: false,
FastRet: false,
StartEpoch: 0,
DoRetrieval: true,
})

t.Run("post-info-all", run)
}
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/retrieval-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{

var retrievalGetAskCmd = &cli.Command{
Name: "get-ask",
Usage: "Get the provider's current retrieval ask",
Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
ctx := lcli.DaemonContext(cctx)
Expand Down
4 changes: 2 additions & 2 deletions documentation/en/cli-lotus-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ COMMANDS:
selection Configure acceptance criteria for retrieval deal proposals
list List all active retrieval deals for this miner
set-ask Configure the provider's retrieval ask
get-ask Get the provider's current retrieval ask
get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
help, h Shows a list of commands or help for one command
OPTIONS:
Expand Down Expand Up @@ -848,7 +848,7 @@ OPTIONS:
### lotus-miner retrieval-deals get-ask
```
NAME:
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
USAGE:
lotus-miner retrieval-deals get-ask [command options] [arguments...]
Expand Down
4 changes: 4 additions & 0 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil
}

func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
return false, nil
}

func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error {
mgr.lk.Lock()
ss, ok := mgr.sectors[sid.ID]
Expand Down
23 changes: 23 additions & 0 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ type Unsealer interface {
type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

var _ PieceProvider = &pieceProvider{}

type pieceProvider struct {
storage *stores.Remote
index stores.SectorIndex
Expand All @@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse
}
}

// IsUnsealed checks if we have the unsealed piece at the given offset in an already
// existing unsealed file either locally or on any of the workers.
func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
if err := offset.Valid(); err != nil {
return false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

ctxLock, cancel := context.WithCancel(ctx)
defer cancel()

if err := p.index.StorageLock(ctxLock, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
return false, xerrors.Errorf("acquiring read sector lock: %w", err)
}

return p.storage.CheckIsUnsealed(ctxLock, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
}

// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
Expand Down
29 changes: 29 additions & 0 deletions extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,32 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// pre-commit 1
preCommit1 := ppt.preCommit1(t)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)

// pre-commit 2
ppt.preCommit2(t, preCommit1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)

// finalize -> nil here will remove unsealed file
ppt.finalizeSector(t, nil)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// Read the piece -> will have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
true, pieceData)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read the piece -> will not have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
Expand Down Expand Up @@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) {

// pre-commit 1
pC1 := ppt.preCommit1(t)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)

// pre-commit 2
ppt.preCommit2(t, pC1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)
Expand All @@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
// sending nil here will remove all unsealed files after sector is finalized.
ppt.finalizeSector(t, nil)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> have to unseal since we removed the file.
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)
Expand All @@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) {

// remove the unsealed file and read again -> will have to unseal.
ppt.removeAllUnsealedSectorFiles(t)
// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
// Read Piece 2 -> no unsealing as it got unsealed above.
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)

// remove all unseal files -> Read Piece 2 -> will have to Unseal.
ppt.removeAllUnsealedSectorFiles(t)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
}

Expand Down Expand Up @@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC
p.commD = commD
}

func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool {
b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size)
require.NoError(t, err)
return b
}

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
Expand Down
100 changes: 90 additions & 10 deletions extern/sector-storage/stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,87 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
return resp.Body, nil
}

// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece
// either locally or on any of the workers.
// Returns true if we have the unsealed piece, false otherwise.
func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) {
ft := storiface.FTUnsealed

paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return false, xerrors.Errorf("acquire local: %w", err)
}

path := storiface.PathByType(paths, ft)
if path != "" {
// if we have the unsealed file locally, check if it has the unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return false, err
}

// open the unsealed sector file for the given sector size located at the given path.
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return false, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size)

// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
// in the unsealed sector file. That is what `HasAllocated` checks for.
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return false, xerrors.Errorf("has allocated: %w", err)
}

// close the local unsealed file.
if err := r.pfHandler.Close(pf); err != nil {
return false, xerrors.Errorf("failed to close partial file: %s", err)
}
log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has)

// Sector files can technically not have a piece unsealed locally, but have it unsealed in remote storage, so we probably
// want to return only if has is true
if has {
return has, nil
}
}

// --- We don't have the unsealed piece in an unsealed sector file locally
// Check if we have it in a remote cluster.

si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return false, xerrors.Errorf("StorageFindSector: %s", err)
}

if len(si) == 0 {
return false, nil
}

sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})

for _, info := range si {
for _, url := range info.URLs {
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}

return true, nil
}
}

return false, nil
}

// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file
Expand All @@ -507,7 +588,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if path != "" {
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
// unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
log.Debugf("Check local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
Expand All @@ -529,19 +610,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
}
log.Debugf("check if partial file is allocated %s (+%d,%d)", path, offset, size)

if !has {
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
if err := r.pfHandler.Close(pf); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
}

log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
if err := r.pfHandler.Close(pf); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
}

// --- We don't have the unsealed sector file locally
// --- We don't have the unsealed piece in an unsealed sector file locally

// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
Expand Down
Loading

0 comments on commit 44de67c

Please sign in to comment.