pipeline transform #602

Open
wants to merge 16 commits into base: dev
2 changes: 1 addition & 1 deletion data-processing-lib/doc/overview.md
@@ -32,7 +32,7 @@ runtime interface expected to be implemented by each runtime ([python](python-r
* [DataAccessFactory](../python/src/data_processing/data_access/data_access_factory_base.py) - is
used to configure the input and output data files to be processed and creates
the `DataAccess` instance (see below) according to the CLI parameters.
* [TransformRuntimeConfiguration](../python/src/data_processing/runtime/runtime_configuration.py) - captures
* [TransformRuntimeConfiguration](../python/src/data_processing/transform/runtime_configuration.py) - captures
the `TransformConfiguration` and runtime-specific configuration.
* [DataAccess](../python/src/data_processing/data_access/data_access.py) - is
the interface defining data i/o methods and selection. Implementations for local
42 changes: 42 additions & 0 deletions data-processing-lib/doc/pipelined_transform.md
@@ -0,0 +1,42 @@
# Pipelined transform

Typical DPK usage is a sequential invocation of individual transforms, each processing all of the input data
and creating the output data. Such execution is very convenient because it produces all of the intermediate
data, which can be useful, especially during debugging.

That said, such an approach creates a lot of intermediate data and performs a lot of reads and writes, which can
significantly slow down processing, especially in the case of large data sets.

To overcome this drawback, DPK introduced a new type of transform: the pipeline transform. A pipeline transform
(somewhat similar to an [sklearn pipeline](https://scikit-learn.org/1.5/modules/generated/sklearn.pipeline.Pipeline.html))
is both a transform, meaning that it processes one file at a time, and a pipeline, meaning that this file is transformed
by a set of individual transforms, passing data between them as byte arrays in memory.
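
Conceptually, the data flow looks like the sketch below. This is illustrative only: the function and the assumed
`transform_binary` signature are simplifications, while the actual implementation lives in
[AbstractPipelineTransform](../python/src/data_processing/transform/pipeline_transform.py).

```python
# Conceptual sketch of pipelined execution, not the actual DPK implementation.
# Every transform consumes an in-memory byte array and produces new byte arrays,
# which are handed to the next transform without writing intermediate files.
from typing import Any


def run_pipeline(
    transforms: list, file_name: str, data: bytes
) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
    files = [(data, file_name)]
    stats: dict[str, Any] = {}
    for transform in transforms:
        produced: list[tuple[bytes, str]] = []
        for byte_array, name in files:
            # assumed: a binary transform returns zero or more (bytes, extension)
            # tuples plus a metadata dictionary
            out_files, metadata = transform.transform_binary(file_name=name, byte_array=byte_array)
            produced.extend((out_bytes, name) for out_bytes, _ in out_files)
            stats.update(metadata)
        files = produced
    return files, stats
```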

## Creating a pipeline transform

Creating a pipeline transform requires creating a runtime-specific transform runtime configuration
leveraging [PipelineTransformConfiguration](../python/src/data_processing/transform/pipeline_transform_configuration.py).
Examples of such configurations can be found here:

* [Python](../../transforms/universal/noop/python/src/noop_pipeline_transform_python.py)
* [Ray](../../transforms/universal/noop/ray/src/noop_pipeline_transform_ray.py)
* [Spark](../../transforms/universal/noop/spark/src/noop_pipeline_transform_spark.py)

These are very simple examples, each using a pipeline containing a single transform.
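
The pattern used in these examples boils down to the following sketch; the class name
`NOOPPipelinePythonTransformConfiguration` is hypothetical, while the imports mirror the Python example listed above.

```python
from data_processing.runtime.pure_python import PythonTransformRuntimeConfiguration
from data_processing.transform import PipelineTransformConfiguration
from data_processing.transform.pure_python import PythonPipelineTransform
from data_processing.test_support.transform import NOOPPythonTransformConfiguration


class NOOPPipelinePythonTransformConfiguration(PythonTransformRuntimeConfiguration):
    """Hypothetical configuration for a pipeline containing a single NOOP transform."""

    def __init__(self):
        super().__init__(
            transform_config=PipelineTransformConfiguration(
                # the list of participating transform configurations defines the pipeline
                config={"transforms": [NOOPPythonTransformConfiguration()]},
                transform_class=PythonPipelineTransform,
            )
        )
```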

A more complex example, defining a pipeline of two transforms (Resize and NOOP), can be found for
[Python](../python/src/data_processing/test_support/transform/pipeline_transform.py) and
[Ray](../ray/src/data_processing_ray/test_support/transform/pipeline_transform.py).

***Note:*** a limitation of the pipeline transform is that all participating transforms have to be different;
the same transform cannot be included twice.

## Running a pipeline transform

Similar to `ordinary` transforms, pipeline transforms can be invoked using a launcher, but the parameters
in this case have to include the parameters for all participating transforms. The base class
[AbstractPipelineTransform](../python/src/data_processing/transform/pipeline_transform.py) will initialize
all participating transforms based on these parameters.

***Note:*** as per DPK convention, parameters for every transform are prefixed with the transform's name, which means
that a given transform will always receive its appropriate parameters.
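
For example, a Resize/NOOP pipeline might be launched as sketched below; the parameter names
`resize_max_rows_per_table` and `noop_sleep_sec` are assumptions, used only to illustrate the prefixing convention.

```python
# Launch sketch: the per-transform parameter names below are assumptions, shown
# to illustrate how parameters carry the owning transform's prefix.
import sys

from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.test_support.transform import ResizeNOOPPythonTransformConfiguration
from data_processing.utils import ParamsUtils

params = {
    # data access parameters are shared by all transforms in the pipeline
    "data_local_config": ParamsUtils.convert_to_ast(
        {"input_folder": "input", "output_folder": "output"}
    ),
    "resize_max_rows_per_table": 300,  # assumed Resize parameter
    "noop_sleep_sec": 1,  # assumed NOOP parameter
}
sys.argv = ParamsUtils.dict_to_req(d=params)
launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration())
launcher.launch()
```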
5 changes: 4 additions & 1 deletion data-processing-lib/doc/transforms.md
@@ -9,9 +9,12 @@ There are currently two types of transforms defined in DPK:

* [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py) which is a base
class for all data transforms. Data transforms convert a file of data producing zero or more data files
and metadata. A specific class of the binary transform is
and metadata. Specific classes of the binary transform are
[AbstractTableTransform](../python/src/data_processing/transform/table_transform.py) that consumes and produces
data files containing [pyarrow tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html)
and [AbstractPipelineTransform](../python/src/data_processing/transform/pipeline_transform.py) that creates
pipelined execution of one or more transforms. For more information on pipelined transforms, refer to
[this document](pipelined_transform.md); a minimal table transform is sketched after this list.
* [AbstractFolderTransform](../python/src/data_processing/transform/folder_transform.py) which is a base
class consuming a folder (that can contain an arbitrary set of files, that need to be processed together)
and produces zero or more data files and metadata.
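
As an illustration of the table transform flavor, a minimal sketch follows; the class, the column name, and the
row-filtering logic are hypothetical, and the `transform` signature is assumed from `AbstractTableTransform`.

```python
from typing import Any

import pyarrow as pa
import pyarrow.compute as pc
from data_processing.transform import AbstractTableTransform


class DropEmptyTextTransform(AbstractTableTransform):
    """Hypothetical transform that drops rows whose 'text' column is empty."""

    def __init__(self, config: dict[str, Any]):
        super().__init__(config)

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        # keep only rows with a non-empty text column and report how many were dropped
        filtered = table.filter(pc.not_equal(table["text"], ""))
        return [filtered], {"rows_dropped": table.num_rows - filtered.num_rows}
```
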
@@ -1,4 +1,3 @@
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration, runtime_cli_prefix
from data_processing.runtime.runtime_configuration import TransformRuntimeConfiguration
from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher
from data_processing.runtime.transform_file_processor import AbstractTransformFileProcessor
from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher
@@ -10,9 +10,8 @@
# limitations under the License.
################################################################################

from data_processing.runtime import TransformRuntimeConfiguration
from data_processing.runtime.pure_python import DefaultPythonTransformRuntime
from data_processing.transform import TransformConfiguration
from data_processing.transform import TransformConfiguration, TransformRuntimeConfiguration


class PythonTransformRuntimeConfiguration(TransformRuntimeConfiguration):
@@ -26,12 +25,5 @@ def __init__(
:param transform_config - base configuration class
:param runtime_class: implementation of the transform runtime
"""
self.runtime_class = runtime_class
super().__init__(transform_config=transform_config)
super().__init__(transform_config=transform_config, runtime_class=runtime_class)

def create_transform_runtime(self) -> DefaultPythonTransformRuntime:
"""
Create transform runtime with the parameters captured during apply_input_params()
:return: transform runtime object
"""
return self.runtime_class(self.transform_config.get_transform_params())
@@ -13,10 +13,10 @@
from typing import Any

from data_processing.data_access import DataAccessFactoryBase, DataAccess
from data_processing.transform import TransformStatistics
from data_processing.transform import TransformStatistics, BaseTransformRuntime


class DefaultPythonTransformRuntime:
class DefaultPythonTransformRuntime(BaseTransformRuntime):
"""
Transformer runtime used by processor to to create Transform specific environment
"""
@@ -26,7 +26,7 @@ def __init__(self, params: dict[str, Any]):
Create/config this runtime.
:param params: parameters, often provided by the CLI arguments as defined by a TableTransformConfiguration.
"""
self.params = params
super().__init__(params)

def get_folders(self, data_access: DataAccess) -> list[str]:
"""
@@ -48,6 +48,7 @@ def __init__(
# Add data access and statistics to the processor parameters
self.transform_params = transform_parameters
self.transform_params["data_access"] = self.data_access
self.transform_params["data_access_factory"] = data_access_factory
self.is_folder = is_folder

def process_file(self, f_name: str) -> None:
@@ -103,10 +104,10 @@ def flush(self) -> None:
the hook for them to return back locally stored data and their statistics.
:return: None
"""
if self.last_file_name is None or self.is_folder:
if self.last_file_name is None:
# for some reason a given worker never processed anything. Happens in testing
# when the amount of workers is greater than the amount of files
self.logger.debug("skipping flush, no name for file is defined or this is a folder transform")
self.logger.debug("skipping flush, no name for file is defined")
return
try:
t_start = time.time()
@@ -226,7 +227,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
def _publish_stats(self, stats: dict[str, Any]) -> None:
"""
Publishing execution statistics
:param stats: Statistics
:param stats: dictionary
:return: None
"""
raise ValueError("must be implemented by subclass")
raise NotImplementedError("must be implemented by subclass")
@@ -15,7 +15,7 @@
import argparse

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime import TransformRuntimeConfiguration
from data_processing.transform import TransformRuntimeConfiguration
from data_processing.utils import ParamsUtils, get_logger


@@ -36,6 +36,7 @@ def __init__(
self.runtime_config = runtime_config
self.name = self.runtime_config.get_name()
self.data_access_factory = data_access_factory
self.execution_config = None

def _get_parser(self) -> argparse.ArgumentParser:
"""
@@ -56,9 +57,9 @@ def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace:
:return: list of arguments
"""
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
self.runtime_config.add_input_params(parser=parser)
return parser.parse_args()

def _get_parameters(self, args: argparse.Namespace) -> bool:
@@ -67,11 +68,10 @@ def _get_parameters(self, args: argparse.Namespace) -> bool:
and does parameters validation
:return: True if validation passes or False, if not
"""
return (
self.runtime_config.apply_input_params(args=args)
return (self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)
)

def _submit_for_execution(self) -> int:
"""
@@ -7,5 +7,14 @@
)
from data_processing.test_support.transform.noop_folder_transform import (
NOOPFolderTransform,
NOOPFolderPythonTransformConfiguration
)
NOOPTransformConfiguration,
NOOPFolderPythonTransformConfiguration,
)
from data_processing.test_support.transform.resize_transform import (
ResizeTransform,
ResizePythonTransformConfiguration,
)

from data_processing.test_support.transform.pipeline_transform import (
ResizeNOOPPythonTransformConfiguration,
)
@@ -0,0 +1,43 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from data_processing.runtime.pure_python import PythonTransformLauncher, PythonTransformRuntimeConfiguration
from data_processing.transform.pure_python import PythonPipelineTransform
from data_processing.transform import PipelineTransformConfiguration
from data_processing.utils import get_logger
from data_processing.test_support.transform import NOOPPythonTransformConfiguration, ResizePythonTransformConfiguration

logger = get_logger(__name__)


class ResizeNOOPPythonTransformConfiguration(PythonTransformRuntimeConfiguration):
    """
    Implements the PythonTransformRuntimeConfiguration for a pipeline of two transforms
    (Resize followed by NOOP) as required by the PythonTransformLauncher.
    """

    def __init__(self):
        """
        Initialization
        """
        super().__init__(
            transform_config=PipelineTransformConfiguration(
                config={
                    "transforms": [
                        ResizePythonTransformConfiguration(),
                        NOOPPythonTransformConfiguration(),
                    ]
                },
                transform_class=PythonPipelineTransform,
            )
        )


if __name__ == "__main__":
    launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration())
    logger.info("Launching resize/noop transform")
    launcher.launch()