#define LOCK_FIL 1
#define LOCK_OTH 2
-G_LOCK_DEFINE (LOCK_PRI);
-G_LOCK_DEFINE (LOCK_FIL);
-G_LOCK_DEFINE (LOCK_OTH);
+GMutex mutex_pri;
+GMutex mutex_fil;
+GMutex mutex_oth;
GCond cond_pri;
GCond cond_fil;
GCond cond_oth;
+volatile int id_pri = 0;
+volatile int id_fil = 0;
+volatile int id_oth = 0;
+
void
dumper_thread(gpointer data, gpointer user_data)
struct UserData *udata = (struct UserData *) user_data;
struct PoolTask *task = (struct PoolTask *) data;
+// printf("Mam task: %ld\n", task->id);
+
// get location_href without leading part of path (path to repo)
// including '/' char
const char *location_href = task->full_path + udata->repodir_name_len;
}
// Write primary data
- G_LOCK(LOCK_PRI);
+ g_mutex_lock(&mutex_pri);
+// printf("Current PRI ID: %d\n", id_pri);
+ while (id_pri != task->id)
+ g_cond_wait (&cond_pri, &mutex_pri);
+ id_pri++;
cr_puts(udata->pri_f, (const char *) res.primary);
- if (udata->pri_statements) {
+ if (udata->pri_statements)
cr_add_primary_pkg_db(udata->pri_statements, pkg);
- }
- G_UNLOCK(LOCK_PRI);
+ g_mutex_unlock(&mutex_pri);
+ g_cond_broadcast(&cond_pri);
// Write fielists data
- G_LOCK(LOCK_FIL);
+ g_mutex_lock(&mutex_fil);
+// printf("Current FIL ID: %d\n", id_fil);
+ while (id_fil != task->id)
+ g_cond_wait (&cond_fil, &mutex_fil);
+ id_fil++;
cr_puts(udata->fil_f, (const char *) res.filelists);
if (udata->fil_statements) {
cr_add_filelists_pkg_db(udata->fil_statements, pkg);
}
- G_UNLOCK(LOCK_FIL);
+ g_mutex_unlock(&mutex_fil);
+ g_cond_broadcast(&cond_fil);
// Write other data
- G_LOCK(LOCK_OTH);
+ g_mutex_lock(&mutex_oth);
+// printf("Current OTH ID: %d\n", id_oth);
+ while (id_oth != task->id)
+ g_cond_wait (&cond_oth, &mutex_oth);
+ id_oth++;
cr_puts(udata->oth_f, (const char *) res.other);
if (udata->oth_statements) {
cr_add_other_pkg_db(udata->oth_statements, pkg);
}
- G_UNLOCK(LOCK_OTH);
+ g_mutex_unlock(&mutex_oth);
+ g_cond_broadcast(&cond_oth);
// Clean up
g_free(res.other);
task_cleanup:
+ if (id_pri <= task->id) {
+ // An error was encountered and we have to wait to increment counters
+ g_mutex_lock(&mutex_pri);
+ while (id_pri != task->id)
+ g_cond_wait (&cond_pri, &mutex_pri);
+ id_pri++;
+ g_mutex_unlock(&mutex_pri);
+ g_cond_broadcast(&cond_pri);
+
+ g_mutex_lock(&mutex_fil);
+ while (id_fil != task->id)
+ g_cond_wait (&cond_fil, &mutex_fil);
+ id_fil++;
+ g_mutex_unlock(&mutex_fil);
+ g_cond_broadcast(&cond_fil);
+
+ g_mutex_lock(&mutex_oth);
+ while (id_oth != task->id)
+ g_cond_wait (&cond_oth, &mutex_oth);
+ id_oth++;
+ g_mutex_unlock(&mutex_oth);
+ g_cond_broadcast(&cond_oth);
+ }
+
g_free(task->full_path);
g_free(task->filename);
g_free(task->path);
int
+task_cmp(gconstpointer a_p, gconstpointer b_p, gpointer user_data)
+{
+ int ret;
+ const struct PoolTask *a = a_p;
+ const struct PoolTask *b = b_p;
+ CR_UNUSED(user_data);
+ ret = g_strcmp0(a->filename, b->filename);
+ if (ret) return ret;
+ ret = g_strcmp0(a->path, b->path);
+ return ret;
+}
+
+
+int
fill_pool(GThreadPool *pool,
gchar *in_dir,
struct CmdOptions *cmd_options,
GSList **current_pkglist,
FILE *output_pkg_list)
{
+ GQueue queue = G_QUEUE_INIT;
+ struct PoolTask *task;
int package_count = 0;
if (!(cmd_options->include_pkgs)) {
if (allowed_file(repo_relative_path, cmd_options)) {
// FINALLY! Add file into pool
g_debug("Adding pkg: %s", full_path);
- struct PoolTask *task = g_malloc(sizeof(struct PoolTask));
+ task = g_malloc(sizeof(struct PoolTask));
task->full_path = full_path;
task->filename = g_strdup(filename);
task->path = g_strdup(dirname);
fprintf(output_pkg_list, "%s\n", repo_relative_path);
*current_pkglist = g_slist_prepend(*current_pkglist, task->filename);
// TODO: One common path for all tasks with the same path?
- g_thread_pool_push(pool, task, NULL);
- package_count++;
+// g_thread_pool_push(pool, task, NULL);
+ g_queue_insert_sorted(&queue, task, task_cmp, NULL);
} else
g_free(full_path);
}
gchar *full_path = g_strconcat(in_dir, relative_path, NULL);
// ^^^ /path/to/in_repo/packages/i386/foobar.rpm
g_debug("Adding pkg: %s", full_path);
- struct PoolTask *task = g_malloc(sizeof(struct PoolTask));
+ task = g_malloc(sizeof(struct PoolTask));
task->full_path = full_path;
task->filename = g_strdup(filename); // foobar.rpm
task->path = strndup(relative_path, x); // packages/i386/
if (output_pkg_list)
fprintf(output_pkg_list, "%s\n", relative_path);
*current_pkglist = g_slist_prepend(*current_pkglist, task->filename);
- g_thread_pool_push(pool, task, NULL);
- package_count++;
+// g_thread_pool_push(pool, task, NULL);
+ g_queue_insert_sorted(&queue, task, task_cmp, NULL);
}
}
}
+ // Push sorted tasks into the thread pool
+ while ((task = g_queue_pop_head(&queue)) != NULL) {
+ task->id = package_count;
+ g_thread_pool_push(pool, task, NULL);
+ package_count++;
+ }
+
return package_count;
}