From d54533fdbd3031358dffc9bce75c7aee6a542c9a Mon Sep 17 00:00:00 2001
From: Maciej Urbanski
Date: Tue, 15 Aug 2023 22:19:44 +0200
Subject: [PATCH] changed `threads` default

---
Reviewer notes (this section is ignored by `git am`):
* `ThreadsMixin._set_threads_from_args` now also resizes the upload manager's
  thread pool. `UploadFileMixin.run` used to call
  `upload_manager.set_thread_pool_size(args.threads)`; routing it through a
  helper that only touched `download_manager` left the upload pool unsized.
* Hunk offsets and the diffstat account for that one extra added line.
* The `index` blob ids below are those of the original commit.
* Indentation and blank context lines were re-derived from the hunk line
  counts; use `--ignore-whitespace` if a context line does not match.

 CHANGELOG.md       |  1 +
 b2/_cli/const.py   |  4 +++
 b2/console_tool.py | 81 ++++++++++++++++++++++++++--------------------
 3 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6670f8ef4..3472b1604 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 * Better help text for --corsRules
+* If `--threads` is not explicitly set, the number of threads is no longer guaranteed to be 10
 
 ### Infrastructure
 * Remove unsupported PyPy 3.7 from tests matrix and add PyPy 3.10 instead
diff --git a/b2/_cli/const.py b/b2/_cli/const.py
index 5ac8acc20..44d26f36b 100644
--- a/b2/_cli/const.py
+++ b/b2/_cli/const.py
@@ -19,6 +19,10 @@
 B2_DESTINATION_SSE_C_KEY_ID_ENV_VAR = 'B2_DESTINATION_SSE_C_KEY_ID'
 B2_SOURCE_SSE_C_KEY_B64_ENV_VAR = 'B2_SOURCE_SSE_C_KEY_B64'
 
+# Threads defaults
+
+DEFAULT_THREADS = 10
+
 # Constants used in the B2 API
 # TODO B2-47 move API related constants to b2sdk
 CREATE_BUCKET_TYPES = ('allPublic', 'allPrivate')
diff --git a/b2/console_tool.py b/b2/console_tool.py
index a230eb216..befa88132 100644
--- a/b2/console_tool.py
+++ b/b2/console_tool.py
@@ -116,6 +116,7 @@
     B2_USER_AGENT_APPEND_ENV_VAR,
     CREATE_BUCKET_TYPES,
     DEFAULT_MIN_PART_SIZE,
+    DEFAULT_THREADS,
 )
 from b2._cli.shell import detect_shell
 from b2._utils.filesystem import points_to_fifo
@@ -532,6 +533,29 @@ def _setup_parser(cls, parser):
         super()._setup_parser(parser)  # noqa
 
 
+class ThreadsMixin(Described):
+    """
+    Use --threads to manually adjust number of threads used in the operation.
+    Otherwise, the number of threads will be automatically chosen.
+    """
+
+    @classmethod
+    def _setup_parser(cls, parser):
+        parser.add_argument('--threads', type=int, default=None)
+
+        super()._setup_parser(parser)  # noqa
+
+    def _get_threads_from_args(self, args) -> int:
+        return args.threads or DEFAULT_THREADS
+
+    def _set_threads_from_args(self, args):
+        threads = self._get_threads_from_args(args)
+        # FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
+        # B2Api constructor has `max_download_workers`/`max_upload_workers` params for this.
+        self.api.services.download_manager.set_thread_pool_size(threads)
+        self.api.services.upload_manager.set_thread_pool_size(threads)
+
+
 class Command(Described):
     # Set to True for commands that receive sensitive information in arguments
     FORBID_LOGGING_ARGUMENTS = False
