Skip to content

Commit

Permalink
changed threads default
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurbanski-reef committed Aug 16, 2023
1 parent 7211c98 commit d54533f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
* Better help text for --corsRules
* if `--threads` is not explicitly set, number of threads is no longer guaranteed to be 10

### Infrastructure
* Remove unsupported PyPy 3.7 from tests matrix and add PyPy 3.10 instead
Expand Down
4 changes: 4 additions & 0 deletions b2/_cli/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
B2_DESTINATION_SSE_C_KEY_ID_ENV_VAR = 'B2_DESTINATION_SSE_C_KEY_ID'
B2_SOURCE_SSE_C_KEY_B64_ENV_VAR = 'B2_SOURCE_SSE_C_KEY_B64'

# Threads defaults

DEFAULT_THREADS = 10

# Constants used in the B2 API
# TODO B2-47 move API related constants to b2sdk
CREATE_BUCKET_TYPES = ('allPublic', 'allPrivate')
Expand Down
80 changes: 46 additions & 34 deletions b2/console_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
B2_USER_AGENT_APPEND_ENV_VAR,
CREATE_BUCKET_TYPES,
DEFAULT_MIN_PART_SIZE,
DEFAULT_THREADS,
)
from b2._cli.shell import detect_shell
from b2._utils.filesystem import points_to_fifo
Expand Down Expand Up @@ -532,6 +533,28 @@ def _setup_parser(cls, parser):
super()._setup_parser(parser) # noqa


class ThreadsMixin(Described):
"""
Use --threads to manually adjust number of threads used in the operation.
Otherwise, the number of threads will be automatically chosen.
"""

@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--threads', type=int, default=None)

super()._setup_parser(parser) # noqa

def _get_threads_from_args(self, args) -> int:
return args.threads or DEFAULT_THREADS

def _set_threads_from_args(self, args):
threads = self._get_threads_from_args(args)
# FIXME: This is using deprecated API. It should be be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(threads)


class Command(Described):
# Set to True for commands that receive sensitive information in arguments
FORBID_LOGGING_ARGUMENTS = False
Expand Down Expand Up @@ -1297,13 +1320,14 @@ def _print_file_attribute(self, label, value):

