Skip to content

Commit

Permalink
Retry with exponential backoff for stream opening (#382)
Browse files Browse the repository at this point in the history
* retry stream opening

* tests

* changes as per review

* fix test and lint

* add lint to Makefile
  • Loading branch information
aarshkshah1992 authored Sep 2, 2020
1 parent 0b14443 commit 90b72dd
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 24 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ cbor-gen: FORCE
tidy: FORCE
go mod tidy

prepare-pr: cbor-gen tidy diagrams imports
lint: FORCE
git fetch
golangci-lint run -v --concurrency 2 --new-from-rev origin/master

prepare-pr: cbor-gen tidy diagrams imports lint
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/multiformats/go-multiaddr v0.2.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func TestProvider_Stop(t *testing.T) {
client, expectedCIDs, _, _, retrievalPeer, provider := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
require.NoError(t, provider.Stop())
_, err := client.Query(bgCtx, retrievalPeer, expectedCIDs[0], retrievalmarket.QueryParams{})
assert.EqualError(t, err, "protocol not supported")

assert.EqualError(t, err, "exhausted 5 attempts but failed to open stream, err: protocol not supported")
}

func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payChAddr address.Address) (retrievalmarket.RetrievalClient,
Expand Down
30 changes: 29 additions & 1 deletion retrievalmarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)

const maxStreamOpenAttempts = 5

var log = logging.Logger("retrieval_network")
var _ RetrievalMarketNetwork = new(libp2pRetrievalMarketNetwork)

Expand All @@ -34,7 +39,7 @@ type libp2pRetrievalMarketNetwork struct {

// NewQueryStream creates a new RetrievalQueryStream using the provided peer.ID
func (impl *libp2pRetrievalMarketNetwork) NewQueryStream(id peer.ID) (RetrievalQueryStream, error) {
s, err := impl.host.NewStream(context.Background(), id, retrievalmarket.QueryProtocolID)
s, err := impl.openStream(context.Background(), id, retrievalmarket.QueryProtocolID)
if err != nil {
log.Warn(err)
return nil, err
Expand All @@ -43,6 +48,29 @@ func (impl *libp2pRetrievalMarketNetwork) NewQueryStream(id peer.ID) (RetrievalQ
return &queryStream{p: id, rw: s, buffered: buffered}, nil
}

func (impl *libp2pRetrievalMarketNetwork) openStream(ctx context.Context, id peer.ID, protocol protocol.ID) (network.Stream, error) {
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 10 * time.Second,
Factor: 5,
Jitter: true,
}

for {
s, err := impl.host.NewStream(ctx, id, protocol)
if err == nil {
return s, err
}

nAttempts := b.Attempt()
if nAttempts == maxStreamOpenAttempts {
return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", maxStreamOpenAttempts, err)
}
d := b.Duration()
time.Sleep(d)
}
}

// SetDelegate sets a RetrievalReceiver to handle stream data
func (impl *libp2pRetrievalMarketNetwork) SetDelegate(r RetrievalReceiver) error {
impl.receiver = r
Expand Down
3 changes: 0 additions & 3 deletions retrievalmarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (

//go:generate cbor-gen-for Query QueryResponse DealProposal DealResponse Params QueryParams DealPayment ClientDealState ProviderDealState PaymentInfo RetrievalPeer Ask

// ProtocolID is the protocol for proposing / responding to retrieval deals
const ProtocolID = "/fil/retrieval/0.0.1"

// QueryProtocolID is the protocol for querying information about retrieval
// deal parameters
const QueryProtocolID = "/fil/retrieval/qry/0.0.1"
Expand Down
40 changes: 25 additions & 15 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,21 @@ func TestRestartClient(t *testing.T) {
assert.NoError(t, err)
assert.NotEqual(t, storagemarket.StorageDealActive, cd.State)

h = newHarnessWithTestData(t, ctx, h.TestData, h.SMState, true)
h = newHarnessWithTestData(t, ctx, h.TestData, h.SMState, true, h.TempFilePath)

// deal could have expired already on the provider side for the `ClientEventDealAccepted` event
// so, we should wait on the `ProviderEventDealExpired` event ONLY if the deal has not expired.
providerState, err := h.Provider.ListLocalDeals()
assert.NoError(t, err)

if len(providerState) == 0 || providerState[0].State != storagemarket.StorageDealExpired {
wg.Add(1)
_ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if event == storagemarket.ProviderEventDealExpired {
wg.Done()
}
})
}

wg.Add(1)
_ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
Expand All @@ -331,13 +345,6 @@ func TestRestartClient(t *testing.T) {
}
})

wg.Add(1)
_ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if event == storagemarket.ProviderEventDealExpired {
wg.Done()
}
})

require.NoError(t, h.Provider.Start(ctx))
require.NoError(t, h.Client.Start(ctx))

Expand Down Expand Up @@ -371,14 +378,15 @@ type harness struct {
SMState *testnodes.StorageMarketState
ProviderInfo storagemarket.StorageProviderInfo
TestData *shared_testutil.Libp2pTestData
TempFilePath string
}

func newHarness(t *testing.T, ctx context.Context, useStore bool) *harness {
smState := testnodes.NewStorageMarketState()
return newHarnessWithTestData(t, ctx, shared_testutil.NewLibp2pTestData(ctx, t), smState, useStore)
return newHarnessWithTestData(t, ctx, shared_testutil.NewLibp2pTestData(ctx, t), smState, useStore, "")
}

