diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index 1fad2c909f..bf5229303a 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -3,9 +3,10 @@ import pyarrow as pa import pyarrow.parquet as pq import s3fs -from bytewax import Dataflow, cluster_main # type: ignore -from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute -from bytewax.parse import proc_env +from bytewax.dataflow import Dataflow # type: ignore +from bytewax.execution import cluster_main +from bytewax.inputs import ManualInputConfig, distribute +from bytewax.outputs import ManualOutputConfig from tqdm import tqdm from feast import FeatureStore, FeatureView, RepoConfig @@ -37,20 +38,15 @@ def process_path(self, path): return batches - def input_builder(self, worker_index, worker_count, resume_epoch): + def input_builder(self, worker_index, worker_count, _state): worker_paths = distribute(self.paths, worker_index, worker_count) - epoch = 0 for path in worker_paths: - yield AdvanceTo(epoch) - yield Emit(path) - epoch += 1 + yield None, path return def output_builder(self, worker_index, worker_count): - def output_fn(epoch_batch): - _, batch = epoch_batch - + def output_fn(batch): table = pa.Table.from_batches([batch]) if self.feature_view.batch_source.field_mapping is not None: @@ -79,11 +75,7 @@ def output_fn(epoch_batch): def _run_dataflow(self): flow = Dataflow() + flow.input("inp", ManualInputConfig(self.input_builder)) flow.flat_map(self.process_path) - flow.capture() - cluster_main( - flow, - ManualInputConfig(self.input_builder), - self.output_builder, - **proc_env(), - ) + flow.capture(ManualOutputConfig(self.output_builder)) + cluster_main(flow, [], 0) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 0477722eb1..9a456376bf 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -23,7 +23,7 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView -from feast.utils import _get_column_names +from feast.utils import _get_column_names, get_default_yaml_file_path from .bytewax_materialization_job import BytewaxMaterializationJob @@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): # Create a k8s configmap with information needed by bytewax self._create_configuration_map(job_id, paths, feature_view, self.namespace) - # Create the k8s service definition, used for bytewax communication - self._create_service_definition(job_id, self.namespace) - # Create the k8s job definition self._create_job_definition( job_id, @@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): def _create_configuration_map(self, job_id, paths, feature_view, namespace): """Create a Kubernetes configmap for this job""" - feature_store_configuration = yaml.dump( - yaml.safe_load( - self.repo_config.json( - exclude={"repo_path"}, - exclude_unset=True, - ) - ) - ) + repo_path = self.repo_config.repo_path + assert repo_path + feature_store_path = get_default_yaml_file_path(repo_path) + feature_store_configuration = feature_store_path.read_text() materialization_config = yaml.dump( {"paths": paths, "feature_view": feature_view.name} @@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): body=configmap_manifest, ) - def _create_service_definition(self, job_id, namespace): - """Creates a kubernetes service definition. - - This service definition is created to allow bytewax workers - to communicate with each other. - """ - service_definition = { - "apiVersion": "v1", - "kind": "Service", - "metadata": { - "name": f"dataflow-{job_id}", - "namespace": namespace, - }, - "spec": { - "clusterIP": "None", - "clusterIPs": ["None"], - "internalTrafficPolicy": "Cluster", - "ipFamilies": ["IPv4"], - "ipFamilyPolicy": "SingleStack", - "ports": [ - { - "name": "worker", - "port": 9999, - "protocol": "TCP", - "targetPort": 9999, - } - ], - "selector": {"job-name": f"dataflow-{job_id}"}, - "sessionAffinity": "None", - "type": "ClusterIP", - }, - } - - utils.create_from_dict(self.k8s_client, service_definition) - def _create_job_definition(self, job_id, namespace, pods, env): """Create a kubernetes job definition.""" job_env = [ @@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", "value": "false", }, - { - "name": "BYTEWAX_HOSTFILE_PATH", - "value": "/etc/bytewax/hostfile.txt", - }, { "name": "BYTEWAX_STATEFULSET_NAME", "value": f"dataflow-{job_id}", @@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "subdomain": f"dataflow-{job_id}", "initContainers": [ { - "command": [ - "sh", - "-c", - f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone', - ], "env": [ { "name": "BYTEWAX_REPLICAS", diff --git a/setup.py b/setup.py index 2764faa697..27f4ff7ed3 100644 --- a/setup.py +++ b/setup.py @@ -93,7 +93,7 @@ AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"] -BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"] +BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"] SNOWFLAKE_REQUIRED = [ "snowflake-connector-python[pandas]>=2.7.3,<3",