@@ -1297,13 +1321,14 @@ def _print_file_attribute(self, label, value):
 
 @B2.register_subcommand
 class DownloadFileById(
-    ProgressMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin,
+    ThreadsMixin, ProgressMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin,
     MaxDownloadStreamsMixin, DownloadCommand
 ):
     """
     Downloads the given file, and stores it in the given local file.
 
     {PROGRESSMIXIN}
+    {THREADSMIXIN}
     {SOURCESSEMIXIN}
     {WRITEBUFFERSIZEMIXIN}
     {SKIPHASHVERIFICATIONMIXIN}
@@ -1316,7 +1341,6 @@ class DownloadFileById(
 
     @classmethod
     def _setup_parser(cls, parser):
-        parser.add_argument('--threads', type=int, default=10)
         parser.add_argument('fileId')
         parser.add_argument('localFileName')
         super()._setup_parser(parser)
@@ -1324,10 +1348,7 @@ def _setup_parser(cls, parser):
     def run(self, args):
         progress_listener = make_progress_listener(args.localFileName, args.noProgress)
         encryption_setting = self._get_source_sse_setting(args)
-        if args.threads:
-            # FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
-            # There is `max_download_workers` param in B2Api constructor for this.
-            self.api.services.download_manager.set_thread_pool_size(args.threads)
+        self._set_threads_from_args(args)
         downloaded_file = self.api.download_file_by_id(
             args.fileId, progress_listener, encryption=encryption_setting
         )
@@ -1340,6 +1361,7 @@ def run(self, args):
 @B2.register_subcommand
 class DownloadFileByName(
     ProgressMixin,
+    ThreadsMixin,
     SourceSseMixin,
     WriteBufferSizeMixin,
     SkipHashVerificationMixin,
@@ -1350,6 +1372,7 @@ class DownloadFileByName(
     Downloads the given file, and stores it in the given local file.
 
     {PROGRESSMIXIN}
+    {THREADSMIXIN}
     {SOURCESSEMIXIN}
     {WRITEBUFFERSIZEMIXIN}
     {SKIPHASHVERIFICATIONMIXIN}
@@ -1362,17 +1385,13 @@ class DownloadFileByName(
 
     @classmethod
     def _setup_parser(cls, parser):
-        parser.add_argument('--threads', type=int, default=10)
         parser.add_argument('bucketName').completer = bucket_name_completer
         parser.add_argument('b2FileName').completer = file_name_completer
         parser.add_argument('localFileName')
         super()._setup_parser(parser)
 
     def run(self, args):
-        if args.threads:
-            # FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
-            # There is `max_download_workers` param in B2Api constructor for this.
-            self.api.services.download_manager.set_thread_pool_size(args.threads)
+        self._set_threads_from_args(args)
         bucket = self.api.get_bucket_by_name(args.bucketName)
         progress_listener = make_progress_listener(args.localFileName, args.noProgress)
         encryption_setting = self._get_source_sse_setting(args)
@@ -1923,7 +1942,7 @@ def format_ls_entry(self, file_version: FileVersion, replication: bool):
 
 
 @B2.register_subcommand
-class Rm(AbstractLsCommand):
+class Rm(ThreadsMixin, AbstractLsCommand):
     """
     Removes a "folder" or a set of files matching a pattern. Use with caution.
 
@@ -1939,11 +1958,8 @@ class Rm(AbstractLsCommand):
     To list (but not remove) files to be deleted, use ``--dryRun``.
     You can also list files via ``ls`` command - the listing behaviour is exactly the same.
 
-    Users with multiple files to be removed will benefit from multi-threaded
-    capabilities. The default number of threads is 10.
-
     Progress is displayed on the console unless ``--noProgress`` is specified.
-
+    {THREADSMIXIN}
     {ABSTRACTLSCOMMAND}
 
     The ``--dryRun`` option prints all the files that would be affected by
