Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix postgres integration tests #5370

Merged
merged 2 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,17 @@ public void testDataTypes() throws Exception {
"The streamer " + streamName + " should return all expected values. Missing values: " + values));
}

private String getValueFromJsonNode(JsonNode jsonNode) {
protected String getValueFromJsonNode(JsonNode jsonNode) {
if (jsonNode != null) {
String nodeText = jsonNode.asText();
String nodeString = jsonNode.toString();
String value = (nodeText != null && !nodeText.equals("") ? nodeText : nodeString);
if (jsonNode.isArray()) {
return jsonNode.toString();
}

String value = jsonNode.asText();
value = (value != null && value.equals("null") ? null : value);
return value;
} else
return null;
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
public class CdcPostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String SLOT_NAME_BASE = "debezium_slot";
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String NAMESPACE = "public";
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String PUBLICATION = "publication";

private PostgreSQLContainer<?> container;
private JsonNode config;
Expand All @@ -76,13 +78,18 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
* {@link io.airbyte.integrations.source.postgres.PostgresSource#isCdc(JsonNode)} returns false, as
* a result no test in this class runs through the cdc path.
*/
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("replication_slot", SLOT_NAME_BASE)
.put("publication", PUBLICATION)
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("replication_method", ImmutableMap.of("replication_slot", SLOT_NAME_BASE))
.put("replication_method", replicationMethod)
.build());

final Database database = Databases.createDatabase(
Expand All @@ -100,6 +107,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
*/
database.query(ctx -> {
ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');");
ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
ctx.execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
ctx.execute("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
Expand Down Expand Up @@ -147,6 +155,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
NAMESPACE,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
Expand All @@ -156,6 +165,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
NAMESPACE,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Expand Down
Loading