Skip to content

Commit

Permalink
feat(jdbc): add metrics & logs to sql query
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 746bcbd commit 8a3bae4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class MetricRegistry {

public final static String STREAMS_STATE_COUNT = "stream.state.count";


public final static String JDBC_QUERY_DURATION = "jdbc.query.duration";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List<Int
.set(
AbstractRepository.field("consumers"),
DSL.field(
"\"consumers\" || ?",
"\"consumers\" || CAST(? AS queue_consumers[])",
SQLDataType.VARCHAR(50).getArrayType(),
(Object) new String[]{consumerGroup}
)
Expand Down
50 changes: 50 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/JooqExecuteListenerProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.kestra.jdbc;

import io.kestra.core.metrics.MetricRegistry;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.jooq.ExecuteContext;
import org.jooq.ExecuteListener;
import org.jooq.impl.DefaultExecuteListener;

import java.time.Duration;
import javax.validation.constraints.NotNull;

@Singleton
@Named("default")
@Slf4j
public class JooqExecuteListenerProvider implements org.jooq.ExecuteListenerProvider {
@Inject
MetricRegistry metricRegistry;

@Override
public @NotNull ExecuteListener provide() {
return new DefaultExecuteListener() {
Long startTime;

@Override
public void executeStart(ExecuteContext ctx) {
super.executeStart(ctx);
startTime = System.currentTimeMillis();
}

@Override
public void executeEnd(ExecuteContext ctx) {
super.executeEnd(ctx);

Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);

metricRegistry.timer(MetricRegistry.JDBC_QUERY_DURATION, "sql", ctx.sql())
.record(duration);

if (log.isTraceEnabled()) {
log.trace("[Duration: {}] [Rows: {}] [Query: {}]", duration, ctx.rows() , ctx.query().toString());
} else if (log.isDebugEnabled()) {
log.debug("[Duration: {}] [Rows: {}] [Query: {}]", duration, ctx.rows() , ctx.sql());
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import io.kestra.core.queues.QueueService;
import io.kestra.core.runners.ExecutionDelay;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

@Singleton
@Replaces(QueueService.class)
public class JdbcQueueService extends QueueService{
@Requires(property = "kestra.queue.type", pattern = "mysql|postgres|h2")
public class JdbcQueueService extends QueueService {
public String key(Object object) {
if (object.getClass() == JdbcExecutorState.class) {
return ((JdbcExecutorState) object).getExecutionId();
Expand Down

0 comments on commit 8a3bae4

Please sign in to comment.