Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for delayed annotations #760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,27 +267,31 @@ def _change_unrecognized_type_to_pickle(t: Type[T]) -> Type[T]:
return t


def transform_signature_to_interface(signature: inspect.Signature, docstring: Optional[Docstring] = None) -> Interface:
def transform_function_to_interface(fn: Callable, docstring: Optional[Docstring] = None) -> Interface:
"""
From the annotations on a task function that the user should have provided, and the output names they want to use
for each output parameter, construct the TypedInterface object

For now the fancy object, maybe in the future a dumb object.

"""
outputs = extract_return_annotation(signature.return_annotation)
type_hints = typing.get_type_hints(fn)
signature = inspect.signature(fn)
return_annotation = type_hints.get("return", None)

outputs = extract_return_annotation(return_annotation)
for k, v in outputs.items():
outputs[k] = _change_unrecognized_type_to_pickle(v)
inputs = OrderedDict()
for k, v in signature.parameters.items():
annotation = v.annotation
annotation = type_hints.get(k, None)
default = v.default if v.default is not inspect.Parameter.empty else None
# Inputs with default values are currently ignored, we may want to look into that in the future
inputs[k] = (_change_unrecognized_type_to_pickle(annotation), default)

# This is just for typing.NamedTuples - in those cases, the user can select a name to call the NamedTuple. We
# would like to preserve that name in our custom collections.namedtuple.
custom_name = None
return_annotation = signature.return_annotation
if hasattr(return_annotation, "__bases__"):
bases = return_annotation.__bases__
if len(bases) == 1 and bases[0] == tuple and hasattr(return_annotation, "_fields"):
Expand Down Expand Up @@ -334,7 +338,7 @@ def output_name_generator(length: int) -> Generator[str, None, None]:
yield default_output_name(x)


def extract_return_annotation(return_annotation: Union[Type, Tuple]) -> Dict[str, Type]:
def extract_return_annotation(return_annotation: Union[Type, Tuple, None]) -> Dict[str, Type]:
"""
The purpose of this function is to sort out whether a function is returning one thing, or multiple things, and to
name the outputs accordingly, either by using our default name function, or from a typing.NamedTuple.
Expand Down Expand Up @@ -368,7 +372,7 @@ def t(a: int, b: str) -> Dict[str, int]: ...

# Handle Option 6
# We can think about whether we should add a default output name with type None in the future.
if return_annotation is None or return_annotation is inspect.Signature.empty:
if return_annotation in (None, type(None), inspect.Signature.empty):
return {}

# This statement results in true for typing.Namedtuple, single and void return types, so this
Expand Down
5 changes: 2 additions & 3 deletions flytekit/core/launch_plan.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from __future__ import annotations

import inspect
from typing import Any, Callable, Dict, List, Optional, Type

from flytekit.core import workflow as _annotated_workflow
from flytekit.core.context_manager import FlyteContext, FlyteContextManager, FlyteEntities
from flytekit.core.interface import Interface, transform_inputs_to_parameters, transform_signature_to_interface
from flytekit.core.interface import Interface, transform_function_to_interface, transform_inputs_to_parameters
from flytekit.core.promise import create_and_link_node, translate_inputs_to_literals
from flytekit.core.reference_entity import LaunchPlanReference, ReferenceEntity
from flytekit.models import common as _common_models
Expand Down Expand Up @@ -399,7 +398,7 @@ def reference_launch_plan(
"""

