Revert "Revert "Add Tile-SB-Row based Multi-threading in Decoder""
authorRitu Baldwa <ritu.baldwa@ittiam.com>
Wed, 2 Jan 2019 06:28:34 +0000 (11:58 +0530)
committerRitu Baldwa <ritu.baldwa@ittiam.com>
Sat, 19 Jan 2019 04:50:32 +0000 (10:20 +0530)
This reverts commit 06983668cf41f66765528db044419f954e5a5d64.
Fixes Visual Studio build errors introduced by earlier row mt commit

BUG=webm:1587

Change-Id: I792df86e8254cd6b2a511955b691af619a569cd0

vp9/common/vp9_enums.h
vp9/common/vp9_thread_common.c
vp9/common/vp9_thread_common.h
vp9/decoder/vp9_decodeframe.c
vp9/decoder/vp9_decoder.c
vp9/decoder/vp9_decoder.h
vp9/decoder/vp9_job_queue.c [new file with mode: 0644]
vp9/decoder/vp9_job_queue.h [new file with mode: 0644]
vp9/vp9dx.mk
vpx_util/vpx_thread.h

index bc66553..b33a3a2 100644 (file)
@@ -41,6 +41,8 @@ typedef enum BITSTREAM_PROFILE {
   MAX_PROFILES
 } BITSTREAM_PROFILE;
 
+typedef enum PARSE_RECON_FLAG { PARSE = 1, RECON = 2 } PARSE_RECON_FLAG;
+
 #define BLOCK_4X4 0
 #define BLOCK_4X8 1
 #define BLOCK_8X4 2
index b008ed5..00882a5 100644 (file)
@@ -475,6 +475,12 @@ void vp9_set_row(VP9LfSync *lf_sync, int num_tiles, int row, int is_last_row,
 #endif  // CONFIG_MULTITHREAD
 }
 
+void vp9_loopfilter_job(LFWorkerData *lf_data, VP9LfSync *lf_sync) {
+  thread_loop_filter_rows(lf_data->frame_buffer, lf_data->cm, lf_data->planes,
+                          lf_data->start, lf_data->stop, lf_data->y_only,
+                          lf_sync);
+}
+
 // Accumulate frame counts.
 void vp9_accumulate_frame_counts(FRAME_COUNTS *accum,
                                  const FRAME_COUNTS *counts, int is_dec) {
index b97e9ee..1a2d79a 100644 (file)
@@ -70,7 +70,7 @@ void vp9_loopfilter_rows(LFWorkerData *lf_data, VP9LfSync *lf_sync);
 void vp9_set_row(VP9LfSync *lf_sync, int num_tiles, int row, int is_last_row,
                  int corrupted);
 
-void vp9_set_last_decoded_row(struct VP9Common *cm, int tile_col, int mi_row);
+void vp9_loopfilter_job(LFWorkerData *lf_data, VP9LfSync *lf_sync);
 
 void vp9_accumulate_frame_counts(struct FRAME_COUNTS *accum,
                                  const struct FRAME_COUNTS *counts, int is_dec);
index c9c8505..27e9ca0 100644 (file)
@@ -42,6 +42,7 @@
 #include "vp9/decoder/vp9_decodemv.h"
 #include "vp9/decoder/vp9_decoder.h"
 #include "vp9/decoder/vp9_dsubexp.h"
+#include "vp9/decoder/vp9_job_queue.h"
 
 #define MAX_VP9_HEADER_SIZE 80
 
@@ -1027,7 +1028,6 @@ static void recon_block(TileWorkerData *twd, VP9Decoder *const pbi, int mi_row,
 static void parse_block(TileWorkerData *twd, VP9Decoder *const pbi, int mi_row,
                         int mi_col, BLOCK_SIZE bsize, int bwl, int bhl) {
   VP9_COMMON *const cm = &pbi->common;
-  const int less8x8 = bsize < BLOCK_8X8;
   const int bw = 1 << (bwl - 1);
   const int bh = 1 << (bhl - 1);
   const int x_mis = VPXMIN(bw, cm->mi_cols - mi_col);
@@ -1059,7 +1059,7 @@ static void parse_block(TileWorkerData *twd, VP9Decoder *const pbi, int mi_row,
       const int eobtotal =
           predict_recon_inter(xd, mi, twd, parse_inter_block_row_mt);
 
-      if (!less8x8 && eobtotal == 0) mi->skip = 1;  // skip loopfilter
+      if (bsize >= BLOCK_8X8 && eobtotal == 0) mi->skip = 1;  // skip loopfilter
     }
   }
 
@@ -1172,9 +1172,10 @@ static void decode_partition(TileWorkerData *twd, VP9Decoder *const pbi,
     dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
 }
 
-static void recon_partition(TileWorkerData *twd, VP9Decoder *const pbi,
-                            int mi_row, int mi_col, BLOCK_SIZE bsize,
-                            int n4x4_l2) {
+static void process_partition(TileWorkerData *twd, VP9Decoder *const pbi,
+                              int mi_row, int mi_col, BLOCK_SIZE bsize,
+                              int n4x4_l2, int parse_recon_flag,
+                              process_block_fn_t process_block) {
   VP9_COMMON *const cm = &pbi->common;
   const int n8x8_l2 = n4x4_l2 - 1;
   const int num_8x8_wh = 1 << n8x8_l2;
@@ -1187,60 +1188,10 @@ static void recon_partition(TileWorkerData *twd, VP9Decoder *const pbi,
 
   if (mi_row >= cm->mi_rows || mi_col >= cm->mi_cols) return;
 
-  partition = *xd->partition;
-  xd->partition++;
-
-  subsize = get_subsize(bsize, partition);
-  if (!hbs) {
-    // calculate bmode block dimensions (log 2)
-    xd->bmode_blocks_wl = 1 >> !!(partition & PARTITION_VERT);
-    xd->bmode_blocks_hl = 1 >> !!(partition & PARTITION_HORZ);
-    recon_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
-  } else {
-    switch (partition) {
-      case PARTITION_NONE:
-        recon_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
-        break;
-      case PARTITION_HORZ:
-        recon_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
-        if (has_rows)
-          recon_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
-                      n8x8_l2);
-        break;
-      case PARTITION_VERT:
-        recon_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
-        if (has_cols)
-          recon_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
-                      n4x4_l2);
-        break;
-      case PARTITION_SPLIT:
-        recon_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2);
-        recon_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2);
-        recon_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2);
-        recon_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize, n8x8_l2);
-        break;
-      default: assert(0 && "Invalid partition type");
-    }
+  if (parse_recon_flag & PARSE) {
+    *xd->partition =
+        read_partition(twd, mi_row, mi_col, has_rows, has_cols, n8x8_l2);
   }
