erofs-utils: mkfs: implement multi-threaded fragments
authorGao Xiang <hsiangkao@linux.alibaba.com>
Sun, 23 Mar 2025 04:34:51 +0000 (12:34 +0800)
committerGao Xiang <hsiangkao@linux.alibaba.com>
Sun, 23 Mar 2025 11:25:37 +0000 (19:25 +0800)
Currently, only `-Eall-fragments` is allowed for multi-threaded
compression.  However, in many cases, we don't want the entire file
merged into the packed inode, as it may impact runtime performance.

Let's implement multi-threaded compression for `-Efragments` now,
although it's still not very fast and need to optimize performance
even further for this option.

Note that the image sizes could be larger without `-Ededupe` compared
to `-Eall-fragments` since the head parts aren't deduplicated for now.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
Link: https://lore.kernel.org/r/20250323043451.2907228-1-hsiangkao@linux.alibaba.com
lib/compress.c
lib/fragments.c
lib/inode.c

index 0b48c06ad63a539bc2e4694c29e0fe1d57b38557..32f58b5f8ffa74a2ca53b5f0a187a315c369b310 100644 (file)
@@ -110,9 +110,10 @@ struct erofs_compress_work {
 };
 
 static struct {
-       struct erofs_workqueue wq;
+       struct erofs_workqueue wq, fwq;
        struct erofs_compress_work *idle;
        pthread_mutex_t mutex;
+       bool hasfwq;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -577,11 +578,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
        if (len <= ctx->pclustersize) {
                if (!final || !len)
                        return 1;
-               if (inode->fragment_size && !ictx->fix_dedupedfrag) {
-                       ctx->pclustersize = roundup(len, blksz);
-                       goto fix_dedupedfrag;
-               }
                if (may_packing) {
+                       if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+                               ctx->pclustersize = roundup(len, blksz);
+                               goto fix_dedupedfrag;
+                       }
                        e->length = len;
                        goto frag_packing;
                }
@@ -1056,7 +1057,22 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
                             u64 offset, erofs_blk_t blkaddr)
 {
        struct z_erofs_compress_ictx *ictx = ctx->ictx;
+       struct erofs_inode *inode = ictx->inode;
+       bool frag = cfg.c_fragments && !erofs_is_packed_inode(inode) &&
+               ctx->seg_idx >= ictx->seg_num - 1;
        int fd = ictx->fd;
+       int ret;
+
+       DBG_BUGON(offset != -1 && frag && inode->fragment_size);
+       if (offset != -1 && frag && !inode->fragment_size &&
+           cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+               ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
+               if (ret < 0)
+                       return ret;
+               if (inode->fragment_size > ctx->remaining)
+                       inode->fragment_size = ctx->remaining;
+               ctx->remaining -= inode->fragment_size;
+       }
 
        ctx->blkaddr = blkaddr;
        while (ctx->remaining) {
@@ -1088,8 +1104,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
        }
 
        /* generate an extra extent for the deduplicated fragment */
-       if (ctx->seg_idx >= ictx->seg_num - 1 &&
-           ictx->inode->fragment_size && !ictx->fragemitted) {
+       if (frag && inode->fragment_size && !ictx->fragemitted) {
                struct z_erofs_extent_item *ei;
 
                ei = malloc(sizeof(*ei));
@@ -1097,7 +1112,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
                        return -ENOMEM;
 
                ei->e = (struct z_erofs_inmem_extent) {
-                       .length = ictx->inode->fragment_size,
+                       .length = inode->fragment_size,
                        .compressedblks = 0,
                        .raw = false,
                        .partial = false,
@@ -1207,6 +1222,8 @@ err_free_idata:
        return ret;
 }
 
+static struct z_erofs_compress_ictx g_ictx;
+
 #ifdef EROFS_MT_ENABLED
 void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
 {
@@ -1354,9 +1371,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
        struct erofs_compress_work *cur, *head = NULL, **last = &head;
        struct erofs_compress_cfg *ccfg = ictx->ccfg;
        struct erofs_inode *inode = ictx->inode;
-       int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mkfs_segment_size);
-       int i;
+       unsigned int segsz = cfg.c_mkfs_segment_size;
+       int nsegs, i;
 
+       nsegs = DIV_ROUND_UP(inode->i_size - inode->fragment_size, segsz);
+       if (!nsegs)
+               nsegs = 1;
        ictx->seg_num = nsegs;
        pthread_mutex_init(&ictx->mutex, NULL);
        pthread_cond_init(&ictx->cond, NULL);
@@ -1385,13 +1405,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
                };
                init_list_head(&cur->ctx.extents);
 
-               if (i == nsegs - 1)
-                       cur->ctx.remaining = inode->i_size -
-                                             inode->fragment_size -
-                                             i * cfg.c_mkfs_segment_size;
-               else
-                       cur->ctx.remaining = cfg.c_mkfs_segment_size;
-
                cur->alg_id = ccfg->handle.alg->id;
                cur->alg_name = ccfg->handle.alg->name;
                cur->comp_level = ccfg->handle.compression_level;
@@ -1399,6 +1412,17 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
                cur->errcode = 1;       /* mark as "in progress" */
 
                cur->work.fn = z_erofs_mt_workfn;
+               if (i >= nsegs - 1) {
+                       cur->ctx.remaining = inode->i_size -
+                                       inode->fragment_size - (u64)i * segsz;
+                       if (z_erofs_mt_ctrl.hasfwq) {
+                               erofs_queue_work(&z_erofs_mt_ctrl.fwq,
+                                                &cur->work);
+                               continue;
+                       }
+               } else {
+                       cur->ctx.remaining = segsz;
+               }
                erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
        }
        ictx->mtworks = head;
@@ -1460,14 +1484,53 @@ out:
        free(ictx);
        return ret;
 }
