Skip to content

Commit

Permalink
Merge pull request #3689 from filecoin-project/feat/batch-window-PoST
Browse files Browse the repository at this point in the history
storage: split window PoST submission into multiple messages
  • Loading branch information
magik6k authored Sep 15, 2020
2 parents f9ea393 + db998e9 commit cf82483
Show file tree
Hide file tree
Showing 2 changed files with 473 additions and 103 deletions.
271 changes: 168 additions & 103 deletions storage/wdpost_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storage
import (
"bytes"
"context"
"errors"
"time"

"github.com/filecoin-project/go-state-types/dline"
Expand All @@ -30,8 +29,6 @@ import (
"github.com/filecoin-project/lotus/journal"
)

var errNoPartitions = errors.New("no partitions")

func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
Expand Down Expand Up @@ -79,23 +76,27 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info,
})
}

proof, err := s.runPost(ctx, *deadline, ts)
switch err {
case errNoPartitions:
posts, err := s.runPost(ctx, *deadline, ts)
if err != nil {
log.Errorf("runPost failed: %+v", err)
s.failPost(err, deadline)
return
}

if len(posts) == 0 {
recordProofsEvent(nil, cid.Undef)
return
case nil:
sm, err := s.submitPost(ctx, proof)
}

for i := range posts {
post := &posts[i]
sm, err := s.submitPost(ctx, post)
if err != nil {
log.Errorf("submitPost failed: %+v", err)
s.failPost(err, deadline)
return
} else {
recordProofsEvent(post.Partitions, sm.Cid())
}
recordProofsEvent(proof.Partitions, sm.Cid())
default:
log.Errorf("runPost failed: %+v", err)
s.failPost(err, deadline)
return
}

journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
Expand Down Expand Up @@ -327,7 +328,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
return faults, sm, nil
}

func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) {
func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()

Expand Down Expand Up @@ -399,136 +400,200 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}

// Get the partitions for the given deadline
partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key())
if err != nil {
return nil, xerrors.Errorf("getting partitions: %w", err)
}

params := &miner.SubmitWindowedPoStParams{
Deadline: di.Index,
Partitions: make([]miner.PoStPartition, 0, len(partitions)),
Proofs: nil,
// Split partitions into batches, so as not to exceed the number of sectors
// allowed in a single message
partitionBatches, err := s.batchPartitions(partitions)
if err != nil {
return nil, err
}

skipCount := uint64(0)
postSkipped := bitfield.New()
var postOut []proof.PoStProof
// Generate proofs in batches
posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches))
for batchIdx, batch := range partitionBatches {
batchPartitionStartIdx := 0
for _, batch := range partitionBatches[:batchIdx] {
batchPartitionStartIdx += len(batch)
}

for retries := 0; retries < 5; retries++ {
var sinfos []proof.SectorInfo
sidToPart := map[abi.SectorNumber]int{}
params := miner.SubmitWindowedPoStParams{
Deadline: di.Index,
Partitions: make([]miner.PoStPartition, 0, len(batch)),
Proofs: nil,
}

for partIdx, partition := range partitions {
// TODO: Can do this in parallel
toProve, err := partition.ActiveSectors()
if err != nil {
return nil, xerrors.Errorf("getting active sectors: %w", err)
}
skipCount := uint64(0)
postSkipped := bitfield.New()
var postOut []proof.PoStProof
somethingToProve := true
for retries := 0; retries < 5; retries++ {
var sinfos []proof.SectorInfo
for partIdx, partition := range batch {
// TODO: Can do this in parallel
toProve, err := partition.ActiveSectors()
if err != nil {
return nil, xerrors.Errorf("getting active sectors: %w", err)
}

toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
}

good, err := s.checkSectors(ctx, toProve)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}
good, err := s.checkSectors(ctx, toProve)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}

good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
}

skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
}

sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}

skipCount += sc
skipCount += sc

ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}

if len(ssi) == 0 {
continue
}

if len(ssi) == 0 {
continue
sinfos = append(sinfos, ssi...)
params.Partitions = append(params.Partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
}

sinfos = append(sinfos, ssi...)
for _, si := range ssi {
sidToPart[si.SectorNumber] = partIdx
if len(sinfos) == 0 {
// nothing to prove for this batch
somethingToProve = false
break
}

params.Partitions = append(params.Partitions, miner.PoStPartition{
Index: uint64(partIdx),
Skipped: skipped,
})
}
// Generate proof
log.Infow("running windowPost",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)

if len(sinfos) == 0 {
// nothing to prove..
return nil, errNoPartitions
}
tsStart := build.Clock.Now()

log.Infow("running windowPost",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}

tsStart := build.Clock.Now()
var ps []abi.SectorID
postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
elapsed := time.Since(tsStart)

mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}
log.Infow("computing window PoSt", "batch", batchIdx, "elapsed", elapsed)

var ps []abi.SectorID
postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
elapsed := time.Since(tsStart)
if err == nil {
// Proof generation successful, stop retrying
break
}

log.Infow("computing window PoSt", "elapsed", elapsed)
// Proof generation failed, so retry

if err == nil {
break
}
if len(ps) == 0 {
return nil, xerrors.Errorf("running post failed: %w", err)
}

log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries)

if len(ps) == 0 {
return nil, xerrors.Errorf("running post failed: %w", err)
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
}

log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries)
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
continue
}

skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
}

if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
params.Proofs = postOut

params.Proofs = postOut
posts = append(posts, params)
}

// Compute randomness after generating proofs so as to reduce the impact
// of chain reorgs (which change randomness)
commEpoch := di.Open
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
}

for i := range posts {
posts[i].ChainCommitEpoch = commEpoch
posts[i].ChainCommitRand = commRand
}
params.ChainCommitEpoch = commEpoch
params.ChainCommitRand = commRand

log.Infow("submitting window PoSt")
return posts, nil
}

return params, nil
func (s *WindowPoStScheduler) batchPartitions(partitions []*miner.Partition) ([][]*miner.Partition, error) {
// Get the number of sectors allowed in a partition, for this proof size
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(s.proofType)
if err != nil {
return nil, xerrors.Errorf("getting sectors per partition: %w", err)
}

// We don't want to exceed the number of sectors allowed in a message.
// So given the number of sectors in a partition, work out the number of
// partitions that can be in a message without exceeding sectors per
// message:
// floor(number of sectors allowed in a message / sectors per partition)
// eg:
// max sectors per message 7: ooooooo
// sectors per partition 3: ooo
// partitions per message 2: oooOOO
// <1><2> (3rd doesn't fit)
partitionsPerMsg := int(miner.AddressedSectorsMax / sectorsPerPartition)

// The number of messages will be:
// ceiling(number of partitions / partitions per message)
batchCount := len(partitions) / partitionsPerMsg
if len(partitions)%partitionsPerMsg != 0 {
batchCount++
}

// Split the partitions into batches
batches := make([][]*miner.Partition, 0, batchCount)
for i := 0; i < len(partitions); i += partitionsPerMsg {
end := i + partitionsPerMsg
if end > len(partitions) {
end = len(partitions)
}
batches = append(batches, partitions[i:end])
}
return batches, nil
}

func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof.SectorInfo, error) {
Expand Down
Loading

0 comments on commit cf82483

Please sign in to comment.