Skip to content

Commit

Permalink
feat: implement more efficient aggregation of schema usage metrics wi…
Browse files Browse the repository at this point in the history
…th caching (#1095)
  • Loading branch information
jensneuse authored Aug 19, 2024
1 parent e37e806 commit a40c9d8
Show file tree
Hide file tree
Showing 29 changed files with 2,282 additions and 1,229 deletions.
124 changes: 105 additions & 19 deletions graphqlmetrics/core/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,32 @@ func (s *MetricsService) saveOperations(ctx context.Context, insertTime time.Tim
return 0, fmt.Errorf("failed to prepare batch for operations: %w", err)
}

for _, schemaUsage := range schemaUsage {

operationType := strings.ToLower(schemaUsage.OperationInfo.Type.String())
abort := true

for _, schemaUsage := range schemaUsage {
// If the operation is already in the cache, we can skip it and not write it again
if _, ok := s.opGuardCache.Get(schemaUsage.OperationInfo.Hash); !ok {
err := opBatch.Append(
insertTime,
schemaUsage.OperationInfo.Name,
schemaUsage.OperationInfo.Hash,
operationType,
schemaUsage.RequestDocument,
)
if err != nil {
return 0, fmt.Errorf("failed to append operation to batch: %w", err)
}
if _, exists := s.opGuardCache.Get(schemaUsage.OperationInfo.Hash); exists {
continue
}
if abort {
abort = false
}
err := opBatch.Append(
insertTime,
schemaUsage.OperationInfo.Name,
schemaUsage.OperationInfo.Hash,
strings.ToLower(schemaUsage.OperationInfo.Type.String()),
schemaUsage.RequestDocument,
)
if err != nil {
return 0, fmt.Errorf("failed to append operation to batch: %w", err)
}
}

// If every operation was skipped because it is already stored (known from the cache),
// abort the batch, as there is nothing to write
if abort {
return 0, opBatch.Abort()
}

if err := opBatch.Send(); err != nil {
Expand Down Expand Up @@ -218,14 +226,16 @@ func (s *MetricsService) PublishGraphQLMetrics(
}

dispatched := s.pool.TrySubmit(func() {
var sentOps, sentMetrics = 0, 0
var (
storedOperations, storedMetrics int
)
insertTime := time.Now()

defer func() {
requestLogger.Debug("operations write finished",
zap.Duration("duration", time.Since(insertTime)),
zap.Int("metrics", sentMetrics),
zap.Int("operations", sentOps),
zap.Int("storedOperations", storedOperations),
zap.Int("storedMetrics", storedMetrics),
)
}()

Expand All @@ -236,7 +246,7 @@ func (s *MetricsService) PublishGraphQLMetrics(
if err != nil {
return err
}
sentOps += writtenOps
storedOperations += writtenOps
return nil
})

Expand All @@ -249,7 +259,83 @@ func (s *MetricsService) PublishGraphQLMetrics(
if err != nil {
return err
}
sentMetrics += writtenMetrics
storedMetrics += writtenMetrics
return nil
})

if err != nil {
requestLogger.Error("Failed to write metrics", zap.Error(err))
}
})

if !dispatched {
requestLogger.Error("Failed to dispatch request to worker pool")

// Will force the client (router) to retry the request
return nil, errPublishFailed
}

return res, nil
}

func (s *MetricsService) PublishAggregatedGraphQLMetrics(ctx context.Context, req *connect.Request[graphqlmetricsv1.PublishAggregatedGraphQLRequestMetricsRequest]) (*connect.Response[graphqlmetricsv1.PublishAggregatedGraphQLRequestMetricsResponse], error) {
requestLogger := s.logger.With(zap.String("procedure", req.Spec().Procedure))
res := connect.NewResponse(&graphqlmetricsv1.PublishAggregatedGraphQLRequestMetricsResponse{})

claims, err := utils.GetClaims(ctx)
if err != nil {
return nil, errNotAuthenticated
}

schemaUsage := make([]*graphqlmetricsv1.SchemaUsageInfo, len(req.Msg.Aggregation))
for i, agg := range req.Msg.Aggregation {
for j := range agg.SchemaUsage.ArgumentMetrics {
agg.SchemaUsage.ArgumentMetrics[j].Count = agg.RequestCount
}
for j := range agg.SchemaUsage.InputMetrics {
agg.SchemaUsage.InputMetrics[j].Count = agg.RequestCount
}
for j := range agg.SchemaUsage.TypeFieldMetrics {
agg.SchemaUsage.TypeFieldMetrics[j].Count = agg.RequestCount
}
schemaUsage[i] = agg.SchemaUsage
}

dispatched := s.pool.TrySubmit(func() {
var (
storedOperations, storedMetrics int
)
insertTime := time.Now()

defer func() {
requestLogger.Debug("operations write finished",
zap.Duration("duration", time.Since(insertTime)),
zap.Int("storedOperations", storedOperations),
zap.Int("storedMetrics", storedMetrics),
)
}()

insertCtx := context.Background()

err = retryOnError(insertCtx, requestLogger.With(zap.String("component", "operations")), func(ctx context.Context) error {
writtenOps, err := s.saveOperations(ctx, insertTime, schemaUsage)
if err != nil {
return err
}
storedOperations += writtenOps
return nil
})

if err != nil {
requestLogger.Error("Failed to write operations", zap.Error(err))
}

err = retryOnError(insertCtx, requestLogger.With(zap.String("component", "metrics")), func(ctx context.Context) error {
writtenMetrics, err := s.saveUsageMetrics(ctx, insertTime, claims, schemaUsage)
if err != nil {
return err
}
storedMetrics += writtenMetrics
return nil
})

Expand Down
115 changes: 115 additions & 0 deletions graphqlmetrics/core/metrics_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,121 @@ func TestPublishGraphQLMetrics(t *testing.T) {
assert.Greater(t, fieldUsageCountMv, uint64(0))
}

// TestPublishAggregatedGraphQLMetrics is an integration test (it only runs when
// INT_TESTS=true) that publishes a single aggregated schema usage entry through
// PublishAggregatedGraphQLMetrics and then verifies that matching rows can be
// queried from the operations table, the schema usage table and the
// gql_metrics_schema_usage_5m_90d_mv table.
func TestPublishAggregatedGraphQLMetrics(t *testing.T) {
	if os.Getenv("INT_TESTS") != "true" {
		t.Skip("Skipping integration tests")
	}

	db := test.GetTestDatabase(t)

	msvc := NewMetricsService(zap.NewNop(), db)

	req := &graphqlmetricsv1.PublishAggregatedGraphQLRequestMetricsRequest{
		Aggregation: []*graphqlmetricsv1.SchemaUsageInfoAggregation{
			{
				SchemaUsage: &graphqlmetricsv1.SchemaUsageInfo{
					RequestDocument: "query Hello { hello }",
					TypeFieldMetrics: []*graphqlmetricsv1.TypeFieldUsageInfo{
						{
							Path:        []string{"hello"},
							TypeNames:   []string{"Query"},
							SubgraphIDs: []string{"sub123"},
						},
					},
					OperationInfo: &graphqlmetricsv1.OperationInfo{
						Hash: "hash123",
						Name: "Hello",
						Type: graphqlmetricsv1.OperationType_QUERY,
					},
					SchemaInfo: &graphqlmetricsv1.SchemaInfo{
						Version: "v1",
					},
					ClientInfo: &graphqlmetricsv1.ClientInfo{
						Name:    "wundergraph",
						Version: "1.0.0",
					},
					RequestInfo: &graphqlmetricsv1.RequestInfo{
						StatusCode: 200,
						Error:      true,
					},
					Attributes: map[string]string{
						"test": "test123",
					},
				},
				// The materialized view check below expects TotalUsages = 1 for this entry.
				RequestCount: 1,
			},
		},
	}

	pReq := connect.NewRequest[graphqlmetricsv1.PublishAggregatedGraphQLRequestMetricsRequest](req)

	// The handler reads the claims from the context; the same IDs are used in the queries below.
	ctx := utils.SetClaims(context.Background(), &utils.GraphAPITokenClaims{
		FederatedGraphID: "fed123",
		OrganizationID:   "org123",
	})

	_, err := msvc.PublishAggregatedGraphQLMetrics(
		ctx,
		pReq,
	)
	require.NoError(t, err)

	// Wait for batch to be processed
	msvc.Shutdown(time.Second * 5)

	// Validate that the operation was inserted

	var opCount uint64
	require.NoError(t, db.QueryRow(ctx, `
		SELECT COUNT(*) FROM gql_metrics_operations
		WHERE OperationHash = 'hash123' AND
		OperationName = 'Hello' AND
		OperationType = 'query' AND
		OperationContent = 'query Hello { hello }'
		GROUP BY OperationHash LIMIT 1
	`).Scan(&opCount))

	assert.Greater(t, opCount, uint64(0))

	// Validate that the schema usage was inserted

	var fieldUsageCount uint64
	require.NoError(t, db.QueryRow(ctx, `
		SELECT COUNT(*) FROM gql_metrics_schema_usage
		WHERE OperationHash = 'hash123' AND
		OrganizationID = 'org123' AND
		FederatedGraphID = 'fed123' AND
		RouterConfigVersion = 'v1' AND
		Attributes['test'] = 'test123' AND
		HttpStatusCode = '200' AND
		HasError = true AND
		ClientName = 'wundergraph' AND
		ClientVersion = '1.0.0' AND
		hasAny(TypeNames, ['Query']) AND
		startsWith(Path, ['hello'])
	`).Scan(&fieldUsageCount))

	// COUNT(*) without GROUP BY yields a row even when nothing matches, so the
	// Scan above succeeding proves nothing on its own; the count must be checked.
	assert.Greater(t, fieldUsageCount, uint64(0))

	// Validate materialized view

	var fieldUsageCountMv uint64
	require.NoError(t, db.QueryRow(ctx, `
		SELECT COUNT(*) FROM gql_metrics_schema_usage_5m_90d_mv
		WHERE OperationHash = 'hash123' AND
		OrganizationID = 'org123' AND
		FederatedGraphID = 'fed123' AND
		RouterConfigVersion = 'v1' AND
		TotalErrors = 1 AND
		TotalUsages = 1 AND
		TotalClientErrors = 0 AND
		ClientName = 'wundergraph' AND
		ClientVersion = '1.0.0' AND
		hasAny(TypeNames, ['Query']) AND
		startsWith(Path, ['hello'])
	`).Scan(&fieldUsageCountMv))

	assert.Greater(t, fieldUsageCountMv, uint64(0))
}

func TestAuthentication(t *testing.T) {
if os.Getenv("INT_TESTS") != "true" {
t.Skip("Skipping integration tests")
Expand Down
Loading

0 comments on commit a40c9d8

Please sign in to comment.