diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index f8e0db4ee..7a3302906 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -233,7 +233,7 @@ func (c *Client) GetProviderDealState(ctx context.Context, proposalCid cid.Cid) return nil, xerrors.Errorf("could not get client deal state: %w", err) } - s, err := c.net.NewDealStatusStream(deal.Miner) + s, err := c.net.NewDealStatusStream(ctx, deal.Miner) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } diff --git a/storagemarket/network/libp2p_impl.go b/storagemarket/network/libp2p_impl.go index 5e5074b2b..c83583b29 100644 --- a/storagemarket/network/libp2p_impl.go +++ b/storagemarket/network/libp2p_impl.go @@ -48,8 +48,8 @@ func (impl *libp2pStorageMarketNetwork) NewDealStream(ctx context.Context, id pe return &dealStream{p: id, rw: s, buffered: buffered, host: impl.host}, nil } -func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(id peer.ID) (DealStatusStream, error) { - s, err := impl.host.NewStream(context.Background(), id, storagemarket.DealStatusProtocolID) +func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(ctx context.Context, id peer.ID) (DealStatusStream, error) { + s, err := impl.host.NewStream(ctx, id, storagemarket.DealStatusProtocolID) if err != nil { log.Warn(err) return nil, err diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go index 4731fc2ab..1e82cd875 100644 --- a/storagemarket/network/libp2p_impl_test.go +++ b/storagemarket/network/libp2p_impl_test.go @@ -298,7 +298,7 @@ func TestDealStatusStreamSendReceiveMultipleSuccessful(t *testing.T) { ctx, cancel := context.WithTimeout(ctxBg, 10*time.Second) defer cancel() - qs, err := nw1.NewDealStatusStream(td.Host2.ID()) + qs, err := nw1.NewDealStatusStream(ctx, td.Host2.ID()) require.NoError(t, err) var resp network.DealStatusResponse @@ -439,7 +439,7 @@ func assertDealStatusRequestReceived(inCtx context.Context, t *testing.T, fromNe ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() - as1, err := fromNetwork.NewDealStatusStream(toHost) + as1, err := fromNetwork.NewDealStatusStream(ctx, toHost) require.NoError(t, err) // send query to host2 @@ -465,7 +465,7 @@ func assertDealStatusResponseReceived(inCtx context.Context, t *testing.T, defer cancel() // setup query stream host1 --> host 2 - as1, err := fromNetwork.NewDealStatusStream(toHost) + as1, err := fromNetwork.NewDealStatusStream(ctx, toHost) require.NoError(t, err) // send queryresponse to host2 diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go index b30550a38..50df0830e 100644 --- a/storagemarket/network/network.go +++ b/storagemarket/network/network.go @@ -56,7 +56,7 @@ type StorageReceiver interface { type StorageMarketNetwork interface { NewAskStream(context.Context, peer.ID) (StorageAskStream, error) NewDealStream(context.Context, peer.ID) (StorageDealStream, error) - NewDealStatusStream(peer.ID) (DealStatusStream, error) + NewDealStatusStream(context.Context, peer.ID) (DealStatusStream, error) SetDelegate(StorageReceiver) error StopHandlingRequests() error ID() peer.ID