
Make IterableDataset.from_spark more efficient #5986

Merged · 7 commits into huggingface:main · Jul 7, 2023

Conversation

mathewjacob1002 (Contributor):

Moved the code from using collect() to using toLocalIterator, which allows the partitions that will be read next to be prefetched, improving performance when iterating.

…refetching of next partition. Also reordered the spark dataframe to be in the order it will be traversed, allowing prefetching to work better.
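
To illustrate the change described above, here is a minimal sketch (a toy script under assumed names, not the PR's actual code, which lives in datasets' Spark builder): collect() materializes every row on the driver at once, whereas toLocalIterator(prefetchPartitions=True) streams one partition at a time and lets Spark compute the next partition in the background while the current one is consumed.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.range(100_000)  # toy DataFrame standing in for the user's data

# Before: df.collect() would pull all 100k rows onto the driver at once.
# After: stream partition by partition, prefetching the next one in parallel.
count = 0
for row in df.toLocalIterator(prefetchPartitions=True):
    count += 1  # stand-in for real per-row work (e.g. yielding examples)
```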
mathewjacob1002 marked this pull request as draft June 23, 2023 22:18
mathewjacob1002 marked this pull request as ready for review June 23, 2023 22:19
@maddiedawson (Contributor) left a comment:

Nice!

for row in rows:
    yield f"{partition_id}_{row_id}", row.asDict()
    row_id += 1
partition_df, size_of_partitions = reorder_dataframe_by_partition(df_with_partition_id, partition_order)
Contributor:

Instead of keeping track of partition sizes, it may be cleaner to just keep the part_id column but delete it from the row dict before yielding it.

… know which partition we are in, simply don't drop the part_id column, convert to pandas dataframe, and use that info
@@ -6,6 +6,7 @@

import numpy as np
import pyarrow as pa
import pyspark


We shouldn't import pyspark here, since it would make pyspark a dependency of datasets.

mathewjacob1002 (Contributor, Author):


should we put it inside the generator then?
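
For reference, the deferred-import pattern being discussed looks like this (a sketch, not the exact merged code): moving the import into the function body, together with a string type annotation, means pyspark is only needed when the Spark code path actually runs.

```python
from typing import List

def _generate_iterable_examples(df: "pyspark.sql.DataFrame", partition_order: List[int]):
    # Deferred import: pyspark is resolved only when this function is called,
    # so `import datasets` keeps working without pyspark installed. The string
    # annotation above avoids needing pyspark at definition time, too.
    import pyspark
    ...
```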

@maddiedawson (Contributor) left a comment:

All of the added comments can be removed; I think the code is pretty self-explanatory.

@@ -31,21 +32,37 @@ class SparkConfig(datasets.BuilderConfig):
features: Optional[datasets.Features] = None


def reorder_dataframe_by_partition(df: "pyspark.sql.DataFrame", new_partition_order: List[int]):
Contributor:

Add a leading underscore to indicate that this function shouldn't be called from outside this file (_reorder_dataframe_by_partition).
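
For context, one straightforward way such a helper can be written (a hedged sketch, assuming the DataFrame already carries a part_id column added via pyspark.sql.functions.spark_partition_id() and that new_partition_order is non-empty; not necessarily the exact code merged here):

```python
from typing import List

def _reorder_dataframe_by_partition(df: "pyspark.sql.DataFrame", new_partition_order: List[int]):
    # Select each partition's rows in the requested order and concatenate
    # them, so iterating the result visits partitions in new_partition_order.
    df_combined = df.where(f"part_id = {new_partition_order[0]}")
    for partition_id in new_partition_order[1:]:
        df_combined = df_combined.union(df.where(f"part_id = {partition_id}"))
    return df_combined
```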

def _generate_iterable_examples(
    df: "pyspark.sql.DataFrame",
    partition_order: List[int],
):
    import pyspark

Contributor:

Add this back

row_id += 1
partition_df = _reorder_dataframe_by_partition(df_with_partition_id, partition_order)
row_id = 0
# pipeline partitions to hide latency
Contributor:


How about "Prefetch partitions in parallel"

row_id = 0
# pipeline partitions to hide latency
rows = partition_df.toLocalIterator(prefetchPartitions=True)
last_partition = -1 # keep track of the last partition so that we can know when to reset row_id = 0
Contributor:


Rename this variable to be "current_partition". Also you can remove the comment here

row_as_dict = row.asDict()
part_id = row_as_dict['part_id']
row_as_dict.pop('part_id')
if last_partition != part_id: # we are on new partition, reset row_id
Contributor:


You can remove the comment here
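
Taken together, the review suggestions point toward a generator along these lines (an illustrative sketch, not the verbatim merged code): pop part_id from each row dict before yielding, track the partition in a variable named current_partition, and reset row_id whenever a new partition starts.

```python
def _generate_examples(partition_df):
    row_id = 0
    # Prefetch partitions in parallel
    rows = partition_df.toLocalIterator(prefetchPartitions=True)
    current_partition = -1
    for row in rows:
        row_as_dict = row.asDict()
        part_id = row_as_dict.pop("part_id")  # drop the helper column before yielding
        if current_partition != part_id:
            row_id = 0  # new partition: restart the per-partition counter
            current_partition = part_id
        yield f"{part_id}_{row_id}", row_as_dict
        row_id += 1
```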

@maddiedawson (Contributor):

@lhoestq would you be able to review this please and also approve the workflow?

@lhoestq (Member) commented Jul 6, 2023:

Sounds good to me :) feel free to run make style to apply code formatting

@HuggingFaceDocBuilderDev commented Jul 6, 2023:

The documentation is not available anymore as the PR was closed or merged.

@lhoestq (Member) commented Jul 6, 2023:

cool! I think we can merge once all comments have been addressed

@mathewjacob1002 (Contributor, Author):

@lhoestq I just addressed the comments and I think we can move ahead with this!

@lhoestq (Member) left a comment:

Perfect! :)

lhoestq merged commit 396cf94 into huggingface:main Jul 7, 2023
github-actions bot commented Jul 7, 2023:

PyArrow==8.0.0

Benchmark: benchmark_array_xd.json

| metric | new / old (diff) |
|---|---|
| read_batch_formatted_as_numpy after write_array2d | 0.007734 / 0.011353 (-0.003619) |
| read_batch_formatted_as_numpy after write_flattened_sequence | 0.004608 / 0.011008 (-0.006400) |
| read_batch_formatted_as_numpy after write_nested_sequence | 0.094466 / 0.038508 (0.055958) |
| read_batch_unformated after write_array2d | 0.086477 / 0.023109 (0.063368) |
| read_batch_unformated after write_flattened_sequence | 0.410311 / 0.275898 (0.134413) |
| read_batch_unformated after write_nested_sequence | 0.455560 / 0.323480 (0.132080) |
| read_col_formatted_as_numpy after write_array2d | 0.006112 / 0.007986 (-0.001874) |
| read_col_formatted_as_numpy after write_flattened_sequence | 0.003845 / 0.004328 (-0.000483) |
| read_col_formatted_as_numpy after write_nested_sequence | 0.072506 / 0.004250 (0.068256) |
| read_col_unformated after write_array2d | 0.066721 / 0.037052 (0.029669) |
| read_col_unformated after write_flattened_sequence | 0.409967 / 0.258489 (0.151478) |
| read_col_unformated after write_nested_sequence | 0.460480 / 0.293841 (0.166639) |
| read_formatted_as_numpy after write_array2d | 0.036700 / 0.128546 (-0.091847) |
| read_formatted_as_numpy after write_flattened_sequence | 0.009854 / 0.075646 (-0.065792) |
| read_formatted_as_numpy after write_nested_sequence | 0.320936 / 0.419271 (-0.098335) |
| read_unformated after write_array2d | 0.061002 / 0.043533 (0.017469) |
| read_unformated after write_flattened_sequence | 0.413963 / 0.255139 (0.158824) |
| read_unformated after write_nested_sequence | 0.426787 / 0.283200 (0.143588) |
| write_array2d | 0.029182 / 0.141683 (-0.112501) |
| write_flattened_sequence | 1.685136 / 1.452155 (0.232981) |
| write_nested_sequence | 1.754590 / 1.492716 (0.261873) |

Benchmark: benchmark_getitem_100B.json

| metric | new / old (diff) |
|---|---|
| get_batch_of_1024_random_rows | 0.222698 / 0.018006 (0.204692) |
| get_batch_of_1024_rows | 0.505929 / 0.000490 (0.505440) |
| get_first_row | 0.005291 / 0.000200 (0.005091) |
| get_last_row | 0.000097 / 0.000054 (0.000042) |

Benchmark: benchmark_indices_mapping.json

| metric | new / old (diff) |
|---|---|
| select | 0.032527 / 0.037411 (-0.004884) |
| shard | 0.094842 / 0.014526 (0.080317) |
| shuffle | 0.110138 / 0.176557 (-0.066418) |
| sort | 0.193786 / 0.737135 (-0.543349) |
| train_test_split | 0.112593 / 0.296338 (-0.183745) |

Benchmark: benchmark_iterating.json

| metric | new / old (diff) |
|---|---|
| read 5000 | 0.441671 / 0.215209 (0.226461) |
| read 50000 | 4.392961 / 2.077655 (2.315306) |
| read_batch 50000 10 | 2.161111 / 1.504120 (0.656991) |
| read_batch 50000 100 | 1.967080 / 1.541195 (0.425885) |
| read_batch 50000 1000 | 2.065411 / 1.468490 (0.596920) |
| read_formatted numpy 5000 | 0.561080 / 4.584777 (-4.023697) |
| read_formatted pandas 5000 | 4.159612 / 3.745712 (0.413900) |
| read_formatted tensorflow 5000 | 6.435248 / 5.269862 (1.165386) |
| read_formatted torch 5000 | 3.732338 / 4.565676 (-0.833339) |
| read_formatted_batch numpy 5000 10 | 0.066156 / 0.424275 (-0.358119) |
| read_formatted_batch numpy 5000 1000 | 0.008030 / 0.007607 (0.000423) |
| shuffled read 5000 | 0.532182 / 0.226044 (0.306137) |
| shuffled read 50000 | 5.315142 / 2.268929 (3.046213) |
| shuffled read_batch 50000 10 | 2.680157 / 55.444624 (-52.764467) |
| shuffled read_batch 50000 100 | 2.303799 / 6.876477 (-4.572677) |
| shuffled read_batch 50000 1000 | 2.530911 / 2.142072 (0.388838) |
| shuffled read_formatted numpy 5000 | 0.669504 / 4.805227 (-4.135723) |
| shuffled read_formatted_batch numpy 5000 10 | 0.151940 / 6.500664 (-6.348724) |
| shuffled read_formatted_batch numpy 5000 1000 | 0.066999 / 0.075469 (-0.008470) |

Benchmark: benchmark_map_filter.json

| metric | new / old (diff) |
|---|---|
| filter | 1.424275 / 1.841788 (-0.417513) |
| map fast-tokenizer batched | 21.550742 / 8.074308 (13.476434) |
| map identity | 16.031414 / 10.191392 (5.840022) |
| map identity batched | 0.194681 / 0.680424 (-0.485743) |
| map no-op batched | 0.020389 / 0.534201 (-0.513812) |
| map no-op batched numpy | 0.429808 / 0.579283 (-0.149475) |
| map no-op batched pandas | 0.457503 / 0.434364 (0.023139) |
| map no-op batched pytorch | 0.511522 / 0.540337 (-0.028816) |
| map no-op batched tensorflow | 0.682621 / 1.386936 (-0.704315) |

PyArrow==latest

Benchmark: benchmark_array_xd.json

| metric | new / old (diff) |
|---|---|
| read_batch_formatted_as_numpy after write_array2d | 0.007519 / 0.011353 (-0.003834) |
| read_batch_formatted_as_numpy after write_flattened_sequence | 0.004445 / 0.011008 (-0.006563) |
| read_batch_formatted_as_numpy after write_nested_sequence | 0.071946 / 0.038508 (0.033438) |
| read_batch_unformated after write_array2d | 0.082982 / 0.023109 (0.059873) |
| read_batch_unformated after write_flattened_sequence | 0.459938 / 0.275898 (0.184040) |
| read_batch_unformated after write_nested_sequence | 0.504875 / 0.323480 (0.181395) |
| read_col_formatted_as_numpy after write_array2d | 0.005805 / 0.007986 (-0.002181) |
| read_col_formatted_as_numpy after write_flattened_sequence | 0.003740 / 0.004328 (-0.000589) |
| read_col_formatted_as_numpy after write_nested_sequence | 0.071998 / 0.004250 (0.067747) |
| read_col_unformated after write_array2d | 0.062580 / 0.037052 (0.025527) |
| read_col_unformated after write_flattened_sequence | 0.462263 / 0.258489 (0.203774) |
| read_col_unformated after write_nested_sequence | 0.506355 / 0.293841 (0.212514) |
| read_formatted_as_numpy after write_array2d | 0.036321 / 0.128546 (-0.092225) |
| read_formatted_as_numpy after write_flattened_sequence | 0.009830 / 0.075646 (-0.065816) |
| read_formatted_as_numpy after write_nested_sequence | 0.079810 / 0.419271 (-0.339461) |
| read_unformated after write_array2d | 0.055291 / 0.043533 (0.011758) |
| read_unformated after write_flattened_sequence | 0.464093 / 0.255139 (0.208954) |
| read_unformated after write_nested_sequence | 0.481109 / 0.283200 (0.197910) |
| write_array2d | 0.026909 / 0.141683 (-0.114774) |
| write_flattened_sequence | 1.652538 / 1.452155 (0.200383) |
| write_nested_sequence | 1.750713 / 1.492716 (0.257997) |

Benchmark: benchmark_getitem_100B.json

| metric | new / old (diff) |
|---|---|
| get_batch_of_1024_random_rows | 0.267552 / 0.018006 (0.249546) |
| get_batch_of_1024_rows | 0.502021 / 0.000490 (0.501531) |
| get_first_row | 0.001635 / 0.000200 (0.001435) |
| get_last_row | 0.000099 / 0.000054 (0.000044) |

Benchmark: benchmark_indices_mapping.json

| metric | new / old (diff) |
|---|---|
| select | 0.033747 / 0.037411 (-0.003665) |
| shard | 0.104242 / 0.014526 (0.089716) |
| shuffle | 0.113829 / 0.176557 (-0.062728) |
| sort | 0.176242 / 0.737135 (-0.560893) |
| train_test_split | 0.117002 / 0.296338 (-0.179336) |

Benchmark: benchmark_iterating.json

| metric | new / old (diff) |
|---|---|
| read 5000 | 0.476731 / 0.215209 (0.261522) |
| read 50000 | 4.727054 / 2.077655 (2.649399) |
| read_batch 50000 10 | 2.589396 / 1.504120 (1.085276) |
| read_batch 50000 100 | 2.511180 / 1.541195 (0.969985) |
| read_batch 50000 1000 | 2.634122 / 1.468490 (1.165632) |
| read_formatted numpy 5000 | 0.563840 / 4.584777 (-4.020937) |
| read_formatted pandas 5000 | 4.140212 / 3.745712 (0.394500) |
| read_formatted tensorflow 5000 | 6.188789 / 5.269862 (0.918928) |
| read_formatted torch 5000 | 3.716897 / 4.565676 (-0.848780) |
| read_formatted_batch numpy 5000 10 | 0.065823 / 0.424275 (-0.358452) |
| read_formatted_batch numpy 5000 1000 | 0.007705 / 0.007607 (0.000098) |
| shuffled read 5000 | 0.566580 / 0.226044 (0.340535) |
| shuffled read 50000 | 5.653306 / 2.268929 (3.384377) |
| shuffled read_batch 50000 10 | 3.028756 / 55.444624 (-52.415868) |
| shuffled read_batch 50000 100 | 2.592319 / 6.876477 (-4.284158) |
| shuffled read_batch 50000 1000 | 2.614250 / 2.142072 (0.472178) |
| shuffled read_formatted numpy 5000 | 0.667135 / 4.805227 (-4.138093) |
| shuffled read_formatted_batch numpy 5000 10 | 0.153455 / 6.500664 (-6.347209) |
| shuffled read_formatted_batch numpy 5000 1000 | 0.069321 / 0.075469 (-0.006148) |

Benchmark: benchmark_map_filter.json

| metric | new / old (diff) |
|---|---|
| filter | 1.541978 / 1.841788 (-0.299810) |
| map fast-tokenizer batched | 21.747360 / 8.074308 (13.673052) |
| map identity | 15.963657 / 10.191392 (5.772265) |
| map identity batched | 0.192843 / 0.680424 (-0.487581) |
| map no-op batched | 0.020702 / 0.534201 (-0.513499) |
| map no-op batched numpy | 0.433620 / 0.579283 (-0.145663) |
| map no-op batched pandas | 0.467327 / 0.434364 (0.032963) |
| map no-op batched pytorch | 0.507398 / 0.540337 (-0.032940) |
| map no-op batched tensorflow | 0.692797 / 1.386936 (-0.694140) |
