Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add localmod storage #3351

Merged
merged 7 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changes/pr3351.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
enhancement:
- "Allow python path in Local storage - [#3351](https://github.com/PrefectHQ/prefect/pull/3351)"
contributor:
- "[Jack D. Sundberg](https://github.com/jacksund)"
19 changes: 13 additions & 6 deletions src/prefect/environments/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import prefect
from prefect.engine.results import LocalResult
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file
from prefect.utilities.storage import extract_flow_from_file, extract_flow_from_module

if TYPE_CHECKING:
from prefect.core.flow import Flow
Expand All @@ -32,7 +32,8 @@ class Local(Storage):
absolute path and created. Defaults to `True`
- path (str, optional): a direct path to the location of the flow file if
`stored_as_script=True`, otherwise this path will be used when storing the serialized,
pickled flow.
pickled flow. If `stored_as_script=True`, the direct path may be a file path
(such as 'path/to/myflow.py') or a direct python path (such as 'myrepo.mymodule.myflow')
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
- **kwargs (Any, optional): any additional `Storage` initialization options
Expand Down Expand Up @@ -76,7 +77,8 @@ def get_flow(self, flow_location: str = None) -> "Flow":

Args:
- flow_location (str, optional): the location of a flow within this Storage; in this case,
a file path where a Flow has been serialized to. Will use `path` if not provided.
a file path or python path where a Flow has been serialized to. Will use `path`
if not provided.

Returns:
- Flow: the requested flow
Expand All @@ -92,10 +94,15 @@ def get_flow(self, flow_location: str = None) -> "Flow":
else:
raise ValueError("No flow location provided")

if self.stored_as_script:
return extract_flow_from_file(file_path=flow_location)
# check if the path given is a file path
if os.path.isfile(flow_location):
if self.stored_as_script:
return extract_flow_from_file(file_path=flow_location)
else:
return prefect.core.flow.Flow.load(flow_location)
# otherwise the path is given in the module format
else:
return prefect.core.flow.Flow.load(flow_location)
return extract_flow_from_module(module_str=flow_location)

def add_flow(self, flow: "Flow") -> str:
"""
Expand Down
32 changes: 32 additions & 0 deletions src/prefect/utilities/storage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
from typing import TYPE_CHECKING

import prefect
Expand Down Expand Up @@ -82,3 +83,34 @@ def extract_flow_from_file(
return exec_vals[var]

raise ValueError("No flow found in file.")


def extract_flow_from_module(module_str: str, flow_name: str = None) -> "Flow":
"""
Extract a flow object from a python module.

Args:
- module_str (str): A module path pointing to a .py file containing a flow.
For example, 'myrepo.mymodule.myflow' where myflow.py contains the flow.
- flow_name (str, optional): A specific name of a flow to extract from a file.
If not set then the first flow object retrieved from file will be returned.
The "first" flow object will be based on the dir(module) ordering, which
is alphabetical and capitalized letters come first.

Returns:
- Flow: A flow object extracted from a file
"""

# load the module
module = importlib.import_module(module_str)

# if flow_name is specified, grab it from the module
if flow_name:
return getattr(module, flow_name)
# otherwise loop until we get a Flow object
else:
for var in dir(module):
if isinstance(getattr(module, var), prefect.Flow):
return getattr(module, var)

raise ValueError("No flow found in module.")