ODBC Based Databricks Connector (#4814)
* ODBC Based Databricks connector.

* Install Databricks' ODBC driver in Docker image

* Add user agent string.

* Add Types enum to redash.query_runner to replace the separate constants.

* Databricks connector:

1. Parse types.
2. Send additional connection options.
3. Correctly parse errors.

* Switch to TYPE constants so the code works with Python 2.

* Add note about the Databricks driver terms and conditions.

* Show message about Databricks driver terms and conditions.

* Handle cases when the query doesn't return any results.

* Update redash/query_runner/databricks.py

Co-Authored-By: Jesse <jesse@whitehouse.dev>

* Use new Databricks logo

* Fix connection string options

Co-authored-by: Jesse <jesse@whitehouse.dev>
arikfr and susodapop authored Apr 24, 2020
1 parent 6ee9b43 commit ea8a075
Showing 4 changed files with 133 additions and 55 deletions.
13 changes: 11 additions & 2 deletions Dockerfile
@@ -38,15 +38,24 @@ RUN apt-get update && \
   libssl-dev \
   default-libmysqlclient-dev \
   freetds-dev \
-  libsasl2-dev && \
-  # MSSQL ODBC Driver:
+  libsasl2-dev \
+  unzip \
+  libsasl2-modules-gssapi-mit && \
+  # MSSQL ODBC Driver:
   curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
   curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
   apt-get update && \
   ACCEPT_EULA=Y apt-get install -y msodbcsql17 && \
   apt-get clean && \
   rm -rf /var/lib/apt/lists/*
 
+ADD https://databricks.com/wp-content/uploads/2.6.10.1010-2/SimbaSparkODBC-2.6.10.1010-2-Debian-64bit.zip /tmp/simba_odbc.zip
+RUN unzip /tmp/simba_odbc.zip -d /tmp/ \
+  && dpkg -i /tmp/SimbaSparkODBC-2.6.10.1010-2-Debian-64bit/simbaspark_2.6.10.1010-2_amd64.deb \
+  && echo "[Simba]\nDriver = /opt/simba/spark/lib/64/libsparkodbc_sb64.so" >> /etc/odbcinst.ini \
+  && rm /tmp/simba_odbc.zip \
+  && rm -rf /tmp/SimbaSparkODBC*
+
 WORKDIR /app
 
 # Disalbe PIP Cache and Version Check
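A note on the driver installation above: the ADD/RUN block downloads Databricks' Simba Spark ODBC driver, installs the .deb package, and registers it under the name Simba in /etc/odbcinst.ini, which is the driver name the query runner references. A minimal sketch of how that registration could be sanity-checked from Python inside the built image (this check is not part of the commit and only assumes pyodbc and unixODBC are present):

```python
# Hypothetical sanity check, not part of this commit: confirm the ODBC driver
# manager can see the "Simba" entry the Dockerfile appends to /etc/odbcinst.ini.
import pyodbc

drivers = pyodbc.drivers()  # names of drivers registered with unixODBC
print(drivers)

if "Simba" not in drivers:
    raise RuntimeError("Simba Spark ODBC driver is not registered")
```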
Binary file modified client/app/assets/images/db-logos/databricks.png
9 changes: 9 additions & 0 deletions client/app/components/CreateSourceDialog.jsx
@@ -116,6 +116,15 @@ class CreateSourceDialog extends React.Component {
           )}
         </div>
         <DynamicForm id="sourceForm" fields={fields} onSubmit={this.createSource} feedbackIcons hideSubmitButton />
+        {selectedType.type === "databricks" && (
+          <small>
+            By using the Databricks Data Source you agree to the Databricks JDBC/ODBC{" "}
+            <a href="https://databricks.com/spark/odbc-driver-download" target="_blank" rel="noopener noreferrer">
+              Driver Download Terms and Conditions
+            </a>
+            .
+          </small>
+        )}
       </div>
     );
   }
166 changes: 113 additions & 53 deletions redash/query_runner/databricks.py
@@ -1,17 +1,43 @@
-import base64
-from .hive_ds import Hive
-from redash.query_runner import register
+import datetime
+from redash.query_runner import (
+    register,
+    BaseSQLQueryRunner,
+    TYPE_STRING,
+    TYPE_BOOLEAN,
+    TYPE_DATE,
+    TYPE_DATETIME,
+    TYPE_INTEGER,
+    TYPE_FLOAT,
+)
+from redash.utils import json_dumps
+from redash import __version__
 
 try:
-    from pyhive import hive
-    from thrift.transport import THttpClient
+    import pyodbc
 
     enabled = True
 except ImportError:
     enabled = False
 
 
-class Databricks(Hive):
+TYPES_MAP = {
+    str: TYPE_STRING,
+    bool: TYPE_BOOLEAN,
+    datetime.date: TYPE_DATE,
+    datetime.datetime: TYPE_DATETIME,
+    int: TYPE_INTEGER,
+    float: TYPE_FLOAT,
+}
+
+
+def _build_odbc_connection_string(**kwargs):
+    return ";".join([f"{k}={v}" for k, v in kwargs.items()])
+
+
+class Databricks(BaseSQLQueryRunner):
+    noop_query = "SELECT 1"
+    should_annotate_query = False
+
     @classmethod
     def type(cls):
         return "databricks"
@@ -26,67 +52,101 @@ def configuration_schema(cls):
             "type": "object",
             "properties": {
                 "host": {"type": "string"},
-                "database": {"type": "string"},
                 "http_path": {"type": "string", "title": "HTTP Path"},
                 # We're using `http_password` here for legacy reasons
                 "http_password": {"type": "string", "title": "Access Token"},
+                "schemas": {"type": "string", "title": "Schemas to Load Metadata For"},
             },
-            "order": ["host", "http_path", "http_password", "database"],
+            "order": ["host", "http_path", "http_password"],
             "secret": ["http_password"],
-            "required": ["host", "database", "http_path", "http_password"],
+            "required": ["host", "http_path", "http_password"],
         }
 
-    def _get_connection(self):
-        host = self.configuration["host"]
+    def _get_cursor(self):
+        user_agent = "Redash/{} (Databricks)".format(__version__.split("-")[0])
+        connection_string = _build_odbc_connection_string(
+            Driver="Simba",
+            UID="token",
+            PORT="443",
+            SSL="1",
+            THRIFTTRANSPORT="2",
+            SPARKSERVERTYPE="3",
+            AUTHMECH=3,
+            # Use the query as is without rewriting:
+            UseNativeQuery="1",
+            # Automatically reconnect to the cluster if an error occurs
+            AutoReconnect="1",
+            # Minimum interval between consecutive polls for query execution status (1ms)
+            AsyncExecPollInterval="1",
+            UserAgentEntry=user_agent,
+            HOST=self.configuration["host"],
+            PWD=self.configuration["http_password"],
+            HTTPPath=self.configuration["http_path"],
+        )
+
+        connection = pyodbc.connect(connection_string, autocommit=True)
+        return connection.cursor()
+
+    def run_query(self, query, user):
+        try:
+            cursor = self._get_cursor()
+
+            cursor.execute(query)
+
+            if cursor.description is not None:
+                data = cursor.fetchall()
+                columns = self.fetch_columns(
+                    [
+                        (i[0], TYPES_MAP.get(i[1], TYPE_STRING))
+                        for i in cursor.description
+                    ]
+                )
 
-        # if path is set but is missing initial slash, append it
-        path = self.configuration.get("http_path", "")
-        if path and path[0] != "/":
-            path = "/" + path
+                rows = [
+                    dict(zip((column["name"] for column in columns), row))
+                    for row in data
+                ]
 
-        http_uri = "https://{}{}".format(host, path)
+                data = {"columns": columns, "rows": rows}
+                json_data = json_dumps(data)
+                error = None
+            else:
+                error = None
+                json_data = json_dumps(
+                    {
+                        "columns": [{"name": "result", "type": TYPE_STRING}],
+                        "rows": [{"result": "No data was returned."}],
+                    }
+                )
+
+            cursor.close()
+        except pyodbc.Error as e:
+            if len(e.args) > 1:
+                error = str(e.args[1])
+            else:
+                error = str(e)
+            json_data = None
+
+        return json_data, error
 
-        transport = THttpClient.THttpClient(http_uri)
+    def _get_tables(self, schema):
+        cursor = self._get_cursor()
 
-        password = self.configuration.get("http_password", "")
-        auth = base64.b64encode(b"token:" + password.encode("ascii"))
-        transport.setCustomHeaders({"Authorization": "Basic " + auth.decode()})
+        schemas = self.configuration.get(
+            "schemas", self.configuration.get("database", "")
+        ).split(",")
 
-        connection = hive.connect(thrift_transport=transport)
-        return connection
+        for schema_name in schemas:
+            cursor.columns(schema=schema_name)
 
-    def _get_tables(self, schema):
-        schemas_query = "show schemas"
-        tables_query = "show tables in %s"
-        columns_query = "show columns in %s.%s"
-
-        schemas = self._run_query_internal(schemas_query)
-
-        for schema_name in [
-            a for a in [str(a["databaseName"]) for a in schemas] if len(a) > 0
-        ]:
-            for table_name in [
-                a
-                for a in [
-                    str(a["tableName"])
-                    for a in self._run_query_internal(tables_query % schema_name)
-                ]
-                if len(a) > 0
-            ]:
-                columns = [
-                    a
-                    for a in [
-                        str(a["col_name"])
-                        for a in self._run_query_internal(
-                            columns_query % (schema_name, table_name)
-                        )
-                    ]
-                    if len(a) > 0
-                ]
+            for column in cursor:
+                table_name = "{}.{}".format(column[1], column[2])
 
-                if schema_name != "default":
-                    table_name = "{}.{}".format(schema_name, table_name)
+                if table_name not in schema:
+                    schema[table_name] = {"name": table_name, "columns": []}
 
-                schema[table_name] = {"name": table_name, "columns": columns}
+                schema[table_name]["columns"].append(column[3])
 
         return list(schema.values())
+
+
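For reference, a sketch of the DSN-less connection string that _build_odbc_connection_string produces for pyodbc.connect. The helper is copied from the diff above; the host, access token, and HTTP path values are placeholders, not values from the commit:

```python
# Sketch of the connection string assembled by the new query runner.
# Host, token (PWD), and HTTPPath below are hypothetical placeholders.
def _build_odbc_connection_string(**kwargs):
    return ";".join([f"{k}={v}" for k, v in kwargs.items()])


connection_string = _build_odbc_connection_string(
    Driver="Simba",                       # name registered in /etc/odbcinst.ini
    UID="token",                          # Databricks token auth uses the literal user "token"
    PWD="dapi0000000000000000",           # placeholder personal access token
    HOST="example.cloud.databricks.com",  # placeholder workspace hostname
    PORT="443",
    SSL="1",
    THRIFTTRANSPORT="2",
    SPARKSERVERTYPE="3",
    AUTHMECH=3,
    HTTPPath="/sql/protocolv1/o/0/0000-000000-placeholder",  # placeholder HTTP path
)
print(connection_string)

# Connecting would then be a one-liner (requires the driver and a live cluster):
# import pyodbc
# cursor = pyodbc.connect(connection_string, autocommit=True).cursor()
```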
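The "parse types" item in the commit message boils down to a dictionary lookup: pyodbc reports each result column's Python type class in cursor.description, and TYPES_MAP translates that class to a Redash type constant, falling back to TYPE_STRING. A small self-contained sketch of that lookup (the TYPE_* values are stand-ins for the constants imported from redash.query_runner, and the sample description is invented and truncated to the two fields the runner reads):

```python
# Sketch of the column-type mapping used in run_query; not part of the commit.
import datetime

# Stand-ins for the TYPE_* constants imported from redash.query_runner.
TYPE_STRING, TYPE_BOOLEAN, TYPE_DATE = "string", "boolean", "date"
TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT = "datetime", "integer", "float"

TYPES_MAP = {
    str: TYPE_STRING,
    bool: TYPE_BOOLEAN,
    datetime.date: TYPE_DATE,
    datetime.datetime: TYPE_DATETIME,
    int: TYPE_INTEGER,
    float: TYPE_FLOAT,
}

# Invented example: pyodbc's cursor.description entries are 7-tuples, but the
# runner only reads the first two fields (column name, Python type class).
description = [("id", int), ("name", str), ("created_at", datetime.datetime)]

columns = [(name, TYPES_MAP.get(py_type, TYPE_STRING)) for name, py_type in description]
print(columns)  # [('id', 'integer'), ('name', 'string'), ('created_at', 'datetime')]
```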
