Skip to content

Commit

Permalink
🐛 Postgres Source Strict Encrypt: Allow connections with sslmodes 'al…
Browse files Browse the repository at this point in the history
…low' and 'prefer' if SSH tunnel established (#19551)

* Postgres Source Strict Encrypt: Allow connections with sslmodes 'allow' and 'prefer' if SSH tunnel established

* updated changelog

* fixed tests

* add test_strictness_level to acceptance-test-config.yml

* remove test_strictness_level to due to failed SAT test

* bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
VitaliiMaltsev and octavia-squidington-iii authored Nov 28, 2022
1 parent 2423c7d commit 637781d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.25
dockerImageTag: 1.0.26
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11150,7 +11150,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.25"
- dockerImage: "airbyte/source-postgres:1.0.26"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.25
LABEL io.airbyte.version=1.0.26
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.25
LABEL io.airbyte.version=1.0.26
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
"sslmode", "require");
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand All @@ -109,11 +106,7 @@ public static Source sshWrappedSource() {

@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
if (JdbcUtils.useSsl(config)) {
return SSL_JDBC_PARAMETERS;
} else {
return Collections.emptyMap();
}
return Collections.emptyMap();
}

@Override
Expand Down Expand Up @@ -174,7 +167,7 @@ public String toJDBCQueryParams(final Map<String, String> sslParams) {
.map((entry) -> {
try {
final String result = switch (entry.getKey()) {
case SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()))
case AbstractJdbcSource.SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()))
+ JdbcUtils.AMPERSAND + PARAM_SSL + EQUALS + (entry.getValue() == DISABLE ? PARAM_SSL_FALSE : PARAM_SSL_TRUE);
case CA_CERTIFICATE_PATH -> SSL_ROOT_CERT + EQUALS + entry.getValue();
case CLIENT_KEY_STORE_URL -> SSL_KEY + EQUALS + Path.of(new URI(entry.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,112 @@

public class PostgresSourceStrictEncryptTest {

private final PostgresSourceStrictEncrypt source = new PostgresSourceStrictEncrypt();
private final PostgreSQLContainer<?> postgreSQLContainerNoSSL = new PostgreSQLContainer<>("postgres:13-alpine");
private final PostgreSQLContainer<?> postgreSQLContainerWithSSL =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key");
private static final List<String> NON_STRICT_SSL_MODES = List.of("disable", "allow", "prefer");
private static final String SSL_MODE_REQUIRE = "require";

private static final SshBastionContainer bastion = new SshBastionContainer();
private static final Network network = Network.newNetwork();

@Test
void testCheckWithSSlModeDisable() throws Exception {
void testSSlModesDisableAllowPreferWithTunnelIfServerDoesNotSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerNoSSL.withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();

for (String sslmode : NON_STRICT_SSL_MODES) {
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}

} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlModesDisableAllowPreferWithTunnelIfServerSupportSSL() throws Exception {
try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();
for (String sslmode : NON_STRICT_SSL_MODES) {

final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}
} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlModesDisableAllowPreferWithFailedTunnelIfServerSupportSSL() throws Exception {
try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL) {

try (PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine").withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();
for (String sslmode : NON_STRICT_SSL_MODES) {

// stop to enforce ssl for ssl_mode disable
final ImmutableMap.Builder<Object, Object> builderWithSSLModeDisable = getDatabaseConfigBuilderWithSSLMode(db, "disable");
final JsonNode configWithSSLModeDisable = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModeDisable);
final AirbyteConnectionStatus connectionStatusForDisabledMode = new PostgresSourceStrictEncrypt().check(configWithSSLModeDisable);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForDisabledMode.getStatus());
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertTrue(connectionStatus.getMessage().contains("Connection is not available"));

}
} finally {
bastion.stopAndClose();
}
}

@Test
void testCheckWithSSlModePrefer() throws Exception {
void testSSlRequiredWithTunnelIfServerDoesNotSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine").withNetwork(network)) {
try (PostgreSQLContainer<?> db = postgreSQLContainerNoSSL.withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();
// continue to enforce ssl because ssl mode is prefer
final ImmutableMap.Builder<Object, Object> builderWithSSLModePrefer = getDatabaseConfigBuilderWithSSLMode(db, "prefer");
final JsonNode configWithSSLModePrefer = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModePrefer);
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(configWithSSLModePrefer);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatusForPreferredMode.getStatus());
assertEquals("State code: 08004; Message: The server does not support SSL.", connectionStatusForPreferredMode.getMessage());
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, SSL_MODE_REQUIRE);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertEquals("State code: 08004; Message: The server does not support SSL.", connectionStatus.getMessage());

} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlRequiredNoTunnelIfServerSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL) {
db.start();

final ImmutableMap<Object, Object> configBuilderWithSSLMode = getDatabaseConfigBuilderWithSSLMode(db, SSL_MODE_REQUIRE).build();
final JsonNode config = Jsons.jsonNode(configBuilderWithSSLMode);
addNoTunnel((ObjectNode) config);
final AirbyteConnectionStatus connectionStatus = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}
}

