diff --git a/core/src/main/java/io/kestra/core/docs/DocumentationGenerator.java b/core/src/main/java/io/kestra/core/docs/DocumentationGenerator.java index e159afa7b0..0c9f9a191b 100644 --- a/core/src/main/java/io/kestra/core/docs/DocumentationGenerator.java +++ b/core/src/main/java/io/kestra/core/docs/DocumentationGenerator.java @@ -254,7 +254,7 @@ public static String render(String templateName, Map vars) t PebbleTemplate compiledTemplate = pebbleEngine.getLiteralTemplate(pebbleTemplate); - Writer writer = new JsonWriter(new StringWriter()); + Writer writer = new JsonWriter(); compiledTemplate.evaluate(writer, vars); String renderer = writer.toString(); diff --git a/core/src/main/java/io/kestra/core/exceptions/ResourceExpiredException.java b/core/src/main/java/io/kestra/core/exceptions/ResourceExpiredException.java new file mode 100644 index 0000000000..e82c2b1a1e --- /dev/null +++ b/core/src/main/java/io/kestra/core/exceptions/ResourceExpiredException.java @@ -0,0 +1,17 @@ +package io.kestra.core.exceptions; + +public class ResourceExpiredException extends Exception { + private static final long serialVersionUID = 1L; + + public ResourceExpiredException(Throwable e) { + super(e); + } + + public ResourceExpiredException(String message) { + super(message); + } + + public ResourceExpiredException(String message, Throwable e) { + super(message, e); + } +} diff --git a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java index cec10e8e7f..c4f485ef89 100644 --- a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java +++ b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java @@ -162,6 +162,14 @@ public String render(String inline) throws IllegalVariableEvaluationException { return variableRenderer.render(inline, this.variables); } + /** + * {@inheritDoc} + */ + @Override + public Object renderTyped(String inline) throws IllegalVariableEvaluationException { + return variableRenderer.renderTyped(inline, this.variables); + } + @Override @SuppressWarnings("unchecked") public String render(String inline, Map variables) throws IllegalVariableEvaluationException { diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index a8230b7df4..d231005e6f 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -42,6 +42,8 @@ public abstract class RunContext { public abstract String render(String inline) throws IllegalVariableEvaluationException; + public abstract Object renderTyped(String inline) throws IllegalVariableEvaluationException; + public abstract String render(String inline, Map variables) throws IllegalVariableEvaluationException; public abstract List render(List inline) throws IllegalVariableEvaluationException; diff --git a/core/src/main/java/io/kestra/core/runners/VariableRenderer.java b/core/src/main/java/io/kestra/core/runners/VariableRenderer.java index 6b37893fde..03757aca08 100644 --- a/core/src/main/java/io/kestra/core/runners/VariableRenderer.java +++ b/core/src/main/java/io/kestra/core/runners/VariableRenderer.java @@ -1,9 +1,7 @@ package io.kestra.core.runners; import io.kestra.core.exceptions.IllegalVariableEvaluationException; -import io.kestra.core.runners.pebble.ExtensionCustomizer; -import io.kestra.core.runners.pebble.JsonWriter; -import io.kestra.core.runners.pebble.PebbleLruCache; +import io.kestra.core.runners.pebble.*; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.core.annotation.Nullable; @@ -70,37 +68,53 @@ public String render(String inline, Map variables) throws Illega return this.render(inline, variables, this.variableConfiguration.getRecursiveRendering()); } + public Object renderTyped(String inline, Map variables) throws IllegalVariableEvaluationException { + return this.render(inline, variables, this.variableConfiguration.getRecursiveRendering(), false); + } + public String render(String inline, Map variables, boolean recursive) throws IllegalVariableEvaluationException { + return (String) this.render(inline, variables, recursive, true); + } + + public Object render(Object inline, Map variables, boolean recursive, boolean stringify) throws IllegalVariableEvaluationException { if (inline == null) { return null; } - if (inline.indexOf('{') == -1) { + if (inline instanceof String inlineStr && inlineStr.indexOf('{') == -1) { // it's not a Pebble template so we short-circuit rendering return inline; } - String render = recursive - ? renderRecursively(inline, variables) - : renderOnce(inline, variables); + Object render = recursive + ? renderRecursively(inline, variables, stringify) + : renderOnce(inline, variables, stringify); - return RAW_PATTERN.matcher(render).replaceAll("$2"); + if (render instanceof String renderStr) { + return RAW_PATTERN.matcher(renderStr).replaceAll("$2"); + } + + return render; } - public String renderOnce(String inline, Map variables) throws IllegalVariableEvaluationException { - // pre-process raw tags - Matcher rawMatcher = RAW_PATTERN.matcher(inline); - Map replacers = new HashMap<>((int) Math.ceil(rawMatcher.groupCount() / 0.75)); - String result = replaceRawTags(rawMatcher, replacers); + public Object renderOnce(Object inline, Map variables, boolean stringify) throws IllegalVariableEvaluationException { + Object result = inline; + Map replacers = null; + if (inline instanceof String inlineStr) { + // pre-process raw tags + Matcher rawMatcher = RAW_PATTERN.matcher(inlineStr); + replacers = new HashMap<>((int) Math.ceil(rawMatcher.groupCount() / 0.75)); + result = replaceRawTags(rawMatcher, replacers); + } try { - PebbleTemplate compiledTemplate = this.pebbleEngine.getLiteralTemplate(result); + PebbleTemplate compiledTemplate = this.pebbleEngine.getLiteralTemplate((String) result); - Writer writer = new JsonWriter(new StringWriter()); + OutputWriter writer = stringify ? new JsonWriter() : new TypedObjectWriter(); compiledTemplate.evaluate(writer, variables); - result = writer.toString(); + result = writer.output(); } catch (IOException | PebbleException e) { - String alternativeRender = this.alternativeRender(e, inline, variables); + String alternativeRender = this.alternativeRender(e, (String) inline, variables); if (alternativeRender == null) { if (e instanceof PebbleException) { throw properPebbleException((PebbleException) e); @@ -111,8 +125,10 @@ public String renderOnce(String inline, Map variables) throws Il } } - // post-process raw tags - result = putBackRawTags(replacers, result); + if (result instanceof String && replacers != null) { + // post-process raw tags + result = putBackRawTags(replacers, (String) result); + } return result; } @@ -144,21 +160,21 @@ private static String replaceRawTags(Matcher rawMatcher, Map rep }); } - public String renderRecursively(String inline, Map variables) throws IllegalVariableEvaluationException { - return this.renderRecursively(0, inline, variables); + public Object renderRecursively(Object inline, Map variables, boolean stringify) throws IllegalVariableEvaluationException { + return this.renderRecursively(0, inline, variables, stringify); } - private String renderRecursively(int renderingCount, String inline, Map variables) throws IllegalVariableEvaluationException { + private Object renderRecursively(int renderingCount, Object inline, Map variables, boolean stringify) throws IllegalVariableEvaluationException { if (renderingCount > MAX_RENDERING_AMOUNT) { throw new IllegalVariableEvaluationException("Too many rendering attempts"); } - String result = this.renderOnce(inline, variables); + Object result = this.renderOnce(inline, variables, stringify); if (result.equals(inline)) { return result; } - return renderRecursively(++renderingCount, result, variables); + return renderRecursively(++renderingCount, result, variables, stringify); } public Map render(Map in, Map variables) throws IllegalVariableEvaluationException { diff --git a/core/src/main/java/io/kestra/core/runners/pebble/Extension.java b/core/src/main/java/io/kestra/core/runners/pebble/Extension.java index c669bdccec..a077fa0cce 100644 --- a/core/src/main/java/io/kestra/core/runners/pebble/Extension.java +++ b/core/src/main/java/io/kestra/core/runners/pebble/Extension.java @@ -26,6 +26,9 @@ public class Extension extends AbstractExtension { @Inject private SecretFunction secretFunction; + @Inject + private KvFunction kvFunction; + @Inject private ReadFileFunction readFileFunction; @@ -105,6 +108,7 @@ public Map getFunctions() { functions.put("json", new JsonFunction()); functions.put("currentEachOutput", new CurrentEachOutputFunction()); functions.put("secret", secretFunction); + functions.put("kv", kvFunction); functions.put("read", readFileFunction); if (this.renderFunction != null) { functions.put("render", renderFunction); diff --git a/core/src/main/java/io/kestra/core/runners/pebble/JsonWriter.java b/core/src/main/java/io/kestra/core/runners/pebble/JsonWriter.java index f117bcf3e9..5790def7ae 100644 --- a/core/src/main/java/io/kestra/core/runners/pebble/JsonWriter.java +++ b/core/src/main/java/io/kestra/core/runners/pebble/JsonWriter.java @@ -11,14 +11,10 @@ import java.util.Collection; import java.util.Map; -public class JsonWriter extends Writer implements SpecializedWriter { +public class JsonWriter extends OutputWriter implements SpecializedWriter { private static final ObjectMapper MAPPER = JacksonMapper.ofJson(); - private final StringWriter stringWriter; - - public JsonWriter(StringWriter stringWriter) { - this.stringWriter = stringWriter; - } + private final StringWriter stringWriter = new StringWriter(); @Override public void writeSpecialized(int i) { @@ -93,4 +89,9 @@ public void close() throws IOException { public String toString() { return stringWriter.toString(); } + + @Override + public Object output() { + return this.toString(); + } } diff --git a/core/src/main/java/io/kestra/core/runners/pebble/OutputWriter.java b/core/src/main/java/io/kestra/core/runners/pebble/OutputWriter.java new file mode 100644 index 0000000000..2de28fb8bd --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/pebble/OutputWriter.java @@ -0,0 +1,7 @@ +package io.kestra.core.runners.pebble; + +import java.io.Writer; + +public abstract class OutputWriter extends Writer { + public abstract Object output(); +} diff --git a/core/src/main/java/io/kestra/core/runners/pebble/TypedObjectWriter.java b/core/src/main/java/io/kestra/core/runners/pebble/TypedObjectWriter.java new file mode 100644 index 0000000000..17307c39a0 --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/pebble/TypedObjectWriter.java @@ -0,0 +1,179 @@ +package io.kestra.core.runners.pebble; + +import io.pebbletemplates.pebble.extension.writer.SpecializedWriter; +import lombok.SneakyThrows; + +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.stream.IntStream; + +public class TypedObjectWriter extends OutputWriter implements SpecializedWriter { + private Object current; + + @Override + public void writeSpecialized(int i) { + if (current == null) { + current = i; + } else { + Integer currentI = this.ofSameTypeOrThrow(current, Integer.class); + current = currentI + i; + } + } + + @Override + public void writeSpecialized(long l) { + if (current == null) { + current = l; + } else { + Long currentL = this.ofSameTypeOrThrow(current, Long.class); + current = currentL + l; + } + } + + @Override + public void writeSpecialized(double d) { + if (current == null) { + current = d; + } else { + Double currentD = this.ofSameTypeOrThrow(current, Double.class); + current = currentD + d; + } + } + + @Override + public void writeSpecialized(float f) { + if (current == null) { + current = f; + } else { + Float currentF = this.ofSameTypeOrThrow(current, Float.class); + current = currentF + f; + } + } + + @Override + public void writeSpecialized(short s) { + if (current == null) { + current = s; + } else { + Short currentS = null; + try { + currentS = this.ofSameTypeOrThrow(current, Short.class); + } catch (Exception e) { + try { + current = this.ofSameTypeOrThrow(current, Integer.class) + s; + return; + } catch (Exception ex) { + throw e; + } + } + current = currentS + s; + } + } + + @Override + public void writeSpecialized(byte b) { + if (current == null) { + current = b; + } else { + Byte currentB = null; + try { + currentB = this.ofSameTypeOrThrow(current, Byte.class); + } catch (Exception e) { + try { + current = this.ofSameTypeOrThrow(current, Integer.class) + b; + return; + } catch (Exception ex) { + throw e; + } + } + current = currentB + b; + } + } + + @Override + public void writeSpecialized(char c) { + if (current == null) { + current = c; + } else { + Character currentC; + try { + currentC = this.ofSameTypeOrThrow(current, Character.class); + } catch (Exception e) { + try { + current = this.ofSameTypeOrThrow(current, String.class) + c; + return; + } catch (Exception ex) { + throw e; + } + } + current = "" + currentC + c; + } + } + + @Override + public void writeSpecialized(String s) { + if (current == null) { + current = s; + } else { + String currentS = this.ofSameTypeOrThrow(current, String.class); + current = currentS + s; + } + } + + @SneakyThrows + @Override + public void write(Object o) { + if (current == null) { + current = o; + } else { + throwIllegalAddition(current, o.getClass()); + } + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + if (current == null) { + current = String.valueOf(Arrays.copyOfRange(cbuf, off, off + len)); + return; + } + + if (current instanceof String) { + current += String.valueOf(Arrays.copyOfRange(cbuf, off, off + len)); + } else { + for (int idx = off; idx < off + len; idx++) { + this.writeSpecialized(cbuf[idx]); + } + } + } + + @Override + public void flush() throws IOException { + // no-op + } + + @Override + public void close() throws IOException { + // no-op + } + + private T ofSameTypeOrThrow(Object baseObject, Class clazz) { + if (clazz.isAssignableFrom(baseObject.getClass())) { + return clazz.cast(baseObject); + } else { + throwIllegalAddition(baseObject, clazz); + return null; + } + } + + private static void throwIllegalAddition(Object baseObject, Class clazz) { + throw new IllegalArgumentException("Tried to add " + clazz.getName() + " to " + baseObject.getClass().getName()); + } + + @Override + public Object output() { + return this.current; + } +} diff --git a/core/src/main/java/io/kestra/core/runners/pebble/functions/KvFunction.java b/core/src/main/java/io/kestra/core/runners/pebble/functions/KvFunction.java new file mode 100644 index 0000000000..164a137063 --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/pebble/functions/KvFunction.java @@ -0,0 +1,76 @@ +package io.kestra.core.runners.pebble.functions; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.exceptions.ResourceExpiredException; +import io.kestra.core.runners.RunVariables; +import io.kestra.core.secret.SecretService; +import io.kestra.core.services.FlowService; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.InternalKVStore; +import io.kestra.core.utils.TruthUtils; +import io.pebbletemplates.pebble.error.PebbleException; +import io.pebbletemplates.pebble.extension.Function; +import io.pebbletemplates.pebble.template.EvaluationContext; +import io.pebbletemplates.pebble.template.PebbleTemplate; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +@Slf4j +@Singleton +public class KvFunction implements Function { + @Inject + private FlowService flowService; + + @Inject + private StorageInterface storageInterface; + + @Override + public List getArgumentNames() { + return List.of("key", "namespace", "errorOnMissing"); + } + + @SuppressWarnings("unchecked") + @Override + public Object execute(Map args, PebbleTemplate self, EvaluationContext context, int lineNumber) { + String key = getKey(args, self, lineNumber); + String namespace = (String) args.get("namespace"); + Boolean errorOnMissing = (Boolean) args.get("errorOnMissing"); + + Map flow = (Map) context.getVariable("flow"); + String flowNamespace = flow.get("namespace"); + String flowTenantId = flow.get("tenantId"); + if (namespace == null) { + namespace = flowNamespace; + } else { + flowService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace); + } + + Optional value; + try { + value = new InternalKVStore(flowTenantId, namespace, storageInterface).get(key); + } catch (Exception e) { + throw new PebbleException(e, e.getMessage(), lineNumber, self.getName()); + } + + if (value.isEmpty() && errorOnMissing == Boolean.TRUE) { + throw new PebbleException(null, "The key '" + key + "' does not exist in the namespace '" + namespace + "'.", lineNumber, self.getName()); + } + + return value.orElse(null); + } + + protected String getKey(Map args, PebbleTemplate self, int lineNumber) { + if (!args.containsKey("key")) { + throw new PebbleException(null, "The 'kv' function expects an argument 'key'.", lineNumber, self.getName()); + } + + return (String) args.get("key"); + } +} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/storages/FileAttributes.java b/core/src/main/java/io/kestra/core/storages/FileAttributes.java index f2989d0055..c83b8509f7 100644 --- a/core/src/main/java/io/kestra/core/storages/FileAttributes.java +++ b/core/src/main/java/io/kestra/core/storages/FileAttributes.java @@ -2,6 +2,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; + @JsonSerialize(as = FileAttributes.class) public interface FileAttributes { String getFileName(); @@ -14,6 +18,8 @@ public interface FileAttributes { long getSize(); + Map getMetadata() throws IOException; + enum FileType { File, Directory diff --git a/core/src/main/java/io/kestra/core/storages/InternalStorage.java b/core/src/main/java/io/kestra/core/storages/InternalStorage.java index b747790617..d0facd70a7 100644 --- a/core/src/main/java/io/kestra/core/storages/InternalStorage.java +++ b/core/src/main/java/io/kestra/core/storages/InternalStorage.java @@ -1,6 +1,8 @@ package io.kestra.core.storages; import io.kestra.core.services.FlowService; +import io.kestra.core.storages.kv.InternalKVStore; +import io.kestra.core.storages.kv.KVStore; import jakarta.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +58,23 @@ public InternalStorage(Logger logger, StorageContext context, StorageInterface s this.flowService = flowService; } + @Override + public KVStore namespaceKv() { + return new InternalKVStore(logger, context.getTenantId(), context.getNamespace(), storage); + } + + @Override + public KVStore namespaceKv(String namespace) { + boolean isExternalNamespace = !namespace.equals(context.getNamespace()); + // Checks whether the contextual namespace is allowed to access the passed namespace. + if (isExternalNamespace && flowService != null) { + flowService.checkAllowedNamespace( + context.getTenantId(), namespace, // requested Tenant/Namespace + context.getTenantId(), context.getNamespace() // from Tenant/Namespace + ); + } + return new InternalKVStore(logger, context.getTenantId(), namespace, storage); } + /** * {@inheritDoc} **/ diff --git a/core/src/main/java/io/kestra/core/storages/Storage.java b/core/src/main/java/io/kestra/core/storages/Storage.java index cbdf20e698..fd86efc7b1 100644 --- a/core/src/main/java/io/kestra/core/storages/Storage.java +++ b/core/src/main/java/io/kestra/core/storages/Storage.java @@ -1,5 +1,6 @@ package io.kestra.core.storages; +import io.kestra.core.storages.kv.KVStore; import jakarta.annotation.Nullable; import java.io.File; @@ -15,6 +16,20 @@ */ public interface Storage { + /** + * Gets access to the Key-Value store for the contextual namespace. + * + * @return The {@link KVStore}. + */ + KVStore namespaceKv(); + + /** + * Gets access to the Key-Value store for the given namespace. + * + * @return The {@link KVStore}. + */ + KVStore namespaceKv(String namespace); + /** * Gets access to the namespace files for the contextual namespace. * diff --git a/core/src/main/java/io/kestra/core/storages/StorageContext.java b/core/src/main/java/io/kestra/core/storages/StorageContext.java index 79f6edb072..f1ec210875 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageContext.java +++ b/core/src/main/java/io/kestra/core/storages/StorageContext.java @@ -30,6 +30,8 @@ public class StorageContext { // /{namespace}/_files static final String PREFIX_FORMAT_NAMESPACE_FILE = "/%s/_files"; + // /{namespace}/_kv + static final String PREFIX_FORMAT_KV = "/%s/_kv"; // /{namespace}/{flow-id} static final String PREFIX_FORMAT_FLOWS = "/%s/%s"; // /{namespace}/{flow-id}/executions/{execution-id} @@ -291,6 +293,11 @@ public static String namespaceFilePrefix(String namespace) { return String.format(PREFIX_FORMAT_NAMESPACE_FILE, namespace.replace(".", "/")); } + + public static String kvPrefix(String namespace) { + return String.format(PREFIX_FORMAT_KV, namespace.replace(".", "/")); + } + /** * A storage context scoped to a Task. */ diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index ffe7629bd2..bafe9b0344 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -12,6 +12,7 @@ import java.io.InputStream; import java.net.URI; import java.util.List; +import java.util.Map; public interface StorageInterface extends AutoCloseable, Plugin { @@ -35,6 +36,9 @@ default void close() { @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) InputStream get(String tenantId, URI uri) throws IOException; + @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) + StorageObject getWithMetadata(String tenantId, URI uri) throws IOException; + /** * Returns all objects that start with the given prefix * @@ -67,7 +71,12 @@ default boolean exists(String tenantId, URI uri) { FileAttributes getAttributes(String tenantId, URI uri) throws IOException; @Retryable(includes = {IOException.class}) - URI put(String tenantId, URI uri, InputStream data) throws IOException; + default URI put(String tenantId, URI uri, InputStream data) throws IOException { + return this.put(tenantId, uri, new StorageObject(null, data)); + } + + @Retryable(includes = {IOException.class}) + URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException; @Retryable(includes = {IOException.class}) boolean delete(String tenantId, URI uri) throws IOException; diff --git a/core/src/main/java/io/kestra/core/storages/StorageObject.java b/core/src/main/java/io/kestra/core/storages/StorageObject.java new file mode 100644 index 0000000000..b0a0835903 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/StorageObject.java @@ -0,0 +1,7 @@ +package io.kestra.core.storages; + +import java.io.InputStream; +import java.util.Map; + +public record StorageObject(Map metadata, InputStream inputStream) { +} diff --git a/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java b/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java new file mode 100644 index 0000000000..fb26a35b82 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java @@ -0,0 +1,123 @@ +package io.kestra.core.storages.kv; + +import io.kestra.core.exceptions.ResourceExpiredException; +import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.storages.FileAttributes; +import io.kestra.core.storages.Storage; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.StorageObject; +import jakarta.annotation.Nullable; +import org.apache.commons.lang3.NotImplementedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +/** + * The default {@link KVStore} implementation. + * This class acts as a facade to the {@link StorageInterface} for manipulating Key-Value store. + * + * @see Storage#namespaceKv() + * @see Storage#namespaceKv(String) + */ +public class InternalKVStore implements KVStore { + + private static final Logger log = LoggerFactory.getLogger(InternalKVStore.class); + + private final String namespace; + private final String tenant; + private final StorageInterface storage; + private final Logger logger; + + /** + * Creates a new {@link InternalKVStore} instance. + * + * @param namespace The namespace + * @param storage The storage. + */ + public InternalKVStore(final String tenant, final String namespace, final StorageInterface storage) { + this(log, tenant, namespace, storage); + } + + /** + * Creates a new {@link InternalKVStore} instance. + * + * @param logger The logger to be used by this class. + * @param namespace The namespace + * @param tenant The tenant. + * @param storage The storage. + */ + public InternalKVStore(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage) { + this.logger = Objects.requireNonNull(logger, "logger cannot be null"); + this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null"); + this.storage = Objects.requireNonNull(storage, "storage cannot be null"); + this.tenant = tenant; + } + + @Override + public String namespace() { + return this.namespace; + } + + @Override + public void putRaw(String key, KVStoreValueWrapper kvStoreValueWrapper) throws IOException { + this.validateKey(key); + + this.storage.put(this.tenant, this.storageUri(key), new StorageObject( + kvStoreValueWrapper.metadataAsMap(), + new ByteArrayInputStream(kvStoreValueWrapper.value().getBytes()) + )); + } + + @Override + public Optional getRaw(String key) throws IOException, ResourceExpiredException { + this.validateKey(key); + + StorageObject withMetadata; + try { + withMetadata = this.storage.getWithMetadata(this.tenant, this.storageUri(key)); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + KVStoreValueWrapper kvStoreValueWrapper = KVStoreValueWrapper.from(withMetadata); + + Instant expirationDate = kvStoreValueWrapper.kvMetadata().getExpirationDate(); + if (expirationDate != null && Instant.now().isAfter(expirationDate)) { + this.delete(key); + + throw new ResourceExpiredException("The requested value has expired"); + } + return Optional.of(kvStoreValueWrapper.value()); + } + + @Override + public boolean delete(String key) throws IOException { + this.validateKey(key); + + return this.storage.delete(this.tenant, this.storageUri(key)); + } + + @Override + public List list() throws IOException { + List list; + try { + list = this.storage.list(this.tenant, this.storageUri(null)); + } catch (FileNotFoundException e) { + return Collections.emptyList(); + } + return list.stream() + .map(throwFunction(KVEntry::from)) + .filter(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isBefore(expirationDate)).orElse(true)) + .toList(); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/kv/KVEntry.java b/core/src/main/java/io/kestra/core/storages/kv/KVEntry.java new file mode 100644 index 0000000000..5da4696a84 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/kv/KVEntry.java @@ -0,0 +1,21 @@ +package io.kestra.core.storages.kv; + +import io.kestra.core.storages.FileAttributes; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Optional; + +public record KVEntry(String key, Instant creationDate, Instant updateDate, Instant expirationDate) { + public static KVEntry from(FileAttributes fileAttributes) throws IOException { + return new KVEntry( + fileAttributes.getFileName().replace(".ion", ""), + Instant.ofEpochMilli(fileAttributes.getCreationTime()), + Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()), + Optional.ofNullable(new KVMetadata(fileAttributes.getMetadata()).getExpirationDate()) + .map(expirationDate -> expirationDate.truncatedTo(ChronoUnit.MILLIS)) + .orElse(null) + ); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/kv/KVMetadata.java b/core/src/main/java/io/kestra/core/storages/kv/KVMetadata.java new file mode 100644 index 0000000000..06fa81521f --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/kv/KVMetadata.java @@ -0,0 +1,40 @@ +package io.kestra.core.storages.kv; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class KVMetadata { + private Instant expirationDate; + + public KVMetadata(Duration ttl) { + if (ttl != null && ttl.isNegative()) { + throw new IllegalArgumentException("ttl cannot be negative"); + } + + if (ttl != null) { + this.expirationDate = Instant.now().plus(ttl); + } + } + + public KVMetadata(Map metadata) { + this.expirationDate = Optional.ofNullable(metadata) + .map(map -> map.get("expirationDate")) + .map(Instant::parse) + .orElse(null); + } + + public Instant getExpirationDate() { + return expirationDate; + } + + public Map toMap() { + Map map = new HashMap<>(); + if (expirationDate != null) { + map.put("expirationDate", expirationDate.toString()); + } + return map; + } +} diff --git a/core/src/main/java/io/kestra/core/storages/kv/KVStore.java b/core/src/main/java/io/kestra/core/storages/kv/KVStore.java new file mode 100644 index 0000000000..d5d4a7574f --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/kv/KVStore.java @@ -0,0 +1,77 @@ +package io.kestra.core.storages.kv; + +import io.kestra.core.exceptions.ResourceExpiredException; +import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.storages.StorageContext; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +/** + * Service interface for accessing the files attached to a namespace Key-Value store. + */ +public interface KVStore { + Pattern durationPattern = Pattern.compile("^P(?=.)(?:\\d*D)?(?:T(?=.)(?:\\d*H)?(?:\\d*M)?(?:\\d*S)?)?$"); + + default void validateKey(String key) { + if (key == null || key.isEmpty()) { + throw new IllegalArgumentException("Key cannot be null or empty"); + } + + if (!key.matches("[a-zA-Z0-9][a-zA-Z0-9._-]*")) { + throw new IllegalArgumentException("Key must start with an alphanumeric character (uppercase or lowercase) and can contain alphanumeric characters (uppercase or lowercase), dots (.), underscores (_), and hyphens (-) only."); + } + } + + String namespace(); + + default URI storageUri(String key) { + return this.storageUri(key, namespace()); + } + + default URI storageUri(String key, String namespace) { + String filePath = key == null ? "" : ("/" + key + ".ion"); + return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + filePath); + } + + default void put(String key, KVStoreValueWrapper kvStoreValueWrapper) throws IOException { + Object value = kvStoreValueWrapper.value(); + String ionValue; + if (value instanceof Duration duration) { + ionValue = duration.toString(); + } else { + ionValue = JacksonMapper.ofIon().writeValueAsString(value); + } + + this.putRaw(key, new KVStoreValueWrapper<>(kvStoreValueWrapper.kvMetadata(), ionValue)); + } + + void putRaw(String key, KVStoreValueWrapper kvStoreValueWrapper) throws IOException; + + default Optional get(String key) throws IOException, ResourceExpiredException { + return this.getRaw(key).map(throwFunction(raw -> { + Object value = JacksonMapper.ofIon().readValue(raw, Object.class); + if (value instanceof String valueStr && durationPattern.matcher(valueStr).matches()) { + return Duration.parse(valueStr); + } + + return value; + })); + } + + Optional getRaw(String key) throws IOException, ResourceExpiredException; + + boolean delete(String key) throws IOException; + + List list() throws IOException; + + default boolean exists(String key) throws IOException { + return list().stream().anyMatch(kvEntry -> kvEntry.key().equals(key)); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/kv/KVStoreValueWrapper.java b/core/src/main/java/io/kestra/core/storages/kv/KVStoreValueWrapper.java new file mode 100644 index 0000000000..3581d4abec --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/kv/KVStoreValueWrapper.java @@ -0,0 +1,37 @@ +package io.kestra.core.storages.kv; + +import io.kestra.core.storages.StorageObject; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Optional; + +public class KVStoreValueWrapper { + private final KVMetadata kvMetadata; + private final T value; + + public KVStoreValueWrapper(KVMetadata kvMetadata, T value) { + this.kvMetadata = kvMetadata; + this.value = value; + } + + public KVMetadata kvMetadata() { + return kvMetadata; + } + + public Map metadataAsMap() { + return Optional.ofNullable(kvMetadata).map(KVMetadata::toMap).orElse(null); + } + + public T value() { + return value; + } + + static KVStoreValueWrapper from(StorageObject storageObject) throws IOException { + try (InputStream is = storageObject.inputStream()) { + String ionString = new String(is.readAllBytes()); + return new KVStoreValueWrapper<>(new KVMetadata(storageObject.metadata()), ionString); + } + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Delete.java b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java new file mode 100644 index 0000000000..b7edbe1d9f --- /dev/null +++ b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java @@ -0,0 +1,100 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.models.tasks.VoidOutput; +import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.RunContext; +import io.kestra.core.services.FlowService; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.NamespaceFile; +import io.kestra.core.storages.kv.KVMetadata; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.PathMatcherPredicate; +import io.kestra.core.utils.Rethrow; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.codehaus.commons.nullanalysis.NotNull; +import org.slf4j.Logger; + +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +@SuperBuilder(toBuilder = true) +@Getter +@NoArgsConstructor +@Schema( + title = "Deletes a KV pair." +) +@Plugin( + examples = { + @Example( + title = "Delete a KV pair.", + code = """ + id: delete_kv + type: io.kestra.plugin.core.kv.Delete + key: myvariable + namespace: dev # the current namespace of the flow will be used by default""" + ) + } +) +public class Delete extends Task implements RunnableTask { + @NotNull + @Schema( + title = "The key for which to delete the value." + ) + @PluginProperty(dynamic = true) + private String key; + + @NotNull + @Schema( + title = "The namespace on which to set the value." + ) + @PluginProperty(dynamic = true) + @Builder.Default + private String namespace = "{{ flow.namespace }}"; + + @NotNull + @Schema( + title = "Whether to fail if there is no value for the given key." + ) + @PluginProperty + @Builder.Default + private boolean errorOnMissing = false; + + @Override + public Output run(RunContext runContext) throws Exception { + String renderedNamespace = runContext.render(this.namespace); + + FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); + flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + + String renderedKey = runContext.render(this.key); + + boolean deleted = runContext.storage().namespaceKv(renderedNamespace).delete(renderedKey); + if (this.errorOnMissing && !deleted) { + throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true"); + } + + return Output.builder().deleted(deleted).build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Whether the deletion was successful and had a value." + ) + private final boolean deleted; + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Get.java b/core/src/main/java/io/kestra/plugin/core/kv/Get.java new file mode 100644 index 0000000000..61082001b0 --- /dev/null +++ b/core/src/main/java/io/kestra/plugin/core/kv/Get.java @@ -0,0 +1,105 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.RunContext; +import io.kestra.core.services.FlowService; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.kv.KVMetadata; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.PathMatcherPredicate; +import io.kestra.core.utils.Rethrow; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.commons.nullanalysis.NotNull; +import org.slf4j.Logger; + +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +@SuperBuilder(toBuilder = true) +@Getter +@NoArgsConstructor +@Schema( + title = "Gets value linked to a key." +) +@Plugin( + examples = { + @Example( + title = "Get value for `myvariable` key in `dev` namespace and fail if it's not present.", + code = """ + id: get_kv + type: io.kestra.plugin.core.kv.Get + key: myvariable + namespace: dev # the current namespace of the flow will be used by default + errorOnMissing: true""" + ) + } +) +public class Get extends Task implements RunnableTask { + @NotNull + @Schema( + title = "The key for which to get the value." + ) + @PluginProperty(dynamic = true) + private String key; + + @NotNull + @Schema( + title = "The namespace on which to get the value." + ) + @PluginProperty(dynamic = true) + @Builder.Default + private String namespace = "{{ flow.namespace }}"; + + @NotNull + @Schema( + title = "Whether to fail if there is no value for the given key." + ) + @PluginProperty + @Builder.Default + private boolean errorOnMissing = false; + + + @Override + public Output run(RunContext runContext) throws Exception { + String renderedNamespace = runContext.render(this.namespace); + + FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); + flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + + String renderedKey = runContext.render(this.key); + + Optional maybeValue = runContext.storage().namespaceKv(renderedNamespace).get(renderedKey); + if (this.errorOnMissing && maybeValue.isEmpty()) { + throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true"); + } + + return Output.builder() + .value(maybeValue.orElse(null)) + .build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Value retrieve for the key.", + description = "This can be of any type and will keep the same as when it was set." + ) + private final Object value; + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java new file mode 100644 index 0000000000..05dc9b11c0 --- /dev/null +++ b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java @@ -0,0 +1,84 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.RunContext; +import io.kestra.core.services.FlowService; +import io.kestra.core.storages.kv.KVEntry; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.commons.nullanalysis.NotNull; + +import java.util.*; + +@Slf4j +@SuperBuilder(toBuilder = true) +@Getter +@NoArgsConstructor +@Schema( + title = "Gets keys matching a given prefix." +) +@Plugin( + examples = { + @Example( + title = "Get keys that are prefixed by `myvar`.", + code = """ + id: keys_kv + type: io.kestra.plugin.core.kv.GetKeys + prefix: myvar + namespace: dev # the current namespace of the flow will be used by default""" + ) + } +) +public class GetKeys extends Task implements RunnableTask { + @Schema( + title = "The key for which to get the value." + ) + @PluginProperty(dynamic = true) + private String prefix; + + @NotNull + @Schema( + title = "The namespace on which to get the value." + ) + @PluginProperty(dynamic = true) + @Builder.Default + private String namespace = "{{ flow.namespace }}"; + + + @Override + public Output run(RunContext runContext) throws Exception { + String renderedNamespace = runContext.render(this.namespace); + + FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); + flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + + String renderedPrefix = runContext.render(this.prefix); + + List keys = runContext.storage().namespaceKv(renderedNamespace).list().stream() + .map(KVEntry::key) + .filter(key -> key.startsWith(renderedPrefix)) + .toList(); + + return Output.builder() + .keys(keys) + .build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Found keys for given prefix." + ) + private final List keys; + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Set.java b/core/src/main/java/io/kestra/plugin/core/kv/Set.java new file mode 100644 index 0000000000..9a5025a312 --- /dev/null +++ b/core/src/main/java/io/kestra/plugin/core/kv/Set.java @@ -0,0 +1,111 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.models.tasks.VoidOutput; +import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.RunContext; +import io.kestra.core.services.FlowService; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.kv.KVMetadata; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.FileUtils; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.codehaus.commons.nullanalysis.NotNull; + +import java.io.File; +import java.io.FileInputStream; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.kestra.core.utils.PathUtil.checkLeadingSlash; + +@SuperBuilder(toBuilder = true) +@Getter +@NoArgsConstructor +@Schema( + title = "Sets (or modifies) a KV pair." +) +@Plugin( + examples = { + @Example( + title = "Set `query` task `uri` output as value for `myvariable` key in `dev` namespace.", + code = """ + id: set_kv + type: io.kestra.plugin.core.kv.Set + key: myvariable + value: "{{ outputs.query.uri }}" + namespace: dev # the current namespace of the flow will be used by default + overwrite: true # whether to overwrite or fail if a value for that key already exists; default true + ttl: P30D # optional TTL""" + ) + } +) +public class Set extends Task implements RunnableTask { + @NotNull + @Schema( + title = "The key for which to set the value." + ) + @PluginProperty(dynamic = true) + private String key; + + @NotNull + @Schema( + title = "The value to map to the key." + ) + @PluginProperty(dynamic = true) + private String value; + + @NotNull + @Schema( + title = "The namespace on which to set the value." + ) + @PluginProperty(dynamic = true) + @Builder.Default + private String namespace = "{{ flow.namespace }}"; + + @NotNull + @Schema( + title = "Whether to overwrite or fail if a value for the given key already exists." + ) + @PluginProperty + @Builder.Default + private boolean overwrite = true; + + @Schema( + title = "Optional time-to-live for the key-value pair." + ) + @PluginProperty + private Duration ttl; + + @Override + public VoidOutput run(RunContext runContext) throws Exception { + String renderedNamespace = runContext.render(this.namespace); + + FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); + flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + + String renderedKey = runContext.render(this.key); + Object renderedValue = runContext.renderTyped(this.value); + + if (!this.overwrite && runContext.storage().namespaceKv(renderedNamespace).exists(renderedKey)) { + throw new IllegalStateException("Key already exists and overwrite is set to `false`"); + } + runContext.storage().namespaceKv(renderedNamespace).put(renderedKey, new KVStoreValueWrapper<>(new KVMetadata(ttl), renderedValue)); + + return null; + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/kv/package-info.java b/core/src/main/java/io/kestra/plugin/core/kv/package-info.java new file mode 100644 index 0000000000..0117eb7b96 --- /dev/null +++ b/core/src/main/java/io/kestra/plugin/core/kv/package-info.java @@ -0,0 +1,4 @@ +@PluginSubGroup(categories = PluginSubGroup.PluginCategory.CORE) +package io.kestra.plugin.core.kv; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/core/src/main/resources/icons/io.kestra.plugin.core.kv.Get.svg b/core/src/main/resources/icons/io.kestra.plugin.core.kv.Get.svg new file mode 100644 index 0000000000..caf9d76310 --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.plugin.core.kv.Get.svg @@ -0,0 +1,4 @@ + + + + diff --git a/core/src/main/resources/icons/io.kestra.plugin.core.kv.GetKeys.svg b/core/src/main/resources/icons/io.kestra.plugin.core.kv.GetKeys.svg new file mode 100644 index 0000000000..a534ad4775 --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.plugin.core.kv.GetKeys.svg @@ -0,0 +1,3 @@ + + + diff --git a/core/src/main/resources/icons/io.kestra.plugin.core.kv.Set.svg b/core/src/main/resources/icons/io.kestra.plugin.core.kv.Set.svg new file mode 100644 index 0000000000..d3eb5b4220 --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.plugin.core.kv.Set.svg @@ -0,0 +1,4 @@ + + + + diff --git a/core/src/main/resources/icons/io.kestra.plugin.core.kv.svg b/core/src/main/resources/icons/io.kestra.plugin.core.kv.svg new file mode 100644 index 0000000000..e92707b778 --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.plugin.core.kv.svg @@ -0,0 +1,3 @@ + + + diff --git a/core/src/test/java/io/kestra/core/runners/pebble/TypedObjectWriterTest.java b/core/src/test/java/io/kestra/core/runners/pebble/TypedObjectWriterTest.java new file mode 100644 index 0000000000..79418a554f --- /dev/null +++ b/core/src/test/java/io/kestra/core/runners/pebble/TypedObjectWriterTest.java @@ -0,0 +1,113 @@ +package io.kestra.core.runners.pebble; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class TypedObjectWriterTest { + @Test + void invalidAddition() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized(1); + IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> writer.writeSpecialized('a')); + assertThat(illegalArgumentException.getMessage(), is("Tried to add java.lang.Character to java.lang.Integer")); + } + } + + @Test + void writeInts() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized(1); + writer.writeSpecialized(2); + writer.writeSpecialized(3); + assertThat(writer.output(), is(6)); + } + } + + @Test + void writeLongs() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized(1L); + writer.writeSpecialized(2L); + writer.writeSpecialized(3L); + assertThat(writer.output(), is(6L)); + } + } + + @Test + void writeDoubles() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized(1.0); + writer.writeSpecialized(2.0); + writer.writeSpecialized(3.0); + assertThat(writer.output(), is(6.0)); + } + } + + @Test + void writeFloats() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized(1.0f); + writer.writeSpecialized(2.0f); + writer.writeSpecialized(3.0f); + assertThat(writer.output(), is(6.0f)); + } + } + + @Test + void writeShorts() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized((short) 1); + writer.writeSpecialized((short) 2); + writer.writeSpecialized((short) 3); + assertThat(writer.output(), is(6)); + } + } + + @Test + void writeBytes() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + byte aByte = "a".getBytes()[0]; + writer.writeSpecialized(aByte); + byte bByte = "b".getBytes()[0]; + writer.writeSpecialized(bByte); + byte cByte = "c".getBytes()[0]; + writer.writeSpecialized(cByte); + assertThat(writer.output(), is((aByte + bByte) + cByte)); + } + } + + @Test + void writeChars() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized('a'); + writer.writeSpecialized('b'); + writer.writeSpecialized('c'); + assertThat(writer.output(), is("abc")); + } + } + + @Test + void writeStrings() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.writeSpecialized("a"); + writer.writeSpecialized("b"); + writer.writeSpecialized("c"); + assertThat(writer.output(), is("abc")); + } + } + + @Test + void writeObjects() throws IOException { + try (TypedObjectWriter writer = new TypedObjectWriter()){ + writer.write(Map.of("a", "b")); + IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> writer.write(Map.of("c", "d"))); + assertThat(illegalArgumentException.getMessage(), is("Tried to add java.util.ImmutableCollections$Map1 to java.util.ImmutableCollections$Map1")); + } + } +} diff --git a/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java b/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java new file mode 100644 index 0000000000..7af1b75bae --- /dev/null +++ b/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java @@ -0,0 +1,92 @@ +package io.kestra.core.runners.pebble.functions; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.State; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.AbstractMemoryRunnerTest; +import io.kestra.core.runners.RunnerUtils; +import io.kestra.core.secret.SecretService; +import io.kestra.core.storages.StorageContext; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.InternalKVStore; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +public class KvFunctionTest extends AbstractMemoryRunnerTest { + @Inject + private RunnerUtils runnerUtils; + + @Inject + private StorageInterface storageInterface; + + @Inject + private QueueInterface logQueue; + + @BeforeEach + void reset() throws IOException { + storageInterface.deleteByPrefix(null, URI.create(StorageContext.kvPrefix("io.kestra.tests"))); + } + + @Test + void get() throws TimeoutException, IOException { + KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface); + kv.put("my-key", new KVStoreValueWrapper<>(null, Map.of("field", "value"))); + + Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "kv"); + assertThat(execution.getTaskRunList().getFirst().getOutputs().get("value"), is("value")); + assertThat(execution.getTaskRunList().get(1).getOutputs().get("value"), is("value")); + } + + @Test + void getAnotherNamespace() throws TimeoutException, IOException { + KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface); + kv.put("my-key", new KVStoreValueWrapper<>(null, Map.of("field", "value"))); + KVStore kvForOtherNs = new InternalKVStore(null, "io.kestra.another.ns", storageInterface); + kvForOtherNs.put("my-key", new KVStoreValueWrapper<>(null, Map.of("field", "anotherValue"))); + + Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "kv", null, (flow, exec) -> Map.of("namespace", "io.kestra.another.ns")); + assertThat(execution.getTaskRunList().getFirst().getOutputs().get("value"), is("value")); + assertThat(execution.getTaskRunList().get(1).getOutputs().get("value"), is("anotherValue")); + } + + @Test + void getKeyNotFound() throws TimeoutException { + Flux receive = TestsUtils.receive(logQueue); + + Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "kv", null, (flow, exec) -> Map.of("errorOnMissing", true)); + assertThat(execution.getTaskRunList().getFirst().getOutputs().get("value"), is("")); + assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS)); + TaskRun taskRun = execution.getTaskRunList().get(1); + assertThat(taskRun.getState().getCurrent(), is(State.Type.FAILED)); + + assertThat( + receive.toStream() + .filter(logEntry -> logEntry.getTaskRunId() != null && logEntry.getTaskRunId().equals(taskRun.getId())) + .anyMatch(log -> log.getMessage().contains("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key', inputs.namespace, inputs.errorOnMissing).field }}:1")), + is(true) + ); + } +} diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java index 9825003113..01b5a03871 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java @@ -102,7 +102,6 @@ void simple() throws Exception { .orElseThrow(); assertThat(executionCount.size(), is(10)); - assertThat(executionList.size(), is(10)); assertThat(SchedulerStreamingTest.startedEvaluate.get(false), is(1)); assertThat(last.getTrigger().getVariables().get("startedEvaluate"), is(1)); } @@ -123,7 +122,6 @@ void failed() throws Exception { .toList(); assertThat(executionCount.size(), greaterThan(10)); - assertThat(executionList.size(), greaterThan(10)); assertThat(SchedulerStreamingTest.startedEvaluate.get(true), greaterThan(1)); } ); diff --git a/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java b/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java new file mode 100644 index 0000000000..ed54163a3f --- /dev/null +++ b/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java @@ -0,0 +1,170 @@ +package io.kestra.core.storages; + +import io.kestra.core.exceptions.ResourceExpiredException; +import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.storages.kv.InternalKVStore; +import io.kestra.core.storages.kv.KVEntry; +import io.kestra.core.storages.kv.KVMetadata; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.IdUtils; +import io.kestra.storage.local.LocalStorage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +class InternalKVStoreTest { + private static final Instant date = Instant.now().truncatedTo(ChronoUnit.MILLIS); + private static final Map complexValue = Map.of("some", "complex", "object", Map.of("with", "nested", "values", date)); + private static final Logger logger = LoggerFactory.getLogger(InternalKVStoreTest.class); + + LocalStorage storageInterface; + + @BeforeEach + public void setUp() throws IOException { + Path basePath = Files.createTempDirectory("unit"); + storageInterface = new LocalStorage(); + storageInterface.setBasePath(basePath); + storageInterface.init(); + } + + @Test + void list() throws IOException { + Instant before = Instant.now().minusMillis(100); + InternalKVStore kv = kv(); + + assertThat(kv.list().size(), is(0)); + + kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue)); + kv.put("my-second-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(10)), complexValue)); + kv.put("expired-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMillis(1)), complexValue)); + Instant after = Instant.now().plusMillis(100); + + List list = kv.list(); + assertThat(list.size(), is(2)); + + list.forEach(kvEntry -> { + assertThat(kvEntry.creationDate().isAfter(before) && kvEntry.creationDate().isBefore(after), is(true)); + assertThat(kvEntry.updateDate().isAfter(before) && kvEntry.updateDate().isBefore(after), is(true)); + }); + + Map map = list.stream().collect(Collectors.toMap(KVEntry::key, Function.identity())); + // Check that we don't list expired keys + assertThat(map.size(), is(2)); + + KVEntry myKeyValue = map.get("my-key"); + assertThat( + myKeyValue.creationDate().plus(Duration.ofMinutes(4)).isBefore(myKeyValue.expirationDate()) && + myKeyValue.creationDate().plus(Duration.ofMinutes(6)).isAfter(myKeyValue.expirationDate()), + is(true) + ); + + KVEntry mySecondKeyValue = map.get("my-second-key"); + assertThat( + mySecondKeyValue.creationDate().plus(Duration.ofMinutes(9)).isBefore(mySecondKeyValue.expirationDate()) && + mySecondKeyValue.creationDate().plus(Duration.ofMinutes(11)).isAfter(mySecondKeyValue.expirationDate()), + is(true) + ); + } + + @Test + void put() throws IOException { + // Given + final InternalKVStore kv = kv(); + + // When + Instant before = Instant.now(); + kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue)); + + // Then + StorageObject withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); + String valueFile = new String(withMetadata.inputStream().readAllBytes()); + Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate")); + assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6))), is(true)); + assertThat(valueFile, is(JacksonMapper.ofIon().writeValueAsString(complexValue))); + + // Re-When + kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(10)), "some-value")); + + // Then + withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); + valueFile = new String(withMetadata.inputStream().readAllBytes()); + expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate")); + assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11))), is(true)); + assertThat(valueFile, is("\"some-value\"")); + } + + @Test + void get() throws IOException, ResourceExpiredException { + // Given + final InternalKVStore kv = kv(); + kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), complexValue)); + + // When + Optional value = kv.get("my-key"); + + // Then + assertThat(value.get(), is(complexValue)); + } + + @Test + void getUnknownKey() throws IOException, ResourceExpiredException { + // Given + final InternalKVStore kv = kv(); + + // When + Optional value = kv.get("my-key"); + + // Then + assertThat(value.isEmpty(), is(true)); + } + + @Test + void getExpiredKV() throws IOException { + // Given + final InternalKVStore kv = kv(); + kv.put("my-key", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofNanos(1)), complexValue)); + + // When + Assertions.assertThrows(ResourceExpiredException.class, () -> kv.get("my-key")); + } + + @Test + void illegalKey() { + InternalKVStore kv = kv(); + String expectedErrorMessage = "Key must start with an alphanumeric character (uppercase or lowercase) and can contain alphanumeric characters (uppercase or lowercase), dots (.), underscores (_), and hyphens (-) only."; + + IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> kv.validateKey("a/b")); + assertThat(illegalArgumentException.getMessage(), is(expectedErrorMessage)); + illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> kv.get("a/b")); + assertThat(illegalArgumentException.getMessage(), is(expectedErrorMessage)); + illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> kv.put("a/b", new KVStoreValueWrapper<>(new KVMetadata(Duration.ofMinutes(5)), "content"))); + assertThat(illegalArgumentException.getMessage(), is(expectedErrorMessage)); + illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, () -> kv.delete("a/b")); + assertThat(illegalArgumentException.getMessage(), is(expectedErrorMessage)); + + Assertions.assertDoesNotThrow(() -> kv.validateKey("AN_UPPER.CASE-key")); + } + + private InternalKVStore kv() { + final String namespaceId = "io.kestra." + IdUtils.create(); + return new InternalKVStore(logger, null, namespaceId, storageInterface); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java b/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java new file mode 100644 index 0000000000..c5e9ed706f --- /dev/null +++ b/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java @@ -0,0 +1,77 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Map; +import java.util.NoSuchElementException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +@KestraTest +public class DeleteTest { + @Inject + RunContextFactory runContextFactory; + + @Test + void defaultCase() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + String key = "my-key"; + + Delete delete = Delete.builder() + .id(Delete.class.getSimpleName()) + .type(Delete.class.getName()) + .namespace("{{ inputs.namespace }}") + .key("{{ inputs.key }}") + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, delete, Map.of( + "namespace", namespaceId, + "key", key + )); + + final KVStore kv = runContext.storage().namespaceKv(namespaceId); + kv.put(key, new KVStoreValueWrapper<>(null, "value")); + + Delete.Output run = delete.run(runContext); + + assertThat(run.isDeleted(), is(true)); + } + + @Test + void nonPresentKey() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + Delete delete = Delete.builder() + .id(Delete.class.getSimpleName()) + .type(Delete.class.getName()) + .namespace(namespaceId) + .key("my-key") + .build(); + + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, delete, Collections.emptyMap()); + Delete.Output run = delete.run(runContext); + + assertThat(run.isDeleted(), is(false)); + + NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> delete.toBuilder().errorOnMissing(true).build().run(runContext)); + assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true")); + } +} diff --git a/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java b/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java new file mode 100644 index 0000000000..f9285b5567 --- /dev/null +++ b/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java @@ -0,0 +1,73 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +public class GetKeysTest { + @Inject + RunContextFactory runContextFactory; + + @Test + void defaultCase() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + String prefix = "my"; + GetKeys getKeys = GetKeys.builder() + .id(GetKeys.class.getSimpleName()) + .type(GetKeys.class.getName()) + .namespace("{{ inputs.namespace }}") + .prefix("{{ inputs.prefix }}") + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, getKeys, Map.of( + "namespace", namespaceId, + "prefix", prefix + )); + + final KVStore kv = runContext.storage().namespaceKv(namespaceId); + kv.put(prefix + "-key", new KVStoreValueWrapper<>(null, "value")); + kv.put(prefix + "-second-key", new KVStoreValueWrapper<>(null, "value")); + kv.put("another-key", new KVStoreValueWrapper<>(null, "value")); + + GetKeys.Output run = getKeys.run(runContext); + + assertThat(run.getKeys(), containsInAnyOrder("my-key", "my-second-key")); + } + + @Test + void noKeysReturnsEmptyList() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + GetKeys getKeys = GetKeys.builder() + .id(GetKeys.class.getSimpleName()) + .type(GetKeys.class.getName()) + .namespace(namespaceId) + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, getKeys, Collections.emptyMap()); + GetKeys.Output run = getKeys.run(runContext); + + assertThat(run.getKeys(), empty()); + } +} diff --git a/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java b/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java new file mode 100644 index 0000000000..9dd4d284e5 --- /dev/null +++ b/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java @@ -0,0 +1,80 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.Storage; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVStoreValueWrapper; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Map; +import java.util.NoSuchElementException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +@KestraTest +public class GetTest { + @Inject + RunContextFactory runContextFactory; + + @Test + void defaultCase() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + String key = "my-key"; + var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string"); + + Get get = Get.builder() + .id(Get.class.getSimpleName()) + .type(Get.class.getName()) + .namespace("{{ inputs.namespace }}") + .key("{{ inputs.key }}") + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, get, Map.of( + "namespace", namespaceId, + "key", key + )); + + final KVStore kv = runContext.storage().namespaceKv(namespaceId); + kv.put(key, new KVStoreValueWrapper<>(null, value)); + + Get.Output run = get.run(runContext); + + assertThat(run.getValue(), is(value)); + } + + @Test + void nonPresentKey() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + Get get = Get.builder() + .id(Get.class.getSimpleName()) + .type(Get.class.getName()) + .namespace(namespaceId) + .key("my-key") + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, get, Collections.emptyMap()); + Get.Output run = get.run(runContext); + + assertThat(run.getValue(), nullValue()); + + NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> get.toBuilder().errorOnMissing(true).build().run(runContext)); + assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true")); + } +} diff --git a/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java b/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java new file mode 100644 index 0000000000..980214d8a9 --- /dev/null +++ b/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java @@ -0,0 +1,106 @@ +package io.kestra.plugin.core.kv; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +@KestraTest +public class SetTest { + @Inject + StorageInterface storageInterface; + + @Inject + RunContextFactory runContextFactory; + + @Test + void defaultCase() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + Set set = Set.builder() + .id(Set.class.getSimpleName()) + .type(Set.class.getName()) + .namespace("{{ inputs.namespace }}") + .key("{{ inputs.key }}") + .value("{{ inputs.value }}") + .build(); + + String key = "my-key"; + var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string"); + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, Map.of( + "namespace", namespaceId, + "key", key, + "value", value + )); + set.run(runContext); + + final KVStore kv = runContext.storage().namespaceKv(namespaceId); + assertThat(kv.get(key).get(), is(value)); + assertThat(kv.list().get(0).expirationDate(), nullValue()); + } + + @Test + void ttl() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + String key = "my-key"; + String value = "value"; + Set set = Set.builder() + .id(Set.class.getSimpleName()) + .type(Set.class.getName()) + .namespace(namespaceId) + .key(key) + .value(value) + .ttl(Duration.ofMinutes(5)) + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, Collections.emptyMap()); + set.run(runContext); + + final KVStore kv = runContext.storage().namespaceKv(namespaceId); + assertThat(kv.get(key).get(), is(value)); + Instant expirationDate = kv.list().get(0).expirationDate(); + assertThat(expirationDate.isAfter(Instant.now().plus(Duration.ofMinutes(4))) && expirationDate.isBefore(Instant.now().plus(Duration.ofMinutes(6))), is(true)); + } + + @Test + void dontAllowOverwriteIfFalse() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); + + String key = "my-key"; + String value = "value"; + Set set = Set.builder() + .id(Set.class.getSimpleName()) + .type(Set.class.getName()) + .namespace(namespaceId) + .key(key) + .value(value) + .overwrite(false) + .build(); + + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, Collections.emptyMap()); + set.run(runContext); + + IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> set.run(runContext)); + assertThat(illegalStateException.getMessage(), is("Key already exists and overwrite is set to `false`")); + } +} diff --git a/core/src/test/resources/flows/valids/kv.yaml b/core/src/test/resources/flows/valids/kv.yaml new file mode 100644 index 0000000000..3ff2a46443 --- /dev/null +++ b/core/src/test/resources/flows/valids/kv.yaml @@ -0,0 +1,18 @@ +id: kv +namespace: io.kestra.tests +inputs: + - name: namespace + type: STRING + required: false + defaults: "io.kestra.tests" + - name: errorOnMissing + type: BOOLEAN + required: false + defaults: false +tasks: + - id: get + type: io.kestra.plugin.core.debug.Return + format: "{{ kv('my-key').field ?? null }}" + - id: getWithArgs + type: io.kestra.plugin.core.debug.Return + format: "{{ kv('my-key', inputs.namespace, inputs.errorOnMissing).field }}" \ No newline at end of file diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalFileAttributes.java b/storage-local/src/main/java/io/kestra/storage/local/LocalFileAttributes.java index 3adc684870..f659b3472e 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalFileAttributes.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalFileAttributes.java @@ -1,22 +1,31 @@ package io.kestra.storage.local; -import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.type.TypeReference; +import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.storages.FileAttributes; import lombok.Builder; import lombok.Value; +import org.apache.commons.io.IOUtils; -import javax.naming.directory.InvalidAttributesException; +import java.io.*; +import java.nio.file.Path; import java.nio.file.attribute.BasicFileAttributes; +import java.util.Map; import static io.kestra.core.storages.FileAttributes.FileType.*; @Value @Builder public class LocalFileAttributes implements FileAttributes { - String fileName; + Path filePath; BasicFileAttributes basicFileAttributes; + @Override + public String getFileName() { + return filePath.getFileName().toString(); + } + @Override public long getLastModifiedTime() { return basicFileAttributes.lastModifiedTime().toMillis(); @@ -34,7 +43,7 @@ public FileType getType() { } else if (basicFileAttributes.isDirectory()) { return Directory; } else { - throw new RuntimeException("Unknown type for file %s".formatted(fileName)); + throw new RuntimeException("Unknown type for file %s".formatted(getFileName())); } } @@ -42,4 +51,22 @@ public FileType getType() { public long getSize() { return basicFileAttributes.size(); } + + @Override + public Map getMetadata() throws IOException { + return LocalFileAttributes.getMetadata(this.filePath); + } + + public static Map getMetadata(Path filePath) throws IOException { + File metadataFile = new File(filePath.toString() + ".metadata"); + if (metadataFile.exists()) { + try(InputStream is = new FileInputStream(metadataFile)){ + String metadataFileContent = new String(is.readAllBytes()); + return JacksonMapper.ofIon().readValue(metadataFileContent, new TypeReference<>() { + }); + } + } + + return null; + } } diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java index 47045b8bac..a8d079ead1 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java @@ -2,8 +2,10 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.StorageObject; import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.NoArgsConstructor; @@ -19,6 +21,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.function.Predicate; import java.util.stream.Stream; @@ -63,6 +66,11 @@ public InputStream get(String tenantId, URI uri) throws IOException { ); } + @Override + public StorageObject getWithMetadata(String tenantId, URI uri) throws IOException { + return new StorageObject(LocalFileAttributes.getMetadata(this.getPath(tenantId, uri)), this.get(tenantId, uri)); + } + @Override public List allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException { Path fsPath = getPath(tenantId, prefix); @@ -78,7 +86,9 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) th @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - uris.add(URI.create(file.toString().replace("\\", "/"))); + if (!file.getFileName().toString().endsWith(".metadata")) { + uris.add(URI.create(file.toString().replace("\\", "/"))); + } return FileVisitResult.CONTINUE; } @@ -111,6 +121,7 @@ public boolean exists(String tenantId, URI uri) { public List list(String tenantId, URI uri) throws IOException { try (Stream stream = Files.list(getPath(tenantId, uri))) { return stream + .filter(path -> !path.getFileName().toString().endsWith(".metadata")) .map(throwFunction(file -> { URI relative = URI.create( getPath(tenantId, null).relativize( @@ -126,20 +137,28 @@ public List list(String tenantId, URI uri) throws IOException { } @Override - public URI put(String tenantId, URI uri, InputStream data) throws IOException { + public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException { File file = getPath(tenantId, uri).toFile(); File parent = file.getParentFile(); if (!parent.exists()) { parent.mkdirs(); } - try (data; OutputStream outStream = new FileOutputStream(file)) { + try (InputStream data = storageObject.inputStream(); OutputStream outStream = new FileOutputStream(file)) { byte[] buffer = new byte[8 * 1024]; int bytesRead; while ((bytesRead = data.read(buffer)) != -1) { outStream.write(buffer, 0, bytesRead); } } + + Map metadata = storageObject.metadata(); + if (metadata != null) { + try (OutputStream outStream = new FileOutputStream(file.toPath() + ".metadata")) { + outStream.write(JacksonMapper.ofIon().writeValueAsBytes(metadata)); + } + } + return URI.create("kestra://" + uri.getPath()); } @@ -148,7 +167,7 @@ public FileAttributes getAttributes(String tenantId, URI uri) throws IOException Path path = getPath(tenantId, uri); try { return LocalFileAttributes.builder() - .fileName(path.getFileName().toString()) + .filePath(path) .basicFileAttributes(Files.readAttributes(path, BasicFileAttributes.class)) .build(); } catch (NoSuchFileException e) { diff --git a/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java b/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java index f0530740ca..5dffff48cb 100644 --- a/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java +++ b/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java @@ -3,6 +3,7 @@ import com.google.common.io.CharStreams; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.StorageObject; import io.kestra.core.utils.IdUtils; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; @@ -13,6 +14,7 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import java.util.Map; import static io.kestra.core.utils.Rethrow.throwConsumer; import static org.hamcrest.MatcherAssert.assertThat; @@ -1008,9 +1010,23 @@ void deleteByPrefixWithScheme() throws Exception { assertThat(storageInterface.exists(tenantId, new URI(s)), is(false)); })); } - //endregion + @Test + void metadata() throws Exception { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + + Map expectedMetadata = Map.of( + "key1", "value1", + "key2", "value2" + ); + putFile(tenantId, "/" + prefix + "/storage/get.yml", expectedMetadata); + StorageObject withMetadata = storageInterface.getWithMetadata(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")); + assertThat(CharStreams.toString(new InputStreamReader(withMetadata.inputStream())), is(contentString)); + assertThat(withMetadata.metadata(), is(expectedMetadata)); + } + private URI putFile(String tenantId, String path) throws Exception { return storageInterface.put( tenantId, @@ -1018,4 +1034,15 @@ private URI putFile(String tenantId, String path) throws Exception { new ByteArrayInputStream(contentString.getBytes()) ); } + + private URI putFile(String tenantId, String path, Map metadata) throws Exception { + return storageInterface.put( + tenantId, + new URI(path), + new StorageObject( + metadata, + new ByteArrayInputStream(contentString.getBytes()) + ) + ); + } } diff --git a/ui/src/components/executions/date-select/DateFilter.vue b/ui/src/components/executions/date-select/DateFilter.vue index 663a940c77..5810f7bb39 100644 --- a/ui/src/components/executions/date-select/DateFilter.vue +++ b/ui/src/components/executions/date-select/DateFilter.vue @@ -18,7 +18,7 @@ @update:model-value="onAbsFilterChange" class="w-auto" /> - import DateRange from "../../layout/DateRange.vue"; - import RelativeDateSelect from "./RelativeDateSelect.vue"; + import TimeSelect from "./TimeSelect.vue"; export default { components: { DateRange, - RelativeDateSelect + TimeSelect }, emits: [ "update:isRelative", diff --git a/ui/src/components/executions/date-select/DateSelect.vue b/ui/src/components/executions/date-select/DateSelect.vue index 8303930f83..87e0747737 100644 --- a/ui/src/components/executions/date-select/DateSelect.vue +++ b/ui/src/components/executions/date-select/DateSelect.vue @@ -1,8 +1,10 @@