Skip to content

Commit

Permalink
Generate internal/retry in otlploggrpc (#5313)
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed May 9, 2024
1 parent 654ce01 commit 498292b
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 3 deletions.
6 changes: 3 additions & 3 deletions exporters/otlp/otlplog/otlploggrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
)

// Option applies an option to the Exporter.
Expand All @@ -32,9 +34,7 @@ func newConfig(options []Option) config {
//
// This configuration does not define any network retry strategy. That is
// entirely handled by the gRPC ClientConn.
type RetryConfig struct {
// TODO: implement.
}
type RetryConfig retry.Config

// WithInsecure disables client transport security for the Exporter's gRPC
// connection, just like grpc.WithInsecure()
Expand Down
1 change: 1 addition & 0 deletions exporters/otlp/otlplog/otlploggrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc
go 1.21

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel/sdk/log v0.2.0-alpha
google.golang.org/grpc v1.63.2
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
7 changes: 7 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/internal/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"

//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go
145 changes: 145 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/retry/retry.go.tmpl

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package retry provides request retry functionality that can perform
// configurable exponential backoff for transient errors and honor any
// explicit throttle responses received.
package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
)

// DefaultConfig are the recommended defaults to use.
var DefaultConfig = Config{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}

// Config defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type Config struct {
// Enabled indicates whether to not retry sending batches in case of
// export failure.
Enabled bool
// InitialInterval the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
MaxElapsedTime time.Duration
}

// RequestFunc wraps a request with retry logic.
type RequestFunc func(context.Context, func(context.Context) error) error

// EvaluateFunc returns if an error is retry-able and if an explicit throttle
// duration should be honored that was included in the error.
//
// The function must return true if the error argument is retry-able,
// otherwise it must return false for the first return parameter.
//
// The function must return a non-zero time.Duration if the error contains
// explicit throttle duration that should be honored, otherwise it must return
// a zero valued time.Duration.
type EvaluateFunc func(error) (bool, time.Duration)

// RequestFunc returns a RequestFunc using the evaluate function to determine
// if requests can be retried and based on the exponential backoff
// configuration of c.
func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
if !c.Enabled {
return func(ctx context.Context, fn func(context.Context) error) error {
return fn(ctx)
}
}

return func(ctx context.Context, fn func(context.Context) error) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()

for {
err := fn(ctx)
if err == nil {
return nil
}

retryable, throttle := evaluate(err)
if !retryable {
return err
}

bOff := b.NextBackOff()
if bOff == backoff.Stop {
return fmt.Errorf("max retry time elapsed: %w", err)
}

// Wait for the greater of the backoff or throttle delay.
var delay time.Duration
if bOff > throttle {
delay = bOff
} else {
elapsed := b.GetElapsedTime()
if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime {
return fmt.Errorf("max retry time would elapse: %w", err)
}
delay = throttle
}

if ctxErr := waitFunc(ctx, delay); ctxErr != nil {
return fmt.Errorf("%w: %s", ctxErr, err)
}
}
}
}

// Allow override for testing.
var waitFunc = wait

// wait takes the caller's context, and the amount of time to wait. It will
// return nil if the timer fires before or at the same time as the context's
// deadline. This indicates that the call can be retried.
func wait(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
// Handle the case where the timer and context deadline end
// simultaneously by prioritizing the timer expiration nil value
// response.
select {
case <-timer.C:
default:
return ctx.Err()
}
case <-timer.C:
}

return nil
}
Loading

0 comments on commit 498292b

Please sign in to comment.