From 5064a49bb6e392221eb938d3178d6b6b6207706b Mon Sep 17 00:00:00 2001 From: Derrick Stolee Date: Wed, 18 Sep 2024 14:38:05 -0400 Subject: [PATCH] pack-objects: thread the path-based compression Adapting the implementation of ll_find_deltas(), create a threaded version of the --path-walk compression step in 'git pack-objects'. Using the Git repository as a test repo, the p5313 performance test shows that the resulting size of the repo is the same, but the threaded implementation gives gains of varying degrees depending on the number of objects being packed. (This was tested on a 16-core machine.) Test HEAD~1 HEAD -------------------------------------------------------------- 5313.6: thin pack with --path-walk 0.01 0.01 +0.0% 5313.7: thin pack size with --path-walk 475 475 +0.0% 5313.12: big pack with --path-walk 2.27 2.01 -11.5% 5313.13: big pack size with --path-walk 13.3M 13.3M +0.0% 5313.18: repack with --path-walk 98.00 41.53 -57.6% 5313.19: repack size with --path-walk 215.0K 215.0K +0.0% Signed-off-by: Derrick Stolee --- builtin/pack-objects.c | 158 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 156 insertions(+), 2 deletions(-) diff --git a/builtin/pack-objects.c b/builtin/pack-objects.c index ee2f985664cc9d..cf5032210b5f1f 100644 --- a/builtin/pack-objects.c +++ b/builtin/pack-objects.c @@ -2963,6 +2963,7 @@ static void find_deltas(struct object_entry **list, unsigned *list_size, struct thread_params { pthread_t thread; struct object_entry **list; + struct packing_region *regions; unsigned list_size; unsigned remaining; int window; @@ -3273,6 +3274,159 @@ static void find_deltas_by_region(struct object_entry *list, stop_progress(&progress_state); } +static void *threaded_find_deltas_by_path(void *arg) +{ + struct thread_params *me = arg; + + progress_lock(); + while (me->remaining) { + while (me->remaining) { + progress_unlock(); + find_deltas_for_region(to_pack.objects, + me->regions, + me->processed); + progress_lock(); + me->remaining--; + me->regions++; + } + + me->working = 0; + pthread_cond_signal(&progress_cond); + progress_unlock(); + + /* + * We must not set ->data_ready before we wait on the + * condition because the main thread may have set it to 1 + * before we get here. In order to be sure that new + * work is available if we see 1 in ->data_ready, it + * was initialized to 0 before this thread was spawned + * and we reset it to 0 right away. + */ + pthread_mutex_lock(&me->mutex); + while (!me->data_ready) + pthread_cond_wait(&me->cond, &me->mutex); + me->data_ready = 0; + pthread_mutex_unlock(&me->mutex); + + progress_lock(); + } + progress_unlock(); + /* leave ->working 1 so that this doesn't get more work assigned */ + return NULL; +} + +static void ll_find_deltas_by_region(struct object_entry *list, + struct packing_region *regions, + uint32_t start, uint32_t nr) +{ + struct thread_params *p; + int i, ret, active_threads = 0; + unsigned int processed = 0; + uint32_t progress_nr = regions[nr - 1].start + regions[nr - 1].nr; + init_threaded_search(); + + if (delta_search_threads <= 1) { + find_deltas_by_region(list, regions, start, nr); + cleanup_threaded_search(); + return; + } + + if (progress > pack_to_stdout) + fprintf_ln(stderr, _("Path-based delta compression using up to %d threads"), + delta_search_threads); + CALLOC_ARRAY(p, delta_search_threads); + + if (progress) + progress_state = start_progress(_("Compressing objects by path"), + progress_nr); + /* Partition the work amongst work threads. */ + for (i = 0; i < delta_search_threads; i++) { + unsigned sub_size = nr / (delta_search_threads - i); + + p[i].window = window; + p[i].depth = depth; + p[i].processed = &processed; + p[i].working = 1; + p[i].data_ready = 0; + + p[i].regions = regions; + p[i].list_size = sub_size; + p[i].remaining = sub_size; + + regions += sub_size; + nr -= sub_size; + } + + /* Start work threads. */ + for (i = 0; i < delta_search_threads; i++) { + if (!p[i].list_size) + continue; + pthread_mutex_init(&p[i].mutex, NULL); + pthread_cond_init(&p[i].cond, NULL); + ret = pthread_create(&p[i].thread, NULL, + threaded_find_deltas_by_path, &p[i]); + if (ret) + die(_("unable to create thread: %s"), strerror(ret)); + active_threads++; + } + + /* + * Now let's wait for work completion. Each time a thread is done + * with its work, we steal half of the remaining work from the + * thread with the largest number of unprocessed objects and give + * it to that newly idle thread. This ensure good load balancing + * until the remaining object list segments are simply too short + * to be worth splitting anymore. + */ + while (active_threads) { + struct thread_params *target = NULL; + struct thread_params *victim = NULL; + unsigned sub_size = 0; + + progress_lock(); + for (;;) { + for (i = 0; !target && i < delta_search_threads; i++) + if (!p[i].working) + target = &p[i]; + if (target) + break; + pthread_cond_wait(&progress_cond, &progress_mutex); + } + + for (i = 0; i < delta_search_threads; i++) + if (p[i].remaining > 2*window && + (!victim || victim->remaining < p[i].remaining)) + victim = &p[i]; + if (victim) { + sub_size = victim->remaining / 2; + target->regions = victim->regions + victim->remaining - sub_size; + victim->list_size -= sub_size; + victim->remaining -= sub_size; + } + target->list_size = sub_size; + target->remaining = sub_size; + target->working = 1; + progress_unlock(); + + pthread_mutex_lock(&target->mutex); + target->data_ready = 1; + pthread_cond_signal(&target->cond); + pthread_mutex_unlock(&target->mutex); + + if (!sub_size) { + pthread_join(target->thread, NULL); + pthread_cond_destroy(&target->cond); + pthread_mutex_destroy(&target->mutex); + active_threads--; + } + } + cleanup_threaded_search(); + free(p); + + display_progress(progress_state, progress_nr); + stop_progress(&progress_state); +} + static void prepare_pack(int window, int depth) { struct object_entry **delta_list; @@ -3298,8 +3452,8 @@ static void prepare_pack(int window, int depth) return; if (path_walk) - find_deltas_by_region(to_pack.objects, to_pack.regions, - 0, to_pack.nr_regions); + ll_find_deltas_by_region(to_pack.objects, to_pack.regions, + 0, to_pack.nr_regions); ALLOC_ARRAY(delta_list, to_pack.nr_objects); nr_deltas = n = 0;