Skip to content

Commit

Permalink
feat(ui): introduce KV Store
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jul 5, 2024
1 parent 4f385dd commit 1ba8525
Show file tree
Hide file tree
Showing 19 changed files with 484 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,17 @@ public void writeSpecialized(short s) {
if (current == null) {
current = s;
} else {
Short currentS = this.ofSameTypeOrThrow(current, Short.class);
Short currentS = null;
try {
currentS = this.ofSameTypeOrThrow(current, Short.class);
} catch (Exception e) {
try {
current = this.ofSameTypeOrThrow(current, Integer.class) + s;
return;
} catch (Exception ex) {
throw e;
}
}
current = currentS + s;
}
}
Expand All @@ -68,7 +78,17 @@ public void writeSpecialized(byte b) {
if (current == null) {
current = b;
} else {
Byte currentB = this.ofSameTypeOrThrow(current, Byte.class);
Byte currentB = null;
try {
currentB = this.ofSameTypeOrThrow(current, Byte.class);
} catch (Exception e) {
try {
current = this.ofSameTypeOrThrow(current, Integer.class) + b;
return;
} catch (Exception ex) {
throw e;
}
}
current = currentB + b;
}
}
Expand All @@ -78,8 +98,18 @@ public void writeSpecialized(char c) {
if (current == null) {
current = c;
} else {
Character currentC = this.ofSameTypeOrThrow(current, Character.class);
current = currentC + c;
Character currentC;
try {
currentC = this.ofSameTypeOrThrow(current, Character.class);
} catch (Exception e) {
try {
current = this.ofSameTypeOrThrow(current, String.class) + c;
return;
} catch (Exception ex) {
throw e;
}
}
current = "" + currentC + c;
}
}

Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/storages/kv/KVEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

public record KVEntry(String key, Instant creationDate, Instant updateDate, Instant expirationDate) {
public static KVEntry from(FileAttributes fileAttributes) throws IOException {
return new KVEntry(
fileAttributes.getFileName().replace(".ion", ""),
Instant.ofEpochMilli(fileAttributes.getCreationTime()),
Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()),
new KVMetadata(fileAttributes.getMetadata()).getExpirationDate()
Optional.ofNullable(new KVMetadata(fileAttributes.getMetadata()).getExpirationDate())
.map(expirationDate -> expirationDate.truncatedTo(ChronoUnit.MILLIS))
.orElse(null)
);
}
}
24 changes: 21 additions & 3 deletions core/src/main/java/io/kestra/core/storages/kv/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.Rethrow;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

import static io.kestra.core.utils.Rethrow.throwFunction;

/**
* Service interface for accessing the files attached to a namespace Key-Value store.
*/
public interface KVStore {
Pattern durationPattern = Pattern.compile("^P(?=.)(?:\\d*D)?(?:T(?=.)(?:\\d*H)?(?:\\d*M)?(?:\\d*S)?)?$");

default void validateKey(String key) {
if (key == null || key.isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty");
Expand All @@ -38,13 +41,28 @@ default URI storageUri(String key, String namespace) {
}

default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) throws IOException {
this.putRaw(key, KVStoreValueWrapper.ionStringify(kvStoreValueWrapper));
Object value = kvStoreValueWrapper.value();
String ionValue;
if (value instanceof Duration duration) {
ionValue = duration.toString();
} else {
ionValue = JacksonMapper.ofIon().writeValueAsString(value);
}

this.putRaw(key, new KVStoreValueWrapper<>(kvStoreValueWrapper.kvMetadata(), ionValue));
}

void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper) throws IOException;

default Optional<Object> get(String key) throws IOException, ResourceExpiredException {
return this.getRaw(key).map(throwFunction(raw -> JacksonMapper.ofIon().readValue(raw, Object.class)));
return this.getRaw(key).map(throwFunction(raw -> {
Object value = JacksonMapper.ofIon().readValue(raw, Object.class);
if (value instanceof String valueStr && durationPattern.matcher(valueStr).matches()) {
return Duration.parse(valueStr);
}

return value;
}));
}

Optional<String> getRaw(String key) throws IOException, ResourceExpiredException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.kestra.core.storages.kv;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageObject;

import java.io.IOException;
Expand Down Expand Up @@ -36,8 +34,4 @@ static KVStoreValueWrapper<String> from(StorageObject storageObject) throws IOEx
return new KVStoreValueWrapper<>(new KVMetadata(storageObject.metadata()), ionString);
}
}

static KVStoreValueWrapper<String> ionStringify(KVStoreValueWrapper<Object> kvStoreValueWrapper) throws JsonProcessingException {
return new KVStoreValueWrapper<>(kvStoreValueWrapper.kvMetadata(), JacksonMapper.ofIon().writeValueAsString(kvStoreValueWrapper.value()));
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions core/src/main/resources/icons/io.kestra.plugin.core.kv.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.kestra.core.runners.pebble;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class TypedObjectWriterTest {
@Test
void invalidAddition() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized(1);
IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> writer.writeSpecialized('a'));
assertThat(illegalArgumentException.getMessage(), is("Tried to add java.lang.Character to java.lang.Integer"));
}
}

