
[Bug]: In Beam Dataframe api, the fillna operation doesn't work when applied on individual columns #31855

Closed
1 of 16 tasks
tvalentyn opened this issue Jul 11, 2024 · 6 comments · Fixed by #32594

Comments

@tvalentyn
Contributor

What happened?

I was experimenting with the Beam DataFrame API through Beam Notebooks + the Interactive runner and wasn't able to use fillna on individual columns. Here is a repro on a dataframe with two columns:

%%writefile numbers.csv
col1,col2
1,1
NaN,1
-1,1

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
pipeline = beam.Pipeline(InteractiveRunner())

beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
beam_df['col1'] = beam_df['col1'].fillna(0)
ib.collect(beam_df)

This fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.

A rewritten version:

c1 = beam_df['col1']
c1 = c1.fillna(0)
ib.collect(c1)  

also fails.

The snippets pass without issues on Pandas or Dask.
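
For example, a minimal pandas-only equivalent (a sketch, assuming the numbers.csv written above) runs without error:

import pandas as pd

# Same data and column-level fillna as in the Beam repro above, but in plain pandas.
df = pd.read_csv('numbers.csv')
df['col1'] = df['col1'].fillna(0)
print(df)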

Full Stacktrace

ValueError Traceback (most recent call last)
Cell In[62], line 16
14 beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
15 beam_df['col1'] = beam_df['col1'].fillna(0)
---> 16 ib.collect(beam_df) # fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.
18 c1 = beam_df['col1']
19 c1 = c1.fillna(0)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:277, in progress_indicated.<locals>.run_within_progress_indicator(*args, **kwargs)
274 @functools.wraps(func)
275 def run_within_progress_indicator(*args, **kwargs):
276 with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
--> 277 return func(*args, **kwargs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py:906, in collect(pcoll, n, duration, include_window_info)
902 # Remember the element type so we can make an informed decision on how to
903 # collect the result in elements_to_df.
904 if isinstance(pcoll, DeferredBase):
905 # Get the proxy so we can get the output shape of the DataFrame.
--> 906 pcoll, element_type = deferred_df_to_pcollection(pcoll)
907 watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
908 else:

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:313, in deferred_df_to_pcollection(df)
310 cache.replace_with_cached(df._expr)
312 proxy = df._expr.proxy()
--> 313 return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/convert.py:261, in to_pcollection(label, always_return_tuple, yield_elements, include_indexes, pipeline, *dataframes)
257 new_dataframes = [
258 df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
259 ]
260 if len(new_dataframes):
--> 261 new_results = {p: extract_input(p)
262 for p in placeholders
263 } | label >> transforms._DataframeExpressionsTransform({
264 ix: df._expr
265 for (ix, df) in enumerate(new_dataframes)
266 }) # type: Dict[Any, pvalue.PCollection]
268 TO_PCOLLECTION_CACHE.update(
269 {new_dataframes[ix]._expr._id: pc
270 for ix, pc in new_results.items()})
272 raw_results = {
273 ix: TO_PCOLLECTION_CACHE[df._expr._id]
274 for ix,
275 df in enumerate(dataframes)
276 }

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:1110, in _NamedPTransform.__ror__(self, pvalueish, _unused)
1109 def __ror__(self, pvalueish, _unused=None):
-> 1110 return self.transform.__ror__(pvalueish, self.label)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:623, in PTransform.__ror__(self, left, label)
621 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
622 self.pipeline = p
--> 623 result = p.apply(self, pvalueish, label)
624 if deferred:
625 return result

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:679, in Pipeline.apply(self, transform, pvalueish, label)
677 old_label, transform.label = transform.label, label
678 try:
--> 679 return self.apply(transform, pvalueish)
680 finally:
681 transform.label = old_label

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:732, in Pipeline.apply(self, transform, pvalueish, label)
729 if type_options.pipeline_type_check:
730 transform.type_check_inputs(pvalueish)
--> 732 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
734 if type_options is not None and type_options.pipeline_type_check:
735 transform.type_check_outputs(pvalueish_result)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_runner.py:131, in InteractiveRunner.apply(self, transform, pvalueish, options)
129 def apply(self, transform, pvalueish, options):
130 # TODO(qinyeli, BEAM-646): Remove runner interception of apply.
--> 131 return self._underlying_runner.apply(transform, pvalueish, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:203, in PipelineRunner.apply(self, transform, input, options)
197 def apply(self,
198 transform, # type: PTransform
199 input, # type: Optional[pvalue.PValue]
200 options # type: PipelineOptions
201 ):
202 # TODO(robertwb): Remove indirection once internal references are fixed.
--> 203 return self.apply_PTransform(transform, input, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:207, in PipelineRunner.apply_PTransform(self, transform, input, options)
205 def apply_PTransform(self, transform, input, options):
206 # TODO(robertwb): Remove indirection once internal references are fixed.
--> 207 return transform.expand(input)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:151, in _DataframeExpressionsTransform.expand(self, inputs)
150 def expand(self, inputs):
--> 151 return self._apply_deferred_ops(inputs, self._outputs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in _DataframeExpressionsTransform._apply_deferred_ops(self, inputs, outputs)
467 return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in <dictcomp>(.0)
467 return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
560 key = args, tuple(sorted(kwargs.items()))
561 if key not in cache:
--> 562 cache[key] = f(*args, **kwargs)
563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
430 return inputs[expr]
431 else:
--> 432 return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
560 key = args, tuple(sorted(kwargs.items()))
561 if key not in cache:
--> 562 cache[key] = f(*args, **kwargs)
563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
422 @_memoize
423 def stage_to_result(stage):
--> 424 return {expr._id: expr_to_pcoll(expr)
425 for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in <dictcomp>(.0)
422 @_memoize
423 def stage_to_result(stage):
--> 424 return {expr._id: expr_to_pcoll(expr)
425 for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
560 key = args, tuple(sorted(kwargs.items()))
561 if key not in cache:
--> 562 cache[key] = f(*args, **kwargs)
563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
430 return inputs[expr]
431 else:
--> 432 return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
560 key = args, tuple(sorted(kwargs.items()))
561 if key not in cache:
--> 562 cache[key] = f(*args, **kwargs)
563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
422 @_memoize
423 def stage_to_result(stage):
--> 424 return {expr._id: expr_to_pcoll(expr)
425 for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:602, in PTransform.__ror__(self, left, label)
600 p = self.pipeline
601 else:
--> 602 raise ValueError(
603 '"%s" requires a pipeline to be specified '
604 'as there are no deferred inputs.' % self.label)
605 else:
606 p = self.pipeline or pipelines[0]

ValueError: "[ConstantExpression[constant_int_140226803777984]]:140226803785616" requires a pipeline to be specified as there are no deferred inputs.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@tvalentyn
Contributor Author

beam_df['col1'] = beam_df['col1'].abs() works as expected.
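
Side by side, based on the snippets in this thread (the explanation in the comments is an assumption, not confirmed here):

# Both assignments target the same deferred DataFrame; only the applied operation differs.
beam_df['col1'] = beam_df['col1'].abs()      # works: the expression tree has only deferred inputs
beam_df['col1'] = beam_df['col1'].fillna(0)  # fails: the constant 0 appears to become a
                                             # ConstantExpression with no deferred inputs,
                                             # which triggers the ValueError above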

@DKER2
Contributor

DKER2 commented Sep 26, 2024

.take-issue

@oyugicollins

oyugicollins commented Nov 5, 2024

Hello, was this bug actually fixed and the fix merged? I am facing the same issue with the latest Apache Beam. Please advise on what I need to do to pick up the changes.

Thanks

@liferoad
Collaborator

liferoad commented Nov 5, 2024

The newly added test test_dataframe_column_fillna_constant_as_value seems to be working. Can you paste your code to reproduce the issue?
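
For anyone wanting to check their own installation, here is a minimal, runner-agnostic sketch (not the test mentioned above) that exercises the same column-level fillna without the interactive runner; on an SDK that still has the bug it should raise the same ValueError, and with the fix it should print the filled rows:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as p:
    # Schema-aware PCollection mirroring the numbers.csv data from the original repro.
    rows = p | beam.Create([
        beam.Row(col1=1.0, col2=1),
        beam.Row(col1=float('nan'), col2=1),
        beam.Row(col1=-1.0, col2=1),
    ])
    df = to_dataframe(rows)
    df['col1'] = df['col1'].fillna(0)   # the operation reported in this issue
    to_pcollection(df) | beam.Map(print)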

@oyugicollins

Below is a section of my code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_json_file .json"

class BmsSchema(typing.NamedTuple):
    ident: str
    ain_2: typing.Optional[float]
    can_data_frame_1: typing.Optional[str]

beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)

class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        all_columns = ['ident', 'ain_2', 'can_data_frame_1']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)

        yield {
            'ident': main_dict["ident"],
            'ain_2': main_dict["ain_2"],
            'can_data_frame_1': main_dict["can_data_frame_1"]
        }

def run():
    options = PipelineOptions(
        project='dwingestion',
        runner='DataflowRunner',
        streaming=True,
        save_main_session=True
    )

    options.view_as(StandardOptions).streaming = True

    input_subscription = 'projects........../flespi_data_streaming'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"},
            {"name": "ain_2", "type": "FLOAT", "mode": "NULLABLE"},
            {"name": "voltage_mV", "type": "INTEGER", "mode": "NULLABLE"}
        ]
    }

    with beam.Pipeline(options=options) as p:
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attach Schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )

        # Convert the messages to a DataFrame within the pipeline context
        df = to_dataframe(messages)
        df['voltage_mV'] = (df['can_data_frame_1'].str[0:4].fillna(0).astype(str).apply(int, base=16)) * 10  # this is where the error comes from
        df.drop(columns=['can_data_frame_1'], inplace=True)

        # Convert the DataFrame back to a PCollection and map it to dictionaries
        transformed_pcol = (
            to_pcollection(df)
            | 'Convert to Dict' >> beam.Map(lambda row: {
                'ident': row.ident,
                'ain_2': row.ain_2,
                'voltage_mV': row.voltage_mV
            })
        )

        transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(........
           ...........
        )

if __name__ == '__main__':
    run()

Full error trace log
Traceback (most recent call last):
File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 86, in
run()
File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 69, in run
to_pcollection(df)
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\convert.py", line 255, in to_pcollection
new_results: Dict[Any, pvalue.PCollection] = {
^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 1110, in ror
return self.transform.ror(pvalueish, self.label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 623, in ror
result = p.apply(self, pvalueish, label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 686, in apply
return self.apply(transform, pvalueish)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 748, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 151, in expand
return self._apply_deferred_ops(inputs, self._outputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 471, in _apply_deferred_ops
return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
cache[key] = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
return stage_to_result(expr_to_stage(expr))[expr._id]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
cache[key] = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
return {expr._id: expr_to_pcoll(expr)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
cache[key] = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
return stage_to_result(expr_to_stage(expr))[expr._id]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
cache[key] = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
return {expr._id: expr_to_pcoll(expr)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 602, in ror
raise ValueError(
ValueError: "[ConstantExpression[constant_int_2412180160336]]:2412182918816" requires a pipeline to be specified as there are no deferred inputs.

@liferoad
Collaborator

liferoad commented Nov 7, 2024

Just checked. This PR is not part of Beam 2.60.0, which was cut on Oct 2.
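
Until a release containing the fix is available, one possible interim workaround (a sketch under assumptions, not something verified in this thread) is to fill the missing value before entering the DataFrame API, for example in the parsing DoFn, so no column-level fillna is needed on the deferred DataFrame:

import json
import typing

import apache_beam as beam

class ParsePubSubMessageWithDefault(beam.DoFn):
    # Hypothetical variant of ParsePubSubMessage from the snippet above: substitutes a zero
    # hex string when can_data_frame_1 is missing, so df['can_data_frame_1'].str[0:4] never
    # yields NaN and the column-level fillna can be dropped downstream.
    def process(self, message: bytes) -> typing.Iterable[dict]:
        record = json.loads(message.decode('utf-8'))
        yield {
            'ident': record.get('ident'),
            'ain_2': record.get('ain_2'),
            # "0000" parses to 0 via int(..., base=16), matching the fillna(0) behavior.
            'can_data_frame_1': record.get('can_data_frame_1') or '0000',
        }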
