Skip to content

Commit

Permalink
fix(core): only allow KV to be created for an existing namespace (#4522)
Browse files Browse the repository at this point in the history
Fix: #4522
  • Loading branch information
fhussonnois committed Aug 2, 2024
1 parent 3e7f059 commit 6b6b01d
Show file tree
Hide file tree
Showing 19 changed files with 380 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ void string() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("string").get(), is("stringValue"));
assertThat(kvStore.getRaw("string").get(), is("\"stringValue\""));
assertThat(kvStore.getValue("string").get(), is("stringValue"));
assertThat(kvStore.getRawValue("string").get(), is("\"stringValue\""));
}
}

Expand All @@ -63,10 +63,10 @@ void integer() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("int").get(), is(1));
assertThat(kvStore.getRaw("int").get(), is("1"));
assertThat(kvStore.getValue("int").get(), is(1));
assertThat(kvStore.getRawValue("int").get(), is("1"));
}
}

Expand All @@ -90,10 +90,10 @@ void integerStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("intStr").get(), is("1"));
assertThat(kvStore.getRaw("intStr").get(), is("\"1\""));
assertThat(kvStore.getValue("intStr").get(), is("1"));
assertThat(kvStore.getRawValue("intStr").get(), is("\"1\""));
}
}

Expand All @@ -116,10 +116,10 @@ void object() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getRaw("object").get(), is("{some:\"json\"}"));
assertThat(kvStore.getValue("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getRawValue("object").get(), is("{some:\"json\"}"));
}
}

