From 5293f5521d795d9f97a7470b1b9bd97091a190f4 Mon Sep 17 00:00:00 2001 From: Rob Moore Date: Thu, 19 Oct 2023 14:38:13 +0100 Subject: [PATCH] fix(sqllab): reinstate "Force trino client async execution" (#25680) --- .../databases/installing-database-drivers.mdx | 81 ++++++++++--------- docs/docs/frequently-asked-questions.mdx | 2 +- .../installation/configuring-superset.mdx | 4 +- superset/config.py | 5 +- superset/db_engine_specs/base.py | 18 +++++ superset/db_engine_specs/trino.py | 66 +++++++++++++-- superset/sql_lab.py | 7 +- .../unit_tests/db_engine_specs/test_trino.py | 31 ++++++- tests/unit_tests/sql_lab_test.py | 10 +-- 9 files changed, 163 insertions(+), 61 deletions(-) diff --git a/docs/docs/databases/installing-database-drivers.mdx b/docs/docs/databases/installing-database-drivers.mdx index e4e972f0648b2..57652db4b8cb7 100644 --- a/docs/docs/databases/installing-database-drivers.mdx +++ b/docs/docs/databases/installing-database-drivers.mdx @@ -22,46 +22,47 @@ as well as the packages needed to connect to the databases you want to access th Some of the recommended packages are shown below. Please refer to [setup.py](https://github.com/apache/superset/blob/master/setup.py) for the versions that are compatible with Superset. 
-| Database | PyPI package | Connection String | -| --------------------------------------------------------- | ---------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- | -| [Amazon Athena](/docs/databases/athena) | `pip install pyathena[pandas]` , `pip install PyAthenaJDBC` | `awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com/{ ` | -| [Amazon DynamoDB](/docs/databases/dynamodb) | `pip install pydynamodb` | `dynamodb://{access_key_id}:{secret_access_key}@dynamodb.{region_name}.amazonaws.com?connector=superset` | -| [Amazon Redshift](/docs/databases/redshift) | `pip install sqlalchemy-redshift` | ` redshift+psycopg2://:@:5439/` | -| [Apache Drill](/docs/databases/drill) | `pip install sqlalchemy-drill` | `drill+sadrill:// For JDBC drill+jdbc://` | -| [Apache Druid](/docs/databases/druid) | `pip install pydruid` | `druid://:@:/druid/v2/sql` | -| [Apache Hive](/docs/databases/hive) | `pip install pyhive` | `hive://hive@{hostname}:{port}/{database}` | -| [Apache Impala](/docs/databases/impala) | `pip install impyla` | `impala://{hostname}:{port}/{database}` | -| [Apache Kylin](/docs/databases/kylin) | `pip install kylinpy` | `kylin://:@:/?=&=` | -| [Apache Pinot](/docs/databases/pinot) | `pip install pinotdb` | `pinot://BROKER:5436/query?server=http://CONTROLLER:5983/` | -| [Apache Solr](/docs/databases/solr) | `pip install sqlalchemy-solr` | `solr://{username}:{password}@{hostname}:{port}/{server_path}/{collection}` | -| [Apache Spark SQL](/docs/databases/spark-sql) | `pip install pyhive` | `hive://hive@{hostname}:{port}/{database}` | -| [Ascend.io](/docs/databases/ascend) | `pip install impyla` | `ascend://{username}:{password}@{hostname}:{port}/{database}?auth_mechanism=PLAIN;use_ssl=true` | -| [Azure MS SQL](/docs/databases/sql-server) | `pip install pymssql` | 
`mssql+pymssql://UserName@presetSQL:TestPassword@presetSQL.database.windows.net:1433/TestSchema` | -| [Big Query](/docs/databases/bigquery) | `pip install sqlalchemy-bigquery` | `bigquery://{project_id}` | -| [ClickHouse](/docs/databases/clickhouse) | `pip install clickhouse-connect` | `clickhousedb://{username}:{password}@{hostname}:{port}/{database}` | -| [CockroachDB](/docs/databases/cockroachdb) | `pip install cockroachdb` | `cockroachdb://root@{hostname}:{port}/{database}?sslmode=disable` | -| [Dremio](/docs/databases/dremio) | `pip install sqlalchemy_dremio` | `dremio://user:pwd@host:31010/` | -| [Elasticsearch](/docs/databases/elasticsearch) | `pip install elasticsearch-dbapi` | `elasticsearch+http://{user}:{password}@{host}:9200/` | -| [Exasol](/docs/databases/exasol) | `pip install sqlalchemy-exasol` | `exa+pyodbc://{username}:{password}@{hostname}:{port}/my_schema?CONNECTIONLCALL=en_US.UTF-8&driver=EXAODBC` | -| [Google Sheets](/docs/databases/google-sheets) | `pip install shillelagh[gsheetsapi]` | `gsheets://` | -| [Firebolt](/docs/databases/firebolt) | `pip install firebolt-sqlalchemy` | `firebolt://{username}:{password}@{database} or firebolt://{username}:{password}@{database}/{engine_name}` | -| [Hologres](/docs/databases/hologres) | `pip install psycopg2` | `postgresql+psycopg2://:@/` | -| [IBM Db2](/docs/databases/ibm-db2) | `pip install ibm_db_sa` | `db2+ibm_db://` | -| [IBM Netezza Performance Server](/docs/databases/netezza) | `pip install nzalchemy` | `netezza+nzpy://:@/` | -| [MySQL](/docs/databases/mysql) | `pip install mysqlclient` | `mysql://:@/` | -| [Oracle](/docs/databases/oracle) | `pip install cx_Oracle` | `oracle://` | -| [PostgreSQL](/docs/databases/postgres) | `pip install psycopg2` | `postgresql://:@/` | -| [Trino](/docs/databases/trino) | `pip install trino` | `trino://{username}:{password}@{hostname}:{port}/{catalog}` | -| [Presto](/docs/databases/presto) | `pip install pyhive` | `presto://` | -| [SAP Hana](/docs/databases/hana) | 
`pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}` | -| [StarRocks](/docs/databases/starrocks) | `pip install starrocks` | `starrocks://:@:/.` | -| [Snowflake](/docs/databases/snowflake) | `pip install snowflake-sqlalchemy` | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}` | -| SQLite | No additional library needed | `sqlite://` | -| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` | -| [Teradata](/docs/databases/teradata) | `pip install teradatasqlalchemy` | `teradatasql://{user}:{password}@{host}` | -| [TimescaleDB](/docs/databases/timescaledb) | `pip install psycopg2` | `postgresql://:@:/` | -| [Vertica](/docs/databases/vertica) | `pip install sqlalchemy-vertica-python` | `vertica+vertica_python://:@/` | -| [YugabyteDB](/docs/databases/yugabytedb) | `pip install psycopg2` | `postgresql://:@/` | +| Database | PyPI package | Connection String | +| --------------------------------------------------------- | ------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- | +| [Amazon Athena](/docs/databases/athena) | `pip install pyathena[pandas]` , `pip install PyAthenaJDBC` | `awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com/{ ` | +| [Amazon DynamoDB](/docs/databases/dynamodb) | `pip install pydynamodb` | `dynamodb://{access_key_id}:{secret_access_key}@dynamodb.{region_name}.amazonaws.com?connector=superset` | +| [Amazon Redshift](/docs/databases/redshift) | `pip install sqlalchemy-redshift` | ` redshift+psycopg2://:@:5439/` | +| [Apache Drill](/docs/databases/drill) | `pip install sqlalchemy-drill` | `drill+sadrill:// For JDBC drill+jdbc://` | +| [Apache Druid](/docs/databases/druid) | `pip install pydruid` | `druid://:@:/druid/v2/sql` | +| 
[Apache Hive](/docs/databases/hive) | `pip install pyhive` | `hive://hive@{hostname}:{port}/{database}` | +| [Apache Impala](/docs/databases/impala) | `pip install impyla` | `impala://{hostname}:{port}/{database}` | +| [Apache Kylin](/docs/databases/kylin) | `pip install kylinpy` | `kylin://:@:/?=&=` | +| [Apache Pinot](/docs/databases/pinot) | `pip install pinotdb` | `pinot://BROKER:5436/query?server=http://CONTROLLER:5983/` | +| [Apache Solr](/docs/databases/solr) | `pip install sqlalchemy-solr` | `solr://{username}:{password}@{hostname}:{port}/{server_path}/{collection}` | +| [Apache Spark SQL](/docs/databases/spark-sql) | `pip install pyhive` | `hive://hive@{hostname}:{port}/{database}` | +| [Ascend.io](/docs/databases/ascend) | `pip install impyla` | `ascend://{username}:{password}@{hostname}:{port}/{database}?auth_mechanism=PLAIN;use_ssl=true` | +| [Azure MS SQL](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://UserName@presetSQL:TestPassword@presetSQL.database.windows.net:1433/TestSchema` | +| [Big Query](/docs/databases/bigquery) | `pip install sqlalchemy-bigquery` | `bigquery://{project_id}` | +| [ClickHouse](/docs/databases/clickhouse) | `pip install clickhouse-connect` | `clickhousedb://{username}:{password}@{hostname}:{port}/{database}` | +| [CockroachDB](/docs/databases/cockroachdb) | `pip install cockroachdb` | `cockroachdb://root@{hostname}:{port}/{database}?sslmode=disable` | +| [Dremio](/docs/databases/dremio) | `pip install sqlalchemy_dremio` | `dremio://user:pwd@host:31010/` | +| [Elasticsearch](/docs/databases/elasticsearch) | `pip install elasticsearch-dbapi` | `elasticsearch+http://{user}:{password}@{host}:9200/` | +| [Exasol](/docs/databases/exasol) | `pip install sqlalchemy-exasol` | `exa+pyodbc://{username}:{password}@{hostname}:{port}/my_schema?CONNECTIONLCALL=en_US.UTF-8&driver=EXAODBC` | +| [Google Sheets](/docs/databases/google-sheets) | `pip install shillelagh[gsheetsapi]` | `gsheets://` | +| 
[Firebolt](/docs/databases/firebolt) | `pip install firebolt-sqlalchemy` | `firebolt://{username}:{password}@{database} or firebolt://{username}:{password}@{database}/{engine_name}` | +| [Hologres](/docs/databases/hologres) | `pip install psycopg2` | `postgresql+psycopg2://:@/` | +| [IBM Db2](/docs/databases/ibm-db2) | `pip install ibm_db_sa` | `db2+ibm_db://` | +| [IBM Netezza Performance Server](/docs/databases/netezza) | `pip install nzalchemy` | `netezza+nzpy://:@/` | +| [MySQL](/docs/databases/mysql) | `pip install mysqlclient` | `mysql://:@/` | +| [Oracle](/docs/databases/oracle) | `pip install cx_Oracle` | `oracle://` | +| [PostgreSQL](/docs/databases/postgres) | `pip install psycopg2` | `postgresql://:@/` | +| [Trino](/docs/databases/trino) | `pip install trino` | `trino://{username}:{password}@{hostname}:{port}/{catalog}` | +| [Presto](/docs/databases/presto) | `pip install pyhive` | `presto://` | +| [SAP Hana](/docs/databases/hana) | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}` | +| [StarRocks](/docs/databases/starrocks) | `pip install starrocks` | `starrocks://:@:/.` | +| [Snowflake](/docs/databases/snowflake) | `pip install snowflake-sqlalchemy` | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}` | +| SQLite | No additional library needed | `sqlite:///path/to/file.db?check_same_thread=false` | +| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` | +| [Teradata](/docs/databases/teradata) | `pip install teradatasqlalchemy` | `teradatasql://{user}:{password}@{host}` | +| [TimescaleDB](/docs/databases/timescaledb) | `pip install psycopg2` | `postgresql://:@:/` | +| [Vertica](/docs/databases/vertica) | `pip install sqlalchemy-vertica-python` | `vertica+vertica_python://:@/` | +| [YugabyteDB](/docs/databases/yugabytedb) | `pip install psycopg2` | `postgresql://:@/` | + --- Note that many other databases 
are supported, the main criteria being the existence of a functional diff --git a/docs/docs/frequently-asked-questions.mdx b/docs/docs/frequently-asked-questions.mdx index bbb94d617b986..79a0863b088dc 100644 --- a/docs/docs/frequently-asked-questions.mdx +++ b/docs/docs/frequently-asked-questions.mdx @@ -168,7 +168,7 @@ Another workaround is to change where superset stores the sqlite database by add `superset_config.py`: ``` -SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db' +SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db?check_same_thread=false' ``` You can read more about customizing Superset using the configuration file diff --git a/docs/docs/installation/configuring-superset.mdx b/docs/docs/installation/configuring-superset.mdx index 9cb3aaefacc71..c6108d6f59c8f 100644 --- a/docs/docs/installation/configuring-superset.mdx +++ b/docs/docs/installation/configuring-superset.mdx @@ -32,7 +32,9 @@ SECRET_KEY = 'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY' # superset metadata (slices, connections, tables, dashboards, ...). # Note that the connection information to connect to the datasources # you want to explore are managed directly in the web UI -SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db' +# The check_same_thread=false property ensures the sqlite client does not attempt +# to enforce single-threaded access, which may be problematic in some edge cases +SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db?check_same_thread=false' # Flask-WTF flag for CSRF WTF_CSRF_ENABLED = True diff --git a/superset/config.py b/superset/config.py index 27f78832d1e3b..73553fcc6c303 100644 --- a/superset/config.py +++ b/superset/config.py @@ -186,7 +186,10 @@ def _try_json_readsha(filepath: str, length: int) -> str | None: SECRET_KEY = os.environ.get("SUPERSET_SECRET_KEY") or CHANGE_ME_SECRET_KEY # The SQLAlchemy connection string. 
-SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(DATA_DIR, "superset.db") +SQLALCHEMY_DATABASE_URI = ( + f"""sqlite:///{os.path.join(DATA_DIR, "superset.db")}?check_same_thread=false""" +) + # SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp' # SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp' diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index 5836e6163f8d9..6be3ab24b0c13 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -1053,6 +1053,24 @@ def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: query object""" # TODO: Fix circular import error caused by importing sql_lab.Query + @classmethod + def execute_with_cursor( + cls, cursor: Any, sql: str, query: Query, session: Session + ) -> None: + """ + Trigger execution of a query and handle the resulting cursor. + + For most implementations this just makes calls to `execute` and + `handle_cursor` consecutively, but in some engines (e.g. 
Trino) we may + need to handle client limitations such as lack of async support and + perform a more complicated operation to get information from the cursor + in a timely manner and facilitate operations such as query stop + """ + logger.debug("Query %d: Running query: %s", query.id, sql) + cls.execute(cursor, sql, async_=True) + logger.debug("Query %d: Handling cursor", query.id) + cls.handle_cursor(cursor, query, session) + @classmethod def extract_error_message(cls, ex: Exception) -> str: return f"{cls.engine} error: {cls._extract_error_message(ex)}" diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py index eff78c4fa4eb5..f758f1fadd1aa 100644 --- a/superset/db_engine_specs/trino.py +++ b/superset/db_engine_specs/trino.py @@ -17,6 +17,8 @@ from __future__ import annotations import logging +import threading +import time from typing import Any, TYPE_CHECKING import simplejson as json @@ -154,14 +156,21 @@ def get_tracking_url(cls, cursor: Cursor) -> str | None: @classmethod def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None: - if tracking_url := cls.get_tracking_url(cursor): - query.tracking_url = tracking_url + """ + Handle a trino client cursor. + + WARNING: if you execute a query, it will block until complete and you + will not be able to handle the cursor until complete. Use + `execute_with_cursor` instead, to handle this asynchronously. 
+ """ # Adds the executed query id to the extra payload so the query can be cancelled - query.set_extra_json_key( - key=QUERY_CANCEL_KEY, - value=(cancel_query_id := cursor.stats["queryId"]), - ) + cancel_query_id = cursor.query_id + logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id) + query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id) + + if tracking_url := cls.get_tracking_url(cursor): + query.tracking_url = tracking_url session.commit() @@ -176,6 +185,51 @@ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None: super().handle_cursor(cursor=cursor, query=query, session=session) + @classmethod + def execute_with_cursor( + cls, cursor: Any, sql: str, query: Query, session: Session + ) -> None: + """ + Trigger execution of a query and handle the resulting cursor. + + Trino's client blocks until the query is complete, so we need to run it + in another thread and invoke `handle_cursor` to poll for the query ID + to appear on the cursor in parallel. + """ + execute_result: dict[str, Any] = {} + + def _execute(results: dict[str, Any]) -> None: + logger.debug("Query %d: Running query: %s", query.id, sql) + + # Pass result / exception information back to the parent thread + try: + cls.execute(cursor, sql) + results["complete"] = True + except Exception as ex: # pylint: disable=broad-except + results["complete"] = True + results["error"] = ex + + execute_thread = threading.Thread(target=_execute, args=(execute_result,)) + execute_thread.start() + + # Wait for a query ID to be available before handling the cursor, as + # it's required by that method; it may never become available on error. 
+ while not cursor.query_id and not execute_result.get("complete"): + time.sleep(0.1) + + logger.debug("Query %d: Handling cursor", query.id) + cls.handle_cursor(cursor, query, session) + + # Block until the query completes; same behaviour as the client itself + logger.debug("Query %d: Waiting for query to complete", query.id) + while not execute_result.get("complete"): + time.sleep(0.5) + + # Unfortunately we'll mangle the stack trace due to the thread, but + # throwing the original exception allows mapping database errors as normal + if err := execute_result.get("error"): + raise err + @classmethod def prepare_cancel_query(cls, query: Query, session: Session) -> None: if QUERY_CANCEL_KEY not in query.extra: diff --git a/superset/sql_lab.py b/superset/sql_lab.py index afc682b10fbcf..ca157b324085d 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -191,7 +191,7 @@ def get_sql_results( # pylint: disable=too-many-arguments return handle_query_error(ex, query, session) -def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statements +def execute_sql_statement( # pylint: disable=too-many-arguments sql_statement: str, query: Query, session: Session, @@ -271,10 +271,7 @@ def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statem ) session.commit() with stats_timing("sqllab.query.time_executing_query", stats_logger): - logger.debug("Query %d: Running query: %s", query.id, sql) - db_engine_spec.execute(cursor, sql, async_=True) - logger.debug("Query %d: Handling cursor", query.id) - db_engine_spec.handle_cursor(cursor, query, session) + db_engine_spec.execute_with_cursor(cursor, sql, query, session) with stats_timing("sqllab.query.time_fetching_results", stats_logger): logger.debug( diff --git a/tests/unit_tests/db_engine_specs/test_trino.py b/tests/unit_tests/db_engine_specs/test_trino.py index 963953d18b48e..1b50a683a0841 100644 --- a/tests/unit_tests/db_engine_specs/test_trino.py +++ 
b/tests/unit_tests/db_engine_specs/test_trino.py @@ -352,7 +352,7 @@ def test_handle_cursor_early_cancel( query_id = "myQueryId" cursor_mock = engine_mock.return_value.__enter__.return_value - cursor_mock.stats = {"queryId": query_id} + cursor_mock.query_id = query_id session_mock = mocker.MagicMock() query = Query() @@ -366,3 +366,32 @@ def test_handle_cursor_early_cancel( assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id else: assert cancel_query_mock.call_args is None + + +def test_execute_with_cursor_in_parallel(mocker: MockerFixture): + """Test that `execute_with_cursor` fetches query ID from the cursor""" + from superset.db_engine_specs.trino import TrinoEngineSpec + + query_id = "myQueryId" + + mock_cursor = mocker.MagicMock() + mock_cursor.query_id = None + + mock_query = mocker.MagicMock() + mock_session = mocker.MagicMock() + + def _mock_execute(*args, **kwargs): + mock_cursor.query_id = query_id + + mock_cursor.execute.side_effect = _mock_execute + + TrinoEngineSpec.execute_with_cursor( + cursor=mock_cursor, + sql="SELECT 1 FROM foo", + query=mock_query, + session=mock_session, + ) + + mock_query.set_extra_json_key.assert_called_once_with( + key=QUERY_CANCEL_KEY, value=query_id + ) diff --git a/tests/unit_tests/sql_lab_test.py b/tests/unit_tests/sql_lab_test.py index 29f45eab682a0..edc1fd2ec4a5d 100644 --- a/tests/unit_tests/sql_lab_test.py +++ b/tests/unit_tests/sql_lab_test.py @@ -55,8 +55,8 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None: ) database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True) - db_engine_spec.execute.assert_called_with( - cursor, "SELECT 42 AS answer LIMIT 2", async_=True + db_engine_spec.execute_with_cursor.assert_called_with( + cursor, "SELECT 42 AS answer LIMIT 2", query, session ) SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec) @@ -106,10 +106,8 @@ def test_execute_sql_statement_with_rls( 101, force=True, ) - 
db_engine_spec.execute.assert_called_with( - cursor, - "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", - async_=True, + db_engine_spec.execute_with_cursor.assert_called_with( + cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query, session ) SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)