From 0beabe7d34d99b86ce6e9fa6c5be6e8edb4c10e9 Mon Sep 17 00:00:00 2001
From: Christopher Lee
Date: Thu, 8 Sep 2022 16:21:53 -0600
Subject: [PATCH] Add Python Multiprocessing Helpers (#117)

* Add Python Multiprocessing Helpers
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Documentation
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Limit multiprocessing tests to python>3.6
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* fix typo
* typo
* fix test
* fix test
* Cleanup
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Add max_workers
* Generate MP test file instead of using autzen
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* change test file path
* Move init_mp to outside function
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* docs
* pin laspy
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* change to lazrs
* update
* update
* Skip windows MP tests
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* update
* update
* update
* update to master

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 .github/workflows/pip.yaml              |   2 +-
 .pre-commit-config.yaml                 |   3 +-
 cpp/include/copc-lib/io/copc_reader.hpp |   4 +
 example/example_mp.py                   | 107 +++++++++++
 python/bindings.cpp                     |   1 +
 python/copclib/mp/__init__.py           |   0
 python/copclib/mp/read.py               | 239 ++++++++++++++++++++++++
 python/copclib/mp/transform.py          | 199 ++++++++++++++++++++
 python/copclib/mp/utils.py              |   4 +
 setup.py                                |   2 +-
 test/mp_read_test.py                    |  67 +++++++
 test/mp_transform_test.py               |  85 +++++++++
 test/utils.py                           | 108 ++++++++++-
 13 files changed, 817 insertions(+), 4 deletions(-)
 create mode 100644 example/example_mp.py
 create mode 100644 python/copclib/mp/__init__.py
 create mode 100644 python/copclib/mp/read.py
 create mode 100644 python/copclib/mp/transform.py
 create mode 100644 python/copclib/mp/utils.py
 create mode 100644 test/mp_read_test.py
 create mode 100644 test/mp_transform_test.py

diff --git a/.github/workflows/pip.yaml b/.github/workflows/pip.yaml
index 58befa27..7f380531 100644
--- a/.github/workflows/pip.yaml
+++ b/.github/workflows/pip.yaml
@@ -11,7 +11,7 @@ on:
 env:
   CIBW_TEST_COMMAND: pytest {project}/test
   CIBW_TEST_EXTRAS: test
-  CIBW_SKIP: "cp310-win* cp311-win* pp*" # just exclude platforms that error :))
+  CIBW_SKIP: "cp310-win* cp311* pp*" # just exclude platforms that error :))
 
 jobs:
   build_sdist:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 2aa11d5b..2256119e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -24,8 +24,9 @@ repos:
         always_run: true
         args: ["-i"]
         additional_dependencies: ["clang-format==12.0.1"]
+
   - repo: https://github.com/psf/black
-    rev: 22.3.0
+    rev: 22.8.0
     hooks:
       - id: black
         language_version: python
diff --git a/cpp/include/copc-lib/io/copc_reader.hpp b/cpp/include/copc-lib/io/copc_reader.hpp
index 2134a37b..f15d6f29 100644
--- a/cpp/include/copc-lib/io/copc_reader.hpp
+++ b/cpp/include/copc-lib/io/copc_reader.hpp
@@ -102,6 +102,7 @@ class FileReader : public Reader
     FileReader(const std::string &file_path) : is_open_(true)
     {
         auto f_stream = new std::fstream;
+        this->file_path_ = file_path;
         f_stream->open(file_path.c_str(), std::ios::in | std::ios::binary);
         if (!f_stream->good())
             throw std::runtime_error("FileReader: Error while opening file path.");
@@ -120,10 +121,13 @@
         }
     }
 
+    std::string FilePath() { return file_path_; }
+
     ~FileReader() { Close(); }
 
   private:
     bool is_open_;
+    std::string file_path_;
 };
 
 } // namespace copc
diff --git a/example/example_mp.py b/example/example_mp.py
new file mode 100644
index 00000000..c51a3d66
--- /dev/null
+++ b/example/example_mp.py
@@ -0,0 +1,107 @@
+import copclib as copc
+from copclib.mp.transform import transform_multithreaded
+from scipy.spatial.transform import Rotation as R
+import numpy as np
+
+from tqdm import tqdm
+import os
+
+
+DATADIRECTORY = os.path.join(os.path.dirname(__file__), "..", "test", "data")
+if not os.path.exists(os.path.join(DATADIRECTORY, "out")):
+    os.makedirs(os.path.join(DATADIRECTORY, "out"))
+
+
+def _rotate_transform(rotation_center, points, **kwargs):
+    """Transformation function that gets called in multiprocessing to rotate a
+    set of copclib.Points
+    """
+
+    # Read in the XYZ coordinates
+    xyz = np.stack([points.x, points.y, points.z], axis=1)
+
+    # Center about the rotation center
+    xyz -= rotation_center
+    # Construct the rotation matrix
+    rot_mat = R.from_euler("XYZ", (0, 0, 90), degrees=True).as_matrix()
+    # Do the rotation
+    rotated_points = np.matmul(rot_mat, xyz.T).T
+    # Reset the center back to where it was
+    rotated_points += rotation_center
+
+    # Assign the rotated points back to the points
+    points.x = rotated_points[:, 0]
+    points.y = rotated_points[:, 1]
+    points.z = rotated_points[:, 2]
+
+    return points
+
+
+def rotate_file_mp():
+    """An example of using transform_multithreaded. It reads in Autzen and rotates all the points 90 degrees,
+    updating the header min/max as well.
+    """
+    # Open a new reader and writer
+    file_path = os.path.join(DATADIRECTORY, "out", "autzen-rotated.copc.laz")
+    reader = copc.FileReader(os.path.join(DATADIRECTORY, "autzen-classified.copc.laz"))
+    writer = copc.FileWriter(
+        file_path,
+        reader.copc_config,
+    )
+
+    # Set the center of rotation to the minimum XYZ
+    min = reader.copc_config.las_header.min
+    rotation_center = np.array([min.x, min.y, min.z])
+    with tqdm() as progress:
+        # Do the transformation, passing the rotation_center as a parameter to _rotate_transform
+        transform_multithreaded(
+            reader=reader,
+            writer=writer,
+            transform_function=_rotate_transform,
+            transform_function_args=dict(rotation_center=rotation_center),
+            progress=progress,
+            update_minmax=True,
+        )
+
+    writer.Close()
+
+    # validate
+    new_reader = copc.FileReader(file_path)
+    for node in reader.GetAllNodes():
+        assert node.IsValid()
+        new_node = new_reader.FindNode(node.key)
+        assert new_node.IsValid()
+        assert new_node.key == node.key
+        assert new_node.point_count == node.point_count
+
+
+def copy_file_mp():
+    """An example of the default behavior of transform_multithreaded,
+    which copies the points directly over to a new file.
+    """
+    # Open the reader and writer
+    file_path = os.path.join(DATADIRECTORY, "out", "autzen-copied.copc.laz")
+    reader = copc.FileReader(os.path.join(DATADIRECTORY, "autzen-classified.copc.laz"))
+    writer = copc.FileWriter(
+        file_path,
+        reader.copc_config,
+    )
+
+    # Do the transformation
+    transform_multithreaded(reader, writer)
+    writer.Close()
+
+    # validate
+    new_reader = copc.FileReader(file_path)
+    for node in reader.GetAllNodes():
+        assert node.IsValid()
+        new_node = new_reader.FindNode(node.key)
+        assert new_node.IsValid()
+        assert new_node.key == node.key
+        assert new_node.point_count == node.point_count
+        assert new_node.byte_size == node.byte_size
+
+
+if __name__ == "__main__":
+    rotate_file_mp()
+    copy_file_mp()
diff --git a/python/bindings.cpp b/python/bindings.cpp
index 95579bc1..7336ddb4 100644
--- a/python/bindings.cpp
+++ b/python/bindings.cpp
@@ -382,6 +382,7 @@ PYBIND11_MODULE(_core, m)
     py::class_<FileReader>(m, "FileReader")
        .def(py::init<const std::string &>())
        .def("Close", &FileReader::Close)
+       .def_property_readonly("path", &FileReader::FilePath)
        .def("FindNode", &Reader::FindNode, py::arg("key"))
        .def_property_readonly("copc_config", &Reader::CopcConfig)
        .def("GetPointData", py::overload_cast<const Node &>(&Reader::GetPointData), py::arg("node"))
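[Editor's note] The FilePath() accessor and its Python-side "path" property above exist so that worker processes can re-open the COPC file by path instead of trying to pickle a reader object (a FileReader owns an open fstream, which cannot cross process boundaries). A quick sanity check from Python, not part of the patch; the file name is hypothetical:

    import copclib as copc

    reader = copc.FileReader("autzen-classified.copc.laz")
    # The reader remembers where it was opened from; copclib.mp.read.init_mp
    # relies on this to construct one FileReader per worker process.
    assert reader.path == "autzen-classified.copc.laz"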
+ """ + # Open the reader and writer + file_path = os.path.join(DATADIRECTORY, "out", "autzen-rotated.copc.laz") + reader = copc.FileReader(os.path.join(DATADIRECTORY, "autzen-copied.copc.laz")) + writer = copc.FileWriter( + file_path, + reader.copc_config, + ) + + # Do the transformation + transform_multithreaded(reader, writer) + writer.Close() + + # validate + new_reader = copc.FileReader(file_path) + for node in reader.GetAllNodes(): + assert node.IsValid() + new_node = new_reader.FindNode(node.key) + assert new_node.IsValid() + assert new_node.key == node.key + assert new_node.point_count == node.point_count + assert new_node.byte_size == node.byte_size + + +if __name__ == "__main__": + rotate_file_mp() + copy_file_mp() diff --git a/python/bindings.cpp b/python/bindings.cpp index 95579bc1..7336ddb4 100644 --- a/python/bindings.cpp +++ b/python/bindings.cpp @@ -382,6 +382,7 @@ PYBIND11_MODULE(_core, m) py::class_(m, "FileReader") .def(py::init()) .def("Close", &FileReader::Close) + .def_property_readonly("path", &FileReader::FilePath) .def("FindNode", &Reader::FindNode, py::arg("key")) .def_property_readonly("copc_config", &Reader::CopcConfig) .def("GetPointData", py::overload_cast(&Reader::GetPointData), py::arg("node")) diff --git a/python/copclib/mp/__init__.py b/python/copclib/mp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/copclib/mp/read.py b/python/copclib/mp/read.py new file mode 100644 index 00000000..6fe9a4f0 --- /dev/null +++ b/python/copclib/mp/read.py @@ -0,0 +1,239 @@ +from .utils import chunks +import copclib as copc +import numpy as np + +import concurrent.futures + +# Initialize each multiprocessing thread with a copy of the copc reader +def init_mp(copc_path): + _read_node.copc_reader = copc.FileReader(copc_path) + + +def read_multithreaded( + reader, + read_function, + read_function_args={}, + nodes=None, + resolution=-1, + progress=None, + completed_callback=None, + chunk_size=1024, + max_workers=None, +): + """Scaffolding for reading COPC files in a multithreaded way to increase performance. + It queues all nodes from either the provided list of nodes or nodes within the given resolution to be processed. + Within the multiprocess, `read_function` is called with `read_function_args` keyword arguments, as well as the + keyword arguments "points", "node", and "reader". + This function should take those parameters and return a pickleable object that represents those points - + for example, a list of XYZ points limited by classification. + Note that whatever is returned from this function must be wrapped in a dictionary, + because the return values get passed as keyword arguments to `callback_function`. + The return value of `read_function`, as well as the currently processed node, is passed back to the main thread, + where `callback_function` is called with the node and the return values of `read_function` as keyword arguments. + This function can aggregate your results as they back from the multithreading. + + Args: + reader (copclib.CopcReader): A copc reader for the file you are reading + read_function (function): A function which takes each input node and its points and + returns a pickleable object as output. + read_function_args (dict, optional): A key/value pair of keyword arguments to pass to the read_function. Defaults to {}. + nodes (list[copc.Node], optional): A list of nodes to run the reader on. Defaults to reading all the nodes. 
+        resolution (float, optional): If a list of nodes is not provided, reads all nodes up to this resolution.
+            Defaults to reading all nodes.
+        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
+        completed_callback (function, optional): A function which is called after a node is processed
+            and returned from multiprocessing. Defaults to None.
+        chunk_size (int, optional): Limits the number of nodes which are queued for multiprocessing at once. Defaults to 1024.
+        max_workers (int, optional): Manually set the number of processes to use when multiprocessing. Defaults to all processors.
+
+    Raises:
+        RuntimeError: If both `nodes` and `resolution` are provided.
+    """
+    if nodes is not None and resolution > -1:
+        raise RuntimeError("You can only specify one of: 'nodes', 'resolution'")
+
+    # Sets the nodes to iterate over, if none are provided
+    if nodes is None:
+        if resolution > -1:
+            nodes = reader.GetNodesWithinResolution(resolution)
+        else:
+            nodes = reader.GetAllNodes()
+
+    # Reset the progress bar to the new total number of nodes
+    if progress is not None:
+        progress.reset(len(nodes))
+
+    # Initialize the multiprocessing
+    with concurrent.futures.ProcessPoolExecutor(
+        max_workers=max_workers,
+        initializer=init_mp,
+        initargs=(reader.path,),
+    ) as executor:
+        # Chunk the nodes so we're not flooding executor.submit
+        for chunk in chunks(nodes, chunk_size):
+            futures = []
+            # Call _read_node, which calls the read_function
+            for node in chunk:
+                future = executor.submit(
+                    _read_node,
+                    read_function,
+                    read_function_args,
+                    node,
+                )
+                # Update the progress bar, if necessary
+                if progress is not None:
+                    future.add_done_callback(lambda _: progress.update())
+                futures.append(future)
+
+            # As each node completes
+            for fut in concurrent.futures.as_completed(futures):
+                node, return_vals = fut.result()
+                # Call completed_callback if provided
+                if completed_callback:
+                    completed_callback(
+                        node=node,
+                        **return_vals,
+                    )
+
+
+def _read_node(read_function, read_function_args, node):
+    """Helper function which gets called by executor.submit in the multiprocessing.
+    Calls read_function and returns the results.
+    """
+    reader = _read_node.copc_reader
+    # Decompress and unpack the points within each worker
+    points = reader.GetPoints(node)
+
+    # If any of these arguments are provided, they'll throw an error since we use those argument names
+    for argument_name in read_function_args.keys():
+        assert argument_name not in [
+            "points",
+            "node",
+            "reader",
+        ], f"Use of protected keyword argument '{argument_name}'!"
+    # Actually call the read_function
+    ret = read_function(points=points, node=node, reader=reader, **read_function_args)
+    assert isinstance(
+        ret, dict
+    ), "The read_function return value should be a dictionary of kwargs!"
+
+    return node, ret
+
+
+def _do_read_xyz(points, class_limits=None, **kwargs):
+    """A 'read_function' which returns a numpy array of XYZ points from the COPC file,
+    optionally limiting to certain classifications.
+
+    Args:
+        points (copc.Points): The decompressed and unpacked Points from the given node
+        class_limits (list[int], optional): Limits the points returned to those whose
+            classification is in this list. Defaults to None.
+
+    Returns:
+        dict(xyz=np.array): The numpy array of points, with "xyz" as their kwarg key
+    """
+    xyzs = []
+    # Iterate over each point in the node and check if it's
+    # within the provided classification limits
+    for point in points:
+        if not class_limits or point.classification in class_limits:
+            xyzs.append([point.x, point.y, point.z])
+
+    # Reshape to always be (Nx3), in case there are no points
+    return dict(xyz=np.array(xyzs).reshape(len(xyzs), 3))
+
+
+def read_concat_xyz_class_limit(
+    reader, classification_limits=[], resolution=-1, progress=None, **kwargs
+):
+    """Reads the nodes in a COPC file and returns a concatenated list of XYZ coordinates,
+    optionally limited by classifications and resolution.
+
+    Args:
+        reader (copclib.CopcReader): A copc reader for the file you are reading
+        classification_limits (list[int], optional): Limit the coordinates returned
+            to those points with a classification inside this list. Defaults to [].
+        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
+        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
+
+    Raises:
+        RuntimeError: If a protected keyword argument is provided.
+
+    Returns:
+        np.array: An (nx3) array of XYZ coordinates.
+    """
+    # We provide these arguments within this function, so the user isn't able to provide them.
+    invalid_args = ["read_function", "read_function_args", "completed_callback"]
+    for invalid_arg in invalid_args:
+        if invalid_arg in kwargs:
+            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")
+
+    # Container of all XYZ points
+    all_xyz = []
+
+    # After each node is done, add the array of that node's XYZ coordinates
+    # to our container
+    def callback(xyz, **kwargs):
+        all_xyz.append(xyz)
+
+    read_multithreaded(
+        reader=reader,
+        read_function=_do_read_xyz,
+        read_function_args=dict(class_limits=classification_limits),
+        completed_callback=callback,
+        resolution=resolution,
+        progress=progress,
+        **kwargs,
+    )
+
+    # Concatenate all the points in the end, and return one large array of
+    # all the points in the file
+    return np.concatenate(all_xyz)
+
+
+def read_map_xyz_class_limit(
+    reader, classification_limits=[], resolution=-1, progress=None, **kwargs
+):
+    """Reads the nodes in a COPC file and returns a dictionary which maps stringified node keys to
+    their respective XYZ coordinates, optionally limited by classifications and resolution.
+    If a node has no points matching the constraints, it won't be added to this dictionary.
+
+    Args:
+        reader (copclib.CopcReader): A copc reader for the file you are reading
+        classification_limits (list[int], optional): Limit the coordinates returned
+            to those points with a classification inside this list. Defaults to [].
+        resolution (float, optional): Reads all nodes up to this resolution. Defaults to reading all nodes.
+        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
+
+    Raises:
+        RuntimeError: If a protected keyword argument is provided.
+
+    Returns:
+        dict[str: np.array]: A mapping of stringified COPC keys to an (nx3) array of XYZ coordinates.
+    """
+    # We provide these arguments within this function, so the user isn't able to provide them.
+    invalid_args = ["read_function", "read_function_args", "completed_callback"]
+    for invalid_arg in invalid_args:
+        if invalid_arg in kwargs:
+            raise RuntimeError(f"Invalid kwarg '{invalid_arg}'!")
+
+    # Map of stringified VoxelKeys to numpy arrays of coordinates
+    key_xyz_map = {}
+
+    # After each node is done processing, add the returned coordinates
+    # to the map
+    def callback(xyz, node, **kwargs):
+        if len(xyz) > 0:
+            key_xyz_map[str(node.key)] = xyz
+
+    read_multithreaded(
+        reader,
+        read_function=_do_read_xyz,
+        read_function_args=dict(class_limits=classification_limits),
+        completed_callback=callback,
+        resolution=resolution,
+        progress=progress,
+        **kwargs,
+    )
+
+    return key_xyz_map
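[Editor's note] The read scaffolding above is easiest to see with a concrete caller. A minimal sketch, not part of the patch (the input path and the `mean_z`/`on_done` helpers are hypothetical), that computes a per-node mean elevation in the workers and aggregates it in the main process:

    import copclib as copc
    from copclib.mp.read import read_multithreaded


    def mean_z(points, node, reader, **kwargs):
        # Runs in a worker process; must return a dict of pickleable kwargs.
        return dict(mean_z=sum(points.z) / max(len(points), 1))


    def main():
        means = {}

        def on_done(node, mean_z, **kwargs):
            # Runs in the main process as each node finishes.
            means[str(node.key)] = mean_z

        reader = copc.FileReader("autzen-classified.copc.laz")
        read_multithreaded(reader, read_function=mean_z, completed_callback=on_done)
        print(len(means), "nodes processed")


    if __name__ == "__main__":
        main()

Note that `mean_z` must be a module-level function so it can be pickled for executor.submit; the callback can be a closure, since it only ever runs in the main process.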
diff --git a/python/copclib/mp/transform.py b/python/copclib/mp/transform.py
new file mode 100644
index 00000000..30ace756
--- /dev/null
+++ b/python/copclib/mp/transform.py
@@ -0,0 +1,199 @@
+from .utils import chunks
+import copclib as copc
+
+import concurrent.futures
+
+
+def _copy_points_transform(points, **kwargs):
+    """A default transform_function which simply copies the points directly over."""
+    return points
+
+
+# Initialize each multiprocessing worker with its own copy of the copc reader
+def init_mp(copc_path):
+    _transform_node.copc_reader = copc.FileReader(copc_path)
+
+
+def transform_multithreaded(
+    reader,
+    writer,
+    transform_function=None,
+    transform_function_args={},
+    nodes=None,
+    resolution=-1,
+    progress=None,
+    completed_callback=None,
+    chunk_size=1024,
+    max_workers=None,
+    update_minmax=False,
+):
+    """Scaffolding for reading COPC files and writing them back out in a multithreaded way.
+    It queues all nodes from either the provided list of nodes or nodes within the given resolution to be processed.
+    Within each worker process, `transform_function` is called with `transform_function_args` keyword arguments, as well as the
+    keyword arguments "points", "node", "writer_header", and "reader".
+    This function should take those parameters and return a copclib.Points object, and optionally a dictionary of
+    return values which will be passed to the callback function.
+    The points returned by the transform_function are passed back to the main process, where those points
+    are written out to the `writer`.
+    Optionally, the `completed_callback` is called in the main process with the dictionary of keyword arguments returned from
+    the `transform_function` as arguments. This allows tracking values from the points and bringing them back to the main
+    process for further processing if needed (for example, finding the maximum intensity value that gets written).
+    Optionally, the header of the LAS file is updated with the new XYZ extents.
+
+    Args:
+        reader (copclib.CopcReader): A copc reader for the file you are reading
+        writer (copclib.CopcWriter): A copc writer for the output file.
+        transform_function (function, optional): A function which modifies the input points in some way before getting written.
+            Defaults to None.
+        transform_function_args (dict, optional): A key/value pair of keyword arguments that get passed to `transform_function`.
+            Defaults to {}.
+        nodes (list[copc.Node], optional): A list of nodes to run the reader on. Defaults to reading all the nodes.
+        resolution (float, optional): If a list of nodes is not provided, reads all nodes up to this resolution.
+            Defaults to reading all nodes.
+        progress (tqdm.tqdm, optional): A TQDM progress bar to track progress. Defaults to None.
+        completed_callback (function, optional): A function which is called after a node is processed
+            and returned from multiprocessing. Defaults to None.
+        chunk_size (int, optional): Limits the number of nodes which are queued for multiprocessing at once. Defaults to 1024.
+        max_workers (int, optional): Manually set the number of processes to use when multiprocessing. Defaults to all processors.
+        update_minmax (bool, optional): If true, updates the header of the new COPC file with the correct XYZ min/max.
+            Defaults to False.
+
+    Raises:
+        RuntimeError: If both `nodes` and `resolution` are provided.
+    """
+    if nodes is not None and resolution > -1:
+        raise RuntimeError("You can only specify one of: 'nodes', 'resolution'")
+
+    # Sets the nodes to iterate over, if none are provided
+    if nodes is None:
+        if resolution > -1:
+            nodes = reader.GetNodesWithinResolution(resolution)
+        else:
+            nodes = reader.GetAllNodes()
+
+    # Reset the progress bar to the new total number of nodes
+    if progress is not None:
+        progress.reset(len(nodes))
+
+    # Copy the points as a default
+    if transform_function is None:
+        transform_function = _copy_points_transform
+
+    # keep track of all the mins and maxs
+    all_mins = []
+    all_maxs = []
+    # Initialize the multiprocessing
+    with concurrent.futures.ProcessPoolExecutor(
+        max_workers=max_workers,
+        initializer=init_mp,
+        initargs=(reader.path,),
+    ) as executor:
+        # Chunk the nodes so we're not flooding executor.submit
+        for chunk in chunks(nodes, chunk_size):
+            futures = []
+            for node in chunk:
+                # Call _transform_node, which calls the transform_function
+                future = executor.submit(
+                    _transform_node,
+                    transform_function,
+                    transform_function_args,
+                    node,
+                    writer.copc_config.las_header,
+                    update_minmax,
+                )
+                # Update the progress bar, if necessary
+                if progress is not None:
+                    future.add_done_callback(lambda _: progress.update())
+                futures.append(future)
+
+            # As each node completes
+            for fut in concurrent.futures.as_completed(futures):
+                (
+                    compressed_points,
+                    node,
+                    point_count,
+                    xyz_min,
+                    xyz_max,
+                    return_vals,
+                ) = fut.result()
+                # Call completed_callback if provided
+                if completed_callback:
+                    completed_callback(
+                        node=node,
+                        point_count=point_count,
+                        compressed_points=compressed_points,
+                        **return_vals,
+                    )
+
+                # Add the min/max of each node to the list
+                all_mins.append(xyz_min)
+                all_maxs.append(xyz_max)
+
+                # Write the node out
+                writer.AddNodeCompressed(
+                    node.key,
+                    compressed_points,
+                    point_count,
+                    node.page_key,
+                )
+
+    # Compute the global min/max of all the nodes and update the LAS header, if necessary
+    if update_minmax:
+        import numpy as np
+
+        global_min = np.min(all_mins, axis=0)
+        global_max = np.max(all_maxs, axis=0)
+        writer.copc_config.las_header.min = list(global_min)
+        writer.copc_config.las_header.max = list(global_max)
+
+
+def _transform_node(
+    transform_function, transform_function_args, node, writer_header, update_minmax
+):
+    """Helper function that gets called by executor.submit in the worker process.
+    Calls transform_function and keeps track of the min/max XYZ in case they need to be updated.
+    """
+    reader = _transform_node.copc_reader
+    # Decompress and unpack the points within each worker
+    points = reader.GetPoints(node)
+
+    # If any of these arguments are provided, they'll throw an error since we use those argument names
+    for argument_name in transform_function_args.keys():
+        assert argument_name not in [
+            "points",
+            "node",
+            "writer_header",
+            "reader",
+        ], f"Use of protected keyword argument '{argument_name}'!"
+    # Actually call the transform_function
+    ret = transform_function(
+        points=points,
+        node=node,
+        writer_header=writer_header,
+        reader=reader,
+        **transform_function_args,
+    )
+    # The transform_function can either return just the points, or the points plus other return values encoded in a dictionary
+    if isinstance(ret, tuple):
+        assert (
+            len(ret) == 2
+        ), "The transform_function return value should be a tuple of the points and a dictionary of kwargs!"
+        points, return_vals = ret
+        assert isinstance(
+            return_vals, dict
+        ), "The transform_function return value should be a tuple of the points and a dictionary of kwargs!"
+    else:
+        points = ret
+        return_vals = {}
+
+    # Compute the minimum and maximum of the node's points, if necessary
+    xyz_min = None
+    xyz_max = None
+    if update_minmax:
+        xyz_min = [min(points.x), min(points.y), min(points.z)]
+        xyz_max = [max(points.x), max(points.y), max(points.z)]
+
+    point_count = len(points)
+    # Repack and compress the points using the new writer header
+    compressed_points = copc.CompressBytes(points.Pack(writer_header), writer_header)
+    return compressed_points, node, point_count, xyz_min, xyz_max, return_vals
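[Editor's note] As with read.py, a short usage sketch may help; this one exercises the tuple-return protocol described in the docstring. It is not part of the patch, and the file names and the `shift_z` helper are hypothetical:

    import copclib as copc
    import numpy as np
    from copclib.mp.transform import transform_multithreaded


    def shift_z(points, offset, **kwargs):
        # Runs in a worker process; returning (points, dict) forwards the
        # extra values to completed_callback in the main process.
        points.z = np.array(points.z) + offset
        return points, dict(max_z=max(points.z))


    def main():
        reader = copc.FileReader("in.copc.laz")
        writer = copc.FileWriter("out.copc.laz", reader.copc_config)

        max_zs = []
        transform_multithreaded(
            reader,
            writer,
            transform_function=shift_z,
            transform_function_args=dict(offset=100.0),
            completed_callback=lambda max_z, **kwargs: max_zs.append(max_z),
            update_minmax=True,
        )
        writer.Close()
        print("max Z after shift:", max(max_zs))


    if __name__ == "__main__":
        main()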
diff --git a/python/copclib/mp/utils.py b/python/copclib/mp/utils.py
new file mode 100644
index 00000000..71cf7461
--- /dev/null
+++ b/python/copclib/mp/utils.py
@@ -0,0 +1,4 @@
+def chunks(lst, n):
+    """Yield successive n-sized chunks from lst."""
+    for i in range(0, len(lst), n):
+        yield lst[i : i + n]
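[Editor's note] chunks() is what keeps the executor from being flooded: nodes are submitted chunk_size at a time, and each batch is drained with as_completed before the next is queued. Its behavior in isolation, with hypothetical values:

    >>> from copclib.mp.utils import chunks
    >>> list(chunks([1, 2, 3, 4, 5], 2))
    [[1, 2], [3, 4], [5]]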
diff --git a/setup.py b/setup.py
index 57a5e396..6249920c 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@
     long_description=readme,
     long_description_content_type="text/markdown",
     url="https://github.com/RockRobotic/copc-lib",
-    extras_require={"test": ["pytest"]},
+    extras_require={"test": ["pytest", "numpy"]},
     cmake_args=["-DWITH_PYTHON:BOOL=ON", "-DWITH_TESTS=OFF", "-DEMSCRIPTEN=OFF"],
     cmake_install_dir="python/copclib",
     packages=find_packages(where="python"),
diff --git a/test/mp_read_test.py b/test/mp_read_test.py
new file mode 100644
index 00000000..be4f04d0
--- /dev/null
+++ b/test/mp_read_test.py
@@ -0,0 +1,67 @@
+import sys
+
+from .utils import generate_test_file
+
+import copclib as copc
+from copclib.mp.read import read_concat_xyz_class_limit, read_map_xyz_class_limit
+import pytest
+import numpy as np
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+def test_xyz_noclass_limit():
+    reader = copc.FileReader(generate_test_file())
+    xyz = read_concat_xyz_class_limit(reader)
+
+    assert len(xyz) == reader.copc_config.las_header.point_count
+
+
+# @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+# def test_xyz_class_limit():
+#     classifications_to_test = [0, 1, 2]
+
+#     reader = copc.FileReader(generate_test_file())
+
+#     for classification in classifications_to_test:
+#         xyz = read_concat_xyz_class_limit(reader, [classification])
+
+#         las = laspy.read(generate_test_file())
+#         las_points = las.points[las.classification == classification]
+#         assert len(xyz) == len(las_points)
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+def test_xyz_map_noclass_limit():
+    reader = copc.FileReader(generate_test_file())
+    key_xyz_map = read_map_xyz_class_limit(reader)
+
+    for node in reader.GetAllNodes():
+        points = reader.GetPoints(node)
+        xyz_real = np.stack([points.x, points.y, points.z], axis=1)
+        xyz_test = key_xyz_map[str(node.key)]
+        np.testing.assert_allclose(xyz_real, xyz_test)
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+def test_xyz_map_class_limit():
+    classification_limits = [0, 1, 2]
+
+    reader = copc.FileReader(generate_test_file())
+
+    key_xyz_map = read_map_xyz_class_limit(
+        reader, classification_limits=classification_limits
+    )
+
+    for node in reader.GetAllNodes():
+        points = reader.GetPoints(node)
+        point_limit = [p for p in points if p.classification in classification_limits]
+        if len(point_limit) == 0:
+            assert str(node.key) not in key_xyz_map
+        else:
+            xyz_test = key_xyz_map[str(node.key)]
+            points = copc.Points(point_limit)
+            xyz_real = np.stack([points.x, points.y, points.z], axis=1)
+            np.testing.assert_allclose(xyz_real, xyz_test)
diff --git a/test/mp_transform_test.py b/test/mp_transform_test.py
new file mode 100644
index 00000000..7910fba9
--- /dev/null
+++ b/test/mp_transform_test.py
@@ -0,0 +1,85 @@
+import os
+import sys
+
+from .utils import generate_test_file, get_data_dir
+
+import copclib as copc
+from copclib.mp.transform import transform_multithreaded
+import numpy as np
+import pytest
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+def test_copc_copy():
+    file_path = os.path.join(get_data_dir(), "writer_test.copc.laz")
+    reader = copc.FileReader(generate_test_file())
+    writer = copc.FileWriter(
+        file_path,
+        reader.copc_config,
+    )
+
+    transform_multithreaded(reader, writer)
+    writer.Close()
+
+    # validate
+    new_reader = copc.FileReader(file_path)
+    for node in reader.GetAllNodes():
+        assert node.IsValid()
+        new_node = new_reader.FindNode(node.key)
+        assert new_node.IsValid()
+        assert new_node.key == node.key
+        assert new_node.point_count == node.point_count
+        assert new_node.byte_size == node.byte_size
+        assert new_reader.GetPointDataCompressed(
+            new_node
+        ) == reader.GetPointDataCompressed(node)
+
+
+def _transform_fun(offset, points, **kwargs):
+    xyz = np.stack([points.x, points.y, points.z], axis=1)
+    xyz += offset
+
+    points.x = xyz[:, 0]
+    points.y = xyz[:, 1]
+    points.z = xyz[:, 2]
+
+    return points
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+def test_translate():
+    file_path = os.path.join(get_data_dir(), "writer_test.copc.laz")
+    reader = copc.FileReader(generate_test_file())
+    writer = copc.FileWriter(
+        file_path,
+        reader.copc_config,
+    )
+
+    offset = [50, 30, 20]
+
+    transform_multithreaded(
+        reader,
+        writer,
+        _transform_fun,
+        transform_function_args=dict(offset=offset),
+        update_minmax=True,
+    )
+    writer.Close()
+
+    # validate
+    new_reader = copc.FileReader(file_path)
+
+    old_header_min = reader.copc_config.las_header.min
+    old_header_min = np.array([old_header_min.x, old_header_min.y, old_header_min.z])
+    new_header_min = new_reader.copc_config.las_header.min
+    new_header_min = np.array([new_header_min.x, new_header_min.y, new_header_min.z])
+
+    np.testing.assert_allclose(old_header_min + offset, new_header_min, atol=0.01)
+
+    for node in reader.GetAllNodes():
+        assert node.IsValid()
+        new_node = new_reader.FindNode(node.key)
+        assert new_node.IsValid()
+        assert new_node.key == node.key
+        assert new_node.point_count == node.point_count
+        assert new_node.byte_size == node.byte_size
diff --git a/test/utils.py b/test/utils.py
index 21104732..193095a0 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -1,5 +1,7 @@
 import os
-import sys
+import random
+import copclib as copc
+import numpy as np
 
 DATADIRECTORY = os.path.join(os.path.dirname(__file__), "data")
 
@@ -10,3 +12,107 @@ def get_autzen_file():
 
 def get_data_dir():
     return DATADIRECTORY
+
+
+def generate_test_file():
+    """Generates a test file of random points. Taken from example_writer.py.
+
+    Returns:
+        str: Path to the generated test file
+    """
+    file_path = os.path.join(DATADIRECTORY, "new-copc.copc.laz")
+    if os.path.exists(file_path):
+        return file_path
+
+    MIN_BOUNDS = copc.Vector3(-2000, -5000, 20)  # Argument Constructor
+    MAX_BOUNDS = copc.Vector3([5000, 1034, 125])  # List Constructor
+    NUM_POINTS = 3000
+
+    mins = []
+    maxs = []
+
+    # This function will generate `NUM_POINTS` random points within the voxel bounds
+    def RandomPoints(key, las_header, number_points):
+        # Voxel cube dimensions will be calculated from the maximum span of the file
+        span = max(
+            {
+                las_header.max.x - las_header.min.x,
+                las_header.max.y - las_header.min.y,
+                las_header.max.z - las_header.min.z,
+            }
+        )
+        # Step size accounts for depth level
+        step = span / pow(2, key.d)
+
+        minx = las_header.min.x + (step * key.x)
+        miny = las_header.min.y + (step * key.y)
+        minz = las_header.min.z + (step * key.z)
+
+        # Create a Points object based on the LAS header
+        points = copc.Points(las_header)
+
+        # Populate the points
+        for i in range(number_points):
+            # Create a point with a given point format
+            point = points.CreatePoint()
+            # point has getters/setters for all attributes
+            point.x = random.uniform(
+                max(las_header.min.x, minx),
+                min(las_header.max.x, minx + step),
+            )
+            point.y = random.uniform(
+                max(las_header.min.y, miny),
+                min(las_header.max.y, miny + step),
+            )
+            point.z = random.uniform(
+                max(las_header.min.z, minz),
+                min(las_header.max.z, minz + step),
+            )
+            point.classification = random.randint(0, 10)
+
+            # For visualization purposes
+            point.point_source_id = key.d + key.x + key.y + key.z
+
+            points.AddPoint(point)
+
+        mins.append([min(points.x), min(points.y), min(points.z)])
+        maxs.append([max(points.x), max(points.y), max(points.z)])
+        return points
+
+    cfg = copc.CopcConfigWriter(
+        8, [0.01, 0.01, 0.01], [50, 50, 50], "TEST_WKT", has_extended_stats=True
+    )
+    cfg.las_header.min = MIN_BOUNDS
+    cfg.las_header.max = MAX_BOUNDS
+
+    cfg.copc_info.spacing = 10
+
+    # Now, we can create our COPC writer:
+    writer = copc.FileWriter(file_path, cfg)
+    header = writer.copc_config.las_header
+
+    key = copc.VoxelKey(0, 0, 0, 0)
+    points = RandomPoints(key, header, NUM_POINTS)
+    writer.AddNode(key, points)
+
+    page_key = copc.VoxelKey(1, 1, 1, 0)
+    key = copc.VoxelKey(1, 1, 1, 0)
+    points = RandomPoints(key, header, NUM_POINTS)
+    writer.AddNode(key, points, page_key)
+
+    key = copc.VoxelKey(2, 2, 2, 0)
+    points = RandomPoints(key, header, NUM_POINTS)
+    writer.AddNode(key, points, page_key)
+
+    sub_page_key = copc.VoxelKey(3, 4, 4, 0)
+    points = RandomPoints(sub_page_key, header, NUM_POINTS)
+    writer.AddNode(sub_page_key, points, sub_page_key)
+
+    # Reduce the header min/max to the actual point boundaries,
+    # since we might not actually produce a point at the header min/max
+    writer.copc_config.las_header.min = np.min(mins, axis=0).tolist()
+    writer.copc_config.las_header.max = np.max(maxs, axis=0).tolist()
+
+    writer.Close()
+    return file_path
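[Editor's note] One practical caveat, hinted at by the "Skip windows MP tests" bullet in the commit message: concurrent.futures.ProcessPoolExecutor uses the spawn start method on Windows (and on macOS since Python 3.8), so workers re-import the calling module. Any script using these helpers should therefore keep its work behind a main guard, as example_mp.py does. A minimal pattern, with a hypothetical input path:

    import copclib as copc
    from copclib.mp.read import read_concat_xyz_class_limit


    def main():
        reader = copc.FileReader("autzen-classified.copc.laz")
        # Read only ground points (classification 2) as an (n, 3) array
        xyz = read_concat_xyz_class_limit(reader, classification_limits=[2])
        print(xyz.shape)


    if __name__ == "__main__":
        # Without this guard, each spawned worker would re-execute main()
        # on platforms that use the spawn start method.
        main()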