Enable larger join tests #645

Merged (6 commits, Jan 24, 2023)
45 changes: 24 additions & 21 deletions tests/benchmarks/test_join.py
@@ -7,14 +7,9 @@
 # (mem_mult, shuffle)
 params = [
     (0.1, "tasks"),
-    # shuffling takes a long time with 1 or higher
     (0.1, "p2p"),
-    pytest.param(
-        1, "p2p", marks=pytest.mark.skip(reason="client OOMs, see coiled-runtime#633")
-    ),
-    pytest.param(
-        10, "p2p", marks=pytest.mark.skip(reason="client OOMs, see coiled-runtime#633")
-    ),
+    (1, "tasks"),
+    (1, "p2p"),
 ]
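For context, each (mem_mult, shuffle) tuple above becomes one test case. A minimal, self-contained sketch of how pytest typically consumes such a list; the real decorator and the small_client fixture live outside this diff, so the exact signature here is an assumption:

import pytest

params = [
    (0.1, "tasks"),
    (0.1, "p2p"),
    (1, "tasks"),
    (1, "p2p"),
]

# Hypothetical, simplified stand-in for the real test signature:
# each tuple expands into one case, e.g. test_join_big[1-p2p].
@pytest.mark.parametrize("mem_mult, shuffle", params)
def test_join_big(mem_mult, shuffle):
    assert shuffle in ("tasks", "p2p")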


@@ -24,17 +19,21 @@ def test_join_big(small_client, mem_mult, shuffle):
     with dask.config.set(shuffle=shuffle):
         memory = cluster_memory(small_client)  # 76.66 GiB

-        df1_big = timeseries_of_size(memory * mem_mult)
-        df1_big["x2"] = df1_big["x"] * 1e9
-        df1_big = df1_big.astype({"x2": "int"})
+        df1_big = timeseries_of_size(
+            memory * mem_mult, dtypes={str(i): float for i in range(100)}
+        )  # 66.58 MiB partitions
+        df1_big["predicate"] = df1_big["0"] * 1e9
+        df1_big = df1_big.astype({"predicate": "int"})

-        df2_big = timeseries_of_size(memory * mem_mult)
+        df2_big = timeseries_of_size(
+            memory * mem_mult, dtypes={str(i): float for i in range(100)}
+        )  # 66.58 MiB partitions

         # Control cardinality on column to join - this produces cardinality ~ to len(df)
-        df2_big["x2"] = df2_big["x"] * 1e9
-        df2_big = df2_big.astype({"x2": "int"})
+        df2_big["predicate"] = df2_big["0"] * 1e9
[Inline review thread on this line]

Contributor: Out of curiosity, why did you choose the name "predicate"?

hendrikmakait (Member, Author), Jan 24, 2023: That's me coming from a DB background. I wanted a name that's more descriptive in this context than x2, and since this is the column used in the join predicate (i.e., the expression used to merge the tables/dataframes), that's what I ended up with. It could also be merge_col or something similar if you find that easier to understand.

Contributor: I see. That's good, no need to change it; it was more out of curiosity and I just learned something new :)
I'm merging this in.
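For readers without a database background, a hedged, illustrative sketch of the equivalence the author describes, using plain pandas (dask's dd.merge mirrors this API); the toy data below is invented:

import pandas as pd

left = pd.DataFrame({"predicate": [1, 2, 3], "a": [10, 20, 30]})
right = pd.DataFrame({"predicate": [2, 3, 4], "b": [200, 300, 400]})

# The merge below corresponds to the SQL join predicate
#   ... FROM left JOIN right ON left.predicate = right.predicate
joined = pd.merge(left, right, on="predicate", how="inner")
print(joined)  # keeps only rows where the predicate column matches: 2 and 3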

+        df2_big = df2_big.astype({"predicate": "int"})

-        join = dd.merge(df1_big, df2_big, on="x2", how="inner")
+        join = dd.merge(df1_big, df2_big, on="predicate", how="inner")
         result = join.size
         wait(result, small_client, 20 * 60)
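A hedged aside on the "control cardinality" comment above: the generated float columns are roughly uniform over a narrow interval, so scaling by 1e9 and truncating to int yields close to one distinct key per row. An illustrative numpy-only sketch (the uniform range is an assumption about the data generator):

import numpy as np

rng = np.random.default_rng(0)
# Stand-in for one float column of the generated timeseries (assumed ~uniform).
x = rng.uniform(-1, 1, size=1_000_000)
keys = (x * 1e9).astype("int64")
# Nearly every row gets its own key, so join-key cardinality ~ len(df).
print(np.unique(keys).size / keys.size)  # ~1.0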

@@ -44,17 +43,21 @@ def test_join_big_small(small_client, mem_mult, shuffle):
     with dask.config.set(shuffle=shuffle):
         memory = cluster_memory(small_client)  # 76.66 GiB

-        df_big = timeseries_of_size(memory * mem_mult)
+        df_big = timeseries_of_size(
+            memory * mem_mult, dtypes={str(i): float for i in range(100)}
+        )  # 66.58 MiB partitions

         # Control cardinality on column to join - this produces cardinality ~ to len(df)
-        df_big["x2"] = df_big["x"] * 1e9
-        df_big = df_big.astype({"x2": "int"})
+        df_big["predicate"] = df_big["0"] * 1e9
+        df_big = df_big.astype({"predicate": "int"})

-        df_small = timeseries_of_size("50 MB")  # make it obviously small
+        df_small = timeseries_of_size(
+            "100 MB", dtypes={str(i): float for i in range(100)}
+        )  # make it obviously small

-        df_small["x2"] = df_small["x"] * 1e9
-        df_small_pd = df_small.astype({"x2": "int"}).compute()
+        df_small["predicate"] = df_small["0"] * 1e9
+        df_small_pd = df_small.astype({"predicate": "int"}).compute()

-        join = dd.merge(df_big, df_small_pd, on="x2", how="inner")
+        join = dd.merge(df_big, df_small_pd, on="predicate", how="inner")
         result = join.size
         wait(result, small_client, 20 * 60)
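Worth noting in test_join_big_small: df_small is materialized with .compute() into an in-memory pandas DataFrame before the merge, so dask can broadcast the small side to every partition of the big frame instead of shuffling both sides. A minimal sketch of that pattern with toy data (names and sizes are illustrative):

import dask.dataframe as dd
import pandas as pd

# Large distributed frame (just 4 toy partitions here).
big = dd.from_pandas(
    pd.DataFrame({"predicate": range(1_000), "a": range(1_000)}), npartitions=4
)
# Small in-memory frame: merging a dask frame against a pandas frame
# broadcasts the pandas side to each partition rather than shuffling.
small = pd.DataFrame({"predicate": [1, 2, 3], "b": [10, 20, 30]})

joined = dd.merge(big, small, on="predicate", how="inner")
print(joined.compute())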