[Gst/MQTT/Sink] Support dynamic buffer re-allocation
authorWook Song <wook16.song@samsung.com>
Fri, 23 Apr 2021 09:42:10 +0000 (18:42 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 3 May 2021 11:57:05 +0000 (20:57 +0900)
This patch changes the message buffer allocation mechanism. When the
size of the incoming GStreamer buffer exceeds that of the current
message buffer allocated, the existing buffer would be freed and
re-allocated. Note that in the case the size of the message buffer
is fixed by the max-buffer-size property, this feature is disabled.

Signed-off-by: Wook Song <wook16.song@samsung.com>
gst/mqtt/mqttsink.c

index bb20763..e264314 100644 (file)
@@ -594,6 +594,8 @@ _mqtt_set_msg_buf_hdr (GstBuffer * gst_buf, GstMQTTMessageHdr * hdr)
 static GstFlowReturn
 gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
 {
+  const gsize in_buf_size = gst_buffer_get_size (in_buf);
+  static gboolean is_static_sized_buf = FALSE;
   GstMqttSink *self = GST_MQTT_SINK (basesink);
   GstFlowReturn ret = GST_FLOW_ERROR;
   GstMemory *in_buf_mem;
@@ -634,6 +636,14 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
     self->num_buffers -= 1;
   }
 
+  if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
+      (self->mqtt_msg_buf_size != 0) &&
+      (self->mqtt_msg_buf_size < in_buf_size + GST_MQTT_LEN_MSG_HDR)) {
+    g_free (self->mqtt_msg_buf);
+    self->mqtt_msg_buf = NULL;
+    self->mqtt_msg_buf_size = 0;
+  }
+
   /** Allocate a message buffer */
   if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
     if (!_mqtt_set_msg_buf_hdr (in_buf, &self->mqtt_msg_hdr)) {
@@ -642,11 +652,8 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
     }
 
     if (self->max_msg_buf_size == 0) {
-      self->mqtt_msg_buf_size = gst_buffer_get_size (in_buf) +
-          GST_MQTT_LEN_MSG_HDR;
+      self->mqtt_msg_buf_size = in_buf_size + GST_MQTT_LEN_MSG_HDR;
     } else {
-      gsize in_buf_size = gst_buffer_get_size (in_buf);
-
       if (self->max_msg_buf_size < in_buf_size) {
         g_printerr ("%s: The given size for a message buffer is too small: "
             "given (%lu bytes) vs. incomming (%lu bytes)\n",
@@ -655,6 +662,7 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
         goto ret_with;
       }
       self->mqtt_msg_buf_size = self->max_msg_buf_size + GST_MQTT_LEN_MSG_HDR;
+      is_static_sized_buf = TRUE;
     }
 
     self->mqtt_msg_buf = g_malloc0 (self->mqtt_msg_buf_size);