Expand All @@ -143,10 +143,10 @@ void objectStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getRaw("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
assertThat(kvStore.getValue("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
}
}

Expand Down Expand Up @@ -175,10 +175,10 @@ void fromFile() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getRaw("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
assertThat(kvStore.getValue("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
Expand Down Expand Up @@ -439,7 +438,7 @@ public String version() {

@Override
public KVStore namespaceKv(String namespace) {
return kvStoreService.namespaceKv(tenantId(), namespace, this.flowInfo().namespace());
return kvStoreService.get(tenantId(), namespace, this.flowInfo().namespace());
}

/**
Expand Down
9 changes: 0 additions & 9 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,6 @@ public abstract class RunContext {
*/
public abstract String version();

/**
* Gets access to the Key-Value store for the contextual namespace.
*
* @return The {@link KVStore}.
*/
public KVStore namespaceKv() {
return this.namespaceKv(this.flowInfo().namespace());
}

/**
* Gets access to the Key-Value store for the given namespace.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunVariables;
import io.kestra.core.secret.SecretService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.utils.TruthUtils;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
Expand All @@ -17,11 +10,9 @@
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

@Slf4j
@Singleton
Expand Down Expand Up @@ -54,7 +45,7 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC

Optional<Object> value;
try {
value = kvStoreService.namespaceKv(flowTenantId, namespace, flowNamespace).get(key);
value = kvStoreService.get(flowTenantId, namespace, flowNamespace).getValue(key);
} catch (Exception e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
Expand Down
41 changes: 38 additions & 3 deletions core/src/main/java/io/kestra/core/services/KVStoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,60 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class KVStoreService {

@Inject
private StorageInterface storageInterface;

@Inject
private FlowService flowService;

@Inject
private NamespaceService namespaceService;

/**
* Gets access to the Key-Value store for the given namespace.
*
* @param tenant The tenant ID.
* @param namespace The namespace of the K/V store.
* @param fromNamespace The namespace from which the K/V store is accessed.
* @return The {@link KVStore}.
*/
public KVStore namespaceKv(String tenant, String namespace, String fromNamespace) {
if (fromNamespace != null && !fromNamespace.equals(namespace)) {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
public KVStore get(String tenant, String namespace, @Nullable String fromNamespace) {

boolean checkIfNamespaceExists = fromNamespace == null || !namespace.startsWith(fromNamespace);

if (checkIfNamespaceExists && !namespaceService.isNamespaceExists(tenant, namespace)) {
throw new KVStoreException(String.format(
"Cannot access the KV store. The namespace '%s' does not exist.",
namespace
));
}

boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);

if (isNotSameNamespace && !namespace.startsWith(fromNamespace)) {
throw new KVStoreException(String.format(
"Cannot access the KV store. The '%s' namespace is neither equal to nor a descendant of '%s'",
namespace,
fromNamespace
));
}

if (isNotSameNamespace) {
try {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) {
throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)
);
}
}

return new InternalKVStore(tenant, namespace, storageInterface);
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/io/kestra/core/services/NamespaceService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.kestra.core.services;

import io.kestra.core.repositories.FlowRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.List;
import java.util.Objects;

@Singleton
public class NamespaceService {

private final FlowRepositoryInterface flowRepository;

@Inject
public NamespaceService(FlowRepositoryInterface flowRepository) {
this.flowRepository = flowRepository;
}

/**
* Checks whether a given namespace exists.
*
* @param tenant The tenant ID
* @param namespace The namespace - cannot be null.
* @return {@code true} if the namespace exist. Otherwise {@link false}.
*/
public boolean isNamespaceExists(String tenant, String namespace) {
Objects.requireNonNull(namespace, "namespace cannot be null");

List<String> namespaces = flowRepository.findDistinctNamespace(tenant);
return namespaces.stream().anyMatch(ns -> ns.equals(namespace));
}

}
27 changes: 17 additions & 10 deletions core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.kestra.core.storages.kv;

import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,8 +23,6 @@
*/
public class InternalKVStore implements KVStore {

private static final Logger log = LoggerFactory.getLogger(InternalKVStore.class);

private final String namespace;
private final String tenant;
private final StorageInterface storage;
Expand Down Expand Up @@ -64,7 +56,7 @@ public void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper)
}

@Override
public Optional<String> getRaw(String key) throws IOException, ResourceExpiredException {
public Optional<String> getRawValue(String key) throws IOException, ResourceExpiredException {
this.validateKey(key);

StorageObject withMetadata;
Expand All @@ -78,7 +70,6 @@ public Optional<String> getRaw(String key) throws IOException, ResourceExpiredEx
Instant expirationDate = kvStoreValueWrapper.kvMetadata().getExpirationDate();
if (expirationDate != null && Instant.now().isAfter(expirationDate)) {
this.delete(key);

throw new ResourceExpiredException("The requested value has expired");
}
return Optional.of(kvStoreValueWrapper.value());
Expand All @@ -104,4 +95,20 @@ public List<KVEntry> list() throws IOException {
.filter(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isBefore(expirationDate)).orElse(true))
.toList();
}

@Override
public Optional<KVEntry> get(final String key) throws IOException {
this.validateKey(key);

try {
KVEntry entry = KVEntry.from(this.storage.getAttributes(this.tenant, this.storageUri(key)));
if (entry.expirationDate() != null && Instant.now().isAfter(entry.expirationDate())) {
this.delete(key);
return Optional.empty();
}
return Optional.of(entry);
} catch (FileNotFoundException e) {
return Optional.empty();
}
}
}
44 changes: 41 additions & 3 deletions core/src/main/java/io/kestra/core/storages/kv/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -41,6 +42,16 @@ default URI storageUri(String key, String namespace) {
}

default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) throws IOException {
put(key, kvStoreValueWrapper, true);
}
default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper, boolean overwrite) throws IOException {
Objects.requireNonNull(key, "key cannot be null");

if (!overwrite && exists(key)) {
throw new KVStoreException(String.format(
"Cannot set value for key '%s'. Key already exists and `overwrite` is set to `false`.", key));
}

Object value = kvStoreValueWrapper.value();
String ionValue;
if (value instanceof Duration duration) {
Expand All @@ -54,8 +65,8 @@ default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) th

void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper) throws IOException;

default Optional<Object> get(String key) throws IOException, ResourceExpiredException {
return this.getRaw(key).map(throwFunction(raw -> {
default Optional<Object> getValue(String key) throws IOException, ResourceExpiredException {
return this.getRawValue(key).map(throwFunction(raw -> {
Object value = JacksonMapper.ofIon().readValue(raw, Object.class);
if (value instanceof String valueStr && durationPattern.matcher(valueStr).matches()) {
return Duration.parse(valueStr);
Expand All @@ -65,12 +76,39 @@ default Optional<Object> get(String key) throws IOException, ResourceExpiredExce
}));
}

Optional<String> getRaw(String key) throws IOException, ResourceExpiredException;
Optional<String> getRawValue(String key) throws IOException, ResourceExpiredException;

/**
* Deletes the K/V store entry for the given key.
*
* @param key The entry key.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
boolean delete(String key) throws IOException;

/**
* Lists all the K/V store entries.
*
* @return The list of {@link KVEntry}.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
List<KVEntry> list() throws IOException;

/**
* Finds the K/V store entry for the given key.
*
* @return The {@link KVEntry} or {@link Optional#empty()} if entry exists or the entry expired.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
Optional<KVEntry> get(String key) throws IOException;

/**
* Checks whether a K/V entry exists for teh given key.
*
* @param key The entry key.
* @return {@code true} of an entry exists.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
default boolean exists(String key) throws IOException {
return list().stream().anyMatch(kvEntry -> kvEntry.key().equals(key));
}
Expand Down
Loading

0 comments on commit 6b6b01d

Please sign in to comment.