Skip to content

Commit

Permalink
feat: use different extension names to fit multiple hooks data in sam…
Browse files Browse the repository at this point in the history
…e graphsync message (#204)

* feat: use different extension names to fit multiple payloads in the same
message

* remove logs in test

* add comments, update tests and loop over extension names

* add default extension name for each hook

* add comment

* simplify extension names loop

* trigger OnResponseReceived for multiple extensions

* use processExtension + use var instead of prop for ext names

Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
  • Loading branch information
tchardin and hannahhoward authored May 18, 2021
1 parent 717c6d9 commit 3a130c3
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 33 deletions.
186 changes: 184 additions & 2 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
bstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -983,14 +986,19 @@ type retrievalRevalidator struct {
providerPausePoint int
pausePoints []uint64
finalVoucher datatransfer.VoucherResult
revalVouchers []datatransfer.VoucherResult
}

func (r *retrievalRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (bool, datatransfer.VoucherResult, error) {
r.dataSoFar += additionalBytesSent
if r.providerPausePoint < len(r.pausePoints) &&
r.dataSoFar >= r.pausePoints[r.providerPausePoint] {
var v datatransfer.VoucherResult = testutil.NewFakeDTType()
if len(r.revalVouchers) > r.providerPausePoint {
v = r.revalVouchers[r.providerPausePoint]
}
r.providerPausePoint++
return true, testutil.NewFakeDTType(), datatransfer.ErrPause
return true, v, datatransfer.ErrPause
}
return true, nil, nil
}
Expand Down Expand Up @@ -1098,7 +1106,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))

srv := &retrievalRevalidator{
testutil.NewStubbedRevalidator(), 0, 0, config.pausePoints, finalVoucherResult,
testutil.NewStubbedRevalidator(), 0, 0, config.pausePoints, finalVoucherResult, []datatransfer.VoucherResult{},
}
srv.ExpectSuccessErrResume()
require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv))
Expand Down Expand Up @@ -1713,6 +1721,180 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
}
}

// Test the ability to attach data from multiple hooks in the same extension payload by using
// different names
func TestMultipleMessagesInExtension(t *testing.T) {
pausePoints := []uint64{1000, 3000, 6000, 10000, 15000}

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

root, origBytes := LoadRandomData(ctx, t, gsData.DagService1)
gsData.OrigBytes = origBytes
rootCid := root.(cidlink.Link).Cid
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)

dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

var chid datatransfer.ChannelID
errChan := make(chan struct{}, 2)

clientPausePoint := 0

clientGotResponse := make(chan struct{}, 1)
clientFinished := make(chan struct{}, 1)

// In this retrieval flow we expect 2 voucher results:
// The first one is sent as a response from the initial request telling the client
// the provider has accepted the request and is starting to send blocks
respVoucher := testutil.NewFakeDTType()
encodedRVR, err := encoding.Encode(respVoucher)
require.NoError(t, err)

// voucher results are sent by the providers to request payment while pausing until a voucher is sent
// to revalidate
voucherResults := []datatransfer.VoucherResult{
&testutil.FakeDTType{Data: "one"},
&testutil.FakeDTType{Data: "two"},
&testutil.FakeDTType{Data: "thr"},
&testutil.FakeDTType{Data: "for"},
&testutil.FakeDTType{Data: "fiv"},
}

// The final voucher result is sent by the provider to request a last payment voucher
finalVoucherResult := testutil.NewFakeDTType()
encodedFVR, err := encoding.Encode(finalVoucherResult)
require.NoError(t, err)

dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
// Here we verify reception of voucherResults by the client
if event.Code == datatransfer.NewVoucherResult {
voucherResult := channelState.LastVoucherResult()
encodedVR, err := encoding.Encode(voucherResult)
require.NoError(t, err)

// If this voucher result is the response voucher no action is needed
// we just know that the provider has accepted the transfer and is sending blocks
if bytes.Equal(encodedVR, encodedRVR) {
// The test will fail if no response voucher is received
clientGotResponse <- struct{}{}
}

// If this voucher is a revalidation request we need to send a new voucher
// to revalidate and unpause the transfer
if clientPausePoint < 5 {
encodedExpected, err := encoding.Encode(voucherResults[clientPausePoint])
require.NoError(t, err)
if bytes.Equal(encodedVR, encodedExpected) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType())
clientPausePoint++
}
}

// If this voucher result is the final voucher result we need
// to send a new voucher to unpause the provider and complete the transfer
if bytes.Equal(encodedVR, encodedFVR) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewFakeDTType())
}
}

if channelState.Status() == datatransfer.Completed {
clientFinished <- struct{}{}
}
})

providerFinished := make(chan struct{}, 1)
dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
if channelState.Status() == datatransfer.Completed {
providerFinished <- struct{}{}
}
})

