Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a line prefix to the line gobbler #7200

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.io;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.concurrency.VoidCallable;
import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -19,15 +20,48 @@
public class LineGobbler implements VoidCallable {

private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);
private final static String DEFAULT_CALLER = "generic";
private final static String DEFAULT_PREFIX = "";

@VisibleForTesting
final static String SEPARATOR = " - ";

/**
* Create a {@LineGobbler} which will forward the logs of the input stream a consumer.
*
* @param is - the input stream to be consume
* @param consumer - the consumer which will process the
*/
public static void gobble(final InputStream is, final Consumer<String> consumer) {
gobble(is, consumer, "generic");
gobble(is, consumer, DEFAULT_CALLER);
}

/**
* Create a {@LineGobbler} which will forward the logs of the input stream a consumer.
*
* @param is - the input stream to be consume
* @param consumer - the consumer which will process the
* @param caller - A caller, which is a tag that will be used when logging that the operation is
* success or failure
*/
public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller) {
gobble(is, consumer, caller, DEFAULT_PREFIX);
}

/**
* Create a {@LineGobbler} which will forward the logs of the input stream a consumer.
*
* @param is - the input stream to be consume
* @param consumer - the consumer which will process the
* @param caller - A caller, which is a tag that will be used when logging that the operation is
* success or failure
* @param prefix - A prefix that will be added to every line coming from the input stream, it will
* be seperated from the line by " - "
*/
public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller, final String prefix) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Map<String, String> mdc = MDC.getCopyOfContextMap();
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller);
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller, prefix);
executor.submit(gobbler);
}

Expand All @@ -36,24 +70,35 @@ public static void gobble(final InputStream is, final Consumer<String> consumer,
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;
private final String prefix;

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this(is, consumer, executor, mdc, "generic");
this(is, consumer, executor, mdc, DEFAULT_CALLER);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller) {
this(is, consumer, executor, mdc, caller, DEFAULT_PREFIX);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller,
final String prefix) {
this.is = IOs.newBufferedReader(is);
this.consumer = consumer;
this.executor = executor;
this.mdc = mdc;
this.caller = caller;
this.prefix = prefix;
}

@Override
Expand All @@ -62,7 +107,12 @@ public void voidCall() {
try {
String line;
while ((line = is.readLine()) != null) {
consumer.accept(line);
if (prefix != DEFAULT_PREFIX) {
consumer.accept(prefix + SEPARATOR + line);
} else {
consumer.accept(line);
}

}
} catch (final IOException i) {
LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.airbyte.commons.io;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.anyString;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -15,6 +13,7 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

Expand All @@ -25,7 +24,7 @@ class LineGobblerTest {
void readAllLines() {
final Consumer<String> consumer = Mockito.mock(Consumer.class);
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());
final ExecutorService executor = Mockito.spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

Expand All @@ -34,12 +33,29 @@ void readAllLines() {
Mockito.verify(executor).shutdown();
}

@Test
@DisplayName("Ensure that a prefix is append to the consumer in put if specified")
void appendsPrefixOnAllLine() {
final var consumer = Mockito.mock(Consumer.class);
final var is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = Mockito.spy(MoreExecutors.newDirectExecutorService());

final var caller = "generic";
final var prefix = "prefix";

executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of(), caller, prefix));

Mockito.verify(consumer).accept(prefix + LineGobbler.SEPARATOR + "test");
Mockito.verify(consumer).accept(prefix + LineGobbler.SEPARATOR + "test2");
Mockito.verify(executor).shutdown();
}

@Test
@SuppressWarnings("unchecked")
void shutdownOnSuccess() {
final Consumer<String> consumer = Mockito.mock(Consumer.class);
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());
final ExecutorService executor = Mockito.spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

Expand All @@ -53,11 +69,11 @@ void shutdownOnError() {
final Consumer<String> consumer = Mockito.mock(Consumer.class);
Mockito.doThrow(RuntimeException.class).when(consumer).accept(anyString());
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());
final ExecutorService executor = Mockito.spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

verify(consumer).accept(anyString());
Mockito.verify(consumer).accept(anyString());
Mockito.verify(executor).shutdown();
}

Expand Down