Merge branch 'main' into fix/issue-2371-dynamodb-limit
kukushking authored Jul 6, 2023
2 parents bf718c6 + 6eb8462 commit 0b8a3d6
Showing 6 changed files with 77 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -242,6 +242,7 @@ If you would like us to include your company’s name and/or logo in the README
 - [Infomach](https://www.infomach.com.br/)
 - [Informa Markets](https://www.informamarkets.com/en/home.html) [[@mateusmorato]](http://github.com/mateusmorato)
 - [LINE TV](https://www.linetv.tw/) [[@bryanyang0528](https://github.com/bryanyang0528)]
+- [LogicalCube](https://www.logicalcube.com) [[@zolabud](https://github.com/zolabud)]
 - [Magnataur](https://magnataur.com) [[@brianmingus2](https://github.com/brianmingus2)]
 - [M4U](https://www.m4u.com.br/) [[@Thiago-Dantas](https://github.com/Thiago-Dantas)]
 - [NBCUniversal](https://www.nbcuniversal.com/) [[@vibe](https://github.com/vibe)]
10 changes: 9 additions & 1 deletion awswrangler/athena/_write_iceberg.py
@@ -33,11 +33,12 @@ def _create_iceberg_table(
     encryption: Optional[str] = None,
     kms_key: Optional[str] = None,
     boto3_session: Optional[boto3.Session] = None,
+    dtype: Optional[Dict[str, str]] = None,
 ) -> None:
     if not path:
         raise exceptions.InvalidArgumentValue("Must specify table location to create the table.")
 
-    columns_types, _ = catalog.extract_athena_types(df=df, index=index)
+    columns_types, _ = catalog.extract_athena_types(df=df, index=index, dtype=dtype)
     cols_str: str = ", ".join([f"{k} {v}" for k, v in columns_types.items()])
     partition_cols_str: str = f"PARTITIONED BY ({', '.join([col for col in partition_cols])})" if partition_cols else ""
     table_properties_str: str = (
@@ -86,6 +87,7 @@ def to_iceberg(
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
     additional_table_properties: Optional[Dict[str, Any]] = None,
+    dtype: Optional[Dict[str, str]] = None,
 ) -> None:
     """
     Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -133,6 +135,10 @@ def to_iceberg(
         e.g. additional_table_properties={'write_target_data_file_size_bytes': '536870912'}
         https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-table-properties
+    dtype: Optional[Dict[str, str]]
+        Dictionary of column names and Athena/Glue types to cast.
+        Useful when you have columns with undetermined or mixed data types.
+        e.g. {'col name': 'bigint', 'col2 name': 'int'}
 
     Returns
     -------
@@ -192,6 +198,7 @@
             encryption=encryption,
             kms_key=kms_key,
             boto3_session=boto3_session,
+            dtype=dtype,
         )
 
     # Create temporary external table, write the results
@@ -203,6 +210,7 @@
         table=temp_table,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
+        dtype=dtype,
     )
 
     # Insert into iceberg table
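The dtype argument added above is applied at both points where types are derived: when _create_iceberg_table builds the CREATE TABLE column list, and when the staging data is written to the temporary external table. A minimal usage sketch, assuming a Glue database, table name, and S3 paths of your own:

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"event_date": ["2020", "2020-01", None]})  # mixed/undetermined values

wr.athena.to_iceberg(
    df=df,
    database="my_db",  # assumed Glue database
    table="my_iceberg_table",  # assumed table name
    table_location="s3://my-bucket/iceberg/",  # assumed locations
    temp_path="s3://my-bucket/temp/",
    dtype={"event_date": "date"},  # pin the Athena type instead of letting it be inferred
)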
6 changes: 3 additions & 3 deletions awswrangler/cleanrooms/_read.py
@@ -101,9 +101,9 @@ def read_sql_query(
     )["protectedQuery"]["id"]
 
     _logger.debug("query_id: %s", query_id)
-    path: str = wait_query(membership_id=membership_id, query_id=query_id)["protectedQuery"]["result"]["output"]["s3"][
-        "location"
-    ]
+    path: str = wait_query(membership_id=membership_id, query_id=query_id, boto3_session=boto3_session)[
+        "protectedQuery"
+    ]["result"]["output"]["s3"]["location"]
 
     _logger.debug("path: %s", path)
     chunked: Union[bool, int] = False if chunksize is None else chunksize
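The change above simply forwards boto3_session into wait_query, so polling for the protected query's S3 result location honours the caller's session instead of silently falling back to the default one. A hedged sketch of the affected call, where the profile name, membership id, and output location are placeholders (parameter names other than sql, membership_id, and boto3_session are assumptions):

import boto3
import awswrangler as wr

session = boto3.Session(profile_name="cleanrooms-analyst")  # hypothetical named profile

df = wr.cleanrooms.read_sql_query(
    sql="SELECT COUNT(*) FROM collaboration_table",  # hypothetical query
    membership_id="00000000-0000-0000-0000-000000000000",  # placeholder membership id
    output_bucket="my-results-bucket",  # assumed result location parameters
    output_prefix="results/",
    boto3_session=session,  # before this fix, wait_query ignored this session
)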
4 changes: 3 additions & 1 deletion awswrangler/redshift/_utils.py
@@ -349,7 +349,9 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments,too-man
         primary_keys=primary_keys,
     )
     cols_str: str = "".join([f'"{k}" {v},\n' for k, v in redshift_types.items()])[:-2]
-    primary_keys_str: str = f",\nPRIMARY KEY ({', '.join(primary_keys)})" if primary_keys else ""
+    primary_keys_str: str = (
+        ",\nPRIMARY KEY ({})".format(", ".join('"' + pk + '"' for pk in primary_keys)) if primary_keys else ""
+    )
     distkey_str: str = f"\nDISTKEY({distkey})" if distkey and diststyle == "KEY" else ""
     sortkey_str: str = f"\n{sortstyle} SORTKEY({','.join(sortkey)})" if sortkey else ""
     sql = (
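The rewritten primary_keys_str wraps every key in double quotes, producing delimited identifiers so that column names containing hyphens (exercised by the new test below) survive the generated CREATE TABLE. The expression in isolation, run on a sample key list:

primary_keys = ["id-col", "other-col"]
primary_keys_str = (
    ",\nPRIMARY KEY ({})".format(", ".join('"' + pk + '"' for pk in primary_keys)) if primary_keys else ""
)
print(primary_keys_str)
# ,
# PRIMARY KEY ("id-col", "other-col")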
45 changes: 45 additions & 0 deletions tests/unit/test_athena.py
@@ -1516,3 +1516,48 @@ def test_athena_to_iceberg(path, path2, glue_database, glue_table, partition_col
     )
 
     assert df.equals(df_out)
+
+
+def test_to_iceberg_cast(path, path2, glue_table, glue_database):
+    df = pd.DataFrame(
+        {
+            "c0": [
+                datetime.date(4000, 1, 1),
+                datetime.datetime(2000, 1, 1, 10),
+                "2020",
+                "2020-01",
+                1,
+                None,
+                pd.NA,
+                pd.NaT,
+                np.nan,
+                np.inf,
+            ]
+        }
+    )
+    df_expected = pd.DataFrame(
+        {
+            "c0": [
+                datetime.date(1970, 1, 1),
+                datetime.date(2000, 1, 1),
+                datetime.date(2020, 1, 1),
+                datetime.date(2020, 1, 1),
+                datetime.date(4000, 1, 1),
+                None,
+                None,
+                None,
+                None,
+                None,
+            ]
+        }
+    )
+    wr.athena.to_iceberg(
+        df=df,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        dtype={"c0": "date"},
+    )
+    df2 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False)
+    assert pandas_equals(df_expected, df2.sort_values("c0").reset_index(drop=True))
16 changes: 16 additions & 0 deletions tests/unit/test_redshift.py
@@ -17,6 +17,7 @@
 from awswrangler import _utils
 
 from .._utils import (
+    assert_pandas_equals,
     dt,
     ensure_data_types,
     ensure_data_types_category,
@@ -63,6 +64,21 @@ def test_to_sql_simple(redshift_table: str, redshift_con: redshift_connector.Con
     wr.redshift.to_sql(df, redshift_con, redshift_table, "public", "overwrite", overwrite_method, True)
 
 
+def test_to_sql_with_hyphenated_primary_key(
+    redshift_table: str,
+    redshift_con: redshift_connector.Connection,
+) -> None:
+    schema = "public"
+    df = pd.DataFrame({"id-col": [1, 2, 3], "other-col": ["foo", "boo", "bar"]})
+    df["id-col"] = df["id-col"].astype("Int64")
+    df["other-col"] = df["other-col"].astype("string")
+    wr.redshift.to_sql(
+        df=df, con=redshift_con, table=redshift_table, schema=schema, mode="overwrite", primary_keys=["id-col"]
+    )
+    df_out = wr.redshift.read_sql_table(table=redshift_table, con=redshift_con, schema=schema)
+    assert_pandas_equals(df, df_out)
+
+
 def test_empty_table(redshift_table: str, redshift_con: redshift_connector.Connection) -> None:
     with redshift_con.cursor() as cursor:
         cursor.execute(f"CREATE TABLE public.{redshift_table}(c0 integer not null, c1 integer, primary key(c0));")