-}
-
-static void parse_partition(TileWorkerData *twd, VP9Decoder *const pbi,
-                            int mi_row, int mi_col, BLOCK_SIZE bsize,
-                            int n4x4_l2) {
-  VP9_COMMON *const cm = &pbi->common;
-  const int n8x8_l2 = n4x4_l2 - 1;
-  const int num_8x8_wh = 1 << n8x8_l2;
-  const int hbs = num_8x8_wh >> 1;
-  PARTITION_TYPE partition;
-  BLOCK_SIZE subsize;
-  const int has_rows = (mi_row + hbs) < cm->mi_rows;
-  const int has_cols = (mi_col + hbs) < cm->mi_cols;
-  MACROBLOCKD *const xd = &twd->xd;
-
-  if (mi_row >= cm->mi_rows || mi_col >= cm->mi_cols) return;
-
-  *xd->partition =
-      read_partition(twd, mi_row, mi_col, has_rows, has_cols, n8x8_l2);
 
   partition = *xd->partition;
   xd->partition++;
@@ -1250,38 +1201,44 @@ static void parse_partition(TileWorkerData *twd, VP9Decoder *const pbi,
     // calculate bmode block dimensions (log 2)
     xd->bmode_blocks_wl = 1 >> !!(partition & PARTITION_VERT);
     xd->bmode_blocks_hl = 1 >> !!(partition & PARTITION_HORZ);
-    parse_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
+    process_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
   } else {
     switch (partition) {
       case PARTITION_NONE:
-        parse_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
+        process_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
         break;
       case PARTITION_HORZ:
-        parse_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
+        process_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
         if (has_rows)
-          parse_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
-                      n8x8_l2);
+          process_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
+                        n8x8_l2);
         break;
       case PARTITION_VERT:
-        parse_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
+        process_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
         if (has_cols)
-          parse_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
-                      n4x4_l2);
+          process_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
+                        n4x4_l2);
         break;
       case PARTITION_SPLIT:
-        parse_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2);
-        parse_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2);
-        parse_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2);
-        parse_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize, n8x8_l2);
+        process_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2,
+                          parse_recon_flag, process_block);
+        process_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
+                          parse_recon_flag, process_block);
+        process_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2,
+                          parse_recon_flag, process_block);
+        process_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize,
+                          n8x8_l2, parse_recon_flag, process_block);
         break;
       default: assert(0 && "Invalid partition type");
     }
   }
 
-  // update partition context
-  if (bsize >= BLOCK_8X8 &&
-      (bsize == BLOCK_8X8 || partition != PARTITION_SPLIT))
-    dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
+  if (parse_recon_flag & PARSE) {
+    // update partition context
+    if ((bsize == BLOCK_8X8 || partition != PARTITION_SPLIT) &&
+        bsize >= BLOCK_8X8)
+      dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
+  }
 }
 
 static void setup_token_decoder(const uint8_t *data, const uint8_t *data_end,
@@ -1688,6 +1645,307 @@ static void get_tile_buffers(VP9Decoder *pbi, const uint8_t *data,
   }
 }
 
