openjpegenc: support for a multithreaded encoding.
authorStéphane Cerveau <scerveau@collabora.com>
Tue, 21 Apr 2020 18:56:03 +0000 (20:56 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Mon, 20 Sep 2021 15:11:02 +0000 (15:11 +0000)
This commit introduces a multithreaded encoder allowing
to encode mulitple stripes or subframes in separated threads.
This feature aims to enhance the overall latency of a codec
pipeline.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/979>

docs/plugins/gst_plugins_cache.json
ext/openjpeg/gstopenjpeg.h
ext/openjpeg/gstopenjpegenc.c
ext/openjpeg/gstopenjpegenc.h

index 5d4a163..521d78d 100644 (file)
                         "type": "gint",
                         "writable": true
                     },
+                    "num-threads": {
+                        "blurb": "Max number of simultaneous threads to encode stripe or frame, default: encode with streaming thread.",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "2147483647",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "guint",
+                        "writable": true
+                    },
                     "progression-order": {
                         "blurb": "Progression order",
                         "conditionally-available": false,
index 4f07542..275b8dc 100644 (file)
 
 #include <openjpeg.h>
 
+typedef enum
+{
+  OPENJPEG_ERROR_NONE = 0,
+  OPENJPEG_ERROR_INIT,
+  OPENJPEG_ERROR_ENCODE,
+  OPENJPEG_ERROR_DECODE,
+  OPENJPEG_ERROR_OPEN,
+  OPENJPEG_ERROR_MAP_READ,
+  OPENJPEG_ERROR_MAP_WRITE,
+  OPENJPEG_ERROR_FILL_IMAGE,
+  OPENJPEG_ERROR_NEGOCIATE,
+  OPENJPEG_ERROR_ALLOCATE,
+} OpenJPEGErrorCode;
+
+typedef struct
+{
+  GstVideoCodecFrame *frame;
+  GstBuffer *output_buffer;
+  gint stripe;
+  OpenJPEGErrorCode last_error;
+  gboolean direct;
+} GstOpenJPEGCodecMessage;
+
 #endif /* __GST_OPENJPEG_H__ */
index f129e56..af5814b 100644 (file)
  *
  */
 
+/**
+ * SECTION:element-openjpegenc
+ * @title: openjpegenc
+ * @see_also: openjpegdec
+ *
+ * openjpegenc encodes raw video stream.
+ *
+ * ## Example launch lines
+ * |[
+ * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc ! jpeg2000parse ! openjpegdec ! videoconvert ! autovideosink sync=false
+ * ]| Encode and decode whole frames.
+ * |[
+ * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc num-threads=8 num-stripes=8 ! jpeg2000parse ! openjpegdec max-threads=8 ! videoconvert ! autovideosink sync=fals
+ * ]| Encode and decode frame split with stripes.
+ *
+ */
+
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
@@ -69,9 +86,11 @@ enum
   PROP_TILE_WIDTH,
   PROP_TILE_HEIGHT,
   PROP_NUM_STRIPES,
+  PROP_NUM_THREADS,
   PROP_LAST
 };
 
+
 #define DEFAULT_NUM_LAYERS 1
 #define DEFAULT_NUM_RESOLUTIONS 6
 #define DEFAULT_PROGRESSION_ORDER OPJ_LRCP
@@ -80,6 +99,13 @@ enum
 #define DEFAULT_TILE_WIDTH 0
 #define DEFAULT_TILE_HEIGHT 0
 #define GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES  1
+#define GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS 0
+
+/* prototypes */
+static void gst_openjpeg_enc_finalize (GObject * object);
+
+static GstStateChangeReturn
+gst_openjpeg_enc_change_state (GstElement * element, GstStateChange transition);
 
 static void gst_openjpeg_enc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -94,6 +120,12 @@ static GstFlowReturn gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder,
     GstVideoCodecFrame * frame);
 static gboolean gst_openjpeg_enc_propose_allocation (GstVideoEncoder * encoder,
     GstQuery * query);
+static GstFlowReturn gst_openjpeg_enc_encode_frame_multiple (GstVideoEncoder *
+    encoder, GstVideoCodecFrame * frame);
+static GstFlowReturn gst_openjpeg_enc_encode_frame_single (GstVideoEncoder *
+    encoder, GstVideoCodecFrame * frame);
+static GstOpenJPEGCodecMessage
+    * gst_openjpeg_encode_message_free (GstOpenJPEGCodecMessage * message);
 
 #if G_BYTE_ORDER == G_LITTLE_ENDIAN
 #define GRAY16 "GRAY16_LE"
