codec/progressive: Allow the usage of multithreading for decoding
authorPascal Nowack <Pascal.Nowack@gmx.de>
Mon, 24 May 2021 09:28:02 +0000 (11:28 +0200)
committerakallabeth <akallabeth@users.noreply.github.com>
Tue, 25 May 2021 13:30:50 +0000 (15:30 +0200)
While decoding RemoteFX encoded frames is multithreaded, decoding
RemoteFX Progressive frames is not, although both codecs work
relatively similarly.
This is especially noticeable with frames, that have a resolution
larger than 1920x1080 pixels.

decompress_tile_first() and decompress_tile_upgrade() can both run in
different threads at the same time for different tiles without necessary
adjustments.

So, do exactly that using the ThreadPool that already exists in the
RFX_CONTEXT to decrease the decoding time and therefore increase the
performance.
On a 3K display (2880x1620 pixels) this makes out of a choppy
experience a fluid experience.

(cherry picked from commit e79fefe21ccd86aa9db3b5c7cd91313da4bd9dd9)

libfreerdp/codec/progressive.c

index 6cc92dd..08b8199 100644 (file)
@@ -37,6 +37,7 @@
 #include "rfx_quantization.h"
 #include "rfx_dwt.h"
 #include "rfx_rlgr.h"
+#include "rfx_types.h"
 #include "progressive.h"
 
 #define TAG FREERDP_TAG("codec.progressive")
@@ -1621,18 +1622,55 @@ static INLINE BOOL progressive_tile_read(PROGRESSIVE_CONTEXT* progressive, BOOL
        return progressive_surface_tile_replace(surface, region, &tile, FALSE);
 }
 
+struct _PROGRESSIVE_TILE_PROCESS_WORK_PARAM
+{
+       PROGRESSIVE_CONTEXT* progressive;
+       PROGRESSIVE_BLOCK_REGION* region;
+       PROGRESSIVE_BLOCK_CONTEXT* context;
+       RFX_PROGRESSIVE_TILE* tile;
+};
+typedef struct _PROGRESSIVE_TILE_PROCESS_WORK_PARAM PROGRESSIVE_TILE_PROCESS_WORK_PARAM;
+
+static void CALLBACK progressive_process_tiles_tile_work_callback(PTP_CALLBACK_INSTANCE instance,
+                                                                  void* context, PTP_WORK work)
+{
+       PROGRESSIVE_TILE_PROCESS_WORK_PARAM* param = (PROGRESSIVE_TILE_PROCESS_WORK_PARAM*)context;
+
+       switch (param->tile->blockType)
+       {
+               case PROGRESSIVE_WBT_TILE_SIMPLE:
+               case PROGRESSIVE_WBT_TILE_FIRST:
+                       progressive_decompress_tile_first(param->progressive, param->tile, param->region,
+                                                         param->context);
+                       break;
+
+               case PROGRESSIVE_WBT_TILE_UPGRADE:
+                       progressive_decompress_tile_upgrade(param->progressive, param->tile, param->region,
+                                                           param->context);
+                       break;
+               default:
+                       WLog_Print(param->progressive->log, WLOG_ERROR, "Invalid block type %04 (%s)" PRIx16,
+                                  param->tile->blockType,
+                                  progressive_get_block_type_string(param->tile->blockType));
+                       break;
+       }
+}
+
 static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wStream* s,
                                             PROGRESSIVE_BLOCK_REGION* region,
                                             PROGRESSIVE_SURFACE_CONTEXT* surface,
                                             const PROGRESSIVE_BLOCK_CONTEXT* context)
 {
-       int status = -1;
+       int status = 0;
        size_t end;
        const size_t start = Stream_GetPosition(s);
        UINT16 index;
        UINT16 blockType;
        UINT32 blockLen;
        UINT32 count = 0;
+       PTP_WORK* work_objects = NULL;
+       PROGRESSIVE_TILE_PROCESS_WORK_PARAM* params = NULL;
+       UINT16 close_cnt = 0;
 
        if (Stream_GetRemainingLength(s) < region->tileDataSize)
        {
@@ -1714,24 +1752,46 @@ static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wS
                return -1044;
        }
 
+       if (progressive->rfx_context->priv->UseThreads)
+       {
+               work_objects = (PTP_WORK*)calloc(region->numTiles, sizeof(PTP_WORK));
+               if (!work_objects)
+                       return -1;
+
+               params = (PROGRESSIVE_TILE_PROCESS_WORK_PARAM*)calloc(
+                   region->numTiles, sizeof(PROGRESSIVE_TILE_PROCESS_WORK_PARAM));
+               if (!params)
+               {
+                       free(work_objects);
+                       return -1;
+               }
+       }
+
        for (index = 0; index < region->numTiles; index++)
        {
                RFX_PROGRESSIVE_TILE* tile = region->tiles[index];
+               params[index].progressive = progressive;
+               params[index].region = region;
+               params[index].context = context;
+               params[index].tile = tile;
 
-               switch (tile->blockType)
+               if (progressive->rfx_context->priv->UseThreads)
                {
-                       case PROGRESSIVE_WBT_TILE_SIMPLE:
-                       case PROGRESSIVE_WBT_TILE_FIRST:
-                               status = progressive_decompress_tile_first(progressive, tile, region, context);
+                       if (!(work_objects[index] = CreateThreadpoolWork(
+                                 progressive_process_tiles_tile_work_callback, (void*)&params[index],
+                                 &progressive->rfx_context->priv->ThreadPoolEnv)))
+                       {
+                               WLog_ERR(TAG, "CreateThreadpoolWork failed.");
+                               status = -1;
                                break;
+                       }
 
-                       case PROGRESSIVE_WBT_TILE_UPGRADE:
-                               status = progressive_decompress_tile_upgrade(progressive, tile, region, context);
-                               break;
-                       default:
-                               WLog_Print(progressive->log, WLOG_ERROR, "Invalid block type %04 (%s)" PRIx16,
-                                          tile->blockType, progressive_get_block_type_string(tile->blockType));
-                               return -42;
+                       SubmitThreadpoolWork(work_objects[index]);
+                       close_cnt = index + 1;
+               }
+               else
+               {
+                       progressive_process_tiles_tile_work_callback(0, &params[index], 0);
                }
 
                if (status < 0)
@@ -1742,6 +1802,25 @@ static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wS
                }
        }
 
+       if (status < 0)
+               WLog_Print(progressive->log, WLOG_ERROR,
+                          "Failed to create ThreadpoolWork for tile %" PRIu16, index);
+
+       if (progressive->rfx_context->priv->UseThreads)
+       {
+               for (index = 0; index < close_cnt; index++)
+               {
+                       WaitForThreadpoolWorkCallbacks(work_objects[index], FALSE);
+                       CloseThreadpoolWork(work_objects[index]);
+               }
+       }
+
+       free(work_objects);
+       free(params);
+
+       if (status < 0)
+               return -1;
+
        return (int)(end - start);
 }