From d25b0967a180beef7cce494f32f5cb2330519bf2 Mon Sep 17 00:00:00 2001 From: Shiva Raisinghani Date: Tue, 31 Aug 2021 00:20:25 -0700 Subject: [PATCH] feat: Add parquet upload (#14449) * allow csv upload to accept parquet file * fix mypy * fix if statement * add test for specificying columns in CSV upload * clean up test * change order in test * fix failures * upload parquet to seperate table in test * fix error message * fix mypy again * rename other extensions to columnar * add new form for columnar upload * add support for zip files * undo csv form changes except usecols * add more tests for zip * isort & black * pylint * fix trailing space * address more review comments * pylint * black * resolve remaining issues --- superset/config.py | 3 +- superset/initialization/__init__.py | 17 +- .../columnar_to_database_view/edit.html | 64 ++++++++ .../form_view/csv_to_database_view/edit.html | 2 +- .../excel_to_database_view/edit.html | 2 +- superset/views/core.py | 6 +- superset/views/database/forms.py | 144 ++++++++++++++++- superset/views/database/views.py | 152 +++++++++++++++++- tests/integration_tests/core_tests.py | 2 +- tests/integration_tests/csv_upload_tests.py | 111 +++++++++++++ 10 files changed, 493 insertions(+), 10 deletions(-) create mode 100644 superset/templates/superset/form_view/columnar_to_database_view/edit.html diff --git a/superset/config.py b/superset/config.py index c0946740c57ff..a794635edd505 100644 --- a/superset/config.py +++ b/superset/config.py @@ -556,7 +556,8 @@ def _try_json_readsha(filepath: str, length: int) -> Optional[str]: # Allowed format types for upload on Database view EXCEL_EXTENSIONS = {"xlsx", "xls"} CSV_EXTENSIONS = {"csv", "tsv", "txt"} -ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS} +COLUMNAR_EXTENSIONS = {"parquet", "zip"} +ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS, *COLUMNAR_EXTENSIONS} # CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv # method. diff --git a/superset/initialization/__init__.py b/superset/initialization/__init__.py index 79c466069e762..d94ac76aa777a 100644 --- a/superset/initialization/__init__.py +++ b/superset/initialization/__init__.py @@ -167,6 +167,7 @@ def init_views(self) -> None: DashboardModelViewAsync, ) from superset.views.database.views import ( + ColumnarToDatabaseView, CsvToDatabaseView, DatabaseView, ExcelToDatabaseView, @@ -281,6 +282,7 @@ def init_views(self) -> None: appbuilder.add_view_no_menu(CssTemplateAsyncModelView) appbuilder.add_view_no_menu(CsvToDatabaseView) appbuilder.add_view_no_menu(ExcelToDatabaseView) + appbuilder.add_view_no_menu(ColumnarToDatabaseView) appbuilder.add_view_no_menu(Dashboard) appbuilder.add_view_no_menu(DashboardModelViewAsync) appbuilder.add_view_no_menu(Datasource) @@ -371,7 +373,20 @@ def init_views(self) -> None: ) ), ) - + appbuilder.add_link( + "Upload a Columnar file", + label=__("Upload a Columnar file"), + href="/columnartodatabaseview/form", + icon="fa-upload", + category="Data", + category_label=__("Data"), + category_icon="fa-wrench", + cond=lambda: bool( + self.config["COLUMNAR_EXTENSIONS"].intersection( + self.config["ALLOWED_EXTENSIONS"] + ) + ), + ) try: import xlrd # pylint: disable=unused-import diff --git a/superset/templates/superset/form_view/columnar_to_database_view/edit.html b/superset/templates/superset/form_view/columnar_to_database_view/edit.html new file mode 100644 index 0000000000000..2371554edf43e --- /dev/null +++ b/superset/templates/superset/form_view/columnar_to_database_view/edit.html @@ -0,0 +1,64 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +#} +{% extends 'appbuilder/general/model/edit.html' %} + +{% block tail_js %} + {{ super() }} + +{% endblock %} diff --git a/superset/templates/superset/form_view/csv_to_database_view/edit.html b/superset/templates/superset/form_view/csv_to_database_view/edit.html index ab647d4c26f50..57e5f70c103dc 100644 --- a/superset/templates/superset/form_view/csv_to_database_view/edit.html +++ b/superset/templates/superset/form_view/csv_to_database_view/edit.html @@ -36,7 +36,7 @@ function update_schemas_allowed_for_csv_upload(db_id) { $.ajax({ method: "GET", - url: "/superset/schemas_access_for_csv_upload", + url: "/superset/schemas_access_for_file_upload", data: {db_id: db_id}, dataType: 'json', contentType: "application/json; charset=utf-8" diff --git a/superset/templates/superset/form_view/excel_to_database_view/edit.html b/superset/templates/superset/form_view/excel_to_database_view/edit.html index fb7c432231ba6..b44346aaae9b1 100644 --- a/superset/templates/superset/form_view/excel_to_database_view/edit.html +++ b/superset/templates/superset/form_view/excel_to_database_view/edit.html @@ -36,7 +36,7 @@ function update_schemas_allowed_for_excel_upload(db_id) { $.ajax({ method: "GET", - url: "/superset/schemas_access_for_excel_upload", + url: "/superset/schemas_access_for_file_upload", data: {db_id: db_id}, dataType: 'json', contentType: "application/json; charset=utf-8" diff --git a/superset/views/core.py b/superset/views/core.py index 6ec0709f6fef5..65d4250671649 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -3072,11 +3072,11 @@ def sqllab_history(self) -> FlaskResponse: @api @has_access_api @event_logger.log_this - @expose("/schemas_access_for_csv_upload") - def schemas_access_for_csv_upload(self) -> FlaskResponse: + @expose("/schemas_access_for_file_upload") + def schemas_access_for_file_upload(self) -> FlaskResponse: """ This method exposes an API endpoint to - get the schema access control settings for csv upload in this database + get the schema access control settings for file upload in this database """ if not request.args.get("db_id"): return json_error_response("No database is allowed for your csv upload") diff --git a/superset/views/database/forms.py b/superset/views/database/forms.py index 1dd1c0b823194..1e391bc9c1e18 100644 --- a/superset/views/database/forms.py +++ b/superset/views/database/forms.py @@ -21,7 +21,13 @@ from flask_appbuilder.forms import DynamicForm from flask_babel import lazy_gettext as _ from flask_wtf.file import FileAllowed, FileField, FileRequired -from wtforms import BooleanField, IntegerField, SelectField, StringField +from wtforms import ( + BooleanField, + IntegerField, + MultipleFileField, + SelectField, + StringField, +) from wtforms.ext.sqlalchemy.fields import QuerySelectField from wtforms.validators import DataRequired, Length, NumberRange, Optional @@ -163,6 +169,15 @@ def at_least_one_schema_is_allowed(database: Database) -> bool: _("Mangle Duplicate Columns"), description=_('Specify duplicate columns as "X.0, X.1".'), ) + usecols = JsonListField( + _("Use Columns"), + default=None, + description=_( + "Json list of the column names that should be read. " + "If not None, only these columns will be read from the file." + ), + validators=[Optional()], + ) skipinitialspace = BooleanField( _("Skip Initial Space"), description=_("Skip spaces after delimiter.") ) @@ -402,3 +417,130 @@ def at_least_one_schema_is_allowed(database: Database) -> bool: 'Use [""] for empty string.' ), ) + + +class ColumnarToDatabaseForm(DynamicForm): + # pylint: disable=E0211 + def columnar_allowed_dbs() -> List[Database]: # type: ignore + # TODO: change allow_csv_upload to allow_file_upload + columnar_enabled_dbs = ( + db.session.query(Database).filter_by(allow_csv_upload=True).all() + ) + return [ + columnar_enabled_db + for columnar_enabled_db in columnar_enabled_dbs + if ColumnarToDatabaseForm.at_least_one_schema_is_allowed( + columnar_enabled_db + ) + ] + + @staticmethod + def at_least_one_schema_is_allowed(database: Database) -> bool: + """ + If the user has access to the database or all datasource + 1. if schemas_allowed_for_csv_upload is empty + a) if database does not support schema + user is able to upload columnar without specifying schema name + b) if database supports schema + user is able to upload columnar to any schema + 2. if schemas_allowed_for_csv_upload is not empty + a) if database does not support schema + This situation is impossible and upload will fail + b) if database supports schema + user is able to upload to schema in schemas_allowed_for_csv_upload + elif the user does not access to the database or all datasource + 1. if schemas_allowed_for_csv_upload is empty + a) if database does not support schema + user is unable to upload columnar + b) if database supports schema + user is unable to upload columnar + 2. if schemas_allowed_for_csv_upload is not empty + a) if database does not support schema + This situation is impossible and user is unable to upload columnar + b) if database supports schema + user is able to upload to schema in schemas_allowed_for_csv_upload + """ + if security_manager.can_access_database(database): + return True + schemas = database.get_schema_access_for_csv_upload() + if schemas and security_manager.schemas_accessible_by_user( + database, schemas, False + ): + return True + return False + + name = StringField( + _("Table Name"), + description=_("Name of table to be created from columnar data."), + validators=[DataRequired()], + widget=BS3TextFieldWidget(), + ) + columnar_file = MultipleFileField( + _("Columnar File"), + description=_("Select a Columnar file to be uploaded to a database."), + validators=[ + DataRequired(), + FileAllowed( + config["ALLOWED_EXTENSIONS"].intersection( + config["COLUMNAR_EXTENSIONS"] + ), + _( + "Only the following file extensions are allowed: " + "%(allowed_extensions)s", + allowed_extensions=", ".join( + config["ALLOWED_EXTENSIONS"].intersection( + config["COLUMNAR_EXTENSIONS"] + ) + ), + ), + ), + ], + ) + + con = QuerySelectField( + _("Database"), + query_factory=columnar_allowed_dbs, + get_pk=lambda a: a.id, + get_label=lambda a: a.database_name, + ) + schema = StringField( + _("Schema"), + description=_("Specify a schema (if database flavor supports this)."), + validators=[Optional()], + widget=BS3TextFieldWidget(), + ) + if_exists = SelectField( + _("Table Exists"), + description=_( + "If table exists do one of the following: " + "Fail (do nothing), Replace (drop and recreate table) " + "or Append (insert data)." + ), + choices=[ + ("fail", _("Fail")), + ("replace", _("Replace")), + ("append", _("Append")), + ], + validators=[DataRequired()], + ) + usecols = JsonListField( + _("Use Columns"), + default=None, + description=_( + "Json list of the column names that should be read. " + "If not None, only these columns will be read from the file." + ), + validators=[Optional()], + ) + index = BooleanField( + _("Dataframe Index"), description=_("Write dataframe index as a column.") + ) + index_label = StringField( + _("Column Label(s)"), + description=_( + "Column label for index column(s). If None is given " + "and Dataframe Index is True, Index Names are used." + ), + validators=[Optional()], + widget=BS3TextFieldWidget(), + ) diff --git a/superset/views/database/views.py b/superset/views/database/views.py index 0a3a274c5acfb..9e60e054710e9 100644 --- a/superset/views/database/views.py +++ b/superset/views/database/views.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import io import os import tempfile +import zipfile from typing import TYPE_CHECKING import pandas as pd @@ -38,7 +40,7 @@ from superset.utils import core as utils from superset.views.base import DeleteMixin, SupersetModelView, YamlExportMixin -from .forms import CsvToDatabaseForm, ExcelToDatabaseForm +from .forms import ColumnarToDatabaseForm, CsvToDatabaseForm, ExcelToDatabaseForm from .mixins import DatabaseMixin from .validators import schema_allows_csv_upload, sqlalchemy_uri_validator @@ -162,6 +164,7 @@ def form_post(self, form: CsvToDatabaseForm) -> Response: iterator=True, keep_default_na=not form.null_values.data, mangle_dupe_cols=form.mangle_dupe_cols.data, + usecols=form.usecols.data if form.usecols.data else None, na_values=form.null_values.data if form.null_values.data else None, nrows=form.nrows.data, parse_dates=form.parse_dates.data, @@ -392,3 +395,150 @@ def form_post(self, form: ExcelToDatabaseForm) -> Response: flash(message, "info") stats_logger.incr("successful_excel_upload") return redirect("/tablemodelview/list/") + + +class ColumnarToDatabaseView(SimpleFormView): + form = ColumnarToDatabaseForm + form_template = "superset/form_view/columnar_to_database_view/edit.html" + form_title = _("Columnar to Database configuration") + add_columns = ["database", "schema", "table_name"] + + def form_get(self, form: ColumnarToDatabaseForm) -> None: + form.if_exists.data = "fail" + + def form_post( # pylint: disable=too-many-locals + self, form: ColumnarToDatabaseForm + ) -> Response: + database = form.con.data + columnar_table = Table(table=form.name.data, schema=form.schema.data) + files = form.columnar_file.data + file_type = {file.filename.split(".")[-1] for file in files} + + if file_type == {"zip"}: + zipfile_ob = zipfile.ZipFile( # pylint: disable=consider-using-with + form.columnar_file.data[0] + ) # pylint: disable=consider-using-with + file_type = {filename.split(".")[-1] for filename in zipfile_ob.namelist()} + files = [ + io.BytesIO((zipfile_ob.open(filename).read(), filename)[0]) + for filename in zipfile_ob.namelist() + ] + + if len(file_type) > 1: + message = _( + "Multiple file extensions are not allowed for columnar uploads." + " Please make sure all files are of the same extension.", + ) + flash(message, "danger") + return redirect("/columnartodatabaseview/form") + + read = pd.read_parquet + kwargs = { + "columns": form.usecols.data if form.usecols.data else None, + } + + if not schema_allows_csv_upload(database, columnar_table.schema): + message = _( + 'Database "%(database_name)s" schema "%(schema_name)s" ' + "is not allowed for columnar uploads. " + "Please contact your Superset Admin.", + database_name=database.database_name, + schema_name=columnar_table.schema, + ) + flash(message, "danger") + return redirect("/columnartodatabaseview/form") + + if "." in columnar_table.table and columnar_table.schema: + message = _( + "You cannot specify a namespace both in the name of the table: " + '"%(columnar_table.table)s" and in the schema field: ' + '"%(columnar_table.schema)s". Please remove one', + table=columnar_table.table, + schema=columnar_table.schema, + ) + flash(message, "danger") + return redirect("/columnartodatabaseview/form") + + try: + chunks = [read(file, **kwargs) for file in files] + df = pd.concat(chunks) + + database = ( + db.session.query(models.Database) + .filter_by(id=form.data.get("con").data.get("id")) + .one() + ) + + database.db_engine_spec.df_to_sql( + database, + columnar_table, + df, + to_sql_kwargs={ + "chunksize": 1000, + "if_exists": form.if_exists.data, + "index": form.index.data, + "index_label": form.index_label.data, + }, + ) + + # Connect table to the database that should be used for exploration. + # E.g. if hive was used to upload a csv, presto will be a better option + # to explore the table. + expore_database = database + explore_database_id = database.explore_database_id + if explore_database_id: + expore_database = ( + db.session.query(models.Database) + .filter_by(id=explore_database_id) + .one_or_none() + or database + ) + + sqla_table = ( + db.session.query(SqlaTable) + .filter_by( + table_name=columnar_table.table, + schema=columnar_table.schema, + database_id=expore_database.id, + ) + .one_or_none() + ) + + if sqla_table: + sqla_table.fetch_metadata() + if not sqla_table: + sqla_table = SqlaTable(table_name=columnar_table.table) + sqla_table.database = expore_database + sqla_table.database_id = database.id + sqla_table.user_id = g.user.get_id() + sqla_table.schema = columnar_table.schema + sqla_table.fetch_metadata() + db.session.add(sqla_table) + db.session.commit() + except Exception as ex: # pylint: disable=broad-except + db.session.rollback() + message = _( + 'Unable to upload Columnar file "%(filename)s" to table ' + '"%(table_name)s" in database "%(db_name)s". ' + "Error message: %(error_msg)s", + filename=[file.filename for file in form.columnar_file.data], + table_name=form.name.data, + db_name=database.database_name, + error_msg=str(ex), + ) + + flash(message, "danger") + stats_logger.incr("failed_columnar_upload") + return redirect("/columnartodatabaseview/form") + + # Go back to welcome page / splash screen + message = _( + 'Columnar file "%(columnar_filename)s" uploaded to table "%(table_name)s" ' + 'in database "%(db_name)s"', + columnar_filename=[file.filename for file in form.columnar_file.data], + table_name=str(columnar_table), + db_name=sqla_table.database.database_name, + ) + flash(message, "info") + stats_logger.incr("successful_columnar_upload") + return redirect("/tablemodelview/list/") diff --git a/tests/integration_tests/core_tests.py b/tests/integration_tests/core_tests.py index c609e6ba383d0..57955c2a7620e 100644 --- a/tests/integration_tests/core_tests.py +++ b/tests/integration_tests/core_tests.py @@ -1184,7 +1184,7 @@ def test_schemas_access_for_csv_upload_endpoint( mock_can_access_database.return_value = False mock_schemas_accessible.return_value = ["this_schema_is_allowed_too"] data = self.get_json_resp( - url="/superset/schemas_access_for_csv_upload?db_id={db_id}".format( + url="/superset/schemas_access_for_file_upload?db_id={db_id}".format( db_id=dbobj.id ) ) diff --git a/tests/integration_tests/csv_upload_tests.py b/tests/integration_tests/csv_upload_tests.py index a8821fb0308d2..23ebc27d3ed2c 100644 --- a/tests/integration_tests/csv_upload_tests.py +++ b/tests/integration_tests/csv_upload_tests.py @@ -19,6 +19,7 @@ import json import logging import os +import shutil from typing import Dict, Optional from unittest import mock @@ -43,9 +44,14 @@ CSV_FILENAME1 = "testCSV1.csv" CSV_FILENAME2 = "testCSV2.csv" EXCEL_FILENAME = "testExcel.xlsx" +PARQUET_FILENAME1 = "testZip/testParquet1.parquet" +PARQUET_FILENAME2 = "testZip/testParquet2.parquet" +ZIP_DIRNAME = "testZip" +ZIP_FILENAME = "testZip.zip" EXCEL_UPLOAD_TABLE = "excel_upload" CSV_UPLOAD_TABLE = "csv_upload" +PARQUET_UPLOAD_TABLE = "parquet_upload" CSV_UPLOAD_TABLE_W_SCHEMA = "csv_upload_w_schema" CSV_UPLOAD_TABLE_W_EXPLORE = "csv_upload_w_explore" @@ -70,6 +76,7 @@ def setup_csv_upload(): engine = upload_db.get_sqla_engine() engine.execute(f"DROP TABLE IF EXISTS {EXCEL_UPLOAD_TABLE}") engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE}") + engine.execute(f"DROP TABLE IF EXISTS {PARQUET_UPLOAD_TABLE}") engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_SCHEMA}") engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_EXPLORE}") db.session.delete(upload_db) @@ -97,6 +104,17 @@ def create_excel_files(): os.remove(EXCEL_FILENAME) +@pytest.fixture() +def create_columnar_files(): + os.mkdir(ZIP_DIRNAME) + pd.DataFrame({"a": ["john", "paul"], "b": [1, 2]}).to_parquet(PARQUET_FILENAME1) + pd.DataFrame({"a": ["max", "bob"], "b": [3, 4]}).to_parquet(PARQUET_FILENAME2) + shutil.make_archive(ZIP_DIRNAME, "zip", ZIP_DIRNAME) + yield + os.remove(ZIP_FILENAME) + shutil.rmtree(ZIP_DIRNAME) + + def get_upload_db(): return db.session.query(Database).filter_by(database_name=CSV_UPLOAD_DATABASE).one() @@ -134,6 +152,22 @@ def upload_excel( return get_resp(test_client, "/exceltodatabaseview/form", data=form_data) +def upload_columnar( + filename: str, table_name: str, extra: Optional[Dict[str, str]] = None +): + columnar_upload_db_id = get_upload_db().id + form_data = { + "columnar_file": open(filename, "rb"), + "name": table_name, + "con": columnar_upload_db_id, + "if_exists": "fail", + "index_label": "test_label", + } + if extra: + form_data.update(extra) + return get_resp(test_client, "/columnartodatabaseview/form", data=form_data) + + def mock_upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str: """ HDFS is used instead of S3 for the unit tests.integration_tests. @@ -249,6 +283,18 @@ def test_import_csv(setup_csv_upload, create_csv_files): ) assert success_msg_f1 in resp + # upload again with replace mode and specific columns + resp = upload_csv( + CSV_FILENAME1, + CSV_UPLOAD_TABLE, + extra={"if_exists": "replace", "usecols": '["a"]'}, + ) + assert success_msg_f1 in resp + + # make sure only specified column name was read + table = SupersetTestCase.get_table(name=CSV_UPLOAD_TABLE) + assert "b" not in table.column_names + # upload again with replace mode resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"}) assert success_msg_f1 in resp @@ -328,3 +374,68 @@ def test_import_excel(setup_csv_upload, create_excel_files): .fetchall() ) assert data == [(0, "john", 1), (1, "paul", 2)] + + +@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3) +def test_import_parquet(setup_csv_upload, create_columnar_files): + if utils.backend() == "hive": + pytest.skip("Hive doesn't allow parquet upload.") + + success_msg_f1 = f'Columnar file "[\'{PARQUET_FILENAME1}\']" uploaded to table "{PARQUET_UPLOAD_TABLE}"' + + # initial upload with fail mode + resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE) + assert success_msg_f1 in resp + + # upload again with fail mode; should fail + fail_msg = f'Unable to upload Columnar file "[\'{PARQUET_FILENAME1}\']" to table "{PARQUET_UPLOAD_TABLE}"' + resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE) + assert fail_msg in resp + + if utils.backend() != "hive": + # upload again with append mode + resp = upload_columnar( + PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "append"} + ) + assert success_msg_f1 in resp + + # upload again with replace mode and specific columns + resp = upload_columnar( + PARQUET_FILENAME1, + PARQUET_UPLOAD_TABLE, + extra={"if_exists": "replace", "usecols": '["a"]'}, + ) + assert success_msg_f1 in resp + + # make sure only specified column name was read + table = SupersetTestCase.get_table(name=PARQUET_UPLOAD_TABLE) + assert "b" not in table.column_names + + # upload again with replace mode + resp = upload_columnar( + PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"} + ) + assert success_msg_f1 in resp + + data = ( + get_upload_db() + .get_sqla_engine() + .execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}") + .fetchall() + ) + assert data == [("john", 1), ("paul", 2)] + + # replace table with zip file + resp = upload_columnar( + ZIP_FILENAME, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"} + ) + success_msg_f2 = f'Columnar file "[\'{ZIP_FILENAME}\']" uploaded to table "{PARQUET_UPLOAD_TABLE}"' + assert success_msg_f2 in resp + + data = ( + get_upload_db() + .get_sqla_engine() + .execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}") + .fetchall() + ) + assert data == [("john", 1), ("paul", 2), ("max", 3), ("bob", 4)]