Multiprocessing Additions (#120)
* Allow specifying MP init function

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix

* Update docstring

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
CCInc and pre-commit-ci[bot] authored Sep 12, 2022
1 parent e610a7d commit 2315b31
Showing 1 changed file with 9 additions and 2 deletions.
python/copclib/mp/transform.py

@@ -10,8 +10,10 @@ def _copy_points_transform(points, **kwargs):


 # Initialize each multiprocessing thread with a copy of the copc reader
-def init_mp(copc_path):
+def init_mp(copc_path, mp_init_function, mp_init_function_args):
     _transform_node.copc_reader = copc.FileReader(copc_path)
+    if mp_init_function:
+        mp_init_function(**mp_init_function_args)


 def transform_multithreaded(
@@ -26,6 +28,8 @@ def transform_multithreaded(
     chunk_size=1024,
     max_workers=None,
     update_minmax=False,
+    mp_init_function=None,
+    mp_init_function_args={},
 ):
     """Scaffolding for reading COPC files and writing them back out in a multithreaded way.
     It queues all nodes from either the provided list of nodes or nodes within the given resolution to be processed.
@@ -57,6 +61,9 @@ def transform_multithreaded(
         max_workers (int, optional): Manually set the number of processors to use when multiprocessing. Defaults to all processors.
         update_minmax (bool, optional): If true, updates the header of the new COPC file with the correct XYZ min/max.
             Defaults to False.
+        mp_init_function (function, optional): A function that gets called in the ProcessPoolExecutor initializer.
+        mp_init_function_args (dict, optional): A dictionary of keyword arguments that get passed to `mp_init_function`.
+            Defaults to {}.

     Raises:
         RuntimeError
@@ -86,7 +93,7 @@ def transform_multithreaded(
     with concurrent.futures.ProcessPoolExecutor(
         max_workers=max_workers,
         initializer=init_mp,
-        initargs=(reader.path,),
+        initargs=(reader.path, mp_init_function, mp_init_function_args),
     ) as executor:
         # Chunk the nodes so we're not flooding executor.submit
         for chunk in chunks(nodes, chunk_size):
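For context, a minimal usage sketch of the new hooks. Only mp_init_function, mp_init_function_args, and max_workers come from this diff; the reader/writer/transform_function argument names, the copc.FileWriter constructor, and the file paths are assumptions for illustration.

import logging

import copclib as copc
from copclib.mp.transform import transform_multithreaded


def my_worker_init(log_level=logging.INFO):
    # Hypothetical per-worker setup: called once inside init_mp in every
    # worker process, right after its copc.FileReader has been created.
    logging.basicConfig(level=log_level)


def copy_points(points, **kwargs):
    # Hypothetical transform, mirroring _copy_points_transform in the diff.
    return points


if __name__ == "__main__":
    # Paths and the writer construction are illustrative, not part of the diff.
    reader = copc.FileReader("in.copc.laz")
    writer = copc.FileWriter("out.copc.laz", reader.copc_config)

    transform_multithreaded(
        reader,
        writer,
        transform_function=copy_points,
        mp_init_function=my_worker_init,                     # new in this commit
        mp_init_function_args={"log_level": logging.DEBUG},  # forwarded as **kwargs
        max_workers=4,
    )
    writer.Close()

Note that both functions must be importable top-level functions so the worker processes can pickle them, and the __main__ guard matters on platforms that spawn rather than fork.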
