Skip to content

Commit

Permalink
fix(core): only allow KV to be created for an existing namespace (#4522)
Browse files Browse the repository at this point in the history
Fix: #4522
  • Loading branch information
fhussonnois committed Aug 2, 2024
1 parent 02c8689 commit c2c4150
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void string() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("string").get(), is("stringValue"));
assertThat(kvStore.getRaw("string").get(), is("\"stringValue\""));
Expand All @@ -63,7 +63,7 @@ void integer() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("int").get(), is(1));
assertThat(kvStore.getRaw("int").get(), is("1"));
Expand All @@ -90,7 +90,7 @@ void integerStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("intStr").get(), is("1"));
assertThat(kvStore.getRaw("intStr").get(), is("\"1\""));
Expand All @@ -116,7 +116,7 @@ void object() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getRaw("object").get(), is("{some:\"json\"}"));
Expand All @@ -143,7 +143,7 @@ void objectStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getRaw("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
Expand Down Expand Up @@ -175,7 +175,7 @@ void fromFile() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getRaw("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
Expand Down Expand Up @@ -439,7 +438,7 @@ public String version() {

@Override
public KVStore namespaceKv(String namespace) {
return kvStoreService.namespaceKv(tenantId(), namespace, this.flowInfo().namespace());
return kvStoreService.get(tenantId(), namespace, this.flowInfo().namespace());
}

/**
Expand Down
9 changes: 0 additions & 9 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,6 @@ public abstract class RunContext {
*/
public abstract String version();

/**
* Gets access to the Key-Value store for the contextual namespace.
*
* @return The {@link KVStore}.
*/
public KVStore namespaceKv() {
return this.namespaceKv(this.flowInfo().namespace());
}

/**
* Gets access to the Key-Value store for the given namespace.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunVariables;
import io.kestra.core.secret.SecretService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.utils.TruthUtils;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
Expand All @@ -17,11 +10,9 @@
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

@Slf4j
@Singleton
Expand Down Expand Up @@ -54,7 +45,7 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC

Optional<Object> value;
try {
value = kvStoreService.namespaceKv(flowTenantId, namespace, flowNamespace).get(key);
value = kvStoreService.get(flowTenantId, namespace, flowNamespace).get(key);
} catch (Exception e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
Expand Down
41 changes: 39 additions & 2 deletions core/src/main/java/io/kestra/core/services/KVStoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,64 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class KVStoreService {

@Inject
private StorageInterface storageInterface;

@Inject
private FlowService flowService;

@Inject
private NamespaceService namespaceService;

/**
* Gets access to the Key-Value store for the given namespace.
*
* @param tenant The tenant ID.
* @param namespace The namespace of the K/V store.
* @param fromNamespace The namespace from which the K/V store is accessed.
* @return The {@link KVStore}.
*/
public KVStore namespaceKv(String tenant, String namespace, String fromNamespace) {
public KVStore get(String tenant, String namespace, @Nullable String fromNamespace) {

if (!namespaceService.isNamespaceExists(tenant, namespace)) {
throw new KVStoreException(String.format(
"Cannot access the KV store. The namespace '%s' does not exist.",
namespace
));
}

if (fromNamespace != null && !isEqualsOrDescendant(namespace, fromNamespace)) {

throw new KVStoreException(String.format(
"Cannot access the KV store. The '%s' namespace. " +
"is neither equal to nor a descendant of '%s'",
namespace,
fromNamespace
));
}

if (fromNamespace != null && !fromNamespace.equals(namespace)) {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
try {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) {
throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)
);
}
}

return new InternalKVStore(tenant, namespace, storageInterface);
}

private static boolean isEqualsOrDescendant(String namespace, String parent) {
return namespace.equals(parent) || namespace.startsWith(parent);
}
}
34 changes: 34 additions & 0 deletions core/src/main/java/io/kestra/core/services/NamespaceService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.kestra.core.services;

import io.kestra.core.repositories.FlowRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.List;
import java.util.Objects;

@Singleton
public class NamespaceService {

private final FlowRepositoryInterface flowRepository;

@Inject
public NamespaceService(FlowRepositoryInterface flowRepository) {
this.flowRepository = flowRepository;
}

/**
* Checks whether a given namespace exists.
*
* @param tenant The tenant ID
* @param namespace The namespace - cannot be null.
* @return {@code true} if the namespace exist. Otherwise {@link false}.
*/
public boolean isNamespaceExists(String tenant, String namespace) {
Objects.requireNonNull(namespace, "namespace cannot be null");

List<String> namespaces = flowRepository.findDistinctNamespace(tenant);
return namespaces.stream().anyMatch(ns -> ns.equals(namespace));
}

}
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.kestra.core.storages.kv;

import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,8 +23,6 @@
*/
public class InternalKVStore implements KVStore {

private static final Logger log = LoggerFactory.getLogger(InternalKVStore.class);

private final String namespace;
private final String tenant;
private final StorageInterface storage;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/storages/kv/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -41,6 +42,16 @@ default URI storageUri(String key, String namespace) {
}

default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) throws IOException {
put(key, kvStoreValueWrapper, true);
}
default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper, boolean overwrite) throws IOException {
Objects.requireNonNull(key, "key cannot be null");

if (!overwrite && exists(key)) {
throw new KVStoreException(String.format(
"Cannot set value for key '%s'. Key already exists and `overwrite` is set to `false`.", key));
}

Object value = kvStoreValueWrapper.value();
String ionValue;
if (value instanceof Duration duration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.kestra.core.storages.kv;

import io.kestra.core.exceptions.KestraRuntimeException;

import java.io.Serial;

/**
* The base class for all other KVStore exceptions.
*/
public class KVStoreException extends KestraRuntimeException {

@Serial
private static final long serialVersionUID = 1L;

public KVStoreException() {
}

public KVStoreException(String message) {
super(message);
}

public KVStoreException(String message, Throwable cause) {
super(message, cause);
}

public KVStoreException(Throwable cause) {
super(cause);
}
}
25 changes: 3 additions & 22 deletions core/src/main/java/io/kestra/plugin/core/kv/Set.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,18 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.kv.KVMetadata;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreValueWrapper;
import io.kestra.core.utils.FileUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;

import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.kestra.core.utils.PathUtil.checkLeadingSlash;

@SuperBuilder(toBuilder = true)
@Getter
Expand Down Expand Up @@ -96,16 +82,11 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
public VoidOutput run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace);

FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace());

String renderedKey = runContext.render(this.key);
Object renderedValue = runContext.renderTyped(this.value);

if (!this.overwrite && runContext.namespaceKv(renderedNamespace).exists(renderedKey)) {
throw new IllegalStateException("Key already exists and overwrite is set to `false`");
}
runContext.namespaceKv(renderedNamespace).put(renderedKey, new KVStoreValueWrapper<>(new KVMetadata(ttl), renderedValue));
KVStore kvStore = runContext.namespaceKv(renderedNamespace);
kvStore.put(renderedKey, new KVStoreValueWrapper<>(new KVMetadata(ttl), renderedValue), this.overwrite);

return null;
}
Expand Down

0 comments on commit c2c4150

Please sign in to comment.