From 9e93edc795511a459beba44371c3693c5f3b7537 Mon Sep 17 00:00:00 2001 From: Abdel Jaidi Date: Tue, 9 Apr 2024 10:23:54 +0100 Subject: [PATCH] fix: respect order of columns in to_iceberg --- awswrangler/athena/_write_iceberg.py | 2 +- tests/unit/test_athena_iceberg.py | 52 ++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 50c1b3c69..9a6a5b4ba 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -493,7 +493,7 @@ def to_iceberg( """ else: sql_statement = f""" - INSERT INTO "{database}"."{table}" + INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])}) SELECT {', '.join([f'"{x}"' for x in df.columns])} FROM "{database}"."{temp_table}" """ diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 032e53755..26b3ca26c 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -650,6 +650,58 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str, assert_pandas_equals(df_expected, df_out) +def test_athena_to_iceberg_cols_order(path: str, path2: str, glue_database: str, glue_table: str) -> None: + kwargs = { + "database": glue_database, + "table": glue_table, + "table_location": path, + "temp_path": path2, + "partition_cols": ["partition"], + "schema_evolution": True, + "keep_files": False, + } + + df = pd.DataFrame( + { + "partition": [1, 1, 2, 2], + "column1": ["X", "Y", "Z", "Z"], + "column2": ["A", "B", "C", "D"], + } + ) + wr.athena.to_iceberg(df=df, mode="overwrite_partitions", **kwargs) + + # Adding a column + df_new_col_last = pd.DataFrame( + { + "partition": [2, 2], + "column1": ["Z", "Z"], + "column2": ["C", "D"], + "new_column": [True, False], + } + ) + wr.athena.to_iceberg(df=df_new_col_last, mode="overwrite_partitions", **kwargs) + + # Switching the order of columns + df_new_col_not_last = pd.DataFrame( + { + "partition": [2, 2], + "column1": ["Z", "Z"], + "new_column": [True, False], + "column2": ["C", "D"], + } + ) + wr.athena.to_iceberg(df=df_new_col_not_last, mode="overwrite_partitions", **kwargs) + + df_out = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}"', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + assert len(df) == len(df_out) + assert len(df.columns) + 1 == len(df_out.columns) + + def test_athena_to_iceberg_empty_df_error( path: str, path2: str,