Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split flagged pieces #1493

Merged
merged 1 commit into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Store struct {
RemovePieceMetadata func(context.Context, cid.Cid) error
RemoveIndexes func(context.Context, cid.Cid) error
NextPiecesToCheck func(ctx context.Context) ([]cid.Cid, error)
FlagPiece func(ctx context.Context, pieceCid cid.Cid) error
FlagPiece func(ctx context.Context, pieceCid cid.Cid, hasUnsealedDeal bool) error
UnflagPiece func(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
Expand Down Expand Up @@ -171,8 +171,8 @@ func (s *Store) NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error) {
return s.client.NextPiecesToCheck(ctx)
}

func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error {
return s.client.FlagPiece(ctx, pieceCid)
func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedDeal bool) error {
return s.client.FlagPiece(ctx, pieceCid, hasUnsealedDeal)
}

func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error {
Expand Down
2 changes: 1 addition & 1 deletion extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (s *Store) NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error) {
return s.db.NextPiecesToCheck(ctx)
}

func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error {
func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, _ bool) error {
ctx, span := tracing.Tracer.Start(ctx, "store.flag_piece")
span.SetAttributes(attribute.String("pieceCid", pieceCid.String()))
defer span.End()
Expand Down
10 changes: 6 additions & 4 deletions extern/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ const pieceMetadataVersion = "1"
var log = logging.Logger("boostd-data-ldb")

type LeveldbFlaggedMetadata struct {
CreatedAt time.Time `json:"c"`
UpdatedAt time.Time `json:"u"`
CreatedAt time.Time `json:"c"`
UpdatedAt time.Time `json:"u"`
HasUnsealedCopy bool `json:"huc"`
}

type LeveldbMetadata struct {
Expand Down Expand Up @@ -397,8 +398,8 @@ func (s *Store) NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error) {
return s.db.NextPiecesToCheck(ctx)
}

func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error {
log.Debugw("handle.flag-piece", "piece-cid", pieceCid)
func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error {
log.Debugw("handle.flag-piece", "piece-cid", pieceCid, "hasUnsealedCopy", hasUnsealedCopy)

ctx, span := tracing.Tracer.Start(ctx, "store.flag_piece")
defer span.End()
Expand All @@ -423,6 +424,7 @@ func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error {
}

fm.UpdatedAt = now
fm.HasUnsealedCopy = hasUnsealedCopy

// Write the piece metadata back to the db
err = s.db.SetPieceCidToFlagged(ctx, pieceCid, fm)
Expand Down
6 changes: 4 additions & 2 deletions extern/boostd-data/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (ofsz *OffsetSize) UnmarshallBase64(str string) error {
// FlaggedPiece is a piece that has been flagged for the user's attention
// (eg because the index is missing)
type FlaggedPiece struct {
CreatedAt time.Time
PieceCid cid.Cid
PieceCid cid.Cid
CreatedAt time.Time
UpdatedAt time.Time
HasUnsealedCopy bool
}
2 changes: 1 addition & 1 deletion extern/boostd-data/svc/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Service interface {
RemovePieceMetadata(context.Context, cid.Cid) error
RemoveIndexes(context.Context, cid.Cid) error
NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error)
FlagPiece(ctx context.Context, pieceCid cid.Cid) error
FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error
UnflagPiece(ctx context.Context, pieceCid cid.Cid) error
FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount(ctx context.Context) (int, error)
Expand Down
3 changes: 2 additions & 1 deletion extern/boostd-data/yugabyte/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ CREATE INDEX IF NOT EXISTS PieceTrackerUpdatedAt ON PieceTracker (UpdatedAt);
CREATE TABLE IF NOT EXISTS PieceFlagged (
PieceCid TEXT PRIMARY KEY,
CreatedAt TIMESTAMP,
UpdatedAt TIMESTAMP
UpdatedAt TIMESTAMP,
HasUnsealedCopy BOOLEAN
);

CREATE INDEX IF NOT EXISTS PieceFlaggedCreatedAt ON PieceFlagged (CreatedAt);
Expand Down
16 changes: 9 additions & 7 deletions extern/boostd-data/yugabyte/piecedoctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,16 @@ func (s *Store) getPieceCheckPeriod(ctx context.Context) (time.Duration, error)
return period, nil
}

func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error {
func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error {
ctx, span := tracing.Tracer.Start(ctx, "store.flag_piece")
span.SetAttributes(attribute.String("pieceCid", pieceCid.String()))
defer span.End()

now := time.Now()
qry := `INSERT INTO PieceFlagged (PieceCid, CreatedAt, UpdatedAt) ` +
`VALUES ($1, $2, $3) ` +
qry := `INSERT INTO PieceFlagged (PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy) ` +
`VALUES ($1, $2, $3, $4) ` +
`ON CONFLICT (PieceCid) DO UPDATE SET UpdatedAt = excluded.UpdatedAt`
_, err := s.db.Exec(ctx, qry, pieceCid.String(), now, now)
_, err := s.db.Exec(ctx, qry, pieceCid.String(), now, now, hasUnsealedCopy)
if err != nil {
return fmt.Errorf("flagging piece %s: %w", pieceCid, err)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset

var args []interface{}
idx := 0
qry := `SELECT PieceCid, CreatedAt from PieceFlagged `
qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged`
if cursor != nil {
qry += `WHERE CreatedAt < $1 `
args = append(args, cursor)
Expand All @@ -253,8 +253,10 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
var pieces []model.FlaggedPiece
var pcid string
var createdAt time.Time
var updatedAt time.Time
var hasUnsealedCopy bool
for rows.Next() {
err := rows.Scan(&pcid, &createdAt)
err := rows.Scan(&pcid, &createdAt, &updatedAt, &hasUnsealedCopy)
if err != nil {
return nil, fmt.Errorf("scanning flagged piece: %w", err)
}
Expand All @@ -263,7 +265,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset
if err != nil {
return nil, fmt.Errorf("parsing flagged piece cid %s: %w", pcid, err)
}
pieces = append(pieces, model.FlaggedPiece{PieceCid: c, CreatedAt: createdAt})
pieces = append(pieces, model.FlaggedPiece{PieceCid: c, CreatedAt: createdAt, UpdatedAt: updatedAt, HasUnsealedCopy: hasUnsealedCopy})
}

if err := rows.Err(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions piecedirectory/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *Doctor) checkPiece(ctx context.Context, pieceCid cid.Cid, lu *sectorsta
}

if !isIndexed {
err = d.store.FlagPiece(ctx, pieceCid)
err = d.store.FlagPiece(ctx, pieceCid, false)
if err != nil {
return fmt.Errorf("failed to flag unindexed piece %s: %w", pieceCid, err)
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func (d *Doctor) checkPiece(ctx context.Context, pieceCid cid.Cid, lu *sectorsta
}

if !hasUnsealedDeal && !lacksActiveSector {
err = d.store.FlagPiece(ctx, pieceCid)
err = d.store.FlagPiece(ctx, pieceCid, false)
if err != nil {
return fmt.Errorf("failed to flag piece %s with no unsealed deal: %w", pieceCid, err)
}
Expand Down
2 changes: 1 addition & 1 deletion piecedirectory/piecedirectory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.Equal(t, 0, len(pcids))

// Flag a piece
err = cl.FlagPiece(ctx, commpCalc.PieceCID)
err = cl.FlagPiece(ctx, commpCalc.PieceCID, false)
require.NoError(t, err)

// Count and list of pieces should contain one piece
Expand Down