-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support placeholders for processing step #155
Changes from 1 commit
927b24f
003b5e8
a7700a6
6b6443a
7f6ef30
00830f3
c708da7
2ea9e1f
17543ed
36e2ee8
ea40f7c
e499108
4c63229
34bb281
a098c61
06eb069
da99c92
37b2422
c433576
fd640ab
1dfa0e3
6143783
ebc5e22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,6 +14,7 @@ | |||||
|
||||||
import boto3 | ||||||
import logging | ||||||
from stepfunctions.inputs import Placeholder | ||||||
|
||||||
logger = logging.getLogger('stepfunctions') | ||||||
|
||||||
|
@@ -45,3 +46,24 @@ def get_aws_partition(): | |||||
return cur_partition | ||||||
|
||||||
return cur_partition | ||||||
|
||||||
|
||||||
def merge_dicts(first, second, first_name, second_name): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could also be used to merge the hyperparameters in TrainingStep - will make the changes in another PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: First and second don't describe the side effects and which dict gets merged into what. Borrowing from JavaScript's
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 - I also like to push for doc strings where behaviour is not entirely intuitive. i.e. what happens if there are clashes, are overwrites allowed, etc. |
||||||
""" | ||||||
ca-nguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
Merges first and second dictionaries into the first one. | ||||||
ca-nguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
Values in the first dict are updated with the values of the second one. | ||||||
""" | ||||||
if all(isinstance(d, dict) for d in [first, second]): | ||||||
ca-nguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
for key, value in second.items(): | ||||||
if key in first: | ||||||
if isinstance(first[key], dict) and isinstance(second[key], dict): | ||||||
merge_dicts(first[key], second[key], first_name, second_name) | ||||||
elif first[key] is value: | ||||||
ca-nguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
pass | ||||||
else: | ||||||
logger.info( | ||||||
f"{first_name} property: <{key}> with value: <{first[key]}>" | ||||||
f" will be overwritten with value provided in {second_name} : <{value}>") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: Do we think this is useful? If not, can just use Python's built-in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The built in update() does not take into account nested dictionary values - for ex:
Will have following output: Since we would expect to get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initially added them to facilitate troubleshooting, but I'm open to remove the logs if we deem them not useful enough or too noisy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the expected behaviour is well documented it seems unnecessary. Since the method only exists for logging, if we get rid of it there's less code to maintain. What do you think, @shivlaks? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Missed this comment. Since we need a deep merge, dict.update is not going to work here |
||||||
first[key] = second[key] | ||||||
else: | ||||||
first[key] = second[key] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,6 @@ | |
|
||
from stepfunctions.inputs import ExecutionInput | ||
from stepfunctions.steps import Chain | ||
from stepfunctions.steps.fields import Field | ||
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep | ||
from stepfunctions.workflow import Workflow | ||
|
||
|
@@ -384,27 +383,41 @@ def test_processing_step_with_placeholders(sklearn_processor_fixture, sagemaker_ | |
|
||
# Build workflow definition | ||
execution_input = ExecutionInput(schema={ | ||
Field.ImageUri.value: str, | ||
Field.InstanceCount.value: int, | ||
Field.Entrypoint.value: str, | ||
Field.Role.value: str, | ||
Field.VolumeSizeInGB.value: int, | ||
Field.MaxRuntimeInSeconds.value: int | ||
'image_uri': str, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're only using these values for test purposes, using the direct string values for better code readability |
||
'instance_count': int, | ||
'entrypoint': str, | ||
'role': str, | ||
'volume_size_in_gb': int, | ||
'max_runtime_in_seconds': int, | ||
'container_arguments': [str], | ||
}) | ||
|
||
parameters = { | ||
'AppSpecification': { | ||
'ContainerEntrypoint': execution_input['entrypoint'], | ||
'ImageUri': execution_input['image_uri'] | ||
}, | ||
'ProcessingResources': { | ||
'ClusterConfig': { | ||
'InstanceCount': execution_input['instance_count'], | ||
'VolumeSizeInGB': execution_input['volume_size_in_gb'] | ||
} | ||
}, | ||
'RoleArn': execution_input['role'], | ||
'StoppingCondition': { | ||
'MaxRuntimeInSeconds': execution_input['max_runtime_in_seconds'] | ||
} | ||
} | ||
|
||
job_name = generate_job_name() | ||
processing_step = ProcessingStep('create_processing_job_step', | ||
processor=sklearn_processor_fixture, | ||
job_name=job_name, | ||
inputs=inputs, | ||
outputs=outputs, | ||
container_arguments=['--train-test-split-ratio', '0.2'], | ||
container_entrypoint=execution_input[Field.Entrypoint.value], | ||
image_uri=execution_input[Field.ImageUri.value], | ||
instance_count=execution_input[Field.InstanceCount.value], | ||
role=execution_input[Field.Role.value], | ||
volume_size_in_gb=execution_input[Field.VolumeSizeInGB.value], | ||
max_runtime_in_seconds=execution_input[Field.MaxRuntimeInSeconds.value] | ||
container_arguments=execution_input['container_arguments'], | ||
container_entrypoint=execution_input['entrypoint'], | ||
parameters=parameters | ||
) | ||
workflow_graph = Chain([processing_step]) | ||
|
||
|
@@ -418,12 +431,13 @@ def test_processing_step_with_placeholders(sklearn_processor_fixture, sagemaker_ | |
) | ||
|
||
execution_input = { | ||
Field.ImageUri.value: '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3', | ||
Field.InstanceCount.value: 1, | ||
Field.Entrypoint.value: ['python3', '/opt/ml/processing/input/code/preprocessor.py'], | ||
Field.Role.value: sagemaker_role_arn, | ||
Field.VolumeSizeInGB.value: 30, | ||
Field.MaxRuntimeInSeconds.value: 500 | ||
'image_uri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3', | ||
'instance_count': 1, | ||
'entrypoint': ['python3', '/opt/ml/processing/input/code/preprocessor.py'], | ||
'role': sagemaker_role_arn, | ||
'volume_size_in_gb': 30, | ||
'max_runtime_in_seconds': 500, | ||
'container_arguments': ['--train-test-split-ratio', '0.2'] | ||
} | ||
|
||
# Execute workflow | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice to see us embracing pep8 in files we touch 🙌
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙌🙌🙌