Skip to content

Commit

Permalink
🎉 Inject Job Type into processes names. (#12561)
Browse files Browse the repository at this point in the history
We currently use sync for all process names (e.g. container names on Docker and pod names on Kube). This is confusing because, although syncs are the majority of operations, not all operations are syncs.

This PR makes the job type part of the process name so it is clearer to Airbyte operators which processes correspond to which jobs. The intention is to help with on-call operations.

After this change, process names will look like (truncated to fit the underlying system):

- check = <connector-image>-check-<job-info>-<random-suffix>
- spec = <connector-image>-spec-<job-info>-<random-suffix>
- discover = <connector-image>-discover-<job-info>-<random-suffix>
- sync = <connector-image>-read-<job-info>-<random-suffix> for source connectors, <connector-image>-write-<job-info>-<random-suffix> for destination connectors, and <connector-image>-normalize-<job-info>-<random-suffix> for normalization processes.
- custom operations = <connector-image>-custom-<job-info>-<random-suffix> (not enabled on Cloud).
  • Loading branch information
davinchia authored May 5, 2022
1 parent 4b9ff01 commit 0c843e3
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -94,6 +94,7 @@ public boolean transform(final String jobId,
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
process =
processFactory.create(
AirbyteIntegrationLauncher.CUSTOM_STEP,
jobId,
attempt,
jobRoot,
Expand All @@ -102,7 +103,8 @@ public boolean transform(final String jobId,
files,
"/bin/bash",
resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.CUSTOM_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
dbtArguments.toArray(new String[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.Collections;
Expand Down Expand Up @@ -121,14 +121,16 @@ private boolean runProcess(final String jobId,
try {
LOGGER.info("Running with normalization version: {}", normalizationImageName);
process = processFactory.create(
AirbyteIntegrationLauncher.NORMALIZE_STEP,
jobId,
attempt,
jobRoot,
normalizationImageName,
false, files,
null,
resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@

public class AirbyteIntegrationLauncher implements IntegrationLauncher {

/**
* The following variables help, either via names or labels, add metadata to processes actually
* running operations. These are more readable forms of
* {@link io.airbyte.config.JobTypeResourceLimit.JobType}.
*/
public static final String JOB_TYPE = "job_type";
public static final String SYNC_JOB = "sync";
public static final String SPEC_JOB = "spec";
public static final String CHECK_JOB = "check";
public static final String DISCOVER_JOB = "discover";

/**
* A sync job can actually be broken down into the following steps. Try to be as precise as possible
* with naming/labels to help operations.
*/
public static final String SYNC_STEP = "sync_step";
public static final String READ_STEP = "read";
public static final String WRITE_STEP = "write";
public static final String NORMALIZE_STEP = "normalize";
public static final String CUSTOM_STEP = "custom";

private final String jobId;
private final int attempt;
private final String imageName;
Expand All @@ -39,6 +60,7 @@ public AirbyteIntegrationLauncher(final String jobId,
@Override
public Process spec(final Path jobRoot) throws WorkerException {
return processFactory.create(
SPEC_JOB,
jobId,
attempt,
jobRoot,
Expand All @@ -47,7 +69,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
Collections.emptyMap(),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB),
Map.of(JOB_TYPE, SPEC_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"spec");
Expand All @@ -56,6 +78,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
@Override
public Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException {
return processFactory.create(
CHECK_JOB,
jobId,
attempt,
jobRoot,
Expand All @@ -64,7 +87,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
Map.of(JOB_TYPE, CHECK_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"check",
Expand All @@ -74,6 +97,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
@Override
public Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException {
return processFactory.create(
DISCOVER_JOB,
jobId,
attempt,
jobRoot,
Expand All @@ -82,7 +106,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
Map.of(JOB_TYPE, DISCOVER_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
"discover",
Expand Down Expand Up @@ -116,6 +140,7 @@ public Process read(final Path jobRoot,
}

return processFactory.create(
READ_STEP,
jobId,
attempt,
jobRoot,
Expand All @@ -124,7 +149,7 @@ public Process read(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
arguments.toArray(new String[arguments.size()]));
Expand All @@ -142,6 +167,7 @@ public Process write(final Path jobRoot,
catalogFilename, catalogContents);

return processFactory.create(
WRITE_STEP,
jobId,
attempt,
jobRoot,
Expand All @@ -150,7 +176,7 @@ public Process write(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
"write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ private static Path prepareImageExistsScript() {
}

@Override
public Process create(final String jobId,
public Process create(final String jobType,
final String jobId,
final int attempt,
final Path jobRoot,
final String imageName,
Expand Down Expand Up @@ -115,7 +116,7 @@ public Process create(final String jobId,
rebasePath(jobRoot).toString(), // rebases the job root on the job data mount
"--log-driver",
"none");
final String containerName = ProcessFactory.createProcessName(imageName, jobId, attempt, DOCKER_NAME_LEN_LIMIT);
final String containerName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, DOCKER_NAME_LEN_LIMIT);
cmd.add("--name");
cmd.add(containerName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,6 @@ public class KubeProcessFactory implements ProcessFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);

public static final String JOB_TYPE = "job_type";
public static final String SYNC_JOB = "sync";
public static final String SPEC_JOB = "spec";
public static final String CHECK_JOB = "check";
public static final String DISCOVER_JOB = "discover";

public static final String SYNC_STEP = "sync_step";
public static final String READ_STEP = "read";
public static final String WRITE_STEP = "write";
public static final String NORMALISE_STEP = "normalise";
public static final String CUSTOM_STEP = "custom";

private static final String JOB_LABEL_KEY = "job_id";
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
private static final String WORKER_POD_LABEL_KEY = "airbyte";
Expand Down Expand Up @@ -91,9 +79,11 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs,
}

@Override
public Process create(final String jobId,
public Process create(
final String jobType,
final String jobId,
final int attempt,
final Path jobRoot, // todo: remove unused
final Path jobRoot,
final String imageName,
final boolean usesStdin,
final Map<String, String> files,
Expand All @@ -106,7 +96,7 @@ public Process create(final String jobId,
throws WorkerException {
try {
// used to differentiate source and destination processes with the same id and attempt
final String podName = ProcessFactory.createProcessName(imageName, jobId, attempt, KUBE_NAME_LEN_LIMIT);
final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT);
LOGGER.info("Attempting to start pod = {} for {}", podName, imageName);

final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface ProcessFactory {
/**
* Creates a ProcessBuilder to run a program in a new Process.
*
* @param jobType type of job to add to name for easier operational processes.
* @param jobId job Id
* @param attempt attempt Id
* @param jobPath Workspace directory to run the process from.
Expand All @@ -37,7 +38,8 @@ public interface ProcessFactory {
* @return ProcessBuilder object to run the process.
* @throws WorkerException
*/
Process create(String jobId,
Process create(String jobType,
String jobId,
int attempt,
final Path jobPath,
final String imageName,
Expand All @@ -59,14 +61,14 @@ Process create(String jobId,
* With these two facts, attempt to construct a unique process name with the image name present that
* can be used by the factories implementing this interface for easier operations.
*/
static String createProcessName(final String fullImagePath, final String jobId, final int attempt, final int lenLimit) {
static String createProcessName(final String fullImagePath, final String jobType, final String jobId, final int attempt, final int lenLimit) {
final var noVersion = fullImagePath.split(VERSION_DELIMITER)[0];

final var nameParts = noVersion.split(DOCKER_DELIMITER);
var imageName = nameParts[nameParts.length - 1];

final var randSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();
final String suffix = "sync" + "-" + jobId + "-" + attempt + "-" + randSuffix;
final String suffix = jobType + "-" + jobId + "-" + attempt + "-" + randSuffix;

var processName = imageName + "-" + suffix;
if (processName.length() > lenLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static String getEntrypointEnvVariable(final ProcessFactory processFactor
final String imageName)
throws IOException, InterruptedException, WorkerException {
final Process process = processFactory.create(
"entrypoint-checker",
jobId,
jobAttempt,
jobRoot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ private Process getProcess(final String entrypoint, final Map<String, String> fi
private Process getProcess(final Map<String, String> customLabels, final String entrypoint, final Map<String, String> files)
throws WorkerException {
return processFactory.create(
"tester",
"some-id",
0,
Path.of("/tmp/job-root"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -76,9 +76,11 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, files, null,
when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(),
Map.of(),
"run",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.workers.process;

import static io.airbyte.workers.process.AirbyteIntegrationLauncher.*;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.config.EnvConfigs;
Expand Down Expand Up @@ -52,18 +54,19 @@ void setUp() {
void spec() throws WorkerException {
launcher.spec(JOB_ROOT);

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null,
workerConfigs.getResourceRequirements(), Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB), JOB_METADATA, Map.of(),
Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null,
workerConfigs.getResourceRequirements(), Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SPEC_JOB), JOB_METADATA,
Map.of(),
"spec");
}

@Test
void check() throws WorkerException {
launcher.check(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
Map.of(JOB_TYPE, CHECK_JOB),
JOB_METADATA,
Map.of(),
"check",
Expand All @@ -74,9 +77,9 @@ void check() throws WorkerException {
void discover() throws WorkerException {
launcher.discover(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
Map.of(JOB_TYPE, DISCOVER_JOB),
JOB_METADATA,
Map.of(),
"discover",
Expand All @@ -87,9 +90,9 @@ void discover() throws WorkerException {
void read() throws WorkerException {
launcher.read(JOB_ROOT, "config", "{}", "catalog", "{}", "state", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null,
Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
JOB_METADATA,
Map.of(),
Lists.newArrayList(
Expand All @@ -103,9 +106,9 @@ void read() throws WorkerException {
void write() throws WorkerException {
launcher.write(JOB_ROOT, "config", "{}", "catalog", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null,
Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null,
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
JOB_METADATA,
Map.of(),
"write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testFileWriting() throws IOException, WorkerException {

final DockerProcessFactory processFactory =
new DockerProcessFactory(new WorkerConfigs(new EnvConfigs()), workspaceRoot, null, null, null);
processFactory.create("job_id", 0, jobRoot, "busybox", false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi",
processFactory.create("tester", "job_id", 0, jobRoot, "busybox", false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi",
new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(), Map.of(), Map.of());

assertEquals(
Expand Down Expand Up @@ -113,6 +113,7 @@ public void testEnvMapSet() throws IOException, WorkerException, InterruptedExce
waitForDockerToInitialize(processFactory, jobRoot, workerConfigs);

final Process process = processFactory.create(
"tester",
"job_id",
0,
jobRoot,
Expand Down Expand Up @@ -144,6 +145,7 @@ private void waitForDockerToInitialize(final ProcessFactory processFactory, fina

while (stopwatch.elapsed().compareTo(Duration.ofSeconds(30)) < 0) {
final Process p = processFactory.create(
"tester",
"job_id_" + RandomStringUtils.randomAlphabetic(4),
0,
jobRoot,
Expand Down
Loading

0 comments on commit 0c843e3

Please sign in to comment.