From c37956b494a535429ff881171db0b4f6db1dc547 Mon Sep 17 00:00:00 2001 From: Piotr Borowski <44824791+pbrw@users.noreply.github.com> Date: Wed, 1 Nov 2023 11:29:52 +0100 Subject: [PATCH] Refactor MinioAdminClient using HTTP client (#1291) Signed-off-by: Bala.FA Co-authored-by: Bala FA --- .github/workflows/ci.yml | 2 +- Makefile | 2 +- minio/__init__.py | 2 +- minio/api.py | 17 +- minio/crypto.py | 146 ++++++++ minio/datatypes.py | 244 ++++++++++++++ minio/error.py | 14 + minio/helpers.py | 75 +++-- minio/minioadmin.py | 683 +++++++++++++++++++++++++++++--------- setup.py | 2 +- tests/unit/crypto_test.py | 35 ++ 11 files changed, 1011 insertions(+), 211 deletions(-) create mode 100644 minio/crypto.py create mode 100644 tests/unit/crypto_test.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 33b636bb3..328209484 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,7 +45,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip setuptools - pip install certifi urllib3 mock pytest + pip install certifi urllib3 mock pytest argon2-cffi pycryptodome - name: Run check if Ubuntu if: matrix.os == 'ubuntu-latest' run: | diff --git a/Makefile b/Makefile index ee466b8f5..fc1375653 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ default: tests getdeps: @echo "Installing required dependencies" - @pip install --user --upgrade autopep8 certifi pytest pylint urllib3 + @pip install --user --upgrade autopep8 certifi pytest pylint urllib3 argon2-cffi pycryptodome check: getdeps @echo "Running checks" diff --git a/minio/__init__.py b/minio/__init__.py index b64eda3e6..d7cc1dbe3 100644 --- a/minio/__init__.py +++ b/minio/__init__.py @@ -33,7 +33,7 @@ __title__ = "minio-py" __author__ = "MinIO, Inc." -__version__ = "7.1.18" +__version__ = "7.2.0" __license__ = "Apache 2.0" __copyright__ = "Copyright 2015, 2016, 2017, 2018, 2019, 2020 MinIO, Inc." diff --git a/minio/api.py b/minio/api.py index 56692595d..2dcffa854 100644 --- a/minio/api.py +++ b/minio/api.py @@ -25,7 +25,6 @@ import itertools import os -import platform import tarfile from datetime import timedelta from io import BytesIO @@ -50,12 +49,13 @@ parse_copy_object, parse_list_objects) from .deleteobjects import DeleteError, DeleteRequest, DeleteResult from .error import InvalidResponseError, S3Error, ServerError -from .helpers import (MAX_MULTIPART_COUNT, MAX_MULTIPART_OBJECT_SIZE, - MAX_PART_SIZE, MIN_PART_SIZE, BaseURL, ObjectWriteResult, - ThreadPool, check_bucket_name, check_non_empty_string, - check_sse, check_ssec, genheaders, get_part_info, +from .helpers import (_DEFAULT_USER_AGENT, MAX_MULTIPART_COUNT, + MAX_MULTIPART_OBJECT_SIZE, MAX_PART_SIZE, MIN_PART_SIZE, + BaseURL, ObjectWriteResult, ThreadPool, + check_bucket_name, check_non_empty_string, check_sse, + check_ssec, genheaders, get_part_info, headers_to_strings, is_valid_policy_type, makedirs, - md5sum_hash, read_part_data, sha256_hash, queryencode) + md5sum_hash, queryencode, read_part_data, sha256_hash) from .legalhold import LegalHold from .lifecycleconfig import LifecycleConfig from .notificationconfig import NotificationConfig @@ -70,11 +70,6 @@ from .versioningconfig import VersioningConfig from .xml import Element, SubElement, findtext, getbytes, marshal, unmarshal -_DEFAULT_USER_AGENT = ( - f"MinIO ({platform.system()}; {platform.machine()}) " - f"{__title__}/{__version__}" -) - class Minio: # pylint: disable=too-many-public-methods """ diff --git a/minio/crypto.py b/minio/crypto.py new file mode 100644 index 000000000..cafdad96d --- /dev/null +++ b/minio/crypto.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) +# 2015, 2016, 2017 MinIO, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-lines,disable=too-many-branches,too-many-statements +# pylint: disable=too-many-arguments + +"""Cryptography to read and write encrypted MinIO Admin payload""" + +import os + +from argon2.low_level import Type, hash_secret_raw +from Crypto.Cipher import AES, ChaCha20_Poly1305 + +_NONCE_LEN = 8 +_SALT_LEN = 32 + + +class AesGcmCipherProvider: + """AES-GCM cipher provider""" + @staticmethod + def get_cipher(key: bytes, nonce: bytes): + """Get cipher""" + return AES.new(key, AES.MODE_GCM, nonce) + + +class ChaCha20Poly1305CipherProvider: + """ChaCha20Poly1305 cipher provider""" + @staticmethod + def get_cipher(key: bytes, nonce: bytes): + """Get cipher""" + return ChaCha20_Poly1305.new(key=key, nonce=nonce) + + +def encrypt(payload: bytes, password: str) -> bytes: + """ + Encrypts data using AES-GCM using a 256-bit Argon2ID key. + To see the original implementation in Go, check out the madmin-go library + (https://github.com/minio/madmin-go/blob/main/encrypt.go#L38) + """ + cipher_provider = AesGcmCipherProvider() + nonce = os.urandom(_NONCE_LEN) + salt = os.urandom(_SALT_LEN) + + padded_nonce = [0] * (_NONCE_LEN + 4) + padded_nonce[:_NONCE_LEN] = nonce + + key = _generate_key(password.encode(), salt) + additional_data = _generate_additional_data( + cipher_provider, key, bytes(padded_nonce)) + + padded_nonce[8] = 0x01 + padded_nonce = bytes(padded_nonce) + + cipher = cipher_provider.get_cipher(key, padded_nonce) + cipher.update(additional_data) + encrypted_data, mac = cipher.encrypt_and_digest(payload) + + payload = salt + payload += bytes([0x00]) + payload += nonce + payload += encrypted_data + payload += mac + + return bytes(payload) + + +def decrypt(payload: bytes, password: str) -> bytes: + """ + Decrypts data using AES-GCM or ChaCha20Poly1305 using a + 256-bit Argon2ID key. To see the original implementation in Go, + check out the madmin-go library + (https://github.com/minio/madmin-go/blob/main/encrypt.go#L38) + """ + pos = 0 + salt = payload[pos:pos+_SALT_LEN] + pos += _SALT_LEN + + cipher_id = payload[pos] + if cipher_id == 0: + cipher_provider = AesGcmCipherProvider() + elif cipher_id == 1: + cipher_provider = ChaCha20Poly1305CipherProvider() + else: + return None + + pos += 1 + + nonce = payload[pos:pos+_NONCE_LEN] + pos += _NONCE_LEN + + encrypted_data = payload[pos:-16] + hmac_tag = payload[-16:] + + key = _generate_key(password.encode(), salt) + + padded_nonce = [0] * 12 + padded_nonce[:_NONCE_LEN] = nonce + + additional_data = _generate_additional_data( + cipher_provider, key, bytes(padded_nonce)) + padded_nonce[8] = 1 + + cipher = cipher_provider.get_cipher(key, bytes(padded_nonce)) + + cipher.update(additional_data) + decrypted_data = cipher.decrypt_and_verify(encrypted_data, hmac_tag) + + return decrypted_data + + +def _generate_additional_data(cipher_provider, key: bytes, + padded_nonce: bytes) -> bytes: + """Generate additional data""" + cipher = cipher_provider.get_cipher(key, padded_nonce) + tag = cipher.digest() + new_tag = [0] * 17 + new_tag[1:] = tag + new_tag[0] = 0x80 + return bytes(new_tag) + + +def _generate_key(password: bytes, salt: bytes) -> bytes: + """Generate 256-bit Argon2ID key""" + return hash_secret_raw( + secret=password, + salt=salt, + time_cost=1, + memory_cost=65536, + parallelism=4, + hash_len=32, + type=Type.ID, + version=19 + ) diff --git a/minio/datatypes.py b/minio/datatypes.py index 8f86bbb39..1d2842674 100644 --- a/minio/datatypes.py +++ b/minio/datatypes.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines + """ Response of ListBuckets, ListObjects, ListObjectsV2 and ListObjectVersions API. """ @@ -24,6 +26,7 @@ import datetime import json from collections import OrderedDict +from enum import Enum from urllib.parse import unquote_plus from xml.etree import ElementTree as ET @@ -850,3 +853,244 @@ def __enter__(self): def __exit__(self, exc_type, value, traceback): self._close_response() + + +class PeerSite: + """Represents a cluster/site to be added to the set of replicated sites.""" + + def __init__(self, name, endpoint, access_key, secret_key): + self._name = name + self._endpoint = endpoint + self._access_key = access_key + self._secret_key = secret_key + + def to_dict(self): + """Convert to dictionary.""" + return { + "name": self._name, + "endpoints": self._endpoint, + "accessKey": self._access_key, + "secretKey": self._secret_key, + } + + +class SiteReplicationStatusOptions: + """Represents site replication status options.""" + ENTITY_TYPE = Enum( + "ENTITY_TYPE", + { + "BUCKET": "bucket", + "POLICY": "policy", + "USER": "user", + "GROUP": "group", + }, + ) + + def __init__(self): + self._buckets = False + self._policies = False + self._users = False + self._groups = False + self._metrics = False + self._entity = None + self._entity_value = None + self._show_deleted = False + + @property + def buckets(self): + """Get buckets.""" + return self._buckets + + @buckets.setter + def buckets(self, value): + """Set buckets.""" + self._buckets = value + + @property + def policies(self): + """Get policies.""" + return self._policies + + @policies.setter + def policies(self, value): + """Set policies.""" + self._policies = value + + @property + def users(self): + """Get users.""" + return self._users + + @users.setter + def users(self, value): + """Set users.""" + self._users = value + + @property + def groups(self): + """Get groups.""" + return self._groups + + @groups.setter + def groups(self, value): + """Set groups.""" + self._groups = value + + @property + def metrics(self): + """Get metrics.""" + return self._metrics + + @metrics.setter + def metrics(self, value): + """Set metrics.""" + self._metrics = value + + @property + def entity(self): + """Get entity.""" + return self._entity + + @entity.setter + def entity(self, value): + """Set entity.""" + self._entity = value + + @property + def entity_value(self): + """Get entity value.""" + return self._entity_value + + @entity_value.setter + def entity_value(self, value): + """Set entity value.""" + self._entity_value = value + + @property + def show_deleted(self): + """Get show deleted.""" + return self._show_deleted + + @show_deleted.setter + def show_deleted(self, value): + """Set show deleted.""" + self._show_deleted = value + + def to_query_params(self): + """Convert this options to query parameters.""" + params = { + "buckets": str(self._buckets).lower(), + "policies": str(self._policies).lower(), + "users": str(self._users).lower(), + "groups": str(self._groups).lower(), + "metrics": str(self._metrics).lower(), + "showDeleted": str(self._show_deleted).lower(), + } + if self._entity and self._entity_value: + params["entityvalue"] = self._entity_value + params["entity"] = self._entity.value + return params + + +class PeerInfo: + """Site replication peer information.""" + + def __init__(self, deployment_id, endpoint, bucket_bandwidth_limit, + bucket_bandwidth_set): + self._deployment_id = deployment_id + self._endpoint = endpoint + self._name = None + self._sync_status = None + self._bucket_bandwidth_limit = bucket_bandwidth_limit + self._bucket_bandwidth_set = bucket_bandwidth_set + self._bucket_bandwidth_updated_at = None + + @property + def deployment_id(self): + """Get deployment ID.""" + return self._deployment_id + + @deployment_id.setter + def deployment_id(self, value): + """Set deployment ID.""" + self._deployment_id = value + + @property + def endpoint(self): + """Get endpoint.""" + return self._endpoint + + @endpoint.setter + def endpoint(self, value): + """Set endpoint.""" + self._endpoint = value + + @property + def name(self): + """Get name.""" + return self._name + + @name.setter + def name(self, value): + """Set name.""" + self._name = value + + @property + def sync_status(self): + """Get sync status.""" + return self._sync_status + + @sync_status.setter + def sync_status(self, value): + """Set sync status.""" + self._sync_status = value + + @property + def bucket_bandwidth_limit(self): + """Get bucket bandwidth limit.""" + return self._bucket_bandwidth_limit + + @bucket_bandwidth_limit.setter + def bucket_bandwidth_limit(self, value): + """Set bucket bandwidth limit.""" + self._bucket_bandwidth_limit = value + + @property + def bucket_bandwidth_set(self): + """Get bucket bandwidth set.""" + return self._bucket_bandwidth_set + + @bucket_bandwidth_set.setter + def bucket_bandwidth_set(self, value): + """Set bucket bandwidth set.""" + self._bucket_bandwidth_set = value + + @property + def bucket_bandwidth_updated_at(self): + """Get bucket bandwidth updated at.""" + return self._bucket_bandwidth_updated_at + + @bucket_bandwidth_updated_at.setter + def bucket_bandwidth_updated_at(self, value): + """Set bucket bandwidth updated at.""" + self._bucket_bandwidth_updated_at = value + + def to_dict(self): + """Converts peer information to dictionary.""" + data = { + "endpoint": self._endpoint, + "deploymentID": self._deployment_id, + "defaultbandwidth": { + "bandwidthLimitPerBucket": self._bucket_bandwidth_limit, + "set": self._bucket_bandwidth_set, + }, + } + if self._name: + data["name"] = self._name + if self._sync_status is not None: + data["sync"] = "enable" if self._sync_status else "disable" + if self._bucket_bandwidth_updated_at: + data["defaultbandwidth"]["updatedAt"] = to_iso8601utc( + self._bucket_bandwidth_updated_at, + ) + return data diff --git a/minio/error.py b/minio/error.py index f0fa86467..f73691324 100644 --- a/minio/error.py +++ b/minio/error.py @@ -144,3 +144,17 @@ def copy(self, code, message): self._bucket_name, self._object_name, ) + + +class MinioAdminException(Exception): + """Raised to indicate admin API execution error.""" + + def __init__(self, code, body): + self._code = code + self._body = body + super().__init__( + f"admin request failed; Status: {code}, Body: {body}", + ) + + def __reduce__(self): + return type(self), (self._code, self._body) diff --git a/minio/helpers.py b/minio/helpers.py index 3d369855d..a4617dced 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -23,15 +23,22 @@ import hashlib import math import os +import platform import re import urllib.parse from queue import Queue from threading import BoundedSemaphore, Thread +from . import __title__, __version__ from .sse import Sse, SseCustomerKey from .time import to_iso8601utc # Constants +_DEFAULT_USER_AGENT = ( + f"MinIO ({platform.system()}; {platform.machine()}) " + f"{__title__}/{__version__}" +) + MAX_MULTIPART_COUNT = 10000 # 10000 parts MAX_MULTIPART_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 * 1024 # 5TiB MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5GiB @@ -427,53 +434,61 @@ def _get_aws_info(host, https, region): "dualstack": dualstack}, None) -class BaseURL: - """Base URL of S3 endpoint.""" +def _parse_url(endpoint): + """Parse url string.""" - def __init__(self, endpoint, region): - url = urllib.parse.urlsplit(endpoint) - host = url.hostname + url = urllib.parse.urlsplit(endpoint) + host = url.hostname - if url.scheme.lower() not in ["http", "https"]: - raise ValueError("scheme in endpoint must be http or https") + if url.scheme.lower() not in ["http", "https"]: + raise ValueError("scheme in endpoint must be http or https") - url = url_replace(url, scheme=url.scheme.lower()) + url = url_replace(url, scheme=url.scheme.lower()) - if url.path and url.path != "/": - raise ValueError("path in endpoint is not allowed") + if url.path and url.path != "/": + raise ValueError("path in endpoint is not allowed") - url = url_replace(url, path="") + url = url_replace(url, path="") - if url.query: - raise ValueError("query in endpoint is not allowed") + if url.query: + raise ValueError("query in endpoint is not allowed") - if url.fragment: - raise ValueError("fragment in endpoint is not allowed") + if url.fragment: + raise ValueError("fragment in endpoint is not allowed") - try: - url.port - except ValueError as exc: - raise ValueError("invalid port") from exc + try: + url.port + except ValueError as exc: + raise ValueError("invalid port") from exc - if url.username: - raise ValueError("username in endpoint is not allowed") + if url.username: + raise ValueError("username in endpoint is not allowed") - if url.password: - raise ValueError("password in endpoint is not allowed") + if url.password: + raise ValueError("password in endpoint is not allowed") - if ( - (url.scheme == "http" and url.port == 80) or - (url.scheme == "https" and url.port == 443) - ): - url = url_replace(url, netloc=host) + if ( + (url.scheme == "http" and url.port == 80) or + (url.scheme == "https" and url.port == 443) + ): + url = url_replace(url, netloc=host) + + return url + + +class BaseURL: + """Base URL of S3 endpoint.""" + + def __init__(self, endpoint, region): + url = _parse_url(endpoint) if region and not _REGION_REGEX.match(region): raise ValueError(f"invalid region {region}") self._aws_info, region_in_host = _get_aws_info( - host, url.scheme == "https", region) + url.hostname, url.scheme == "https", region) self._virtual_style_flag = ( - self._aws_info or host.endswith("aliyuncs.com") + self._aws_info or url.hostname.endswith("aliyuncs.com") ) self._url = url self._region = region or region_in_host diff --git a/minio/minioadmin.py b/minio/minioadmin.py index d30dd6de5..19f5ba4f1 100644 --- a/minio/minioadmin.py +++ b/minio/minioadmin.py @@ -16,282 +16,633 @@ # pylint: disable=too-many-public-methods -"""MinIO Admin wrapper using MinIO Client (mc) tool.""" +"""MinIO Admin Client to perform MinIO administration operations.""" from __future__ import absolute_import import json -import subprocess +import os +from datetime import timedelta +from enum import Enum +from urllib.parse import urlunsplit + +import certifi +import urllib3 +from urllib3._collections import HTTPHeaderDict + +from minio.crypto import decrypt, encrypt + +from . import time +from .credentials.providers import Provider +from .error import MinioAdminException +from .helpers import (_DEFAULT_USER_AGENT, _REGION_REGEX, _parse_url, + headers_to_strings, queryencode, sha256_hash, + url_replace) +from .signer import sign_v4_s3 + +_COMMAND = Enum( + "Command", + { + "ADD_USER": "add-user", + "USER_INFO": "user-info", + "LIST_USERS": "list-users", + "REMOVE_USER": "remove-user", + "SET_USER_STATUS": "set-user-status", + "ADD_CANNED_POLICY": "add-canned-policy", + "SET_USER_OR_GROUP_POLICY": "set-user-or-group-policy", + "LIST_CANNED_POLICIES": "list-canned-policies", + "REMOVE_CANNED_POLICY": "remove-canned-policy", + "UNSET_USER_OR_GROUP_POLICY": "idp/builtin/policy/detach", + "CANNED_POLICY_INFO": "info-canned-policy", + "SET_BUCKET_QUOTA": "set-bucket-quota", + "GET_BUCKET_QUOTA": "get-bucket-quota", + "DATA_USAGE_INFO": "datausageinfo", + "ADD_UPDATE_REMOVE_GROUP": "update-group-members", + "SET_GROUP_STATUS": "set-group-status", + "GROUP_INFO": "group", + "LIST_GROUPS": "groups", + "INFO": "info", + "SERVICE": "service", + "UPDATE": "update", + "TOP_LOCKS": "top/locks", + "HELP_CONFIG": "help-config-kv", + "GET_CONFIG": "get-config-kv", + "SET_CONFIG": "set-config-kv", + "DELETE_CONFIG": "del-config-kv", + "LIST_CONFIG_HISTORY": "list-config-history-kv", + "RESOTRE_CONFIG_HISTORY": "restore-config-history-kv", + "START_PROFILE": "profile", + "CREATE_KMS_KEY": "kms/key/create", + "GET_KMS_KEY_STATUS": "kms/key/status", + "SITE_REPLICATION_ADD": "site-replication/add", + "SITE_REPLICATION_INFO": "site-replication/info", + "SITE_REPLICATION_STATUS": "site-replication/status", + "SITE_REPLICATION_EDIT": "site-replication/edit", + "SITE_REPLICATION_REMOVE": "site-replication/remove", + }, +) class MinioAdmin: - """MinIO Admin wrapper using MinIO Client (mc) tool.""" - - def __init__( - self, target, - binary_path=None, config_dir=None, ignore_cert_check=False, - timeout=None, env=None, - ): - self._target = target - self._timeout = timeout - self._env = env - self._base_args = [binary_path or "mc", "--json"] - if config_dir: - self._base_args += ["--config-dir", config_dir] - if ignore_cert_check: - self._base_args.append("--insecure") - self._base_args.append("admin") - - def _run(self, args, multiline=False): - """Execute mc command and return JSON output.""" - proc = subprocess.run( - self._base_args + args, - capture_output=True, - timeout=self._timeout, - env=self._env, - check=True, - text=True, - ) - if not proc.stdout: - return [] if multiline else {} - if multiline: - return [json.loads(line) for line in proc.stdout.splitlines()] - return json.loads(proc.stdout) + """Client to perform MinIO administration operations.""" + + def __init__(self, endpoint, + credentials, + region="", + secure=True, + cert_check=True, + http_client=None): + url = _parse_url(("https://" if secure else "http://") + endpoint) + if not isinstance(credentials, Provider): + raise ValueError("valid credentials must be provided") + if region and not _REGION_REGEX.match(region): + raise ValueError(f"invalid region {region}") + if http_client: + if not isinstance(http_client, urllib3.poolmanager.PoolManager): + raise ValueError( + "HTTP client should be instance of " + "`urllib3.poolmanager.PoolManager`" + ) + else: + timeout = timedelta(minutes=5).seconds + http_client = urllib3.PoolManager( + timeout=urllib3.util.Timeout(connect=timeout, read=timeout), + maxsize=10, + cert_reqs='CERT_REQUIRED' if cert_check else 'CERT_NONE', + ca_certs=os.environ.get('SSL_CERT_FILE') or certifi.where(), + retries=urllib3.Retry( + total=5, + backoff_factor=0.2, + status_forcelist=[500, 502, 503, 504] + ) + ) + + self._url = url + self._provider = credentials + self._region = region + self._secure = secure + self._cert_check = cert_check + self._http = http_client + self._user_agent = _DEFAULT_USER_AGENT + self._trace_stream = None + + def __del__(self): + self._http.clear() + + def _url_open(self, method, command, query_params=None, body=None): + """Execute HTTP request.""" + creds = self._provider.retrieve() + + url = url_replace(self._url, path="/minio/admin/v3/"+command.value) + query = [] + for key, values in sorted((query_params or {}).items()): + values = values if isinstance(values, (list, tuple)) else [values] + query += [ + f"{queryencode(key)}={queryencode(value)}" + for value in sorted(values) + ] + url = url_replace(url, query="&".join(query)) + + date = time.utcnow() + headers = { + "Host": url.netloc, + "User-Agent": self._user_agent, + "x-amz-date": time.to_amz_date(date), + "x-amz-content-sha256": sha256_hash(body), + "Content-Type": "application/octet-stream" + } + if creds.session_token: + headers["X-Amz-Security-Token"] = creds.session_token + if body: + headers["Content-Length"] = str(len(body)) + + headers = sign_v4_s3( + method, + url, + self._region, + headers, + creds, + headers.get("x-amz-content-sha256"), + date, + ) + + if self._trace_stream: + self._trace_stream.write("---------START-HTTP---------\n") + query = ("?" + url.query) if url.query else "" + self._trace_stream.write(f"{method} {url.path}{query} HTTP/1.1\n") + self._trace_stream.write( + headers_to_strings(headers, titled_key=True), + ) + self._trace_stream.write("\n") + if body is not None: + self._trace_stream.write("\n") + self._trace_stream.write( + body.decode() if isinstance(body, bytes) else str(body), + ) + self._trace_stream.write("\n") + self._trace_stream.write("\n") + + http_headers = HTTPHeaderDict() + for key, value in headers.items(): + if isinstance(value, (list, tuple)): + _ = [http_headers.add(key, val) for val in value] + else: + http_headers.add(key, value) + + response = self._http.urlopen( + method, + urlunsplit(url), + body=body, + headers=http_headers, + preload_content=True, + ) + + if self._trace_stream: + self._trace_stream.write(f"HTTP/1.1 {response.status}\n") + self._trace_stream.write( + headers_to_strings(response.headers), + ) + self._trace_stream.write("\n") + self._trace_stream.write("\n") + self._trace_stream.write(response.data.decode()) + self._trace_stream.write("\n") + self._trace_stream.write("----------END-HTTP----------\n") + + if response.status in [200, 204, 206]: + return response + + raise MinioAdminException(response.status, response.data.decode()) + + def set_app_info(self, app_name, app_version): + """ + Set your application name and version to user agent header. + + :param app_name: Application name. + :param app_version: Application version. + + Example:: + client.set_app_info('my_app', '1.0.2') + """ + if not (app_name and app_version): + raise ValueError("Application name/version cannot be empty.") + self._user_agent = f"{_DEFAULT_USER_AGENT} {app_name}/{app_version}" + + def trace_on(self, stream): + """ + Enable http trace. + + :param stream: Stream for writing HTTP call tracing. + """ + if not stream: + raise ValueError('Input stream for trace output is invalid.') + # Save new output stream. + self._trace_stream = stream + + def trace_off(self): + """ + Disable HTTP trace. + """ + self._trace_stream = None def service_restart(self): """Restart MinIO service.""" - return self._run(["service", "restart", self._target]) + response = self._url_open( + "POST", + _COMMAND.SERVICE, + query_params={"action": "restart"} + ) + return response.data.decode() def service_stop(self): """Stop MinIO service.""" - return self._run(["service", "stop", self._target]) + response = self._url_open( + "POST", + _COMMAND.SERVICE, + query_params={"action": "stop"} + ) + return response.data.decode() def update(self): """Update MinIO.""" - return self._run(["update", self._target]) + response = self._url_open( + "POST", + _COMMAND.UPDATE, + query_params={"updateURL": ""} + ) + return response.data.decode() def info(self): """Get MinIO server information.""" - return self._run(["info", self._target]) + response = self._url_open( + "GET", + _COMMAND.INFO, + ) + return response.data.decode() def user_add(self, access_key, secret_key): - """Add a new user.""" - return self._run(["user", "add", self._target, access_key, secret_key]) + """Create user with access and secret keys""" + body = json.dumps( + {"status": "enabled", "secretKey": secret_key}).encode() + response = self._url_open( + "PUT", + _COMMAND.ADD_USER, + query_params={"accessKey": access_key}, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() def user_disable(self, access_key): """Disable user.""" - return self._run(["user", "disable", self._target, access_key]) + response = self._url_open( + "PUT", + _COMMAND.SET_USER_STATUS, + query_params={"accessKey": access_key, "status": "disabled"} + ) + return response.data.decode() def user_enable(self, access_key): """Enable user.""" - return self._run(["user", "enable", self._target, access_key]) + response = self._url_open( + "PUT", + _COMMAND.SET_USER_STATUS, + query_params={"accessKey": access_key, "status": "enabled"} + ) + return response.data.decode() def user_remove(self, access_key): - """Remove user.""" - return self._run(["user", "remove", self._target, access_key]) + """Delete user""" + response = self._url_open( + "DELETE", + _COMMAND.REMOVE_USER, + query_params={"accessKey": access_key}, + ) + return response.data.decode() def user_info(self, access_key): - """Get user information.""" - return self._run(["user", "info", self._target, access_key]) + """Get information about user""" + response = self._url_open( + "GET", + _COMMAND.USER_INFO, + query_params={"accessKey": access_key}, + ) + return response.data.decode() def user_list(self): - """List users.""" - return self._run(["user", "list", self._target], multiline=True) + """List all users""" + response = self._url_open("GET", _COMMAND.LIST_USERS) + plain_data = decrypt( + response.data, self._provider.retrieve().secret_key + ) + return plain_data.decode() def group_add(self, group_name, members): """Add users a new or existing group.""" - return self._run(["group", "add", self._target, group_name] + members) + body = json.dumps({ + "group": group_name, + "members": members, + "isRemove": False + }).encode() + response = self._url_open( + "PUT", + _COMMAND.ADD_UPDATE_REMOVE_GROUP, + body=body, + ) + return response.data.decode() def group_disable(self, group_name): """Disable group.""" - return self._run(["group", "disable", self._target, group_name]) + response = self._url_open( + "PUT", + _COMMAND.SET_GROUP_STATUS, + query_params={"group": group_name, "status": "disabled"} + ) + return response.data.decode() def group_enable(self, group_name): """Enable group.""" - return self._run(["group", "enable", self._target, group_name]) + response = self._url_open( + "PUT", + _COMMAND.SET_GROUP_STATUS, + query_params={"group": group_name, "status": "enabled"} + ) + return response.data.decode() def group_remove(self, group_name, members=None): """Remove group or members from a group.""" - return self._run( - ["group", "remove", self._target, group_name] + (members or []), + body = json.dumps({ + "group": group_name, + "members": members, + "isRemove": True + }).encode() + response = self._url_open( + "PUT", + _COMMAND.ADD_UPDATE_REMOVE_GROUP, + body=body, ) + return response.data.decode() def group_info(self, group_name): """Get group information.""" - return self._run(["group", "info", self._target, group_name]) + response = self._url_open( + "GET", + _COMMAND.GROUP_INFO, + query_params={"group": group_name}, + ) + return response.data.decode() def group_list(self): """List groups.""" - return self._run(["group", "list", self._target], multiline=True) + response = self._url_open("GET", _COMMAND.LIST_GROUPS) + return response.data.decode() def policy_add(self, policy_name, policy_file): """Add new policy.""" - return self._run( - ["policy", "create", self._target, policy_name, policy_file], - ) + with open(policy_file, encoding='utf-8') as file: + response = self._url_open( + "PUT", + _COMMAND.ADD_CANNED_POLICY, + query_params={"name": policy_name}, + body=file.read().encode(), + ) + return response.data.decode() def policy_remove(self, policy_name): """Remove policy.""" - return self._run(["policy", "remove", self._target, policy_name]) + response = self._url_open( + "DELETE", + _COMMAND.REMOVE_CANNED_POLICY, + query_params={"name": policy_name}, + ) + return response.data.decode() def policy_info(self, policy_name): """Get policy information.""" - return self._run(["policy", "info", self._target, policy_name]) + response = self._url_open( + "GET", + _COMMAND.CANNED_POLICY_INFO, + query_params={"name": policy_name}, + ) + return response.data.decode() def policy_list(self): """List policies.""" - return self._run(["policy", "list", self._target], multiline=True) + response = self._url_open("GET", _COMMAND.LIST_CANNED_POLICIES) + return response.data.decode() def policy_set(self, policy_name, user=None, group=None): """Set IAM policy on a user or group.""" if (user is not None) ^ (group is not None): - return self._run( - [ - "policy", "attach", self._target, policy_name, - "--user" if user else "--group", user or group, - ], + response = self._url_open( + "PUT", + _COMMAND.SET_USER_OR_GROUP_POLICY, + query_params={"userOrGroup": user or group, + "isGroup": "true" if group else "false", + "policyName": policy_name}, ) + return response.data.decode() raise ValueError("either user or group must be set") def policy_unset(self, policy_name, user=None, group=None): """Unset an IAM policy for a user or group.""" + body = json.dumps({ + "policies": [policy_name], + "group": group, + "user": user + }).encode() if (user is not None) ^ (group is not None): - return self._run( - [ - "policy", "detach", self._target, policy_name, - "--user" if user else "--group", user or group, - ], + response = self._url_open( + "POST", + _COMMAND.UNSET_USER_OR_GROUP_POLICY, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + plain_data = decrypt( + response.data, self._provider.retrieve().secret_key ) + return plain_data.decode() raise ValueError("either user or group must be set") def config_get(self, key=None): """Get configuration parameters.""" - return self._run( - ["config", "get", self._target] + [key] if key else [], - key, + if not key: + response = self._url_open( + "GET", + _COMMAND.HELP_CONFIG, + query_params={"key": "", "subSys": ""}, + ) + return response.data.decode() + + response = self._url_open( + "GET", + _COMMAND.GET_CONFIG, + query_params={"key": key, "subSys": ""}, + ) + plain_text = decrypt( + response.data, self._provider.retrieve().secret_key ) + return plain_text.decode() - def config_set(self, key, config): + def config_set(self, key=None, config=None): """Set configuration parameters.""" - args = [name + "=" + value for name, value in config.items()] - return self._run(["config", "set", self._target, key] + args) + body = " ".join( + [key] + [f"{name}={value}" for name, value in config.items()] + ).encode() + response = self._url_open( + "PUT", + _COMMAND.SET_CONFIG, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() def config_reset(self, key, name=None): """Reset configuration parameters.""" if name: key += ":" + name - return self._run(["config", "reset", self._target, key]) - - def config_remove(self, access_key): - """Remove config.""" - return self._run(["config", "remove", self._target, access_key]) + body = key.encode() + response = self._url_open( + "DELETE", + _COMMAND.DELETE_CONFIG, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() def config_history(self): """Get historic configuration changes.""" - return self._run(["config", "history", self._target], multiline=True) + response = self._url_open( + "GET", + _COMMAND.LIST_CONFIG_HISTORY, + query_params={"count": "10"} + ) + plain_text = decrypt( + response.data, self._provider.retrieve().secret_key + ) + return plain_text.decode() def config_restore(self, restore_id): """Restore to a specific configuration history.""" - return self._run(["config", "restore", self._target, restore_id]) + response = self._url_open( + "PUT", + _COMMAND.RESOTRE_CONFIG_HISTORY, + query_params={"restoreId": restore_id} + ) + return response.data.decode() def profile_start(self, profilers=()): - """Start recording profile data.""" - args = ["profile", "start"] - if profilers: - args += ["--type", ",".join(profilers)] - args.append(self._target) - return self._run(args) - - def profile_stop(self): - """Stop and download profile data.""" - return self._run(["profile", "stop", self._target]) + """Runs a system profile""" + response = self._url_open( + "POST", + _COMMAND.START_PROFILE, + query_params={"profilerType;": ",".join(profilers)}, + ) + return response.data def top_locks(self): """Get a list of the 10 oldest locks on a MinIO cluster.""" - return self._run(["top", "locks", self._target], multiline=True) - - def prometheus_generate(self): - """Generate prometheus configuration.""" - return self._run(["prometheus", "generate", self._target]) + response = self._url_open( + "GET", + _COMMAND.TOP_LOCKS, + ) + return response.data.decode() def kms_key_create(self, key=None): """Create a new KMS master key.""" - return self._run( - [ - "kms", "key", "create", self._target, key - ] + ([key] if key else []), + response = self._url_open( + "POST", + _COMMAND.CREATE_KMS_KEY, + query_params={"key-id": key}, ) + return response.data.decode() def kms_key_status(self, key=None): """Get status information of a KMS master key.""" - return self._run( - [ - "kms", "key", "status", self._target, key - ] + ([key] if key else []), - ) - - def bucket_remote_add( - self, src_bucket, dest_url, - path=None, region=None, bandwidth=None, service=None, - ): - """Add a new remote target.""" - args = [ - "bucket", "remote", "add", self._target + "/" + src_bucket, - dest_url, "--service", service or "replication", - ] - if path: - args += ["--path", path] - if region: - args += ["--region", region] - if bandwidth: - args += ["--bandwidth", bandwidth] - return self._run(args) - - def bucket_remote_edit(self, src_bucket, dest_url, arn): - """Edit credentials of remote target.""" - return self._run( - [ - "bucket", "remote", "edit", self._target + "/" + src_bucket, - dest_url, "--arn", arn, - ], - ) - - def bucket_remote_list(self, src_bucket=None, service=None): - """List remote targets.""" - return self._run( - [ - "bucket", "remote", "ls", - self._target + ("/" + src_bucket if src_bucket else ""), - "--service", service or "replication", - ], - ) - - def bucket_remote_remove(self, src_bucket, arn): - """Remove configured remote target.""" - return self._run( - [ - "bucket", "remote", "rm", self._target + "/" + src_bucket, - "--arn", arn, - ], - ) - - def bucket_quota_set(self, bucket, fifo=None, hard=None): + response = self._url_open( + "GET", + _COMMAND.GET_KMS_KEY_STATUS, + query_params={"key-id": key or ""} + ) + return response.data.decode() + + def add_site_replication(self, peer_sites): + """Add peer sites to site replication.""" + body = json.dumps( + [peer_site.to_dict() for peer_site in peer_sites]).encode() + response = self._url_open( + "PUT", + _COMMAND.SITE_REPLICATION_ADD, + query_params={"api-version": "1"}, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() + + def get_site_replication_info(self): + """Get site replication information.""" + response = self._url_open("GET", _COMMAND.SITE_REPLICATION_INFO) + return response.data.decode() + + def get_site_replication_status(self, options): + """Get site replication information.""" + response = self._url_open( + "GET", + _COMMAND.SITE_REPLICATION_STATUS, + query_params=options.to_query_params(), + ) + return response.data.decode() + + def edit_site_replication(self, peer_info): + """Edit site replication with given peer information.""" + body = json.dumps(peer_info.to_dict()).encode() + response = self._url_open( + "PUT", + _COMMAND.SITE_REPLICATION_EDIT, + query_params={"api-version": "1"}, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() + + def remove_site_replication(self, sites=None, all_sites=False): + """Remove given sites or all sites from site replication.""" + data = {} + if all_sites: + data.update({"all": True}) + elif sites: + data.update({"sites": sites}) + else: + raise ValueError("either sites or all flag must be given") + body = json.dumps(data).encode() + response = self._url_open( + "PUT", + _COMMAND.SITE_REPLICATION_REMOVE, + query_params={"api-version": "1"}, + body=encrypt(body, self._provider.retrieve().secret_key), + ) + return response.data.decode() + + def bucket_quota_set(self, bucket, size): """Set bucket quota configuration.""" - if fifo is None and hard is None: - raise ValueError("fifo or hard must be set") - args = ["bucket", "quota", self._target + "/" + bucket] - if fifo: - args += ["--fifo", fifo] - if hard: - args += ["--hard", hard] - return self._run(args) + body = json.dumps({"quota": size, "quotatype": "hard"}).encode() + response = self._url_open( + "PUT", + _COMMAND.SET_BUCKET_QUOTA, + query_params={"bucket": bucket}, + body=body + ) + return response.data.decode() def bucket_quota_clear(self, bucket): """Clear bucket quota configuration.""" - return self._run( - ["bucket", "quota", self._target + "/" + bucket, "--clear"], + body = json.dumps({"quota": 0, "quotatype": "hard"}).encode() + response = self._url_open( + "PUT", + _COMMAND.SET_BUCKET_QUOTA, + query_params={"bucket": bucket}, + body=body ) + return response.data.decode() def bucket_quota_get(self, bucket): """Get bucket quota configuration.""" - return self._run(["bucket", "quota", self._target + "/" + bucket]) + response = self._url_open( + "GET", + _COMMAND.GET_BUCKET_QUOTA, + query_params={"bucket": bucket} + ) + return response.data.decode() diff --git a/setup.py b/setup.py index 544d68d00..28b01e8ea 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,7 @@ long_description_content_type="text/markdown", package_dir={"minio": "minio"}, packages=["minio", "minio.credentials"], - install_requires=["certifi", "urllib3"], + install_requires=["certifi", "urllib3", "argon2-cffi", "pycryptodome"], tests_require=[], license="Apache-2.0", classifiers=[ diff --git a/tests/unit/crypto_test.py b/tests/unit/crypto_test.py new file mode 100644 index 000000000..367399a46 --- /dev/null +++ b/tests/unit/crypto_test.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# MinIO Python Library for Amazon S3 Compatible Cloud Storage, +# (C) 2015 MinIO, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase + +from minio.crypto import decrypt, encrypt + + +class CryptoTest(TestCase): + def test_correct(self): + secret = "topsecret" + plaintext = "Hello MinIO!" + encrypted = encrypt(plaintext.encode(), secret) + decrypted = decrypt(encrypted, secret).decode() + self.assertEquals(plaintext, decrypted) + + def test_wrong(self): + secret = "topsecret" + secret2 = "othersecret" + plaintext = "Hello MinIO!" + encrypted = encrypt(plaintext.encode(), secret) + self.assertRaises(ValueError, decrypt, encrypted, secret2)