fix: Update bytewax materialization #3368

Merged
Bytewax materialization dataflow module

@@ -3,9 +3,10 @@
 import pyarrow as pa
 import pyarrow.parquet as pq
 import s3fs
-from bytewax import Dataflow, cluster_main # type: ignore
-from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
-from bytewax.parse import proc_env
+from bytewax.dataflow import Dataflow # type: ignore
+from bytewax.execution import cluster_main
+from bytewax.inputs import ManualInputConfig, distribute
+from bytewax.outputs import ManualOutputConfig
 from tqdm import tqdm
 
 from feast import FeatureStore, FeatureView, RepoConfig
@@ -37,20 +38,15 @@ def process_path(self, path):
 
         return batches
 
-    def input_builder(self, worker_index, worker_count, resume_epoch):
+    def input_builder(self, worker_index, worker_count, _state):
         worker_paths = distribute(self.paths, worker_index, worker_count)
-        epoch = 0
         for path in worker_paths:
-            yield AdvanceTo(epoch)
-            yield Emit(path)
-            epoch += 1
+            yield None, path
 
         return
 
     def output_builder(self, worker_index, worker_count):
-        def output_fn(epoch_batch):
-            _, batch = epoch_batch
-
+        def output_fn(batch):
             table = pa.Table.from_batches([batch])
 
             if self.feature_view.batch_source.field_mapping is not None:
@@ -79,11 +75,7 @@ def output_fn(epoch_batch):
 
     def _run_dataflow(self):
         flow = Dataflow()
+        flow.input("inp", ManualInputConfig(self.input_builder))
         flow.flat_map(self.process_path)
-        flow.capture()
-        cluster_main(
-            flow,
-            ManualInputConfig(self.input_builder),
-            self.output_builder,
-            **proc_env(),
-        )
+        flow.capture(ManualOutputConfig(self.output_builder))
+        cluster_main(flow, [], 0)
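Context for reviewers: bytewax 0.10's epoch-based manual input (AdvanceTo/Emit, with input and output configs passed to cluster_main alongside proc_env()) is gone in 0.13. Input builders now yield (state, item) tuples and attach via flow.input, and sinks attach via flow.capture(ManualOutputConfig(...)). A minimal stand-alone sketch of the new shape, mirroring the calls used above; the paths list and print sink are placeholders, not Feast code:

```python
from bytewax.dataflow import Dataflow
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import ManualOutputConfig

paths = ["part-0.parquet", "part-1.parquet"]  # placeholder work items


def input_builder(worker_index, worker_count, _state):
    # Each worker takes a disjoint slice of the inputs; the first element of
    # each yielded tuple is resume state, None here since nothing is resumable.
    for path in distribute(paths, worker_index, worker_count):
        yield None, path


def output_builder(worker_index, worker_count):
    def output_fn(item):
        print(f"worker {worker_index}: {item}")

    return output_fn


flow = Dataflow()
flow.input("inp", ManualInputConfig(input_builder))
flow.capture(ManualOutputConfig(output_builder))

if __name__ == "__main__":
    # An empty address list plus process id 0 runs a single-process "cluster",
    # matching the cluster_main(flow, [], 0) call in _run_dataflow above.
    cluster_main(flow, [], 0)
```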
Bytewax materialization engine module

@@ -23,7 +23,7 @@
 from feast.infra.registry.base_registry import BaseRegistry
 from feast.repo_config import FeastConfigBaseModel
 from feast.stream_feature_view import StreamFeatureView
-from feast.utils import _get_column_names
+from feast.utils import _get_column_names, get_default_yaml_file_path
 
 from .bytewax_materialization_job import BytewaxMaterializationJob
 
@@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
         # Create a k8s configmap with information needed by bytewax
         self._create_configuration_map(job_id, paths, feature_view, self.namespace)
 
-        # Create the k8s service definition, used for bytewax communication
-        self._create_service_definition(job_id, self.namespace)
-
         # Create the k8s job definition
         self._create_job_definition(
             job_id,
@@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
     def _create_configuration_map(self, job_id, paths, feature_view, namespace):
         """Create a Kubernetes configmap for this job"""
 
-        feature_store_configuration = yaml.dump(
-            yaml.safe_load(
-                self.repo_config.json(
-                    exclude={"repo_path"},
-                    exclude_unset=True,
-                )
-            )
-        )
+        repo_path = self.repo_config.repo_path
+        assert repo_path
+        feature_store_path = get_default_yaml_file_path(repo_path)
+        feature_store_configuration = feature_store_path.read_text()
 
         materialization_config = yaml.dump(
             {"paths": paths, "feature_view": feature_view.name}
@@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
             body=configmap_manifest,
         )
 
-    def _create_service_definition(self, job_id, namespace):
-        """Creates a kubernetes service definition.
-
-        This service definition is created to allow bytewax workers
-        to communicate with each other.
-        """
-        service_definition = {
-            "apiVersion": "v1",
-            "kind": "Service",
-            "metadata": {
-                "name": f"dataflow-{job_id}",
-                "namespace": namespace,
-            },
-            "spec": {
-                "clusterIP": "None",
-                "clusterIPs": ["None"],
-                "internalTrafficPolicy": "Cluster",
-                "ipFamilies": ["IPv4"],
-                "ipFamilyPolicy": "SingleStack",
-                "ports": [
-                    {
-                        "name": "worker",
-                        "port": 9999,
-                        "protocol": "TCP",
-                        "targetPort": 9999,
-                    }
-                ],
-                "selector": {"job-name": f"dataflow-{job_id}"},
-                "sessionAffinity": "None",
-                "type": "ClusterIP",
-            },
-        }
-
-        utils.create_from_dict(self.k8s_client, service_definition)
-
     def _create_job_definition(self, job_id, namespace, pods, env):
         """Create a kubernetes job definition."""
         job_env = [
@@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                 "name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
                 "value": "false",
             },
-            {
-                "name": "BYTEWAX_HOSTFILE_PATH",
-                "value": "/etc/bytewax/hostfile.txt",
-            },
             {
                 "name": "BYTEWAX_STATEFULSET_NAME",
                 "value": f"dataflow-{job_id}",
@@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                         "subdomain": f"dataflow-{job_id}",
                         "initContainers": [
                             {
-                                "command": [
-                                    "sh",
-                                    "-c",
-                                    f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone',
-                                ],
                                 "env": [
                                     {
                                         "name": "BYTEWAX_REPLICAS",
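The deleted Service, the BYTEWAX_HOSTFILE_PATH variable, and this init-container command all supported the old multi-worker cluster mode: the shell loop wrote one host:port entry per StatefulSet replica so workers could find each other. With cluster_main(flow, [], 0) each pod now runs a standalone single-worker process, so none of that plumbing is needed. For reference, the removed shell loop is roughly this Python (names hypothetical):

```python
def hostfile_lines(job_id, namespace, replicas, port=9999):
    # One stable DNS name per StatefulSet pod:
    # dataflow-<job>-<i>.dataflow-<job>.<namespace>.svc.cluster.local:<port>
    return [
        f"dataflow-{job_id}-{i}.dataflow-{job_id}.{namespace}.svc.cluster.local:{port}"
        for i in range(replicas)
    ]


# e.g. hostfile_lines("abc123", "feast", 3) yields three worker addresses
```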
setup.py (2 changes: 1 addition & 1 deletion)

@@ -93,7 +93,7 @@
 
 AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"]
 
-BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"]
+BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"]
 
 SNOWFLAKE_REQUIRED = [
     "snowflake-connector-python[pandas]>=2.7.3,<3",
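Downstream, anyone installing Feast's bytewax extra (e.g. `pip install 'feast[bytewax]'`) moves from bytewax 0.10.0 to 0.13.1 with this pin, assuming the extra maps onto the BYTEWAX_REQUIRED group as the name suggests.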