diff --git a/examples/productionizing/productionizing/customizing_resources.py b/examples/productionizing/productionizing/customizing_resources.py
index 1484ca880..28be48118 100644
--- a/examples/productionizing/productionizing/customizing_resources.py
+++ b/examples/productionizing/productionizing/customizing_resources.py
@@ -85,6 +85,7 @@ def my_workflow(x: typing.List[int]) -> int:
 #
 # ## Using `with_overrides`
 #
+# ### Override `Resources`
 # You can use the `with_overrides` method to override the resources allocated to the tasks dynamically.
 # Let's understand how the resources can be initialized with an example.
@@ -131,6 +132,10 @@ def my_pipeline(x: typing.List[int]) -> int:
     print(count_unique_numbers_1(x=[1, 1, 2]))
     print(my_pipeline(x=[1, 1, 2]))
 
+from typing import NamedTuple
+
+import tensorflow as tf
+
 # %% [markdown]
 # You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the
 # CPU limit is 4, whereas it should have been 6 as specified using `with_overrides`.
@@ -142,3 +147,78 @@ def my_pipeline(x: typing.List[int]) -> int:
 # Resource allocated using "with_overrides" method
 # :::
 #
+# ### Override `task_config`
+# You can also use the `with_overrides` method to override the `task_config` of a task.
+# The following example uses TensorFlow training to show how a `TfJob` can be initialized and then overridden.
+#
+# For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
+#
+# To create a TensorFlow task, add the {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` config to the Flyte task; this plugin runs distributed TensorFlow training on Kubernetes.
+# %%
+from flytekit import Resources, dynamic, task, workflow
+from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker
+
+TrainingOutputs = NamedTuple(
+    "TrainingOutputs",
+    [
+        ("model", tf.keras.Model),
+        ("accuracy", float),
+        ("loss", float),
+    ],
+)
+
+
+@task(
+    task_config=TfJob(worker=Worker(replicas=1), ps=PS(replicas=1), chief=Chief(replicas=1)),
+    cache_version="1.0",
+    cache=True,
+    requests=Resources(cpu="1", mem="2048Mi"),
+    limits=Resources(cpu="1", mem="2048Mi"),
+)
+def train_model() -> TrainingOutputs:
+    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
+    X_train, X_test = X_train / 255.0, X_test / 255.0
+    strategy = tf.distribute.MirroredStrategy()
+    with strategy.scope():
+        model = tf.keras.models.Sequential(
+            [
+                tf.keras.layers.Flatten(input_shape=(28, 28)),
+                tf.keras.layers.Dense(128, activation="relu"),
+                tf.keras.layers.Dropout(0.2),
+                tf.keras.layers.Dense(10),
+            ]
+        )
+        model.compile(
+            optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"]
+        )
+    BATCH_SIZE = 64
+    NUM_EPOCHS = 5
+    model.fit(X_train, y_train, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE)
+    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=2)
+
+    return TrainingOutputs(model=model, accuracy=test_accuracy, loss=test_loss)
+
+
+# %% [markdown]
+# You can use `@dynamic` to generate tasks at runtime with any custom configuration you want, and the `with_overrides` method replaces the configuration declared on the task.
+# Here, we override the worker replica count.
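+# Note that `with_overrides` applies only to the specific invocation it is attached to; other calls to
+# `train_model` keep the `TfJob` configuration declared in the `@task` decorator.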
+# %%
+@workflow
+def my_tensorflow_workflow() -> TrainingOutputs:
+    return train_model()
+
+
+@dynamic
+def dynamic_run(new_worker: int) -> TrainingOutputs:
+    return train_model().with_overrides(
+        task_config=TfJob(worker=Worker(replicas=new_worker), ps=PS(replicas=1), chief=Chief(replicas=1))
+    )
+
+
+# %% [markdown]
+# You can execute the workflow and the dynamic task locally.
+# %%
+if __name__ == "__main__":
+    print(my_tensorflow_workflow())
+    print(dynamic_run(new_worker=4))
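+
+
+# %% [markdown]
+# As an illustrative sketch (the function name, replica count, and resource values below are placeholders,
+# assuming the resource and `task_config` overrides can be combined in a single call), `with_overrides` can
+# adjust the Kubernetes resources and the `TfJob` layout for one invocation at the same time:
+# %%
+@dynamic
+def dynamic_run_with_resources(new_worker: int) -> TrainingOutputs:
+    # Override the resource requests/limits and the TfJob replica layout for this invocation only.
+    return train_model().with_overrides(
+        requests=Resources(cpu="2", mem="4096Mi"),
+        limits=Resources(cpu="2", mem="4096Mi"),
+        task_config=TfJob(worker=Worker(replicas=new_worker), ps=PS(replicas=1), chief=Chief(replicas=1)),
+    )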