diff --git a/README.md b/README.md
index 03fa723df..de1f6e5b4 100644
--- a/README.md
+++ b/README.md
@@ -242,6 +242,7 @@ If you would like us to include your company’s name and/or logo in the README
 - [Infomach](https://www.infomach.com.br/)
 - [Informa Markets](https://www.informamarkets.com/en/home.html) [[@mateusmorato]](http://github.com/mateusmorato)
 - [LINE TV](https://www.linetv.tw/) [[@bryanyang0528](https://github.com/bryanyang0528)]
+- [LogicalCube](https://www.logicalcube.com) [[@zolabud](https://github.com/zolabud)]
 - [Magnataur](https://magnataur.com) [[@brianmingus2](https://github.com/brianmingus2)]
 - [M4U](https://www.m4u.com.br/) [[@Thiago-Dantas](https://github.com/Thiago-Dantas)]
 - [NBCUniversal](https://www.nbcuniversal.com/) [[@vibe](https://github.com/vibe)]
diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py
index cbee27385..a0e500917 100644
--- a/awswrangler/athena/_write_iceberg.py
+++ b/awswrangler/athena/_write_iceberg.py
@@ -33,11 +33,12 @@ def _create_iceberg_table(
     encryption: Optional[str] = None,
     kms_key: Optional[str] = None,
     boto3_session: Optional[boto3.Session] = None,
+    dtype: Optional[Dict[str, str]] = None,
 ) -> None:
     if not path:
         raise exceptions.InvalidArgumentValue("Must specify table location to create the table.")
 
-    columns_types, _ = catalog.extract_athena_types(df=df, index=index)
+    columns_types, _ = catalog.extract_athena_types(df=df, index=index, dtype=dtype)
     cols_str: str = ", ".join([f"{k} {v}" for k, v in columns_types.items()])
     partition_cols_str: str = f"PARTITIONED BY ({', '.join([col for col in partition_cols])})" if partition_cols else ""
     table_properties_str: str = (
@@ -86,6 +87,7 @@ def to_iceberg(
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
     additional_table_properties: Optional[Dict[str, Any]] = None,
+    dtype: Optional[Dict[str, str]] = None,
 ) -> None:
     """
     Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -133,6 +135,10 @@ def to_iceberg(
         e.g. additional_table_properties={'write_target_data_file_size_bytes': '536870912'}
         https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-table-properties
 
+    dtype: Optional[Dict[str, str]]
+        Dictionary of column names and Athena/Glue types to be cast.
+        Useful when you have columns with undetermined or mixed data types.
+        e.g. {'col name': 'bigint', 'col2 name': 'int'}
 
     Returns
     -------
@@ -192,6 +198,7 @@ def to_iceberg(
             encryption=encryption,
             kms_key=kms_key,
             boto3_session=boto3_session,
+            dtype=dtype,
         )
 
     # Create temporary external table, write the results
@@ -203,6 +210,7 @@ def to_iceberg(
         table=temp_table,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
+        dtype=dtype,
     )
 
     # Insert into iceberg table
diff --git a/awswrangler/cleanrooms/_read.py b/awswrangler/cleanrooms/_read.py
index afea1bcec..71a255601 100644
--- a/awswrangler/cleanrooms/_read.py
+++ b/awswrangler/cleanrooms/_read.py
@@ -101,9 +101,9 @@ def read_sql_query(
     )["protectedQuery"]["id"]
     _logger.debug("query_id: %s", query_id)
 
-    path: str = wait_query(membership_id=membership_id, query_id=query_id)["protectedQuery"]["result"]["output"]["s3"][
-        "location"
-    ]
+    path: str = wait_query(membership_id=membership_id, query_id=query_id, boto3_session=boto3_session)[
+        "protectedQuery"
+    ]["result"]["output"]["s3"]["location"]
     _logger.debug("path: %s", path)
 
     chunked: Union[bool, int] = False if chunksize is None else chunksize
diff --git a/awswrangler/redshift/_utils.py b/awswrangler/redshift/_utils.py
index 0c6c39ad5..9d38f18d0 100644
--- a/awswrangler/redshift/_utils.py
+++ b/awswrangler/redshift/_utils.py
@@ -349,7 +349,9 @@ def _create_table(  # pylint: disable=too-many-locals,too-many-arguments,too-man
         primary_keys=primary_keys,
     )
     cols_str: str = "".join([f'"{k}" {v},\n' for k, v in redshift_types.items()])[:-2]
-    primary_keys_str: str = f",\nPRIMARY KEY ({', '.join(primary_keys)})" if primary_keys else ""
+    primary_keys_str: str = (
+        ",\nPRIMARY KEY ({})".format(", ".join('"' + pk + '"' for pk in primary_keys)) if primary_keys else ""
+    )
     distkey_str: str = f"\nDISTKEY({distkey})" if distkey and diststyle == "KEY" else ""
     sortkey_str: str = f"\n{sortstyle} SORTKEY({','.join(sortkey)})" if sortkey else ""
     sql = (
diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py
index 8153ec22f..8c8ad8886 100644
--- a/tests/unit/test_athena.py
+++ b/tests/unit/test_athena.py
@@ -1516,3 +1516,48 @@ def test_athena_to_iceberg(path, path2, glue_database, glue_table, partition_col
     )
 
     assert df.equals(df_out)
+
+
+def test_to_iceberg_cast(path, path2, glue_table, glue_database):
+    df = pd.DataFrame(
+        {
+            "c0": [
+                datetime.date(4000, 1, 1),
+                datetime.datetime(2000, 1, 1, 10),
+                "2020",
+                "2020-01",
+                1,
+                None,
+                pd.NA,
+                pd.NaT,
+                np.nan,
+                np.inf,
+            ]
+        }
+    )
+    df_expected = pd.DataFrame(
+        {
+            "c0": [
+                datetime.date(1970, 1, 1),
+                datetime.date(2000, 1, 1),
+                datetime.date(2020, 1, 1),
+                datetime.date(2020, 1, 1),
+                datetime.date(4000, 1, 1),
+                None,
+                None,
+                None,
+                None,
+                None,
+            ]
+        }
+    )
+    wr.athena.to_iceberg(
+        df=df,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        dtype={"c0": "date"},
+    )
+    df2 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False)
+    assert pandas_equals(df_expected, df2.sort_values("c0").reset_index(drop=True))
diff --git a/tests/unit/test_redshift.py b/tests/unit/test_redshift.py
index 07131add4..39ba0b0ff 100644
--- a/tests/unit/test_redshift.py
+++ b/tests/unit/test_redshift.py
@@ -17,6 +17,7 @@ from awswrangler import _utils
 
 from .._utils import (
+    assert_pandas_equals,
     dt,
     ensure_data_types,
     ensure_data_types_category,
@@ -63,6 +64,21 @@ def test_to_sql_simple(redshift_table: str, redshift_con: redshift_connector.Con
     wr.redshift.to_sql(df, redshift_con, redshift_table, "public", "overwrite", overwrite_method, True)
 
 
+def test_to_sql_with_hyphenated_primary_key(
+    redshift_table: str,
+    redshift_con: redshift_connector.Connection,
+) -> None:
+    schema = "public"
+    df = pd.DataFrame({"id-col": [1, 2, 3], "other-col": ["foo", "boo", "bar"]})
+    df["id-col"] = df["id-col"].astype("Int64")
+    df["other-col"] = df["other-col"].astype("string")
+    wr.redshift.to_sql(
+        df=df, con=redshift_con, table=redshift_table, schema=schema, mode="overwrite", primary_keys=["id-col"]
+    )
+    df_out = wr.redshift.read_sql_table(table=redshift_table, con=redshift_con, schema=schema)
+    assert_pandas_equals(df, df_out)
+
+
 def test_empty_table(redshift_table: str, redshift_con: redshift_connector.Connection) -> None:
     with redshift_con.cursor() as cursor:
         cursor.execute(f"CREATE TABLE public.{redshift_table}(c0 integer not null, c1 integer, primary key(c0));")
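Below is a minimal usage sketch of the new `dtype` argument to `wr.athena.to_iceberg`, mirroring `test_to_iceberg_cast` above. The S3 paths and Glue database/table names are placeholders, not values from the patch.

```python
import datetime

import pandas as pd

import awswrangler as wr

# A column holding mixed values is inferred as "object"; dtype pins the
# Athena/Glue type used when the Iceberg table is created instead of
# relying on type inference over the mixed values.
df = pd.DataFrame({"c0": [datetime.date(2020, 1, 1), "2020-01", None]})

wr.athena.to_iceberg(
    df=df,
    database="my_database",  # placeholder Glue database
    table="my_table",  # placeholder Glue table
    table_location="s3://my-bucket/my_table/",  # placeholder S3 location
    temp_path="s3://my-bucket/temp/",  # placeholder S3 staging path
    dtype={"c0": "date"},  # cast the mixed column to an Athena DATE
)
```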
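And a sketch of the behavior covered by `test_to_sql_with_hyphenated_primary_key`: since `_create_table` now wraps each primary-key column in double quotes, the generated clause becomes `PRIMARY KEY ("id-col")` instead of the previously unquoted `PRIMARY KEY (id-col)`, which is invalid SQL for hyphenated identifiers. The connection name and table name are placeholders.

```python
import pandas as pd

import awswrangler as wr

df = pd.DataFrame({"id-col": [1, 2, 3], "other-col": ["foo", "boo", "bar"]})

con = wr.redshift.connect("my-redshift-connection")  # placeholder Glue connection name
wr.redshift.to_sql(
    df=df,
    con=con,
    table="my_table",  # placeholder table name
    schema="public",
    mode="overwrite",
    primary_keys=["id-col"],  # hyphenated identifiers are now quoted in the DDL
)
con.close()
```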