Skip to content

Commit

Permalink
lotus-pcr: add recover-miners command
Browse files Browse the repository at this point in the history
  • Loading branch information
travisperson committed Sep 9, 2020
1 parent 6c8a44b commit b23d5d1
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 17 deletions.
167 changes: 157 additions & 10 deletions cmd/lotus-pcr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var log = logging.Logger("main")
func main() {
local := []*cli.Command{
runCmd,
recoverMinersCmd,
versionCmd,
}

Expand Down Expand Up @@ -101,6 +102,80 @@ var versionCmd = &cli.Command{
},
}

var recoverMinersCmd = &cli.Command{
Name: "recover-miners",
Usage: "Ensure all miners with a negative available blanace have a FIL surplus across accounts",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
},
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.IntFlag{
Name: "miner-total-funds-threashold",
EnvVars: []string{"LOTUS_PCR_MINER_TOTAL_FUNDS_THREASHOLD"},
Usage: "total filecoin across all accounts that should be met, if the miner balancer drops below zero",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()

from, err := address.NewFromString(cctx.String("from"))
if err != nil {
return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err)
}

if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}

dryRun := cctx.Bool("dry-run")
minerTotalFundsThreashold := uint64(cctx.Int("miner-total-funds-threashold"))

rf := &refunder{
api: api,
wallet: from,
dryRun: dryRun,
minerTotalFundsThreashold: types.FromFil(minerTotalFundsThreashold),
}

refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}

negativeBalancerRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund())
if err != nil {
return err
}

if err := rf.Refund(ctx, "refund negative balancer miner", refundTipset, negativeBalancerRefund, 0); err != nil {
return err
}

return nil
},
}

var runCmd = &cli.Command{
Name: "run",
Usage: "Start message reimpursement",
Expand Down Expand Up @@ -230,7 +305,7 @@ var runCmd = &cli.Command{
return err
}

if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil {
if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil {
return err
}

Expand Down Expand Up @@ -289,7 +364,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {

m.count = m.count + 1
m.totalRefunds = types.BigAdd(m.totalRefunds, value)

m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
}

Expand Down Expand Up @@ -318,21 +392,94 @@ type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error)
}

type refunder struct {
api refunderNodeApi
wallet address.Address
percentExtra int
dryRun bool
preCommitEnabled bool
proveCommitEnabled bool
api refunderNodeApi
wallet address.Address
percentExtra int
dryRun bool
preCommitEnabled bool
proveCommitEnabled bool
minerTotalFundsThreashold big.Int
}

func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}

for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}

if !mact.Balance.GreaterThan(big.Zero()) {
continue
}

minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}

if minerAvailableBalance.GreaterThanEqual(big.Zero()) {
log.Debugw("skipping over miner with positive balance", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}

// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())

allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner}
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)

// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}

addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}

totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)

// If the miner has available balance they should use it
if totalAvailableBalance.GreaterThanEqual(r.minerTotalFundsThreashold) {
log.Debugw("skipping over miner with enough funds cross all accounts", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr, "miner_available_balance", minerAvailableBalance, "wallet_total_balances", addrSum)
continue
}

// Calculate the required FIL to bring the miner up to the minimum across all of their accounts
refundValue := big.Add(totalAvailableBalance.Abs(), r.minerTotalFundsThreashold)
refunds.Track(maddr, refundValue)

log.Debugw("processing negative balance miner", "miner", maddr, "refund", refundValue)
}

return refunds, nil
}

func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
Expand Down Expand Up @@ -464,7 +611,7 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu
return refunds, nil
}

func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil
Expand Down Expand Up @@ -522,7 +669,7 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi
refundSum = types.BigAdd(refundSum, msg.Value)
}

log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
log.Infow(name, "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
return nil
}

Expand Down
22 changes: 15 additions & 7 deletions tools/stats/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti
return nil
}

type apiIpldStore struct {
type ApiIpldStore struct {
ctx context.Context
api api.FullNode
api apiIpldStoreApi
}

func (ht *apiIpldStore) Context() context.Context {
type apiIpldStoreApi interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}

func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore {
return &ApiIpldStore{ctx, api}
}

func (ht *ApiIpldStore) Context() context.Context {
return ht.ctx
}

func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil {
return err
Expand All @@ -207,8 +215,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err
return fmt.Errorf("Object does not implement CBORUnmarshaler")
}

func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore")
}

func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
Expand Down Expand Up @@ -259,7 +267,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis
return fmt.Errorf("failed to unmarshal power actor state: %w", err)
}

s := &apiIpldStore{ctx, api}
s := NewApiIpldStore(ctx, api)
mp, err := adt.AsMap(s, powerActorState.Claims)
if err != nil {
return err
Expand Down

0 comments on commit b23d5d1

Please sign in to comment.