@@ -150,6 +182,10 @@ gst_openjpeg_enc_class_init (GstOpenJPEGEncClass * klass)
 
   gobject_class->set_property = gst_openjpeg_enc_set_property;
   gobject_class->get_property = gst_openjpeg_enc_get_property;
+  gobject_class->finalize = gst_openjpeg_enc_finalize;
+
+  element_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_openjpeg_enc_change_state);
 
   g_object_class_install_property (gobject_class, PROP_NUM_LAYERS,
       g_param_spec_int ("num-layers", "Number of layers",
@@ -199,6 +235,18 @@ gst_openjpeg_enc_class_init (GstOpenJPEGEncClass * klass)
           "Number of stripes for low latency encoding. (1 = low latency disabled)",
           1, G_MAXINT, GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstOpenJPEGEnc:num-threads:
+   *
+   * Max number of simultaneous threads to encode stripes, default: encode with streaming thread
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_NUM_THREADS,
+      g_param_spec_uint ("num-threads", "Number of threads",
+          "Max number of simultaneous threads to encode stripe or frame, default: encode with streaming thread.",
+          0, G_MAXINT, GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_static_pad_template (element_class,
       &gst_openjpeg_enc_src_template);
@@ -259,6 +307,10 @@ gst_openjpeg_enc_init (GstOpenJPEGEnc * self)
       && self->params.cp_tdy != 0);
 
   self->num_stripes = GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES;
+  g_cond_init (&self->messages_cond);
+  g_queue_init (&self->messages);
+
+  self->available_threads = GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS;
 }
 
 static void
@@ -296,6 +348,9 @@ gst_openjpeg_enc_set_property (GObject * object, guint prop_id,
     case PROP_NUM_STRIPES:
       self->num_stripes = g_value_get_int (value);
       break;
+    case PROP_NUM_THREADS:
+      self->available_threads = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -333,6 +388,9 @@ gst_openjpeg_enc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_NUM_STRIPES:
       g_value_set_int (value, self->num_stripes);
       break;
+    case PROP_NUM_THREADS:
+      g_value_set_uint (value, self->available_threads);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -345,6 +403,10 @@ gst_openjpeg_enc_start (GstVideoEncoder * encoder)
   GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder);
 
   GST_DEBUG_OBJECT (self, "Starting");
+  if (self->available_threads)
+    self->encode_frame = gst_openjpeg_enc_encode_frame_multiple;
+  else
+    self->encode_frame = gst_openjpeg_enc_encode_frame_single;
 
   return TRUE;
 }
@@ -371,6 +433,56 @@ gst_openjpeg_enc_stop (GstVideoEncoder * video_encoder)
   return TRUE;
 }
 
