gst/gdp/gstgdpdepay.c: Disable seeking.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 2 Aug 2006 16:56:19 +0000 (16:56 +0000)
committerTim-Philipp Müller <tim@centricular.net>
Tue, 11 Sep 2012 00:54:29 +0000 (01:54 +0100)
Original commit message from CVS:
* gst/gdp/gstgdpdepay.c: (gst_gdp_depay_init),
(gst_gdp_depay_finalize), (gst_gdp_depay_sink_event),
(gst_gdp_depay_src_event), (gst_gdp_depay_chain),
(gst_gdp_depay_change_state):
Disable seeking.
Small cleanups.
Clear adapter on disconts.
Clear caps when going to READY instead of NULL
* gst/gdp/gstgdppay.c: (gst_gdp_pay_class_init),
(gst_gdp_pay_init), (gst_gdp_pay_finalize), (gst_gdp_pay_reset),
(gst_gdp_buffer_from_caps), (gst_gdp_pay_buffer_from_buffer),
(gst_gdp_buffer_from_event), (gst_gdp_pay_reset_streamheader),
(gst_gdp_queue_buffer), (gst_gdp_pay_chain),
(gst_gdp_pay_sink_event), (gst_gdp_pay_src_event),
(gst_gdp_pay_change_state):
* gst/gdp/gstgdppay.h:
Reset payloader when going to READY.
Fix leaked buffers in ->queue on push errors.
Disable seeking.
Code cleanups.
Create packetizer in _init, free in _finalize.

gst/gdp/gstgdpdepay.c
gst/gdp/gstgdppay.c
gst/gdp/gstgdppay.h

index 4b0266f..ba391c4 100644 (file)
@@ -56,7 +56,6 @@ GST_ELEMENT_DETAILS ("GDP Depayloader",
 enum
 {
   PROP_0,
-  /* FILL ME */
 };
 
 static GstStaticPadTemplate gdp_depay_sink_template =
@@ -82,6 +81,8 @@ GST_BOILERPLATE_FULL (GstGDPDepay, gst_gdp_depay, GstElement,
     GST_TYPE_ELEMENT, _do_init);
 
 static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_gdp_depay_src_event (GstPad * pad, GstEvent * event);
+
 static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer);
 
 static GstStateChangeReturn gst_gdp_depay_change_state (GstElement *
@@ -129,6 +130,8 @@ gst_gdp_depay_init (GstGDPDepay * gdpdepay, GstGDPDepayClass * g_class)
 
   gdpdepay->srcpad =
       gst_pad_new_from_static_template (&gdp_depay_src_template, "src");
+  gst_pad_set_event_function (gdpdepay->srcpad,
+      GST_DEBUG_FUNCPTR (gst_gdp_depay_src_event));
   /* our caps will always be decided by the incoming GDP caps buffers */
   gst_pad_use_fixed_caps (gdpdepay->srcpad);
   gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad);
@@ -144,19 +147,13 @@ gst_gdp_depay_finalize (GObject * gobject)
   this = GST_GDP_DEPAY (gobject);
   if (this->caps)
     gst_caps_unref (this->caps);
-  if (this->header)
-    g_free (this->header);
+  g_free (this->header);
   gst_adapter_clear (this->adapter);
   g_object_unref (this->adapter);
 
   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
 }
 
-/* we ignore most events because the only events we want to output are those
- * found in the gdp data stream.
- * An exception is the EOS event, if we get that on the sinkpad, we certainly
- * are not going to produce more data.
- */
 static gboolean
 gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
 {
@@ -166,10 +163,20 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
   this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_START:
+    case GST_EVENT_FLUSH_STOP:
+      /* forward flush start and stop */
+      res = gst_pad_push_event (this->srcpad, event);
+      break;
     case GST_EVENT_EOS:
+      /* after EOS, we don't expect to output anything anymore */
       res = gst_pad_push_event (this->srcpad, event);
       break;
+    case GST_EVENT_NEWSEGMENT:
+    case GST_EVENT_TAG:
+    case GST_EVENT_BUFFERSIZE:
     default:
+      /* we unref most events as we take them from the datastream */
       gst_event_unref (event);
       break;
   }
