Skip to content

Commit

Permalink
[Data Sources] Add: Azure Data Explorer (Kusto) query runner (#4091)
Browse files Browse the repository at this point in the history
* [Data Sources] Add: Azure Data Explorer (Kusto) query runner

* CodeClimate fixes

* Remove TODO

* Fixed configuration properties names for Azure Kusto

* Azure Kusto: get_schema in one query

* azure-kusto-data update to 0.0.32

* Add Kusto to the default query runners list
  • Loading branch information
spacentropy authored and arikfr committed Aug 26, 2019
1 parent a2b68a3 commit ef9a4d5
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 1 deletion.
Binary file added client/app/assets/images/db-logos/azure_kusto.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
159 changes: 159 additions & 0 deletions redash/query_runner/azure_kusto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from redash.query_runner import BaseQueryRunner, register
from redash.query_runner import TYPE_STRING, TYPE_DATE, TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN
from redash.utils import json_dumps, json_loads


try:
from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
enabled = True
except ImportError:
enabled = False

TYPES_MAP = {
'boolean': TYPE_BOOLEAN,
'datetime': TYPE_DATETIME,
'date': TYPE_DATE,
'dynamic': TYPE_STRING,
'guid': TYPE_STRING,
'int': TYPE_INTEGER,
'long': TYPE_INTEGER,
'real': TYPE_FLOAT,
'string': TYPE_STRING,
'timespan': TYPE_STRING,
'decimal': TYPE_FLOAT
}


class AzureKusto(BaseQueryRunner):
noop_query = "let noop = datatable (Noop:string)[1]; noop"

def __init__(self, configuration):
super(AzureKusto, self).__init__(configuration)
self.syntax = 'custom'

@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"cluster": {
"type": "string"
},
"azure_ad_client_id": {
"type": "string",
"title": "Azure AD Client ID"
},
"azure_ad_client_secret": {
"type": "string",
"title": "Azure AD Client Secret"
},
"azure_ad_tenant_id": {
"type": "string",
"title": "Azure AD Tenant Id"
},
"database": {
"type": "string"
}
},
"required": [
"cluster", "azure_ad_client_id", "azure_ad_client_secret",
"azure_ad_tenant_id", "database"
],
"order": [
"cluster", "azure_ad_client_id", "azure_ad_client_secret",
"azure_ad_tenant_id", "database"
],
"secret": ["azure_ad_client_secret"]
}

@classmethod
def enabled(cls):
return enabled

@classmethod
def annotate_query(cls):
return False

@classmethod
def type(cls):
return "azure_kusto"

@classmethod
def name(cls):
return "Azure Data Explorer (Kusto)"

def run_query(self, query, user):

kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
connection_string=self.configuration['cluster'],
aad_app_id=self.configuration['azure_ad_client_id'],
app_key=self.configuration['azure_ad_client_secret'],
authority_id=self.configuration['azure_ad_tenant_id'])

client = KustoClient(kcsb)

db = self.configuration['database']
try:
response = client.execute(db, query)

result_cols = response.primary_results[0].columns
result_rows = response.primary_results[0].rows

columns = []
rows = []
for c in result_cols:
columns.append({
'name': c.column_name,
'friendly_name': c.column_name,
'type': TYPES_MAP.get(c.column_type, None)
})

# rows must be [{'column1': value, 'column2': value}]
for row in result_rows:
rows.append(row.to_dict())

error = None
data = {'columns': columns, 'rows': rows}
json_data = json_dumps(data)

except KustoServiceError as err:
json_data = None
try:
error = err.args[1][0]['error']['@message']
except (IndexError, KeyError):
error = err.args[1]
except KeyboardInterrupt:
json_data = None
error = "Query cancelled by user."

return json_data, error

def get_schema(self, get_stats=False):
query = ".show database schema as json"

results, error = self.run_query(query, None)

if error is not None:
raise Exception("Failed getting schema.")

results = json_loads(results)

schema_as_json = json_loads(results['rows'][0]['DatabaseSchema'])
tables_list = schema_as_json['Databases'][self.configuration['database']]['Tables'].values()

schema = {}

for table in tables_list:
table_name = table['Name']

if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}

for column in table['OrderedColumns']:
schema[table_name]['columns'].append(column['Name'])

return schema.values()


register(AzureKusto)
1 change: 1 addition & 0 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def email_server_is_configured():
'redash.query_runner.json_ds',
'redash.query_runner.cass',
'redash.query_runner.dgraph',
'redash.query_runner.azure_kusto',
]

enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ passlib==1.6.2
aniso8601==1.1.0
blinker==1.3
psycopg2==2.7.3.2
python-dateutil==2.7.5
python-dateutil==2.8.0
pytz==2016.7
PyYAML==3.12
redis==3.2.1
Expand Down
1 change: 1 addition & 0 deletions requirements_all_ds.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ phoenixdb==0.7
# certifi is needed to support MongoDB and SSL:
certifi
pydgraph==1.2.0
azure-kusto-data==0.0.32

0 comments on commit ef9a4d5

Please sign in to comment.