Skip to content

Commit

Permalink
feat(opentelemetry): Instrumentalize http calls
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 23, 2022
1 parent 02ed410 commit 4fa9c54
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 43 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ require (
github.com/xdg-go/scram v1.1.1
go.mongodb.org/mongo-driver v1.10.1
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
go.uber.org/dig v1.15.0
go.uber.org/fx v1.18.1
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
Expand Down Expand Up @@ -105,8 +107,8 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/sdk v1.11.1 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0/go.mod h1:8cfNbNK5aJIRQnqOFGEALF17iZPDym2mmYP+ECIxi8M=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 h1:OkXMRbgldT4yZR7RwB4SFYTjYJGTXwPQVX69pYtTnc4=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0/go.mod h1:zMu+r6aEorSQi8Ad0Y1fNrznm+VM8F10D2WlZp3HeFw=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4 h1:aUEBEdCa6iamGzg6fuYxDA8ThxvOG240mAvWDU+XLio=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4/go.mod h1:l2MdsbKTocpPS5nQZscqTR9jd8u96VYZdcpF8Sye7mA=
go.opentelemetry.io/contrib/propagators/b3 v1.11.1 h1:icQ6ttRV+r/2fnU46BIo/g/mPu6Rs5Ug8Rtohe3KqzI=
go.opentelemetry.io/contrib/propagators/b3 v1.11.1/go.mod h1:ECIveyMXgnl4gorxFcA7RYjJY/Ql9n20ubhbfDc3QfA=
go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4=
Expand All @@ -521,6 +523,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 h1:tFl63
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1/go.mod h1:X620Jww3RajCJXw/unA+8IRTgxkdS7pi+ZwK9b7KUJk=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 h1:3Yvzs7lgOw8MmbxmLRsQGwYdCubFmUHSooKaEhQunFQ=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1/go.mod h1:pyHDt0YlyuENkD2VwHsiRDf+5DfI3EH7pfhUYW6sQUE=
go.opentelemetry.io/otel/metric v0.33.0 h1:xQAyl7uGEYvrLAiV/09iTJlp1pZnQ9Wl793qbVvED1E=
go.opentelemetry.io/otel/metric v0.33.0/go.mod h1:QlTYc+EnYNq/M2mNk1qDDMRLpqCOj2f/r5c7Fd5FYaI=
go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs=
go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys=
go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ=
Expand Down
32 changes: 21 additions & 11 deletions internal/pkg/connectors/bankingcircle/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package bankingcircle

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/formancehq/go-libs/sharedlogging"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type client struct {
Expand All @@ -25,9 +27,16 @@ type client struct {
accessTokenExpiresAt time.Time
}

func newHTTPClient() *http.Client {
return &http.Client{
Timeout: 10 * time.Second,
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

func newClient(username, password, endpoint, authorizationEndpoint string, logger sharedlogging.Logger) (*client, error) {
c := &client{
httpClient: &http.Client{Timeout: 10 * time.Second},
httpClient: newHTTPClient(),

username: username,
password: password,
Expand All @@ -37,15 +46,16 @@ func newClient(username, password, endpoint, authorizationEndpoint string, logge
logger: logger,
}

if err := c.login(); err != nil {
if err := c.login(context.TODO()); err != nil {
return nil, err
}

return c, nil
}

func (c *client) login() error {
req, err := http.NewRequest(http.MethodGet, c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody)
func (c *client) login(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody)
if err != nil {
return fmt.Errorf("failed to create login request: %w", err)
}
Expand Down Expand Up @@ -87,12 +97,12 @@ func (c *client) login() error {
return nil
}

func (c *client) ensureAccessTokenIsValid() error {
func (c *client) ensureAccessTokenIsValid(ctx context.Context) error {
if c.accessTokenExpiresAt.After(time.Now()) {
return nil
}

return c.login()
return c.login(ctx)
}

//nolint:tagliatelle // allow for client-side structures
Expand Down Expand Up @@ -164,11 +174,11 @@ type payment struct {
} `json:"creditorInformation"`
}

func (c *client) getAllPayments() ([]*payment, error) {
func (c *client) getAllPayments(ctx context.Context) ([]*payment, error) {
var payments []*payment

for page := 0; ; page++ {
pagedPayments, err := c.getPayments(page)
pagedPayments, err := c.getPayments(ctx, page)
if err != nil {
return nil, err
}
Expand All @@ -183,12 +193,12 @@ func (c *client) getAllPayments() ([]*payment, error) {
return payments, nil
}

func (c *client) getPayments(page int) ([]*payment, error) {
if err := c.ensureAccessTokenIsValid(); err != nil {
func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) {
if err := c.ensureAccessTokenIsValid(ctx); err != nil {
return nil, err
}

req, err := http.NewRequest(http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody)
if err != nil {
return nil, fmt.Errorf("failed to create login request: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func taskFetchPayments(logger sharedlogging.Logger, client *client) task.Task {
scheduler task.Scheduler[TaskDescriptor],
ingester ingestion.Ingester,
) error {
paymentsList, err := client.getAllPayments()
paymentsList, err := client.getAllPayments(ctx)
if err != nil {
return err
}
Expand Down
26 changes: 22 additions & 4 deletions internal/pkg/connectors/currencycloud/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"context"
"fmt"
"net/http"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type apiTransport struct {
authToken string
authToken string
underlying *otelhttp.Transport
}

func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add("X-Auth-Token", t.authToken)

return http.DefaultTransport.RoundTrip(req)
return t.underlying.RoundTrip(req)
}

type Client struct {
Expand All @@ -29,14 +32,29 @@ func (c *Client) buildEndpoint(path string, args ...interface{}) string {

const devAPIEndpoint = "https://devapi.currencycloud.com"

func newAuthenticatedHTTPClient(authToken string) *http.Client {
return &http.Client{
Transport: &apiTransport{
authToken: authToken,
underlying: otelhttp.NewTransport(http.DefaultTransport),
},
}
}

func newHTTPClient() *http.Client {
return &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

// NewClient creates a new client for the CurrencyCloud API.
func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, error) {
if endpoint == "" {
endpoint = devAPIEndpoint
}

c := &Client{
httpClient: &http.Client{},
httpClient: newHTTPClient(),
endpoint: endpoint,
loginID: loginID,
apiKey: apiKey,
Expand All @@ -47,7 +65,7 @@ func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client,
return nil, err
}

c.httpClient.Transport = &apiTransport{authToken: authToken}
c.httpClient = newAuthenticatedHTTPClient(authToken)

return c, nil
}
13 changes: 8 additions & 5 deletions internal/pkg/connectors/modulr/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"net/http"

"github.com/formancehq/payments/internal/pkg/connectors/modulr/hmac"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type apiTransport struct {
apiKey string
headers map[string]string
apiKey string
headers map[string]string
underlying http.RoundTripper
}

func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add("Authorization", t.apiKey)

return http.DefaultTransport.RoundTrip(req)
return t.underlying.RoundTrip(req)
}

type responseWrapper[t any] struct {
Expand Down Expand Up @@ -50,8 +52,9 @@ func NewClient(apiKey, apiSecret, endpoint string) (*Client, error) {
return &Client{
httpClient: &http.Client{
Transport: &apiTransport{
headers: headers,
apiKey: apiKey,
headers: headers,
apiKey: apiKey,
underlying: otelhttp.NewTransport(http.DefaultTransport),
},
},
endpoint: endpoint,
Expand Down
15 changes: 10 additions & 5 deletions internal/pkg/connectors/stripe/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"

"github.com/formancehq/payments/internal/pkg/writeonly"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/pkg/errors"
"github.com/stripe/stripe-go/v72"
Expand Down Expand Up @@ -51,13 +52,11 @@ func (d *DefaultClient) ForAccount(account string) Client {
func (d *DefaultClient) BalanceTransactions(ctx context.Context,
options ...ClientOption,
) ([]*stripe.BalanceTransaction, bool, error) {
req, err := http.NewRequest(http.MethodGet, balanceTransactionsEndpoint, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, balanceTransactionsEndpoint, nil)
if err != nil {
return nil, false, errors.Wrap(err, "creating http request")
}

req = req.WithContext(ctx)

for _, opt := range options {
opt.apply(req)
}
Expand Down Expand Up @@ -127,9 +126,15 @@ func (d *DefaultClient) BalanceTransactions(ctx context.Context,
return asBalanceTransactions, rsp.HasMore, nil
}

func NewDefaultClient(httpClient *http.Client, apiKey string, storage writeonly.Storage) *DefaultClient {
func newHTTPClient() *http.Client {
return &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

func NewDefaultClient(apiKey string, storage writeonly.Storage) *DefaultClient {
return &DefaultClient{
httpClient: httpClient,
httpClient: newHTTPClient(),
apiKey: apiKey,
storage: storage,
}
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/connectors/stripe/task_connected_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stripe

import (
"context"
"net/http"

"github.com/formancehq/payments/internal/pkg/ingestion"
"github.com/formancehq/payments/internal/pkg/task"
Expand Down Expand Up @@ -57,7 +56,7 @@ func ConnectedAccountTask(config Config, account string) func(ctx context.Contex
IngesterFn(func(ctx context.Context, bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error {
return ingestBatch(ctx, logger, ingester, bts, commitState, tail)
}),
NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage).
NewTimeline(NewDefaultClient(config.APIKey, storage).
ForAccount(account), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})),
)

Expand Down
6 changes: 2 additions & 4 deletions internal/pkg/connectors/stripe/task_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package stripe

import (
"context"
"net/http"

"github.com/formancehq/go-libs/sharedlogging"
"github.com/formancehq/payments/internal/pkg/ingestion"
"github.com/formancehq/payments/internal/pkg/task"
"github.com/formancehq/payments/internal/pkg/writeonly"

"github.com/formancehq/go-libs/sharedlogging"
"github.com/pkg/errors"
"github.com/stripe/stripe-go/v72"
)
Expand Down Expand Up @@ -62,7 +60,7 @@ func MainTask(config Config) func(ctx context.Context, logger sharedlogging.Logg
) error {
return ingest(ctx, logger, scheduler, ingester, batch, commitState, tail)
}),
NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage),
NewTimeline(NewDefaultClient(config.APIKey, storage),
config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})),
),
config.PollingPeriod,
Expand Down
14 changes: 9 additions & 5 deletions internal/pkg/connectors/wise/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
"fmt"
"io"
"net/http"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

const apiEndpoint = "https://api.wise.com"

type apiTransport struct {
APIKey string
APIKey string
underlying http.RoundTripper
}

func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.APIKey))

return http.DefaultTransport.RoundTrip(req)
return t.underlying.RoundTrip(req)
}

type client struct {
Expand Down Expand Up @@ -77,14 +80,14 @@ func (w *client) getProfiles() ([]profile, error) {
return profiles, nil
}

func (w *client) getTransfers(profile *profile) ([]transfer, error) {
func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer, error) {
var transfers []transfer

limit := 10
offset := 0

for {
req, err := http.NewRequestWithContext(context.TODO(),
req, err := http.NewRequestWithContext(ctx,
http.MethodGet, w.endpoint("v1/transfers"), http.NoBody)
if err != nil {
return transfers, err
Expand Down Expand Up @@ -134,7 +137,8 @@ func (w *client) getTransfers(profile *profile) ([]transfer, error) {
func newClient(apiKey string) *client {
httpClient := &http.Client{
Transport: &apiTransport{
APIKey: apiKey,
APIKey: apiKey,
underlying: otelhttp.NewTransport(http.DefaultTransport),
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/connectors/wise/task_fetch_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func taskFetchTransfers(logger sharedlogging.Logger, client *client, profileID u
scheduler task.Scheduler[TaskDescriptor],
ingester ingestion.Ingester,
) error {
transfers, err := client.getTransfers(&profile{
transfers, err := client.getTransfers(ctx, &profile{
ID: profileID,
})
if err != nil {
Expand Down
Loading

0 comments on commit 4fa9c54

Please sign in to comment.