Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Source Clickhouse: added ssl support and "strict-encrypt" connector #7127

Merged
merged 10 commits into from
Oct 25, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-clickhouse-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-clickhouse-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Clickhouse Strict Encrypt Test Configuration

In order to test the Clickhouse destination, you need to have the up and running Clickhouse database that has SSL enabled.

This connector inherits the Clickhouse source, but support SSL connections only.

# Integration tests
For ssl test custom image is used. To push it run this command under the tools\integration-tests-ssl dir:
*docker build -t your_user/clickhouse-with-ssl:dev -f Clickhouse.Dockerfile .*
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-clickhouse-strict-encrypt:dev
tests:
spec:
- spec_path: "src/test-integration/resources/expected_spec.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.source.clickhouse.ClickHouseStrictEncryptSource'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation project(':airbyte-integrations:connectors:source-clickhouse')
implementation project(':airbyte-protocol:models')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation 'ru.yandex.clickhouse:clickhouse-jdbc:0.3.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-clickhouse')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-clickhouse-strict-encrypt')
integrationTestJavaImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
integrationTestJavaImplementation "org.testcontainers:clickhouse:1.16.0"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.clickhouse;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickHouseStrictEncryptSource extends SpecModifyingSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseStrictEncryptSource.class);

public ClickHouseStrictEncryptSource() {
super(ClickHouseSource.getWrappedSource());
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// SSL property should be enabled by default for secure versions of connectors
// that can be used in the Airbyte cloud. User should not be able to change this property.
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl");
return spec;
}

public static void main(String[] args) throws Exception {
final Source source = new ClickHouseStrictEncryptSource();
LOGGER.info("starting source: {}", ClickHouseStrictEncryptSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", ClickHouseStrictEncryptSource.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.clickhouse.ClickHouseSource;
import io.airbyte.integrations.source.clickhouse.ClickHouseStrictEncryptSource;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

public class ClickHouseStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

private static GenericContainer container;
private static JdbcDatabase db;
private JsonNode config;
private String dbName;

@Override
public boolean supportsSchemas() {
return false;
}

@Override
public JsonNode getConfig() {
return Jsons.clone(config);
}

@Override
public String getDriverClass() {
return ClickHouseSource.DRIVER_CLASS;
}

@Override
public String createTableQuery(final String tableName,
final String columnClause,
final String primaryKeyClause) {
// ClickHouse requires Engine to be mentioned as part of create table query.
// Refer : https://clickhouse.tech/docs/en/engines/table-engines/ for more information
return String.format("CREATE TABLE %s(%s) %s",
dbName + "." + tableName, columnClause, primaryKeyClause.equals("") ? "Engine = TinyLog"
: "ENGINE = MergeTree() ORDER BY " + primaryKeyClause + " PRIMARY KEY "
+ primaryKeyClause);
}

@BeforeAll
static void init() {
container = new GenericContainer("etsybaev/clickhouse-with-ssl:dev").withExposedPorts(8443);
container.start();
}

@BeforeEach
public void setup() throws Exception {
final JsonNode configWithoutDbName = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("username", "default")
.put("password", "")
.build());

db = Databases.createJdbcDatabase(
configWithoutDbName.get("username").asText(),
configWithoutDbName.get("password").asText(),
String.format("jdbc:clickhouse://%s:%s?ssl=true&sslmode=none",
configWithoutDbName.get("host").asText(),
configWithoutDbName.get("port").asText()),
ClickHouseSource.DRIVER_CLASS);

dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();

db.execute(ctx -> ctx.createStatement().execute(String.format("CREATE DATABASE %s;", dbName)));
config = Jsons.clone(configWithoutDbName);
((ObjectNode) config).put("database", dbName);

super.setup();
}

@AfterEach
public void tearDownMySql() throws Exception {
db.execute(ctx -> ctx.createStatement().execute(String.format("DROP DATABASE %s;", dbName)));
super.tearDown();
}

@AfterAll
public static void cleanUp() throws Exception {
db.close();
container.close();
}

@Override
public String primaryKeyClause(final List<String> columns) {
if (columns.isEmpty()) {
return "";
}

final StringBuilder clause = new StringBuilder();
clause.append("(");
for (int i = 0; i < columns.size(); i++) {
clause.append(columns.get(i));
if (i != (columns.size() - 1)) {
clause.append(",");
}
}
clause.append(")");
return clause.toString();
}

@Override
public AbstractJdbcSource getJdbcSource() {
return new ClickHouseSource();
}

@Override
public Source getSource() {
return new ClickHouseStrictEncryptSource();
}

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = source.spec();
final ConnectorSpecification expected =
Jsons.deserialize(MoreResources.readResource("expected_spec.json"),
ConnectorSpecification.class);
assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/clickhouse",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Host Endpoint of the Clickhouse Cluster",
"type": "string"
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 8123,
"examples": ["8123"]
},
"database": {
"description": "Name of the database.",
"type": "string",
"examples": ["default"]
},
"username": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
}
}
}
}
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-clickhouse/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Integration tests
For ssl test custom image is used. To push it run this command under the tools\integration-tests-ssl dir:
*docker build -t your_user/clickhouse-with-ssl:dev -f Clickhouse.Dockerfile .*
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-clickhouse')
integrationTestJavaImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
integrationTestJavaImplementation "org.testcontainers:clickhouse:1.15.3"
integrationTestJavaImplementation "org.testcontainers:clickhouse:1.16.0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class ClickHouseSource extends AbstractJdbcSource implements Source {
* https://clickhouse.tech/docs/en/operations/system-tables/columns/ to fetch the primary keys.
*/

public static final List<String> SSL_PARAMETERS = List.of(
"ssl=true",
"sslmode=none");

@Override
protected Map<String, List<String>> discoverPrimaryKeys(final JdbcDatabase database,
final List<TableInfo<CommonField<JDBCType>>> tableInfos) {
Expand All @@ -61,6 +65,10 @@ protected Map<String, List<String>> discoverPrimaryKeys(final JdbcDatabase datab
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSource.class);
public static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";

public static Source getWrappedSource() {
return new ClickHouseSource();
}

/**
* The reason we use NoOpJdbcStreamingQueryConfiguration(not setting auto commit to false and not
* setting fetch size to 1000) for ClickHouse is cause method
Expand All @@ -73,13 +81,20 @@ public ClickHouseSource() {

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:clickhouse://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));

// assume ssl if not explicitly mentioned.
if (!config.has("ssl") || config.get("ssl").asBoolean()) {
jdbcUrl.append("?").append(String.join("&", SSL_PARAMETERS));
}

return Jsons.jsonNode(ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put("jdbc_url", String.format("jdbc:clickhouse://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()))
.put("jdbc_url", jdbcUrl.toString())
.build());
}

Expand All @@ -89,7 +104,7 @@ public Set<String> getExcludedInternalNameSpaces() {
}

public static void main(final String[] args) throws Exception {
final Source source = new ClickHouseSource();
final Source source = ClickHouseSource.getWrappedSource();
LOGGER.info("starting source: {}", ClickHouseSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", ClickHouseSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public String primaryKeyClause(final List<String> columns) {
@Override
@BeforeEach
public void setup() throws Exception {
db = new ClickHouseContainer("yandex/clickhouse-server:21.3.10.1-alpine");
db = new ClickHouseContainer("yandex/clickhouse-server:21.8.8.29-alpine");
db.start();

config = Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -85,6 +85,7 @@ public void setup() throws Exception {
.put("database", SCHEMA_NAME)
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("ssl", false)
.build());

super.setup();
Expand Down
Loading