@@ -178,6 +185,32 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
   return res;
 }
 
+static gboolean
+gst_gdp_depay_src_event (GstPad * pad, GstEvent * event)
+{
+  GstGDPDepay *this;
+  gboolean res = TRUE;
+
+  this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:
+      /* we refuse seek for now. */
+      gst_event_unref (event);
+      res = FALSE;
+      break;
+    case GST_EVENT_QOS:
+    case GST_EVENT_NAVIGATION:
+    default:
+      /* everything else is passed */
+      res = gst_pad_push_event (this->sinkpad, event);
+      break;
+  }
+  gst_object_unref (this);
+
+  return res;
+}
+
 static GstFlowReturn
 gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
 {
@@ -186,108 +219,124 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
   GstCaps *caps;
   GstBuffer *buf;
   GstEvent *event;
-  guint8 *header = NULL;
-  guint8 *payload = NULL;
   guint available;
-  gboolean running = TRUE;
 
   this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
 
+  /* On DISCONT, get rid of accumulated data. We assume a buffer after the
+   * DISCONT contains (part of) a new valid header, if not we error because we
+   * lost sync */
+  if (GST_BUFFER_IS_DISCONT (buffer)) {
+    gst_adapter_clear (this->adapter);
+    this->state = GST_GDP_DEPAY_STATE_HEADER;
+  }
   gst_adapter_push (this->adapter, buffer);
 
-  while (running) {
+  while (TRUE) {
     switch (this->state) {
       case GST_GDP_DEPAY_STATE_HEADER:
+      {
+        guint8 *header;
+
+        /* collect a complete header, validate and store the header. Figure out
+         * the payload length and switch to the PAYLOAD state */
         available = gst_adapter_available (this->adapter);
-        if (available < GST_DP_HEADER_LENGTH) {
-          running = FALSE;
-          break;
-        }
+        if (available < GST_DP_HEADER_LENGTH)
+          goto done;
 
-        if (this->header)
-          g_free (this->header);
         GST_LOG_OBJECT (this, "reading GDP header from adapter");
         header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH);
-        if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
+        if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) {
+          g_free (header);
           goto header_validate_error;
+        }
 
+        /* store types and payload length. Also store the header, which we need
+         * to make the payload. */
         this->payload_length = gst_dp_header_payload_length (header);
         this->payload_type = gst_dp_header_payload_type (header);
+        /* free previous header and store new one. */
+        g_free (this->header);
         this->header = header;
+
         GST_LOG_OBJECT (this,
             "read GDP header, payload size %d, switching to state PAYLOAD",
             this->payload_length);
         this->state = GST_GDP_DEPAY_STATE_PAYLOAD;
         break;
-
+      }
       case GST_GDP_DEPAY_STATE_PAYLOAD:
+      {
+        /* in this state we wait for all the payload data to be available in the
+         * adapter. Then we switch to the state where we actually process the
+         * payload. */
         available = gst_adapter_available (this->adapter);
-        if (available < this->payload_length) {
-          running = FALSE;
-          break;
-        }
+        if (available < this->payload_length)
+          goto done;
 
         /* change state based on type */
-        if (this->payload_type == GST_DP_PAYLOAD_BUFFER) {
-          GST_LOG_OBJECT (this, "switching to state BUFFER");
-          this->state = GST_GDP_DEPAY_STATE_BUFFER;
-        } else if (this->payload_type == GST_DP_PAYLOAD_CAPS) {
-          GST_LOG_OBJECT (this, "switching to state CAPS");
-          this->state = GST_GDP_DEPAY_STATE_CAPS;
-        } else if (this->payload_type >= GST_DP_PAYLOAD_EVENT_NONE) {
-          GST_LOG_OBJECT (this, "switching to state EVENT");
-          this->state = GST_GDP_DEPAY_STATE_EVENT;
-        } else
-          goto wrong_type;
+        switch (this->payload_type) {
+          case GST_DP_PAYLOAD_BUFFER:
+            GST_LOG_OBJECT (this, "switching to state BUFFER");
+            this->state = GST_GDP_DEPAY_STATE_BUFFER;
+            break;
+          case GST_DP_PAYLOAD_CAPS:
+            GST_LOG_OBJECT (this, "switching to state CAPS");
+            this->state = GST_GDP_DEPAY_STATE_CAPS;
+            break;
+          case GST_DP_PAYLOAD_EVENT_NONE:
+            GST_LOG_OBJECT (this, "switching to state EVENT");
+            this->state = GST_GDP_DEPAY_STATE_EVENT;
+            break;
+          default:
+            goto wrong_type;
+        }
         break;
-
+      }
       case GST_GDP_DEPAY_STATE_BUFFER:
-        if (!this->caps) {
-          GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-              ("Received a buffer without first receiving caps"));
-          ret = GST_FLOW_NOT_NEGOTIATED;
-          goto done;
-        }
+      {
+
+        /* if we receive a buffer without caps first, we error out */
+        if (!this->caps)
+          goto no_caps;
 
         GST_LOG_OBJECT (this, "reading GDP buffer from adapter");
         buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header);
-        if (!buf) {
-          GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-              ("could not create buffer from GDP packet"));
-          ret = GST_FLOW_ERROR;
-          goto done;
-        }
+        if (!buf)
+          goto buffer_failed;
 
