Skip to content

Commit

Permalink
Merge branch 'main' into upgrade-ray-2.30
Browse files Browse the repository at this point in the history
  • Loading branch information
jaidisido authored Jun 25, 2024
2 parents 2190bac + a474296 commit 96840d8
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 10 deletions.
4 changes: 2 additions & 2 deletions awswrangler/athena/_executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def start_query_execution(
If cached results are valid, awswrangler ignores the `ctas_approach`, `s3_output`, `encryption`, `kms_key`,
`keep_files` and `ctas_temp_table_name` params.
If reading cached data fails for any reason, execution falls back to the usual query run path.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
data_source : str, optional
Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
Expand Down Expand Up @@ -211,7 +211,7 @@ def wait_query(
Athena query execution ID.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
Returns
Expand Down
6 changes: 3 additions & 3 deletions awswrangler/athena/_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ def get_query_results(
Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
Valid values include "split_blocks", "self_destruct", "ignore_metadata".
e.g. pyarrow_additional_kwargs={'split_blocks': True}.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
Returns
Expand Down Expand Up @@ -960,7 +960,7 @@ def read_sql_query(
If reading cached data fails for any reason, execution falls back to the usual query run path.
data_source : str, optional
Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
params: Dict[str, any] | List[str], optional
Parameters that will be used for constructing the SQL query.
Expand Down Expand Up @@ -1426,7 +1426,7 @@ def unload(
- ``named``
- ``qmark``
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
Returns
Expand Down
8 changes: 4 additions & 4 deletions awswrangler/athena/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ def repair_table(
None, 'SSE_S3', 'SSE_KMS', 'CSE_KMS'.
kms_key : str, optional
For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
Expand Down Expand Up @@ -582,7 +582,7 @@ def describe_table(
None, 'SSE_S3', 'SSE_KMS', 'CSE_KMS'.
kms_key : str, optional
For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
s3_additional_kwargs : dict[str, Any], optional
Forwarded to botocore requests.
Expand Down Expand Up @@ -700,7 +700,7 @@ def create_ctas_table(
Recommended for memory restricted environments.
wait : bool, default False
Whether to wait for the query to finish and return a dictionary with the Query metadata.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
execution_params: List[str], optional [DEPRECATED]
A list of values for the parameters that are used in the SQL query.
Expand Down Expand Up @@ -912,7 +912,7 @@ def show_create_table(
None, 'SSE_S3', 'SSE_KMS', 'CSE_KMS'.
kms_key : str, optional
For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID.
athena_query_wait_polling_delay: float, default: 0.25 seconds
athena_query_wait_polling_delay: float, default: 1.0 seconds
Interval in seconds for how often the function will check if the Athena query has completed.
s3_additional_kwargs: dict[str, Any]
Forwarded to botocore requests.
Expand Down
4 changes: 3 additions & 1 deletion awswrangler/athena/_write_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,9 @@ def to_iceberg(
sql_statement = f"""
MERGE INTO "{database}"."{table}" target
USING "{database}"."{temp_table}" source
ON {' AND '.join([f'target."{x}" = source."{x}"' for x in merge_cols])}
ON {' AND '.join([
f'(target."{x}" = source."{x}" OR (target."{x}" IS NULL AND source."{x}" IS NULL))'
for x in merge_cols])}
{match_condition}
WHEN NOT MATCHED THEN
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/test_athena_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,74 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str,
assert_pandas_equals(df_expected, df_out)


def test_athena_to_iceberg_merge_into_nulls(path: str, path2: str, glue_database: str, glue_table: str) -> None:
df = pd.DataFrame(
{
"col1": ["a", "a", "a", np.nan],
"col2": [0.0, 1.1, np.nan, 2.2],
"action": ["insert", "insert", "insert", "insert"],
}
)
df["col1"] = df["col1"].astype("string")
df["col2"] = df["col2"].astype("float64")
df["action"] = df["action"].astype("string")

wr.athena.to_iceberg(
df=df,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=path2,
keep_files=False,
)

# Perform MERGE INTO
df2 = pd.DataFrame(
{
"col1": ["a", "a", np.nan, "b"],
"col2": [1.1, np.nan, 2.2, 3.3],
"action": ["update", "update", "update", "insert"],
}
)
df2["col1"] = df2["col1"].astype("string")
df2["col2"] = df2["col2"].astype("float64")
df2["action"] = df2["action"].astype("string")

wr.athena.to_iceberg(
df=df2,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=path2,
keep_files=False,
merge_cols=["col1", "col2"],
)

# Expected output
df_expected = pd.DataFrame(
{
"col1": ["a", "a", "a", np.nan, "b"],
"col2": [0.0, 1.1, np.nan, 2.2, 3.3],
"action": ["insert", "update", "update", "update", "insert"],
}
)
df_expected["col1"] = df_expected["col1"].astype("string")
df_expected["col2"] = df_expected["col2"].astype("float64")
df_expected["action"] = df_expected["action"].astype("string")

df_out = wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)

assert_pandas_equals(
df_out.sort_values(df_out.columns.to_list()).reset_index(drop=True),
df_expected.sort_values(df_expected.columns.to_list()).reset_index(drop=True),
)


def test_athena_to_iceberg_merge_into_ignore(path: str, path2: str, glue_database: str, glue_table: str) -> None:
df = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["1984", "1996"], "gross": [35_000_000, 60_000_000]})
df["title"] = df["title"].astype("string")
Expand Down

0 comments on commit 96840d8

Please sign in to comment.