From 9a50a1f3ccd10646f7b13279f8c675a81dbc060f Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 7 Sep 2021 03:03:16 -0700 Subject: [PATCH] Fix parallel transfers between same two peers (#254) * add test reproducing race * update test to run parallel transfers between 2 peers * test(impl): fix parallel transfer test and fix issue Update the parallel transfer test to fix the issues that made the test ineffective and then update go-graphsync to fix the actual issue * feat(deps): update to tagged GS Co-authored-by: tchardin --- go.mod | 2 +- go.sum | 4 +- impl/integration_test.go | 190 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 182 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 955c2b76..ed11ffee 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.6 - github.com/ipfs/go-graphsync v0.9.0 + github.com/ipfs/go-graphsync v0.9.1 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 6b09ba11..4c41415c 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,8 @@ github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.9.0 h1:T22kORlNbJUIm/+avUIfLnAf1BkwKG6aS19NsRVjVVY= -github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= +github.com/ipfs/go-graphsync v0.9.1 h1:jo7ZaAZ3lal89RhKxKoRkPzIO8lmOY6KUWA1mDRZ2+U= +github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= diff --git a/impl/integration_test.go b/impl/integration_test.go index bda25604..69a5a98c 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -3,6 +3,7 @@ package impl_test import ( "bytes" "context" + "fmt" "math/rand" "os" "testing" @@ -21,6 +22,7 @@ import ( chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" ipldformat "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" @@ -1724,7 +1726,7 @@ func TestMultipleMessagesInExtension(t *testing.T) { gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) host1 := gsData.Host1 // initiator, data sender - root, origBytes := LoadRandomData(ctx, t, gsData.DagService1) + root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, 256000) gsData.OrigBytes = origBytes rootCid := root.(cidlink.Link).Cid tp1 := gsData.SetupGSTransportHost1() @@ -1860,8 +1862,174 @@ func TestMultipleMessagesInExtension(t *testing.T) { gsData.VerifyFileTransferred(t, root, true) } -func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService) (ipld.Link, []byte) { - data := make([]byte, 256000) +// completeRevalidator does not pause when sending the last 
voucher to confirm the deal is completed.
+type completeRevalidator struct {
+	*retrievalRevalidator
+}
+
+func (r *completeRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
+	return true, r.finalVoucher, nil
+}
+
+func TestMultipleParallelTransfers(t *testing.T) {
+	SetDTLogLevelDebug()
+
+	// Add more sizes here to trigger more transfers.
+	sizes := []int{300000, 256000, 200000, 256000}
+
+	ctx := context.Background()
+
+	gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
+	host1 := gsData.Host1 // initiator, data sender
+
+	tp1 := gsData.SetupGSTransportHost1()
+	tp2 := gsData.SetupGSTransportHost2()
+
+	dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt1)
+
+	dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt2)
+
+	// In this retrieval flow we expect two voucher results:
+	// the first is sent in response to the initial request, telling the client
+	// that the provider has accepted the request and is starting to send blocks.
+	respVoucher := testutil.NewFakeDTType()
+	encodedRVR, err := encoding.Encode(respVoucher)
+	require.NoError(t, err)
+
+	// The final voucher result is sent by the provider to let the client know the deal is completed.
+	finalVoucherResult := testutil.NewFakeDTType()
+	encodedFVR, err := encoding.Encode(finalVoucherResult)
+	require.NoError(t, err)
+
+	sv := testutil.NewStubbedValidator()
+	require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	// Stub in the validator so it returns that exact voucher when ValidatePull is called.
+	// This validator does not pause the transfer when accepting it, so the provider
+	// starts sending blocks immediately.
+	sv.StubResult(respVoucher)
+
+	// No intermediary voucher results are needed.
+	voucherResults := []datatransfer.VoucherResult{}
+
+	pausePoints := []uint64{}
+	srv := &retrievalRevalidator{
+		testutil.NewStubbedRevalidator(), 0, 0, pausePoints, finalVoucherResult, voucherResults,
+	}
+	srv.ExpectSuccessErrResume()
+	require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv))
+
+	// Register our response voucher with the client.
+	require.NoError(t, dt2.RegisterVoucherResultType(respVoucher))
+
+	// For each size, create a new random DAG of that size and try to retrieve it.
+	for _, size := range sizes {
+		size := size // capture the range variable for the parallel subtest
+		t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) {
+			t.Parallel()
+
+			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
+			defer cancel()
+
+			errChan := make(chan struct{}, 2)
+
+			clientGotResponse := make(chan struct{}, 1)
+			clientFinished := make(chan struct{}, 1)
+
+			var chid datatransfer.ChannelID
+			chidReceived := make(chan struct{})
+			dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+				<-chidReceived
+				if chid != channelState.ChannelID() {
+					return
+				}
+				if event.Code == datatransfer.Error {
+					errChan <- struct{}{}
+				}
+				// Verify that the client receives the expected voucher results.
+				if event.Code == datatransfer.NewVoucherResult {
+					voucherResult := channelState.LastVoucherResult()
+					encodedVR, err := encoding.Encode(voucherResult)
+					require.NoError(t, err)
+
+					// If this voucher result is the response voucher, no action is needed;
+					// it just tells us the provider has accepted the transfer and is sending blocks.
+					if bytes.Equal(encodedVR, encodedRVR) {
+						// The test will fail if no response voucher is received.
+						clientGotResponse <- struct{}{}
+					}
+
+					// If this voucher result is the final voucher result, we need to send
+					// a new voucher to unpause the provider and complete the transfer.
+					if bytes.Equal(encodedVR, encodedFVR) {
+						_ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType())
+					}
+				}
+
+				if channelState.Status() == datatransfer.Completed {
+					clientFinished <- struct{}{}
+				}
+			})
+
+			providerFinished := make(chan struct{}, 1)
+			dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+				<-chidReceived
+				if chid != channelState.ChannelID() {
+					return
+				}
+				if event.Code == datatransfer.Error {
+					fmt.Println(event.Message)
+					errChan <- struct{}{}
+				}
+				if channelState.Status() == datatransfer.Completed {
+					providerFinished <- struct{}{}
+				}
+			})
+
+			root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, size)
+			rootCid := root.(cidlink.Link).Cid
+
+			voucher := testutil.NewFakeDTType()
+			chid, err := dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, gsData.AllSelector) // := keeps err local, so parallel subtests don't race on it
+			require.NoError(t, err)
+			close(chidReceived)
+			// Expect the client to receive a response voucher, the provider to complete
+			// the transfer, and the client to finish; each channel is nil'd once it fires.
+			for clientGotResponse != nil || providerFinished != nil || clientFinished != nil {
+				select {
+				case <-ctx.Done():
+					reason := "Did not complete successful data transfer"
+					switch {
+					case clientGotResponse != nil:
+						reason = "client did not get initial response"
+					case clientFinished != nil:
+						reason = "client did not finish"
+					case providerFinished != nil:
+						reason = "provider did not finish"
+					}
+					t.Fatal(reason)
+				case <-clientGotResponse:
+					clientGotResponse = nil
+				case <-providerFinished:
+					providerFinished = nil
+				case <-clientFinished:
+					clientFinished = nil
+				case <-errChan:
+					t.Fatal("received unexpected error")
+				}
+			}
+			sv.VerifyExpectations(t)
+			srv.VerifyExpectations(t)
+			testutil.VerifyHasFile(gsData.Ctx, t, gsData.DagService2, root, origBytes)
+		})
+	}
+}
+
+func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService, size int) (ipld.Link, []byte) {
+	data := make([]byte, size)
 
 	rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
 
 	// import to UnixFS
@@ -1874,7 +2042,7 @@ func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAG
 		Dagserv: bufferedDS,
 	}
 
-	db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), int64(1<<10)))
+	db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), 128000))
 	require.NoError(t, err)
 
 	nd, err := balanced.Layout(db)
@@ -1928,10 +2096,10 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,
 
 }
 
-//func SetDTLogLevelDebug() {
-//	_ = logging.SetLogLevel("dt-impl", "debug")
-//	_ = logging.SetLogLevel("dt-chanmon", "debug")
-//	_ = logging.SetLogLevel("dt_graphsync", "debug")
-//	_ = logging.SetLogLevel("data_transfer", "debug")
-//	_ = logging.SetLogLevel("data_transfer_network", "debug")
-//}
+func SetDTLogLevelDebug() {
+	_ = logging.SetLogLevel("dt-impl", "debug")
+	_ = logging.SetLogLevel("dt-chanmon", "debug")
+	_ = logging.SetLogLevel("dt_graphsync", "debug")
+	_ = logging.SetLogLevel("data-transfer", "debug")
+	_ = logging.SetLogLevel("data_transfer_network", "debug")
+}
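
Note on the completion-tracking idiom in TestMultipleParallelTransfers: the test waits on clientGotResponse, providerFinished, and clientFinished with a nil-channel select loop. Receiving from a nil channel blocks forever, so once a signal arrives its channel variable is set to nil, which disables that case while the select keeps serving the remaining ones. Below is a minimal standalone sketch of the pattern; the channel names a and b are stand-ins invented for illustration and are not part of the patch.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()

    	// Two hypothetical completion signals, playing the role of the
    	// test's providerFinished / clientFinished channels.
    	a := make(chan struct{}, 1)
    	b := make(chan struct{}, 1)
    	a <- struct{}{}
    	b <- struct{}{}

    	// Loop until every signal has arrived. A nil channel never
    	// receives, so nil-ing a channel retires its case from the select.
    	for a != nil || b != nil {
    		select {
    		case <-ctx.Done():
    			fmt.Println("timed out waiting for completion")
    			return
    		case <-a:
    			a = nil
    		case <-b:
    			b = nil
    		}
    	}
    	fmt.Println("all signals received")
    }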