Skip to content

Commit

Permalink
feat(jdbc): fix taskruns search
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 2, 2022
1 parent 5319612 commit f99a099
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 94 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public boolean isCreated() {
@JsonIgnore
public static Type[] runningTypes() {
return Arrays.stream(Type.values())
.filter(Type::isRunning)
.filter(type -> type.isRunning() || type.isCreated())
.toArray(Type[]::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ public class MultipleConditionWindow {

String conditionId;

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX")
ZonedDateTime start;

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX")
ZonedDateTime end;

Map<String, Boolean> results;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,4 @@ public class PostgresMultipleConditionStorage extends AbstractJdbcMultipleCondit
public PostgresMultipleConditionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(MultipleConditionWindow.class, applicationContext));
}

@Override
public List<MultipleConditionWindow> expired() {
ZonedDateTime now = ZonedDateTime.now();

// bug on postgres with timestamp, use unix integer

return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("start_date").lessOrEqual((int) now.toInstant().getEpochSecond())
.and(DSL.field("end_date").lessOrEqual((int) now.toInstant().getEpochSecond()))
);

return this.jdbcRepository.fetch(select);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ CREATE TABLE multipleconditions (
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
condition_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'conditionId') STORED,
start_date INT NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_TIMESTAMP(value ->> 'start')) STORED,
end_date INT NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_TIMESTAMP(value ->> 'end')) STORED
start_date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'start')) STORED,
end_date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'end')) STORED
);

CREATE INDEX multipleconditions_namespace__flow_id__condition_id ON multipleconditions (namespace, flow_id, condition_id);
Expand Down
2 changes: 0 additions & 2 deletions jdbc-postgres/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,4 @@
<appender-ref ref="STDOUT" />
<appender-ref ref="STDERR" />
</root>

<logger name="org.jooq" level="TRACE" />
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public List<MultipleConditionWindow> expired() {
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("start_date").lt(now.toInstant())
.and(DSL.field("end_date").lt(now.toInstant()))
DSL.field("start_date").lt(now.toOffsetDateTime())
.and(DSL.field("end_date").lt(now.toOffsetDateTime()))
);

return this.jdbcRepository.fetch(select);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,16 @@ protected static QueryStringQueryBuilder queryString(@Nullable String query) {
return QueryBuilders.queryStringQuery("*");
}

String lucene;

List<String> words = Arrays.stream(query.split("[^a-zA-Z0-9_.-]+"))
.filter(r -> !r.equals(""))
.map(QueryParser::escape)
.collect(Collectors.toList());

String lucene = "(*" + String.join("*", words) + "*)^3 OR (*" + String.join("* AND *", words) + "*)";


if (words.size() == 1) {
lucene = "(" + query + ")^5 OR " + query;
} else {
lucene = "(*" + String.join("*", words) + "*)^3 OR (*" + String.join("* AND *", words) + "*)";
lucene = "(" + query + ")^5 OR " + lucene;
}

return QueryBuilders.queryStringQuery(lucene);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,33 +291,39 @@ public List<DailyExecutionStatistics> dailyStatistics(
}

private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDateTime endDate, String namespace, String flowId, List<State.Type> state) {
BoolQueryBuilder bool = this.defaultFilter();
return this.filters(query, startDate, endDate, namespace, flowId, state, false);
}

