Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5889] Bmoric/log application name for all workers [2/2] #7268

Merged
merged 41 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
99f4546
Add the log source to the MDP
benmoriceau Oct 21, 2021
ed4bcc6
Add helper to safely update the MDC
benmoriceau Oct 21, 2021
85b81a2
Format
benmoriceau Oct 21, 2021
3b1a65a
Add an application name
benmoriceau Oct 21, 2021
0b27e02
Tmp
benmoriceau Oct 21, 2021
2e794b1
Log the application name
benmoriceau Oct 21, 2021
1103348
PR Comments
benmoriceau Oct 22, 2021
dd8a123
Merge branch 'bmoric/mdc-manipulation-tools' of github.com:airbytehq/…
benmoriceau Oct 22, 2021
20964f0
PR Comments
benmoriceau Oct 22, 2021
f5ff0dc
Merge branch 'bmoric/add-application-name' of github.com:airbytehq/ai…
benmoriceau Oct 22, 2021
80d5aab
Update related to merge
benmoriceau Oct 22, 2021
d536342
Format
benmoriceau Oct 22, 2021
bd4a7ff
Merge branch 'bmoric/mdc-manipulation-tools' of github.com:airbytehq/…
benmoriceau Oct 22, 2021
ee6b7e9
Format
benmoriceau Oct 22, 2021
c3c82bb
Merge branch 'bmoric/add-application-name' of github.com:airbytehq/ai…
benmoriceau Oct 22, 2021
a041a7d
Format
benmoriceau Oct 22, 2021
988c9d8
Fix one test
benmoriceau Oct 22, 2021
bbdb3e9
Remove the need of an application interface
benmoriceau Oct 22, 2021
7e64527
restore dbt runner
benmoriceau Oct 22, 2021
2c1d9fc
Rollback changes non related to the issue.
benmoriceau Oct 22, 2021
0c08536
Add missing file
benmoriceau Oct 22, 2021
2f5e0dc
limit the labelling to logs coming from a container and not a worker
benmoriceau Oct 22, 2021
b5796b2
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/log…
benmoriceau Oct 22, 2021
734bc1a
Add the right colot to the prefix
benmoriceau Oct 22, 2021
c1c46d9
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/log…
benmoriceau Oct 22, 2021
3fa1260
Rm useless print of a stack trace
benmoriceau Oct 22, 2021
0001cc2
Format
benmoriceau Oct 22, 2021
1b313a1
Add a ScopeMdc to the line gobbler
benmoriceau Oct 25, 2021
b0cb21e
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/log…
benmoriceau Oct 25, 2021
be40626
Fix the check connection activity
benmoriceau Oct 26, 2021
f1c7bbb
Merge branch 'bmoric/fix-temporal' into bmoric/log-application-name
benmoriceau Oct 26, 2021
2861087
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/log…
benmoriceau Oct 26, 2021
62fa485
Add tests
benmoriceau Oct 26, 2021
e1b1fd8
Test compilation
benmoriceau Oct 26, 2021
cfa5192
Change to logJobRoot name
benmoriceau Oct 26, 2021
89e732d
Fix test setup
benmoriceau Oct 27, 2021
430f3dd
Add tag to normalization and dbt
benmoriceau Oct 27, 2021
d5abecd
Update test
benmoriceau Oct 27, 2021
af40532
PR comments
benmoriceau Oct 27, 2021
1f6f2c4
Update test
benmoriceau Oct 27, 2021
cc019c9
Format
benmoriceau Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.commons.io;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.logging.MdcScope;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -21,13 +22,17 @@ public class LineGobbler implements VoidCallable {
private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);

public static void gobble(final InputStream is, final Consumer<String> consumer) {
gobble(is, consumer, "generic");
gobble(is, consumer, "generic", MdcScope.DEFAULT);
}

public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller) {
public static void gobble(final InputStream is, final Consumer<String> consumer, final MdcScope mdcScope) {
gobble(is, consumer, "generic", mdcScope);
}

public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller, final MdcScope mdcScope) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Map<String, String> mdc = MDC.getCopyOfContextMap();
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller);
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller, mdcScope);
executor.submit(gobbler);
}

