From fd1b7eb9ecbf1d250be9a6e253df751e9bfb0a92 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 30 Jul 2024 18:35:47 -0400 Subject: [PATCH 01/15] [batch] Compact And Drop Records from `job_group_inst_coll_cancellable_resources` Resolves: #14623 --- batch/batch/driver/main.py | 85 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index ac6f99d756e..b8520443cae 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1492,6 +1492,90 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data ) +async def compact_job_group_cancellable_resources_records(app, db: Database): + if not app['feature_flags']['compact_billing_tables']: + return + + keyfields = [ + 'batch_id', + 'update_id', + 'job_group_id', + 'inst_coll', + ] + + rowfields = [ + *keyfields, + 'token', + 'n_creating_cancellable_jobs', + 'n_ready_cancellable_jobs', + 'n_running_cancellable_jobs', + 'ready_cancellable_cores_mcpu', + 'running_cancellable_cores_mcpu', + ] + + @transaction(db) + async def compact(tx: Transaction, record: dict): + await tx.just_execute( + """\ +DELETE FROM job_group_inst_coll_cancellable_resources +WHERE batch_id = %s, update_id = %s, job_group_id = %s, inst_coll = %s +""", + (record[k] for k in keyfields), + ) + + await tx.execute_insertone( + f"""\ +INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) +VALUES ({','.join(['%s' for _ in rowfields])}), +""", + [{**record, 'token': 0}[k] for k in rowfields], + ) + + targets = db.execute_and_fetchall( + """\ +SELECT R.* +FROM job_groups AS G +INNER JOIN job_group_self_and_ancestors AS D + ON G.batch_id = D.batch_id + AND G.job_group_id = D.job_group_id +LEFT JOIN job_groups_cancelled AS C + ON C.id = G.batch_id + AND C.job_group_id = D.ancestor_id +INNER JOIN LATERAL ( + SELECT R.batch_id + , R.update_id + , R.job_group_id + , R.inst_coll + , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs + , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs + , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs + , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu + , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu + , COUNT(*) as `count` + FROM job_group_inst_coll_cancellable_resources AS R + WHERE R.batch_id = G.batch_id + AND R.job_group_id = G.job_group_id + GROUP BY R.batch_id + , R.update_id + , R.job_group_id + , R.inst_coll + ORDER BY R.batch_id ASC + , R.update_id ASC + , R.job_group_id ASC + , R.inst_coll ASC +) AS R ON TRUE +WHERE G.time_completed IS NOT NULL + AND C.id IS NULL + AND R.`count` > 1 +LIMIT 1000 +""", + query_name='find_finished_cancellable_resources_records_to_compact', + ) + + async for target in targets: + await compact(target) + + async def compact_agg_billing_project_users_table(app, db: Database): if not app['feature_flags']['compact_billing_tables']: return @@ -1754,6 +1838,7 @@ async def close_and_wait(): task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db)) task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db)) task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db)) + task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, app, db)) async def on_cleanup(app): From 31d74b87aab1a21d3ab77caa9f108bc9bbc62993 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 30 Jul 2024 18:48:30 -0400 Subject: [PATCH 02/15] dont need to `ORDER BY` anymore --- batch/batch/driver/main.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index b8520443cae..61ddbc2f7ff 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1559,10 +1559,6 @@ async def compact(tx: Transaction, record: dict): , R.update_id , R.job_group_id , R.inst_coll - ORDER BY R.batch_id ASC - , R.update_id ASC - , R.job_group_id ASC - , R.inst_coll ASC ) AS R ON TRUE WHERE G.time_completed IS NOT NULL AND C.id IS NULL From b89f5bed1d654f2be8b33a4121c62ed277496300 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 30 Jul 2024 21:11:57 -0400 Subject: [PATCH 03/15] delete unused records --- batch/batch/driver/main.py | 66 ++++++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 61ddbc2f7ff..2abb7333ab1 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1516,9 +1516,9 @@ async def compact_job_group_cancellable_resources_records(app, db: Database): @transaction(db) async def compact(tx: Transaction, record: dict): await tx.just_execute( - """\ + f"""\ DELETE FROM job_group_inst_coll_cancellable_resources -WHERE batch_id = %s, update_id = %s, job_group_id = %s, inst_coll = %s +WHERE {','.join([f'{k} = %s' for k in keyfields])}; """, (record[k] for k in keyfields), ) @@ -1526,7 +1526,7 @@ async def compact(tx: Transaction, record: dict): await tx.execute_insertone( f"""\ INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) -VALUES ({','.join(['%s' for _ in rowfields])}), +VALUES ({','.join(['%s' for _ in rowfields])});, """, [{**record, 'token': 0}[k] for k in rowfields], ) @@ -1551,7 +1551,6 @@ async def compact(tx: Transaction, record: dict): , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu - , COUNT(*) as `count` FROM job_group_inst_coll_cancellable_resources AS R WHERE R.batch_id = G.batch_id AND R.job_group_id = G.job_group_id @@ -1559,11 +1558,11 @@ async def compact(tx: Transaction, record: dict): , R.update_id , R.job_group_id , R.inst_coll + HAVING COUNT(*) > 1 ) AS R ON TRUE WHERE G.time_completed IS NOT NULL AND C.id IS NULL - AND R.`count` > 1 -LIMIT 1000 +LIMIT 1000; """, query_name='find_finished_cancellable_resources_records_to_compact', ) @@ -1572,6 +1571,60 @@ async def compact(tx: Transaction, record: dict): await compact(target) +async def delete_dead_job_group_cancellable_resources_records(db: Database): + keyfields = [ + 'batch_id', + 'update_id', + 'job_group_id', + 'inst_coll', + ] + + targets = db.execute_and_fetchall( + f"""\ +SELECT {','.join([f'R.{k}' for k in keyfields])} +FROM job_groups AS G +INNER JOIN job_group_self_and_ancestors AS D + ON G.batch_id = D.batch_id + AND G.job_group_id = D.job_group_id +LEFT JOIN job_groups_cancelled AS C + ON C.id = G.batch_id + AND C.job_group_id = D.ancestor_id +INNER JOIN LATERAL ( + SELECT R.batch_id + , R.update_id + , R.job_group_id + , R.inst_coll + FROM job_group_inst_coll_cancellable_resources AS R + WHERE R.batch_id = G.batch_id + AND R.job_group_id = G.job_group_id + GROUP BY R.batch_id + , R.update_id + , R.job_group_id + , R.inst_coll + HAVING COUNT(*) = 1 + AND MAX(R.n_creating_cancellable_jobs) = 0 + AND MAX(R.n_ready_cancellable_jobs) = 0 + AND MAX(R.n_running_cancellable_jobs) = 0 + AND MAX(R.ready_cancellable_cores_mcpu) = 0 + AND MAX(R.running_cancellable_cores_mcpu) = 0 +) AS R ON TRUE +WHERE G.time_completed IS NOT NULL + AND C.id IS NULL +LIMIT 1000; +""", + query_name='find_dead_cancellable_resources_records_to_delete', + ) + + async for target in targets: + await db.just_execute( + f"""\ +DELETE FROM job_group_inst_coll_cancellable_resources +WHERE {','.join([f'{k} = %s' for k in keyfields])}; +""", + (target[k] for k in keyfields), + ) + + async def compact_agg_billing_project_users_table(app, db: Database): if not app['feature_flags']['compact_billing_tables']: return @@ -1835,6 +1888,7 @@ async def close_and_wait(): task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db)) task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db)) task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, app, db)) + task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, app, db)) async def on_cleanup(app): From 494a9a598d86682daa62dc09daecd0ae9c0a47e7 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 30 Jul 2024 21:19:02 -0400 Subject: [PATCH 04/15] hack less --- batch/batch/driver/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 2abb7333ab1..c62681a85c8 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1528,7 +1528,7 @@ async def compact(tx: Transaction, record: dict): INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) VALUES ({','.join(['%s' for _ in rowfields])});, """, - [{**record, 'token': 0}[k] for k in rowfields], + (record[k] for k in rowfields), ) targets = db.execute_and_fetchall( @@ -1568,7 +1568,7 @@ async def compact(tx: Transaction, record: dict): ) async for target in targets: - await compact(target) + await compact({**target, 'token': 0}) async def delete_dead_job_group_cancellable_resources_records(db: Database): From 95e82957a7b1f0bbb313f831cd09284221babc57 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 12:09:02 -0400 Subject: [PATCH 05/15] fix nargin error --- batch/batch/driver/main.py | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index c62681a85c8..b759141ac33 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1531,8 +1531,9 @@ async def compact(tx: Transaction, record: dict): (record[k] for k in rowfields), ) + keys = ','.join([f'R.{k}' for k in keyfields]) targets = db.execute_and_fetchall( - """\ + f"""\ SELECT R.* FROM job_groups AS G INNER JOIN job_group_self_and_ancestors AS D @@ -1542,10 +1543,7 @@ async def compact(tx: Transaction, record: dict): ON C.id = G.batch_id AND C.job_group_id = D.ancestor_id INNER JOIN LATERAL ( - SELECT R.batch_id - , R.update_id - , R.job_group_id - , R.inst_coll + SELECT {keys} , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs @@ -1554,10 +1552,7 @@ async def compact(tx: Transaction, record: dict): FROM job_group_inst_coll_cancellable_resources AS R WHERE R.batch_id = G.batch_id AND R.job_group_id = G.job_group_id - GROUP BY R.batch_id - , R.update_id - , R.job_group_id - , R.inst_coll + GROUP BY {keys} HAVING COUNT(*) > 1 ) AS R ON TRUE WHERE G.time_completed IS NOT NULL @@ -1579,9 +1574,10 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): 'inst_coll', ] + keys = ','.join([f'R.{k}' for k in keyfields]) targets = db.execute_and_fetchall( f"""\ -SELECT {','.join([f'R.{k}' for k in keyfields])} +SELECT {keys} FROM job_groups AS G INNER JOIN job_group_self_and_ancestors AS D ON G.batch_id = D.batch_id @@ -1590,17 +1586,11 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): ON C.id = G.batch_id AND C.job_group_id = D.ancestor_id INNER JOIN LATERAL ( - SELECT R.batch_id - , R.update_id - , R.job_group_id - , R.inst_coll + SELECT {keys} FROM job_group_inst_coll_cancellable_resources AS R WHERE R.batch_id = G.batch_id AND R.job_group_id = G.job_group_id - GROUP BY R.batch_id - , R.update_id - , R.job_group_id - , R.inst_coll + GROUP BY {keys} HAVING COUNT(*) = 1 AND MAX(R.n_creating_cancellable_jobs) = 0 AND MAX(R.n_ready_cancellable_jobs) = 0 @@ -1888,7 +1878,7 @@ async def close_and_wait(): task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db)) task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db)) task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, app, db)) - task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, app, db)) + task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db)) async def on_cleanup(app): From 0d10677f4366de1756ec666971d39b0c67563fd2 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 12:32:38 -0400 Subject: [PATCH 06/15] dont pass generator to db methods --- batch/batch/driver/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index b759141ac33..7225c419b96 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1520,7 +1520,7 @@ async def compact(tx: Transaction, record: dict): DELETE FROM job_group_inst_coll_cancellable_resources WHERE {','.join([f'{k} = %s' for k in keyfields])}; """, - (record[k] for k in keyfields), + [record[k] for k in keyfields], ) await tx.execute_insertone( @@ -1528,7 +1528,7 @@ async def compact(tx: Transaction, record: dict): INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) VALUES ({','.join(['%s' for _ in rowfields])});, """, - (record[k] for k in rowfields), + [record[k] for k in rowfields], ) keys = ','.join([f'R.{k}' for k in keyfields]) @@ -1611,7 +1611,7 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): DELETE FROM job_group_inst_coll_cancellable_resources WHERE {','.join([f'{k} = %s' for k in keyfields])}; """, - (target[k] for k in keyfields), + [target[k] for k in keyfields], ) From 634c10b554d4bd8ff798224e49000200801342cc Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 12:44:57 -0400 Subject: [PATCH 07/15] join lateral wins again --- batch/batch/driver/main.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 7225c419b96..0321aa927fd 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1536,12 +1536,14 @@ async def compact(tx: Transaction, record: dict): f"""\ SELECT R.* FROM job_groups AS G -INNER JOIN job_group_self_and_ancestors AS D - ON G.batch_id = D.batch_id - AND G.job_group_id = D.job_group_id -LEFT JOIN job_groups_cancelled AS C - ON C.id = G.batch_id - AND C.job_group_id = D.ancestor_id +LEFT JOIN LATERAL ( + SELECT C.id FROM job_groups_cancelled AS C + INNER JOIN job_group_self_and_ancestors AS D + ON C.id = D.batch_id + AND C.job_group_id = D.job_group_id + WHERE D.batch_id = G.batch_id + AND D.ancestor_id = G.job_group_id +) AS C ON TRUE INNER JOIN LATERAL ( SELECT {keys} , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs @@ -1579,12 +1581,14 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): f"""\ SELECT {keys} FROM job_groups AS G -INNER JOIN job_group_self_and_ancestors AS D - ON G.batch_id = D.batch_id - AND G.job_group_id = D.job_group_id -LEFT JOIN job_groups_cancelled AS C - ON C.id = G.batch_id - AND C.job_group_id = D.ancestor_id +LEFT JOIN LATERAL ( + SELECT C.id FROM job_groups_cancelled AS C + INNER JOIN job_group_self_and_ancestors AS D + ON C.id = D.batch_id + AND C.job_group_id = D.job_group_id + WHERE D.batch_id = G.batch_id + AND D.ancestor_id = G.job_group_id +) AS C ON TRUE INNER JOIN LATERAL ( SELECT {keys} FROM job_group_inst_coll_cancellable_resources AS R From 9cdc9d78ca805026df3141846b43172152f1a9bc Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 12:47:09 -0400 Subject: [PATCH 08/15] formatting --- batch/batch/driver/main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 0321aa927fd..1d8f6e8c58d 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1539,8 +1539,8 @@ async def compact(tx: Transaction, record: dict): LEFT JOIN LATERAL ( SELECT C.id FROM job_groups_cancelled AS C INNER JOIN job_group_self_and_ancestors AS D - ON C.id = D.batch_id - AND C.job_group_id = D.job_group_id + ON C.id = D.batch_id + AND C.job_group_id = D.job_group_id WHERE D.batch_id = G.batch_id AND D.ancestor_id = G.job_group_id ) AS C ON TRUE @@ -1584,8 +1584,8 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): LEFT JOIN LATERAL ( SELECT C.id FROM job_groups_cancelled AS C INNER JOIN job_group_self_and_ancestors AS D - ON C.id = D.batch_id - AND C.job_group_id = D.job_group_id + ON C.id = D.batch_id + AND C.job_group_id = D.job_group_id WHERE D.batch_id = G.batch_id AND D.ancestor_id = G.job_group_id ) AS C ON TRUE From 21e24216a900c71adaf90428359f0e290a0e08af Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 13:07:44 -0400 Subject: [PATCH 09/15] sql syntax error --- batch/batch/driver/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 1d8f6e8c58d..2a2565d7bc5 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1518,7 +1518,7 @@ async def compact(tx: Transaction, record: dict): await tx.just_execute( f"""\ DELETE FROM job_group_inst_coll_cancellable_resources -WHERE {','.join([f'{k} = %s' for k in keyfields])}; +WHERE {'AND'.join([f'{k} = %s' for k in keyfields])}; """, [record[k] for k in keyfields], ) From 7a3a33c586519149d9662ce30245c8cb31c588c0 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 13:13:18 -0400 Subject: [PATCH 10/15] sql syntax error --- batch/batch/driver/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 2a2565d7bc5..552e9630f13 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1613,7 +1613,7 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): await db.just_execute( f"""\ DELETE FROM job_group_inst_coll_cancellable_resources -WHERE {','.join([f'{k} = %s' for k in keyfields])}; +WHERE {'AND'.join([f'{k} = %s' for k in keyfields])}; """, [target[k] for k in keyfields], ) From e012bb0a9d0c8a7add2944fe6f17ddb6c75dceb7 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 13:23:02 -0400 Subject: [PATCH 11/15] sql syntax error --- batch/batch/driver/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 552e9630f13..396a55f0868 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1518,7 +1518,7 @@ async def compact(tx: Transaction, record: dict): await tx.just_execute( f"""\ DELETE FROM job_group_inst_coll_cancellable_resources -WHERE {'AND'.join([f'{k} = %s' for k in keyfields])}; +WHERE {' AND '.join([f'{k} = %s' for k in keyfields])}; """, [record[k] for k in keyfields], ) @@ -1613,7 +1613,7 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): await db.just_execute( f"""\ DELETE FROM job_group_inst_coll_cancellable_resources -WHERE {'AND'.join([f'{k} = %s' for k in keyfields])}; +WHERE {' AND '.join([f'{k} = %s' for k in keyfields])}; """, [target[k] for k in keyfields], ) From 8b40080c3b01d21792c8f429bcb33580f8015226 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 13:38:52 -0400 Subject: [PATCH 12/15] sql syntax error --- batch/batch/driver/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 396a55f0868..c7b485be9f4 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1526,7 +1526,7 @@ async def compact(tx: Transaction, record: dict): await tx.execute_insertone( f"""\ INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) -VALUES ({','.join(['%s' for _ in rowfields])});, +VALUES ({','.join(['%s' for _ in rowfields])}); """, [record[k] for k in rowfields], ) From af92cd62aacb4f3fb844c06dc5abfe4403e3e864 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 15:09:25 -0400 Subject: [PATCH 13/15] remove unapplicable feature flag --- batch/batch/driver/main.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index c7b485be9f4..96fb8f19e4a 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1493,9 +1493,6 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data async def compact_job_group_cancellable_resources_records(app, db: Database): - if not app['feature_flags']['compact_billing_tables']: - return - keyfields = [ 'batch_id', 'update_id', @@ -1881,7 +1878,7 @@ async def close_and_wait(): task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db)) task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db)) task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db)) - task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, app, db)) + task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, db)) task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db)) From 5ed4355c130c4271b9a2b10c72ee6f2ad9391001 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Wed, 31 Jul 2024 15:10:27 -0400 Subject: [PATCH 14/15] remove unused arg --- batch/batch/driver/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 96fb8f19e4a..5880b853b46 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1492,7 +1492,7 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data ) -async def compact_job_group_cancellable_resources_records(app, db: Database): +async def compact_job_group_cancellable_resources_records(db: Database): keyfields = [ 'batch_id', 'update_id', From 7d576f8fcef6f74cbfa6271d78c9bf6e328f92b2 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Mon, 5 Aug 2024 15:07:07 -0400 Subject: [PATCH 15/15] query for non-cancelled job groups --- batch/batch/driver/main.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index 5880b853b46..c2b8198bf9d 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1537,9 +1537,9 @@ async def compact(tx: Transaction, record: dict): SELECT C.id FROM job_groups_cancelled AS C INNER JOIN job_group_self_and_ancestors AS D ON C.id = D.batch_id - AND C.job_group_id = D.job_group_id - WHERE D.batch_id = G.batch_id - AND D.ancestor_id = G.job_group_id + AND C.job_group_id = D.ancestor_id + WHERE D.batch_id = G.batch_id + AND D.job_group_id = G.job_group_id ) AS C ON TRUE INNER JOIN LATERAL ( SELECT {keys} @@ -1582,9 +1582,9 @@ async def delete_dead_job_group_cancellable_resources_records(db: Database): SELECT C.id FROM job_groups_cancelled AS C INNER JOIN job_group_self_and_ancestors AS D ON C.id = D.batch_id - AND C.job_group_id = D.job_group_id - WHERE D.batch_id = G.batch_id - AND D.ancestor_id = G.job_group_id + AND C.job_group_id = D.ancestor_id + WHERE D.batch_id = G.batch_id + AND D.job_group_id = G.job_group_id ) AS C ON TRUE INNER JOIN LATERAL ( SELECT {keys}