Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Techdebt/1 block file retrieval test #51

Merged
merged 6 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"

"golang.org/x/xerrors"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"

"golang.org/x/xerrors"
)

// ClientDealEnvironment is a bridge to the environment a client deal is executing in
Expand Down Expand Up @@ -89,7 +89,7 @@ func ProcessPaymentRequested(ctx context.Context, environment ClientDealEnvironm
}

// check that totalReceived - bytesPaidFor >= currentInterval, or fail
if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment{
if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment {
return errorFunc(xerrors.New("not enough bytes received between payment request"))
}

Expand Down
265 changes: 155 additions & 110 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer/testutil"
"github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,7 +91,6 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
}

func TestClientCanMakeDealWithProvider(t *testing.T) {
log.SetDebugLogging()
bgCtx := context.Background()
clientPaymentChannel, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
Expand All @@ -101,124 +99,171 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {

// -------- SET UP PROVIDER

// Inject a unixFS file on the provider side to its blockstore
// obtained via `ls -laf` on this file

// pieceLink := testData.LoadUnixFSFile(t, "lorem_big.txt", true)
// fileSize := uint64(89359)
pieceLink := testData.LoadUnixFSFile(t, "lorem.txt", true)
fileSize := uint64(19000)


pieceCID := []byte("pieceCID")
providerPaymentAddr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
paymentInterval := uint64(10000)
paymentIntervalIncrease := uint64(1000)
pricePerByte := tokenamount.FromInt(1000)

expectedQR := retrievalmarket.QueryResponse{
Size: 1024,
PaymentAddress: providerPaymentAddr,
MinPricePerByte: pricePerByte,
MaxPaymentInterval: paymentInterval,
MaxPaymentIntervalIncrease: paymentIntervalIncrease,
testCases := []struct {
name string
filename string
filesize uint64
voucherAmts []tokenamount.TokenAmount
}{
{name: "1 block file retrieval succeeds",
filename: "lorem_under_1_block.txt",
filesize: 410,
voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(410000)}},
{name: "multi-block file retrieval succeeds",
filename: "lorem.txt",
filesize: 19000,
voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(10136000), tokenamount.FromInt(9784000)}},
}

providerNode := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr )

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),}

expectedVoucher := tut.MakeTestSignedVoucher()

// just make sure there is enough to cover the transfer
expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(fileSize*2))

// this is just pulled from the actual answer so the expected keys in the test node match up.
// later we compare the voucher values.
expectedVoucher.Amount = tokenamount.FromInt(10136000)
proof := []byte("")
require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, expectedVoucher.Amount, expectedVoucher.Amount, nil))

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)

createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData)

dealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
case retrievalmarket.ClientEventComplete:
dealStateChan <- state
case retrievalmarket.ClientEventError:
msg := `
Status: %d
TotalReceived: %d
BytesPaidFor: %d
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// Inject a unixFS file on the provider side to its blockstore
// obtained via `ls -laf` on this file
pieceLink := testData.LoadUnixFSFile(t, testCase.filename, true)

pieceCID := []byte("pieceCID")
providerPaymentAddr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
paymentInterval := uint64(10000)
paymentIntervalIncrease := uint64(1000)
pricePerByte := tokenamount.FromInt(1000)

expectedQR := retrievalmarket.QueryResponse{
Size: 1024,
PaymentAddress: providerPaymentAddr,
MinPricePerByte: pricePerByte,
MaxPaymentInterval: paymentInterval,
MaxPaymentIntervalIncrease: paymentIntervalIncrease,
}

providerNode, provider := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr)

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),}

expectedVoucher := tut.MakeTestSignedVoucher()

// just make sure there is enough to cover the transfer
expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(testCase.filesize*2))

// voucherAmts are pulled from the actual answer so the expected keys in the test node match up.
// later we compare the voucher values. The last voucherAmt is a remainder
proof := []byte("")
for _, voucherAmt := range testCase.voucherAmts {
require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, voucherAmt, voucherAmt, nil))
}

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)

createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData)

clientDealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
case retrievalmarket.ClientEventComplete:
clientDealStateChan <- state
case retrievalmarket.ClientEventError:
msg := `
Status: %d
TotalReceived: %d
BytesPaidFor: %d
CurrentInterval: %d
TotalFunds: %s
TotalFunds: %s
`
t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval,state.TotalFunds.String(),)
}
})

// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

c, ok := pieceLink.(cidlink.Link)
require.True(t, ok)
payloadCID := c.Cid

rmParams := retrievalmarket.Params{
PricePerByte: pricePerByte,
PaymentInterval: paymentInterval,
PaymentIntervalIncrease: paymentIntervalIncrease,
PayloadCID: payloadCID,
t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval, state.TotalFunds.String(), )
}
})

providerDealStateChan := make(chan retrievalmarket.ProviderDealState)
provider.SubscribeToEvents(func(event retrievalmarket.ProviderEvent, state retrievalmarket.ProviderDealState) {
switch event {
case retrievalmarket.ProviderEventComplete:
providerDealStateChan <- state
case retrievalmarket.ProviderEventError:
msg := `
Status: %d
TotalSent: %d
FundsReceived: %s
Message: %s
CurrentInterval: %d
`
t.Logf(msg, state.Status, state.TotalSent, state.FundsReceived.String(), state.Message, state.CurrentInterval)
}
})

// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

c, ok := pieceLink.(cidlink.Link)
require.True(t, ok)
payloadCID := c.Cid

rmParams := retrievalmarket.Params{
PricePerByte: pricePerByte,
PaymentInterval: paymentInterval,
PaymentIntervalIncrease: paymentIntervalIncrease,
PayloadCID: payloadCID,
}

// *** Retrieve the piece
did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address)
assert.Equal(t, did, retrievalmarket.DealID(1))

ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

// verify that client subscribers will be notified of state changes
var clientDealState retrievalmarket.ClientDealState
select {
case <-ctx.Done():
t.Error("deal never completed")
t.FailNow()
case clientDealState = <-clientDealStateChan:
}
assert.Equal(t, clientDealState.Lane, expectedVoucher.Lane)
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
assert.True(t, createdVoucher.Equals(expectedVoucher))

ctx, cancel = context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()
var providerDealState retrievalmarket.ProviderDealState
select {
case <-ctx.Done():
t.Error("provider never saw completed deal")
t.FailNow()
case providerDealState = <- providerDealStateChan:
}

require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status)

// verify that the provider saved the same voucher values
providerNode.VerifyExpectations(t)
testData.VerifyFileTransferred(t, pieceLink, false)
})
}

// *** Retrieve the piece
did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address)
assert.Equal(t, did, retrievalmarket.DealID(1))

ctx , cancel := context.WithTimeout(bgCtx, 20*time.Second)
defer cancel()

var dealState retrievalmarket.ClientDealState
select {
case <- ctx.Done():
t.Error("deal never completed")
t.FailNow()
case dealState = <- dealStateChan:
}
assert.Equal(t, dealState.Lane, expectedVoucher.Lane)
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
assert.True(t, createdVoucher.Equals(expectedVoucher))
// // verify that the provider saved the same voucher values
providerNode.VerifyExpectations(t)
testData.VerifyFileTransferred(t, pieceLink, false)
}

func setupClient(
clientPaymentChannel address.Address,
expectedVoucher *types.SignedVoucher,
nw1 rmnet.RetrievalMarketNetwork,
testData *tut.Libp2pTestData) ( *pmtChan,
*address.Address,
*types.SignedVoucher,
retrievalmarket.RetrievalClient) {
var createdChan pmtChan
clientPaymentChannel address.Address,
expectedVoucher *types.SignedVoucher,
nw1 rmnet.RetrievalMarketNetwork,
testData *tut.Libp2pTestData) (*pmtChan,
*address.Address,
*types.SignedVoucher,
retrievalmarket.RetrievalClient) {
var createdChan pmtChan
paymentChannelRecorder := func(client, miner address.Address, amt tokenamount.TokenAmount) {
createdChan = pmtChan{client, miner, amt}
}

var newLaneAddr address.Address
var newLaneAddr address.Address
laneRecorder := func(paymentChannel address.Address) {
newLaneAddr = paymentChannel
}
Expand All @@ -239,7 +284,7 @@ func setupClient(
return &createdChan, &newLaneAddr, &createdVoucher, client
}

func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) *testnodes.TestRetrievalProviderNode {
func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) (*testnodes.TestRetrievalProviderNode, retrievalmarket.RetrievalProvider) {
nw2 := rmnet.NewFromLibp2pHost(testData.Host2)
providerNode := testnodes.NewTestRetrievalProviderNode()
providerNode.SetBlockstore(testData.Bs2)
Expand All @@ -248,7 +293,7 @@ func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte,
provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease)
provider.SetPricePerByte(expectedQR.MinPricePerByte)
require.NoError(t, provider.Start())
return providerNode
return providerNode, provider
}

type pmtChan struct {
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package providerstates
import (
"context"

"golang.org/x/xerrors"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"golang.org/x/xerrors"
)

// ProviderDealEnvironment is a bridge to the environment a provider deal is executing in
Expand Down