Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Postgres: Fix not being able to configure wal2json plugin #19985

Merged
merged 11 commits into from
Dec 6, 2022
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/source-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
testImplementation libs.postgresql
testImplementation libs.connectors.testcontainers.postgresql

testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation libs.connectors.testcontainers.postgresql

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,26 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

/**
* Runs the acceptance tests in the source-jdbc test module. We want this module to run these tests
* itself as a sanity check. The trade off here is that this class is duplicated from the one used
* in source-postgres.
*/
@ExtendWith(SystemStubsExtension.class)
class DefaultJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

private static PostgreSQLContainer<?> PSQL_DB;

private JsonNode config;
Expand All @@ -55,7 +63,6 @@ class DefaultJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
static void init() {
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
PSQL_DB.start();
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s BIT(3) NOT NULL);";
INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(B'101');";
}
Expand All @@ -72,6 +79,8 @@ public void setup() throws Exception {
.put(JdbcUtils.PASSWORD_KEY, PSQL_DB.getPassword())
.build());

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");

final String initScriptName = "init_" + dbName.concat(".sql");
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1252,17 +1252,4 @@ protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, f
}
}

public static void setEnv(final String key, final String value) {
try {
final Map<String, String> env = System.getenv();
final Class<?> cl = env.getClass();
final java.lang.reflect.Field field = cl.getDeclaredField("m");
field.setAccessible(true);
final Map<String, String> writableEnv = (Map<String, String>) field.get(env);
writableEnv.put(key, value);
} catch (final Exception e) {
throw new IllegalStateException("Failed to set environment variable", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(':airbyte-test-utils')
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'

testImplementation libs.connectors.testcontainers.mysql

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
class MySqlStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

protected static final String TEST_USER = "test";
protected static final String TEST_PASSWORD = "test";
protected static MySQLContainer<?> container;
Expand All @@ -71,13 +79,13 @@ static void init() throws SQLException {
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD);
container.start();
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", container.getPassword());
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
}

@BeforeEach
public void setup() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
testImplementation libs.connectors.testcontainers.mysql

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -39,10 +38,18 @@
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private MySQLContainer<?> container;
Expand Down Expand Up @@ -105,7 +112,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)
.build());
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand All @@ -29,10 +28,18 @@
import java.util.HashMap;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

Expand All @@ -46,7 +53,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
Expand Down Expand Up @@ -46,10 +45,18 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcMysqlSourceTest extends CdcSourceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

private static final String DB_NAME = MODELS_SCHEMA;
private MySQLContainer<?> container;
private Database database;
Expand All @@ -58,7 +65,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {

@BeforeEach
public void setup() throws SQLException {
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
init();
revokeAllPermissions();
grantCorrectPermissions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password";
protected static final String TEST_USER = "test";
Expand All @@ -61,7 +69,6 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@BeforeAll
static void init() throws Exception {
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0")
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD.call())
Expand All @@ -74,6 +81,7 @@ static void init() throws Exception {

@BeforeEach
public void setup() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
testImplementation project(':airbyte-test-utils')
testImplementation libs.connectors.testcontainers.jdbc
testImplementation libs.connectors.testcontainers.postgresql
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
performanceTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
"title": "Plugin",
"description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. If the replication table contains a lot of big jsonb values it is recommended to use `wal2json` plugin. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\">selecting replication plugins</a>.",
"enum": ["pgoutput", "wal2json"],
"const": "pgoutput",
"default": "pgoutput",
"order": 2
},
"replication_slot": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
"title": "Plugin",
"description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. If the replication table contains a lot of big jsonb values it is recommended to use `wal2json` plugin. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\">selecting replication plugins</a>.",
"enum": ["pgoutput", "wal2json"],
"const": "pgoutput",
"default": "pgoutput",
"order": 2
},
"replication_slot": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -61,12 +60,20 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
abstract class CdcPostgresSourceTest extends CdcSourceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

protected static final String SLOT_NAME_BASE = "debezium_slot";
protected static final String PUBLICATION = "publication";
protected static final int INITIAL_WAITING_SECONDS = 5;
Expand Down Expand Up @@ -94,7 +101,7 @@ protected void setup() throws SQLException {
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf")
.withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf");
container.start();
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
source = new PostgresSource();
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,19 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

private static final String DATABASE = "new_db";
protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password";
Expand All @@ -62,12 +70,12 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
static void init() {
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
PSQL_DB.start();
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
}

@Override
@BeforeEach
public void setup() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
COLUMN_CLAUSE_WITH_PK =
"id INTEGER, name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL, wakeup_at TIMETZ NOT NULL, last_visited_at TIMESTAMPTZ NOT NULL, last_comment_at TIMESTAMP NOT NULL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@
"title": "Plugin",
"description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. If the replication table contains a lot of big jsonb values it is recommended to use `wal2json` plugin. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\">selecting replication plugins</a>.",
"enum": ["pgoutput", "wal2json"],
"const": "pgoutput",
"default": "pgoutput",
"order": 2
},
"replication_slot": {
Expand Down