+static void
+gst_openjpeg_enc_flush_messages (GstOpenJPEGEnc * self)
+{
+  GstOpenJPEGCodecMessage *enc_params;
+
+  GST_OBJECT_LOCK (self);
+  while ((enc_params = g_queue_pop_head (&self->messages))) {
+    gst_openjpeg_encode_message_free (enc_params);
+  }
+  g_cond_broadcast (&self->messages_cond);
+  GST_OBJECT_UNLOCK (self);
+}
+
+static void
+gst_openjpeg_enc_finalize (GObject * object)
+{
+  GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (object);
+
+  gst_openjpeg_enc_flush_messages (self);
+  g_cond_clear (&self->messages_cond);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static GstStateChangeReturn
+gst_openjpeg_enc_change_state (GstElement * element, GstStateChange transition)
+{
+  GstOpenJPEGEnc *self;
+
+  g_return_val_if_fail (GST_IS_OPENJPEG_ENC (element),
+      GST_STATE_CHANGE_FAILURE);
+  self = GST_OPENJPEG_ENC (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_openjpeg_enc_flush_messages (self);
+      break;
+    default:
+      break;
+  }
+
+  return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+}
+
 static guint
 get_stripe_height (GstOpenJPEGEnc * self, guint slice_num, guint frame_height)
 {
@@ -809,7 +921,7 @@ gst_openjpeg_enc_fill_image (GstOpenJPEGEnc * self, GstVideoFrame * frame,
   for (i = 0; i < ncomps; i++) {
     gint nominal_height = min_height / comps[i].dy;
 
-    comps[i].h = (slice_num < self->num_stripes - 1) ?
+    comps[i].h = (slice_num < self->num_stripes) ?
         nominal_height
         : GST_VIDEO_FRAME_COMP_HEIGHT (frame,
         i) - (self->num_stripes - 1) * nominal_height;
@@ -837,10 +949,10 @@ gst_openjpeg_enc_fill_image (GstOpenJPEGEnc * self, GstVideoFrame * frame,
 
   image->x0 = 0;
   image->x1 = GST_VIDEO_FRAME_WIDTH (frame);
-  image->y0 = slice_num * min_height;
+  image->y0 = (slice_num - 1) * min_height;
   image->y1 =
       (slice_num <
-      self->num_stripes - 1) ? image->y0 +
+      self->num_stripes) ? image->y0 +
       min_height : GST_VIDEO_FRAME_HEIGHT (frame);
   self->fill_image (image, frame);
 
@@ -940,21 +1052,299 @@ seek_fn (OPJ_OFF_T p_nb_bytes, void *p_user_data)
   return OPJ_TRUE;
 }
 
+static gboolean
+gst_openjpeg_encode_is_last_subframe (GstVideoEncoder * enc, int stripe)
+{
+  GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (enc);
+
+  return (stripe == self->num_stripes);
+}
+
+static GstOpenJPEGCodecMessage *
+gst_openjpeg_encode_message_new (GstOpenJPEGEnc * self,
+    GstVideoCodecFrame * frame, int num_stripe)
+{
+  GstOpenJPEGCodecMessage *message = g_slice_new0 (GstOpenJPEGCodecMessage);
+
+  message->frame = gst_video_codec_frame_ref (frame);
+  message->stripe = num_stripe;
+  message->last_error = OPENJPEG_ERROR_NONE;
+
+  return message;
+}
+
+static GstOpenJPEGCodecMessage *
+gst_openjpeg_encode_message_free (GstOpenJPEGCodecMessage * message)
+{
+  if (message) {
+    gst_video_codec_frame_unref (message->frame);
+    if (message->output_buffer)
+      gst_buffer_unref (message->output_buffer);
+    g_slice_free (GstOpenJPEGCodecMessage, message);
+  }
+  return NULL;
+}
+
+#define ENCODE_ERROR(encode_params, err_code) { \
+      encode_params->last_error = err_code; \
+      goto done; \
+}
+
+/* callback method to be called asynchronously or not*/
+static void
+gst_openjpeg_enc_encode_stripe (GstElement * element, gpointer user_data)
+{
+  GstOpenJPEGCodecMessage *message = (GstOpenJPEGCodecMessage *) user_data;
+  GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (element);
+  opj_codec_t *enc = NULL;
+  opj_stream_t *stream = NULL;
+  MemStream mstream;
+  opj_image_t *image = NULL;
+  GstVideoFrame vframe;
+
+  GST_INFO_OBJECT (self, "Encode stripe %d/%d", message->stripe,
+      self->num_stripes);
+
+  mstream.data = NULL;
+  enc = opj_create_compress (self->codec_format);
+  if (!enc)
+    ENCODE_ERROR (message, OPENJPEG_ERROR_INIT);
+
+  if (G_UNLIKELY (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >=
+          GST_LEVEL_TRACE)) {
+    opj_set_info_handler (enc, gst_openjpeg_enc_opj_info, self);
+    opj_set_warning_handler (enc, gst_openjpeg_enc_opj_warning, self);
+    opj_set_error_handler (enc, gst_openjpeg_enc_opj_error, self);
+  } else {
+    opj_set_info_handler (enc, NULL, NULL);
+    opj_set_warning_handler (enc, NULL, NULL);
+    opj_set_error_handler (enc, NULL, NULL);
+  }
+  if (!gst_video_frame_map (&vframe, &self->input_state->info,
+          message->frame->input_buffer, GST_MAP_READ))
+    ENCODE_ERROR (message, OPENJPEG_ERROR_MAP_READ);
+  image = gst_openjpeg_enc_fill_image (self, &vframe, message->stripe);
+  gst_video_frame_unmap (&vframe);
+  if (!image)
+    ENCODE_ERROR (message, OPENJPEG_ERROR_FILL_IMAGE);
+
+  if (vframe.info.finfo->flags & GST_VIDEO_FORMAT_FLAG_RGB) {
+    self->params.tcp_mct = 1;
+  }
+  opj_setup_encoder (enc, &self->params, image);
+  stream = opj_stream_create (4096, OPJ_FALSE);
+  if (!stream)
+    ENCODE_ERROR (message, OPENJPEG_ERROR_OPEN);
+
+  mstream.allocsize = 4096;
+  mstream.data = g_malloc (mstream.allocsize);
+  mstream.offset = 0;
+  mstream.size = 0;
+
+  opj_stream_set_read_function (stream, read_fn);
+  opj_stream_set_write_function (stream, write_fn);
+  opj_stream_set_skip_function (stream, skip_fn);
+  opj_stream_set_seek_function (stream, seek_fn);
+  opj_stream_set_user_data (stream, &mstream, NULL);
+  opj_stream_set_user_data_length (stream, mstream.size);
+
+  if (!opj_start_compress (enc, image, stream))
+    ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE);
+
+  if (!opj_encode (enc, stream))
+    ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE);
+
+  if (!opj_end_compress (enc, stream))
+    ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE);
+
+  opj_image_destroy (image);
+  image = NULL;
+  opj_stream_destroy (stream);
+  stream = NULL;
+  opj_destroy_codec (enc);
+  enc = NULL;
+
+  message->output_buffer = gst_buffer_new ();
+
+  if (self->is_jp2c) {
+    GstMapInfo map;
+    GstMemory *mem;
+
+    mem = gst_allocator_alloc (NULL, 8, NULL);
+    gst_memory_map (mem, &map, GST_MAP_WRITE);
+    GST_WRITE_UINT32_BE (map.data, mstream.size + 8);
+    GST_WRITE_UINT32_BE (map.data + 4, GST_MAKE_FOURCC ('j', 'p', '2', 'c'));
+    gst_memory_unmap (mem, &map);
+    gst_buffer_append_memory (message->output_buffer, mem);
+  }
+
+  gst_buffer_append_memory (message->output_buffer,
+      gst_memory_new_wrapped (0, mstream.data, mstream.allocsize, 0,
+          mstream.size, mstream.data, (GDestroyNotify) g_free));
+  message->last_error = OPENJPEG_ERROR_NONE;
+
+  GST_INFO_OBJECT (self,
+      "Stripe %d encoded successfully, pass it to the streaming thread",
+      message->stripe);
+
+done:
+  if (message->last_error != OPENJPEG_ERROR_NONE) {
+    if (mstream.data)
+      g_free (mstream.data);
+    if (enc)
+      opj_destroy_codec (enc);
+    if (image)
+      opj_image_destroy (image);
+    if (stream)
+      opj_stream_destroy (stream);
+  }
+  if (!message->direct) {
+    GST_OBJECT_LOCK (self);
+    g_queue_push_tail (&self->messages, message);
+    g_cond_signal (&self->messages_cond);
+    GST_OBJECT_UNLOCK (self);
+  }
+
+}
+
+static GstOpenJPEGCodecMessage *
+gst_openjpeg_enc_wait_for_new_message (GstOpenJPEGEnc * self)
+{
+  GstOpenJPEGCodecMessage *message = NULL;
+
+  GST_OBJECT_LOCK (self);
+  while (g_queue_is_empty (&self->messages))
+    g_cond_wait (&self->messages_cond, GST_OBJECT_GET_LOCK (self));
+  message = g_queue_pop_head (&self->messages);
+  GST_OBJECT_UNLOCK (self);
+
+  return message;
+}
+
+static GstFlowReturn
+gst_openjpeg_enc_encode_frame_multiple (GstVideoEncoder * encoder,
+    GstVideoCodecFrame * frame)
+{
+  GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder);
+  GstFlowReturn ret = GST_FLOW_OK;
+  guint i;
+  guint encoded_stripes = 0;
+  guint enqueued_stripes = 0;
+  GstOpenJPEGCodecMessage *message = NULL;
+
+  /* The method receives a frame and split it into n stripes and
+   * and create a thread per stripe to encode it.
+   * As the number of stripes can be greater than the
+   * available threads to encode, there is two loop, one to
+   * count the enqueues stripes and one to count the encoded
+   * stripes.
+   */
+  while (encoded_stripes < self->num_stripes) {
+    for (i = 1;
+        i <= self->available_threads
+        && enqueued_stripes < (self->num_stripes - encoded_stripes); i++) {
+      message =
+          gst_openjpeg_encode_message_new (self, frame, i + encoded_stripes);
+      GST_LOG_OBJECT (self,
+          "About to enqueue an encoding message from frame %p stripe %d", frame,
+          message->stripe);
+      gst_element_call_async (GST_ELEMENT (self),
+          (GstElementCallAsyncFunc) gst_openjpeg_enc_encode_stripe, message,
+          NULL);
+      enqueued_stripes++;
+    }
+    while (enqueued_stripes > 0) {
+      message = gst_openjpeg_enc_wait_for_new_message (self);
+      if (!message)
+        continue;
+      enqueued_stripes--;
+      if (message->last_error == OPENJPEG_ERROR_NONE) {
+        GST_LOG_OBJECT (self,
+            "About to push frame %p stripe %d", frame, message->stripe);
+        frame->output_buffer = gst_buffer_ref (message->output_buffer);
+        if (gst_openjpeg_encode_is_last_subframe (encoder, encoded_stripes + 1)) {
+          GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame);
+          ret = gst_video_encoder_finish_frame (encoder, frame);
+        } else
+          ret = gst_video_encoder_finish_subframe (encoder, frame);
+        if (ret != GST_FLOW_OK) {
+          GST_WARNING_OBJECT
+              (self, "An error occurred pushing the frame %s",
+              gst_flow_get_name (ret));
+          goto done;
+        }
+        encoded_stripes++;
+        message = gst_openjpeg_encode_message_free (message);
+      } else {
+        GST_WARNING_OBJECT
+            (self, "An error occurred %d during the JPEG encoding",
+            message->last_error);
+        gst_video_codec_frame_unref (frame);
+        self->last_error = message->last_error;
+        ret = GST_FLOW_ERROR;
+        goto done;
+      }
+    }
+  }
+
+done:
+  gst_openjpeg_encode_message_free (message);
+  return ret;
+}
+
+static GstFlowReturn
+gst_openjpeg_enc_encode_frame_single (GstVideoEncoder * encoder,
+    GstVideoCodecFrame * frame)
+{
+  GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder);
+  GstFlowReturn ret = GST_FLOW_OK;
+  guint i;
+  GstOpenJPEGCodecMessage *message = NULL;
+
+  for (i = 1; i <= self->num_stripes; ++i) {
+    message = gst_openjpeg_encode_message_new (self, frame, i);
+    message->direct = TRUE;
+    gst_openjpeg_enc_encode_stripe (GST_ELEMENT (self), message);
+    if (message->last_error != OPENJPEG_ERROR_NONE) {
+      GST_WARNING_OBJECT
+          (self, "An error occured %d during the JPEG encoding",
+          message->last_error);
+      gst_video_codec_frame_unref (frame);
+      self->last_error = message->last_error;
+      ret = GST_FLOW_ERROR;
+      goto done;
+    }
+    frame->output_buffer = gst_buffer_ref (message->output_buffer);
+    if (gst_openjpeg_encode_is_last_subframe (encoder, message->stripe)) {
+      GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame);
+      ret = gst_video_encoder_finish_frame (encoder, frame);
+    } else
+      ret = gst_video_encoder_finish_subframe (encoder, frame);
+    if (ret != GST_FLOW_OK) {
+      GST_WARNING_OBJECT
+          (self, "An error occurred pushing the frame %s",
+          gst_flow_get_name (ret));
+      goto done;
+    }
+    message = gst_openjpeg_encode_message_free (message);
+  }
+
+done:
+  gst_openjpeg_encode_message_free (message);
+  return ret;
+}
+
 static GstFlowReturn
 gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder,
     GstVideoCodecFrame * frame)
 {
   GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder);
   GstFlowReturn ret = GST_FLOW_OK;
