Skip to content

Commit

Permalink
fix(core): fix KV store types (#4483)
Browse files Browse the repository at this point in the history
Fix: #4483
  • Loading branch information
fhussonnois committed Aug 2, 2024
1 parent 07957db commit 986f91b
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 197 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
package io.kestra.cli.commands.namespaces.kv;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.KVMetadata;
import io.kestra.core.storages.kv.KVStoreValueWrapper;
import io.kestra.core.utils.KestraIgnore;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;

@CommandLine.Command(
name = "update",
Expand All @@ -29,7 +23,6 @@
)
@Slf4j
public class KvUpdateCommand extends AbstractApiCommand {
private static final Pattern STRING_PATTERN = Pattern.compile("^(?![\\d{\\[\"]+)(?!false)(?!true)(?!P(?=[^T]|T.)(?:\\d*D)?(?:T(?=.)(?:\\d*H)?(?:\\d*M)?(?:\\d*S)?)?).*$");

@CommandLine.Parameters(index = "0", description = "the namespace to update")
public String namespace;
Expand All @@ -40,13 +33,13 @@ public class KvUpdateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "2", description = "the value to assign to the key. If the value is an object, it must be in JSON format. If the value must be read from file, use -f parameter.")
public String value;

@CommandLine.Option(names = {"-e", "--expiration"}, description = "the duration after which the key should expire.")
@Option(names = {"-e", "--expiration"}, description = "the duration after which the key should expire.")
public String expiration;

@CommandLine.Option(names = {"-t", "--type"}, description = "the type of the value. Optional and useful to override the deduced type (eg. numbers, booleans or JSON as full string). Must be one of STRING, NUMBER, BOOLEAN, DATETIME, DATE, DURATION, JSON.")
public String type;
@Option(names = {"-t", "--type"}, description = "the type of the value. Optional and useful to override the deduced type (eg. numbers, booleans or JSON as full string). Valid values: ${COMPLETION-CANDIDATES}.")
public Type type;

@CommandLine.Option(names = {"-f", "--file-value"}, description = "the file from which to read the value to set. If this is provided, it will take precedence over any specified value.")
@Option(names = {"-f", "--file-value"}, description = "the file from which to read the value to set. If this is provided, it will take precedence over any specified value.")
public Path fileValue;

@Override
Expand All @@ -57,20 +50,14 @@ public Integer call() throws Exception {
value = Files.readString(Path.of(fileValue.toString().trim()));
}

String formattedValue = value;
if (type != null) {
if (type.trim().equals("STRING") && !value.startsWith("\"")) {
formattedValue = "\"" + value.replace("\"", "\\\"") + "\"";
}
} else if (STRING_PATTERN.matcher(value).matches()) {
formattedValue = "\"" + value + "\"";
if (isLiteral(value) || type == Type.STRING) {
value = wrapAsJsonLiteral(value);
}

Duration ttl = expiration == null ? null : Duration.parse(expiration);
String trimmedValue = formattedValue.trim();
boolean isJson = trimmedValue.startsWith("{") && trimmedValue.endsWith("}") || trimmedValue.startsWith("[") && trimmedValue.endsWith("]");
MutableHttpRequest<String> request = HttpRequest.PUT(apiUri("/namespaces/") + namespace + "/kv/" + key, formattedValue)
.contentType(isJson ? MediaType.APPLICATION_JSON_TYPE : MediaType.TEXT_PLAIN_TYPE);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/") + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);

if (ttl != null) {
request.header("ttl", ttl.toString());
Expand All @@ -79,7 +66,25 @@ public Integer call() throws Exception {
try (DefaultHttpClient client = client()) {
client.toBlocking().exchange(this.requestOptions(request));
}

return 0;
}

private static boolean isLiteral(final String input) {
// use ION mapper to properly handle timestamp
ObjectMapper mapper = JacksonMapper.ofIon();
try {
mapper.readTree(input);
return false;
} catch (JsonProcessingException e) {
return true;
}
}

public static String wrapAsJsonLiteral(final String input) {
return "\"" + input.replace("\"", "\\\"") + "\"";
}

enum Type {
STRING, NUMBER, BOOLEAN, DATETIME, DATE, DURATION, JSON;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValue;
import io.micronaut.configuration.picocli.PicocliRunner;
Expand Down Expand Up @@ -41,7 +42,7 @@ void string() throws IOException, ResourceExpiredException {
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("string").get(), is(new KVValue("stringValue")));
assertThat(kvStore.getRawValue("string").get(), is("\"stringValue\""));
assertThat(((InternalKVStore)kvStore).getRawValue("string").get(), is("\"stringValue\""));
}
}

Expand All @@ -67,7 +68,7 @@ void integer() throws IOException, ResourceExpiredException {
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("int").get(), is(new KVValue(1)));
assertThat(kvStore.getRawValue("int").get(), is("1"));
assertThat(((InternalKVStore)kvStore).getRawValue("int").get(), is("1"));
}
}

Expand All @@ -86,15 +87,16 @@ void integerStr() throws IOException, ResourceExpiredException {
"io.kestra.cli",
"intStr",
"1",
"-t STRING"
"-t",
"STRING"
};
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("intStr").get(), is(new KVValue("1")));
assertThat(kvStore.getRawValue("intStr").get(), is("\"1\""));
assertThat(((InternalKVStore)kvStore).getRawValue("intStr").get(), is("\"1\""));
}
}

