diff --git a/.changeset/large-oranges-warn.md b/.changeset/large-oranges-warn.md new file mode 100644 index 00000000000..db29d9b5d77 --- /dev/null +++ b/.changeset/large-oranges-warn.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Adds Prometheus metrics for automation streams error handling diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index b9847dd3e0d..5a4b701f61a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02" v03 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -121,6 +122,7 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper lookupLggr := s.lggr.With("where", "StreamsLookup") if checkResult.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { // Streams Lookup only works when upkeep target check reverts + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorReasonNotReverted).Inc() return } @@ -134,11 +136,13 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper if err != nil { lookupLggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err) // user contract did not revert with StreamsLookup error +
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorDecodeRequestFailed).Inc() return } streamsLookupResponse := &mercury.StreamsLookup{StreamsLookupError: streamsLookupErr} if s.mercuryConfig.Credentials() == nil { lookupLggr.Errorf("at block %d upkeep %s tries to access mercury server but mercury credential is not configured", block, upkeepId) + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCredentialsNotConfigured).Inc() return } @@ -179,6 +183,7 @@ func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *merc values, errCode, err := s.DoMercuryRequest(ctx, lookup, checkResults, i) if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorDoMercuryRequest).Inc() return } @@ -187,6 +192,7 @@ func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *merc if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s CheckErrorHandler err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) } + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCodeNotNil).Inc() return } @@ -194,10 +200,12 @@ func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *merc err = s.CheckCallback(ctx, values, lookup, checkResults, i) if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCheckCallback).Inc() } } func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error { + 
prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepCheckCallback).Inc() payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) if err != nil { checkResults[i].Retryable = false @@ -243,6 +251,7 @@ func (s *streams) makeCallbackEthCall(ctx context.Context, payload []byte, looku // Does the mercury request for the checkResult. Returns either the looked up values or an error code if something is wrong with mercury // In case of any pipeline processing issues, returns an error and also sets approriate state on the checkResult itself func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, encoding.ErrCode, error) { + prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepDoMercuryRequest).Inc() var state, values, errCode, retryable, retryInterval = encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeNil, false, 0 * time.Second var err error pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) @@ -276,11 +285,13 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL func (s *streams) CheckErrorHandler(ctx context.Context, errCode encoding.ErrCode, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error { s.lggr.Debugf("at block %d upkeep %s requested time %s CheckErrorHandler error code: %d", lookup.Block, lookup.UpkeepId, lookup.Time, errCode) + prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepCheckErrorHandler).Inc() userPayload, err := s.packer.PackUserCheckErrorHandler(errCode, lookup.ExtraData) if err != nil { checkResults[i].Retryable = false checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed) + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorPackUserCheckErrorHandler).Inc() return err } @@ -288,6 
+299,7 @@ func (s *streams) CheckErrorHandler(ctx context.Context, errCode encoding.ErrCod if err != nil { checkResults[i].Retryable = false checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed) + prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorPackExecuteCallback).Inc() return err } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index 6b612a3f350..5e954475a8d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -21,6 +21,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -172,6 +173,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur sent := false retryErr := retry.Do( func() error { + prommetrics.AutomationStreamsRetries.WithLabelValues(prommetrics.StreamsVersion02).Inc() var httpResponse *http.Response var responseBody []byte var blobBytes []byte @@ -206,6 +208,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur return nil } + prommetrics.AutomationStreamsResponses.WithLabelValues(prommetrics.StreamsVersion02, fmt.Sprintf("%d", httpResponse.StatusCode)).Inc() switch httpResponse.StatusCode { case http.StatusNotFound, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: // Considered as pipeline error, but if retry attempts go over threshold, 
is changed upstream to ErrCode diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 8ac8696ddbb..39a26b6b5d9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -149,6 +150,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur defer cancel() retryErr := retry.Do( func() error { + prommetrics.AutomationStreamsRetries.WithLabelValues(prommetrics.StreamsVersion03).Inc() retryable = false resp, err := c.httpClient.Do(req) if err != nil { @@ -180,6 +182,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur } c.lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + prommetrics.AutomationStreamsResponses.WithLabelValues(prommetrics.StreamsVersion03, fmt.Sprintf("%d", resp.StatusCode)).Inc() switch resp.StatusCode { case http.StatusUnauthorized: c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go index 0925ce1c153..6b68f5c6afd 
100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -5,8 +5,36 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -// AutomationNamespace is the namespace for all Automation related metrics -const AutomationLogTriggerNamespace = "automation_log_trigger" +// Namespaces +const ( + NamespaceAutomationLogTrigger = "automation_log_trigger" + NamespaceAutomationStreams = "automation_streams" +) + +// Streams steps +const ( + StreamsLookupStepDoMercuryRequest = "do_mercury_request" + StreamsLookupStepCheckErrorHandler = "check_error_handler" + StreamsLookupStepCheckCallback = "check_callback" +) + +// Streams error labels +const ( + StreamsLookupErrorReasonNotReverted = "reason_not_target_check_reverted" + StreamsLookupErrorDecodeRequestFailed = "decode_request_failed" + StreamsLookupErrorCredentialsNotConfigured = "credentials_not_configured" + StreamsLookupErrorDoMercuryRequest = "do_mercury_request" + StreamsLookupErrorCodeNotNil = "err_code_not_nil" + StreamsLookupErrorCheckCallback = "check_callback" + StreamsLookupErrorPackUserCheckErrorHandler = "pack_user_check_error_handler" + StreamsLookupErrorPackExecuteCallback = "pack_execute_callback" +) + +// Streams versions +const ( + StreamsVersion02 = "v02" + StreamsVersion03 = "v03" +) // Metric labels const ( @@ -17,31 +45,63 @@ const ( // Automation metrics var ( + // Log Trigger metrics AutomationLogBufferFlow = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: AutomationLogTriggerNamespace, + Namespace: NamespaceAutomationLogTrigger, Name: "num_logs_in_log_buffer", Help: "The total number of logs currently being stored in the log buffer", }, []string{ "direction", }) AutomationRecovererMissedLogs = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: AutomationLogTriggerNamespace, + Namespace: NamespaceAutomationLogTrigger, Name: "num_recoverer_missed_logs", 
Help: "How many valid log triggers were identified as being missed by the recoverer", }) AutomationRecovererPendingPayloads = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: AutomationLogTriggerNamespace, + Namespace: NamespaceAutomationLogTrigger, Name: "num_recoverer_pending_payloads", Help: "How many log trigger payloads are currently pending in the recoverer", }) AutomationActiveUpkeeps = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: AutomationLogTriggerNamespace, + Namespace: NamespaceAutomationLogTrigger, Name: "num_active_upkeeps", Help: "How many log trigger upkeeps are currently active", }) AutomationLogProviderLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: AutomationLogTriggerNamespace, + Namespace: NamespaceAutomationLogTrigger, Name: "log_provider_latest_block", Help: "The latest block number the log provider has seen", }) + + // Streams metrics + AutomationStreamsLookupStep = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: NamespaceAutomationStreams, + Name: "streams_lookup_step_count", + Help: "How many times individual steps of the streams lookup process run", + }, []string{ + "step", + }) + AutomationStreamsLookupError = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: NamespaceAutomationStreams, + Name: "streams_lookup_error_count", + Help: "Errors occurred during a streams lookup attempt", + }, []string{ + "error", + }) + AutomationStreamsRetries = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: NamespaceAutomationStreams, + Name: "streams_retries", + Help: "Count of the times a streams lookup was retried", + }, []string{ + "version", + }) + AutomationStreamsResponses = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: NamespaceAutomationStreams, + Name: "streams_responses", + Help: "Count of individual response codes from streams lookup", + }, []string{ + "version", + "status", + }) )