From 10eb4356da57cab335eeea0f2cb469b7bf3d2986 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Wed, 8 Jan 2020 13:09:34 -0800 Subject: [PATCH] Use CAR file in generation of CommP (#26) * Using PieceIO for StorageMarket. Incomplete due to differences between IPLD implementations (go-ipld-prime and go-ipld-format.) * Use CAR file when generating CommP * Propagate errors from tempfile handling --- pieceio/mocks/PieceIO.go | 10 +++--- pieceio/pieceio.go | 16 ++++----- pieceio/pieceio_test.go | 57 +++++++++++++++--------------- pieceio/types.go | 2 +- storagemarket/impl/client.go | 33 +++++++++++++---- storagemarket/impl/client_utils.go | 53 ++++++++++----------------- 6 files changed, 89 insertions(+), 82 deletions(-) diff --git a/pieceio/mocks/PieceIO.go b/pieceio/mocks/PieceIO.go index ccce9ea3..baac4ddb 100644 --- a/pieceio/mocks/PieceIO.go +++ b/pieceio/mocks/PieceIO.go @@ -15,7 +15,7 @@ type PieceIO struct { } // GeneratePieceCommitment provides a mock function with given fields: bs, payloadCid, selector -func (_m *PieceIO) GeneratePieceCommitment(bs pieceio.ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) { +func (_m *PieceIO) GeneratePieceCommitment(bs pieceio.ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) { ret := _m.Called(bs, payloadCid, selector) var r0 []byte @@ -27,11 +27,13 @@ func (_m *PieceIO) GeneratePieceCommitment(bs pieceio.ReadStore, payloadCid cid. } } - var r1 filestore.Path - if rf, ok := ret.Get(1).(func(pieceio.ReadStore, cid.Cid, ipld.Node) filestore.Path); ok { + var r1 filestore.File + if rf, ok := ret.Get(1).(func(pieceio.ReadStore, cid.Cid, ipld.Node) filestore.File); ok { r1 = rf(bs, payloadCid, selector) } else { - r1 = ret.Get(1).(filestore.Path) + if ret.Get(1) != nil { + r1 = ret.Get(1).(filestore.File) + } } var r2 error diff --git a/pieceio/pieceio.go b/pieceio/pieceio.go index 41359fec..d4f84c64 100644 --- a/pieceio/pieceio.go +++ b/pieceio/pieceio.go @@ -37,10 +37,10 @@ func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalcula return &pieceIO{padReader, carIO, sectorCalculator, store} } -func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) { +func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) { f, err := pio.store.CreateTemp() if err != nil { - return nil, "", err + return nil, nil, err } cleanup := func() { f.Close() @@ -49,7 +49,7 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se err = pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, f) if err != nil { cleanup() - return nil, "", err + return nil, nil, err } size := f.Size() pieceSize := uint64(size) @@ -59,23 +59,23 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se padded, err := f.Write(padbuf) if err != nil { cleanup() - return nil, "", err + return nil, nil, err } if uint64(padded) != remaining { cleanup() - return nil, "", fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining) + return nil, nil, fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining) } _, err = f.Seek(0, io.SeekStart) if err != nil { cleanup() - return nil, "", err + return nil, nil, err } commitment, err := pio.sectorCalculator.GeneratePieceCommitment(f, paddedSize) if err != nil { cleanup() - return nil, "", err + return nil, nil, err } - return commitment, f.Path(), nil + return commitment, f, nil } func (pio *pieceIO) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) { diff --git a/pieceio/pieceio_test.go b/pieceio/pieceio_test.go index ee7dbbdf..1a162e9a 100644 --- a/pieceio/pieceio_test.go +++ b/pieceio/pieceio_test.go @@ -19,7 +19,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "io" - "os" "testing" ) @@ -66,26 +65,28 @@ func Test_ThereAndBackAgain(t *testing.T) { ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) }).Node() - bytes, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node) + bytes, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node) require.NoError(t, err) + defer func() { + deferErr := tmpFile.Close() + require.NoError(t, deferErr) + deferErr = store.Delete(tmpFile.Path()) + require.NoError(t, deferErr) + }() for _, b := range bytes { require.NotEqual(t, 0, b) } - f, err := os.Open(string(filename)) - require.NoError(t, err) - info, err := os.Stat(string(filename)) - require.NoError(t, err) bufSize := int64(16) // small buffer to illustrate the logic buf := make([]byte, bufSize) var readErr error padStart := int64(-1) loops := int64(-1) read := 0 - skipped, err := f.Seek(info.Size()/2, io.SeekStart) + skipped, err := tmpFile.Seek(tmpFile.Size()/2, io.SeekStart) require.NoError(t, err) for readErr == nil { loops++ - read, readErr = f.Read(buf) + read, readErr = tmpFile.Read(buf) for idx := int64(0); idx < int64(read); idx++ { if buf[idx] == 0 { if padStart == -1 { @@ -96,18 +97,17 @@ func Test_ThereAndBackAgain(t *testing.T) { } } } - _, err = f.Seek(0, io.SeekStart) + _, err = tmpFile.Seek(0, io.SeekStart) require.NoError(t, err) var reader io.Reader if padStart != -1 { - reader = io.LimitReader(f, padStart) + reader = io.LimitReader(tmpFile, padStart) } else { - reader = f + reader = tmpFile } id, err := pio.ReadPiece(reader, sourceBs) - os.Remove(string(filename)) require.NoError(t, err) require.Equal(t, nd3.Cid(), id) } @@ -154,24 +154,25 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) { ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) }).Node() - commitment, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node) - require.NoError(t, err) - for _, b := range commitment { - require.NotEqual(t, 0, b) - } - f, err := os.Open(string(filename)) + commitment, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node) require.NoError(t, err) defer func() { - f.Close() - os.Remove(f.Name()) + deferErr := tmpFile.Close() + require.NoError(t, deferErr) + deferErr = store.Delete(tmpFile.Path()) + require.NoError(t, deferErr) }() - info, err := os.Stat(string(filename)) + _, err = tmpFile.Seek(0, io.SeekStart) require.NoError(t, err) - buf := make([]byte, info.Size()) - _, err = f.Read(buf) + + for _, b := range commitment { + require.NotEqual(t, 0, b) + } + buf := make([]byte, tmpFile.Size()) + _, err = tmpFile.Read(buf) require.NoError(t, err) buffer := bytes.NewBuffer(buf) - secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(info.Size())) + secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(tmpFile.Size())) require.NoError(t, err) require.Equal(t, commitment, secondCommitment) } @@ -244,7 +245,7 @@ func Test_Failures(t *testing.T) { counter := 0 size := 0 - mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) { + mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) { arg := args[0] buf := arg.([]byte) size := len(buf) @@ -273,7 +274,7 @@ func Test_Failures(t *testing.T) { counter := 0 size := 0 - mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) { + mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) { arg := args[0] buf := arg.([]byte) size := len(buf) @@ -302,7 +303,7 @@ func Test_Failures(t *testing.T) { counter := 0 size := 0 - mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) { + mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) { arg := args[0] buf := arg.([]byte) size := len(buf) @@ -332,4 +333,4 @@ func Test_Failures(t *testing.T) { _, _, err = pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node) require.Error(t, err) }) -} \ No newline at end of file +} diff --git a/pieceio/types.go b/pieceio/types.go index 6fdca918..55268979 100644 --- a/pieceio/types.go +++ b/pieceio/types.go @@ -19,6 +19,6 @@ type ReadStore interface { // PieceIO converts between payloads and pieces type PieceIO interface { - GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) + GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) } diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 6073dc9a..c4577c6b 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -4,10 +4,16 @@ import ( "context" "github.com/filecoin-project/go-data-transfer" + blockstore "github.com/ipfs/go-ipfs-blockstore" + + "github.com/filecoin-project/go-fil-components/filestore" + "github.com/filecoin-project/go-fil-components/pieceio" + "github.com/filecoin-project/go-fil-components/pieceio/cario" + "github.com/filecoin-project/go-fil-components/pieceio/padreader" + "github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator" "github.com/filecoin-project/go-fil-components/shared/tokenamount" "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" @@ -15,13 +21,15 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-components/retrievalmarket" "github.com/filecoin-project/go-fil-components/retrievalmarket/discovery" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-fil-components/shared/types" "github.com/filecoin-project/go-fil-components/storagemarket" - "github.com/filecoin-project/go-statestore" ) //go:generate cbor-gen-for ClientDeal ClientDealProposal @@ -43,7 +51,9 @@ type Client struct { // Because we are using only a fake DAGService // implementation, there's no validation or events on the client side dataTransfer datatransfer.Manager - dag ipld.DAGService + bs blockstore.Blockstore + fs filestore.FileStore + pio pieceio.PieceIO discovery *discovery.Local node storagemarket.StorageClientNode @@ -65,11 +75,22 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -func NewClient(h host.Host, dag ipld.DAGService, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { +func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) (*Client, error) { + pr := padreader.NewPadReader() + carIO := cario.NewCarIO() + sectorCalculator := sectorcalculator.NewSectorCalculator("") + fs, err := filestore.NewLocalFileStore("") + if err != nil { + return nil, err + } + pio := pieceio.NewPieceIO(pr, carIO, sectorCalculator, fs) + c := &Client{ h: h, dataTransfer: dataTransfer, - dag: dag, + bs: bs, + fs: fs, + pio: pio, discovery: discovery, node: scn, @@ -83,7 +104,7 @@ func NewClient(h host.Host, dag ipld.DAGService, dataTransfer datatransfer.Manag stopped: make(chan struct{}), } - return c + return c, nil } func (c *Client) Run(ctx context.Context) { diff --git a/storagemarket/impl/client_utils.go b/storagemarket/impl/client_utils.go index a5257aea..9effef64 100644 --- a/storagemarket/impl/client_utils.go +++ b/storagemarket/impl/client_utils.go @@ -2,22 +2,19 @@ package storageimpl import ( "context" - "io/ioutil" - "os" "runtime" + ipldfree "github.com/ipld/go-ipld-prime/impl/free" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/ipfs/go-cid" - files "github.com/ipfs/go-ipfs-files" - unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-components/filestore" - "github.com/filecoin-project/go-fil-components/pieceio/padreader" - "github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator" "github.com/filecoin-project/go-statestore" ) @@ -37,44 +34,30 @@ func (c *Client) failDeal(id cid.Cid, cerr error) { log.Errorf("deal %s failed: %+v", id, cerr) } -func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, uint64, error) { - root, err := c.dag.Get(ctx, data) - if err != nil { - log.Errorf("failed to get file root for deal: %s", err) - return nil, 0, err - } - - n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) - if err != nil { - log.Errorf("cannot open unixfs file: %s", err) - return nil, 0, err - } +func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error) { + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) - uf, ok := n.(files.File) - if !ok { - // TODO: we probably got directory, how should we handle this in unixfs mode? - return nil, 0, xerrors.New("unsupported unixfs type") - } + // entire DAG selector + allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() - s, err := uf.Size() + commp, tmpFile, err := c.pio.GeneratePieceCommitment(c.bs, root, allSelector) if err != nil { - return nil, 0, err + return nil, 0, xerrors.Errorf("generating CommP: %w", err) } + size := tmpFile.Size() - pr, psize := padreader.NewPaddedReader(uf, uint64(s)) - - dir, err := ioutil.TempDir("", "sector") + err = tmpFile.Close() if err != nil { - return nil, 0, err + return nil, 0, xerrors.Errorf("error closing temp file: %w", err) } - defer os.RemoveAll(dir) - calculator := sectorcalculator.NewSectorCalculator(filestore.Path(dir)) - commp, err := calculator.GeneratePieceCommitment(pr, psize) + + err = c.fs.Delete(tmpFile.Path()) if err != nil { - return nil, 0, xerrors.Errorf("generating CommP: %w", err) + return nil, 0, xerrors.Errorf("error deleting temp file from filestore: %w", err) } - return commp[:], psize, nil + return commp[:], uint64(size), nil } func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {