GstBaseSink *basesink,
GstBuffer *buffer);
+static gboolean
+gst_nice_sink_unlock (GstBaseSink *basesink);
+
+static gboolean
+gst_nice_sink_unlock_stop (GstBaseSink *basesink);
+
+static void
+_reliable_transport_writable (
+ NiceAgent *agent,
+ guint stream_id,
+ guint component_id,
+ GstNiceSink *sink);
+
static void
gst_nice_sink_set_property (
GObject *object,
gstbasesink_class = (GstBaseSinkClass *) klass;
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_nice_sink_render);
+ gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock);
+ gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock_stop);
gobject_class = (GObjectClass *) klass;
gobject_class->set_property = gst_nice_sink_set_property;
static void
gst_nice_sink_init (GstNiceSink *sink)
{
+ g_cond_init (&sink->writable_cond);
+}
+
+static void
+_reliable_transport_writable (NiceAgent *agent, guint stream_id,
+ guint component_id, GstNiceSink *sink)
+{
+ GST_OBJECT_LOCK (sink);
+ if (stream_id == sink->stream_id && component_id == sink->component_id) {
+ g_cond_broadcast (&sink->writable_cond);
+ }
+ GST_OBJECT_UNLOCK (sink);
}
static GstFlowReturn
gst_nice_sink_render (GstBaseSink *basesink, GstBuffer *buffer)
{
GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+ guint written = 0;
+ gint ret;
+ gchar *data = NULL;
+ guint size = 0;
+ GstFlowReturn flow_ret = GST_FLOW_OK;
#if GST_CHECK_VERSION (1,0,0)
GstMapInfo info;
gst_buffer_map (buffer, &info, GST_MAP_READ);
+ data = (gchar *) info.data;
+ size = info.size;
+#else
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ size = GST_BUFFER_SIZE (buffer);
+#endif
- nice_agent_send (nicesink->agent, nicesink->stream_id,
- nicesink->component_id, info.size, (gchar *) info.data);
+ GST_OBJECT_LOCK (nicesink);
+ do {
+ ret = nice_agent_send (nicesink->agent, nicesink->stream_id,
+ nicesink->component_id, size - written, data + written);
+ if (ret > 0)
+ written += ret;
- gst_buffer_unmap (buffer, &info);
+ if (nicesink->reliable && written < size)
+ g_cond_wait (&nicesink->writable_cond, GST_OBJECT_GET_LOCK (nicesink));
+ if (nicesink->flushing) {
+#if GST_CHECK_VERSION (1,0,0)
+ flow_ret = GST_FLOW_FLUSHING;
#else
- nice_agent_send (nicesink->agent, nicesink->stream_id,
- nicesink->component_id, GST_BUFFER_SIZE (buffer),
- (gchar *) GST_BUFFER_DATA (buffer));
+ flow_ret = GST_FLOW_WRONG_STATE;
#endif
+ break;
+ }
+ } while (nicesink->reliable && written < size);
+ GST_OBJECT_UNLOCK (nicesink);
- return GST_FLOW_OK;
+#if GST_CHECK_VERSION (1,0,0)
+ gst_buffer_unmap (buffer, &info);
+#endif
+
+ return flow_ret;
+}
+
+static gboolean gst_nice_sink_unlock (GstBaseSink *basesink)
+{
+ GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+ GST_OBJECT_LOCK (nicesink);
+ nicesink->flushing = TRUE;
+ g_cond_broadcast (&nicesink->writable_cond);
+ GST_OBJECT_UNLOCK (nicesink);
+
+ return TRUE;
+}
+
+static gboolean gst_nice_sink_unlock_stop (GstBaseSink *basesink)
+{
+ GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+ GST_OBJECT_LOCK (nicesink);
+ nicesink->flushing = FALSE;
+ GST_OBJECT_UNLOCK (nicesink);
+
+ return TRUE;
}
{
GstNiceSink *sink = GST_NICE_SINK (object);
- if (sink->agent)
+ if (sink->agent) {
+ g_signal_handler_disconnect (sink->agent, sink->writable_id);
g_object_unref (sink->agent);
+ }
sink->agent = NULL;
+ g_cond_clear (&sink->writable_cond);
+
G_OBJECT_CLASS (gst_nice_sink_parent_class)->dispose (object);
}
switch (prop_id)
{
case PROP_AGENT:
- if (sink->agent)
+ if (sink->agent) {
GST_ERROR_OBJECT (object,
"Changing the agent on a nice sink not allowed");
- else
+ } else {
sink->agent = g_value_dup_object (value);
+ g_object_get (sink->agent, "reliable", &sink->reliable, NULL);
+ if (sink->reliable)
+ sink->writable_id = g_signal_connect (sink->agent,
+ "reliable-transport-writable",
+ (GCallback) _reliable_transport_writable, sink);
+ }
break;
case PROP_STREAM:
+ GST_OBJECT_LOCK (sink);
sink->stream_id = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (sink);
break;
case PROP_COMPONENT:
+ GST_OBJECT_LOCK (sink);
sink->component_id = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (sink);
break;
default:
break;
case PROP_STREAM:
+ GST_OBJECT_LOCK (sink);
g_value_set_uint (value, sink->stream_id);
+ GST_OBJECT_UNLOCK (sink);
break;
case PROP_COMPONENT:
+ GST_OBJECT_LOCK (sink);
g_value_set_uint (value, sink->component_id);
+ GST_OBJECT_UNLOCK (sink);
break;
default: