static gboolean gst_rtmp_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event);
+static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf);
#define gst_rtmp_sink_parent_class parent_class
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render);
- gstbasesink_class->event = gst_rtmp_sink_event;
+ gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps);
+ gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event);
GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0,
"RTMP server element");
{
GstRTMPSink *sink = GST_RTMP_SINK (basesink);
- gst_buffer_replace (&sink->cache, NULL);
-
+ if (sink->header) {
+ gst_buffer_unref (sink->header);
+ sink->header = NULL;
+ }
if (sink->rtmp) {
RTMP_Close (sink->rtmp);
RTMP_Free (sink->rtmp);
gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstRTMPSink *sink = GST_RTMP_SINK (bsink);
- GstBuffer *reffed_buf = NULL;
+ gboolean need_unref = FALSE;
GstMapInfo map = GST_MAP_INFO_INIT;
if (sink->rtmp == NULL) {
return GST_FLOW_ERROR;
}
+ /* Ignore buffers that are in the stream headers (caps) */
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
+ return GST_FLOW_OK;
+ }
+
if (sink->first) {
/* open the connection */
if (!RTMP_IsConnected (sink->rtmp)) {
GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri);
}
- /* FIXME: Parse the first buffer and see if it contains a header plus a packet instead
- * of just assuming it's only the header */
- GST_LOG_OBJECT (sink, "Caching first buffer of size %" G_GSIZE_FORMAT
- " for concatenation", gst_buffer_get_size (buf));
- gst_buffer_replace (&sink->cache, buf);
- sink->first = FALSE;
- return GST_FLOW_OK;
- }
+ /* Prepend the header from the caps to the first non header buffer */
+ if (sink->header) {
+ buf = gst_buffer_append (gst_buffer_ref (sink->header),
+ gst_buffer_ref (buf));
+ need_unref = TRUE;
+ }
- if (sink->cache) {
- GST_LOG_OBJECT (sink, "Joining 2nd buffer of size %" G_GSIZE_FORMAT
- " to cached buf", gst_buffer_get_size (buf));
- gst_buffer_ref (buf);
- reffed_buf = buf = gst_buffer_append (sink->cache, buf);
- sink->cache = NULL;
+ sink->first = FALSE;
}
if (sink->have_write_error)
goto write_failed;
gst_buffer_unmap (buf, &map);
- if (reffed_buf)
- gst_buffer_unref (reffed_buf);
+ if (need_unref)
+ gst_buffer_unref (buf);
return GST_FLOW_OK;
{
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
gst_buffer_unmap (buf, &map);
- if (reffed_buf)
- gst_buffer_unref (reffed_buf);
+ if (need_unref)
+ gst_buffer_unref (buf);
sink->have_write_error = TRUE;
return GST_FLOW_ERROR;
}
}
}
+static gboolean
+gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
+{
+ GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
+ GstStructure *s;
+ const GValue *sh;
+ GArray *buffers;
+ gint i;
+
+ GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps);
+
+ /* Clear our current header buffer */
+ if (rtmpsink->header) {
+ gst_buffer_unref (rtmpsink->header);
+ rtmpsink->header = NULL;
+ }
+
+ rtmpsink->header = gst_buffer_new ();
+
+ s = gst_caps_get_structure (caps, 0);
+
+ sh = gst_structure_get_value (s, "streamheader");
+ buffers = g_value_peek_pointer (sh);
+
+ /* Concatenate all buffers in streamheader into one */
+ for (i = 0; i < buffers->len; ++i) {
+ GValue *val;
+ GstBuffer *buf;
+
+ val = &g_array_index (buffers, GValue, i);
+ buf = g_value_peek_pointer (val);
+
+ gst_buffer_ref (buf);
+
+ rtmpsink->header = gst_buffer_append (rtmpsink->header, buf);
+ }
+
+ GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data",
+ gst_buffer_get_size (rtmpsink->header));
+
+ return TRUE;
+}
+
static gboolean
gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event)
{