Skip to content
This repository has been archived by the owner on Jul 9, 2022. It is now read-only.

Commit

Permalink
Adding schema_filter and table_filter for Presto runner
Browse files Browse the repository at this point in the history
And load schemas one by one. This should improve performance
for large Presto instances where a single schema may contain
thousands of tables.

Plus, in lower version of Presto, sometimes the old query will throw
"outputFormat should not be accessed from a null StorageFormat"
error (see prestodb/presto#6972). This change allows us to skip
this error and still return valid results.
  • Loading branch information
ktmud committed Jul 3, 2019
1 parent da21310 commit 6ae4e50
Showing 1 changed file with 44 additions and 20 deletions.
64 changes: 44 additions & 20 deletions redash/query_runner/presto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import re
import logging

from collections import defaultdict
from redash.query_runner import *
from redash.utils import json_dumps, json_loads

import logging

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -47,9 +50,17 @@ def configuration_schema(cls):
'port': {
'type': 'number'
},
'schema': {
'default_schema': {
'type': 'string'
},
'schema_filter': {
'type': 'string',
'default': 'RegExp to filter schema name'
},
'table_filter': {
'type': 'string',
'default': 'RegExp to filter schema.table'
},
'catalog': {
'type': 'string'
},
Expand All @@ -60,7 +71,8 @@ def configuration_schema(cls):
'type': 'string'
},
},
'order': ['host', 'protocol', 'port', 'username', 'password', 'schema', 'catalog'],
'order': ['host', 'protocol', 'port', 'username', 'password',
'default_schema', 'schema_filter', 'table_filter', 'catalog'],
'required': ['host']
}

Expand All @@ -72,28 +84,40 @@ def enabled(cls):
def type(cls):
return "presto"

def get_schema(self, get_stats=False):
schema = {}
query = """
SELECT table_schema, table_name, column_name
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""

def get_rows(self, query, colnames):
"""Return query results as a list of tuples"""
results, error = self.run_query(query, None)

# Skip null StorageFormat error
if error == 'outputFormat should not be accessed from a null StorageFormat':
return []
if error is not None:
raise Exception("Failed getting schema.")

results = json_loads(results)
return [tuple(row[col] for col in colnames) for row in results["rows"]]

for row in results['rows']:
table_name = '{}.{}'.format(row['table_schema'], row['table_name'])

if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}

schema[table_name]['columns'].append(row['column_name'])
def get_schema(self, get_stats=False):
schema = {}
catalog = self.configuration.get('catalog', 'hive')
table_filter = self.configuration.get('table_filter', '')
schema_filter = re.compile(self.configuration.get('schema_filter', ''))

for (schem, ) in self.get_rows("SHOW SCHEMAS", ["Schema"]):
if not schema_filter.match(schem):
continue
query = """
SELECT
table_name, column_name
FROM system.jdbc.columns
WHERE table_cat = '{catalog}'
AND regexp_like(concat(table_schem, '.', table_name), '{table_filter}')
AND table_schem = '{schem}'
""".format(catalog=catalog, schem=schem, table_filter=table_filter)

for (tbl, col) in self.get_rows(query, ("table_name", "column_name")):
table_name = '{}.{}'.format(schem, tbl)
if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}
schema[table_name]['columns'].append(col)

return schema.values()

Expand Down

0 comments on commit 6ae4e50

Please sign in to comment.