@Test
void testStrictSSLSecuredWithTunnel() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();

final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, SSL_MODE_REQUIRE);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
} finally {
bastion.stopAndClose();
}
}

private ImmutableMap.Builder<Object, Object> getDatabaseConfigBuilderWithSSLMode(PostgreSQLContainer<?> db, String sslMode) {
return ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, Objects.requireNonNull(db.getContainerInfo()
Expand Down Expand Up @@ -94,53 +161,26 @@ private JsonNode getMockedSSLConfig(String sslMode) {

@Test
void testSslModesUnsecuredNoTunnel() throws Exception {
for (String sslMode : List.of("disable", "allow", "prefer")) {
for (String sslMode : NON_STRICT_SSL_MODES) {
final JsonNode config = getMockedSSLConfig(sslMode);
((ObjectNode) config).putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
addNoTunnel((ObjectNode) config);

final AirbyteConnectionStatus actual = new PostgresSourceStrictEncrypt().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("Unsecured connection not allowed"));
final AirbyteConnectionStatus connectionStatus = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertTrue(connectionStatus.getMessage().contains("Unsecured connection not allowed"));
}
}

@Test
void testSslModeRequiredNoTunnel() throws Exception {

try (PostgreSQLContainer<?> db =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key")) {
db.start();

final ImmutableMap<Object, Object> configBuilderWithSslModeRequire = getDatabaseConfigBuilderWithSSLMode(db, "require").build();
final JsonNode config = Jsons.jsonNode(configBuilderWithSslModeRequire);
((ObjectNode) config).putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(config);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForPreferredMode.getStatus());
}
private AirbyteConnectionStatus checkWithTunnel(PostgreSQLContainer<?> db, String sslmode) throws Exception {
final ImmutableMap.Builder<Object, Object> configBuilderWithSSLMode = getDatabaseConfigBuilderWithSSLMode(db, sslmode);
final JsonNode configWithSSLModeDisable = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, configBuilderWithSSLMode);
return source.check(configWithSSLModeDisable);
}

@Test
void testStrictSSLSecuredWithTunnel() throws Exception {
try (PostgreSQLContainer<?> db =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key")
.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();

final ImmutableMap.Builder<Object, Object> builderWithSSLModePrefer = getDatabaseConfigBuilderWithSSLMode(db, "require");
final JsonNode configWithSslAndSsh = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModePrefer);
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(configWithSslAndSsh);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForPreferredMode.getStatus());
} finally {
bastion.stopAndClose();
}
private static void addNoTunnel(ObjectNode config) {
config.putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,6 @@ private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbN
.build());
}

private JsonNode getConfigWithSsl(final PostgreSQLContainer<?> psqlDb, final String dbName) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", psqlDb.getHost())
.put("port", psqlDb.getFirstMappedPort())
.put("database", dbName)
.put("schemas", List.of(SCHEMA_NAME))
.put("username", psqlDb.getUsername())
.put("password", psqlDb.getPassword())
.put("ssl", true)
.build());
}

private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbName, final String user, final String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, psqlDb.getHost())
Expand Down Expand Up @@ -481,22 +469,6 @@ void testIsCdc() {
assertTrue(PostgresUtils.isCdc(config));
}

@Test
void testGetDefaultConnectionPropertiesWithoutSsl() {
final JsonNode config = getConfig(PSQL_DB, dbName);
final Map<String, String> defaultConnectionProperties = new PostgresSource().getDefaultConnectionProperties(config);
assertEquals(defaultConnectionProperties, Collections.emptyMap());
};

@Test
void testGetDefaultConnectionPropertiesWithSsl() {
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName);
final Map<String, String> defaultConnectionProperties = new PostgresSource().getDefaultConnectionProperties(config);
assertEquals(defaultConnectionProperties, ImmutableMap.of(
"ssl", "true",
"sslmode", "require"));
};

@Test
void testGetUsername() {
final String username = "airbyte-user";
Expand Down Expand Up @@ -568,7 +540,8 @@ private JsonNode buildConfigEscapingNeeded() {
JdbcUtils.HOST_KEY, "localhost",
JdbcUtils.PORT_KEY, 1111,
JdbcUtils.USERNAME_KEY, "user",
JdbcUtils.DATABASE_KEY, "db/foo"));
JdbcUtils.DATABASE_KEY, "db/foo",
JdbcUtils.SSL_KEY, "false"));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.26 | 2022-11-18 | [19551](https://github.com/airbytehq/airbyte/pull/19551) | Fixes bug with ssl modes |
| 1.0.25 | 2022-11-16 | [19004](https://github.com/airbytehq/airbyte/pull/19004) | Use Debezium heartbeats to improve CDC replication of large databases. |
| 1.0.24 | 2022-11-07 | [19291](https://github.com/airbytehq/airbyte/pull/19291) | Default timeout is reduced from 1 min to 10sec |
| 1.0.23 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
Expand Down

0 comments on commit 637781d

Please sign in to comment.