Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expirations chart #111

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions web/api/webrpc/actor_sector_expirations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package webrpc

import (
"context"

"github.com/samber/lo"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin"

"github.com/filecoin-project/lotus/chain/types"
)

type SectorExpirationBucket struct {
Expiration int64 `db:"expiration_bucket"`
Count int64 `db:"count"`

// db ignored
Days int64 `db:"-"`
}

type SectorExpirations struct {
All []SectorExpirationBucket
CC []SectorExpirationBucket
}

func (a *WebRPC) ActorSectorExpirations(ctx context.Context, maddr address.Address) (*SectorExpirations, error) {
spID, err := address.IDFromAddress(maddr)
if err != nil {
return nil, xerrors.Errorf("id from %s: %w", maddr, err)
}

out := SectorExpirations{
All: []SectorExpirationBucket{},
CC: []SectorExpirationBucket{},
}

now, err := a.deps.Chain.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("getting head: %w", err)
}

nowBucketNum := int64(now.Height()) / 10000

err = a.deps.DB.Select(ctx, &out.All, `SELECT
(expiration_epoch / 10000) * 10000 AS expiration_bucket,
COUNT(*) as count
FROM
sectors_meta
WHERE
sp_id = $1
AND expiration_epoch IS NOT NULL
AND expiration_epoch >= $2
GROUP BY
(expiration_epoch / 10000) * 10000
ORDER BY
expiration_bucket`, int64(spID), nowBucketNum)
if err != nil {
return nil, err
}
err = a.deps.DB.Select(ctx, &out.CC, `SELECT
(expiration_epoch / 10000) * 10000 AS expiration_bucket,
COUNT(*) as count
FROM
sectors_meta
WHERE
sp_id = $1
AND is_cc = true
AND expiration_epoch IS NOT NULL
AND expiration_epoch >= $2
GROUP BY
(expiration_epoch / 10000) * 10000
ORDER BY
expiration_bucket`, int64(spID), nowBucketNum)
if err != nil {
return nil, err
}

out.All, err = a.prepExpirationBucket(out.All, now)
if err != nil {
return nil, err
}
out.CC, err = a.prepExpirationBucket(out.CC, now)
if err != nil {
return nil, err
}

// If first point in CC is larger than first point in All, shift the first CC point to the first All point
if len(out.CC) > 0 && len(out.All) > 0 && out.CC[0].Expiration > out.All[0].Expiration {
out.CC[0].Expiration = out.All[0].Expiration
}

return &out, nil
}

func (a *WebRPC) prepExpirationBucket(out []SectorExpirationBucket, now *types.TipSet) ([]SectorExpirationBucket, error) {
totalCount := lo.Reduce(out, func(acc int64, b SectorExpirationBucket, _ int) int64 {
return acc + b.Count
}, int64(0))

if len(out) == 0 {
return out, nil
}

if out[0].Expiration > int64(now.Height()) {
nowBucket := SectorExpirationBucket{
Expiration: int64(now.Height()),
Count: 0,
}
out = append([]SectorExpirationBucket{nowBucket}, out...)
}

for i := range out {
newTotal := totalCount - out[i].Count
out[i].Count = newTotal
totalCount = newTotal

epochsToExpiry := abi.ChainEpoch(out[i].Expiration) - now.Height()
secsToExpiry := epochsToExpiry * builtin.EpochDurationSeconds
daysToExpiry := secsToExpiry / (60 * 60 * 24)

out[i].Days = int64(daysToExpiry)
}

return out, nil
}
234 changes: 234 additions & 0 deletions web/api/webrpc/actor_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package webrpc

import (
"context"
"sort"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"

"github.com/filecoin-project/curio/lib/curiochain"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

type ActorSummary struct {
Address string
CLayers []string

QualityAdjustedPower string
RawBytePower string

ActorBalance, ActorAvailable, WorkerBalance string

Win1, Win7, Win30 int64

Deadlines []ActorDeadline
}

type ActorDeadline struct {
Empty bool
Current bool
Proven bool
PartFaulty bool
Faulty bool
}

type minimalActorInfo struct {
Addresses []struct {
MinerAddresses []string
}
}

func (a *WebRPC) ActorSummary(ctx context.Context) ([]ActorSummary, error) {
stor := store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(a.deps.Chain), curiochain.ChainBlockCache))

var actorInfos []ActorSummary

confNameToAddr := map[address.Address][]string{}

err := forEachConfig(a, func(name string, info minimalActorInfo) error {
for _, aset := range info.Addresses {
for _, addr := range aset.MinerAddresses {
a, err := address.NewFromString(addr)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
}
confNameToAddr[a] = append(confNameToAddr[a], name)
}
}
return nil
})
if err != nil {
return nil, err
}

wins, err := a.spWins(ctx)
if err != nil {
return nil, xerrors.Errorf("getting sp wins: %w", err)
}

