Deltarpm support (Experimental - the used drpm library is not stable!)
authorTomas Mlcoch <tmlcoch@redhat.com>
Fri, 18 Jul 2014 09:17:01 +0000 (11:17 +0200)
committerTomas Mlcoch <tmlcoch@redhat.com>
Fri, 18 Jul 2014 11:44:06 +0000 (13:44 +0200)
Git of the DRPM library: https://git.fedorahosted.org/git/drpm.git

Check the README.md for information how to build createrepo_c with delta rpm support

Note: Delta rpm support is disabled by default

17 files changed:
.gitignore
CMakeLists.txt
README.md
doc/createrepo_c.8.gz
doc/mergerepo_c.8.gz
doc/modifyrepo_c.8.gz
src/CMakeLists.txt
src/cmd_parser.c
src/cmd_parser.h
src/createrepo_c.c
src/createrepo_c.h
src/deltarpms.c [new file with mode: 0644]
src/deltarpms.h.in [new file with mode: 0644]
src/dumper_thread.c
src/dumper_thread.h
src/xml_dump.h
src/xml_dump_deltapackage.c [new file with mode: 0644]

index 8b7f88d..428af16 100644 (file)
@@ -26,6 +26,7 @@ CPackSourceConfig.cmake
 _CPack_Packages/
 src/version.h
 src/createrepo_c.pc
+src/deltarpms.h
 
 # Devel stuff
 notes
index e2f1162..a9962a9 100644 (file)
@@ -69,6 +69,18 @@ ELSE (RPM_PATH)
     message("Using system RPM: ${RPMDB_LIBRARY}")
 ENDIF (RPM_PATH)
 
+# drpm
+if (DRPM_PATH)
+    include_directories (${DRPM_PATH}/)
+    find_library (DRPM_LIBRARY NAMES drpm PATHS ${DRPM_PATH}/ NO_DEFAULT_PATH)
+    set(CR_DELTA_RPM_SUPPORT "1")
+    message("Using custom DRPM: ${DRPM_LIBRARY}")
+ELSE (DRPM_PATH)
+    FIND_LIBRARY (DRPM_LIBRARY NAMES drpm)
+    IF (NOT DRPM_LIBRARY)
+        MESSAGE("No DRPM library installed")
+    ENDIF (NOT DRPM_LIBRARY)
+endif (DRPM_PATH)
 
 # Get package version
 INCLUDE (${CMAKE_SOURCE_DIR}/VERSION.cmake)