@@ -2004,7 +2020,6 @@ class Rm(AbstractLsCommand):
     - **deleteFiles**
     """
 
-    DEFAULT_THREADS = 10
     PROGRESS_REPORT_CLASS = ProgressReport
 
     class SubmitThread(threading.Thread):
@@ -2018,12 +2033,14 @@ def __init__(
             args: argparse.Namespace,
             messages_queue: queue.Queue,
             reporter: ProgressReport,
+            threads: int,
         ):
             self.runner = runner
             self.args = args
             self.messages_queue = messages_queue
             self.reporter = reporter
-            removal_queue_size = self.args.queueSize or (2 * self.args.threads)
+            self.threads = threads
+            removal_queue_size = self.args.queueSize or (2 * self.threads)
             self.semaphore = threading.BoundedSemaphore(value=removal_queue_size)
             self.fail_fast_event = threading.Event()
             self.mapping_lock = threading.Lock()
@@ -2032,7 +2049,7 @@ def __init__(
 
         def run(self) -> None:
             try:
-                with ThreadPoolExecutor(max_workers=self.args.threads) as executor:
+                with ThreadPoolExecutor(max_workers=self.threads) as executor:
                     self._run_removal(executor)
             except Exception as error:
                 self.messages_queue.put((self.EXCEPTION_TAG, error))
@@ -2086,7 +2103,6 @@ def _removal_done(self, future: Future) -> None:
     @classmethod
     def _setup_parser(cls, parser):
         parser.add_argument('--dryRun', action='store_true')
-        parser.add_argument('--threads', type=int, default=cls.DEFAULT_THREADS)
         parser.add_argument(
             '--queueSize',
             type=int,
@@ -2105,8 +2121,9 @@ def run(self, args):
         failed_on_any_file = False
         messages_queue = queue.Queue()
 
+        threads = self._get_threads_from_args(args)
         with self.PROGRESS_REPORT_CLASS(self.stdout, args.noProgress) as reporter:
-            submit_thread = self.SubmitThread(self, args, messages_queue, reporter)
+            submit_thread = self.SubmitThread(self, args, messages_queue, reporter, threads=threads)
             # This thread is started in daemon mode, no joining needed.
             submit_thread.start()
 
@@ -2167,6 +2184,7 @@ def run(self, args):
 
 @B2.register_subcommand
 class Sync(
+    ThreadsMixin,
     DestinationSseMixin,
     SourceSseMixin,
     WriteBufferSizeMixin,
@@ -2198,9 +2216,9 @@ class Sync(
     The default is to fail when the specified source directory doesn't exist
     or is empty. (This check only applies to version 1.0 and later.)
 
-    Users with high-performance networks, or file sets with very small
-    files, will benefit from multi-threaded uploads and downloads. The default
-    number of threads for syncing, downloading, and uploading is 10.
+    {THREADSMIXIN}
+
+    You can alternatively control number of threads per each operation.
     The number of files processed in parallel is set by ``--syncThreads``,
     the number of files/file parts downloaded in parallel is set by``--downloadThreads``,
     and the number of files/file parts uploaded in parallel is set by `--uploadThreads``.
@@ -2347,7 +2365,6 @@ def _setup_parser(cls, parser):
         parser.add_argument('--dryRun', action='store_true')
         parser.add_argument('--allowEmptySource', action='store_true')
         parser.add_argument('--excludeAllSymlinks', action='store_true')
-        parser.add_argument('--threads', type=int)
         parser.add_argument('--syncThreads', type=int, default=cls.DEFAULT_SYNC_THREADS)
         parser.add_argument('--downloadThreads', type=int, default=cls.DEFAULT_DOWNLOAD_THREADS)
         parser.add_argument('--uploadThreads', type=int, default=cls.DEFAULT_UPLOAD_THREADS)
@@ -2640,6 +2657,7 @@ def _setup_parser(cls, parser):
 
 class UploadFileMixin(
     MinPartSizeMixin,
+    ThreadsMixin,
     ProgressMixin,
     DestinationSseMixin,
     LegalHoldMixin,
@@ -2669,9 +2687,6 @@ def _setup_parser(cls, parser):
         parser.add_argument(
             '--sha1', help="SHA-1 of the data being uploaded for verifying file integrity"
         )
-        parser.add_argument(
-            '--threads', type=int, default=10, help="number of threads used for the operation"
-        )
         parser.add_argument('--cache-control', default=None)
         parser.add_argument(
             '--info',
@@ -2694,11 +2709,7 @@ def _setup_parser(cls, parser):
         super()._setup_parser(parser)  # add parameters from the mixins
 
     def run(self, args):
-
-        # FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
-        # There is `max_upload_workers` param in B2Api constructor for this.
-        self.api.services.upload_manager.set_thread_pool_size(args.threads)
-
+        self._set_threads_from_args(args)
         upload_kwargs = self.get_execute_kwargs(args)
         file_info = self.execute_operation(**upload_kwargs)
         if not args.quiet:
@@ -2737,7 +2748,7 @@ def get_execute_kwargs(self, args) -> dict:
             "min_part_size": args.minPartSize,
             "progress_listener": make_progress_listener(args.localFilePath, args.noProgress),
             "sha1_sum": args.sha1,
-            "threads": args.threads,
+            "threads": self._get_threads_from_args(args),
         }
 
     @abstractmethod
@@ -2801,6 +2812,7 @@ class UploadFile(UploadFileMixin, UploadModeMixin, Command):
     {UPLOADFILEMIXIN}
     {MINPARTSIZEMIXIN}
     {PROGRESSMIXIN}
+    {THREADSMIXIN}
     {DESTINATIONSSEMIXIN}
     {FILERETENTIONSETTINGMIXIN}
     {LEGALHOLDMIXIN}
@@ -2862,6 +2874,7 @@ class UploadUnboundStream(UploadFileMixin, Command):
     if you expect the stream to be larger than 50GB.
 
     {PROGRESSMIXIN}
+    {THREADSMIXIN}
     {DESTINATIONSSEMIXIN}
     {FILERETENTIONSETTINGMIXIN}
     {LEGALHOLDMIXIN}