Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python Multiprocessing Helpers #117

Merged
merged 37 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cbc0c9a
Add Python Multiprocessing Helpers
clee-ai Sep 6, 2022
598d923
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2022
5a187d7
Documentation
clee-ai Sep 6, 2022
65c5529
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2022
421297e
Limit multiprocessing tests to python>3.6
clee-ai Sep 6, 2022
183573e
Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int…
clee-ai Sep 6, 2022
b0a09ad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2022
a3abef4
fix typo
clee-ai Sep 6, 2022
3b5e789
typo
clee-ai Sep 6, 2022
e74b631
fix test
clee-ai Sep 7, 2022
5471df7
fix test
clee-ai Sep 7, 2022
a45190b
Cleanup
clee-ai Sep 7, 2022
05a556b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2022
711eb7c
Add max_workers
clee-ai Sep 7, 2022
8df3a1b
Generate MP test file instead of using autzen
clee-ai Sep 7, 2022
d8a4cba
Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int…
clee-ai Sep 7, 2022
47e8847
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2022
f009658
change test file path
clee-ai Sep 7, 2022
b91f2de
Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int…
clee-ai Sep 7, 2022
541efe1
Move init_mp to outside function
clee-ai Sep 7, 2022
27ddaa3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2022
8bfea6d
docs
clee-ai Sep 7, 2022
74bb82e
Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int…
clee-ai Sep 7, 2022
c0d2756
pin laspy
clee-ai Sep 7, 2022
b66ecd4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2022
8ac2a21
change to lazrs
clee-ai Sep 7, 2022
9f8de1a
Merge branch 'multiprocessing' of github.com:RockRobotic/copc-lib int…
clee-ai Sep 7, 2022
29ab649
update
clee-ai Sep 7, 2022
b210d27
update
clee-ai Sep 7, 2022
a1f0630
Merge remote-tracking branch 'origin/main' into multiprocessing
clee-ai Sep 7, 2022
68cc432
Skip windows MP tests
clee-ai Sep 7, 2022
088c303
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2022
1433e98
update
clee-ai Sep 7, 2022
0e3482b
update
clee-ai Sep 7, 2022
0d27ba8
update
clee-ai Sep 7, 2022
1e9e05c
Merge remote-tracking branch 'origin/main' into multiprocessing
clee-ai Sep 8, 2022
8891351
update to master
clee-ai Sep 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ repos:
always_run: true
args: ["-i"]
additional_dependencies: ["clang-format==12.0.1"]

- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.8.0
hooks:
- id: black
language_version: python
4 changes: 4 additions & 0 deletions cpp/include/copc-lib/io/copc_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class FileReader : public Reader
FileReader(const std::string &file_path) : is_open_(true)
{
auto f_stream = new std::fstream;
this->file_path_ = file_path;
f_stream->open(file_path.c_str(), std::ios::in | std::ios::binary);
if (!f_stream->good())
throw std::runtime_error("FileReader: Error while opening file path.");
Expand All @@ -120,10 +121,13 @@ class FileReader : public Reader
}
}

std::string FilePath() { return file_path_; }

~FileReader() { Close(); }

private:
bool is_open_;
std::string file_path_;
};

} // namespace copc
Expand Down
89 changes: 89 additions & 0 deletions example/example_mp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import copclib as copc
from copclib.mp.transform import transform_multithreaded
from scipy.spatial.transform import Rotation as R
import numpy as np

from tqdm import tqdm
import os


DATADIRECTORY = os.path.join(os.path.dirname(__file__), "..", "test", "data")
if not os.path.exists(os.path.join(os.path.join(DATADIRECTORY, "out"))):
os.makedirs(os.path.join(DATADIRECTORY, "out"))


def _rotate_transform(rotation_center, points, **kwargs):
xyz = np.stack([points.x, points.y, points.z], axis=1)

# Rotation matrix
xyz -= rotation_center
# print(xyz)
rot_mat = R.from_euler("XYZ", (0, 0, 90), degrees=True).as_matrix()

