Skip to content

Commit

Permalink
fix: missing metrics result tag when passing options (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored May 26, 2023
1 parent f318d1f commit a8d0f00
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 20 deletions.
4 changes: 2 additions & 2 deletions components/ledger/libs/otlp/otlpmetrics/otlpexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func LoadOTLPMetricsGRPCExporter(options ...otlpmetricgrpc.Option) (sdkmetric.Ex
func ProvideOTLPMetricsGRPCExporter() fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(LoadOTLPMetricsGRPCExporter, fx.As(new(sdkmetric.Exporter))),
fx.Annotate(LoadOTLPMetricsGRPCExporter, fx.ParamTags(OTLPMetricsGRPCOptionsKey), fx.As(new(sdkmetric.Exporter))),
),
)
}
Expand All @@ -30,7 +30,7 @@ func LoadOTLPMetricsHTTPExporter(options ...otlpmetrichttp.Option) (sdkmetric.Ex
func ProvideOTLPMetricsHTTPExporter() fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(LoadOTLPMetricsHTTPExporter, fx.As(new(sdkmetric.Exporter))),
fx.Annotate(LoadOTLPMetricsHTTPExporter, fx.ParamTags(OTLPMetricsHTTPOptionsKey), fx.As(new(sdkmetric.Exporter))),
),
)
}
Expand Down
17 changes: 16 additions & 1 deletion components/stargate/internal/server/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -53,13 +54,22 @@ func (s *Server) Stargate(stream api.StargateService_StargateServer) error {
ctx := stream.Context()
organizationID, stackID, err := orgaAndStackIDFromIncomingContext(ctx)
if err != nil {
s.metricsRegistry.StreamErrors().Add(ctx, 1, []attribute.KeyValue{
attribute.Int("code", int(codes.InvalidArgument)),
}...)
return status.Errorf(codes.InvalidArgument, "cannot get organization and stack id from contex metadata: %v", err)
}

logger := s.logger.WithFields(map[string]any{
"organization_id": organizationID,
"stack_id": stackID,
})
attrs := []attribute.KeyValue{
attribute.String("organization_id", organizationID),
attribute.String("stack_id", stackID),
}
s.metricsRegistry.ClientsConnected().Add(ctx, 1, attrs...)
defer s.metricsRegistry.ClientsConnected().Add(ctx, -1, attrs...)

logger.Infof("[GRPC] new stargate connection")
defer logger.Infof("[GRPC] stargate connection closed")
Expand All @@ -71,6 +81,9 @@ func (s *Server) Stargate(stream api.StargateService_StargateServer) error {
logger.Debugf("[GRPC] subscribing to nats subject %s", subject)
sub, err := s.natsConn.QueueSubscribeSync(subject, subject)
if err != nil {
s.metricsRegistry.StreamErrors().Add(ctx, 1, []attribute.KeyValue{
attribute.Int("code", int(codes.Internal)),
}...)
return status.Errorf(codes.Internal, "cannot subscribe to nats subject")
}

Expand Down Expand Up @@ -185,7 +198,9 @@ func (s *Server) Stargate(stream api.StargateService_StargateServer) error {
})

if err := eg.Wait(); err != nil {
// TODO(polo): should we expose the error here ?
s.metricsRegistry.StreamErrors().Add(ctx, 1, []attribute.KeyValue{
attribute.Int("code", int(codes.Internal)),
}...)
return status.Errorf(codes.Internal, "internal error: %v", err)
}

Expand Down
54 changes: 48 additions & 6 deletions components/stargate/internal/server/grpc/opentelemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (

type MetricsRegistry interface {
UnAuthenticatedCalls() instrument.Int64Counter
ClientsConnected() instrument.Int64UpDownCounter
StreamErrors() instrument.Int64Counter
GRPCLatencies() instrument.Int64Histogram
CorrelationIDNotFound() instrument.Int64Counter
}

type metricsRegistry struct {
unAuthenticatedCalls instrument.Int64Counter
clientsConnected instrument.Int64UpDownCounter
streamErrors instrument.Int64Counter
grpcLatencies instrument.Int64Histogram
correlationIDNotFound instrument.Int64Counter
}
Expand All @@ -21,16 +25,34 @@ func RegisterMetricsRegistry(meterProvider metric.MeterProvider) (MetricsRegistr
meter := meterProvider.Meter("server_grpc")

unAuthenticatedCalls, err := meter.Int64Counter(
"unauthenticated_calls",
"stargate_server_unauthenticated_calls",
instrument.WithUnit("1"),
instrument.WithDescription("Unauthenticated calls"),
)
if err != nil {
return nil, err
}

clientsConnected, err := meter.Int64UpDownCounter(
"stargate_server_clients_connected",
instrument.WithUnit("1"),
instrument.WithDescription("Number of connected clients"),
)
if err != nil {
return nil, err
}

streamErrors, err := meter.Int64Counter(
"stargate_server_stream_errors",
instrument.WithUnit("1"),
instrument.WithDescription("Stream errors"),
)
if err != nil {
return nil, err
}

grpcLatencies, err := meter.Int64Histogram(
"grpc_latencies",
"stargate_server_grpc_latencies",
instrument.WithUnit("ms"),
instrument.WithDescription("Latency of gRPC calls"),
)
Expand All @@ -39,7 +61,7 @@ func RegisterMetricsRegistry(meterProvider metric.MeterProvider) (MetricsRegistr
}

correlationIDNotFound, err := meter.Int64Counter(
"correlation_id_not_found",
"stargate_server_correlation_id_not_found",
instrument.WithUnit("1"),
instrument.WithDescription("Correlation ID not found"),
)
Expand All @@ -49,6 +71,8 @@ func RegisterMetricsRegistry(meterProvider metric.MeterProvider) (MetricsRegistr

return &metricsRegistry{
unAuthenticatedCalls: unAuthenticatedCalls,
clientsConnected: clientsConnected,
streamErrors: streamErrors,
grpcLatencies: grpcLatencies,
correlationIDNotFound: correlationIDNotFound,
}, nil
Expand All @@ -66,23 +90,41 @@ func (m *metricsRegistry) CorrelationIDNotFound() instrument.Int64Counter {
return m.correlationIDNotFound
}

func (m *metricsRegistry) ClientsConnected() instrument.Int64UpDownCounter {
return m.clientsConnected
}

func (m *metricsRegistry) StreamErrors() instrument.Int64Counter {
return m.streamErrors
}

type NoOpMetricsRegistry struct{}

func NewNoOpMetricsRegistry() *NoOpMetricsRegistry {
return &NoOpMetricsRegistry{}
}

func (m *NoOpMetricsRegistry) UnAuthenticatedCalls() instrument.Int64Counter {
counter, _ := metric.NewNoopMeter().Int64Counter("unauthenticated_calls")
counter, _ := metric.NewNoopMeter().Int64Counter("stargate_server_unauthenticated_calls")
return counter
}

func (m *NoOpMetricsRegistry) GRPCLatencies() instrument.Int64Histogram {
histogram, _ := metric.NewNoopMeter().Int64Histogram("grpc_latencies")
histogram, _ := metric.NewNoopMeter().Int64Histogram("stargate_server_grpc_latencies")
return histogram
}

func (m *NoOpMetricsRegistry) CorrelationIDNotFound() instrument.Int64Counter {
counter, _ := metric.NewNoopMeter().Int64Counter("correlation_id_not_found")
counter, _ := metric.NewNoopMeter().Int64Counter("stargate_server_correlation_id_not_found")
return counter
}

func (m *NoOpMetricsRegistry) ClientsConnected() instrument.Int64UpDownCounter {
counter, _ := metric.NewNoopMeter().Int64UpDownCounter("stargate_server_clients_connected")
return counter
}

func (m *NoOpMetricsRegistry) StreamErrors() instrument.Int64Counter {
counter, _ := metric.NewNoopMeter().Int64Counter("stargate_server_stream_errors")
return counter
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
ErrNoResponders = errors.New("no responders")
)

func ResponseError(w http.ResponseWriter, r *http.Request, err error) {
func ResponseError(w http.ResponseWriter, r *http.Request, err error) int {
status, code, details := coreErrorToErrorCode(err)

baseError := errors.Cause(err)
Expand All @@ -40,6 +40,8 @@ func ResponseError(w http.ResponseWriter, r *http.Request, err error) {
} else {
logging.FromContext(r.Context()).Errorf("internal server error: %s", err)
}

return status
}

func coreErrorToErrorCode(err error) (int, string, string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@ func (s *StargateController) HandleCalls(w http.ResponseWriter, r *http.Request)
return
}

var status int
attrs := []attribute.KeyValue{
attribute.String("organization_id", organizationID),
attribute.String("stack_id", stackID),
attribute.String("path", r.URL.Path),
}
s.metricsRegistry.ReceivedHTTPCallByPath().Add(ctx, 1, attrs...)
defer func() {
attrs = append(attrs, attribute.Int("status", status))
s.metricsRegistry.ReceivedHTTPCallByPath().Add(ctx, 1, attrs...)
}()

msg, err := requestToProto(r)
if err != nil {
ResponseError(w, r, errors.Wrapf(err, "failed to parse request"))
status = ResponseError(w, r, errors.Wrapf(err, "failed to parse request"))
return
}

buf, err := proto.Marshal(msg)
if err != nil {
ResponseError(w, r, errors.Wrapf(err, "failed to marshal message"))
status = ResponseError(w, r, errors.Wrapf(err, "failed to marshal message"))
return
}

Expand All @@ -50,13 +54,13 @@ func (s *StargateController) HandleCalls(w http.ResponseWriter, r *http.Request)
resp, err := s.natsConn.Request(subject, buf, s.config.natsRequestTimeout)
if err != nil {
s.logger.Errorf("[HTTP] error sending message to %s with path: %s", subject, r.URL.Path)
ResponseError(w, r, ErrNoResponders)
status = ResponseError(w, r, ErrNoResponders)
return
}

var response service.StargateClientMessage
if err = proto.Unmarshal(resp.Data, &response); err != nil {
ResponseError(w, r, errors.Wrapf(err, "failed to unmarshal response"))
status = ResponseError(w, r, errors.Wrapf(err, "failed to unmarshal response"))
return
}

Expand All @@ -70,10 +74,11 @@ func (s *StargateController) HandleCalls(w http.ResponseWriter, r *http.Request)
}
}

status = int(ev.ApiCallResponse.StatusCode)
api.WriteResponse(w, int(ev.ApiCallResponse.StatusCode), ev.ApiCallResponse.Body)
return
default:
ResponseError(w, r, errors.Wrapf(err, "invalid response from client"))
status = ResponseError(w, r, errors.Wrapf(err, "invalid response from client"))
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func RegisterMetricsRegistry(meterProvider metric.MeterProvider) (MetricsRegistr
meter := meterProvider.Meter("server_http")

receivedHTTPCallByPath, err := meter.Int64Counter(
"received_http_call_by_path",
"stargate_server_received_http_call_by_path",
instrument.WithUnit("1"),
instrument.WithDescription("Received HTTP call by path"),
)
Expand All @@ -41,6 +41,6 @@ func NewNoOpMetricsRegistry() *NoOpMetricsRegistry {
}

func (m *NoOpMetricsRegistry) ReceivedHTTPCallByPath() instrument.Int64Counter {
counter, _ := metric.NewNoopMeter().Int64Counter("received_http_call_by_path")
counter, _ := metric.NewNoopMeter().Int64Counter("stargate_server_received_http_call_by_path")
return counter
}
4 changes: 2 additions & 2 deletions libs/go-libs/otlp/otlpmetrics/otlpexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func LoadOTLPMetricsGRPCExporter(options ...otlpmetricgrpc.Option) (sdkmetric.Ex
func ProvideOTLPMetricsGRPCExporter() fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(LoadOTLPMetricsGRPCExporter, fx.As(new(sdkmetric.Exporter))),
fx.Annotate(LoadOTLPMetricsGRPCExporter, fx.ParamTags(OTLPMetricsGRPCOptionsKey), fx.As(new(sdkmetric.Exporter))),
),
)
}
Expand All @@ -30,7 +30,7 @@ func LoadOTLPMetricsHTTPExporter(options ...otlpmetrichttp.Option) (sdkmetric.Ex
func ProvideOTLPMetricsHTTPExporter() fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(LoadOTLPMetricsHTTPExporter, fx.As(new(sdkmetric.Exporter))),
fx.Annotate(LoadOTLPMetricsHTTPExporter, fx.ParamTags(OTLPMetricsHTTPOptionsKey), fx.As(new(sdkmetric.Exporter))),
),
)
}
Expand Down

0 comments on commit a8d0f00

Please sign in to comment.