-  opj_codec_t *enc;
-  opj_stream_t *stream;
-  MemStream mstream;
-  opj_image_t *image;
   GstVideoFrame vframe;
-  guint i;
+
   GstCaps *current_caps;
   GstStructure *s;
-  gboolean stripe_mode =
+  gboolean subframe_mode =
       self->num_stripes != GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES;
 
   GST_DEBUG_OBJECT (self, "Handling frame");
@@ -962,7 +1352,7 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder,
   current_caps = gst_pad_get_current_caps (GST_VIDEO_ENCODER_SRC_PAD (encoder));
   s = gst_caps_get_structure (current_caps, 0);
 
-  if (stripe_mode) {
+  if (subframe_mode) {
     const gchar *str = gst_structure_get_string (s, "alignment");
     gint min_res;
 
@@ -980,9 +1370,8 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder,
     if (!gst_video_frame_map (&vframe, &self->input_state->info,
             frame->input_buffer, GST_MAP_READ)) {
       gst_video_codec_frame_unref (frame);
-      GST_ELEMENT_ERROR (self, CORE, FAILED,
-          ("Failed to map input buffer"), (NULL));
-      return GST_FLOW_ERROR;
+      self->last_error = OPENJPEG_ERROR_MAP_READ;
+      goto error;
     }
     /* find stripe with least height */
     min_res =
@@ -995,146 +1384,42 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder,
     self->params.numresolution = MIN (min_res + 1, self->params.numresolution);
     gst_video_frame_unmap (&vframe);
   }
-
-
-  for (i = 0; i < self->num_stripes; ++i) {
-    enc = opj_create_compress (self->codec_format);
-    if (!enc)
-      goto initialization_error;
-
-    if (G_UNLIKELY (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >=
-            GST_LEVEL_TRACE)) {
-      opj_set_info_handler (enc, gst_openjpeg_enc_opj_info, self);
-      opj_set_warning_handler (enc, gst_openjpeg_enc_opj_warning, self);
-      opj_set_error_handler (enc, gst_openjpeg_enc_opj_error, self);
-    } else {
-      opj_set_info_handler (enc, NULL, NULL);
-      opj_set_warning_handler (enc, NULL, NULL);
-      opj_set_error_handler (enc, NULL, NULL);
-    }
-
-    if (!gst_video_frame_map (&vframe, &self->input_state->info,
-            frame->input_buffer, GST_MAP_READ))
-      goto map_read_error;
-
-    image = gst_openjpeg_enc_fill_image (self, &vframe, i);
-    if (!image)
-      goto fill_image_error;
-    gst_video_frame_unmap (&vframe);
-
-    if (vframe.info.finfo->flags & GST_VIDEO_FORMAT_FLAG_RGB) {
-      self->params.tcp_mct = 1;
-    }
-    opj_setup_encoder (enc, &self->params, image);
-    stream = opj_stream_create (4096, OPJ_FALSE);
-    if (!stream)
-      goto open_error;
-
-    mstream.allocsize = 4096;
-    mstream.data = g_malloc (mstream.allocsize);
-    mstream.offset = 0;
-    mstream.size = 0;
-
-    opj_stream_set_read_function (stream, read_fn);
-    opj_stream_set_write_function (stream, write_fn);
-    opj_stream_set_skip_function (stream, skip_fn);
-    opj_stream_set_seek_function (stream, seek_fn);
-    opj_stream_set_user_data (stream, &mstream, NULL);
-    opj_stream_set_user_data_length (stream, mstream.size);
-
-    if (!opj_start_compress (enc, image, stream))
-      goto encode_error;
-
-    if (!opj_encode (enc, stream))
-      goto encode_error;
-
-    if (!opj_end_compress (enc, stream))
-      goto encode_error;
-
-    opj_image_destroy (image);
-    opj_stream_destroy (stream);
-    opj_destroy_codec (enc);
-
-    frame->output_buffer = gst_buffer_new ();
-
-    if (self->is_jp2c) {
-      GstMapInfo map;
-      GstMemory *mem;
-
-      mem = gst_allocator_alloc (NULL, 8, NULL);
-      gst_memory_map (mem, &map, GST_MAP_WRITE);
-      GST_WRITE_UINT32_BE (map.data, mstream.size + 8);
-      GST_WRITE_UINT32_BE (map.data + 4, GST_MAKE_FOURCC ('j', 'p', '2', 'c'));
-      gst_memory_unmap (mem, &map);
-      gst_buffer_append_memory (frame->output_buffer, mem);
-    }
-
-    gst_buffer_append_memory (frame->output_buffer,
-        gst_memory_new_wrapped (0, mstream.data, mstream.allocsize, 0,
-            mstream.size, mstream.data, (GDestroyNotify) g_free));
-
-    GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame);
-    ret =
-        (i ==
-        self->num_stripes -
-        1) ? gst_video_encoder_finish_frame (encoder,
-        frame) : gst_video_encoder_finish_subframe (encoder, frame);
-
-  }
-
+  if (self->encode_frame (encoder, frame) != GST_FLOW_OK)
+    goto error;
 done:
   if (current_caps)
     gst_caps_unref (current_caps);
   return ret;
 
