nicesink: Block until component is writable if in reliable mode
authorYouness Alaoui <kakaroto@kakaroto.homelinux.net>
Fri, 8 Aug 2014 19:32:13 +0000 (15:32 -0400)
committerOlivier CrĂȘte <olivier.crete@collabora.com>
Wed, 13 Aug 2014 23:31:01 +0000 (19:31 -0400)
gst/gstnicesink.c
gst/gstnicesink.h

index 6580c8e..5433a30 100644 (file)
@@ -49,6 +49,19 @@ gst_nice_sink_render (
   GstBaseSink *basesink,
   GstBuffer *buffer);
 
+static gboolean
+gst_nice_sink_unlock (GstBaseSink *basesink);
+
+static gboolean
+gst_nice_sink_unlock_stop (GstBaseSink *basesink);
+
+static void
+_reliable_transport_writable (
+    NiceAgent *agent,
+    guint stream_id,
+    guint component_id,
+    GstNiceSink *sink);
+
 static void
 gst_nice_sink_set_property (
   GObject *object,
@@ -99,6 +112,8 @@ gst_nice_sink_class_init (GstNiceSinkClass *klass)
 
   gstbasesink_class = (GstBaseSinkClass *) klass;
   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_nice_sink_render);
+  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock);
+  gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock_stop);
 
   gobject_class = (GObjectClass *) klass;
   gobject_class->set_property = gst_nice_sink_set_property;
@@ -153,29 +168,89 @@ gst_nice_sink_class_init (GstNiceSinkClass *klass)
 static void
 gst_nice_sink_init (GstNiceSink *sink)
 {
+  g_cond_init (&sink->writable_cond);
+}
+
+static void
+_reliable_transport_writable (NiceAgent *agent, guint stream_id,
+    guint component_id, GstNiceSink *sink)
+{
+  GST_OBJECT_LOCK (sink);
+  if (stream_id == sink->stream_id && component_id == sink->component_id) {
+    g_cond_broadcast (&sink->writable_cond);
+  }
+  GST_OBJECT_UNLOCK (sink);
 }
 
 static GstFlowReturn
 gst_nice_sink_render (GstBaseSink *basesink, GstBuffer *buffer)
 {
   GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+  guint written = 0;
+  gint ret;
+  gchar *data = NULL;
+  guint size = 0;
+  GstFlowReturn flow_ret = GST_FLOW_OK;
 
 #if GST_CHECK_VERSION (1,0,0)
   GstMapInfo info;
 
   gst_buffer_map (buffer, &info, GST_MAP_READ);
+  data = (gchar *) info.data;
+  size = info.size;
+#else
+  data = (gchar *) GST_BUFFER_DATA (buffer);
+  size = GST_BUFFER_SIZE (buffer);
+#endif
 
-  nice_agent_send (nicesink->agent, nicesink->stream_id,
-      nicesink->component_id, info.size, (gchar *) info.data);
+  GST_OBJECT_LOCK (nicesink);
+  do {
+    ret = nice_agent_send (nicesink->agent, nicesink->stream_id,
+        nicesink->component_id, size - written, data + written);
+    if (ret > 0)
+      written += ret;
 
-  gst_buffer_unmap (buffer, &info);
+    if (nicesink->reliable && written < size)
+      g_cond_wait (&nicesink->writable_cond, GST_OBJECT_GET_LOCK (nicesink));
+    if (nicesink->flushing) {
+#if GST_CHECK_VERSION (1,0,0)
+      flow_ret = GST_FLOW_FLUSHING;
 #else
-  nice_agent_send (nicesink->agent, nicesink->stream_id,
-      nicesink->component_id, GST_BUFFER_SIZE (buffer),
-      (gchar *) GST_BUFFER_DATA (buffer));
+      flow_ret = GST_FLOW_WRONG_STATE;
 #endif
+      break;
+    }
+  } while (nicesink->reliable && written < size);
+  GST_OBJECT_UNLOCK (nicesink);
 
-  return GST_FLOW_OK;
+#if GST_CHECK_VERSION (1,0,0)
+  gst_buffer_unmap (buffer, &info);
+#endif
+
+  return flow_ret;
+}
+
+static gboolean gst_nice_sink_unlock (GstBaseSink *basesink)
+{
+  GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+  GST_OBJECT_LOCK (nicesink);
+  nicesink->flushing = TRUE;
+  g_cond_broadcast (&nicesink->writable_cond);
+  GST_OBJECT_UNLOCK (nicesink);
+
+  return TRUE;
+}
+
+static gboolean gst_nice_sink_unlock_stop (GstBaseSink *basesink)
+{
+  GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+  GST_OBJECT_LOCK (nicesink);
+  nicesink->flushing = FALSE;
+  GST_OBJECT_UNLOCK (nicesink);
+
+  return TRUE;
 }
 
 
@@ -184,10 +259,14 @@ gst_nice_sink_dispose (GObject *object)
 {
   GstNiceSink *sink = GST_NICE_SINK (object);
 
-  if (sink->agent)
+  if (sink->agent) {
+    g_signal_handler_disconnect (sink->agent, sink->writable_id);
     g_object_unref (sink->agent);
+  }
   sink->agent = NULL;
 
+  g_cond_clear (&sink->writable_cond);
+
   G_OBJECT_CLASS (gst_nice_sink_parent_class)->dispose (object);
 }
 
@@ -203,19 +282,29 @@ gst_nice_sink_set_property (
   switch (prop_id)
     {
     case PROP_AGENT:
-      if (sink->agent)
+      if (sink->agent) {
         GST_ERROR_OBJECT (object,
             "Changing the agent on a nice sink not allowed");
-      else
+      } else {
         sink->agent = g_value_dup_object (value);
+        g_object_get (sink->agent, "reliable", &sink->reliable, NULL);
+        if (sink->reliable)
+          sink->writable_id = g_signal_connect (sink->agent,
+              "reliable-transport-writable",
+              (GCallback) _reliable_transport_writable, sink);
+      }
       break;
 
     case PROP_STREAM:
+      GST_OBJECT_LOCK (sink);
       sink->stream_id = g_value_get_uint (value);
+      GST_OBJECT_UNLOCK (sink);
       break;
 
     case PROP_COMPONENT:
+      GST_OBJECT_LOCK (sink);
       sink->component_id = g_value_get_uint (value);
+      GST_OBJECT_UNLOCK (sink);
       break;
 
     default:
@@ -240,11 +329,15 @@ gst_nice_sink_get_property (
       break;
 
     case PROP_STREAM:
+      GST_OBJECT_LOCK (sink);
       g_value_set_uint (value, sink->stream_id);
+      GST_OBJECT_UNLOCK (sink);
       break;
 
     case PROP_COMPONENT:
+      GST_OBJECT_LOCK (sink);
       g_value_set_uint (value, sink->component_id);
+      GST_OBJECT_UNLOCK (sink);
       break;
 
     default:
index 8ec0641..9529f64 100644 (file)
@@ -65,6 +65,10 @@ struct _GstNiceSink
   NiceAgent *agent;
   guint stream_id;
   guint component_id;
+  gboolean reliable;
+  GCond writable_cond;
+  gulong writable_id;
+  gboolean flushing;
 };
 
 typedef struct _GstNiceSinkClass GstNiceSinkClass;