def wrapper(fn) -> ReferenceLaunchPlan:
interface = transform_signature_to_interface(inspect.signature(fn))
interface = transform_function_to_interface(fn)
return ReferenceLaunchPlan(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper
2 changes: 1 addition & 1 deletion flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def create_task_output(
if len(promises) == 1:
if not entity_interface:
return promises[0]
# See transform_signature_to_interface for more information, we're using the existence of a name as a proxy
# See transform_function_to_interface for more information, we're using the existence of a name as a proxy
# for the user having specified a one-element typing.NamedTuple, which means we should _not_ extract it. We
# should still return a tuple but it should be one of ours.
if not entity_interface.output_tuple_name:
Expand Down
7 changes: 2 additions & 5 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""


import inspect
from abc import ABC
from collections import OrderedDict
from enum import Enum
Expand All @@ -24,7 +23,7 @@
from flytekit.core.base_task import Task, TaskResolverMixin
from flytekit.core.context_manager import ExecutionState, FastSerializationSettings, FlyteContext, FlyteContextManager
from flytekit.core.docstring import Docstring
from flytekit.core.interface import transform_signature_to_interface
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.python_auto_container import PythonAutoContainerTask, default_task_resolver
from flytekit.core.tracker import is_functools_wrapped_module_level, isnested, istestfunction
from flytekit.core.workflow import (
Expand Down Expand Up @@ -114,9 +113,7 @@ def __init__(
"""
if task_function is None:
raise ValueError("TaskFunction is a required parameter for PythonFunctionTask")
self._native_interface = transform_signature_to_interface(
inspect.signature(task_function), Docstring(callable_=task_function)
)
self._native_interface = transform_function_to_interface(task_function, Docstring(callable_=task_function))
mutated_interface = self._native_interface.remove_inputs(ignore_input_vars)
super().__init__(
task_type=task_type,
Expand Down
5 changes: 2 additions & 3 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import datetime as _datetime
import inspect
from typing import Any, Callable, Dict, List, Optional, Type, Union

from flytekit.core.base_task import TaskMetadata, TaskResolverMixin
from flytekit.core.interface import transform_signature_to_interface
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
Expand Down Expand Up @@ -240,7 +239,7 @@ def reference_task(
"""

def wrapper(fn) -> ReferenceTask:
interface = transform_signature_to_interface(inspect.signature(fn))
interface = transform_function_to_interface(fn)
return ReferenceTask(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper
7 changes: 6 additions & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ def get_literal_type(self, t: Type[T]) -> LiteralType:
v.load_by = LoadDumpOptions.name
schema = JSONSchema().dump(s)
except Exception as e:
logger.warn("failed to extract schema for object %s, (will run schemaless) error: %s", str(t), e)
# https://github.com/lovasoa/marshmallow_dataclass/issues/13
logger.warning(
f"Failed to extract schema for object {t}, (will run schemaless) error: {e}"
f"If you have postponed annotations turned on (PEP 563) turn it off please. Postponed"
f"evaluation doesn't work with json dataclasses"
)

return _primitives.Generic.to_flyte_literal_type(metadata=schema)

Expand Down
7 changes: 3 additions & 4 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import inspect
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
Expand All @@ -15,9 +14,9 @@
from flytekit.core.docstring import Docstring
from flytekit.core.interface import (
Interface,
transform_function_to_interface,
transform_inputs_to_parameters,
transform_interface_to_typed_interface,
transform_signature_to_interface,
)
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.node import Node
Expand Down Expand Up @@ -574,7 +573,7 @@ def __init__(
):
name = f"{workflow_function.__module__}.{workflow_function.__name__}"
self._workflow_function = workflow_function
native_interface = transform_signature_to_interface(inspect.signature(workflow_function), docstring=docstring)
native_interface = transform_function_to_interface(workflow_function, docstring=docstring)

# TODO do we need this - can this not be in launchplan only?
# This can be in launch plan only, but is here only so that we don't have to re-evaluate. Or
Expand Down Expand Up @@ -770,7 +769,7 @@ def reference_workflow(
"""

def wrapper(fn) -> ReferenceWorkflow:
interface = transform_signature_to_interface(inspect.signature(fn))
interface = transform_function_to_interface(fn)
return ReferenceWorkflow(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper
1 change: 1 addition & 0 deletions tests/flytekit/unit/core/functools/simple_decorator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Script used for testing local execution of functool.wraps-wrapped tasks"""
from __future__ import annotations

import os
from functools import wraps
Expand Down
1 change: 1 addition & 0 deletions tests/flytekit/unit/core/functools/test_decorators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Test local execution of files that use functools to decorate tasks and workflows."""
from __future__ import annotations

import os
import subprocess
Expand Down
Loading