Skip to content

Commit

Permalink
Merge pull request #386 from ipfs-force-community/fix/gen-index-tool
Browse files Browse the repository at this point in the history
Fix/gen index tool
  • Loading branch information
LinZexiao authored and simlecode committed Jul 31, 2023
1 parent fd76589 commit 05ead19
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 24 deletions.
6 changes: 5 additions & 1 deletion cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ func outputStorageDeal(deal *market.MinerDeal) error {
if err != nil {
return err
}
dataCid, err := cid.Decode(label)
if err != nil {
return fmt.Errorf("parse %s to cid failed: %v", label, err)
}
if deal.AddFundsCid != nil {
addFundsCid = deal.AddFundsCid.String()
}
Expand Down Expand Up @@ -783,7 +787,7 @@ func outputStorageDeal(deal *market.MinerDeal) error {
{"StoragePricePerEpoch", deal.Proposal.StoragePricePerEpoch},
{"ProviderCollateral", deal.Proposal.ProviderCollateral},
{"ClientCollateral", deal.Proposal.ClientCollateral},
{"Label", label},
{"Label", dataCid},
{"MinerPeerID", deal.Miner.Pretty()},
{"ClientPeerID", deal.Client.Pretty()},
{"FundsReserved", deal.FundsReserved},
Expand Down
3 changes: 3 additions & 0 deletions tools/index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ make index
* --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`
* --droplet-url:droplet 服务的 RPC 地址。
* --droplet-token:droplet 服务的 token。
* --start:订单创建时间需大于设置的值。
* --end:订单创建时间需小于设置的值。
* --concurrency:生成索引的并发数,默认是 1。

```bash
./index-tool gen-index \
Expand Down
157 changes: 134 additions & 23 deletions tools/index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

dagstore2 "github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1"
Expand All @@ -19,6 +23,7 @@ import (
"github.com/ipfs-force-community/droplet/v2/dagstore"
"github.com/ipfs-force-community/droplet/v2/models/mysql"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/ipfs-force-community/droplet/v2/utils"
"github.com/ipld/go-car/v2"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
Expand All @@ -29,9 +34,8 @@ const indexSuffix = ".full.idx"

var (
mongoURLFlag = &cli.StringFlag{
Name: "mongo-url",
Usage: "mongo url, use for store topIndex",
Required: true,
Name: "mongo-url",
Usage: "mongo url, use for store topIndex",
}
mysqlURLFlag = &cli.StringFlag{
Name: "mysql-url",
Expand All @@ -58,6 +62,19 @@ var (
Usage: "droplet token",
Required: true,
}
// startFlag is the optional lower bound on deal creation time.
// The value is parsed with the layout "2006-01-02"; deals created before
// it are skipped when the piece list is built.
startFlag = &cli.StringFlag{
Name: "start",
Usage: "The index will only be created when the deal creation time is greater than 'start', eg. 2023-07-26",
}
// endFlag is the optional upper bound on deal creation time.
// The value is parsed with the layout "2006-01-02"; deals created after
// it are skipped when the piece list is built.
endFlag = &cli.StringFlag{
Name: "end",
Usage: "The index will only be created when the deal creation time is less than 'end', eg. 2023-07-27",
}
// concurrencyFlag sets how many indexes may be generated at the same time.
// It defaults to 1; the gen-index action clamps values below 1 up to 1
// before handing the number to the throttle.
concurrencyFlag = &cli.IntFlag{
Name: "concurrency",
Usage: "Concurrent number of indexes generated",
Value: 1,
}
)

func main() {
Expand Down Expand Up @@ -85,6 +102,9 @@ var generateIndexCmd = &cli.Command{
carDirFlag,
dropletTokenFlag,
dropletURLFlag,
startFlag,
endFlag,
concurrencyFlag,
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
Expand All @@ -94,19 +114,31 @@ var generateIndexCmd = &cli.Command{
if err != nil {
return err
}
p.concurrency = cctx.Int(concurrencyFlag.Name)
if p.concurrency < 1 {
p.concurrency = 1
}

fmt.Println("car dir:", carDir, "index dir:", indexDir)

return generateIndex(ctx, carDir, indexDir, p)
},
}

// pieceInfo describes one piece for which an index should be generated.
type pieceInfo struct {
// piece is the string form of the deal's PieceCID; it is also used as the
// CAR file name inside the car directory.
piece string
// payloadSize is taken from the deal's PayloadSize and passed to the
// zero-padding reader as the size of the real data.
payloadSize uint64
// pieceSize is taken from the deal proposal's PieceSize and passed to the
// zero-padding reader as the total (padded) size.
pieceSize uint64
}

// params bundles the clients, repositories and work list shared by the
// index-tool commands. It is built by paramsFromContext.
type params struct {
// api is the market API client (presumably connected to droplet via the
// droplet-url/droplet-token flags — confirm in paramsFromContext).
api marketapi.IMarket
// close is the closer returned with the JSON-RPC client.
close jsonrpc.ClientCloser
// topIndexRepo stores the top index in MongoDB. It is nil when no
// mongo-url was given, so callers must check for nil before using it.
topIndexRepo *dagstore.MongoTopIndex
// shardRepo is the shard repository used to persist shard state.
shardRepo repo.IShardRepo
// pieces is the set of piece CID strings collected from the deals.
pieces map[string]struct{}
// pieceInfos holds one entry per unique piece that passed the
// start/end creation-time filter.
pieceInfos []*pieceInfo
// concurrency is the maximum number of indexes generated in parallel
// (at least 1).
concurrency int
}

func paramsFromContext(cctx *cli.Context) (*params, error) {
Expand All @@ -131,15 +163,37 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
if err != nil {
return nil, fmt.Errorf("list deal failed: %v", err)
}
// Order deals newest-first. The comparator has to compare element i with
// element j; comparing i with itself is always false and leaves the slice
// effectively unsorted.
sort.Slice(deals, func(i, j int) bool {
	return deals[i].CreationTime.Time().After(deals[j].CreationTime.Time())
})

start, end, err := getStartEndTime(cctx)
if err != nil {
return nil, fmt.Errorf("parse time failed: %v", err)
}
pieces := make(map[string]struct{}, len(deals))
pieceInfos := make([]*pieceInfo, 0, len(deals))
for _, deal := range deals {
pieces[deal.Proposal.PieceCID.String()] = struct{}{}
if start != nil && start.After(deal.CreationTime.Time()) {
continue
}
if end != nil && end.Before(deal.CreationTime.Time()) {
continue
}
p := deal.Proposal.PieceCID.String()
if _, ok := pieces[p]; !ok {
pieces[p] = struct{}{}
pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.Proposal.PieceSize)})
}
}
fmt.Printf("had %d active deals, had %d piece\n", len(deals), len(pieces))
fmt.Printf("active deals: %d, valid deals: %d, pieces: %d\n", len(deals), len(pieceInfos), len(pieces))

topIndexRepo, err := dagstore.NewMongoTopIndex(ctx, mongoURL)
if err != nil {
return nil, fmt.Errorf("connect to mongo failed: %v", err)
var topIndexRepo *dagstore.MongoTopIndex
if len(mongoURL) != 0 {
topIndexRepo, err = dagstore.NewMongoTopIndex(ctx, mongoURL)
if err != nil {
return nil, fmt.Errorf("connect to mongo failed: %v", err)
}
}

cfg := config.DefaultMarketConfig
Expand All @@ -155,45 +209,98 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
topIndexRepo: topIndexRepo,
shardRepo: repo.ShardRepo(),
pieces: pieces,
pieceInfos: pieceInfos,
}, nil
}

func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error {
for piece := range p.pieces {
has, err := hasIndex(ctx, piece, indexDir)
func getStartEndTime(cctx *cli.Context) (*time.Time, *time.Time, error) {
var start, end *time.Time
if cctx.IsSet(startFlag.Name) {
t, err := time.Parse("2006-01-02", cctx.String(startFlag.Name))
if err != nil {
return err
return nil, nil, err
}
if has {
continue
start = &t
}
if cctx.IsSet(endFlag.Name) {
t, err := time.Parse("2006-01-02", cctx.String(endFlag.Name))
if err != nil {
return nil, nil, err
}
end = &t
}
fmt.Println("start:", start, "end:", end)
return start, end, nil
}

func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error {
doGenIndex := func(pi *pieceInfo) error {
piece := pi.piece
f, err := os.Open(filepath.Join(carDir, piece))
if err != nil {
fmt.Println(err)
continue
return err
}
defer f.Close() //nolint

idx, err := car.ReadOrGenerateIndex(f, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
// piece may padding
r := utils.NewAlgnZeroMountReader(f, int(pi.payloadSize), int(pi.pieceSize))
idx, err := car.ReadOrGenerateIndex(r, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
if err == nil {
fmt.Printf("generate index success: %s\n", piece)

if err := saveIndex(idx, indexDir, piece); err != nil {
return fmt.Errorf("save index failed, piece: %s, error: %v", piece, err)
}
if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil {
return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err)
if p.topIndexRepo != nil {
if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil {
return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err)
}
}
if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil {
return fmt.Errorf("save shard to mysql failed, piece: %s, error: %vs", piece, err)
}
} else {
fmt.Printf("generate index failed, piece: %s, error: %v\n", piece, err)
}
return nil
}

return nil
wg := sync.WaitGroup{}
th := throttle.Fixed(p.concurrency)
var globalErr error
for _, pi := range p.pieceInfos {
pi := pi
has, err := hasIndex(ctx, pi.piece, indexDir)
if err != nil {
return err
}
if has {
fmt.Println("already had index:", pi.piece)
continue
}
if globalErr != nil {
break
}

wg.Add(1)
go func() {
defer wg.Done()
err = th.Do(ctx, func(ctx context.Context) error {
err := doGenIndex(pi)
if err != nil && os.IsNotExist(err) {
fmt.Println(err)
return nil
}
return err
})
if err != nil {
globalErr = err
}
}()
}
wg.Wait()

return globalErr
}

func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) {
Expand Down Expand Up @@ -240,7 +347,7 @@ func saveShardToMysql(ctx context.Context, piece string, shardRepo repo.IShardRe
Error: "",
}

return shardRepo.CreateShard(ctx, &shard)
return shardRepo.SaveShard(ctx, &shard)
}

var migrateIndexCmd = &cli.Command{
Expand Down Expand Up @@ -287,6 +394,7 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error {
return err
}
if has {
fmt.Println("already had shard:", piece)
return nil
}

Expand All @@ -300,9 +408,12 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error {
return err
}

if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil {
return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err)
if p.topIndexRepo != nil {
if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil {
return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err)
}
}

// Record the shard in MySQL; a failure here aborts the migration of this piece.
if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil {
	return fmt.Errorf("save shard to mysql failed, piece: %s, error: %v", piece, err)
}
Expand Down

0 comments on commit 05ead19

Please sign in to comment.