Skip to content

Commit

Permalink
polish: add sector deal tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Aug 26, 2020
1 parent 3f165cc commit f650f17
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 58 deletions.
56 changes: 0 additions & 56 deletions cmd/lotus-chainwatch/processor/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ create table if not exists market_deal_states
);
create table if not exists minerid_dealid_sectorid
(
deal_id bigint not null
constraint sectors_sector_ids_id_fk
references market_deal_proposals(deal_id),
sector_id bigint not null,
miner_id text not null,
foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id),
constraint miner_sector_deal_ids_pk
primary key (miner_id, sector_id, deal_id)
);
`); err != nil {
return err
}
Expand Down Expand Up @@ -266,48 +252,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip

}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
start := time.Now()
defer func() {
Expand Down
73 changes: 71 additions & 2 deletions cmd/lotus-chainwatch/processor/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ create table if not exists miner_sector_events
primary key (sector_id, event, miner_id, state_root)
);
create table if not exists minerid_dealid_sectorid
(
deal_id bigint not null
constraint sectors_sector_ids_id_fk
references market_deal_proposals(deal_id),
sector_id bigint not null,
miner_id text not null,
foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id),
constraint miner_sector_deal_ids_pk
primary key (miner_id, sector_id, deal_id)
);
`); err != nil {
return err
}
Expand Down Expand Up @@ -271,6 +285,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8)
dealEvents := make(chan *SectorDealEvent, 8)

grp.Go(func() error {
return p.storePreCommitDealInfo(dealEvents)
})

grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
Expand All @@ -279,8 +298,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
grp.Go(func() error {
defer func() {
close(preCommitEvents)
close(dealEvents)
}()
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents)
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
})

grp.Go(func() error {
Expand All @@ -296,7 +316,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
return grp.Wait()
}

func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent) error {
func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -332,6 +352,13 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA

preCommitAdded := make([]uint64, len(changes.Added))
for i, added := range changes.Added {
if len(added.Info.DealIDs) > 0 {
sectorDeals <- &SectorDealEvent{
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
}
}
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
m.common.addr.String(),
Expand Down Expand Up @@ -902,6 +929,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return tx.Commit()
}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
Expand Down

0 comments on commit f650f17

Please sign in to comment.