pow: support offline Filecoin batch preparation #848

Merged · 5 commits · Jul 2, 2021
1 change: 1 addition & 0 deletions cli-docs/pow/pow_offline_prepare.md
@@ -24,6 +24,7 @@ pow offline prepare [cid | path] [output CAR file path] [flags]
### Options

```
--aggregate aggregates a folder of files
-h, --help help for prepare
--ipfs-api string IPFS HTTP API multiaddress that stores the cid (only for Cid processing instead of file/folder path)
--json avoid pretty output and use json formatting
161 changes: 129 additions & 32 deletions cmd/pow/cmd/prepare/prepare.go
@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -12,8 +13,12 @@ import (
"sync"
"time"

files "github.com/ipfs/go-ipfs-files"
unixfile "github.com/ipfs/go-unixfs/file"

"github.com/cheggaaa/pb/v3"
"github.com/dustin/go-humanize"
aggregator "github.com/filecoin-project/go-dagaggregator-unixfs"
bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
@@ -36,6 +41,7 @@ func init() {
prepare.Flags().String("tmpdir", os.TempDir(), "path of folder where a temporal blockstore is created for processing data")
prepare.Flags().String("ipfs-api", "", "IPFS HTTP API multiaddress that stores the cid (only for Cid processing instead of file/folder path)")
prepare.Flags().Bool("json", false, "avoid pretty output and use json formatting")
prepare.Flags().Bool("aggregate", false, "aggregates a folder of files")
Comment from Contributor Author:
New flag that indicates the files in the provided folder should be aggregated with the aggregator library.
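A hypothetical invocation of the new flag, following the usage string in the docs above (paths are illustrative):

```
# Batch-prepare every file under ./photos into a single CAR file
pow offline prepare --aggregate ./photos photos.car
# A photos.car.manifest file describing the aggregated files is written alongside
```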


commp.Flags().Bool("json", false, "avoid pretty output and use json formatting")
commp.Flags().Bool("skip-car-validation", false, "skips CAR validation when processing a path")
@@ -86,7 +92,7 @@ If a Cid is provided, an extra --ipfs-api flag should be provided to connect to
if err != nil {
c.Fatal(fmt.Errorf("parsing json flag: %s", err))
}
dataCid, dagService, cls, err := prepareDAGService(cmd, args, quiet)
dataCid, dagService, _, cls, err := prepareDAGService(cmd, args, quiet)
if err != nil {
c.Fatal(fmt.Errorf("creating dag-service: %s", err))
}
@@ -147,7 +153,7 @@ You can use the --skip-car-validation, but usually shouldn't be done unless you
c.Fatal(fmt.Errorf("parsing json flag: %s", err))
}
if jsonFlag {
printJSONResult(pieceSize, pieceCID)
printJSONResult(pieceSize, cid.Undef, pieceCID, nil)
return
}
c.Message("Piece-size: %d (%s)", pieceSize, humanize.IBytes(pieceSize))
@@ -183,7 +189,19 @@ The piece-size and piece-cid are printed to stderr. For scripting usage, its rec
if err != nil {
c.Fatal(fmt.Errorf("parsing json flag: %s", err))
}
dataCid, dagService, cls, err := prepareDAGService(cmd, args, json)
aggregate, err := cmd.Flags().GetBool("aggregate")
if err != nil {
c.Fatal(fmt.Errorf("aggregate flag: %s", err))
}
ipfsAPI, err := cmd.Flags().GetString("ipfs-api")
if err != nil {
c.Fatal(fmt.Errorf("get api addr flag: %s", err))
}
if ipfsAPI != "" && aggregate {
c.Fatal(errors.New("the --aggregate flag can't be used with a remote go-ipfs node"))
Comment from Contributor Author:
At least for now... we could keep working to support it.

}

payloadCid, dagService, aggrFiles, cls, err := prepareDAGService(cmd, args, json)
if err != nil {
c.Fatal(fmt.Errorf("creating temporal dag-service: %s", err))
}
@@ -216,7 +234,7 @@ The piece-size and piece-cid are printed to stderr. For scripting usage, its rec
c.Fatal(fmt.Errorf("closing car writer: %s", err))
}
}()
if err := car.WriteCar(ctx, dagService, []cid.Cid{dataCid}, pwCAR); err != nil {
if err := car.WriteCar(ctx, dagService, []cid.Cid{payloadCid}, pwCAR); err != nil {
writeCarErr = err
return
}
@@ -248,64 +266,99 @@ The piece-size and piece-cid are printed to stderr. For scripting usage, its rec
if errCommP != nil {
c.Fatal(fmt.Errorf("calculating piece-size and PieceCID: %s", err))
}

if aggregate {
rootNd, err := dagService.Get(ctx, payloadCid)
if err != nil {
c.Fatal(fmt.Errorf("get root node: %s", err))
}
manifestLnk := rootNd.Links()[0]
manifestNd, err := dagService.Get(ctx, manifestLnk.Cid)
if err != nil {
c.Fatal(fmt.Errorf("get manifest node: %s", err))
}
manifestF, err := unixfile.NewUnixfsFile(context.Background(), dagService, manifestNd)
if err != nil {
c.Fatal(fmt.Errorf("get unifxfs manifest file: %s", err))
}
manifest := manifestF.(files.File)
fManifest, err := os.Create(args[1] + ".manifest")

Reviewer comment:

Is it possible to generate the manifest file on the fly in dagify, after walking the directory, without writing anything to the local directory?

Reply from Contributor Author:
Actually, writing it to the local directory is a feature.
The manifest file is generated on the fly by the aggregator library and included in the batch UnixFS DAG.

Also, dagify is per-file, while the manifest is a batch-scoped concept.

Reviewer reply:
Ah gotcha. I confused the order ;)

if err != nil {
c.Fatal(fmt.Errorf("creating manifest file: %s", err))

}
defer func() {
if err := fManifest.Close(); err != nil {
c.Fatal(fmt.Errorf("closing manifest file: %s", err))
}
}()
if _, err := io.Copy(fManifest, manifest); err != nil {
c.Fatal(fmt.Errorf("writing manifest file: %s", err))
}
Comment from Contributor Author on lines +270 to +297:
This code section retrieves the manifest file that lives inside the generated batch. The manifest lives as the first file in the root folder (defined in the spec), so Links()[0] always succeeds and gives us the file.

The rest pulls the file out and writes it to the current folder.
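For downstream tooling, here is a minimal sketch of consuming that emitted manifest, assuming it is newline-delimited JSON as produced by the aggregator library (the file name and generic decoding here are illustrative, not part of this PR):

```go
// manifestdump.go: a minimal sketch, not part of this PR.
// Assumes the manifest written next to the CAR is newline-delimited JSON.
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func main() {
	// Written by `pow offline prepare --aggregate <folder> output.car`
	f, err := os.Open("output.car.manifest")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		// Decode each record generically; the exact schema is owned by
		// go-dagaggregator-unixfs, so we make no assumptions about fields.
		var rec map[string]interface{}
		if err := json.Unmarshal(sc.Bytes(), &rec); err != nil {
			log.Fatal(err)
		}
		fmt.Println(rec)
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}
}
```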

}

if json {
printJSONResult(pieceSize, pieceCid)
printJSONResult(pieceSize, payloadCid, pieceCid, aggrFiles)
return
}
c.Message("Created CAR file, and piece digest in %.02fs.", time.Since(start).Seconds())
c.Message("Payload cid: %s", payloadCid)
c.Message("Piece size: %d (%s)", pieceSize, humanize.IBytes(pieceSize))
c.Message("Piece CID: %s\n", pieceCid)

c.Message("Lotus offline-deal command:")
c.Message("lotus client deal --manual-piece-cid=%s --manual-piece-size=%d %s <miner> <price> <duration>", pieceCid, pieceSize, dataCid)
c.Message("lotus client deal --manual-piece-cid=%s --manual-piece-size=%d %s <miner> <price> <duration>", pieceCid, pieceSize, payloadCid)
},
}

type closeFunc func() error

func prepareDAGService(cmd *cobra.Command, args []string, quiet bool) (cid.Cid, ipld.DAGService, closeFunc, error) {
func prepareDAGService(cmd *cobra.Command, args []string, quiet bool) (cid.Cid, ipld.DAGService, []aggregatedFile, closeFunc, error) {
ipfsAPI, err := cmd.Flags().GetString("ipfs-api")
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("getting ipfs api flag: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("getting ipfs api flag: %s", err)
}
aggregate, err := cmd.Flags().GetBool("aggregate")
if err != nil {
return cid.Undef, nil, nil, nil, fmt.Errorf("aggregate flag: %s", err)
}

if ipfsAPI == "" {
path := args[0]
tmpDir, err := cmd.Flags().GetString("tmpdir")
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("getting tmpdir directory: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("getting tmpdir directory: %s", err)
}

dagService, cls, err := createTmpDAGService(tmpDir)
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("creating temporary dag-service: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("creating temporary dag-service: %s", err)
}
dataCid, err := dagify(context.Background(), dagService, path, quiet)
dataCid, aggregatedFiles, err := dagify(context.Background(), dagService, path, quiet, aggregate)
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("creating dag for data: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("creating dag for data: %s", err)
}

return dataCid, dagService, cls, nil
return dataCid, dagService, aggregatedFiles, cls, nil
}

if len(args) == 0 {
return cid.Undef, nil, nil, fmt.Errorf("cid argument is empty")
return cid.Undef, nil, nil, nil, fmt.Errorf("cid argument is empty")
}
dataCid, err := cid.Decode(args[0])
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("parsing cid: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("parsing cid: %s", err)
}

ipfsAPIMA, err := multiaddr.NewMultiaddr(ipfsAPI)
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("parsing ipfs-api multiaddress: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("parsing ipfs-api multiaddress: %s", err)
}
ipfs, err := httpapi.NewApi(ipfsAPIMA)
if err != nil {
return cid.Undef, nil, nil, fmt.Errorf("creating ipfs client: %s", err)
return cid.Undef, nil, nil, nil, fmt.Errorf("creating ipfs client: %s", err)
}

return dataCid, ipfs.Dag(), closeFunc(func() error { return nil }), nil
return dataCid, ipfs.Dag(), nil, closeFunc(func() error { return nil }), nil
}

func createTmpDAGService(tmpDir string) (ipld.DAGService, closeFunc, error) {
@@ -333,30 +386,36 @@ func createTmpDAGService(tmpDir string) (ipld.DAGService, closeFunc, error) {

var jsonOutput = io.Writer(os.Stderr)

func printJSONResult(pieceSize uint64, pieceCID cid.Cid) {
func printJSONResult(pieceSize uint64, payloadCid, pieceCID cid.Cid, aggrFiles []aggregatedFile) {
outData := struct {
PieceSize uint64 `json:"piece_size"`
PieceCid string `json:"piece_cid"`
PayloadCid string `json:"payload_cid,omitempty"`
PieceSize uint64 `json:"piece_size"`
PieceCid string `json:"piece_cid"`
AggregatedFiles []aggregatedFile `json:"files,omitempty"`
}{
PieceSize: pieceSize,
PieceCid: pieceCID.String(),
PieceSize: pieceSize,
PieceCid: pieceCID.String(),
AggregatedFiles: aggrFiles,
}
if payloadCid.Defined() {
outData.PayloadCid = payloadCid.String()
}
out, err := json.Marshal(outData)
c.CheckErr(err)
fmt.Fprint(jsonOutput, string(out))
}
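With the fields above, aggregate-mode JSON output is shaped like the following (values are illustrative; payload_cid is omitted when undefined and files is omitted when not aggregating):

```
{"payload_cid":"Qm…","piece_size":16384,"piece_cid":"baga…","files":[{"name":"photos/a.jpg","cid":"Qm…"}]}
```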

func dagify(ctx context.Context, dagService ipld.DAGService, path string, quiet bool) (cid.Cid, error) {
func dagify(ctx context.Context, dagService ipld.DAGService, path string, quiet bool, aggregate bool) (cid.Cid, []aggregatedFile, error) {
var progressChan chan int64
if !quiet {
f, err := os.Open(path)
if err != nil {
return cid.Undef, fmt.Errorf("opening path: %s", err)
return cid.Undef, nil, fmt.Errorf("opening path: %s", err)
}
stat, err := f.Stat()
if err != nil {
_ = f.Close()
return cid.Undef, fmt.Errorf("getting stat of data: %s", err)
return cid.Undef, nil, fmt.Errorf("getting stat of data: %s", err)
}
_ = f.Close()

@@ -375,7 +434,7 @@ func dagify(ctx context.Context, dagService ipld.DAGService, path string, quiet
return err
})
if err != nil {
return cid.Undef, fmt.Errorf("walking path: %s", err)
return cid.Undef, nil, fmt.Errorf("walking path: %s", err)
}
}
bar := pb.StartNew(dataSize)
Expand All @@ -395,10 +454,48 @@ func dagify(ctx context.Context, dagService ipld.DAGService, path string, quiet
c.Message("DAG created in %.02fs.", time.Since(start).Seconds())
}()
}
dataCid, err := dataprep.Dagify(ctx, dagService, path, progressChan)
if err != nil {
return cid.Undef, fmt.Errorf("creating dag for data: %s", err)

var aggregatedFiles []aggregatedFile
var dataCid cid.Cid
var err error
if !aggregate {
dataCid, err = dataprep.Dagify(ctx, dagService, path, progressChan)
if err != nil {
return cid.Undef, nil, fmt.Errorf("creating dag for data: %s", err)
}
} else {
var lst []aggregator.AggregateDagEntry
err = filepath.Walk(path, func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
dcid, err := dataprep.Dagify(ctx, dagService, p, progressChan)
if err != nil {
return err
}
lst = append(lst, aggregator.AggregateDagEntry{
RootCid: dcid,
})
aggregatedFiles = append(aggregatedFiles, aggregatedFile{Name: p, Cid: dcid.String()})
return err
})
if err != nil {
return cid.Undef, nil, fmt.Errorf("walking the target folder: %s", err)
}

dataCid, err = aggregator.Aggregate(ctx, dagService, lst)
if err != nil {
return cid.Undef, nil, fmt.Errorf("aggregating: %s", err)
}
}

return dataCid, nil
return dataCid, aggregatedFiles, nil
}

type aggregatedFile struct {
Name string `json:"name"`
Cid string `json:"cid"`
}
6 changes: 3 additions & 3 deletions cmd/pow/cmd/prepare/prepare_test.go
@@ -22,9 +22,9 @@ func TestOfflinePreparation(t *testing.T) {
size int
json string
}{
{size: 10000, json: `{"piece_size":16384,"piece_cid":"baga6ea4seaqjuk4uh5g7cu5znbvrr7wvfsn2l3xj47rbymvi63uiiroya44lkiy"}`},
{size: 1000, json: `{"piece_size":2048,"piece_cid":"baga6ea4seaqadahcx4ct54tlbvgkqlhmif7kxxkvxz3yf3vr2e4puhvsxdbrgka"}`},
{size: 100, json: `{"piece_size":256,"piece_cid":"baga6ea4seaqd4hgfl6texpf377k7igx2ga2mfwn3lb4c4kdpaq3g3oao2yftuki"}`},
{size: 10000, json: `{"payload_cid":"QmRP8TCp9bthhzLACAao6mh8cfLypqXncdNbzuPtuqFYP7","piece_size":16384,"piece_cid":"baga6ea4seaqjuk4uh5g7cu5znbvrr7wvfsn2l3xj47rbymvi63uiiroya44lkiy"}`},
{size: 1000, json: `{"payload_cid":"QmQRAjpSLWziADGz8p5ezxTguFNn18yYbSnduKqMrbJ93c","piece_size":2048,"piece_cid":"baga6ea4seaqadahcx4ct54tlbvgkqlhmif7kxxkvxz3yf3vr2e4puhvsxdbrgka"}`},
{size: 100, json: `{"payload_cid":"QmY6zHswPvyZkAyxwM9uup1DDPb67hZqChv8hnyu4MrFWk","piece_size":256,"piece_cid":"baga6ea4seaqd4hgfl6texpf377k7igx2ga2mfwn3lb4c4kdpaq3g3oao2yftuki"}`},
}

for _, test := range testCases {
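To exercise just this test from the repo root (standard Go tooling):

```
go test ./cmd/pow/cmd/prepare -run TestOfflinePreparation
```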
6 changes: 5 additions & 1 deletion go.mod
@@ -13,6 +13,7 @@ require (
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dustin/go-humanize v1.0.0
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-dagaggregator-unixfs v0.1.0
github.com/filecoin-project/go-data-transfer v1.4.3
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-commp-hashhash v0.0.0-20210329070555-d36b89a2504c
@@ -36,14 +37,15 @@ require (
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e
github.com/ipfs/go-graphsync v0.7.0 // indirect
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blockstore v1.0.4
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-http-client v0.1.0
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.1.3
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-unixfs v0.2.6
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/ipld/go-car v0.2.1-0.20210322190947-cffd36d39d90 // indirect
github.com/jessevdk/go-assets v0.0.0-20160921144138-4f4301a06e15
@@ -81,4 +83,6 @@
nhooyr.io/websocket v1.8.6 // indirect
)

replace github.com/ipfs/go-unixfs => github.com/ipfs/go-unixfs v0.2.2

Reviewer comment:

Why this instead of setting the version directly?

Reply from Contributor Author:
Because a lot of libs in this ecosystem use v0. That means the dependency graph is very brittle when you include a new dependency that ends up pulling in a newer version of an indirect dependency, given how Go MVS version resolution works.

A lot of libs in the IPFS ecosystem that are v0 make breaking changes between v0 minor versions, and that doesn't play well with Go MVS (which selects the maximum referenced minor version of a major version).

Yes... horrible. Usually you have some luck by trying to upgrade other deps, hoping they stop referencing the old v0 version that forces this downgrade, but that's not usually an easy task.

Contributor Author (continued):
Long story short: if you don't use strict semantic versioning in the Go ecosystem, you'll create nightmares for your clients.

Reviewer reply:
Okay, I see: this is to force its direct and indirect dependencies to use this specific version instead of relying on MVS. Maybe worth a comment? Or maybe I'm ignorant enough to not understand the intention in the first place ;)

Reply from Contributor Author:
Yes, that's exactly the intention. We usually only use replace to solve MVS problems on dependencies that don't follow strict semantic versioning. So that's the default correct interpretation.
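To make the MVS behavior discussed above concrete, here is an illustrative go.mod (module paths and versions are hypothetical): if one dependency builds against lib v0.2.2 and another drags in v0.2.6, MVS selects v0.2.6 even if it broke v0.2.2's API, and a replace directive pins the build back:

```
module example.com/app

go 1.16

require (
	github.com/example/consumer-a v1.0.0 // builds against github.com/example/lib v0.2.2
	github.com/example/consumer-b v1.0.0 // requires github.com/example/lib v0.2.6
)

// MVS would pick lib v0.2.6; force the version this code actually compiles against:
replace github.com/example/lib => github.com/example/lib v0.2.2
```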


replace github.com/dgraph-io/badger/v2 => github.com/dgraph-io/badger/v2 v2.2007.2