Skip to content

Commit

Permalink
fix: respect order of columns in to_iceberg
Browse files (browse the repository at this point in the history)
  • Loading branch information
jaidisido committed Apr 9, 2024
1 parent b59008d commit 9e93edc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
2 changes: 1 addition & 1 deletion awswrangler/athena/_write_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ def to_iceberg(
"""
else:
sql_statement = f"""
INSERT INTO "{database}"."{table}"
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
SELECT {', '.join([f'"{x}"' for x in df.columns])}
FROM "{database}"."{temp_table}"
"""
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/test_athena_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,58 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str,
assert_pandas_equals(df_expected, df_out)


def test_athena_to_iceberg_cols_order(path: str, path2: str, glue_database: str, glue_table: str) -> None:
    """Check that ``to_iceberg`` maps DataFrame columns by name, not by position.

    The table is created with ``column1, column2``, then evolved by appending
    ``new_column`` as the last column. A third write passes the same columns in
    a different order (``new_column`` before ``column2``). The write must
    succeed and every value must land in the column it was named for.
    """
    kwargs = {
        "database": glue_database,
        "table": glue_table,
        "table_location": path,
        "temp_path": path2,
        "partition_cols": ["partition"],
        "schema_evolution": True,
        "keep_files": False,
    }

    df = pd.DataFrame(
        {
            "partition": [1, 1, 2, 2],
            "column1": ["X", "Y", "Z", "Z"],
            "column2": ["A", "B", "C", "D"],
        }
    )
    wr.athena.to_iceberg(df=df, mode="overwrite_partitions", **kwargs)

    # Adding a column
    df_new_col_last = pd.DataFrame(
        {
            "partition": [2, 2],
            "column1": ["Z", "Z"],
            "column2": ["C", "D"],
            "new_column": [True, False],
        }
    )
    wr.athena.to_iceberg(df=df_new_col_last, mode="overwrite_partitions", **kwargs)

    # Switching the order of columns
    df_new_col_not_last = pd.DataFrame(
        {
            "partition": [2, 2],
            "column1": ["Z", "Z"],
            "new_column": [True, False],
            "column2": ["C", "D"],
        }
    )
    wr.athena.to_iceberg(df=df_new_col_not_last, mode="overwrite_partitions", **kwargs)

    df_out = wr.athena.read_sql_query(
        sql=f'SELECT * FROM "{glue_table}"',
        database=glue_database,
        ctas_approach=False,
        unload_approach=False,
    )
    assert len(df) == len(df_out)
    assert len(df.columns) + 1 == len(df_out.columns)

    # Row and column counts alone cannot detect values written to the wrong
    # column, so verify the contents column by column (row order from Athena
    # is not guaranteed, hence the sorting).
    assert set(df_out.columns) == {"partition", "column1", "column2", "new_column"}
    assert sorted(df_out["column1"].tolist()) == ["X", "Y", "Z", "Z"]
    assert sorted(df_out["column2"].tolist()) == ["A", "B", "C", "D"]
    # Partition 1 was written before the column existed, so only the two
    # rows of partition 2 carry a non-null value.
    assert sorted(df_out["new_column"].dropna().tolist()) == [False, True]


def test_athena_to_iceberg_empty_df_error(
path: str,
path2: str,
Expand Down

0 comments on commit 9e93edc

Please sign in to comment.