Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/make streams context aware #308

Merged
merged 3 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI
if len(info.Addrs) > 0 {
c.net.AddAddrs(info.PeerID, info.Addrs)
}
s, err := c.net.NewAskStream(info.PeerID)
s, err := c.net.NewAskStream(ctx, info.PeerID)
if err != nil {
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *Client) GetProviderDealState(ctx context.Context, proposalCid cid.Cid)
return nil, xerrors.Errorf("could not get client deal state: %w", err)
}

s, err := c.net.NewDealStatusStream(deal.Miner)
s, err := c.net.NewDealStatusStream(ctx, deal.Miner)
if err != nil {
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}
Expand Down Expand Up @@ -518,8 +518,8 @@ type clientDealEnvironment struct {
c *Client
}

func (c *clientDealEnvironment) NewDealStream(p peer.ID) (network.StorageDealStream, error) {
return c.c.net.NewDealStream(p)
func (c *clientDealEnvironment) NewDealStream(ctx context.Context, p peer.ID) (network.StorageDealStream, error) {
return c.c.net.NewDealStream(ctx, p)
}

func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
Expand Down
4 changes: 2 additions & 2 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var log = logging.Logger("storagemarket_impl")
// dependencies from the storage client environment
type ClientDealEnvironment interface {
Node() storagemarket.StorageClientNode
NewDealStream(p peer.ID) (network.StorageDealStream, error)
NewDealStream(ctx context.Context, p peer.ID) (network.StorageDealStream, error)
StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error
GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error)
PollingInterval() time.Duration
Expand Down Expand Up @@ -83,7 +83,7 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag
FastRetrieval: deal.FastRetrieval,
}

s, err := environment.NewDealStream(deal.Miner)
s, err := environment.NewDealStream(ctx.Context(), deal.Miner)
if err != nil {
return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err)
}
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (fe *fakeEnvironment) WriteDealProposal(_ peer.ID, _ cid.Cid, proposal smne
return fe.dealStream.WriteDealProposal(proposal)
}