Expand All @@ -36,24 +41,35 @@ public static void gobble(final InputStream is, final Consumer<String> consumer,
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;
private final MdcScope containerLogMDC;

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this(is, consumer, executor, mdc, "generic");
this(is, consumer, executor, mdc, "generic", MdcScope.DEFAULT);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final MdcScope mdcScope) {
this(is, consumer, executor, mdc, "generic", mdcScope);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller) {
final String caller,
final MdcScope mdcScope) {
this.is = IOs.newBufferedReader(is);
this.consumer = consumer;
this.executor = executor;
this.mdc = mdc;
this.caller = caller;
this.containerLogMDC = mdcScope;
}

@Override
Expand All @@ -62,7 +78,9 @@ public void voidCall() {
try {
String line;
while ((line = is.readLine()) != null) {
consumer.accept(line);
try (containerLogMDC) {
consumer.accept(line);
}
}
} catch (final IOException i) {
LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.logging;

import com.google.common.annotations.VisibleForTesting;

public class LoggingHelper {

public enum Color {
Expand Down Expand Up @@ -31,7 +33,8 @@ public String getCode() {

public static final String LOG_SOURCE_MDC_KEY = "log_source";

private static final String RESET = "\u001B[0m";
@VisibleForTesting
public static final String RESET = "\u001B[0m";

public static String applyColor(final Color color, final String msg) {
return color.getCode() + msg + RESET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.commons.logging;

import io.airbyte.commons.logging.LoggingHelper.Color;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.MDC;

/**
Expand All @@ -25,6 +28,8 @@
*/
public class MdcScope implements AutoCloseable {

public final static MdcScope DEFAULT = new Builder().build();

private final Map<String, String> originalContextMap;

public MdcScope(final Map<String, String> keyValuesToAdd) {
Expand All @@ -35,8 +40,41 @@ public MdcScope(final Map<String, String> keyValuesToAdd) {
}

@Override
public void close() throws Exception {
public void close() {
MDC.setContextMap(originalContextMap);
}

public static class Builder {

private Optional<String> maybeLogPrefix = Optional.empty();
private Optional<Color> maybePrefixColor = Optional.empty();

public Builder setLogPrefix(final String logPrefix) {
this.maybeLogPrefix = Optional.ofNullable(logPrefix);

return this;
}

public Builder setPrefixColor(final Color color) {
this.maybePrefixColor = Optional.ofNullable(color);

return this;
}

public MdcScope build() {
final Map<String, String> extraMdcEntries = new HashMap<>();

maybeLogPrefix.stream().forEach(logPrefix -> {
final String potentiallyColoredLog = maybePrefixColor
.map(color -> LoggingHelper.applyColor(color, logPrefix))
.orElse(logPrefix);

extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog);
});

return new MdcScope(extraMdcEntries);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,14 @@ public void testMDCModified() {

});

} catch (final Exception e) {
e.printStackTrace();
}
}

@Test
@DisplayName("The MDC context is properly restored")
public void testMDCRestore() {
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {} catch (final Exception e) {
e.printStackTrace();
}
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {}

final Map<String, String> mdcState = MDC.getCopyOfContextMap();

Assertions.assertThat(mdcState).containsAllEntriesOf(originalMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -28,6 +31,10 @@ public class DbtTransformationRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("dbt-container-log")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.setLogPrefix("dbt-container-log")
.setLogPrefix("dbt")

.setPrefixColor(Color.CYAN)
.build();

private final ProcessFactory processFactory;
private final NormalizationRunner normalizationRunner;
Expand All @@ -48,7 +55,7 @@ public void start() throws Exception {
* transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend
* on the NormalizationRunner to configure the dbt project with the appropriate destination settings
* and pull the custom git repository into the workspace.
*
* <p>
* Once the workspace folder/files is setup to run, we invoke the custom transformation command as
* provided by the user to execute whatever extra transformation has been implemented.
*/
Expand All @@ -59,10 +66,12 @@ public boolean run(final String jobId,
final ResourceRequirements resourceRequirements,
final OperatorDbt dbtConfig)
throws Exception {
if (!normalizationRunner.configureDbt(jobId, attempt, jobRoot, config, resourceRequirements, dbtConfig)) {
return false;
try (CONTAINER_LOG_MDC) {
if (!normalizationRunner.configureDbt(jobId, attempt, jobRoot, config, resourceRequirements, dbtConfig)) {
return false;
}
return transform(jobId, attempt, jobRoot, config, resourceRequirements, dbtConfig);
}
return transform(jobId, attempt, jobRoot, config, resourceRequirements, dbtConfig);
}

public boolean transform(final String jobId,
Expand All @@ -72,7 +81,7 @@ public boolean transform(final String jobId,
final ResourceRequirements resourceRequirements,
final OperatorDbt dbtConfig)
throws Exception {
try {
try (CONTAINER_LOG_MDC) {
final Map<String, String> files = ImmutableMap.of(
DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh"),
"sshtunneling.sh", MoreResources.readResource("sshtunneling.sh"));
Expand Down Expand Up @@ -104,16 +113,18 @@ public boolean transform(final String jobId,

@Override
public void close() throws Exception {
normalizationRunner.close();
try (CONTAINER_LOG_MDC) {
normalizationRunner.close();

if (process == null) {
return;
}
if (process == null) {
return;
}

LOGGER.debug("Closing dbt transformation process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Dbt transformation process wasn't successful");
LOGGER.debug("Closing dbt transformation process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Dbt transformation process wasn't successful");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -27,6 +30,10 @@
public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("normalization-container-log")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.setLogPrefix("normalization-container-log")
.setLogPrefix("normalization")

.setPrefixColor(Color.GREEN)
.build();

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down Expand Up @@ -58,24 +65,26 @@ public boolean configureDbt(final String jobId,
final ResourceRequirements resourceRequirements,
final OperatorDbt dbtConfig)
throws Exception {
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config));
final String gitRepoUrl = dbtConfig.getGitRepoUrl();
if (Strings.isNullOrEmpty(gitRepoUrl)) {
throw new WorkerException("Git Repo Url is required");
}
final String gitRepoBranch = dbtConfig.getGitRepoBranch();
if (Strings.isNullOrEmpty(gitRepoBranch)) {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl);
} else {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl,
"--git-branch", gitRepoBranch);
try (CONTAINER_LOG_MDC) {
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config));
final String gitRepoUrl = dbtConfig.getGitRepoUrl();
if (Strings.isNullOrEmpty(gitRepoUrl)) {
throw new WorkerException("Git Repo Url is required");
}
final String gitRepoBranch = dbtConfig.getGitRepoBranch();
if (Strings.isNullOrEmpty(gitRepoBranch)) {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl);
} else {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl,
"--git-branch", gitRepoBranch);
}
}
}

Expand All @@ -87,14 +96,16 @@ public boolean normalize(final String jobId,
final ConfiguredAirbyteCatalog catalog,
final ResourceRequirements resourceRequirements)
throws Exception {
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
try (CONTAINER_LOG_MDC) {
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
}
}

private boolean runProcess(final String jobId,
Expand All @@ -104,7 +115,7 @@ private boolean runProcess(final String jobId,
final ResourceRequirements resourceRequirements,
final String... args)
throws Exception {
try {
try (CONTAINER_LOG_MDC) {
LOGGER.info("Running with normalization version: {}", normalizationImageName);
process = processFactory.create(jobId, attempt, jobRoot, normalizationImageName, false, files, null, resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args);
Expand All @@ -126,14 +137,16 @@ private boolean runProcess(final String jobId,

@Override
public void close() throws Exception {
if (process == null) {
return;
}
try (CONTAINER_LOG_MDC) {
if (process == null) {
return;
}

LOGGER.debug("Closing normalization process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Normalization process wasn't successful");
LOGGER.debug("Closing normalization process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Normalization process wasn't successful");
}
}
}

Expand Down
Loading