rotated_points = np.matmul(rot_mat, xyz.T).T
rotated_points += rotation_center

points.x = rotated_points[:, 0]
points.y = rotated_points[:, 1]
points.z = rotated_points[:, 2]

return points


def rotate_file_mp():
    """Rotate an entire COPC file 90 degrees about the Z axis using
    multiprocessing, then verify the output file's node structure.
    """
    # The rotated copy is written into the test data "out" directory.
    file_path = os.path.join(DATADIRECTORY, "out", "autzen-rotated.copc.laz")
    # Open the source dataset and create a writer that reuses its
    # configuration (header, scale/offset, spatial reference, ...).
    reader = copc.FileReader(
        os.path.join(DATADIRECTORY, "autzen-classified.copc.laz")
    )
    writer = copc.FileWriter(
        file_path,
        reader.copc_config,
    )

    # Rotate around the minimum corner of the file's bounding box so the
    # rotated cloud stays near its original coordinates.
    header_min = reader.copc_config.las_header.min
    rotation_center = np.array([header_min.x, header_min.y, header_min.z])
    # Fan the per-node rotation out across worker processes; the progress bar
    # is updated as nodes complete, and update_minmax recomputes the header
    # extents to match the rotated coordinates.
    with tqdm() as progress:
        transform_multithreaded(
            reader,
            writer,
            _rotate_transform,
            transform_function_args=dict(rotation_center=rotation_center),
            progress=progress,
            update_minmax=True,
        )

    writer.Close()

    # validate: every node of the source must exist in the rotated output
    # with an identical key and point count.
    new_reader = copc.FileReader(file_path)
    for node in reader.GetAllNodes():
        assert node.IsValid()
        new_node = new_reader.FindNode(node.key)
        assert new_node.IsValid()
        assert new_node.key == node.key
        assert new_node.point_count == node.point_count


def copy_file_mp():
    """Copy a COPC file node-by-node using multiprocessing, then verify the
    copy matches the source at the node level.
    """
    # BUG FIX: the input/output paths were swapped — the reader pointed at a
    # nonexistent "autzen-copied" input and the writer clobbered the rotated
    # output from rotate_file_mp. Read the source dataset and write the copy
    # to the "out" directory instead.
    file_path = os.path.join(DATADIRECTORY, "out", "autzen-copied.copc.laz")
    reader = copc.FileReader(
        os.path.join(DATADIRECTORY, "autzen-classified.copc.laz")
    )
    writer = copc.FileWriter(
        file_path,
        reader.copc_config,
    )

    # No transform function: transform_multithreaded copies the points as-is.
    transform_multithreaded(reader, writer)
    writer.Close()

    # validate: the copy must contain every source node, byte-for-byte.
    new_reader = copc.FileReader(file_path)
    for node in reader.GetAllNodes():
        assert node.IsValid()
        new_node = new_reader.FindNode(node.key)
        assert new_node.IsValid()
        assert new_node.key == node.key
        assert new_node.point_count == node.point_count
        assert new_node.byte_size == node.byte_size


# Run both multiprocessing examples when executed as a script.
if __name__ == "__main__":
    rotate_file_mp()
    copy_file_mp()
1 change: 1 addition & 0 deletions python/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ PYBIND11_MODULE(_core, m)
py::class_<FileReader>(m, "FileReader")
.def(py::init<std::string &>())
.def("Close", &FileReader::Close)
.def_property_readonly("path", &FileReader::FilePath)
.def("FindNode", &Reader::FindNode, py::arg("key"))
.def_property_readonly("copc_config", &Reader::CopcConfig)
.def("GetPointData", py::overload_cast<const Node &>(&Reader::GetPointData), py::arg("node"))
Expand Down
Empty file added python/copclib/mp/__init__.py
Empty file.
211 changes: 211 additions & 0 deletions python/copclib/mp/read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
from .utils import chunks
import copclib as copc
import numpy as np

