-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fe22799
commit 825168f
Showing
4 changed files
with
347 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# go-fil-components/datatransfer | ||
|
||
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) | ||
|
||
## Description | ||
This module encapsulates retrieval market functionality | ||
|
||
|
||
## Table of Contents | ||
* [Background](https://github.com/filecoin-project/go-fil-components/datatransfer/#background) | ||
* [Usage](https://github.com/filecoin-project/go-fil-components#usage) | ||
* [Initialize a data transfer module](https://github.com/filecoin-project/go-fil-components#initialize-a-data-transfer-module) | ||
* [Register a validator](https://github.com/filecoin-project/go-fil-components#register-a-validator) | ||
* [Open a Push or Pull Request](https://github.com/filecoin-project/go-fil-components#open-a-push-or-pull-request) | ||
* [Subscribe to Events](https://github.com/filecoin-project/go-fil-components#subscribe-to-events) | ||
* [Contribute](https://github.com/filecoin-project/go-fil-components#contribute) | ||
|
||
## Background | ||
|
||
Please see the [design documentation](https://github.com/filecoin-project/go-fil-components/tree/master/datatransfer/docs/DESIGNDOC) | ||
for this module for a high-level overview and and explanation of the terms and concepts. | ||
|
||
## Usage | ||
|
||
**Requires go 1.13** | ||
|
||
Install the module in your package or app with `go get "github.com/filecoin-project/go-fil-components/datatransfer"` | ||
|
||
|
||
### Initialize a data transfer module | ||
1. Set up imports. You need, minimally, the following imports: | ||
```go | ||
package mypackage | ||
|
||
import ( | ||
gsimpl "github.com/ipfs/go-graphsync/impl" | ||
"github.com/filecoin-project/go-fil-components/datatransfer" | ||
"github.com/libp2p/go-libp2p-core/host" | ||
) | ||
|
||
``` | ||
1. Provide or create a [libp2p host.Host](https://github.com/libp2p/go-libp2p-examples/tree/master/libp2p-host) | ||
1. Provide or create a [go-graphsync GraphExchange](https://github.com/ipfs/go-graphsync#initializing-a-graphsync-exchange) | ||
1. Create a new instance of GraphsyncDataTransfer | ||
```go | ||
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) { | ||
dt := datatransfer.NewGraphSyncDataTransfer(h, gs) | ||
} | ||
``` | ||
|
||
1. If needed, build out your voucher struct and its validator. | ||
|
||
A push or pull request must include a voucher. The voucher's type must have been registered with | ||
the node receiving the request before it's sent, otherwise the request will be rejected. | ||
|
||
[datatransfer.Voucher](https://github.com/filecoin-project/go-fil-components/blob/21dd66ba370176224114b13030ee68cb785fadb2/datatransfer/types.go#L17) | ||
and [datatransfer.Validator](https://github.com/filecoin-project/go-fil-components/blob/21dd66ba370176224114b13030ee68cb785fadb2/datatransfer/types.go#L153) | ||
are the interfaces used for validation of graphsync datatransfer messages. Voucher types plus a Validator for them must be registered | ||
with the peer to whom requests will be sent. | ||
|
||
#### Example Toy Voucher and Validator | ||
```go | ||
type myVoucher struct { | ||
data string | ||
} | ||
func (v *myVoucher) ToBytes() ([]byte, error) { | ||
return []byte(v.data), nil | ||
} | ||
func (v *myVoucher) FromBytes(data []byte) error { | ||
v.data = string(data) | ||
return nil | ||
} | ||
func (v *myVoucher) Type() string { | ||
return "FakeDTType" | ||
} | ||
type myValidator struct { | ||
ctx context.Context | ||
validationsReceived chan receivedValidation | ||
} | ||
func (vl *myValidator) ValidatePush( | ||
sender peer.ID, | ||
voucher datatransfer.Voucher, | ||
baseCid cid.Cid, | ||
selector ipld.Node) error { | ||
v := voucher.(*myVoucher) | ||
if v.data == "" || v.data != "validpush" { | ||
return errors.New("invalid") | ||
} | ||
return nil | ||
} | ||
func (vl *myValidator) ValidatePull( | ||
receiver peer.ID, | ||
voucher datatransfer.Voucher, | ||
baseCid cid.Cid, | ||
selector ipld.Node) error { | ||
v := voucher.(*myVoucher) | ||
if v.data == "" || v.data != "validpull" { | ||
return errors.New("invalid") | ||
} | ||
return nil | ||
} | ||
``` | ||
|
||
|
||
Please see | ||
[go-fil-components/blob/master/datatransfer/types.go](https://github.com/filecoin-project/go-fil-components/blob/master/datatransfer/types.go) | ||
for more detail. | ||
|
||
|
||
### Register a validator | ||
Before sending push or pull requests, you must register a `datatransfer.Voucher` | ||
by its `reflect.Type` and `dataTransfer.RequestValidator` for vouchers that | ||
must be sent with the request. Using the trivial examples above: | ||
```go | ||
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) { | ||
dt := datatransfer.NewGraphSyncDataTransfer(h, gs) | ||
vouch := &myVoucher{} | ||
mv := &myValidator{} | ||
dt.RegisterVoucherType(reflect.TypeOf(vouch), mv) | ||
} | ||
``` | ||
|
||
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-fil-components/blob/master/datatransfer/impl/graphsync/graphsync_impl_test.go). | ||
|
||
### Open a Push or Pull Request | ||
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These | ||
calls return a `datatransfer.ChannelID` and any error: | ||
```go | ||
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector) | ||
// OR | ||
channelID, err := dtm.OpenPushDataChannel(ctx, recipient, voucher, baseCid, selector) | ||
``` | ||
|
||
### Subscribe to Events | ||
|
||
The module allows the consumer to be notified when a graphsync Request is sent or a datatransfer push or pull request response is received: | ||
|
||
```go | ||
func ToySubscriberFunc (event Event, channelState ChannelState) { | ||
if event.Code == datatransfer.Error { | ||
// log error, flail about helplessly | ||
return | ||
} | ||
// | ||
if channelState.Recipient() == our.PeerID && channelState.Received() > 0 { | ||
// log some stuff, update some state somewhere, send data to a channel, etc. | ||
} | ||
} | ||
dtm := SetupDataTransferManager(ctx, h, gs, baseCid, snode) | ||
unsubFunc := dtm.SubscribeToEvents(ToySubscriberFunc) | ||
// . . . later, when you don't need to know about events any more: | ||
unsubFunc() | ||
``` | ||
|
||
## Contributing | ||
PRs are welcome! Please first read the design docs and look over the current code. PRs against | ||
master require approval of at least two maintainers. For the rest, please see our | ||
[CONTRIBUTING](https://github.com/filecoin-project/go-fil-components/CONTRIBUTING.md) guide. | ||
|
||
Copyright 2019. Protocol Labs, Inc. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,19 @@ | ||
package network | ||
|
||
import host "github.com/libp2p/go-libp2p-core/host" | ||
import ( | ||
logging "github.com/ipfs/go-log" | ||
host "github.com/libp2p/go-libp2p-core/host" | ||
) | ||
|
||
func NewLibp2pNetwork(host.Host) RetrievalMarketNetwork { | ||
var log = logging.Logger("retrieval_network") | ||
|
||
func NewFromLibp2pHost(host.Host) RetrievalMarketNetwork { | ||
return nil | ||
} | ||
// libp2pDataTransferNetwork transforms the libp2p host interface, which sends and receives | ||
// NetMessage objects, into the graphsync network interface. | ||
type libp2pDataTransferNetwork struct { | ||
host host.Host | ||
// inbound messages from the network are forwarded to the receiver | ||
receiver | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
package network_test | ||
|
||
import ( | ||
"errors" | ||
rm "github.com/filecoin-project/go-fil-components/retrievalmarket" | ||
rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network" | ||
"github.com/ipfs/go-cid" | ||
"github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
type TestRetrievalQueryStream struct{ | ||
query rm.Query | ||
queryResp rm.QueryResponse | ||
failRead, failWrite bool | ||
} | ||
|
||
func (trqs TestRetrievalQueryStream) NewTestRetrievalQueryStream(pieceCid cid.Cid,) *TestRetrievalQueryStream { | ||
q := rm.NewQueryV0(pieceCid.Bytes()) | ||
return &TestRetrievalQueryStream{ query: q } | ||
} | ||
|
||
func (trqs TestRetrievalQueryStream)ReadQuery() (rm.Query, error){ | ||
if trqs.failRead { | ||
return rm.Query{}, errors.New("fail ReadQuery") | ||
} | ||
return trqs.query, nil | ||
} | ||
func (trqs TestRetrievalQueryStream)WriteQuery(newQuery rm.Query) error{ | ||
if trqs.failWrite { | ||
return errors.New("fail WriteQuery") | ||
} | ||
trqs.query = newQuery | ||
return nil | ||
} | ||
func (trqs TestRetrievalQueryStream)ReadQueryResponse() (rm.QueryResponse, error){ | ||
if trqs.failRead { | ||
return rm.QueryResponse{}, errors.New("fail ReadQueryResponse") | ||
} | ||
return trqs.queryResp, nil | ||
} | ||
func (trqs TestRetrievalQueryStream)WriteQueryResponse(newResp rm.QueryResponse) error{ | ||
if trqs.failWrite { | ||
return errors.New("fail WriteQueryResponse") | ||
} | ||
trqs.queryResp = newResp | ||
return nil | ||
} | ||
|
||
type TestRetrievalDealStream struct{ | ||
dprop rm.DealProposal | ||
dresp rm.DealResponse | ||
dpaym rm.DealPayment | ||
|
||
failRead, failWrite bool | ||
} | ||
|
||
func NewTestRetrievalDealStream(dprop rm.DealProposal, dresp rm.DealResponse, dpaym rm.DealPayment, fr, fw bool) *TestRetrievalDealStream { | ||
return &TestRetrievalDealStream{ dprop, dresp, dpaym, fr, fw } | ||
} | ||
|
||
func (trds TestRetrievalDealStream)ReadDealProposal() (rm.DealProposal, error){ | ||
if trds.failRead { | ||
return rm.DealProposal{}, errors.New("fail ReadDealProposal") | ||
} | ||
return trds.dprop, nil | ||
} | ||
func (trds TestRetrievalDealStream)WriteDealProposal(rm.DealProposal) error{ | ||
if trds.failWrite { | ||
return errors.New("fail WriteDealProposal") | ||
} | ||
return nil | ||
} | ||
func (trds TestRetrievalDealStream)ReadDealResponse() (rm.DealResponse, error){ | ||
if trds.failRead { | ||
return rm.DealResponse{}, errors.New("fail ReadDealResponse") | ||
} | ||
return trds.dresp, nil | ||
} | ||
func (trds TestRetrievalDealStream)WriteDealResponse(rm.DealResponse) error{ | ||
if trds.failWrite { | ||
return errors.New("fail WriteDealResponse") | ||
} | ||
return nil | ||
} | ||
|
||
func (trds TestRetrievalDealStream)ReadDealPayment() (rm.DealPayment, error){ | ||
if trds.failRead { | ||
return rm.DealPayment{}, errors.New("fail ReadDealPayment") | ||
} | ||
return trds.dpaym, nil | ||
} | ||
func (trds TestRetrievalDealStream)WriteDealPayment(rm.DealPayment) error{ | ||
if trds.failWrite { | ||
return errors.New("fail WriteDealPayment") | ||
} | ||
return nil | ||
} | ||
|
||
|
||
type TestRetrievalReceiver struct{ | ||
queryStreamHandler func(stream rmnet.RetrievalQueryStream) | ||
retrievalDealHandler func(stream rmnet.RetrievalDealStream) | ||
} | ||
|
||
func NewTestRetrievalReceiver( qsh func(stream rmnet.RetrievalQueryStream), | ||
rdh func(stream rmnet.RetrievalDealStream)) *TestRetrievalReceiver { | ||
return &TestRetrievalReceiver{ queryStreamHandler: qsh, retrievalDealHandler: rdh} | ||
} | ||
|
||
func (trr TestRetrievalReceiver)HandleQueryStream(stream rmnet.RetrievalQueryStream){ | ||
if trr.queryStreamHandler != nil { | ||
trr.queryStreamHandler(stream) | ||
} | ||
} | ||
|
||
func (trr TestRetrievalReceiver)HandleDealStream(stream rmnet.RetrievalDealStream) { | ||
if trr.queryStreamHandler != nil { | ||
trr.retrievalDealHandler(stream) | ||
} | ||
} | ||
|
||
type TestRetrievalMarketNetwork struct{ | ||
netHost host.Host | ||
receiver rmnet.RetrievalReceiver | ||
peers []peer.ID | ||
} | ||
|
||
func NewTestRetrievalMarketNetwork(netHost host.Host, peers []peer.ID) *TestRetrievalMarketNetwork { | ||
return &TestRetrievalMarketNetwork{ netHost:netHost, peers:peers} | ||
} | ||
|
||
func (trmn TestRetrievalMarketNetwork)NewQueryStream(id peer.ID) (rmnet.RetrievalQueryStream, error){ | ||
return nil, nil | ||
} | ||
func (trmn TestRetrievalMarketNetwork)NewDealStream(id peer.ID) (rmnet.RetrievalDealStream, error){ | ||
return nil, nil | ||
} | ||
func (trmn TestRetrievalMarketNetwork)SetDelegate(r rmnet.RetrievalReceiver) error { | ||
trmn.receiver = r | ||
return nil | ||
} |