From 2d835c133e4ea285874ede34e474c4707d03d02c Mon Sep 17 00:00:00 2001 From: "Bala.FA" Date: Wed, 17 Jul 2024 12:00:09 +0530 Subject: [PATCH] Error out for invalid object name with '.' and '..' Signed-off-by: Bala.FA --- minio/api.py | 36 ++++++++++++++++++------------------ minio/helpers.py | 10 ++++++++++ 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/minio/api.py b/minio/api.py index 56af05781..025834044 100644 --- a/minio/api.py +++ b/minio/api.py @@ -65,7 +65,7 @@ from .helpers import (_DEFAULT_USER_AGENT, MAX_MULTIPART_COUNT, MAX_MULTIPART_OBJECT_SIZE, MAX_PART_SIZE, MIN_PART_SIZE, BaseURL, DictType, ObjectWriteResult, ProgressType, - ThreadPool, check_bucket_name, check_non_empty_string, + ThreadPool, check_bucket_name, check_object_name, check_sse, check_ssec, genheaders, get_part_info, headers_to_strings, is_valid_policy_type, makedirs, md5sum_hash, queryencode, read_part_data, sha256_hash) @@ -594,7 +594,7 @@ def select_object_content( print(result.stats()) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if not isinstance(request, SelectRequest): raise ValueError("request must be SelectRequest type") body = marshal(request) @@ -1107,7 +1107,7 @@ def fget_object( ) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if os.path.isdir(file_path): raise ValueError(f"file {file_path} is a directory") @@ -1227,7 +1227,7 @@ def get_object( response.release_conn() """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) check_ssec(ssec) headers = cast(DictType, ssec.headers() if ssec else {}) @@ -1315,7 +1315,7 @@ def copy_object( print(result.object_name, result.version_id) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if not isinstance(source, CopySource): raise ValueError("source must be CopySource type") check_sse(sse) @@ -1535,7 +1535,7 @@ def compose_object( print(result.object_name, result.version_id) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if not isinstance(sources, (list, tuple)) or not sources: raise ValueError("sources must be non-empty list or tuple type") i = 0 @@ -1808,7 +1808,7 @@ def put_object( ) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) check_sse(sse) if tags is not None and not isinstance(tags, Tags): raise ValueError("tags must be Tags type") @@ -2031,7 +2031,7 @@ def stat_object( """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) check_ssec(ssec) headers = cast(DictType, ssec.headers() if ssec else {}) @@ -2089,7 +2089,7 @@ def remove_object( ) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) self._execute( "DELETE", bucket_name, @@ -2252,7 +2252,7 @@ def get_presigned_url( print(url) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if expires.total_seconds() < 1 or expires.total_seconds() > 604800: raise ValueError("expires must be between 1 second to 7 days") @@ -2636,7 +2636,7 @@ def delete_object_tags( client.delete_object_tags("my-bucket", "my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) query_params = {"versionId": version_id} if version_id else {} query_params["tagging"] = "" self._execute( @@ -2664,7 +2664,7 @@ def get_object_tags( tags = client.get_object_tags("my-bucket", "my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) query_params = {"versionId": version_id} if version_id else {} query_params["tagging"] = "" try: @@ -2703,7 +2703,7 @@ def set_object_tags( client.set_object_tags("my-bucket", "my-object", tags) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if not isinstance(tags, Tags): raise ValueError("tags must be Tags type") body = marshal(Tagging(tags)) @@ -2735,7 +2735,7 @@ def enable_object_legal_hold( client.enable_object_legal_hold("my-bucket", "my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) body = marshal(LegalHold(True)) query_params = {"versionId": version_id} if version_id else {} query_params["legal-hold"] = "" @@ -2765,7 +2765,7 @@ def disable_object_legal_hold( client.disable_object_legal_hold("my-bucket", "my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) body = marshal(LegalHold(False)) query_params = {"versionId": version_id} if version_id else {} query_params["legal-hold"] = "" @@ -2798,7 +2798,7 @@ def is_object_legal_hold_enabled( print("legal hold is not enabled on my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) query_params = {"versionId": version_id} if version_id else {} query_params["legal-hold"] = "" try: @@ -2889,7 +2889,7 @@ def get_object_retention( config = client.get_object_retention("my-bucket", "my-object") """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) query_params = {"versionId": version_id} if version_id else {} query_params["retention"] = "" try: @@ -2927,7 +2927,7 @@ def set_object_retention( client.set_object_retention("my-bucket", "my-object", config) """ check_bucket_name(bucket_name, s3_check=self._base_url.is_aws_host) - check_non_empty_string(object_name) + check_object_name(object_name) if not isinstance(config, Retention): raise ValueError("config must be Retention type") body = marshal(config) diff --git a/minio/helpers.py b/minio/helpers.py index c7f6c9010..2fb11ea04 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -267,6 +267,16 @@ def check_non_empty_string(string: str | bytes): raise TypeError() from exc +def check_object_name(object_name: str): + """Check whether given object name is valid.""" + check_non_empty_string(object_name) + tokens = object_name.split("/") + if "." in tokens or ".." in tokens: + raise ValueError( + "object name with '.' or '..' path segment is not supported", + ) + + def is_valid_policy_type(policy: str | bytes): """ Validate if policy is type str