Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): only allow KV to be created for an existing namespace (#4522) #4523

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ void string() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("string").get(), is("stringValue"));
assertThat(kvStore.getRaw("string").get(), is("\"stringValue\""));
assertThat(kvStore.getValue("string").get(), is("stringValue"));
assertThat(kvStore.getRawValue("string").get(), is("\"stringValue\""));
}
}

Expand All @@ -63,10 +63,10 @@ void integer() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("int").get(), is(1));
assertThat(kvStore.getRaw("int").get(), is("1"));
assertThat(kvStore.getValue("int").get(), is(1));
assertThat(kvStore.getRawValue("int").get(), is("1"));
}
}

Expand All @@ -90,10 +90,10 @@ void integerStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("intStr").get(), is("1"));
assertThat(kvStore.getRaw("intStr").get(), is("\"1\""));
assertThat(kvStore.getValue("intStr").get(), is("1"));
assertThat(kvStore.getRawValue("intStr").get(), is("\"1\""));
}
}

Expand All @@ -116,10 +116,10 @@ void object() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getRaw("object").get(), is("{some:\"json\"}"));
assertThat(kvStore.getValue("object").get(), is(Map.of("some", "json")));
assertThat(kvStore.getRawValue("object").get(), is("{some:\"json\"}"));
}
}

Expand All @@ -143,10 +143,10 @@ void objectStr() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getRaw("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
assertThat(kvStore.getValue("objectStr").get(), is("{\"some\":\"json\"}"));
assertThat(kvStore.getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
}
}

