Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable send message timeouts #136

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (

var log = logging.Logger("data_transfer_network")

var sendMessageTimeout = time.Minute * 10
// The maximum amount of time to wait to open a stream
const defaultOpenStreamTimeout = 10 * time.Second

// The maximum time to wait for a message to be sent
var defaultSendMessageTimeout = 10 * time.Second

// The max number of attempts to open a stream
const defaultMaxStreamOpenAttempts = 5
Expand All @@ -48,6 +52,14 @@ func DataTransferProtocols(protocols []protocol.ID) Option {
}
}

// SendMessageParameters changes the default parameters around sending messages
func SendMessageParameters(openStreamTimeout time.Duration, sendMessageTimeout time.Duration) Option {
return func(impl *libp2pDataTransferNetwork) {
impl.sendMessageTimeout = sendMessageTimeout
impl.openStreamTimeout = openStreamTimeout
}
}

// RetryParameters changes the default parameters around connection reopening
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) Option {
return func(impl *libp2pDataTransferNetwork) {
Expand All @@ -63,6 +75,8 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork {
dataTransferNetwork := libp2pDataTransferNetwork{
host: host,

openStreamTimeout: defaultOpenStreamTimeout,
sendMessageTimeout: defaultSendMessageTimeout,
maxStreamOpenAttempts: defaultMaxStreamOpenAttempts,
minAttemptDuration: defaultMinAttemptDuration,
maxAttemptDuration: defaultMaxAttemptDuration,
Expand All @@ -84,6 +98,8 @@ type libp2pDataTransferNetwork struct {
// inbound messages from the network are forwarded to the receiver
receiver Receiver

openStreamTimeout time.Duration
sendMessageTimeout time.Duration
maxStreamOpenAttempts float64
minAttemptDuration time.Duration
maxAttemptDuration time.Duration
Expand All @@ -100,8 +116,12 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
}

for {
tctx, cancel := context.WithTimeout(ctx, impl.openStreamTimeout)
defer cancel()

// will use the first among the given protocols that the remote peer supports
s, err := impl.host.NewStream(ctx, id, protocols...)
at := time.Now()
s, err := impl.host.NewStream(tctx, id, protocols...)
if err == nil {
return s, err
}
Expand All @@ -113,8 +133,8 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
}

d := b.Duration()
log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %s",
id, nAttempts, impl.maxStreamOpenAttempts, d, err)
log.Warnf("failed to open stream to %s on attempt %g of %g after %s, waiting %s to try again, err: %s",
id, nAttempts, impl.maxStreamOpenAttempts, time.Since(at), d, err)

select {
case <-ctx.Done():
Expand All @@ -139,7 +159,7 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage(
return xerrors.Errorf("failed to convert message for protocol: %w", err)
}

if err = msgToStream(ctx, s, outgoing); err != nil {
if err = dtnet.msgToStream(ctx, s, outgoing); err != nil {
if err2 := s.Reset(); err2 != nil {
log.Error(err)
return err2
Expand Down Expand Up @@ -222,12 +242,12 @@ func (dtnet *libp2pDataTransferNetwork) Unprotect(id peer.ID, tag string) bool {
return dtnet.host.ConnManager().Unprotect(id, tag)
}

func msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error {
func (dtnet *libp2pDataTransferNetwork) msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error {
if msg.IsRequest() {
log.Debugf("Outgoing request message for transfer ID: %d", msg.TransferID())
}

deadline := time.Now().Add(sendMessageTimeout)
deadline := time.Now().Add(dtnet.sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
Expand Down