diff --git a/market/fakelm/lmimpl.go b/market/fakelm/lmimpl.go index d52dc1e35..99ea25bbb 100644 --- a/market/fakelm/lmimpl.go +++ b/market/fakelm/lmimpl.go @@ -76,38 +76,72 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, // TODO: Add snap, Add open_sector_pieces var ssip []struct { - PieceCID *string `db:"piece_cid"` - DealID *int64 `db:"f05_deal_id"` - DDOPAM *string `db:"ddo_pam"` - Complete bool `db:"after_commit_msg_success"` - Failed bool `db:"failed"` - SDR bool `db:"after_sdr"` - PoRep bool `db:"after_porep"` - Tree bool `db:"after_tree_r"` - IsSnap bool `db:"is_snap"` + PieceCID *string `db:"piece_cid"` + DealID *int64 `db:"f05_deal_id"` + DDOPAM *string `db:"ddo_pam"` + Complete bool `db:"after_commit_msg_success"` + Failed bool `db:"failed"` + SDR bool `db:"after_sdr"` + PoRep bool `db:"after_porep"` + Tree bool `db:"after_tree_r"` + IsSnap bool `db:"is_snap"` + Encode bool `db:"after_encode"` + SnapProve bool `db:"after_prove"` + SnapCommit bool `db:"after_prove_msg_success"` + SnapMoveStorage bool `db:"after_move_storage"` } err := l.db.Select(ctx, &ssip, ` - WITH CheckCommit AS ( - SELECT - COALESCE(sp.sp_id, sm.sp_id) AS sp_id, - COALESCE(sp.sector_number, sm.sector_num) AS sector_number, - COALESCE(sp.after_commit_msg, TRUE) AS after_commit_msg, - COALESCE(sp.failed, FALSE) AS failed, - COALESCE(sp.after_sdr, TRUE) AS after_sdr, - COALESCE(sp.after_porep, TRUE) AS after_porep, - COALESCE(sp.after_tree_r, TRUE) AS after_tree_r, - COALESCE(sp.after_commit_msg_success, TRUE) AS after_commit_msg_success, - COALESCE(snap.after_prove_msg_success, snap.after_prove_msg_success is null) AS after_snap_msg_success, - COALESCE(sm.orig_sealed_cid != sm.cur_sealed_cid, FALSE) AS is_snap - FROM - sectors_sdr_pipeline sp - FULL OUTER JOIN sectors_meta sm ON sp.sp_id = sm.sp_id AND sp.sector_number = sm.sector_num - LEFT JOIN sectors_snap_pipeline snap ON sm.sp_id = snap.sp_id AND sm.sector_num = snap.sector_number - WHERE - (sp.sp_id = $1 AND sp.sector_number = $2) OR (sm.sp_id = $1 AND sm.sector_num = $2) - ), - MetaPieces AS ( + WITH SectorMeta AS ( + SELECT + sm.sp_id, + sm.sector_num, + sm.orig_sealed_cid, + sm.cur_sealed_cid + FROM + sectors_meta sm + WHERE + sm.sp_id = $1 AND sm.sector_num = $2 + ), + SDRMeta AS ( + SELECT + sp.sp_id, + sp.sector_number, + sp.after_commit_msg, + sp.failed, + sp.after_sdr, + sp.after_porep, + sp.after_tree_r, + sp.after_commit_msg_success + FROM + sectors_sdr_pipeline sp + WHERE + sp.sp_id = $1 AND sp.sector_number = $2 + ), + CheckCommit AS ( + SELECT + COALESCE(sp.sp_id, sm.sp_id) AS sp_id, + COALESCE(sp.sector_number, sm.sector_num) AS sector_number, + COALESCE(sp.after_commit_msg, TRUE) AS after_commit_msg, + COALESCE(sp.failed, FALSE) AS failed, + COALESCE(sp.after_sdr, TRUE) AS after_sdr, + COALESCE(sp.after_porep, TRUE) AS after_porep, + COALESCE(sp.after_tree_r, TRUE) AS after_tree_r, + COALESCE(sp.after_commit_msg_success, TRUE) AS after_commit_msg_success, + COALESCE(snap.after_prove_msg_success, snap.after_prove_msg_success is null) AS after_snap_msg_success, + COALESCE(sm.orig_sealed_cid != sm.cur_sealed_cid, FALSE) AS is_snap, + COALESCE(snap.after_encode, FALSE) AS after_encode, + COALESCE(snap.after_prove, FALSE) AS after_prove, + COALESCE(snap.after_prove_msg_success, FALSE) AS after_prove_msg_success, + COALESCE(snap.after_move_storage, FALSE) AS after_move_storage + FROM + SDRMeta sp + FULL OUTER JOIN SectorMeta sm ON sp.sp_id = sm.sp_id AND sp.sector_number = sm.sector_num + LEFT JOIN sectors_snap_pipeline snap ON sm.sp_id = snap.sp_id AND sm.sector_num = snap.sector_number + WHERE + (sp.sp_id = $1 AND sp.sector_number = $2) OR (sm.sp_id = $1 AND sm.sector_num = $2) + ), + MetaPieces AS ( SELECT mp.piece_cid, mp.f05_deal_id, @@ -117,7 +151,11 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, cc.after_sdr, cc.after_tree_r, cc.after_porep, - cc.is_snap + cc.is_snap, + cc.after_encode, + cc.after_prove, + cc.after_prove_msg_success, + cc.after_move_storage FROM sectors_meta_pieces mp INNER JOIN @@ -135,7 +173,11 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, cc.after_sdr, cc.after_tree_r, cc.after_porep, - FALSE as is_snap + FALSE as is_snap, + FALSE as after_encode, + FALSE as after_prove, + FALSE as after_prove_msg_success, + FALSE as after_move_storage FROM sectors_sdr_initial_pieces ip INNER JOIN @@ -153,7 +195,11 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, FALSE AS after_sdr, FALSE AS after_tree_r, FALSE AS after_porep, - TRUE AS is_snap + TRUE AS is_snap, + cc.after_encode, + cc.after_prove, + cc.after_prove_msg_success, + cc.after_move_storage FROM sectors_snap_initial_pieces ip INNER JOIN @@ -171,7 +217,11 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, FALSE as after_sdr, FALSE as after_tree_r, FALSE as after_porep, - op.is_snap as is_snap + op.is_snap as is_snap, + FALSE as after_encode, + FALSE as after_prove, + FALSE as after_prove_msg_success, + FALSE as after_move_storage FROM open_sector_pieces op WHERE @@ -191,6 +241,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, var deals []abi.DealID var seenDealIDs = make(map[abi.DealID]struct{}) + var isSnap bool if len(ssip) > 0 { for _, d := range ssip { @@ -221,6 +272,10 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, dealID = abi.DealID(val) } + if !isSnap && d.IsSnap { + isSnap = true + } + if _, ok := seenDealIDs[dealID]; !ok { deals = append(deals, dealID) seenDealIDs[dealID] = struct{}{} @@ -268,9 +323,19 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, currentSSIP := ssip[0] switch { + case isSnap && !currentSSIP.Encode: + ret.State = lapi.SectorState(sealing.UpdateReplica) + case currentSSIP.Encode && !currentSSIP.SnapProve: + ret.State = lapi.SectorState(sealing.ProveReplicaUpdate) + case currentSSIP.SnapProve && !currentSSIP.SnapCommit: + ret.State = lapi.SectorState(sealing.SubmitReplicaUpdate) + case currentSSIP.SnapCommit && !currentSSIP.SnapMoveStorage: + ret.State = lapi.SectorState(sealing.FinalizeReplicaUpdate) + case currentSSIP.SnapMoveStorage: + ret.State = lapi.SectorState(sealing.Proving) case currentSSIP.Failed: ret.State = lapi.SectorState(sealing.FailedUnrecoverable) - case !currentSSIP.SDR: + case !isSnap && !currentSSIP.SDR: ret.State = lapi.SectorState(sealing.PreCommit1) case currentSSIP.SDR && !currentSSIP.Tree: ret.State = lapi.SectorState(sealing.PreCommit2)