diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go index 9577f341ed..11f02271ed 100644 --- a/cmd/lotus-chainwatch/processor/market.go +++ b/cmd/lotus-chainwatch/processor/market.go @@ -61,20 +61,6 @@ create table if not exists market_deal_states ); -create table if not exists minerid_dealid_sectorid -( - deal_id bigint not null - constraint sectors_sector_ids_id_fk - references market_deal_proposals(deal_id), - - sector_id bigint not null, - miner_id text not null, - foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id), - - constraint miner_sector_deal_ids_pk - primary key (miner_id, sector_id, deal_id) -); - `); err != nil { return err } @@ -266,48 +252,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip } -func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) - } - - stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) - if err != nil { - return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) - } - - for sde := range dealEvents { - for _, did := range sde.DealIDs { - if _, err := stmt.Exec( - uint64(did), - sde.MinerID.String(), - sde.SectorID, - ); err != nil { - return err - } - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) - } - - if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) - } - return nil - -} - func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { start := time.Now() defer func() { diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index b9835742a0..3a3710c66f 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -130,6 +130,20 @@ create table if not exists miner_sector_events primary key (sector_id, event, miner_id, state_root) ); +create table if not exists minerid_dealid_sectorid +( + deal_id bigint not null + constraint sectors_sector_ids_id_fk + references market_deal_proposals(deal_id), + + sector_id bigint not null, + miner_id text not null, + foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id), + + constraint miner_sector_deal_ids_pk + primary key (miner_id, sector_id, deal_id) +); + `); err != nil { return err } @@ -271,6 +285,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) preCommitEvents := make(chan *MinerSectorsEvent, 8) sectorEvents := make(chan *MinerSectorsEvent, 8) partitionEvents := make(chan *MinerSectorsEvent, 8) + dealEvents := make(chan *SectorDealEvent, 8) + + grp.Go(func() error { + return p.storePreCommitDealInfo(dealEvents) + }) grp.Go(func() error { return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) @@ -279,8 +298,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) grp.Go(func() error { defer func() { close(preCommitEvents) + close(dealEvents) }() - return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents) + return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents) }) grp.Go(func() error { @@ -296,7 +316,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) return grp.Wait() } -func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent) error { +func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error { tx, err := p.db.Begin() if err != nil { return err @@ -332,6 +352,13 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA preCommitAdded := make([]uint64, len(changes.Added)) for i, added := range changes.Added { + if len(added.Info.DealIDs) > 0 { + sectorDeals <- &SectorDealEvent{ + MinerID: m.common.addr, + SectorID: uint64(added.Info.SectorNumber), + DealIDs: added.Info.DealIDs, + } + } if added.Info.ReplaceCapacity { if _, err := stmt.Exec( m.common.addr.String(), @@ -902,6 +929,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine return tx.Commit() } +func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) + } + + stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) + } + + for sde := range dealEvents { + for _, did := range sde.DealIDs { + if _, err := stmt.Exec( + uint64(did), + sde.MinerID.String(), + sde.SectorID, + ); err != nil { + return err + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) + } + + if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) + } + return nil + +} + func (p *Processor) storeMinersPower(miners []minerActorInfo) error { start := time.Now() defer func() {