Auto-throttling plus many improvements & fixes. #694
Conversation
@@ -54,6 +54,7 @@ func askFromPbAsk(a *rpc.StorageAsk) ask.StorageAsk {
	return ask.StorageAsk{
		Price:        a.GetPrice(),
		MinPieceSize: a.GetMinPieceSize(),
		MaxPieceSize: a.GetMaxPieceSize(),
Include max piece size in the ask index.
@@ -64,8 +64,7 @@ import (
)

const (
	datastoreFolderName    = "datastore"
	lotusConnectionRetries = 10
Now a config attribute. Line 115.
lotus.MonitorLotusSync(clientBuilder)
lsm, err := lotus.NewSyncMonitor(clientBuilder)
What was previously a background daemon that generated Prometheus metrics for the Grafana dashboard regarding Lotus sync height is now a more formal component that provides Lotus syncing information for decision making.
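For context, a minimal sketch of how a consumer such as cold storage might use the monitor before kicking off heavy work; only `SyncHeightDiff` appears in this PR's diffs, while the interface, threshold, and wait loop below are illustrative assumptions:

```go
package lotus

import (
	"context"
	"time"
)

// lotusSyncStatuser is a hypothetical consumer-side view of SyncMonitor;
// only SyncHeightDiff is taken from this PR's diffs.
type lotusSyncStatuser interface {
	SyncHeightDiff() int64
}

// waitSynced blocks until the Lotus node is within maxAllowedDiff epochs of
// the chain head, polling the cached value instead of querying Lotus directly.
func waitSynced(ctx context.Context, lsm lotusSyncStatuser, maxAllowedDiff int64) error {
	for lsm.SyncHeightDiff() > maxAllowedDiff {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Minute):
		}
	}
	return nil
}
```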
@@ -250,7 +254,7 @@ func NewServer(conf Config) (*Server, error) {
	if conf.Devnet {
		conf.FFSMinimumPieceSize = 0
	}
	cs := filcold.New(ms, dm, ipfs, chain, l, conf.FFSMinimumPieceSize)
	cs := filcold.New(ms, dm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing)
Cold Storage now uses this new component, plus a new flag that indicates how many data-intensive tasks we throw at Lotus in parallel. (More on this later.)
defaultDealStartOffset = 72 * 60 * 60 / util.EpochDurationSeconds // 72hs
defaultDealStartOffset = 48 * 60 * 60 / util.EpochDurationSeconds // 48hs
It's still an open question what the right value for this is.
Eventually, we could move it to the Storage Config and let people play with it, but most probably they won't know the consequences yet.
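For reference, assuming `util.EpochDurationSeconds` is 30 (the Filecoin mainnet epoch duration), the 48h offset works out to 48 × 60 × 60 / 30 = 5760 epochs, versus 8640 epochs for the previous 72h value.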
if f.PieceSize < uint64(sask.MinPieceSize) || f.PieceSize > uint64(sask.MaxPieceSize) {
	log.Warnf("skipping miner %s since needed piece size %d doesn't fit bounds (%d, %d)", miners[i], f.PieceSize, sask.MinPieceSize, sask.MaxPieceSize)
}
Same with SR2 miner selector.
@@ -28,6 +28,7 @@ func (s *RPC) Get(ctx context.Context, req *GetRequest) (*GetResponse, error) {
	storage[key] = &StorageAsk{
		Price: ask.Price,
@asutula, I just did the usual RPC/CLI changes to keep things coherent, but this will go away anyway in your changes.
👍
)

// SyncMonitor provides information about the Lotus
TL;DR: SyncMonitor analyzes the sync status every 2 minutes and provides a cached value to whoever wants to know about it.
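Roughly, a minimal sketch of that polling/caching pattern; `lock`, `heightDiff`, and `remaining` appear in the diffs below, while the `run`/`checkSyncStatus` names and the ticker wiring are illustrative assumptions:

```go
package lotus

import (
	"sync"
	"time"
)

// SyncMonitor caches the latest Lotus sync status so callers read a local
// value instead of hitting the Lotus node on every query.
type SyncMonitor struct {
	lock       sync.Mutex
	heightDiff int64
	remaining  int64
}

// run refreshes the cached values every 2 minutes in a background goroutine.
func (lsm *SyncMonitor) run() {
	for range time.Tick(2 * time.Minute) {
		diff, remaining := lsm.checkSyncStatus() // the only place Lotus is queried
		lsm.lock.Lock()
		lsm.heightDiff = diff
		lsm.remaining = remaining
		lsm.lock.Unlock()
	}
}

// checkSyncStatus stands in for the real query against the Lotus sync API.
func (lsm *SyncMonitor) checkSyncStatus() (heightDiff, remaining int64) {
	return 0, 0
}
```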
func (lsm *SyncMonitor) SyncHeightDiff() int64 {
	lsm.lock.Lock()
	defer lsm.lock.Unlock()
	return lsm.heightDiff
}
It provides a cached value because we don't want callers asking Lotus about its sync status to add load to it.
Since the syncing process is rather slow, a value that's 2 minutes old is usually good enough to make decisions.
lsm.lock.Lock()
lsm.heightDiff = maxHeightDiff
lsm.remaining = remaining
lsm.lock.Unlock()
The Lotus syncing process has many workers that are trying to sync different possible branches.
I decided to go pessimistic and consider the worst-case scenario. I think this is the best decision.
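A minimal sketch of that pessimistic aggregation, assuming each Lotus sync worker exposes the height it has reached and the height it's targeting; the `syncWorker` type here is hypothetical:

```go
package lotus

// syncWorker is a simplified, hypothetical view of one Lotus sync worker.
type syncWorker struct {
	TargetHeight int64 // chain height the worker is syncing towards
	Height       int64 // height the worker has reached so far
}

// worstCaseHeightDiff returns the largest height difference across all
// workers, i.e. the most pessimistic view of how far behind the node is.
func worstCaseHeightDiff(workers []syncWorker) int64 {
	var maxDiff int64
	for _, w := range workers {
		if diff := w.TargetHeight - w.Height; diff > maxDiff {
			maxDiff = diff
		}
	}
	return maxDiff
}
```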
This is very exciting, no doubt that things are going to work much much better than before.
I found one actual bug you need to fix, then just left a couple comments/questions for my own understanding of things.
Very nice work.
	return nil, fmt.Errorf("client get deal info: %s", err)
}

// Workaround ClientGetDealInfo giving unreliable status.
Very nice.
// In the next Lotus version (v1.1.3) a new API will be used which also calculates
// CommP, so we can help Lotus avoid recalculating size and CommP when deals are made.
select {
case fc.semaphDealPrep <- struct{}{}:
I am sure I'm just missing some understanding about how this works, but I would have expected to see some goroutine spawned off after the `fc.semaphDealPrep` is used here to block execution until a slot opens up. As is, it looks to me like calls to `calculatePieceSize` are run serially. Like I said, I'm sure I just don't understand, but I'd like to understand this because it seems awesome!
When the scheduler decides to run a Job, it spawns a new goroutine and lets that goroutine execute all the logic.
This means that at this point, when executing cold storage logic, it only has to wait for the "OK" of this semaphore to keep doing the work it was doing.
If you look at what's happening at a bigger scope, you might see multiple goroutines (Jobs) waiting on this semaphore to be allowed to move on and execute the heavy stuff.
The good part of this design is that you shouldn't think of the job-execution methods as a single goroutine doing things for multiple Jobs; you can always assume you're doing things for a Job that's already in its own goroutine.
That's why `calculatePieceSize` looks serial: it is serial within a single Job execution, but there are multiple goroutines, one per executing Job. Whatever buffer this semaphore allows is the number of Job goroutines that will be running in parallel calculating their sizes.
Makes sense?
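To make the flow concrete, a minimal sketch of the pattern being described; `semaphDealPrep` and `calculatePieceSize` come from the diffs in this PR, while `prepareDeal` and the struct layout are illustrative assumptions:

```go
package filcold

import (
	"context"

	"github.com/ipfs/go-cid"
)

// FilCold sketch: semaphDealPrep is a counting semaphore (buffered channel)
// sized by the max-parallel-deal-preparing config flag.
type FilCold struct {
	semaphDealPrep chan struct{}
}

// prepareDeal shows the throttling pattern: each Job already runs in its own
// goroutine, so it simply blocks here until a slot is free, does the heavy
// work, and releases the slot.
func (fc *FilCold) prepareDeal(ctx context.Context, c cid.Cid) (uint64, error) {
	select {
	case fc.semaphDealPrep <- struct{}{}: // acquire a slot (blocks if all are busy)
	case <-ctx.Done():
		return 0, ctx.Err()
	}
	defer func() { <-fc.semaphDealPrep }() // release the slot when done

	return fc.calculatePieceSize(ctx, c) // the heavy, Lotus/IPFS-intensive part
}

// calculatePieceSize stands in for the real implementation discussed below.
func (fc *FilCold) calculatePieceSize(ctx context.Context, c cid.Cid) (uint64, error) {
	return 0, nil
}
```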
Yea that totally makes sense. I forgot that goroutines for Jobs are created quite early and high up the stack. Yea this is a really nice design because it makes this code look very simple and easy to understand.
Exactly!
// calculatePieceSize estimates the size of the Piece that will be built by the filecoin client
// when making the deal. This calculation should consider the unique node sizes of the DAG, and
// padding. It's important to not underestimate the size since that would lead to deal rejection,
// as the miner won't accept the further calculated PricePerEpoch.
func (fc *FilCold) calculatePieceSize(ctx context.Context, c cid.Cid) (uint64, error) {
	// Get unique nodes.
	seen := cid.NewSet()
	if err := dag.Walk(ctx, fc.getLinks, c, seen.Visit); err != nil {
		return 0, fmt.Errorf("walking dag for size calculation: %s", err)
	}

	// Account for CAR header size.
	carHeader := car.CarHeader{
		Roots:   []cid.Cid{c},
		Version: 1,
	}
	totalSize, err := car.HeaderSize(&carHeader)
	if err != nil {
		return 0, fmt.Errorf("calculating car header size: %s", err)
	}

	// Calculate total unique node sizes.
	buf := make([]byte, 8)
	f := func(c cid.Cid) error {
		s, err := fc.ipfs.Block().Stat(ctx, path.IpfsPath(c))
		if err != nil {
			return fmt.Errorf("getting stats from DAG node: %s", err)
		}
		size := uint64(s.Size())
		carBlockHeaderSize := uint64(binary.PutUvarint(buf, size))
		totalSize += carBlockHeaderSize + size
		return nil
	}
	if err := seen.ForEach(f); err != nil {
		return 0, fmt.Errorf("aggregating unique nodes size: %s", err)
	}

	// Consider padding.
	paddedSize := padreader.PaddedSize(totalSize).Padded()

	return uint64(paddedSize), nil
}

func (fc *FilCold) getLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
	return fc.ipfs.Object().Links(ctx, path.IpfsPath(c))
}
😿
ffs/minerselector/reptop/reptop.go
Outdated
if f.PieceSize < sa.MinPieceSize && f.PieceSize > sa.MaxPieceSize {
	continue
}
Think you mean ||
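For clarity, a sketch of the intended check; `pieceOutOfBounds` is a hypothetical helper, the actual fix is just switching `&&` to `||`:

```go
package reptop

// pieceOutOfBounds reports whether the needed piece size falls outside a
// miner's advertised bounds. With &&, the condition can never be true for a
// miner whose MinPieceSize <= MaxPieceSize, so no miner would ever be skipped.
func pieceOutOfBounds(pieceSize, minPieceSize, maxPieceSize uint64) bool {
	return pieceSize < minPieceSize || pieceSize > maxPieceSize
}
```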
👍
This PR contains multiple improvements and updates.

Lotus load handling:
- Powergate now monitors the Lotus node syncing status and warns when it's falling behind, e.g. `WARN lotus-client lotus/metrics.go:93 Louts behind in syncing with height diff 39, todo: 6`. So it also mentions how big the difference is and what the current progress is (similar to `lotus sync wait`).
- Deal making now applies backpressure when the Lotus node is unsynced, e.g. `WARN ffs-filcold filcold/filcold.go:296 backpressure from unsynced Lotus node`.

To be clear about the last two points, since they might look similar: the former is a general monitoring control that will warn if the Lotus node is falling behind at any moment (maybe you're not even making deals). The latter indicates that Powergate is trying to make deals but is throttling since it detected the Lotus node is syncing; whenever the Lotus node gets in sync, it will continue with its work.

Enhancements:
- Improved the `calculatePieceSize` logic. Originally, we needed to do this since there wasn't an API to resolve it. Now we use the existing Lotus API, which might give a future-proof size calculation.
- Workaround for `ClientGetDealInfo` not reporting active deals (until the real problem is fixed in Lotus).

Indices:
- The ask index now includes `MaxPieceSize` information.

Miner selection:
- In both `reptop` and `sr2` miner selection, consider the `MinPieceSize` and `MaxPieceSize` of miners to avoid selecting miners which will reject the deal depending on the deal's data size.

Chore: