Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): refactor error metrics (#8314)
Browse files Browse the repository at this point in the history
This is a small refactor to how opencensus metrics are reported when receiving responses from the server.

There's effectively two cases where we consider an error to have occurred on receive:

* Invoking gRPC Recv() on the connection emitted an error, typically a transport issue

* The service embedded an error in the response, which is more akin to an application error (failed to commit data, offset mismatch, etc).

This CL ensures we increment the existing AppendResponseError count metric in both causes, and deals with the unlikely scenario where we're unable to tag the report with the status code.  When that happens, we simply record the metric uncoded.

Towards: https://github.com/googleapis/google-cloud-go/issues/8311
  • Loading branch information
shollyman authored Jul 25, 2023
1 parent 08b151a commit 04d6264
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,18 +494,29 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
resp, err := arc.Recv()
co.release(nextWrite)
if err != nil {
// The Recv() itself yielded an error. We increment AppendResponseErrors by one, tagged by the status
// code.
status := grpcstatus.Convert(err)
metricCtx := ctx
if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.Code()).String())); tagErr == nil {
metricCtx = tagCtx
}
recordStat(metricCtx, AppendResponseErrors, 1)

nextWrite.writer.processRetry(nextWrite, co, nil, err)
continue
}
// Record that we did in fact get a response from the backend.
recordStat(ctx, AppendResponses, 1)

if status := resp.GetError(); status != nil {
// The response from the backend embedded a status error. We record that the error
// occurred, and tag it based on the response code of the status.
// The response was received successfully, but the response embeds a status error in the payload.
// Increment AppendResponseErrors, tagged by status code.
metricCtx := ctx
if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
recordStat(tagCtx, AppendResponseErrors, 1)
metricCtx = tagCtx
}
recordStat(metricCtx, AppendResponseErrors, 1)
respErr := grpcstatus.ErrorProto(status)

nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
Expand Down

0 comments on commit 04d6264

Please sign in to comment.