-        payload = gst_adapter_take (this->adapter, this->payload_length);
-        memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
-        g_free (payload);
-        payload = NULL;
+        /* now take the payload if there is any */
+        if (this->payload_length > 0) {
+          guint8 *payload;
+
+          payload = gst_adapter_take (this->adapter, this->payload_length);
+          memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
+          g_free (payload);
+        }
 
+        /* set caps and push */
         gst_buffer_set_caps (buf, this->caps);
         ret = gst_pad_push (this->srcpad, buf);
-        if (ret != GST_FLOW_OK) {
-          GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d",
-              ret);
-          goto done;
-        }
+        if (ret != GST_FLOW_OK)
+          goto push_error;
 
         GST_LOG_OBJECT (this, "switching to state HEADER");
         this->state = GST_GDP_DEPAY_STATE_HEADER;
         break;
-
+      }
       case GST_GDP_DEPAY_STATE_CAPS:
+      {
+        guint8 *payload;
+
+        /* take the payload of the caps */
         GST_LOG_OBJECT (this, "reading GDP caps from adapter");
         payload = gst_adapter_take (this->adapter, this->payload_length);
         caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header,
             payload);
         g_free (payload);
-        payload = NULL;
-        if (!caps) {
-          GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-              ("could not create caps from GDP packet"));
-          ret = GST_FLOW_ERROR;
-          goto done;
-        }
+        if (!caps)
+          goto caps_failed;
+
         GST_DEBUG_OBJECT (this, "read caps %" GST_PTR_FORMAT, caps);
         gst_caps_replace (&(this->caps), caps);
         gst_pad_set_caps (this->srcpad, caps);
@@ -297,24 +346,24 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
         GST_LOG_OBJECT (this, "switching to state HEADER");
         this->state = GST_GDP_DEPAY_STATE_HEADER;
         break;
-
+      }
       case GST_GDP_DEPAY_STATE_EVENT:
+      {
+        guint8 *payload;
+
         GST_LOG_OBJECT (this, "reading GDP event from adapter");
+
         /* adapter doesn't like 0 length payload */
         if (this->payload_length > 0)
           payload = gst_adapter_take (this->adapter, this->payload_length);
+        else
+          payload = NULL;
         event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header,
             payload);
