Add Python Multiprocessing Helpers #117
Merged
Commits (37 total; changes shown from 21 commits):
cbc0c9a  Add Python Multiprocessing Helpers (clee-ai)
598d923  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
5a187d7  Documentation (clee-ai)
65c5529  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
421297e  Limit multiprocessing tests to python>3.6 (clee-ai)
183573e  Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int… (clee-ai)
b0a09ad  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
a3abef4  fix typo (clee-ai)
3b5e789  typo (clee-ai)
e74b631  fix test (clee-ai)
5471df7  fix test (clee-ai)
a45190b  Cleanup (clee-ai)
05a556b  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
711eb7c  Add max_workers (clee-ai)
8df3a1b  Generate MP test file instead of using autzen (clee-ai)
d8a4cba  Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int… (clee-ai)
47e8847  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
f009658  change test file path (clee-ai)
b91f2de  Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int… (clee-ai)
541efe1  Move init_mp to outside function (clee-ai)
27ddaa3  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
8bfea6d  docs (clee-ai)
74bb82e  Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int… (clee-ai)
c0d2756  pin laspy (clee-ai)
b66ecd4  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
8ac2a21  change to lazrs (clee-ai)
9f8de1a  Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int… (clee-ai)
29ab649  update (clee-ai)
b210d27  update (clee-ai)
a1f0630  Merge remote-tracking branch 'origin/main' into multiprocessing (clee-ai)
68cc432  Skip windows MP tests (clee-ai)
088c303  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
1433e98  update (clee-ai)
0e3482b  update (clee-ai)
0d27ba8  update (clee-ai)
1e9e05c  Merge remote-tracking branch 'origin/main' into multiprocessing (clee-ai)
8891351  update to master (clee-ai)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copclib as copc
from copclib.mp.transform import transform_multithreaded
from scipy.spatial.transform import Rotation as R
import numpy as np

from tqdm import tqdm
import os


DATADIRECTORY = os.path.join(os.path.dirname(__file__), "..", "test", "data")
if not os.path.exists(os.path.join(DATADIRECTORY, "out")):
    os.makedirs(os.path.join(DATADIRECTORY, "out"))


def _rotate_transform(rotation_center, points, **kwargs):
    """Transformation function that gets called in multiprocessing to rotate a
    set of copclib.Points.
    """
    # Read in the XYZ coordinates
    xyz = np.stack([points.x, points.y, points.z], axis=1)

    # Center about the rotation center
    xyz -= rotation_center
    # Construct the rotation matrix (90 degrees about the Z axis)
    rot_mat = R.from_euler("XYZ", (0, 0, 90), degrees=True).as_matrix()
    # Do the rotation
    rotated_points = np.matmul(rot_mat, xyz.T).T
    # Reset the center back to where it was
    rotated_points += rotation_center

    # Assign the rotated coordinates back to the points
    points.x = rotated_points[:, 0]
    points.y = rotated_points[:, 1]
    points.z = rotated_points[:, 2]

    return points


def rotate_file_mp():
    """An example of using transform_multithreaded. It reads in Autzen and rotates
    all the points 90 degrees, updating the header min/max as well.
    """
    # Open a new reader and writer
    file_path = os.path.join(DATADIRECTORY, "out", "autzen-rotated.copc.laz")
    reader = copc.FileReader(os.path.join(DATADIRECTORY, "autzen-classified.copc.laz"))
    writer = copc.FileWriter(file_path, reader.copc_config)

    # Set the center of rotation to the minimum XYZ
    las_min = reader.copc_config.las_header.min
    rotation_center = np.array([las_min.x, las_min.y, las_min.z])
    with tqdm() as progress:
        # Do the transformation, passing the rotation_center as a parameter to _rotate_transform
        transform_multithreaded(
            reader=reader,
            writer=writer,
            transform_function=_rotate_transform,
            transform_function_args=dict(rotation_center=rotation_center),
            progress=progress,
            update_minmax=True,
        )

    writer.Close()

    # Validate that every node made it into the new file
    new_reader = copc.FileReader(file_path)
    for node in reader.GetAllNodes():
        assert node.IsValid()
        new_node = new_reader.FindNode(node.key)
        assert new_node.IsValid()
        assert new_node.key == node.key
        assert new_node.point_count == node.point_count
def copy_file_mp():
    """An example of the default behavior of transform_multithreaded,
    which copies the points directly over to a new file.
    """
    # Open the reader and writer (the original code had these two file names
    # swapped; the input should be the source file, the output the copy)
    file_path = os.path.join(DATADIRECTORY, "out", "autzen-copied.copc.laz")
    reader = copc.FileReader(os.path.join(DATADIRECTORY, "autzen-classified.copc.laz"))
    writer = copc.FileWriter(file_path, reader.copc_config)

    # Do the transformation (no transform_function means a straight copy)
    transform_multithreaded(reader, writer)
    writer.Close()

    # Validate that the copy matches the original, node for node
    new_reader = copc.FileReader(file_path)
    for node in reader.GetAllNodes():
        assert node.IsValid()
        new_node = new_reader.FindNode(node.key)
        assert new_node.IsValid()
        assert new_node.key == node.key
        assert new_node.point_count == node.point_count
        assert new_node.byte_size == node.byte_size


if __name__ == "__main__":
    rotate_file_mp()
    copy_file_mp()
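The rotation in `_rotate_transform` is ordinary affine geometry: translate so the rotation center sits at the origin, multiply by a Z-axis rotation matrix, then translate back. A minimal standard-library sketch of the same math, with no scipy/numpy dependency (the function name is hypothetical, not part of the PR):

```python
import math


def rotate_z_about_center(points, center, degrees):
    """Rotate (x, y, z) tuples about the Z axis around `center`.

    A pure-Python sketch of what _rotate_transform does with scipy/numpy:
    center the points, apply the rotation matrix, restore the center.
    """
    theta = math.radians(degrees)
    cos_t, sin_t = math.cos(theta), math.sin(theta)
    cx, cy, cz = center
    rotated = []
    for x, y, z in points:
        # Center about the rotation center
        dx, dy, dz = x - cx, y - cy, z - cz
        # Apply the Z-axis rotation matrix [[c, -s, 0], [s, c, 0], [0, 0, 1]]
        rx = cos_t * dx - sin_t * dy
        ry = sin_t * dx + cos_t * dy
        # Reset the center back to where it was (Z is unchanged by a Z rotation)
        rotated.append((rx + cx, ry + cy, dz + cz))
    return rotated
```

For a 90-degree rotation about Z centered at the origin, the point (1, 0, 0) maps to (0, 1, 0); the scipy call `R.from_euler("XYZ", (0, 0, 90), degrees=True)` in the example above builds the same matrix.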
Empty file.
from .utils import chunks
import copclib as copc
import numpy as np

import concurrent.futures


# Initialize each multiprocessing worker process with its own copy of the copc reader
def init_mp(copc_path):
    _read_node.copc_reader = copc.FileReader(copc_path)


def read_multithreaded(
    reader,
    read_function,
    read_function_args={},
    nodes=None,
    resolution=-1,
    progress=None,
    completed_callback=None,
    chunk_size=1024,
    max_workers=None,
):
    """Scaffolding for reading COPC files in a multithreaded way to increase performance.
    It queues all nodes, from either the provided list of nodes or all nodes within the given
    resolution, to be processed. Within the multiprocess, `read_function` is called with the
    `read_function_args` keyword arguments, as well as the keyword arguments "points", "node",
    and "reader". This function should take those parameters and return a pickleable object
    that represents those points - for example, a list of XYZ points limited by classification.
    Note that whatever is returned from this function must be wrapped in a dictionary,
    because the return values get passed as keyword arguments to `completed_callback`.
    The return value of `read_function`, as well as the currently processed node, is passed
    back to the main thread, where `completed_callback` is called with the node and the
    return values of `read_function` as keyword arguments. This callback can aggregate your
    results as they come back from the multiprocessing.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        read_function (function): A function which takes each input node and its points and
            returns a pickleable object as output.
        read_function_args (dict, optional): A key/value pair of keyword arguments to pass to the read_function. Defaults to {}.
        nodes (list[copc.Node], optional): A list of nodes to run the reader on. Defaults to reading all the nodes.
        resolution (float, optional): If a list of nodes is not provided, reads all nodes up to this resolution.
            Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
        completed_callback (function, optional): A function which is called after a node is processed
            and returned from multiprocessing. Defaults to None.
        chunk_size (int, optional): Limits the number of nodes which are queued for multiprocessing at once. Defaults to 1024.
        max_workers (int, optional): Manually set the number of processes to use when multiprocessing. Defaults to all processors.

    Raises:
        RuntimeError
    """
    if nodes is not None and resolution > -1:
        raise RuntimeError("You can only specify one of: 'nodes', 'resolution'")

    # Set the nodes to iterate over, if none are provided
    if nodes is None:
        if resolution > -1:
            nodes = reader.GetNodesWithinResolution(resolution)
        else:
            nodes = reader.GetAllNodes()

    # Reset the progress bar to the new total number of nodes
    if progress is not None:
        progress.reset(len(nodes))

    # Initialize the multiprocessing
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=init_mp,
        initargs=(reader.path,),
    ) as executor:
        # Chunk the nodes so we're not flooding executor.submit
        for chunk in chunks(nodes, chunk_size):
            futures = []
            # Call _read_node, which calls the read_function
            for node in chunk:
                future = executor.submit(
                    _read_node,
                    read_function,
                    read_function_args,
                    node,
                )
                # Update the progress bar, if necessary
                if progress is not None:
                    future.add_done_callback(lambda _: progress.update())
                futures.append(future)

            # As each node completes
            for fut in concurrent.futures.as_completed(futures):
                node, return_vals = fut.result()
                # Call completed_callback, if provided
                if completed_callback:
                    completed_callback(
                        node=node,
                        **return_vals,
                    )
def _read_node(read_function, read_function_args, node):
    """Helper function which gets called by executor.submit in the multiprocessing.
    Calls read_function and returns the results.
    """
    reader = _read_node.copc_reader
    # Decompress and unpack the points within each worker process
    points = reader.GetPoints(node)

    # If any of these arguments are provided, they'll throw an error since we use those argument names
    for argument_name in read_function_args.keys():
        assert argument_name not in [
            "points",
            "node",
            "reader",
        ], f"Use of protected keyword argument '{argument_name}'!"
    # Actually call the read_function
    ret = read_function(points=points, node=node, reader=reader, **read_function_args)
    assert isinstance(
        ret, dict
    ), "The read_function return value should be a dictionary of kwargs!"

    return node, ret


def _do_read_xyz(points, class_limits=None, **kwargs):
    """A 'read_function' which returns a numpy array of XYZ points from the COPC file,
    optionally limiting them to certain classifications.

    Args:
        points (copc.Points): The decompressed and unpacked Points from the given node
        class_limits (list[int], optional): Limits the points returned to those whose
            classification is in this list. Defaults to None.

    Returns:
        dict(xyz=np.array): The numpy array of points, with "xyz" as their kwarg key
    """
    xyzs = []
    # Iterate over each point in the node and check if it's
    # within the provided classification limits
    for point in points:
        if not class_limits or point.classification in class_limits:
            xyzs.append([point.x, point.y, point.z])

    # Reshape to always be (Nx3), in case there are no points
    return dict(xyz=np.array(xyzs).reshape(len(xyzs), 3))
def read_concat_xyz_class_limit(
    reader, classification_limits=[], resolution=-1, progress=None, **kwargs
):
    """Reads the nodes in a COPC file and returns a concatenated array of XYZ coordinates,
    optionally limited by classifications and resolution.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        classification_limits (list[int], optional): Limit the coordinates returned
            to those points with a classification inside this list. Defaults to [].
        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.

    Raises:
        RuntimeError

    Returns:
        np.array: An (nx3) array of XYZ coordinates.
    """
    # We provide these arguments within this function, so the user isn't able to provide them.
    invalid_args = ["read_function", "read_function_args", "completed_callback"]
    for invalid_arg in invalid_args:
        if invalid_arg in kwargs:
            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")

    # Container of all XYZ points
    all_xyz = []

    # After each node is done, add the array of that node's XYZ coordinates
    # to our container
    def callback(xyz, **kwargs):
        all_xyz.append(xyz)

    read_multithreaded(
        reader=reader,
        read_function=_do_read_xyz,
        read_function_args=dict(class_limits=classification_limits),
        completed_callback=callback,
        resolution=resolution,
        progress=progress,
        **kwargs,
    )

    # Concatenate all the points at the end, and return one large array of
    # all the points in the file
    return np.concatenate(all_xyz)


def read_map_xyz_class_limit(
    reader, classification_limits=[], resolution=-1, progress=None, **kwargs
):
    """Reads the nodes in a COPC file and returns a dictionary which maps stringified node keys
    to their respective XYZ coordinates, optionally limited by classifications and resolution.
    If a node has no points matching the constraints, it isn't added to this dictionary.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        classification_limits (list[int], optional): Limit the coordinates returned
            to those points with a classification inside this list. Defaults to [].
        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.

    Raises:
        RuntimeError

    Returns:
        dict[str: np.array]: A mapping of stringified COPC keys to an (nx3) array of XYZ coordinates.
    """
    # We provide these arguments within this function, so the user isn't able to provide them.
    invalid_args = ["read_function", "read_function_args", "completed_callback"]
    for invalid_arg in invalid_args:
        if invalid_arg in kwargs:
            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")

    # Map of stringified VoxelKeys to numpy arrays of coordinates
    key_xyz_map = {}

    # After each node is done processing, add the returned coordinates
    # to the map
    def callback(xyz, node, **kwargs):
        if len(xyz) > 0:
            key_xyz_map[str(node.key)] = xyz

    read_multithreaded(
        reader,
        read_function=_do_read_xyz,
        read_function_args=dict(class_limits=classification_limits),
        completed_callback=callback,
        resolution=resolution,
        progress=progress,
        **kwargs,
    )

    return key_xyz_map
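`chunks` is imported from `.utils`, whose source isn't part of this diff. The submit loop in `read_multithreaded` only needs it to yield successive fixed-size slices of the node list; a sketch of a generator with that behavior (an assumption about the helper, not necessarily the actual implementation) is:

```python
def chunks(lst, chunk_size):
    """Yield successive chunk_size-sized slices from lst.

    The final chunk may be shorter if len(lst) isn't a multiple of chunk_size.
    """
    for i in range(0, len(lst), chunk_size):
        yield lst[i : i + chunk_size]
```

The caller doesn't care that the last chunk is shorter, since it just iterates whatever each chunk contains before submitting the next batch.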
Can you add a ton of comments to this function? :)
Let me know if that's better!
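For readers unfamiliar with the initializer pattern that `init_mp` uses above: the point is to open one expensive resource (here, a `copc.FileReader`) once per worker process, rather than pickling it into every submitted task. A toy standard-library sketch of the same pattern (all names hypothetical; the `fork` start method is assumed so the sketch stays self-contained, which limits it to Linux/macOS):

```python
import concurrent.futures
import multiprocessing

# Module-level holder for the per-process resource, mirroring how init_mp
# stashes a copc.FileReader as an attribute of the worker function.
_worker_resource = None


def _init_worker(offset):
    # Runs once in each worker process, like init_mp(copc_path)
    global _worker_resource
    _worker_resource = offset


def _task(x):
    # Each task uses the process-local resource instead of receiving it per call
    return x + _worker_resource


def run_pool(values, offset, max_workers=2):
    # "fork" keeps this sketch runnable as a script; spawn-based platforms
    # would need these functions to live in an importable module
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=multiprocessing.get_context("fork"),
        initializer=_init_worker,
        initargs=(offset,),
    ) as executor:
        return sorted(executor.map(_task, values))
```

Each worker runs `_init_worker` exactly once, no matter how many tasks it processes; that is why the PR moved `init_mp` out to module level, where the executor's `initializer=` hook can reach it.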