func (fe *fakeEnvironment) NewDealStream(_ peer.ID) (smnet.StorageDealStream, error) {
func (fe *fakeEnvironment) NewDealStream(_ context.Context, _ peer.ID) (smnet.StorageDealStream, error) {
return fe.dealStream, nil
}

Expand Down
12 changes: 6 additions & 6 deletions storagemarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type libp2pStorageMarketNetwork struct {
receiver StorageReceiver
}

func (impl *libp2pStorageMarketNetwork) NewAskStream(id peer.ID) (StorageAskStream, error) {
s, err := impl.host.NewStream(context.Background(), id, storagemarket.AskProtocolID)
func (impl *libp2pStorageMarketNetwork) NewAskStream(ctx context.Context, id peer.ID) (StorageAskStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.AskProtocolID)
if err != nil {
log.Warn(err)
return nil, err
Expand All @@ -39,17 +39,17 @@ func (impl *libp2pStorageMarketNetwork) NewAskStream(id peer.ID) (StorageAskStre
return &askStream{p: id, rw: s, buffered: buffered}, nil
}

func (impl *libp2pStorageMarketNetwork) NewDealStream(id peer.ID) (StorageDealStream, error) {
s, err := impl.host.NewStream(context.Background(), id, storagemarket.DealProtocolID)
func (impl *libp2pStorageMarketNetwork) NewDealStream(ctx context.Context, id peer.ID) (StorageDealStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.DealProtocolID)
if err != nil {
return nil, err
}
buffered := bufio.NewReaderSize(s, 16)
return &dealStream{p: id, rw: s, buffered: buffered, host: impl.host}, nil
}

func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(id peer.ID) (DealStatusStream, error) {
s, err := impl.host.NewStream(context.Background(), id, storagemarket.DealStatusProtocolID)
func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(ctx context.Context, id peer.ID) (DealStatusStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.DealStatusProtocolID)
if err != nil {
log.Warn(err)
return nil, err
Expand Down
26 changes: 13 additions & 13 deletions storagemarket/network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestAskStreamSendReceiveMultipleSuccessful(t *testing.T) {
ctx, cancel := context.WithTimeout(ctxBg, 10*time.Second)
defer cancel()

qs, err := nw1.NewAskStream(td.Host2.ID())
qs, err := nw1.NewAskStream(ctx, td.Host2.ID())
require.NoError(t, err)

var resp network.AskResponse
Expand Down Expand Up @@ -203,15 +203,15 @@ func TestDealStreamSendReceiveMultipleSuccessful(t *testing.T) {
}}
require.NoError(t, toNetwork.SetDelegate(tr2))

ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

// start sending deal proposal
ds1, err := fromNetwork.NewDealStream(toPeer)
ds1, err := fromNetwork.NewDealStream(ctx, toPeer)
require.NoError(t, err)

dp := shared_testutil.MakeTestStorageNetworkProposal()

ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

// write proposal
require.NoError(t, ds1.WriteDealProposal(dp))

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestDealStatusStreamSendReceiveMultipleSuccessful(t *testing.T) {
ctx, cancel := context.WithTimeout(ctxBg, 10*time.Second)
defer cancel()

qs, err := nw1.NewDealStatusStream(td.Host2.ID())
qs, err := nw1.NewDealStatusStream(ctx, td.Host2.ID())
require.NoError(t, err)

var resp network.DealStatusResponse
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) {

require.NoError(t, toNetwork.StopHandlingRequests())

_, err := fromNetwork.NewAskStream(toHost)
_, err := fromNetwork.NewAskStream(bgCtx, toHost)
require.Error(t, err, "protocol not supported")
}

Expand All @@ -347,7 +347,7 @@ func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork
ctx, cancel := context.WithTimeout(inCtx, 10*time.Second)
defer cancel()

qs1, err := fromNetwork.NewDealStream(toPeer)
qs1, err := fromNetwork.NewDealStream(ctx, toPeer)
require.NoError(t, err)

// send query to host2
Expand All @@ -368,7 +368,7 @@ func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNet
ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
defer cancel()

ds1, err := fromNetwork.NewDealStream(toPeer)
ds1, err := fromNetwork.NewDealStream(ctx, toPeer)
require.NoError(t, err)

dr := shared_testutil.MakeTestStorageNetworkSignedResponse()
Expand All @@ -389,7 +389,7 @@ func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork n
ctx, cancel := context.WithTimeout(inCtx, 10*time.Second)
defer cancel()

as1, err := fromNetwork.NewAskStream(toHost)
as1, err := fromNetwork.NewAskStream(ctx, toHost)
require.NoError(t, err)

// send query to host2
Expand All @@ -415,7 +415,7 @@ func assertAskResponseReceived(inCtx context.Context, t *testing.T,
defer cancel()

// setup query stream host1 --> host 2
as1, err := fromNetwork.NewAskStream(toHost)
as1, err := fromNetwork.NewAskStream(ctx, toHost)
require.NoError(t, err)

// send queryresponse to host2
Expand All @@ -439,7 +439,7 @@ func assertDealStatusRequestReceived(inCtx context.Context, t *testing.T, fromNe
ctx, cancel := context.WithTimeout(inCtx, 10*time.Second)
defer cancel()

as1, err := fromNetwork.NewDealStatusStream(toHost)
as1, err := fromNetwork.NewDealStatusStream(ctx, toHost)
require.NoError(t, err)

// send query to host2
Expand All @@ -465,7 +465,7 @@ func assertDealStatusResponseReceived(inCtx context.Context, t *testing.T,
defer cancel()

// setup query stream host1 --> host 2
as1, err := fromNetwork.NewDealStatusStream(toHost)
as1, err := fromNetwork.NewDealStatusStream(ctx, toHost)
require.NoError(t, err)

// send queryresponse to host2
Expand Down
8 changes: 5 additions & 3 deletions storagemarket/network/network.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package network

import (
"context"

"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -52,9 +54,9 @@ type StorageReceiver interface {

// StorageMarketNetwork is a network abstraction for the storage market
type StorageMarketNetwork interface {
NewAskStream(peer.ID) (StorageAskStream, error)
NewDealStream(peer.ID) (StorageDealStream, error)
NewDealStatusStream(peer.ID) (DealStatusStream, error)
NewAskStream(context.Context, peer.ID) (StorageAskStream, error)
NewDealStream(context.Context, peer.ID) (StorageDealStream, error)
NewDealStatusStream(context.Context, peer.ID) (DealStatusStream, error)
SetDelegate(StorageReceiver) error
StopHandlingRequests() error
ID() peer.ID
Expand Down