-        if (payload) {
-          g_free (payload);
-          payload = NULL;
-        }
-        if (!event) {
-          GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-              ("could not create event from GDP packet"));
-          ret = GST_FLOW_ERROR;
-          goto done;
-        }
+        g_free (payload);
+        if (!event)
+          goto event_failed;
+
         GST_DEBUG_OBJECT (this, "sending deserialized event %p of type %s",
             event, gst_event_type_get_name (event->type));
         gst_pad_push_event (this->srcpad, event);
@@ -322,27 +371,62 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
         GST_LOG_OBJECT (this, "switching to state HEADER");
         this->state = GST_GDP_DEPAY_STATE_HEADER;
         break;
+      }
     }
   }
-  goto done;
-
-header_validate_error:
-  GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-      ("GDP packet header does not validate"));
-  g_free (header);
-  ret = GST_FLOW_ERROR;
-  goto done;
-
-wrong_type:
-  GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
-      ("GDP packet header is of wrong type"));
-  g_free (header);
-  ret = GST_FLOW_ERROR;
-  goto done;
 
 done:
   gst_object_unref (this);
   return ret;
+
+  /* ERRORS */
+header_validate_error:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("GDP packet header does not validate"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
+wrong_type:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("GDP packet header is of wrong type"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
+no_caps:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("Received a buffer without first receiving caps"));
+    ret = GST_FLOW_NOT_NEGOTIATED;
+    goto done;
+  }
+buffer_failed:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("could not create buffer from GDP packet"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
+push_error:
+  {
+    GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", ret);
+    goto done;
+  }
+caps_failed:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("could not create caps from GDP packet"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
+event_failed:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+        ("could not create event from GDP packet"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
 }
 
 static GstStateChangeReturn
@@ -354,7 +438,7 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
   switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_NULL:
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
       if (this->caps) {
         gst_caps_unref (this->caps);
         this->caps = NULL;
@@ -363,7 +447,6 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
     default:
       break;
   }
-
   return ret;
 }
 
index 26eaab6..e164b50 100644 (file)
@@ -85,8 +85,12 @@ enum
 GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
     GST_TYPE_ELEMENT, _do_init);
 
+static void gst_gdp_pay_reset (GstGDPPay * this);
 static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
+
+static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
 static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
+
 static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
     element, GstStateChange transition);
 
@@ -95,7 +99,7 @@ static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
 static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
-static void gst_gdp_pay_dispose (GObject * gobject);
+static void gst_gdp_pay_finalize (GObject * gobject);
 
 static void
 gst_gdp_pay_base_init (gpointer g_class)
@@ -121,8 +125,7 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
 
   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
-  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose);
-  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
 
   g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
       g_param_spec_boolean ("crc-header", "CRC Header",
@@ -136,6 +139,8 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
       g_param_spec_enum ("version", "Version",
           "Version of the GStreamer Data Protocol",
           GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE));
+
+  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
 }
 
 static void
@@ -151,30 +156,61 @@ gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
 
   gdppay->srcpad =
       gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
+  gst_pad_set_event_function (gdppay->srcpad,
+      GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
   gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
 
-  gdppay->offset = 0;
-
   gdppay->crc_header = DEFAULT_CRC_HEADER;
   gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
   gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
   gdppay->version = DEFAULT_VERSION;
+  gdppay->offset = 0;
+
+  gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
 }
 
 static void
