Add determinism to output package order
authorTomas Mlcoch <tmlcoch@redhat.com>
Tue, 23 Oct 2012 13:31:17 +0000 (15:31 +0200)
committerTomas Mlcoch <tmlcoch@redhat.com>
Tue, 23 Oct 2012 13:52:18 +0000 (15:52 +0200)
src/createrepo_c.c

index 656f078..777c161 100644 (file)
@@ -122,14 +122,18 @@ allowed_file(const gchar *filename, struct CmdOptions *options)
 #define LOCK_FIL        1
 #define LOCK_OTH        2
 
-G_LOCK_DEFINE (LOCK_PRI);
-G_LOCK_DEFINE (LOCK_FIL);
-G_LOCK_DEFINE (LOCK_OTH);
+GMutex mutex_pri;
+GMutex mutex_fil;
+GMutex mutex_oth;
 
 GCond cond_pri;
 GCond cond_fil;
 GCond cond_oth;
 
+volatile int id_pri = 0;
+volatile int id_fil = 0;
+volatile int id_oth = 0;
+
 
 void
 dumper_thread(gpointer data, gpointer user_data)
@@ -143,6 +147,8 @@ dumper_thread(gpointer data, gpointer user_data)
     struct UserData *udata = (struct UserData *) user_data;
     struct PoolTask *task = (struct PoolTask *) data;
 
+//    printf("Mam task: %ld\n", task->id);
+
     // get location_href without leading part of path (path to repo)
     // including '/' char
     const char *location_href = task->full_path + udata->repodir_name_len;
@@ -201,28 +207,42 @@ dumper_thread(gpointer data, gpointer user_data)
     }
 
     // Write primary data
-    G_LOCK(LOCK_PRI);
+    g_mutex_lock(&mutex_pri);
+//    printf("Current PRI ID: %d\n", id_pri);
+    while (id_pri != task->id)
+        g_cond_wait (&cond_pri, &mutex_pri);
+    id_pri++;
     cr_puts(udata->pri_f, (const char *) res.primary);
-    if (udata->pri_statements) {
+    if (udata->pri_statements)
         cr_add_primary_pkg_db(udata->pri_statements, pkg);
-    }
-    G_UNLOCK(LOCK_PRI);
+    g_mutex_unlock(&mutex_pri);
+    g_cond_broadcast(&cond_pri);
 
     // Write fielists data
-    G_LOCK(LOCK_FIL);
+    g_mutex_lock(&mutex_fil);
+//    printf("Current FIL ID: %d\n", id_fil);
+    while (id_fil != task->id)
+        g_cond_wait (&cond_fil, &mutex_fil);
+    id_fil++;
     cr_puts(udata->fil_f, (const char *) res.filelists);
     if (udata->fil_statements) {
         cr_add_filelists_pkg_db(udata->fil_statements, pkg);
     }
-    G_UNLOCK(LOCK_FIL);
+    g_mutex_unlock(&mutex_fil);
+    g_cond_broadcast(&cond_fil);
 
     // Write other data
-    G_LOCK(LOCK_OTH);
+    g_mutex_lock(&mutex_oth);
+//    printf("Current OTH ID: %d\n", id_oth);
+    while (id_oth != task->id)
+        g_cond_wait (&cond_oth, &mutex_oth);
+    id_oth++;
     cr_puts(udata->oth_f, (const char *) res.other);
     if (udata->oth_statements) {
         cr_add_other_pkg_db(udata->oth_statements, pkg);
     }
-    G_UNLOCK(LOCK_OTH);
+    g_mutex_unlock(&mutex_oth);
+    g_cond_broadcast(&cond_oth);
 
 
     // Clean up
@@ -235,6 +255,30 @@ dumper_thread(gpointer data, gpointer user_data)
     g_free(res.other);
 
 task_cleanup:
+    if (id_pri <= task->id) {
+        // An error was encountered and we have to wait to increment counters
+        g_mutex_lock(&mutex_pri);
+        while (id_pri != task->id)
+            g_cond_wait (&cond_pri, &mutex_pri);
+        id_pri++;
+        g_mutex_unlock(&mutex_pri);
+        g_cond_broadcast(&cond_pri);
+
+        g_mutex_lock(&mutex_fil);
+        while (id_fil != task->id)
+            g_cond_wait (&cond_fil, &mutex_fil);
+        id_fil++;
+        g_mutex_unlock(&mutex_fil);
+        g_cond_broadcast(&cond_fil);
+
+        g_mutex_lock(&mutex_oth);
+        while (id_oth != task->id)
+            g_cond_wait (&cond_oth, &mutex_oth);
+        id_oth++;
+        g_mutex_unlock(&mutex_oth);
+        g_cond_broadcast(&cond_oth);
+    }
+
     g_free(task->full_path);
     g_free(task->filename);
     g_free(task->path);
@@ -245,12 +289,28 @@ task_cleanup:
 
 
 int
+task_cmp(gconstpointer a_p, gconstpointer b_p, gpointer user_data)
+{
+    int ret;
+    const struct PoolTask *a = a_p;
+    const struct PoolTask *b = b_p;
+    CR_UNUSED(user_data);
+    ret = g_strcmp0(a->filename, b->filename);
+    if (ret) return ret;
+    ret = g_strcmp0(a->path, b->path);
+    return ret;
+}
+
+
+int
 fill_pool(GThreadPool *pool,
           gchar *in_dir,
           struct CmdOptions *cmd_options,
           GSList **current_pkglist,
           FILE *output_pkg_list)
 {
+    GQueue queue = G_QUEUE_INIT;
+    struct PoolTask *task;
     int package_count = 0;
 
     if (!(cmd_options->include_pkgs)) {
@@ -317,7 +377,7 @@ fill_pool(GThreadPool *pool,
                 if (allowed_file(repo_relative_path, cmd_options)) {
                     // FINALLY! Add file into pool
                     g_debug("Adding pkg: %s", full_path);
-                    struct PoolTask *task = g_malloc(sizeof(struct PoolTask));
+                    task = g_malloc(sizeof(struct PoolTask));
                     task->full_path = full_path;
                     task->filename = g_strdup(filename);
                     task->path = g_strdup(dirname);
@@ -325,8 +385,8 @@ fill_pool(GThreadPool *pool,
                         fprintf(output_pkg_list, "%s\n", repo_relative_path);
                     *current_pkglist = g_slist_prepend(*current_pkglist, task->filename);
                     // TODO: One common path for all tasks with the same path?
-                    g_thread_pool_push(pool, task, NULL);
-                    package_count++;
+//                    g_thread_pool_push(pool, task, NULL);
+                    g_queue_insert_sorted(&queue, task, task_cmp, NULL);
                 } else
                     g_free(full_path);
             }
@@ -367,19 +427,26 @@ fill_pool(GThreadPool *pool,
                 gchar *full_path = g_strconcat(in_dir, relative_path, NULL);
                 //     ^^^ /path/to/in_repo/packages/i386/foobar.rpm
                 g_debug("Adding pkg: %s", full_path);
-                struct PoolTask *task = g_malloc(sizeof(struct PoolTask));
+                task = g_malloc(sizeof(struct PoolTask));
                 task->full_path = full_path;
                 task->filename  = g_strdup(filename);         // foobar.rpm
                 task->path      = strndup(relative_path, x);  // packages/i386/
                 if (output_pkg_list)
                     fprintf(output_pkg_list, "%s\n", relative_path);
                 *current_pkglist = g_slist_prepend(*current_pkglist, task->filename);
-                g_thread_pool_push(pool, task, NULL);
-                package_count++;
+//                g_thread_pool_push(pool, task, NULL);
+                g_queue_insert_sorted(&queue, task, task_cmp, NULL);
             }
         }
     }
 
+    // Push sorted tasks into the thread pool
+    while ((task = g_queue_pop_head(&queue)) != NULL) {
+        task->id = package_count;
+        g_thread_pool_push(pool, task, NULL);
+        package_count++;
+    }
+
     return package_count;
 }