-#endif
 
-static struct z_erofs_compress_ictx g_ictx;
+static int z_erofs_mt_init(void)
+{
+       unsigned int workers = cfg.c_mt_workers;
+       int ret;
+
+       if (workers < 1)
+               return 0;
+       if (workers >= 1 && cfg.c_dedupe) {
+               erofs_warn("multi-threaded dedupe is NOT implemented for now");
+               cfg.c_mt_workers = 0;
+       } else {
+               if (cfg.c_fragments && workers > 1) {
+                       ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32,
+                                                   z_erofs_mt_wq_tls_alloc,
+                                                   z_erofs_mt_wq_tls_free);
+                       if (ret)
+                               return ret;
+                       z_erofs_mt_ctrl.hasfwq = true;
+                       --workers;
+               }
+
+               ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers,
+                                           workers << 2,
+                                           z_erofs_mt_wq_tls_alloc,
+                                           z_erofs_mt_wq_tls_free);
+               if (ret)
+                       return ret;
+               z_erofs_mt_enabled = true;
+       }
+       pthread_mutex_init(&g_ictx.mutex, NULL);
+       pthread_cond_init(&g_ictx.cond, NULL);
+       return 0;
+}
+#else
+static int z_erofs_mt_init(void)
+{
+       return 0;
+}
+#endif
 
 void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
        struct erofs_sb_info *sbi = inode->sbi;
        struct z_erofs_compress_ictx *ictx;
+       bool all_fragments = cfg.c_all_fragments &&
+                                       !erofs_is_packed_inode(inode);
        int ret;
 
        /* initialize per-file compression setting */
@@ -1502,8 +1565,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
        inode->idata_size = 0;
        inode->fragment_size = 0;
 
