diff --git a/cli/storage-deals.go b/cli/storage-deals.go index 80c14c31..37a4574a 100644 --- a/cli/storage-deals.go +++ b/cli/storage-deals.go @@ -749,6 +749,10 @@ func outputStorageDeal(deal *market.MinerDeal) error { if err != nil { return err } + dataCid, err := cid.Decode(label) + if err != nil { + return fmt.Errorf("parse %s to cid failed: %v", label, err) + } if deal.AddFundsCid != nil { addFundsCid = deal.AddFundsCid.String() } @@ -783,7 +787,7 @@ func outputStorageDeal(deal *market.MinerDeal) error { {"StoragePricePerEpoch", deal.Proposal.StoragePricePerEpoch}, {"ProviderCollateral", deal.Proposal.ProviderCollateral}, {"ClientCollateral", deal.Proposal.ClientCollateral}, - {"Label", label}, + {"Label", dataCid}, {"MinerPeerID", deal.Miner.Pretty()}, {"ClientPeerID", deal.Client.Pretty()}, {"FundsReserved", deal.FundsReserved}, diff --git a/tools/index/README.md b/tools/index/README.md index a3894e04..277ab29c 100644 --- a/tools/index/README.md +++ b/tools/index/README.md @@ -18,6 +18,9 @@ make index * --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`。 * --droplet-url:droplet 服务的 RPC 地址。 * --droplet-token:droplet 服务的 token。 +* --start:订单创建时间需大于设置的值。 +* --end:订单创建时间需小于设置的值。 +* --concurrency:生成索引的并发数,默认是 1。 ```bash ./index-tool gen-index \ diff --git a/tools/index/main.go b/tools/index/main.go index c6db1369..939e523e 100644 --- a/tools/index/main.go +++ b/tools/index/main.go @@ -6,11 +6,15 @@ import ( "log" "os" "path/filepath" + "sort" "strings" + "sync" + "time" dagstore2 "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/shard" + "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-jsonrpc" marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1" @@ -19,6 +23,7 @@ import ( "github.com/ipfs-force-community/droplet/v2/dagstore" 
"github.com/ipfs-force-community/droplet/v2/models/mysql" "github.com/ipfs-force-community/droplet/v2/models/repo" + "github.com/ipfs-force-community/droplet/v2/utils" "github.com/ipld/go-car/v2" carindex "github.com/ipld/go-car/v2/index" "github.com/multiformats/go-multihash" @@ -29,9 +34,8 @@ const indexSuffix = ".full.idx" var ( mongoURLFlag = &cli.StringFlag{ - Name: "mongo-url", - Usage: "mongo url, use for store topIndex", - Required: true, + Name: "mongo-url", + Usage: "mongo url, use for store topIndex", } mysqlURLFlag = &cli.StringFlag{ Name: "mysql-url", @@ -58,6 +62,19 @@ var ( Usage: "droplet token", Required: true, } + startFlag = &cli.StringFlag{ + Name: "start", + Usage: "The index will only be created when the deal creation time is greater than 'start', eg. 2023-07-26", + } + endFlag = &cli.StringFlag{ + Name: "end", + Usage: "The index will only be created when the deal creation time is less than end 'end', eg. 2023-07-27", + } + concurrencyFlag = &cli.IntFlag{ + Name: "concurrency", + Usage: "Concurrent number of indexes generated", + Value: 1, + } ) func main() { @@ -85,6 +102,9 @@ var generateIndexCmd = &cli.Command{ carDirFlag, dropletTokenFlag, dropletURLFlag, + startFlag, + endFlag, + concurrencyFlag, }, Action: func(cctx *cli.Context) error { ctx := cctx.Context @@ -94,6 +114,10 @@ var generateIndexCmd = &cli.Command{ if err != nil { return err } + p.concurrency = cctx.Int(concurrencyFlag.Name) + if p.concurrency < 1 { + p.concurrency = 1 + } fmt.Println("car dir:", carDir, "index dir:", indexDir) @@ -101,12 +125,20 @@ var generateIndexCmd = &cli.Command{ }, } +type pieceInfo struct { + piece string + payloadSize uint64 + pieceSize uint64 +} + type params struct { api marketapi.IMarket close jsonrpc.ClientCloser topIndexRepo *dagstore.MongoTopIndex shardRepo repo.IShardRepo pieces map[string]struct{} + pieceInfos []*pieceInfo + concurrency int } func paramsFromContext(cctx *cli.Context) (*params, error) { @@ -131,15 +163,37 @@ func 
paramsFromContext(cctx *cli.Context) (*params, error) { if err != nil { return nil, fmt.Errorf("list deal failed: %v", err) } + sort.Slice(deals, func(i, j int) bool { + return deals[i].CreationTime.Time().After(deals[j].CreationTime.Time()) + }) + + start, end, err := getStartEndTime(cctx) + if err != nil { + return nil, fmt.Errorf("parse time failed: %v", err) + } pieces := make(map[string]struct{}, len(deals)) + pieceInfos := make([]*pieceInfo, 0, len(deals)) for _, deal := range deals { - pieces[deal.Proposal.PieceCID.String()] = struct{}{} + if start != nil && start.After(deal.CreationTime.Time()) { + continue + } + if end != nil && end.Before(deal.CreationTime.Time()) { + continue + } + p := deal.Proposal.PieceCID.String() + if _, ok := pieces[p]; !ok { + pieces[p] = struct{}{} + pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.Proposal.PieceSize)}) + } } - fmt.Printf("had %d active deals, had %d piece\n", len(deals), len(pieces)) + fmt.Printf("active deals: %d, valid deals: %d, pieces: %d\n", len(deals), len(pieceInfos), len(pieces)) - topIndexRepo, err := dagstore.NewMongoTopIndex(ctx, mongoURL) - if err != nil { - return nil, fmt.Errorf("connect to mongo failed: %v", err) + var topIndexRepo *dagstore.MongoTopIndex + if len(mongoURL) != 0 { + topIndexRepo, err = dagstore.NewMongoTopIndex(ctx, mongoURL) + if err != nil { + return nil, fmt.Errorf("connect to mongo failed: %v", err) + } } cfg := config.DefaultMarketConfig @@ -155,35 +209,52 @@ func paramsFromContext(cctx *cli.Context) (*params, error) { topIndexRepo: topIndexRepo, shardRepo: repo.ShardRepo(), pieces: pieces, + pieceInfos: pieceInfos, }, nil } -func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error { - for piece := range p.pieces { - has, err := hasIndex(ctx, piece, indexDir) +func getStartEndTime(cctx *cli.Context) (*time.Time, *time.Time, error) { + var start, end *time.Time + if
cctx.IsSet(startFlag.Name) { + t, err := time.Parse("2006-01-02", cctx.String(startFlag.Name)) if err != nil { - return err + return nil, nil, err } - if has { - continue + start = &t + } + if cctx.IsSet(endFlag.Name) { + t, err := time.Parse("2006-01-02", cctx.String(endFlag.Name)) + if err != nil { + return nil, nil, err } + end = &t + } + fmt.Println("start:", start, "end:", end) + return start, end, nil +} +func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error { + doGenIndex := func(pi *pieceInfo) error { + piece := pi.piece f, err := os.Open(filepath.Join(carDir, piece)) if err != nil { - fmt.Println(err) - continue + return err } defer f.Close() //nolint - idx, err := car.ReadOrGenerateIndex(f, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) + // the piece file may be padded, so read it through a reader built from the payload and piece sizes + r := utils.NewAlgnZeroMountReader(f, int(pi.payloadSize), int(pi.pieceSize)) + idx, err := car.ReadOrGenerateIndex(r, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) if err == nil { fmt.Printf("generate index success: %s\n", piece) if err := saveIndex(idx, indexDir, piece); err != nil { return fmt.Errorf("save index failed, piece: %s, error: %v", piece, err) } - if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil { - return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err) + if p.topIndexRepo != nil { + if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil { + return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err) + } } if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil { return fmt.Errorf("save shard to mysql failed, piece: %s, error: %vs", piece, err) @@ -191,9 +262,51 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param } else { fmt.Printf("generate index failed, piece: %s, error: %v\n", piece, err) } + return nil } - return nil + wg := sync.WaitGroup{} + th := 
throttle.Fixed(p.concurrency) + var mu sync.Mutex // guards globalErr, which worker goroutines write and this loop reads + var globalErr error + for _, pi := range p.pieceInfos { + pi := pi + has, err := hasIndex(ctx, pi.piece, indexDir) + if err != nil { + return err + } + if has { + fmt.Println("already had index:", pi.piece) + continue + } + mu.Lock() + failed := globalErr != nil + mu.Unlock() + if failed { + break + } + + wg.Add(1) + go func() { + defer wg.Done() + err := th.Do(ctx, func(ctx context.Context) error { + err := doGenIndex(pi) + if err != nil && os.IsNotExist(err) { + fmt.Println(err) + return nil + } + return err + }) + if err != nil { + mu.Lock() + globalErr = err + mu.Unlock() + } + }() + } + wg.Wait() + + return globalErr } func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) { @@ -240,7 +353,7 @@ func saveShardToMysql(ctx context.Context, piece string, shardRepo repo.IShardRe Error: "", } - return shardRepo.CreateShard(ctx, &shard) + return shardRepo.SaveShard(ctx, &shard) } var migrateIndexCmd = &cli.Command{ @@ -287,6 +400,7 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error { return err } if has { + fmt.Println("already had shard:", piece) return nil } @@ -300,9 +414,12 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error { return err } - if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil { - return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err) + if p.topIndexRepo != nil { + if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil { + return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err) + } } + if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil { return fmt.Errorf("save shard to mysql failed, piece: %s, error: %vs", piece, err) }