Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ib.collect(...) to take multiple PCollections. #32392

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 58 additions & 38 deletions sdks/python/apache_beam/runners/interactive/interactive_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,11 +876,12 @@ def show(

@progress_indicated
def collect(
pcoll,
*pcolls,
n='inf',
duration='inf',
include_window_info=False,
force_compute=False):
force_compute=False,
force_tuple=False):
"""Materializes the elements from one or more PCollections into Dataframes.

This reads each element from file and reads only the amount that it needs
Expand All @@ -889,13 +890,16 @@ def collect(
it is assumed to be infinite.

Args:
pcolls: PCollections to compute.
n: (optional) max number of elements to visualize. Default 'inf'.
duration: (optional) max duration of elements to read in integer seconds or
a string duration. Default 'inf'.
include_window_info: (optional) if True, appends the windowing information
to each row. Default False.
force_compute: (optional) if True, forces recomputation rather than using
cached PCollections
force_tuple: (optional) if True, return a 1-tuple of results rather than
the bare results if only one PCollection is computed.

For example::

Expand All @@ -906,17 +910,27 @@ def collect(
# Run the pipeline and bring the PCollection into memory as a Dataframe.
in_memory_square = head(square, n=5)
"""
# Remember the element type so we can make an informed decision on how to
# collect the result in elements_to_df.
if isinstance(pcoll, DeferredBase):
# Get the proxy so we can get the output shape of the DataFrame.
pcoll, element_type = deferred_df_to_pcollection(pcoll)
watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
else:
element_type = pcoll.element_type
if len(pcolls) == 0:
return ()

def as_pcollection(pcoll_or_df):
if isinstance(pcoll_or_df, DeferredBase):
# Get the proxy so we can get the output shape of the DataFrame.
pcoll, element_type = deferred_df_to_pcollection(pcoll_or_df)
watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
return pcoll, element_type
elif isinstance(pcoll_or_df, beam.pvalue.PCollection):
return pcoll_or_df, pcoll_or_df.element_type
else:
raise TypeError(f'{pcoll} is not an apache_beam.pvalue.PCollection.')

assert isinstance(pcoll, beam.pvalue.PCollection), (
'{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
pcolls_with_element_types = [as_pcollection(p) for p in pcolls]
pcolls_to_element_types = dict(pcolls_with_element_types)
pcolls = [pcoll for pcoll, _ in pcolls_with_element_types]
pipelines = set(pcoll.pipeline for pcoll in pcolls)
if len(pipelines) != 1:
raise ValueError('All PCollections must belong to the same pipeline.')
pipeline, = pipelines

if isinstance(n, str):
assert n == 'inf', (
Expand All @@ -935,45 +949,51 @@ def collect(
if duration == 'inf':
duration = float('inf')

user_pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
user_pipeline = ie.current_env().user_pipeline(pipeline)
# We may be collecting a PCollection defined in a local scope that is not
# explicitly watched. Watch it ad hoc here, even though it is a little late.
if not user_pipeline:
watch({'anonymous_pipeline_{}'.format(id(pcoll.pipeline)): pcoll.pipeline})
user_pipeline = pcoll.pipeline
watch({'anonymous_pipeline_{}'.format(id(pipeline)): pipeline})
user_pipeline = pipeline
recording_manager = ie.current_env().get_recording_manager(
user_pipeline, create_if_absent=True)

# If already computed, directly read the stream and return.
if pcoll in ie.current_env().computed_pcollections and not force_compute:
pcoll_name = find_pcoll_name(pcoll)
elements = list(
recording_manager.read(pcoll_name, pcoll, n, duration).read())
return elements_to_df(
elements,
include_window_info=include_window_info,
element_type=element_type)

recording = recording_manager.record([pcoll],
max_n=n,
max_duration=duration,
force_compute=force_compute)

try:
elements = list(recording.stream(pcoll).read())
except KeyboardInterrupt:
recording.cancel()
return pd.DataFrame()
computed = {}
for pcoll in pcolls_to_element_types.keys():
if pcoll in ie.current_env().computed_pcollections and not force_compute:
pcoll_name = find_pcoll_name(pcoll)
computed[pcoll] = list(
recording_manager.read(pcoll_name, pcoll, n, duration).read())

uncomputed = set(pcolls) - set(computed.keys())
if uncomputed:
recording = recording_manager.record(
uncomputed, max_n=n, max_duration=duration, force_compute=force_compute)

try:
for pcoll in uncomputed:
computed[pcoll] = list(recording.stream(pcoll).read())
except KeyboardInterrupt:
recording.cancel()

if n == float('inf'):
n = None

# Collected DataFrames may have a length > n, so slice again to be sure. Note
# that array[:None] returns everything.
return elements_to_df(
elements,
include_window_info=include_window_info,
element_type=element_type)[:n]
empty = pd.DataFrame()
result_tuple = tuple(
elements_to_df(
computed[pcoll],
include_window_info=include_window_info,
element_type=pcolls_to_element_types[pcoll])[:n] if pcoll in
computed else empty for pcoll in pcolls)

if len(result_tuple) == 1 and not force_tuple:
return result_tuple[0]
else:
return result_tuple


@progress_indicated
Expand Down
Loading
Loading