Skip to content

Commit

Permalink
fix(ui): dailyGroupByFlowStatistics is not filtered correctly
Browse files Browse the repository at this point in the history
we keep all lucene query in the front office
  • Loading branch information
tchiotludo committed Sep 19, 2022
1 parent c78f773 commit 2202dab
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.micronaut.data.model.Pageable;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution> {
Boolean isTaskRunEnabled();
Expand Down Expand Up @@ -59,11 +63,22 @@ Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatist
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable List<FlowFilter> flows,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
);

@Getter
@SuperBuilder
@NoArgsConstructor
class FlowFilter {
@NotNull
private String namespace;
@NotNull
private String id;
}

List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
null,
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
Expand Down Expand Up @@ -188,6 +189,7 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
null,
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
Expand All @@ -202,6 +204,20 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));

result = executionRepository.dailyGroupByFlowStatistics(
null,
null,
null,
List.of(ExecutionRepositoryInterface.FlowFilter.builder().namespace("io.kestra.unittest").id(FLOW).build()),
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
);

assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(1));
assertThat(result.get("io.kestra.unittest").get(FLOW).size(), is(11));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ArrayListTotal<Execution> find(
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());

select = filteringQuery(select, namespace, flowId, query);
select = filteringQuery(select, namespace, flowId, null, query);

