Skip to content

Commit

Permalink
Merge pull request #3778 from filecoin-project/feat/less-restrictive-…
Browse files Browse the repository at this point in the history
…read-piece-lock

Allow retrievals while sealing
  • Loading branch information
magik6k authored Sep 15, 2020
2 parents 6665a9c + 7dc0910 commit 801e01b
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,25 +203,26 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.Path
}
}

func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {

// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTNone); err != nil {
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
return
}

// passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
return
}

var readOk bool
var selector WorkerSelector
if len(best) == 0 { // new
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
foundUnsealed = len(best) > 0
if foundUnsealed { // append to existing
// There is unsealed sector, see if we can read from it

selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
Expand All @@ -231,20 +232,35 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return err
})
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
}
} else {
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
}
return
}

if readOk {
return nil
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
if err != nil {
return err
}
if readOk {
return nil
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}

unsealFetch := func(ctx context.Context, worker Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
}

if len(best) > 0 {
if foundUnsealed {
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
Expand Down

0 comments on commit 801e01b

Please sign in to comment.