Skip to content

Commit

Permalink
pack-objects: fix threaded load balancing
Browse files Browse the repository at this point in the history
The current method consists of a master thread serving chunks of objects
to work threads when they're done with their previous chunk.  The issue
is to determine the best chunk size: making it too large creates poor
load balancing, while making it too small has a negative effect on pack
size because of the increased number of chunk boundaries and poor delta
window utilization.

This patch implements a completely different approach by initially
splitting the work in large chunks uniformly amongst all threads, and
whenever a thread is done it steals half of the remaining work from
the thread with the largest amount of unprocessed objects.

This has the advantage of greatly reducing the number of chunk boundaries
while achieving almost perfect load balancing.

Signed-off-by: Nicolas Pitre <nico@cam.org>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
  • Loading branch information
Nicolas Pitre authored and Junio C Hamano committed Dec 8, 2007
1 parent b904166 commit 384b32c
Showing 1 changed file with 85 additions and 32 deletions.
117 changes: 85 additions & 32 deletions builtin-pack-objects.c
Original file line number Diff line number Diff line change
Expand Up @@ -1479,22 +1479,34 @@ static unsigned long free_unpacked(struct unpacked *n)
return freed_mem;
}

static void find_deltas(struct object_entry **list, unsigned list_size,
static void find_deltas(struct object_entry **list, unsigned *list_size,
int window, int depth, unsigned *processed)
{
uint32_t i = 0, idx = 0, count = 0;
uint32_t i, idx = 0, count = 0;
unsigned int array_size = window * sizeof(struct unpacked);
struct unpacked *array;
unsigned long mem_usage = 0;

array = xmalloc(array_size);
memset(array, 0, array_size);

do {
struct object_entry *entry = list[i++];
for (;;) {
struct object_entry *entry = *list++;
struct unpacked *n = array + idx;
int j, max_depth, best_base = -1;

progress_lock();
if (!*list_size) {
progress_unlock();
break;
}
(*list_size)--;
if (!entry->preferred_base) {
(*processed)++;
display_progress(progress_state, *processed);
}
progress_unlock();

mem_usage -= free_unpacked(n);
n->entry = entry;

Expand All @@ -1512,11 +1524,6 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
if (entry->preferred_base)
goto next;

progress_lock();
(*processed)++;
display_progress(progress_state, *processed);
progress_unlock();

/*
* If the current object is at pack edge, take the depth the
* objects that depend on the current object into account
Expand Down Expand Up @@ -1576,7 +1583,7 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
count++;
if (idx >= window)
idx = 0;
} while (i < list_size);
}

for (i = 0; i < window; ++i) {
free_delta_index(array[i].index);
Expand All @@ -1591,6 +1598,7 @@ struct thread_params {
pthread_t thread;
struct object_entry **list;
unsigned list_size;
unsigned remaining;
int window;
int depth;
unsigned *processed;
Expand All @@ -1612,10 +1620,10 @@ static void *threaded_find_deltas(void *arg)
pthread_mutex_lock(&data_ready);
pthread_mutex_unlock(&data_request);

if (!me->list_size)
if (!me->remaining)
return NULL;

find_deltas(me->list, me->list_size,
find_deltas(me->list, &me->remaining,
me->window, me->depth, me->processed);
}
}
Expand All @@ -1624,57 +1632,102 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{
struct thread_params *target, p[delta_search_threads];
int i, ret;
unsigned chunk_size;
int i, ret, active_threads = 0;

if (delta_search_threads <= 1) {
find_deltas(list, list_size, window, depth, processed);
find_deltas(list, &list_size, window, depth, processed);
return;
}

pthread_mutex_lock(&data_provider);
pthread_mutex_lock(&data_ready);

/* Start work threads. */
for (i = 0; i < delta_search_threads; i++) {
p[i].window = window;
p[i].depth = depth;
p[i].processed = processed;
p[i].remaining = 0;
ret = pthread_create(&p[i].thread, NULL,
threaded_find_deltas, &p[i]);
if (ret)
die("unable to create thread: %s", strerror(ret));
active_threads++;
}

/* this should be auto-tuned somehow */
chunk_size = window * 1000;
/* Then partition the work amongst them. */
for (i = 0; i < delta_search_threads; i++) {
unsigned sub_size = list_size / (delta_search_threads - i);

do {
unsigned sublist_size = chunk_size;
if (sublist_size > list_size)
sublist_size = list_size;
pthread_mutex_lock(&data_provider);
target = data_requester;
if (!sub_size) {
pthread_mutex_unlock(&data_ready);
pthread_join(target->thread, NULL);
active_threads--;
continue;
}

/* try to split chunks on "path" boundaries */
while (sublist_size < list_size && list[sublist_size]->hash &&
list[sublist_size]->hash == list[sublist_size-1]->hash)
sublist_size++;
while (sub_size < list_size && list[sub_size]->hash &&
list[sub_size]->hash == list[sub_size-1]->hash)
sub_size++;

target->list = list;
target->list_size = sub_size;
target->remaining = sub_size;
pthread_mutex_unlock(&data_ready);

list += sub_size;
list_size -= sub_size;
}

/*
* Now let's wait for work completion. Each time a thread is done
* with its work, we steal half of the remaining work from the
* thread with the largest number of unprocessed objects and give
* it to that newly idle thread. This ensure good load balancing
* until the remaining object list segments are simply too short
* to be worth splitting anymore.
*/
do {
struct thread_params *victim = NULL;
unsigned sub_size = 0;
pthread_mutex_lock(&data_provider);
target = data_requester;
target->list = list;
target->list_size = sublist_size;

progress_lock();
for (i = 0; i < delta_search_threads; i++)
if (p[i].remaining > 2*window &&
(!victim || victim->remaining < p[i].remaining))
victim = &p[i];
if (victim) {
sub_size = victim->remaining / 2;
list = victim->list + victim->list_size - sub_size;
while (sub_size && list[0]->hash &&
list[0]->hash == list[-1]->hash) {
list++;
sub_size--;
}
target->list = list;
victim->list_size -= sub_size;
victim->remaining -= sub_size;
}
progress_unlock();

target->list_size = sub_size;
target->remaining = sub_size;
pthread_mutex_unlock(&data_ready);

list += sublist_size;
list_size -= sublist_size;
if (!sublist_size) {
if (!sub_size) {
pthread_join(target->thread, NULL);
i--;
active_threads--;
}
} while (i);
} while (active_threads);
}

#else
#define ll_find_deltas find_deltas
#define ll_find_deltas(l, s, w, d, p) find_deltas(l, &s, w, d, p)
#endif

static void prepare_pack(int window, int depth)
Expand Down

0 comments on commit 384b32c

Please sign in to comment.