Expand All @@ -120,7 +122,7 @@ void object() throws IOException, ResourceExpiredException {
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("object").get(), is(new KVValue(Map.of("some", "json"))));
assertThat(kvStore.getRawValue("object").get(), is("{some:\"json\"}"));
assertThat(((InternalKVStore)kvStore).getRawValue("object").get(), is("{some:\"json\"}"));
}
}

Expand All @@ -139,15 +141,16 @@ void objectStr() throws IOException, ResourceExpiredException {
"io.kestra.cli",
"objectStr",
"{\"some\":\"json\"}",
"-t STRING"
"-t",
"STRING"
};
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("objectStr").get(), is(new KVValue("{\"some\":\"json\"}")));
assertThat(kvStore.getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
assertThat(((InternalKVStore)kvStore).getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
}
}

Expand Down Expand Up @@ -179,7 +182,7 @@ void fromFile() throws IOException, ResourceExpiredException {
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.getValue("objectFromFile").get(), is(new KVValue(Map.of("some", "json", "from", "file"))));
assertThat(kvStore.getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
assertThat(((InternalKVStore)kvStore).getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
}
}
}
61 changes: 50 additions & 11 deletions core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.storages.kv;

import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
Expand All @@ -9,11 +10,13 @@
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

import static io.kestra.core.utils.Rethrow.throwFunction;

Expand All @@ -23,6 +26,8 @@
*/
public class InternalKVStore implements KVStore {

private static final Pattern DURATION_PATTERN = Pattern.compile("^P(?=[^T]|T.)(?:\\d*D)?(?:T(?=.)(?:\\d*H)?(?:\\d*M)?(?:\\d*S)?)?$");

private final String namespace;
private final String tenant;
private final StorageInterface storage;
Expand All @@ -40,48 +45,79 @@ public InternalKVStore(@Nullable final String tenant, final String namespace, fi
this.tenant = tenant;
}

/**
* {@inheritDoc}
*/
@Override
public String namespace() {
return this.namespace;
}

/**
* {@inheritDoc}
*/
@Override
public void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper) throws IOException {
this.validateKey(key);
public void put(String key, KVValueAndMetadata value, boolean overwrite) throws IOException {
KVStore.validateKey(key);

if (!overwrite && exists(key)) {
throw new KVStoreException(String.format(
"Cannot set value for key '%s'. Key already exists and `overwrite` is set to `false`.", key));
}

byte[] serialized = JacksonMapper.ofIon().writeValueAsBytes(value.value());

this.storage.put(this.tenant, this.storageUri(key), new StorageObject(
kvStoreValueWrapper.metadataAsMap(),
new ByteArrayInputStream(kvStoreValueWrapper.value().getBytes())
value.metadataAsMap(),
new ByteArrayInputStream(serialized)
));
}

/**
* {@inheritDoc}
*/
@Override
public Optional<KVValue> getValue(String key) throws IOException, ResourceExpiredException {
return this.getRawValue(key).map(throwFunction(raw -> {
Object value = JacksonMapper.ofIon().readValue(raw, Object.class);
if (value instanceof String valueStr && DURATION_PATTERN.matcher(valueStr).matches()) {
return new KVValue(Duration.parse(valueStr));
}
return new KVValue(value);
}));
}

public Optional<String> getRawValue(String key) throws IOException, ResourceExpiredException {
this.validateKey(key);
KVStore.validateKey(key);

StorageObject withMetadata;
try {
withMetadata = this.storage.getWithMetadata(this.tenant, this.storageUri(key));
} catch (FileNotFoundException e) {
return Optional.empty();
}
KVStoreValueWrapper<String> kvStoreValueWrapper = KVStoreValueWrapper.from(withMetadata);
KVValueAndMetadata kvStoreValueWrapper = KVValueAndMetadata.from(withMetadata);

Instant expirationDate = kvStoreValueWrapper.kvMetadata().getExpirationDate();
Instant expirationDate = kvStoreValueWrapper.metadata().getExpirationDate();
if (expirationDate != null && Instant.now().isAfter(expirationDate)) {
this.delete(key);
throw new ResourceExpiredException("The requested value has expired");
}
return Optional.of(kvStoreValueWrapper.value());
return Optional.of((String)(kvStoreValueWrapper.value()));
}

/**
* {@inheritDoc}
*/
@Override
public boolean delete(String key) throws IOException {
this.validateKey(key);

KVStore.validateKey(key);
return this.storage.delete(this.tenant, this.storageUri(key));
}

/**
* {@inheritDoc}
*/
@Override
public List<KVEntry> list() throws IOException {
List<FileAttributes> list;
Expand All @@ -96,9 +132,12 @@ public List<KVEntry> list() throws IOException {
.toList();
}

/**
* {@inheritDoc}
*/
@Override
public Optional<KVEntry> get(final String key) throws IOException {
this.validateKey(key);
KVStore.validateKey(key);

try {
KVEntry entry = KVEntry.from(this.storage.getAttributes(this.tenant, this.storageUri(key)));
Expand Down
Loading

0 comments on commit 986f91b

Please sign in to comment.