if (startDate != null) {
select = select.and(field("start_date").greaterOrEqual(startDate.toOffsetDateTime()));
Expand Down Expand Up @@ -194,6 +194,7 @@ public List<DailyExecutionStatistics> dailyStatistics(
query,
namespace,
flowId,
null,
startDate,
endDate
);
Expand Down Expand Up @@ -225,6 +226,7 @@ private Results dailyStatisticsQuery(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
List<FlowFilter> flows,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
Expand All @@ -251,7 +253,7 @@ private Results dailyStatisticsQuery(
.and(field("start_date").greaterOrEqual(finalStartDate.toOffsetDateTime()))
.and(field("start_date").lessOrEqual(finalEndDate.toOffsetDateTime()));

select = filteringQuery(select, namespace, flowId, query);
select = filteringQuery(select, namespace, flowId, flows, query);

List<Field<?>> groupFields = new ArrayList<>();
if (context.configuration().dialect() != SQLDialect.H2) {
Expand All @@ -273,6 +275,7 @@ private <T extends Record> SelectConditionStep<T> filteringQuery(
SelectConditionStep<T> select,
@Nullable String namespace,
@Nullable String flowId,
@Nullable List<FlowFilter> flows,
@Nullable String query
) {
if (flowId != null && namespace != null) {
Expand All @@ -286,6 +289,17 @@ private <T extends Record> SelectConditionStep<T> filteringQuery(
select = select.and(this.findCondition(query));
}

if (flows != null) {
select = select.and(DSL.or(
flows
.stream()
.map(e -> field("namespace").eq(e.getNamespace())
.and(field("flow_id").eq(e.getId()))
)
.collect(Collectors.toList())
));
}

return select;
}

Expand All @@ -295,6 +309,7 @@ public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlow
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable List<FlowFilter> flows,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
Expand All @@ -314,6 +329,7 @@ public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlow
query,
namespace,
flowId,
flows,
startDate,
endDate
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.lucene.search.join.ScoreMode;
import jakarta.inject.Singleton;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
Expand Down Expand Up @@ -54,9 +53,6 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import javax.annotation.Nullable;

@Singleton
Expand Down Expand Up @@ -104,6 +100,7 @@ public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlow
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable List<FlowFilter> flows,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
Expand All @@ -129,7 +126,7 @@ public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlow
);

SearchSourceBuilder sourceBuilder = this.searchSource(
this.filters(query, startDate, endDate, namespace, flowId, null),
this.filters(query, startDate, endDate, namespace, flowId, flows, null),
Optional.of(Collections.singletonList(
agg
)),
Expand Down Expand Up @@ -212,7 +209,7 @@ public List<ExecutionCount> executionCounts(
}

SearchSourceBuilder sourceBuilder = this.searchSource(
this.filters(null, startDate, endDate, null, null, states),
this.filters(null, startDate, endDate, null, null, null, states),
Optional.of(Collections.singletonList(
AggregationBuilders.filters(
"FILTERS",
Expand Down Expand Up @@ -278,7 +275,7 @@ public List<DailyExecutionStatistics> dailyStatistics(
}

SearchSourceBuilder sourceBuilder = this.searchSource(
this.filters(query, startDate, endDate, namespace, flowId, null),
this.filters(query, startDate, endDate, namespace, flowId, null, null),
Optional.of(Collections.singletonList(
agg
)),
Expand Down Expand Up @@ -310,11 +307,28 @@ public List<DailyExecutionStatistics> dailyStatistics(
}
}

private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDateTime endDate, String namespace, String flowId, List<State.Type> state) {
return this.filters(query, startDate, endDate, namespace, flowId, state, false);
private BoolQueryBuilder filters(
String query,
ZonedDateTime startDate,
ZonedDateTime endDate,
String namespace,
String flowId,
List<FlowFilter> flows,
List<State.Type> state
) {
return this.filters(query, startDate, endDate, namespace, flowId, flows, state, false);
}

private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDateTime endDate, String namespace, String flowId, List<State.Type> state, boolean isTaskRun) {
private BoolQueryBuilder filters(
String query,
ZonedDateTime startDate,
ZonedDateTime endDate,
String namespace,
String flowId,
List<FlowFilter> flows,
List<State.Type> state,
boolean isTaskRun
) {
String prefix = isTaskRun ? "taskRunList." : "";

BoolQueryBuilder bool = isTaskRun ? QueryBuilders.boolQuery() : this.defaultFilter();
Expand All @@ -327,6 +341,21 @@ private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDat
}
}

if (flows != null) {
BoolQueryBuilder flowsBool = QueryBuilders.boolQuery()
.minimumShouldMatch(1);

flows
.stream()
.map(e -> QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery(prefix + "flowId", e.getId()))
.must(QueryBuilders.matchQuery(prefix + "namespace", e.getNamespace()))
)
.forEach(flowsBool::should);

bool.must(flowsBool);
}

if (flowId != null && namespace != null) {
bool = bool.must(QueryBuilders.matchQuery(prefix + "flowId", flowId));
bool = bool.must(QueryBuilders.matchQuery(prefix + "namespace", namespace));
Expand Down Expand Up @@ -420,7 +449,7 @@ public ArrayListTotal<Execution> find(
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
) {
BoolQueryBuilder bool = this.filters(query, startDate, endDate, namespace, flowId, state);
BoolQueryBuilder bool = this.filters(query, startDate, endDate, namespace, flowId, null, state);

SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);

Expand All @@ -437,7 +466,7 @@ public ArrayListTotal<TaskRun> findTaskRun(
@javax.annotation.Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
) {
BoolQueryBuilder filterAggQuery = filters(query, startDate, endDate, namespace, flowId, states, true);
BoolQueryBuilder filterAggQuery = filters(query, startDate, endDate, namespace, flowId, null, states, true);

NestedAggregationBuilder nestedAgg = AggregationBuilders
.nested("NESTED", "taskRunList")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ public Execution save(Execution execution) {
}

@Override
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(@Nullable String query, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, boolean groupByNamespaceOnly) {
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable List<FlowFilter> flows,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
) {
throw new UnsupportedOperationException();
}

Expand Down
9 changes: 4 additions & 5 deletions ui/src/components/flows/Flows.vue
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,13 @@
callback();
if (flows.results && flows.results.length > 0) {
let query = "((" + flows.results
.map(flow => "flowId:" + flow.id + " AND namespace:" + flow.namespace)
.join(") OR (") + "))"
if (this.user && this.user.hasAny(permission.EXECUTION)) {
this.$store
.dispatch("stat/dailyGroupByFlow", {
q: query,
flows: flows.results
.map(flow => {
return {namespace: flow.namespace, id: flow.id}
}),
startDate: this.$moment(this.startDate).add(-1, "day").startOf("day").toISOString(true),
endDate: this.$moment(this.endDate).endOf("day").toISOString(true)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class StatsController {
@Inject
protected ExecutionRepositoryInterface executionRepository;


@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/daily", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Stats"}, summary = "Get daily statistics for executions")
Expand Down Expand Up @@ -75,15 +74,16 @@ public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlow
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "A list of flows filter") @Nullable List<ExecutionRepositoryInterface.FlowFilter> flows,
@Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "Return only namespace result and skip flows") @Nullable Boolean namespaceOnly
) {

return executionRepository.dailyGroupByFlowStatistics(
query,
namespace,
flowId,
flows != null && flows.get(0).getNamespace() != null ? flows : null,
startDate != null ? startDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
endDate != null ? endDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
namespaceOnly != null && namespaceOnly
Expand Down

0 comments on commit 2202dab

Please sign in to comment.