Skip to content

Commit

Permalink
fix(core): support null value for KV store (#4504)
Browse files Browse the repository at this point in the history
Fix: #4504
  • Loading branch information
fhussonnois committed Aug 2, 2024
1 parent 6b6b01d commit 07957db
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValue;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
Expand Down Expand Up @@ -39,7 +40,7 @@ void string() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("string").get(), is("stringValue"));
assertThat(kvStore.getValue("string").get(), is(new KVValue("stringValue")));
assertThat(kvStore.getRawValue("string").get(), is("\"stringValue\""));
}
}
Expand All @@ -65,7 +66,7 @@ void integer() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("int").get(), is(1));
assertThat(kvStore.getValue("int").get(), is(new KVValue(1)));
assertThat(kvStore.getRawValue("int").get(), is("1"));
}
}
Expand All @@ -92,7 +93,7 @@ void integerStr() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("intStr").get(), is("1"));
assertThat(kvStore.getValue("intStr").get(), is(new KVValue("1")));
assertThat(kvStore.getRawValue("intStr").get(), is("\"1\""));
}
}
Expand All @@ -118,7 +119,7 @@ void object() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getValue("object").get(), is(new KVValue(Map.of("some", "json"))));
assertThat(kvStore.getRawValue("object").get(), is("{some:\"json\"}"));
}
}
Expand All @@ -145,7 +146,7 @@ void objectStr() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getValue("objectStr").get(), is(new KVValue("{\"some\":\"json\"}")));
assertThat(kvStore.getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
}
}
Expand Down Expand Up @@ -177,7 +178,7 @@ void fromFile() throws IOException, ResourceExpiredException {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getValue("objectFromFile").get(), is(new KVValue(Map.of("some", "json", "from", "file"))));
assertThat(kvStore.getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.KVValue;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
Expand Down Expand Up @@ -43,7 +44,7 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC
namespace = flowNamespace;
}

Optional<Object> value;
Optional<KVValue> value;
try {
value = kvStoreService.get(flowTenantId, namespace, flowNamespace).getValue(key);
} catch (Exception e) {
Expand All @@ -54,7 +55,7 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC
throw new PebbleException(null, "The key '" + key + "' does not exist in the namespace '" + namespace + "'.", lineNumber, self.getName());
}

return value.orElse(null);
return value.map(KVValue::value).orElse(null);
}

protected String getKey(Map<String, Object> args, PebbleTemplate self, int lineNumber) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/storages/kv/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper, bo

void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper) throws IOException;

