From 3981c57e681b9458fcb73afa81ece29c27cf4cf7 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Thu, 26 Jan 2023 13:12:27 -0800 Subject: [PATCH] Log `allowedHosts` missing value checks and skip `allowedHosts` on resets (#21935) --- .../airbyte/workers/process/DockerProcessFactory.java | 2 +- .../airbyte/workers/process/KubeProcessFactory.java | 2 +- .../activities/GenerateInputActivityImpl.java | 8 ++++++-- .../java/io/airbyte/workers/utils/ConfigReplacer.java | 11 +++++++++++ .../io/airbyte/workers/utils/ConfigReplacerTest.java | 6 +++++- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 7d11394a875c..77381fb1b12a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -122,7 +122,7 @@ public Process create(final String jobType, "--log-driver", "none"); final String containerName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, DOCKER_NAME_LEN_LIMIT); - LOGGER.info("Creating docker container = {} with resources {} and allowed hosts {}", containerName, resourceRequirements, allowedHosts); + LOGGER.info("Creating docker container = {} with resources {} and allowedHosts {}", containerName, resourceRequirements, allowedHosts); cmd.add("--name"); cmd.add(containerName); cmd.addAll(localDebuggingOptions(containerName)); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index a8d7cc5e411d..b30a5793d3bd 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -95,7 +95,7 @@ public Process create( try { // used to differentiate source and destination processes with the same id and attempt final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT); - LOGGER.info("Attempting to start pod = {} for {} with resources {} and allowed hosts {}", podName, imageName, resourceRequirements, + LOGGER.info("Attempting to start pod = {} for {} with resources {} and allowedHosts {}", podName, imageName, resourceRequirements, allowedHosts); final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 80f0c489242a..62d27f1bf1f6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -35,6 +35,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) @@ -42,6 +44,7 @@ public class GenerateInputActivityImpl implements GenerateInputActivity { private final JobPersistence jobPersistence; private final ConfigRepository configRepository; + private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class); public GenerateInputActivityImpl(final JobPersistence jobPersistence, final ConfigRepository configRepository) { @@ -52,7 +55,7 @@ public GenerateInputActivityImpl(final JobPersistence jobPersistence, @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { - final ConfigReplacer configReplacer = new ConfigReplacer(); + final ConfigReplacer configReplacer = new ConfigReplacer(LOGGER); try { ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); @@ -117,7 +120,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withDockerImage(config.getSourceDockerImage()) .withProtocolVersion(config.getSourceProtocolVersion()) .withIsCustomConnector(config.getIsSourceCustomConnector()) - .withAllowedHosts(configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); + .withAllowedHosts(ConfigType.RESET_CONNECTION.equals(jobConfigType) ? null + : configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java b/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java index 791b8f80e955..6af5d9053a30 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.text.StringSubstitutor; +import org.slf4j.Logger; /** * This class takes values from a connector's configuration and uses it to fill in template-string @@ -22,6 +23,12 @@ */ public class ConfigReplacer { + private final Logger logger; + + public ConfigReplacer(Logger logger) { + this.logger = logger; + } + /** * Note: This method does not interact with the secret manager. It is currently expected that all * replacement values are not secret (e.g. host vs password). This also assumed that the JSON config @@ -48,6 +55,10 @@ public AllowedHosts getAllowedHosts(AllowedHosts allowedHosts, JsonNode config) final List hosts = allowedHosts.getHosts(); for (String host : hosts) { final String replacedString = sub.replace(host); + if (replacedString.contains("${")) { + this.logger.error( + "The allowedHost value, '" + host + "', is expecting an interpolation value from the connector's configuration, but none is present"); + } resolvedHosts.add(replacedString); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java index 511c0dd98df2..0c6148f3a2b2 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java @@ -13,10 +13,14 @@ import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class ConfigReplacerTest { - final ConfigReplacer replacer = new ConfigReplacer(); + final Logger logger = LoggerFactory.getLogger(ConfigReplacerTest.class); + + final ConfigReplacer replacer = new ConfigReplacer(logger); final ObjectMapper mapper = new ObjectMapper(); @Test