[SDK] Create Unify Training Client #1719
/hold for review
Pull Request Test Coverage Report for Build 3914850839
💛 - Coveralls
I've updated the SDK examples; this PR is ready for review.
        namespace=SDK_TEST_NAMESPACE):
    raise RuntimeError("The MXJob is not succeeded.")
TRAINING_CLIENT.wait_for_job_conditions(
    JOB_NAME, SDK_TEST_NAMESPACE, constants.MXJOB_KIND
Do we need to validate the success condition in the Job conditions?
@johnugeorge By default it waits for the Succeeded condition:
expected_conditions: Set = {constants.JOB_CONDITION_SUCCEEDED},
So if the Job does not reach that condition, this API fails.
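The behaviour described in this reply can be sketched as a small polling loop. This is only an illustration under stated assumptions, not the SDK's implementation: `wait_for_conditions`, `get_conditions`, `polling_interval`, and the default timeout value are hypothetical names and values, and the condition strings stand in for the SDK's `constants`. A `frozenset` default is used here to avoid a mutable default argument.

```python
import time
from typing import AbstractSet, Callable, Iterable, Set

# Illustrative condition names; the SDK keeps its own values in `constants`.
JOB_CONDITION_SUCCEEDED = "Succeeded"
JOB_CONDITION_FAILED = "Failed"


def wait_for_conditions(
    get_conditions: Callable[[], Iterable[str]],
    expected_conditions: AbstractSet[str] = frozenset({JOB_CONDITION_SUCCEEDED}),
    timeout: float = 600,
    polling_interval: float = 15,
) -> Set[str]:
    """Poll `get_conditions` until one of the expected conditions appears.

    Raises RuntimeError if the Job reports Failed while Failed is not expected,
    and TimeoutError if no expected condition appears before the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        current = set(get_conditions())
        # Any overlap with the expected set counts as success.
        if current & set(expected_conditions):
            return current
        if JOB_CONDITION_FAILED in current:
            raise RuntimeError(f"Job failed; conditions: {sorted(current)}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Timed out waiting for conditions: {sorted(expected_conditions)}"
            )
        time.sleep(polling_interval)
```

With the default arguments, a caller that only wants to block until success passes nothing but the condition source, which matches the single call used in the test above.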
    raise RuntimeError("The MXJob is not succeeded.")
TRAINING_CLIENT.wait_for_job_conditions(
    JOB_NAME, SDK_TEST_NAMESPACE, constants.MXJOB_KIND
)
If possible, can you add a few more validations using the other SDK functions that are missing now, such as get_job_pod_names, list_mxjobs, is_job_succeeded, is_job_created, get_job_conditions, etc.?
@johnugeorge Sure, I'll add them.
FYI This is a breaking SDK change
@@ -75,7 +78,7 @@ $ cat /tmp/output/artifact/junit_test_simple_tfjob_cpu.xml

## Common issues

1. ksonnet is not installed
I guess we removed all configuration files for ksonnet in this PR, so can we remove this section?
We need to update these docs for the new E2Es. Do we want to do it in the following PRs?
I see. I'm ok with either PR.
# Add Kubernetes models to proper deserialization of Training models.
with open(os.path.join(sdk_dir, "kubeflow/training/models/__init__.py"), "r") as f:
    new_lines = []
    for line in f.readlines():
        new_lines.append(line)
        if line.startswith("from __future__ import absolute_import"):
            new_lines.append("\n")
            new_lines.append("# Import Kubernetes models.\n")
            new_lines.append("from kubernetes.client import *\n")
with open(os.path.join(sdk_dir, "kubeflow/training/models/__init__.py"), "w") as f:
    f.writelines(new_lines)
It might be better to generate swagger.json containing kubernetes APIs, not importing them to Python SDK.
That helps users to generate SDK of other languages by themselves.
For example, we can generate swagger.json with kubernetes core API in the following:
$ openapi-gen --input-dirs github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1,github.com/kubeflow/common/pkg/apis/common/v1,k8s.io/api/core/v1 --report-filename=hack/violation_exception.list \
--output-package github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1 \
--go-header-file hack/boilerplate/boilerplate.go.txt \
--output-base "${TEMP_DIR}"
WDYT?
Makes sense, but in that case we are going to store all the Kubernetes models in our repo.
Do we want it @tenzen-y @johnugeorge ?
Hmm... You're correct.
It might be better to import kubernetes models to Python SDK in this PR. And then we create an issue about this.
WDYT?
I feel that we can keep the way as it is for this PR and create a separate issue.
Sure, let's discuss the long term plan for it separately.
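For reference, the post-processing step kept in this PR (importing the Kubernetes models into the generated `models/__init__.py`) can be expressed as a pure function over the file's lines, which makes it easy to test without touching the filesystem. This is a sketch, not the PR's code: `add_kubernetes_import` is a hypothetical helper name, and the idempotency check is an addition that the hunk above does not have.

```python
from typing import List

ANCHOR = "from __future__ import absolute_import"
IMPORT_LINE = "from kubernetes.client import *\n"


def add_kubernetes_import(lines: List[str]) -> List[str]:
    """Return a copy of `lines` with the Kubernetes import placed after the anchor.

    If the import is already present the lines are returned unchanged, so the
    generator script can be re-run safely (an assumption of this sketch).
    """
    if IMPORT_LINE in lines:
        return list(lines)

    new_lines: List[str] = []
    for line in lines:
        new_lines.append(line)
        if line.startswith(ANCHOR):
            new_lines.extend(["\n", "# Import Kubernetes models.\n", IMPORT_LINE])
    return new_lines
```

The caller would read the file with `readlines()`, pass the result through this function, and write it back with `writelines()`, as the original hunk does.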
namespace: str = utils.get_default_target_namespace(), | ||
job_kind: str = constants.TFJOB_KIND, | ||
job: object = None, | ||
timeout: int = constants.DEFAULT_TIMEOUT, |
Can we make timeout configurable, the same as Katib SDK?
Yeah, I've already done it in this commit: c6d1516.
Does it sound good ?
I have checked that commit. Sounds good. Thanks!
models.KubeflowOrgV1TFJob,
models.KubeflowOrgV1PyTorchJob,
models.KubeflowOrgV1MXJob,
models.KubeflowOrgV1XGBoostJob,
models.KubeflowOrgV1MPIJob,
models.KubeflowOrgV1PaddleJob,
Should we use constants, {list(constants.JOB_KINDS.keys())}?
@tenzen-y Currently, I store Job classes under the JOB_KIND["key"]["model"] parameter, so list(constants.JOB_KINDS.keys()) returns ["TFJob", "PyTorchJob", ...].
Any ideas how to simplify this check?
Maybe we can use list comprehensions in the following:
f"{[d.get('model') for d in list(constants.JOB_KIND.values())]}"
Although, the above expressions might make it more complex.
Looks great :)
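One way the check discussed in this thread could be derived from the constants is sketched below. The classes and the `JOB_KINDS` layout are stand-ins built only from what the comments describe (a `"model"` entry per Job kind); `JOB_MODELS` and `validate_job` are hypothetical names, and this is not necessarily what the PR ended up with.

```python
# Stand-in model classes; in the SDK these live in `kubeflow.training.models`.
class KubeflowOrgV1TFJob:
    pass


class KubeflowOrgV1PyTorchJob:
    pass


# Assumed layout: each Job kind maps to a dict that holds its model class.
JOB_KINDS = {
    "TFJob": {"model": KubeflowOrgV1TFJob},
    "PyTorchJob": {"model": KubeflowOrgV1PyTorchJob},
}

# Build the tuple once so isinstance() and the error message share one source.
JOB_MODELS = tuple(d["model"] for d in JOB_KINDS.values())


def validate_job(job: object) -> None:
    """Raise ValueError unless `job` is an instance of a known Job model."""
    if not isinstance(job, JOB_MODELS):
        allowed = [m.__name__ for m in JOB_MODELS]
        raise ValueError(
            f"Job must be one of these types: {allowed}, got: {type(job).__name__}"
        )
```

Deriving the tuple from the constants means a new Job kind only has to be registered in one place.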
label_selector = f"{constants.JOB_NAME_LABEL}={name}"

# Add Job role label if that is required.
if is_master:
    label_selector += f",{constants.JOB_ROLE_LABEL}={constants.JOB_ROLE_MASTER}"

# Add Replica type label if that is required.
if replica_type:
    label_selector += (
        f",{constants.REPLICA_TYPE_LABEL}={str.lower(replica_type)}"
    )

# Add Replica index label if that is required.
if replica_index is not None:
    label_selector += f",{constants.REPLICA_INDEX_LABEL}={replica_index}"
What is the intention of using label selector instead of OwnerReference?
It's needed if the user wants to get the Job's pod names for a specific replica type or replica index.
Makes sense. Thanks for clarifying.
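To make the filtering argument concrete, here is a standalone sketch of how such a selector narrows the pod list by role, replica type, and replica index, which an OwnerReference alone would not do. The label keys and the `build_label_selector` name are assumptions for illustration; the SDK reads its keys from `constants`.

```python
from typing import List, Optional

# Assumed label keys and values, for illustration only.
JOB_NAME_LABEL = "training.kubeflow.org/job-name"
JOB_ROLE_LABEL = "training.kubeflow.org/job-role"
JOB_ROLE_MASTER = "master"
REPLICA_TYPE_LABEL = "training.kubeflow.org/replica-type"
REPLICA_INDEX_LABEL = "training.kubeflow.org/replica-index"


def build_label_selector(
    name: str,
    is_master: bool = False,
    replica_type: Optional[str] = None,
    replica_index: Optional[int] = None,
) -> str:
    """Build a comma-separated equality selector for a Job's pods."""
    selectors: List[str] = [f"{JOB_NAME_LABEL}={name}"]
    if is_master:
        selectors.append(f"{JOB_ROLE_LABEL}={JOB_ROLE_MASTER}")
    if replica_type:
        selectors.append(f"{REPLICA_TYPE_LABEL}={replica_type.lower()}")
    # Compare against None so that replica index 0 is still included.
    if replica_index is not None:
        selectors.append(f"{REPLICA_INDEX_LABEL}={replica_index}")
    return ",".join(selectors)
```

The resulting string can be passed as the `label_selector` argument when listing pods in a namespace.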
if (
    num_chief_replicas is None
    and num_ps_replicas is None
    and num_worker_replicas is None
):
Maybe, we can remove those validations once we introduce CEL validations.
Ref: #1708
Sure, let me add the comment about it.
Looks good.
# Check if at least one worker replica is set.
if num_worker_replicas is None:
    raise ValueError("At least one Worker replica for PyTorchJob must be set")

# Check if function is callable.
if not callable(func):
    raise ValueError(
        f"Training function must be callable, got function type: {type(func)}"
    )
Maybe, we can remove those validations once we introduce CEL validations.
Ref: #1708
Not sure if we can validate the training function in the CEL validation, though.
Sorry for the confusion.
I meant only L870~L872. Maybe we can validate num_worker_replicas as follows:
+kubebuilder:validation:XValidation:rule="self['Master'].replicas == 0 && self['Worker'].replicas == 0"
# Add Chief, PS, and Worker replicas to the TFJob.
if num_chief_replicas is not None:
    tfjob.spec.tf_replica_specs["Chief"] = models.V1ReplicaSpec(
        replicas=num_chief_replicas, template=pod_template_spec,
    )

if num_ps_replicas is not None:
    tfjob.spec.tf_replica_specs["PS"] = models.V1ReplicaSpec(
        replicas=num_ps_replicas, template=pod_template_spec,
    )

if num_worker_replicas is not None:
    tfjob.spec.tf_replica_specs["Worker"] = models.V1ReplicaSpec(
        replicas=num_worker_replicas, template=pod_template_spec,
    )
Can we turn roles like Master, Worker, and the others into constants?
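A rough sketch of what this suggestion could look like is shown below. The constant names and the `build_tf_replica_specs` helper are hypothetical, and plain dicts stand in for `models.V1ReplicaSpec` so the snippet runs on its own; it also folds in the "at least one replica" check from the earlier hunk.

```python
from typing import Dict, Optional

# Hypothetical replica-type constants replacing the string literals.
REPLICA_TYPE_CHIEF = "Chief"
REPLICA_TYPE_PS = "PS"
REPLICA_TYPE_WORKER = "Worker"


def build_tf_replica_specs(
    pod_template_spec: dict,
    num_chief_replicas: Optional[int] = None,
    num_ps_replicas: Optional[int] = None,
    num_worker_replicas: Optional[int] = None,
) -> Dict[str, dict]:
    """Return replica specs keyed by replica type, skipping unset types."""
    requested = {
        REPLICA_TYPE_CHIEF: num_chief_replicas,
        REPLICA_TYPE_PS: num_ps_replicas,
        REPLICA_TYPE_WORKER: num_worker_replicas,
    }
    if all(n is None for n in requested.values()):
        raise ValueError("At least one replica type for TFJob must be set")

    # A plain dict stands in for models.V1ReplicaSpec in this sketch.
    return {
        replica_type: {"replicas": n, "template": pod_template_spec}
        for replica_type, n in requested.items()
        if n is not None
    }
```

Keeping the replica types in one mapping replaces the three near-identical `if` blocks with a single loop.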
def create_mxjob_from_func(self):
    """Create MXJob from the function.
    TODO (andreyvelich): Implement this function.
    """
    logging.warning("This API has not been implemented yet.")
Would you like to work on this in this PR? Or another PR?
I think, we can implement those in the following PRs.
Sounds good.
def create_xgboostjob_from_func(self):
    """Create XGBoost from the function.
    TODO (andreyvelich): Implement this function.
    """
    logging.warning("This API has not been implemented yet.")
Would you like to work on this in this PR? Or another PR?
Let's do those additional APIs in the following PRs @tenzen-y (maybe in the next release).
@johnugeorge @tenzen-y @terrytangyuan I believe I addressed all of your suggestions.
@andreyvelich Looks great! Thanks for this!
/lgtm
/assign @terrytangyuan
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: andreyvelich, terrytangyuan
Thanks everyone for the review!
Fixes: #1691
I created a unified TrainingClient for our SDK to improve UX and reduce code duplication.
I added the following APIs:

Common APIs:
- get_job_conditions to get Training Job conditions.
- is_job_created, is_job_running, is_job_restarting, is_job_succeeded, is_job_failed to check Job status.
- wait_for_job_conditions to wait for Job conditions. I removed the watch parameter from the get API and now print the Job status in the wait_for_job_conditions API. I don't think users use the watch parameter in the get API. If that is required, we can update it later. Does it sound good?
- get_job_pod_names to get Job pod names.
- get_job_logs to get Job logs.

Job-related APIs (TFJob, PyTorchJob, MXJob, XGBoostJob, MPIJob, PaddleJob):
- create_tfjob to create Job.
- create_tfjob_from_func to create Job from func.
- get_tfjob to get Job.
- list_tfjobs to list Jobs.
- delete_tfjob to delete Job.

Please let me know if we can reduce code duplication more and how we can improve our SDK further.
Also, I deleted the old py and test ksonnet files since we are no longer using them.
TODO: Modify SDK examples.
/assign @kubeflow/wg-training-leads @tenzen-y @anencore94 @kuizhiqing @alembiewski