-gst_gdp_pay_dispose (GObject * gobject)
+gst_gdp_pay_finalize (GObject * gobject)
 {
   GstGDPPay *this = GST_GDP_PAY (gobject);
 
+  gst_gdp_pay_reset (this);
+  gst_dp_packetizer_free (this->packetizer);
+
+  GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
+}
+
+static void
+gst_gdp_pay_reset (GstGDPPay * this)
+{
+  /* clear the queued buffers */
+  while (this->queue) {
+    GstBuffer *buffer;
+
+    buffer = GST_BUFFER_CAST (this->queue->data);
+    GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+    /* delete buffer from queue now */
+    this->queue = g_list_delete_link (this->queue, this->queue);
+  }
+  if (this->caps) {
+    gst_caps_unref (this->caps);
+    this->caps = NULL;
+  }
   if (this->caps_buf) {
     gst_buffer_unref (this->caps_buf);
     this->caps_buf = NULL;
   }
+  if (this->tag_buf) {
+    gst_buffer_unref (this->tag_buf);
+    this->tag_buf = NULL;
+  }
   if (this->new_segment_buf) {
     gst_buffer_unref (this->new_segment_buf);
     this->new_segment_buf = NULL;
   }
-  GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject));
+  this->sent_streamheader = FALSE;
+  this->offset = 0;
 }
 
 /* set OFFSET and OFFSET_END with running count */
@@ -195,10 +231,8 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
   guint len;
 
   if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
-          &header, &payload)) {
-    GST_WARNING_OBJECT (this, "could not create GDP header from caps");
-    return NULL;
-  }
+          &header, &payload))
+    goto packet_failed;
 
   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
   headerbuf = gst_buffer_new ();
@@ -211,6 +245,13 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
 
   return gst_buffer_join (headerbuf, payloadbuf);
+
+  /* ERRORS */
+packet_failed:
+  {
+    GST_WARNING_OBJECT (this, "could not create GDP header from caps");
+    return NULL;
+  }
 }
 
 static GstBuffer *
@@ -221,10 +262,8 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
   guint len;
 
   if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
-          &header)) {
-    GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
-    return NULL;
-  }
+          &header))
+    goto no_buffer;
 
   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
   headerbuf = gst_buffer_new ();
@@ -233,7 +272,15 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
 
   /* we do not want to lose the ref on the incoming buffer */
   gst_buffer_ref (buffer);
+
   return gst_buffer_join (headerbuf, buffer);
+
+  /* ERRORS */
+no_buffer:
+  {
+    GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
+    return NULL;
+  }
 }
 
 static GstBuffer *
@@ -248,12 +295,8 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
   ret =
       this->packetizer->packet_from_event (event, this->header_flag, &len,
       &header, &payload);
-
-  if (!ret) {
-    GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
-        gst_event_type_get_name (event->type), event->type);
-    return NULL;
-  }
+  if (!ret)
+    goto no_event;
 
   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
   headerbuf = gst_buffer_new ();
@@ -266,6 +309,14 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
 
   return gst_buffer_join (headerbuf, payloadbuf);
+
+  /* ERRORS */
+no_event:
+  {
+    GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
+        gst_event_type_get_name (event->type), event->type);
+    return NULL;
+  }
 }
 
 
@@ -341,14 +392,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
       bufval = &g_array_index (buffers, GValue, i);
       buffer = g_value_peek_pointer (bufval);
       outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
-      if (outbuffer) {
-        g_value_init (&value, GST_TYPE_BUFFER);
-        gst_value_set_buffer (&value, outbuffer);
-        gst_value_array_append_value (&array, &value);
-        g_value_unset (&value);
-      }
-      /* FIXME: if one or more in this loop fail to produce an outbuffer,
-       * should we error out ? Once ? Every time ? */
+      if (!outbuffer)
+        goto no_buffer;
+
+      g_value_init (&value, GST_TYPE_BUFFER);
+      gst_value_set_buffer (&value, outbuffer);
+      gst_value_array_append_value (&array, &value);
+      g_value_unset (&value);
     }
   }
 
@@ -406,38 +456,50 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
   this->sent_streamheader = TRUE;
   GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
       g_list_length (this->queue));
