Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
chadrik committed Sep 4, 2019
1 parent 9c4941f commit 577bbd1
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 18 deletions.
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/direct/bundle_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, pcollection, stacked=True):
self._tag = None # optional tag information for this bundle

def get_elements_iterable(self, make_copy=False):
# type: (bool) -> Iterable[WindowedValues]
"""Returns iterable elements.
Args:
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/direct/evaluation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ def get_execution_context(self, applied_ptransform):
self._transform_keyed_states[applied_ptransform])

def create_bundle(self, output_pcollection):
# type: (pvalue.PCollection) -> _Bundle
# type: (PCollection) -> _Bundle
"""Create an uncommitted bundle for the specified PCollection."""
return self._bundle_factory.create_bundle(output_pcollection)

def create_empty_committed_bundle(self, output_pcollection):
# type: (pvalue.PCollection) -> _Bundle
# type: (PCollection) -> _Bundle
"""Create empty bundle useful for triggering evaluation."""
return self._bundle_factory.create_empty_committed_bundle(
output_pcollection)
Expand Down
21 changes: 16 additions & 5 deletions sdks/python/apache_beam/runners/direct/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from apache_beam.utils import counters

if typing.TYPE_CHECKING:
from apache_beam.runners.direct.bundle_factory import _Bundle
from apache_beam.runners.direct.evaluation_context import EvaluationContext

T = TypeVar('T')
Expand Down Expand Up @@ -280,9 +281,15 @@ class TransformExecutor(_ExecutorService.CallableTask):

_MAX_RETRY_PER_BUNDLE = 4

def __init__(self, transform_evaluator_registry, evaluation_context,
input_bundle, fired_timers, applied_ptransform,
completion_callback, transform_evaluation_state):
def __init__(self,
transform_evaluator_registry,
evaluation_context,
input_bundle, # type: _Bundle
fired_timers,
applied_ptransform,
completion_callback,
transform_evaluation_state
):
self._transform_evaluator_registry = transform_evaluator_registry
self._evaluation_context = evaluation_context
self._input_bundle = input_bundle
Expand Down Expand Up @@ -464,6 +471,7 @@ def request_shutdown(self):
self.executor_service.shutdown()

def schedule_consumers(self, committed_bundle):
# type: (_Bundle) -> None
if committed_bundle.pcollection in self.value_to_consumers:
consumers = self.value_to_consumers[committed_bundle.pcollection]
for applied_ptransform in consumers:
Expand All @@ -474,8 +482,11 @@ def schedule_unprocessed_bundle(self, applied_ptransform,
unprocessed_bundle):
self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle)

def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
fired_timers, on_complete):
def schedule_consumption(self,
consumer_applied_ptransform,
committed_bundle, # type: _Bundle
fired_timers, on_complete
):
"""Schedules evaluation of the given bundle with the transform."""
assert consumer_applied_ptransform
assert committed_bundle
Expand Down
39 changes: 31 additions & 8 deletions sdks/python/apache_beam/runners/direct/watermark_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import typing
from builtins import object
from typing import Dict
from typing import Iterable
from typing import List
from typing import Set

from apache_beam import pipeline
from apache_beam import pvalue
Expand All @@ -33,6 +36,8 @@

if typing.TYPE_CHECKING:
from apache_beam.pipeline import AppliedPTransform
from apache_beam.runners.direct.bundle_factory import _Bundle
from apache_beam.utils.timestamp import Timestamp


class WatermarkManager(object):
Expand Down Expand Up @@ -66,6 +71,7 @@ def __init__(self, clock, root_transforms, value_to_consumers,
self._update_input_transform_watermarks(consumer)

def _update_input_transform_watermarks(self, applied_ptransform):
# type: (AppliedPTransform) -> None
assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
input_transform_watermarks = []
for input_pvalue in applied_ptransform.inputs:
Expand Down Expand Up @@ -99,9 +105,15 @@ def get_watermarks(self, applied_ptransform):

return self._transform_to_watermarks[applied_ptransform]

def update_watermarks(self, completed_committed_bundle, applied_ptransform,
completed_timers, outputs, unprocessed_bundles,
keyed_earliest_holds, side_inputs_container):
def update_watermarks(self,
completed_committed_bundle, # type: _Bundle
applied_ptransform, # type: AppliedPTransform
completed_timers,
outputs,
unprocessed_bundles,
keyed_earliest_holds,
side_inputs_container
):
assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
self._update_pending(
completed_committed_bundle, applied_ptransform, completed_timers,
Expand All @@ -110,9 +122,13 @@ def update_watermarks(self, completed_committed_bundle, applied_ptransform,
tw.hold(keyed_earliest_holds)
return self._refresh_watermarks(applied_ptransform, side_inputs_container)

def _update_pending(self, input_committed_bundle, applied_ptransform,
completed_timers, output_committed_bundles,
unprocessed_bundles):
def _update_pending(self,
input_committed_bundle,
applied_ptransform, # type: AppliedPTransform
completed_timers,
output_committed_bundles, # type: Iterable[_Bundle]
unprocessed_bundles # type: Iterable[_Bundle]
):
"""Updated list of pending bundles for the given AppliedPTransform."""

# Update pending elements. Filter out empty bundles. They do not impact
Expand Down Expand Up @@ -181,17 +197,19 @@ class _TransformWatermarks(object):
def __init__(self, clock, keyed_states, transform):
self._clock = clock
self._keyed_states = keyed_states
self._input_transform_watermarks = []
self._input_transform_watermarks = [] # type: List[_TransformWatermarks]
self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
self._keyed_earliest_holds = {}
self._pending = set() # Scheduled bundles targeted for this transform.
# Scheduled bundles targeted for this transform.
self._pending = set() # type: Set[_Bundle]
self._fired_timers = set()
self._lock = threading.Lock()

self._label = str(transform)

def update_input_transform_watermarks(self, input_transform_watermarks):
# type: (List[_TransformWatermarks]) -> None
"""Replace the stored input transform watermarks while holding the lock."""
with self._lock:
self._input_transform_watermarks = input_transform_watermarks

Expand All @@ -202,11 +220,13 @@ def update_timers(self, completed_timers):

@property
def input_watermark(self):
# type: () -> Timestamp
"""Return the current input watermark, read while holding the lock."""
with self._lock:
return self._input_watermark

@property
def output_watermark(self):
# type: () -> Timestamp
"""Return the current output watermark, read while holding the lock."""
with self._lock:
return self._output_watermark

Expand All @@ -219,17 +239,20 @@ def hold(self, keyed_earliest_holds):
del self._keyed_earliest_holds[key]

def add_pending(self, pending):
# type: (_Bundle) -> None
"""Add ``pending`` to the set of bundles pending for this transform."""
with self._lock:
self._pending.add(pending)

def remove_pending(self, completed):
# type: (_Bundle) -> None
"""Remove ``completed`` from the pending set if it is present."""
with self._lock:
# Ignore repeated removes. This will happen if a transform has a repeated
# input.
if completed in self._pending:
self._pending.remove(completed)

def refresh(self):
# type: () -> bool
"""Refresh the watermark for a given transform.
This method looks at the watermark coming from all input PTransforms, and
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ class Environment(object):
Provides consistency with how the other components are accessed.
"""
def __init__(self, proto):
# type: (beam_runner_api_pb2.Environment) -> None
"""Store the given Environment proto on this wrapper."""
self._proto = proto

def to_runner_api(self, context):
# type: (PipelineContext) -> Any
# type: (PipelineContext) -> beam_runner_api_pb2.Environment
return self._proto

@staticmethod
def from_runner_api(proto, context):
# type: (beam_runner_api_pb2.Environment, PipelineContext) -> Environment
"""Wrap ``proto`` in a new Environment; ``context`` is accepted but unused."""
return Environment(proto)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from apache_beam.runners.worker import worker_pool_main

if typing.TYPE_CHECKING:
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline

__all__ = ['PortableRunner']
Expand Down Expand Up @@ -110,7 +111,7 @@ def default_docker_image():

@staticmethod
def _create_environment(options):
# type: (...) -> beam_runner_api_pb2.Environment
# type: (PipelineOptions) -> beam_runner_api_pb2.Environment
portable_options = options.view_as(PortableOptions)
environment_urn = common_urns.environments.DOCKER.urn
if portable_options.environment_type == 'DOCKER':
Expand Down Expand Up @@ -178,7 +179,7 @@ def create_job_service(self, options):
return server.start()

def run_pipeline(self, pipeline, options):
# type: (Pipeline, Any) -> PipelineResult
# type: (Pipeline, PipelineOptions) -> PipelineResult
portable_options = options.view_as(PortableOptions)

# TODO: https://issues.apache.org/jira/browse/BEAM-5525
Expand Down

0 comments on commit 577bbd1

Please sign in to comment.