Skip to content

Commit

Permalink
Get python SDK to work with Flink using the portable representation.
Browse files Browse the repository at this point in the history
Add a hard-coded environment to all Python ParDos to use $USER and bintray
Add reshuffle translation using the portable pipeline representation.
  • Loading branch information
lukecwik committed Mar 23, 2018
1 parent 0ee6547 commit 0362fd1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ interface PTransformTranslator<T> {
this::translateGroupByKey);
urnToTransformTranslator.put(PTransformTranslation.IMPULSE_TRANSFORM_URN,
this::translateImpulse);
urnToTransformTranslator.put(ExecutableStage.URN, this::translateExecutableStage);
urnToTransformTranslator.put(ExecutableStage.URN,
this::translateExecutableStage);
urnToTransformTranslator.put(PTransformTranslation.RESHUFFLE_URN,
this::translateReshuffle);
}

@Override
Expand All @@ -182,6 +185,18 @@ public void translate(

}

/**
 * Translates a Reshuffle transform for the batch runner.
 *
 * <p>Looks up the transform by {@code id}, fetches the data set registered for its input,
 * and registers the result of {@code rebalance()} on that data set under its output. The
 * transform is expected to have exactly one input and exactly one output.
 *
 * @param id identifier of the transform inside {@code pipeline}'s components
 * @param pipeline the portable pipeline proto containing the transform
 * @param context translation context holding the data sets produced so far
 */
private <K, V> void translateReshuffle(
    String id, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
  RunnerApi.PTransform reshuffle = pipeline.getComponents().getTransformsOrThrow(id);

  // Resolve the single input collection and the data set already registered for it.
  String inputId = Iterables.getOnlyElement(reshuffle.getInputsMap().values());
  DataSet<WindowedValue<KV<K, V>>> upstream = context.getDataSetOrThrow(inputId);

  // The output id is resolved before rebalancing, matching argument evaluation order.
  String outputId = Iterables.getOnlyElement(reshuffle.getOutputsMap().values());
  DataSet<WindowedValue<KV<K, V>>> rebalanced = upstream.rebalance();
  context.addDataSet(outputId, rebalanced);
}

private <InputT> void translateExecutableStage(
String id,
RunnerApi.Pipeline pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -98,7 +99,10 @@ interface PTransformTranslator<T> {
this::translateGroupByKey);
urnToTransformTranslator.put(PTransformTranslation.IMPULSE_TRANSFORM_URN,
this::translateImpulse);
urnToTransformTranslator.put(ExecutableStage.URN, this::translateExecutableStage);
urnToTransformTranslator.put(ExecutableStage.URN,
this::translateExecutableStage);
urnToTransformTranslator.put(PTransformTranslation.RESHUFFLE_URN,
this::translateReshuffle);
}


Expand All @@ -123,6 +127,18 @@ public void urnNotFound(
id));
}

/**
 * Translates a Reshuffle transform for the streaming runner.
 *
 * <p>Looks up the transform by {@code id}, fetches the data stream registered for its
 * input, and registers the result of {@code rebalance()} on that stream under its output.
 * The transform is expected to have exactly one input and exactly one output.
 *
 * @param id identifier of the transform inside {@code pipeline}'s components
 * @param pipeline the portable pipeline proto containing the transform
 * @param context translation context holding the data streams produced so far
 */
private <K, V> void translateReshuffle(
    String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
  RunnerApi.PTransform reshuffle = pipeline.getComponents().getTransformsOrThrow(id);

  // Resolve the single input collection and the stream already registered for it.
  String inputId = Iterables.getOnlyElement(reshuffle.getInputsMap().values());
  DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getDataStreamOrThrow(inputId);

  // The output id is resolved before rebalancing, matching argument evaluation order.
  String outputId = Iterables.getOnlyElement(reshuffle.getOutputsMap().values());
  DataStream<WindowedValue<KV<K, V>>> rebalanced = inputDataStream.rebalance();
  context.addDataStream(outputId, rebalanced);
}

public <T> void translateFlatten(
String id,
RunnerApi.Pipeline pipeline,
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/runners/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
For internal use only; no backwards-compatibility guarantees.
"""

import os

from apache_beam import coders
from apache_beam import pipeline
Expand Down Expand Up @@ -106,7 +107,12 @@ def from_runner_api(proto):
return PipelineContext(proto)

def to_runner_api(self):
  """Builds a ``beam_runner_api_pb2.Components`` proto from this context.

  A hard-coded environment is registered under the id ``'python'``; its URL
  is derived from the current user name. Every component map named in
  ``self._COMPONENT_TYPES`` is then populated into the proto.

  Returns:
    The populated ``beam_runner_api_pb2.Components`` message.
  """
  # Imported locally so this method does not rely on a file-level import.
  import getpass

  # TODO: Use pipeline options docker container image flag instead
  # of hard coding this here.
  # $USER is not guaranteed to be set (e.g. minimal containers, cron jobs,
  # Windows), so read it tolerantly and fall back to getpass.getuser()
  # rather than failing with a KeyError. When $USER is set and non-empty the
  # resulting URL is the same as indexing os.environ directly.
  user = os.environ.get('USER') or getpass.getuser()
  environment_proto = beam_runner_api_pb2.Environment(
      url=user + '-docker-apache.bintray.io/beam/python:latest')
  context_proto = beam_runner_api_pb2.Components()
  context_proto.environments['python'].CopyFrom(environment_proto)
  for name in self._COMPONENT_TYPES:
    getattr(self, name).populate_map(getattr(context_proto, name))
  return context_proto
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ def to_runner_api_parameter(self, context):
common_urns.PARDO_TRANSFORM,
beam_runner_api_pb2.ParDoPayload(
do_fn=beam_runner_api_pb2.SdkFunctionSpec(
environment_id='python',
spec=beam_runner_api_pb2.FunctionSpec(
urn=python_urns.PICKLED_DOFN_INFO,
payload=picked_pardo_fn_data)),
Expand Down

0 comments on commit 0362fd1

Please sign in to comment.