diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py index 5a10bcfd7031..6c4e63ddf13a 100644 --- a/sdks/python/apache_beam/runners/direct/bundle_factory.py +++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py @@ -121,6 +121,7 @@ def __init__(self, pcollection, stacked=True): self._tag = None # optional tag information for this bundle def get_elements_iterable(self, make_copy=False): + # type: (bool) -> Iterable[WindowedValue] """Returns iterable elements. Args: diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 2b22c086099b..43e358eb4918 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -376,12 +376,12 @@ def get_execution_context(self, applied_ptransform): self._transform_keyed_states[applied_ptransform]) def create_bundle(self, output_pcollection): - # type: (pvalue.PCollection) -> _Bundle + # type: (PCollection) -> _Bundle """Create an uncommitted bundle for the specified PCollection.""" return self._bundle_factory.create_bundle(output_pcollection) def create_empty_committed_bundle(self, output_pcollection): - # type: (pvalue.PCollection) -> _Bundle + # type: (PCollection) -> _Bundle """Create empty bundle useful for triggering evaluation.""" return self._bundle_factory.create_empty_committed_bundle( output_pcollection) diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index a49e2e992c4a..bad19e3026d6 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -42,6 +42,7 @@ from apache_beam.utils import counters if typing.TYPE_CHECKING: + from apache_beam.runners.direct.bundle_factory import _Bundle from apache_beam.runners.direct.evaluation_context import 
EvaluationContext T = TypeVar('T') @@ -280,9 +281,15 @@ class TransformExecutor(_ExecutorService.CallableTask): _MAX_RETRY_PER_BUNDLE = 4 - def __init__(self, transform_evaluator_registry, evaluation_context, - input_bundle, fired_timers, applied_ptransform, - completion_callback, transform_evaluation_state): + def __init__(self, + transform_evaluator_registry, + evaluation_context, + input_bundle, # type: _Bundle + fired_timers, + applied_ptransform, + completion_callback, + transform_evaluation_state + ): self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle @@ -464,6 +471,7 @@ def request_shutdown(self): self.executor_service.shutdown() def schedule_consumers(self, committed_bundle): + # type: (_Bundle) -> None if committed_bundle.pcollection in self.value_to_consumers: consumers = self.value_to_consumers[committed_bundle.pcollection] for applied_ptransform in consumers: @@ -474,8 +482,11 @@ def schedule_unprocessed_bundle(self, applied_ptransform, unprocessed_bundle): self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle) - def schedule_consumption(self, consumer_applied_ptransform, committed_bundle, - fired_timers, on_complete): + def schedule_consumption(self, + consumer_applied_ptransform, + committed_bundle, # type: _Bundle + fired_timers, on_complete + ): """Schedules evaluation of the given bundle with the transform.""" assert consumer_applied_ptransform assert committed_bundle diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 5a0c4179542a..a02fd1faa55e 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -23,6 +23,9 @@ import typing from builtins import object from typing import Dict +from typing import Iterable +from typing import List +from typing import Set from 
apache_beam import pipeline from apache_beam import pvalue @@ -33,6 +36,8 @@ if typing.TYPE_CHECKING: from apache_beam.pipeline import AppliedPTransform + from apache_beam.runners.direct.bundle_factory import _Bundle + from apache_beam.utils.timestamp import Timestamp class WatermarkManager(object): @@ -66,6 +71,7 @@ def __init__(self, clock, root_transforms, value_to_consumers, self._update_input_transform_watermarks(consumer) def _update_input_transform_watermarks(self, applied_ptransform): + # type: (AppliedPTransform) -> None assert isinstance(applied_ptransform, pipeline.AppliedPTransform) input_transform_watermarks = [] for input_pvalue in applied_ptransform.inputs: @@ -99,9 +105,15 @@ def get_watermarks(self, applied_ptransform): return self._transform_to_watermarks[applied_ptransform] - def update_watermarks(self, completed_committed_bundle, applied_ptransform, - completed_timers, outputs, unprocessed_bundles, - keyed_earliest_holds, side_inputs_container): + def update_watermarks(self, + completed_committed_bundle, # type: _Bundle + applied_ptransform, # type: AppliedPTransform + completed_timers, + outputs, + unprocessed_bundles, + keyed_earliest_holds, + side_inputs_container + ): assert isinstance(applied_ptransform, pipeline.AppliedPTransform) self._update_pending( completed_committed_bundle, applied_ptransform, completed_timers, @@ -110,9 +122,13 @@ def update_watermarks(self, completed_committed_bundle, applied_ptransform, tw.hold(keyed_earliest_holds) return self._refresh_watermarks(applied_ptransform, side_inputs_container) - def _update_pending(self, input_committed_bundle, applied_ptransform, - completed_timers, output_committed_bundles, - unprocessed_bundles): + def _update_pending(self, + input_committed_bundle, + applied_ptransform, # type: AppliedPTransform + completed_timers, + output_committed_bundles, # type: Iterable[_Bundle] + unprocessed_bundles # type: Iterable[_Bundle] + ): """Updated list of pending bundles for the given 
AppliedPTransform.""" # Update pending elements. Filter out empty bundles. They do not impact @@ -181,17 +197,19 @@ class _TransformWatermarks(object): def __init__(self, clock, keyed_states, transform): self._clock = clock self._keyed_states = keyed_states - self._input_transform_watermarks = [] + self._input_transform_watermarks = [] # type: List[_TransformWatermarks] self._input_watermark = WatermarkManager.WATERMARK_NEG_INF self._output_watermark = WatermarkManager.WATERMARK_NEG_INF self._keyed_earliest_holds = {} - self._pending = set() # Scheduled bundles targeted for this transform. + # Scheduled bundles targeted for this transform. + self._pending = set() # type: Set[_Bundle] self._fired_timers = set() self._lock = threading.Lock() self._label = str(transform) def update_input_transform_watermarks(self, input_transform_watermarks): + # type: (List[_TransformWatermarks]) -> None with self._lock: self._input_transform_watermarks = input_transform_watermarks @@ -202,11 +220,13 @@ def update_timers(self, completed_timers): @property def input_watermark(self): + # type: () -> Timestamp with self._lock: return self._input_watermark @property def output_watermark(self): + # type: () -> Timestamp with self._lock: return self._output_watermark @@ -219,10 +239,12 @@ def hold(self, keyed_earliest_holds): del self._keyed_earliest_holds[key] def add_pending(self, pending): + # type: (_Bundle) -> None with self._lock: self._pending.add(pending) def remove_pending(self, completed): + # type: (_Bundle) -> None with self._lock: # Ignore repeated removes. This will happen if a transform has a repeated # input. @@ -230,6 +252,7 @@ def remove_pending(self, completed): self._pending.remove(completed) def refresh(self): + # type: () -> bool """Refresh the watermark for a given transform. 
This method looks at the watermark coming from all input PTransforms, and diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 727c2d59a96d..3285c97a7f24 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -66,14 +66,16 @@ class Environment(object): Provides consistency with how the other componentes are accessed. """ def __init__(self, proto): + # type: (beam_runner_api_pb2.Environment) -> None self._proto = proto def to_runner_api(self, context): - # type: (PipelineContext) -> Any + # type: (PipelineContext) -> beam_runner_api_pb2.Environment return self._proto @staticmethod def from_runner_api(proto, context): + # type: (beam_runner_api_pb2.Environment, PipelineContext) -> Environment return Environment(proto) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index f086108c9982..2a168f51328e 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -48,6 +48,7 @@ from apache_beam.runners.worker import worker_pool_main if typing.TYPE_CHECKING: + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline __all__ = ['PortableRunner'] @@ -110,7 +111,7 @@ def default_docker_image(): @staticmethod def _create_environment(options): - # type: (...) 
-> beam_runner_api_pb2.Environment + # type: (PipelineOptions) -> beam_runner_api_pb2.Environment portable_options = options.view_as(PortableOptions) environment_urn = common_urns.environments.DOCKER.urn if portable_options.environment_type == 'DOCKER': @@ -178,7 +179,7 @@ def create_job_service(self, options): return server.start() def run_pipeline(self, pipeline, options): - # type: (Pipeline, Any) -> PipelineResult + # type: (Pipeline, PipelineOptions) -> PipelineResult portable_options = options.view_as(PortableOptions) # TODO: https://issues.apache.org/jira/browse/BEAM-5525