From f7a5ba060f40c0912c6f84220f5c5e0c24d805d5 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 19 Sep 2024 21:04:48 +0100 Subject: [PATCH 01/16] pipeline transform --- .../src/data_processing/runtime/__init__.py | 1 + .../runtime/base_transform_runtime.py | 30 ++++ .../pure_python/runtime_configuration.py | 9 +- .../runtime/pure_python/transform_runtime.py | 5 +- .../runtime/runtime_configuration.py | 15 +- .../runtime/transform_file_processor.py | 8 +- .../runtime/transform_launcher.py | 1 + .../transform/pipeline_transform.py | 157 ++++++++++++++++++ .../data_processing/utils/transform_utils.py | 2 +- .../runtime/ray/runtime_configuration.py | 10 +- .../runtime/ray/transform_runtime.py | 7 +- .../runtime/spark/transform_file_processor.py | 2 +- .../runtime/spark/transform_runtime.py | 16 +- .../spark/src/doc_id_transform_spark.py | 32 +--- 14 files changed, 230 insertions(+), 65 deletions(-) create mode 100644 data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py create mode 100644 data-processing-lib/python/src/data_processing/transform/pipeline_transform.py diff --git a/data-processing-lib/python/src/data_processing/runtime/__init__.py b/data-processing-lib/python/src/data_processing/runtime/__init__.py index 7fb42a33a..7ddf4f60b 100644 --- a/data-processing-lib/python/src/data_processing/runtime/__init__.py +++ b/data-processing-lib/python/src/data_processing/runtime/__init__.py @@ -1,3 +1,4 @@ +from data_processing.runtime.base_transform_runtime import BaseTransformRuntime from data_processing.runtime.execution_configuration import TransformExecutionConfiguration, runtime_cli_prefix from data_processing.runtime.runtime_configuration import TransformRuntimeConfiguration from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher diff --git a/data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py b/data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py new file mode 100644 index 000000000..dc9575219 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py @@ -0,0 +1,30 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any + +from data_processing.data_access import DataAccessFactoryBase +from data_processing.transform import TransformStatistics + + +class BaseTransformRuntime: + """ + Base Transformer runtime used by processor to to create Transform specific environment + Every Runtime defines specific implementation of this class + """ + + def __init__(self, params: dict[str, Any]): + """ + Create/config this runtime. + :param params: parameters, often provided by the CLI arguments as defined by a TableTansformConfiguration. 
+ """ + self.params = params diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py index 10f9bcf27..be0101174 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py @@ -26,12 +26,5 @@ def __init__( :param transform_config - base configuration class :param runtime_class: implementation of the transform runtime """ - self.runtime_class = runtime_class - super().__init__(transform_config=transform_config) + super().__init__(transform_config=transform_config, runtime_class=runtime_class) - def create_transform_runtime(self) -> DefaultPythonTransformRuntime: - """ - Create transform runtime with the parameters captured during apply_input_params() - :return: transform runtime object - """ - return self.runtime_class(self.transform_config.get_transform_params()) diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py index 478d40837..740362b29 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py @@ -14,9 +14,10 @@ from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import TransformStatistics +from data_processing.runtime import BaseTransformRuntime -class DefaultPythonTransformRuntime: +class DefaultPythonTransformRuntime(BaseTransformRuntime): """ Transformer runtime used by processor to to create Transform specific environment """ @@ -26,7 +27,7 @@ def __init__(self, params: dict[str, Any]): Create/config this runtime. :param params: parameters, often provided by the CLI arguments as defined by a TableTansformConfiguration. 
""" - self.params = params + super().__init__(params) def get_folders(self, data_access: DataAccess) -> list[str]: """ diff --git a/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py b/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py index ef85d1363..3b6d16e9b 100644 --- a/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py +++ b/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py @@ -15,15 +15,21 @@ from data_processing.transform import AbstractBinaryTransform, TransformConfiguration from data_processing.utils import CLIArgumentProvider +from data_processing.runtime import BaseTransformRuntime class TransformRuntimeConfiguration(CLIArgumentProvider): - def __init__(self, transform_config: TransformConfiguration): + def __init__(self, + transform_config: TransformConfiguration, + runtime_class: type[BaseTransformRuntime] + ): """ Initialization :param transform_config - base configuration class + :param runtime_class - base runtime class """ self.transform_config = transform_config + self.runtime_class = runtime_class def add_input_params(self, parser: ArgumentParser) -> None: self.transform_config.add_input_params(parser) @@ -62,3 +68,10 @@ def get_transform_params(self) -> dict[str, Any]: :return: transform parameters """ return self.transform_config.get_transform_params() + + def create_transform_runtime(self) -> BaseTransformRuntime: + """ + Create transform runtime with the parameters captured during apply_input_params() + :return: transform runtime object + """ + return self.runtime_class(self.transform_config.get_transform_params()) diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py index 4075f40be..a1084d769 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py @@ -103,10 +103,10 @@ def flush(self) -> None: the hook for them to return back locally stored data and their statistics. :return: None """ - if self.last_file_name is None or self.is_folder: + if self.last_file_name is None: # for some reason a given worker never processed anything. 
Happens in testing # when the amount of workers is greater than the amount of files - self.logger.debug("skipping flush, no name for file is defined or this is a folder transform") + self.logger.debug("skipping flush, no name for file is defined") return try: t_start = time.time() @@ -226,7 +226,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats def _publish_stats(self, stats: dict[str, Any]) -> None: """ Publishing execution statistics - :param stats: Statistics + :param stats: dictionary :return: None """ - raise ValueError("must be implemented by subclass") + raise NotImplemented("must be implemented by subclass") diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py index 4c3abbd83..becb4b6c3 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py @@ -36,6 +36,7 @@ def __init__( self.runtime_config = runtime_config self.name = self.runtime_config.get_name() self.data_access_factory = data_access_factory + self.execution_config = None def _get_parser(self) -> argparse.ArgumentParser: """ diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py new file mode 100644 index 000000000..f4fca7489 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -0,0 +1,157 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +from data_processing.transform import AbstractBinaryTransform +from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime +from data_processing.utils import get_logger, UnrecoverableException, TransformUtils + + +class PipelineTransformBase(AbstractBinaryTransform): + """ + Transform that executes a set of base transforms sequentially. Data is passed between + participating transforms in memory + """ + + def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + """ + Initializes pipeline execution for the list of transforms + :param config - configuration parameters + :param transforms - list of transforms in the pipeline. 
Note that transforms will + be executed + """ + super().__init__(config) + self.logger = get_logger(__name__) + if len(transforms) == 0: + # Empty pipeline + self.logger.error("Pipeline transform with empty list") + raise UnrecoverableException("Pipeline transform with empty list") + self.data_access_factory = config.get("data_access_factory", None) + if self.data_access_factory is None: + self.logger.error("pipeline transform - Data access factory is not defined") + raise UnrecoverableException("pipeline transform - Data access factory is not defined") + self.statistics = config.get("statistics", None) + if self.statistics is None: + self.logger.error("pipeline transform - Statistics is not defined") + raise UnrecoverableException("pipeline transform - Statistics is not defined") + participants = [] + # for every transform in the pipeline + for transform in transforms: + # create runtime + runtime = transform.create_transform_runtime() + # get parameters + transform_params = self._get_transform_params(runtime) + # Create transform + tr = transform.get_transform_class()(transform_params) + participants.append((tr, runtime)) + # save participating transforms + self.transforms = participants + + def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: + """ + get transform parameters + :param runtime - runtime + :return: transform params + """ + raise NotImplemented("must be implemented by subclass") + + def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: + """ + Converts input file into o or more output files. + If there is an error, an exception must be raised - exit()ing is not generally allowed. + :param byte_array: contents of the input file to be transformed. + :param file_name: the name of the file containing the given byte_array. + :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated + to metadata. Each element of the return list, is a tuple of the transformed bytes and a string + holding the extension to be used when writing out the new bytes. + """ + # process transforms sequentially + data = [(byte_array, file_name)] + stats = {} + for transform, _ in self.transforms: + data, st = self._process_transform(transform=transform, data=data) + # Accumulate stats + stats |= st + if len(data) == 0: + # no data returned by this transform + return [], stats + # all done + return data, stats + + @staticmethod + def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[bytes, str]] + ) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: + """ + Process individual transform. Note here that the predecessor could create multiple data objects + :param transform - transform + :param data - data to process + :return: + """ + stats = {} + res = [] + for dt in data: + # for every data element + src = TransformUtils.get_file_extension(dt[1]) + out_files, st = transform.transform_binary(byte_array=dt[0], file_name=dt[1]) + # Accumulate results + for ouf in out_files: + res.append((ouf[0], src[0] + ouf[1])) + # accumulate statistics + stats |= st + return res, stats + + def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: + """ + This is supporting method for transformers, that implement buffering of data, for example coalesce. + These transformers can have buffers containing data that were not written to the output immediately. + Flush is the hook for them to return back locally stored data and their statistics. 
+ The majority of transformers are expected not to use such buffering and can use this default implementation. + If there is an error, an exception must be raised - exit()ing is not generally allowed. + :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated + to metadata. Each element of the return list, is a tuple of the transformed bytes and a string + holding the extension to be used when writing out the new bytes. + """ + stats = {} + res = [] + i = 0 + for transform, _ in self.transforms: + out_files, st = transform.flush_binary() + # accumulate statistics + stats |= st + if len(out_files) > 0 and i < len(self.transforms) - 1: + # flush produced output - run it through the rest of the chain + data = [] + for ouf in out_files: + data.append((ouf[0], f"file{ouf[1]}")) + for n in range(i + 1, len(self.transforms)): + data, st = self._process_transform(transform=self.transforms[n][0], data=data) + # Accumulate stats + stats |= st + if len(data) == 0: + # no data returned by this transform + break + res += data + else: + res += out_files + # Done flushing, compute execution stats + for _, runtime in self.transforms: + self._compute_execution_stats(runtime=runtime, st=stats) + return res, {} + + def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> dict[str, Any]: + """ + get transform parameters + :param runtime - runtime + :param st - statistics + :return: + """ + raise NotImplemented("must be implemented by subclass") diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py index e2d37581c..adfe00afd 100644 --- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py +++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py @@ -96,7 +96,7 @@ def get_file_extension(file_path) -> list[str]: """ Get the file's root and extension from the given file path. :param file_path : The path of the file. - :return: str: The file extension including the dot ('.') if present, otherwise an empty string. 
+        :return: the file's root and its extension, including the dot ('.') if present
         """
         return os.path.splitext(file_path)
diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py
index 321937dd4..11f83780f 100644
--- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py
+++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py
@@ -26,12 +26,4 @@ def __init__(
         :param transform_config - base configuration class
         :param runtime_class: implementation of the transform runtime
         """
-        super().__init__(transform_config=transform_config)
-        self.runtime_class = runtime_class
-
-    def create_transform_runtime(self) -> DefaultRayTransformRuntime:
-        """
-        Create transform runtime with the parameters captured during apply_input_params()
-        :return: transform runtime object
-        """
-        return self.runtime_class(self.transform_config.get_transform_params())
+        super().__init__(transform_config=transform_config, runtime_class=runtime_class)
diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py
index 64479302c..5f2b90a3a 100644
--- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py
+++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py
@@ -12,11 +12,12 @@
 
 from typing import Any
 
-from data_processing.data_access import DataAccessFactoryBase, DataAccess
+from data_processing.data_access import DataAccessFactoryBase
+from data_processing.runtime import BaseTransformRuntime
 from ray.actor import ActorHandle
 
 
-class DefaultRayTransformRuntime:
+class DefaultRayTransformRuntime(BaseTransformRuntime):
     """
     Transformer runtime used by the processor to create a Transform-specific environment
     """
@@ -26,7 +27,7 @@ def __init__(self, params: dict[str, Any]):
         """
         Create/config this runtime.
         :param params: parameters, often provided by the CLI arguments as defined by a TransformConfiguration.
""" - self.params = params + super().__init__(params) def get_folders(self, data_access: DataAccess) -> list[str]: """ diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py index a0968ab1d..c3abec626 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py @@ -39,7 +39,7 @@ def __init__( transform_parameters=runtime_configuration.get_transform_params(), is_folder=is_folder, ) - # Add data access ant statistics to the processor parameters + # Add statistics to the processor parameters self.runtime_configuration = runtime_configuration self.transform = None # set up statistics diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py index 7410d09d1..887af5c36 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py @@ -14,9 +14,10 @@ from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import TransformStatistics +from data_processing.runtime import BaseTransformRuntime -class DefaultSparkTransformRuntime: +class DefaultSparkTransformRuntime(BaseTransformRuntime): """ Transformer runtime used by processor to to create Transform specific environment """ @@ -24,9 +25,9 @@ class DefaultSparkTransformRuntime: def __init__(self, params: dict[str, Any]): """ Create/config this runtime. - :param params: parameters, often provided by the CLI arguments as defined by a TableTansformConfiguration. + :param params: parameters, often provided by the CLI arguments as defined by a Transform Configuration. """ - self.params = params + super().__init__(params) def get_folders(self, data_access: DataAccess) -> list[str]: """ @@ -42,8 +43,10 @@ def get_transform_config( """ Get the dictionary of configuration that will be provided to the transform's initializer. This is the opportunity for this runtime to create a new set of configuration based on the - config/params provided to this instance's initializer. - :param partition - the partition assigned to this worker, needed by transforms like doc_id + config/params provided to this instance's initializer. This may include the addition + of new configuration data such as ray shared memory, new actors, etc., that might be needed and + expected by the transform in its initializer and/or transform() methods. + :param partition - Spark partition :param data_access_factory - data access factory class being used by the RayOrchestrator. :param statistics - reference to statistics actor :return: dictionary of transform init params @@ -66,5 +69,6 @@ def compute_execution_stats(self, stats: TransformStatistics) -> None: This method does not return a value; the job execution statistics are generally reported as metadata by the Spark Orchestrator. :param stats: output of statistics as aggregated across all calls to all transforms. + :return: job execution statistics. These are generally reported as metadata by the Ray Orchestrator. 
""" - pass + return diff --git a/transforms/universal/doc_id/spark/src/doc_id_transform_spark.py b/transforms/universal/doc_id/spark/src/doc_id_transform_spark.py index beeb77ce5..7a01b370d 100644 --- a/transforms/universal/doc_id/spark/src/doc_id_transform_spark.py +++ b/transforms/universal/doc_id/spark/src/doc_id_transform_spark.py @@ -15,11 +15,9 @@ import pyarrow as pa from data_processing.transform import AbstractTableTransform, TransformConfiguration -from data_processing.data_access import DataAccessFactoryBase -from data_processing.transform import TransformStatistics from data_processing.utils import CLIArgumentProvider, TransformUtils from data_processing_spark.runtime.spark import SparkTransformLauncher -from data_processing_spark.runtime.spark import SparkTransformRuntimeConfiguration, DefaultSparkTransformRuntime +from data_processing_spark.runtime.spark import SparkTransformRuntimeConfiguration short_name = "doc_id" @@ -136,32 +134,6 @@ def apply_input_params(self, args: Namespace) -> bool: return True -class DocIDSparkTransformRuntime(DefaultSparkTransformRuntime): - - def __init__(self, params: dict[str, Any]): - """ - Create/config this runtime. - :param params: parameters, often provided by the CLI arguments as defined by a TableTansformConfiguration. - """ - super().__init__(params) - - def get_transform_config( - self, partition: int, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics - ) -> dict[str, Any]: - """ - Get the dictionary of configuration that will be provided to the transform's initializer. - This is the opportunity for this runtime to create a new set of configuration based on the - config/params provided to this instance's initializer. This may include the addition - of new configuration data such as ray shared memory, new actors, etc, that might be needed and - expected by the transform in its initializer and/or transform() methods. - :param data_access_factory - data access factory class being used by the RayOrchestrator. - :param statistics - reference to statistics actor - :return: dictionary of transform init params - """ - return self.params | {"partition_index": partition} - - - class DocIDSparkTransformConfiguration(SparkTransformRuntimeConfiguration): """ Implements the SparkTransformConfiguration for NOOP as required by the PythonTransformLauncher. 
@@ -173,7 +145,7 @@ def __init__(self): """ Initialization """ - super().__init__(transform_config=DocIDTransformConfiguration(), runtime_class=DocIDSparkTransformRuntime) + super().__init__(transform_config=DocIDTransformConfiguration()) if __name__ == "__main__": From 5084185980f95c897e903f018c5f7ebfff425fa8 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 20 Sep 2024 13:06:25 +0100 Subject: [PATCH 02/16] fixed build --- data-processing-lib/doc/overview.md | 2 +- .../src/data_processing/runtime/__init__.py | 4 +- .../pure_python/runtime_configuration.py | 3 +- .../runtime/pure_python/transform_runtime.py | 5 +- .../runtime/transform_launcher.py | 2 +- .../src/data_processing/transform/__init__.py | 3 ++ .../base_transform_runtime.py | 3 -- .../transform/pipeline_transform.py | 24 ++++----- .../transform/pure_python/__init__.py | 1 + .../pure_python/pipeline_transform.py | 51 ++++++++++++++++++ .../runtime_configuration.py | 3 +- .../input/{ => s3_support}/sample1.parquet | Bin .../data_access/data_access_s3_test.py | 2 +- 13 files changed, 75 insertions(+), 28 deletions(-) rename data-processing-lib/python/src/data_processing/{runtime => transform}/base_transform_runtime.py (90%) create mode 100644 data-processing-lib/python/src/data_processing/transform/pure_python/__init__.py create mode 100644 data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py rename data-processing-lib/python/src/data_processing/{runtime => transform}/runtime_configuration.py (97%) rename data-processing-lib/python/test-data/data_processing/input/{ => s3_support}/sample1.parquet (100%) diff --git a/data-processing-lib/doc/overview.md b/data-processing-lib/doc/overview.md index f4571d5b9..c1bc23104 100644 --- a/data-processing-lib/doc/overview.md +++ b/data-processing-lib/doc/overview.md @@ -32,7 +32,7 @@ runtime interfacee expected to be implemented by each runtime ([python](python-r * [DataAccessFactory](../python/src/data_processing/data_access/data_access_factory_base.py) - is used to configure the input and output data files to be processed and creates the `DataAccess` instance (see below) according to the CLI parameters. -* [TransformRuntimeConfiguration](../python/src/data_processing/runtime/runtime_configuration.py) - captures +* [TransformRuntimeConfiguration](../python/src/data_processing/transform/runtime_configuration.py) - captures the `TransformConfiguration` and runtime-specific configuration. * [DataAccess](../python/src/data_processing/data_access/data_access.py) - is the interface defining data i/o methods and selection. 
Implementations for local diff --git a/data-processing-lib/python/src/data_processing/runtime/__init__.py b/data-processing-lib/python/src/data_processing/runtime/__init__.py index 7ddf4f60b..88a8bf10b 100644 --- a/data-processing-lib/python/src/data_processing/runtime/__init__.py +++ b/data-processing-lib/python/src/data_processing/runtime/__init__.py @@ -1,5 +1,3 @@ -from data_processing.runtime.base_transform_runtime import BaseTransformRuntime from data_processing.runtime.execution_configuration import TransformExecutionConfiguration, runtime_cli_prefix -from data_processing.runtime.runtime_configuration import TransformRuntimeConfiguration -from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher from data_processing.runtime.transform_file_processor import AbstractTransformFileProcessor +from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py index be0101174..c34bf607a 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/runtime_configuration.py @@ -10,9 +10,8 @@ # limitations under the License. ################################################################################ -from data_processing.runtime import TransformRuntimeConfiguration from data_processing.runtime.pure_python import DefaultPythonTransformRuntime -from data_processing.transform import TransformConfiguration +from data_processing.transform import TransformConfiguration, TransformRuntimeConfiguration class PythonTransformRuntimeConfiguration(TransformRuntimeConfiguration): diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py index 740362b29..085859fca 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py @@ -12,9 +12,8 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase, DataAccess -from data_processing.transform import TransformStatistics -from data_processing.runtime import BaseTransformRuntime +from data_processing.data_access import DataAccessFactoryBase +from data_processing.transform import TransformStatistics, BaseTransformRuntime class DefaultPythonTransformRuntime(BaseTransformRuntime): diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py index becb4b6c3..648d48669 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py @@ -15,7 +15,7 @@ import argparse from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase -from data_processing.runtime import TransformRuntimeConfiguration +from data_processing.transform import TransformRuntimeConfiguration from data_processing.utils import ParamsUtils, get_logger diff --git a/data-processing-lib/python/src/data_processing/transform/__init__.py 
b/data-processing-lib/python/src/data_processing/transform/__init__.py index 20254e47b..df900d21c 100644 --- a/data-processing-lib/python/src/data_processing/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/transform/__init__.py @@ -3,4 +3,7 @@ from data_processing.transform.binary_transform import AbstractBinaryTransform from data_processing.transform.table_transform import AbstractTableTransform from data_processing.transform.transform_statistics import TransformStatistics +from data_processing.transform.base_transform_runtime import BaseTransformRuntime from data_processing.transform.transform_configuration import TransformConfiguration, get_transform_config +from data_processing.transform.runtime_configuration import TransformRuntimeConfiguration +from data_processing.transform.pipeline_transform import AbstractPipelineTransform diff --git a/data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py b/data-processing-lib/python/src/data_processing/transform/base_transform_runtime.py similarity index 90% rename from data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py rename to data-processing-lib/python/src/data_processing/transform/base_transform_runtime.py index dc9575219..706c1af2f 100644 --- a/data-processing-lib/python/src/data_processing/runtime/base_transform_runtime.py +++ b/data-processing-lib/python/src/data_processing/transform/base_transform_runtime.py @@ -12,9 +12,6 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase -from data_processing.transform import TransformStatistics - class BaseTransformRuntime: """ diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index f4fca7489..34feb050d 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -11,12 +11,11 @@ ################################################################################ from typing import Any -from data_processing.transform import AbstractBinaryTransform -from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime +from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime, TransformRuntimeConfiguration from data_processing.utils import get_logger, UnrecoverableException, TransformUtils -class PipelineTransformBase(AbstractBinaryTransform): +class AbstractPipelineTransform(AbstractBinaryTransform): """ Transform that executes a set of base transforms sequentially. 
Data is passed between participating transforms in memory @@ -43,6 +42,7 @@ def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConf if self.statistics is None: self.logger.error("pipeline transform - Statistics is not defined") raise UnrecoverableException("pipeline transform - Statistics is not defined") + self.transforms = transforms participants = [] # for every transform in the pipeline for transform in transforms: @@ -54,7 +54,7 @@ def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConf tr = transform.get_transform_class()(transform_params) participants.append((tr, runtime)) # save participating transforms - self.transforms = participants + self.participants = participants def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: """ @@ -77,7 +77,7 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl # process transforms sequentially data = [(byte_array, file_name)] stats = {} - for transform, _ in self.transforms: + for transform, _ in self.participants: data, st = self._process_transform(transform=transform, data=data) # Accumulate stats stats |= st @@ -123,17 +123,17 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: stats = {} res = [] i = 0 - for transform, _ in self.transforms: + for transform, _ in self.participants: out_files, st = transform.flush_binary() # accumulate statistics stats |= st - if len(out_files) > 0 and i < len(self.transforms) - 1: + if len(out_files) > 0 and i < len(self.participants) - 1: # flush produced output - run it through the rest of the chain data = [] for ouf in out_files: data.append((ouf[0], f"file{ouf[1]}")) - for n in range(i + 1, len(self.transforms)): - data, st = self._process_transform(transform=self.transforms[n][0], data=data) + for n in range(i + 1, len(self.participants)): + data, st = self._process_transform(transform=self.participants[n][0], data=data) # Accumulate stats stats |= st if len(data) == 0: @@ -143,15 +143,15 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: else: res += out_files # Done flushing, compute execution stats - for _, runtime in self.transforms: + for _, runtime in self.participants: self._compute_execution_stats(runtime=runtime, st=stats) return res, {} - def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> dict[str, Any]: + def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> None: """ get transform parameters :param runtime - runtime :param st - statistics - :return: + :return: None """ raise NotImplemented("must be implemented by subclass") diff --git a/data-processing-lib/python/src/data_processing/transform/pure_python/__init__.py b/data-processing-lib/python/src/data_processing/transform/pure_python/__init__.py new file mode 100644 index 000000000..9c7653525 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/pure_python/__init__.py @@ -0,0 +1 @@ +from data_processing.transform.pure_python.pipeline_transform import PythonPipelineTransform diff --git a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py new file mode 100644 index 000000000..e798e6a36 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py @@ -0,0 +1,51 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +from data_processing.transform import AbstractPipelineTransform +from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime + + +class PythonPipelineTransform(AbstractPipelineTransform): + """ + Transform that executes a set of base transforms sequentially. Data is passed between + participating transforms in memory + """ + + def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + """ + Initializes pipeline execution for the list of transforms + :param config - configuration parameters + :param transforms - list of transforms in the pipeline. Note that transforms will + be executed + """ + super().__init__(config, transforms) + + def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: + """ + get transform parameters + :param runtime - runtime + :return: transform params + """ + return runtime.get_transform_config(data_access_factory=self.data_access_factory, + statistics=self.statistics, files=[]) + + def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> None: + """ + get transform parameters + :param runtime - runtime + :param st - statistics + :return: None + """ + self.statistics.add_stats(st) + runtime.compute_execution_stats(stats=self.statistics) + return diff --git a/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py b/data-processing-lib/python/src/data_processing/transform/runtime_configuration.py similarity index 97% rename from data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py rename to data-processing-lib/python/src/data_processing/transform/runtime_configuration.py index 3b6d16e9b..75850f4f8 100644 --- a/data-processing-lib/python/src/data_processing/runtime/runtime_configuration.py +++ b/data-processing-lib/python/src/data_processing/transform/runtime_configuration.py @@ -13,9 +13,8 @@ from argparse import ArgumentParser, Namespace from typing import Any -from data_processing.transform import AbstractBinaryTransform, TransformConfiguration +from data_processing.transform import AbstractBinaryTransform, TransformConfiguration, BaseTransformRuntime from data_processing.utils import CLIArgumentProvider -from data_processing.runtime import BaseTransformRuntime class TransformRuntimeConfiguration(CLIArgumentProvider): diff --git a/data-processing-lib/python/test-data/data_processing/input/sample1.parquet b/data-processing-lib/python/test-data/data_processing/input/s3_support/sample1.parquet similarity index 100% rename from data-processing-lib/python/test-data/data_processing/input/sample1.parquet rename to data-processing-lib/python/test-data/data_processing/input/s3_support/sample1.parquet diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_s3_test.py 
b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_s3_test.py index 9cff1f6b0..1a4ffb9b2 100644 --- a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_s3_test.py +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_s3_test.py @@ -33,7 +33,7 @@ def _create_and_populate_bucket(d_a: DataAccessS3, input_location: str, n_files: d_a.arrS3.s3_client.create_bucket(Bucket="test") # upload file loc = os.path.abspath( - os.path.join(os.path.dirname(__file__), "../../../test-data/data_processing/input/sample1.parquet") + os.path.join(os.path.dirname(__file__), "../../../test-data/data_processing/input/s3_support/sample1.parquet") ) with open(loc, "rb") as file: bdata = file.read() From b24f495ad0e174728d5cb2e285cbca0c7e395de8 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 20 Sep 2024 13:14:37 +0100 Subject: [PATCH 03/16] fixed build --- .../src/data_processing_ray/runtime/ray/transform_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py index 5f2b90a3a..b88ade2d5 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py @@ -13,7 +13,7 @@ from typing import Any from data_processing.data_access import DataAccessFactoryBase -from data_processing.runtime import BaseTransformRuntime +from data_processing.transform import BaseTransformRuntime from ray.actor import ActorHandle From d9159da9f765f0375829c163ab0b1bb22746f69f Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 20 Sep 2024 14:17:39 +0100 Subject: [PATCH 04/16] fixed build --- .../transform/resize_transform.py | 217 ++++++++++++++++++ .../runtime/ray/runtime_configuration.py | 3 +- 2 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py new file mode 100644 index 000000000..7247ee3bc --- /dev/null +++ b/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py @@ -0,0 +1,217 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from argparse import ArgumentParser, Namespace +from typing import Any + +import pyarrow as pa +from data_processing.transform import AbstractTableTransform, TransformConfiguration +from data_processing.utils import ( + LOCAL_TO_DISK, + MB, + CLIArgumentProvider, + UnrecoverableException, + get_logger, +) +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.runtime.pure_python import PythonTransformLauncher + + +logger = get_logger(__name__) + +max_rows_per_table_key = "max_rows_per_table" +max_mbytes_per_table_key = "max_mbytes_per_table" +size_type_key = "size_type" +shortname = "resize" +cli_prefix = f"{shortname}_" +max_rows_per_table_cli_param = f"{cli_prefix}{max_rows_per_table_key}" +max_mbytes_per_table_cli_param = f"{cli_prefix}{max_mbytes_per_table_key}" +size_type_cli_param = f"{cli_prefix}{size_type_key}" +size_type_disk = "disk" +size_type_memory = "memory" +size_type_default = size_type_disk + + +class ResizeTransform(AbstractTableTransform): + """ + Implements splitting large files into smaller ones. + Two flavours of splitting are supported - based on the amount of documents and based on the size + """ + + def __init__(self, config: dict[str, Any]): + """ + Initialize based on the dictionary of configuration information. + """ + super().__init__(config) + self.max_rows_per_table = config.get(max_rows_per_table_key, 0) + self.max_bytes_per_table = MB * config.get(max_mbytes_per_table_key, 0) + disk_memory = config.get(size_type_key, size_type_default) + if size_type_default in disk_memory: + self.max_bytes_per_table *= LOCAL_TO_DISK + + self.logger.debug(f"max bytes = {self.max_bytes_per_table}") + self.logger.debug(f"max rows = {self.max_rows_per_table}") + self.buffer = None + if self.max_rows_per_table <= 0 and self.max_bytes_per_table <= 0: + raise ValueError("Neither max rows per table nor max table size are defined") + if self.max_rows_per_table > 0 and self.max_bytes_per_table > 0: + raise ValueError("Both max rows per table and max table size are defined. 
Only one should be present") + + def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]: + """ + split larger files into the smaller ones + :param table: table + :param file_name: name of the file + :return: resulting set of tables + """ + self.logger.debug(f"got new table with {table.num_rows} rows") + if self.buffer is not None: + try: + self.logger.debug( + f"concatenating buffer with {self.buffer.num_rows} rows to table with {table.num_rows} rows" + ) + # table = pa.concat_tables([self.buffer, table], unicode_promote_options="permissive") + table = pa.concat_tables([self.buffer, table]) + self.buffer = None + self.logger.debug(f"concatenated table has {table.num_rows} rows") + except Exception as _: # Can happen if schemas are different + # Raise unrecoverable error to stop the execution + self.logger.warning(f"table in {file_name} can't be merged with the buffer") + self.logger.warning(f"incoming table columns {table.schema.names} ") + self.logger.warning(f"buffer columns {self.buffer.schema.names}") + raise UnrecoverableException() + + result = [] + start_row = 0 + if self.max_rows_per_table > 0: + # split file with max documents + n_rows = table.num_rows + rows_left = n_rows + while start_row < n_rows and rows_left >= self.max_rows_per_table: + length = n_rows - start_row + if length > self.max_rows_per_table: + length = self.max_rows_per_table + a_slice = table.slice(offset=start_row, length=length) + self.logger.debug(f"created table slice with {a_slice.num_rows} rows, starting with row {start_row}") + result.append(a_slice) + start_row = start_row + self.max_rows_per_table + rows_left = rows_left - self.max_rows_per_table + else: + # split based on size + current_size = 0.0 + if table.nbytes >= self.max_bytes_per_table: + for n in range(table.num_rows): + current_size += table.slice(offset=n, length=1).nbytes + if current_size >= self.max_bytes_per_table: + self.logger.debug(f"capturing slice, current_size={current_size}") + # Reached the size + a_slice = table.slice(offset=start_row, length=(n - start_row)) + result.append(a_slice) + start_row = n + current_size = 0.0 + if start_row < table.num_rows: + # buffer remaining chunk for next call + self.logger.debug(f"Buffering table starting at row {start_row}") + self.buffer = table.slice(offset=start_row, length=(table.num_rows - start_row)) + self.logger.debug(f"buffered table has {self.buffer.num_rows} rows") + self.logger.debug(f"returning {len(result)} tables") + return result, {} + + def flush(self) -> tuple[list[pa.Table], dict[str, Any]]: + result = [] + if self.buffer is not None: + self.logger.debug(f"flushing buffered table with {self.buffer.num_rows} rows of size {self.buffer.nbytes}") + result.append(self.buffer) + self.buffer = None + else: + self.logger.debug(f"Empty buffer. nothing to flush.") + return result, {} + + +class ResizeTransformConfiguration(TransformConfiguration): + + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args and combining of metadata. + """ + + def __init__(self): + super().__init__(name=shortname, transform_class=ResizeTransform) + + def add_input_params(self, parser: ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the resizeTransform. + By convention a common prefix should be used for all transform-specific CLI args + (e.g, noop_, pii_, etc.) 
+ """ + parser.add_argument( + f"--{max_rows_per_table_cli_param}", + type=int, + default=-1, + help="Max number of rows per table", + ) + parser.add_argument( + f"--{max_mbytes_per_table_cli_param}", + type=float, + default=-1, + help=f"Max table size (MB). Size is measured according to the --{size_type_cli_param} parameter", + ) + parser.add_argument( + f"--{size_type_cli_param}", + type=str, + required=False, + default=size_type_default, + choices=[size_type_disk, size_type_memory], + help=f"Determines how memory is measured when using the --{max_mbytes_per_table_cli_param} option." + "\n'memory' measures the in-process memory footprint and \n'disk' makes an estimate of the resulting parquet file size.", + ) + + def apply_input_params(self, args: Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. + :return: True, if validate pass or False otherwise + """ + # Capture the args that are specific to this transform + captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False) + self.params = self.params | captured + # dargs = vars(args) + if self.params.get(max_rows_per_table_key) <= 0 and self.params.get(max_mbytes_per_table_key) <= 0: + logger.info("Neither max documents per table nor max table size are defined") + return False + if self.params.get(max_rows_per_table_key) > 0 and self.params.get(max_mbytes_per_table_key) > 0: + logger.info("Both max documents per table and max table size are defined. Only one should be present") + return False + logger.info(f"Split file parameters are : {self.params}") + return True + + +class ResizePythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the RayTransformConfiguration for resize as required by the RayTransformLauncher. + """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=ResizeTransformConfiguration()) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = PythonTransformLauncher(ResizePythonTransformConfiguration()) + logger.info("Launching noop transform") + launcher.launch() diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py index 11f83780f..8c53df641 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/runtime_configuration.py @@ -10,8 +10,7 @@ # limitations under the License. 
################################################################################ -from data_processing.runtime import TransformRuntimeConfiguration -from data_processing.transform import TransformConfiguration +from data_processing.transform import TransformConfiguration, TransformRuntimeConfiguration from data_processing_ray.runtime.ray import DefaultRayTransformRuntime From 45a6826a6ec92dca9c11ae858d8916310b19b057 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sat, 21 Sep 2024 19:30:02 +0100 Subject: [PATCH 05/16] made it work --- .../runtime/transform_launcher.py | 7 +- .../test_support/transform/__init__.py | 12 ++- .../transform/pipeline_transform.py | 45 +++++++++++ .../src/data_processing/transform/__init__.py | 1 + .../transform/pipeline_transform.py | 30 ++++--- .../pipeline_transform_configuration.py | 80 +++++++++++++++++++ .../pure_python/pipeline_transform.py | 6 +- .../transform/test_noop.py | 8 +- .../transform/test_resize.py | 53 ++++++++++++ .../transform/test_resize_noop.py | 54 +++++++++++++ 10 files changed, 271 insertions(+), 25 deletions(-) create mode 100644 data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py create mode 100644 data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py create mode 100644 data-processing-lib/python/test/data_processing_tests/transform/test_resize.py create mode 100644 data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py index 648d48669..3344491d2 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py @@ -57,9 +57,9 @@ def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace: :return: list of arguments """ # add additional arguments - self.runtime_config.add_input_params(parser=parser) self.data_access_factory.add_input_params(parser=parser) self.execution_config.add_input_params(parser=parser) + self.runtime_config.add_input_params(parser=parser) return parser.parse_args() def _get_parameters(self, args: argparse.Namespace) -> bool: @@ -68,11 +68,10 @@ def _get_parameters(self, args: argparse.Namespace) -> bool: and does parameters validation :return: True if validation passes or False, if not """ - return ( - self.runtime_config.apply_input_params(args=args) + return (self.runtime_config.apply_input_params(args=args) and self.execution_config.apply_input_params(args=args) and self.data_access_factory.apply_input_params(args=args) - ) + ) def _submit_for_execution(self) -> int: """ diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py index 04d6f3b0f..b868a38b3 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py @@ -5,7 +5,11 @@ NOOPTransformConfiguration, NOOPPythonTransformConfiguration ) -from data_processing.test_support.transform.noop_folder_transform import ( - NOOPFolderTransform, - NOOPFolderPythonTransformConfiguration -) \ No newline at end of file +from data_processing.test_support.transform.resize_transform import ( + ResizeTransform, + 
ResizePythonTransformConfiguration, +) + +from data_processing.test_support.transform.pipeline_transform import ( + ResizeNOOPPythonTransformConfiguration, +) diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py new file mode 100644 index 000000000..591f679b8 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py @@ -0,0 +1,45 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.transform import PipelineTransformConfiguration +from data_processing.utils import get_logger +from data_processing.test_support.transform import NOOPPythonTransformConfiguration, ResizePythonTransformConfiguration + +logger = get_logger(__name__) + + +class ResizeNOOPPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
+ """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config= + PipelineTransformConfiguration({"transforms": [ResizePythonTransformConfiguration(), + NOOPPythonTransformConfiguration()]}) + ) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration()) + logger.info("Launching resize/noop transform") + launcher.launch() diff --git a/data-processing-lib/python/src/data_processing/transform/__init__.py b/data-processing-lib/python/src/data_processing/transform/__init__.py index df900d21c..6e7609934 100644 --- a/data-processing-lib/python/src/data_processing/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/transform/__init__.py @@ -7,3 +7,4 @@ from data_processing.transform.transform_configuration import TransformConfiguration, get_transform_config from data_processing.transform.runtime_configuration import TransformRuntimeConfiguration from data_processing.transform.pipeline_transform import AbstractPipelineTransform +from data_processing.transform.pipeline_transform_configuration import PipelineTransformConfiguration diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index 34feb050d..d825d4697 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -11,7 +11,7 @@ ################################################################################ from typing import Any -from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime, TransformRuntimeConfiguration +from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime from data_processing.utils import get_logger, UnrecoverableException, TransformUtils @@ -21,15 +21,16 @@ class AbstractPipelineTransform(AbstractBinaryTransform): participating transforms in memory """ - def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms - :param config - configuration parameters - :param transforms - list of transforms in the pipeline. Note that transforms will + :param config - configuration parameters - dictionary of transforms in the pipeline. 
+ Note that transforms will be executed be executed """ - super().__init__(config) + super().__init__({}) self.logger = get_logger(__name__) + transforms = config.get("transforms", []) if len(transforms) == 0: # Empty pipeline self.logger.error("Pipeline transform with empty list") @@ -85,7 +86,17 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl # no data returned by this transform return [], stats # all done - return data, stats + return self._convert_output(data), stats + + @staticmethod + def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: + res = [None] * len(data) + i = 0 + for dt in data: + fname = TransformUtils.get_file_extension(dt[1]) + res[i] = (dt[0], fname[1]) + i += 1 + return res @staticmethod def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[bytes, str]] @@ -107,7 +118,7 @@ def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[byte res.append((ouf[0], src[0] + ouf[1])) # accumulate statistics stats |= st - return res, stats + return res, stats def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: """ @@ -139,9 +150,10 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: if len(data) == 0: # no data returned by this transform break - res += data + res += self._convert_output(data) else: - res += out_files + res += self._convert_output(out_files) + i += 1 # Done flushing, compute execution stats for _, runtime in self.participants: self._compute_execution_stats(runtime=runtime, st=stats) diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py new file mode 100644 index 000000000..8416c2884 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py @@ -0,0 +1,80 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +from argparse import ArgumentParser, Namespace + +from data_processing.transform import TransformConfiguration +from data_processing.transform.pure_python import PythonPipelineTransform +from data_processing.utils import get_logger + +logger = get_logger(__name__) + + +class PipelineTransformConfiguration(TransformConfiguration): + + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args. + """ + + def __init__(self, config: dict[str, Any]): + super().__init__( + name="pipeline", + transform_class=PythonPipelineTransform, + ) + self.params = config + + def add_input_params(self, parser: ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the NOOPTransform. 
+ By convention a common prefix should be used for all transform-specific CLI args + (e.g, noop_, pii_, etc.) + """ + for t in self.params["transforms"]: + t.transform_config.add_input_params(parser=parser) + + def apply_input_params(self, args: Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. + :return: True, if validate pass or False otherwise + """ + res = True + for t in self.params["transforms"]: + res = res and t.transform_config.apply_input_params(args=args) + return res + + def get_input_params(self) -> dict[str, Any]: + """ + Provides a default implementation if the user has provided a set of keys to the initializer. + These keys are used in apply_input_params() to extract our key/values from the global Namespace of args. + :return: + """ + params = {} + for t in self.params["transforms"]: + params |= t.transform_config.get_input_params() + return params + + def get_transform_metadata(self) -> dict[str, Any]: + """ + Get transform metadata. Before returning remove all parameters key accumulated in + self.remove_from metadata. This allows transform developer to mark any input parameters + that should not make it to the metadata. This can be parameters containing sensitive + information, access keys, secrets, passwords, etc. + :return parameters for metadata: + """ + params = {} + for t in self.params["transforms"]: + params |= t.transform_config.get_transform_metadata() + return params diff --git a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py index e798e6a36..d52e3a0bb 100644 --- a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py @@ -12,7 +12,7 @@ from typing import Any from data_processing.transform import AbstractPipelineTransform -from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime +from data_processing.transform import TransformRuntimeConfiguration, BaseTransformRuntime class PythonPipelineTransform(AbstractPipelineTransform): @@ -21,14 +21,14 @@ class PythonPipelineTransform(AbstractPipelineTransform): participating transforms in memory """ - def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms :param config - configuration parameters :param transforms - list of transforms in the pipeline. Note that transforms will be executed """ - super().__init__(config, transforms) + super().__init__(config) def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: """ diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py index 1eb85fe48..caf1c60f6 100644 --- a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py @@ -10,11 +10,9 @@ # limitations under the License. 
################################################################################ -from typing import Tuple - import pyarrow as pa -from data_processing.test_support.transform.noop_transform import NOOPTransform -from data_processing.test_support.transform.table_transform_test import ( +from data_processing.test_support.transform import NOOPTransform +from data_processing.test_support.transform import ( AbstractTableTransformTest, ) @@ -30,7 +28,7 @@ class TestNOOPTransform(AbstractTableTransformTest): The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. """ - def get_test_transform_fixtures(self) -> list[Tuple]: + def get_test_transform_fixtures(self) -> list[tuple]: fixtures = [ (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py new file mode 100644 index 000000000..61ec43c50 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py @@ -0,0 +1,53 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from data_processing.test_support.transform import ResizePythonTransformConfiguration +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestPythonResizeTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
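+
+    Each fixture below is a (launcher, config, input_dir, expected_dir) tuple; for
+    example (values illustrative, mirroring the fixtures defined in this file):
+
+        (PythonTransformLauncher(ResizePythonTransformConfiguration()),
+         {"resize_max_rows_per_table": 125},
+         basedir + "/input", basedir + "/expected-rows-125")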
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../../transforms/universal/resize/python/test-data")) + launcher = PythonTransformLauncher(ResizePythonTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py new file mode 100644 index 000000000..939b34da0 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py @@ -0,0 +1,54 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from data_processing.test_support.transform import ResizeNOOPPythonTransformConfiguration +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestPythonResizeNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
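+
+    The pipeline merges the CLI parameters of all participating transforms, so a
+    single config dict can mix keys from both steps, e.g. (illustrative):
+
+        {"resize_max_rows_per_table": 125, "noop_sleep_sec": 1}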
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../transforms/universal/resize/python/test-data")) + launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures From fe743b10ccd878b9b852bb59ffc8056234b6403a Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sat, 21 Sep 2024 22:00:28 +0100 Subject: [PATCH 06/16] fixed small bug --- .../src/data_processing/transform/pipeline_transform.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index d825d4697..ea0de5175 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -94,7 +94,10 @@ def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: i = 0 for dt in data: fname = TransformUtils.get_file_extension(dt[1]) - res[i] = (dt[0], fname[1]) + ext = fname[1] + if len(ext) <= 1: + ext = fname[0] + res[i] = (dt[0], ext) i += 1 return res From 14e3cbb38abc992df1ce9250b834b883f0f9a350 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 22 Sep 2024 09:28:16 +0100 Subject: [PATCH 07/16] small bugs fixes --- .../data_processing/transform/pipeline_transform.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index ea0de5175..e7eb349b6 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -56,6 +56,7 @@ def __init__(self, config: dict[str, Any]): participants.append((tr, runtime)) # save participating transforms self.participants = participants + self.file_name = "" def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: """ @@ -76,6 +77,7 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl holding the extension to be used when writing out the new bytes. 
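+        For a two-step pipeline the chaining is conceptually (sketch only; the
+        resize and noop step names are illustrative):
+
+            resized, st1 = resize.transform_binary(file_name, byte_array)
+            final, st2 = noop.transform_binary(file_name, resized[0][0])
+
+        with the per-step statistics dictionaries merged into one.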
""" # process transforms sequentially + self.file_name = file_name data = [(byte_array, file_name)] stats = {} for transform, _ in self.participants: @@ -94,10 +96,7 @@ def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: i = 0 for dt in data: fname = TransformUtils.get_file_extension(dt[1]) - ext = fname[1] - if len(ext) <= 1: - ext = fname[0] - res[i] = (dt[0], ext) + res[i] = (dt[0], fname[1]) i += 1 return res @@ -145,7 +144,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: # flush produced output - run it through the rest of the chain data = [] for ouf in out_files: - data.append((ouf[0], f"file{ouf[1]}")) + data.append((ouf[0], self.file_name)) for n in range(i + 1, len(self.participants)): data, st = self._process_transform(transform=self.participants[n][0], data=data) # Accumulate stats @@ -155,7 +154,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: break res += self._convert_output(data) else: - res += self._convert_output(out_files) + res += out_files i += 1 # Done flushing, compute execution stats for _, runtime in self.participants: From d3048ff09dadeac4f464c47b5499c07b2b037cfa Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 22 Sep 2024 19:13:45 +0100 Subject: [PATCH 08/16] more tests - ededup and noop --- ...dedup_pipeline_local_python_incremental.py | 49 +++++++++++++++++++ .../src/ededup_pipeline_transform_python.py | 43 ++++++++++++++++ .../test/test_ededup_pipeline_python.py | 35 +++++++++++++ .../python/src/noop_pipeline_local_python.py | 45 +++++++++++++++++ .../src/noop_pipeline_transform_python.py | 43 ++++++++++++++++ .../python/test/test_noop_pipeline_python.py | 47 ++++++++++++++++++ 6 files changed, 262 insertions(+) create mode 100644 transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py create mode 100644 transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py create mode 100644 transforms/universal/ededup/python/test/test_ededup_pipeline_python.py create mode 100644 transforms/universal/noop/python/src/noop_pipeline_local_python.py create mode 100644 transforms/universal/noop/python/src/noop_pipeline_transform_python.py create mode 100644 transforms/universal/noop/python/test/test_noop_pipeline_python.py diff --git a/transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py b/transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py new file mode 100644 index 000000000..170c248db --- /dev/null +++ b/transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py @@ -0,0 +1,49 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import os +import sys + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.utils import ParamsUtils +from ededup_pipeline_transform_python import EdedupPypelinePythonTransformConfiguration +from ededup_transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) + + +# create launcher +launcher = PythonTransformLauncher(EdedupPypelinePythonTransformConfiguration()) +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data/input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # Data access. Only required parameters are specified + "data_local_config": ParamsUtils.convert_to_ast(local_conf), + # orchestrator + "runtime_pipeline_id": "pipeline_id", + "runtime_job_id": "job_id", + "runtime_code_location": ParamsUtils.convert_to_ast(code_location), + # ededup parameters + doc_column_name_cli_param: "contents", + int_column_name_cli_param: "document_id", +} +sys.argv = ParamsUtils.dict_to_req(d=params) + +# launch +launcher.launch() diff --git a/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py b/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py new file mode 100644 index 000000000..a10d3f06e --- /dev/null +++ b/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py @@ -0,0 +1,43 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.transform import PipelineTransformConfiguration +from data_processing.utils import get_logger +from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration + +logger = get_logger(__name__) + + +class EdedupPypelinePythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
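+
+    A minimal launch sketch (the ededup CLI flag names are supplied by the
+    *_cli_param constants of ededup_transform_base, not spelled out here):
+
+        launcher = PythonTransformLauncher(EdedupPypelinePythonTransformConfiguration())
+        launcher.launch()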
+
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=
+                         PipelineTransformConfiguration({"transforms": [EdedupPythonTransformRuntimeConfiguration()]}))
+
+
+if __name__ == "__main__":
+    # create the launcher
+    launcher = PythonTransformLauncher(EdedupPypelinePythonTransformConfiguration())
+    logger.info("Launching ededup pipeline transform")
+    launcher.launch()
diff --git a/transforms/universal/ededup/python/test/test_ededup_pipeline_python.py b/transforms/universal/ededup/python/test/test_ededup_pipeline_python.py
new file mode 100644
index 000000000..81f09c4e7
--- /dev/null
+++ b/transforms/universal/ededup/python/test/test_ededup_pipeline_python.py
@@ -0,0 +1,35 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from data_processing.runtime.pure_python import PythonTransformLauncher
+from data_processing.test_support.launch.transform_test import (
+    AbstractTransformLauncherTest,
+)
+from ededup_pipeline_transform_python import EdedupPypelinePythonTransformConfiguration
+from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param
+
+
+class TestEdedupPipelinePythonTransform(AbstractTransformLauncherTest):
+    """
+    Extends the super-class to define the test data for the tests defined there.
+    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
+    """
+
+    def get_test_transform_fixtures(self) -> list[tuple]:
+        # A single fixture: run the ededup pipeline over the standard ededup test data
+        fixtures = []
+        basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
+        launcher = PythonTransformLauncher(EdedupPypelinePythonTransformConfiguration())
+        config = {doc_column_name_cli_param: "contents", int_column_name_cli_param: "document_id"}
+        return [(launcher, config, basedir + "/input", basedir + "/expected")]
diff --git a/transforms/universal/noop/python/src/noop_pipeline_local_python.py b/transforms/universal/noop/python/src/noop_pipeline_local_python.py
new file mode 100644
index 000000000..c3d2b648d
--- /dev/null
+++ b/transforms/universal/noop/python/src/noop_pipeline_local_python.py
@@ -0,0 +1,45 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +import os +import sys + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.utils import ParamsUtils +from noop_pipeline_transform_python import NOOPPypelinePythonTransformConfiguration + + +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # Data access. Only required parameters are specified + "data_local_config": ParamsUtils.convert_to_ast(local_conf), + # execution info + "runtime_pipeline_id": "pipeline_id", + "runtime_job_id": "job_id", + "runtime_code_location": ParamsUtils.convert_to_ast(code_location), + # noop params + "noop_sleep_sec": 1, +} +if __name__ == "__main__": + # Set the simulated command line args + sys.argv = ParamsUtils.dict_to_req(d=params) + # create launcher + launcher = PythonTransformLauncher(runtime_config=NOOPPypelinePythonTransformConfiguration()) + # Launch the ray actor(s) to process the input + launcher.launch() diff --git a/transforms/universal/noop/python/src/noop_pipeline_transform_python.py b/transforms/universal/noop/python/src/noop_pipeline_transform_python.py new file mode 100644 index 000000000..381f13149 --- /dev/null +++ b/transforms/universal/noop/python/src/noop_pipeline_transform_python.py @@ -0,0 +1,43 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.transform import PipelineTransformConfiguration +from data_processing.utils import get_logger +from noop_transform_python import NOOPPythonTransformConfiguration + +logger = get_logger(__name__) + + +class NOOPPypelinePythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
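+
+    A minimal launch sketch (illustrative; noop_pipeline_local_python.py shows a
+    fully parameterized local run):
+
+        launcher = PythonTransformLauncher(NOOPPypelinePythonTransformConfiguration())
+        launcher.launch()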
+
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=
+                         PipelineTransformConfiguration({"transforms": [NOOPPythonTransformConfiguration()]}))
+
+
+if __name__ == "__main__":
+    # create the launcher
+    launcher = PythonTransformLauncher(NOOPPypelinePythonTransformConfiguration())
+    logger.info("Launching noop pipeline transform")
+    launcher.launch()
diff --git a/transforms/universal/noop/python/test/test_noop_pipeline_python.py b/transforms/universal/noop/python/test/test_noop_pipeline_python.py
new file mode 100644
index 000000000..d0fec66a8
--- /dev/null
+++ b/transforms/universal/noop/python/test/test_noop_pipeline_python.py
@@ -0,0 +1,47 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from data_processing.runtime.pure_python import PythonTransformLauncher
+from data_processing.test_support.launch.transform_test import (
+    AbstractTransformLauncherTest,
+)
+from noop_transform import sleep_cli_param
+from noop_pipeline_transform_python import NOOPPypelinePythonTransformConfiguration
+
+
+class TestPythonNOOPTransform(AbstractTransformLauncherTest):
+    """
+    Extends the super-class to define the test data for the tests defined there.
+    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
+    """
+
+    def get_test_transform_fixtures(self) -> list[tuple]:
+        src_file_dir = os.path.abspath(os.path.dirname(__file__))
+        fixtures = []
+
+        launcher = PythonTransformLauncher(NOOPPypelinePythonTransformConfiguration())
+        input_dir = os.path.join(src_file_dir, "../test-data/input")
+        expected_dir = os.path.join(src_file_dir, "../test-data/expected")
+        transform_config = {sleep_cli_param: 0}
+        fixtures.append(
+            (
+                launcher,
+                transform_config,
+                input_dir,
+                expected_dir,
+                [],  # optional list of column names to ignore in comparing test-generated with expected.
+ ) + ) + + return fixtures From f2256b3644af7e26d376cc51ca7052e28f1674fe Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 23 Sep 2024 10:17:00 +0100 Subject: [PATCH 09/16] add Ray implementation --- .../transform/pipeline_transform.py | 14 ++- .../transform/resize_transform.py | 5 +- .../transform/pipeline_transform.py | 19 ++-- .../pipeline_transform_configuration.py | 11 ++- .../pure_python/pipeline_transform.py | 20 ++-- .../runtime/ray/transform_statistics.py | 8 ++ .../transform/ray/__init__.py | 1 + .../transform/ray/pipeline_transform.py | 52 ++++++++++ ...tal.py => ededup_pipeline_local_python.py} | 0 .../src/ededup_pipeline_transform_python.py | 11 ++- .../src/noop_pipeline_transform_python.py | 11 +-- .../python/test-data/expected/metadata.json | 90 +++++++++--------- .../python/test-data/expected/test1.parquet | Bin 753 -> 759 bytes .../python/test/test_noop_pipeline_python.py | 2 +- .../noop/ray/src/noop_pipeline_local_ray.py | 47 +++++++++ .../ray/src/noop_pipeline_transform_ray.py | 42 ++++++++ .../noop/ray/test/test_noop_pipeline_ray.py | 48 ++++++++++ 17 files changed, 287 insertions(+), 94 deletions(-) create mode 100644 data-processing-lib/ray/src/data_processing_ray/transform/ray/__init__.py create mode 100644 data-processing-lib/ray/src/data_processing_ray/transform/ray/pipeline_transform.py rename transforms/universal/ededup/python/src/{ededup_pipeline_local_python_incremental.py => ededup_pipeline_local_python.py} (100%) create mode 100644 transforms/universal/noop/ray/src/noop_pipeline_local_ray.py create mode 100644 transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py create mode 100644 transforms/universal/noop/ray/test/test_noop_pipeline_ray.py diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py index 591f679b8..b7afdc2e7 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py @@ -10,10 +10,8 @@ # limitations under the License. 
################################################################################ -from data_processing.runtime.pure_python import PythonTransformLauncher -from data_processing.runtime.pure_python.runtime_configuration import ( - PythonTransformRuntimeConfiguration, -) +from data_processing.runtime.pure_python import PythonTransformLauncher, PythonTransformRuntimeConfiguration +from data_processing.transform.pure_python import PythonPipelineTransform from data_processing.transform import PipelineTransformConfiguration from data_processing.utils import get_logger from data_processing.test_support.transform import NOOPPythonTransformConfiguration, ResizePythonTransformConfiguration @@ -32,10 +30,10 @@ def __init__(self): """ Initialization """ - super().__init__(transform_config= - PipelineTransformConfiguration({"transforms": [ResizePythonTransformConfiguration(), - NOOPPythonTransformConfiguration()]}) - ) + super().__init__(transform_config=PipelineTransformConfiguration( + config={"transforms": [ResizePythonTransformConfiguration(), + NOOPPythonTransformConfiguration()]}, + transform_class=PythonPipelineTransform)) if __name__ == "__main__": diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py index 7247ee3bc..96c43830b 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/resize_transform.py @@ -22,10 +22,7 @@ UnrecoverableException, get_logger, ) -from data_processing.runtime.pure_python.runtime_configuration import ( - PythonTransformRuntimeConfiguration, -) -from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python import PythonTransformLauncher, PythonTransformRuntimeConfiguration logger = get_logger(__name__) diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index e7eb349b6..5a410ceba 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -24,9 +24,8 @@ class AbstractPipelineTransform(AbstractBinaryTransform): def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms - :param config - configuration parameters - dictionary of transforms in the pipeline. - Note that transforms will be executed - be executed + :param config - configuration parameters - list of transforms in the pipeline. 
+ Note that transforms will be executed in the order they are defined """ super().__init__({}) self.logger = get_logger(__name__) @@ -95,8 +94,8 @@ def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: res = [None] * len(data) i = 0 for dt in data: - fname = TransformUtils.get_file_extension(dt[1]) - res[i] = (dt[0], fname[1]) + f_name = TransformUtils.get_file_extension(dt[1]) + res[i] = (dt[0], f_name[1]) i += 1 return res @@ -157,15 +156,13 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: res += out_files i += 1 # Done flushing, compute execution stats - for _, runtime in self.participants: - self._compute_execution_stats(runtime=runtime, st=stats) + self._compute_execution_statistics(stats) return res, {} - def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> None: + def _compute_execution_statistics(self, stats: dict[str, Any]) -> None: """ - get transform parameters - :param runtime - runtime - :param st - statistics + Compute execution statistics + :param stats: current statistics from flush :return: None """ raise NotImplemented("must be implemented by subclass") diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py index 8416c2884..8e5bb4097 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py @@ -13,8 +13,7 @@ from typing import Any from argparse import ArgumentParser, Namespace -from data_processing.transform import TransformConfiguration -from data_processing.transform.pure_python import PythonPipelineTransform +from data_processing.transform import TransformConfiguration, AbstractPipelineTransform from data_processing.utils import get_logger logger = get_logger(__name__) @@ -27,10 +26,14 @@ class PipelineTransformConfiguration(TransformConfiguration): configuration with CLI args. """ - def __init__(self, config: dict[str, Any]): + def __init__( + self, + config: dict[str, Any], + transform_class: type[AbstractPipelineTransform], + ): super().__init__( name="pipeline", - transform_class=PythonPipelineTransform, + transform_class=transform_class, ) self.params = config diff --git a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py index d52e3a0bb..922c71683 100644 --- a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py @@ -12,7 +12,7 @@ from typing import Any from data_processing.transform import AbstractPipelineTransform -from data_processing.transform import TransformRuntimeConfiguration, BaseTransformRuntime +from data_processing.transform import BaseTransformRuntime class PythonPipelineTransform(AbstractPipelineTransform): @@ -24,9 +24,8 @@ class PythonPipelineTransform(AbstractPipelineTransform): def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms - :param config - configuration parameters - :param transforms - list of transforms in the pipeline. Note that transforms will - be executed + :param config - configuration parameters - list of transforms in the pipeline. 
+ Note that transforms will be executed in the order they are defined """ super().__init__(config) @@ -39,13 +38,12 @@ def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any] return runtime.get_transform_config(data_access_factory=self.data_access_factory, statistics=self.statistics, files=[]) - def _compute_execution_stats(self, runtime: BaseTransformRuntime, st: dict[str, Any]) -> None: + def _compute_execution_statistics(self, stats: dict[str, Any]) -> None: """ - get transform parameters - :param runtime - runtime - :param st - statistics + Compute execution statistics + :param stats: current statistics from flush :return: None """ - self.statistics.add_stats(st) - runtime.compute_execution_stats(stats=self.statistics) - return + self.statistics.add_stats(stats) + for _, runtime in self.participants: + runtime.compute_execution_stats(stats=self.statistics) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_statistics.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_statistics.py index 2095820c8..b8ae35990 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_statistics.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_statistics.py @@ -74,3 +74,11 @@ def add_stats(self, stats=dict[str, Any]) -> None: self.transform_exceptions_counter.inc(val) if key == "data access retries": self.data_retries_counter.inc(val) + + def update_stats(self, stats=dict[str, Any]) -> None: + """ + Update (overwrite) statistics + :param stats - dictionary creating new statistics + :return: None + """ + self.stats = stats \ No newline at end of file diff --git a/data-processing-lib/ray/src/data_processing_ray/transform/ray/__init__.py b/data-processing-lib/ray/src/data_processing_ray/transform/ray/__init__.py new file mode 100644 index 000000000..8339074af --- /dev/null +++ b/data-processing-lib/ray/src/data_processing_ray/transform/ray/__init__.py @@ -0,0 +1 @@ +from data_processing_ray.transform.ray.pipeline_transform import RayPipelineTransform diff --git a/data-processing-lib/ray/src/data_processing_ray/transform/ray/pipeline_transform.py b/data-processing-lib/ray/src/data_processing_ray/transform/ray/pipeline_transform.py new file mode 100644 index 000000000..433f76d7d --- /dev/null +++ b/data-processing-lib/ray/src/data_processing_ray/transform/ray/pipeline_transform.py @@ -0,0 +1,52 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +import ray +from data_processing.transform import AbstractPipelineTransform +from data_processing.transform import BaseTransformRuntime + + +class RayPipelineTransform(AbstractPipelineTransform): + """ + Transform that executes a set of base transforms sequentially. 
Data is passed between + participating transforms in memory + """ + + def __init__(self, config: dict[str, Any]): + """ + Initializes pipeline execution for the list of transforms + :param config - configuration parameters - list of transforms in the pipeline. + Note that transforms will be executed in the order they are defined + """ + super().__init__(config) + + def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: + """ + get transform parameters + :param runtime - runtime + :return: transform params + """ + return runtime.get_transform_config(data_access_factory=self.data_access_factory, + statistics=self.statistics, files=[]) + + def _compute_execution_statistics(self, stats: dict[str, Any]) -> None: + """ + Compute execution statistics + :param stats: current statistics from flush + :return: None + """ + current = ray.get(self.statistics.get_execution_stats.remote()) + current |= stats + for _, runtime in self.participants: + current = runtime.compute_execution_stats(stats=current) + ray.get(self.statistics.update_stats.remote(current)) diff --git a/transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py b/transforms/universal/ededup/python/src/ededup_pipeline_local_python.py similarity index 100% rename from transforms/universal/ededup/python/src/ededup_pipeline_local_python_incremental.py rename to transforms/universal/ededup/python/src/ededup_pipeline_local_python.py diff --git a/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py b/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py index a10d3f06e..99e0ca487 100644 --- a/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py +++ b/transforms/universal/ededup/python/src/ededup_pipeline_transform_python.py @@ -10,10 +10,11 @@ # limitations under the License. ################################################################################ -from data_processing.runtime.pure_python import PythonTransformLauncher -from data_processing.runtime.pure_python.runtime_configuration import ( +from data_processing.runtime.pure_python import ( + PythonTransformLauncher, PythonTransformRuntimeConfiguration, ) +from data_processing.transform.pure_python import PythonPipelineTransform from data_processing.transform import PipelineTransformConfiguration from data_processing.utils import get_logger from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration @@ -32,8 +33,10 @@ def __init__(self): """ Initialization """ - super().__init__(transform_config= - PipelineTransformConfiguration({"transforms": [EdedupPythonTransformRuntimeConfiguration()]})) + super().__init__( + transform_config=PipelineTransformConfiguration( + config={"transforms": [EdedupPythonTransformRuntimeConfiguration()]}, + transform_class=PythonPipelineTransform)) if __name__ == "__main__": diff --git a/transforms/universal/noop/python/src/noop_pipeline_transform_python.py b/transforms/universal/noop/python/src/noop_pipeline_transform_python.py index 381f13149..2f0d9c936 100644 --- a/transforms/universal/noop/python/src/noop_pipeline_transform_python.py +++ b/transforms/universal/noop/python/src/noop_pipeline_transform_python.py @@ -10,10 +10,8 @@ # limitations under the License. 
################################################################################ -from data_processing.runtime.pure_python import PythonTransformLauncher -from data_processing.runtime.pure_python.runtime_configuration import ( - PythonTransformRuntimeConfiguration, -) +from data_processing.runtime.pure_python import PythonTransformLauncher, PythonTransformRuntimeConfiguration +from data_processing.transform.pure_python import PythonPipelineTransform from data_processing.transform import PipelineTransformConfiguration from data_processing.utils import get_logger from noop_transform_python import NOOPPythonTransformConfiguration @@ -32,8 +30,9 @@ def __init__(self): """ Initialization """ - super().__init__(transform_config= - PipelineTransformConfiguration({"transforms": [NOOPPythonTransformConfiguration()]})) + super().__init__(transform_config=PipelineTransformConfiguration( + config={"transforms": [NOOPPythonTransformConfiguration()]}, + transform_class=PythonPipelineTransform)) if __name__ == "__main__": diff --git a/transforms/universal/noop/python/test-data/expected/metadata.json b/transforms/universal/noop/python/test-data/expected/metadata.json index eed590d79..ca124d145 100644 --- a/transforms/universal/noop/python/test-data/expected/metadata.json +++ b/transforms/universal/noop/python/test-data/expected/metadata.json @@ -1,46 +1,46 @@ { - "pipeline": "pipeline_id", - "job details": { - "job category": "preprocessing", - "job name": "NOOP", - "job type": "ray", - "job id": "job_id", - "start_time": "2024-03-01 15:17:56", - "end_time": "2024-03-01 15:17:57", - "status": "success" - }, - "code": [null], - "job_input_params": { - "sleep": 0, - "checkpointing": false, - "max_files": -1, - "number of workers": 1, - "worker options": { - "num_cpus": 0.8 - }, - "actor creation delay": 0 - }, - "execution_stats": { - "cpus": 10, - "gpus": 0, - "memory": 14.031964112073183, - "object_store": 2.0 - }, - "job_output_stats": { - "source_files": 1, - "source_size": 16534, - "result_files": 1, - "result_size": 16534, - "table_processing": 0.012392997741699219, - "nfiles": 1, - "nrows": 5 - }, - "source": { - "name": "test-data/data_processing/ray/noop/input", - "type": "path" - }, - "target": { - "name": "/tmp/NOOP4o9gv2bq", - "type": "path" - } -} + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "noop", + "job type": "pure python", + "job id": "job_id", + "start_time": "2024-09-22 21:58:36", + "end_time": "2024-09-22 21:58:37", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "sleep_sec": 1, + "checkpointing": false, + "max_files": -1, + "random_samples": -1, + "files_to_use": [ + ".parquet" + ], + "num_processors": 0 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 753, + "result_files": 1, + "result_size": 759, + "processing_time": 1.093, + "nfiles": 1, + "nrows": 7, + "source_doc_count": 7, + "result_doc_count": 7 + }, + "source": { + "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/noop/python/test-data/input", + "type": "path" + }, + "target": { + "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/noop/python/output", + "type": "path" + } +} \ No newline at end of file diff --git a/transforms/universal/noop/python/test-data/expected/test1.parquet b/transforms/universal/noop/python/test-data/expected/test1.parquet index 7baa6d82f33adfa2fecb41f053f7e9a250daea06..043fb7aad775ee3768b9ef74b9f91c341b0b010c 100644 
GIT binary patch delta 267 zcmey!`kl2tz%j^BltuIhljt)>A5lJ01|bFpjjj5B750QOGh8|Z0f06}MBTN2u(-qKd0@ zfYe0pHO_0aB2F|+KRDN$N-J+WMp6fg#ZAqVNhED delta 261 zcmey)`jNFhz%j^BltuIhljsXZA5lJ01|bH9J&Z2A3=9kzB_##LR{Ht{`Pr#O0(OE7 zJbGzGqHZD#68ZUhNr^?mvXVN9Ny&PtY3X{&`MC=d)wgvor2O>zX3tYb`I bJT;l0>8h}ao`If`q>K#E_)bO!29VzYgB?Jt diff --git a/transforms/universal/noop/python/test/test_noop_pipeline_python.py b/transforms/universal/noop/python/test/test_noop_pipeline_python.py index d0fec66a8..acb1b3f06 100644 --- a/transforms/universal/noop/python/test/test_noop_pipeline_python.py +++ b/transforms/universal/noop/python/test/test_noop_pipeline_python.py @@ -20,7 +20,7 @@ from noop_pipeline_transform_python import NOOPPypelinePythonTransformConfiguration -class TestPythonNOOPTransform(AbstractTransformLauncherTest): +class TestPythonNOOPPipelineTransform(AbstractTransformLauncherTest): """ Extends the super-class to define the test data for the tests defined there. The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. diff --git a/transforms/universal/noop/ray/src/noop_pipeline_local_ray.py b/transforms/universal/noop/ray/src/noop_pipeline_local_ray.py new file mode 100644 index 000000000..aca0f6cd3 --- /dev/null +++ b/transforms/universal/noop/ray/src/noop_pipeline_local_ray.py @@ -0,0 +1,47 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import sys + +from data_processing_ray.runtime.ray import RayTransformLauncher +from data_processing.utils import ParamsUtils +from noop_pipeline_transform_ray import NOOPPypelineRayTransformConfiguration + + +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # where to run + "run_locally": True, + # Data access. 
Only required parameters are specified
+    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
+    # execution info
+    "runtime_pipeline_id": "pipeline_id",
+    "runtime_job_id": "job_id",
+    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
+    # noop params
+    "noop_sleep_sec": 1,
+}
+if __name__ == "__main__":
+    # Set the simulated command line args
+    sys.argv = ParamsUtils.dict_to_req(d=params)
+    # create launcher
+    launcher = RayTransformLauncher(runtime_config=NOOPPypelineRayTransformConfiguration())
+    # Launch the ray actor(s) to process the input
+    launcher.launch()
diff --git a/transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py b/transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py
new file mode 100644
index 000000000..00803049a
--- /dev/null
+++ b/transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py
@@ -0,0 +1,42 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration
+from data_processing.transform import PipelineTransformConfiguration
+from data_processing_ray.transform.ray import RayPipelineTransform
+from data_processing.utils import get_logger
+from noop_transform_ray import NOOPRayTransformConfiguration
+
+logger = get_logger(__name__)
+
+
+class NOOPPypelineRayTransformConfiguration(RayTransformRuntimeConfiguration):
+    """
+    Implements the RayTransformRuntimeConfiguration for a NOOP pipeline as
+    required by the RayTransformLauncher, wiring the NOOP Ray transform into
+    a single-step pipeline.
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=PipelineTransformConfiguration(
+            config={"transforms": [NOOPRayTransformConfiguration()]},
+            transform_class=RayPipelineTransform))
+
+
+if __name__ == "__main__":
+    # create the launcher
+    launcher = RayTransformLauncher(NOOPPypelineRayTransformConfiguration())
+    logger.info("Launching noop pipeline transform")
+    launcher.launch()
diff --git a/transforms/universal/noop/ray/test/test_noop_pipeline_ray.py b/transforms/universal/noop/ray/test/test_noop_pipeline_ray.py
new file mode 100644
index 000000000..bc87a1af5
--- /dev/null
+++ b/transforms/universal/noop/ray/test/test_noop_pipeline_ray.py
@@ -0,0 +1,48 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +import os + +from data_processing_ray.runtime.ray import RayTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from noop_transform import sleep_cli_param +from noop_pipeline_transform_ray import NOOPPypelineRayTransformConfiguration + + +class TestRayNOOPPipelineTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + src_file_dir = os.path.abspath(os.path.dirname(__file__)) + fixtures = [] + + launcher = RayTransformLauncher(NOOPPypelineRayTransformConfiguration()) + input_dir = os.path.join(src_file_dir, "../test-data/input") + expected_dir = os.path.join(src_file_dir, "../test-data/expected") + runtime_config = {"run_locally": True} + transform_config = {sleep_cli_param: 0} + fixtures.append( + ( + launcher, + transform_config | runtime_config, + input_dir, + expected_dir, + [], # optional list of column names to ignore in comparing test-generated with expected. + ) + ) + + return fixtures From 58824a1d69d31a6b15b0bcb9a355b92945d5ff7a Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 23 Sep 2024 10:52:54 +0100 Subject: [PATCH 10/16] add ededup Ray sample --- .../ray/src/ededup_pipeline_local_ray.py | 55 +++++++++++++++++++ .../ray/src/ededup_pipeline_transform_ray.py | 42 ++++++++++++++ .../ray/test/test_ededup_pipeline_ray.py | 43 +++++++++++++++ 3 files changed, 140 insertions(+) create mode 100644 transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py create mode 100644 transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py create mode 100644 transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py diff --git a/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py b/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py new file mode 100644 index 000000000..d8c3b09c8 --- /dev/null +++ b/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py @@ -0,0 +1,55 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################
+
+import os
+import sys
+
+from data_processing.utils import ParamsUtils
+from data_processing_ray.runtime.ray import RayTransformLauncher
+from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration
+from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param
+from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params
+
+
+# create launcher
+launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration())
+# create parameters
+input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data/input"))
+output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../output"))
+local_conf = {
+    "input_folder": input_folder,
+    "output_folder": output_folder,
+}
+worker_options = {"num_cpus": 0.5}
+code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
+params = {
+    # where to run
+    "run_locally": True,
+    # Data access. Only required parameters are specified
+    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
+    # orchestrator
+    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
+    "runtime_num_workers": 2,
+    "runtime_pipeline_id": "pipeline_id",
+    "runtime_job_id": "job_id",
+    "runtime_creation_delay": 0,
+    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
+    # ededup parameters
+    hash_cpu_cli_params: 0.5,
+    num_hashes_cli_params: 2,
+    doc_column_name_cli_param: "contents",
+    int_column_name_cli_param: "document_id",
+}
+sys.argv = ParamsUtils.dict_to_req(d=params)
+
+# launch
+launcher.launch()
diff --git a/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py b/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py
new file mode 100644
index 000000000..5b2e1216d
--- /dev/null
+++ b/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py
@@ -0,0 +1,42 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration
+from data_processing.transform import PipelineTransformConfiguration
+from data_processing_ray.transform.ray import RayPipelineTransform
+from data_processing.utils import get_logger
+from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration
+
+logger = get_logger(__name__)
+
+
+class EdedupPypelineRayTransformConfiguration(RayTransformRuntimeConfiguration):
+    """
+    Implements the RayTransformRuntimeConfiguration for the ededup pipeline transform,
+    as required by the RayTransformLauncher. It wraps the base ededup Ray transform
+    configuration in a single-transform pipeline.
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=PipelineTransformConfiguration(
+            config={"transforms": [EdedupRayTransformRuntimeConfiguration()]},
+            transform_class=RayPipelineTransform))
+
+
+if __name__ == "__main__":
+    # create launcher
+    launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration())
+    logger.info("Launching ededup pipeline transform")
+    launcher.launch()
diff --git a/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py b/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py
new file mode 100644
index 000000000..d9460ef93
--- /dev/null
+++ b/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py
@@ -0,0 +1,43 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from data_processing.test_support.launch.transform_test import (
+    AbstractTransformLauncherTest,
+)
+from data_processing_ray.runtime.ray import RayTransformLauncher
+from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration
+from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param
+from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params
+
+
+class TestRayEdedupPipelineTransform(AbstractTransformLauncherTest):
+    """
+    Extends the super-class to define the test data for the tests defined there.
+    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
+    """
+
+    def get_test_transform_fixtures(self) -> list[tuple]:
+        basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
+        config = {
+            "run_locally": True,
+            # When running in Ray, the runtime's get_transform_config() method sets up the shared
+            # hash actors, so no additional data access configuration is needed here.
+ hash_cpu_cli_params: 0.5, + num_hashes_cli_params: 2, + doc_column_name_cli_param: "contents", + int_column_name_cli_param: "document_id", + } + launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration()) + fixtures = [(launcher, config, basedir + "/input", basedir + "/expected")] + return fixtures From f0317321418280475d3a4f6dc964f8eb81ad0994 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 23 Sep 2024 12:49:47 +0100 Subject: [PATCH 11/16] completed Ray tests --- .../transform/test_noop.py | 38 ++-- .../transform/test_resize.py | 3 +- ...e_noop.py => test_resize_noop_pipeline.py} | 0 .../test_support/transform/__init__.py | 2 + .../transform/pipeline_transform.py | 43 ++++ .../transform/resize_transform.py | 214 ++++++++++++++++++ .../launch/ray/ray_test_resize.py | 55 +++++ .../ray/ray_test_resize_noop_pipeline.py | 55 +++++ 8 files changed, 393 insertions(+), 17 deletions(-) rename data-processing-lib/python/test/data_processing_tests/transform/{test_resize_noop.py => test_resize_noop_pipeline.py} (100%) create mode 100644 data-processing-lib/ray/src/data_processing_ray/test_support/transform/pipeline_transform.py create mode 100644 data-processing-lib/ray/src/data_processing_ray/test_support/transform/resize_transform.py create mode 100644 data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize.py create mode 100644 data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize_noop_pipeline.py diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py index caf1c60f6..0e5b0396a 100644 --- a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py @@ -9,28 +9,34 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - -import pyarrow as pa -from data_processing.test_support.transform import NOOPTransform -from data_processing.test_support.transform import ( - AbstractTableTransformTest, +import os +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, ) +from data_processing.test_support.transform.noop_transform import NOOPPythonTransformConfiguration, sleep_cli_param - -table = pa.Table.from_pydict({"name": pa.array(["Tom", "Dick", "Harry"]), "age": pa.array([0, 1, 2])}) -expected_table = table # We're a noop after all. -expected_metadata_list = [{"nfiles": 1, "nrows": 3}, {}] # transform() result # flush() result - - -class TestNOOPTransform(AbstractTableTransformTest): +class TestPythonNOOPTransform(AbstractTransformLauncherTest): """ Extends the super-class to define the test data for the tests defined there. The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
""" def get_test_transform_fixtures(self) -> list[tuple]: - fixtures = [ - (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), - (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), - ] + src_file_dir = os.path.abspath(os.path.dirname(__file__)) + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../transforms/universal/noop/python/test-data")) + launcher = PythonTransformLauncher(NOOPPythonTransformConfiguration()) + transform_config = {sleep_cli_param: 0} + fixtures.append( + ( + launcher, + transform_config, + basedir + "/input", + basedir + "/expected", + [], # optional list of column names to ignore in comparing test-generated with expected. + ) + ) + return fixtures diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py index 61ec43c50..d90a5fa8e 100644 --- a/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py @@ -27,7 +27,8 @@ class TestPythonResizeTransform(AbstractTransformLauncherTest): def get_test_transform_fixtures(self) -> list[tuple]: # The following based on 3 identical input files of about 39kbytes, and 200 rows fixtures = [] - basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../../transforms/universal/resize/python/test-data")) + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../transforms/universal/resize/python/test-data")) launcher = PythonTransformLauncher(ResizePythonTransformConfiguration()) # Split into 4 or so files diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop_pipeline.py similarity index 100% rename from data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py rename to data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop_pipeline.py diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py index dd095c961..695b03471 100644 --- a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/__init__.py @@ -1,2 +1,4 @@ from data_processing_ray.test_support.transform.noop_transform import NOOPRayTransformConfiguration from data_processing_ray.test_support.transform.noop_folder_transform import NOOPFolderRayTransformConfiguration +from data_processing_ray.test_support.transform.resize_transform import ResizeRayTransformConfiguration +from data_processing_ray.test_support.transform.pipeline_transform import ResizeNOOPRayTransformConfiguration diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/pipeline_transform.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/pipeline_transform.py new file mode 100644 index 000000000..32db42606 --- /dev/null +++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/pipeline_transform.py @@ -0,0 +1,43 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration
+from data_processing_ray.transform.ray import RayPipelineTransform
+from data_processing.transform import PipelineTransformConfiguration
+from data_processing.utils import get_logger
+from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration, ResizeRayTransformConfiguration
+
+logger = get_logger(__name__)
+
+
+class ResizeNOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
+    """
+    Implements the RayTransformRuntimeConfiguration for a resize/noop pipeline,
+    as required by the RayTransformLauncher. It chains the Resize and NOOP Ray
+    transform configurations into a single in-memory pipeline.
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=PipelineTransformConfiguration(
+            config={"transforms": [ResizeRayTransformConfiguration(),
+                                   NOOPRayTransformConfiguration()]},
+            transform_class=RayPipelineTransform))
+
+
+if __name__ == "__main__":
+    # create launcher
+    launcher = RayTransformLauncher(ResizeNOOPRayTransformConfiguration())
+    logger.info("Launching resize/noop pipeline transform")
+    launcher.launch()
diff --git a/data-processing-lib/ray/src/data_processing_ray/test_support/transform/resize_transform.py b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/resize_transform.py
new file mode 100644
index 000000000..19546950d
--- /dev/null
+++ b/data-processing-lib/ray/src/data_processing_ray/test_support/transform/resize_transform.py
@@ -0,0 +1,214 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +from argparse import ArgumentParser, Namespace +from typing import Any + +import pyarrow as pa +from data_processing.transform import AbstractTableTransform, TransformConfiguration +from data_processing.utils import ( + LOCAL_TO_DISK, + MB, + CLIArgumentProvider, + UnrecoverableException, + get_logger, +) +from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration + + +logger = get_logger(__name__) + +max_rows_per_table_key = "max_rows_per_table" +max_mbytes_per_table_key = "max_mbytes_per_table" +size_type_key = "size_type" +shortname = "resize" +cli_prefix = f"{shortname}_" +max_rows_per_table_cli_param = f"{cli_prefix}{max_rows_per_table_key}" +max_mbytes_per_table_cli_param = f"{cli_prefix}{max_mbytes_per_table_key}" +size_type_cli_param = f"{cli_prefix}{size_type_key}" +size_type_disk = "disk" +size_type_memory = "memory" +size_type_default = size_type_disk + + +class ResizeTransform(AbstractTableTransform): + """ + Implements splitting large files into smaller ones. + Two flavours of splitting are supported - based on the amount of documents and based on the size + """ + + def __init__(self, config: dict[str, Any]): + """ + Initialize based on the dictionary of configuration information. + """ + super().__init__(config) + self.max_rows_per_table = config.get(max_rows_per_table_key, 0) + self.max_bytes_per_table = MB * config.get(max_mbytes_per_table_key, 0) + disk_memory = config.get(size_type_key, size_type_default) + if size_type_default in disk_memory: + self.max_bytes_per_table *= LOCAL_TO_DISK + + self.logger.debug(f"max bytes = {self.max_bytes_per_table}") + self.logger.debug(f"max rows = {self.max_rows_per_table}") + self.buffer = None + if self.max_rows_per_table <= 0 and self.max_bytes_per_table <= 0: + raise ValueError("Neither max rows per table nor max table size are defined") + if self.max_rows_per_table > 0 and self.max_bytes_per_table > 0: + raise ValueError("Both max rows per table and max table size are defined. 
Only one should be present") + + def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]: + """ + split larger files into the smaller ones + :param table: table + :param file_name: name of the file + :return: resulting set of tables + """ + self.logger.debug(f"got new table with {table.num_rows} rows") + if self.buffer is not None: + try: + self.logger.debug( + f"concatenating buffer with {self.buffer.num_rows} rows to table with {table.num_rows} rows" + ) + # table = pa.concat_tables([self.buffer, table], unicode_promote_options="permissive") + table = pa.concat_tables([self.buffer, table]) + self.buffer = None + self.logger.debug(f"concatenated table has {table.num_rows} rows") + except Exception as _: # Can happen if schemas are different + # Raise unrecoverable error to stop the execution + self.logger.warning(f"table in {file_name} can't be merged with the buffer") + self.logger.warning(f"incoming table columns {table.schema.names} ") + self.logger.warning(f"buffer columns {self.buffer.schema.names}") + raise UnrecoverableException() + + result = [] + start_row = 0 + if self.max_rows_per_table > 0: + # split file with max documents + n_rows = table.num_rows + rows_left = n_rows + while start_row < n_rows and rows_left >= self.max_rows_per_table: + length = n_rows - start_row + if length > self.max_rows_per_table: + length = self.max_rows_per_table + a_slice = table.slice(offset=start_row, length=length) + self.logger.debug(f"created table slice with {a_slice.num_rows} rows, starting with row {start_row}") + result.append(a_slice) + start_row = start_row + self.max_rows_per_table + rows_left = rows_left - self.max_rows_per_table + else: + # split based on size + current_size = 0.0 + if table.nbytes >= self.max_bytes_per_table: + for n in range(table.num_rows): + current_size += table.slice(offset=n, length=1).nbytes + if current_size >= self.max_bytes_per_table: + self.logger.debug(f"capturing slice, current_size={current_size}") + # Reached the size + a_slice = table.slice(offset=start_row, length=(n - start_row)) + result.append(a_slice) + start_row = n + current_size = 0.0 + if start_row < table.num_rows: + # buffer remaining chunk for next call + self.logger.debug(f"Buffering table starting at row {start_row}") + self.buffer = table.slice(offset=start_row, length=(table.num_rows - start_row)) + self.logger.debug(f"buffered table has {self.buffer.num_rows} rows") + self.logger.debug(f"returning {len(result)} tables") + return result, {} + + def flush(self) -> tuple[list[pa.Table], dict[str, Any]]: + result = [] + if self.buffer is not None: + self.logger.debug(f"flushing buffered table with {self.buffer.num_rows} rows of size {self.buffer.nbytes}") + result.append(self.buffer) + self.buffer = None + else: + self.logger.debug(f"Empty buffer. nothing to flush.") + return result, {} + + +class ResizeTransformConfiguration(TransformConfiguration): + + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args and combining of metadata. + """ + + def __init__(self): + super().__init__(name=shortname, transform_class=ResizeTransform) + + def add_input_params(self, parser: ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the resizeTransform. + By convention a common prefix should be used for all transform-specific CLI args + (e.g, noop_, pii_, etc.) 
+ """ + parser.add_argument( + f"--{max_rows_per_table_cli_param}", + type=int, + default=-1, + help="Max number of rows per table", + ) + parser.add_argument( + f"--{max_mbytes_per_table_cli_param}", + type=float, + default=-1, + help=f"Max table size (MB). Size is measured according to the --{size_type_cli_param} parameter", + ) + parser.add_argument( + f"--{size_type_cli_param}", + type=str, + required=False, + default=size_type_default, + choices=[size_type_disk, size_type_memory], + help=f"Determines how memory is measured when using the --{max_mbytes_per_table_cli_param} option." + "\n'memory' measures the in-process memory footprint and \n'disk' makes an estimate of the resulting parquet file size.", + ) + + def apply_input_params(self, args: Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. + :return: True, if validate pass or False otherwise + """ + # Capture the args that are specific to this transform + captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False) + self.params = self.params | captured + # dargs = vars(args) + if self.params.get(max_rows_per_table_key) <= 0 and self.params.get(max_mbytes_per_table_key) <= 0: + logger.info("Neither max documents per table nor max table size are defined") + return False + if self.params.get(max_rows_per_table_key) > 0 and self.params.get(max_mbytes_per_table_key) > 0: + logger.info("Both max documents per table and max table size are defined. Only one should be present") + return False + logger.info(f"Split file parameters are : {self.params}") + return True + + +class ResizeRayTransformConfiguration(RayTransformRuntimeConfiguration): + """ + Implements the RayTransformConfiguration for resize as required by the RayTransformLauncher. + """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=ResizeTransformConfiguration()) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = RayTransformLauncher(ResizeRayTransformConfiguration()) + logger.info("Launching noop transform") + launcher.launch() diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize.py new file mode 100644 index 000000000..4dbd121b9 --- /dev/null +++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize.py @@ -0,0 +1,55 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import os + +from data_processing_ray.test_support.transform import ResizeRayTransformConfiguration +from data_processing_ray.runtime.ray import RayTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestRayResizeTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + common_config = {"runtime_num_workers": 1, "run_locally": True} + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../../transforms/universal/resize/ray/test-data")) + launcher = RayTransformLauncher(ResizeRayTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize_noop_pipeline.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize_noop_pipeline.py new file mode 100644 index 000000000..276fdfc98 --- /dev/null +++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_resize_noop_pipeline.py @@ -0,0 +1,55 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from data_processing_ray.test_support.transform import ResizeNOOPRayTransformConfiguration +from data_processing_ray.runtime.ray import RayTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestPythonResizeNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. 
+ The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + common_config = {"runtime_num_workers": 1, "run_locally": True} + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../../transforms/universal/resize/ray/test-data")) + launcher = RayTransformLauncher(ResizeNOOPRayTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125, "noop_sleep_sec": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300, "noop_sleep_sec": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1, "noop_sleep_sec": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05, "noop_sleep_sec": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02, "noop_sleep_sec": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures From c7d7c633ca49a680484e7f6f0855968f26dcd625 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 23 Sep 2024 16:20:36 +0100 Subject: [PATCH 12/16] Add Spark support and tests --- .../transform/spark/__init__.py | 1 + .../transform/spark/pipeline_transform.py | 50 +++++++++++++++++++ .../spark/src/noop_pipeline_local_spark.py | 45 +++++++++++++++++ .../src/noop_pipeline_transform_spark.py | 42 ++++++++++++++++ .../spark/test/test_noop_pipeline_spark.py | 34 +++++++++++++ 5 files changed, 172 insertions(+) create mode 100644 data-processing-lib/spark/src/data_processing_spark/transform/spark/__init__.py create mode 100644 data-processing-lib/spark/src/data_processing_spark/transform/spark/pipeline_transform.py create mode 100644 transforms/universal/noop/spark/src/noop_pipeline_local_spark.py create mode 100644 transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py create mode 100644 transforms/universal/noop/spark/test/test_noop_pipeline_spark.py diff --git a/data-processing-lib/spark/src/data_processing_spark/transform/spark/__init__.py b/data-processing-lib/spark/src/data_processing_spark/transform/spark/__init__.py new file mode 100644 index 000000000..9a72e3503 --- /dev/null +++ b/data-processing-lib/spark/src/data_processing_spark/transform/spark/__init__.py @@ -0,0 +1 @@ +from data_processing_spark.transform.spark.pipeline_transform import SparkPipelineTransform diff --git a/data-processing-lib/spark/src/data_processing_spark/transform/spark/pipeline_transform.py b/data-processing-lib/spark/src/data_processing_spark/transform/spark/pipeline_transform.py new file mode 100644 index 000000000..c49551c6f --- /dev/null +++ b/data-processing-lib/spark/src/data_processing_spark/transform/spark/pipeline_transform.py @@ -0,0 +1,50 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +from data_processing.transform import AbstractPipelineTransform +from data_processing.transform import BaseTransformRuntime + + +class SparkPipelineTransform(AbstractPipelineTransform): + """ + Transform that executes a set of base transforms sequentially. Data is passed between + participating transforms in memory + """ + + def __init__(self, config: dict[str, Any]): + """ + Initializes pipeline execution for the list of transforms + :param config - configuration parameters - list of transforms in the pipeline. + Note that transforms will be executed in the order they are defined + """ + self.partition = config.get("partition_index", 0) + super().__init__(config) + + def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: + """ + get transform parameters + :param runtime - runtime + :return: transform params + """ + return runtime.get_transform_config(partition=self.partition, + data_access_factory=self.data_access_factory,statistics=self.statistics) + + def _compute_execution_statistics(self, stats: dict[str, Any]) -> None: + """ + Compute execution statistics + :param stats: current statistics from flush + :return: None + """ + self.statistics.add_stats(stats) + for _, runtime in self.participants: + runtime.compute_execution_stats(stats=self.statistics) \ No newline at end of file diff --git a/transforms/universal/noop/spark/src/noop_pipeline_local_spark.py b/transforms/universal/noop/spark/src/noop_pipeline_local_spark.py new file mode 100644 index 000000000..1d7eea850 --- /dev/null +++ b/transforms/universal/noop/spark/src/noop_pipeline_local_spark.py @@ -0,0 +1,45 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################
+
+import os
+import sys
+
+from data_processing_spark.runtime.spark import SparkTransformLauncher
+from data_processing.utils import ParamsUtils
+from noop_pipeline_transform_spark import NOOPPypelineSparkTransformConfiguration
+
+
+# create parameters
+input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input"))
+output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))
+local_conf = {
+    "input_folder": input_folder,
+    "output_folder": output_folder,
+}
+code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
+params = {
+    # Data access. Only required parameters are specified
+    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
+    # execution info
+    "runtime_pipeline_id": "pipeline_id",
+    "runtime_job_id": "job_id",
+    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
+    # noop params
+    "noop_sleep_sec": 1,
+}
+if __name__ == "__main__":
+    # Set the simulated command line args
+    sys.argv = ParamsUtils.dict_to_req(d=params)
+    # create launcher
+    launcher = SparkTransformLauncher(runtime_config=NOOPPypelineSparkTransformConfiguration())
+    # Launch Spark to process the input
+    launcher.launch()
diff --git a/transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py b/transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py
new file mode 100644
index 000000000..4c3d6718e
--- /dev/null
+++ b/transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py
@@ -0,0 +1,42 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration
+from data_processing.transform import PipelineTransformConfiguration
+from data_processing_spark.transform.spark import SparkPipelineTransform
+from data_processing.utils import get_logger
+from noop_transform_spark import NOOPSparkTransformConfiguration
+
+logger = get_logger(__name__)
+
+
+class NOOPPypelineSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
+    """
+    Implements the SparkTransformRuntimeConfiguration for the NOOP pipeline transform,
+    as required by the SparkTransformLauncher. It wraps the base NOOP Spark transform
+    configuration in a single-transform pipeline.
+    """
+
+    def __init__(self):
+        """
+        Initialization
+        """
+        super().__init__(transform_config=PipelineTransformConfiguration(
+            config={"transforms": [NOOPSparkTransformConfiguration()]},
+            transform_class=SparkPipelineTransform))
+
+
+if __name__ == "__main__":
+    # create launcher
+    launcher = SparkTransformLauncher(NOOPPypelineSparkTransformConfiguration())
+    logger.info("Launching NOOP pipeline transform")
+    launcher.launch()
diff --git a/transforms/universal/noop/spark/test/test_noop_pipeline_spark.py b/transforms/universal/noop/spark/test/test_noop_pipeline_spark.py
new file mode 100644
index 000000000..03759b9e6
--- /dev/null
+++ b/transforms/universal/noop/spark/test/test_noop_pipeline_spark.py
@@ -0,0 +1,34 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from data_processing.test_support.launch.transform_test import (
+    AbstractTransformLauncherTest,
+)
+from data_processing_spark.runtime.spark import SparkTransformLauncher
+from noop_pipeline_transform_spark import NOOPPypelineSparkTransformConfiguration
+
+
+class TestSparkNOOPPipelineTransform(AbstractTransformLauncherTest):
+    """
+    Extends the super-class to define the test data for the tests defined there.
+    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = "../test-data" + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir)) + fixtures = [] + launcher = SparkTransformLauncher(NOOPPypelineSparkTransformConfiguration()) + fixtures.append((launcher, {"noop_sleep_sec": 1}, basedir + "/input", basedir + "/expected")) + return fixtures From da2870ecfd2e25e91fa981bbfe74bae138c37e52 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Tue, 26 Nov 2024 19:50:44 +0000 Subject: [PATCH 13/16] fixed merging issue --- .../data_processing/runtime/pure_python/transform_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py index 085859fca..1f60f678d 100644 --- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py +++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_runtime.py @@ -12,7 +12,7 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase +from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import TransformStatistics, BaseTransformRuntime From a95b07ed61de7c4b43871070d3718a332e4f69c2 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Wed, 27 Nov 2024 09:13:57 +0000 Subject: [PATCH 14/16] fixed merging issue --- .../src/data_processing/test_support/transform/__init__.py | 5 +++++ .../src/data_processing/transform/pipeline_transform.py | 2 +- .../src/data_processing_ray/runtime/ray/transform_runtime.py | 2 +- .../runtime/spark/runtime_configuration.py | 5 ++--- .../data_processing_spark/runtime/spark/transform_runtime.py | 3 +-- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py index b868a38b3..61a02aae9 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py @@ -5,6 +5,11 @@ NOOPTransformConfiguration, NOOPPythonTransformConfiguration ) +from data_processing.test_support.transform.noop_folder_transform import ( + NOOPFolderTransform, + NOOPTransformConfiguration, + NOOPFolderPythonTransformConfiguration, +) from data_processing.test_support.transform.resize_transform import ( ResizeTransform, ResizePythonTransformConfiguration, diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index 5a410ceba..e38050cb3 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -34,7 +34,7 @@ def __init__(self, config: dict[str, Any]): # Empty pipeline self.logger.error("Pipeline transform with empty list") raise UnrecoverableException("Pipeline transform with empty list") - self.data_access_factory = config.get("data_access_factory", None) + self.data_access_factory = config.get("data_access", None) if self.data_access_factory is None: self.logger.error("pipeline transform - Data access factory is not defined") raise UnrecoverableException("pipeline transform - Data access factory is not 
defined") diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py index b88ade2d5..8f2a72b02 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_runtime.py @@ -12,7 +12,7 @@ from typing import Any -from data_processing.data_access import DataAccessFactoryBase +from data_processing.data_access import DataAccessFactoryBase, DataAccess from data_processing.transform import BaseTransformRuntime from ray.actor import ActorHandle diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/runtime_configuration.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/runtime_configuration.py index 0f788396e..89e662b66 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/runtime_configuration.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/runtime_configuration.py @@ -13,8 +13,7 @@ from typing import Any from data_processing.data_access import DataAccessFactoryBase -from data_processing.runtime import TransformRuntimeConfiguration -from data_processing.transform import TransformConfiguration +from data_processing.transform import TransformConfiguration, TransformRuntimeConfiguration from data_processing_spark.runtime.spark import DefaultSparkTransformRuntime @@ -29,7 +28,7 @@ def __init__( :param transform_config - base configuration class :param runtime_class: implementation of the transform runtime """ - super().__init__(transform_config=transform_config) + super().__init__(transform_config=transform_config, runtime_class=runtime_class) self.runtime_class = runtime_class def get_bcast_params(self, data_access_factory: DataAccessFactoryBase) -> dict[str, Any]: diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py index 887af5c36..59513a765 100644 --- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py +++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_runtime.py @@ -13,8 +13,7 @@ from typing import Any from data_processing.data_access import DataAccessFactoryBase, DataAccess -from data_processing.transform import TransformStatistics -from data_processing.runtime import BaseTransformRuntime +from data_processing.transform import BaseTransformRuntime, TransformStatistics class DefaultSparkTransformRuntime(BaseTransformRuntime): From 01f79b80cafa03c01f6ac3f31d5c25438353aa1e Mon Sep 17 00:00:00 2001 From: blublinsky Date: Wed, 27 Nov 2024 10:41:25 +0000 Subject: [PATCH 15/16] fixed merging issue --- .../src/data_processing/runtime/transform_file_processor.py | 1 + .../python/src/data_processing/transform/pipeline_transform.py | 2 +- .../universal/ededup/python/src/ededup_transform_python.py | 2 -- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py index a1084d769..298c7cab0 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py @@ -48,6 +48,7 @@ def __init__( # Add data access 
and statistics to the processor parameters
         self.transform_params = transform_parameters
         self.transform_params["data_access"] = self.data_access
+        self.transform_params["data_access_factory"] = data_access_factory
         self.is_folder = is_folder
 
     def process_file(self, f_name: str) -> None:
diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
index e38050cb3..5a410ceba 100644
--- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
+++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
@@ -34,7 +34,7 @@ def __init__(self, config: dict[str, Any]):
             # Empty pipeline
             self.logger.error("Pipeline transform with empty list")
             raise UnrecoverableException("Pipeline transform with empty list")
-        self.data_access_factory = config.get("data_access", None)
+        self.data_access_factory = config.get("data_access_factory", None)
         if self.data_access_factory is None:
             self.logger.error("pipeline transform - Data access factory is not defined")
             raise UnrecoverableException("pipeline transform - Data access factory is not defined")
diff --git a/transforms/universal/ededup/python/src/ededup_transform_python.py b/transforms/universal/ededup/python/src/ededup_transform_python.py
index 0135d11bc..97dbcb6c1 100644
--- a/transforms/universal/ededup/python/src/ededup_transform_python.py
+++ b/transforms/universal/ededup/python/src/ededup_transform_python.py
@@ -65,8 +65,6 @@ def __init__(self, params: dict[str, Any]):
         self.filter = None
         self.logger = get_logger(__name__)
 
-
-
     def get_transform_config(
         self, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, files: list[str]
     ) -> dict[str, Any]:
From 52b5a0dfac5ede129ea0758bb452615a29a57a5c Mon Sep 17 00:00:00 2001
From: blublinsky
Date: Wed, 27 Nov 2024 12:01:09 +0000
Subject: [PATCH 16/16] add documentation

---
 .../doc/pipelined_transform.md                | 42 +++++++++++++++++++
 data-processing-lib/doc/transforms.md         |  5 ++-
 2 files changed, 46 insertions(+), 1 deletion(-)
 create mode 100644 data-processing-lib/doc/pipelined_transform.md

diff --git a/data-processing-lib/doc/pipelined_transform.md b/data-processing-lib/doc/pipelined_transform.md
new file mode 100644
index 000000000..5fb55cffd
--- /dev/null
+++ b/data-processing-lib/doc/pipelined_transform.md
@@ -0,0 +1,42 @@
+# Pipelined transform
+
+Typical DPK usage is a sequential invocation of individual transforms, each processing all of the input data and creating
+its output. Such execution is very convenient, as it produces all of the intermediate data, which can be useful,
+especially during debugging.
+
+That said, such an approach creates a lot of intermediate data and executes a lot of reads and writes, which can
+significantly slow down processing, especially in the case of large data sets.
+
+To overcome this drawback, DPK introduced a new type of transform - the pipeline transform. A pipeline transform
+(somewhat similar to an [sklearn pipeline](https://scikit-learn.org/1.5/modules/generated/sklearn.pipeline.Pipeline.html))
+is both a transform, meaning it processes one file at a time, and a pipeline, meaning that this file is transformed by
+a set of individual transforms, passing data between them as a byte array in memory.
+
+## Creating a pipeline transform
+
+Creating a pipeline transform requires creating a runtime-specific transform runtime configuration
+leveraging [PipelineTransformConfiguration](../python/src/data_processing/transform/pipeline_transform_configuration.py).
+Examples of such configurations can be found here:
+
+* [Python](../../transforms/universal/noop/python/src/noop_pipeline_transform_python.py)
+* [Ray](../../transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py)
+* [Spark](../../transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py)
+
+These are very simple examples using a pipeline containing a single transform.
+
+A more complex example, defining a pipeline of two transforms (Resize and NOOP), can be found in
+[Python](../python/src/data_processing/test_support/transform/pipeline_transform.py) and
+[Ray](../ray/src/data_processing_ray/test_support/transform/pipeline_transform.py).
+
+***Note*** a limitation of the pipeline transform is that all participating transforms have to be different;
+the same transform cannot be included twice.
+
+## Running a pipeline transform
+
+Similar to `ordinary` transforms, pipeline transforms can be invoked using a launcher, but the parameters
+in this case have to include the parameters for all participating transforms. The base class
+[AbstractPipelineTransform](../python/src/data_processing/transform/pipeline_transform.py) will initialize
+all participating transforms based on these parameters.
+
+***Note*** as per DPK convention, parameters for every transform are prefixed by the transform name, which means
+that a given transform will always receive its appropriate parameters.
\ No newline at end of file
diff --git a/data-processing-lib/doc/transforms.md b/data-processing-lib/doc/transforms.md
index fc3509ba3..3d5169c90 100644
--- a/data-processing-lib/doc/transforms.md
+++ b/data-processing-lib/doc/transforms.md
@@ -9,9 +9,12 @@ There are currently two types of transforms defined in DPK:
 
 * [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py) which is a base
 class for all data transforms. Data transforms convert a file of data producing zero or more data files
-and metadata. A specific class of the binary transform is
+and metadata. Specific classes of the binary transform are
 [AbstractTableTransform](../python/src/data_processing/transform/table_transform.py) that consumes and produces
 data files containing [pyarrow tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html)
+and [AbstractPipelineTransform](../python/src/data_processing/transform/pipeline_transform.py) that creates
+pipelined execution of one or more transforms. For more information on pipelined transforms refer to
+[this document](pipelined_transform.md).
 * [AbstractFolderTransform](../python/src/data_processing/transform/folder_transform.py) which is a base
 class consuming a folder (that can contain an arbitrary set of files, that need to be processed together)
 and proces zero or more data files and metadata.
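
For readers of `pipelined_transform.md` above, the following is a minimal sketch of a two-transform Ray pipeline configuration following the pattern these patches introduce. The names `MyPipelineRayTransformConfiguration`, `MyFirstRayTransformConfiguration`, and `MySecondRayTransformConfiguration` are hypothetical placeholders for your own transform configurations; only the `PipelineTransformConfiguration`, `RayPipelineTransform`, `RayTransformRuntimeConfiguration`, and `RayTransformLauncher` APIs shown in the patches are assumed.

```python
# A minimal sketch, assuming the pipeline APIs introduced in this patch series.
# MyFirst/MySecond names below are hypothetical placeholders for real transforms.
from data_processing.transform import PipelineTransformConfiguration
from data_processing_ray.runtime.ray import (
    RayTransformLauncher,
    RayTransformRuntimeConfiguration,
)
from data_processing_ray.transform.ray import RayPipelineTransform
from my_first_transform_ray import MyFirstRayTransformConfiguration    # hypothetical
from my_second_transform_ray import MySecondRayTransformConfiguration  # hypothetical


class MyPipelineRayTransformConfiguration(RayTransformRuntimeConfiguration):
    """Chains two transforms into one pipeline, passing data in memory."""

    def __init__(self):
        super().__init__(
            transform_config=PipelineTransformConfiguration(
                # transforms are executed in the order they are listed
                config={"transforms": [MyFirstRayTransformConfiguration(),
                                       MySecondRayTransformConfiguration()]},
                transform_class=RayPipelineTransform,
            )
        )


if __name__ == "__main__":
    launcher = RayTransformLauncher(MyPipelineRayTransformConfiguration())
    launcher.launch()
```

As with any DPK transform, CLI parameters for each participating transform keep that transform's own prefix, so the launcher can route every argument to the right member of the pipeline.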