feat: return channel from AddIndex and GetIndex
dirkmc committed May 16, 2023
1 parent 5c712b5 commit bc740d4
Showing 17 changed files with 433 additions and 166 deletions.
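At a glance, the commit changes AddIndex to return a progress channel (<-chan types.AddIndexProgress) instead of an error, and each record streamed by GetIndex now carries its own error field. A minimal consumption sketch, assuming only the field names that appear in this diff (Progress, Err, Record, Error) and the boostd-data model package; the wrapper function and its name are illustrative, not part of the commit:

// Sketch: drain the new AddIndex progress channel, then stream the index back.
// The store parameter is anything exposing the new signatures, e.g. the
// StoreMigrationApi interface further down in this diff.
func addThenRead(ctx context.Context, store StoreMigrationApi, pieceCid cid.Cid, records []model.Record) ([]model.Record, error) {
	// AddIndex now reports progress; a non-empty Err means the add failed.
	for resp := range store.AddIndex(ctx, pieceCid, records, true) {
		if resp.Err != "" {
			return nil, fmt.Errorf("adding index %s: %s", pieceCid, resp.Err)
		}
	}

	// GetIndex still returns an error for setup failures, then streams records,
	// each of which may carry its own Error.
	idx, err := store.GetIndex(ctx, pieceCid)
	if err != nil {
		return nil, fmt.Errorf("loading index %s: %w", pieceCid, err)
	}
	var recs []model.Record
	for r := range idx {
		if r.Error != nil {
			return nil, r.Error
		}
		recs = append(recs, r.Record)
	}
	return recs, nil
}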
19 changes: 15 additions & 4 deletions cmd/migrate-lid/couch-to-yuga.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/filecoin-project/boostd-data/couchbase"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/yugabyte"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
@@ -211,16 +212,26 @@ func migrateLidToLidIndex(ctx context.Context, pieceCid cid.Cid, source StoreMig
}

// Load the index from the source store
records, err := source.GetIndex(ctx, pieceCid)
idx, err := source.GetIndex(ctx, pieceCid)
if err != nil {
return false, fmt.Errorf("loading index %s: %w", pieceCid, err)
}

var records []model.Record
for r := range idx {
if r.Error != nil {
return false, r.Error
}
records = append(records, r.Record)
}

// Add the index to the destination store
addStart := time.Now()
err = dest.AddIndex(ctx, pieceCid, records, true)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", pieceCid, err)
respch := dest.AddIndex(ctx, pieceCid, records, true)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", pieceCid, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

13 changes: 8 additions & 5 deletions cmd/migrate-lid/migrate_lid.go
@@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boostd-data/ldb"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-address"
vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-fil-markets/piecestore"
@@ -43,8 +44,8 @@ import (
type StoreMigrationApi interface {
Start(ctx context.Context) error
IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex(context.Context, cid.Cid) ([]model.Record, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error
GetIndex(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress
AddDealForPiece(ctx context.Context, pcid cid.Cid, info model.DealInfo) error
ListPieces(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
@@ -333,9 +334,11 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f

// Add the index to the store
addStart := time.Now()
err = store.AddIndex(ctx, pieceCid, records, false)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", ipath.path, err)
respch := store.AddIndex(ctx, pieceCid, records, false)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", ipath.path, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

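The same drain-and-return-on-first-error loop now appears at both migration call sites; a small helper along these lines (hypothetical, not part of this commit) would keep the pattern in one place:

// drainAddIndex is a hypothetical helper: it consumes an AddIndexProgress
// channel and turns the first reported error string into a Go error.
func drainAddIndex(label string, respch <-chan types.AddIndexProgress) error {
	for resp := range respch {
		if resp.Err != "" {
			return fmt.Errorf("adding index %s to store: %s", label, resp.Err)
		}
	}
	return nil
}

A call site would then collapse to a single check around store.AddIndex, e.g. if err := drainAddIndex(pieceCid.String(), store.AddIndex(ctx, pieceCid, records, false)); err != nil { return false, err }.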
31 changes: 21 additions & 10 deletions extern/boostd-data/client/client.go
@@ -19,7 +19,7 @@ var log = logger.Logger("boostd-data-client")
type Store struct {
client struct {
AddDealForPiece func(context.Context, cid.Cid, model.DealInfo) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) <-chan types.AddIndexProgress
IsIndexed func(ctx context.Context, pieceCid cid.Cid) (bool, error)
IsCompleteIndex func(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex func(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
@@ -38,16 +38,17 @@ type Store struct {
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
}
closer jsonrpc.ClientCloser
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
}

func NewStore() *Store {
return &Store{}
func NewStore(dialOpts ...jsonrpc.Option) *Store {
return &Store{dialOpts: dialOpts}
}

func (s *Store) Dial(ctx context.Context, addr string) error {
var err error
s.closer, err = jsonrpc.NewClient(ctx, addr, "boostddata", &s.client, nil)
s.closer, err = jsonrpc.NewMergeClient(ctx, addr, "boostddata", []interface{}{&s.client}, nil, s.dialOpts...)
if err != nil {
return fmt.Errorf("dialing local index directory server: %w", err)
}
@@ -68,6 +69,9 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (index.Index, er

var records []index.Record
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, index.Record{
Cid: r.Cid,
Offset: r.Offset,
@@ -93,10 +97,10 @@ func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Recor

var records []model.Record
for r := range resp {
records = append(records, model.Record{
Cid: r.Cid,
OffsetSize: model.OffsetSize{Offset: r.Offset},
})
if r.Error != nil {
return nil, r.Error
}
records = append(records, r.Record)
}

return records, nil
@@ -121,7 +125,14 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
log.Debugw("add-index", "piece-cid", pieceCid, "records", len(records))

return s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
respch := s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
for resp := range respch {
if resp.Err != "" {
return fmt.Errorf("add index with piece cid %s: %s", pieceCid, resp.Err)
}
//fmt.Printf("%s: Percent complete: %f%%\n", time.Now(), resp.Progress*100)
}
return nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
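From a caller's perspective the client API is unchanged: Store.AddIndex still blocks and returns an error, draining the progress channel internally, and NewStore now accepts optional go-jsonrpc dial options. A rough usage sketch; the address, timeout value, and the jsonrpc.WithTimeout option are assumptions for illustration only:

// Sketch of dialing the local index directory service and adding an index.
func addViaClient(ctx context.Context, pieceCid cid.Cid, records []model.Record) error {
	store := client.NewStore(jsonrpc.WithTimeout(30 * time.Second)) // option assumed available in go-jsonrpc
	if err := store.Dial(ctx, "ws://localhost:8042"); err != nil {  // address is illustrative
		return fmt.Errorf("dialing local index directory: %w", err)
	}
	// Blocking call; the progress channel is drained inside the client method.
	return store.AddIndex(ctx, pieceCid, records, true)
}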
48 changes: 26 additions & 22 deletions extern/boostd-data/couchbase/service.go
@@ -3,7 +3,6 @@ package couchbase
import (
"context"
"fmt"
"github.com/ipld/go-car/v2/index"
"time"

"github.com/filecoin-project/boostd-data/model"
@@ -155,12 +154,7 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.In

recs := make(chan types.IndexRecord, len(records))
for _, r := range records {
recs <- types.IndexRecord{
Record: index.Record{
Cid: r.Cid,
Offset: r.Offset,
},
}
recs <- types.IndexRecord{Record: r}
}
close(recs)

@@ -193,7 +187,7 @@ func (s *Store) IsCompleteIndex(ctx context.Context, pieceCid cid.Cid) (bool, er
return md.CompleteIndex, nil
}

func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress {
log.Debugw("handle.add-index", "records", len(records))

ctx, span := tracing.Tracer.Start(ctx, "store.add_index")
@@ -209,22 +203,32 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
mhs = append(mhs, r.Cid.Hash())
}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())
progress := make(chan types.AddIndexProgress, 1)
go func() {
defer close(progress)
progress <- types.AddIndexProgress{Progress: 0}

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
return err
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())
setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())
progress <- types.AddIndexProgress{Progress: 0.5}

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())
progress <- types.AddIndexProgress{Progress: 1}
}()

return s.db.MarkIndexingComplete(ctx, pieceCid, len(records), isCompleteIndex)
return progress
}

func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, error) {
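The shapes of the two streamed types are implied by how they are built and consumed in this diff; roughly as follows (an approximation inferred from usage; the authoritative definitions live in extern/boostd-data/svc/types and may differ):

// Approximate shapes inferred from usage in this commit, not copied from source.
type IndexRecord struct {
	model.Record       // block CID plus its Offset/Size within the piece
	Error        error // set when the store failed to produce this record
}

type AddIndexProgress struct {
	Progress float64 // fraction complete, in the range 0..1
	Err      string  // non-empty when the add failed (a string survives JSON-RPC)
}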
123 changes: 67 additions & 56 deletions extern/boostd-data/ldb/service.go
@@ -229,12 +229,7 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.In

recs := make(chan types.IndexRecord, len(records))
for _, r := range records {
recs <- types.IndexRecord{
Record: carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
},
}
recs <- types.IndexRecord{Record: r}
}
close(recs)

@@ -267,7 +262,7 @@ func (s *Store) IsCompleteIndex(ctx context.Context, pieceCid cid.Cid) (bool, er
return md.CompleteIndex, nil
}

func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress {
log.Debugw("handle.add-index", "records", len(records))

ctx, span := tracing.Tracer.Start(ctx, "store.add_index")
@@ -277,68 +272,84 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
log.Debugw("handled.add-index", "took", time.Since(now).String())
}(time.Now())

s.Lock()
defer s.Unlock()
progress := make(chan types.AddIndexProgress, 1)
go func() {
defer close(progress)

var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
}
s.Lock()
defer s.Unlock()

err := s.db.SetMultihashesToPieceCid(ctx, recs, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
}

// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
if err != nil {
return fmt.Errorf("couldnt generate next cursor: %w", err)
}
err := s.db.SetMultihashesToPieceCid(ctx, recs, pieceCid)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
progress <- types.AddIndexProgress{Progress: 0.45}

// allocate metadata for pieceCid
err = s.db.SetNextCursor(ctx, cursor+1)
if err != nil {
return err
}
// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}

// process index and store entries
for _, rec := range records {
err := s.db.AddIndexRecord(ctx, keyCursorPrefix, rec)
// allocate metadata for pieceCid
err = s.db.SetNextCursor(ctx, cursor+1)
if err != nil {
return err
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
}

// get the metadata for the piece
md, err := s.db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
return fmt.Errorf("getting piece cid metadata for piece %s: %w", pieceCid, err)
// process index and store entries
for _, rec := range records {
err := s.db.AddIndexRecord(ctx, keyCursorPrefix, rec)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
}
// there isn't yet any metadata, so create new metadata
md = newLeveldbMetadata()
}
progress <- types.AddIndexProgress{Progress: 0.9}

// mark indexing as complete
md.Cursor = cursor
md.IndexedAt = time.Now()
md.CompleteIndex = isCompleteIndex
// get the metadata for the piece
md, err := s.db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
// there isn't yet any metadata, so create new metadata
md = newLeveldbMetadata()
}

err = s.db.SetPieceCidToMetadata(ctx, pieceCid, md)
if err != nil {
return err
}
// mark indexing as complete
md.Cursor = cursor
md.IndexedAt = time.Now()
md.CompleteIndex = isCompleteIndex

err = s.db.Sync(ctx, ds.NewKey(fmt.Sprintf("%d", cursor)))
if err != nil {
return err
}
err = s.db.SetPieceCidToMetadata(ctx, pieceCid, md)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
progress <- types.AddIndexProgress{Progress: 0.95}

return nil
err = s.db.Sync(ctx, ds.NewKey(fmt.Sprintf("%d", cursor)))
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
progress <- types.AddIndexProgress{Progress: 1}
}()

return progress
}

func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, error) {
6 changes: 3 additions & 3 deletions extern/boostd-data/model/model.go
@@ -47,16 +47,16 @@ type Metadata struct {

// Record is the information stored in the index for each block in a piece
type Record struct {
Cid cid.Cid
Cid cid.Cid `json:"c"`
OffsetSize
}

type OffsetSize struct {
// Offset is the offset into the CAR file of the section, where a section
// is <section size><cid><block data>
Offset uint64
Offset uint64 `json:"o"`
// Size is the size of the block data (not the whole section)
Size uint64
Size uint64 `json:"s"`
}

func (ofsz *OffsetSize) MarshallBase64() string {
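The new single-letter JSON tags shrink each record on the wire, which matters now that GetIndex streams one record per block over JSON-RPC. A small encoding sketch; the CID, offset and size values are illustrative:

// Marshalling a Record with the new tags yields compact JSON along the lines of
//   {"c":{"/":"bafy..."},"o":1024,"s":256}
// because go-cid encodes CIDs as {"/": "<cid>"} and the fields of the embedded
// OffsetSize are promoted into the same object.
rec := model.Record{
	Cid:        blockCid, // some block CID
	OffsetSize: model.OffsetSize{Offset: 1024, Size: 256},
}
b, _ := json.Marshal(rec)
fmt.Println(string(b))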
(diffs for the remaining changed files were not loaded)