-initialization_error:
-  {
-    gst_video_codec_frame_unref (frame);
-    GST_ELEMENT_ERROR (self, LIBRARY, INIT,
-        ("Failed to initialize OpenJPEG encoder"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-map_read_error:
-  {
-    opj_destroy_codec (enc);
-    gst_video_codec_frame_unref (frame);
-
-    GST_ELEMENT_ERROR (self, CORE, FAILED,
-        ("Failed to map input buffer"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-fill_image_error:
-  {
-    opj_destroy_codec (enc);
-    gst_video_frame_unmap (&vframe);
-    gst_video_codec_frame_unref (frame);
-
-    GST_ELEMENT_ERROR (self, LIBRARY, INIT,
-        ("Failed to fill OpenJPEG image"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-open_error:
-  {
-    opj_image_destroy (image);
-    opj_destroy_codec (enc);
-    gst_video_codec_frame_unref (frame);
-
-    GST_ELEMENT_ERROR (self, LIBRARY, INIT,
-        ("Failed to open OpenJPEG data"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-encode_error:
-  {
-    opj_stream_destroy (stream);
-    g_free (mstream.data);
-    opj_image_destroy (image);
-    opj_destroy_codec (enc);
-    gst_video_codec_frame_unref (frame);
-
-    GST_ELEMENT_ERROR (self, STREAM, ENCODE,
-        ("Failed to encode OpenJPEG stream"), (NULL));
-    return GST_FLOW_ERROR;
+error:
+  switch (self->last_error) {
+    case OPENJPEG_ERROR_INIT:
+      GST_ELEMENT_ERROR (self, LIBRARY, INIT,
+          ("Failed to initialize OpenJPEG encoder"), (NULL));
+      break;
+    case OPENJPEG_ERROR_MAP_READ:
+      GST_ELEMENT_ERROR (self, CORE, FAILED,
+          ("Failed to map input buffer"), (NULL));
+      break;
+    case OPENJPEG_ERROR_FILL_IMAGE:
+      GST_ELEMENT_ERROR (self, LIBRARY, INIT,
+          ("Failed to fill OpenJPEG image"), (NULL));
+      break;
+    case OPENJPEG_ERROR_OPEN:
+      GST_ELEMENT_ERROR (self, LIBRARY, INIT,
+          ("Failed to open OpenJPEG data"), (NULL));
+      break;
+    case OPENJPEG_ERROR_ENCODE:
+      GST_ELEMENT_ERROR (self, LIBRARY, INIT,
+          ("Failed to encode OpenJPEG data"), (NULL));
+      break;
+    default:
+      GST_ELEMENT_ERROR (self, LIBRARY, INIT,
+          ("Failed to encode OpenJPEG data"), (NULL));
+      break;
   }
+  gst_openjpeg_enc_flush_messages (self);
+  return GST_FLOW_ERROR;
 }
 
 static gboolean
index 754b3b4..a2c81bb 100644 (file)
@@ -55,9 +55,17 @@ struct _GstOpenJPEGEnc
   gboolean is_jp2c;
 
   void (*fill_image) (opj_image_t * image, GstVideoFrame *frame);
+  gboolean (*encode_frame) (GstVideoEncoder * encoder, GstVideoCodecFrame *frame);
 
   opj_cparameters_t params;
   gint num_stripes;
+
+  guint available_threads;
+  GQueue messages;
+
+  GCond messages_cond;
+
+  OpenJPEGErrorCode last_error;
 };
 
 struct _GstOpenJPEGEncClass