-       if (!z_erofs_mt_enabled ||
-           (cfg.c_all_fragments && !erofs_is_packed_inode(inode))) {
+       if (!z_erofs_mt_enabled || all_fragments) {
 #ifdef EROFS_MT_ENABLED
                pthread_mutex_lock(&g_ictx.mutex);
                if (g_ictx.seg_num)
@@ -1529,7 +1591,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
         * parts into the packed inode.
         */
        if (cfg.c_fragments && !erofs_is_packed_inode(inode) &&
-           cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+           ictx == &g_ictx && cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
                ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
                if (ret < 0)
                        goto err_free_ictx;
@@ -1547,8 +1609,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
        ictx->fix_dedupedfrag = false;
        ictx->fragemitted = false;
 
-       if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
-           !inode->fragment_size) {
+       if (all_fragments && !inode->fragment_size) {
                ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
                if (ret)
                        goto err_free_idata;
@@ -1819,30 +1880,7 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
        }
 
        z_erofs_mt_enabled = false;
-#ifdef EROFS_MT_ENABLED
-       if (cfg.c_mt_workers >= 1 && (cfg.c_dedupe ||
-                                     (cfg.c_fragments && !cfg.c_all_fragments))) {
-               if (cfg.c_dedupe)
-                       erofs_warn("multi-threaded dedupe is NOT implemented for now");
-               if (cfg.c_fragments)
-                       erofs_warn("multi-threaded fragments is NOT implemented for now");
-               cfg.c_mt_workers = 0;
-       }
-
-       if (cfg.c_mt_workers >= 1) {
-               ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
-                                           cfg.c_mt_workers,
-                                           cfg.c_mt_workers << 2,
-                                           z_erofs_mt_wq_tls_alloc,
-                                           z_erofs_mt_wq_tls_free);
-               if (ret)
-                       return ret;
-               z_erofs_mt_enabled = true;
-       }
-       pthread_mutex_init(&g_ictx.mutex, NULL);
-       pthread_cond_init(&g_ictx.cond, NULL);
-#endif
-       return 0;
+       return z_erofs_mt_init();
 }
 
 int z_erofs_compress_exit(void)
@@ -1858,6 +1896,9 @@ int z_erofs_compress_exit(void)
        if (z_erofs_mt_enabled) {
 #ifdef EROFS_MT_ENABLED
                ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+               if (ret)
+                       return ret;
+               ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq);
                if (ret)
                        return ret;
                while (z_erofs_mt_ctrl.idle) {
index fecebb5cd5677f22d72c48b54ac05231c75945c0..41b9912549c1f65597aee1a3a9b8f6a85a73c0ac 100644 (file)
@@ -146,21 +146,13 @@ int z_erofs_fragments_dedupe(struct erofs_inode *inode, int fd, u32 *tofcrc)
        if (inode->i_size <= EROFS_TOF_HASHLEN)
                return 0;
 
-       if (erofs_lseek64(fd, inode->i_size - EROFS_TOF_HASHLEN, SEEK_SET) < 0)
-               return -errno;
-
-       ret = read(fd, data_to_hash, EROFS_TOF_HASHLEN);
+       ret = pread(fd, data_to_hash, EROFS_TOF_HASHLEN,
+                   inode->i_size - EROFS_TOF_HASHLEN);
        if (ret != EROFS_TOF_HASHLEN)
                return -errno;
 
        *tofcrc = erofs_crc32c(~0, data_to_hash, EROFS_TOF_HASHLEN);
-       ret = z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
-       if (ret < 0)
-               return ret;
-       ret = lseek(fd, 0, SEEK_SET);
-       if (ret < 0)
-               return -errno;
-       return 0;
+       return z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
 }
 
 static int z_erofs_fragments_dedupe_insert(struct list_head *hash, void *data,
index 8c9a8ec1c8a8c4450d9c1b9aa725a1c111545130..c4edd43d1aedbc24a53e8be53cbb6ca6a230aa6c 100644 (file)
@@ -1312,7 +1312,7 @@ struct erofs_mkfs_dfops {
        bool idle;      /* initialize as false before the dfops worker runs */
 };
 
-#define EROFS_MT_QUEUE_SIZE 128
+#define EROFS_MT_QUEUE_SIZE 256
 
 static void erofs_mkfs_flushjobs(struct erofs_sb_info *sbi)
 {