Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5889] Bmoric/add application name [2/3] #7267

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.workers;

public interface Application {

default String getApplicationName() {
// This value should only be used in the U-Test, it is an empty string instead of airbyte-test in order to avoid displaying airbyte-test in prod
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbtTransformationRunner implements AutoCloseable {
public class DbtTransformationRunner implements AutoCloseable, Application {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't actually a worker. I think it's confusing to split at this level instead of at the worker / container level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
Expand All @@ -43,14 +43,13 @@ public void start() throws Exception {
}

/**
* The docker image used by the DbtTransformationRunner is provided by the User, so we can't ensure
* to have the right python, dbt, dependencies etc software installed to successfully run our
* transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend
* on the NormalizationRunner to configure the dbt project with the appropriate destination settings
* and pull the custom git repository into the workspace.
*
* Once the workspace folder/files is setup to run, we invoke the custom transformation command as
* provided by the user to execute whatever extra transformation has been implemented.
* The docker image used by the DbtTransformationRunner is provided by the User, so we can't ensure to have the right python, dbt, dependencies etc
* software installed to successfully run our transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend on
* the NormalizationRunner to configure the dbt project with the appropriate destination settings and pull the custom git repository into the
* workspace.
* <p>
* Once the workspace folder/files is setup to run, we invoke the custom transformation command as provided by the user to execute whatever extra
* transformation has been implemented.
*/
public boolean run(final String jobId,
final int attempt,
Expand Down Expand Up @@ -117,4 +116,7 @@ public void close() throws Exception {
}
}

@Override public String getApplicationName() {
return "airbyte-dbt-transformation-runner";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbtTransformationWorker implements Worker<OperatorDbtInput, Void> {
public class DbtTransformationWorker implements Worker<OperatorDbtInput, Void>, Application {

private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationWorker.class);

Expand Down Expand Up @@ -77,4 +77,7 @@ public void cancel() {
}
}

@Override public String getApplicationName() {
return "airbyte-dbt-transformation-worker";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-dbt-transformation-worker";
return "dbt-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,7 @@ public void cancel() {
WorkerUtils.cancelProcess(process);
}

@Override public String getApplicationName() {
return "airbyte-connection-checker";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-connection-checker";
return "check-connection-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ public void cancel() {
WorkerUtils.cancelProcess(process);
}

@Override public String getApplicationName() {
return "airbyte-catalog-discovery";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-catalog-discovery";
return "discover-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ public void cancel() {
WorkerUtils.cancelProcess(process);
}

@Override public String getApplicationName() {
return "airbyte-spec-getter";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-spec-getter";
return "get-spec-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,7 @@ public void cancel() {
}
}

@Override public String getApplicationName() {
return "airbyte-normalization";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-normalization";
return "normalization-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ public DefaultReplicationWorker(final String jobId,
}

/**
* Run executes two threads. The first pipes data from STDOUT of the source to STDIN of the
* destination. The second listen on STDOUT of the destination. The goal of this second thread is to
* detect when the destination emits state messages. Only state messages emitted by the destination
* should be treated as state that is safe to return from run. In the case when the destination
* emits no state, we fall back on whatever state is pass in as an argument to this method.
* Run executes two threads. The first pipes data from STDOUT of the source to STDIN of the destination. The second listen on STDOUT of the
* destination. The goal of this second thread is to detect when the destination emits state messages. Only state messages emitted by the
* destination should be treated as state that is safe to return from run. In the case when the destination emits no state, we fall back on whatever
* state is pass in as an argument to this method.
*
* @param syncInput all configuration for running replication
* @param jobRoot file root that worker is allowed to use
* @param jobRoot file root that worker is allowed to use
* @return output of the replication attempt (including state)
* @throws WorkerException
*/
Expand Down Expand Up @@ -271,4 +270,7 @@ public void cancel() {

}

@Override public String getApplicationName() {
return "airbyte-replication";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "airbyte-replication";
return "sync-worker";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public class EchoWorker implements Worker<String, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(EchoWorker.class);

public EchoWorker() {}
public EchoWorker() {
}

@Override
public String run(final String string, final Path jobRoot) {
Expand All @@ -25,4 +26,7 @@ public void cancel() {
// no-op
}

@Override public String getApplicationName() {
return "airbyte-echo-worker";
}
}
9 changes: 4 additions & 5 deletions airbyte-workers/src/main/java/io/airbyte/workers/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@

import java.nio.file.Path;

public interface Worker<InputType, OutputType> {
public interface Worker<InputType, OutputType> extends Application {

/**
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return either
* COMPLETE, FAILED, or CANCELLED.
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return either COMPLETE, FAILED, or CANCELLED.
*/
OutputType run(InputType inputType, Path jobRoot) throws WorkerException;

/**
* Cancels in-progress workers. Although all workers support cancel, in reality only the
* asynchronous {@link DefaultReplicationWorker}'s cancel is used.
* Cancels in-progress workers. Although all workers support cancel, in reality only the asynchronous {@link DefaultReplicationWorker}'s cancel is
* used.
*/
void cancel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.workers.Application;
import java.nio.file.Path;
import java.util.Optional;

public interface Destination<T> extends CheckedConsumer<T, Exception>, AutoCloseable {
public interface Destination<T> extends CheckedConsumer<T, Exception>, AutoCloseable, Application {

void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
package io.airbyte.workers.protocols;

import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.Application;

public interface Mapper<T> {
public interface Mapper<T> extends Application {

ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package io.airbyte.workers.protocols;

import io.airbyte.config.State;
import io.airbyte.workers.Application;
import java.util.Optional;
import java.util.function.Consumer;

public interface MessageTracker<T> extends Consumer<T> {
public interface MessageTracker<T> extends Consumer<T>, Application {
Copy link
Contributor

@michel-tricot michel-tricot Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem right. Message Tracker isn't an Application. I wonder if the way to track name is wrong or just the naming of this interface. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need make sure that I properly understand how granular we want to log. My initial thought was that we have a different prefix within the same application (a different prefix per step). So if we want to go in this direction we will probably need to rename the interface. If not I should remove the Application interface and move this method to the worker interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the iface name should change.

Will follow up offline, but I'm am not understanding the granularity we are going for right now. The issue is focused on distinguishing source, destination, and platform logs from each other. This PR is focused on distinguishing worker logs from each other, and I'm a little less clear on why that is valuable.

Copy link
Contributor Author

@benmoriceau benmoriceau Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sync up offline with Michel on that. I will abandon the interface and this review for something more suited to what is needed.


@Override
void accept(T message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package io.airbyte.workers.protocols;

import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.workers.Application;
import java.nio.file.Path;
import java.util.Optional;

public interface Source<T> extends AutoCloseable {
public interface Source<T> extends AutoCloseable, Application {

void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ public Optional<State> getOutputState() {
return Optional.ofNullable(outputState.get());
}

@Override public String getApplicationName() {
return "airbyte-message-tracker";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package io.airbyte.workers.protocols.airbyte;

import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.Application;
import java.io.BufferedReader;
import java.util.stream.Stream;

public interface AirbyteStreamFactory {
public interface AirbyteStreamFactory extends Application {

Stream<AirbyteMessage> create(BufferedReader bufferedReader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ public Optional<AirbyteMessage> attemptRead() {
return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}

@Override public String getApplicationName() {
return "airbyte-destination";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even this level is a bit misleading. To a user in the UI, "destination" would likely make it seem like something running on the destination pod/container, not the destination class within the sync worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I have removed it.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
}

@VisibleForTesting
DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final HeartbeatMonitor heartbeatMonitor) {
@VisibleForTesting DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final HeartbeatMonitor heartbeatMonitor) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.heartbeatMonitor = heartbeatMonitor;
Expand Down Expand Up @@ -129,4 +128,7 @@ public void cancel() throws Exception {
}
}

@Override public String getApplicationName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "airbyte-source";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import org.slf4j.LoggerFactory;

/**
* Creates a stream from an input stream. The produced stream attempts to parse each line of the
* InputStream into a AirbyteMessage. If the line cannot be parsed into a AirbyteMessage it is
* dropped. Each record MUST be new line separated.
* Creates a stream from an input stream. The produced stream attempts to parse each line of the InputStream into a AirbyteMessage. If the line cannot
* be parsed into a AirbyteMessage it is dropped. Each record MUST be new line separated.
*
* <p>
* If a line starts with a AirbyteMessage and then has other characters after it, that
* AirbyteMessage will still be parsed. If there are multiple AirbyteMessage records on the same
* line, only the first will be parsed.
* If a line starts with a AirbyteMessage and then has other characters after it, that AirbyteMessage will still be parsed. If there are multiple
* AirbyteMessage records on the same line, only the first will be parsed.
*/
public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {

Expand Down Expand Up @@ -89,4 +87,7 @@ private void internalLog(final AirbyteLogMessage logMessage) {
}
}

@Override public String getApplicationName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "airbyte-stream-factory";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This source will never emit any messages. It can be used in cases where that is helpful (hint:
* reset connection jobs).
* This source will never emit any messages. It can be used in cases where that is helpful (hint: reset connection jobs).
*/
public class EmptyAirbyteSource implements AirbyteSource {

Expand Down Expand Up @@ -56,4 +55,7 @@ public void cancel() throws Exception {
// no op.
}

@Override public String getApplicationName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "airbyte-empty-source";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import org.slf4j.LoggerFactory;

/**
* We apply some transformations on the fly on the catalog (same should be done on records too) from
* the source before it reaches the destination. One of the transformation is to define the
* destination namespace where data will be stored and how to mirror (or not) the namespace used in
* the source (if any). This is configured in the UI through the syncInput.
* We apply some transformations on the fly on the catalog (same should be done on records too) from the source before it reaches the destination. One
* of the transformation is to define the destination namespace where data will be stored and how to mirror (or not) the namespace used in the source
* (if any). This is configured in the UI through the syncInput.
*/
public class NamespacingMapper implements Mapper<AirbyteMessage> {

Expand Down Expand Up @@ -97,4 +96,7 @@ private static String transformStreamName(final String streamName, final String
}
}

@Override public String getApplicationName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "airbyte-namespacing-mapper";
}
}