be4666549ede441f6d210c2720f959a450de8006
[platform/upstream/libnice.git] / gst / gstnicesink.c
1
2 /*
3  * This file is part of the Nice GLib ICE library.
4  *
5  * (C) 2006, 2007 Collabora Ltd.
6  *  Contact: Dafydd Harries
7  * (C) 2006, 2007 Nokia Corporation. All rights reserved.
8  *  Contact: Kai Vehmanen
9  *
10  * The contents of this file are subject to the Mozilla Public License Version
11  * 1.1 (the "License"); you may not use this file except in compliance with
12  * the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS" basis,
16  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
17  * for the specific language governing rights and limitations under the
18  * License.
19  *
20  * The Original Code is the Nice GLib ICE library.
21  *
22  * The Initial Developers of the Original Code are Collabora Ltd and Nokia
23  * Corporation. All Rights Reserved.
24  *
25  * Contributors:
26  *   Dafydd Harries, Collabora Ltd.
27  *
28  * Alternatively, the contents of this file may be used under the terms of the
29  * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
30  * case the provisions of LGPL are applicable instead of those above. If you
31  * wish to allow use of your version of this file only under the terms of the
32  * LGPL and not to allow others to use your version of this file under the
33  * MPL, indicate your decision by deleting the provisions above and replace
34  * them with the notice and other provisions required by the LGPL. If you do
35  * not delete the provisions above, a recipient may use your version of this
36  * file under either the MPL or the LGPL.
37  */
38 #ifdef HAVE_CONFIG_H
39 # include "config.h"
40 #endif
41
42 #include "gstnicesink.h"
43
44
45 GST_DEBUG_CATEGORY_STATIC (nicesink_debug);
46 #define GST_CAT_DEFAULT nicesink_debug
47
48 static GstFlowReturn
49 gst_nice_sink_render (
50   GstBaseSink *basesink,
51   GstBuffer *buffer);
52 #if GST_CHECK_VERSION (1,0,0)
53 static GstFlowReturn
54 gst_nice_sink_render_list (
55   GstBaseSink *basesink,
56   GstBufferList *buffer_list);
57 #endif
58
59 static gboolean
60 gst_nice_sink_unlock (GstBaseSink *basesink);
61
62 static gboolean
63 gst_nice_sink_unlock_stop (GstBaseSink *basesink);
64
65 static void
66 _reliable_transport_writable (
67     NiceAgent *agent,
68     guint stream_id,
69     guint component_id,
70     GstNiceSink *sink);
71
72 static void
73 gst_nice_sink_set_property (
74   GObject *object,
75   guint prop_id,
76   const GValue *value,
77   GParamSpec *pspec);
78
79 static void
80 gst_nice_sink_get_property (
81   GObject *object,
82   guint prop_id,
83   GValue *value,
84   GParamSpec *pspec);
85
86 static void
87 gst_nice_sink_dispose (GObject *object);
88 #if GST_CHECK_VERSION (1,0,0)
89 static void
90 gst_nice_sink_finalize (GObject *object);
91 #endif
92
93 static GstStateChangeReturn
94 gst_nice_sink_change_state (
95     GstElement * element,
96     GstStateChange transition);
97
98 static GstStaticPadTemplate gst_nice_sink_sink_template =
99 GST_STATIC_PAD_TEMPLATE (
100     "sink",
101     GST_PAD_SINK,
102     GST_PAD_ALWAYS,
103     GST_STATIC_CAPS_ANY);
104
105 G_DEFINE_TYPE (GstNiceSink, gst_nice_sink, GST_TYPE_BASE_SINK);
106
107 enum
108 {
109   PROP_AGENT = 1,
110   PROP_STREAM,
111   PROP_COMPONENT
112 };
113
114 static void
115 gst_nice_sink_class_init (GstNiceSinkClass *klass)
116 {
117   GstBaseSinkClass *gstbasesink_class;
118   GstElementClass *gstelement_class;
119   GObjectClass *gobject_class;
120
121   GST_DEBUG_CATEGORY_INIT (nicesink_debug, "nicesink",
122       0, "libnice sink");
123
124   gstbasesink_class = (GstBaseSinkClass *) klass;
125   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_nice_sink_render);
126 #if GST_CHECK_VERSION (1,0,0)
127   gstbasesink_class->render_list = GST_DEBUG_FUNCPTR (gst_nice_sink_render_list);
128 #endif
129   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock);
130   gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock_stop);
131
132   gobject_class = (GObjectClass *) klass;
133   gobject_class->set_property = gst_nice_sink_set_property;
134   gobject_class->get_property = gst_nice_sink_get_property;
135   gobject_class->dispose = gst_nice_sink_dispose;
136 #if GST_CHECK_VERSION (1,0,0)
137   gobject_class->finalize = gst_nice_sink_finalize;
138 #endif
139
140   gstelement_class = (GstElementClass *) klass;
141   gstelement_class->change_state = gst_nice_sink_change_state;
142
143   gst_element_class_add_pad_template (gstelement_class,
144       gst_static_pad_template_get (&gst_nice_sink_sink_template));
145 #if GST_CHECK_VERSION (1,0,0)
146   gst_element_class_set_metadata (gstelement_class,
147 #else
148   gst_element_class_set_details_simple (gstelement_class,
149 #endif
150     "ICE sink",
151     "Sink",
152     "Interactive UDP connectivity establishment",
153     "Dafydd Harries <dafydd.harries@collabora.co.uk>");
154
155
156   g_object_class_install_property (gobject_class, PROP_AGENT,
157       g_param_spec_object (
158          "agent",
159          "Agent",
160          "The NiceAgent this source is bound to",
161          NICE_TYPE_AGENT,
162          G_PARAM_READWRITE));
163
164   g_object_class_install_property (gobject_class, PROP_STREAM,
165       g_param_spec_uint (
166          "stream",
167          "Stream ID",
168          "The ID of the stream to read from",
169          0,
170          G_MAXUINT,
171          0,
172          G_PARAM_READWRITE));
173
174   g_object_class_install_property (gobject_class, PROP_COMPONENT,
175       g_param_spec_uint (
176          "component",
177          "Component ID",
178          "The ID of the component to read from",
179          0,
180          G_MAXUINT,
181          0,
182          G_PARAM_READWRITE));
183 }
184
185 static void
186 gst_nice_sink_init (GstNiceSink *sink)
187 {
188 #if GST_CHECK_VERSION (1,0,0)
189   guint max_mem;
190 #endif
191
192   g_cond_init (&sink->writable_cond);
193
194 #if GST_CHECK_VERSION (1,0,0)
195   /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
196    * for use in the render and render_list functions */
197   max_mem = gst_buffer_get_max_memory ();
198
199   sink->n_vecs = max_mem;
200   sink->vecs = g_new (GOutputVector, sink->n_vecs);
201
202   sink->n_maps = max_mem;
203   sink->maps = g_new (GstMapInfo, sink->n_maps);
204
205   sink->n_messages = 1;
206   sink->messages = g_new (NiceOutputMessage, sink->n_messages);
207 #endif
208 }
209
210 static void
211 _reliable_transport_writable (NiceAgent *agent, guint stream_id,
212     guint component_id, GstNiceSink *sink)
213 {
214   GST_OBJECT_LOCK (sink);
215   if (stream_id == sink->stream_id && component_id == sink->component_id) {
216     g_cond_broadcast (&sink->writable_cond);
217   }
218   GST_OBJECT_UNLOCK (sink);
219 }
220
221 #if GST_CHECK_VERSION (1,0,0)
222 static gsize
223 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
224 {
225   GstMemory *mem;
226   gsize size = 0;
227   guint i;
228
229   g_assert_cmpuint (gst_buffer_n_memory (buf), ==, n);
230
231   for (i = 0; i < n; ++i) {
232     mem = gst_buffer_peek_memory (buf, i);
233     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
234       vecs[i].buffer = maps[i].data;
235       vecs[i].size = maps[i].size;
236     } else {
237       GST_WARNING ("Failed to map memory %p for reading", mem);
238       vecs[i].buffer = "";
239       vecs[i].size = 0;
240     }
241     size += vecs[i].size;
242   }
243
244   return size;
245 }
246
247 /* Buffer list code written by
248  *   Tim-Philipp Müller <tim@centricular.com>
249  * taken from
250  *   gst-plugins-good/gst/udp/gstmultiudpsink.c
251  */
252 static GstFlowReturn
253 gst_nice_sink_render_buffers (GstNiceSink * sink, GstBuffer ** buffers,
254     guint num_buffers, guint8 * mem_nums, guint total_mem_num)
255 {
256   NiceOutputMessage *msgs;
257   GOutputVector *vecs;
258   GstMapInfo *map_infos;
259   guint i, mem;
260   guint written = 0;
261   gint ret;
262   GstFlowReturn flow_ret = GST_FLOW_OK;
263
264   GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent",
265       num_buffers, total_mem_num);
266
267   if (sink->n_vecs < total_mem_num) {
268     sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
269     g_free (sink->vecs);
270     sink->vecs = g_new (GOutputVector, sink->n_vecs);
271   }
272   vecs = sink->vecs;
273
274   if (sink->n_maps < total_mem_num) {
275     sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
276     g_free (sink->maps);
277     sink->maps = g_new (GstMapInfo, sink->n_maps);
278   }
279   map_infos = sink->maps;
280
281   if (sink->n_messages < num_buffers) {
282     sink->n_messages = GST_ROUND_UP_16 (num_buffers);
283     g_free (sink->messages);
284     sink->messages = g_new (NiceOutputMessage, sink->n_messages);
285   }
286   msgs = sink->messages;
287
288   for (i = 0, mem = 0; i < num_buffers; ++i) {
289     fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
290     msgs[i].buffers = &vecs[mem];
291     msgs[i].n_buffers = mem_nums[i];
292     mem += mem_nums[i];
293   }
294
295   GST_OBJECT_LOCK (sink);
296   do {
297     ret = nice_agent_send_messages_nonblocking(sink->agent, sink->stream_id,
298         sink->component_id, msgs + written, num_buffers - written, NULL, NULL);
299
300     if (ret > 0)
301       written += ret;
302
303     if (sink->reliable && written < num_buffers)
304       g_cond_wait (&sink->writable_cond, GST_OBJECT_GET_LOCK (sink));
305
306     if (sink->flushing) {
307       flow_ret = GST_FLOW_FLUSHING;
308       break;
309     }
310   } while (sink->reliable && written < num_buffers);
311   GST_OBJECT_UNLOCK (sink);
312
313   for (i = 0; i < mem; ++i)
314     gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
315
316   return flow_ret;
317 }
318 #endif
319
320 static GstFlowReturn
321 gst_nice_sink_render (GstBaseSink *basesink, GstBuffer *buffer)
322 {
323   GstNiceSink *nicesink = GST_NICE_SINK (basesink);
324   GstFlowReturn flow_ret = GST_FLOW_OK;
325 #if GST_CHECK_VERSION (1,0,0)
326   guint8 n_mem;
327
328   n_mem = gst_buffer_n_memory (buffer);
329
330   if (n_mem > 0) {
331     flow_ret = gst_nice_sink_render_buffers (nicesink, &buffer, 1, &n_mem,
332         n_mem);
333   }
334
335 #else
336   guint written = 0;
337   gint ret;
338   gchar *data = NULL;
339   guint size = 0;
340
341   data = (gchar *) GST_BUFFER_DATA (buffer);
342   size = GST_BUFFER_SIZE (buffer);
343
344   GST_OBJECT_LOCK (nicesink);
345   do {
346     ret = nice_agent_send (nicesink->agent, nicesink->stream_id,
347         nicesink->component_id, size - written, data + written);
348     if (ret > 0)
349       written += ret;
350
351     if (nicesink->reliable && written < size)
352       g_cond_wait (&nicesink->writable_cond, GST_OBJECT_GET_LOCK (nicesink));
353     if (nicesink->flushing) {
354       flow_ret = GST_FLOW_WRONG_STATE;
355       break;
356     }
357   } while (nicesink->reliable && written < size);
358   GST_OBJECT_UNLOCK (nicesink);
359
360 #endif
361   return flow_ret;
362 }
363
364 #if GST_CHECK_VERSION (1,0,0)
365 static GstFlowReturn
366 gst_nice_sink_render_list (GstBaseSink *basesink, GstBufferList *buffer_list)
367 {
368   GstNiceSink *nicesink = GST_NICE_SINK (basesink);
369   GstBuffer **buffers;
370   GstFlowReturn flow_ret = GST_FLOW_OK;
371   guint8 *mem_nums;
372   guint total_mems;
373   guint i, num_buffers;
374
375   num_buffers = gst_buffer_list_length (buffer_list);
376   if (num_buffers == 0)
377     goto no_data;
378
379   buffers = g_newa (GstBuffer *, num_buffers);
380   mem_nums = g_newa (guint8, num_buffers);
381   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
382     buffers[i] = gst_buffer_list_get (buffer_list, i);
383     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
384     total_mems += mem_nums[i];
385   }
386
387   flow_ret = gst_nice_sink_render_buffers (nicesink, buffers, num_buffers,
388       mem_nums, total_mems);
389
390   return flow_ret;
391
392 no_data:
393   {
394     GST_LOG_OBJECT (nicesink, "empty buffer");
395     return GST_FLOW_OK;
396   }
397
398   return flow_ret;
399 }
400 #endif
401
402 static gboolean gst_nice_sink_unlock (GstBaseSink *basesink)
403 {
404   GstNiceSink *nicesink = GST_NICE_SINK (basesink);
405
406   GST_OBJECT_LOCK (nicesink);
407   nicesink->flushing = TRUE;
408   g_cond_broadcast (&nicesink->writable_cond);
409   GST_OBJECT_UNLOCK (nicesink);
410
411   return TRUE;
412 }
413
414 static gboolean gst_nice_sink_unlock_stop (GstBaseSink *basesink)
415 {
416   GstNiceSink *nicesink = GST_NICE_SINK (basesink);
417
418   GST_OBJECT_LOCK (nicesink);
419   nicesink->flushing = FALSE;
420   GST_OBJECT_UNLOCK (nicesink);
421
422   return TRUE;
423 }
424
425 static void
426 gst_nice_sink_dispose (GObject *object)
427 {
428   GstNiceSink *sink = GST_NICE_SINK (object);
429
430   if (sink->agent && sink->writable_id)
431     g_signal_handler_disconnect (sink->agent, sink->writable_id);
432   sink->writable_id = 0;
433   g_clear_object (&sink->agent);
434
435   g_cond_clear (&sink->writable_cond);
436
437   G_OBJECT_CLASS (gst_nice_sink_parent_class)->dispose (object);
438 }
439
440 #if GST_CHECK_VERSION (1,0,0)
441 static void
442 gst_nice_sink_finalize (GObject *object)
443 {
444   GstNiceSink *sink = GST_NICE_SINK (object);
445
446   g_free (sink->vecs);
447   sink->vecs = NULL;
448   sink->n_vecs = 0;
449   g_free (sink->maps);
450   sink->maps = NULL;
451   sink->n_maps = 0;
452   g_free (sink->messages);
453   sink->messages = NULL;
454   sink->n_messages = 0;
455
456   G_OBJECT_CLASS (gst_nice_sink_parent_class)->finalize (object);
457 }
458 #endif
459
460 static void
461 gst_nice_sink_set_property (
462   GObject *object,
463   guint prop_id,
464   const GValue *value,
465   GParamSpec *pspec)
466 {
467   GstNiceSink *sink = GST_NICE_SINK (object);
468
469   switch (prop_id)
470     {
471     case PROP_AGENT:
472       if (sink->agent) {
473         GST_ERROR_OBJECT (object,
474             "Changing the agent on a nice sink not allowed");
475       } else {
476         sink->agent = g_value_dup_object (value);
477         g_object_get (sink->agent, "reliable", &sink->reliable, NULL);
478         if (sink->reliable)
479           sink->writable_id = g_signal_connect (sink->agent,
480               "reliable-transport-writable",
481               (GCallback) _reliable_transport_writable, sink);
482       }
483       break;
484
485     case PROP_STREAM:
486       GST_OBJECT_LOCK (sink);
487       sink->stream_id = g_value_get_uint (value);
488       GST_OBJECT_UNLOCK (sink);
489       break;
490
491     case PROP_COMPONENT:
492       {
493         guint new_component_id = g_value_get_uint (value);
494         GST_OBJECT_LOCK (sink);
495         if (sink->component_id != new_component_id) {
496           sink->component_id = new_component_id;
497           g_cond_broadcast (&sink->writable_cond);
498         }
499         GST_OBJECT_UNLOCK (sink);
500       }
501       break;
502
503     default:
504       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
505       break;
506     }
507 }
508
509 static void
510 gst_nice_sink_get_property (
511   GObject *object,
512   guint prop_id,
513   GValue *value,
514   GParamSpec *pspec)
515 {
516   GstNiceSink *sink = GST_NICE_SINK (object);
517
518   switch (prop_id)
519     {
520     case PROP_AGENT:
521       g_value_set_object (value, sink->agent);
522       break;
523
524     case PROP_STREAM:
525       GST_OBJECT_LOCK (sink);
526       g_value_set_uint (value, sink->stream_id);
527       GST_OBJECT_UNLOCK (sink);
528       break;
529
530     case PROP_COMPONENT:
531       GST_OBJECT_LOCK (sink);
532       g_value_set_uint (value, sink->component_id);
533       GST_OBJECT_UNLOCK (sink);
534       break;
535
536     default:
537       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
538       break;
539     }
540 }
541
542 static GstStateChangeReturn
543 gst_nice_sink_change_state (GstElement * element, GstStateChange transition)
544 {
545   GstNiceSink *sink;
546   GstStateChangeReturn ret;
547
548   sink = GST_NICE_SINK (element);
549
550   switch (transition) {
551     case GST_STATE_CHANGE_NULL_TO_READY:
552       if (sink->agent == NULL)
553         {
554           GST_ERROR_OBJECT (element,
555               "Trying to start Nice sink without an agent set");
556           return GST_STATE_CHANGE_FAILURE;
557         }
558       else if (sink->stream_id == 0)
559           {
560             GST_ERROR_OBJECT (element,
561                 "Trying to start Nice sink without a stream set");
562             return GST_STATE_CHANGE_FAILURE;
563           }
564       else if (sink->component_id == 0)
565           {
566             GST_ERROR_OBJECT (element,
567                 "Trying to start Nice sink without a component set");
568             return GST_STATE_CHANGE_FAILURE;
569           }
570       break;
571     case GST_STATE_CHANGE_READY_TO_PAUSED:
572     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
573     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
574     case GST_STATE_CHANGE_PAUSED_TO_READY:
575     case GST_STATE_CHANGE_READY_TO_NULL:
576     default:
577       break;
578   }
579
580   ret = GST_ELEMENT_CLASS (gst_nice_sink_parent_class)->change_state (element,
581       transition);
582
583   return ret;
584 }