import concurrent.futures


def _init_mp(copc_path):
    """Worker-process initializer: open one COPC reader per worker process.

    Defined at module scope (not nested inside read_multithreaded) because
    ProcessPoolExecutor must pickle the initializer when it spawns workers,
    and nested functions are not picklable.
    """
    _read_node.copc_reader = copc.FileReader(copc_path)


def read_multithreaded(
    reader,
    read_function,
    read_function_args=None,
    nodes=None,
    resolution=-1,
    progress=None,
    completed_callback=None,
    chunk_size=1024,
    max_workers=None,
):
    """Scaffolding for reading COPC files in a multithreaded way to increase performance.
    It queues all nodes from either the provided list of nodes or nodes within the given resolution to be processed.
    Within the multiprocess, `read_function` is called with `read_function_args` keyword arguments, as well as the
    keyword arguments "points", "node", and "reader".
    This function should take those parameters and return a pickleable object that represents those points -
    for example, a list of XYZ points limited by classification.
    Note that whatever is returned from this function must be wrapped in a dictionary,
    because the return values get passed as keyword arguments to `callback_function`.
    The return value of `read_function`, as well as the currently processed node, is passed back to the main thread,
    where `callback_function` is called with the node and the return values of `read_function` as keyword arguments.
    This function can aggregate your results as they come back from the multithreading.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        read_function (function): A function which takes each input node and its points and
            returns a pickleable object as output.
        read_function_args (dict, optional): A key/value pair of keyword arguments to pass to the read_function.
            Defaults to None (no extra arguments).
        nodes (list[copc.Node], optional): A list of nodes to run the reader on. Defaults to reading all the nodes.
        resolution (float, optional): If a list of nodes is not provided, reads all nodes up to this resolution.
            Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
        completed_callback (function, optional): A function which is called after a node is processed
            and returned from multiprocessing. Defaults to None.
        chunk_size (int, optional): Limits the amount of nodes which are queued for multiprocessing at once. Defaults to 1024.
        max_workers (int, optional): Maximum number of worker processes.
            Defaults to None (the ProcessPoolExecutor default).

    Raises:
        RuntimeError
    """
    if nodes is not None and resolution > -1:
        raise RuntimeError("You can only specify one of: 'nodes', 'resolution'")

    # Avoid a shared mutable default argument.
    if read_function_args is None:
        read_function_args = {}

    if nodes is None:
        if resolution > -1:
            nodes = reader.GetNodesWithinResolution(resolution)
        else:
            nodes = reader.GetAllNodes()

    if progress is not None:
        progress.reset(len(nodes))

    # Each worker process opens its own FileReader via _init_mp, since reader
    # objects can't be shared across process boundaries.
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=_init_mp,
        initargs=(reader.path,),
    ) as executor:
        # Queue at most chunk_size nodes at a time to bound memory usage.
        for chunk in chunks(nodes, chunk_size):
            futures = []
            for node in chunk:
                future = executor.submit(
                    _read_node,
                    read_function,
                    read_function_args,
                    node,
                )
                if progress is not None:
                    future.add_done_callback(lambda _: progress.update())
                futures.append(future)

            # Hand each node's results back to the caller as they complete.
            for fut in concurrent.futures.as_completed(futures):
                node, return_vals = fut.result()
                if completed_callback:
                    completed_callback(
                        node=node,
                        **return_vals,
                    )


def _read_node(read_function, read_function_args, node):
"""Helper function which gets called by executor.submit in the multiprocessing.
Calls read_function and returns the results.
"""
reader = _read_node.copc_reader
points = reader.GetPoints(node)

for argument_name in read_function_args.keys():
assert argument_name not in [
"points",
"node",
"reader",
], f"Use of protected keyword argument '{argument_name}'!"
ret = read_function(points=points, node=node, reader=reader, **read_function_args)
assert isinstance(
ret, dict
), "The read_function return value should be a dictionary of kwargs!"

