Skip to content

Commit

Permalink
Message compatibility on graphsync (#102)
Browse files Browse the repository at this point in the history
* feat(graphsync): make compatible across protocol

while we had setup message comptability over libp2p we had not handled if the other peer only sends
message1_0 over graphsync. Support this by always sending both extensions when possible and
defaulting to 1_1 when present

* fix(deps): update to tagged go-graphsync
  • Loading branch information
hannahhoward authored Oct 14, 2020
1 parent eee8d1c commit ff7f826
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c
github.com/ipfs/go-graphsync v0.3.0
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c h1:De/AZGvRa3WMyw5zdMMhcvRcho46BVo+C0NRud+T4io=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-graphsync v0.3.0 h1:I6Y20kSuCWkUvPoUWo4V3am704/9QjgDVVkf0zIV8+8=
github.com/ipfs/go-graphsync v0.3.0/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
8 changes: 4 additions & 4 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
extData := buf.Bytes()

request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
Name: extension.ExtensionDataTransfer,
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
Expand All @@ -1183,7 +1183,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
extData := buf.Bytes()

request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
Name: extension.ExtensionDataTransfer,
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
extData := buf.Bytes()

gsRequest := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
Name: extension.ExtensionDataTransfer,
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})

Expand Down Expand Up @@ -1304,7 +1304,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
require.NoError(t, err)
extData := buf.Bytes()
request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
Name: extension.ExtensionDataTransfer,
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
Expand Down
14 changes: 7 additions & 7 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func matchDtMessage(t *testing.T, extensions []graphsync.ExtensionData) datatransfer.Message {
var matchedExtension *graphsync.ExtensionData
for _, ext := range extensions {
if ext.Name == extension.ExtensionDataTransfer {
if ext.Name == extension.ExtensionDataTransfer1_1 {
matchedExtension = &ext
break
}
Expand Down Expand Up @@ -512,12 +512,12 @@ var _ graphsync.IncomingBlockHookActions = &FakeIncomingBlockHookActions{}

type FakeOutgoingBlockHookActions struct {
TerminationError error
SentExtension graphsync.ExtensionData
SentExtensions []graphsync.ExtensionData
Paused bool
}

func (fa *FakeOutgoingBlockHookActions) SendExtensionData(extension graphsync.ExtensionData) {
fa.SentExtension = extension
fa.SentExtensions = append(fa.SentExtensions, extension)
}

func (fa *FakeOutgoingBlockHookActions) TerminateWithError(err error) {
Expand All @@ -534,12 +534,12 @@ type FakeIncomingRequestHookActions struct {
PersistenceOption string
TerminationError error
Validated bool
SentExtension graphsync.ExtensionData
SentExtensions []graphsync.ExtensionData
Paused bool
}

func (fa *FakeIncomingRequestHookActions) SendExtensionData(ext graphsync.ExtensionData) {
fa.SentExtension = ext
fa.SentExtensions = append(fa.SentExtensions, ext)
}

func (fa *FakeIncomingRequestHookActions) UsePersistenceOption(name string) {
Expand All @@ -565,12 +565,12 @@ var _ graphsync.IncomingRequestHookActions = &FakeIncomingRequestHookActions{}

type FakeRequestUpdatedActions struct {
TerminationError error
SentExtension graphsync.ExtensionData
SentExtensions []graphsync.ExtensionData
Unpaused bool
}

func (fa *FakeRequestUpdatedActions) SendExtensionData(extension graphsync.ExtensionData) {
fa.SentExtension = extension
fa.SentExtensions = append(fa.SentExtensions, extension)
}

func (fa *FakeRequestUpdatedActions) TerminateWithError(err error) {
Expand Down
31 changes: 28 additions & 3 deletions testutil/gstestdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/network"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-data-transfer/transport/graphsync/extension"
)

var allSelector ipld.Node
Expand All @@ -58,9 +59,16 @@ func init() {
const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024

var extsForProtocol = map[protocol.ID]graphsync.ExtensionName{
datatransfer.ProtocolDataTransfer1_1: extension.ExtensionDataTransfer1_1,
datatransfer.ProtocolDataTransfer1_0: extension.ExtensionDataTransfer1_0,
}

// GraphsyncTestingData is a test harness for testing data transfer on top of
// graphsync
type GraphsyncTestingData struct {
host1Protocols []protocol.ID
host2Protocols []protocol.ID
Ctx context.Context
Mn mocknet.Mocknet
StoredCounter1 *storedcounter.StoredCounter
Expand Down Expand Up @@ -147,7 +155,8 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [

// create a selector for the whole UnixFS dag
gsData.AllSelector = allSelector

gsData.host1Protocols = host1Protocols
gsData.host2Protocols = host2Protocols
return gsData
}

Expand All @@ -161,7 +170,15 @@ func (gsData *GraphsyncTestingData) SetupGraphsyncHost1() graphsync.GraphExchang
func (gsData *GraphsyncTestingData) SetupGSTransportHost1() datatransfer.Transport {
// setup graphsync
gs := gsData.SetupGraphsyncHost1()
return gstransport.NewTransport(gsData.Host1.ID(), gs)
var opts []gstransport.Option
if len(gsData.host1Protocols) != 0 {
supportedExtensions := make([]graphsync.ExtensionName, 0, len(gsData.host1Protocols))
for _, protoID := range gsData.host1Protocols {
supportedExtensions = append(supportedExtensions, extsForProtocol[protoID])
}
opts = append(opts, gstransport.SupportedExtensions(supportedExtensions))
}
return gstransport.NewTransport(gsData.Host1.ID(), gs, opts...)
}

// SetupGraphsyncHost2 sets up a new, real graphsync instance on top of the second host
Expand All @@ -174,7 +191,15 @@ func (gsData *GraphsyncTestingData) SetupGraphsyncHost2() graphsync.GraphExchang
func (gsData *GraphsyncTestingData) SetupGSTransportHost2() datatransfer.Transport {
// setup graphsync
gs := gsData.SetupGraphsyncHost2()
return gstransport.NewTransport(gsData.Host2.ID(), gs)
var opts []gstransport.Option
if len(gsData.host2Protocols) != 0 {
supportedExtensions := make([]graphsync.ExtensionName, 0, len(gsData.host2Protocols))
for _, protoID := range gsData.host2Protocols {
supportedExtensions = append(supportedExtensions, extsForProtocol[protoID])
}
opts = append(opts, gstransport.SupportedExtensions(supportedExtensions))
}
return gstransport.NewTransport(gsData.Host2.ID(), gs, opts...)
}

// LoadUnixFSFile loads a fixtures file we can test dag transfer with
Expand Down
68 changes: 53 additions & 15 deletions transport/graphsync/extension/gsextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,56 @@ package extension

import (
"bytes"
"errors"
"io"

"github.com/ipfs/go-graphsync"
"github.com/libp2p/go-libp2p-core/protocol"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/message/message1_0"
)

const (
// ExtensionDataTransfer is the identifier for the data transfer extension to graphsync
ExtensionDataTransfer = graphsync.ExtensionName("fil/data-transfer")
// ExtensionDataTransfer1_1 is the identifier for the current data transfer extension to graphsync
ExtensionDataTransfer1_1 = graphsync.ExtensionName("fil/data-transfer/1.1")
// ExtensionDataTransfer1_0 is the identifier for the legacy data transfer extension to graphsync
ExtensionDataTransfer1_0 = graphsync.ExtensionName("fil/data-transfer")
)

// ProtocolMap maps graphsync extensions to their libp2p protocols
var ProtocolMap = map[graphsync.ExtensionName]protocol.ID{
ExtensionDataTransfer1_1: datatransfer.ProtocolDataTransfer1_1,
ExtensionDataTransfer1_0: datatransfer.ProtocolDataTransfer1_0,
}

// ToExtensionData converts a message to a graphsync extension
func ToExtensionData(msg datatransfer.Message) (graphsync.ExtensionData, error) {
buf := new(bytes.Buffer)
err := msg.ToNet(buf)
if err != nil {
return graphsync.ExtensionData{}, err
func ToExtensionData(msg datatransfer.Message, supportedExtensions []graphsync.ExtensionName) ([]graphsync.ExtensionData, error) {
exts := make([]graphsync.ExtensionData, 0, len(supportedExtensions))
for _, supportedExtension := range supportedExtensions {
protoID, ok := ProtocolMap[supportedExtension]
if !ok {
return nil, errors.New("unsupported protocol")
}
versionedMsg, err := msg.MessageForProtocol(protoID)
if err != nil {
continue
}
buf := new(bytes.Buffer)
err = versionedMsg.ToNet(buf)
if err != nil {
return nil, err
}
exts = append(exts, graphsync.ExtensionData{
Name: supportedExtension,
Data: buf.Bytes(),
})
}
return graphsync.ExtensionData{
Name: ExtensionDataTransfer,
Data: buf.Bytes(),
}, nil
if len(exts) == 0 {
return nil, errors.New("message not encodable in any supported extensions")
}
return exts, nil
}

// GsExtended is a small interface used by getExtensionData
Expand All @@ -38,11 +65,22 @@ type GsExtended interface {
// * nil + error if the extendedData fails to unmarshal
// * unmarshaled ExtensionDataTransferData + nil if all goes well
func GetTransferData(extendedData GsExtended) (datatransfer.Message, error) {
data, ok := extendedData.Extension(ExtensionDataTransfer)
extName := ExtensionDataTransfer1_1
data, ok := extendedData.Extension(extName)
if !ok {
return nil, nil
extName = ExtensionDataTransfer1_0
data, ok = extendedData.Extension(extName)
if !ok {
return nil, nil
}
}

reader := bytes.NewReader(data)
return message.FromNet(reader)
return decoders[extName](reader)
}

type decoder func(io.Reader) (datatransfer.Message, error)

var decoders = map[graphsync.ExtensionName]decoder{
ExtensionDataTransfer1_1: message.FromNet,
ExtensionDataTransfer1_0: message1_0.FromNet,
}
Loading

0 comments on commit ff7f826

Please sign in to comment.