From 5290b0fd5ddef5252f02f82f2cf3f55473c48363 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Tue, 29 Nov 2022 12:22:37 -0800 Subject: [PATCH 1/3] Updates Bytewax materialization engine. - Since no data is exchanged between workers, remove k8s service definition. - Update bytewax dataflow to only use one worker. Signed-off-by: Dan Herrera --- .../bytewax_materialization_dataflow.py | 5 +- .../bytewax/bytewax_materialization_engine.py | 61 ++----------------- 2 files changed, 6 insertions(+), 60 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index 1fad2c909f..02be584758 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -82,8 +82,5 @@ def _run_dataflow(self): flow.flat_map(self.process_path) flow.capture() cluster_main( - flow, - ManualInputConfig(self.input_builder), - self.output_builder, - **proc_env(), + flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0 ) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 0477722eb1..9a456376bf 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -23,7 +23,7 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView -from feast.utils import _get_column_names +from feast.utils import _get_column_names, get_default_yaml_file_path from .bytewax_materialization_job import BytewaxMaterializationJob @@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): # Create a k8s configmap with information needed by bytewax self._create_configuration_map(job_id, paths, feature_view, self.namespace) - # Create the k8s service definition, used for bytewax communication - self._create_service_definition(job_id, self.namespace) - # Create the k8s job definition self._create_job_definition( job_id, @@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): def _create_configuration_map(self, job_id, paths, feature_view, namespace): """Create a Kubernetes configmap for this job""" - feature_store_configuration = yaml.dump( - yaml.safe_load( - self.repo_config.json( - exclude={"repo_path"}, - exclude_unset=True, - ) - ) - ) + repo_path = self.repo_config.repo_path + assert repo_path + feature_store_path = get_default_yaml_file_path(repo_path) + feature_store_configuration = feature_store_path.read_text() materialization_config = yaml.dump( {"paths": paths, "feature_view": feature_view.name} @@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): body=configmap_manifest, ) - def _create_service_definition(self, job_id, namespace): - """Creates a kubernetes service definition. - - This service definition is created to allow bytewax workers - to communicate with each other. - """ - service_definition = { - "apiVersion": "v1", - "kind": "Service", - "metadata": { - "name": f"dataflow-{job_id}", - "namespace": namespace, - }, - "spec": { - "clusterIP": "None", - "clusterIPs": ["None"], - "internalTrafficPolicy": "Cluster", - "ipFamilies": ["IPv4"], - "ipFamilyPolicy": "SingleStack", - "ports": [ - { - "name": "worker", - "port": 9999, - "protocol": "TCP", - "targetPort": 9999, - } - ], - "selector": {"job-name": f"dataflow-{job_id}"}, - "sessionAffinity": "None", - "type": "ClusterIP", - }, - } - - utils.create_from_dict(self.k8s_client, service_definition) - def _create_job_definition(self, job_id, namespace, pods, env): """Create a kubernetes job definition.""" job_env = [ @@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", "value": "false", }, - { - "name": "BYTEWAX_HOSTFILE_PATH", - "value": "/etc/bytewax/hostfile.txt", - }, { "name": "BYTEWAX_STATEFULSET_NAME", "value": f"dataflow-{job_id}", @@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "subdomain": f"dataflow-{job_id}", "initContainers": [ { - "command": [ - "sh", - "-c", - f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone', - ], "env": [ { "name": "BYTEWAX_REPLICAS", From a8d4ac471cbcd2397849d5621ea06be0d6f20689 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Tue, 29 Nov 2022 14:12:08 -0800 Subject: [PATCH 2/3] Update Bytewax to latest version. Signed-off-by: Dan Herrera --- .../bytewax_materialization_dataflow.py | 27 +++++++++---------- setup.py | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index 02be584758..b602b30922 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -3,9 +3,12 @@ import pyarrow as pa import pyarrow.parquet as pq import s3fs -from bytewax import Dataflow, cluster_main # type: ignore -from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute -from bytewax.parse import proc_env + +from bytewax.dataflow import Dataflow # type: ignore +from bytewax.execution import cluster_main +from bytewax.inputs import ManualInputConfig, distribute +from bytewax.outputs import ManualOutputConfig + from tqdm import tqdm from feast import FeatureStore, FeatureView, RepoConfig @@ -37,20 +40,15 @@ def process_path(self, path): return batches - def input_builder(self, worker_index, worker_count, resume_epoch): + def input_builder(self, worker_index, worker_count, _state): worker_paths = distribute(self.paths, worker_index, worker_count) - epoch = 0 for path in worker_paths: - yield AdvanceTo(epoch) - yield Emit(path) - epoch += 1 + yield None, path return def output_builder(self, worker_index, worker_count): - def output_fn(epoch_batch): - _, batch = epoch_batch - + def output_fn(batch): table = pa.Table.from_batches([batch]) if self.feature_view.batch_source.field_mapping is not None: @@ -79,8 +77,7 @@ def output_fn(epoch_batch): def _run_dataflow(self): flow = Dataflow() + flow.input("inp", ManualInputConfig(self.input_builder)) flow.flat_map(self.process_path) - flow.capture() - cluster_main( - flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0 - ) + flow.capture(ManualOutputConfig(self.output_builder)) + cluster_main(flow, [], 0) diff --git a/setup.py b/setup.py index 2764faa697..27f4ff7ed3 100644 --- a/setup.py +++ b/setup.py @@ -93,7 +93,7 @@ AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"] -BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"] +BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"] SNOWFLAKE_REQUIRED = [ "snowflake-connector-python[pandas]>=2.7.3,<3", From 49aa989c8b03fe25412e8d2ebb2d549e794fd5a8 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Wed, 30 Nov 2022 15:59:31 -0800 Subject: [PATCH 3/3] Format imports. Signed-off-by: Dan Herrera --- .../contrib/bytewax/bytewax_materialization_dataflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index b602b30922..bf5229303a 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -3,12 +3,10 @@ import pyarrow as pa import pyarrow.parquet as pq import s3fs - from bytewax.dataflow import Dataflow # type: ignore from bytewax.execution import cluster_main from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig - from tqdm import tqdm from feast import FeatureStore, FeatureView, RepoConfig