diff --git a/cmd/server/app/serve.go b/cmd/server/app/serve.go index 63f2d6d230..289a7df8ab 100644 --- a/cmd/server/app/serve.go +++ b/cmd/server/app/serve.go @@ -34,6 +34,7 @@ import ( "github.com/stacklok/mediator/internal/engine" "github.com/stacklok/mediator/internal/events" "github.com/stacklok/mediator/internal/logger" + provtelemetry "github.com/stacklok/mediator/internal/providers/telemetry" "github.com/stacklok/mediator/internal/reconcilers" ) @@ -101,20 +102,21 @@ var serveCmd = &cobra.Command{ } serverMetrics := controlplane.NewMetrics() + providerMetrics := provtelemetry.NewProviderMetrics() - s, err := controlplane.NewServer(store, evt, serverMetrics, cfg, vldtr) + s, err := controlplane.NewServer(store, evt, serverMetrics, cfg, vldtr, controlplane.WithProviderMetrics(providerMetrics)) if err != nil { return fmt.Errorf("unable to create server: %w", err) } - exec, err := engine.NewExecutor(store, &cfg.Auth) + exec, err := engine.NewExecutor(store, &cfg.Auth, engine.WithProviderMetrics(providerMetrics)) if err != nil { return fmt.Errorf("unable to create executor: %w", err) } s.ConsumeEvents(exec) - rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth) + rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth, reconcilers.WithProviderMetrics(providerMetrics)) if err != nil { return fmt.Errorf("unable to create reconciler: %w", err) } diff --git a/internal/controlplane/handlers_githubwebhooks.go b/internal/controlplane/handlers_githubwebhooks.go index cedfbcf69d..7d3aac97f2 100644 --- a/internal/controlplane/handlers_githubwebhooks.go +++ b/internal/controlplane/handlers_githubwebhooks.go @@ -398,7 +398,10 @@ func (s *Server) parseGithubEventForProcessing( return fmt.Errorf("error getting provider: %w", err) } - provBuilder, err := providers.GetProviderBuilder(ctx, prov, dbRepo.ProjectID, s.store, s.cryptoEngine) + pbOpts := []providers.ProviderBuilderOption{ + providers.WithProviderMetrics(s.provMt), + } + provBuilder, err := providers.GetProviderBuilder(ctx, prov, dbRepo.ProjectID, s.store, s.cryptoEngine, pbOpts...) if err != nil { return fmt.Errorf("error building client: %w", err) } diff --git a/internal/controlplane/handlers_repositories.go b/internal/controlplane/handlers_repositories.go index 513db5a998..ed6e8ae80b 100644 --- a/internal/controlplane/handlers_repositories.go +++ b/internal/controlplane/handlers_repositories.go @@ -71,7 +71,10 @@ func (s *Server) RegisterRepository(ctx context.Context, return nil, providerError(fmt.Errorf("provider error: %w", err)) } - p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine) + pbOpts := []providers.ProviderBuilderOption{ + providers.WithProviderMetrics(s.provMt), + } + p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...) if err != nil { return nil, status.Errorf(codes.Internal, "cannot get provider builder: %v", err) } @@ -452,7 +455,10 @@ func (s *Server) ListRemoteRepositoriesFromProvider( return nil, status.Errorf(codes.PermissionDenied, "cannot get access token for provider") } - p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine) + pbOpts := []providers.ProviderBuilderOption{ + providers.WithProviderMetrics(s.provMt), + } + p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...) if err != nil { return nil, status.Errorf(codes.Internal, "cannot get provider builder: %v", err) } diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index 3985c0e52d..c0aca8acfa 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -50,6 +50,7 @@ import ( "github.com/stacklok/mediator/internal/db" "github.com/stacklok/mediator/internal/events" "github.com/stacklok/mediator/internal/logger" + provtelemetry "github.com/stacklok/mediator/internal/providers/telemetry" "github.com/stacklok/mediator/internal/util" legacy "github.com/stacklok/mediator/pkg/api/protobuf/go/mediator/v1" pb "github.com/stacklok/mediator/pkg/api/protobuf/go/minder/v1" @@ -67,6 +68,7 @@ type Server struct { cfg *config.Config evt *events.Eventer mt *metrics + provMt provtelemetry.ProviderMetrics grpcServer *grpc.Server vldtr auth.JwtValidator pb.UnimplementedHealthServiceServer @@ -89,20 +91,44 @@ type Server struct { cryptoEngine *crypto.Engine } +// ServerOption is a function that modifies a server +type ServerOption func(*Server) + +// WithProviderMetrics sets the provider metrics for the server +func WithProviderMetrics(mt provtelemetry.ProviderMetrics) ServerOption { + return func(s *Server) { + s.provMt = mt + } +} + // NewServer creates a new server instance -func NewServer(store db.Store, evt *events.Eventer, cpm *metrics, cfg *config.Config, vldtr auth.JwtValidator) (*Server, error) { +func NewServer( + store db.Store, + evt *events.Eventer, + cpm *metrics, + cfg *config.Config, + vldtr auth.JwtValidator, + opts ...ServerOption, +) (*Server, error) { eng, err := crypto.EngineFromAuthConfig(&cfg.Auth) if err != nil { return nil, fmt.Errorf("failed to create crypto engine: %w", err) } - return &Server{ + s := &Server{ store: store, cfg: cfg, evt: evt, cryptoEngine: eng, vldtr: vldtr, mt: cpm, - }, nil + provMt: provtelemetry.NewNoopMetrics(), + } + + for _, opt := range opts { + opt(s) + } + + return s, nil } var _ (events.Registrar) = (*Server)(nil) diff --git a/internal/engine/executor.go b/internal/engine/executor.go index 29812ef600..00c6ca8141 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -30,6 +30,7 @@ import ( engif "github.com/stacklok/mediator/internal/engine/interfaces" "github.com/stacklok/mediator/internal/events" "github.com/stacklok/mediator/internal/providers" + providertelemetry "github.com/stacklok/mediator/internal/providers/telemetry" pb "github.com/stacklok/mediator/pkg/api/protobuf/go/minder/v1" ) @@ -42,19 +43,41 @@ const ( type Executor struct { querier db.Store crypteng *crypto.Engine + provMt providertelemetry.ProviderMetrics +} + +// ExecutorOption is a function that modifies an executor +type ExecutorOption func(*Executor) + +// WithProviderMetrics sets the provider metrics for the executor +func WithProviderMetrics(mt providertelemetry.ProviderMetrics) ExecutorOption { + return func(e *Executor) { + e.provMt = mt + } } // NewExecutor creates a new executor -func NewExecutor(querier db.Store, authCfg *config.AuthConfig) (*Executor, error) { +func NewExecutor( + querier db.Store, + authCfg *config.AuthConfig, + opts ...ExecutorOption, +) (*Executor, error) { crypteng, err := crypto.EngineFromAuthConfig(authCfg) if err != nil { return nil, err } - return &Executor{ + e := &Executor{ querier: querier, crypteng: crypteng, - }, nil + provMt: providertelemetry.NewNoopMetrics(), + } + + for _, opt := range opts { + opt(e) + } + + return e, nil } // Register implements the Consumer interface. @@ -89,7 +112,10 @@ func (e *Executor) HandleEntityEvent(msg *message.Message) error { return fmt.Errorf("error getting provider: %w", err) } - cli, err := providers.GetProviderBuilder(ctx, provider, *projectID, e.querier, e.crypteng) + pbOpts := []providers.ProviderBuilderOption{ + providers.WithProviderMetrics(e.provMt), + } + cli, err := providers.GetProviderBuilder(ctx, provider, *projectID, e.querier, e.crypteng, pbOpts...) if err != nil { return fmt.Errorf("error building client: %w", err) } diff --git a/internal/providers/github/github.go b/internal/providers/github/github.go index fd08d5f2f3..8e4b9a94d5 100644 --- a/internal/providers/github/github.go +++ b/internal/providers/github/github.go @@ -26,6 +26,7 @@ import ( "golang.org/x/oauth2" "github.com/stacklok/mediator/internal/db" + "github.com/stacklok/mediator/internal/providers/telemetry" minderv1 "github.com/stacklok/mediator/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/mediator/pkg/providers/v1" ) @@ -63,14 +64,22 @@ var _ provifv1.GitHub = (*RestClient)(nil) func NewRestClient( ctx context.Context, config *minderv1.GitHubProviderConfig, + metrics telemetry.HttpClientMetrics, token string, owner string, ) (*RestClient, error) { + var err error + ts := oauth2.StaticTokenSource( &oauth2.Token{AccessToken: token}, ) tc := oauth2.NewClient(ctx, ts) + tc.Transport, err = metrics.NewDurationRoundTripper(tc.Transport, db.ProviderTypeGithub) + if err != nil { + return nil, fmt.Errorf("error creating duration round tripper: %w", err) + } + ghClient := github.NewClient(tc) if config.Endpoint != "" { diff --git a/internal/providers/github/github_test.go b/internal/providers/github/github_test.go index 3c79c9816c..29f4154a00 100644 --- a/internal/providers/github/github_test.go +++ b/internal/providers/github/github_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" + provtelemetry "github.com/stacklok/mediator/internal/providers/telemetry" minderv1 "github.com/stacklok/mediator/pkg/api/protobuf/go/minder/v1" ) @@ -28,7 +29,10 @@ func TestNewRestClient(t *testing.T) { client, err := NewRestClient(context.Background(), &minderv1.GitHubProviderConfig{ Endpoint: "https://api.github.com", - }, "token", "") + }, + provtelemetry.NewNoopMetrics(), + "token", "") + assert.NoError(t, err) assert.NotNil(t, client) } diff --git a/internal/providers/http/http.go b/internal/providers/http/http.go index 19383c7739..23f6497406 100644 --- a/internal/providers/http/http.go +++ b/internal/providers/http/http.go @@ -25,6 +25,8 @@ import ( "golang.org/x/oauth2" + "github.com/stacklok/mediator/internal/db" + "github.com/stacklok/mediator/internal/providers/telemetry" minderv1 "github.com/stacklok/mediator/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/mediator/pkg/providers/v1" ) @@ -40,8 +42,13 @@ type REST struct { var _ provifv1.REST = (*REST)(nil) // NewREST creates a new RESTful client. -func NewREST(config *minderv1.RESTProviderConfig, tok string) (*REST, error) { +func NewREST( + config *minderv1.RESTProviderConfig, + metrics telemetry.HttpClientMetrics, + tok string, +) (*REST, error) { var cli *http.Client + var err error if tok != "" { ts := oauth2.StaticTokenSource( @@ -52,8 +59,13 @@ func NewREST(config *minderv1.RESTProviderConfig, tok string) (*REST, error) { cli = &http.Client{} } + cli.Transport, err = metrics.NewDurationRoundTripper(cli.Transport, db.ProviderTypeRest) + if err != nil { + return nil, fmt.Errorf("error creating duration round tripper: %w", err) + } + var baseURL *url.URL - baseURL, err := baseURL.Parse(config.GetBaseUrl()) + baseURL, err = baseURL.Parse(config.GetBaseUrl()) if err != nil { return nil, err } diff --git a/internal/providers/providers.go b/internal/providers/providers.go index 425c48e687..155fbe05d2 100644 --- a/internal/providers/providers.go +++ b/internal/providers/providers.go @@ -28,6 +28,7 @@ import ( gitclient "github.com/stacklok/mediator/internal/providers/git" ghclient "github.com/stacklok/mediator/internal/providers/github" httpclient "github.com/stacklok/mediator/internal/providers/http" + "github.com/stacklok/mediator/internal/providers/telemetry" provinfv1 "github.com/stacklok/mediator/pkg/providers/v1" ) @@ -39,6 +40,7 @@ func GetProviderBuilder( projectID uuid.UUID, store db.Store, crypteng *crypto.Engine, + opts ...ProviderBuilderOption, ) (*ProviderBuilder, error) { encToken, err := store.GetAccessTokenByProjectID(ctx, db.GetAccessTokenByProjectIDParams{Provider: prov.Name, ProjectID: projectID}) @@ -51,7 +53,7 @@ func GetProviderBuilder( return nil, fmt.Errorf("error decrypting access token: %w", err) } - return NewProviderBuilder(&prov, encToken, decryptedToken.AccessToken), nil + return NewProviderBuilder(&prov, encToken, decryptedToken.AccessToken, opts...), nil } // ProviderBuilder is a utility struct which allows for the creation of @@ -60,6 +62,17 @@ type ProviderBuilder struct { p *db.Provider tokenInf db.ProviderAccessToken tok string + metrics telemetry.ProviderMetrics +} + +// ProviderBuilderOption is a function which can be used to set options on the ProviderBuilder. +type ProviderBuilderOption func(*ProviderBuilder) + +// WithProviderMetrics sets the metrics for the ProviderBuilder +func WithProviderMetrics(metrics telemetry.ProviderMetrics) ProviderBuilderOption { + return func(pb *ProviderBuilder) { + pb.metrics = metrics + } } // NewProviderBuilder creates a new provider builder. @@ -67,12 +80,20 @@ func NewProviderBuilder( p *db.Provider, tokenInf db.ProviderAccessToken, tok string, + opts ...ProviderBuilderOption, ) *ProviderBuilder { - return &ProviderBuilder{ + pb := &ProviderBuilder{ p: p, tokenInf: tokenInf, tok: tok, + metrics: telemetry.NewNoopMetrics(), } + + for _, opt := range opts { + opt(pb) + } + + return pb } // Implements returns true if the provider implements the given type. @@ -123,7 +144,7 @@ func (pb *ProviderBuilder) GetHTTP(ctx context.Context) (provinfv1.REST, error) return nil, fmt.Errorf("error parsing http config: %w", err) } - return httpclient.NewREST(cfg, pb.tok) + return httpclient.NewREST(cfg, pb.metrics, pb.tok) } // GetGitHub returns a github client for the provider. @@ -142,7 +163,7 @@ func (pb *ProviderBuilder) GetGitHub(ctx context.Context) (*ghclient.RestClient, return nil, fmt.Errorf("error parsing github config: %w", err) } - cli, err := ghclient.NewRestClient(ctx, cfg, pb.GetToken(), pb.tokenInf.OwnerFilter.String) + cli, err := ghclient.NewRestClient(ctx, cfg, pb.metrics, pb.GetToken(), pb.tokenInf.OwnerFilter.String) if err != nil { return nil, fmt.Errorf("error creating github client: %w", err) } diff --git a/internal/providers/telemetry/interface.go b/internal/providers/telemetry/interface.go new file mode 100644 index 0000000000..0affb0586a --- /dev/null +++ b/internal/providers/telemetry/interface.go @@ -0,0 +1,33 @@ +// +// Copyright 2023 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package telemetry provides the telemetry interfaces and implementations for providers +package telemetry + +import ( + "net/http" + + "github.com/stacklok/mediator/internal/db" +) + +// HttpClientMetrics provides the httpClientMetrics for http clients +type HttpClientMetrics interface { + NewDurationRoundTripper(wrapped http.RoundTripper, providerType db.ProviderType) (http.RoundTripper, error) +} + +// ProviderMetrics provides the httpClientMetrics for providers +type ProviderMetrics interface { + HttpClientMetrics +} diff --git a/internal/providers/telemetry/noop.go b/internal/providers/telemetry/noop.go new file mode 100644 index 0000000000..8a0e8526f1 --- /dev/null +++ b/internal/providers/telemetry/noop.go @@ -0,0 +1,35 @@ +// +// Copyright 2023 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "net/http" + + "github.com/stacklok/mediator/internal/db" +) + +type noop struct{} + +var _ ProviderMetrics = (*noop)(nil) + +// NewNoopMetrics returns a new noop httpClientMetrics provider +func NewNoopMetrics() *noop { + return &noop{} +} + +func (_ *noop) NewDurationRoundTripper(wrapped http.RoundTripper, _ db.ProviderType) (http.RoundTripper, error) { + return wrapped, nil +} diff --git a/internal/providers/telemetry/telemetry.go b/internal/providers/telemetry/telemetry.go new file mode 100644 index 0000000000..6e82076563 --- /dev/null +++ b/internal/providers/telemetry/telemetry.go @@ -0,0 +1,123 @@ +// +// Copyright 2023 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "fmt" + "log" + "net/http" + "time" + + "github.com/puzpuzpuz/xsync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/stacklok/mediator/internal/db" +) + +var _ http.RoundTripper = (*instrumentedRoundTripper)(nil) + +type instrumentedRoundTripper struct { + baseRoundTripper http.RoundTripper + durationHistogram metric.Int64Histogram +} + +func (irt *instrumentedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + startTime := time.Now() + + resp, err := irt.baseRoundTripper.RoundTrip(r) + + duration := time.Since(startTime).Milliseconds() + labels := []attribute.KeyValue{ + attribute.String("http_method", r.Method), + attribute.String("http_host", r.URL.Host), + attribute.Int("http_status_code", resp.StatusCode), + } + irt.durationHistogram.Record(r.Context(), duration, metric.WithAttributes(labels...)) + + return resp, err +} + +var _ ProviderMetrics = (*providerMetrics)(nil) + +type providerMetrics struct { + httpClientMetrics +} + +// NewProviderMetrics creates a new provider metrics instance. +func NewProviderMetrics() *providerMetrics { + return &providerMetrics{ + httpClientMetrics: *newHttpClientMetrics(), + } +} + +var _ HttpClientMetrics = (*httpClientMetrics)(nil) + +type httpClientMetrics struct { + providersMeter metric.Meter + + httpProviderHistograms *xsync.MapOf[db.ProviderType, metric.Int64Histogram] +} + +func newProviderMapOf[V any]() *xsync.MapOf[db.ProviderType, V] { + return xsync.NewTypedMapOf[db.ProviderType, V](func(k db.ProviderType) uint64 { + return xsync.StrHash64(string(k)) + }) +} + +// newHttpClientMetrics creates a new http provider metrics instance. +func newHttpClientMetrics() *httpClientMetrics { + return &httpClientMetrics{ + providersMeter: otel.Meter("providers"), + httpProviderHistograms: newProviderMapOf[metric.Int64Histogram](), + } +} + +func (m *httpClientMetrics) createProviderHistogram(providerType db.ProviderType) (metric.Int64Histogram, error) { + histogramName := fmt.Sprintf("%s.http.roundtrip.duration", providerType) + return m.providersMeter.Int64Histogram(histogramName, + metric.WithDescription("HTTP roundtrip duration for provider"), + metric.WithUnit("ms"), + ) +} + +func (m *httpClientMetrics) getHistogramForProvider(providerType db.ProviderType) metric.Int64Histogram { + histogram, _ := m.httpProviderHistograms.LoadOrCompute(providerType, func() metric.Int64Histogram { + newHistogram, err := m.createProviderHistogram(providerType) + if err != nil { + log.Printf("failed to create histogram for provider %s: %v", providerType, err) + return nil + } + return newHistogram + }) + return histogram +} + +func (m *httpClientMetrics) NewDurationRoundTripper( + wrapped http.RoundTripper, + providerType db.ProviderType, +) (http.RoundTripper, error) { + histogram := m.getHistogramForProvider(providerType) + if histogram == nil { + return nil, fmt.Errorf("failed to retrieve histogram for provider %s", providerType) + } + + return &instrumentedRoundTripper{ + baseRoundTripper: wrapped, + durationHistogram: histogram, + }, nil +} diff --git a/internal/reconcilers/artifacts.go b/internal/reconcilers/artifacts.go index 4c6d7ff7be..5b7b2ff349 100644 --- a/internal/reconcilers/artifacts.go +++ b/internal/reconcilers/artifacts.go @@ -106,7 +106,10 @@ func (e *Reconciler) handleArtifactsReconcilerEvent(ctx context.Context, evt *Re return fmt.Errorf("error retrieving provider: %w", err) } - p, err := providers.GetProviderBuilder(ctx, prov, evt.Project, e.store, e.crypteng) + pbOpts := []providers.ProviderBuilderOption{ + providers.WithProviderMetrics(e.provMt), + } + p, err := providers.GetProviderBuilder(ctx, prov, evt.Project, e.store, e.crypteng, pbOpts...) if err != nil { return fmt.Errorf("error building client: %w", err) } diff --git a/internal/reconcilers/reconcilers.go b/internal/reconcilers/reconcilers.go index 561164c7e9..3dd9a11ae8 100644 --- a/internal/reconcilers/reconcilers.go +++ b/internal/reconcilers/reconcilers.go @@ -21,6 +21,7 @@ import ( "github.com/stacklok/mediator/internal/crypto" "github.com/stacklok/mediator/internal/db" "github.com/stacklok/mediator/internal/events" + providertelemetry "github.com/stacklok/mediator/internal/providers/telemetry" ) const ( @@ -35,20 +36,43 @@ type Reconciler struct { store db.Store evt *events.Eventer crypteng *crypto.Engine + provMt providertelemetry.ProviderMetrics +} + +// ReconcilerOption is a function that modifies a reconciler +type ReconcilerOption func(*Reconciler) + +// WithProviderMetrics sets the provider metrics for the reconciler +func WithProviderMetrics(mt providertelemetry.ProviderMetrics) ReconcilerOption { + return func(r *Reconciler) { + r.provMt = mt + } } // NewReconciler creates a new reconciler object -func NewReconciler(store db.Store, evt *events.Eventer, authCfg *config.AuthConfig) (*Reconciler, error) { +func NewReconciler( + store db.Store, + evt *events.Eventer, + authCfg *config.AuthConfig, + opts ...ReconcilerOption, +) (*Reconciler, error) { crypteng, err := crypto.EngineFromAuthConfig(authCfg) if err != nil { return nil, err } - return &Reconciler{ + r := &Reconciler{ store: store, evt: evt, crypteng: crypteng, - }, nil + provMt: providertelemetry.NewNoopMetrics(), + } + + for _, opt := range opts { + opt(r) + } + + return r, nil } // Register implements the Consumer interface.