Status | Accepted |
---|---|
Author(s) | Ruoyu Liu (ruoyu@google.com), Hui Miao (huimiao@google.com), Hongye Sun (hongyes@google.com), Renmin Gu (renming@google.com) |
Sponsor | Konstantinos Katsiapis (katsiapis@google.com), Mitch Trott (trott@google.com), Zhitao Li (zhitaoli@google.com) |
Updated | 2020-07-05 |
This RFC documents the data model that supports the TFX DSL semantics. It also introduces the TFX DSL intermediate representation (IR) and the workflow based on that. The IR is the bridge between the DSL and its orchestration / execution on all supported platforms and the workflow is the procedure that all platforms should follow to reflect the data model in MLMD.
NOTE: While this doc contains more detail than a typical design doc, it is still a design doc rather than a spec. However, the long-term goal is to make the IR a specification for ML pipelines.
When TFX DSL and orchestration were first introduced, artifact-centric and lineage-driven were among the core properties of the stack. Although these properties are guaranteed by the underlying data model of ML Metadata, it was arguably hard to notice the importance of the data model at that time since the TFX stack only supported pipelines with synchronous artifact fetching (each node only consumes artifacts that are produced within the pipeline run).
Later on, the `Resolver` node was introduced, which enables asynchronous artifact fetching (a `ResolverNode` is able to fetch artifacts produced from past pipeline runs). This is essential to supporting more advanced features such as warm-start training and model validation. As a result, the importance of a consistent data model became more visible since it would be infeasible to define different `Resolver` logic reliably without every pipeline run following the same data model when reading and writing ML metadata.
The importance of the data model was further amplified with the proposal of the advanced DSL semantics, in which the concept of synchronous versus asynchronous execution pipelines and synchronous versus asynchronous data pipelines was formalized. It is also in this proposal that asynchronous execution pipelines are considered to be the future of ML productionisation as training data becomes more and more accessible and the data volume is likely to overwhelm the computing power of systems running training pipelines. One of the consequences of this trend is that asynchronous artifact fetching will become the mainstream, as it is by nature a property of asynchronous execution pipelines. However, synchronous artifact fetching is still needed and is likely to dominate for use cases where one-shot pipelines are good enough. It is also useful even in the context of asynchronous execution pipelines, where data synchronization is required. A good example will be the sub-pipeline idea.
Thus, it is highly desired to have a uniform data model that is consistent across synchronous artifact fetching and asynchronous artifact fetching. Moreover, the data model should also be flexible and extensible enough so that it can adapt to not only existing features, but also future features including but not limited to sub-pipeline and conditionals.
Besides consistency across synchronous and asynchronous artifact fetching, a consistency across platforms of the data model is also important as TFX is designed to be a portable stack that is platform agnostic. This is especially important for the 'local development first and cloud deployment after' experience.
A good example is the `Resolver` logic, which is supposed to be written once and deployed anywhere. However, if the data models are inconsistent across platforms, the artifact resolution behaviors on different platforms are likely to be different. This will potentially cause problems that are often hard to debug.
Beyond correctness and consistent behavior of existing features across platforms, a consistent data model also makes future semantics / features easier to adopt and potentially reduces duplicate work. Although it is still possible to implement the same semantics / features (e.g., sub-pipeline) with the same behavior on top of different data models, the effort will grow super-linearly with the number of features added and is likely to become unmanageable over time.
Under the existing layering of TFX DSL / orchestration, it is not easy to guarantee a consistent data model across platforms since the data model is mostly embedded in the implementation. Since code reuse is possible for some platforms but might be hard or even infeasible for others (e.g., a golang-based platform can hardly reuse a python-based module), a more explicit contract is desired to serve as the bridge between the pipeline definition script and the platforms that run the pipelines. For this reason, we would like to introduce TFX Intermediate representation (IR) in this RFC.
The IR should provide a serialized format of TFX DSL that has the following properties:
- Carries over all TFX DSL semantics in a uniform way. This means that the same TFX DSL IR should be able to support the following pairs:
  - Synchronous execution & asynchronous execution
  - Synchronous data fetching & asynchronous data fetching
  - Data dependencies & task dependencies
  - Basic semantics & advanced semantics (sub-pipeline, conditional, etc)
- Provides a fully declarative specification of a TFX pipeline. As a comparison, a large chunk of the specification is hidden in the code (in the current, pre-IR world).
- Provides flexibility and extensibility in the following ways:
  - Enables using different frontend languages to compose TFX pipelines.
  - Enables attaching platform specific extensions to IR in a transparent way so that it can be understood by specific platform runners.
  - Enables applying different optimization strategies on top of the pipeline graph. These optimization strategies can be generic (e.g., combining `ResolverNode` and its only consumer) or can be specialized to a certain platform (e.g., better leverage Fusion in Dataflow to make data processing more efficient).
IR, together with the workflow based on it, should be able to:
- Reflect the TFX DSL data model by describing how a pipeline definition should be executed and what the MLMD representation of the execution should look like.
- Provide portability across all existing platforms and enable running TFX pipelines on more platforms. The portability can be further categorized into the following:
  - Portability of scheduling semantics (i.e., what gets run, and when it runs)
  - Portability of underlying recorded data model (i.e., what gets stored in MLMD)
  - Portability of data I/O semantics (i.e., how input selection happens, and how dependency between inputs and executions are enforced)
The design will be introduced in three sections:
- Section 3.1 describes how a TFX pipeline execution will be represented in MLMD database (a.k.a., the end result of a pipeline execution).
- Section 3.2 describes a specification of a TFX pipeline.
- Section 3.3 describes the detailed workflow given a pipeline specification and how it works for different semantics.
As mentioned previously, the data model of TFX DSL is built on top of ML Metadata (MLMD) which provides a graph data model that is specialized for machine learning workflows.
In TFX, the three types of vertices in MLMD are used as follows:
- Artifact: an `Artifact` maps to an output of a node in a TFX pipeline and can be potentially fed into another node as an input. An `artifact` is always typed and the payload of the data is always referenced by the `uri` field in the `artifact`. Once published (marked as `LIVE`), the payload and most metadata of an `artifact` in TFX will not be mutated, with a few exceptions (e.g., system-owned metadata fields such as `state`).
- Execution: an `Execution` records a node run in a TFX pipeline. It takes zero or more `Artifacts` and produces zero or more `Artifacts`. The non-Artifact inputs (e.g., parameters) of the execution will be stored as the properties of the `Execution` instance.
- Context: a `Context` describes grouping concepts of workflow elements. In TFX, it does not map to any specific entity, instead it groups artifacts and executions under certain meanings (e.g., a pipeline run). Note that an `Execution` or an `Artifact` can be linked to more than one `Contexts`.
There are also three types of edges that are used in TFX, linking the aforementioned vertices:
- Event: an `Event` links an `Artifact` and an `Execution`. In TFX, the `Event` contains the following info:
  - The type of the `Event`. For example, type `INPUT` means that the `Artifact` is an input of the `Execution`.
  - The key of the input / output `Artifact`. For example, a `Trainer` might produce two groups of models: one group of models are normal TensorFlow models and the other group of models are optimized for mobile. The key can be used to differentiate them.
  - The index of the input / output `Artifact`. For example, an `ExampleGen` might produce a list of outputs with the same output key. To differentiate them, the index will be used.
- Attribution: an `Attribution` links an `Artifact` to a `Context`.
- Association: an `Association` links an `Execution` to a `Context`.
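The role of the key and the index on an `Event` can be sketched in a few lines of Python. This is only an illustrative in-memory model, not the MLMD API: the `Event` dataclass, its field names, and `outputs_by_key` are hypothetical names introduced here.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    """Hypothetical, simplified view of an MLMD event (not the MLMD proto)."""
    type: str          # e.g. "INPUT" or "OUTPUT"
    execution_id: int
    artifact_id: int
    key: str           # input / output key, e.g. "model"
    index: int         # position within the list stored under `key`


def outputs_by_key(events: List[Event], execution_id: int) -> Dict[str, List[int]]:
    """Groups an execution's output artifact ids by key, ordered by index."""
    grouped = defaultdict(list)
    for e in events:
        if e.execution_id == execution_id and e.type == "OUTPUT":
            grouped[e.key].append(e)
    return {
        key: [e.artifact_id for e in sorted(group, key=lambda e: e.index)]
        for key, group in grouped.items()
    }


# A trainer execution (id 7) with two output keys; "model" holds two artifacts.
events = [
    Event("OUTPUT", execution_id=7, artifact_id=31, key="mobile_model", index=0),
    Event("OUTPUT", execution_id=7, artifact_id=30, key="model", index=1),
    Event("OUTPUT", execution_id=7, artifact_id=29, key="model", index=0),
    Event("INPUT", execution_id=7, artifact_id=12, key="examples", index=0),
]
trainer_outputs = outputs_by_key(events, execution_id=7)
```

The key separates the two groups of models, and the index restores the order of artifacts that share a key.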
In TFX DSL, artifacts flow between components in `Channels`.
In the simple example below, `MyTrainer` uses the output `Channel` of `MyExampleGen` as its input `Channel` for examples data. In other words, `MyExampleGen` is writing to its output `Channel` and `MyTrainer` is reading from the same `Channel`.
def create_pipeline():
  eg = MyExampleGen(param_one=...)
  tr = MyTrainer(input_examples=eg.outputs['output_examples'], ...)
  return pipeline.Pipeline(
      pipeline_name='my_pipeline',
      components=[eg, tr],
      execution_mode=SYNC,
      ...)
`Channel` is an abstract concept in TFX rather than a real entity in MLMD like `Artifact`. It is built on top of the basic elements described previously. When mapped to MLMD, a `Channel` is essentially a view of a collection of `Artifact`s that satisfy a set of restrictions defined by the five categories of predicates shown below. Note that the predicates rely solely on information from MLMD, and the predicate on `ArtifactType` is the only required one.
Required? | Type of Predicates |
---|---|
Yes | Predicates on the type of artifacts |
No | Predicates on the properties of the artifacts |
No | Predicates on the Context attribution |
No | Predicates on the producer Execution of the artifacts |
No | Predicates on the Event linking the artifacts and the Execution |
Although ‘writing to a `Channel`’ and ‘reading from a `Channel`’ are perceived as a pair of symmetrical actions in the context of TFX DSL, they are very different when mapped to the underlying data model:
- ‘Writing to a `Channel`’ essentially means publishing artifacts to MLMD. In addition, the lineage of those artifacts is also kept in MLMD, such as their producer execution, related contexts (e.g., pipeline run) and the edges linking them. The entries published to MLMD follow the data model so that the consumers of the writer's outputs can get the artifacts correctly.
- ‘Reading from a `Channel`’, on the other hand, refers to a node getting the desired `Artifact` entries (not the payload) from MLMD by a query (in implementation, a set of MLMD API calls) that is constructed from the information of the `Channel`. In other words, a `Channel` should contain all the information needed to precisely recreate the predicates used to construct the query, with which the reader can get all the desired artifacts.
Taking the code snippet above as an example:
- In `MyExampleGen`, an `Artifact` along with other entities are published to MLMD as shown in the figure below.
- In `MyTrainer`, it queries MLMD (currently in the form of a set of MLMD API calls) to cover the following restrictions:
  - The type of the artifacts matches desired types (In this case, `Examples`).
  - The artifacts are linked to the correct `Context`(s). In this case, it should be at least linked to a `Context` representing the pipeline subgraph of `my_pipeline`.
  - The artifacts are produced by the `MyExampleGen` node in the pipeline. If there are more than one `MyExampleGen` nodes, the query should be able to distinguish one from the others.
  - The artifacts are produced through the output key of `output_examples`.
`Channel` construction happens during DSL compilation time. The compiler will generate channels containing different information for different DSL features / semantics (more in the next section). The second part of the design proposal will cover how `Channel` is represented in IR.
A `Channel` for synchronous data mode (synchronous artifact fetching) and a `Channel` for asynchronous data mode (asynchronous artifact fetching) differ only in that the final query created for the former always includes a predicate on the MLMD `Context` representing a particular synchronization group run (e.g., the pipeline run that the querying component belongs to). This aligns with the goal of having a uniform data model for both data modes.
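As a rough illustration of a `Channel` as a view over MLMD, the sketch below filters an in-memory list of artifacts with the five predicate categories. Everything here is an assumption made for illustration: `Artifact`, `ChannelView`, `read_channel`, and the context names `my_pipeline`, `run_1` and `run_2` are hypothetical, and a real implementation would issue MLMD API calls instead.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set


@dataclass
class Artifact:
    """Hypothetical stand-in for an MLMD artifact plus its lineage."""
    id: int
    type_name: str
    properties: Dict[str, str] = field(default_factory=dict)
    contexts: Set[str] = field(default_factory=set)  # attributed context names
    producer_node: Optional[str] = None              # node id of the producer execution
    output_key: Optional[str] = None                 # key on the output event


@dataclass
class ChannelView:
    """The five predicate categories; only `artifact_type` is required."""
    artifact_type: str
    property_predicate: Optional[Callable[[Dict[str, str]], bool]] = None
    required_contexts: Set[str] = field(default_factory=set)
    producer_node: Optional[str] = None
    output_key: Optional[str] = None


def read_channel(view: ChannelView, store: List[Artifact]) -> List[Artifact]:
    """Returns the artifacts in `store` that satisfy every predicate set on `view`."""
    result = []
    for a in store:
        if a.type_name != view.artifact_type:
            continue
        if view.property_predicate and not view.property_predicate(a.properties):
            continue
        if not view.required_contexts <= a.contexts:
            continue
        if view.producer_node is not None and a.producer_node != view.producer_node:
            continue
        if view.output_key is not None and a.output_key != view.output_key:
            continue
        result.append(a)
    return result


store = [
    Artifact(1, "Examples", contexts={"my_pipeline", "run_1"},
             producer_node="MyExampleGen", output_key="output_examples"),
    Artifact(2, "Examples", contexts={"my_pipeline", "run_2"},
             producer_node="MyExampleGen", output_key="output_examples"),
    Artifact(3, "Model", contexts={"my_pipeline", "run_2"},
             producer_node="MyTrainer", output_key="model"),
]

# Asynchronous read: scoped to the pipeline only, so artifacts from every run match.
async_view = ChannelView("Examples", required_contexts={"my_pipeline"},
                         producer_node="MyExampleGen", output_key="output_examples")
# Synchronous read: the same view plus a predicate on the current pipeline run.
sync_view = ChannelView("Examples", required_contexts={"my_pipeline", "run_2"},
                        producer_node="MyExampleGen", output_key="output_examples")

async_ids = [a.id for a in read_channel(async_view, store)]
sync_ids = [a.id for a in read_channel(sync_view, store)]
```

The two views differ only by one extra context predicate, which is the point of the uniform data model: the reader logic itself does not change between data modes.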
The `Channel` data model is also consistent across all platforms since it only relies on reading from and writing to MLMD, which is platform agnostic.
The data model of `Channel` described above is the core of the TFX data model. Both the basic functionalities and advanced semantics such as sub-pipeline and (future) conditionals are built on top of it. We will visit the details of these features along with their IR form in the next section.
The previous section introduced the data model and its mechanism for most cases. This section introduces how the same data model supports more advanced semantics like resolvers and sub-pipelines. Similarly, the data model can be applied to potential future features like conditionals, which will not be covered in this RFC.
A `ResolverNode` is an auxiliary node which runs a `Resolver` definition. It is a special 'execution' since it simply marks input artifacts for the downstream nodes instead of actually consuming or producing any data. From the end user’s perspective, a `ResolverNode` does not perform actual work, while from an orchestrator's perspective, a `ResolverNode` marks the inputs for the downstream components.
...
a = ComponentA(...)
b = ComponentB(...)
r = ResolverNode(
    key_one=a.outputs[...],
    key_two=b.outputs[...], ...)
c = ComponentC(
    input_one=r.outputs['key_one'],
    input_two=r.outputs['key_two'],)
...
When mapped to the TFX data model, the `Execution` of a `ResolverNode` does not publish any artifact entry into MLMD. Instead, its results are captured as internal events linking the `ResolverNode` execution with existing artifacts. These internal events (with event types `INTERNAL_INPUT` and `INTERNAL_OUTPUT`) are hidden from end users, who use `INPUT` / `OUTPUT` events during lineage tracking.
For the simple example code in Table 3, the figure above demonstrates two views of the MLMD graph stored in the database. To better illustrate the entities involved, the figure shows a base (degenerate) case where the `ResolverNode` only needs to resolve an artifact from a candidate list containing a single artifact for each key.
- The left part of the graph is the workflow lineage view. It is what end users typically want to see and use. The execution of the `ResolverNode` and related events are hidden as they do not affect the lineage.
- The right part of the graph is the system view. It contains every node and edge in the MLMD subgraph after the code runs. Table 4 below lists the information of all events for the execution. As discussed previously, events {`e3`, `e4`, `e5`, `e6`} will not be visible to end users (since they are of type `INTERNAL_INPUT` and `INTERNAL_OUTPUT`). The `ResolverNode` execution `r` will not be visible to end users either. This hidden information will be used by the orchestrator / system to resolve input artifacts for individual executions (details in the next paragraph).
Event | Event type | Related execution | Related artifact |
---|---|---|---|
e1 | OUTPUT | a | 1 |
e2 | OUTPUT | b | 2 |
e3 | INTERNAL_INPUT | r | 1 |
e4 | INTERNAL_INPUT | r | 2 |
e5 | INTERNAL_OUTPUT | r | 1 |
e6 | INTERNAL_OUTPUT | r | 2 |
e7 | INPUT | c | 1 |
e8 | INPUT | c | 2 |
After executing the `ResolverNode` `r` and the last node `c`, the interactions with the MLMD data model are listed in detail below:
1. `r` constructs a set of MLMD API calls based on the information in the input channels (one from `a` and one from `b`) and gets candidate artifacts.
2. Based on the resolver policy and config, `r` further filters the candidate artifacts it received from (1) and publishes the following atomically:
   1. An execution.
   2. One or more events of `INTERNAL_INPUT` linking the execution in (2.1) and the artifacts obtained from (1).
   3. One or more events of `INTERNAL_OUTPUT` linking the execution in (2.1) and the artifacts that pass the filter.
3. `c` constructs a set of MLMD API calls based on the information in the input channels. The API calls will include the following:
   1. Gets the execution that has the same execution type and instance name as `r` (and is associated with some specific pipeline run context, if under synchronous execution mode).
   2. Gets the output events of the execution returned in (3.1). Note that the events will be filtered by event direction (input or output) but will not be filtered by end-user visibility. In our example, there will only be an output event with type `INTERNAL_OUTPUT`, linking the execution and the artifact we want to get.
   3. Gets the artifact id of the event(s) returned in (3.2) and fetches the artifact(s).
4. `c` executes and publishes its execution and output artifacts along with events linking them into MLMD.
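The workflow above can be mimicked with a small in-memory event log. This is a sketch under stated assumptions rather than the actual orchestrator logic: the `Event` dataclass and the helper functions are hypothetical, and the resolver policy (keep the artifact with the largest id per key) is made up for the example.

```python
from dataclasses import dataclass
from typing import Dict, List

INTERNAL_TYPES = {"INTERNAL_INPUT", "INTERNAL_OUTPUT"}
OUTPUT_TYPES = {"OUTPUT", "INTERNAL_OUTPUT"}


@dataclass(frozen=True)
class Event:
    """Hypothetical, simplified event record (not the MLMD proto)."""
    type: str
    execution: str
    artifact: int
    key: str


def resolver_events(execution: str, candidates: Dict[str, List[int]]) -> List[Event]:
    """Step 2: events a resolver execution publishes for its candidates."""
    events = []
    for key, artifact_ids in candidates.items():
        for artifact_id in artifact_ids:
            events.append(Event("INTERNAL_INPUT", execution, artifact_id, key))
        if artifact_ids:
            # Hypothetical policy: keep the artifact with the largest id per key.
            events.append(Event("INTERNAL_OUTPUT", execution, max(artifact_ids), key))
    return events


def outputs_of(execution: str, events: List[Event]) -> Dict[str, List[int]]:
    """Step 3: filter by direction (output), not by end-user visibility."""
    result: Dict[str, List[int]] = {}
    for e in events:
        if e.execution == execution and e.type in OUTPUT_TYPES:
            result.setdefault(e.key, []).append(e.artifact)
    return result


def lineage_view(events: List[Event]) -> List[Event]:
    """What end users see: internal events are hidden."""
    return [e for e in events if e.type not in INTERNAL_TYPES]


# Components a and b publish one artifact each.
events = [Event("OUTPUT", "a", 1, "out"), Event("OUTPUT", "b", 2, "out")]
# Steps 1-2: r resolves one candidate per key and publishes internal events.
events += resolver_events("r", {"key_one": [1], "key_two": [2]})
# Step 3: c reads the outputs of r, including INTERNAL_OUTPUT events.
inputs_for_c = outputs_of("r", events)
# Step 4: c records its own INPUT events.
events += [Event("INPUT", "c", ids[0], key) for key, ids in inputs_for_c.items()]
visible_types = [e.type for e in lineage_view(events)]
```

The log ends up with the same eight events as the event table for this example (in a different order), while the lineage view contains only the `OUTPUT` and `INPUT` events.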
In contrast with the base case illustrated above, the figure below shows a more common case where the `ResolverNode` takes in the output artifacts of x executions of the same node across multiple pipeline runs and marks a subset of them (in this case, one artifact) as its own output through an `INTERNAL_OUTPUT` event so that the consumer `c` can use it as its input.
Recall that a sub-pipeline is a synchronous execution pipeline inside an asynchronous parent pipeline. A run graph of a sub-pipeline in TFX DSL can be mapped as a subgraph in MLMD. The border of the subgraph is drawn w.r.t. an MLMD context. All executions in the sub-pipeline are linked with the context through associations and all the artifacts that are consumed or produced by those executions are linked with the context through attributions. For better illustration, let's use the example from the advanced semantics RFC shown below.
def create_subpipeline(eg, eb):
  b = tfx.experimental.SubpipelineInputs(
      inputs={'examples': eg.outputs['examples']},
      async_inputs={'embedding': eb.outputs['embedding']})
  tx = tfx.Transform(
      examples=b.inputs['examples'])
  tr = tfx.Trainer(
      examples=b.inputs['examples'],
      embedding=b.async_inputs['embedding'],
      transform_graph=tx.outputs['transform_graph'])
  iv = tfx.InfraValidator(model=tr.outputs['model'])
  return tfx.experimental.Subpipeline(
      pipeline_name='my_sub_pipeline',
      components=[tx, tr, iv],
      inputs=b,
      outputs={
          'model': tr.outputs['model'],
          'validation_result': iv.outputs['validation_result']
      },
      async_outputs={'model': tr.outputs['model']})
eg = tfx.ExampleGen(...)  # Irrelevant parts omitted
eb = tfx.EmbeddingGenerator(...)  # Irrelevant parts omitted
sp = create_subpipeline(eg, eb)
p = tfx.Pusher(
    model=sp.outputs['model'],
    validation_result=sp.outputs['validation_result'])
lt = tfx.TFLiteConverter(model=sp.async_outputs['model'])
return pipeline.Pipeline(
    pipeline_name='my_pipeline',
    components=[eg, eb, sp, p, lt], execution_mode=ASYNC)
The figure below demonstrates how the example in Table 5 maps to the MLMD representation. The big blue box represents the `Context` of the sub-pipeline. Instead of explicitly showing the `attribution` and `association` edges to the `Context`, all `Executions` and `Artifacts` that are linked to the `Context` are placed within the box.
The only special things for a sub-pipeline are the 'Barnacle' nodes (with dashed edges) alongside the nodes representing executions of explicitly-defined components (`ExampleGen`, `Trainer`, etc.). These 'Barnacle' nodes are artificial nodes similar to `ResolverNode` that are added as the head and the tail of a sub-pipeline and are used for the different flavors of input access and output access discussed in the next paragraph.
There will be two flavors for a node inside the sub-pipeline to get input artifacts that are NOT produced inside the sub-pipeline:
- Asynchronous inputs: This is the same behavior as a normal node in an asynchronous pipeline. If there are two nodes inside a sub-pipeline that are trying to read from the same input `Channel` produced outside of the sub-pipeline asynchronously, they might get different results. In the example above, `Trainer` is reading the outputs of `EmbeddingGenerator` in this way.
- Synchronous inputs: As a comparison, reading inputs synchronously guarantees that the same set of artifacts will be returned as the result of the read throughout the sub-pipeline lifetime. This is achieved by atomically snapshotting inputs at the beginning of the sub-pipeline. The snapshotting (represented as the 'Head Barnacle' in the figure above) is modeled in the same way as a `ResolverNode` is modeled. In the example above, both `Transform` and `Trainer` are reading the outputs of `ExampleGen` in this flavor. As a result, they are all linked with the `Execution` of the 'Head Barnacle' when mapped to MLMD.
Symmetrically, there are also two flavors for a node outside of the sub-pipeline to read the outputs of a node inside the sub-pipeline:
- Asynchronous outputs: This is the same behavior as normal asynchronous data fetching for two nodes inside an asynchronous execution pipeline: The consumer node is able to get the outputs of the producer node right after the producer node finishes an execution. In the example above, `TFLiteConverter` is reading the outputs of `Trainer` in this fashion.
- Synchronous outputs: As a comparison, if a node is trying to read the synchronous outputs of a node inside the sub-pipeline, the consumer node will not get the outputs until all the nodes inside the sub-pipeline finish execution. This is achieved by adding a 'Tail Barnacle' as the 'sink' of the sub-pipeline. The 'Tail Barnacle' will snapshot (similar to `ResolverNode` and 'Head Barnacle') all the outputs produced in the sub-pipeline. The consumer nodes that need synchronous outputs will be linked to the execution of the 'Tail Barnacle' instead of the real producer of the artifacts. In the example above, `Pusher` is trying to read synchronous outputs produced by the nodes inside the sub-pipeline and thus will read the results of `Trainer` and `InfraValidator` simultaneously. This guarantees that the combination of model and infra validation it gets is always meaningful.
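The difference between the two output flavors can be sketched as follows. This is a toy model only: `tail_barnacle_snapshot`, `async_output`, and the dictionaries passed to them are hypothetical, and the artifact ids are invented for the example.

```python
from typing import Dict, List, Optional, Set


def tail_barnacle_snapshot(
    subpipeline_nodes: Set[str],
    finished_nodes: Set[str],
    produced: Dict[str, List[int]],
    output_sources: Dict[str, str],
) -> Optional[Dict[str, List[int]]]:
    """Returns synchronous outputs only after every sub-pipeline node has finished.

    produced: node id -> artifact ids produced in this sub-pipeline run.
    output_sources: sub-pipeline output key -> id of the node producing it.
    """
    if not subpipeline_nodes <= finished_nodes:
        return None  # Synchronous consumers keep waiting.
    return {key: list(produced.get(node, [])) for key, node in output_sources.items()}


def async_output(produced: Dict[str, List[int]], node: str) -> List[int]:
    """Asynchronous consumers see a producer's outputs as soon as they exist."""
    return list(produced.get(node, []))


nodes = {"Transform", "Trainer", "InfraValidator"}
sources = {"model": "Trainer", "validation_result": "InfraValidator"}

# Trainer has finished, InfraValidator has not.
produced = {"Trainer": [5]}
early_sync = tail_barnacle_snapshot(nodes, {"Transform", "Trainer"}, produced, sources)
early_async = async_output(produced, "Trainer")

# InfraValidator finishes; the snapshot now exposes both outputs together.
produced["InfraValidator"] = [6]
final_sync = tail_barnacle_snapshot(nodes, nodes, produced, sources)
```

In this sketch the asynchronous consumer can read the model before validation completes, while the synchronous consumer only ever sees the model together with its validation result.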
The 'Barnacles' will be generated automatically during compilation time and will share the same representation as any other node in the pipeline. There will be a section below that discusses the details.
The intermediate representation (IR) of TFX DSL provides a way to structurally describe a TFX pipeline. The design of the IR is tightly coupled with the TFX data model discussed in the previous section. As the reader will see, this is especially obvious in the design of the representation of a pipeline node, which is the core part of the IR. Thus this section first goes through the pipeline node representation design, then the pipeline representation structure, and finally explores the IR for some semantics and features in TFX DSL as a demonstration.
A pipeline node is the basic unsplittable execution unit in a pipeline. Components, Importer nodes, Resolver nodes and other special nodes are all considered pipeline nodes. A sub-pipeline, however, is not considered a simple node and will be discussed separately.
// Basic info of a pipeline node, including the type and id of the node.
// The information in `NodeInfo` should stay stable across time. Asynchronous
// data fetching behavior might change if this changes.
message NodeInfo {
  // The MLMD type of the node. For example, is it an `ExampleGen` or `Trainer`.
  ml_metadata.ExecutionType type = 1;
  // The unique identifier of the node within the pipeline definition. This id
  // will be used in upstream and downstream nodes to indicate node
  // dependencies. This is generated by the system.
  string id = 2;
}
// Pipeline node definition.
message PipelineNode {
  // Basic info of a pipeline node.
  NodeInfo node_info = 1;
  // Specification for contexts that this node belongs to.
  NodeContexts contexts = 2;
  // Specification for node inputs.
  NodeInputs inputs = 3;
  // Specification for node outputs.
  NodeOutputs outputs = 4;
  // Specification for node parameters.
  NodeParameters parameters = 5;
  // Specification for the executor of the node.
  ExecutorSpec executor = 6;
  // Ids of the upstream nodes of the current node.
  repeated string upstream_nodes = 7;
  // Options for executing the node.
  NodeExecutionOptions execution_options = 9;
}
The basic structure of a pipeline node is defined above. Most of the fields are optional, except `node_info`, which contains the basic information of the node such as the type of the node and the unique id of the node within the outermost pipeline definition. Note that `node_info` is supposed to be stable across time. Otherwise it will potentially break asynchronous data fetching.
While some of the fields are self-explanatory from their names and comments, we will use the remainder of this section to zoom into the remaining, more interesting fields.
The `NodeContexts` message specifies the contexts that the node belongs to. This means the execution of the node and the artifacts used by the execution (both input artifacts and output artifacts) will be attributed to / associated with the contexts. There should be at least one `ContextSpec` message to mark the scope of the pipeline the node is in. For synchronous execution, a `ContextSpec` message to mark the specific pipeline run is also expected. This information will be used by the consumers of the node to locate the search scope when resolving input artifacts. Also, the flexibility to add more `ContextSpec` messages is an important extension point to support customized grouping (e.g., experiments).
// Spec of a context.
message ContextSpec {
  // The type of the context.
  ml_metadata.ContextType type = 1;
  // The name of the context.
  Value name = 2;
  // Properties of the context.
  map<string, Value> properties = 3;
}
// Specifications of contexts that this node belongs to. All input artifacts,
// output artifacts and execution of the node will be linked to the (MLMD)
// contexts generated from these specifications.
message NodeContexts {
  repeated ContextSpec contexts = 1;
}
`NodeParameters` represents non-artifact inputs to a node. They will be resolved during runtime and passed into the executor as `exec_properties`. The definition of parameters is a map from key to `Value`. As shown below, a `Value` can be either a static `field_value` or a dynamic value (details of the two runtime parameter definitions are omitted and can be found here. Also note that the SDK for the dynamic value is under development and is only partially available). The same `Value` definition will also be used broadly in other message definitions in IR.
// Definition for Value in DSL IR. A Value instance can be either a field value
// that is determined during compilation time, or a runtime parameter which will
// be determined during runtime.
message Value {
  oneof value {
    ml_metadata.Value field_value = 1;
    RuntimeParameter runtime_parameter = 2;
    StructuralRuntimeParameter structural_runtime_parameter = 3;
  }
}
// Specifications for node parameters.
message NodeParameters {
  map<string, Value> parameters = 1;
}
Input specs provide instructions of how to fetch and resolve input artifacts that are needed for a specific pipeline node execution. It has two parts:
- A map between input key and `InputSpec` message. This map is required for all use cases.
- A `ResolverConfig` message. This is optional and is only needed for asynchronous data fetching. We will defer the design and discussion for `ResolverConfig` as it requires more consideration and is highly coupled with MLMD declarative query efforts. For now, let’s treat it as a global filter applied on top of regular artifact resolution specified by the `InputSpec` map.
// A proto message wrapping all information needed to query one set of artifacts
// from MLMD.
message InputSpec {
  message Channel {...}
  repeated Channel channels = 1;
  // The minimum number of artifacts desired. If minimum requirement is not met,
  // the execution should not be triggered. If min_count is less than or equal
  // to 0, it means this input is optional.
  int32 min_count = 2;
}
// The proto message describes specs of all inputs needed for a component
// execution.
message NodeInputs {
  // A map between the input tag and specs for the inputs of that tag.
  map<string, InputSpec> inputs = 1;
  // Optional resolver configs. This will apply on top of the results of all
  // inputs.
  ResolverConfig resolver_config = 2;
}
As shown above, `InputSpec` represents a union of `Channels` while `Channel` represents a 'Channel' that was discussed previously in the 'Data model' section. Note that all `Channel` instances within an `InputSpec` should share the same artifact type. This will be enforced during pipeline compilation time.
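The union semantics, the shared-artifact-type rule and the `min_count` rule can be sketched as plain Python functions. These helpers are hypothetical and operate on artifact ids and type names only; they are not part of TFX or the IR.

```python
from typing import Dict, List


def check_same_artifact_type(channel_types: List[str]) -> str:
    """Compile-time check: every channel of one input must share an artifact type."""
    distinct = set(channel_types)
    if len(distinct) != 1:
        raise ValueError(f"expected exactly one artifact type, got {sorted(distinct)}")
    return channel_types[0]


def union_of_channels(per_channel_ids: List[List[int]]) -> List[int]:
    """Unions the artifact ids resolved per channel, keeping first-seen order."""
    return list(dict.fromkeys(i for ids in per_channel_ids for i in ids))


def should_trigger(resolved: Dict[str, List[int]], min_counts: Dict[str, int]) -> bool:
    """An input with min_count <= 0 is optional; others need at least min_count artifacts."""
    return all(len(resolved.get(key, [])) >= n for key, n in min_counts.items())


examples_type = check_same_artifact_type(["Examples", "Examples"])
resolved = {
    "examples": union_of_channels([[1, 2], [2, 3]]),  # two channels, one input key
    "base_model": [],                                 # nothing resolved yet
}
ready = should_trigger(resolved, {"examples": 1, "base_model": 0})
blocked = should_trigger(resolved, {"examples": 1, "base_model": 1})
```

With `min_count` set to 0 the missing `base_model` input does not block the execution; with `min_count` set to 1 it does.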
Zooming into `Channel`: recall that in TFX data model, a Channel contains information of 5 categories of predicates. As the serialized representation of that, a `Channel` message includes all the information for those predicates so that the system can get desired artifacts through three stages:
1. Contexts filtering: Uses the `ContextQuery` message(s) to locate the search scope. Each `ContextQuery` represents a predicate on a `Context` attribution. When multiple `ContextQuery` messages are specified, the search scope will be the intersection of all contexts.
2. Executions filtering: Uses the `ProducerNodeQuery` message to find all qualified producer executions within the search scope defined by (1). The `ProducerNodeQuery` message specifies information about qualified executions such as node id as well as some predicates on the properties of the executions (details of `PropertyPredicate` are omitted and can be found here). This maps to the predicates on the producer `Execution` of the artifacts.
3. Artifacts filtering: Uses the `output_key` field to get the candidate artifacts from the executions obtained from (2). This maps to the predicate on the `Event` linking the artifacts and the producer `Execution`. Note that an implicit rule not specified in the message is that the `Event` should have a valid output `Event` type (e.g., `OUTPUT`). Then uses the `ArtifactQuery` message to further filter the candidate artifacts to get the final result. The predicate on the artifact type and the predicate(s) on the artifact properties are applied.
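The three stages can be sketched against a small in-memory store. This is a simplified illustration, not the MLMD query path: `Store` and `resolve_channel` are hypothetical, the ids are invented, and property predicates are left out because `PropertyPredicate` is not defined in this document.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class Store:
    """Hypothetical in-memory stand-in for the parts of MLMD used here."""
    # context name -> ids of executions associated with it
    associations: Dict[str, Set[int]] = field(default_factory=dict)
    # execution id -> id of the pipeline node that ran it
    execution_node: Dict[int, str] = field(default_factory=dict)
    # (execution id, output key) -> artifact ids linked by output events
    output_events: Dict[Tuple[int, str], List[int]] = field(default_factory=dict)
    # artifact id -> artifact type name
    artifact_type: Dict[int, str] = field(default_factory=dict)


def resolve_channel(store: Store, context_names: List[str], producer_node_id: str,
                    output_key: str, artifact_type: str) -> List[int]:
    # Stage 1 (contexts filtering): the scope is the intersection of all contexts.
    scopes = [store.associations.get(name, set()) for name in context_names]
    in_scope = set.intersection(*scopes) if scopes else set(store.execution_node)
    # Stage 2 (executions filtering): keep executions of the producer node.
    producers = sorted(e for e in in_scope
                       if store.execution_node.get(e) == producer_node_id)
    # Stage 3 (artifacts filtering): follow output events for the key, then
    # apply the artifact type predicate.
    candidates = [a for e in producers
                  for a in store.output_events.get((e, output_key), [])]
    return [a for a in candidates if store.artifact_type.get(a) == artifact_type]


store = Store(
    associations={"my_pipeline": {10, 11, 12}, "run_2": {11, 12}},
    execution_node={10: "MyExampleGen", 11: "MyExampleGen", 12: "MyTrainer"},
    output_events={(10, "output_examples"): [1],
                   (11, "output_examples"): [2],
                   (12, "model"): [3]},
    artifact_type={1: "Examples", 2: "Examples", 3: "Model"},
)
all_runs = resolve_channel(store, ["my_pipeline"], "MyExampleGen",
                           "output_examples", "Examples")
this_run = resolve_channel(store, ["my_pipeline", "run_2"], "MyExampleGen",
                           "output_examples", "Examples")
```

Adding the run context to the list of context queries narrows the scope in stage 1 without changing stages 2 and 3.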
// A proto message wrapping all information needed to query one set of artifacts
// from MLMD.
message InputSpec {
message Channel {
// Information to query the producer node of the artifacts.
message ProducerNodeQuery {
// The unique identifier of the node that produced the artifacts.
string id = 1;
// Predicate on producer node properties.
PropertyPredicate property_predicate = 2;
}
// Information to query the contexts the desired artifacts are in.
message ContextQuery {
// The type of the Context.
ml_metadata.ContextType type = 1;
// The name of the context.
Value name = 2;
// Predicate on the context properties.
PropertyPredicate property_predicate = 3;
}
// Information to query the desired artifacts.
message ArtifactQuery {
// The type of the artifact.
ml_metadata.ArtifactType type = 1;
// Predicate on the artifact properties.
PropertyPredicate property_predicate = 2;
}
ProducerNodeQuery producer_node_query = 1;
repeated ContextQuery context_queries = 2;
ArtifactQuery artifact_query = 3;
// The output key of the channel. Consider a `Trainer` with two output
// channels: when downstream nodes consume its outputs, output key(s) need
// to be specified:
// ```
// evaluator = tfx.Evaluator(model=trainer.outputs['some_output_key'])
// ```
// where 'some_output_key' is the output key for the channel that evaluator
// uses as one of its inputs.
string output_key = 4;
}
repeated Channel channels = 1;
// The minimum number of artifacts desired. If minimum requirement is not met,
// the execution should not be triggered. If min_count is less than or equal
// to 0, it means this input is optional.
int32 min_count = 2;
}
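The three filtering stages that `InputSpec` encodes can be sketched in plain Python. This is a minimal illustration over in-memory records, not the MLMD API: the `Artifact` and `Execution` classes and the `resolve_channel` and `is_input_satisfied` helpers are hypothetical names, and property predicates are left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Artifact:
    id: int
    type_name: str


@dataclass
class Execution:
    node_id: str
    contexts: Set[str]  # names of the contexts this execution is associated with
    # output key -> artifacts linked to this execution through OUTPUT events
    outputs: Dict[str, List[Artifact]] = field(default_factory=dict)


def resolve_channel(executions: List[Execution], context_names: List[str],
                    producer_node_id: str, output_key: str,
                    artifact_type: str) -> List[Artifact]:
    """Applies context, execution and artifact filtering, in that order."""
    # (1) Context filtering: the scope is the intersection of all contexts.
    in_scope = [e for e in executions if set(context_names) <= e.contexts]
    # (2) Execution filtering: keep executions of the producer node.
    producers = [e for e in in_scope if e.node_id == producer_node_id]
    # (3) Artifact filtering: follow the output key, then check the type.
    result = []
    for execution in producers:
        for artifact in execution.outputs.get(output_key, []):
            if artifact.type_name == artifact_type:
                result.append(artifact)
    return result


def is_input_satisfied(artifacts: List[Artifact], min_count: int) -> bool:
    """An input with min_count <= 0 is optional."""
    return min_count <= 0 or len(artifacts) >= min_count
```

Passing both the pipeline context and a pipeline run context narrows the scope to a single run, while passing only the pipeline context widens it to every past run of that pipeline.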
The definition of the `NodeOutputs` message is shown below. It represents the
outputs of a node. Each entry in the map represents an output key and the output
specification for that key. The output specification currently contains the
artifact information, such as the artifact type and additional properties that
should be attached to the output artifact.
// A proto message wrapping all information needed to describe one output of a
// node.
message OutputSpec {
// Information of the desired artifacts.
message ArtifactSpec {
// The name of the artifact type.
ml_metadata.ArtifactType type = 1;
// Additional properties to set when outputting artifacts.
map<string, Value> additional_properties = 2;
}
ArtifactSpec artifact_spec = 1;
}
// Specifications for node outputs.
message NodeOutputs {
map<string, OutputSpec> outputs = 1;
}
`ExecutorSpec` specifies the executor of a node. Currently it only includes a
Python-class-based specification, as defining executor specs for different
platforms is not the goal of this RFC.
NOTE: `ExecutorSpec` is meant to be a platform-specific extension point. Later
evolution of the IR will include more updates on this part to make it more
extensible for different form factors of executors such as containers.
// ExecutorSpec is still WIP
message ExecutorSpec {
// Executor specification for Python-class based executors.
message PythonClassExecutorSpec {
// The full class path of the executor.
string class_path = 1;
}
oneof spec {
PythonClassExecutorSpec python_class_executor_spec = 1;
}
}
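A `class_path` such as the one in `PythonClassExecutorSpec` can be turned into a class object with the standard `importlib` module. The sketch below is one plausible way to do it; the helper name `load_executor_class` is hypothetical, and this RFC does not prescribe how an orchestrator loads executors.

```python
import importlib


def load_executor_class(class_path: str):
    """Resolves a 'package.module.ClassName' path to the class object."""
    module_path, _, class_name = class_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a full class path, got: {class_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```

Because the path is resolved at run time, the executor class only has to be importable in the environment that executes the node, not in the one that compiles the pipeline.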
Similar to `ExecutorSpec`, `NodeExecutionOptions` is another place for
platform-specific extensions. Currently it only contains some simple options for
caching control, but in the future we plan to enhance it further and introduce
configurations that are specific to different platforms.
// Options for executing the node.
message NodeExecutionOptions {
message CachingOptions {
// Whether or not to enable cache for this node.
bool enable_cache = 1;
}
CachingOptions caching_options = 1;
}
The definition of a pipeline is shown below. Since most of the fields should be self-explanatory, we will only highlight some of them:
- The `pipeline_info` must be stable across time. Otherwise it will break asynchronous data fetching.
- The `ExecutionMode` definition reflects the design for TFX DSL semantics: each pipeline should be in either `SYNC` mode or `ASYNC` mode, corresponding to synchronous execution and asynchronous execution respectively. Only the outermost pipeline can be in `ASYNC` mode.
- A pipeline can have an arbitrary number of nodes. Each node can be either of the following:
  - A `PipelineNode` instance, representing a normal pipeline node, which is the unsplittable execution unit.
  - A `Pipeline` instance, representing a sub-pipeline.
// Message struct that contains pipeline runtime specifications.
message PipelineRuntimeSpec {
// Required field. Base directory of the pipeline. If not specified in DSL,
// sub-pipelines will be compiled to use the same pipeline root as the parent
// pipeline.
Value pipeline_root = 1;
// A unique id to identify a pipeline run. This will not be set during
// compilation time but is required for synchronous pipeline execution.
Value pipeline_run_id = 2;
}
// Basic info of a pipeline.
// The information in `PipelineInfo` should stay stable across time.
// Asynchronous data fetching behavior might change if this changes.
message PipelineInfo {
// Required field. A pipeline must have an id.
string id = 1;
}
// Definition for a pipeline. This is also the definition of a sub-pipeline.
message Pipeline {
enum ExecutionMode {
EXECUTION_MODE_UNSPECIFIED = 0;
SYNC = 1;
ASYNC = 2;
}
// A node inside a pipeline can be either a `PipelineNode` or a `Pipeline` as
// a sub-pipeline.
message PipelineOrNode {
oneof node {
// A normal pipeline node. This is the unsplittable execution unit.
PipelineNode pipeline_node = 1;
// Sub-pipelines should only have execution mode `SYNC`.
Pipeline sub_pipeline = 2;
}
}
PipelineInfo pipeline_info = 1;
repeated PipelineOrNode nodes = 2;
PipelineRuntimeSpec runtime_spec = 3;
// Execution mode of the pipeline. Only the outermost pipeline can be `ASYNC`.
ExecutionMode execution_mode = 4;
// Configs for different platforms, keyed by tags for different platforms that
// users provide.
map<string, google.protobuf.Any> platform_configs = 5;
// TFX DSL SDK version for this pipeline.
string sdk_version = 6;
}
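The rule that only the outermost pipeline can be `ASYNC` lends itself to a recursive check. The following is a minimal sketch using Python dataclasses as simplified stand-ins for the proto messages; `validate_execution_modes` is a hypothetical helper, and treating an unspecified mode as an error is an assumption drawn from the statement that every pipeline should be in either `SYNC` or `ASYNC` mode.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Union


class ExecutionMode(Enum):
    EXECUTION_MODE_UNSPECIFIED = 0
    SYNC = 1
    ASYNC = 2


@dataclass
class PipelineNode:
    id: str


@dataclass
class Pipeline:
    id: str
    execution_mode: ExecutionMode
    # Mirrors `PipelineOrNode`: each entry is a node or a sub-pipeline.
    nodes: List[Union["Pipeline", PipelineNode]] = field(default_factory=list)


def validate_execution_modes(pipeline: Pipeline,
                             is_outermost: bool = True) -> List[str]:
    """Returns a list of human-readable violations (empty if valid)."""
    errors = []
    if pipeline.execution_mode is ExecutionMode.EXECUTION_MODE_UNSPECIFIED:
        errors.append(f"{pipeline.id}: execution mode is unspecified")
    elif pipeline.execution_mode is ExecutionMode.ASYNC and not is_outermost:
        errors.append(f"{pipeline.id}: sub-pipelines must be SYNC")
    for node in pipeline.nodes:
        if isinstance(node, Pipeline):
            errors.extend(validate_execution_modes(node, is_outermost=False))
    return errors
```

A compiler could run such a check before emitting the IR, so that an invalid nesting is rejected before it reaches any orchestrator.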
In this section, we will first go through the standard workflow of a pipeline (especially a pipeline node) execution and then see how the IR can instruct the executions of pipelines in different scenarios.
We will mainly focus on the execution of a single pipeline node as the full pipeline execution is just a collection of pipeline node executions under the DSL data model.
Here are the steps for a standard pipeline node execution:
1. Input artifacts resolution. In this step, `NodeInputs` is leveraged to produce instructions to fetch artifacts from MLMD, as discussed previously. The end result will be a `Dict[Text, List[Artifact]]`. Note that if a `ContextSpec` is specified for a pipeline run, the data fetching will be synchronous as the search scope is limited to the pipeline run.
2. Execution properties resolution. In this step, values of execution properties will be resolved (potentially from runtime parameters) from `NodeParameters` in the IR. The end result will be a `Dict[Text, Any]`.
3. Register contexts. Create and register contexts for this node, using instructions specified by `NodeContexts` messages. Contexts that are already registered will not be registered again but reused. Note that nodes inside a synchronous pipeline will have one more context, for the pipeline run, than nodes inside an asynchronous pipeline. The pipeline run context uniquely identifies a pipeline run, so it guarantees synchronous data fetching when it is specified as a `ContextQueries` message in the `InputSpec` definition for one of the inputs of a consumer node.
4. [Optional] Search for cache. Queries MLMD to see whether there is a cached execution available given the input artifacts (from step (1)), execution properties (from step (2)) and output specification (`NodeOutputs` in IR). This step and the next step are optional. If cache is not enabled, directly go to step (6).
5. [Optional] If the cache is hit in (4), we do not need to re-execute:
   - In synchronous execution, we publish the execution with state `CACHED`, with all the resolved input artifacts, execution properties and the cached output artifacts linked to it. All the executions and artifacts will be associated with the contexts obtained from step (3).
   - In asynchronous execution, we do nothing and simply return. NOTE: For asynchronous execution, the current design can be problematic for nodes which do not expect every input to be part of their trigger condition. This will be addressed in a follow-up design on `TriggerPolicy`.
6. Prepare outputs. This step prepares the output artifacts based on `NodeOutputs`, including the URIs of the outputs for the upcoming execution. The end result is a `Dict[Text, List[Artifact]]`. This and the following two steps are needed if (a) cache is disabled, or (b) no cached execution is available.
7. Execute. The `ExecutorSpec` message will be used to instruct the execution.
8. Publish execution. This step publishes the following things atomically to MLMD:
   - The execution.
   - The output artifacts.
   - Newly created contexts if any.
   - Events linking all the input and output artifacts to the execution.
   - Associations between the new execution and the contexts from (3).
   - Attributions between the artifacts (both input and output) and contexts from (3).
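The control flow of steps (4) to (8) can be sketched as a small driver. Everything here is illustrative: `InMemoryStore` is a hypothetical stand-in for MLMD, artifacts are reduced to URI strings, the cache key and URI layout are made up, and steps (1) to (3) are assumed to have already produced the resolved inputs, execution properties and contexts.

```python
from typing import Any, Callable, Dict, List, Optional

Artifacts = Dict[str, List[str]]  # key -> artifact URIs (stand-in for Artifact objects)


class InMemoryStore:
    """Hypothetical stand-in for MLMD that only records published executions."""

    def __init__(self) -> None:
        self.executions: List[Dict[str, Any]] = []

    def find_cached_outputs(self, cache_key: tuple) -> Optional[Artifacts]:
        for execution in self.executions:
            if execution["cache_key"] == cache_key:
                return execution["outputs"]
        return None

    def publish(self, **execution: Any) -> None:
        # A real backend would write the execution, artifacts, events,
        # associations and attributions atomically.
        self.executions.append(execution)


def run_node(store: InMemoryStore, node_id: str, inputs: Artifacts,
             exec_properties: Dict[str, Any], contexts: List[str],
             output_keys: List[str],
             executor: Callable[[Artifacts, Artifacts, Dict[str, Any]], None],
             enable_cache: bool, is_sync: bool) -> str:
    """Runs steps (4)-(8) and returns the resulting state."""
    cache_key = (node_id, repr(sorted(inputs.items())),
                 repr(sorted(exec_properties.items())), tuple(output_keys))
    record = dict(node_id=node_id, cache_key=cache_key, inputs=inputs,
                  exec_properties=exec_properties, contexts=contexts)

    if enable_cache:
        cached = store.find_cached_outputs(cache_key)  # step (4)
        if cached is not None:  # step (5)
            if is_sync:
                store.publish(state="CACHED", outputs=cached, **record)
            # In asynchronous execution nothing is published.
            return "CACHED"

    # Step (6): prepare outputs, including their URIs (layout is made up).
    execution_id = len(store.executions) + 1
    outputs = {key: [f"{node_id}/{key}/{execution_id}"] for key in output_keys}
    executor(inputs, outputs, exec_properties)  # step (7)
    store.publish(state="COMPLETE", outputs=outputs, **record)  # step (8)
    return "COMPLETE"
```

Running the same node twice with identical inputs in synchronous mode invokes the executor once and publishes two executions, the second with state `CACHED`; in asynchronous mode the second call returns without publishing anything.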
In this section, we will use some typical scenarios to showcase how the IR helps to drive pipeline executions.
This is the simplest scenario:
- Pipeline in synchronous execution mode
- Data fetching is synchronous. This means that a downstream node strictly depends on the output of its direct upstream node(s).
Table 18 that was defined previously demonstrates this scenario well, containing only two nodes. The IR representation of the two nodes is shown in Table 15 and Table 16.
While much of the representation is straightforward, the connection between the two nodes is the interesting part. As discussed previously, there are three stages to get the input artifacts for a node: context filtering, execution filtering and artifact filtering. The information used to instruct each filtering stage is annotated accordingly in Table 16. The part with the matching annotation in Table 15 represents how the producer node registers itself with the same information so that the consumer node can find the desired artifacts.
node_info: {
  type: {name: "MyExampleGen"} # execution info
  id: "MyExampleGen" # execution info
}
contexts { # context info
  contexts {
    type: {name: "pipeline"}
    name: "my_pipeline"
    properties {...}
  }
  contexts {
    type: {name: "pipeline_run"}
    name: "my_pipeline.my_run"
    properties {...}
  }
} # context info
parameters {
  parameters {
    key: "param_one"
    value {...}
  }
}
outputs {
  outputs {
    key: "output_examples" # artifact info
    value {
      artifact_spec {
        type {
          name: "my_examples_type"
        }
      } # artifact info
    }
  }
}
executor {...}
node_info: {
  type_name: "MyTrainer"
  id: "MyTrainer"
}
contexts {...}
parameters {...}
inputs {
  inputs {
    key: "input_examples"
    value {
      channels {
        producer_node_query { # execution filtering
          id: "MyExampleGen"
        } # execution filtering
        context_queries { # context filtering
          type: {name: "pipeline"}
          name: "my_pipeline"
        }
        context_queries {
          type: {name: "pipeline_run"}
          name: "my_pipeline.my_run"
        } # context filtering
        artifact_query { # artifact filtering
          type: {name: "my_examples_type"}
        }
        output_key: "output_examples" # artifact filtering
      }
      required: True
    }
  }
}
outputs {...}
executor {...}
upstream_nodes: "MyExampleGen"
def create_pipeline():
  example_gen = MyExampleGen(param_one=...)
  trainer = MyTrainer(input_examples=example_gen.outputs['output_examples'])
  return Pipeline(
      pipeline_name='my_pipeline',
      components=[example_gen, trainer],
      execution_mode=ASYNC, ...)
The representation of a pipeline node under asynchronous mode shares the same structure as a pipeline node under synchronous mode. The differences between the two are (as highlighted in Table 18 and Table 16, for producer and consumer respectively):
- A node under asynchronous mode will NOT be associated with a pipeline run context.
- A node under asynchronous mode will have a `ResolverConfig` message in its input specification. This is because the search scope for a node under asynchronous mode is the historical results of all previous runs under the same pipeline definition, which means further filtering is needed. The resolver policy can be as simple as ‘latest one’.
node_info: {
type: {name: "MyExampleGen"}
id: "MyExampleGen"
}
contexts {
contexts {
type: {name: "pipeline"}
name: "my_pipeline"
properties {...}
}
- # contexts {
- # type: {name: "pipeline_run"}
- # name: "my_pipeline.my_run"
- # properties {...}
- # }
}
parameters{
parameters {
key: "param_one"
value {...}
}
}
outputs {
outputs {
key: "output_examples"
value {
artifact_spec {
type {
name: "my_examples_type"
}
}
}
}
}
executor {...}
node_info: {
type_name: "MyTrainer"
id: "MyTrainer"
}
contexts {
contexts {
type: {name: "pipeline"}
name: "my_pipeline"
properties {...}
}
- # contexts {
- # type: {name: "pipeline_run"}
- # name: "my_pipeline.my_run"
- # properties {...}
- # }
}
parameters {...}
inputs {
inputs {
key: "input_examples"
value {
channels {
producer_node_query {
id: "MyExampleGen"
}
context_queries {
type: {name: "pipeline"}
name: "my_pipeline"
}
- # context_queries {
- # type: {name: "pipeline_run"}
- # name: "my_pipeline.my_run"
- # }
artifact_query {
type: {name: "my_examples_type"}
}
output_key: "output_examples"
}
required: True
}
}
+ resolver_config {...}
}
outputs {...}
executor {...}
upstream_nodes: "MyExampleGen"
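A 'latest one' resolver policy of the kind mentioned above could look like the following sketch. The content of `ResolverConfig` is not defined in this document, so the `Artifact` stand-in, its `create_time` field and the `resolve_latest` helper are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Artifact:
    id: int
    create_time: int  # hypothetical creation timestamp


def resolve_latest(candidates: Dict[str, List[Artifact]],
                   count: int = 1) -> Dict[str, List[Artifact]]:
    """Keeps the `count` most recently created artifacts for every input key."""
    return {
        key: sorted(artifacts, key=lambda a: a.create_time, reverse=True)[:count]
        for key, artifacts in candidates.items()
    }
```

The policy runs after the three filtering stages: those stages return every matching artifact from all past runs, and the resolver narrows that set down to what the node should actually consume.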
The IR for a `ResolverNode` does not have an executor specification. Moreover,
it does not have specification messages for outputs either. The consumer of the
resolver node will be encoded to use the key of the inputs in the resolver as
the `output_key` in its input specification definition.
A `ResolverNode` will always carry a `resolver_config` definition (highlighted),
which will be applied on top of all inputs to a resolver for further filtering.
This is the same as for nodes under asynchronous execution pipelines, since they
are all in asynchronous data mode.
Since a `ResolverNode` is a mix of synchronous execution mode and asynchronous
data mode, the context-related representation is special:
- The `Contexts` specification will contain at least one specification for the pipeline and one specification for the pipeline run. This is because the result of a `ResolverNode` will be consumed synchronously by its downstream nodes in the same synchronous execution pipeline.
- The `ContextQueries` specification will NOT contain queries for the pipeline run context. This is because it needs to use the entire pipeline sub-graph as its search scope to find the right artifacts.
node_info: {...}
contexts {...}
parameters {...}
inputs {
inputs {
key: ...
value {
channels {
producer_node_query {...}
context_queries {
type: {name: "pipeline"}
name: "my_pipeline"
}
artifact_query {...}
output_key: ...
}
required: True
}
}
+ resolver_config {...}
}
...
In this section, we will use the example in Table 5 to illustrate how the data mode of sub-pipelines is represented in IR.
Table 21 shows the structure of the entire pipeline (including the parent pipeline and the sub-pipeline). There are several places worth noting:
- The execution mode of the sub-pipeline is `SYNC`. As discussed in the semantics RFC, a sub-pipeline should always be in `SYNC` execution mode.
- The sub-pipeline has its own pipeline id. As we shall see later, this will also be encoded into the context-related specifications for nodes that are either inside the sub-pipeline or consume the outputs of nodes that are inside the sub-pipeline.
- There are two artificial nodes that are added into the sub-pipeline: a 'Head Barnacle' node and a 'Tail Barnacle' node, as mentioned in the data mode section.
execution_mode: ASYNC
pipeline_info {
id: 'my_pipeline'
}
nodes {
pipeline_node {...} // ExampleGen
}
nodes {
pipeline_node {...} // EmbeddingGenerator
}
nodes {
+ sub_pipeline {
+ execution_mode: SYNC
+ pipeline_info {
+ id: 'my_sub_pipeline'
+ }
+ nodes {...} // Head barnacle
nodes {...} // Transform
nodes {...} // Trainer
nodes {...} // InfraValidator
+ nodes {...} // Tail barnacle
}
}
nodes {
pipeline_node {...} // Pusher
}
nodes {
pipeline_node {...} // TFLite converter
}
...
Some nodes in the pipeline are no different from other nodes inside an
asynchronous execution pipeline (`ExampleGen`, `EmbeddingGenerator`) or a
synchronous execution pipeline (`InfraValidator`). We will focus on the nodes
that are special to a sub-pipeline.
Table 22 demonstrates the IR of the ‘Head Barnacle’ node. Similar to
`ResolverNode`:
- It does not have a specification for executor.
- It does not have a specification for outputs.
Besides that, note that the ‘Head Barnacle’ node, along with all the nodes
inside the sub-pipeline, will have at least three `ContextSpec` messages for the
following contexts that the nodes will be associated with. This should be
verifiable on the execution backend.
- Parent pipeline context
- Sub-pipeline context
- Sub-pipeline run context
node_info: {
  type: {name: "HeadBarnacle"}
  id: "head_barnacle"
}
contexts {
  contexts {...} # parent pipeline context
  contexts {...} # sub-pipeline context
  contexts {...} # sub-pipeline run context
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "example_gen"
        }
        context_queries {
          type {name: "pipeline"}
          name: "my_pipeline" # The context of the parent pipeline.
        }
        artifact_query {
          type: {name: "Example"}
        }
        output_key: "examples"
      }
      required: True
    }
  }
}
Next, let's take a look at how synchronous and asynchronous input access is
represented in IR. We will use `Trainer` as the example as it has one
synchronous input and one asynchronous input.
- For synchronous inputs, the `Trainer` is encoded to read from the 'Head Barnacle' to get the snapshot that is also used by `Transform` (omitted as it shares the same behavior as `Trainer`). This is shown in the first `inputs` block below.
- For asynchronous inputs, the `Trainer` is encoded in the same way as a node inside an asynchronous execution pipeline, reading directly from `EmbeddingGenerator`. This is shown in the second `inputs` block below.
Moreover, it also reads from `Transform` in the same way as one might expect
for a normal synchronous execution pipeline. This is shown in the third `inputs`
block below.
node_info: {
  type: {name: "Trainer"}
  id: "trainer"
}
contexts {
  contexts {...} # parent pipeline context
  contexts {...} # sub-pipeline context
  contexts {...} # sub-pipeline run context
}
inputs {
  # 1. synchronous input to a sub-pipeline.
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "head_barnacle"
        }
        context_queries { # The context query for the parent pipeline.
          type {name: "pipeline"}
          name: "my_pipeline"
        }
        context_queries { # The context query for the sub-pipeline.
          type {name: "pipeline"}
          name: "my_sub_pipeline"
        }
        context_queries { # The context query for the sub-pipeline run.
          type {name: "pipeline_run"}
          name: "my_sub_pipeline.run_id"
        }
        artifact_query {
          type: {name: "Example"}
        }
        output_key: "examples"
      }
      required: True
    }
  }
  # 2. asynchronous input to a sub-pipeline.
  inputs {
    key: "embedding"
    value {
      channels {
        producer_node_query {
          id: "embedding_generator"
        }
        context_queries { # The context query for the parent pipeline.
          type {name: "pipeline"}
          name: "my_pipeline"
        }
        artifact_query {
          type: {name: "Embedding"}
        }
        output_key: "embedding"
      }
      required: True
    }
  }
  # 3. The same way as in normal synchronous execution pipelines.
  inputs {
    key: "transform_graph"
    value {
      channels {
        producer_node_query {
          id: "transform"
        }
        context_queries {...} # The context query for the parent pipeline.
        context_queries {...} # The context query for the sub-pipeline.
        context_queries {...} # The context query for the sub-pipeline run.
        artifact_query {
          type: {name: "TransformGraph"}
        }
        output_key: "transform_graph"
      }
      required: True
    }
  }
  resolver_config {...}
}
The 'Tail Barnacle' will be symmetric to the 'Head Barnacle', snapshotting the
output artifacts produced by the nodes inside the sub-pipeline so that they can
be accessed as synchronous outputs from the outer pipeline. For simplicity, the
representation of the 'Tail Barnacle' is not shown, but its node id
`tail_barnacle` will be used by the `Pusher` discussed next.
Consuming asynchronous outputs of a node inside a sub-pipeline is similar to the
normal pattern in asynchronous execution pipelines. The only difference is that
the `ContextQueries` will need to contain the context for the sub-pipeline.
Consuming synchronous outputs of a node inside a sub-pipeline, on the other
hand, will be encoded differently. As shown below, the `ProducerNodeQuery` of
the input indicates that, in the underlying data model, the `Pusher` is actually
reading from the output `Channel` of the 'Tail Barnacle'.
node_info: {
type: {name: "Pusher"}
id: "pusher"
}
contexts {
contexts {...} # parent pipeline context
}
inputs {
inputs {
key: "model"
value {
channels {
producer_node_query {
id: "tail_barnacle"
}
context_queries { # The context query for the parent pipeline.
type {name: "pipeline"}
name: "my_pipeline"
}
context_queries { # The context query for the sub-pipeline.
type {name: "pipeline"}
name: "my_sub_pipeline"
}
artifact_query {
type: {name: "Model"}
}
output_key: "model"
}
required: True
}
}
inputs {
key: "validation_result"
value {...} # Similar to 'model', omitted.
}
resolver_config {...}
}
...
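The context queries in the examples above follow one pattern: one `pipeline` query per enclosing pipeline, plus a `pipeline_run` query only when the input must be fetched synchronously. The sketch below captures that pattern; `build_context_queries` is a hypothetical helper and the dictionaries are simplified stand-ins for `ContextQuery` messages.

```python
from typing import Dict, List, Optional


def build_context_queries(
    pipeline_ids: List[str],
    run_context_name: Optional[str] = None) -> List[Dict[str, str]]:
    """Builds context queries for one input channel.

    pipeline_ids: ids of the pipelines whose contexts scope the search,
      outermost first (e.g. parent pipeline, then sub-pipeline).
    run_context_name: name of the pipeline run context; set it only for
      synchronous data fetching.
    """
    queries = [{"type": "pipeline", "name": pid} for pid in pipeline_ids]
    if run_context_name is not None:
        queries.append({"type": "pipeline_run", "name": run_context_name})
    return queries
```

For instance, the `Trainer` input that reads from the 'Head Barnacle' corresponds to `build_context_queries(["my_pipeline", "my_sub_pipeline"], "my_sub_pipeline.run_id")`, while the `Pusher` input that reads from the 'Tail Barnacle' omits the run context because the outer pipeline is in `ASYNC` mode.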
There are three directions of future improvement on the data model and IR:
- Better extensibility
  - `ExecutorSpec` needs to be enhanced to support more executor form factors and platforms.
  - `NodeExecutionOptions` needs to be enhanced to support platform-specific configs.
- Richer functionality / expressiveness
  - `ResolverConfig` needs to be enhanced for better flexibility.
  - `TriggerPolicy` needs to be introduced, in order to better support asynchronous execution pipelines, conditionals and potentially custom trigger logic of a node.
  - Output specification needs to be enhanced to better support the scenario of multiple outputs per `Channel`.
- Better performance
  - Needs to be able to support parallel executions within a node. This is needed for scenarios like batch backfill and SIMD-style components.
  - Graph optimization is another potential performance optimization. With some of the technology TFX depends on (e.g., Apache Beam), fusion / merging of different nodes can potentially reduce resource usage and shorten the processing time of the entire pipeline.