
Enable colocated join by default #16381

Conversation


@radek-kondziolka radek-kondziolka commented Mar 6, 2023

Description

This PR enables colocated joins by default.

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Hive
* Improve performance of queries with joins where both sides of a join have the same table bucketing definition on the join keys. ({issue}`16381`)

@cla-bot cla-bot bot added the cla-signed label Mar 6, 2023
@radek-kondziolka radek-kondziolka force-pushed the rk/enable-colocated-join-by-default branch from 23a87b6 to e4b7c17 Compare March 7, 2023 14:22
@radek-kondziolka radek-kondziolka force-pushed the rk/enable-colocated-join-by-default branch 2 times, most recently from 5396d2b to e4ee9e7 Compare March 8, 2023 10:43
@github-actions github-actions bot added the docs label Mar 8, 2023
@radek-kondziolka radek-kondziolka force-pushed the rk/enable-colocated-join-by-default branch from e4ee9e7 to 019ff87 Compare March 9, 2023 11:11
@radek-kondziolka (author) commented:

Some results for the colocated join (now enabled by default):

Setup:

SHOW CREATE TABLE hive.default.table
                      Create Table
---------------------------------------------------------
 CREATE TABLE table (
    c bigint,
    name varchar(80),
    class varchar(50)
 )
 WITH (
    bucket_count = 100,
    bucketed_by = ARRAY['c'],
    bucketing_version = 1,
    format = 'ORC',
    partitioned_by = ARRAY['class'],
    sorted_by = ARRAY[]
 )

EXPLAIN ANALYZE for: select * from table j1 join table j2 on j1.c = j2.c where length(j1.name) > 45;

1. With colocated join enabled (the new default):

 Trino version: 407-369-g019ff87
 Queued: 237.69us, Analysis: 5.86ms, Planning: 173.59ms, Execution: 29.47s
 Fragment 1 [hive:HivePartitioningHandle{buckets=100, hiveTypes=[bigint]}]
     CPU: 1.59m, Scheduled: 3.07m, Blocked 3.48h (Input: 0.00ns, Output: 3.42h), Input: 200000 rows (12.01MB); per task: avg.: 33333.33 std.dev.: 9427.38, Output: 499870075 rows (40.70GB)
     Output layout: [c, name, class, name_1, class_2]
     Output partitioning: SINGLE []
     InnerJoin[criteria = ("c" = "c_0"), hash = [$hashvalue, $hashvalue_8], distribution = PARTITIONED]
     │   Layout: [c:bigint, name:varchar(80), class:varchar(50), name_1:varchar(80), class_2:varchar(50)]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
     │   CPU: 1.40m (89.64%), Scheduled: 1.61m (53.29%), Blocked: 3.47h (99.99%), Output: 499870075 rows (40.70GB)
     │   Left (probe) Input avg.: 100.00 rows, Input std.dev.: 9.69%
     │   Right (build) Input avg.: 8331.17 rows, Input std.dev.: 28.21%
     │   Distribution: PARTITIONED
     │   dynamicFilterAssignments = {c_0 -> #df_378}
     ├─ ScanFilterProject[table = hive:default:table buckets=100, dynamicFilters = {"c" = #df_378}]
     │      Layout: [c:bigint, name:varchar(80), class:varchar(50), $hashvalue:bigint]
     │      Estimates: {rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 100000 (2.85MB), cpu: 2.85M, memory: 0B, network: 0B}
     │      CPU: 3.17s (3.39%), Scheduled: 29.74s (16.40%), Blocked: 0.00ns (0.00%), Output: 100000 rows (7.47MB)
     │      Input avg.: 100.00 rows, Input std.dev.: 9.69%
     │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c"), 0))
     │      c := c:bigint:REGULAR
     │      name := name:varchar(80):REGULAR
     │      class := class:varchar(50):PARTITION_KEY
     │          :: [[0, 99]]
     │      Input: 100000 rows (6.00MB), Filtered: 0.00%, Physical input: 2.01MB, Physical input time: 27320.00ms
     │      Dynamic filters:
     │          - df_378, [ SortedRangeSet[type=bigint, ranges=10, {[0], ..., [9]}] ], collection time=351.18ms
     └─ LocalExchange[partitioning = SINGLE]
        │   Layout: [c_0:bigint, name_1:varchar(80), class_2:varchar(50), $hashvalue_8:bigint]
        │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
        │   CPU: 52.00ms (0.06%), Scheduled: 54.00ms (0.03%), Blocked: 1.06s (0.01%), Output: 49987 rows (4.45MB)
        │   Input avg.: 49.99 rows, Input std.dev.: 13.40%
        └─ ScanFilterProject[table = hive:default:table buckets=100, filterPredicate = (length("name_1") > BIGINT '48')]
               Layout: [c_0:bigint, name_1:varchar(80), class_2:varchar(50), $hashvalue_9:bigint]
               Estimates: {rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: ? (?), cpu: 1.99M, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
               CPU: 6.47s (6.92%), Scheduled: 54.89s (30.28%), Blocked: 0.00ns (0.00%), Output: 49987 rows (4.45MB)
               Input avg.: 100.00 rows, Input std.dev.: 9.69%
               $hashvalue_9 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c_0"), 0))
               name_1 := name:varchar(80):REGULAR
               c_0 := c:bigint:REGULAR
               class_2 := class:varchar(50):PARTITION_KEY
                   :: [[0, 99]]
               Input: 100000 rows (6.00MB), Filtered: 50.01%, Physical input: 2.01MB, Physical input time: 53550.00ms

2. With colocated join disabled, there is more data shuffling:

 Trino version: 407-369-g019ff87
 Queued: 255.20us, Analysis: 2.49s, Planning: 3.58s, Execution: 39.65s
 Fragment 1 [hive:HivePartitioningHandle{buckets=100, hiveTypes=[bigint]}]
     CPU: 2.14m, Scheduled: 3.96m, Blocked 1.90h (Input: 13.58m, Output: 1.46h), Input: 149987 rows (10.46MB); per task: avg.: 24997.83 std.dev.: 7063.73, Output: 499870075 rows (40.70GB)
     Output layout: [c, name, class, name_1, class_2]
     Output partitioning: SINGLE []
     InnerJoin[criteria = ("c" = "c_0"), hash = [$hashvalue, $hashvalue_8], distribution = PARTITIONED]
     │   Layout: [c:bigint, name:varchar(80), class:varchar(50), name_1:varchar(80), class_2:varchar(50)]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
     │   CPU: 2.01m (62.15%), Scheduled: 3.15m (11.56%), Blocked: 1.67h (87.75%), Output: 499870075 rows (40.70GB)
     │   Left (probe) Input avg.: 100.00 rows, Input std.dev.: 9.69%
     │   Right (build) Input avg.: 8331.17 rows, Input std.dev.: 28.21%
     │   Distribution: PARTITIONED
     │   dynamicFilterAssignments = {c_0 -> #df_383}
     ├─ ScanFilterProject[table = hive:default:table buckets=100, dynamicFilters = {"c" = #df_383}]
     │      Layout: [c:bigint, name:varchar(80), class:varchar(50), $hashvalue:bigint]
     │      Estimates: {rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 100000 (2.85MB), cpu: 2.85M, memory: 0B, network: 0B}
     │      CPU: 5.94s (3.06%), Scheduled: 43.22s (2.64%), Blocked: 0.00ns (0.00%), Output: 100000 rows (7.47MB)
     │      Input avg.: 100.00 rows, Input std.dev.: 9.69%
     │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c"), 0))
     │      c := c:bigint:REGULAR
     │      name := name:varchar(80):REGULAR
     │      class := class:varchar(50):PARTITION_KEY
     │          :: [[0, 99]]
     │      Input: 100000 rows (6.00MB), Filtered: 0.00%, Physical input: 2.01MB, Physical input time: 33970.00ms
     │      Dynamic filters:
     │          - df_383, [ SortedRangeSet[type=bigint, ranges=10, {[0], ..., [9]}] ], collection time=5.00s
     └─ LocalExchange[partitioning = SINGLE]
        │   Layout: [c_0:bigint, name_1:varchar(80), class_2:varchar(50), $hashvalue_8:bigint]
        │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
        │   CPU: 43.00ms (0.02%), Scheduled: 55.00ms (0.00%), Blocked: 25.32s (0.37%), Output: 49987 rows (4.45MB)
        │   Input avg.: 260.35 rows, Input std.dev.: 89.13%
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [c_0:bigint, name_1:varchar(80), class_2:varchar(50), $hashvalue_9:bigint]
               CPU: 103.00ms (0.05%), Scheduled: 329.00ms (0.02%), Blocked: 13.58m (11.88%), Output: 49987 rows (4.45MB)
               Input avg.: 260.35 rows, Input std.dev.: 89.13%

 Fragment 2 [hive:HivePartitioningHandle{buckets=100, hiveTypes=[bigint]}]
     CPU: 1.13m, Scheduled: 23.46m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 100000 rows (6.00MB); per task: avg.: 16666.67 std.dev.: 4713.69, Output: 49098 rows (4.37MB)
     Output layout: [c_0, name_1, class_2, $hashvalue_10]
     Output partitioning: hive:HivePartitioningHandle{buckets=100, hiveTypes=[bigint]} [c_0]
     ScanFilterProject[table = hive:default:table buckets=100, filterPredicate = (length("name_1") > BIGINT '48')]
         Layout: [c_0:bigint, name_1:varchar(80), class_2:varchar(50), $hashvalue_10:bigint]
         Estimates: {rows: 100000 (2.85MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: ? (?), cpu: 1.99M, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         CPU: 1.12m (34.71%), Scheduled: 23.42m (85.78%), Blocked: 0.00ns (0.00%), Output: 49098 rows (4.37MB)
         Input avg.: 100.00 rows, Input std.dev.: 9.69%
         $hashvalue_10 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c_0"), 0))
         name_1 := name:varchar(80):REGULAR
         c_0 := c:bigint:REGULAR
         class_2 := class:varchar(50):PARTITION_KEY
             :: [[0, 99]]
         Input: 100000 rows (6.00MB), Filtered: 50.90%, Physical input: 2.01MB, Physical input time: 1012800.00ms
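
The two plans above were captured with the same query under both settings. A minimal sketch of how to reproduce the comparison; the property names below are assumed from the Trino documentation (`colocated_join` session property, `optimizer.colocated-joins-enabled` config property) and should be verified against your Trino version:

```sql
-- Disable colocated joins for the current session only (assumed property name)
SET SESSION colocated_join = false;

-- Run the same query under both settings and compare the fragments
EXPLAIN ANALYZE
SELECT * FROM table j1 JOIN table j2 ON j1.c = j2.c
WHERE length(j1.name) > 45;

-- Cluster-wide default (config.properties, restart required; assumed name):
-- optimizer.colocated-joins-enabled=true
```

With the property disabled, the plan gains an extra fragment (Fragment 2 above) that repartitions the build side over the network; with it enabled, both scans stay in one fragment and the exchange becomes a local one.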

@raunaqmorarka raunaqmorarka merged commit 0b2e189 into trinodb:master Mar 13, 2023
@github-actions github-actions bot added this to the 411 milestone Mar 13, 2023