
[Bug]: Error when ParDo returns None for all elements in DirectRunner using more than 1 worker #23228

Closed
Wal8800 opened this issue Sep 14, 2022 · 9 comments · Fixed by #27373 or #27676

Comments


Wal8800 commented Sep 14, 2022

What happened?

When a custom DoFn returns None for some elements and the pipeline runs on the DirectRunner with more than one worker, the pipeline fails with the following error if the DoFn returns None for every element in the input.

Error:

  File "/home/wal8800/workspace/test_project/venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
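The traceback suggests that the bundle's input buffer ends up as None when no element produced any output, and `fn_runner.py` then calls `.partition()` on it unconditionally. A minimal pure-Python sketch of that failure mode (this is not Beam's actual code; `partition_bundle` is a hypothetical stand-in for the `fn_runner` logic):

```python
# Hypothetical sketch of the failure mode seen in fn_runner.py: the runner
# calls input.partition(num_workers), but when every element in a bundle
# produced no output, the input buffer can end up as None.
def partition_bundle(bundle_input, num_workers):
    # Mirrors: for ix, part in enumerate(input.partition(self._num_workers))
    return list(enumerate(bundle_input.partition(num_workers)))

try:
    partition_bundle(None, 2)
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'partition'
```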

Example pipeline script:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        text_line = element.strip()
        if not text_line:
            return

        return re.findall(r'[\w\']+', text_line, re.UNICODE)


def main(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)

    lines = p | 'read' >> ReadFromText(known_args.input)

    def count_ones(word_ones):
        (word, ones) = word_ones
        return (word, sum(ones))

    counts = (
            lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
        (word, count) = word_count
        return '%s: %d' % (word, count)

    output = counts | 'format' >> beam.Map(format_result)
    output | 'write' >> WriteToText(known_args.output)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

Example input.txt (contains only empty lines):




Example command:

python bug_example.py --runner direct --direct_running_mode in_memory --direct_num_workers 2 --input input.txt --output output.txt

Ran the example with Apache Beam 2.41.0.

Issue Priority

Priority: 2

Issue Component

Component: runner-direct

@hanneshapke

@Wal8800 What was the root cause for the issue? Getting the same error now under 2.41.0 and 2.42.0.

@kdcyberdude

Is there any temporary workaround until 2.44.0 gets released?

@hanneshapke

@kdcyberdude Not a great solution, but I set the num_workers explicitly to 1. Not great for big workloads. Looking forward to the 2.44.0 release.
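For anyone needing the exact flag: the workaround above corresponds to pinning `--direct_num_workers` to 1 in the reproduction command (file names as in the example earlier in this thread):

```shell
# Workaround: run the DirectRunner with a single worker so the
# bundle-partitioning code path that crashes is never exercised.
python bug_example.py \
  --runner direct \
  --direct_running_mode in_memory \
  --direct_num_workers 1 \
  --input input.txt \
  --output output.txt
```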

@Asphalt2017

Asphalt2017 commented Dec 7, 2022

Not solved. Even with num_workers fixed, the problem still exists.

@OhadRubin

This seems like a pretty big bug; it forces me to use something far more complex like Flink or Dataflow instead of the DirectRunner. How has this not been fixed so far?

@nonlinearmiles

Was there a resolution to this bug? I am having the same issue in 2.46

@sid-kap

sid-kap commented Apr 6, 2023

Switching to single-threaded/single-worker mode fixed it for me.

@nonlinearmiles

nonlinearmiles commented Apr 6, 2023

Thanks! Single-threaded/single-worker mode works for me as well. However, I would like to run my Beam pipeline on multiple cores using the DirectRunner.

@AnandInguva
Contributor

This should be solved in version 2.50.0. Thanks.
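Based on the comment above, upgrading past the fix should resolve it; assuming a pip-managed environment:

```shell
# Upgrade to a release that contains the fix (2.50.0 or later).
pip install --upgrade "apache-beam>=2.50.0"
```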