func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testutil.Libp2pTestData, smState *testnodes.StorageMarketState, useStore bool) *harness {
func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testutil.Libp2pTestData, smState *testnodes.StorageMarketState, useStore bool, tempPath string) *harness {
epoch := abi.ChainEpoch(100)
fpath := filepath.Join("storagemarket", "fixtures", "payload.txt")
var rootLink ipld.Link
Expand All @@ -405,8 +413,12 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut
assert.NoError(t, err)

providerAddr := address.TestAddress2
tempPath, err := ioutil.TempDir("", "storagemarket_test")
assert.NoError(t, err)

if len(tempPath) == 0 {
tempPath, err = ioutil.TempDir("", "storagemarket_test")
assert.NoError(t, err)
}

ps := piecestore.NewPieceStore(td.Ds2)
providerNode := &testnodes.FakeProviderNode{
FakeCommonNode: testnodes.FakeCommonNode{
Expand Down Expand Up @@ -470,9 +482,6 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut
err = provider.SetAsk(big.NewInt(0), big.NewInt(0), 50_000)
assert.NoError(t, err)

err = provider.Start(ctx)
assert.NoError(t, err)

// Closely follows the MinerInfo struct in the spec
providerInfo := storagemarket.StorageProviderInfo{
Address: providerAddr,
Expand All @@ -497,6 +506,7 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut
ProviderInfo: providerInfo,
TestData: td,
SMState: smState,
TempFilePath: tempPath,
}
}

Expand Down
34 changes: 31 additions & 3 deletions storagemarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/storagemarket"
)

const maxStreamOpenAttempts = 4

var log = logging.Logger("storagemarket_network")

// NewFromLibp2pHost builds a storage market network on top of libp2p
Expand All @@ -30,7 +35,7 @@ type libp2pStorageMarketNetwork struct {
}

func (impl *libp2pStorageMarketNetwork) NewAskStream(ctx context.Context, id peer.ID) (StorageAskStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.AskProtocolID)
s, err := impl.openStream(ctx, id, storagemarket.AskProtocolID)
if err != nil {
log.Warn(err)
return nil, err
Expand All @@ -40,7 +45,7 @@ func (impl *libp2pStorageMarketNetwork) NewAskStream(ctx context.Context, id pee
}

func (impl *libp2pStorageMarketNetwork) NewDealStream(ctx context.Context, id peer.ID) (StorageDealStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.DealProtocolID)
s, err := impl.openStream(ctx, id, storagemarket.DealProtocolID)
if err != nil {
return nil, err
}
Expand All @@ -49,7 +54,7 @@ func (impl *libp2pStorageMarketNetwork) NewDealStream(ctx context.Context, id pe
}

func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(ctx context.Context, id peer.ID) (DealStatusStream, error) {
s, err := impl.host.NewStream(ctx, id, storagemarket.DealStatusProtocolID)
s, err := impl.openStream(ctx, id, storagemarket.DealStatusProtocolID)
if err != nil {
log.Warn(err)
return nil, err
Expand All @@ -58,6 +63,29 @@ func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(ctx context.Context,
return &dealStatusStream{p: id, rw: s, buffered: buffered}, nil
}

func (impl *libp2pStorageMarketNetwork) openStream(ctx context.Context, id peer.ID, protocol protocol.ID) (network.Stream, error) {
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 10 * time.Second,
Factor: 5,
Jitter: true,
}

for {
s, err := impl.host.NewStream(ctx, id, protocol)
if err == nil {
return s, err
}

nAttempts := b.Attempt()
if nAttempts == maxStreamOpenAttempts {
return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", maxStreamOpenAttempts, err)
}
d := b.Duration()
time.Sleep(d)
}
}

func (impl *libp2pStorageMarketNetwork) SetDelegate(r StorageReceiver) error {
impl.receiver = r
impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream)
Expand Down
37 changes: 37 additions & 0 deletions storagemarket/network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,43 @@ func (tr *testReceiver) HandleDealStatusStream(s network.DealStatusStream) {
}
}

func TestOpenStreamWithRetries(t *testing.T) {
ctx := context.Background()
td := shared_testutil.NewLibp2pTestData(ctx, t)

fromNetwork := network.NewFromLibp2pHost(td.Host1)
toNetwork := network.NewFromLibp2pHost(td.Host2)
toHost := td.Host2.ID()

// host1 gets no-op receiver
tr := &testReceiver{t: t}
require.NoError(t, fromNetwork.SetDelegate(tr))

// host2 gets a receiver that will start after some time -> so we can verify exponential backoff kicks in
require.NoError(t, td.Host2.Close())
achan := make(chan network.AskRequest)
tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) {
readq, err := s.ReadAskRequest()
require.NoError(t, err)
achan <- readq
}}

var err error

go func() {
select {
case <-time.After(3 * time.Second):
err = toNetwork.SetDelegate(tr2)
case <-ctx.Done():
return
}
}()

// setup query stream host1 --> host 2
assertAskRequestReceived(ctx, t, fromNetwork, toHost, achan)
assert.NoError(t, err)
}

func TestAskStreamSendReceiveAskRequest(t *testing.T) {
ctx := context.Background()
td := shared_testutil.NewLibp2pTestData(ctx, t)
Expand Down

0 comments on commit 90b72dd

Please sign in to comment.