+static void map_write(RowMTWorkerData *row_mt_worker_data, int idx) {
+#if CONFIG_MULTITHREAD
+  pthread_mutex_lock(&row_mt_worker_data->map_mutex);
+  row_mt_worker_data->recon_map[idx] = 1;
+  pthread_mutex_unlock(&row_mt_worker_data->map_mutex);
+#else
+  (void)row_mt_worker_data;
+  (void)idx;
+#endif
+}
+
+static void map_read(RowMTWorkerData *row_mt_worker_data, int idx) {
+#if CONFIG_MULTITHREAD
+  volatile int8_t *map = row_mt_worker_data->recon_map + idx;
+  pthread_mutex_lock(&row_mt_worker_data->map_mutex);
+  // TODO(ritu.baldwa): Replace this with a condition variable
+  while (!*map) {
+    pthread_mutex_unlock(&row_mt_worker_data->map_mutex);
+    sched_yield();
+    pthread_mutex_lock(&row_mt_worker_data->map_mutex);
+  }
+  pthread_mutex_unlock(&row_mt_worker_data->map_mutex);
+#else
+  (void)row_mt_worker_data;
+  (void)idx;
+#endif
+}
+
+static int lpf_map_write_check(VP9LfSync *lf_sync, int row, int num_tile_cols) {
+  int return_val = 0;
+#if CONFIG_MULTITHREAD
+  int corrupted;
+  pthread_mutex_lock(&lf_sync->lf_mutex);
+  corrupted = lf_sync->corrupted;
+  pthread_mutex_unlock(&lf_sync->lf_mutex);
+  if (!corrupted) {
+    pthread_mutex_lock(&lf_sync->recon_done_mutex[row]);
+    lf_sync->num_tiles_done[row] += 1;
+    if (num_tile_cols == lf_sync->num_tiles_done[row]) return_val = 1;
+    pthread_mutex_unlock(&lf_sync->recon_done_mutex[row]);
+  }
+#else
+  (void)lf_sync;
+  (void)row;
+  (void)num_tile_cols;
+#endif
+  return return_val;
+}
+
+static void vp9_tile_done(VP9Decoder *pbi) {
+#if CONFIG_MULTITHREAD
+  int terminate;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  const int all_parse_done = 1 << pbi->common.log2_tile_cols;
+  pthread_mutex_lock(&row_mt_worker_data->recon_mutex);
+  row_mt_worker_data->num_tiles_done++;
+  terminate = all_parse_done == row_mt_worker_data->num_tiles_done;
+  pthread_mutex_unlock(&row_mt_worker_data->recon_mutex);
+  if (terminate) {
+    vp9_jobq_terminate(&row_mt_worker_data->jobq);
+  }
+#else
+  (void)pbi;
+#endif
+}
+
+static void vp9_jobq_alloc(VP9Decoder *pbi) {
+  VP9_COMMON *const cm = &pbi->common;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
+  const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
+  const int tile_cols = 1 << cm->log2_tile_cols;
+  const size_t jobq_size = (tile_cols * sb_rows * 2 + sb_rows) * sizeof(Job);
+
+  if (jobq_size > row_mt_worker_data->jobq_size) {
+    vpx_free(row_mt_worker_data->jobq_buf);
+    CHECK_MEM_ERROR(cm, row_mt_worker_data->jobq_buf, vpx_calloc(1, jobq_size));
+    vp9_jobq_init(&row_mt_worker_data->jobq, row_mt_worker_data->jobq_buf,
+                  jobq_size);
+    row_mt_worker_data->jobq_size = jobq_size;
+  }
+}
+
+static void recon_tile_row(TileWorkerData *tile_data, VP9Decoder *pbi,
+                           int mi_row, int is_last_row, VP9LfSync *lf_sync) {
+  VP9_COMMON *const cm = &pbi->common;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  const int tile_cols = 1 << cm->log2_tile_cols;
+  const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+  const int sb_cols = aligned_cols >> MI_BLOCK_SIZE_LOG2;
+  const int cur_sb_row = mi_row >> MI_BLOCK_SIZE_LOG2;
+  int mi_col_start = tile_data->xd.tile.mi_col_start;
+  int mi_col_end = tile_data->xd.tile.mi_col_end;
+  int mi_col;
+
+  vp9_zero(tile_data->xd.left_context);
+  vp9_zero(tile_data->xd.left_seg_context);
+  for (mi_col = mi_col_start; mi_col < mi_col_end; mi_col += MI_BLOCK_SIZE) {
+    const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+    int plane;
+    const int sb_num = (cur_sb_row * (aligned_cols >> MI_BLOCK_SIZE_LOG2) + c);
+
+    // Top Dependency
+    if (cur_sb_row) {
+      map_read(row_mt_worker_data, ((cur_sb_row - 1) * sb_cols) + c);
+    }
+
+    for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
+      tile_data->xd.plane[plane].eob =
+          row_mt_worker_data->eob[plane] + (sb_num << EOBS_PER_SB_LOG2);
+      tile_data->xd.plane[plane].dqcoeff =
+          row_mt_worker_data->dqcoeff[plane] + (sb_num << DQCOEFFS_PER_SB_LOG2);
+    }
+    tile_data->xd.partition =
+        row_mt_worker_data->partition + (sb_num * PARTITIONS_PER_SB);
+    process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4, RECON,
+                      recon_block);
+    if (cm->lf.filter_level && !cm->skip_loop_filter) {
+      // Queue LPF_JOB
+      int is_lpf_job_ready = 0;
+
+      if (mi_col + MI_BLOCK_SIZE >= mi_col_end) {
+        // Checks if this row has been decoded in all tiles
+        is_lpf_job_ready = lpf_map_write_check(lf_sync, cur_sb_row, tile_cols);
+
+        if (is_lpf_job_ready) {
+          Job lpf_job;
+          lpf_job.job_type = LPF_JOB;
+          if (cur_sb_row > 0) {
+            lpf_job.row_num = mi_row - MI_BLOCK_SIZE;
+            vp9_jobq_queue(&row_mt_worker_data->jobq, &lpf_job,
+                           sizeof(lpf_job));
+          }
+          if (is_last_row) {
+            lpf_job.row_num = mi_row;
+            vp9_jobq_queue(&row_mt_worker_data->jobq, &lpf_job,
+                           sizeof(lpf_job));
+          }
+        }
+      }
+    }
+    map_write(row_mt_worker_data, (cur_sb_row * sb_cols) + c);
+  }
+}
+
+static void parse_tile_row(TileWorkerData *tile_data, VP9Decoder *pbi,
+                           int mi_row, int cur_tile_col, uint8_t **data_end) {
+  int mi_col;
+  VP9_COMMON *const cm = &pbi->common;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  TileInfo *tile = &tile_data->xd.tile;
+  TileBuffer *const buf = &pbi->tile_buffers[cur_tile_col];
+  const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+
+  vp9_zero(tile_data->dqcoeff);
+  vp9_tile_init(tile, cm, 0, cur_tile_col);
+
+  /* Update reader only at the beginning of each row in a tile */
+  if (mi_row == 0) {
+    setup_token_decoder(buf->data, *data_end, buf->size, &tile_data->error_info,
+                        &tile_data->bit_reader, pbi->decrypt_cb,
+                        pbi->decrypt_state);
+  }
+  vp9_init_macroblockd(cm, &tile_data->xd, tile_data->dqcoeff);
+  tile_data->xd.error_info = &tile_data->error_info;
+
+  vp9_zero(tile_data->xd.left_context);
+  vp9_zero(tile_data->xd.left_seg_context);
+  for (mi_col = tile->mi_col_start; mi_col < tile->mi_col_end;
+       mi_col += MI_BLOCK_SIZE) {
+    const int r = mi_row >> MI_BLOCK_SIZE_LOG2;
+    const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+    int plane;
+    const int sb_num = (r * (aligned_cols >> MI_BLOCK_SIZE_LOG2) + c);
+    for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
+      tile_data->xd.plane[plane].eob =
+          row_mt_worker_data->eob[plane] + (sb_num << EOBS_PER_SB_LOG2);
+      tile_data->xd.plane[plane].dqcoeff =
+          row_mt_worker_data->dqcoeff[plane] + (sb_num << DQCOEFFS_PER_SB_LOG2);
+    }
+    tile_data->xd.partition =
+        row_mt_worker_data->partition + sb_num * PARTITIONS_PER_SB;
+    process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4, PARSE,
+                      parse_block);
+  }
+}
+
+static int row_decode_worker_hook(ThreadData *const thread_data,
+                                  uint8_t **data_end) {
+  VP9Decoder *const pbi = thread_data->pbi;
+  VP9_COMMON *const cm = &pbi->common;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+  const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
+  const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
+  Job job;
+  LFWorkerData *lf_data = thread_data->lf_data;
+  VP9LfSync *lf_sync = thread_data->lf_sync;
+  volatile int corrupted = 0;
+
+  while (!vp9_jobq_dequeue(&row_mt_worker_data->jobq, &job, sizeof(job), 1)) {
+    int mi_col;
+    const int mi_row = job.row_num;
+
+    if (job.job_type == LPF_JOB) {
+      lf_data->start = mi_row;
+      lf_data->stop = lf_data->start + MI_BLOCK_SIZE;
+
+      if (cm->lf.filter_level && !cm->skip_loop_filter &&
+          mi_row < cm->mi_rows) {
+        vp9_loopfilter_job(lf_data, lf_sync);
+      }
+    } else if (job.job_type == RECON_JOB) {
+      const int cur_sb_row = mi_row >> MI_BLOCK_SIZE_LOG2;
+      const int is_last_row = sb_rows - 1 == cur_sb_row;
+      TileWorkerData twd_recon;
+      TileWorkerData *const tile_data_recon = &twd_recon;
+      int mi_col_start, mi_col_end;
+
+      tile_data_recon->xd = pbi->mb;
+      vp9_tile_init(&tile_data_recon->xd.tile, cm, 0, job.tile_col);
+      vp9_init_macroblockd(cm, &tile_data_recon->xd, tile_data_recon->dqcoeff);
+      mi_col_start = tile_data_recon->xd.tile.mi_col_start;
+      mi_col_end = tile_data_recon->xd.tile.mi_col_end;
+
+      if (setjmp(tile_data_recon->error_info.jmp)) {
+        const int sb_cols = aligned_cols >> MI_BLOCK_SIZE_LOG2;
+        tile_data_recon->error_info.setjmp = 0;
+        corrupted = 1;
+        for (mi_col = mi_col_start; mi_col < mi_col_end;
+             mi_col += MI_BLOCK_SIZE) {
+          const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+          map_write(row_mt_worker_data, (cur_sb_row * sb_cols) + c);
+        }
+        if (is_last_row) {
+          vp9_tile_done(pbi);
+        }
+        continue;
+      }
+
+      tile_data_recon->error_info.setjmp = 1;
+      tile_data_recon->xd.error_info = &tile_data_recon->error_info;
+
+      recon_tile_row(tile_data_recon, pbi, mi_row, is_last_row, lf_sync);
+
+      if (corrupted)
+        vpx_internal_error(&tile_data_recon->error_info,
+                           VPX_CODEC_CORRUPT_FRAME,
+                           "Failed to decode tile data");
+
+      if (is_last_row) {
+        vp9_tile_done(pbi);
+      }
+    } else if (job.job_type == PARSE_JOB) {
+      TileWorkerData *const tile_data = &pbi->tile_worker_data[job.tile_col];
+
+      if (setjmp(tile_data->error_info.jmp)) {
+        tile_data->error_info.setjmp = 0;
+        corrupted = 1;
+        vp9_tile_done(pbi);
+        continue;
+      }
+
+      tile_data->xd = pbi->mb;
+      tile_data->xd.counts =
+          cm->frame_parallel_decoding_mode ? 0 : &tile_data->counts;
+
+      tile_data->error_info.setjmp = 1;
+
+      parse_tile_row(tile_data, pbi, mi_row, job.tile_col, data_end);
+
+      corrupted |= tile_data->xd.corrupted;
+      if (corrupted)
+        vpx_internal_error(&tile_data->error_info, VPX_CODEC_CORRUPT_FRAME,
+                           "Failed to decode tile data");
+
+      /* Queue in the recon_job for this row */
+      {
+        Job recon_job;
+        recon_job.row_num = mi_row;
+        recon_job.tile_col = job.tile_col;
+        recon_job.job_type = RECON_JOB;
+        vp9_jobq_queue(&row_mt_worker_data->jobq, &recon_job,
+                       sizeof(recon_job));
+      }
+
+      /* Queue next parse job */
+      if (mi_row + MI_BLOCK_SIZE < cm->mi_rows) {
+        Job parse_job;
+        parse_job.row_num = mi_row + MI_BLOCK_SIZE;
+        parse_job.tile_col = job.tile_col;
+        parse_job.job_type = PARSE_JOB;
+        vp9_jobq_queue(&row_mt_worker_data->jobq, &parse_job,
+                       sizeof(parse_job));
+      }
+    }
+  }
+
+  return !corrupted;
+}
+
 static const uint8_t *decode_tiles(VP9Decoder *pbi, const uint8_t *data,
                                    const uint8_t *data_end) {
   VP9_COMMON *const cm = &pbi->common;
@@ -1775,7 +2033,8 @@ static const uint8_t *decode_tiles(VP9Decoder *pbi, const uint8_t *data,
                   row_mt_worker_data->dqcoeff[plane];
             }
             tile_data->xd.partition = row_mt_worker_data->partition;
-            parse_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
+            process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4,
+                              PARSE, parse_block);
 
             for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
               tile_data->xd.plane[plane].eob = row_mt_worker_data->eob[plane];