@Test
void writeInts() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized(1);
writer.writeSpecialized(2);
writer.writeSpecialized(3);
assertThat(writer.output(), is(6));
}
}

@Test
void writeLongs() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized(1L);
writer.writeSpecialized(2L);
writer.writeSpecialized(3L);
assertThat(writer.output(), is(6L));
}
}

@Test
void writeDoubles() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized(1.0);
writer.writeSpecialized(2.0);
writer.writeSpecialized(3.0);
assertThat(writer.output(), is(6.0));
}
}

@Test
void writeFloats() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized(1.0f);
writer.writeSpecialized(2.0f);
writer.writeSpecialized(3.0f);
assertThat(writer.output(), is(6.0f));
}
}

@Test
void writeShorts() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized((short) 1);
writer.writeSpecialized((short) 2);
writer.writeSpecialized((short) 3);
assertThat(writer.output(), is(6));
}
}

@Test
void writeBytes() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
byte aByte = "a".getBytes()[0];
writer.writeSpecialized(aByte);
byte bByte = "b".getBytes()[0];
writer.writeSpecialized(bByte);
byte cByte = "c".getBytes()[0];
writer.writeSpecialized(cByte);
assertThat(writer.output(), is((aByte + bByte) + cByte));
}
}

@Test
void writeChars() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized('a');
writer.writeSpecialized('b');
writer.writeSpecialized('c');
assertThat(writer.output(), is("abc"));
}
}

@Test
void writeStrings() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.writeSpecialized("a");
writer.writeSpecialized("b");
writer.writeSpecialized("c");
assertThat(writer.output(), is("abc"));
}
}

@Test
void writeObjects() throws IOException {
try (TypedObjectWriter writer = new TypedObjectWriter()){
writer.write(Map.of("a", "b"));
IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> writer.write(Map.of("c", "d")));
assertThat(illegalArgumentException.getMessage(), is("Tried to add java.util.ImmutableCollections$Map1 to java.util.ImmutableCollections$Map1"));
}
}
}
4 changes: 2 additions & 2 deletions core/src/test/java/io/kestra/plugin/core/kv/SetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void defaultCase() throws Exception {
set.run(runContext);

final KVStore kv = runContext.storage().namespaceKv(namespaceId);
assertThat(kv.get(key), is(value));
assertThat(kv.get(key).get(), is(value));
assertThat(kv.list().get(0).expirationDate(), nullValue());
}

Expand All @@ -76,7 +76,7 @@ void ttl() throws Exception {
set.run(runContext);

final KVStore kv = runContext.storage().namespaceKv(namespaceId);
assertThat(kv.get(key), is(value));
assertThat(kv.get(key).get(), is(value));
Instant expirationDate = kv.list().get(0).expirationDate();
assertThat(expirationDate.isAfter(Instant.now().plus(Duration.ofMinutes(4))) && expirationDate.isBefore(Instant.now().plus(Duration.ofMinutes(6))), is(true));
}
Expand Down
6 changes: 3 additions & 3 deletions ui/src/components/executions/date-select/DateFilter.vue
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@update:model-value="onAbsFilterChange"
class="w-auto"
/>
<relative-date-select
<time-select
v-if="selectedFilterType === filterType.RELATIVE"
:time-range="timeRange"
@update:model-value="onRelFilterChange"
Expand All @@ -27,12 +27,12 @@

<script>
import DateRange from "../../layout/DateRange.vue";
import RelativeDateSelect from "./RelativeDateSelect.vue";
import TimeSelect from "./TimeSelect.vue";
export default {
components: {
DateRange,
RelativeDateSelect
TimeSelect
},
emits: [
"update:isRelative",
Expand Down
16 changes: 13 additions & 3 deletions ui/src/components/executions/date-select/DateSelect.vue
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
<template>
<el-tooltip :content="tooltip" effect="light">
<el-tooltip :disabled="tooltip === undefined" :content="tooltip" effect="light">
<el-select
:model-value="value"
:placeholder="placeholder"
@change="$emit('change', $event)"
:clearable="clearable"
>
<template #prefix>
<clock-outline />
Expand All @@ -28,17 +30,25 @@
"change"
],
props: {
placeholder: {
type: String,
default: undefined
},
value: {
type: String,
required: true
default: undefined
},
options: {
type: Array,
default: () => []
},
tooltip: {
type: String,
required: true
default: undefined
},
clearable: {
type: Boolean,
default: false
}
}
}
Expand Down
Loading

0 comments on commit 1ba8525

Please sign in to comment.