default Optional<Object> getValue(String key) throws IOException, ResourceExpiredException {
default Optional<KVValue> getValue(String key) throws IOException, ResourceExpiredException {
return this.getRawValue(key).map(throwFunction(raw -> {
Object value = JacksonMapper.ofIon().readValue(raw, Object.class);
if (value instanceof String valueStr && durationPattern.matcher(valueStr).matches()) {
return Duration.parse(valueStr);
return new KVValue(Duration.parse(valueStr));
}

return value;
return new KVValue(value);
}));
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/storages/kv/KVValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.storages.kv;

import jakarta.annotation.Nullable;

/**
* A K/V store entry value.
*
* @param value The value - can be null
*/
public record KVValue(@Nullable Object value) {
}
5 changes: 3 additions & 2 deletions core/src/main/java/io/kestra/plugin/core/kv/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.kv.KVValue;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -73,13 +74,13 @@ public Output run(RunContext runContext) throws Exception {

String renderedKey = runContext.render(this.key);

Optional<Object> maybeValue = runContext.namespaceKv(renderedNamespace).getValue(renderedKey);
Optional<KVValue> maybeValue = runContext.namespaceKv(renderedNamespace).getValue(renderedKey);
if (this.errorOnMissing && maybeValue.isEmpty()) {
throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true");
}

return Output.builder()
.value(maybeValue.orElse(null))
.value(maybeValue.map(KVValue::value).orElse(null))
.build();
}

Expand Down
44 changes: 28 additions & 16 deletions core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.storages.kv.KVMetadata;
import io.kestra.core.storages.kv.KVStoreValueWrapper;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.utils.IdUtils;
import io.kestra.storage.local.LocalStorage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
Expand All @@ -33,7 +32,7 @@
class InternalKVStoreTest {
private static final Instant date = Instant.now().truncatedTo(ChronoUnit.MILLIS);
private static final Map<String, Object> complexValue = Map.of("some", "complex", "object", Map.of("with", "nested", "values", date));
private static final Logger logger = LoggerFactory.getLogger(InternalKVStoreTest.class);
static final String TEST_KV_KEY = "my-key";

LocalStorage storageInterface;

Expand All @@ -52,7 +51,7 @@ void list() throws IOException {

assertThat(kv.list().size(), is(0));

kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));
kv.put("my-second-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(10)), complexValue));
kv.put("expired-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMillis(1)), complexValue));
Instant after = Instant.now().plusMillis(100);
Expand All @@ -69,7 +68,7 @@ void list() throws IOException {
// Check that we don't list expired keys
assertThat(map.size(), is(2));

KVEntry myKeyValue = map.get("my-key");
KVEntry myKeyValue = map.get(TEST_KV_KEY);
assertThat(
myKeyValue.creationDate().plus(Duration.ofMinutes(4)).isBefore(myKeyValue.expirationDate()) &&
myKeyValue.creationDate().plus(Duration.ofMinutes(6)).isAfter(myKeyValue.expirationDate()),
Expand All @@ -91,7 +90,7 @@ void put() throws IOException {

// When
Instant before = Instant.now();
kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));

// Then
StorageObject withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
Expand All @@ -101,7 +100,7 @@ void put() throws IOException {
assertThat(valueFile, is(JacksonMapper.ofIon().writeValueAsString(complexValue)));

// Re-When
kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(10)), "some-value"));
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(10)), "some-value"));

// Then
withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
Expand All @@ -112,38 +111,51 @@ void put() throws IOException {
}

@Test
void get() throws IOException, ResourceExpiredException {
void shouldGetGivenEntryWithNullValue() throws IOException, ResourceExpiredException {
// Given
final InternalKVStore kv = kv();
kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), null));

// When
Optional<Object> value = kv.getValue("my-key");
Optional<KVValue> value = kv.getValue(TEST_KV_KEY);

// Then
assertThat(value.get(), is(complexValue));
assertThat(value, is(Optional.of(new KVValue(null))));
}

@Test
void getUnknownKey() throws IOException, ResourceExpiredException {
void shouldGetGivenEntryWithComplexValue() throws IOException, ResourceExpiredException {
// Given
final InternalKVStore kv = kv();
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue));

// When
Optional<Object> value = kv.getValue("my-key");
Optional<KVValue> value = kv.getValue(TEST_KV_KEY);

// Then
assertThat(value.get(), is(new KVValue(complexValue)));
}

@Test
void shouldGetEmptyGivenNonExistingKey() throws IOException, ResourceExpiredException {
// Given
final InternalKVStore kv = kv();

// When
Optional<KVValue> value = kv.getValue(TEST_KV_KEY);

// Then
assertThat(value.isEmpty(), is(true));
}

@Test
void getExpiredKV() throws IOException {
void shouldThrowGivenExpiredEntry() throws IOException {
// Given
final InternalKVStore kv = kv();
kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofNanos(1)), complexValue));
kv.put(TEST_KV_KEY, new KVStoreValueWrapper<>(new KVMetadata(Duration.ofNanos(1)), complexValue));

// When
Assertions.assertThrows(ResourceExpiredException.class, () -> kv.getValue("my-key"));
Assertions.assertThrows(ResourceExpiredException.class, () -> kv.getValue(TEST_KV_KEY));
}