for addr, cnames := range confNameToAddr {
p, err := a.deps.Chain.StateMinerPower(ctx, addr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting miner power: %w", err)
}

mact, err := a.deps.Chain.StateGetActor(ctx, addr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting actor: %w", err)
}

mas, err := miner.Load(stor, mact)
if err != nil {
return nil, err
}

deadlines, err := a.getDeadlines(ctx, addr)
if err != nil {
return nil, err
}

avail, err := mas.AvailableBalance(mact.Balance)
if err != nil {
return nil, xerrors.Errorf("getting available balance: %w", err)
}

mi, err := mas.Info()
if err != nil {
return nil, xerrors.Errorf("getting miner info: %w", err)
}

wbal, err := a.deps.Chain.WalletBalance(ctx, mi.Worker)
if err != nil {
return nil, xerrors.Errorf("getting worker balance: %w", err)
}

sort.Strings(cnames)

actorInfos = append(actorInfos, ActorSummary{
Address: addr.String(),
CLayers: cnames,
QualityAdjustedPower: types.DeciStr(p.MinerPower.QualityAdjPower),
RawBytePower: types.DeciStr(p.MinerPower.RawBytePower),
Deadlines: deadlines,
ActorBalance: types.FIL(mact.Balance).Short(),
ActorAvailable: types.FIL(avail).Short(),
WorkerBalance: types.FIL(wbal).Short(),
Win1: wins[addr].Win1,
Win7: wins[addr].Win7,
Win30: wins[addr].Win30,
})
}

sort.Slice(actorInfos, func(i, j int) bool {
return actorInfos[i].Address < actorInfos[j].Address
})

return actorInfos, nil
}

func (a *WebRPC) getDeadlines(ctx context.Context, addr address.Address) ([]ActorDeadline, error) {
dls, err := a.deps.Chain.StateMinerDeadlines(ctx, addr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting deadlines: %w", err)
}

outDls := make([]ActorDeadline, 48)

for dlidx := range dls {
p, err := a.deps.Chain.StateMinerPartitions(ctx, addr, uint64(dlidx), types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting partition: %w", err)
}

dl := ActorDeadline{}

var live, faulty uint64

for _, part := range p {
l, err := part.LiveSectors.Count()
if err != nil {
return nil, xerrors.Errorf("getting live sectors: %w", err)
}
live += l

f, err := part.FaultySectors.Count()
if err != nil {
return nil, xerrors.Errorf("getting faulty sectors: %w", err)
}
faulty += f
}

dl.Empty = live == 0
dl.Proven = live > 0 && faulty == 0
dl.PartFaulty = faulty > 0
dl.Faulty = faulty > 0 && faulty == live

outDls[dlidx] = dl
}

pd, err := a.deps.Chain.StateMinerProvingDeadline(ctx, addr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting proving deadline: %w", err)
}

outDls[pd.Index].Current = true

return outDls, nil
}

type wins struct {
SpID int64 `db:"sp_id"`
Win1 int64 `db:"win1"`
Win7 int64 `db:"win7"`
Win30 int64 `db:"win30"`
}

func (a *WebRPC) spWins(ctx context.Context) (map[address.Address]wins, error) {
var w []wins

// note: this query uses mining_tasks_won_sp_id_base_compute_time_index
err := a.deps.DB.Select(ctx, &w, `WITH wins AS (
SELECT
sp_id,
base_compute_time,
won
FROM
mining_tasks
WHERE
won = true
AND base_compute_time > NOW() - INTERVAL '30 days'
)

SELECT
sp_id,
COUNT(*) FILTER (WHERE base_compute_time > NOW() - INTERVAL '1 day') AS "win1",
COUNT(*) FILTER (WHERE base_compute_time > NOW() - INTERVAL '7 days') AS "win7",
COUNT(*) FILTER (WHERE base_compute_time > NOW() - INTERVAL '30 days') AS "win30"
FROM
wins
GROUP BY
sp_id
ORDER BY
sp_id`)
if err != nil {
return nil, xerrors.Errorf("query win counts: %w", err)
}

wm := make(map[address.Address]wins)
for _, wi := range w {
ma, err := address.NewIDAddress(uint64(wi.SpID))
if err != nil {
return nil, xerrors.Errorf("parsing miner address: %w", err)
}

wm[ma] = wi
}

return wm, nil
}
2 changes: 1 addition & 1 deletion web/api/webrpc/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type OpenDealInfo struct {
}

func (a *WebRPC) DealsPending(ctx context.Context) ([]OpenDealInfo, error) {
var deals []OpenDealInfo
deals := []OpenDealInfo{}
err := a.deps.DB.Select(ctx, &deals, `SELECT sp_id, sector_number, piece_cid, piece_size, created_at, is_snap FROM open_sector_pieces ORDER BY created_at DESC`)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion web/api/webrpc/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type TaskSummary struct {
}

func (a *WebRPC) ClusterTaskSummary(ctx context.Context) ([]TaskSummary, error) {
var ts []TaskSummary
var ts = []TaskSummary{}
err := a.deps.DB.Select(ctx, &ts, `SELECT
t.id as id, t.name as name, t.update_time as since_posted, t.owner_id as owner_id, hm.host_and_port as owner
FROM harmony_task t LEFT JOIN harmony_machines hm ON hm.id = t.owner_id
Expand Down
6 changes: 4 additions & 2 deletions web/api/webrpc/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ type UpgradeSector struct {
TaskIDSubmit *uint64 `db:"task_id_submit"`
AfterSubmit bool `db:"after_submit"`

AfterProveSuccess bool `db:"after_prove_msg_success"`

TaskIDMoveStorage *uint64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
}

func (a *WebRPC) UpgradeSectors(ctx context.Context) ([]UpgradeSector, error) {
var sectors []UpgradeSector
err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, task_id_move_storage, after_move_storage FROM sectors_snap_pipeline`)
sectors := []UpgradeSector{}
err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage FROM sectors_snap_pipeline`)
if err != nil {
return nil, err
}
Expand Down
Loading