Skip to content

Commit

Permalink
Add orchestrator label. (#20904)
Browse files Browse the repository at this point in the history
Add the orchestrator label to orchestrators so we can better differentiate orchestrator pods.

This is useful since orchestrator pods are the only pods in the job namespace that need to talk to the main Airbyte application pods. These labels allow us to apply more granular network filtering.

Also took the chance to do some cleanup of labels.
  • Loading branch information
davinchia authored Dec 28, 2022
1 parent 99335da commit 54c0ef1
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

package io.airbyte.workers.general;

import static io.airbyte.workers.process.Metadata.CUSTOM_STEP;
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand All @@ -17,7 +22,6 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -93,7 +97,7 @@ public boolean transform(final String jobId,
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
process =
processFactory.create(
AirbyteIntegrationLauncher.CUSTOM_STEP,
CUSTOM_STEP,
jobId,
attempt,
jobRoot,
Expand All @@ -103,8 +107,7 @@ public boolean transform(final String jobId,
files,
"/bin/bash",
resourceRequirements,
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.CUSTOM_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, CUSTOM_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
dbtArguments.toArray(new String[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

package io.airbyte.workers.normalization;

import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP;
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand All @@ -25,7 +30,6 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.InputStream;
import java.nio.file.Path;
Expand Down Expand Up @@ -120,7 +124,7 @@ private boolean runProcess(final String jobId,
try {
LOGGER.info("Running with normalization version: {}", normalizationImageName);
process = processFactory.create(
AirbyteIntegrationLauncher.NORMALIZE_STEP,
NORMALIZE_STEP,
jobId,
attempt,
jobRoot,
Expand All @@ -130,8 +134,7 @@ private boolean runProcess(final String jobId,
false, files,
null,
resourceRequirements,
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
import static io.airbyte.workers.process.Metadata.CHECK_JOB;
import static io.airbyte.workers.process.Metadata.DISCOVER_JOB;
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
import static io.airbyte.workers.process.Metadata.READ_STEP;
import static io.airbyte.workers.process.Metadata.SPEC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
import static io.airbyte.workers.process.Metadata.WRITE_STEP;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,29 +35,8 @@

public class AirbyteIntegrationLauncher implements IntegrationLauncher {

/**
* The following variables help, either via names or labels, add metadata to processes actually
* running operations. These are more readable forms of
* {@link io.airbyte.config.JobTypeResourceLimit.JobType}.
*/
public static final String JOB_TYPE = "job_type";
public static final String SYNC_JOB = "sync";
public static final String SPEC_JOB = "spec";
public static final String CHECK_JOB = "check";
public static final String DISCOVER_JOB = "discover";

private static final String CONFIG = "--config";

/**
* A sync job can actually be broken down into the following steps. Try to be as precise as possible
* with naming/labels to help operations.
*/
public static final String SYNC_STEP = "sync_step";
public static final String READ_STEP = "read";
public static final String WRITE_STEP = "write";
public static final String NORMALIZE_STEP = "normalize";
public static final String CUSTOM_STEP = "custom";

private final String jobId;
private final int attempt;
private final String imageName;
Expand Down Expand Up @@ -94,7 +81,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
Collections.emptyMap(),
null,
resourceRequirement,
Map.of(JOB_TYPE, SPEC_JOB),
Map.of(JOB_TYPE_KEY, SPEC_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"spec");
Expand All @@ -115,7 +102,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(JOB_TYPE, CHECK_JOB),
Map.of(JOB_TYPE_KEY, CHECK_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"check",
Expand All @@ -137,7 +124,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(JOB_TYPE, DISCOVER_JOB),
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"discover",
Expand Down Expand Up @@ -183,7 +170,7 @@ public Process read(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
arguments.toArray(new String[arguments.size()]));
Expand Down Expand Up @@ -213,7 +200,7 @@ public Process write(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
"write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public class KubeProcessFactory implements ProcessFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);

private static final String JOB_LABEL_KEY = "job_id";
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
private static final String WORKER_POD_LABEL_KEY = "airbyte";
private static final String WORKER_POD_LABEL_VALUE = "worker-pod";

private final WorkerConfigs workerConfigs;
private final String namespace;
private final KubernetesClient fabricClient;
Expand Down Expand Up @@ -146,13 +141,17 @@ public Process create(
}
}

/**
* Returns general labels to be applied to all Kubernetes pods. All general labels should be added
* here.
*/
public static Map<String, String> getLabels(final String jobId, final int attemptId, final Map<String, String> customLabels) {
final var allLabels = new HashMap<>(customLabels);

final var generalKubeLabels = Map.of(
JOB_LABEL_KEY, jobId,
ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
Metadata.JOB_LABEL_KEY, jobId,
Metadata.ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
Metadata.WORKER_POD_LABEL_KEY, Metadata.WORKER_POD_LABEL_VALUE);

allLabels.putAll(generalKubeLabels);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.process;

/**
 * Constants used, either via process names or Kubernetes labels, to attach metadata to the
 * processes that actually run operations. Centralising them here keeps label keys/values
 * consistent across the process factories, launchers and workers that reference them.
 */
public final class Metadata {

  /*
   * General metadata applied to all job pods.
   */
  static final String JOB_LABEL_KEY = "job_id";
  static final String ATTEMPT_LABEL_KEY = "attempt_id";
  static final String WORKER_POD_LABEL_KEY = "airbyte";
  static final String WORKER_POD_LABEL_VALUE = "job-pod";
  public static final String CONNECTION_ID_LABEL_KEY = "connection_id";

  /*
   * Job-type values. These are more readable forms of
   * {@link io.airbyte.config.JobTypeResourceLimit.JobType}.
   */
  public static final String JOB_TYPE_KEY = "job_type";
  public static final String SYNC_JOB = "sync";
  public static final String SPEC_JOB = "spec";
  public static final String CHECK_JOB = "check";
  public static final String DISCOVER_JOB = "discover";

  /*
   * A sync job can actually be broken down into the following steps. Try to be as precise as
   * possible with naming/labels to help operations.
   */
  public static final String SYNC_STEP_KEY = "sync_step";
  public static final String READ_STEP = "read";
  public static final String WRITE_STEP = "write";
  public static final String NORMALIZE_STEP = "normalize";
  public static final String CUSTOM_STEP = "custom";
  public static final String ORCHESTRATOR_STEP = "orchestrator";

  // Constants-only holder; enforce noninstantiability.
  private Metadata() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
import static io.airbyte.workers.process.Metadata.CONNECTION_ID_LABEL_KEY;
import static io.airbyte.workers.process.Metadata.ORCHESTRATOR_STEP;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;

import com.google.common.base.Stopwatch;
import datadog.trace.api.Trace;
Expand Down Expand Up @@ -57,7 +60,6 @@
@Slf4j
public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {

private static final String CONNECTION_ID_LABEL_KEY = "connection_id";
private static final Duration MAX_DELETION_TIMEOUT = Duration.ofSeconds(45);

private final UUID connectionId;
Expand Down Expand Up @@ -138,7 +140,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
final var allLabels = KubeProcessFactory.getLabels(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString()));
Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString(), SYNC_STEP_KEY, ORCHESTRATOR_STEP));

final var podNameAndJobPrefix = podNamePrefix + "-job-" + jobRunConfig.getJobId() + "-attempt-";
final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
package io.airbyte.workers.normalization;

import static io.airbyte.commons.logging.LoggingHelper.RESET;
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP;
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -27,7 +31,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -85,11 +88,10 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
when(processFactory.create(NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
Map.of(),
Map.of(),
"run",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

package io.airbyte.workers.process;

import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP;
import static io.airbyte.workers.process.Metadata.CHECK_JOB;
import static io.airbyte.workers.process.Metadata.DISCOVER_JOB;
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
import static io.airbyte.workers.process.Metadata.READ_STEP;
import static io.airbyte.workers.process.Metadata.SPEC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
import static io.airbyte.workers.process.Metadata.WRITE_STEP;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -72,7 +72,7 @@ void spec() throws WorkerException {
launcher.spec(JOB_ROOT);

Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, Collections.emptyMap(), null,
workerConfigs.getResourceRequirements(), Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SPEC_JOB), JOB_METADATA,
workerConfigs.getResourceRequirements(), Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA,
Map.of(),
"spec");
}
Expand All @@ -83,7 +83,7 @@ void check() throws WorkerException {

Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(JOB_TYPE, CHECK_JOB),
Map.of(JOB_TYPE_KEY, CHECK_JOB),
JOB_METADATA,
Map.of(),
"check",
Expand All @@ -96,7 +96,7 @@ void discover() throws WorkerException {

Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(JOB_TYPE, DISCOVER_JOB),
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
JOB_METADATA,
Map.of(),
"discover",
Expand All @@ -109,7 +109,7 @@ void read() throws WorkerException {

Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_CATALOG_STATE_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
JOB_METADATA,
Map.of(),
Lists.newArrayList(
Expand All @@ -125,7 +125,7 @@ void write() throws WorkerException {

Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, true, CONFIG_CATALOG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
JOB_METADATA,
Map.of(),
"write",
Expand Down
Loading

0 comments on commit 54c0ef1

Please sign in to comment.