Skip to content

Commit

Permalink
Use CAR file in generation of CommP (#26)
Browse files Browse the repository at this point in the history
* Using PieceIO for StorageMarket. Incomplete due to differences between IPLD implementations (go-ipld-prime and go-ipld-format.)

* Use CAR file when generating CommP

* Propagate errors from tempfile handling
  • Loading branch information
ingar committed Jan 8, 2020
1 parent 86ecc3c commit 10eb435
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 82 deletions.
10 changes: 6 additions & 4 deletions pieceio/mocks/PieceIO.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions pieceio/pieceio.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalcula
return &pieceIO{padReader, carIO, sectorCalculator, store}
}

func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) {
func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) {
f, err := pio.store.CreateTemp()
if err != nil {
return nil, "", err
return nil, nil, err
}
cleanup := func() {
f.Close()
Expand All @@ -49,7 +49,7 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se
err = pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, f)
if err != nil {
cleanup()
return nil, "", err
return nil, nil, err
}
size := f.Size()
pieceSize := uint64(size)
Expand All @@ -59,23 +59,23 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se
padded, err := f.Write(padbuf)
if err != nil {
cleanup()
return nil, "", err
return nil, nil, err
}
if uint64(padded) != remaining {
cleanup()
return nil, "", fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining)
return nil, nil, fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining)
}
_, err = f.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return nil, "", err
return nil, nil, err
}
commitment, err := pio.sectorCalculator.GeneratePieceCommitment(f, paddedSize)
if err != nil {
cleanup()
return nil, "", err
return nil, nil, err
}
return commitment, f.Path(), nil
return commitment, f, nil
}

func (pio *pieceIO) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) {
Expand Down
57 changes: 29 additions & 28 deletions pieceio/pieceio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"io"
"os"
"testing"
)