@@ -1783,7 +2042,8 @@ static const uint8_t *decode_tiles(VP9Decoder *pbi, const uint8_t *data,
                   row_mt_worker_data->dqcoeff[plane];
             }
             tile_data->xd.partition = row_mt_worker_data->partition;
-            recon_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
+            process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4,
+                              RECON, recon_block);
           } else {
             decode_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
           }
@@ -1951,22 +2211,12 @@ static int compare_tile_buffers(const void *a, const void *b) {
   return (buf_a->size < buf_b->size) - (buf_a->size > buf_b->size);
 }
 
-static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
-                                      const uint8_t *data_end) {
+static INLINE void init_mt(VP9Decoder *pbi) {
+  int n;
   VP9_COMMON *const cm = &pbi->common;
-  const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
-  const uint8_t *bit_reader_end = NULL;
   VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
-  YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
   const int aligned_mi_cols = mi_cols_aligned_to_sb(cm->mi_cols);
-  const int tile_cols = 1 << cm->log2_tile_cols;
-  const int tile_rows = 1 << cm->log2_tile_rows;
-  const int num_workers = VPXMIN(pbi->max_threads, tile_cols);
-  int n;
-
-  assert(tile_cols <= (1 << 6));
-  assert(tile_rows == 1);
-  (void)tile_rows;
+  const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
 
   if (pbi->num_tile_workers == 0) {
     const int num_threads = pbi->max_threads;
@@ -1985,11 +2235,160 @@ static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
   }
 
   // Initialize LPF
-  if (pbi->lpf_mt_opt && cm->lf.filter_level && !cm->skip_loop_filter) {
+  if ((pbi->lpf_mt_opt || pbi->row_mt) && cm->lf.filter_level &&
+      !cm->skip_loop_filter) {
     vp9_lpf_mt_init(lf_row_sync, cm, cm->lf.filter_level,
                     pbi->num_tile_workers);
   }
 
+  // Note: this memset assumes above_context[0], [1] and [2]
+  // are allocated as part of the same buffer.
+  memset(cm->above_context, 0,
+         sizeof(*cm->above_context) * MAX_MB_PLANE * 2 * aligned_mi_cols);
+
+  memset(cm->above_seg_context, 0,
+         sizeof(*cm->above_seg_context) * aligned_mi_cols);
+
+  vp9_reset_lfm(cm);
+}
+
+static const uint8_t *decode_tiles_row_wise_mt(VP9Decoder *pbi,
+                                               const uint8_t *data,
+                                               const uint8_t *data_end) {
+  VP9_COMMON *const cm = &pbi->common;
+  RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+  const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
+  const int tile_cols = 1 << cm->log2_tile_cols;
+  const int tile_rows = 1 << cm->log2_tile_rows;
+  const int num_workers = pbi->max_threads;
+  int i, n;
+  int col;
+  int corrupted = 0;
+  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
+  const int sb_cols = mi_cols_aligned_to_sb(cm->mi_cols) >> MI_BLOCK_SIZE_LOG2;
+  VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
+  YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
+
+  assert(tile_cols <= (1 << 6));
+  assert(tile_rows == 1);
+  (void)tile_rows;
+
+  memset(row_mt_worker_data->recon_map, 0,
+         sb_rows * sb_cols * sizeof(*row_mt_worker_data->recon_map));
+
+  init_mt(pbi);
+
+  // Reset tile decoding hook
+  for (n = 0; n < num_workers; ++n) {
+    VPxWorker *const worker = &pbi->tile_workers[n];
+    ThreadData *const thread_data = &pbi->row_mt_worker_data->thread_data[n];
+    winterface->sync(worker);
+
+    if (cm->lf.filter_level && !cm->skip_loop_filter) {
+      thread_data->lf_sync = lf_row_sync;
+      thread_data->lf_data = &thread_data->lf_sync->lfdata[n];
+      vp9_loop_filter_data_reset(thread_data->lf_data, new_fb, cm,
+                                 pbi->mb.plane);
+    }
+
+    thread_data->pbi = pbi;
+
+    worker->hook = (VPxWorkerHook)row_decode_worker_hook;
+    worker->data1 = thread_data;
+    worker->data2 = (void *)&row_mt_worker_data->data_end;
+  }
+
+  for (col = 0; col < tile_cols; ++col) {
+    TileWorkerData *const tile_data = &pbi->tile_worker_data[col];
+    tile_data->xd = pbi->mb;
+    tile_data->xd.counts =
+        cm->frame_parallel_decoding_mode ? NULL : &tile_data->counts;
+  }
+
+  /* Reset the jobq to start of the jobq buffer */
+  vp9_jobq_reset(&row_mt_worker_data->jobq);
+  row_mt_worker_data->num_tiles_done = 0;
+  row_mt_worker_data->data_end = NULL;
+
+  // Load tile data into tile_buffers
+  get_tile_buffers(pbi, data, data_end, tile_cols, tile_rows,
+                   &pbi->tile_buffers);
+
+  // Initialize thread frame counts.
+  if (!cm->frame_parallel_decoding_mode) {
+    for (col = 0; col < tile_cols; ++col) {
+      TileWorkerData *const tile_data =
+          (TileWorkerData *)&pbi->tile_worker_data[col];
+      vp9_zero(tile_data->counts);
+    }
+  }
+
+  // queue parse jobs for 0th row of every tile
+  for (col = 0; col < tile_cols; ++col) {
+    Job parse_job;
+    parse_job.row_num = 0;
+    parse_job.tile_col = col;
+    parse_job.job_type = PARSE_JOB;
+    vp9_jobq_queue(&row_mt_worker_data->jobq, &parse_job, sizeof(parse_job));
+  }
+
+  for (i = 0; i < num_workers; ++i) {
+    VPxWorker *const worker = &pbi->tile_workers[i];
+    worker->had_error = 0;
+    if (i == num_workers - 1) {
+      winterface->execute(worker);
+    } else {
+      winterface->launch(worker);
+    }
+  }
+
+  for (; n > 0; --n) {
+    VPxWorker *const worker = &pbi->tile_workers[n - 1];
+    // TODO(jzern): The tile may have specific error data associated with
+    // its vpx_internal_error_info which could be propagated to the main info
+    // in cm. Additionally once the threads have been synced and an error is
+    // detected, there's no point in continuing to decode tiles.
+    corrupted |= !winterface->sync(worker);
+  }
+
+  pbi->mb.corrupted = corrupted;
+
+  {
+    /* Set data end */
+    TileWorkerData *const tile_data = &pbi->tile_worker_data[tile_cols - 1];
+    row_mt_worker_data->data_end = vpx_reader_find_end(&tile_data->bit_reader);
+  }
+
+  // Accumulate thread frame counts.
+  if (!cm->frame_parallel_decoding_mode) {
+    for (i = 0; i < tile_cols; ++i) {
+      TileWorkerData *const tile_data =
+          (TileWorkerData *)&pbi->tile_worker_data[i];
+      vp9_accumulate_frame_counts(&cm->counts, &tile_data->counts, 1);
+    }
+  }
+
+  return row_mt_worker_data->data_end;
+}
+
+static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
+                                      const uint8_t *data_end) {
+  VP9_COMMON *const cm = &pbi->common;
+  const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
+  const uint8_t *bit_reader_end = NULL;
+  VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
+  YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
+  const int tile_cols = 1 << cm->log2_tile_cols;
+  const int tile_rows = 1 << cm->log2_tile_rows;
+  const int num_workers = VPXMIN(pbi->max_threads, tile_cols);
+  int n;
+
+  assert(tile_cols <= (1 << 6));
+  assert(tile_rows == 1);
+  (void)tile_rows;
+
+  init_mt(pbi);
+
   // Reset tile decoding hook
   for (n = 0; n < num_workers; ++n) {
     VPxWorker *const worker = &pbi->tile_workers[n];
@@ -2012,15 +2411,6 @@ static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
     worker->data2 = pbi;
   }
 
-  // Note: this memset assumes above_context[0], [1] and [2]
-  // are allocated as part of the same buffer.
-  memset(cm->above_context, 0,
-         sizeof(*cm->above_context) * MAX_MB_PLANE * 2 * aligned_mi_cols);
-  memset(cm->above_seg_context, 0,
-         sizeof(*cm->above_seg_context) * aligned_mi_cols);
-
-  vp9_reset_lfm(cm);
-
   // Load tile data into tile_buffers
   get_tile_buffers(pbi, data, data_end, tile_cols, tile_rows,
                    &pbi->tile_buffers);
@@ -2370,6 +2760,10 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
     if (pbi->row_mt_worker_data == NULL) {
       CHECK_MEM_ERROR(cm, pbi->row_mt_worker_data,
                       vpx_calloc(1, sizeof(*pbi->row_mt_worker_data)));
+#if CONFIG_MULTITHREAD
+      pthread_mutex_init(&pbi->row_mt_worker_data->recon_mutex, NULL);
+      pthread_mutex_init(&pbi->row_mt_worker_data->map_mutex, NULL);
+#endif
     }
 
     if (pbi->max_threads > 1) {
@@ -2383,8 +2777,10 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
 
     if (num_sbs > pbi->row_mt_worker_data->num_sbs) {
       vp9_dec_free_row_mt_mem(pbi->row_mt_worker_data);
-      vp9_dec_alloc_row_mt_mem(pbi->row_mt_worker_data, cm, num_sbs);
+      vp9_dec_alloc_row_mt_mem(pbi->row_mt_worker_data, cm, num_sbs,
+                               pbi->max_threads);
     }
+    vp9_jobq_alloc(pbi);
   }
   sz = vpx_rb_read_literal(rb, 16);
 
@@ -2544,21 +2940,27 @@ void vp9_decode_frame(VP9Decoder *pbi, const uint8_t *data,
     pbi->total_tiles = tile_rows * tile_cols;
   }
 
-  if (pbi->max_threads > 1 && tile_rows == 1 && tile_cols > 1) {
-    // Multi-threaded tile decoder
-    *p_data_end = decode_tiles_mt(pbi, data + first_partition_size, data_end);
-    if (!pbi->lpf_mt_opt) {
-      if (!xd->corrupted) {
-        if (!cm->skip_loop_filter) {
-          // If multiple threads are used to decode tiles, then we use those
-          // threads to do parallel loopfiltering.
-          vp9_loop_filter_frame_mt(new_fb, cm, pbi->mb.plane,
-                                   cm->lf.filter_level, 0, 0, pbi->tile_workers,
-                                   pbi->num_tile_workers, &pbi->lf_row_sync);
+  if (pbi->max_threads > 1 && tile_rows == 1 &&
+      (tile_cols > 1 || pbi->row_mt == 1)) {
+    if (pbi->row_mt == 1) {
+      *p_data_end =
+          decode_tiles_row_wise_mt(pbi, data + first_partition_size, data_end);
+    } else {
+      // Multi-threaded tile decoder
+      *p_data_end = decode_tiles_mt(pbi, data + first_partition_size, data_end);
+      if (!pbi->lpf_mt_opt) {
+        if (!xd->corrupted) {
+          if (!cm->skip_loop_filter) {
+            // If multiple threads are used to decode tiles, then we use those
+            // threads to do parallel loopfiltering.
+            vp9_loop_filter_frame_mt(
+                new_fb, cm, pbi->mb.plane, cm->lf.filter_level, 0, 0,
+                pbi->tile_workers, pbi->num_tile_workers, &pbi->lf_row_sync);
+          }
+        } else {
+          vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
+                             "Decode failed. Frame data is corrupted.");
         }
-      } else {
-        vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
-                           "Decode failed. Frame data is corrupted.");
       }
     }
   } else {
index 7fde0b0..cd37501 100644 (file)
@@ -56,7 +56,7 @@ static void vp9_dec_setup_mi(VP9_COMMON *cm) {
 }
 
 void vp9_dec_alloc_row_mt_mem(RowMTWorkerData *row_mt_worker_data,
-                              VP9_COMMON *cm, int num_sbs) {
+                              VP9_COMMON *cm, int num_sbs, int max_threads) {
   int plane;
   const size_t dqcoeff_size = (num_sbs << DQCOEFFS_PER_SB_LOG2) *
                               sizeof(*row_mt_worker_data->dqcoeff[0]);
@@ -74,6 +74,14 @@ void vp9_dec_alloc_row_mt_mem(RowMTWorkerData *row_mt_worker_data,
                              sizeof(*row_mt_worker_data->partition)));
   CHECK_MEM_ERROR(cm, row_mt_worker_data->recon_map,
                   vpx_calloc(num_sbs, sizeof(*row_mt_worker_data->recon_map)));
+
+  // allocate memory for thread_data
+  if (row_mt_worker_data->thread_data == NULL) {
+    const size_t thread_size =
+        max_threads * sizeof(*row_mt_worker_data->thread_data);
+    CHECK_MEM_ERROR(cm, row_mt_worker_data->thread_data,
+                    vpx_memalign(32, thread_size));
+  }
 }
 
 void vp9_dec_free_row_mt_mem(RowMTWorkerData *row_mt_worker_data) {
@@ -89,6 +97,8 @@ void vp9_dec_free_row_mt_mem(RowMTWorkerData *row_mt_worker_data) {
     row_mt_worker_data->partition = NULL;
     vpx_free(row_mt_worker_data->recon_map);
     row_mt_worker_data->recon_map = NULL;
+    vpx_free(row_mt_worker_data->thread_data);
+    row_mt_worker_data->thread_data = NULL;
   }
 }
 
@@ -179,8 +189,17 @@ void vp9_decoder_remove(VP9Decoder *pbi) {
 
   if (pbi->row_mt == 1) {
     vp9_dec_free_row_mt_mem(pbi->row_mt_worker_data);
+    if (pbi->row_mt_worker_data != NULL) {
+      vp9_jobq_deinit(&pbi->row_mt_worker_data->jobq);
+      vpx_free(pbi->row_mt_worker_data->jobq_buf);
+#if CONFIG_MULTITHREAD
+      pthread_mutex_destroy(&pbi->row_mt_worker_data->recon_mutex);
+      pthread_mutex_destroy(&pbi->row_mt_worker_data->map_mutex);
+#endif
+    }
     vpx_free(pbi->row_mt_worker_data);
   }
+
   vp9_remove_common(&pbi->common);
   vpx_free(pbi);
 }
index 9a582ff..ad39bc0 100644 (file)
@@ -21,6 +21,7 @@
 #include "vp9/common/vp9_thread_common.h"
 #include "vp9/common/vp9_onyxc_int.h"
 #include "vp9/common/vp9_ppflags.h"
+#include "./vp9_job_queue.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -30,6 +31,14 @@ extern "C" {
 #define DQCOEFFS_PER_SB_LOG2 12
 #define PARTITIONS_PER_SB 85
 
+typedef enum JobType { PARSE_JOB, RECON_JOB, LPF_JOB } JobType;
+
+typedef struct ThreadData {
+  struct VP9Decoder *pbi;
+  LFWorkerData *lf_data;
+  VP9LfSync *lf_sync;
+} ThreadData;
+
 typedef struct TileBuffer {
   const uint8_t *data;
   size_t size;
@@ -49,14 +58,36 @@ typedef struct TileWorkerData {
   struct vpx_internal_error_info error_info;
 } TileWorkerData;
 
+typedef void (*process_block_fn_t)(TileWorkerData *twd,
+                                   struct VP9Decoder *const pbi, int mi_row,
+                                   int mi_col, BLOCK_SIZE bsize, int bwl,
+                                   int bhl);
+
 typedef struct RowMTWorkerData {
   int num_sbs;
   int *eob[MAX_MB_PLANE];
   PARTITION_TYPE *partition;
   tran_low_t *dqcoeff[MAX_MB_PLANE];
   int8_t *recon_map;
+  const uint8_t *data_end;
+  uint8_t *jobq_buf;
+  JobQueueRowMt jobq;
+  size_t jobq_size;
+  int num_tiles_done;
+#if CONFIG_MULTITHREAD
+  pthread_mutex_t recon_mutex;
+  pthread_mutex_t map_mutex;
+#endif
+  ThreadData *thread_data;
 } RowMTWorkerData;
 
+/* Structure to queue and dequeue row decode jobs */
+typedef struct Job {
+  int row_num;
+  int tile_col;
+  JobType job_type;
+} Job;
+
 typedef struct VP9Decoder {
   DECLARE_ALIGNED(16, MACROBLOCKD, mb);
 
@@ -128,7 +159,7 @@ struct VP9Decoder *vp9_decoder_create(BufferPool *const pool);
 void vp9_decoder_remove(struct VP9Decoder *pbi);
 
 void vp9_dec_alloc_row_mt_mem(RowMTWorkerData *row_mt_worker_data,
-                              VP9_COMMON *cm, int num_sbs);
+                              VP9_COMMON *cm, int num_sbs, int max_threads);
 void vp9_dec_free_row_mt_mem(RowMTWorkerData *row_mt_worker_data);
 
 static INLINE void decrease_ref_count(int idx, RefCntBuffer *const frame_bufs,
diff --git a/vp9/decoder/vp9_job_queue.c b/vp9/decoder/vp9_job_queue.c
new file mode 100644 (file)
index 0000000..9a31f5a
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ *  Copyright (c) 2018 The WebM project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <assert.h>
+#include <string.h>
+
+#include "vpx/vpx_integer.h"
+
+#include "vp9/decoder/vp9_job_queue.h"
+
+void vp9_jobq_init(JobQueueRowMt *jobq, uint8_t *buf, size_t buf_size) {
+#if CONFIG_MULTITHREAD
+  pthread_mutex_init(&jobq->mutex, NULL);
+  pthread_cond_init(&jobq->cond, NULL);
+#endif
+  jobq->buf_base = buf;
+  jobq->buf_wr = buf;
+  jobq->buf_rd = buf;
+  jobq->buf_end = buf + buf_size;
+  jobq->terminate = 0;
+}
+
+void vp9_jobq_reset(JobQueueRowMt *jobq) {
+#if CONFIG_MULTITHREAD
+  pthread_mutex_lock(&jobq->mutex);
+#endif
+  jobq->buf_wr = jobq->buf_base;
+  jobq->buf_rd = jobq->buf_base;
+  jobq->terminate = 0;
+#if CONFIG_MULTITHREAD
+  pthread_mutex_unlock(&jobq->mutex);
+#endif
+}
+
+void vp9_jobq_deinit(JobQueueRowMt *jobq) {
+  vp9_jobq_reset(jobq);
+#if CONFIG_MULTITHREAD
+  pthread_mutex_destroy(&jobq->mutex);
+  pthread_cond_destroy(&jobq->cond);
+#endif
+}
+
+void vp9_jobq_terminate(JobQueueRowMt *jobq) {
+#if CONFIG_MULTITHREAD
+  pthread_mutex_lock(&jobq->mutex);
+#endif
+  jobq->terminate = 1;
+#if CONFIG_MULTITHREAD
+  pthread_cond_broadcast(&jobq->cond);
+  pthread_mutex_unlock(&jobq->mutex);
+#endif
+}
+
+int vp9_jobq_queue(JobQueueRowMt *jobq, void *job, size_t job_size) {
+  int ret = 0;
+#if CONFIG_MULTITHREAD
+  pthread_mutex_lock(&jobq->mutex);
+#endif
+  if (jobq->buf_end >= jobq->buf_wr + job_size) {
+    memcpy(jobq->buf_wr, job, job_size);
+    jobq->buf_wr = jobq->buf_wr + job_size;
+#if CONFIG_MULTITHREAD
+    pthread_cond_signal(&jobq->cond);
+#endif
+    ret = 0;
+  } else {
+    /* Wrap around case is not supported */
+    assert(0);
+    ret = 1;
+  }
+#if CONFIG_MULTITHREAD
+  pthread_mutex_unlock(&jobq->mutex);
+#endif
+  return ret;
+}
+
+int vp9_jobq_dequeue(JobQueueRowMt *jobq, void *job, size_t job_size,
+                     int blocking) {
+  int ret = 0;
+#if CONFIG_MULTITHREAD
+  pthread_mutex_lock(&jobq->mutex);
+#endif
+  if (jobq->buf_end >= jobq->buf_rd + job_size) {
+    while (1) {
+      if (jobq->buf_wr >= jobq->buf_rd + job_size) {
+        memcpy(job, jobq->buf_rd, job_size);
+        jobq->buf_rd = jobq->buf_rd + job_size;
+        ret = 0;
+        break;
+      } else {
+        /* If all the entries have been dequeued, then break and return */
+        if (jobq->terminate == 1) {
+          ret = 1;
+          break;
+        }
+        if (blocking == 1) {
+#if CONFIG_MULTITHREAD
+          pthread_cond_wait(&jobq->cond, &jobq->mutex);
+#endif
+        } else {
+          /* If there is no job available,
+           * and this is non blocking call then return fail */
+          ret = 1;
+          break;
+        }
+      }
+    }
+  } else {
+    /* Wrap around case is not supported */
+    ret = 1;
+  }
+#if CONFIG_MULTITHREAD
+  pthread_mutex_unlock(&jobq->mutex);
+#endif
+
+  return ret;
+}
diff --git a/vp9/decoder/vp9_job_queue.h b/vp9/decoder/vp9_job_queue.h
new file mode 100644 (file)
index 0000000..bc23bf9
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ *  Copyright (c) 2018 The WebM project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
+#define VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
+
+#include "vpx_util/vpx_thread.h"
+
+typedef struct {
+  // Pointer to buffer base which contains the jobs
+  uint8_t *buf_base;
+
+  // Pointer to current address where new job can be added
+  uint8_t *volatile buf_wr;
+
+  // Pointer to current address from where next job can be obtained
+  uint8_t *volatile buf_rd;
+
+  // Pointer to end of job buffer
+  uint8_t *buf_end;
+
+  int terminate;
+
+#if CONFIG_MULTITHREAD
+  pthread_mutex_t mutex;
+  pthread_cond_t cond;
+#endif
+} JobQueueRowMt;
+
+void vp9_jobq_init(JobQueueRowMt *jobq, uint8_t *buf, size_t buf_size);
+void vp9_jobq_reset(JobQueueRowMt *jobq);
+void vp9_jobq_deinit(JobQueueRowMt *jobq);
+void vp9_jobq_terminate(JobQueueRowMt *jobq);
+int vp9_jobq_queue(JobQueueRowMt *jobq, void *job, size_t job_size);
+int vp9_jobq_dequeue(JobQueueRowMt *jobq, void *job, size_t job_size,
+                     int blocking);
+
+#endif  // VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
index 59f612b..93a5f36 100644 (file)
@@ -28,5 +28,7 @@ VP9_DX_SRCS-yes += decoder/vp9_decoder.c
 VP9_DX_SRCS-yes += decoder/vp9_decoder.h
 VP9_DX_SRCS-yes += decoder/vp9_dsubexp.c
 VP9_DX_SRCS-yes += decoder/vp9_dsubexp.h
+VP9_DX_SRCS-yes += decoder/vp9_job_queue.c
+VP9_DX_SRCS-yes += decoder/vp9_job_queue.h
 
 VP9_DX_SRCS-yes := $(filter-out $(VP9_DX_SRCS_REMOVE-yes),$(VP9_DX_SRCS-yes))
index 4c20f37..26de0de 100644 (file)
@@ -211,6 +211,17 @@ static INLINE int pthread_cond_wait(pthread_cond_t *const condition,
 #endif
   return !ok;
 }
+
+static INLINE int sched_yield() {
+  int ok = 0;
+#if _WIN32_WINNT >= 0x0400  // Windows XP and above
+  SwitchToThread();
+#else
+  Sleep(0);
+#endif  // _WIN32_WINNT >= 0x0400
+  return ok;
+}
+
 #elif defined(__OS2__)
 #define INCL_DOS
 #include <os2.h>  // NOLINT