feat: Redshift UNLOAD - add parallel parameter (#2507)
Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
kukushking authored Oct 31, 2023
1 parent f26e869 commit 80a4b99
Showing 3 changed files with 23 additions and 1 deletion.
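
In practice, the new flag maps directly onto the PARALLEL clause of Redshift's UNLOAD statement. A minimal usage sketch (the Glue connection name, bucket path, and IAM role ARN below are placeholders, not values from this commit):

import awswrangler as wr

# Placeholder Glue Catalog connection; substitute your own cluster.
con = wr.redshift.connect("my-redshift-connection")

# parallel=False emits PARALLEL OFF, so Redshift writes the result set
# serially (honoring any ORDER BY) instead of one file per slice.
wr.redshift.unload_to_files(
    sql="SELECT * FROM public.my_table",
    path="s3://my-bucket/unload/",
    con=con,
    iam_role="arn:aws:iam::111111111111:role/MyRedshiftRole",
    unload_format="PARQUET",
    parallel=False,
)
con.close()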
16 changes: 15 additions & 1 deletion awswrangler/redshift/_read.py
@@ -224,6 +224,7 @@ def unload_to_files(
     aws_session_token: Optional[str] = None,
     region: Optional[str] = None,
     unload_format: Optional[Literal["CSV", "PARQUET"]] = None,
+    parallel: bool = True,
     max_file_size: Optional[float] = None,
     kms_key_id: Optional[str] = None,
     manifest: bool = False,
@@ -265,6 +266,11 @@ def unload_to_files(
     unload_format: str, optional
         Format of the unloaded S3 objects from the query.
         Valid values: "CSV", "PARQUET". Case sensitive. Defaults to PARQUET.
+    parallel: bool
+        Whether to unload to multiple files in parallel. Defaults to True.
+        By default, UNLOAD writes data in parallel to multiple files, according to the number of
+        slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+        sorted absolutely according to the ORDER BY clause, if one is used.
     max_file_size : float, optional
         Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
         Specify a decimal value between 5.0 MB and 6200.0 MB. If None, the default
@@ -305,6 +311,7 @@ def unload_to_files(
     partition_str: str = f"\nPARTITION BY ({','.join(partition_cols)})" if partition_cols else ""
     manifest_str: str = "\nmanifest" if manifest is True else ""
     region_str: str = f"\nREGION AS '{region}'" if region is not None else ""
+    parallel_str: str = "\nPARALLEL ON" if parallel else "\nPARALLEL OFF"
     if not max_file_size and engine.get() == EngineEnum.RAY:
         _logger.warning(
             "Unload `MAXFILESIZE` is not specified. "
@@ -330,7 +337,7 @@ def unload_to_files(
             f"TO '{path}'\n"
             f"{auth_str}"
             "ALLOWOVERWRITE\n"
-            "PARALLEL ON\n"
+            f"{parallel_str}\n"
             f"FORMAT {format_str}\n"
             "ENCRYPTED"
             f"{kms_key_id_str}"
@@ -362,6 +369,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
     chunked: Union[bool, int] = False,
     keep_files: bool = False,
+    parallel: bool = True,
     use_threads: Union[bool, int] = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
@@ -433,6 +441,11 @@ def unload(
         used to encrypt data files on Amazon S3.
     keep_files : bool
         Should keep stage files?
+    parallel: bool
+        Whether to unload to multiple files in parallel. Defaults to True.
+        By default, UNLOAD writes data in parallel to multiple files, according to the number of
+        slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+        sorted absolutely according to the ORDER BY clause, if one is used.
     dtype_backend: str, optional
         Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
         nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -487,6 +500,7 @@ def unload(
         max_file_size=max_file_size,
         kms_key_id=kms_key_id,
         manifest=False,
+        parallel=parallel,
         boto3_session=boto3_session,
     )
     if chunked is False:
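
Because unload() simply forwards the flag to unload_to_files() before reading the staged files back, the same toggle works when unloading straight into a DataFrame; a sketch reusing the placeholders from the example above:

# keep_files=False removes the staged S3 files after they are read back.
df = wr.redshift.unload(
    sql="SELECT * FROM public.my_table",
    con=con,
    iam_role="arn:aws:iam::111111111111:role/MyRedshiftRole",
    path="s3://my-bucket/unload/",
    keep_files=False,
    parallel=False,
)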
5 changes: 5 additions & 0 deletions awswrangler/redshift/_read.pyi
@@ -100,6 +100,7 @@ def unload_to_files(
     aws_session_token: Optional[str] = ...,
     region: Optional[str] = ...,
     unload_format: Optional[Literal["CSV", "PARQUET"]] = ...,
+    parallel: bool = ...,
     max_file_size: Optional[float] = ...,
     kms_key_id: Optional[str] = ...,
     manifest: bool = ...,
@@ -121,6 +122,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Literal[False] = ...,
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -142,6 +144,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Literal[True],
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -163,6 +166,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: bool,
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -184,6 +188,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Union[bool, int],
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
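
The stub restates parallel: bool = ... in all four overloads because Python overloads do not share parameters: each @overload variant must spell out the complete signature. A minimal illustration of the pattern (not the library's actual stub):

from typing import Iterator, Literal, overload

import pandas as pd

@overload
def unload(chunked: Literal[False] = ..., parallel: bool = ...) -> pd.DataFrame: ...
@overload
def unload(chunked: Literal[True], parallel: bool = ...) -> Iterator[pd.DataFrame]: ...
def unload(chunked: bool = False, parallel: bool = True):
    ...  # implementation dispatches on chunked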
3 changes: 3 additions & 0 deletions tests/unit/test_redshift.py
@@ -649,13 +649,15 @@ def test_spectrum_decimal_cast(
     [None, {"ServerSideEncryption": "AES256"}, {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": None}],
 )
 @pytest.mark.parametrize("use_threads", [True, False])
+@pytest.mark.parametrize("parallel", [True, False])
 def test_copy_unload_kms(
     path: str,
     redshift_table: str,
     redshift_con: redshift_connector.Connection,
     databases_parameters: Dict[str, Any],
     kms_key_id: str,
     use_threads: bool,
+    parallel: bool,
     s3_additional_kwargs: Optional[Dict[str, Any]],
 ) -> None:
     df = pd.DataFrame({"id": [1, 2, 3]})
@@ -678,6 +680,7 @@ def test_copy_unload_kms(
         iam_role=databases_parameters["redshift"]["role"],
         path=path,
         keep_files=False,
+        parallel=parallel,
         use_threads=use_threads,
         s3_additional_kwargs=s3_additional_kwargs,