-  if (this->queue) {
-    GList *l;
-
-    for (l = this->queue; l; l = g_list_next (l)) {
-      GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data);
-      gst_buffer_set_caps (l->data, caps);
-      r = gst_pad_push (this->srcpad, l->data);
-      if (r != GST_FLOW_OK) {
-        GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
-        return r;
-      }
+  while (this->queue) {
+    GstBuffer *buffer;
+
+    buffer = GST_BUFFER_CAST (this->queue->data);
+    GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+    /* delete buffer from queue now */
+    this->queue = g_list_delete_link (this->queue, this->queue);
+
+    /* set caps en push */
+    gst_buffer_set_caps (buffer, caps);
+    r = gst_pad_push (this->srcpad, buffer);
+    if (r != GST_FLOW_OK) {
+      GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
+      return r;
     }
   }
-
   return r;
+
+  /* ERRORS */
+no_buffer:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+        ("failed to create GDP buffer from streamheader"));
+    return GST_FLOW_ERROR;
+  }
 }
 
 /* queue a buffer internally if we haven't sent streamheader buffers yet;
- * otherwise, just push on */
+ * otherwise, just push on, this takes ownership of the buffer. */
 static GstFlowReturn
 gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
 {
   if (this->sent_streamheader) {
-    GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer);
-    GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps);
+    GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
+        buffer, this->caps);
     return gst_pad_push (this->srcpad, buffer);
   }
 
-  /* store it on an internal queue */
+  /* store it on an internal queue. buffer remains reffed. */
   this->queue = g_list_append (this->queue, buffer);
   GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
       buffer, g_list_length (this->queue));
+
   return GST_FLOW_OK;
 }
 
@@ -466,12 +528,7 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
     if (!outbuffer) {
       GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
           ("Could not create GDP buffer from new segment event"));
-/*
-      ret = GST_FLOW_ERROR;
-      goto done;
-*/
     } else {
-
       gst_gdp_stamp_buffer (this, outbuffer);
       GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
       GST_BUFFER_DURATION (outbuffer) = 0;
@@ -483,26 +540,16 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
 
   /* make sure we've received caps before */
   caps = gst_buffer_get_caps (buffer);
-  if (!this->caps && !caps) {
-    GST_WARNING_OBJECT (this, "first received buffer does not have caps set");
-    if (caps)
-      gst_caps_unref (caps);
-    ret = GST_FLOW_NOT_NEGOTIATED;
-    goto done;
-  }
+  if (!this->caps && !caps)
+    goto no_caps;
 
   /* if the caps have changed, process caps first */
   if (caps && !gst_caps_is_equal (this->caps, caps)) {
     GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
     gst_caps_replace (&(this->caps), caps);
     outbuffer = gst_gdp_buffer_from_caps (this, caps);
-    if (!outbuffer) {
-      GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
-          ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
-      gst_caps_unref (caps);
-      ret = GST_FLOW_ERROR;
-      goto done;
-    }
+    if (!outbuffer)
+      goto no_caps_buffer;
 
     gst_gdp_stamp_buffer (this, outbuffer);
     GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -515,12 +562,8 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
   /* create a GDP header packet,
    * then create a GST buffer of the header packet and the buffer contents */
   outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
-  if (!outbuffer) {
-    GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
-        ("Could not create GDP buffer from buffer"));
-    ret = GST_FLOW_ERROR;
-    goto done;
-  }
+  if (!outbuffer)
+    goto no_buffer;
 
   gst_gdp_stamp_buffer (this, outbuffer);
   GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -532,6 +575,34 @@ done:
   gst_buffer_unref (buffer);
   gst_object_unref (this);
   return ret;
+
+  /* ERRORS */
+no_caps:
+  {
+    /* when returning a fatal error as a GstFlowReturn we must post an error
+     * message */
+    GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+        ("first received buffer does not have caps set"));
+    if (caps)
+      gst_caps_unref (caps);
+    ret = GST_FLOW_NOT_NEGOTIATED;
+    goto done;
+  }
+no_caps_buffer:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+        ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
+    gst_caps_unref (caps);
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
+no_buffer:
+  {
+    GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+        ("Could not create GDP buffer from buffer"));
+    ret = GST_FLOW_ERROR;
+    goto done;
+  }
 }
 
 static gboolean
