Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Namespace destination tables #1992

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions airbyte-commons/src/main/java/io/airbyte/commons/text/Names.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,6 @@ public static String toAlphanumericAndUnderscore(String s) {
.replaceAll(NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN, "_");
}

/**
* Concatenate Strings together, but handles the case where the strings are already quoted
*/
public static String concatQuotedNames(final String inputStr1, final String inputStr2) {
boolean anyQuotes = false;
String unquotedStr1 = inputStr1;
String unquotedStr2 = inputStr2;
if (inputStr1.startsWith("\"")) {
unquotedStr1 = inputStr1.substring(1, inputStr1.length() - 1);
anyQuotes = true;
}
if (inputStr2.startsWith("\"")) {
unquotedStr2 = inputStr2.substring(1, inputStr2.length() - 1);
anyQuotes = true;
}
if (anyQuotes) {
return "\"" + unquotedStr1 + unquotedStr2 + "\"";
} else {
return unquotedStr1 + unquotedStr2;
}
}

public static String doubleQuote(String value) {
return internalQuote(value, '"');
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {

dependencies {
implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.commons:commons-lang3:3.11'

implementation project(':airbyte-db')
implementation project(':airbyte-config:models')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
public class ExtendedNameTransformer extends StandardNameTransformer {

@Override
protected String convertStreamName(String input) {
return super.convertStreamName(input);
public String getIdentifier(String input) {
return super.getIdentifier(input);
}

// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785)
protected String disabled_convertStreamName(String input) {
protected String disabled_getIdentifier(String input) {
if (useExtendedIdentifiers(input)) {
return "\"" + input + "\"";
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,19 @@
package io.airbyte.integrations.destination;

/**
* Destination have their own Naming conventions (which characters are valid or rejected in
* Databases have their own Naming conventions (which characters are valid or rejected in
* identifiers names) This class transform a random string used to a valid identifier names for each
* specific destination.
* specific database.
*/
public interface NamingConventionTransformer {

/**
* Handle Naming Conversions of an input name to output a valid identifier name for the desired
* destination.
* database.
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen
* destination.
* @return modified name with invalid characters replaced by '_' or quoted identifiers
*/
String getIdentifier(String name);

/**
* Same as getIdentifier but returns also the name of the table for storing raw data
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen
* destination.
*
* @deprecated as this is very SQL specific, prefer using getIdentifier instead
*/
@Deprecated
String getRawTableName(String name);

/**
* Same as getIdentifier but returns also the name of the table for storing tmp data
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen
* destination.
*
* @deprecated as this is very SQL specific, prefer using getIdentifier instead
*/
@Deprecated
String getTmpTableName(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination;

import java.time.Instant;
import org.apache.commons.lang3.RandomStringUtils;

public class NamingHelper {

/**
* Returns the name of a schema for storing raw data associated with a schema name. Make sure to
* apply the proper naming convention on the final table name for the database
*/
public static String getTmpSchemaName(NamingConventionTransformer transformer, String schemaName) {
if (schemaName != null)
return transformer.getIdentifier("_airbyte_" + schemaName);
else
return transformer.getIdentifier("_airbyte");
}

/**
* Returns the name of the table for storing tmp data associated with a stream name. Name is
* randomly generated.
*/
public static String getTmpTableName(NamingConventionTransformer transformer, String streamName) {
return transformer
.getIdentifier(String.format("_tmp_%s%s_%s", RandomStringUtils.randomAlphanumeric(4), Instant.now().toEpochMilli(), streamName));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@
package io.airbyte.integrations.destination;

import io.airbyte.commons.text.Names;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardNameTransformer implements NamingConventionTransformer {

@Override
public String getIdentifier(String name) {
return convertStreamName(name);
}
private static final int MAX_IDENTIFIER_LENGTH = 1024;

@Override
public String getRawTableName(String streamName) {
return convertStreamName("_airbyte_raw_" + streamName);
}
private static final Logger LOGGER = LoggerFactory.getLogger(StandardNameTransformer.class);

@Override
public String getTmpTableName(String streamName) {
return convertStreamName("_airbyte_" + Instant.now().toEpochMilli() + "_" + streamName);
protected int getMaxIdentifierLength() {
return MAX_IDENTIFIER_LENGTH;
}

protected String convertStreamName(String input) {
return Names.toAlphanumericAndUnderscore(input);
@Override
public String getIdentifier(String name) {
if (name.length() >= getMaxIdentifierLength()) {
final String newName = name.substring(0, getMaxIdentifierLength() - 1);
LOGGER.warn(String.format("Identifier '%s' is too long, truncating it to: `%s`", name, newName));
name = newName;
}
return Names.toAlphanumericAndUnderscore(name);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
public class WriteConfig {

private final String streamName;
private final String outputNamespaceName;
private final String tmpNamespaceName;
private final String tmpTableName;
private final String outputTableName;
private final SyncMode syncMode;

public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, SyncMode syncMode) {
public WriteConfig(String streamName, String tmpNamespaceName, String tmpTableName, String outputTableName, SyncMode syncMode) {
this.streamName = streamName;
this.outputNamespaceName = outputNamespaceName;
this.tmpNamespaceName = tmpNamespaceName;
this.tmpTableName = tmpTableName;
this.outputTableName = outputTableName;
this.syncMode = syncMode;
Expand All @@ -46,12 +46,12 @@ public String getStreamName() {
return streamName;
}

public String getTmpTableName() {
return tmpTableName;
public String getTmpNamespaceName() {
return tmpNamespaceName;
}

public String getOutputNamespaceName() {
return outputNamespaceName;
public String getTmpTableName() {
return tmpTableName;
}

public String getOutputTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ class IntegrationRunnerTest {

private static final String CONFIG_STRING = "{ \"username\": \"airbyte\" }";
private static final JsonNode CONFIG = Jsons.deserialize(CONFIG_STRING);
private static final String NAMESPACE = "test";
private static final String STREAM_NAME = "users";
private static final Long EMITTED_AT = Instant.now().toEpochMilli();
private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests");

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(NAMESPACE, CATALOG);
private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"));

private IntegrationCliParser cliParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ void testStandardSQLNaming() {
assertEquals("identifier_name", namingResolver.getIdentifier("identifier name"));
assertEquals("identifier_", namingResolver.getIdentifier("identifier%"));
assertEquals("_identifier_", namingResolver.getIdentifier("`identifier`"));

assertEquals("_airbyte_raw_identifier_name", namingResolver.getRawTableName("identifier_name"));
}

// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785)
Expand All @@ -74,9 +72,6 @@ void testExtendedSQLNaming() {
assertEquals("\"identifier name\"", namingResolver.getIdentifier("identifier name"));
assertEquals("\"identifier%\"", namingResolver.getIdentifier("identifier%"));
assertEquals("\"`identifier`\"", namingResolver.getIdentifier("`identifier`"));

assertEquals("_airbyte_raw_identifier_name", namingResolver.getRawTableName("identifier_name"));
assertEquals("\"_airbyte_raw_identifiêr name\"", namingResolver.getRawTableName("identifiêr name"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public abstract class TestDestination {

private static final long JOB_ID = 0L;
private static final int JOB_ATTEMPT = 0;
private static final String NAMESPACE = "standard-integration-tests";

private static final Logger LOGGER = LoggerFactory.getLogger(TestDestination.class);

Expand Down Expand Up @@ -131,6 +132,13 @@ public abstract class TestDestination {
*/
protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception;

/**
* Function that @returns the namespace to use for running standard tests in the destination
*/
protected String getNamespace() {
return NAMESPACE;
}

/**
* Override to return true to if the destination implements basic normalization and it should be
* tested here.
Expand Down Expand Up @@ -272,7 +280,7 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
@ArgumentsSource(DataArgumentsProvider.class)
public void testSync(String messagesFilename, String catalogFilename) throws Exception {
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(getNamespace(), catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), messages, configuredCatalog);
Expand All @@ -293,7 +301,7 @@ public void testIncrementalSync() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(getNamespace(), catalog);
configuredCatalog.getStreams().forEach(s -> s.withSyncMode(SyncMode.INCREMENTAL));
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
Expand Down Expand Up @@ -326,7 +334,7 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil
}

final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(getNamespace(), catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfigWithBasicNormalization(), messages, configuredCatalog);
Expand All @@ -344,7 +352,7 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil
public void testSecondSync() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(getNamespace(), catalog);
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), firstSyncMessages, configuredCatalog);
Expand Down
Loading