Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CAR and padding for piece data #27

Merged
merged 4 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions pieceio/pieceio.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package pieceio
import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-markets/filestore"
"io"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-ipld-prime"
"io"

"github.com/filecoin-project/go-fil-markets/filestore"
)

type SectorCalculator interface {
Expand All @@ -31,13 +34,14 @@ type pieceIO struct {
carIO CarIO
sectorCalculator SectorCalculator
store filestore.FileStore
bs blockstore.Blockstore
}

func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalculator, store filestore.FileStore) PieceIO {
return &pieceIO{padReader, carIO, sectorCalculator, store}
func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalculator, store filestore.FileStore, bs blockstore.Blockstore) PieceIO {
return &pieceIO{padReader, carIO, sectorCalculator, store, bs}
}

func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) {
func (pio *pieceIO) GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error) {
f, err := pio.store.CreateTemp()
if err != nil {
return nil, nil, err
Expand All @@ -46,7 +50,7 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se
f.Close()
_ = pio.store.Delete(f.Path())
}
err = pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, f)
err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f)
if err != nil {
cleanup()
return nil, nil, err
Expand Down Expand Up @@ -78,6 +82,6 @@ func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, se
return commitment, f, nil
}

func (pio *pieceIO) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) {
return pio.carIO.LoadCar(bs, r)
func (pio *pieceIO) ReadPiece(r io.Reader) (cid.Cid, error) {
return pio.carIO.LoadCar(pio.bs, r)
}
39 changes: 21 additions & 18 deletions pieceio/pieceio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func Test_ThereAndBackAgain(t *testing.T) {

store, err := filestore.NewLocalFileStore(tempDir)
require.NoError(t, err)
pio := pieceio.NewPieceIO(pr, cio, sc, store)
require.NoError(t, err)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()

pio := pieceio.NewPieceIO(pr, cio, sc, store, sourceBs)
require.NoError(t, err)

dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
Expand Down Expand Up @@ -65,7 +67,7 @@ func Test_ThereAndBackAgain(t *testing.T) {
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

bytes, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
bytes, tmpFile, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.NoError(t, err)
defer func() {
deferErr := tmpFile.Close()
Expand Down Expand Up @@ -107,7 +109,7 @@ func Test_ThereAndBackAgain(t *testing.T) {
reader = tmpFile
}

id, err := pio.ReadPiece(reader, sourceBs)
id, err := pio.ReadPiece(reader)
require.NoError(t, err)
require.Equal(t, nd3.Cid(), id)
}
Expand All @@ -120,10 +122,11 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) {

store, err := filestore.NewLocalFileStore(tempDir)
require.NoError(t, err)
pio := pieceio.NewPieceIO(pr, cio, sc, store)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()
pio := pieceio.NewPieceIO(pr, cio, sc, store, sourceBs)

dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
Expand Down Expand Up @@ -154,7 +157,7 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) {
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

commitment, tmpFile, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
commitment, tmpFile, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.NoError(t, err)
defer func() {
deferErr := tmpFile.Close()
Expand Down Expand Up @@ -213,8 +216,8 @@ func Test_Failures(t *testing.T) {
t.Run("create temp file fails", func(t *testing.T) {
fsmock := fsmocks.FileStore{}
fsmock.On("CreateTemp").Return(nil, fmt.Errorf("Failed"))
pio := pieceio.NewPieceIO(nil, nil, nil, &fsmock)
_, _, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(nil, nil, nil, &fsmock, sourceBs)
_, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
t.Run("write CAR fails", func(t *testing.T) {
Expand All @@ -227,8 +230,8 @@ func Test_Failures(t *testing.T) {
ciomock := pmocks.CarIO{}
any := mock.Anything
ciomock.On("WriteCar", any, any, any, any, any).Return(fmt.Errorf("failed to write car"))
pio := pieceio.NewPieceIO(pr, &ciomock, sc, store)
_, _, err = pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(pr, &ciomock, sc, store, sourceBs)
_, _, err = pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
t.Run("padding fails", func(t *testing.T) {
Expand Down Expand Up @@ -256,8 +259,8 @@ func Test_Failures(t *testing.T) {
mockfile.On("Close").Return(nil).Once()
mockfile.On("Path").Return(filestore.Path("mock")).Once()

pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock)
_, _, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock, sourceBs)
_, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
t.Run("incorrect padding", func(t *testing.T) {
Expand Down Expand Up @@ -285,8 +288,8 @@ func Test_Failures(t *testing.T) {
mockfile.On("Close").Return(nil).Once()
mockfile.On("Path").Return(filestore.Path("mock")).Once()

pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock)
_, _, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock, sourceBs)
_, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
t.Run("seek fails", func(t *testing.T) {
Expand Down Expand Up @@ -315,8 +318,8 @@ func Test_Failures(t *testing.T) {
mockfile.On("Path").Return(filestore.Path("mock")).Once()
mockfile.On("Seek", mock.Anything, mock.Anything).Return(int64(0), fmt.Errorf("seek failed"))

pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock)
_, _, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(pr, cio, sc, &fsmock, sourceBs)
_, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
t.Run("generate piece commitment fails", func(t *testing.T) {
Expand All @@ -329,8 +332,8 @@ func Test_Failures(t *testing.T) {

store, err := filestore.NewLocalFileStore(tempDir)
require.NoError(t, err)
pio := pieceio.NewPieceIO(pr, cio, &sc, store)
_, _, err = pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
pio := pieceio.NewPieceIO(pr, cio, &sc, store, sourceBs)
_, _, err = pio.GeneratePieceCommitment(nd3.Cid(), node)
require.Error(t, err)
})
}
4 changes: 2 additions & 2 deletions pieceio/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ type ReadStore interface {

// PieceIO converts between payloads and pieces
type PieceIO interface {
GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error)
ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error)
GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.File, error)
ReadPiece(r io.Reader) (cid.Cid, error)
}
2 changes: 1 addition & 1 deletion storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewClient(h host.Host, bs blockstore.Blockstore, dataTransfer datatransfer.
if err != nil {
return nil, err
}
pio := pieceio.NewPieceIO(pr, carIO, sectorCalculator, fs)
pio := pieceio.NewPieceIO(pr, carIO, sectorCalculator, fs, bs)

c := &Client{
h: h,
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

commp, tmpFile, err := c.pio.GeneratePieceCommitment(c.bs, root, allSelector)
commp, tmpFile, err := c.pio.GeneratePieceCommitment(root, allSelector)
if err != nil {
return nil, 0, xerrors.Errorf("generating CommP: %w", err)
}
Expand Down
10 changes: 4 additions & 6 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand All @@ -40,9 +40,7 @@ type Provider struct {

spn storagemarket.StorageProviderNode

// TODO: This will go away once storage market module + CAR
// is implemented
dag ipld.DAGService
pio pieceio.PieceIO

// dataTransfer is the manager of data transfers used by this storage provider
dataTransfer datatransfer.Manager
Expand Down Expand Up @@ -72,7 +70,7 @@ var (
ErrDataTransferFailed = errors.New("deal data transfer failed")
)

func NewProvider(ds datastore.Batching, dag ipld.DAGService, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
func NewProvider(ds datastore.Batching, pio pieceio.PieceIO, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
Expand All @@ -83,7 +81,7 @@ func NewProvider(ds datastore.Batching, dag ipld.DAGService, dataTransfer datatr
}

h := &Provider{
dag: dag,
pio: pio,
dataTransfer: dataTransfer,
spn: spn,

Expand Down
19 changes: 18 additions & 1 deletion storagemarket/impl/provider_states.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storageimpl

import (
"bytes"
"context"

ipldfree "github.com/ipld/go-ipld-prime/impl/free"
Expand Down Expand Up @@ -134,6 +135,21 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// STAGED

func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
// entire DAG selector
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

commp, file, err := p.pio.GeneratePieceCommitment(deal.Ref, allSelector)
if err != nil {
return nil, err
}

// Verify CommP matches
if !bytes.Equal(commp, deal.Proposal.PieceRef) {
return nil, xerrors.Errorf("proposal CommP doesn't match calculated CommP")
}

sectorID, err := p.spn.OnDealComplete(
ctx,
storagemarket.MinerDeal{
Expand All @@ -144,7 +160,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
Ref: deal.Ref,
DealID: deal.DealID,
},
"",
string(deal.PiecePath),
)

if err != nil {
Expand All @@ -153,6 +169,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)

return func(deal *MinerDeal) {
deal.SectorID = sectorID
deal.PiecePath = file.Path()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe (could be wrong) that in order for this to work, PiecePath also needs to be passed into OnDealComplete

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also also, I forgot that OnDealComplete has a third parameter for the piece path, and given that, I wonder if we need to retain it in the miner deal. (this part is non-blocking, the above is blocking)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, also, also, you should check with @laser, but I believe the miner implementation is not gonna be able to return us the SectorID, and we have an alternate mechanism for that, and, we should probably change that signature (though, no need to do that just yet)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, this makes sense. I think we do need to store the piece path between deal states, as (upcoming) we will have a few states between writing the piece to the FileStore and when we call OnDealComplete().

}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions storagemarket/impl/provider_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

"github.com/ipld/go-ipld-prime"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer"

"github.com/filecoin-project/go-fil-markets/storagemarket"

"github.com/filecoin-project/go-cbor-util"
Expand Down Expand Up @@ -58,7 +58,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) {
return proposal, xerrors.Errorf("incoming deal proposal has no signature")
}

if err := proposal.DealProposal.Verify(address.Undef); err != nil {
if err := proposal.DealProposal.Verify(); err != nil {
return proposal, xerrors.Errorf("verifying StorageDealProposal: %w", err)
}

Expand Down
22 changes: 9 additions & 13 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
)
Expand Down Expand Up @@ -104,21 +105,15 @@ func (sdp *StorageDealProposal) Cid() (cid.Cid, error) {
return nd.Cid(), nil
}

func (sdp *StorageDealProposal) Verify(worker address.Address) error {
if sdp.Client != worker || worker == address.Undef {
unsigned := *sdp
unsigned.ProposerSignature = nil
var buf bytes.Buffer
if err := unsigned.MarshalCBOR(&buf); err != nil {
return err
}

if err := sdp.ProposerSignature.Verify(sdp.Client, buf.Bytes()); err != nil {
return err
}
func (sdp *StorageDealProposal) Verify() error {
unsigned := *sdp
unsigned.ProposerSignature = nil
var buf bytes.Buffer
if err := unsigned.MarshalCBOR(&buf); err != nil {
return err
}

return nil
return sdp.ProposerSignature.Verify(sdp.Client, buf.Bytes())
}

type StorageDeal struct {
Expand Down Expand Up @@ -149,6 +144,7 @@ type MinerDeal struct {
Miner peer.ID
Client peer.ID
State DealState
PiecePath filestore.Path

Ref cid.Cid

Expand Down