diff --git a/retrievalmarket/impl/clientstates/client_states.go b/retrievalmarket/impl/clientstates/client_states.go index bfff0b75..db4fc573 100644 --- a/retrievalmarket/impl/clientstates/client_states.go +++ b/retrievalmarket/impl/clientstates/client_states.go @@ -4,11 +4,11 @@ import ( "context" "fmt" + "golang.org/x/xerrors" + rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" - - "golang.org/x/xerrors" ) // ClientDealEnvironment is a bridge to the environment a client deal is executing in @@ -89,7 +89,7 @@ func ProcessPaymentRequested(ctx context.Context, environment ClientDealEnvironm } // check that totalReceived - bytesPaidFor >= currentInterval, or fail - if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment{ + if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment { return errorFunc(xerrors.New("not enough bytes received between payment request")) } diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 72231a1f..aef822fb 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-data-transfer/testutil" - "github.com/ipfs/go-log/v2" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -92,7 +91,6 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC } func TestClientCanMakeDealWithProvider(t *testing.T) { - log.SetDebugLogging() bgCtx := context.Background() clientPaymentChannel, err := address.NewIDAddress(rand.Uint64()) require.NoError(t, err) @@ -101,124 +99,171 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // -------- SET UP PROVIDER - // Inject a unixFS file on the provider side to its blockstore - // obtained via `ls -laf` on this file - - // pieceLink := testData.LoadUnixFSFile(t, "lorem_big.txt", true) - // fileSize := uint64(89359) - pieceLink := testData.LoadUnixFSFile(t, "lorem.txt", true) - fileSize := uint64(19000) - - - pieceCID := []byte("pieceCID") - providerPaymentAddr, err := address.NewIDAddress(rand.Uint64()) - require.NoError(t, err) - paymentInterval := uint64(10000) - paymentIntervalIncrease := uint64(1000) - pricePerByte := tokenamount.FromInt(1000) - - expectedQR := retrievalmarket.QueryResponse{ - Size: 1024, - PaymentAddress: providerPaymentAddr, - MinPricePerByte: pricePerByte, - MaxPaymentInterval: paymentInterval, - MaxPaymentIntervalIncrease: paymentIntervalIncrease, + testCases := []struct { + name string + filename string + filesize uint64 + voucherAmts []tokenamount.TokenAmount + }{ + {name: "1 block file retrieval succeeds", + filename: "lorem_under_1_block.txt", + filesize: 410, + voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(410000)}}, + {name: "multi-block file retrieval succeeds", + filename: "lorem.txt", + filesize: 19000, + voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(10136000), tokenamount.FromInt(9784000)}}, } - - providerNode := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr ) - - retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),} - - expectedVoucher := tut.MakeTestSignedVoucher() - - // just make sure there is enough to cover the transfer - expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(fileSize*2)) - - // this is just pulled from the actual answer so the expected keys in the test node match up. - // later we compare the voucher values. - expectedVoucher.Amount = tokenamount.FromInt(10136000) - proof := []byte("") - require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, expectedVoucher.Amount, expectedVoucher.Amount, nil)) - - // ------- SET UP CLIENT - nw1 := rmnet.NewFromLibp2pHost(testData.Host1) - - createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData) - - dealStateChan := make(chan retrievalmarket.ClientDealState) - client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { - switch event { - case retrievalmarket.ClientEventComplete: - dealStateChan <- state - case retrievalmarket.ClientEventError: - msg := ` -Status: %d -TotalReceived: %d -BytesPaidFor: %d + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // Inject a unixFS file on the provider side to its blockstore + // obtained via `ls -laf` on this file + pieceLink := testData.LoadUnixFSFile(t, testCase.filename, true) + + pieceCID := []byte("pieceCID") + providerPaymentAddr, err := address.NewIDAddress(rand.Uint64()) + require.NoError(t, err) + paymentInterval := uint64(10000) + paymentIntervalIncrease := uint64(1000) + pricePerByte := tokenamount.FromInt(1000) + + expectedQR := retrievalmarket.QueryResponse{ + Size: 1024, + PaymentAddress: providerPaymentAddr, + MinPricePerByte: pricePerByte, + MaxPaymentInterval: paymentInterval, + MaxPaymentIntervalIncrease: paymentIntervalIncrease, + } + + providerNode, provider := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr) + + retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),} + + expectedVoucher := tut.MakeTestSignedVoucher() + + // just make sure there is enough to cover the transfer + expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(testCase.filesize*2)) + + // voucherAmts are pulled from the actual answer so the expected keys in the test node match up. + // later we compare the voucher values. The last voucherAmt is a remainder + proof := []byte("") + for _, voucherAmt := range testCase.voucherAmts { + require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, voucherAmt, voucherAmt, nil)) + } + + // ------- SET UP CLIENT + nw1 := rmnet.NewFromLibp2pHost(testData.Host1) + + createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData) + + clientDealStateChan := make(chan retrievalmarket.ClientDealState) + client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { + switch event { + case retrievalmarket.ClientEventComplete: + clientDealStateChan <- state + case retrievalmarket.ClientEventError: + msg := ` +Status: %d +TotalReceived: %d +BytesPaidFor: %d CurrentInterval: %d -TotalFunds: %s +TotalFunds: %s ` - t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval,state.TotalFunds.String(),) - } - }) - - // **** Send the query for the Piece - // set up retrieval params - resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{}) - require.NoError(t, err) - require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) - - c, ok := pieceLink.(cidlink.Link) - require.True(t, ok) - payloadCID := c.Cid - - rmParams := retrievalmarket.Params{ - PricePerByte: pricePerByte, - PaymentInterval: paymentInterval, - PaymentIntervalIncrease: paymentIntervalIncrease, - PayloadCID: payloadCID, + t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval, state.TotalFunds.String(), ) + } + }) + + providerDealStateChan := make(chan retrievalmarket.ProviderDealState) + provider.SubscribeToEvents(func(event retrievalmarket.ProviderEvent, state retrievalmarket.ProviderDealState) { + switch event { + case retrievalmarket.ProviderEventComplete: + providerDealStateChan <- state + case retrievalmarket.ProviderEventError: + msg := ` +Status: %d +TotalSent: %d +FundsReceived: %s +Message: %s +CurrentInterval: %d +` + t.Logf(msg, state.Status, state.TotalSent, state.FundsReceived.String(), state.Message, state.CurrentInterval) + } + }) + + // **** Send the query for the Piece + // set up retrieval params + resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{}) + require.NoError(t, err) + require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) + + c, ok := pieceLink.(cidlink.Link) + require.True(t, ok) + payloadCID := c.Cid + + rmParams := retrievalmarket.Params{ + PricePerByte: pricePerByte, + PaymentInterval: paymentInterval, + PaymentIntervalIncrease: paymentIntervalIncrease, + PayloadCID: payloadCID, + } + + // *** Retrieve the piece + did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address) + assert.Equal(t, did, retrievalmarket.DealID(1)) + + ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second) + defer cancel() + + // verify that client subscribers will be notified of state changes + var clientDealState retrievalmarket.ClientDealState + select { + case <-ctx.Done(): + t.Error("deal never completed") + t.FailNow() + case clientDealState = <-clientDealStateChan: + } + assert.Equal(t, clientDealState.Lane, expectedVoucher.Lane) + require.NotNil(t, createdChan) + require.Equal(t, expectedTotal, createdChan.amt) + require.Equal(t, clientPaymentChannel, *newLaneAddr) + // verify that the voucher was saved/seen by the client with correct values + require.NotNil(t, createdVoucher) + assert.True(t, createdVoucher.Equals(expectedVoucher)) + + ctx, cancel = context.WithTimeout(bgCtx, 10*time.Second) + defer cancel() + var providerDealState retrievalmarket.ProviderDealState + select { + case <-ctx.Done(): + t.Error("provider never saw completed deal") + t.FailNow() + case providerDealState = <- providerDealStateChan: + } + + require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status) + + // verify that the provider saved the same voucher values + providerNode.VerifyExpectations(t) + testData.VerifyFileTransferred(t, pieceLink, false) + }) } - // *** Retrieve the piece - did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address) - assert.Equal(t, did, retrievalmarket.DealID(1)) - - ctx , cancel := context.WithTimeout(bgCtx, 20*time.Second) - defer cancel() - - var dealState retrievalmarket.ClientDealState - select { - case <- ctx.Done(): - t.Error("deal never completed") - t.FailNow() - case dealState = <- dealStateChan: - } - assert.Equal(t, dealState.Lane, expectedVoucher.Lane) - require.NotNil(t, createdChan) - require.Equal(t, expectedTotal, createdChan.amt) - require.Equal(t, clientPaymentChannel, *newLaneAddr) - // verify that the voucher was saved/seen by the client with correct values - require.NotNil(t, createdVoucher) - assert.True(t, createdVoucher.Equals(expectedVoucher)) - // // verify that the provider saved the same voucher values - providerNode.VerifyExpectations(t) - testData.VerifyFileTransferred(t, pieceLink, false) } func setupClient( - clientPaymentChannel address.Address, - expectedVoucher *types.SignedVoucher, - nw1 rmnet.RetrievalMarketNetwork, - testData *tut.Libp2pTestData) ( *pmtChan, - *address.Address, - *types.SignedVoucher, - retrievalmarket.RetrievalClient) { - var createdChan pmtChan + clientPaymentChannel address.Address, + expectedVoucher *types.SignedVoucher, + nw1 rmnet.RetrievalMarketNetwork, + testData *tut.Libp2pTestData) (*pmtChan, + *address.Address, + *types.SignedVoucher, + retrievalmarket.RetrievalClient) { + var createdChan pmtChan paymentChannelRecorder := func(client, miner address.Address, amt tokenamount.TokenAmount) { createdChan = pmtChan{client, miner, amt} } - var newLaneAddr address.Address + var newLaneAddr address.Address laneRecorder := func(paymentChannel address.Address) { newLaneAddr = paymentChannel } @@ -239,7 +284,7 @@ func setupClient( return &createdChan, &newLaneAddr, &createdVoucher, client } -func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) *testnodes.TestRetrievalProviderNode { +func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) (*testnodes.TestRetrievalProviderNode, retrievalmarket.RetrievalProvider) { nw2 := rmnet.NewFromLibp2pHost(testData.Host2) providerNode := testnodes.NewTestRetrievalProviderNode() providerNode.SetBlockstore(testData.Bs2) @@ -248,7 +293,7 @@ func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease) provider.SetPricePerByte(expectedQR.MinPricePerByte) require.NoError(t, provider.Start()) - return providerNode + return providerNode, provider } type pmtChan struct { diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index 218aac1c..51c4befa 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -3,10 +3,11 @@ package providerstates import ( "context" + "golang.org/x/xerrors" + rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" - "golang.org/x/xerrors" ) // ProviderDealEnvironment is a bridge to the environment a provider deal is executing in