From cf3242a09d85603e5d1352d4d65d9039702acc8d Mon Sep 17 00:00:00 2001 From: Tomas Mlcoch Date: Tue, 23 Oct 2012 15:31:17 +0200 Subject: [PATCH] Add determinism to output package order --- src/createrepo_c.c | 101 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 17 deletions(-) diff --git a/src/createrepo_c.c b/src/createrepo_c.c index 656f078..777c161 100644 --- a/src/createrepo_c.c +++ b/src/createrepo_c.c @@ -122,14 +122,18 @@ allowed_file(const gchar *filename, struct CmdOptions *options) #define LOCK_FIL 1 #define LOCK_OTH 2 -G_LOCK_DEFINE (LOCK_PRI); -G_LOCK_DEFINE (LOCK_FIL); -G_LOCK_DEFINE (LOCK_OTH); +GMutex mutex_pri; +GMutex mutex_fil; +GMutex mutex_oth; GCond cond_pri; GCond cond_fil; GCond cond_oth; +volatile int id_pri = 0; +volatile int id_fil = 0; +volatile int id_oth = 0; + void dumper_thread(gpointer data, gpointer user_data) @@ -143,6 +147,8 @@ dumper_thread(gpointer data, gpointer user_data) struct UserData *udata = (struct UserData *) user_data; struct PoolTask *task = (struct PoolTask *) data; +// printf("Mam task: %ld\n", task->id); + // get location_href without leading part of path (path to repo) // including '/' char const char *location_href = task->full_path + udata->repodir_name_len; @@ -201,28 +207,42 @@ dumper_thread(gpointer data, gpointer user_data) } // Write primary data - G_LOCK(LOCK_PRI); + g_mutex_lock(&mutex_pri); +// printf("Current PRI ID: %d\n", id_pri); + while (id_pri != task->id) + g_cond_wait (&cond_pri, &mutex_pri); + id_pri++; cr_puts(udata->pri_f, (const char *) res.primary); - if (udata->pri_statements) { + if (udata->pri_statements) cr_add_primary_pkg_db(udata->pri_statements, pkg); - } - G_UNLOCK(LOCK_PRI); + g_mutex_unlock(&mutex_pri); + g_cond_broadcast(&cond_pri); // Write fielists data - G_LOCK(LOCK_FIL); + g_mutex_lock(&mutex_fil); +// printf("Current FIL ID: %d\n", id_fil); + while (id_fil != task->id) + g_cond_wait (&cond_fil, &mutex_fil); + id_fil++; cr_puts(udata->fil_f, (const char *) res.filelists); if (udata->fil_statements) { cr_add_filelists_pkg_db(udata->fil_statements, pkg); } - G_UNLOCK(LOCK_FIL); + g_mutex_unlock(&mutex_fil); + g_cond_broadcast(&cond_fil); // Write other data - G_LOCK(LOCK_OTH); + g_mutex_lock(&mutex_oth); +// printf("Current OTH ID: %d\n", id_oth); + while (id_oth != task->id) + g_cond_wait (&cond_oth, &mutex_oth); + id_oth++; cr_puts(udata->oth_f, (const char *) res.other); if (udata->oth_statements) { cr_add_other_pkg_db(udata->oth_statements, pkg); } - G_UNLOCK(LOCK_OTH); + g_mutex_unlock(&mutex_oth); + g_cond_broadcast(&cond_oth); // Clean up @@ -235,6 +255,30 @@ dumper_thread(gpointer data, gpointer user_data) g_free(res.other); task_cleanup: + if (id_pri <= task->id) { + // An error was encountered and we have to wait to increment counters + g_mutex_lock(&mutex_pri); + while (id_pri != task->id) + g_cond_wait (&cond_pri, &mutex_pri); + id_pri++; + g_mutex_unlock(&mutex_pri); + g_cond_broadcast(&cond_pri); + + g_mutex_lock(&mutex_fil); + while (id_fil != task->id) + g_cond_wait (&cond_fil, &mutex_fil); + id_fil++; + g_mutex_unlock(&mutex_fil); + g_cond_broadcast(&cond_fil); + + g_mutex_lock(&mutex_oth); + while (id_oth != task->id) + g_cond_wait (&cond_oth, &mutex_oth); + id_oth++; + g_mutex_unlock(&mutex_oth); + g_cond_broadcast(&cond_oth); + } + g_free(task->full_path); g_free(task->filename); g_free(task->path); @@ -245,12 +289,28 @@ task_cleanup: int +task_cmp(gconstpointer a_p, gconstpointer b_p, gpointer user_data) +{ + int ret; + const struct PoolTask *a = a_p; + const struct PoolTask *b = b_p; + CR_UNUSED(user_data); + ret = g_strcmp0(a->filename, b->filename); + if (ret) return ret; + ret = g_strcmp0(a->path, b->path); + return ret; +} + + +int fill_pool(GThreadPool *pool, gchar *in_dir, struct CmdOptions *cmd_options, GSList **current_pkglist, FILE *output_pkg_list) { + GQueue queue = G_QUEUE_INIT; + struct PoolTask *task; int package_count = 0; if (!(cmd_options->include_pkgs)) { @@ -317,7 +377,7 @@ fill_pool(GThreadPool *pool, if (allowed_file(repo_relative_path, cmd_options)) { // FINALLY! Add file into pool g_debug("Adding pkg: %s", full_path); - struct PoolTask *task = g_malloc(sizeof(struct PoolTask)); + task = g_malloc(sizeof(struct PoolTask)); task->full_path = full_path; task->filename = g_strdup(filename); task->path = g_strdup(dirname); @@ -325,8 +385,8 @@ fill_pool(GThreadPool *pool, fprintf(output_pkg_list, "%s\n", repo_relative_path); *current_pkglist = g_slist_prepend(*current_pkglist, task->filename); // TODO: One common path for all tasks with the same path? - g_thread_pool_push(pool, task, NULL); - package_count++; +// g_thread_pool_push(pool, task, NULL); + g_queue_insert_sorted(&queue, task, task_cmp, NULL); } else g_free(full_path); } @@ -367,19 +427,26 @@ fill_pool(GThreadPool *pool, gchar *full_path = g_strconcat(in_dir, relative_path, NULL); // ^^^ /path/to/in_repo/packages/i386/foobar.rpm g_debug("Adding pkg: %s", full_path); - struct PoolTask *task = g_malloc(sizeof(struct PoolTask)); + task = g_malloc(sizeof(struct PoolTask)); task->full_path = full_path; task->filename = g_strdup(filename); // foobar.rpm task->path = strndup(relative_path, x); // packages/i386/ if (output_pkg_list) fprintf(output_pkg_list, "%s\n", relative_path); *current_pkglist = g_slist_prepend(*current_pkglist, task->filename); - g_thread_pool_push(pool, task, NULL); - package_count++; +// g_thread_pool_push(pool, task, NULL); + g_queue_insert_sorted(&queue, task, task_cmp, NULL); } } } + // Push sorted tasks into the thread pool + while ((task = g_queue_pop_head(&queue)) != NULL) { + task->id = package_count; + g_thread_pool_push(pool, task, NULL); + package_count++; + } + return package_count; } -- 2.7.4