Skip to content

Commit

Permalink
feat(jdbc): adapt ui to remove lucene expression
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 6ea7118 commit eb88681
Show file tree
Hide file tree
Showing 43 changed files with 616 additions and 291 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ out/

### Configurations
docker-compose.override.yml
cli/src/main/resources/application-override.yml
cli/src/main/resources/application-*.yml
*/src/test/resources/application-test.yml
/local

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,31 @@ public static ExecutionUsage of(ExecutionRepositoryInterface executionRepository
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);

List<DailyExecutionStatistics> dailyTaskRunsCount = null;

try {
dailyTaskRunsCount = executionRepository.dailyStatistics(
null,
null,
null,
startDate,
ZonedDateTime.now(),
true
);
} catch (UnsupportedOperationException ignored) {

}

return ExecutionUsage.builder()
.dailyExecutionsCount(executionRepository.dailyStatistics(
"*",
null,
null,
null,
startDate,
ZonedDateTime.now(),
false
))
.dailyTaskRunsCount(executionRepository.dailyStatistics(
"*",
startDate,
ZonedDateTime.now(),
true
))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,64 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;

public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution> {
Optional<Execution> findById(String id);

ArrayListTotal<Execution> findByFlowId(String namespace, String id, Pageable pageable);

ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state);
ArrayListTotal<Execution> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
);

ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, List<State.Type> state);
ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
);

Integer maxTaskRunSetting();

List<DailyExecutionStatistics> dailyStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean isTaskRun);
List<DailyExecutionStatistics> dailyStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean isTaskRun
);

Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean groupByNamespaceOnly);
Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
);

List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate);
List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);

Execution save(Execution flow);

default Function<String, String> sortMapping() throws IllegalArgumentException {
return s -> s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;

import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -41,9 +42,9 @@ default Optional<Flow> findById(String namespace, String id) {

List<Flow> findByNamespace(String namespace);

ArrayListTotal<Flow> find(String query, Pageable pageable);
ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace);

ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable);
ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace);

List<String> findDistinctNamespace();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.kestra.core.models.executions.LogEntry;
import org.slf4j.event.Level;

import java.time.ZonedDateTime;
import java.util.List;
import javax.annotation.Nullable;

public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry> {
List<LogEntry> findByExecutionId(String id, Level minLevel);
Expand All @@ -13,7 +15,13 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry

List<LogEntry> findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Level minLevel);

ArrayListTotal<LogEntry> find(String query, Pageable pageable, Level minLevel);
ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);

LogEntry save(LogEntry log);
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package io.kestra.runner.memory;
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;

import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
@MemoryQueueEnabled
public class MemorySchedulerExecutionState implements SchedulerExecutionStateInterface {
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;

Expand Down
11 changes: 1 addition & 10 deletions core/src/main/java/io/kestra/core/tasks/executions/Counts.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,9 @@ public Output run(RunContext runContext) throws Exception {
.getApplicationContext()
.getBean(ExecutionRepositoryInterface.class);

String query = null;
if (this.states != null) {
query = "state.current:(" + this.states
.stream()
.map(Enum::name)
.collect(Collectors.joining(" OR "))
+ ")";
}

List<ExecutionCount> executionCounts = executionRepository.executionCounts(
flows,
query,
this.states,
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected void inject() {
protected void find() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find("*", Pageable.from(1, 10), null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));
}
Expand All @@ -114,7 +114,7 @@ protected void find() {
protected void findTaskRun() {
inject();

ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null);
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun(Pageable.from(1, 10), null, null, null, null, null, null);
assertThat(executions.getTotal(), is(71L));
assertThat(executions.size(), is(10));
}
Expand Down Expand Up @@ -155,7 +155,9 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
Thread.sleep(500);

Map<String, Map<String, List<DailyExecutionStatistics>>> result = executionRepository.dailyGroupByFlowStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
Expand All @@ -180,7 +182,9 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));

result = executionRepository.dailyGroupByFlowStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
Expand Down Expand Up @@ -210,7 +214,9 @@ protected void dailyStatistics() throws InterruptedException {
Thread.sleep(500);

List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
Expand All @@ -235,7 +241,9 @@ protected void taskRunsDailyStatistics() {
}

List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
Expand Down Expand Up @@ -270,7 +278,7 @@ protected void executionsCount() throws InterruptedException {
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "third"),
new Flow(NAMESPACE, "missing")
),
"*",
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ void removeTriggerDelete() throws InterruptedException {
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count(), is(1L));
}

@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace();
assertThat((long) distinctNamespace.size(), is(2L));
}

@Singleton
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public abstract class AbstractLogRepositoryTest {
@Inject
protected LogRepositoryInterface logRepository;

private static LogEntry.LogEntryBuilder logEntry() {
private static LogEntry.LogEntryBuilder logEntry(Level level) {
return LogEntry.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
Expand All @@ -28,25 +28,28 @@ private static LogEntry.LogEntryBuilder logEntry() {
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
.level(Level.INFO)
.level(level)
.thread("")
.message("john doe");
}

@Test
void all() {
LogEntry.LogEntryBuilder builder = logEntry();
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);

ArrayListTotal<LogEntry> find = logRepository.find("*", Pageable.UNPAGED, null);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, null, null, null, null);
assertThat(find.size(), is(0));

LogEntry save = logRepository.save(builder.build());

find = logRepository.find("doe", Pageable.UNPAGED, null);
find = logRepository.find(Pageable.UNPAGED, "doe", null, null, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));

find = logRepository.find("*", Pageable.UNPAGED, null);
find = logRepository.find(Pageable.UNPAGED, "doe", Level.WARN,null, null);
assertThat(find.size(), is(0));

find = logRepository.find(Pageable.UNPAGED, null, null, null, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ CREATE TABLE `executions` (
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL,
`state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL,
`start_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
`end_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.endDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_state_current (state_current),
INDEX ix_start_date (start_date),
INDEX ix_end_date (end_date),
INDEX ix_state_duration (state_duration),
INDEX ix_deleted (deleted),
FULLTEXT ix_fulltext (namespace, flow_id, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ protected Condition findCondition(String query) {

@Override
protected Condition findSourceCodeCondition(String query) {
return DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query);
return this.jdbcRepository.fullTextCondition(Collections.singletonList("FULLTEXT_INDEX(source_code)"), query);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.repository.AbstractLogRepository;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;

import java.util.Collections;
import java.util.stream.Collectors;


@Singleton
@PostgresRepositoryEnabled
Expand All @@ -25,4 +26,15 @@ public PostgresLogRepository(ApplicationContext applicationContext) {
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
}

@Override
protected Condition minLevel(Level minLevel) {
return DSL.condition("level in (" +
LogEntry
.findLevelsByMin(minLevel)
.stream()
.map(s -> "'" + s + "'::log_level")
.collect(Collectors.joining(", ")) +
")");
}
}
Loading

0 comments on commit eb88681

Please sign in to comment.