return node, ret


def _do_read_xyz(points, class_limits=None, **kwargs):
"""A 'read_function' which returns a numpy array of XYZ points from the COPC file,
optionally limiting to certain classifications.

Args:
points (copc.Points): The decompressed and unpacked Points from the given node
class_limits (list[int], optional): Limits the points returned to those whose
classification. Defaults to None.

Returns:
dict(xyz=np.array): The numpy array of points, with "xyz" as their kwarg key
"""
xyzs = []
for point in points:
if not class_limits or point.classification in class_limits:
xyzs.append([point.x, point.y, point.z])

return dict(xyz=np.array(xyzs).reshape(len(xyzs), 3))


def read_concat_xyz_class_limit(
    reader, classification_limits=None, resolution=-1, progress=None, **kwargs
):
    """Reads the nodes in a COPC file and returns a concatenated list of XYZ coordinates,
    optionally limited by classifications and resolution.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        classification_limits (list[int], optional): Limit the coordinates returned
            to those points with a classification inside this list. Defaults to no limit.
        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.

    Raises:
        RuntimeError: If a kwarg that this function supplies itself is also passed in.

    Returns:
        np.array: An (nx3) array of XYZ coordinates.
    """
    # These kwargs are supplied by this function below; letting callers pass
    # them through **kwargs would collide. (The previous names —
    # "filter_function"/"filter_function_args" — didn't match this module's
    # actual argument names, so the guard never caught anything.)
    invalid_args = ["read_function", "read_function_args", "completed_callback"]
    for invalid_arg in invalid_args:
        if invalid_arg in kwargs:
            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")

    # Avoid a shared mutable default argument.
    if classification_limits is None:
        classification_limits = []

    all_xyz = []

    # Aggregate each node's coordinates as results arrive from the workers.
    def callback(xyz, **kwargs):
        all_xyz.append(xyz)

    read_multithreaded(
        reader,
        read_function=_do_read_xyz,
        read_function_args=dict(class_limits=classification_limits),
        completed_callback=callback,
        resolution=resolution,
        progress=progress,
        **kwargs,
    )

    return np.concatenate(all_xyz)


def read_map_xyz_class_limit(
    reader, classification_limits=None, resolution=-1, progress=None, **kwargs
):
    """Reads the nodes in a COPC file and returns a dictionary which maps stringified node keys to
    their respective XYZ coordinates, optionally limited by classifications and resolution.
    If a node has no points matching the constraints, it won't be added to this dictionary.

    Args:
        reader (copclib.CopcReader): A copc reader for the file you are reading
        classification_limits (list[int], optional): Limit the coordinates returned
            to those points with a classification inside this list. Defaults to no limit.
        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.

    Raises:
        RuntimeError: If a kwarg that this function supplies itself is also passed in.

    Returns:
        dict[str: np.array]: A mapping of stringified COPC keys to an (nx3) array of XYZ coordinates.
    """
    # These kwargs are supplied by this function below; letting callers pass
    # them through **kwargs would collide. (The previous names —
    # "filter_function"/"filter_function_args" — didn't match this module's
    # actual argument names, so the guard never caught anything.)
    invalid_args = ["read_function", "read_function_args", "completed_callback"]
    for invalid_arg in invalid_args:
        if invalid_arg in kwargs:
            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")

    # Avoid a shared mutable default argument.
    if classification_limits is None:
        classification_limits = []

    key_xyz_map = {}

    # Only record nodes that produced at least one matching point.
    def callback(xyz, node, **kwargs):
        if len(xyz) > 0:
            key_xyz_map[str(node.key)] = xyz

    read_multithreaded(
        reader,
        read_function=_do_read_xyz,
        read_function_args=dict(class_limits=classification_limits),
        completed_callback=callback,
        resolution=resolution,
        progress=progress,
        **kwargs,
    )

    return key_xyz_map
Loading