Expand Down Expand Up @@ -175,10 +175,10 @@ void fromFile() throws IOException, ResourceExpiredException {
PicocliRunner.call(KvUpdateCommand.class, ctx, args);

KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.namespaceKv(null, "io.kestra.cli", null);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

assertThat(kvStore.get("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getRaw("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
assertThat(kvStore.getValue("objectFromFile").get(), is(Map.of("some", "json", "from", "file")));
assertThat(kvStore.getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
Expand Down Expand Up @@ -439,7 +438,7 @@ public String version() {

@Override
public KVStore namespaceKv(String namespace) {
return kvStoreService.namespaceKv(tenantId(), namespace, this.flowInfo().namespace());
return kvStoreService.get(tenantId(), namespace, this.flowInfo().namespace());
}

/**
Expand Down
9 changes: 0 additions & 9 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,6 @@ public abstract class RunContext {
*/
public abstract String version();

/**
* Gets access to the Key-Value store for the contextual namespace.
*
* @return The {@link KVStore}.
*/
public KVStore namespaceKv() {
return this.namespaceKv(this.flowInfo().namespace());
}

/**
* Gets access to the Key-Value store for the given namespace.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunVariables;
import io.kestra.core.secret.SecretService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.utils.TruthUtils;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
Expand All @@ -17,11 +10,9 @@
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

@Slf4j
@Singleton
Expand Down Expand Up @@ -54,7 +45,7 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC

Optional<Object> value;
try {
value = kvStoreService.namespaceKv(flowTenantId, namespace, flowNamespace).get(key);
value = kvStoreService.get(flowTenantId, namespace, flowNamespace).getValue(key);
} catch (Exception e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
Expand Down
41 changes: 38 additions & 3 deletions core/src/main/java/io/kestra/core/services/KVStoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,60 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class KVStoreService {

@Inject
private StorageInterface storageInterface;

@Inject
private FlowService flowService;

@Inject
private NamespaceService namespaceService;

/**
* Gets access to the Key-Value store for the given namespace.
*
* @param tenant The tenant ID.
* @param namespace The namespace of the K/V store.
* @param fromNamespace The namespace from which the K/V store is accessed.
* @return The {@link KVStore}.
*/
public KVStore namespaceKv(String tenant, String namespace, String fromNamespace) {
if (fromNamespace != null && !fromNamespace.equals(namespace)) {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
public KVStore get(String tenant, String namespace, @Nullable String fromNamespace) {

boolean checkIfNamespaceExists = fromNamespace == null || !namespace.startsWith(fromNamespace);

if (checkIfNamespaceExists && !namespaceService.isNamespaceExists(tenant, namespace)) {
throw new KVStoreException(String.format(
"Cannot access the KV store. The namespace '%s' does not exist.",
namespace
));
}

boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);

if (isNotSameNamespace && !namespace.startsWith(fromNamespace)) {
throw new KVStoreException(String.format(
"Cannot access the KV store. The '%s' namespace is neither equal to nor a descendant of '%s'",
namespace,
fromNamespace
));
}

if (isNotSameNamespace) {
try {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) {
throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)
);
}
}

return new InternalKVStore(tenant, namespace, storageInterface);
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/io/kestra/core/services/NamespaceService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.kestra.core.services;

import io.kestra.core.repositories.FlowRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.List;
import java.util.Objects;

@Singleton
public class NamespaceService {

private final FlowRepositoryInterface flowRepository;

@Inject
public NamespaceService(FlowRepositoryInterface flowRepository) {
this.flowRepository = flowRepository;
}

/**
* Checks whether a given namespace exists.
*
* @param tenant The tenant ID
* @param namespace The namespace - cannot be null.
* @return {@code true} if the namespace exist. Otherwise {@link false}.
*/
public boolean isNamespaceExists(String tenant, String namespace) {
Objects.requireNonNull(namespace, "namespace cannot be null");

List<String> namespaces = flowRepository.findDistinctNamespace(tenant);
return namespaces.stream().anyMatch(ns -> ns.equals(namespace));
}

}
27 changes: 17 additions & 10 deletions core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.kestra.core.storages.kv;

import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,8 +23,6 @@
*/
public class InternalKVStore implements KVStore {

private static final Logger log = LoggerFactory.getLogger(InternalKVStore.class);

private final String namespace;
private final String tenant;
private final StorageInterface storage;
Expand Down Expand Up @@ -64,7 +56,7 @@ public void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper)
}

@Override
public Optional<String> getRaw(String key) throws IOException, ResourceExpiredException {
public Optional<String> getRawValue(String key) throws IOException, ResourceExpiredException {
this.validateKey(key);

StorageObject withMetadata;
Expand All @@ -78,7 +70,6 @@ public Optional<String> getRaw(String key) throws IOException, ResourceExpiredEx
Instant expirationDate = kvStoreValueWrapper.kvMetadata().getExpirationDate();
if (expirationDate != null && Instant.now().isAfter(expirationDate)) {
this.delete(key);

throw new ResourceExpiredException("The requested value has expired");
}
return Optional.of(kvStoreValueWrapper.value());
Expand All @@ -104,4 +95,20 @@ public List<KVEntry> list() throws IOException {
.filter(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isBefore(expirationDate)).orElse(true))
.toList();
}

@Override
public Optional<KVEntry> get(final String key) throws IOException {
this.validateKey(key);

try {
KVEntry entry = KVEntry.from(this.storage.getAttributes(this.tenant, this.storageUri(key)));
if (entry.expirationDate() != null && Instant.now().isAfter(entry.expirationDate())) {
this.delete(key);
return Optional.empty();
}
return Optional.of(entry);
} catch (FileNotFoundException e) {
return Optional.empty();
}
}
}
44 changes: 41 additions & 3 deletions core/src/main/java/io/kestra/core/storages/kv/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -41,6 +42,16 @@ default URI storageUri(String key, String namespace) {
}

default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) throws IOException {
put(key, kvStoreValueWrapper, true);
}
default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper, boolean overwrite) throws IOException {
Objects.requireNonNull(key, "key cannot be null");

if (!overwrite && exists(key)) {
throw new KVStoreException(String.format(
"Cannot set value for key '%s'. Key already exists and `overwrite` is set to `false`.", key));
}

Object value = kvStoreValueWrapper.value();
String ionValue;
if (value instanceof Duration duration) {
Expand All @@ -54,8 +65,8 @@ default void put(String key, KVStoreValueWrapper<Object> kvStoreValueWrapper) th

void putRaw(String key, KVStoreValueWrapper<String> kvStoreValueWrapper) throws IOException;

default Optional<Object> get(String key) throws IOException, ResourceExpiredException {
return this.getRaw(key).map(throwFunction(raw -> {
default Optional<Object> getValue(String key) throws IOException, ResourceExpiredException {
return this.getRawValue(key).map(throwFunction(raw -> {
Object value = JacksonMapper.ofIon().readValue(raw, Object.class);
if (value instanceof String valueStr && durationPattern.matcher(valueStr).matches()) {
return Duration.parse(valueStr);
Expand All @@ -65,12 +76,39 @@ default Optional<Object> get(String key) throws IOException, ResourceExpiredExce
}));
}

Optional<String> getRaw(String key) throws IOException, ResourceExpiredException;
Optional<String> getRawValue(String key) throws IOException, ResourceExpiredException;

/**
* Deletes the K/V store entry for the given key.
*
* @param key The entry key.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
boolean delete(String key) throws IOException;

/**
* Lists all the K/V store entries.
*
* @return The list of {@link KVEntry}.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
List<KVEntry> list() throws IOException;

/**
* Finds the K/V store entry for the given key.
*
* @return The {@link KVEntry} or {@link Optional#empty()} if entry exists or the entry expired.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
Optional<KVEntry> get(String key) throws IOException;

/**
* Checks whether a K/V entry exists for teh given key.
*
* @param key The entry key.
* @return {@code true} of an entry exists.
* @throws IOException if an error occurred while executing the operation on the K/V store.
*/
default boolean exists(String key) throws IOException {
return list().stream().anyMatch(kvEntry -> kvEntry.key().equals(key));
}
Expand Down
Loading
Loading