Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add parquet upload #14449

Merged
merged 32 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4cc378a
allow csv upload to accept parquet file
exemplary-citizen May 3, 2021
c44fcee
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen May 20, 2021
968c048
fix mypy
exemplary-citizen May 20, 2021
c23b1ec
fix if statement
exemplary-citizen May 21, 2021
97bea75
add test for specifying columns in CSV upload
exemplary-citizen May 21, 2021
2f8482b
clean up test
exemplary-citizen May 21, 2021
77b381f
change order in test
exemplary-citizen May 21, 2021
f8b0dfc
fix failures
exemplary-citizen May 21, 2021
b40a3e2
upload parquet to separate table in test
exemplary-citizen May 24, 2021
f2911b5
fix error message
exemplary-citizen May 25, 2021
dd889b0
fix mypy again
exemplary-citizen May 25, 2021
cfdd3be
rename other extensions to columnar
exemplary-citizen Jun 4, 2021
5e063bd
add new form for columnar upload
exemplary-citizen Jun 6, 2021
4c74594
add support for zip files
exemplary-citizen Jun 6, 2021
9096faf
undo csv form changes except usecols
exemplary-citizen Jun 7, 2021
98c75ba
add more tests for zip
exemplary-citizen Jun 7, 2021
d92a290
isort & black
exemplary-citizen Jun 7, 2021
3501fac
Merge branch 'master' into upload_parquet
exemplary-citizen Jun 7, 2021
f209999
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Jun 11, 2021
10d6229
pylint
exemplary-citizen Jun 15, 2021
35eaea6
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Jun 15, 2021
10e825d
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 2, 2021
658656d
fix trailing space
exemplary-citizen Aug 3, 2021
0f1816b
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 3, 2021
7b1b53b
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 5, 2021
b885b38
address more review comments
exemplary-citizen Aug 23, 2021
6bcc0d9
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 23, 2021
60ae1fe
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 26, 2021
5b5eb74
pylint
exemplary-citizen Aug 26, 2021
df2930f
black
exemplary-citizen Aug 26, 2021
9b4d35b
Merge branch 'master' of https://github.com/apache/superset into uplo…
exemplary-citizen Aug 31, 2021
de3509e
resolve remaining issues
exemplary-citizen Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ def _try_json_readsha( # pylint: disable=unused-argument
# Allowed format types for upload on Database view
EXCEL_EXTENSIONS = {"xlsx", "xls"}
CSV_EXTENSIONS = {"csv", "tsv", "txt"}
# Columnar file formats accepted by the upload form (served by pd.read_parquet).
OTHER_EXTENSIONS = {"parquet"}
# Union of every extension any upload endpoint accepts; individual forms
# intersect this with their own extension set.
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS, *OTHER_EXTENSIONS}

# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv
# method.
Expand Down
13 changes: 12 additions & 1 deletion superset/views/database/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ def at_least_one_schema_is_allowed(database: Database) -> bool:
validators=[
FileRequired(),
FileAllowed(
config["ALLOWED_EXTENSIONS"].intersection(config["CSV_EXTENSIONS"]),
config["ALLOWED_EXTENSIONS"].intersection(
config["CSV_EXTENSIONS"].union(config["OTHER_EXTENSIONS"])
),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to add the union to the error message below.

_(
"Only the following file extensions are allowed: "
"%(allowed_extensions)s",
Expand Down Expand Up @@ -163,6 +165,15 @@ def at_least_one_schema_is_allowed(database: Database) -> bool:
_("Mangle Duplicate Columns"),
description=_('Specify duplicate columns as "X.0, X.1".'),
)
# Optional JSON list of column names to load from the uploaded file.
# Passed through to the pandas reader's `usecols`/`columns` argument;
# when left as None every column is read.
usecols = JsonListField(
    _("Use Columns"),
    default=None,
    description=_(
        "Json list of the column names that should be read. "
        "If not None, only these columns will be read from the file."
    ),
    validators=[Optional()],
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide a screenshot of the updated form UI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a screenshot to the summary above

Comment on lines +172 to +180
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@villebro do you still want to keep the Use Columns feature in the csv form? if so, should we keep it in this PR or move it to a separate one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's keep this, I think it's a great addition 👍

# Maps to pandas `skipinitialspace`: ignore whitespace right after the delimiter.
skipinitialspace = BooleanField(
    _("Skip Initial Space"), description=_("Skip spaces after delimiter.")
)
Expand Down
48 changes: 28 additions & 20 deletions superset/views/database/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,32 @@ def form_get(self, form: CsvToDatabaseForm) -> None:
def form_post(self, form: CsvToDatabaseForm) -> Response:
database = form.con.data
csv_table = Table(table=form.name.data, schema=form.schema.data)
file_type = form.csv_file.data.filename.split(".")[-1]
if file_type == "parquet":
read = pd.read_parquet
kwargs = {
"columns": form.usecols.data,
}
else:
read = pd.read_csv
kwargs = {
"chunksize": 1000,
"encoding": "utf-8",
"header": form.header.data if form.header.data else 0,
"index_col": form.index_col.data,
"infer_datetime_format": form.infer_datetime_format.data,
"iterator": True,
"keep_default_na": not form.null_values.data,
"mangle_dupe_cols": form.mangle_dupe_cols.data,
"usecols": form.usecols.data,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to change the behavior of the existing CSV upload functionality by specifying columns. Can you add some tests around this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a scenario to test_import_csv that tests uploading a CSV with specific columns

"na_values": form.null_values.data if form.null_values.data else None,
"nrows": form.nrows.data,
"parse_dates": form.parse_dates.data,
"sep": form.sep.data,
"skip_blank_lines": form.skip_blank_lines.data,
"skipinitialspace": form.skipinitialspace.data,
"skiprows": form.skiprows.data,
}

if not schema_allows_csv_upload(database, csv_table.schema):
message = _(
Expand All @@ -151,26 +177,8 @@ def form_post(self, form: CsvToDatabaseForm) -> Response:
return redirect("/csvtodatabaseview/form")

try:
df = pd.concat(
pd.read_csv(
chunksize=1000,
encoding="utf-8",
filepath_or_buffer=form.csv_file.data,
header=form.header.data if form.header.data else 0,
index_col=form.index_col.data,
infer_datetime_format=form.infer_datetime_format.data,
iterator=True,
keep_default_na=not form.null_values.data,
mangle_dupe_cols=form.mangle_dupe_cols.data,
na_values=form.null_values.data if form.null_values.data else None,
nrows=form.nrows.data,
parse_dates=form.parse_dates.data,
sep=form.sep.data,
skip_blank_lines=form.skip_blank_lines.data,
skipinitialspace=form.skipinitialspace.data,
skiprows=form.skiprows.data,
)
)
chunks = read(form.csv_file.data, **kwargs)
df = pd.concat(chunks) if isinstance(chunks, list) else chunks

database = (
db.session.query(models.Database)
Expand Down
51 changes: 51 additions & 0 deletions tests/csv_upload_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
# Temporary fixture file names written/removed by the upload-test fixtures.
CSV_FILENAME1 = "testCSV1.csv"
CSV_FILENAME2 = "testCSV2.csv"
EXCEL_FILENAME = "testExcel.xlsx"
PARQUET_FILENAME = "testParquet.parquet"

# Target table names the upload round-trip tests write into.
EXCEL_UPLOAD_TABLE = "excel_upload"
CSV_UPLOAD_TABLE = "csv_upload"
Expand Down Expand Up @@ -90,6 +91,12 @@ def create_csv_files():
os.remove(CSV_FILENAME2)


@pytest.fixture()
def create_parquet_files():
    """Write a small parquet fixture file for upload tests; remove it on teardown.

    Yield-style fixture: the file exists for the duration of the requesting
    test and is deleted afterwards. The decorator was missing, so pytest
    would not resolve this as a fixture for ``test_import_parquet`` — added
    to match the sibling ``create_excel_files`` fixture.
    """
    pd.DataFrame({"a": ["john", "paul"], "b": [1, 2]}).to_parquet(PARQUET_FILENAME)
    yield
    os.remove(PARQUET_FILENAME)


@pytest.fixture()
def create_excel_files():
pd.DataFrame({"a": ["john", "paul"], "b": [1, 2]}).to_excel(EXCEL_FILENAME)
Expand Down Expand Up @@ -328,3 +335,47 @@ def test_import_excel(setup_csv_upload, create_excel_files):
.fetchall()
)
assert data == [(0, "john", 1), (1, "paul", 2)]


@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
def test_import_parquet(setup_csv_upload, create_parquet_files):
    """Round-trip a parquet upload through the CSV upload endpoint.

    Exercises fail, append and replace modes and verifies the final table
    contents. Skipped entirely on hive, which does not support parquet upload.
    """
    if utils.backend() == "hive":
        pytest.skip("Hive doesn't allow parquet upload.")

    success_msg = (
        f'CSV file "{PARQUET_FILENAME}" uploaded to table "{CSV_UPLOAD_TABLE}"'
    )

    # initial upload with fail mode
    resp = upload_csv(PARQUET_FILENAME, CSV_UPLOAD_TABLE)
    assert success_msg in resp

    # upload again with fail mode; should fail since the table now exists
    fail_msg = (
        f'Unable to upload CSV file "{PARQUET_FILENAME}" to table "{CSV_UPLOAD_TABLE}"'
    )
    resp = upload_csv(PARQUET_FILENAME, CSV_UPLOAD_TABLE)
    assert fail_msg in resp

    # upload again with append mode (hive was already skipped above, so no
    # extra backend guard is needed here)
    resp = upload_csv(
        PARQUET_FILENAME, CSV_UPLOAD_TABLE, extra={"if_exists": "append"}
    )
    assert success_msg in resp

    # upload again with replace mode
    resp = upload_csv(
        PARQUET_FILENAME, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"}
    )
    assert success_msg in resp

    # after the final replace, the table should hold exactly the fixture rows
    data = (
        get_upload_db()
        .get_sqla_engine()
        .execute(f"SELECT * from {CSV_UPLOAD_TABLE}")
        .fetchall()
    )
    assert data == [(0, "john", 1), (1, "paul", 2)]