private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDateTime endDate, String namespace, String flowId, List<State.Type> state, boolean isTaskRun) {
String prefix = isTaskRun ? "taskRunList." : "";

BoolQueryBuilder bool = isTaskRun ? QueryBuilders.boolQuery() : this.defaultFilter();

if (query != null) {
bool.must(queryString(query).field("*.fulltext"));
if (isTaskRun) {
bool.must(queryString(query));
} else {
bool.must(queryString(query).field("*.fulltext"));
}
}

if (flowId != null && namespace != null) {
bool = bool.must(QueryBuilders.matchQuery("flowId", flowId));
bool = bool.must(QueryBuilders.matchQuery("namespace", namespace));
bool = bool.must(QueryBuilders.matchQuery(prefix + "flowId", flowId));
bool = bool.must(QueryBuilders.matchQuery(prefix + "namespace", namespace));
} else if (namespace != null) {
bool = bool.must(QueryBuilders.prefixQuery("namespace", namespace));
bool = bool.must(QueryBuilders.prefixQuery(prefix + "namespace", namespace));
}

if (startDate != null) {
bool.must(QueryBuilders.rangeQuery("state.startDate").gte(startDate));
bool.must(QueryBuilders.rangeQuery(prefix + "state.startDate").gte(startDate));
}

if (endDate != null) {
bool.must(QueryBuilders.rangeQuery("state.startDate").lte(endDate));
bool.must(QueryBuilders.rangeQuery(prefix + "state.startDate").lte(endDate));
}

if (state != null) {
bool = bool.must(QueryBuilders.termsQuery("state.current", stateConvert(state)));
}

if (query != null) {
bool.must(queryString(query).field("*.fulltext"));
bool = bool.must(QueryBuilders.termsQuery(prefix + "state.current", stateConvert(state)));
}

return bool;
Expand Down Expand Up @@ -411,15 +417,7 @@ public ArrayListTotal<TaskRun> findTaskRun(
@javax.annotation.Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
) {
BoolQueryBuilder filterAggQuery = QueryBuilders.boolQuery();

if (query != null) {
filterAggQuery.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
}

if (states != null) {
filterAggQuery = filterAggQuery.must(QueryBuilders.termsQuery("taskRunList.state.current", stateConvert(states)));
}
BoolQueryBuilder filterAggQuery = filters(query, startDate, endDate, namespace, flowId, states, true);

NestedAggregationBuilder nestedAgg = AggregationBuilders
.nested("NESTED", "taskRunList")
Expand All @@ -434,12 +432,7 @@ public ArrayListTotal<TaskRun> findTaskRun(
)
);

BoolQueryBuilder mainQuery = this.defaultFilter()
.filter(QueryBuilders.nestedQuery(
"taskRunList",
query != null ? QueryBuilders.queryStringQuery(query).field("*.fulltext") : QueryBuilders.matchAllQuery(),
ScoreMode.Total
));
BoolQueryBuilder mainQuery = this.defaultFilter();

SearchSourceBuilder sourceBuilder = this.searchSource(mainQuery, Optional.of(List.of(nestedAgg)), null)
.fetchSource(false);
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/layout/Pagination.vue
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/>
</div>

<small v-if="max" class="d-sm-none d-md text-total">
<small v-if="max" class="d-md-none d-lg-block total btn-outline-light mr-1">
{{ $t('Max displayable') }}: {{ max }}
</small>

Expand Down
45 changes: 15 additions & 30 deletions ui/src/components/taskruns/TaskRuns.vue
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@
import Kicon from "../Kicon"
import RestoreUrl from "../../mixins/restoreUrl";
import State from "../../utils/state";
import qb from "../../utils/queryBuilder";
import Id from "../Id";
import _merge from "lodash/merge";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
Expand Down Expand Up @@ -228,6 +228,9 @@
.toDate();
}
},
created() {
this.$store.dispatch("taskrun/maxTaskRunSetting");
},
methods: {
isRunning(item){
return State.isRunning(item.state.current);
Expand All @@ -238,50 +241,32 @@
params: {namespace: item.namespace, flowId: item.flowId, id: item.executionId, tab: "gantt"},
});
},
loadQuery(stats) {
let filter = []
let query = this.queryWithFilter();
loadQuery(base, stats) {
let queryFilter = this.queryWithFilter();
if (query.namespace) {
filter.push(`${!stats ? "taskRunList.namespace" : "namespace"}:${query.namespace}*`)
if (stats) {
delete queryFilter["startDate"];
delete queryFilter["endDate"];
}
if (query.q) {
filter.push(qb.toLucene(query.q));
}
if (query.start && !stats) {
filter.push(`taskRunList.state.startDate:[${query.start} TO *]`)
}
if (query.end && !stats) {
filter.push(`taskRunList.state.endDate:[* TO ${query.end}]`)
}
return filter.join(" AND ") || "*"
return _merge(base, queryFilter)
},
loadData(callback) {
this.$store
.dispatch("stat/taskRunDaily", {
q: this.loadQuery(true),
.dispatch("stat/taskRunDaily", this.loadQuery({
startDate: this.$moment(this.startDate).startOf("day").add(-1, "day").toISOString(true),
endDate: this.$moment(this.endDate).endOf("day").toISOString(true)
})
}, true))
.then(() => {
this.dailyReady = true;
});
this.$store.dispatch("taskrun/maxTaskRunSetting");
this.$store
.dispatch("taskrun/findTaskRuns", {
.dispatch("taskrun/findTaskRuns", this.loadQuery({
size: parseInt(this.$route.query.size || 25),
page: parseInt(this.$route.query.page || 1),
q: this.loadQuery(false),
sort: this.$route.query.sort || "taskRunList.state.startDate:desc",
state: this.$route.query.status
})
state: this.$route.query.status ? [this.$route.query.status] : this.statuses
}, false))
.finally(callback);
},
durationFrom(item) {
Expand Down

0 comments on commit f99a099

Please sign in to comment.