index 0a018a9..ff25243 100644 (file)
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ Package build requires - Pkg name in Fedora/Ubuntu:
 * *Documentation:* sphinx (http://sphinx-doc.org/) - python-sphinx/
 * **Test requires:** check (http://check.sourceforge.net/) - check-devel/check
 * **Test requires:** python-nose (https://nose.readthedocs.org/) - python-nose/python-nose
-
+* **Experimental support:** drpm (https://git.fedorahosted.org/git/drpm.git)
 
 From your checkout dir:
 
@@ -47,7 +47,7 @@ To build the documentation, from the build/ directory:
 
 E.g. when you want to try weak and rich dependencies.
 
-    cmake -D RPM_PATH="/home/tmlcoch/git/rpm" .. && make
+    cmake -DRPM_PATH="/home/tmlcoch/git/rpm" .. && make
 
 **Note:** The RPM must be buit in that directory
 
@@ -57,6 +57,20 @@ Commands I am using for building of RPM:
     CPPFLAGS='-I/usr/include/nss3/ -I/usr/include/nspr4/' ./autogen.sh --rpmconfigure --with-vendor=redhat --with-external-db --with-lua --with-selinux --with-cap --with-acl --enable-python
     make clean && make
 
+## Building with delta rpm support (drpm)
+
+At first, you have to checkout drpm library from the
+https://git.fedorahosted.org/git/drpm.git and build it.
+
+    git clone ssh://git.fedorahosted.org/git/drpm.git
+    cd drpm/
+    make
+
+Then run ``cmake`` for createrepo_c with param ``-DDRPM_PATH="/home/tmlcoch/git/drpm"``
+where the path is path to your build of drpm library.
+
+    cmake -DDRPM_PATH="/home/tmlcoch/git/drpm" .. && make
+
 ## Build tarball
 
     utils/make_tarball.sh [git revision]
index 6b39c6b..f404d7c 100644 (file)
Binary files a/doc/createrepo_c.8.gz and b/doc/createrepo_c.8.gz differ
index a1da534..54e6444 100644 (file)
Binary files a/doc/mergerepo_c.8.gz and b/doc/mergerepo_c.8.gz differ
index 93f375b..4ae802c 100644 (file)
Binary files a/doc/modifyrepo_c.8.gz and b/doc/modifyrepo_c.8.gz differ
index ccacffc..a795ac8 100644 (file)
@@ -1,6 +1,7 @@
 SET (createrepo_c_SRCS
      checksum.c
      compression_wrapper.c
+     deltarpms.c
      dumper_thread.c
      error.c
      helpers.c
@@ -15,6 +16,7 @@ SET (createrepo_c_SRCS
      sqlite.c
      threads.c
      xml_dump.c
+     xml_dump_deltapackage.c
      xml_dump_filelists.c
      xml_dump_other.c
      xml_dump_primary.c
@@ -31,6 +33,7 @@ SET(headers
     compression_wrapper.h
     constants.h
     createrepo_c.h
+    deltarpms.h
     error.h
     helpers.h
     load_metadata.h
@@ -60,6 +63,9 @@ TARGET_LINK_LIBRARIES(libcreaterepo_c ${OPENSSL_LIBRARIES})
 TARGET_LINK_LIBRARIES(libcreaterepo_c ${RPMDB_LIBRARY})
 TARGET_LINK_LIBRARIES(libcreaterepo_c ${SQLITE3_LIBRARIES})
 TARGET_LINK_LIBRARIES(libcreaterepo_c ${ZLIB_LIBRARY})
+IF (DRPM_LIBRARY)
+    TARGET_LINK_LIBRARIES(libcreaterepo_c ${DRPM_LIBRARY})
+ENDIF (DRPM_LIBRARY)
 
 
 SET_TARGET_PROPERTIES(libcreaterepo_c PROPERTIES
@@ -88,6 +94,7 @@ TARGET_LINK_LIBRARIES(modifyrepo_c
 
 CONFIGURE_FILE("createrepo_c.pc.cmake" "${CMAKE_SOURCE_DIR}/src/createrepo_c.pc" @ONLY)
 CONFIGURE_FILE("version.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/version.h" @ONLY)
+CONFIGURE_FILE("deltarpms.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/deltarpms.h" @ONLY)
 
 IF (CMAKE_SIZEOF_VOID_P MATCHES "8")
     SET (LIB_SUFFIX "64")
index 06ea516..db900d6 100644 (file)
@@ -23,6 +23,7 @@
 #include <assert.h>
 #include <errno.h>
 #include "cmd_parser.h"
+#include "deltarpms.h"
 #include "error.h"
 #include "compression_wrapper.h"
 #include "misc.h"
@@ -45,6 +46,10 @@ struct CmdOptions _cmd_options = {
         .ignore_lock          = DEFAULT_IGNORE_LOCK,
         .md_max_age           = 0,
         .cachedir             = NULL,
+
+        .num_deltas           = 1,
+        .max_delta_rpm_size   = CR_DEFAULT_MAX_DELTA_RPM_SIZE,
+
         .checksum_cachedir    = NULL,
     };
 
@@ -138,6 +143,15 @@ static GOptionEntry cmd_entries[] =
       "Available units (m - minutes, h - hours, d - days)", "AGE" },
     { "cachedir", 'c', 0, G_OPTION_ARG_FILENAME, &(_cmd_options.cachedir),
       "Set path to cache dir", "CACHEDIR." },
+    { "deltas", 0, 0, G_OPTION_ARG_NONE, &(_cmd_options.deltas),
+      "Tells createrepo to generate deltarpms and the delta metadata.", NULL },
+    { "oldpackagedirs", 0, 0, G_OPTION_ARG_FILENAME_ARRAY, &(_cmd_options.oldpackagedirs),
+      "Paths to look for older pkgs to delta against. Can be specified "
+      "multiple times.", "PATH" },
+    { "num-deltas", 0, 0, G_OPTION_ARG_INT, &(_cmd_options.num_deltas),
+      "The number of older versions to make deltas against. Defaults to 1.", "INT" },
+    { "max-delta-rpm-size", 0, 0, G_OPTION_ARG_INT64, &(_cmd_options.max_delta_rpm_size),
+      "Max size of an rpm that to run deltarpm against (in bytes).", "MAX_DELTA_RPM_SIZE" },
     { NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL },
 };
 
@@ -443,6 +457,16 @@ check_arguments(struct CmdOptions *options,
             return FALSE;
     }
 
+    // Check oldpackagedirs
+    x = 0;
+    while (options->oldpackagedirs && options->oldpackagedirs[x]) {
+        char *path = options->oldpackagedirs[x];
+        options->oldpackagedirs_paths = g_slist_prepend(
+                                            options->oldpackagedirs_paths,
+                                            (gpointer) path);
+        x++;
+    }
+
     return TRUE;
 }
 
@@ -469,6 +493,7 @@ free_options(struct CmdOptions *options)
     g_strfreev(options->distro_tags);
     g_strfreev(options->content_tags);
     g_strfreev(options->repo_tags);
+    g_strfreev(options->oldpackagedirs);
 
     cr_slist_free_full(options->include_pkgs, g_free);
     cr_slist_free_full(options->exclude_masks,
@@ -476,4 +501,5 @@ free_options(struct CmdOptions *options)
     cr_slist_free_full(options->l_update_md_paths, g_free);
     cr_slist_free_full(options->distro_cpeids, g_free);
     cr_slist_free_full(options->distro_values, g_free);
+    cr_slist_free_full(options->oldpackagedirs_paths, g_free);
 }
index 16314b9..bf60e6a 100644 (file)
@@ -85,6 +85,14 @@ struct CmdOptions {
                                      d - days) */
     char *cachedir;             /*!< Cache dir for checksums */
 
+    gboolean deltas;            /*!< Is delta generation enabled? */
+    char **oldpackagedirs;      /*!< Paths to look for older pks
+                                     to delta agains */
+    gint num_deltas;            /*!< Number of older version to make
+                                     deltas against */
+    gint64 max_delta_rpm_size;  /*!< Max size of an rpm that to run
+                                     deltarpm against */
+
     /* Items filled by check_arguments() */
 
     char *groupfile_fullpath;   /*!< full path to groupfile */
@@ -104,6 +112,7 @@ struct CmdOptions {
                                      Filled if --retain-old-md-by-age
                                      is used */
     char *checksum_cachedir;    /*!< Path to cachedir */
+    GSList *oldpackagedirs_paths; /*!< paths to look for older pkgs to delta against */
 };
 
 /**
index 496e822..bd0acd2 100644 (file)
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include "cmd_parser.h"
 #include "compression_wrapper.h"
+#include "deltarpms.h"
 #include "dumper_thread.h"
 #include "checksum.h"
 #include "error.h"
@@ -47,6 +48,7 @@
 #include "xml_dump.h"
 #include "xml_file.h"
 
+#define OUTDELTADIR "drpms/"
 
 // Global variables used by the signal handler failure_exit_cleanup
 char *global_lock_dir     = NULL; // Path to .repodata/ dir that is used as a lock
@@ -750,20 +752,25 @@ main(int argc, char **argv)
     // Setup compression types
 
     const char *sqlite_compression_suffix = NULL;
+    const char *prestodelta_compression_suffix = NULL;
     cr_CompressionType sqlite_compression = CR_CW_BZ2_COMPRESSION;
     cr_CompressionType groupfile_compression = CR_CW_GZ_COMPRESSION;
+    cr_CompressionType prestodelta_compression = CR_CW_GZ_COMPRESSION;
 
     if (cmd_options->compression_type != CR_CW_UNKNOWN_COMPRESSION) {
-        sqlite_compression    = cmd_options->compression_type;
-        groupfile_compression = cmd_options->compression_type;
+        sqlite_compression      = cmd_options->compression_type;
+        groupfile_compression   = cmd_options->compression_type;
+        prestodelta_compression = cmd_options->compression_type;
     }
 
     if (cmd_options->xz_compression) {
-        sqlite_compression    = CR_CW_XZ_COMPRESSION;
-        groupfile_compression = CR_CW_XZ_COMPRESSION;
+        sqlite_compression      = CR_CW_XZ_COMPRESSION;
+        groupfile_compression   = CR_CW_XZ_COMPRESSION;
+        prestodelta_compression = CR_CW_GZ_COMPRESSION;
     }
 
     sqlite_compression_suffix = cr_compression_suffix(sqlite_compression);
+    prestodelta_compression_suffix = cr_compression_suffix(prestodelta_compression);
 
 
     // Create and open new compressed files
@@ -911,12 +918,10 @@ main(int argc, char **argv)
     user_data.checksum_type     = cmd_options->checksum_type;
     user_data.checksum_cachedir = cmd_options->checksum_cachedir;
     user_data.skip_symlinks     = cmd_options->skip_symlinks;
-    user_data.skip_stat         = cmd_options->skip_stat;
-    user_data.old_metadata      = old_metadata;
     user_data.repodir_name_len  = strlen(in_dir);
     user_data.package_count     = package_count;
-    user_data.buffer            = g_queue_new();
-    user_data.mutex_buffer      = g_mutex_new();
+    user_data.skip_stat         = cmd_options->skip_stat;
+    user_data.old_metadata      = old_metadata;
     user_data.mutex_pri         = g_mutex_new();
     user_data.mutex_fil         = g_mutex_new();
     user_data.mutex_oth         = g_mutex_new();
@@ -926,6 +931,12 @@ main(int argc, char **argv)
     user_data.id_pri            = 0;
     user_data.id_fil            = 0;
     user_data.id_oth            = 0;
+    user_data.buffer            = g_queue_new();
+    user_data.mutex_buffer      = g_mutex_new();
+    user_data.deltas            = cmd_options->deltas;
+    user_data.max_delta_rpm_size= cmd_options->max_delta_rpm_size;
+    user_data.mutex_deltatargetpackages = g_mutex_new();
+    user_data.deltatargetpackages = NULL;
 
     g_debug("Thread pool user data ready");
 
@@ -955,6 +966,7 @@ main(int argc, char **argv)
     g_mutex_free(user_data.mutex_pri);
     g_mutex_free(user_data.mutex_fil);
     g_mutex_free(user_data.mutex_oth);
+    g_mutex_free(user_data.mutex_deltatargetpackages);
 
 
     // Create repomd records for each file
@@ -972,6 +984,7 @@ main(int argc, char **argv)
     cr_RepomdRecord *groupfile_rec            = NULL;
     cr_RepomdRecord *compressed_groupfile_rec = NULL;
     cr_RepomdRecord *updateinfo_rec           = NULL;
+    cr_RepomdRecord *prestodelta_rec          = NULL;
 
 
     // XML
@@ -1148,6 +1161,108 @@ main(int argc, char **argv)
         cr_repomdrecordfilltask_free(oth_db_fill_task, NULL);
     }
 
+#ifdef CR_DELTA_RPM_SUPPORT
+    // Delta generation
+    if (cmd_options->deltas) {
+        gboolean ret;
+        gchar *filename, *outdeltadir = NULL;
+        gchar *prestodelta_xml_filename = NULL;
+        GHashTable *ht_oldpackagedirs = NULL;
+        cr_XmlFile *prestodelta_cr_file = NULL;
+        cr_ContentStat *prestodelta_stat = NULL;
+
+        filename = g_strconcat("prestodelta.xml",
+                               prestodelta_compression_suffix,
+                               NULL);
+        outdeltadir = g_build_filename(out_dir, OUTDELTADIR, NULL);
+        prestodelta_xml_filename = g_build_filename(tmp_out_repo,
+                                                    filename,
+                                                    NULL);
+        g_free(filename);
+
+        // 0) Prepare outdeltadir
+        if (g_file_test(outdeltadir, G_FILE_TEST_EXISTS)) {
+            if (!g_file_test(outdeltadir, G_FILE_TEST_IS_DIR)) {
+                g_error("The file %s already exists and it is not a directory",
+                        outdeltadir);
+                goto deltaerror;
+            }
+        } else if (g_mkdir(outdeltadir, S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH)) {
+            g_error("Cannot create %s: %s", outdeltadir, strerror(errno));
+            goto deltaerror;
+        }
+
+        // 1) Scan old package directories
+        ht_oldpackagedirs = cr_deltarpms_scan_oldpackagedirs(cmd_options->oldpackagedirs_paths,
+                                                   cmd_options->max_delta_rpm_size,
+                                                   &tmp_err);
+        if (!ht_oldpackagedirs) {
+            g_error("cr_deltarpms_scan_oldpackagedirs failed: %s\n", tmp_err->message);
+            g_clear_error(&tmp_err);
+            goto deltaerror;
+        }
+
+        // 2) Generate drpms in parallel
+        ret = cr_deltarpms_parallel_deltas(user_data.deltatargetpackages,
+                                 ht_oldpackagedirs,
+                                 outdeltadir,
+                                 cmd_options->num_deltas,
+                                 cmd_options->workers,
+                                 cmd_options->max_delta_rpm_size,
+                                 cmd_options->max_delta_rpm_size,
+                                 &tmp_err);
+        if (!ret) {
+            g_error("Parallel generation of drpms failed: %s", tmp_err->message);
+            g_clear_error(&tmp_err);
+            goto deltaerror;
+        }
+
+        // 3) Generate prestodelta.xml file
+        prestodelta_stat = cr_contentstat_new(cmd_options->checksum_type, NULL);
+        prestodelta_cr_file = cr_xmlfile_sopen_prestodelta(prestodelta_xml_filename,
+                                                           prestodelta_compression,
+                                                           prestodelta_stat,
+                                                           &tmp_err);
+        if (!prestodelta_cr_file) {
+            g_error("Cannot open %s: %s", prestodelta_xml_filename, tmp_err->message);
+            g_clear_error(&tmp_err);
+            goto deltaerror;
+        }
+
+        ret = cr_deltarpms_generate_prestodelta_file(
+                        outdeltadir,
+                        prestodelta_cr_file,
+                        //cmd_options->checksum_type,
+                        CR_CHECKSUM_SHA256, // Createrepo always uses SHA256
+                        cmd_options->workers,
+                        out_dir,
+                        &tmp_err);
+        if (!ret) {
+            g_error("Cannot generate %s: %s", prestodelta_xml_filename,
+                    tmp_err->message);
+            g_clear_error(&tmp_err);
+            goto deltaerror;
+        }
+
+        cr_xmlfile_close(prestodelta_cr_file, NULL);
+        prestodelta_cr_file = NULL;
+
+        // 4) Prepare repomd record
+        prestodelta_rec = cr_repomd_record_new("prestodelta", prestodelta_xml_filename);
+        cr_repomd_record_load_contentstat(prestodelta_rec, prestodelta_stat);
+        cr_repomd_record_fill(prestodelta_rec, cmd_options->checksum_type, NULL);
+
+deltaerror:
+        // 5) Cleanup
+        g_hash_table_destroy(ht_oldpackagedirs);
+        g_free(outdeltadir);
+        g_free(prestodelta_xml_filename);
+        cr_xmlfile_close(prestodelta_cr_file, NULL);
+        cr_contentstat_free(prestodelta_stat, NULL);
+        cr_slist_free_full(user_data.deltatargetpackages,
+                       (GDestroyNotify) cr_deltatargetpackage_free);
+    }
+#endif
 
     // Add checksums into files names
 
@@ -1161,6 +1276,7 @@ main(int argc, char **argv)
         cr_repomd_record_rename_file(groupfile_rec, NULL);
         cr_repomd_record_rename_file(compressed_groupfile_rec, NULL);
         cr_repomd_record_rename_file(updateinfo_rec, NULL);
+        cr_repomd_record_rename_file(prestodelta_rec, NULL);
     }
 
 
@@ -1175,6 +1291,7 @@ main(int argc, char **argv)
     cr_repomd_set_record(repomd_obj, groupfile_rec);
     cr_repomd_set_record(repomd_obj, compressed_groupfile_rec);
     cr_repomd_set_record(repomd_obj, updateinfo_rec);
+    cr_repomd_set_record(repomd_obj, prestodelta_rec);
 
     int i = 0;
     while (cmd_options->repo_tags && cmd_options->repo_tags[i])
index 5c40590..3bf35a4 100644 (file)
@@ -41,6 +41,7 @@ extern "C" {
 #include <glib.h>
 #include "checksum.h"
 #include "compression_wrapper.h"
+#include "deltarpms.h"
 #include "error.h"
 #include "load_metadata.h"
 #include "locate_metadata.h"
diff --git a/src/deltarpms.c b/src/deltarpms.c
new file mode 100644 (file)
index 0000000..9594c10
--- /dev/null
@@ -0,0 +1,907 @@
+/* createrepo_c - Library of routines for manipulation with repodata
+ * Copyright (C) 2014  Tomas Mlcoch
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301,
+ * USA.
+ */
+
+#include <glib.h>
+#include <glib/gstdio.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include "deltarpms.h"
+#ifdef    CR_DELTA_RPM_SUPPORT
+#include <drpm.h>
+#endif
+#include "package.h"
+#include "parsepkg.h"
+#include "misc.h"
+#include "error.h"
+
+#define MAKEDELTARPM    "/usr/bin/makedeltarpm"
+
+
+gboolean
+cr_drpm_support(void)
+{
+#ifdef    CR_DELTA_RPM_SUPPORT
+    if (g_file_test(MAKEDELTARPM, G_FILE_TEST_IS_REGULAR
+                                   | G_FILE_TEST_IS_EXECUTABLE))
+        return TRUE;
+#endif
+    return FALSE;
+}
+
+#ifdef    CR_DELTA_RPM_SUPPORT
+
+char *
+cr_drpm_create(cr_DeltaTargetPackage *old,
+               cr_DeltaTargetPackage *new,
+               const char *destdir,
+               GError **err)
+{
+    gchar *drpmfn, *drpmpath, *error_str = NULL;
+    GPtrArray *cmd_array;
+    int spawn_flags = G_SPAWN_SEARCH_PATH | G_SPAWN_STDOUT_TO_DEV_NULL;
+    GError *tmp_err = NULL;
+    gint status = 0;
+    gboolean ret;
+
+    drpmfn = g_strdup_printf("%s-%s-%s_%s-%s.%s.drpm",
+                             old->name, old->version, old->release,
+                             new->version, new->release, old->arch);
+    drpmpath = g_build_filename(destdir, drpmfn, NULL);
+    g_free(drpmfn);
+
+    cmd_array = g_ptr_array_new();
+    g_ptr_array_add(cmd_array, MAKEDELTARPM);
+    g_ptr_array_add(cmd_array, (gpointer) old->path);
+    g_ptr_array_add(cmd_array, (gpointer) new->path);
+    g_ptr_array_add(cmd_array, (gpointer) drpmpath);
+    g_ptr_array_add(cmd_array, (gpointer) NULL);
+
+    g_spawn_sync(NULL,              // working directory
+                 (char **) cmd_array->pdata,  // argv
+                 NULL,              // envp
+                 spawn_flags,       // spawn flags
+                 NULL,              // child setup function
+                 NULL,              // user data for child setup
+                 NULL,              // stdout
+                 &error_str,        // stderr
+                 &status,           // status
+                 &tmp_err           // err
+                );
+
+    g_ptr_array_free(cmd_array, TRUE);
+
+    if (tmp_err) {
+        g_free(error_str);
+        free(drpmpath);
+        g_propagate_error(err, tmp_err);
+        return NULL;
+    }
+
+    ret = cr_spawn_check_exit_status(status, &tmp_err);
+    if (!ret) {
+        g_propagate_prefixed_error(err, tmp_err, "%s: ", error_str);
+        free(drpmpath);
+        g_free(error_str);
+        return NULL;
+    }
+
+    g_free(error_str);
+
+    return drpmpath;
+}
+
+void
+cr_deltapackage_free(cr_DeltaPackage *deltapackage)
+{
+    if (!deltapackage)
+        return;
+    cr_package_free(deltapackage->package);
+    g_string_chunk_free(deltapackage->chunk);
+    g_free(deltapackage);
+}
+
+cr_DeltaPackage *
+cr_deltapackage_from_drpm_base(const char *filename,
+                               int changelog_limit,
+                               cr_HeaderReadingFlags flags,
+                               GError **err)
+{
+    struct drpm *delta = NULL;
+    cr_DeltaPackage *deltapackage = NULL;
+    char *str;
+
+    assert(!err || *err == NULL);
+
+    deltapackage = g_new0(cr_DeltaPackage, 1);
+    deltapackage->chunk = g_string_chunk_new(0);
+
+    deltapackage->package = cr_package_from_rpm_base(filename,
+                                                     changelog_limit,
+                                                     flags,
+                                                     err);
+    if (!deltapackage->package)
+        goto errexit;
+
+    if (drpm_read(filename, &delta) != EOK) {
+        g_set_error(err, CR_DELTARPMS_ERROR, CRE_DELTARPM,
+                    "Deltarpm cannot read %s", filename);
+        goto errexit;
+    }
+
+    if (drpm_get_string(delta, DRPM_SOURCE_NEVR, &str) != EOK) {
+        g_set_error(err, CR_DELTARPMS_ERROR, CRE_DELTARPM,
+                    "Deltarpm cannot read source NEVR from %s", filename);
+        goto errexit;
+    }
+
+    deltapackage->nevr = cr_safe_string_chunk_insert_null(
+                                    deltapackage->chunk, str);
+
+    if (drpm_get_string(delta, DRPM_SEQUENCE, &str) != EOK) {
+        g_set_error(err, CR_DELTARPMS_ERROR, CRE_DELTARPM,
+                    "Deltarpm cannot read delta sequence from %s", filename);
+        goto errexit;
+    }
+
+    deltapackage->sequence = cr_safe_string_chunk_insert_null(
+                                    deltapackage->chunk, str);
+
+    drpm_destroy(&delta);
+
+    return deltapackage;
+
+errexit:
+
+    drpm_destroy(&delta);
+    cr_deltapackage_free(deltapackage);
+
+    return NULL;
+}
+
+
+static void
+cr_free_gslist_of_strings(gpointer list)
+{
+    if (!list) return;
+    cr_slist_free_full((GSList *) list, (GDestroyNotify) g_free);
+}
+
+/*
+ * 1) Scanning for old candidate rpms
+ */
+
+GHashTable *
+cr_deltarpms_scan_oldpackagedirs(GSList *oldpackagedirs,
+                                 gint64 max_delta_rpm_size,
+                                 GError **err)
+{
+    GHashTable *ht = NULL;
+
+    assert(!err || *err == NULL);
+
+    ht = g_hash_table_new_full(g_str_hash,
+                               g_str_equal,
+                               (GDestroyNotify) g_free,
+                               (GDestroyNotify) cr_free_gslist_of_strings);
+
+    for (GSList *elem = oldpackagedirs; elem; elem = g_slist_next(elem)) {
+        gchar *dirname = elem->data;
+        const gchar *filename;
+        GDir *dirp;
+        GSList *filenames = NULL;
+
+        dirp = g_dir_open(dirname, 0, NULL);
+        if (!dirp) {
+            g_warning("Cannot open directory %s", dirname);
+            continue;
+        }
+
+        while ((filename = g_dir_read_name(dirp))) {
+            gchar *full_path;
+            struct stat st;
+
+            if (!g_str_has_suffix(filename, ".rpm"))
+                continue;  // Skip non rpm files
+
+            full_path = g_build_filename(dirname, filename, NULL);
+
+            if (stat(full_path, &st) == -1) {
+                g_warning("Cannot stat %s: %s", full_path, strerror(errno));
+                g_free(full_path);
+                continue;
+            }
+
+            if (st.st_size > max_delta_rpm_size) {
+                g_debug("%s: Skipping %s that is > max_delta_rpm_size",
+                        __func__, full_path);
+                g_free(full_path);
+                continue;
+            }
+
+            g_free(full_path);
+
+            filenames = g_slist_prepend(filenames, g_strdup(filename));
+        }
+
+        if (filenames) {
+            g_hash_table_replace(ht,
+                                 (gpointer) g_strdup(dirname),
+                                 (gpointer) filenames);
+        }
+
+        g_dir_close(dirp);
+    }
+
+
+    return ht;
+}
+
+/*
+ * 2) Parallel delta generation
+ */
+
+
+typedef struct {
+    cr_DeltaTargetPackage *tpkg;
+} cr_DeltaTask;
+
+
+typedef struct {
+    const char *outdeltadir;
+    gint num_deltas;
+    GHashTable *oldpackages;
+    GMutex *mutex;
+    gint64 active_work_size;
+    gint active_tasks;
+    GCond *cond_task_finished;
+} cr_DeltaThreadUserData;
+
+
+static gint
+cmp_deltatargetpackage_evr(gconstpointer aa, gconstpointer bb)
+{
+    const cr_DeltaTargetPackage *a = aa;
+    const cr_DeltaTargetPackage *b = bb;
+
+    return cr_cmp_evr(a->epoch, a->version, a->release,
+                      b->epoch, b->version, b->release);
+}
+
+
+static void
+cr_delta_thread(gpointer data, gpointer udata)
+{
+    cr_DeltaTask *task = data;
+    cr_DeltaThreadUserData *user_data = udata;
+    cr_DeltaTargetPackage *tpkg = task->tpkg;  // Shortcut
+
+    GHashTableIter iter;
+    gpointer key, value;
+
+    // Iterate through specified oldpackage directories
+    g_hash_table_iter_init(&iter, user_data->oldpackages);
+    while (g_hash_table_iter_next(&iter, &key, &value)) {
+        gchar *dirname = key;
+        GSList *local_candidates = NULL;
+
+        // Select appropriate candidates from the directory
+        for (GSList *elem = value; elem; elem = g_slist_next(elem)) {
+            gchar *filename = elem->data;
+            if (g_str_has_prefix(filename, tpkg->name)) {
+                cr_DeltaTargetPackage *l_tpkg;
+                gchar *path = g_build_filename(dirname, filename, NULL);
+                l_tpkg = cr_deltatargetpackage_from_rpm(path, NULL);
+                g_free(path);
+                if (!l_tpkg)
+                    continue;
+
+                // Check the candidate more carefully
+
+                if (g_strcmp0(tpkg->name, l_tpkg->name)) {
+                    cr_deltatargetpackage_free(l_tpkg);
+                    continue;
+                }
+
+                if (g_strcmp0(tpkg->arch, l_tpkg->arch)) {
+                    cr_deltatargetpackage_free(l_tpkg);
+                    continue;
+                }
+
+                if (cr_cmp_evr(tpkg->epoch, tpkg->version, tpkg->release,
+                        l_tpkg->epoch, l_tpkg->version, l_tpkg->release) <= 0)
+                {
+                    cr_deltatargetpackage_free(l_tpkg);
+                    continue;
+                }
+
+                // This candidate looks good
+                local_candidates = g_slist_prepend(local_candidates, l_tpkg);
+            }
+        }
+
+        // Sort the candidates
+        local_candidates = g_slist_sort(local_candidates,
+                                        cmp_deltatargetpackage_evr);
+        local_candidates = g_slist_reverse(local_candidates);
+
+        // Generate deltas
+        int x = 0;
+        for (GSList *lelem = local_candidates; lelem; lelem = g_slist_next(lelem)){
+            GError *tmp_err = NULL;
+            cr_DeltaTargetPackage *old = lelem->data;
+
+            g_debug("Generating delta %s -> %s", old->path, tpkg->path);
+            cr_drpm_create(old, tpkg, user_data->outdeltadir, &tmp_err);
+            if (tmp_err) {
+                g_warning("Cannot generate delta %s -> %s : %s",
+                          old->path, tpkg->path, tmp_err->message);
+                g_error_free(tmp_err);
+                continue;
+            }
+            if (++x == user_data->num_deltas)
+                break;
+        }
+    }
+
+    g_debug("Deltas for \"%s\" (%"G_GINT64_FORMAT") generated",
+            tpkg->name, tpkg->size_installed);
+
+    g_mutex_lock(user_data->mutex);
+    user_data->active_work_size -= tpkg->size_installed;
+    user_data->active_tasks--;
+    g_cond_signal(user_data->cond_task_finished);
+    g_mutex_unlock(user_data->mutex);
+}
+
+
+static gint
+cmp_deltatargetpackage_sizes(gconstpointer a, gconstpointer b)
+{
+    const cr_DeltaTargetPackage *dtpk_a = a;
+    const cr_DeltaTargetPackage *dtpk_b = b;
+
+    if (dtpk_a->size_installed < dtpk_b->size_installed)
+        return -1;
+    else if (dtpk_a->size_installed == dtpk_b->size_installed)
+        return 0;
+    else
+        return 1;
+}
+
+
+gboolean
+cr_deltarpms_parallel_deltas(GSList *targetpackages,
+                   GHashTable *oldpackages,
+                   const char *outdeltadir,
+                   gint num_deltas,
+                   gint workers,
+                   gint64 max_delta_rpm_size,
+                   gint64 max_work_size,
+                   GError **err)
+{
+    GThreadPool *pool;
+    cr_DeltaThreadUserData user_data;
+    GList *targets = NULL;
+    GError *tmp_err = NULL;
+
+    assert(!err || *err == NULL);
+
+    if (num_deltas < 1)
+        return TRUE;
+
+    if (workers < 1) {
+        g_set_error(err, CR_DELTARPMS_ERROR, CRE_DELTARPM,
+                    "Number of delta workers must be a positive integer number");
+        return FALSE;
+    }
+
+    // Init user_data
+    user_data.outdeltadir           = outdeltadir;
+    user_data.num_deltas            = num_deltas;
+    user_data.oldpackages           = oldpackages;
+    user_data.mutex                 = g_mutex_new();
+    user_data.active_work_size      = 0;
+    user_data.active_tasks          = 0;
+    user_data.cond_task_finished    = g_cond_new();
+
+    // Make sorted list of targets without packages
+    // that are bigger then max_delta_rpm_size
+    for (GSList *elem = targetpackages; elem; elem = g_slist_next(elem)) {
+        cr_DeltaTargetPackage *tpkg = elem->data;
+        if (tpkg->size_installed < max_delta_rpm_size)
+            targets = g_list_insert_sorted(targets, tpkg, cmp_deltatargetpackage_sizes);
+    }
+    targets = g_list_reverse(targets);
+
+    // Setup the pool of workers
+    pool = g_thread_pool_new(cr_delta_thread,
+                             &user_data,
+                             workers,
+                             TRUE,
+                             &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Cannot create delta pool: ");
+        return FALSE;
+    }
+
+    // Push tasks into the pool
+    while (targets) {
+        gboolean inserted = FALSE;
+        gint64 active_work_size;
+        gint64 active_tasks;
+
+        g_mutex_lock(user_data.mutex);
+        while (user_data.active_tasks == workers)
+            // Wait if all available threads are busy
+            g_cond_wait(user_data.cond_task_finished, user_data.mutex);
+        active_work_size = user_data.active_work_size;
+        active_tasks = user_data.active_tasks;
+        g_mutex_unlock(user_data.mutex);
+
+        for (GList *elem = targets; elem; elem = g_list_next(elem)) {
+            cr_DeltaTargetPackage *tpkg = elem->data;
+            if ((active_work_size + tpkg->size_installed) <= max_work_size) {
+                cr_DeltaTask *task = g_new0(cr_DeltaTask, 1);
+                task->tpkg = tpkg;
+
+                g_mutex_lock(user_data.mutex);
+                user_data.active_work_size += tpkg->size_installed;
+                user_data.active_tasks++;
+                g_mutex_unlock(user_data.mutex);
+
+                g_thread_pool_push(pool, task, NULL);
+                targets = g_list_delete_link(targets, elem);
+                inserted = TRUE;
+                break;
+            }
+        }
+
+        if (!inserted) {
+            // In this iteration, no task was pushed to the pool
+            g_mutex_lock(user_data.mutex);
+            while (user_data.active_tasks == active_tasks)
+                // Wait until any of running tasks finishes
+                g_cond_wait(user_data.cond_task_finished, user_data.mutex);
+            g_mutex_unlock(user_data.mutex);
+        }
+    }
+
+    g_thread_pool_free(pool, FALSE, TRUE);
+    g_list_free(targets);
+}
+
+
+cr_DeltaTargetPackage *
+cr_deltatargetpackage_from_package(cr_Package *pkg,
+                                   const char *path,
+                                   GError **err)
+{
+    cr_DeltaTargetPackage *tpkg;
+
+    assert(pkg);
+    assert(!err || *err == NULL);
+
+    tpkg = g_new0(cr_DeltaTargetPackage, 1);
+    tpkg->chunk = g_string_chunk_new(0);
+    tpkg->name = cr_safe_string_chunk_insert(tpkg->chunk, pkg->name);
+    tpkg->arch = cr_safe_string_chunk_insert(tpkg->chunk, pkg->arch);
+    tpkg->epoch = cr_safe_string_chunk_insert(tpkg->chunk, pkg->epoch);
+    tpkg->version = cr_safe_string_chunk_insert(tpkg->chunk, pkg->version);
+    tpkg->release = cr_safe_string_chunk_insert(tpkg->chunk, pkg->release);
+    tpkg->location_href = cr_safe_string_chunk_insert(tpkg->chunk, pkg->location_href);
+    tpkg->size_installed = pkg->size_installed;
+    tpkg->path = cr_safe_string_chunk_insert(tpkg->chunk, path);
+
+    return tpkg;
+}
+
+
+cr_DeltaTargetPackage *
+cr_deltatargetpackage_from_rpm(const char *path, GError **err)
+{
+    cr_Package *pkg;
+    cr_DeltaTargetPackage *tpkg;
+
+    assert(!err || *err == NULL);
+
+    pkg = cr_package_from_rpm_base(path, 0, 0, err);
+    if (!pkg)
+        return NULL;
+
+    tpkg = cr_deltatargetpackage_from_package(pkg, path, err);
+    cr_package_free(pkg);
+    return tpkg;
+}
+
+
+void
+cr_deltatargetpackage_free(cr_DeltaTargetPackage *tpkg)
+{
+    if (!tpkg)
+        return;
+    g_string_chunk_free(tpkg->chunk);
+    g_free(tpkg);
+}
+
+
+GSList *
+cr_deltarpms_scan_targetdir(const char *path,
+                            gint64 max_delta_rpm_size,
+                            GError **err)
+{
+    GSList *targets = NULL;
+    GDir *dirp;
+    GQueue *sub_dirs = g_queue_new();
+    GStringChunk *sub_dirs_chunk = g_string_chunk_new(1024);
+
+    assert(!err || *err == NULL);
+
+    g_queue_push_head(sub_dirs, g_strdup(path));
+
+    // Recursively walk the dir
+    gchar *dirname;
+    while ((dirname = g_queue_pop_head(sub_dirs))) {
+
+        // Open the directory
+        GDir *dirp = g_dir_open(dirname, 0, NULL);
+        if (!dirp) {
+            g_warning("Cannot open directory %s", dirname);
+            return NULL;
+        }
+
+        // Iterate over files in directory
+        const gchar *filename;
+        while ((filename = g_dir_read_name(dirp))) {
+            gchar *full_path;
+            struct stat st;
+            cr_DeltaTargetPackage *tpkg;
+
+            full_path = g_build_filename(dirname, filename, NULL);
+
+            if (!g_str_has_suffix(filename, ".rpm")) {
+                if (g_file_test(full_path, G_FILE_TEST_IS_DIR)) {
+                    // Directory
+                    gchar *sub_dir_in_chunk;
+                    sub_dir_in_chunk = g_string_chunk_insert(sub_dirs_chunk,
+                                                             full_path);
+                    g_queue_push_head(sub_dirs, sub_dir_in_chunk);
+                    g_debug("Dir to scan: %s", sub_dir_in_chunk);
+                }
+                g_free(full_path);
+                continue;
+            }
+
+            if (stat(full_path, &st) == -1) {
+                g_warning("Cannot stat %s: %s", full_path, strerror(errno));
+                g_free(full_path);
+                continue;
+            }
+
+            if (st.st_size > max_delta_rpm_size) {
+                g_debug("%s: Skipping %s that is > max_delta_rpm_size",
+                        __func__, full_path);
+                g_free(full_path);
+                continue;
+            }
+
+            tpkg = cr_deltatargetpackage_from_rpm(full_path, NULL);
+            if (tpkg)
+                targets = g_slist_prepend(targets, tpkg);
+            g_free(full_path);
+        }
+
+        g_dir_close(dirp);
+    }
+
+    g_queue_free_full(sub_dirs, g_free);
+    g_string_chunk_free(sub_dirs_chunk);
+
+    return targets;
+}
+
+
+/*
+ * 3) Parallel xml chunk generation
+ */
+
+typedef struct {
+    gchar *full_path;
+} cr_PrestoDeltaTask;
+
+typedef struct {
+    GMutex *mutex;
+    GHashTable *ht;
+    cr_ChecksumType checksum_type;
+    gchar *prefix_to_strip;
+    size_t prefix_len;
+} cr_PrestoDeltaUserData;
+
+void
+cr_prestodeltatask_free(cr_PrestoDeltaTask *task)
+{
+    if (!task)
+        return;
+    g_free(task->full_path);
+    g_free(task);
+}
+
+static gboolean
+walk_drpmsdir(const gchar *drpmsdir, GSList **inlist, GError **err)
+{
+    gboolean ret = TRUE;
+    GSList *candidates = NULL;
+    GQueue *sub_dirs = g_queue_new();
+    GStringChunk *sub_dirs_chunk = g_string_chunk_new(1024);
+
+    assert(drpmsdir);
+    assert(inlist);
+    assert(!err || *err == NULL);
+
+    g_queue_push_head(sub_dirs, g_strdup(drpmsdir));
+
+    // Recursively walk the drpmsdir
+    gchar *dirname;
+    while ((dirname = g_queue_pop_head(sub_dirs))) {
+
+        // Open the directory
+        GDir *dirp = g_dir_open(drpmsdir, 0, NULL);
+        if (!dirp) {
+            g_set_error(err, CR_DELTARPMS_ERROR, CRE_IO,
+                        "Cannot open directory %s", drpmsdir);
+            goto exit;
+        }
+
+        // Iterate over files in directory
+        const gchar *filename;
+        while ((filename = g_dir_read_name(dirp))) {
+            gchar *full_path = g_build_filename(dirname, filename, NULL);
+
+            // Non .rpm files
+            if (!g_str_has_suffix (filename, ".drpm")) {
+                if (g_file_test(full_path, G_FILE_TEST_IS_DIR)) {
+                    // Directory
+                    gchar *sub_dir_in_chunk;
+                    sub_dir_in_chunk = g_string_chunk_insert(sub_dirs_chunk,
+                                                             full_path);
+                    g_queue_push_head(sub_dirs, sub_dir_in_chunk);
+                    g_debug("Dir to scan: %s", sub_dir_in_chunk);
+                }
+                g_free(full_path);
+                continue;
+            }
+
+            // Take the file
+            cr_PrestoDeltaTask *task = g_new0(cr_PrestoDeltaTask, 1);
+            task->full_path = full_path;
+            candidates = g_slist_prepend(candidates, task);
+        }
+        g_free(dirname);
+    }
+
+    *inlist = candidates;
+    candidates = NULL;
+
+exit:
+    g_slist_free_full(candidates, (GDestroyNotify) cr_prestodeltatask_free);
+    g_queue_free_full(sub_dirs, g_free);
+    g_string_chunk_free(sub_dirs_chunk);
+
+    return ret;
+}
+
+
+static void
+cr_prestodelta_thread(gpointer data, gpointer udata)
+{
+    cr_PrestoDeltaTask *task          = data;
+    cr_PrestoDeltaUserData *user_data = udata;
+
+    cr_DeltaPackage *dpkg = NULL;
+    struct stat st;
+    gchar *xml_chunk = NULL, *key = NULL, *checksum = NULL;
+    gpointer val;
+    GError *tmp_err = NULL;
+
+    printf("%s\n", task->full_path);
+
+    // Load delta package
+    dpkg = cr_deltapackage_from_drpm_base(task->full_path, 0, 0, &tmp_err);
+    if (!dpkg) {
+        g_warning("Cannot read drpm %s: %s", task->full_path, tmp_err->message);
+        g_error_free(tmp_err);
+        goto exit;
+    }
+
+    // Set the filename
+    dpkg->package->location_href = cr_safe_string_chunk_insert(
+                                    dpkg->package->chunk,
+                                    task->full_path + user_data->prefix_len);
+
+    // Stat the package (to get the size)
+    if (stat(task->full_path, &st) == -1) {
+        g_warning("%s: stat(%s) error (%s)", __func__,
+                  task->full_path, strerror(errno));
+        goto exit;
+    } else {
+        dpkg->package->size_package = st.st_size;
+    }
+
+    // Calculate the checksum
+    checksum = cr_checksum_file(task->full_path,
+                                user_data->checksum_type,
+                                &tmp_err);
+    if (!checksum) {
+        g_warning("Cannot calculate checksum for %s: %s",
+                  task->full_path, tmp_err->message);
+        g_error_free(tmp_err);
+        goto exit;
+    }
+    dpkg->package->checksum_type = cr_safe_string_chunk_insert(
+                                        dpkg->package->chunk,
+                                        cr_checksum_name_str(
+                                            user_data->checksum_type));
+    dpkg->package->pkgId = cr_safe_string_chunk_insert(dpkg->package->chunk,
+                                                       checksum);
+
+    // Generate XML
+    xml_chunk = cr_xml_dump_deltapackage(dpkg, &tmp_err);
+    if (tmp_err) {
+        g_warning("Cannot generate xml for drpm %s: %s",
+                  task->full_path, tmp_err->message);
+        g_error_free(tmp_err);
+        goto exit;
+    }
+
+    // Put the XML into the shared hash table
+    key = cr_package_nevra(dpkg->package);
+    g_mutex_lock(user_data->mutex);
+    if (g_hash_table_lookup_extended(user_data->ht, key, NULL, &val)) {
+        // Key exists in the table
+        g_slist_append( ((GSList *) val), xml_chunk);
+    } else {
+        // Key doesn't exist yet
+        GSList *list = g_slist_prepend(NULL, xml_chunk);
+        g_hash_table_insert(user_data->ht, g_strdup(key), list);
+    }
+    g_mutex_unlock(user_data->mutex);
+
+exit:
+    g_free(checksum);
+    g_free(key);
+    cr_deltapackage_free(dpkg);
+}
+
+static gchar *
+gen_newpackage_xml_chunk(const char *strnevra,
+                         GSList *delta_chunks,
+                         GError **err)
+{
+    cr_NEVRA *nevra;
+    GString *chunk;
+
+    if (!delta_chunks)
+        return NULL;
+
+    nevra = cr_str_to_nevra(strnevra);
+
+    chunk = g_string_new(NULL);
+    g_string_printf(chunk, "  <newpackage name=\"%s\" epoch=\"%s\" "
+                    "version=\"%s\" release=\"%s\" arch=\"%s\">\n",
+                    nevra->name, nevra->epoch ? nevra->epoch : "0",
+                    nevra->version, nevra->release, nevra->arch);
+
+    cr_nevra_free(nevra);
+
+    for (GSList *elem = delta_chunks; elem; elem = g_slist_next(elem)) {
+        gchar *delta_chunk = elem->data;
+        g_string_append(chunk, delta_chunk);
+    }
+
+    g_string_append(chunk, "  </newpackage>\n");
+
+    return g_string_free(chunk, FALSE);
+}
+
+gboolean
+cr_deltarpms_generate_prestodelta_file(const gchar *drpmsdir,
+                                       cr_XmlFile *f,
+                                       cr_ChecksumType checksum_type,
+                                       gint workers,
+                                       const gchar *prefix_to_strip,
+                                       GError **err)
+{
+    gboolean ret = TRUE;
+    GSList *candidates = NULL;
+    GThreadPool *pool;
+    cr_PrestoDeltaUserData user_data;
+    GHashTable *ht = NULL;
+    GHashTableIter iter;
+    gpointer key, value;
+    GError *tmp_err = NULL;
+
+    assert(drpmsdir);
+    assert(f);
+    assert(!err || *err == NULL);
+
+    // Walk the drpms directory
+
+    if (!walk_drpmsdir(drpmsdir, &candidates, &tmp_err)) {
+        g_propagate_prefixed_error(err, tmp_err, "%s: ", __func__);
+        ret = FALSE;
+        goto exit;
+    }
+
+    // Setup pool of workers
+
+    ht = g_hash_table_new_full(g_str_hash,
+                               g_str_equal,
+                               (GDestroyNotify) g_free,
+                               (GDestroyNotify) cr_free_gslist_of_strings);
+
+    user_data.mutex             = g_mutex_new();
+    user_data.ht                = ht;
+    user_data.checksum_type     = checksum_type;
+    user_data.prefix_to_strip   = prefix_to_strip,
+    user_data.prefix_len        = prefix_to_strip ? strlen(prefix_to_strip) : 0;
+
+    pool = g_thread_pool_new(cr_prestodelta_thread,
+                             &user_data,
+                             workers,
+                             TRUE,
+                             &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err,
+                "Cannot create pool for prestodelta file generation: ");
+        return FALSE;
+    }
+
+    // Push tasks to the pool
+
+    for (GSList *elem = candidates; elem; elem = g_slist_next(elem)) {
+        g_thread_pool_push(pool, elem->data, NULL);
+    }
+
+    // Wait until the pool finishes
+
+    g_thread_pool_free(pool, FALSE, TRUE);
+
+    // Write out the results
+
+
+    g_hash_table_iter_init(&iter, user_data.ht);
+    while (g_hash_table_iter_next(&iter, &key, &value)) {
+        gchar *chunk = NULL;
+        gchar *nevra = key;
+
+        chunk = gen_newpackage_xml_chunk(nevra, (GSList *) value, NULL);
+        cr_xmlfile_add_chunk(f, chunk, NULL);
+    }
+
+exit:
+    g_slist_free_full(candidates, (GDestroyNotify) cr_prestodeltatask_free);
+
+    return ret;
+}
+
+#endif
diff --git a/src/deltarpms.h.in b/src/deltarpms.h.in
new file mode 100644 (file)
index 0000000..684af18
--- /dev/null
@@ -0,0 +1,126 @@
+/* createrepo_c - Library of routines for manipulation with repodata
+ * Copyright (C) 2014  Tomas Mlcoch
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301,
+ * USA.
+ */
+
+#ifndef __C_CREATEREPOLIB_DELTARPMS_H__
+#define __C_CREATEREPOLIB_DELTARPMS_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rpm/rpmlib.h>
+#include <glib.h>
+#include "package.h"
+#include "parsehdr.h"
+#include "xml_file.h"
+
+/** \defgroup   deltarpms    Support for deltarpms
+ *  \addtogroup deltarpms
+ *  @{
+ */
+
+#cmakedefine CR_DELTA_RPM_SUPPORT
+#define CR_DEFAULT_MAX_DELTA_RPM_SIZE   100000000
+
+typedef struct {
+    cr_Package *package;
+    char *nevr;
+    char *sequence;
+    GStringChunk *chunk;
+} cr_DeltaPackage;
+
+typedef struct {
+    char *name;
+    char *arch;
+    char *epoch;
+    char *version;
+    char *release;
+    char *location_href;
+    gint64 size_installed;
+
+    char *path;
+    GStringChunk *chunk;
+} cr_DeltaTargetPackage;
+
+gboolean cr_drpm_support(void);
+
+#ifdef CR_DELTA_RPM_SUPPORT
+char *
+cr_drpm_create(cr_DeltaTargetPackage *old,
+               cr_DeltaTargetPackage *new,
+               const char *destdir,
+               GError **err);
+
+cr_DeltaPackage *
+cr_deltapackage_from_drpm_base(const char *filename,
+                               int changelog_limit,
+                               cr_HeaderReadingFlags flags,
+                               GError **err);
+
+void
+cr_deltapackage_free(cr_DeltaPackage *deltapackage);
+
+GHashTable *
+cr_deltarpms_scan_oldpackagedirs(GSList *oldpackagedirs,
+                                 gint64 max_delta_rpm_size,
+                                 GError **err);
+
+cr_DeltaTargetPackage *
+cr_deltatargetpackage_from_package(cr_Package *pkg,
+                                   const char *path,
+                                   GError **err);
+
+cr_DeltaTargetPackage *
+cr_deltatargetpackage_from_rpm(const char *path, GError **err);
+
+void
+cr_deltatargetpackage_free(cr_DeltaTargetPackage *tpkg);
+
+gboolean
+cr_deltarpms_parallel_deltas(GSList *targetpackages,
+                             GHashTable *oldpackages,
+                             const char *outdeltadir,
+                             gint num_deltas,
+                             gint workers,
+                             gint64 max_delta_rpm_size,
+                             gint64 max_work_size,
+                             GError **err);
+
+GSList *
+cr_deltarpms_scan_targetdir(const char *path,
+                            gint64 max_delta_rpm_size,
+                            GError **err);
+
+gboolean
+cr_deltarpms_generate_prestodelta_file(const gchar *drpmdir,
+                                       cr_XmlFile *f,
+                                       cr_ChecksumType checksum_type,
+                                       gint workers,
+                                       const gchar *prefix_to_strip,
+                                       GError **err);
+#endif
+
+
+/** @} */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __C_CREATEREPOLIB_DELTARPMS_H__ */
index de098e5..f75e60b 100644 (file)
@@ -27,6 +27,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include "checksum.h"
+#include "deltarpms.h"
 #include "dumper_thread.h"
 #include "error.h"
 #include "misc.h"
@@ -392,6 +393,29 @@ cr_dumper_thread(gpointer data, gpointer user_data)
         }
     }
 
+#ifdef CR_DELTA_RPM_SUPPORT
+    // Delta candidate
+    if (udata->deltas
+        && !old_used
+        && pkg->size_installed < udata->max_delta_rpm_size)
+    {
+        cr_DeltaTargetPackage *tpkg;
+        tpkg = cr_deltatargetpackage_from_package(pkg,
+                                                  task->full_path,
+                                                  NULL);
+        if (tpkg) {
+            g_mutex_lock(udata->mutex_deltatargetpackages);
+            udata->deltatargetpackages = g_slist_prepend(
+                                                udata->deltatargetpackages,
+                                                tpkg);
+            g_mutex_unlock(udata->mutex_deltatargetpackages);
+        } else {
+            g_warning("Cannot create deltatargetpackage for: %s-%s-%s",
+                      pkg->name, pkg->version, pkg->release);
+        }
+    }
+#endif
+
     // Buffering stuff
     g_mutex_lock(udata->mutex_buffer);
 
index 6ff0694..cf9852f 100644 (file)
@@ -46,7 +46,6 @@ struct PoolTask {
 };
 
 struct UserData {
-    GThreadPool *pool;              // thread pool
     cr_XmlFile *pri_f;              // Opened compressed primary.xml.*
     cr_XmlFile *fil_f;              // Opened compressed filelists.xml.*
     cr_XmlFile *oth_f;              // Opened compressed other.xml.*
@@ -81,6 +80,13 @@ struct UserData {
     // Buffering
     GQueue *buffer;                 // Buffer for done tasks
     GMutex *mutex_buffer;           // Mutex for accessing the buffer
+
+    // Delta generation
+    gboolean deltas;                // Are deltas enabled?
+    gint64 max_delta_rpm_size;      // Max size of an rpm that to run
+                                    // deltarpm against
+    GMutex *mutex_deltatargetpackages; // Mutex
+    GSList *deltatargetpackages;    // List of cr_DeltaTargetPackages
 };
 
 
index b296863..194b76f 100644 (file)
@@ -24,6 +24,7 @@
 extern "C" {
 #endif
 
+#include "deltarpms.h"
 #include "package.h"
 #include "repomd.h"
 
@@ -122,6 +123,13 @@ struct cr_XmlStruct cr_xml_dump(cr_Package *package, GError **err);
  */
 char *cr_xml_dump_repomd(cr_Repomd *repomd, GError **err);
 
+/** Generate xml representation of cr_DeltaPackage
+ * @param dpkg          cr_DeltaPackage
+ * @param err           **GError
+ * @return              xml chunk string or NULL on error
+ */
+char *cr_xml_dump_deltapackage(cr_DeltaPackage *dpkg, GError **err);
+
 /** Prepare string to xml dump.
  * If string is not utf8 it is converted (source encoding is supposed to be
  * iso-8859-1).
diff --git a/src/xml_dump_deltapackage.c b/src/xml_dump_deltapackage.c
new file mode 100644 (file)
index 0000000..1fed947
--- /dev/null
@@ -0,0 +1,144 @@
+/* createrepo_c - Library of routines for manipulation with repodata
+ * Copyright (C) 2014  Tomas Mlcoch
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301,
+ * USA.
+ */
+
+#include <glib.h>
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <libxml/encoding.h>
+#include <libxml/xmlwriter.h>
+#include <libxml/xmlsave.h>
+#include "deltarpms.h"
+#include "error.h"
+#include "misc.h"
+#include "package.h"
+#include "xml_dump.h"
+#include "xml_dump_internal.h"
+
+#define INDENT  4
+
+void
+cr_xml_dump_delta(xmlNodePtr root, cr_DeltaPackage *package)
+{
+    /***********************************
+     Element: delta
+    ************************************/
+
+    cr_NEVR * nevr = cr_str_to_nevr(package->nevr);
+
+    // Add oldepoch attribute
+    cr_xmlNewProp(root, BAD_CAST "oldepoch",
+                  BAD_CAST ((nevr->epoch && *(nevr->epoch)) ? nevr->epoch : "0"));
+
+    // Add oldversion attribute
+    cr_xmlNewProp(root, BAD_CAST "oldversion", BAD_CAST nevr->version);
+
+    // Add oldrelease attribute
+    cr_xmlNewProp(root, BAD_CAST "oldrelease", BAD_CAST nevr->release);
+
+    cr_nevr_free(nevr);
+
+    /***********************************
+     Element: filename
+    ************************************/
+
+    cr_xmlNewTextChild(root, NULL,
+                       BAD_CAST "filename",
+                       BAD_CAST package->package->location_href);
+
+    /***********************************
+    Element: sequence
+    ************************************/
+
+    char *sequence = g_strconcat(package->nevr, "-", package->sequence, NULL);
+    cr_xmlNewTextChild(root, NULL,
+                       BAD_CAST "sequence",
+                       BAD_CAST sequence);
+    g_free(sequence);
+
+    /***********************************
+     Element: size
+    ************************************/
+
+    char size_str[SIZE_STR_MAX_LEN];
+
+    g_snprintf(size_str, SIZE_STR_MAX_LEN, "%"G_GINT64_FORMAT,
+               package->package->size_package);
+
+    cr_xmlNewTextChild(root, NULL, BAD_CAST "size", BAD_CAST size_str);
+
+    /***********************************
+     Element: checksum
+    ************************************/
+
+    xmlNodePtr checksum;
+
+    checksum = cr_xmlNewTextChild(root,
+                                  NULL,
+                                  BAD_CAST "checksum",
+                                  BAD_CAST package->package->pkgId);
+
+    cr_xmlNewProp(checksum,
+                  BAD_CAST "type",
+                  BAD_CAST package->package->checksum_type);
+}
+
+
+char *
+cr_xml_dump_deltapackage(cr_DeltaPackage *package, GError **err)
+{
+    xmlNodePtr root;
+    char *result;
+
+    assert(!err || *err == NULL);
+
+    if (!package)
+        return NULL;
+
+
+    // Dump IT!
+
+    xmlBufferPtr buf = xmlBufferCreate();
+    if (buf == NULL) {
+        g_critical("%s: Error creating the xml buffer", __func__);
+        g_set_error(err, CR_XML_DUMP_OTHER_ERROR, CRE_MEMORY,
+                    "Cannot create an xml buffer");
+        return NULL;
+    }
+
+    root = xmlNewNode(NULL, BAD_CAST "delta");
+    cr_xml_dump_delta(root, package);
+    // xmlNodeDump seems to be a little bit faster than xmlDocDumpFormatMemory
+    xmlNodeDump(buf, NULL, root, 2, FORMAT_XML);
+    assert(buf->content);
+    // First line in the buf is not indented, we must indent it by ourself
+    result = g_malloc(sizeof(char *) * buf->use + INDENT + 1);
+    for (int x = 0; x < INDENT; x++) result[x] = ' ';
+    memcpy((void *) result+INDENT, buf->content, buf->use);
+    result[buf->use + INDENT]   = '\n';
+    result[buf->use + INDENT + 1]   = '\0';
+
+
+    // Cleanup
+
+    xmlBufferFree(buf);
+    xmlFreeNode(root);
+
+    return result;
+}