@@ -547,13 +618,9 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
 
   /* now turn the event into a buffer */
   outbuffer = gst_gdp_buffer_from_event (this, event);
-  if (!outbuffer) {
-    GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
-        ("Could not create GDP buffer from received event (type %s)",
-            gst_event_type_get_name (event->type)));
-    ret = FALSE;
-    goto done;
-  }
+  if (!outbuffer)
+    goto no_outbuffer;
+
   gst_gdp_stamp_buffer (this, outbuffer);
   GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
   GST_BUFFER_DURATION (outbuffer) = 0;
@@ -562,47 +629,87 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
    * and not send it on */
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_NEWSEGMENT:
-      GST_DEBUG_OBJECT (this, "received new_segment event");
-      if (this->new_segment_buf) {
-        gst_buffer_unref (this->new_segment_buf);
-      }
       GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
           outbuffer);
+
+      if (this->new_segment_buf)
+        gst_buffer_unref (this->new_segment_buf);
       this->new_segment_buf = outbuffer;
+
       gst_gdp_pay_reset_streamheader (this);
       break;
     case GST_EVENT_TAG:
-      GST_DEBUG_OBJECT (this, "received tag event");
-      if (this->tag_buf) {
-        gst_buffer_unref (this->tag_buf);
-      }
       GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer);
+
+      if (this->tag_buf)
+        gst_buffer_unref (this->tag_buf);
       this->tag_buf = outbuffer;
+
       gst_gdp_pay_reset_streamheader (this);
       break;
     default:
       GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
           event);
       flowret = gst_gdp_queue_buffer (this, outbuffer);
-      if (flowret != GST_FLOW_OK) {
-        GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d",
-            flowret);
-        ret = FALSE;
-        goto done;
-      }
+      if (flowret != GST_FLOW_OK)
+        goto push_error;
       break;
   }
 
   /* if we have EOS, we should send on EOS ourselves */
   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
     GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
-    return gst_pad_push_event (this->srcpad, event);
-  };
+    /* ref, we unref later again */
+    ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
+  }
 
 done:
-  gst_object_unref (this);
   gst_event_unref (event);
+  gst_object_unref (this);
+
   return ret;
+
+  /* ERRORS */
+no_outbuffer:
+  {
+    GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
+        ("Could not create GDP buffer from received event (type %s)",
+            gst_event_type_get_name (event->type)));
+    ret = FALSE;
+    goto done;
+  }
+push_error:
+  {
+    GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
+    ret = FALSE;
+    goto done;
+  }
+}
+
+static gboolean
+gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
+{
+  GstGDPPay *this;
+  gboolean res = TRUE;
+
+  this = GST_GDP_PAY (gst_pad_get_parent (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:
+      /* we refuse seek for now. */
+      gst_event_unref (event);
+      res = FALSE;
+      break;
+    case GST_EVENT_QOS:
+    case GST_EVENT_NAVIGATION:
+    default:
+      /* everything else is passed */
+      res = gst_pad_push_event (this->sinkpad, event);
+      break;
+  }
+  gst_object_unref (this);
+
+  return res;
 }
 
 static void
@@ -667,7 +774,6 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      this->packetizer = gst_dp_packetizer_new (this->version);
       break;
     default:
       break;
@@ -677,16 +783,7 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
 
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      if (this->packetizer) {
-        gst_dp_packetizer_free (this->packetizer);
-        this->packetizer = NULL;
-      }
-      break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
-      if (this->caps) {
-        gst_caps_unref (this->caps);
-        this->caps = NULL;
-      }
+      gst_gdp_pay_reset (this);
       break;
     default:
       break;
index 5bde082..22bbcd2 100644 (file)
@@ -41,6 +41,7 @@ typedef struct _GstGDPPayClass GstGDPPayClass;
 struct _GstGDPPay
 {
   GstElement element;
+
   GstPad *sinkpad;
   GstPad *srcpad;