[Bug]: DirectRunner fails after grouping into batches, and applying flat map with assertion error #28716

Open
1 of 16 tasks
damianr13 opened this issue Sep 28, 2023 · 7 comments

Comments

@damianr13

What happened?

I am running the following code:

import argparse

import apache_beam as beam
import structlog
from apache_beam.options.pipeline_options import PipelineOptions

logger = structlog.getLogger()


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with (beam.Pipeline(options=pipeline_options) as p):
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: (None, x))
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(30)
            | "Flat map" >> beam.FlatMap(lambda x: x)
            | "Write to GCS"
            >> beam.io.WriteToText(
                "gs://my_bucket/apache_beam_test/output",
                file_name_suffix=".jsonl",
            )
        )


if __name__ == "__main__":
    run()

As you can see, this code generates three dummy values. Suppose I want to process my elements in batches of up to 30, so I assign the same dummy key to all of them. After working with the batches, I want to go back to single elements and then write the results to Google Cloud Storage.

Running the code above fails with the following error:

ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: 
Traceback (most recent call last):
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 34, in <module>
    run()
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 18, in run
    with (beam.Pipeline(options=pipeline_options) as p):
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
    self.result.wait_until_finish()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/direct_runner.py", line 585, in wait_until_finish
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 432, in await_completion
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 480, in await_completion
    raise update.exception
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
    self.attempt_call(
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 413, in attempt_call
    evaluator.process_element(value)
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 939, in process_element
    assert not self.global_state.get_state(
AssertionError

I am running the code in a pipenv environment with Python 3.11. I have attached the requirements.
requirements.txt

Here is my Pipfile:

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
google-cloud-bigquery-storage = "*"
google-cloud-bigquery = {extras = ["bqstorage", "pandas"], version = "*"}
google-cloud-tasks = "*"
firebase-admin = "*"
Pillow = "*"
imagehash = "*"
structlog = "*"
functions-framework = "*"
opencv-python = "*"
numpy = "*"
asyncio = "*"
pydantic = {extras = ["dotenv"], version="~=1.10"}
psycopg2-binary = "*"
google-cloud-aiplatform = "*"
gcloud-aio-storage = "*"
gcloud-rest-storage = "*"
validators = "*"
avif = {extras = ["pillow"], version="*"}
pillow-avif-plugin = "*"
wheel = "*"
apache-beam = {extras = ["gcp"], version = "*"}
cloud-sql-python-connector = {extras = ["pg8000"], version = "*"}

[dev-packages]
flask = {extras = ["async"], version="*"}

[requires]
python_version = "3.11"

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@jrmccluskey
Contributor

Okay, so I can repro the issue, specifically when the batch size specified is greater than the number of elements in the input PCollection and another GBK happens in the pipeline after the GroupIntoBatches call. (GBK is marked as complete here and then re-invoked here.)

This would happen if _is_final_bundle() returns true, AKA the watermark was advanced to infinity prematurely for the next invocation of a GBK. This seems to be the case if batch size > num elements, with the watermark still being at positive infinity on the next invocation instead of being reset to negative infinity for the next DoFn.
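
To make the failure mode concrete, here is a minimal pure-Python sketch of the invariant being violated. It is a toy model, not Beam's code: the class `ToyGroupByKeyEvaluator`, its `completed` flag (standing in for the completion tag), and the helper `run_bundles` are all hypothetical names chosen for illustration. The only behavior it assumes is what is described above: the final bundle is detected by the watermark reaching positive infinity, and no element may arrive after that.

```python
class ToyGroupByKeyEvaluator:
    """Toy stand-in for a per-step GBK evaluator; NOT Beam's actual class."""

    def __init__(self):
        self.completed = False  # plays the role of the completion tag
        self.buffer = {}

    def process_element(self, key, value):
        # Mirrors the failing check: no element may arrive after completion.
        assert not self.completed, "element arrived after GBK was marked complete"
        self.buffer.setdefault(key, []).append(value)

    def finish_bundle(self, input_watermark):
        # The final bundle is recognized by the watermark being at +infinity.
        if input_watermark == float("inf") and not self.completed:
            self.completed = True
            return sorted(self.buffer.items())
        return []


def run_bundles(bundles):
    """Feed (elements, watermark_after_bundle) pairs through one evaluator."""
    evaluator = ToyGroupByKeyEvaluator()
    outputs = []
    for elements, watermark in bundles:
        for key, value in elements:
            evaluator.process_element(key, value)
        outputs.extend(evaluator.finish_bundle(watermark))
    return outputs


NEG_INF, POS_INF = float("-inf"), float("inf")

# Healthy ordering: the watermark reaches +infinity only with the last bundle.
healthy = run_bundles([
    ([("key2", "a"), ("key2", "b")], NEG_INF),
    ([("key2", "c")], POS_INF),
])

# Premature ordering: an empty bundle already sees the watermark at +infinity,
# so the evaluator is marked complete before any element shows up.
try:
    run_bundles([([], POS_INF), ([("key2", "a")], POS_INF)])
    premature_failed = False
except AssertionError:
    premature_failed = True
```

In the second call the assertion fires on the first element, which matches the shape of the reported traceback (an `AssertionError` raised from `process_element`).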

I can demonstrate this with this pipeline + some extra logging within the GBK transform evaluator:

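import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None, save_main_session=True):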
    parser = argparse.ArgumentParser()

    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with (beam.Pipeline(options=pipeline_options) as p):
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: ("key", x))
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(3)
            | "Flat map" >> beam.FlatMap(lambda x: x[1])
            | "re-key" >> beam.Map(lambda x: ("key2", x))
            | "gbk" >> beam.GroupByKey()
        )


if __name__ == "__main__":
    run()

With batch size = 3 we get each element through both GBKs, but with batch size > 3 the second GBK invocation fails on the first element because the Completion Tag and watermark are not reset. I haven't quite found where the reset between DoFn invocations happens yet, but this is where the problem is.
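
A side note on the `FlatMap` step: the original report uses `lambda x: x`, while the repro above uses `lambda x: x[1]`. The sketch below is a pure-Python stand-in (the helpers `group_into_batches` and `flat_map` are hypothetical and only imitate the output shape, not Beam's execution) showing why that matters. It assumes only that GroupIntoBatches emits `(key, batch)` pairs. It does not explain the crash; it only shows what each lambda would emit once the pipeline runs.

```python
from itertools import islice


def group_into_batches(keyed_elements, batch_size):
    """Toy stand-in: emit (key, batch) pairs with at most batch_size values."""
    by_key = {}
    for key, value in keyed_elements:
        by_key.setdefault(key, []).append(value)
    for key, values in by_key.items():
        iterator = iter(values)
        while batch := list(islice(iterator, batch_size)):
            yield key, batch


def flat_map(fn, elements):
    """Toy stand-in for FlatMap: flatten whatever iterable fn returns."""
    for element in elements:
        yield from fn(element)


keyed = [(None, x) for x in ["a", "b", "c"]]
batches = list(group_into_batches(keyed, 30))

# Iterating the (key, batch) tuple itself yields the key and the whole batch.
whole_pair = list(flat_map(lambda x: x, batches))

# Indexing into the pair first yields the individual values.
values_only = list(flat_map(lambda x: x[1], batches))
```

With the three-element input, `whole_pair` contains the key and the whole batch as two elements, while `values_only` contains the three original strings.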

@tvalentyn
Contributor

tvalentyn commented Oct 12, 2023

Somehow this pipeline is using the wrong direct runner (apache_beam/runners/direct/executor.py).

The old BundleBased direct runner is no longer maintained, and this batch pipeline is not expected to still be using it.

The relevant code is in

if _FnApiRunnerSupportVisitor().accept(pipeline):
    from apache_beam.portability.api import beam_provision_api_pb2
    from apache_beam.runners.portability.fn_api_runner import fn_runner
    from apache_beam.runners.portability.portable_runner import JobServiceHandle
    all_options = options.get_all_options()
    encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
    provision_info = fn_runner.ExtendedProvisionInfo(
        beam_provision_api_pb2.ProvisionInfo(
            pipeline_options=encoded_options))
    runner = fn_runner.FnApiRunner(provision_info=provision_info)
else:
    runner = BundleBasedDirectRunner()

We should have used the FnApi runner.

@damianr13
Author

damianr13 commented Oct 13, 2023

@tvalentyn

Thanks for your reply. Based on what you said I added some breakpoints and I discovered that this is where the decision to not use FnApi runner is taken (line 111):

if userstate.is_stateful_dofn(dofn):
    _, timer_specs = userstate.get_dofn_specs(dofn)
    for timer in timer_specs:
        if timer.time_domain == TimeDomain.REAL_TIME:
            self.supported_by_fnapi_runner = False

This happens while it checks _GroupIntoBatchesDoFn.
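
The effect of that check can be sketched without Beam installed. Everything below is a hypothetical stand-in (`ToyTimeDomain`, `ToyTimerSpec`, `supported_by_fnapi_runner`, and the step names are illustrative, not Beam's classes); it only mirrors the rule quoted above: a single timer in the REAL_TIME domain anywhere in the pipeline is enough to reject the FnApi runner.

```python
from dataclasses import dataclass
from enum import Enum


class ToyTimeDomain(Enum):
    """Hypothetical stand-in for the two time domains discussed here."""
    WATERMARK = "event time"
    REAL_TIME = "processing time"


@dataclass(frozen=True)
class ToyTimerSpec:
    """Hypothetical stand-in for a timer declaration on a stateful DoFn."""
    name: str
    time_domain: ToyTimeDomain


def supported_by_fnapi_runner(timer_specs_per_step):
    """Return False as soon as any step declares a REAL_TIME timer."""
    for timer_specs in timer_specs_per_step.values():
        for timer in timer_specs:
            if timer.time_domain is ToyTimeDomain.REAL_TIME:
                return False
    return True


# A pipeline whose batching step carries a processing-time timer.
with_batching = {
    "Assign dummy keys": [],
    "Batch up to 30 elements": [
        ToyTimerSpec("buffering_end", ToyTimeDomain.REAL_TIME),
    ],
}

# The same pipeline with only an event-time timer (illustrative name).
event_time_only = {
    "Assign dummy keys": [],
    "Some stateful step": [
        ToyTimerSpec("event_timer", ToyTimeDomain.WATERMARK),
    ],
}

uses_fnapi_with_batching = supported_by_fnapi_runner(with_batching)
uses_fnapi_event_time_only = supported_by_fnapi_runner(event_time_only)
```

Under this rule the first pipeline falls through to the other branch of the runner selection, which is the old bundle-based runner where the assertion is raised.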

Here is a screenshot of the timer data I took in my IDE:
[Screenshot 2023-10-13 at 11 55 01: image not included]

I could investigate further and come up with a PR maybe, but I will need a few pointers:

  • Why is the check for "REAL_TIME" here in the first place?
  • What does it mean for a ParDo to be "REAL_TIME"?

@tvalentyn
Contributor

Thanks!

The implementation of GroupIntoBatches includes a processing-time timer:

BUFFERING_TIMER = TimerSpec('buffering_end', TimeDomain.REAL_TIME)

You can find more information about timers at https://beam.apache.org/documentation/programming-guide/#processing-time-timers.

This logic was added in #13144

@robertwb do you know if this restriction still applies?

if timer.time_domain == TimeDomain.REAL_TIME:
    self.supported_by_fnapi_runner = False

We could also try to lift it, run the tests and see what fails.

@damianr13
Author

I was trying to set up my environment using the following guide: https://cwiki.apache.org/confluence/display/BEAM/Python+Tips

I noticed that the build-requirements.txt file is missing. I am guessing that one can get the dependencies from the Gradle build file instead.

Should I open a separate issue about updating the documentation?

@AnandInguva
Contributor

@damianr13 I am working on updating the wiki page. Thanks.

You don't need to use build-requirements.txt anymore if you rebase/merge onto the master branch.

@keunhong

GroupIntoBatches still seems to cause this issue with Beam 2.58.0 when using DirectRunner, and even with TestPipeline.
