Skip to content

Commit

Permalink
additional logging
Browse files Browse the repository at this point in the history
  • Loading branch information
DoNotPanicUA committed Apr 26, 2022
1 parent 6d7e04f commit 9a3964d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
LOGGER.warn("parsed 1 : {} ", parsed);
LOGGER.warn("parsed 2 : {} ", parsed.getCatalogPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
LOGGER.warn("Cat : {} ", catalog);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
AirbyteSentry.executeWithTracing("WriteDestination", () -> runConsumer(consumer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> o
final ConfiguredAirbyteCatalog catalog,
final StreamCopierFactory<T> streamCopierFactory,
final String defaultSchema) {
LOGGER.warn("Create PAIR : {}", defaultSchema);
final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier = createWriteConfigs(
namingResolver,
config,
Expand Down Expand Up @@ -74,7 +75,9 @@ private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWrite
final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier = new HashMap<>();
final String stagingFolder = UUID.randomUUID().toString();
for (final var configuredStream : catalog.getStreams()) {
LOGGER.warn("stream {}", configuredStream);
final var stream = configuredStream.getStream();
LOGGER.warn("pair {}", stream);
final var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
final var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations);

Expand Down Expand Up @@ -157,6 +160,8 @@ private static void closeAsOneTransaction(final Map<AirbyteStreamNameNamespacePa
}
}
if (!hasFailed) {
LOGGER.warn("namespace1 : {}", pairToCopier);
LOGGER.warn("namespace2 : {}", pairToCopier.keySet());
sqlOperations.onDestinationCloseOperations(db,
pairToCopier.keySet().stream().map(AirbyteStreamNameNamespacePair::getNamespace).collect(toSet()));
sqlOperations.executeTransaction(db, queries);
Expand Down

0 comments on commit 9a3964d

Please sign in to comment.