@B2.register_subcommand
class DownloadFileById(
ProgressMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin,
ThreadsMixin, ProgressMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin,
MaxDownloadStreamsMixin, DownloadCommand
):
"""
Downloads the given file, and stores it in the given local file.
{PROGRESSMIXIN}
{THREADSMIXIN}
{SOURCESSEMIXIN}
{WRITEBUFFERSIZEMIXIN}
{SKIPHASHVERIFICATIONMIXIN}
Expand All @@ -1316,18 +1340,14 @@ class DownloadFileById(

@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--threads', type=int, default=10)
parser.add_argument('fileId')
parser.add_argument('localFileName')
super()._setup_parser(parser)

def run(self, args):
progress_listener = make_progress_listener(args.localFileName, args.noProgress)
encryption_setting = self._get_source_sse_setting(args)
if args.threads:
# FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(args.threads)
self._set_threads_from_args(args)
downloaded_file = self.api.download_file_by_id(
args.fileId, progress_listener, encryption=encryption_setting
)
Expand All @@ -1340,6 +1360,7 @@ def run(self, args):
@B2.register_subcommand
class DownloadFileByName(
ProgressMixin,
ThreadsMixin,
SourceSseMixin,
WriteBufferSizeMixin,
SkipHashVerificationMixin,
Expand All @@ -1350,6 +1371,7 @@ class DownloadFileByName(
Downloads the given file, and stores it in the given local file.
{PROGRESSMIXIN}
{THREADSMIXIN}
{SOURCESSEMIXIN}
{WRITEBUFFERSIZEMIXIN}
{SKIPHASHVERIFICATIONMIXIN}
Expand All @@ -1362,17 +1384,13 @@ class DownloadFileByName(

@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--threads', type=int, default=10)
parser.add_argument('bucketName').completer = bucket_name_completer
parser.add_argument('b2FileName').completer = file_name_completer
parser.add_argument('localFileName')
super()._setup_parser(parser)

def run(self, args):
if args.threads:
# FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(args.threads)
self._set_threads_from_args(args)
bucket = self.api.get_bucket_by_name(args.bucketName)
progress_listener = make_progress_listener(args.localFileName, args.noProgress)
encryption_setting = self._get_source_sse_setting(args)
Expand Down Expand Up @@ -1923,7 +1941,7 @@ def format_ls_entry(self, file_version: FileVersion, replication: bool):


@B2.register_subcommand
class Rm(AbstractLsCommand):
class Rm(ThreadsMixin, AbstractLsCommand):
"""
Removes a "folder" or a set of files matching a pattern. Use with caution.
Expand All @@ -1939,11 +1957,8 @@ class Rm(AbstractLsCommand):
To list (but not remove) files to be deleted, use ``--dryRun``. You can also
list files via ``ls`` command - the listing behaviour is exactly the same.
Users with multiple files to be removed will benefit from multi-threaded
capabilities. The default number of threads is 10.
Progress is displayed on the console unless ``--noProgress`` is specified.
{THREADSMIXIN}
{ABSTRACTLSCOMMAND}
The ``--dryRun`` option prints all the files that would be affected by
Expand Down Expand Up @@ -2004,7 +2019,6 @@ class Rm(AbstractLsCommand):
- **deleteFiles**
"""

DEFAULT_THREADS = 10
PROGRESS_REPORT_CLASS = ProgressReport

class SubmitThread(threading.Thread):
Expand All @@ -2018,12 +2032,14 @@ def __init__(
args: argparse.Namespace,
messages_queue: queue.Queue,
reporter: ProgressReport,
threads: int,
):
self.runner = runner
self.args = args
self.messages_queue = messages_queue
self.reporter = reporter
removal_queue_size = self.args.queueSize or (2 * self.args.threads)
self.threads = threads
removal_queue_size = self.args.queueSize or (2 * self.threads)
self.semaphore = threading.BoundedSemaphore(value=removal_queue_size)
self.fail_fast_event = threading.Event()
self.mapping_lock = threading.Lock()
Expand All @@ -2032,7 +2048,7 @@ def __init__(

def run(self) -> None:
try:
with ThreadPoolExecutor(max_workers=self.args.threads) as executor:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
self._run_removal(executor)
except Exception as error:
self.messages_queue.put((self.EXCEPTION_TAG, error))
Expand Down Expand Up @@ -2086,7 +2102,6 @@ def _removal_done(self, future: Future) -> None:
@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--dryRun', action='store_true')
parser.add_argument('--threads', type=int, default=cls.DEFAULT_THREADS)
parser.add_argument(
'--queueSize',
type=int,
Expand All @@ -2105,8 +2120,9 @@ def run(self, args):
failed_on_any_file = False
messages_queue = queue.Queue()

threads = self._get_threads_from_args(args)
with self.PROGRESS_REPORT_CLASS(self.stdout, args.noProgress) as reporter:
submit_thread = self.SubmitThread(self, args, messages_queue, reporter)
submit_thread = self.SubmitThread(self, args, messages_queue, reporter, threads=threads)
# This thread is started in daemon mode, no joining needed.
submit_thread.start()

Expand Down Expand Up @@ -2167,6 +2183,7 @@ def run(self, args):

@B2.register_subcommand
class Sync(
ThreadsMixin,
DestinationSseMixin,
SourceSseMixin,
WriteBufferSizeMixin,
Expand Down Expand Up @@ -2198,9 +2215,9 @@ class Sync(
The default is to fail when the specified source directory doesn't exist
or is empty. (This check only applies to version 1.0 and later.)
Users with high-performance networks, or file sets with very small
files, will benefit from multi-threaded uploads and downloads. The default
number of threads for syncing, downloading, and uploading is 10.
{THREADSMIXIN}
You can alternatively control number of threads per each operation.
The number of files processed in parallel is set by ``--syncThreads``,
the number of files/file parts downloaded in parallel is set by``--downloadThreads``,
and the number of files/file parts uploaded in parallel is set by `--uploadThreads``.
Expand Down Expand Up @@ -2347,7 +2364,6 @@ def _setup_parser(cls, parser):
parser.add_argument('--dryRun', action='store_true')
parser.add_argument('--allowEmptySource', action='store_true')
parser.add_argument('--excludeAllSymlinks', action='store_true')
parser.add_argument('--threads', type=int)
parser.add_argument('--syncThreads', type=int, default=cls.DEFAULT_SYNC_THREADS)
parser.add_argument('--downloadThreads', type=int, default=cls.DEFAULT_DOWNLOAD_THREADS)
parser.add_argument('--uploadThreads', type=int, default=cls.DEFAULT_UPLOAD_THREADS)
Expand Down Expand Up @@ -2640,6 +2656,7 @@ def _setup_parser(cls, parser):

class UploadFileMixin(
MinPartSizeMixin,
ThreadsMixin,
ProgressMixin,
DestinationSseMixin,
LegalHoldMixin,
Expand Down Expand Up @@ -2669,9 +2686,6 @@ def _setup_parser(cls, parser):
parser.add_argument(
'--sha1', help="SHA-1 of the data being uploaded for verifying file integrity"
)
parser.add_argument(
'--threads', type=int, default=10, help="number of threads used for the operation"
)
parser.add_argument('--cache-control', default=None)
parser.add_argument(
'--info',
Expand All @@ -2694,11 +2708,7 @@ def _setup_parser(cls, parser):
super()._setup_parser(parser) # add parameters from the mixins

def run(self, args):

# FIXME: This is using deprecated API. It should be replaced when moving to b2sdk apiver 3.
# There is `max_upload_workers` param in B2Api constructor for this.
self.api.services.upload_manager.set_thread_pool_size(args.threads)

self._set_threads_from_args(args)
upload_kwargs = self.get_execute_kwargs(args)
file_info = self.execute_operation(**upload_kwargs)
if not args.quiet:
Expand Down Expand Up @@ -2737,7 +2747,7 @@ def get_execute_kwargs(self, args) -> dict:
"min_part_size": args.minPartSize,
"progress_listener": make_progress_listener(args.localFilePath, args.noProgress),
"sha1_sum": args.sha1,
"threads": args.threads,
"threads": self._get_threads_from_args(args),
}

@abstractmethod
Expand Down Expand Up @@ -2801,6 +2811,7 @@ class UploadFile(UploadFileMixin, UploadModeMixin, Command):
{UPLOADFILEMIXIN}
{MINPARTSIZEMIXIN}
{PROGRESSMIXIN}
{THREADSMIXIN}
{DESTINATIONSSEMIXIN}
{FILERETENTIONSETTINGMIXIN}
{LEGALHOLDMIXIN}
Expand Down Expand Up @@ -2862,6 +2873,7 @@ class UploadUnboundStream(UploadFileMixin, Command):
if you expect the stream to be larger than 50GB.
{PROGRESSMIXIN}
{THREADSMIXIN}
{DESTINATIONSSEMIXIN}
{FILERETENTIONSETTINGMIXIN}
{LEGALHOLDMIXIN}
Expand Down

0 comments on commit d54533f

Please sign in to comment.