sv := testutil.NewStubbedValidator()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
// Stub in the validator so it returns that exact voucher when calling ValidatePull
// this validator will not pause transfer when accepting a transfer and will start
// sending blocks immediately
sv.StubResult(respVoucher)

srv := &retrievalRevalidator{
testutil.NewStubbedRevalidator(), 0, 0, pausePoints, finalVoucherResult, voucherResults,
}
// The stubbed revalidator will authorize Revalidate and return ErrResume to finisht the transfer
srv.ExpectSuccessErrResume()
require.NoError(t, dt1.RegisterRevalidator(testutil.NewFakeDTType(), srv))

// Register our response voucher with the client
require.NoError(t, dt2.RegisterVoucherResultType(respVoucher))

voucher := testutil.FakeDTType{Data: "applesauce"}
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)

// Expect the client to receive a response voucher, the provider to complete the transfer and
// the client to finish the transfer
for clientGotResponse != nil || providerFinished != nil || clientFinished != nil {
select {
case <-ctx.Done():
t.Fatal("Did not complete successful data transfer")
case <-clientGotResponse:
clientGotResponse = nil
case <-providerFinished:
providerFinished = nil
case <-clientFinished:
clientFinished = nil
case <-errChan:
t.Fatal("received unexpected error")
}
}
sv.VerifyExpectations(t)
srv.VerifyExpectations(t)
gsData.VerifyFileTransferred(t, root, true)
}

func LoadRandomData(ctx context.Context, t *testing.T, dagService ipldformat.DAGService) (ipld.Link, []byte) {
data := make([]byte, 256000)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)

// import to UnixFS
bufferedDS := ipldformat.NewBufferedDAG(ctx, dagService)

params := ihelper.DagBuilderParams{
Maxlinks: 1024,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
}

db, err := params.New(chunker.NewSizeSplitter(bytes.NewReader(data), int64(1<<10)))
require.NoError(t, err)

nd, err := balanced.Layout(db)
require.NoError(t, err)

err = bufferedDS.Commit()
require.NoError(t, err)

// save the original files bytes
return cidlink.Link{Cid: nd.Cid()}, data
}

type receivedMessage struct {
message datatransfer.Message
sender peer.ID
Expand Down
33 changes: 19 additions & 14 deletions transport/graphsync/extension/gsextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
)

