diff --git a/go.mod b/go.mod index e36bf49f..b27020c2 100644 --- a/go.mod +++ b/go.mod @@ -8,12 +8,13 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce + github.com/filecoin-project/go-padreader v0.0.0-20200130212543-892867c4edf9 github.com/filecoin-project/go-sectorbuilder v0.0.1 github.com/filecoin-project/go-statestore v0.1.0 github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c - github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 + github.com/ipfs/go-car v0.0.3-0.20200131220434-3f68f6ebd093 github.com/ipfs/go-cid v0.0.4 github.com/ipfs/go-datastore v0.1.1 github.com/ipfs/go-graphsync v0.0.4 diff --git a/go.sum b/go.sum index 688133bd..9aa065a7 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww= +github.com/filecoin-project/go-padreader v0.0.0-20200130212543-892867c4edf9 h1:CQsjS+oWG96rk5YbeKpPw84fhbgc5H6/BGvrlPgd63A= +github.com/filecoin-project/go-padreader v0.0.0-20200130212543-892867c4edf9/go.mod h1:r0gyD7zvnqyRKSY8stil5G/LF0kXFgNzW/yR4vjga+Y= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-sectorbuilder v0.0.1 h1:yiLSEprWA1E43DFTSCXLSuCstYuDKiI6RCXiYz4GaRs= @@ -142,8 +144,8 @@ github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJ github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= -github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY= -github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= +github.com/ipfs/go-car v0.0.3-0.20200131220434-3f68f6ebd093 h1:mYq7vJKGUzxIkkYfqXfO0uEO8gOmV9F38Tcpvi/p8P8= +github.com/ipfs/go-car v0.0.3-0.20200131220434-3f68f6ebd093/go.mod h1:rEkw0S1sHd5kHL3rUSGEhwNanYqTwwNhjtpp0rwjrr4= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/pieceio/cario/cario.go b/pieceio/cario/cario.go index 294b43bb..38be8c63 100644 --- a/pieceio/cario/cario.go +++ b/pieceio/cario/cario.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-car" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/filecoin-project/go-fil-markets/pieceio" ) @@ -16,16 +15,19 @@ import ( type carIO struct { } +// NewCarIO returns a new CarIO utility module that wraps go-car func NewCarIO() pieceio.CarIO { return &carIO{} } -func (c carIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, node ipld.Node, w io.Writer) error { - selector, err := selector.ParseSelector(node) - if err != nil { - return err - } - return car.WriteSelectiveCar(ctx, bs, []car.CarDag{{Root: payloadCid, Selector: selector}}, w) +func (c carIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) error { + sc := car.NewSelectiveCar(ctx, bs, []car.Dag{{Root: payloadCid, Selector: selector}}) + return sc.Write(w) +} + +func (c carIO) PrepareCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, selector ipld.Node) (pieceio.PreparedCar, error) { + sc := car.NewSelectiveCar(ctx, bs, []car.Dag{{Root: payloadCid, Selector: selector}}) + return sc.Prepare() } func (c carIO) LoadCar(bs pieceio.WriteStore, r io.Reader) (cid.Cid, error) { diff --git a/pieceio/mocks/CarIO.go b/pieceio/mocks/CarIO.go index 2fc4a890..f354576f 100644 --- a/pieceio/mocks/CarIO.go +++ b/pieceio/mocks/CarIO.go @@ -35,13 +35,36 @@ func (_m *CarIO) LoadCar(bs pieceio.WriteStore, r io.Reader) (cid.Cid, error) { return r0, r1 } -// WriteCar provides a mock function with given fields: ctx, bs, payloadCid, selector, w -func (_m *CarIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) error { - ret := _m.Called(ctx, bs, payloadCid, selector, w) +// PrepareCar provides a mock function with given fields: ctx, bs, payloadCid, node +func (_m *CarIO) PrepareCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, node ipld.Node) (pieceio.PreparedCar, error) { + ret := _m.Called(ctx, bs, payloadCid, node) + + var r0 pieceio.PreparedCar + if rf, ok := ret.Get(0).(func(context.Context, pieceio.ReadStore, cid.Cid, ipld.Node) pieceio.PreparedCar); ok { + r0 = rf(ctx, bs, payloadCid, node) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pieceio.PreparedCar) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, pieceio.ReadStore, cid.Cid, ipld.Node) error); ok { + r1 = rf(ctx, bs, payloadCid, node) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WriteCar provides a mock function with given fields: ctx, bs, payloadCid, node, w +func (_m *CarIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, node ipld.Node, w io.Writer) error { + ret := _m.Called(ctx, bs, payloadCid, node, w) var r0 error if rf, ok := ret.Get(0).(func(context.Context, pieceio.ReadStore, cid.Cid, ipld.Node, io.Writer) error); ok { - r0 = rf(ctx, bs, payloadCid, selector, w) + r0 = rf(ctx, bs, payloadCid, node, w) } else { r0 = ret.Error(0) } diff --git a/pieceio/mocks/PadReader.go b/pieceio/mocks/PadReader.go deleted file mode 100644 index 31792682..00000000 --- a/pieceio/mocks/PadReader.go +++ /dev/null @@ -1,24 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// PadReader is an autogenerated mock type for the PadReader type -type PadReader struct { - mock.Mock -} - -// PaddedSize provides a mock function with given fields: size -func (_m *PadReader) PaddedSize(size uint64) uint64 { - ret := _m.Called(size) - - var r0 uint64 - if rf, ok := ret.Get(0).(func(uint64) uint64); ok { - r0 = rf(size) - } else { - r0 = ret.Get(0).(uint64) - } - - return r0 -} diff --git a/pieceio/mocks/PreparedCar.go b/pieceio/mocks/PreparedCar.go new file mode 100644 index 00000000..5b6573cd --- /dev/null +++ b/pieceio/mocks/PreparedCar.go @@ -0,0 +1,39 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import io "io" +import mock "github.com/stretchr/testify/mock" + +// PreparedCar is an autogenerated mock type for the PreparedCar type +type PreparedCar struct { + mock.Mock +} + +// Dump provides a mock function with given fields: w +func (_m *PreparedCar) Dump(w io.Writer) error { + ret := _m.Called(w) + + var r0 error + if rf, ok := ret.Get(0).(func(io.Writer) error); ok { + r0 = rf(w) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Size provides a mock function with given fields: +func (_m *PreparedCar) Size() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} diff --git a/pieceio/padreader/padreader.go b/pieceio/padreader/padreader.go deleted file mode 100644 index 6a1226f2..00000000 --- a/pieceio/padreader/padreader.go +++ /dev/null @@ -1,39 +0,0 @@ -package padreader - -import ( - "io" - "math/bits" - - ffi "github.com/filecoin-project/filecoin-ffi" -) - -// Functions bellow copied from lotus/lib/padreader/padreader.go -func PaddedSize(size uint64) uint64 { - logv := 64 - bits.LeadingZeros64(size) - - sectSize := uint64(1 << logv) - bound := ffi.GetMaxUserBytesPerStagedSector(sectSize) - if size <= bound { - return bound - } - - return ffi.GetMaxUserBytesPerStagedSector(1 << (logv + 1)) -} - -type nullReader struct{} - -func (nr nullReader) Read(b []byte) (int, error) { - for i := range b { - b[i] = 0 - } - return len(b), nil -} - -func NewPaddedReader(r io.Reader, size uint64) (io.Reader, uint64) { - padSize := PaddedSize(size) - - return io.MultiReader( - io.LimitReader(r, int64(size)), - io.LimitReader(nullReader{}, int64(padSize-size)), - ), padSize -} diff --git a/pieceio/pieceio.go b/pieceio/pieceio.go index 9e65949e..a491b9c9 100644 --- a/pieceio/pieceio.go +++ b/pieceio/pieceio.go @@ -3,34 +3,90 @@ package pieceio import ( "context" "io" + "os" + "sync" + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipld/go-ipld-prime" "github.com/filecoin-project/go-fil-markets/filestore" - "github.com/filecoin-project/go-fil-markets/pieceio/padreader" - "github.com/filecoin-project/go-sectorbuilder" ) +type PreparedCar interface { + Size() uint64 + Dump(w io.Writer) error +} + type CarIO interface { // WriteCar writes a given payload to a CAR file and into the passed IO stream - WriteCar(ctx context.Context, bs ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) error + WriteCar(ctx context.Context, bs ReadStore, payloadCid cid.Cid, node ipld.Node, w io.Writer) error + + // PrepareCar prepares a car so that it's total size can be calculated without writing it to a file. + // It can then be written with PreparedCar.Dump + PrepareCar(ctx context.Context, bs ReadStore, payloadCid cid.Cid, node ipld.Node) (PreparedCar, error) + // LoadCar loads blocks into the a store from a given CAR file LoadCar(bs WriteStore, r io.Reader) (cid.Cid, error) } type pieceIO struct { carIO CarIO - store filestore.FileStore bs blockstore.Blockstore } -func NewPieceIO(carIO CarIO, store filestore.FileStore, bs blockstore.Blockstore) PieceIO { - return &pieceIO{carIO, store, bs} +func NewPieceIO(carIO CarIO, bs blockstore.Blockstore) PieceIO { + return &pieceIO{carIO, bs} +} + +type pieceIOWithStore struct { + pieceIO + store filestore.FileStore +} + +func NewPieceIOWithStore(carIO CarIO, store filestore.FileStore, bs blockstore.Blockstore) PieceIOWithStore { + return &pieceIOWithStore{pieceIO{carIO, bs}, store} +} + +func (pio *pieceIO) GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, uint64, error) { + preparedCar, err := pio.carIO.PrepareCar(context.Background(), pio.bs, payloadCid, selector) + if err != nil { + return nil, 0, err + } + pieceSize := uint64(preparedCar.Size()) + r, w, err := os.Pipe() + if err != nil { + return nil, 0, err + } + var stop sync.WaitGroup + stop.Add(1) + var werr error + go func() { + defer stop.Done() + werr = preparedCar.Dump(w) + err := w.Close() + if werr == nil && err != nil { + werr = err + } + }() + commitment, paddedSize, err := GeneratePieceCommitment(r, pieceSize) + closeErr := r.Close() + if err != nil { + return nil, 0, err + } + if closeErr != nil { + return nil, 0, closeErr + } + stop.Wait() + if werr != nil { + return nil, 0, werr + } + return commitment, paddedSize, nil } -func (pio *pieceIO) GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, uint64, error) { +func (pio *pieceIOWithStore) GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, uint64, error) { f, err := pio.store.CreateTemp() if err != nil { return nil, "", 0, err @@ -60,7 +116,7 @@ func (pio *pieceIO) GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.No } func GeneratePieceCommitment(rd io.Reader, pieceSize uint64) ([]byte, uint64, error) { - paddedReader, paddedSize := padreader.NewPaddedReader(rd, pieceSize) + paddedReader, paddedSize := padreader.New(rd, pieceSize) commitment, err := sectorbuilder.GeneratePieceCommitment(paddedReader, paddedSize) if err != nil { return nil, 0, err diff --git a/pieceio/pieceio_test.go b/pieceio/pieceio_test.go index 3c5b6044..c5a92658 100644 --- a/pieceio/pieceio_test.go +++ b/pieceio/pieceio_test.go @@ -32,7 +32,7 @@ func Test_ThereAndBackAgain(t *testing.T) { sourceBserv := dstest.Bserv() sourceBs := sourceBserv.Blockstore() - pio := pieceio.NewPieceIO(cio, store, sourceBs) + pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs) require.NoError(t, err) dserv := dag.NewDAGService(sourceBserv) @@ -65,7 +65,7 @@ func Test_ThereAndBackAgain(t *testing.T) { ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) }).Node() - bytes, tmpPath, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node) + bytes, tmpPath, _, err := pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) require.NoError(t, err) tmpFile, err := store.Open(tmpPath) require.NoError(t, err) @@ -123,7 +123,7 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) { sourceBserv := dstest.Bserv() sourceBs := sourceBserv.Blockstore() - pio := pieceio.NewPieceIO(cio, store, sourceBs) + pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs) dserv := dag.NewDAGService(sourceBserv) a := dag.NewRawNode([]byte("aaaa")) @@ -155,7 +155,7 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) { ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) }).Node() - commitment, tmpPath, paddedSize, err := pio.GeneratePieceCommitment(nd3.Cid(), node) + commitment, tmpPath, paddedSize, err := pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) require.NoError(t, err) tmpFile, err := store.Open(tmpPath) require.NoError(t, err) @@ -181,6 +181,61 @@ func Test_StoreRestoreMemoryBuffer(t *testing.T) { require.Equal(t, commitment, secondCommitment[:]) } +func Test_PieceCommitmentEquivalenceMemoryFile(t *testing.T) { + tempDir := filestore.OsPath("./tempDir") + cio := cario.NewCarIO() + + store, err := filestore.NewLocalFileStore(tempDir) + require.NoError(t, err) + + sourceBserv := dstest.Bserv() + sourceBs := sourceBserv.Blockstore() + pio := pieceio.NewPieceIOWithStore(cio, store, sourceBs) + + dserv := dag.NewDAGService(sourceBserv) + a := dag.NewRawNode([]byte("aaaa")) + b := dag.NewRawNode([]byte("bbbb")) + c := dag.NewRawNode([]byte("cccc")) + + nd1 := &dag.ProtoNode{} + _ = nd1.AddNodeLink("cat", a) + + nd2 := &dag.ProtoNode{} + _ = nd2.AddNodeLink("first", nd1) + _ = nd2.AddNodeLink("dog", b) + + nd3 := &dag.ProtoNode{} + _ = nd3.AddNodeLink("second", nd2) + _ = nd3.AddNodeLink("bear", c) + + ctx := context.Background() + _ = dserv.Add(ctx, a) + _ = dserv.Add(ctx, b) + _ = dserv.Add(ctx, c) + _ = dserv.Add(ctx, nd1) + _ = dserv.Add(ctx, nd2) + _ = dserv.Add(ctx, nd3) + + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { + efsb.Insert("Links", + ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) + }).Node() + + fcommitment, tmpPath, fpaddedSize, ferr := pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) + defer func() { + deferErr := store.Delete(tmpPath) + require.NoError(t, deferErr) + }() + + mcommitment, mpaddedSize, merr := pio.GeneratePieceCommitment(nd3.Cid(), node) + require.Equal(t, fcommitment, mcommitment) + require.Equal(t, fpaddedSize, mpaddedSize) + require.Equal(t, ferr, merr) + require.NoError(t, ferr) + require.NoError(t, merr) +} + func Test_Failures(t *testing.T) { sourceBserv := dstest.Bserv() sourceBs := sourceBserv.Blockstore() @@ -217,8 +272,8 @@ func Test_Failures(t *testing.T) { t.Run("create temp file fails", func(t *testing.T) { fsmock := fsmocks.FileStore{} fsmock.On("CreateTemp").Return(nil, fmt.Errorf("Failed")) - pio := pieceio.NewPieceIO(nil, &fsmock, sourceBs) - _, _, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node) + pio := pieceio.NewPieceIOWithStore(nil, &fsmock, sourceBs) + _, _, _, err := pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) require.Error(t, err) }) t.Run("write CAR fails", func(t *testing.T) { @@ -229,8 +284,28 @@ func Test_Failures(t *testing.T) { ciomock := pmocks.CarIO{} any := mock.Anything ciomock.On("WriteCar", any, any, any, any, any).Return(fmt.Errorf("failed to write car")) - pio := pieceio.NewPieceIO(&ciomock, store, sourceBs) - _, _, _, err = pio.GeneratePieceCommitment(nd3.Cid(), node) + pio := pieceio.NewPieceIOWithStore(&ciomock, store, sourceBs) + _, _, _, err = pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) + require.Error(t, err) + }) + t.Run("prepare CAR fails", func(t *testing.T) { + + ciomock := pmocks.CarIO{} + any := mock.Anything + ciomock.On("PrepareCar", any, any, any, any).Return(nil, fmt.Errorf("failed to prepare car")) + pio := pieceio.NewPieceIO(&ciomock, sourceBs) + _, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node) + require.Error(t, err) + }) + t.Run("PreparedCard dump operation fails", func(t *testing.T) { + preparedCarMock := pmocks.PreparedCar{} + ciomock := pmocks.CarIO{} + any := mock.Anything + ciomock.On("PrepareCar", any, any, any, any).Return(&preparedCarMock, nil) + preparedCarMock.On("Size").Return(uint64(1000)) + preparedCarMock.On("Dump", any).Return(fmt.Errorf("failed to write car")) + pio := pieceio.NewPieceIO(&ciomock, sourceBs) + _, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node) require.Error(t, err) }) t.Run("seek fails", func(t *testing.T) { @@ -256,8 +331,8 @@ func Test_Failures(t *testing.T) { mockfile.On("Path").Return(filestore.Path("mock")).Once() mockfile.On("Seek", mock.Anything, mock.Anything).Return(int64(0), fmt.Errorf("seek failed")) - pio := pieceio.NewPieceIO(cio, &fsmock, sourceBs) - _, _, _, err := pio.GeneratePieceCommitment(nd3.Cid(), node) + pio := pieceio.NewPieceIOWithStore(cio, &fsmock, sourceBs) + _, _, _, err := pio.GeneratePieceCommitmentToFile(nd3.Cid(), node) require.Error(t, err) }) } diff --git a/pieceio/types.go b/pieceio/types.go index 601200f5..a9326eeb 100644 --- a/pieceio/types.go +++ b/pieceio/types.go @@ -1,9 +1,10 @@ package pieceio import ( - "github.com/filecoin-project/go-fil-markets/filestore" "io" + "github.com/filecoin-project/go-fil-markets/filestore" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -19,6 +20,11 @@ type ReadStore interface { // PieceIO converts between payloads and pieces type PieceIO interface { - GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, uint64, error) + GeneratePieceCommitment(payloadCid cid.Cid, selector ipld.Node) ([]byte, uint64, error) ReadPiece(r io.Reader) (cid.Cid, error) } + +type PieceIOWithStore interface { + PieceIO + GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, uint64, error) +} diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index f12712b1..e3f8d6f6 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -83,6 +83,7 @@ func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pay return s.ReadQueryResponse() } +// Retrieve begins the process of requesting the data referred to by payloadCID, after a deal is accepted func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds tokenamount.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) retrievalmarket.DealID { /* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces -- it will be replaced when we do the V0 implementation of the module */ diff --git a/retrievalmarket/impl/clientstates/client_states.go b/retrievalmarket/impl/clientstates/client_states.go index db4fc573..4b047874 100644 --- a/retrievalmarket/impl/clientstates/client_states.go +++ b/retrievalmarket/impl/clientstates/client_states.go @@ -55,7 +55,7 @@ func ProposeDeal(ctx context.Context, environment ClientDealEnvironment, deal rm } response, err := stream.ReadDealResponse() if err != nil { - return errorFunc(xerrors.Errorf("reading deal reaponse: %w", err)) + return errorFunc(xerrors.Errorf("reading deal response: %w", err)) } if response.Status == rm.DealStatusRejected { return func(deal *rm.ClientDealState) { diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 3103a71b..cf3e9e2c 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -121,10 +121,6 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC } func TestClientCanMakeDealWithProvider(t *testing.T) { - bgCtx := context.Background() - clientPaymentChannel, err := address.NewIDAddress(rand.Uint64()) - require.NoError(t, err) - // -------- SET UP PROVIDER testCases := []struct { @@ -156,8 +152,12 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { unsealing: true}, } - for _, testCase := range testCases { + for i, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + bgCtx := context.Background() + clientPaymentChannel, err := address.NewIDAddress(uint64(i * 10)) + require.NoError(t, err) + testData := tut.NewLibp2pTestData(bgCtx, t) // Inject a unixFS file on the provider side to its blockstore @@ -166,7 +166,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { c, ok := pieceLink.(cidlink.Link) require.True(t, ok) payloadCID := c.Cid - providerPaymentAddr, err := address.NewIDAddress(rand.Uint64()) + providerPaymentAddr, err := address.NewIDAddress(uint64(i * 99)) require.NoError(t, err) paymentInterval := uint64(10000) paymentIntervalIncrease := uint64(1000) @@ -251,8 +251,10 @@ TotalReceived: %d BytesPaidFor: %d CurrentInterval: %d TotalFunds: %s +Message: %s ` - t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval, state.TotalFunds.String()) + t.Errorf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval, + state.TotalFunds.String(), state.Message) } }) @@ -270,7 +272,8 @@ FundsReceived: %s Message: %s CurrentInterval: %d ` - t.Logf(msg, state.Status, state.TotalSent, state.FundsReceived.String(), state.Message, state.CurrentInterval) + t.Errorf(msg, state.Status, state.TotalSent, state.FundsReceived.String(), state.Message, + state.CurrentInterval) } }) @@ -290,7 +293,7 @@ CurrentInterval: %d did := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address) assert.Equal(t, did, retrievalmarket.DealID(1)) - ctx, cancel := context.WithTimeout(bgCtx, 5*time.Second) + ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second) defer cancel() // verify that client subscribers will be notified of state changes diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index 2f903cfb..3e503393 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -130,7 +130,7 @@ func ProcessPayment(ctx context.Context, environment ProviderDealEnvironment, de // read payment, or fail payment, err := environment.DealStream().ReadDealPayment() if err != nil { - return errorFunc(xerrors.Errorf("reading payment: %", err)) + return errorFunc(xerrors.Errorf("reading payment: %w", err)) } // attempt to redeem voucher diff --git a/retrievalmarket/network/deal_stream.go b/retrievalmarket/network/deal_stream.go index a6f7b10c..b65efc15 100644 --- a/retrievalmarket/network/deal_stream.go +++ b/retrievalmarket/network/deal_stream.go @@ -1,6 +1,8 @@ package network import ( + "bufio" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" @@ -9,8 +11,9 @@ import ( ) type DealStream struct { - p peer.ID - rw mux.MuxedStream + p peer.ID + rw mux.MuxedStream + buffered *bufio.Reader } var _ RetrievalDealStream = (*DealStream)(nil) @@ -18,7 +21,7 @@ var _ RetrievalDealStream = (*DealStream)(nil) func (d *DealStream) ReadDealProposal() (retrievalmarket.DealProposal, error) { var ds retrievalmarket.DealProposal - if err := ds.UnmarshalCBOR(d.rw); err != nil { + if err := ds.UnmarshalCBOR(d.buffered); err != nil { log.Warn(err) return retrievalmarket.DealProposalUndefined, err } @@ -32,7 +35,7 @@ func (d *DealStream) WriteDealProposal(dp retrievalmarket.DealProposal) error { func (d *DealStream) ReadDealResponse() (retrievalmarket.DealResponse, error) { var dr retrievalmarket.DealResponse - if err := dr.UnmarshalCBOR(d.rw); err != nil { + if err := dr.UnmarshalCBOR(d.buffered); err != nil { return retrievalmarket.DealResponseUndefined, err } return dr, nil diff --git a/retrievalmarket/network/libp2p_impl.go b/retrievalmarket/network/libp2p_impl.go index f284d0c4..43ea7d1e 100644 --- a/retrievalmarket/network/libp2p_impl.go +++ b/retrievalmarket/network/libp2p_impl.go @@ -1,6 +1,7 @@ package network import ( + "bufio" "context" logging "github.com/ipfs/go-log/v2" @@ -31,7 +32,8 @@ func (impl *libp2pRetrievalMarketNetwork) NewQueryStream(id peer.ID) (RetrievalQ log.Warn(err) return nil, err } - return &QueryStream{p: id, rw: s}, nil + buffered := bufio.NewReaderSize(s, 16) + return &QueryStream{p: id, rw: s, buffered: buffered}, nil } func (impl *libp2pRetrievalMarketNetwork) NewDealStream(id peer.ID) (RetrievalDealStream, error) { @@ -39,7 +41,8 @@ func (impl *libp2pRetrievalMarketNetwork) NewDealStream(id peer.ID) (RetrievalDe if err != nil { return nil, err } - return &DealStream{p: id, rw: s}, nil + buffered := bufio.NewReaderSize(s, 16) + return &DealStream{p: id, rw: s, buffered: buffered}, nil } func (impl *libp2pRetrievalMarketNetwork) SetDelegate(r RetrievalReceiver) error { @@ -63,7 +66,8 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewQueryStream(s network.Stream) return } remotePID := s.Conn().RemotePeer() - qs := &QueryStream{remotePID, s} + buffered := bufio.NewReaderSize(s, 16) + qs := &QueryStream{remotePID, s, buffered} impl.receiver.HandleQueryStream(qs) } @@ -74,6 +78,7 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewDealStream(s network.Stream) return } remotePID := s.Conn().RemotePeer() - ds := &DealStream{remotePID, s} + buffered := bufio.NewReaderSize(s, 16) + ds := &DealStream{remotePID, s, buffered} impl.receiver.HandleDealStream(ds) } diff --git a/retrievalmarket/network/query_stream.go b/retrievalmarket/network/query_stream.go index 693964d0..3218d999 100644 --- a/retrievalmarket/network/query_stream.go +++ b/retrievalmarket/network/query_stream.go @@ -1,6 +1,8 @@ package network import ( + "bufio" + "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" @@ -9,8 +11,9 @@ import ( ) type QueryStream struct { - p peer.ID - rw mux.MuxedStream + p peer.ID + rw mux.MuxedStream + buffered *bufio.Reader } var _ RetrievalQueryStream = (*QueryStream)(nil) @@ -18,7 +21,7 @@ var _ RetrievalQueryStream = (*QueryStream)(nil) func (qs *QueryStream) ReadQuery() (retrievalmarket.Query, error) { var q retrievalmarket.Query - if err := q.UnmarshalCBOR(qs.rw); err != nil { + if err := q.UnmarshalCBOR(qs.buffered); err != nil { log.Warn(err) return retrievalmarket.QueryUndefined, err @@ -34,7 +37,7 @@ func (qs *QueryStream) WriteQuery(q retrievalmarket.Query) error { func (qs *QueryStream) ReadQueryResponse() (retrievalmarket.QueryResponse, error) { var resp retrievalmarket.QueryResponse - if err := resp.UnmarshalCBOR(qs.rw); err != nil { + if err := resp.UnmarshalCBOR(qs.buffered); err != nil { log.Warn(err) return retrievalmarket.QueryResponseUndefined, err } diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index 8bd24a09..4f8b6452 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -12,6 +12,8 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" + smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields @@ -110,6 +112,87 @@ func MakeTestDealPayment() retrievalmarket.DealPayment { } } +// MakeTestStorageDealProposal generates a valid storage deal proposal +func MakeTestStorageDealProposal() *storagemarket.StorageDealProposal { + return &storagemarket.StorageDealProposal{ + PieceRef: RandomBytes(32), + PieceSize: rand.Uint64(), + + Client: address.TestAddress, + Provider: address.TestAddress2, + + ProposalExpiration: rand.Uint64(), + Duration: rand.Uint64(), + + StoragePricePerEpoch: MakeTestTokenAmount(), + StorageCollateral: MakeTestTokenAmount(), + + ProposerSignature: MakeTestSignature(), + } +} + +// MakeTestStorageAsk generates a storage ask +func MakeTestStorageAsk() *types.StorageAsk { + return &types.StorageAsk{ + Price: MakeTestTokenAmount(), + MinPieceSize: rand.Uint64(), + Miner: address.TestAddress2, + Timestamp: rand.Uint64(), + Expiry: rand.Uint64(), + SeqNo: rand.Uint64(), + } +} + +// MakeTestSignedStorageAsk generates a signed storage ask +func MakeTestSignedStorageAsk() *types.SignedStorageAsk { + return &types.SignedStorageAsk{ + Ask: MakeTestStorageAsk(), + Signature: MakeTestSignature(), + } +} + +// MakeTestStorageNetworkProposal generates a proposal that can be sent over the +// network to a provider +func MakeTestStorageNetworkProposal() smnet.Proposal { + return smnet.Proposal{ + DealProposal: MakeTestStorageDealProposal(), + Piece: GenerateCids(1)[0], + } +} + +// MakeTestStorageNetworkResponse generates a response to a proposal sent over +// the network +func MakeTestStorageNetworkResponse() smnet.Response { + return smnet.Response{ + State: storagemarket.StorageDealPublished, + Proposal: GenerateCids(1)[0], + PublishMessage: &(GenerateCids(1)[0]), + } +} + +// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over +// the network that is signed +func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse { + return smnet.SignedResponse{ + Response: MakeTestStorageNetworkResponse(), + Signature: MakeTestSignature(), + } +} + +// MakeTestStorageAskRequest generates a request to get a provider's ask +func MakeTestStorageAskRequest() smnet.AskRequest { + return smnet.AskRequest{ + Miner: address.TestAddress2, + } +} + +// MakeTestStorageAskResponse generates a response to an ask request +func MakeTestStorageAskResponse() smnet.AskResponse { + return smnet.AskResponse{ + Ask: MakeTestSignedStorageAsk(), + } +} + func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer { peers := make([]retrievalmarket.RetrievalPeer, numPeers) for i := range peers { diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 8dbfce3c..2c9258a0 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -3,17 +3,17 @@ package storageimpl import ( "context" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -32,11 +32,11 @@ var log = logging.Logger("deals") type ClientDeal struct { storagemarket.ClientDeal - s inet.Stream + s network.StorageDealStream } type Client struct { - h host.Host + net network.StorageMarketNetwork // dataTransfer // TODO: once the data transfer module is complete, the @@ -52,7 +52,7 @@ type Client struct { node storagemarket.StorageClientNode deals *statestore.StateStore - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream incoming chan *ClientDeal updated chan clientDealUpdate @@ -68,21 +68,20 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -func NewClient(h host.Host, bs blockstore.Blockstore, fs filestore.FileStore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { +func NewClient(net network.StorageMarketNetwork, bs blockstore.Blockstore, dataTransfer datatransfer.Manager, discovery *discovery.Local, deals *statestore.StateStore, scn storagemarket.StorageClientNode) *Client { carIO := cario.NewCarIO() - pio := pieceio.NewPieceIO(carIO, fs, bs) + pio := pieceio.NewPieceIO(carIO, bs) c := &Client{ - h: h, + net: net, dataTransfer: dataTransfer, bs: bs, - fs: fs, pio: pio, discovery: discovery, node: scn, deals: deals, - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan *ClientDeal, 16), updated: make(chan clientDealUpdate, 16), @@ -214,18 +213,13 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) } - s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID) + s, err := c.net.NewDealStream(p.MinerID) if err != nil { return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } - proposal := &Proposal{ - DealProposal: dealProposal, - Piece: p.Data, - } - - if err := cborutil.WriteCborRPC(s, proposal); err != nil { - _ = s.Reset() + proposal := network.Proposal{DealProposal: dealProposal, Piece: p.Data} + if err := s.WriteDealProposal(proposal); err != nil { return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err) } @@ -251,20 +245,18 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) { - s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID) + s, err := c.net.NewAskStream(p) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } - req := &AskRequest{ - Miner: a, - } - if err := cborutil.WriteCborRPC(s, req); err != nil { + request := network.AskRequest{Miner: a} + if err := s.WriteAskRequest(request); err != nil { return nil, xerrors.Errorf("failed to send ask request: %w", err) } - var out AskResponse - if err := cborutil.ReadCborRPC(s, &out); err != nil { + out, err := s.ReadAskResponse() + if err != nil { return nil, xerrors.Errorf("failed to read ask response: %w", err) } diff --git a/storagemarket/impl/client_utils.go b/storagemarket/impl/client_utils.go index 9797a63e..9b8a6f43 100644 --- a/storagemarket/impl/client_utils.go +++ b/storagemarket/impl/client_utils.go @@ -13,9 +13,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) func (c *Client) failDeal(id cid.Cid, cerr error) { @@ -26,7 +26,7 @@ func (c *Client) failDeal(id cid.Cid, cerr error) { s, ok := c.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(c.conns, id) } @@ -41,28 +41,23 @@ func (c *Client) commP(ctx context.Context, root cid.Cid) ([]byte, uint64, error allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() - commp, tmpPath, paddedSize, err := c.pio.GeneratePieceCommitment(root, allSelector) + commp, paddedSize, err := c.pio.GeneratePieceCommitment(root, allSelector) if err != nil { return nil, 0, xerrors.Errorf("generating CommP: %w", err) } - err = c.fs.Delete(tmpPath) - if err != nil { - return nil, 0, xerrors.Errorf("error deleting temp file from filestore: %w", err) - } - return commp[:], paddedSize, nil } -func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) { +func (c *Client) readStorageDealResp(deal ClientDeal) (*network.Response, error) { s, ok := c.conns[deal.ProposalCid] if !ok { // TODO: Try to re-establish the connection using query protocol return nil, xerrors.Errorf("no connection to miner") } - var resp SignedResponse - if err := cborutil.ReadCborRPC(s, &resp); err != nil { + resp, err := s.ReadDealResponse() + if err != nil { log.Errorw("failed to read Response message", "error", err) return nil, err } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 83279366..fc7e276e 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -9,13 +9,10 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/pieceio/cario" @@ -23,7 +20,9 @@ import ( "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/go-data-transfer" ) var ProviderDsPrefix = "/deals/provider" @@ -32,10 +31,12 @@ var ProviderDsPrefix = "/deals/provider" type MinerDeal struct { storagemarket.MinerDeal - s inet.Stream + s network.StorageDealStream } type Provider struct { + net network.StorageMarketNetwork + pricePerByteBlock tokenamount.TokenAmount // how much we want for storing one byte for one block minPieceSize uint64 @@ -44,8 +45,8 @@ type Provider struct { spn storagemarket.StorageProviderNode - fs filestore.FileStore - pio pieceio.PieceIO + fs filestore.FileStore + pio pieceio.PieceIOWithStore pieceStore piecestore.PieceStore // dataTransfer is the manager of data transfers used by this storage provider @@ -54,7 +55,7 @@ type Provider struct { deals *statestore.StateStore ds datastore.Batching - conns map[cid.Cid]inet.Stream + conns map[cid.Cid]network.StorageDealStream actor address.Address @@ -76,19 +77,12 @@ var ( ErrDataTransferFailed = errors.New("deal data transfer failed") ) -func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { - addr, err := ds.Get(datastore.NewKey("miner-address")) - if err != nil { - return nil, err - } - minerAddress, err := address.NewFromBytes(addr) - if err != nil { - return nil, err - } +func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blockstore.Blockstore, fs filestore.FileStore, pieceStore piecestore.PieceStore, dataTransfer datatransfer.Manager, spn storagemarket.StorageProviderNode, minerAddress address.Address) (storagemarket.StorageProvider, error) { carIO := cario.NewCarIO() - pio := pieceio.NewPieceIO(carIO, fs, bs) + pio := pieceio.NewPieceIOWithStore(carIO, fs, bs) h := &Provider{ + net: net, fs: fs, pio: pio, pieceStore: pieceStore, @@ -98,7 +92,7 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F pricePerByteBlock: tokenamount.FromInt(3), // TODO: allow setting minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) - conns: map[cid.Cid]inet.Stream{}, + conns: map[cid.Cid]network.StorageDealStream{}, incoming: make(chan MinerDeal), updated: make(chan minerDealUpdate), @@ -130,11 +124,13 @@ func NewProvider(ds datastore.Batching, bs blockstore.Blockstore, fs filestore.F return h, nil } -func (p *Provider) Run(ctx context.Context, host host.Host) { +func (p *Provider) Start(ctx context.Context) error { // TODO: restore state - host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream) - host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream) + err := p.net.SetDelegate(p) + if err != nil { + return err + } go func() { defer log.Warn("quitting deal provider loop") @@ -151,6 +147,7 @@ func (p *Provider) Run(ctx context.Context, host host.Host) { } } }() + return nil } func (p *Provider) onIncoming(deal MinerDeal) { @@ -251,7 +248,7 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da } } -func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) { +func (p *Provider) newDeal(s network.StorageDealStream, proposal network.Proposal) (MinerDeal, error) { proposalNd, err := cborutil.AsIpld(proposal.DealProposal) if err != nil { return MinerDeal{}, err @@ -259,7 +256,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) return MinerDeal{ MinerDeal: storagemarket.MinerDeal{ - Client: s.Conn().RemotePeer(), + Client: s.RemotePeer(), Proposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, @@ -270,7 +267,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) }, nil } -func (p *Provider) HandleStream(s inet.Stream) { +func (p *Provider) HandleDealStream(s network.StorageDealStream) { log.Info("Handling storage deal proposal!") proposal, err := p.readProposal(s) @@ -290,7 +287,8 @@ func (p *Provider) HandleStream(s inet.Stream) { p.incoming <- deal } -func (p *Provider) Stop() { +func (p *Provider) Stop() error { close(p.stop) <-p.stopped + return p.net.StopHandlingRequests() } diff --git a/storagemarket/impl/provider_asks.go b/storagemarket/impl/provider_asks.go index 00e2d455..bfc24bdd 100644 --- a/storagemarket/impl/provider_asks.go +++ b/storagemarket/impl/provider_asks.go @@ -6,13 +6,13 @@ import ( "time" "github.com/ipfs/go-datastore" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func (p *Provider) SetPrice(price tokenamount.TokenAmount, ttlsecs int64) error { @@ -52,24 +52,24 @@ func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk { return p.ask } -func (p *Provider) HandleAskStream(s inet.Stream) { +func (p *Provider) HandleAskStream(s network.StorageAskStream) { defer s.Close() - var ar AskRequest - if err := cborutil.ReadCborRPC(s, &ar); err != nil { + ar, err := s.ReadAskRequest() + if err != nil { log.Errorf("failed to read AskRequest from incoming stream: %s", err) return } resp := p.processAskRequest(&ar) - if err := cborutil.WriteCborRPC(s, resp); err != nil { + if err := s.WriteAskResponse(resp); err != nil { log.Errorf("failed to write ask response: %s", err) return } } -func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse { - return &AskResponse{ +func (p *Provider) processAskRequest(ar *network.AskRequest) network.AskResponse { + return network.AskResponse{ Ask: p.GetAsk(ar.Miner), } } diff --git a/storagemarket/impl/provider_states.go b/storagemarket/impl/provider_states.go index 24118492..c3051047 100644 --- a/storagemarket/impl/provider_states.go +++ b/storagemarket/impl/provider_states.go @@ -4,17 +4,17 @@ import ( "bytes" "context" + "github.com/filecoin-project/go-padreader" "github.com/ipfs/go-cid" ipldfree "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-fil-markets/pieceio/padreader" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) @@ -110,7 +110,7 @@ func (p *Provider) verifydata(ctx context.Context, deal MinerDeal) (func(*MinerD allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() - commp, path, _, err := p.pio.GeneratePieceCommitment(deal.Ref, allSelector) + commp, path, _, err := p.pio.GeneratePieceCommitmentToFile(deal.Ref, allSelector) if err != nil { return nil, err } @@ -150,7 +150,7 @@ func (p *Provider) publishing(ctx context.Context, deal MinerDeal) (func(*MinerD return nil, err } - err = p.sendSignedResponse(ctx, &Response{ + err = p.sendSignedResponse(ctx, &network.Response{ State: storagemarket.StorageDealProposalAccepted, Proposal: deal.ProposalCid, @@ -175,7 +175,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal) if err != nil { return nil, err } - paddedReader, paddedSize := padreader.NewPaddedReader(file, uint64(file.Size())) + paddedReader, paddedSize := padreader.New(file, uint64(file.Size())) err = p.spn.OnDealComplete( ctx, storagemarket.MinerDeal{ diff --git a/storagemarket/impl/provider_utils.go b/storagemarket/impl/provider_utils.go index afc1ae74..a7e8c2b0 100644 --- a/storagemarket/impl/provider_utils.go +++ b/storagemarket/impl/provider_utils.go @@ -6,15 +6,14 @@ import ( "github.com/ipld/go-ipld-prime" - "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-cid" - inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" ) @@ -31,7 +30,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { log.Warnf("deal %s failed: %s", id, cerr) - err := p.sendSignedResponse(ctx, &Response{ + err := p.sendSignedResponse(ctx, &network.Response{ State: storagemarket.StorageDealFailing, Message: cerr.Error(), Proposal: id, @@ -39,7 +38,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { s, ok := p.conns[id] if ok { - _ = s.Reset() + _ = s.Close() delete(p.conns, id) } @@ -48,8 +47,9 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { } } -func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { - if err := cborutil.ReadCborRPC(s, &proposal); err != nil { +func (p *Provider) readProposal(s network.StorageDealStream) (proposal network.Proposal, err error) { + proposal, err = s.ReadDealProposal() + if err != nil { log.Errorw("failed to read proposal message", "error", err) return proposal, err } @@ -70,7 +70,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { return } -func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error { +func (p *Provider) sendSignedResponse(ctx context.Context, resp *network.Response) error { s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") @@ -91,12 +91,12 @@ func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := &SignedResponse{ + signedResponse := network.SignedResponse{ Response: *resp, Signature: sig, } - err = cborutil.WriteCborRPC(s, signedResponse) + err = s.WriteDealResponse(signedResponse) if err != nil { // Assume client disconnected s.Close() diff --git a/storagemarket/impl/types.go b/storagemarket/impl/types.go index f432ae7b..d43d4adc 100644 --- a/storagemarket/impl/types.go +++ b/storagemarket/impl/types.go @@ -6,13 +6,10 @@ import ( "github.com/ipfs/go-cid" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-fil-markets/shared/types" "github.com/filecoin-project/go-fil-markets/storagemarket" ) -//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse StorageDataTransferVoucher +//go:generate cbor-gen-for StorageDataTransferVoucher var ( // ErrWrongVoucherType means the voucher was not the correct type can validate against @@ -43,47 +40,6 @@ var ( DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealUnknown} ) -type Proposal struct { - DealProposal *storagemarket.StorageDealProposal - - Piece cid.Cid // Used for retrieving from the client -} - -type Response struct { - State storagemarket.StorageDealStatus - - // DealProposalRejected - Message string - Proposal cid.Cid - - // StorageDealProposalAccepted - PublishMessage *cid.Cid -} - -// TODO: Do we actually need this to be signed? -type SignedResponse struct { - Response Response - - Signature *types.Signature -} - -func (r *SignedResponse) Verify(addr address.Address) error { - b, err := cborutil.Dump(&r.Response) - if err != nil { - return err - } - - return r.Signature.Verify(addr, b) -} - -type AskRequest struct { - Miner address.Address -} - -type AskResponse struct { - Ask *types.SignedStorageAsk -} - // StorageDataTransferVoucher is the voucher type for data transfers // used by the storage market type StorageDataTransferVoucher struct { diff --git a/storagemarket/impl/types_cbor_gen.go b/storagemarket/impl/types_cbor_gen.go index 1383df16..31b295cf 100644 --- a/storagemarket/impl/types_cbor_gen.go +++ b/storagemarket/impl/types_cbor_gen.go @@ -6,376 +6,12 @@ import ( "fmt" "io" - "github.com/filecoin-project/go-fil-markets/shared/types" - "github.com/filecoin-project/go-fil-markets/storagemarket" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) var _ = xerrors.Errorf -func (t *AskRequest) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Miner (address.Address) (struct) - if err := t.Miner.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Miner (address.Address) (struct) - - { - - if err := t.Miner.UnmarshalCBOR(br); err != nil { - return err - } - - } - return nil -} - -func (t *AskResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Ask (types.SignedStorageAsk) (struct) - if err := t.Ask.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Ask (types.SignedStorageAsk) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Ask = new(types.SignedStorageAsk) - if err := t.Ask.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - return nil -} - -func (t *Proposal) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.DealProposal (storagemarket.StorageDealProposal) (struct) - if err := t.DealProposal.MarshalCBOR(w); err != nil { - return err - } - - // t.Piece (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Piece); err != nil { - return xerrors.Errorf("failed to write cid field t.Piece: %w", err) - } - - return nil -} - -func (t *Proposal) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.DealProposal (storagemarket.StorageDealProposal) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.DealProposal = new(storagemarket.StorageDealProposal) - if err := t.DealProposal.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - // t.Piece (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Piece: %w", err) - } - - t.Piece = c - - } - return nil -} - -func (t *Response) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{132}); err != nil { - return err - } - - // t.State (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Message)); err != nil { - return err - } - - // t.Proposal (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Proposal); err != nil { - return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) - } - - // t.PublishMessage (cid.Cid) (struct) - - if t.PublishMessage == nil { - if _, err := w.Write(cbg.CborNull); err != nil { - return err - } - } else { - if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) - } - } - - return nil -} - -func (t *Response) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 4 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.State (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - // t.Message (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Message = string(sval) - } - // t.Proposal (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) - } - - t.Proposal = c - - } - // t.PublishMessage (cid.Cid) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) - } - - t.PublishMessage = &c - } - - } - return nil -} - -func (t *SignedResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Response (storageimpl.Response) (struct) - if err := t.Response.MarshalCBOR(w); err != nil { - return err - } - - // t.Signature (types.Signature) (struct) - if err := t.Signature.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Response (storageimpl.Response) (struct) - - { - - if err := t.Response.UnmarshalCBOR(br); err != nil { - return err - } - - } - // t.Signature (types.Signature) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Signature = new(types.Signature) - if err := t.Signature.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - return nil -} - func (t *StorageDataTransferVoucher) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) diff --git a/storagemarket/network/ask_stream.go b/storagemarket/network/ask_stream.go new file mode 100644 index 00000000..86996d91 --- /dev/null +++ b/storagemarket/network/ask_stream.go @@ -0,0 +1,53 @@ +package network + +import ( + "bufio" + + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" + + cborutil "github.com/filecoin-project/go-cbor-util" +) + +type askStream struct { + p peer.ID + rw mux.MuxedStream + buffered *bufio.Reader +} + +var _ StorageAskStream = (*askStream)(nil) + +func (as *askStream) ReadAskRequest() (AskRequest, error) { + var a AskRequest + + if err := a.UnmarshalCBOR(as.buffered); err != nil { + log.Warn(err) + return AskRequestUndefined, err + + } + + return a, nil +} + +func (as *askStream) WriteAskRequest(q AskRequest) error { + return cborutil.WriteCborRPC(as.rw, &q) +} + +func (as *askStream) ReadAskResponse() (AskResponse, error) { + var resp AskResponse + + if err := resp.UnmarshalCBOR(as.buffered); err != nil { + log.Warn(err) + return AskResponseUndefined, err + } + + return resp, nil +} + +func (as *askStream) WriteAskResponse(qr AskResponse) error { + return cborutil.WriteCborRPC(as.rw, &qr) +} + +func (as *askStream) Close() error { + return as.rw.Close() +} diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go new file mode 100644 index 00000000..dec3f894 --- /dev/null +++ b/storagemarket/network/deal_stream.go @@ -0,0 +1,52 @@ +package network + +import ( + "bufio" + + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" +) + +type dealStream struct { + p peer.ID + rw mux.MuxedStream + buffered *bufio.Reader +} + +var _ StorageDealStream = (*dealStream)(nil) + +func (d *dealStream) ReadDealProposal() (Proposal, error) { + var ds Proposal + + if err := ds.UnmarshalCBOR(d.buffered); err != nil { + log.Warn(err) + return ProposalUndefined, err + } + return ds, nil +} + +func (d *dealStream) WriteDealProposal(dp Proposal) error { + return cborutil.WriteCborRPC(d.rw, &dp) +} + +func (d *dealStream) ReadDealResponse() (SignedResponse, error) { + var dr SignedResponse + + if err := dr.UnmarshalCBOR(d.buffered); err != nil { + return SignedResponseUndefined, err + } + return dr, nil +} + +func (d *dealStream) WriteDealResponse(dr SignedResponse) error { + return cborutil.WriteCborRPC(d.rw, &dr) +} + +func (d *dealStream) Close() error { + return d.rw.Close() +} + +func (d *dealStream) RemotePeer() peer.ID { + return d.p +} diff --git a/storagemarket/network/libp2p_impl.go b/storagemarket/network/libp2p_impl.go new file mode 100644 index 00000000..30e3d136 --- /dev/null +++ b/storagemarket/network/libp2p_impl.go @@ -0,0 +1,84 @@ +package network + +import ( + "bufio" + "context" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" +) + +var log = logging.Logger("retrieval_network") + +// NewFromLibp2pHost builds a storage market network on top of libp2p +func NewFromLibp2pHost(h host.Host) StorageMarketNetwork { + return &libp2pStorageMarketNetwork{host: h} +} + +// libp2pStorageMarketNetwork transforms the libp2p host interface, which sends and receives +// NetMessage objects, into the graphsync network interface. +type libp2pStorageMarketNetwork struct { + host host.Host + // inbound messages from the network are forwarded to the receiver + receiver StorageReceiver +} + +func (impl *libp2pStorageMarketNetwork) NewAskStream(id peer.ID) (StorageAskStream, error) { + s, err := impl.host.NewStream(context.Background(), id, storagemarket.AskProtocolID) + if err != nil { + log.Warn(err) + return nil, err + } + buffered := bufio.NewReaderSize(s, 16) + return &askStream{p: id, rw: s, buffered: buffered}, nil +} + +func (impl *libp2pStorageMarketNetwork) NewDealStream(id peer.ID) (StorageDealStream, error) { + s, err := impl.host.NewStream(context.Background(), id, storagemarket.DealProtocolID) + if err != nil { + return nil, err + } + buffered := bufio.NewReaderSize(s, 16) + return &dealStream{p: id, rw: s, buffered: buffered}, nil +} + +func (impl *libp2pStorageMarketNetwork) SetDelegate(r StorageReceiver) error { + impl.receiver = r + impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream) + impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream) + return nil +} + +func (impl *libp2pStorageMarketNetwork) StopHandlingRequests() error { + impl.receiver = nil + impl.host.RemoveStreamHandler(storagemarket.DealProtocolID) + impl.host.RemoveStreamHandler(storagemarket.AskProtocolID) + return nil +} + +func (impl *libp2pStorageMarketNetwork) handleNewAskStream(s network.Stream) { + if impl.receiver == nil { + log.Warn("no receiver set") + s.Reset() // nolint: errcheck,gosec + return + } + remotePID := s.Conn().RemotePeer() + buffered := bufio.NewReaderSize(s, 16) + as := &askStream{remotePID, s, buffered} + impl.receiver.HandleAskStream(as) +} + +func (impl *libp2pStorageMarketNetwork) handleNewDealStream(s network.Stream) { + if impl.receiver == nil { + log.Warn("no receiver set") + s.Reset() // nolint: errcheck,gosec + return + } + remotePID := s.Conn().RemotePeer() + buffered := bufio.NewReaderSize(s, 16) + ds := &dealStream{remotePID, s, buffered} + impl.receiver.HandleDealStream(ds) +} diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go new file mode 100644 index 00000000..f47f7d53 --- /dev/null +++ b/storagemarket/network/libp2p_impl_test.go @@ -0,0 +1,337 @@ +package network_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" +) + +type testReceiver struct { + t *testing.T + dealStreamHandler func(network.StorageDealStream) + askStreamHandler func(network.StorageAskStream) +} + +func (tr *testReceiver) HandleDealStream(s network.StorageDealStream) { + defer s.Close() + if tr.dealStreamHandler != nil { + tr.dealStreamHandler(s) + } +} +func (tr *testReceiver) HandleAskStream(s network.StorageAskStream) { + defer s.Close() + if tr.askStreamHandler != nil { + tr.askStreamHandler(s) + } +} + +func TestAskStreamSendReceiveAskRequest(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskRequest) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + readq, err := s.ReadAskRequest() + require.NoError(t, err) + achan <- readq + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + // setup query stream host1 --> host 2 + assertAskRequestReceived(ctx, t, fromNetwork, toHost, achan) +} + +func TestAskStreamSendReceiveAskResponse(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskResponse) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + a, err := s.ReadAskResponse() + require.NoError(t, err) + achan <- a + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + assertAskResponseReceived(ctx, t, fromNetwork, toHost, achan) + +} + +func TestAskStreamSendReceiveMultipleSuccessful(t *testing.T) { + // send query, read in handler, send response back, read response + ctxBg := context.Background() + td := shared_testutil.NewLibp2pTestData(ctxBg, t) + nw1 := network.NewFromLibp2pHost(td.Host1) + nw2 := network.NewFromLibp2pHost(td.Host2) + require.NoError(t, td.Host1.Connect(ctxBg, peer.AddrInfo{ID: td.Host2.ID()})) + + // host2 gets a query and sends a response + ar := shared_testutil.MakeTestStorageAskResponse() + done := make(chan bool) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + _, err := s.ReadAskRequest() + require.NoError(t, err) + + require.NoError(t, s.WriteAskResponse(ar)) + done <- true + }} + require.NoError(t, nw2.SetDelegate(tr2)) + + ctx, cancel := context.WithTimeout(ctxBg, 10*time.Second) + defer cancel() + + qs, err := nw1.NewAskStream(td.Host2.ID()) + require.NoError(t, err) + + var resp network.AskResponse + go require.NoError(t, qs.WriteAskRequest(shared_testutil.MakeTestStorageAskRequest())) + resp, err = qs.ReadAskResponse() + require.NoError(t, err) + + select { + case <-ctx.Done(): + t.Error("response not received") + case <-done: + } + + assert.Equal(t, ar, resp) +} + +func TestDealStreamSendReceiveDealProposal(t *testing.T) { + // send proposal, read in handler + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + dchan := make(chan network.Proposal) + tr2 := &testReceiver{ + t: t, + dealStreamHandler: func(s network.StorageDealStream) { + readD, err := s.ReadDealProposal() + require.NoError(t, err) + dchan <- readD + }, + } + require.NoError(t, toNetwork.SetDelegate(tr2)) + + assertDealProposalReceived(ctx, t, fromNetwork, toHost, dchan) +} + +func TestDealStreamSendReceiveDealResponse(t *testing.T) { + ctx := context.Background() + td := shared_testutil.NewLibp2pTestData(ctx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toPeer := td.Host2.ID() + + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + drChan := make(chan network.SignedResponse) + tr2 := &testReceiver{ + t: t, + dealStreamHandler: func(s network.StorageDealStream) { + readDP, err := s.ReadDealResponse() + require.NoError(t, err) + drChan <- readDP + }, + } + require.NoError(t, toNetwork.SetDelegate(tr2)) + assertDealResponseReceived(ctx, t, fromNetwork, toPeer, drChan) +} + +func TestDealStreamSendReceiveMultipleSuccessful(t *testing.T) { + // send proposal, read in handler, send response back, + // read response, + + bgCtx := context.Background() + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toPeer := td.Host2.ID() + + // set up stream handler, channels, and response + dr := shared_testutil.MakeTestStorageNetworkSignedResponse() + done := make(chan bool) + + tr2 := &testReceiver{t: t, dealStreamHandler: func(s network.StorageDealStream) { + _, err := s.ReadDealProposal() + require.NoError(t, err) + + require.NoError(t, s.WriteDealResponse(dr)) + done <- true + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + // start sending deal proposal + ds1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + dp := shared_testutil.MakeTestStorageNetworkProposal() + + ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second) + defer cancel() + + // write proposal + require.NoError(t, ds1.WriteDealProposal(dp)) + + // read response and verify it's the one we told toNetwork to send + responseReceived, err := ds1.ReadDealResponse() + require.NoError(t, err) + assert.Equal(t, dr, responseReceived) + + select { + case <-ctx.Done(): + t.Errorf("failed to receive messages") + case <-done: + } + +} + +func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { + bgCtx := context.Background() + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + + fromNetwork := network.NewFromLibp2pHost(td.Host1) + toNetwork := network.NewFromLibp2pHost(td.Host2) + toHost := td.Host2.ID() + + // host1 gets no-op receiver + tr := &testReceiver{t: t} + require.NoError(t, fromNetwork.SetDelegate(tr)) + + // host2 gets receiver + achan := make(chan network.AskRequest) + tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { + readar, err := s.ReadAskRequest() + require.NoError(t, err) + achan <- readar + }} + require.NoError(t, toNetwork.SetDelegate(tr2)) + + require.NoError(t, toNetwork.StopHandlingRequests()) + + _, err := fromNetwork.NewAskStream(toHost) + require.Error(t, err, "protocol not supported") +} + +// assertDealProposalReceived performs the verification that a deal proposal is received +func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.Proposal) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + qs1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + // send query to host2 + dp := shared_testutil.MakeTestStorageNetworkProposal() + require.NoError(t, qs1.WriteDealProposal(dp)) + + var dealReceived network.Proposal + select { + case <-ctx.Done(): + t.Error("deal proposal not received") + case dealReceived = <-inChan: + } + require.NotNil(t, dealReceived) + assert.Equal(t, dp, dealReceived) +} + +func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.SignedResponse) { + ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second) + defer cancel() + + ds1, err := fromNetwork.NewDealStream(toPeer) + require.NoError(t, err) + + dr := shared_testutil.MakeTestStorageNetworkSignedResponse() + require.NoError(t, ds1.WriteDealResponse(dr)) + + var responseReceived network.SignedResponse + select { + case <-ctx.Done(): + t.Error("response not received") + case responseReceived = <-inChan: + } + require.NotNil(t, responseReceived) + assert.Equal(t, dr, responseReceived) +} + +// assertAskRequestReceived performs the verification that a AskRequest is received +func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan network.AskRequest) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + as1, err := fromNetwork.NewAskStream(toHost) + require.NoError(t, err) + + // send query to host2 + a := shared_testutil.MakeTestStorageAskRequest() + require.NoError(t, as1.WriteAskRequest(a)) + + var ina network.AskRequest + select { + case <-ctx.Done(): + t.Error("msg not received") + case ina = <-achan: + } + require.NotNil(t, ina) + assert.Equal(t, a.Miner, ina.Miner) +} + +// assertAskResponseReceived performs the verification that a AskResponse is received +func assertAskResponseReceived(inCtx context.Context, t *testing.T, + fromNetwork network.StorageMarketNetwork, + toHost peer.ID, + achan chan network.AskResponse) { + ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) + defer cancel() + + // setup query stream host1 --> host 2 + as1, err := fromNetwork.NewAskStream(toHost) + require.NoError(t, err) + + // send queryresponse to host2 + ar := shared_testutil.MakeTestStorageAskResponse() + require.NoError(t, as1.WriteAskResponse(ar)) + + // read queryresponse + var inar network.AskResponse + select { + case <-ctx.Done(): + t.Error("msg not received") + case inar = <-achan: + } + + require.NotNil(t, inar) + assert.Equal(t, ar, inar) +} diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go new file mode 100644 index 00000000..bc954380 --- /dev/null +++ b/storagemarket/network/network.go @@ -0,0 +1,41 @@ +package network + +import ( + "github.com/libp2p/go-libp2p-core/peer" +) + +// StorageAskStream is a stream for reading/writing requests & +// responses on the Storage Ask protocol +type StorageAskStream interface { + ReadAskRequest() (AskRequest, error) + WriteAskRequest(AskRequest) error + ReadAskResponse() (AskResponse, error) + WriteAskResponse(AskResponse) error + Close() error +} + +// StorageDealStream is a stream for reading and writing requests +// and responses on the storage deal protocol +type StorageDealStream interface { + ReadDealProposal() (Proposal, error) + WriteDealProposal(Proposal) error + ReadDealResponse() (SignedResponse, error) + WriteDealResponse(SignedResponse) error + RemotePeer() peer.ID + Close() error +} + +// StorageReceiver implements functions for receiving +// incoming data on storage protocols +type StorageReceiver interface { + HandleAskStream(StorageAskStream) + HandleDealStream(StorageDealStream) +} + +// StorageMarketNetwork is a network abstraction for the storage market +type StorageMarketNetwork interface { + NewAskStream(peer.ID) (StorageAskStream, error) + NewDealStream(peer.ID) (StorageDealStream, error) + SetDelegate(StorageReceiver) error + StopHandlingRequests() error +} diff --git a/storagemarket/network/types.go b/storagemarket/network/types.go new file mode 100644 index 00000000..f28597e7 --- /dev/null +++ b/storagemarket/network/types.go @@ -0,0 +1,68 @@ +package network + +import ( + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" +) + +//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse + +// Proposal is the data sent over the network from client to provider when proposing +// a deal +type Proposal struct { + DealProposal *storagemarket.StorageDealProposal + + Piece cid.Cid // Used for retrieving from the client +} + +var ProposalUndefined = Proposal{} + +// Response is a response to a proposal sent over the network +type Response struct { + State storagemarket.StorageDealStatus + + // DealProposalRejected + Message string + Proposal cid.Cid + + // StorageDealProposalAccepted + PublishMessage *cid.Cid +} + +// SignedResponse is a response that is signed +type SignedResponse struct { + Response Response + + Signature *types.Signature +} + +var SignedResponseUndefined = SignedResponse{} + +// Verify verifies that a proposal was signed by the given provider +func (r *SignedResponse) Verify(addr address.Address) error { + b, err := cborutil.Dump(&r.Response) + if err != nil { + return err + } + + return r.Signature.Verify(addr, b) +} + +// AskRequest is a request for current ask parameters for a given miner +type AskRequest struct { + Miner address.Address +} + +var AskRequestUndefined = AskRequest{} + +// AskResponse is the response sent over the network in response +// to an ask request +type AskResponse struct { + Ask *types.SignedStorageAsk +} + +var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/network/types_cbor_gen.go b/storagemarket/network/types_cbor_gen.go new file mode 100644 index 00000000..a807d1ed --- /dev/null +++ b/storagemarket/network/types_cbor_gen.go @@ -0,0 +1,377 @@ +package network + +import ( + "fmt" + "io" + + "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-fil-markets/storagemarket" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf + +func (t *AskRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Miner (address.Address) (struct) + if err := t.Miner.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Miner (address.Address) (struct) + + { + + if err := t.Miner.UnmarshalCBOR(br); err != nil { + return err + } + + } + return nil +} + +func (t *AskResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Ask (types.SignedStorageAsk) (struct) + if err := t.Ask.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Ask (types.SignedStorageAsk) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Ask = new(types.SignedStorageAsk) + if err := t.Ask.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + return nil +} + +func (t *Proposal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.DealProposal (storagemarket.StorageDealProposal) (struct) + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Piece (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Piece); err != nil { + return xerrors.Errorf("failed to write cid field t.Piece: %w", err) + } + + return nil +} + +func (t *Proposal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.DealProposal (storagemarket.StorageDealProposal) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.DealProposal = new(storagemarket.StorageDealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + // t.Piece (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Piece: %w", err) + } + + t.Piece = c + + } + return nil +} + +func (t *Response) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{132}); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Message)); err != nil { + return err + } + + // t.Proposal (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Proposal); err != nil { + return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *Response) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.Proposal (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) + } + + t.Proposal = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *SignedResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.Response (network.Response) (struct) + if err := t.Response.MarshalCBOR(w); err != nil { + return err + } + + // t.Signature (types.Signature) (struct) + if err := t.Signature.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Response (network.Response) (struct) + + { + + if err := t.Response.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.Signature (types.Signature) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Signature = new(types.Signature) + if err := t.Signature.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + return nil +} diff --git a/storagemarket/types.go b/storagemarket/types.go index 96f8a0f1..3efe28f8 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -6,7 +6,6 @@ import ( "io" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" xerrors "golang.org/x/xerrors" @@ -188,9 +187,9 @@ type ClientDeal struct { // The interface provided for storage providers type StorageProvider interface { - Run(ctx context.Context, host host.Host) + Start(ctx context.Context) error - Stop() + Stop() error AddAsk(price tokenamount.TokenAmount, ttlsecs int64) error