Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallel transfers between same two peers #254

Merged
merged 4 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.6
github.com/ipfs/go-graphsync v0.9.0
github.com/ipfs/go-graphsync v0.9.1
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.9.0 h1:T22kORlNbJUIm/+avUIfLnAf1BkwKG6aS19NsRVjVVY=
github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-graphsync v0.9.1 h1:jo7ZaAZ3lal89RhKxKoRkPzIO8lmOY6KUWA1mDRZ2+U=
github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
190 changes: 179 additions & 11 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl_test
import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"testing"
Expand All @@ -21,6 +22,7 @@ import (
chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
Expand Down Expand Up @@ -1724,7 +1726,7 @@ func TestMultipleMessagesInExtension(t *testing.T) {
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

root, origBytes := LoadRandomData(ctx, t, gsData.DagService1)
root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, 256000)
gsData.OrigBytes = origBytes
rootCid := root.(cidlink.Link).Cid
tp1 := gsData.SetupGSTransportHost1()
Expand Down Expand Up @@ -1860,8 +1862,174 @@ func TestMultipleMessagesInExtension(t *testing.T) {
gsData.VerifyFileTransferred(t, root, true)
}

func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService) (ipld.Link, []byte) {
data := make([]byte, 256000)
// completeRevalidator does not pause when sending the last voucher to confirm the deal is completed
type completeRevalidator struct {
*retrievalRevalidator
}

func (r *completeRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
return true, r.finalVoucher, nil
}

func TestMultipleParallelTransfers(t *testing.T) {
SetDTLogLevelDebug()

// Add more sizes here to trigger more transfers.
sizes := []int{300000, 256000, 200000, 256000}

ctx := context.Background()

gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)

dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

// In this retrieval flow we expect 2 voucher results:
// The first one is sent as a response from the initial request telling the client
// the provider has accepted the request and is starting to send blocks
respVoucher := testutil.NewFakeDTType()
encodedRVR, err := encoding.Encode(respVoucher)
require.NoError(t, err)

// The final voucher result is sent by the provider to let the client know the deal is completed
finalVoucherResult := testutil.NewFakeDTType()
encodedFVR, err := encoding.Encode(finalVoucherResult)
require.NoError(t, err)

sv := testutil.NewStubbedValidator()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
// Stub in the validator so it returns that exact voucher when calling ValidatePull
// this validator will not pause transfer when accepting a transfer and will start
// sending blocks immediately
sv.StubResult(respVoucher)

// no need for intermediary voucher results
voucherResults := []datatransfer.VoucherResult{}

pausePoints := []uint64{}
srv := &retrievalRevalidator{
testutil.NewStubbedRevalidator(), 0, 0, pausePoints, finalVoucherResult, voucherResults,
}
srv.ExpectSuccessErrResume()
require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv))

// Register our response voucher with the client
require.NoError(t, dt2.RegisterVoucherResultType(respVoucher))

// for each size we create a new random DAG of the given size and try to retrieve it
for _, size := range sizes {
size := size
t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

errChan := make(chan struct{}, 2)

clientGotResponse := make(chan struct{}, 1)
clientFinished := make(chan struct{}, 1)

var chid datatransfer.ChannelID
chidReceived := make(chan struct{})
dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
<-chidReceived
if chid != channelState.ChannelID() {
return
}
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
// Here we verify reception of voucherResults by the client
if event.Code == datatransfer.NewVoucherResult {
voucherResult := channelState.LastVoucherResult()
encodedVR, err := encoding.Encode(voucherResult)
require.NoError(t, err)

// If this voucher result is the response voucher no action is needed
// we just know that the provider has accepted the transfer and is sending blocks
if bytes.Equal(encodedVR, encodedRVR) {
// The test will fail if no response voucher is received
clientGotResponse <- struct{}{}
}

// If this voucher result is the final voucher result we need
// to send a new voucher to unpause the provider and complete the transfer
if bytes.Equal(encodedVR, encodedFVR) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType())
}
}

if channelState.Status() == datatransfer.Completed {
clientFinished <- struct{}{}
}
})

providerFinished := make(chan struct{}, 1)
dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
<-chidReceived
if chid != channelState.ChannelID() {
return
}
if event.Code == datatransfer.Error {
fmt.Println(event.Message)
errChan <- struct{}{}
}
if channelState.Status() == datatransfer.Completed {
providerFinished <- struct{}{}
}
})

root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, size)
rootCid := root.(cidlink.Link).Cid

voucher := testutil.NewFakeDTType()
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)
close(chidReceived)
// Expect the client to receive a response voucher, the provider to complete the transfer and
// the client to finish the transfer
for clientGotResponse != nil || providerFinished != nil || clientFinished != nil {
select {
case <-ctx.Done():
reason := "Did not complete successful data transfer"
switch true {
case clientGotResponse != nil:
reason = "client did not get initial response"
case clientFinished != nil:
reason = "client did not finish"
case providerFinished != nil:
reason = "provider did not finish"
}
t.Fatal(reason)
case <-clientGotResponse:
clientGotResponse = nil
case <-providerFinished:
providerFinished = nil
case <-clientFinished:
clientFinished = nil
case <-errChan:
t.Fatal("received unexpected error")
}
}
sv.VerifyExpectations(t)
srv.VerifyExpectations(t)
testutil.VerifyHasFile(gsData.Ctx, t, gsData.DagService2, root, origBytes)
})
}
}

func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService, size int) (ipld.Link, []byte) {
data := make([]byte, size)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)

// import to UnixFS
Expand All @@ -1874,7 +2042,7 @@ func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAG
Dagserv: bufferedDS,
}

db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), int64(1<<10)))
db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), 128000))
require.NoError(t, err)

nd, err := balanced.Layout(db)
Expand Down Expand Up @@ -1928,10 +2096,10 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,

}

//func SetDTLogLevelDebug() {
// _ = logging.SetLogLevel("dt-impl", "debug")
// _ = logging.SetLogLevel("dt-chanmon", "debug")
// _ = logging.SetLogLevel("dt_graphsync", "debug")
// _ = logging.SetLogLevel("data_transfer", "debug")
// _ = logging.SetLogLevel("data_transfer_network", "debug")
//}
func SetDTLogLevelDebug() {
_ = logging.SetLogLevel("dt-impl", "debug")
_ = logging.SetLogLevel("dt-chanmon", "debug")
_ = logging.SetLogLevel("dt_graphsync", "debug")
_ = logging.SetLogLevel("data-transfer", "debug")
_ = logging.SetLogLevel("data_transfer_network", "debug")
}