`PythonObjectArray` is a subclass of `ExtensionArray`, but it is missing some of the mandatory methods described here, specifically `isna`, `take`, `copy`, and `_concat_same_type` (the last listed method, `interpolate`, does not seem to be required). Storage of Python objects in Arrow blocks was added to Ray Data in #45272 and released as part of Ray 2.33.
Custom Python objects currently cannot be wrapped or serialized: the attempt fails with `pandas.errors.AbstractMethodError: This method must be defined in the concrete class PythonObjectArray`, as shown by the contrived example below.
I have a draft PR that adds the missing methods (#48737), but it's currently failing one of the `rllib` tests (`rllib: examples`). I find that somewhat surprising, but I haven't yet looked into why that could be happening, so I'd appreciate any pointers in the meantime. Thanks!
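To make the four missing methods concrete, here is a minimal, self-contained sketch of what they could look like on an object-backed `ExtensionArray`. This is not the code from #48737 and not Ray's `PythonObjectArray`: the names `ObjectSketchDtype` and `ObjectSketchArray` are hypothetical, the storage is assumed to be a plain NumPy object array, and `None` is assumed to be the only missing-value marker.

```python
import numpy as np
from pandas.api.extensions import ExtensionArray, ExtensionDtype


class ObjectSketchDtype(ExtensionDtype):
    """Hypothetical dtype for illustration; not Ray's actual dtype."""

    name = "object_sketch"
    type = object
    na_value = None  # assumption: None marks a missing entry

    @classmethod
    def construct_array_type(cls):
        return ObjectSketchArray


class ObjectSketchArray(ExtensionArray):
    """Hypothetical array holding arbitrary Python objects."""

    def __init__(self, values):
        # Fill element by element so nested sequences are stored as
        # single objects instead of being broadcast by NumPy.
        self._values = np.empty(len(values), dtype=object)
        for i, value in enumerate(values):
            self._values[i] = value

    @classmethod
    def _from_sequence(cls, scalars, *, dtype=None, copy=False):
        return cls(list(scalars))

    @property
    def dtype(self):
        return ObjectSketchDtype()

    @property
    def nbytes(self):
        # Size of the pointer array only, not of the referenced objects.
        return self._values.nbytes

    def __len__(self):
        return len(self._values)

    def __getitem__(self, item):
        if isinstance(item, (int, np.integer)):
            return self._values[item]
        return type(self)(self._values[item])

    # The four methods named in this issue follow.

    def isna(self):
        return np.array([v is None for v in self._values], dtype=bool)

    def take(self, indices, allow_fill=False, fill_value=None):
        indices = np.asarray(indices, dtype=np.intp)
        if not allow_fill:
            # Negative indices count from the end, as in NumPy.
            return type(self)(self._values.take(indices))
        if fill_value is None:
            fill_value = self.dtype.na_value
        if (indices < -1).any():
            raise ValueError("with allow_fill=True, only -1 may be negative")
        # -1 means "missing"; out-of-bounds positions raise IndexError.
        return type(self)(
            [fill_value if i == -1 else self._values[i] for i in indices]
        )

    def copy(self):
        # Shallow copy: new container, same referenced objects.
        return type(self)(self._values.copy())

    @classmethod
    def _concat_same_type(cls, to_concat):
        return cls(np.concatenate([arr._values for arr in to_concat]))
```

With these methods defined, pandas operations that copy, reindex, or concatenate such a column no longer reach the abstract base implementations. Whether a shallow `copy` is the right semantics for Ray Data is a design choice this sketch does not settle.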
from dataclasses import dataclass, field

import pandas as pd
import ray


@dataclass
class Message:
    data: dict = field(default_factory=dict)


class Stage:
    def __init__(self, data_in, data_out):
        self.data_in = data_in
        self.data_out = data_out

    def __call__(self, input_data):
        df = pd.DataFrame(input_data[self.data_in])  # <--- point of failure when copy (one of the missing methods) gets called
        dicts = df.to_dict("records")
        data = self.run_stage(dicts)
        return pd.DataFrame(data)

    def run_stage(self, dicts):
        raise NotImplementedError


class FirstStage(Stage):
    def run_stage(self, dicts):
        for entry in dicts:
            value = entry[self.data_in].data["value"]
            entry[self.data_out] = Message(data={"value": 2 * value})
        return dicts


class SecondStage(Stage):
    def run_stage(self, dicts):
        for entry in dicts:
            value = entry[self.data_in].data["value"]
            entry[self.data_out] = Message(data={"value": 5 * value})
        return dicts


ray.init()

dataset = ray.data.from_items(
    [
        {"input_data": Message(data={"value": 8})},
        {"input_data": Message(data={"value": 10})},
    ]
)

for stage_class, data_in, data_out in zip(
    [FirstStage, SecondStage],
    ["input_data", "intermediate_data"],
    ["intermediate_data", "final_data"],
):
    map_args = {
        "fn": stage_class,
        "fn_constructor_args": (data_in, data_out),
        "concurrency": 2,
        "batch_format": "pandas",
    }
    dataset = dataset.map_batches(**map_args)

print(dataset.to_pandas()["final_data"])

ray.shutdown()
Issue Severity
High: It blocks me from completing my task.
akavalar added the bug and triage labels on Nov 14, 2024