const (
// ExtensionIncomingRequest1_1 is the identifier for data sent by the IncomingRequest hook
ExtensionIncomingRequest1_1 = graphsync.ExtensionName("fil/data-transfer/incoming-request/1.1")
// ExtensionOutgoingBlock1_1 is the identifier for data sent by the OutgoingBlock hook
ExtensionOutgoingBlock1_1 = graphsync.ExtensionName("fil/data-transfer/outgoing-block/1.1")
// ExtensionDataTransfer1_1 is the identifier for the current data transfer extension to graphsync
ExtensionDataTransfer1_1 = graphsync.ExtensionName("fil/data-transfer/1.1")
// ExtensionDataTransfer1_0 is the identifier for the legacy data transfer extension to graphsync
Expand All @@ -22,8 +26,10 @@ const (

// ProtocolMap maps graphsync extensions to their libp2p protocols
var ProtocolMap = map[graphsync.ExtensionName]protocol.ID{
ExtensionDataTransfer1_1: datatransfer.ProtocolDataTransfer1_1,
ExtensionDataTransfer1_0: datatransfer.ProtocolDataTransfer1_0,
ExtensionIncomingRequest1_1: datatransfer.ProtocolDataTransfer1_1,
ExtensionOutgoingBlock1_1: datatransfer.ProtocolDataTransfer1_1,
ExtensionDataTransfer1_1: datatransfer.ProtocolDataTransfer1_1,
ExtensionDataTransfer1_0: datatransfer.ProtocolDataTransfer1_0,
}

// ToExtensionData converts a message to a graphsync extension
Expand Down Expand Up @@ -64,23 +70,22 @@ type GsExtended interface {
// * nil + nil if the extension is not found
// * nil + error if the extendedData fails to unmarshal
// * unmarshaled ExtensionDataTransferData + nil if all goes well
func GetTransferData(extendedData GsExtended) (datatransfer.Message, error) {
extName := ExtensionDataTransfer1_1
data, ok := extendedData.Extension(extName)
if !ok {
extName = ExtensionDataTransfer1_0
data, ok = extendedData.Extension(extName)
if !ok {
return nil, nil
func GetTransferData(extendedData GsExtended, extNames []graphsync.ExtensionName) (datatransfer.Message, error) {
for _, name := range extNames {
data, ok := extendedData.Extension(name)
if ok {
reader := bytes.NewReader(data)
return decoders[name](reader)
}
}
reader := bytes.NewReader(data)
return decoders[extName](reader)
return nil, nil
}

type decoder func(io.Reader) (datatransfer.Message, error)

var decoders = map[graphsync.ExtensionName]decoder{
ExtensionDataTransfer1_1: message.FromNet,
ExtensionDataTransfer1_0: message1_0.FromNet,
ExtensionIncomingRequest1_1: message.FromNet,
ExtensionOutgoingBlock1_1: message.FromNet,
ExtensionDataTransfer1_1: message.FromNet,
ExtensionDataTransfer1_0: message1_0.FromNet,
}
48 changes: 39 additions & 9 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,22 @@ type graphsyncKey struct {
p peer.ID
}

var defaultSupportedExtensions = []graphsync.ExtensionName{extension.ExtensionDataTransfer1_1, extension.ExtensionDataTransfer1_0}
var defaultSupportedExtensions = []graphsync.ExtensionName{
extension.ExtensionDataTransfer1_1,
extension.ExtensionDataTransfer1_0,
}

var incomingReqExtensions = []graphsync.ExtensionName{
extension.ExtensionIncomingRequest1_1,
extension.ExtensionDataTransfer1_1,
extension.ExtensionDataTransfer1_0,
}

var outgoingBlkExtensions = []graphsync.ExtensionName{
extension.ExtensionOutgoingBlock1_1,
extension.ExtensionDataTransfer1_1,
extension.ExtensionDataTransfer1_0,
}

// Option is an option for setting up the graphsync transport
type Option func(*Transport)
Expand Down Expand Up @@ -308,7 +323,7 @@ func (t *Transport) UseStore(channelID datatransfer.ChannelID, loader ipld.Loade

// gsOutgoingRequestHook is called when a graphsync request is made
func (t *Transport) gsOutgoingRequestHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
message, _ := extension.GetTransferData(request)
message, _ := extension.GetTransferData(request, t.supportedExtensions)

// extension not found; probably not our request.
if message == nil {
Expand Down Expand Up @@ -420,7 +435,9 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
}

if msg != nil {
extensions, err := extension.ToExtensionData(msg, t.supportedExtensions)
// gsOutgoingBlockHook uses a unique extension name so it can be attached with data from a different hook
// outgoingBlkExtensions also includes the default extension name so it remains compatible with all data-transfer protocol versions out there
extensions, err := extension.ToExtensionData(msg, outgoingBlkExtensions)
if err != nil {
hookActions.TerminateWithError(err)
return
Expand All @@ -433,7 +450,8 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData

// gsReqRecdHook is called when graphsync receives an incoming request for data
func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
msg, err := extension.GetTransferData(request)
// if this is a push request the sender is us.
msg, err := extension.GetTransferData(request, t.supportedExtensions)
if err != nil {
hookActions.TerminateWithError(err)
return
Expand Down Expand Up @@ -486,7 +504,10 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook

// If we need to send a response, add the response message as an extension
if responseMessage != nil {
extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions)
// gsReqRecdHook uses a unique extension name so it can be attached with data from a different hook
// incomingReqExtensions also includes default extension name so it remains compatible with previous data-transfer
// protocol versions out there.
extensions, extensionErr := extension.ToExtensionData(responseMessage, incomingReqExtensions)
if extensionErr != nil {
hookActions.TerminateWithError(err)
return
Expand Down Expand Up @@ -593,7 +614,7 @@ func (t *Transport) gsRequestUpdatedHook(p peer.ID, request graphsync.RequestDat
return
}

responseMessage, err := t.processExtension(chid, update, p)
responseMessage, err := t.processExtension(chid, update, p, t.supportedExtensions)

if responseMessage != nil {
extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions)
Expand All @@ -619,7 +640,7 @@ func (t *Transport) gsIncomingResponseHook(p peer.ID, response graphsync.Respons
return
}

responseMessage, err := t.processExtension(chid, response, p)
responseMessage, err := t.processExtension(chid, response, p, incomingReqExtensions)

if responseMessage != nil {
extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions)
Expand All @@ -635,12 +656,21 @@ func (t *Transport) gsIncomingResponseHook(p peer.ID, response graphsync.Respons
if err != nil {
hookActions.TerminateWithError(err)
}

// In a case where the transfer sends blocks immediately this extension may contain both a
// response message and a revalidation request so we trigger OnResponseReceived again for this
// specific extension name
_, err = t.processExtension(chid, response, p, []graphsync.ExtensionName{extension.ExtensionOutgoingBlock1_1})

if err != nil {
hookActions.TerminateWithError(err)
}
}

func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extension.GsExtended, p peer.ID) (datatransfer.Message, error) {
func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extension.GsExtended, p peer.ID, exts []graphsync.ExtensionName) (datatransfer.Message, error) {

// if this is a push request the sender is us.
msg, err := extension.GetTransferData(gsMsg)
msg, err := extension.GetTransferData(gsMsg, exts)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 3a130c3

Please sign in to comment.