Skip to content

Commit

Permalink
fix(core): add trigger type to worker trigger metrics
Browse files Browse the repository at this point in the history
Fixes #4259
  • Loading branch information
loicmathieu committed Jul 8, 2024
1 parent dc4c78a commit fcb249f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
32 changes: 10 additions & 22 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private String metricName(String name) {
*
* @param workerTask the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
var baseTags = ArrayUtils.addAll(
Expand All @@ -166,7 +166,7 @@ public String[] tags(WorkerTask workerTask, String workerGroup, String... tags)
*
* @param workerTrigger the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
var baseTags = ArrayUtils.addAll(
Expand All @@ -186,7 +186,7 @@ public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String...
* Return tags for current {@link WorkerTaskResult}
*
* @param workerTaskResult the current WorkerTaskResult
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
var baseTags = ArrayUtils.addAll(
Expand All @@ -202,7 +202,7 @@ public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
* Return tags for current {@link WorkerTaskResult}
*
* @param subflowExecutionResult the current WorkerTaskResult
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(SubflowExecutionResult subflowExecutionResult, String... tags) {
var baseTags = ArrayUtils.addAll(
Expand All @@ -218,7 +218,7 @@ public String[] tags(SubflowExecutionResult subflowExecutionResult, String... ta
* Return tags for current {@link Task}
*
* @param task the current Task
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(Task task) {
return new String[]{
Expand All @@ -242,7 +242,7 @@ public String[] tags(AbstractTrigger trigger) {
* Return tags for current {@link Execution}
*
* @param execution the current Execution
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(Execution execution) {
var baseTags = new String[]{
Expand All @@ -257,33 +257,21 @@ public String[] tags(Execution execution) {
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(TriggerContext triggerContext, String workerGroup) {
public String[] tags(TriggerContext triggerContext) {
var baseTags = new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
};
baseTags = workerGroup == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_WORKER_GROUP, workerGroup);
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
}

/**
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @return tags to applied to metrics
*/
public String[] tags(TriggerContext triggerContext) {
return tags(triggerContext, null);
}

/**
* Return tags for current {@link SchedulerExecutionWithTrigger}.
*
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
return ArrayUtils.addAll(
Expand All @@ -296,7 +284,7 @@ public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger
/**
* Return globals tags
*
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public Tags tags(String... tags) {
return Tags.of(tags);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,13 @@ private void handleTrigger(WorkerTrigger workerTrigger) {
triggerQueue.emit(trigger);

this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
.record(() -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();

this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

DefaultRunContext runContext = (DefaultRunContext)workerTrigger.getConditionContext().getRunContext();
Expand Down

0 comments on commit fcb249f

Please sign in to comment.