rename launch package to runtime #106

Merged 1 commit on May 10, 2024
20 changes: 10 additions & 10 deletions data-processing-lib/doc/advanced-transform-tutorial.md
@@ -57,11 +57,11 @@ from typing import Any
import pyarrow as pa
import ray
from data_processing.data_access import DataAccessFactory
from data_processing.launch.ray import (
RayLauncherConfiguration,
DefaultTableTransformRuntimeRay,
RayUtils,
RayTransformLauncher,
from data_processing.runtime.ray import (
RayLauncherConfiguration,
DefaultTableTransformRuntimeRay,
RayUtils,
RayTransformLauncher,
)
from data_processing.transform import AbstractTableTransform
from data_processing.utils import GB, TransformUtils
@@ -70,10 +70,10 @@ from ray.actor import ActorHandle

class EdedupTransform(AbstractTableTransform):

def __init__(self, config: dict):
super().__init__(config)
self.doc_column = config.get("doc_column", "")
self.hashes = config.get("hashes", [])
def __init__(self, config: dict):
super().__init__(config)
self.doc_column = config.get("doc_column", "")
self.hashes = config.get("hashes", [])
```
The `EdedupTransform` class extends the `AbstractTableTransform`, which defines the required methods.

@@ -136,7 +136,7 @@ If there is no metadata then simply return an empty dictionary.

First, let's define the transform runtime class. To do this we extend
the base abstract/interface class
[DefaultTableTransformRuntime](../src/data_processing/launch/ray/transform_runtime.py),
[DefaultTableTransformRuntime](../src/data_processing/runtime/ray/transform_runtime.py),
which requires definition of the following:
* an initializer (i.e. `init()`) that accepts a dictionary of configuration
data. For this example, the configuration data will only be defined by
16 changes: 8 additions & 8 deletions data-processing-lib/doc/architecture.md
@@ -13,10 +13,10 @@ process many input files in parallel using a distributed network of RayWorkers.

The architecture includes the following core components:

* [RayLauncher](../src/data_processing/launch/ray/transform_launcher.py) accepts and validates
* [RayLauncher](../src/data_processing/runtime/ray/transform_launcher.py) accepts and validates
CLI parameters to establish the Ray Orchestrator with the proper configuration.
It uses the following components, all of which can/do define CLI configuration parameters:
* [Transform Orchestrator Configuration](../src/data_processing/launch/ray/transform_orchestrator_configuration.py) is responsible
* [Transform Orchestrator Configuration](../src/data_processing/runtime/ray/transform_orchestrator_configuration.py) is responsible
for defining and validating infrastructure parameters
(e.g., number of workers, memory and cpu, local or remote cluster, etc.). This class has very simple state
(several dictionaries) and is fully pickleable. As a result, the framework uses its instance as a
@@ -25,14 +25,14 @@ It uses the following components, all of which can/do define CLI configuration p
configuration for the type of DataAccess to use when reading/writing the input/output data for
the transforms. Similar to Transform Orchestrator Configuration, this is a pickleable
instance that is passed between Launcher, Orchestrator and Workers.
* [TransformConfiguration](../src/data_processing/launch/ray/transform_runtime.py) - defines specifics
* [TransformConfiguration](../src/data_processing/runtime/ray/transform_runtime.py) - defines specifics
of the transform implementation including transform implementation class, its short name, any transform-
specific CLI parameters, and an optional TransformRuntime class, discussed below.

After all parameters are validated, the Ray cluster is started and the DataAccessFactory, TransformOrchestratorConfiguration
and TransformConfiguration are given to the Ray Orchestrator, via Ray remote() method invocation.
The Launcher waits for the Ray Orchestrator to complete.
* [Ray Orchestrator](../src/data_processing/launch/ray/transform_orchestrator.py) is responsible for overall management of
* [Ray Orchestrator](../src/data_processing/runtime/ray/transform_orchestrator.py) is responsible for overall management of
the data processing job. It creates the actors, determines the set of input data and distributes the
references to the data files to be processed by the workers. More specifically, it performs the following:
1. Uses the DataAccess instance created by the DataAccessFactory to determine the set of the files
@@ -53,12 +53,12 @@ It uses the following components, all of which can/do define CLI configuration p
Once all data is processed, the orchestrator will collect execution statistics (from the statistics actor)
and build and save it in the form of execution metadata (`metadata.json`). Finally, it will return the execution
result to the Launcher.
* [Ray worker](../src/data_processing/launch/ray/transform_table_processor.py) is responsible for
* [Ray worker](../src/data_processing/runtime/ray/transform_table_processor.py) is responsible for
reading files (as [PyArrow Tables](https://levelup.gitconnected.com/deep-dive-into-pyarrow-understanding-its-features-and-benefits-2cce8b1466c8))
assigned by the orchestrator, applying the transform to the input table and writing out the
resulting table(s). Metadata produced by each table transformation is aggregated into
Transform Statistics (below).
* [Transform Statistics](../src/data_processing/launch/ray/transform_statistics.py) is a general
* [Transform Statistics](../src/data_processing/runtime/ray/transform_statistics.py) is a general
purpose data collector actor aggregating the numeric metadata from different places of
the framework (especially metadata produced by the transform).
These statistics are reported as metadata (`metadata.json`) by the orchestrator upon completion.
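
To make the aggregation of numeric metadata concrete, here is a minimal sketch in plain Python. It assumes that each transform call reports a dictionary and that only numeric values are summed. `StatisticsSketch` and its method names are hypothetical and are not the library's Transform Statistics API; in the framework this role is played by a Ray actor rather than an in-process object.

```python
from collections import defaultdict


class StatisticsSketch:
    """Hypothetical stand-in for a statistics collector (not the library class)."""

    def __init__(self) -> None:
        # Running totals keyed by metadata name.
        self._totals = defaultdict(float)

    def add_stats(self, stats: dict) -> None:
        # Sum numeric values only; non-numeric metadata is ignored in this sketch.
        for key, value in stats.items():
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                self._totals[key] += value

    def get_execution_stats(self) -> dict:
        # Return a plain dict so it can be serialized, e.g. into metadata.json.
        return dict(self._totals)


collector = StatisticsSketch()
collector.add_stats({"source_documents": 10, "result_documents": 8})
collector.add_stats({"source_documents": 5, "result_documents": 5, "note": "skipped"})
summary = collector.get_execution_stats()
```

The orchestrator would read the equivalent of `summary` once all files are processed and write it out as part of the execution metadata.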
@@ -92,10 +92,10 @@ For a more complete discussion, see the [tutorials](transform-tutorials.md).
of any transform implementation - `transform()` and `flush()` - and provides the bulk of any transform implementation,
converting one Table to 0 or more new Tables. In general, this is not tied to the above Ray infrastructure
and so can usually be used independently of Ray.
* [TransformRuntime ](../src/data_processing/launch/ray/transform_runtime.py) - this class only needs to be
* [TransformRuntime ](../src/data_processing/runtime/ray/transform_runtime.py) - this class only needs to be
extended/implemented when additional Ray components (actors, shared memory objects, etc.) are used
by the transform. The main method `get_transform_config()` is used to enable these extensions.
* [TransformConfiguration](../src/data_processing/launch/ray/transform_runtime.py) - this is the bootstrap
* [TransformConfiguration](../src/data_processing/runtime/ray/transform_runtime.py) - this is the bootstrap
class provided to the Launcher that enables the instantiation of the Transform and the TransformRuntime within
the architecture. It is a CLIProvider, which allows it to define transform-specific CLI configuration
that is made available to the Transform's initializer.
4 changes: 2 additions & 2 deletions data-processing-lib/doc/overview.md
@@ -12,10 +12,10 @@ developers of data transformation are:

* [Transformation](../src/data_processing/transform/table_transform.py) - a simple, easily implemented interface that defines
the specifics of a given data transformation.
* [Transform Configuration](../src/data_processing/launch/ray/transform_runtime.py) - defines
* [Transform Configuration](../src/data_processing/runtime/ray/transform_runtime.py) - defines
the transform implementation and runtime classes, the
command line arguments specific to transform, and the short name for the transform.
* [Transformation Runtime](../src/data_processing/launch/ray/transform_runtime.py) - allows for customization of the Ray environment for the transformer.
* [Transformation Runtime](../src/data_processing/runtime/ray/transform_runtime.py) - allows for customization of the Ray environment for the transformer.
This might include provisioning of shared memory objects or creation of additional actors.

To learn more, consider the following:
2 changes: 1 addition & 1 deletion data-processing-lib/doc/simplest-transform-tutorial.md
@@ -54,7 +54,7 @@ from argparse import ArgumentParser, Namespace
from typing import Any

import pyarrow as pa
from data_processing.launch.ray import (
from data_processing.runtime.ray import (
    RayLauncherConfiguration,
    DefaultTableTransformRuntimeRay,
    RayTransformLauncher,
24 changes: 12 additions & 12 deletions data-processing-lib/doc/transform-tutorials.md
@@ -4,7 +4,7 @@ All transforms operate on a [pyarrow Table](https://arrow.apache.org/docs/python
and produce zero or more transformed tables and transform specific metadata.
The Transform itself need only be concerned with the conversion of one in-memory table at a time.

When running in the Ray worker (i.e. [TransformTableProcessor](../src/data_processing/launch/ray/transform_table_processor.py) ), the input
When running in the Ray worker (i.e. [TransformTableProcessor](../src/data_processing/runtime/ray/transform_table_processor.py) ), the input
tables are read from parquet files and the transform table(s) is/are stored in destination parquet files.
Metadata accumulated across calls to all transforms is stored in the destination.

@@ -39,55 +39,55 @@ not need this feature, a default implementation is provided to return an empty l
### Running in Ray
When a transform is run using the Ray-based framework, a number of other capabilities are involved:

* [Transform Runtime](../src/data_processing/launch/ray/transform_runtime.py) - this provides the ability for the
* [Transform Runtime](../src/data_processing/runtime/ray/transform_runtime.py) - this provides the ability for the
transform implementor to create additional Ray resources
and include them in the configuration used to create a transform
(see, for example, [exact dedup](../../transforms/universal/ededup/src/ededup_transform.py)
or [blocklist](../../transforms/universal/blocklisting/src/blocklist_transform.py)).
This also provides the ability to supplement the statistics collected by
[Statistics](../src/data_processing/launch/ray/transform_statistics.py) (see below).
* [Transform Configuration](../src/data_processing/launch/ray/transform_runtime.py) - defines the following:
[Statistics](../src/data_processing/runtime/ray/transform_statistics.py) (see below).
* [Transform Configuration](../src/data_processing/runtime/ray/transform_runtime.py) - defines the following:
* the transform class to be used,
* command line arguments used to initialize the Transform Runtime and generally, the Transform.
* Transform Runtime class to use
* transform short name
* [Transform Launcher](../src/data_processing/launch/ray/transform_launcher.py) - this is a class generally used to
* [Transform Launcher](../src/data_processing/runtime/ray/transform_launcher.py) - this is a class generally used to
implement `main()` that makes use of a Transform Configuration to start the Ray runtime and execute the transforms.

Roughly speaking, the following steps are completed to establish transforms in the RayWorkers:

1. Launcher parses the CLI parameters using an ArgumentParser configured with its own CLI parameters
along with those of the Transform Configuration,
2. Launcher passes the Transform Configuration and CLI parameters to the [RayOrchestrator](../src/data_processing/launch/ray/transform_orchestrator.py)
2. Launcher passes the Transform Configuration and CLI parameters to the [RayOrchestrator](../src/data_processing/runtime/ray/transform_orchestrator.py)
3. RayOrchestrator creates the Transform Runtime using the Transform Configuration and its CLI parameter values
4. Transform Runtime creates transform initialization/configuration including the CLI parameters,
and any Ray components needed by the transform.
5. [RayWorker](../src/data_processing/launch/ray/transform_table_processor.py) is started with configuration from the Transform Runtime.
5. [RayWorker](../src/data_processing/runtime/ray/transform_table_processor.py) is started with configuration from the Transform Runtime.
6. RayWorker creates the Transform using the configuration provided by the Transform Runtime.
7. Statistics is used to collect the statistics submitted by the individual transforms, which
are used for building execution metadata.
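
The steps above can be sketched in plain Python without Ray. This is only an illustration under stated assumptions: `SketchTransform`, `SketchRuntime` and `orchestrate_sketch` are hypothetical names, a "table" is modeled as a list of strings rather than a pyarrow Table, and a single in-process object stands in for the RayWorkers. Only the method name `get_transform_config()` is taken from the documentation, and its signature here is simplified.

```python
from typing import Any


class SketchTransform:
    """Hypothetical transform: upper-cases each row and appends a suffix."""

    def __init__(self, config: dict[str, Any]) -> None:
        self.suffix = config.get("suffix", "")

    def transform(self, table: list[str]) -> tuple[list[list[str]], dict[str, Any]]:
        out = [row.upper() + self.suffix for row in table]
        # Return the output table(s) plus numeric metadata about this call.
        return [out], {"rows": len(out)}


class SketchRuntime:
    """Hypothetical runtime: turns CLI parameter values into the transform config."""

    def __init__(self, params: dict[str, Any]) -> None:
        self.params = params

    def get_transform_config(self) -> dict[str, Any]:
        # A real runtime might add references to actors or shared objects here.
        return dict(self.params)


def orchestrate_sketch(transform_class, runtime_class, params, tables):
    runtime = runtime_class(params)             # step 3: create the runtime
    config = runtime.get_transform_config()     # step 4: build the transform config
    worker_transform = transform_class(config)  # steps 5-6: the "worker" creates the transform
    stats: dict[str, int] = {}
    results = []
    for table in tables:
        out_tables, metadata = worker_transform.transform(table)
        results.extend(out_tables)
        for key, value in metadata.items():     # step 7: aggregate statistics
            stats[key] = stats.get(key, 0) + value
    return results, stats


results, stats = orchestrate_sketch(
    SketchTransform, SketchRuntime, {"suffix": "!"}, [["a", "b"], ["c"]]
)
```

In the Ray-based framework the loop body runs inside worker actors and the statistics are sent to a separate actor; the sketch collapses both into one process to show only the order of operations.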

![Processing Architecture](processing-architecture.jpg)

#### Transform Launcher
The [TransformLauncher](../src/data_processing/launch/ray/transform_launcher.py) uses the Transform Configuration
The [TransformLauncher](../src/data_processing/runtime/ray/transform_launcher.py) uses the Transform Configuration
and provides a single method, `launch()`, that kicks off the Ray environment and transform execution coordinated
by [orchestrator](../src/data_processing/launch/ray/transform_orchestrator.py).
by [orchestrator](../src/data_processing/runtime/ray/transform_orchestrator.py).
For example,
```python
launcher = TransformLauncher(MyTransformConfiguration())
launcher.launch()
```
Note that the launcher defines some additional CLI parameters that are used to control the operation of the
[orchestrator and workers](../src/data_processing/launch/ray/transform_orchestrator_configuration.py) and
[orchestrator and workers](../src/data_processing/runtime/ray/transform_orchestrator_configuration.py) and
[data access](../src/data_processing/data_access/data_access_factory.py), covering things such as data access configuration,
number of workers, and worker resources.
Discussion of these options is beyond the scope of this document
(see [Launcher Options](launcher-options.md) for a list of available options).

#### Transform Configuration
The
[DefaultTableTransformConfiguration](../src/data_processing/launch/ray/transform_runtime.py)
[DefaultTableTransformConfiguration](../src/data_processing/runtime/ray/transform_runtime.py)
class is sub-classed and initialized with a transform-specific name and the implementation
and runtime classes. In addition, it is responsible for providing transform-specific
methods to define and filter optional command line arguments.
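
As a rough illustration of "define and filter", the sketch below uses only `argparse` from the standard library. The class `SketchTransformConfiguration`, the method names `add_input_params()` and `apply_input_params()`, and the `mytransform_` prefix are assumptions made for this example and may not match the library's actual interface.

```python
from argparse import ArgumentParser, Namespace
from typing import Any

PREFIX = "mytransform_"  # hypothetical prefix marking this transform's arguments


class SketchTransformConfiguration:
    """Hypothetical configuration class; not the library's implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.params: dict[str, Any] = {}

    def add_input_params(self, parser: ArgumentParser) -> None:
        # "Define": register the transform-specific command line arguments.
        parser.add_argument(f"--{PREFIX}doc_column", type=str, default="contents")

    def apply_input_params(self, args: Namespace) -> bool:
        # "Filter": keep only this transform's arguments and strip the prefix.
        captured = {
            key[len(PREFIX):]: value
            for key, value in vars(args).items()
            if key.startswith(PREFIX)
        }
        self.params.update(captured)
        return True


parser = ArgumentParser()
parser.add_argument("--num_workers", type=int, default=1)  # stands in for a launcher option
config = SketchTransformConfiguration("mytransform")
config.add_input_params(parser)
args = parser.parse_args(["--num_workers", "4", "--mytransform_doc_column", "text"])
config.apply_input_params(args)
```

After parsing, `config.params` holds only the transform's own values, which is the dictionary a transform initializer would receive in this sketch, while launcher-level options such as `num_workers` stay on `args`.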
@@ -112,7 +112,7 @@ Details are covered in the samples below.

#### Transform Runtime
The
[DefaultTableTransformRuntime](../src/data_processing/launch/ray/transform_runtime.py)
[DefaultTableTransformRuntime](../src/data_processing/runtime/ray/transform_runtime.py)
class is provided and will be
sufficient for many use cases, especially 1:1 table transformation.
However, some transforms will require use of the Ray environment, for example,
2 changes: 0 additions & 2 deletions data-processing-lib/src/data_processing/launch/__init__.py

This file was deleted.

This file was deleted.

This file was deleted.

2 changes: 2 additions & 0 deletions data-processing-lib/src/data_processing/runtime/__init__.py
@@ -0,0 +1,2 @@
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration
from data_processing.runtime.transform_launcher import AbstractTransformLauncher
@@ -0,0 +1,4 @@
from data_processing.runtime.pure_python.python_launcher_configuration import PythonLauncherConfiguration
from data_processing.runtime.pure_python.transform_table_processor import TransformTableProcessor
from data_processing.runtime.pure_python.transform_orchestrator import orchestrate
from data_processing.runtime.pure_python.transform_launcher import PythonTransformLauncher
@@ -9,11 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Any
from argparse import ArgumentParser, Namespace
from typing import Any

from data_processing.transform import TransformConfiguration
from data_processing.transform import AbstractTableTransform
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import CLIArgumentProvider


@@ -28,7 +27,8 @@ class PythonLauncherConfiguration(CLIArgumentProvider):
"""

    def __init__(
        self, transform_configuration: TransformConfiguration,
        self,
        transform_configuration: TransformConfiguration,
    ):
        """
        Initialization
@@ -14,12 +14,10 @@
import time

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.launch import TransformExecutionConfiguration
from data_processing.runtime import TransformExecutionConfiguration
from data_processing.runtime.pure_python import PythonLauncherConfiguration, orchestrate
from data_processing.runtime.transform_launcher import AbstractTransformLauncher
from data_processing.transform import TransformConfiguration
from data_processing.launch.pure_python import orchestrate
from data_processing.launch.transform_launcher import AbstractTransformLauncher

from data_processing.launch.pure_python import PythonLauncherConfiguration
from data_processing.utils import get_logger


@@ -33,7 +31,7 @@ class PythonTransformLauncher(AbstractTransformLauncher):

    def __init__(
        self,
        # transform_runtime_config: PythonLauncherConfiguration,
        # transform_runtime_config: PythonLauncherConfiguration,
        transform_config: TransformConfiguration,
        data_access_factory: DataAccessFactoryBase = DataAccessFactory(),
    ):
@@ -15,12 +15,12 @@
from datetime import datetime

from data_processing.data_access import DataAccessFactoryBase
from data_processing.launch import TransformExecutionConfiguration
from data_processing.launch.pure_python import TransformTableProcessor
from data_processing.transform import (
    TransformStatistics,
from data_processing.runtime import TransformExecutionConfiguration
from data_processing.runtime.pure_python import (
    PythonLauncherConfiguration,
    TransformTableProcessor,
)
from data_processing.launch.pure_python import PythonLauncherConfiguration
from data_processing.transform import TransformStatistics
from data_processing.utils import get_logger


@@ -16,9 +16,10 @@

import pyarrow as pa
from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime.pure_python import PythonLauncherConfiguration
from data_processing.transform import TransformStatistics
from data_processing.utils import TransformUtils, get_logger
from data_processing.launch.pure_python import PythonLauncherConfiguration


logger = get_logger(__name__)

@@ -0,0 +1,8 @@
from data_processing.runtime.ray.ray_utils import RayUtils
from data_processing.runtime.ray.transform_statistics import TransformStatisticsRay
from data_processing.runtime.ray.transform_table_processor import TransformTableProcessorRay
from data_processing.runtime.ray.transform_runtime import DefaultTableTransformRuntimeRay
from data_processing.runtime.ray.transform_launch_configuration import RayLauncherConfiguration
from data_processing.runtime.ray.transform_orchestrator_configuration import TransformOrchestratorConfiguration
from data_processing.runtime.ray.transform_orchestrator import orchestrate
from data_processing.runtime.ray.transform_launcher import RayTransformLauncher
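
Because this change renames `data_processing.launch` to `data_processing.runtime`, downstream code that has to work on both sides of the rename could use a fallback import. The helper below is a sketch using only `importlib` from the standard library; `import_first` is a hypothetical name and is not part of this repository. The demonstration call uses a standard-library module so that the block runs without the library installed.

```python
import importlib
from types import ModuleType


def import_first(*module_names: str) -> ModuleType:
    """Return the first module in module_names that can be imported."""
    last_error = None
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError as exc:  # also covers ModuleNotFoundError
            last_error = exc
    raise ImportError(f"none of {module_names} could be imported") from last_error


# Demonstration: the first name does not exist, so the helper falls back to the second.
mod = import_first("no_such_package_for_demo", "json")
```

With the library installed, the same helper could be called as `import_first("data_processing.runtime.ray", "data_processing.launch.ray")` to prefer the new package path and fall back to the old one.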