Expand Down Expand Up @@ -66,26 +65,28 @@ func Test_ThereAndBackAgain(t *testing.T) {
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

bytes, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
bytes, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
defer func() {
deferErr := tmpFile.Close()
require.NoError(t, deferErr)
deferErr = store.Delete(tmpFile.Path())
require.NoError(t, deferErr)
}()
for _, b := range bytes {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
require.NoError(t, err)
info, err := os.Stat(string(filename))
require.NoError(t, err)
bufSize := int64(16) // small buffer to illustrate the logic
buf := make([]byte, bufSize)
var readErr error
padStart := int64(-1)
loops := int64(-1)
read := 0
skipped, err := f.Seek(info.Size()/2, io.SeekStart)
skipped, err := tmpFile.Seek(tmpFile.Size()/2, io.SeekStart)
require.NoError(t, err)
for readErr == nil {
loops++
read, readErr = f.Read(buf)
read, readErr = tmpFile.Read(buf)
for idx := int64(0); idx < int64(read); idx++ {
if buf[idx] == 0 {
if padStart == -1 {
Expand All @@ -96,18 +97,17 @@ func Test_ThereAndBackAgain(t *testing.T) {
}
}
}
_, err = f.Seek(0, io.SeekStart)
_, err = tmpFile.Seek(0, io.SeekStart)
require.NoError(t, err)

var reader io.Reader
if padStart != -1 {
reader = io.LimitReader(f, padStart)
reader = io.LimitReader(tmpFile, padStart)
} else {
reader = f
reader = tmpFile
}

id, err := pio.ReadPiece(reader, sourceBs)
os.Remove(string(filename))
require.NoError(t, err)
require.Equal(t, nd3.Cid(), id)
}
Expand Down Expand Up @@ -154,24 +154,25 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) {
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

commitment, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
for _, b := range commitment {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
commitment, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
defer func() {
f.Close()
os.Remove(f.Name())
deferErr := tmpFile.Close()
require.NoError(t, deferErr)
deferErr = store.Delete(tmpFile.Path())
require.NoError(t, deferErr)
}()
info, err := os.Stat(string(filename))
_, err = tmpFile.Seek(0, io.SeekStart)
require.NoError(t, err)
buf := make([]byte, info.Size())
_, err = f.Read(buf)

for _, b := range commitment {
require.NotEqual(t, 0, b)
}
buf := make([]byte, tmpFile.Size())
_, err = tmpFile.Read(buf)
require.NoError(t, err)
buffer := bytes.NewBuffer(buf)
secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(info.Size()))
secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(tmpFile.Size()))
require.NoError(t, err)
require.Equal(t, commitment, secondCommitment)
}
Expand Down Expand Up @@ -244,7 +245,7 @@ func Test_Failures(t *testing.T) {

counter := 0
size := 0
mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) {
mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) {
arg := args[0]
buf := arg.([]byte)
size := len(buf)
Expand Down Expand Up @@ -273,7 +274,7 @@ func Test_Failures(t *testing.T) {

counter := 0
size := 0
mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) {
mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) {
arg := args[0]
buf := arg.([]byte)
size := len(buf)
Expand Down Expand Up @@ -302,7 +303,7 @@ func Test_Failures(t *testing.T) {

counter := 0
size := 0
mockfile.On("Write", mock.Anything).Run(func (args mock.Arguments) {
mockfile.On("Write", mock.Anything).Run(func(args mock.Arguments) {
arg := args[0]
buf := arg.([]byte)
size := len(buf)
Expand Down Expand Up @@ -332,4 +333,4 @@ func Test_Failures(t *testing.T) {
_, _, err = pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.Error(t, err)
})
}
}
2 changes: 1 addition & 1 deletion pieceio/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ type ReadStore interface {

// PieceIO converts between payloads and pieces
type PieceIO interface {
GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error)
GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error)
ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error)
}
33 changes: 27 additions & 6 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@ import (
"context"

"github.com/filecoin-project/go-data-transfer"
blockstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/filecoin-project/go-fil-components/filestore"
"github.com/filecoin-project/go-fil-components/pieceio"
"github.com/filecoin-project/go-fil-components/pieceio/cario"
"github.com/filecoin-project/go-fil-components/pieceio/padreader"
"github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator"
"github.com/filecoin-project/go-fil-components/shared/tokenamount"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/go-fil-components/retrievalmarket"
"github.com/filecoin-project/go-fil-components/retrievalmarket/discovery"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/go-fil-components/shared/types"
"github.com/filecoin-project/go-fil-components/storagemarket"
"github.com/filecoin-project/go-statestore"
)

//go:generate cbor-gen-for ClientDeal ClientDealProposal
Expand All @@ -43,7 +51,9 @@ type Client struct {
// Because we are using only a fake DAGService
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
dag ipld.DAGService
bs blockstore.Blockstore
fs filestore.FileStore
pio pieceio.PieceIO
discovery *discovery.Local

node storagemarket.StorageClientNode
Expand All @@ -65,11 +75,22 @@ type clientDealUpdate struct {
mut func(*ClientDeal)
}

func NewClient(h host.Host, dag ipld.DAGService, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client {
func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) (*Client, error) {
pr := padreader.NewPadReader()
carIO := cario.NewCarIO()
sectorCalculator := sectorcalculator.NewSectorCalculator("")
fs, err := filestore.NewLocalFileStore("")
if err != nil {
return nil, err
}
pio := pieceio.NewPieceIO(pr, carIO, sectorCalculator, fs)

c := &Client{
h: h,
dataTransfer: dataTransfer,
dag: dag,
bs: bs,
fs: fs,
pio: pio,
discovery: discovery,
node: scn,

Expand All @@ -83,7 +104,7 @@ func NewClient(h host.Host, dag ipld.DAGService, dataTransfer datatransfer.Manag
stopped: make(chan struct{}),
}

return c
return c, nil
}

func (c *Client) Run(ctx context.Context) {
Expand Down
53 changes: 18 additions & 35 deletions storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package storageimpl

import (
"context"
"io/ioutil"
"os"
"runtime"

ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"

"github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-components/filestore"
"github.com/filecoin-project/go-fil-components/pieceio/padreader"
"github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator"
"github.com/filecoin-project/go-statestore"
)

Expand All @@ -37,44 +34,30 @@ func (c *Client) failDeal(id cid.Cid, cerr error) {
log.Errorf("deal %s failed: %+v", id, cerr)
}

func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, uint64, error) {
root, err := c.dag.Get(ctx, data)
if err != nil {
log.Errorf("failed to get file root for deal: %s", err)
return nil, 0, err
}

n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil {
log.Errorf("cannot open unixfs file: %s", err)
return nil, 0, err
}
func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error) {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())

uf, ok := n.(files.File)
if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode?
return nil, 0, xerrors.New("unsupported unixfs type")
}
// entire DAG selector
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

s, err := uf.Size()
commp, tmpFile, err := c.pio.GeneratePieceCommitment(c.bs, root, allSelector)
if err != nil {
return nil, 0, err
return nil, 0, xerrors.Errorf("generating CommP: %w", err)
}
size := tmpFile.Size()

pr, psize := padreader.NewPaddedReader(uf, uint64(s))

dir, err := ioutil.TempDir("", "sector")
err = tmpFile.Close()
if err != nil {
return nil, 0, err
return nil, 0, xerrors.Errorf("error closing temp file: %w", err)
}
defer os.RemoveAll(dir)
calculator := sectorcalculator.NewSectorCalculator(filestore.Path(dir))
commp, err := calculator.GeneratePieceCommitment(pr, psize)

err = c.fs.Delete(tmpFile.Path())
if err != nil {
return nil, 0, xerrors.Errorf("generating CommP: %w", err)
return nil, 0, xerrors.Errorf("error deleting temp file from filestore: %w", err)
}

return commp[:], psize, nil
return commp[:], uint64(size), nil
}

func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {
Expand Down

0 comments on commit 10eb435

Please sign in to comment.