Skip to content

Commit

Permalink
feat(core): add labels on flows
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 55e5768 commit 0bc9485
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 30 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {

String description;

Map<String, String> labels;

@Valid
List<Input> inputs;

Expand Down Expand Up @@ -309,6 +311,7 @@ public Flow toDeleted() {
this.namespace,
this.revision + 1,
this.description,
this.labels,
this.inputs,
this.variables,
this.tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface FlowRepositoryInterface {
Expand Down Expand Up @@ -42,7 +43,12 @@ default Optional<Flow> findById(String namespace, String id) {

List<Flow> findByNamespace(String namespace);

ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace);
ArrayListTotal<Flow> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable Map<String, String> labels
);

ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace);

Expand Down
4 changes: 4 additions & 0 deletions core/src/test/resources/flows/valids/logs.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
id: logs
namespace: io.kestra.tests

labels:
country: FR
region: "Nord"

taskDefaults:
- type: io.kestra.core.tasks.debugs.Echo
values:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Singleton
@H2RepositoryEnabled
Expand All @@ -18,8 +22,26 @@ public H2FlowRepository(ApplicationContext applicationContext) {
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();

if (query != null) {
conditions.add(this.jdbcRepository.fullTextCondition(List.of("fulltext"), query));
}

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("JQ_STRING(\"value\", '.labels." + key + "')", String.class);

if (value == null) {
conditions.add(field.isNotNull());
} else {
conditions.add(field.eq(value));
}
});
}

return conditions.size() == 0 ? DSL.trueCondition() : DSL.and(conditions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.util.Arrays;
import java.util.Collections;
import java.util.*;

@Singleton
@MysqlRepositoryEnabled
Expand All @@ -19,8 +19,26 @@ public MysqlFlowRepository(ApplicationContext applicationContext) {
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();

if (query != null) {
conditions.add(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("JSON_VALUE(value, '$.labels." + key + "' NULL ON EMPTY)", String.class);

if (value == null) {
conditions.add(field.isNotNull());
} else {
conditions.add(field.eq(value));
}
});
}

return conditions.size() == 0 ? DSL.trueCondition() : DSL.and(conditions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@Singleton
@PostgresRepositoryEnabled
Expand All @@ -18,8 +22,26 @@ public PostgresFlowRepository(ApplicationContext applicationContext) {
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
protected Condition findCondition(String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();

if (query != null) {
conditions.add(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("value #>> '{labels, " + key + "}'", String.class);

if (value == null) {
conditions.add(field.isNotNull());
} else {
conditions.add(field.eq(value));
}
});
}

return conditions.size() == 0 ? DSL.trueCondition() : DSL.and(conditions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,22 @@ private <R extends Record, E> SelectConditionStep<R> fullTextSelect(DSLContext c
.where(this.defaultFilter());
}

abstract protected Condition findCondition(String query);

public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
abstract protected Condition findCondition(String query, Map<String, String> labels);

public ArrayListTotal<Flow> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable Map<String, String> labels
) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = this.fullTextSelect(context, Collections.emptyList());

if (query != null) {
select.and(this.findCondition(query));
}
select.and(this.findCondition(query, labels));

if (namespace != null) {
select.and(field("namespace").likeIgnoreCase(namespace + "%"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -27,11 +30,19 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo

@Test
void find() {
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc("id"))), null, null);
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc("id"))), null, null, null);
assertThat((long) save.size(), is(Helpers.FLOWS_COUNT));

save = flowRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "trigger-multiplecondition", null);
save = flowRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "trigger-multiplecondition", null, null);
assertThat((long) save.size(), is(3L));

save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null, Map.of("country", "FR"));
assertThat(save.size(), is(1));

HashMap<String, String> map = new HashMap<>();
map.put("region", null);
save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null, map);
assertThat(save.size(), is(1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ public List<Flow> findByNamespace(String namespace) {
}

@Override
public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
public ArrayListTotal<Flow> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable Map<String, String> labels
) {
BoolQueryBuilder bool = this.defaultFilter();

if (query != null) {
Expand All @@ -198,6 +203,16 @@ public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nul
bool.must(QueryBuilders.prefixQuery("namespace", namespace));
}

if (labels != null) {
labels.forEach((key, value) -> {
if (value != null) {
bool.must(QueryBuilders.termQuery("labels." + key, value));
} else {
bool.must(QueryBuilders.existsQuery("labels." + key));
}
});
}

SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);
sourceBuilder.fetchSource("*", "sourceCode");

Expand Down
2 changes: 2 additions & 0 deletions repository-elasticsearch/src/main/resources/mappings/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ properties:
type: keyword
type:
type: keyword
labels:
type: flattened
deleted:
type: boolean
sourceCode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -39,9 +42,16 @@ class ElasticSearchFlowRepositoryTest extends AbstractFlowRepositoryTest {

@Test
void find() {
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null);

List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null, null);
assertThat((long) save.size(), is(Helpers.FLOWS_COUNT));

save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null, Map.of("country", "FR"));
assertThat(save.size(), is(1));

HashMap<String, String> map = new HashMap<>();
map.put("region", null);
save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null, map);
assertThat(save.size(), is(1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ public List<Flow> findByNamespace(String namespace) {
.collect(Collectors.toList());
}

public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
public ArrayListTotal<Flow> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable Map<String, String> labels
) {
//TODO Non used query, returns just all at the moment
if (pageable.getNumber() < 1) {
throw new ValueException("Page cannot be < 1");
Expand Down
13 changes: 12 additions & 1 deletion ui/src/components/flows/Flows.vue
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
/>
</template>

<template #cell(labels)="row">
<labels :labels="row.item.labels" />
</template>

<template #cell(triggers)="row">
<trigger-avatar :flow="row.item" />
</template>
Expand Down Expand Up @@ -134,6 +138,7 @@
import TriggerAvatar from "./TriggerAvatar";
import MarkdownTooltip from "../layout/MarkdownTooltip"
import Kicon from "../Kicon"
import Labels from "../layout/Labels"
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
Expand All @@ -149,7 +154,8 @@
StateGlobalChart,
TriggerAvatar,
MarkdownTooltip,
Kicon
Kicon,
Labels
},
data() {
return {
Expand Down Expand Up @@ -180,6 +186,11 @@
label: title("flow"),
sortable: true
},
{
key: "labels",
label: title("labels"),
sortable: false
},
{
key: "namespace",
label: title("namespace"),
Expand Down
Loading

0 comments on commit 0bc9485

Please sign in to comment.