From e61f85e87f403e7c010d9ab295514fd4c20248c9 Mon Sep 17 00:00:00 2001 From: Maximilian Speicher Date: Sat, 24 Jul 2021 09:16:27 +0200 Subject: [PATCH 1/2] Use botocore for s3 kwargs --- awswrangler/mysql.py | 10 ++--- awswrangler/s3/_fs.py | 88 +++++++------------------------------------ 2 files changed, 18 insertions(+), 80 deletions(-) diff --git a/awswrangler/mysql.py b/awswrangler/mysql.py index 289375cb4..fefe2c801 100644 --- a/awswrangler/mysql.py +++ b/awswrangler/mysql.py @@ -18,7 +18,7 @@ _logger: logging.Logger = logging.getLogger(__name__) -def _validate_connection(con: pymysql.connections.Connection) -> None: +def _validate_connection(con: "pymysql.connections.Connection[Any]") -> None: if not isinstance(con, pymysql.connections.Connection): raise exceptions.InvalidConnection( "Invalid 'conn' argument, please pass a " @@ -77,7 +77,7 @@ def connect( read_timeout: Optional[int] = None, write_timeout: Optional[int] = None, connect_timeout: int = 10, -) -> pymysql.connections.Connection: +) -> "pymysql.connections.Connection[Any]": """Return a pymysql connection from a Glue Catalog Connection or Secrets Manager. https://pymysql.readthedocs.io @@ -150,7 +150,7 @@ def connect( def read_sql_query( sql: str, - con: pymysql.connections.Connection, + con: "pymysql.connections.Connection[Any]", index_col: Optional[Union[str, List[str]]] = None, params: Optional[Union[List[Any], Tuple[Any, ...], Dict[Any, Any]]] = None, chunksize: Optional[int] = None, @@ -206,7 +206,7 @@ def read_sql_query( def read_sql_table( table: str, - con: pymysql.connections.Connection, + con: "pymysql.connections.Connection[Any]", schema: Optional[str] = None, index_col: Optional[Union[str, List[str]]] = None, params: Optional[Union[List[Any], Tuple[Any, ...], Dict[Any, Any]]] = None, @@ -268,7 +268,7 @@ def read_sql_table( @apply_configs def to_sql( df: pd.DataFrame, - con: pymysql.connections.Connection, + con: "pymysql.connections.Connection[Any]", table: str, schema: str, mode: str = "append", diff --git a/awswrangler/s3/_fs.py b/awswrangler/s3/_fs.py index 32c759e93..183ff7f30 100644 --- a/awswrangler/s3/_fs.py +++ b/awswrangler/s3/_fs.py @@ -8,10 +8,12 @@ import socket from contextlib import contextmanager from errno import ESPIPE -from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union, cast +from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Union, cast import boto3 from botocore.exceptions import ReadTimeoutError +from botocore.loaders import Loader +from botocore.model import ServiceModel from awswrangler import _utils, exceptions from awswrangler._config import apply_configs @@ -24,84 +26,20 @@ _MIN_WRITE_BLOCK: int = 5_242_880 # 5 MB (5 * 2**20) _MIN_PARALLEL_READ_BLOCK: int = 5_242_880 # 5 MB (5 * 2**20) -BOTOCORE_ACCEPTED_KWARGS: Dict[str, Set[str]] = { - "get_object": { - "SSECustomerAlgorithm", - "SSECustomerKey", - "RequestPayer", - "ExpectedBucketOwner", - "VersionId", - }, - "copy_object": { - "ACL", - "Metadata", - "ServerSideEncryption", - "StorageClass", - "SSECustomerAlgorithm", - "SSECustomerKey", - "SSEKMSKeyId", - "SSEKMSEncryptionContext", - "Tagging", - "RequestPayer", - "ExpectedBucketOwner", - "CopySource", - }, - "create_multipart_upload": { - "ACL", - "Metadata", - "ServerSideEncryption", - "StorageClass", - "SSECustomerAlgorithm", - "SSECustomerKey", - "SSEKMSKeyId", - "SSEKMSEncryptionContext", - "Tagging", - "RequestPayer", - "ExpectedBucketOwner", - }, - "upload_part": { - "SSECustomerAlgorithm", - "SSECustomerKey", - "RequestPayer", - "ExpectedBucketOwner", - }, - "complete_multipart_upload": { - "RequestPayer", - "ExpectedBucketOwner", - }, - "put_object": { - "ACL", - "Metadata", - "ServerSideEncryption", - "StorageClass", - "SSECustomerAlgorithm", - "SSECustomerKey", - "SSEKMSKeyId", - "SSEKMSEncryptionContext", - "Tagging", - "RequestPayer", - "ExpectedBucketOwner", - }, - "list_objects_v2": { - "RequestPayer", - "ExpectedBucketOwner", - }, - "delete_objects": { - "RequestPayer", - "ExpectedBucketOwner", - "Objects", - }, - "head_object": { - "RequestPayer", - "ExpectedBucketOwner", - "VersionId", - }, -} +_BOTOCORE_LOADER = Loader() +_S3_JSON_MODEL = _BOTOCORE_LOADER.load_service_model(service_name="s3", type_name="service-2") +_S3_SERVICE_MODEL = ServiceModel(_S3_JSON_MODEL, service_name="s3") + + +def _snake_to_camel_case(s: str) -> str: + return "".join(c.title() for c in s.split("_")) def get_botocore_valid_kwargs(function_name: str, s3_additional_kwargs: Dict[str, Any]) -> Dict[str, Any]: """Filter and keep only the valid botocore key arguments.""" - return {k: v for k, v in s3_additional_kwargs.items() if k in BOTOCORE_ACCEPTED_KWARGS[function_name]} + s3_operation_model = _S3_SERVICE_MODEL.operation_model(_snake_to_camel_case(function_name)) + allowed_kwargs = s3_operation_model.input_shape.members.keys() # pylint: disable=E1101 + return {k: v for k, v in s3_additional_kwargs.items() if k in allowed_kwargs} def _fetch_range( From 01aa506ffd02da56b0d5fb928fca9b623c96cc10 Mon Sep 17 00:00:00 2001 From: Maximilian Speicher Date: Mon, 26 Jul 2021 09:43:24 +0200 Subject: [PATCH 2/2] Remove docs on valid s3 kwargs --- awswrangler/athena/_read.py | 4 ++-- awswrangler/athena/_utils.py | 4 ++-- awswrangler/redshift.py | 8 ++------ awswrangler/s3/_copy.py | 8 ++------ awswrangler/s3/_delete.py | 2 +- awswrangler/s3/_describe.py | 4 ++-- awswrangler/s3/_list.py | 8 +++++--- awswrangler/s3/_write_excel.py | 4 +--- awswrangler/s3/_write_parquet.py | 8 ++------ awswrangler/s3/_write_text.py | 8 ++------ 10 files changed, 21 insertions(+), 37 deletions(-) diff --git a/awswrangler/athena/_read.py b/awswrangler/athena/_read.py index dc08d2504..1229bba88 100644 --- a/awswrangler/athena/_read.py +++ b/awswrangler/athena/_read.py @@ -779,7 +779,7 @@ def read_sql_query( The dict needs to contain the information in the form {'name': 'value'} and the SQL query needs to contain `:name;`. Note that for varchar columns and similar, you must surround the value in single quotes. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} Returns @@ -1043,7 +1043,7 @@ def read_sql_table( data_source : str, optional Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} Returns diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index edb88d2bf..a9a17acc9 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -595,7 +595,7 @@ def describe_table( kms_key : str, optional For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -666,7 +666,7 @@ def show_create_table( kms_key : str, optional For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. diff --git a/awswrangler/redshift.py b/awswrangler/redshift.py index 3ab21d19d..b4e55eadd 100644 --- a/awswrangler/redshift.py +++ b/awswrangler/redshift.py @@ -1253,9 +1253,7 @@ def copy_from_files( # pylint: disable=too-many-locals,too-many-arguments boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs: - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} Returns @@ -1452,9 +1450,7 @@ def copy( # pylint: disable=too-many-arguments boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs: - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} max_rows_by_file : int Max number of rows in each file. diff --git a/awswrangler/s3/_copy.py b/awswrangler/s3/_copy.py index c7307cef7..ef983f2e0 100644 --- a/awswrangler/s3/_copy.py +++ b/awswrangler/s3/_copy.py @@ -85,9 +85,7 @@ def merge_datasets( boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} Returns @@ -189,9 +187,7 @@ def copy_objects( boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} Returns diff --git a/awswrangler/s3/_delete.py b/awswrangler/s3/_delete.py index 4d84666a8..bf9ee3ca8 100644 --- a/awswrangler/s3/_delete.py +++ b/awswrangler/s3/_delete.py @@ -121,7 +121,7 @@ def delete_objects( Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. diff --git a/awswrangler/s3/_describe.py b/awswrangler/s3/_describe.py index 9a28d8f64..a4670bb4b 100644 --- a/awswrangler/s3/_describe.py +++ b/awswrangler/s3/_describe.py @@ -100,7 +100,7 @@ def describe_objects( Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -197,7 +197,7 @@ def size_objects( True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. diff --git a/awswrangler/s3/_list.py b/awswrangler/s3/_list.py index 43e09eab9..1232c67ef 100644 --- a/awswrangler/s3/_list.py +++ b/awswrangler/s3/_list.py @@ -149,10 +149,12 @@ def does_object_exist( path: str S3 path (e.g. s3://bucket/key). s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. + version_id: str, optional + Specific version of the object that should exist. Returns ------- @@ -216,7 +218,7 @@ def list_directories( path : str S3 path (e.g. s3://bucket/prefix). s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -286,7 +288,7 @@ def list_objects( ignore_empty: bool Ignore files with 0 bytes. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. diff --git a/awswrangler/s3/_write_excel.py b/awswrangler/s3/_write_excel.py index af3b7fd4a..b6910a5c5 100644 --- a/awswrangler/s3/_write_excel.py +++ b/awswrangler/s3/_write_excel.py @@ -46,9 +46,7 @@ def to_excel( boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} use_threads : bool True to enable concurrent requests, False to disable multiple threads. diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py index 520e961b1..3f61e582b 100644 --- a/awswrangler/s3/_write_parquet.py +++ b/awswrangler/s3/_write_parquet.py @@ -256,9 +256,7 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} sanitize_columns : bool True to sanitize columns names (using `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`) @@ -743,9 +741,7 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html (e.g. {'col_name': '1', 'col2_name': '2'}) s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. diff --git a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py index 619ebb617..3d352ca06 100644 --- a/awswrangler/s3/_write_text.py +++ b/awswrangler/s3/_write_text.py @@ -152,9 +152,7 @@ def to_csv( # pylint: disable=too-many-arguments,too-many-locals,too-many-state boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} sanitize_columns : bool True to sanitize columns names or False to keep it as is. @@ -601,9 +599,7 @@ def to_json( boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. s3_additional_kwargs : Optional[Dict[str, Any]] - Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass", - "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging", - "RequestPayer", "ExpectedBucketOwner". + Forwarded to botocore requests. e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'} use_threads : bool True to enable concurrent requests, False to disable multiple threads.