Skip to content

Commit

Permalink
feat: OpenTelemetry tracing
Browse files Browse the repository at this point in the history
This PR adds build-in support for OpenTelemetry. All GraphQL queries to the federated services served from bramble are instrumented, this includes service schema updates, service list updates, as well as federated queries from users. The telemetry is exported using the gRPC exporter.
  • Loading branch information
asger-noer committed Jan 21, 2024
1 parent ab11064 commit 7ec99fe
Show file tree
Hide file tree
Showing 18 changed files with 732 additions and 176 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
.idea
.DS_Store
coverage.out
*__debug_*
config.json
81 changes: 68 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/vektah/gqlparser/v2/ast"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
)

// GraphQLClient is a GraphQL client.
type GraphQLClient struct {
HTTPClient *http.Client
MaxResponseSize int64
UserAgent string

tracer trace.Tracer
}

// ClientOpt is a function used to set a GraphQL client option
Expand All @@ -33,10 +40,15 @@ func NewClient(opts ...ClientOpt) *GraphQLClient {
}

func NewClientWithPlugins(plugins []Plugin, opts ...ClientOpt) *GraphQLClient {
var transport http.RoundTripper = http.DefaultTransport
transport = otelhttp.NewTransport(transport)

c := &GraphQLClient{
HTTPClient: &http.Client{
Timeout: 5 * time.Second,
Transport: transport,
Timeout: 5 * time.Second,
},
tracer: otel.GetTracerProvider().Tracer(instrumentationName),
MaxResponseSize: 1024 * 1024,
}

Expand All @@ -51,13 +63,18 @@ func NewClientWithPlugins(plugins []Plugin, opts ...ClientOpt) *GraphQLClient {
}

func NewClientWithoutKeepAlive(opts ...ClientOpt) *GraphQLClient {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.DisableKeepAlives = true
var defaultTransport = http.DefaultTransport.(*http.Transport).Clone()
defaultTransport.DisableKeepAlives = true

var transport http.RoundTripper = defaultTransport
transport = otelhttp.NewTransport(transport)

c := &GraphQLClient{
HTTPClient: &http.Client{
Timeout: 5 * time.Second,
Transport: transport,
},
tracer: otel.GetTracerProvider().Tracer(instrumentationName),
MaxResponseSize: 1024 * 1024,
}

Expand Down Expand Up @@ -93,15 +110,35 @@ func WithUserAgent(userAgent string) ClientOpt {

// Request executes a GraphQL request.
func (c *GraphQLClient) Request(ctx context.Context, url string, request *Request, out interface{}) error {
ctx, span := c.tracer.Start(ctx, "GraphQL Request",
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
semconv.GraphqlOperationTypeKey.String(string(request.OperationType)),
semconv.GraphqlOperationName(request.OperationName),
semconv.GraphqlDocument(request.Query),
),
)

defer span.End()

traceErr := func(err error) error {
if err == nil {
return err
}

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(request)
if err != nil {
return fmt.Errorf("unable to encode request body: %w", err)
if err := json.NewEncoder(&buf).Encode(request); err != nil {
return traceErr(fmt.Errorf("unable to encode request body: %w", err))
}

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buf)
if err != nil {
return fmt.Errorf("unable to create request: %w", err)
return traceErr(fmt.Errorf("unable to create request: %w", err))
}

if request.Headers != nil {
Expand All @@ -121,8 +158,13 @@ func (c *GraphQLClient) Request(ctx context.Context, url string, request *Reques
promServiceTimeoutErrorCounter.With(prometheus.Labels{
"service": url,
}).Inc()

// Return raw timeout error to allow caller to handle it since a
// downstream caller may want to retry, and they will have to jump
// through hoops to detect this error otherwise.
return traceErr(err)
}
return fmt.Errorf("error during request: %w", err)
return traceErr(fmt.Errorf("error during request: %w", err))
}
defer res.Body.Close()

Expand All @@ -140,25 +182,25 @@ func (c *GraphQLClient) Request(ctx context.Context, url string, request *Reques
Data: out,
}

err = json.NewDecoder(&limitReader).Decode(&graphqlResponse)
if err != nil {
if err = json.NewDecoder(&limitReader).Decode(&graphqlResponse); err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) {
if limitReader.N == 0 {
return fmt.Errorf("response exceeded maximum size of %d bytes", maxResponseSize)
return traceErr(fmt.Errorf("response exceeded maximum size of %d bytes", maxResponseSize))
}
}
return fmt.Errorf("error decoding response: %w", err)
return traceErr(fmt.Errorf("error decoding response: %w", err))
}

if len(graphqlResponse.Errors) > 0 {
return graphqlResponse.Errors
return traceErr(graphqlResponse.Errors)
}

return nil
}

// Request is a GraphQL request.
type Request struct {
OperationType string `json:"operationType,omitempty"`
Query string `json:"query"`
OperationName string `json:"operationName,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
Expand All @@ -177,7 +219,20 @@ func (r *Request) WithHeaders(headers http.Header) *Request {
return r
}

func (r *Request) WithOperationType(operation string) *Request {
op := strings.ToLower(operation)
switch op {
case "query", "mutation", "subscription":
r.OperationType = op
default:
r.OperationType = "query"
}

return r
}

func (r *Request) WithOperationName(operationName string) *Request {

r.OperationName = operationName
return r
}
Expand Down
48 changes: 36 additions & 12 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bramble

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -11,6 +12,8 @@ import (

"github.com/fsnotify/fsnotify"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var Version = "dev"
Expand Down Expand Up @@ -47,8 +50,9 @@ type Config struct {
LogLevel log.Level `json:"loglevel"`
PollInterval string `json:"poll-interval"`
PollIntervalDuration time.Duration
MaxRequestsPerQuery int64 `json:"max-requests-per-query"`
MaxServiceResponseSize int64 `json:"max-service-response-size"`
MaxRequestsPerQuery int64 `json:"max-requests-per-query"`
MaxServiceResponseSize int64 `json:"max-service-response-size"`
Telemetry TelemetryConfig `json:"telemetry"`
Plugins []PluginConfig
// Config extensions that can be shared among plugins
Extensions map[string]json.RawMessage
Expand All @@ -58,6 +62,7 @@ type Config struct {
plugins []Plugin
executableSchema *ExecutableSchema
watcher *fsnotify.Watcher
tracer trace.Tracer
configFiles []string
linkedFiles []string
}
Expand Down Expand Up @@ -250,20 +255,34 @@ func (c *Config) Watch() {
continue
}

err := c.Load()
if err != nil {
if err := c.reload(); err != nil {
log.WithError(err).Error("error reloading config")
}
log.WithField("services", c.Services).Info("config file updated")
err = c.executableSchema.UpdateServiceList(c.Services)
if err != nil {
log.WithError(err).Error("error updating services")
}
log.WithField("services", c.Services).Info("updated services")
}
}
}

func (c *Config) reload() error {
ctx := context.Background()

ctx, span := c.tracer.Start(ctx, "Config Reload")
defer span.End()

if err := c.Load(); err != nil {
log.WithError(err).Error("error reloading config")
}

log.WithField("services", c.Services).Info("config file updated")

if err := c.executableSchema.UpdateServiceList(ctx, c.Services); err != nil {
log.WithError(err).Error("error updating services")
}

log.WithField("services", c.Services).Info("updated services")

return nil
}

// GetConfig returns operational config for the gateway
func GetConfig(configFiles []string) (*Config, error) {
watcher, err := fsnotify.NewWatcher()
Expand Down Expand Up @@ -296,6 +315,7 @@ func GetConfig(configFiles []string) (*Config, error) {
MaxServiceResponseSize: 1024 * 1024,

watcher: watcher,
tracer: otel.GetTracerProvider().Tracer(instrumentationName),
configFiles: configFiles,
linkedFiles: linkedFiles,
}
Expand Down Expand Up @@ -343,13 +363,17 @@ func (c *Config) Init() error {
services = append(services, NewService(s, serviceClientOptions...))
}

queryClientOptions := []ClientOpt{WithMaxResponseSize(c.MaxServiceResponseSize), WithUserAgent(GenerateUserAgent("query"))}
queryClientOptions := []ClientOpt{
WithMaxResponseSize(c.MaxServiceResponseSize),
WithUserAgent(GenerateUserAgent("query")),
}

if c.QueryHTTPClient != nil {
queryClientOptions = append(queryClientOptions, WithHTTPClient(c.QueryHTTPClient))
}
queryClient := NewClientWithPlugins(c.plugins, queryClientOptions...)
es := NewExecutableSchema(c.plugins, c.MaxRequestsPerQuery, queryClient, services...)
err = es.UpdateSchema(true)
err = es.UpdateSchema(context.Background(), true)
if err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Sample configuration:
"max-requests-per-query": 50,
"max-client-response-size": 1048576,
"id-field-name": "id",
"telemetry": {
"enabled": true,
"insecure": false,
"endpoint": "http://localhost:4317",
"serviceName": "bramble"
},
"plugins": [
{
"name": "admin-ui"
Expand Down Expand Up @@ -84,6 +90,21 @@ Sample configuration:
- Default: `id`
- Supports hot-reload: No

- `telemetry`: OpenTelemetry configuration.
- `enabled`: Enable OpenTelemetry.
- Default: `false`
- Supports hot-reload: No
- `insecure`: Whether to use insecure connection to OpenTelemetry collector.
- Default: `false`
- Supports hot-reload: No
- `endpoint`: OpenTelemetry collector endpoint.
- Default: If no endpoint is specified, telemetry is disabled.
- Supports hot-reload: No
- `serviceName`: Service name to use for OpenTelemetry.
- Default: `bramble`
- Supports hot-reload: No


- `plugins`: Optional list of plugins to enable. See [plugins](plugins.md) for plugins-specific config.

- Supports hot-reload: Partial. `Configure` method of previously enabled plugins will get called with new configuration.
Expand Down
Loading

0 comments on commit 7ec99fe

Please sign in to comment.