@Test
Expand Down
9 changes: 5 additions & 4 deletions core/src/test/java/io/kestra/plugin/core/kv/SetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -52,7 +53,7 @@ void shouldSetKVGivenNoNamespace() throws Exception {

// Then
final KVStore kv = runContext.namespaceKv(runContext.flowInfo().namespace());
assertThat(kv.getValue(TEST_KEY), is(Optional.of(value)));
assertThat(kv.getValue(TEST_KEY), is(Optional.of(new KVValue(value))));
assertThat(kv.list().getFirst().expirationDate(), nullValue());
}

Expand Down Expand Up @@ -80,7 +81,7 @@ void shouldSetKVGivenSameNamespace() throws Exception {

// Then
final KVStore kv = runContext.namespaceKv("io.kestra.test");
assertThat(kv.getValue(TEST_KEY), is(Optional.of("test-value")));
assertThat(kv.getValue(TEST_KEY), is(Optional.of(new KVValue("test-value"))));
assertThat(kv.list().getFirst().expirationDate(), nullValue());
}

Expand All @@ -107,7 +108,7 @@ void shouldSetKVGivenChildNamespace() throws Exception {

// then
final KVStore kv = runContext.namespaceKv("io.kestra.test.unit");
assertThat(kv.getValue(TEST_KEY), is(Optional.of("test-value")));
assertThat(kv.getValue(TEST_KEY), is(Optional.of(new KVValue("test-value"))));
assertThat(kv.list().getFirst().expirationDate(), nullValue());
}

Expand Down Expand Up @@ -156,7 +157,7 @@ void shouldSetKVGivenTTL() throws Exception {

// Then
final KVStore kv = runContext.namespaceKv(runContext.flowInfo().namespace());
assertThat(kv.getValue(TEST_KEY), is(Optional.of(value)));
assertThat(kv.getValue(TEST_KEY), is(Optional.of(new KVValue(value))));
Instant expirationDate = kv.get(TEST_KEY).get().expirationDate();
assertThat(expirationDate.isAfter(Instant.now().plus(Duration.ofMinutes(4))) && expirationDate.isBefore(Instant.now().plus(Duration.ofMinutes(6))), is(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public TypedValue get(
@Parameter(description = "The namespace id") @PathVariable String namespace,
@Parameter(description = "The key") @PathVariable String key
) throws IOException, URISyntaxException, ResourceExpiredException {
Object value = kvStore(namespace).getValue(key).orElseThrow(() -> new NoSuchElementException("No value found for key '" + key + "' in namespace '" + namespace + "'"));
return new TypedValue(KVType.from(value), value);
KVValue value = kvStore(namespace)
.getValue(key)
.orElseThrow(() -> new NoSuchElementException("No value found for key '" + key + "' in namespace '" + namespace + "'"));
return new TypedValue(KVType.from(value.value()), value);
}

@ExecuteOn(TaskExecutors.IO)
Expand Down Expand Up @@ -92,6 +94,8 @@ public enum KVType {
JSON;

public static KVType from(Object value) {
if (value == null) return KVType.STRING;

return switch (value) {
case String ignored -> STRING;
case Number ignored -> NUMBER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void put(MediaType mediaType, String value, Class<?> expectedClass) throws IOExc
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/namespaces/" + NAMESPACE + "/kv/my-key", value).contentType(mediaType).header("ttl", "PT5M"));

KVStore kvStore = new InternalKVStore(null, NAMESPACE, storageInterface);
Class<?> valueClazz = kvStore.getValue("my-key").get().getClass();
Class<?> valueClazz = kvStore.getValue("my-key").get().value().getClass();
assertThat("Expected value to be a " + expectedClass + " but was " + valueClazz, expectedClass.isAssignableFrom(valueClazz), is(true));

List<KVEntry> list = kvStore.list();
Expand Down

0 comments on commit 07957db

Please sign in to comment.