🐞 Destination databricks: update jdbc driver to patch log4j #7622

Merged · 15 commits · Jan 7, 2022
4 changes: 4 additions & 0 deletions .github/workflows/publish-command.yml
@@ -66,6 +66,10 @@ jobs:
uses: actions/checkout@v2
with:
repository: ${{github.event.pull_request.head.repo.full_name}} # always use the branch's repository
- name: Install Unzip for Databricks
if: github.event.inputs.connector == 'connectors/destination-databricks'
run: |
apt-get update && apt-get install -y unzip
- name: Install Java
uses: actions/setup-java@v1
with:
4 changes: 4 additions & 0 deletions .github/workflows/test-command.yml
@@ -61,6 +61,10 @@ jobs:
uses: actions/checkout@v2
with:
repository: ${{ github.event.inputs.repo }}
- name: Install Unzip for Databricks
if: github.event.inputs.connector == 'connectors/destination-databricks'
run: |
apt-get update && apt-get install -y unzip
- name: Install Java
uses: actions/setup-java@v1
with:
@@ -1985,7 +1985,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-github:0.2.9"
- dockerImage: "airbyte/source-github:0.2.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/github"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/builds.md
@@ -114,7 +114,7 @@
| BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
| ClickHouse | [![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) |
| Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) |
| Databricks | (Temporarily Not Available) |
| Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) |
| Dev Null | [![destination-dev-null](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-dev-null%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-dev-null) |
| Elasticsearch | (Temporarily Not Available) |
| End-to-End Testing | [![destination-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-e2e-test) |
@@ -6,6 +6,8 @@ For information about how to use this connector within Airbyte, see [the User Do
## Databricks JDBC Driver
This connector requires a JDBC driver to connect to a Databricks cluster. The driver is developed by Simba. Before downloading and using this driver, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you may only use this driver to connect third-party applications to Apache Spark SQL within a Databricks offering, using the ODBC and/or JDBC protocols. The driver can be downloaded from [here](https://databricks.com/spark/jdbc-drivers-download).

The CI downloads the JDBC driver in [this script](https://github.com/airbytehq/airbyte/blob/master/tools/lib/databricks.sh).

This is currently a private connector that is only available in Airbyte Cloud. To build and publish this connector, first download the driver and put it under the `lib` directory. Please do not publish this connector publicly. We are working on a solution to make it publicly available.

## Local development
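For context, connecting through this Simba driver typically looks like the sketch below; the driver class name and JDBC URL parameters are assumptions based on public Databricks/Simba driver documentation, not code from this PR:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class DatabricksJdbcSmokeTest {
  public static void main(final String[] args) throws Exception {
    // Assumed class name bundled in SparkJDBC42.jar; verify against the driver you download.
    Class.forName("com.simba.spark.jdbc.Driver");
    // Placeholder host, httpPath, and token must come from your own Databricks cluster.
    final String url = "jdbc:spark://<server-hostname>:443/default;transportMode=http;"
        + "ssl=1;httpPath=<http-path>;AuthMech=3;UID=token;PWD=<personal-access-token>";
    try (final Connection conn = DriverManager.getConnection(url);
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SELECT 1")) {
      rs.next();
      System.out.println("Connected, SELECT 1 returned: " + rs.getInt(1));
    }
  }
}
```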
@@ -127,13 +127,17 @@ protected void tearDown(final TestDestinationEnv testEnv) throws SQLException {
.deleteObjects(new DeleteObjectsRequest(s3Config.getBucketName()).withKeys(keysToDelete));
LOGGER.info("Deleted {} file(s).", result.getDeletedObjects().size());
}
s3Client.shutdown();

// clean up database
LOGGER.info("Dropping database schema {}", databricksConfig.getDatabaseSchema());
final Database database = getDatabase(databricksConfig);
// we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for
// Databricks, and it incorrectly quotes the schema name
database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema())));
try (final Database database = getDatabase(databricksConfig)) {
// we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for
// Databricks, and it incorrectly quotes the schema name
database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema())));
} catch (final Exception e) {
throw new SQLException(e);
}
}

private static Database getDatabase(final DatabricksDestinationConfig databricksConfig) {
@@ -148,6 +148,7 @@ private static void closeAsOneTransaction(final List<StreamCopier> streamCopiers
for (final var copier : streamCopiers) {
copier.removeFileAndDropTmpTable();
}
db.close();
Contributor Author:

@cgardens, @sherifnada, I found that if we don't close the database connection here, the Databricks destination acceptance test will get stuck. This is a new issue; previously this was not the case.

Do you know of any recent change that may have led to it? If not, the only alternative explanation I can think of is that we upgraded the Spark JDBC driver. However, nothing in the release notes seems related.
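
For reference, a sketch of the close pattern using the names from this file (`JdbcDatabase` is `AutoCloseable`, per the `check` change below); this is illustrative, not the exact code:

```java
// Sketch only: try-with-resources guarantees the connection is released
// even if a copier throws, so the connector process can exit cleanly
// instead of hanging until the worker force-kills it.
try (final JdbcDatabase db = getDatabase(config)) {
  for (final StreamCopier copier : streamCopiers) {
    copier.removeFileAndDropTmpTable();
  }
} // db.close() runs automatically here
```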

}
if (firstException != null) {
throw firstException;
@@ -54,10 +54,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
.withMessage("Could not connect to the staging persistence with the provided configuration. \n" + e.getMessage());
}

try {
try (final JdbcDatabase database = getDatabase(config)) {
final var nameTransformer = getNameTransformer();
final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText());
final JdbcDatabase database = getDatabase(config);
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
@@ -77,7 +77,7 @@ public StandardCheckConnectionOutput run(final StandardCheckConnectionInput inpu
LOGGER.debug("Check connection job received output: {}", output);
return output;
} else {
throw new WorkerException("Error while getting checking connection.");
throw new WorkerException(String.format("Error checking connection, status: %s, exit code: %d", status, exitCode));
Contributor Author:

@jrhizor, a check operation failed in the acceptance test. I added more information to the exception message and saw this:

Error checking connection, status: Optional[io.airbyte.protocol.models.AirbyteConnectionStatus@605d010e[status=SUCCEEDED,message=<null>,additionalProperties=***]], exit code: 143

Do you know what might be the root cause behind this exit code? Could it be related to the recent change in WorkerUtils#gentleClose?

Contributor:

I don't think the gentle close changes look related. Are you sure this docker container isn't actually outputting an error code?

@tuliren (Contributor Author), Jan 7, 2022:

Here is what happens:

  • The database connection somehow requires an explicit close method call; otherwise the connector will not shut down.
  • The worker waits one minute for the connector to shut down; after that it closes it by force (code here) by sending a SIGTERM.
  • The SIGTERM results in exit code 143 (128 + 15, the SIGTERM signal number).

So the root cause is that for Databricks we must always close the database at the end, which the default check command implementation in CopyDestination did not do. The issue has been resolved.
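
A sketch of that shutdown sequence (the class and method names here are illustrative; the real logic lives in WorkerUtils#gentleClose):

```java
import java.util.concurrent.TimeUnit;

final class GentleCloseSketch {
  // Wait up to one minute for the connector process to exit on its own;
  // after that, destroy() delivers SIGTERM and the JVM reports 128 + 15 = 143.
  static int awaitOrKill(final Process process) throws InterruptedException {
    if (!process.waitFor(1, TimeUnit.MINUTES)) {
      process.destroy(); // sends SIGTERM
    }
    return process.waitFor(); // 143 when killed by SIGTERM
  }
}
```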

}

} catch (final Exception e) {
6 changes: 6 additions & 0 deletions tools/bin/ci_integration_test.sh
@@ -3,13 +3,15 @@
set -e

. tools/lib/lib.sh
. tools/lib/databricks.sh

# runs integration tests for an integration name

connector="$1"
all_integration_tests=$(./gradlew integrationTest --dry-run | grep 'integrationTest SKIPPED' | cut -d: -f 4)
run() {
if [[ "$connector" == "all" ]] ; then
_get_databricks_jdbc_driver
echo "Running: ./gradlew --no-daemon --scan integrationTest"
./gradlew --no-daemon --scan integrationTest
else
@@ -34,6 +36,10 @@ else
integrationTestCommand=":airbyte-integrations:connectors:$connector:integrationTest"
fi
if [ -n "$selected_integration_test" ] ; then
if [[ "$selected_integration_test" == *"databricks"* ]] ; then
_get_databricks_jdbc_driver
fi

echo "Running: ./gradlew --no-daemon --scan $integrationTestCommand"
./gradlew --no-daemon --scan "$integrationTestCommand"
else
6 changes: 6 additions & 0 deletions tools/integrations/manage.sh
@@ -4,6 +4,7 @@ set -e
set -x

. tools/lib/lib.sh
. tools/lib/databricks.sh

USAGE="
Usage: $(basename "$0") <cmd>
@@ -37,6 +38,11 @@ cmd_build() {
[ -d "$path" ] || error "Path must be the root path of the integration"

local run_tests=$1; shift || run_tests=true

if [[ "airbyte-integrations/connectors/destination-databricks" == "${path}" ]]; then
_get_databricks_jdbc_driver
fi

echo "Building $path"
./gradlew --no-daemon "$(_to_gradle_path "$path" clean)"
./gradlew --no-daemon "$(_to_gradle_path "$path" build)"
24 changes: 24 additions & 0 deletions tools/lib/databricks.sh
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

. tools/lib/lib.sh

# Whoever runs this script must accept the following terms & conditions:
# https://databricks.com/jdbc-odbc-driver-license
_get_databricks_jdbc_driver() {
local driver_zip="SimbaSparkJDBC42-2.6.21.1039.zip"
local driver_file="SparkJDBC42.jar"
local driver_url="https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.21/${driver_zip}"
local connector_path="airbyte-integrations/connectors/destination-databricks"

if [[ -f "${connector_path}/lib/${driver_file}" ]] ; then
echo "[Databricks] Spark JDBC driver already exists"
else
echo "[Databricks] Downloading Spark JDBC driver..."
curl -o "${connector_path}/lib/${driver_zip}" "${driver_url}"

echo "[Databricks] Extracting Spark JDBC driver..."
unzip "${connector_path}/lib/${driver_zip}" "${driver_file}"
mv "${driver_file}" "${connector_path}/lib/"
rm "${connector_path}/lib/${driver_zip}"
fi
}