Moving mpg123 plugin from -ugly
[platform/upstream/gst-plugins-good.git] / tests / check / elements / rtpsession.c
1 /* GStreamer
2  *
3  * unit test for gstrtpsession
4  *
5  * Copyright (C) <2009> Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2013 Collabora Ltd.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 #define GLIB_DISABLE_DEPRECATION_WARNINGS
24
25 #include <gst/check/gstharness.h>
26 #include <gst/check/gstcheck.h>
27 #include <gst/check/gsttestclock.h>
28
29 #include <gst/rtp/gstrtpbuffer.h>
30 #include <gst/rtp/gstrtcpbuffer.h>
31 #include <gst/net/gstnetaddressmeta.h>
32
33 static const guint payload_size = 160;
34 static const guint clock_rate = 8000;
35 static const guint payload_type = 0;
36
37 typedef struct
38 {
39   GstElement *session;
40   GstPad *src, *rtcp_sink, *rtpsrc;
41   GstClock *clock;
42   GAsyncQueue *rtcp_queue;
43 } TestData;
44
45 static GstCaps *
46 generate_caps (void)
47 {
48   return gst_caps_new_simple ("application/x-rtp",
49       "clock-rate", G_TYPE_INT, clock_rate,
50       "payload-type", G_TYPE_INT, payload_type, NULL);
51 }
52
53 static GstBuffer *
54 generate_test_buffer (GstClockTime gst_ts,
55     gboolean marker_bit, guint seq_num, guint32 rtp_ts, guint ssrc)
56 {
57   GstBuffer *buf;
58   guint8 *payload;
59   guint i;
60   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
61
62   buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
63   GST_BUFFER_DTS (buf) = gst_ts;
64   GST_BUFFER_PTS (buf) = gst_ts;
65
66   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
67   gst_rtp_buffer_set_payload_type (&rtp, payload_type);
68   gst_rtp_buffer_set_marker (&rtp, marker_bit);
69   gst_rtp_buffer_set_seq (&rtp, seq_num);
70   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
71   gst_rtp_buffer_set_ssrc (&rtp, ssrc);
72
73   payload = gst_rtp_buffer_get_payload (&rtp);
74   for (i = 0; i < payload_size; i++)
75     payload[i] = 0xff;
76
77   gst_rtp_buffer_unmap (&rtp);
78
79   return buf;
80 }
81
82 static GstFlowReturn
83 test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
84 {
85   TestData *data = gst_pad_get_element_private (pad);
86   g_async_queue_push (data->rtcp_queue, buffer);
87   GST_DEBUG ("chained");
88   return GST_FLOW_OK;
89 }
90
91 static GstCaps *
92 pt_map_requested (GstElement * elemen, guint pt, gpointer data)
93 {
94   return generate_caps ();
95 }
96
97 static void
98 destroy_testharness (TestData * data)
99 {
100   g_assert_cmpint (gst_element_set_state (data->session, GST_STATE_NULL),
101       ==, GST_STATE_CHANGE_SUCCESS);
102   gst_object_unref (data->session);
103   data->session = NULL;
104
105   gst_object_unref (data->src);
106   data->src = NULL;
107
108   gst_object_unref (data->rtcp_sink);
109   data->rtcp_sink = NULL;
110
111   gst_object_unref (data->rtpsrc);
112   data->rtpsrc = NULL;
113
114   gst_object_unref (data->clock);
115   data->clock = NULL;
116
117   g_async_queue_unref (data->rtcp_queue);
118   data->rtcp_queue = NULL;
119 }
120
121 static void
122 setup_testharness (TestData * data, gboolean session_as_sender)
123 {
124   GstPad *rtp_sink_pad, *rtcp_src_pad, *rtp_src_pad;
125   GstSegment seg;
126   GstMiniObject *obj;
127   GstCaps *caps;
128
129   data->clock = gst_test_clock_new ();
130   GST_DEBUG ("Setting default system clock to test clock");
131   gst_system_clock_set_default (data->clock);
132   g_assert (data->clock);
133   gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
134
135   data->session = gst_element_factory_make ("rtpsession", NULL);
136   g_signal_connect (data->session, "request-pt-map",
137       (GCallback) pt_map_requested, data);
138   g_assert (data->session);
139   gst_element_set_clock (data->session, data->clock);
140   g_assert_cmpint (gst_element_set_state (data->session,
141           GST_STATE_PLAYING), !=, GST_STATE_CHANGE_FAILURE);
142
143   data->rtcp_queue =
144       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
145
146   /* link in the test source-pad */
147   data->src = gst_pad_new ("src", GST_PAD_SRC);
148   g_assert (data->src);
149   rtp_sink_pad = gst_element_get_request_pad (data->session,
150       session_as_sender ? "send_rtp_sink" : "recv_rtp_sink");
151   g_assert (rtp_sink_pad);
152   g_assert_cmpint (gst_pad_link (data->src, rtp_sink_pad), ==, GST_PAD_LINK_OK);
153   gst_object_unref (rtp_sink_pad);
154
155   data->rtpsrc = gst_pad_new ("sink", GST_PAD_SINK);
156   g_assert (data->rtpsrc);
157   rtp_src_pad = gst_element_get_static_pad (data->session,
158       session_as_sender ? "send_rtp_src" : "recv_rtp_src");
159   g_assert (rtp_src_pad);
160   g_assert_cmpint (gst_pad_link (rtp_src_pad, data->rtpsrc), ==,
161       GST_PAD_LINK_OK);
162   gst_object_unref (rtp_src_pad);
163
164   /* link in the test sink-pad */
165   data->rtcp_sink = gst_pad_new ("sink", GST_PAD_SINK);
166   g_assert (data->rtcp_sink);
167   gst_pad_set_element_private (data->rtcp_sink, data);
168   caps = generate_caps ();
169   gst_pad_set_caps (data->rtcp_sink, caps);
170   gst_pad_set_chain_function (data->rtcp_sink, test_sink_pad_chain_cb);
171   rtcp_src_pad = gst_element_get_request_pad (data->session, "send_rtcp_src");
172   g_assert (rtcp_src_pad);
173   g_assert_cmpint (gst_pad_link (rtcp_src_pad, data->rtcp_sink), ==,
174       GST_PAD_LINK_OK);
175   gst_object_unref (rtcp_src_pad);
176
177   g_assert (gst_pad_set_active (data->src, TRUE));
178   g_assert (gst_pad_set_active (data->rtcp_sink, TRUE));
179
180   gst_segment_init (&seg, GST_FORMAT_TIME);
181   gst_pad_push_event (data->src, gst_event_new_stream_start ("stream0"));
182   gst_pad_set_caps (data->src, caps);
183   gst_pad_push_event (data->src, gst_event_new_segment (&seg));
184   gst_caps_unref (caps);
185
186   while ((obj = g_async_queue_try_pop (data->rtcp_queue)))
187     gst_mini_object_unref (obj);
188 }
189
190 GST_START_TEST (test_multiple_ssrc_rr)
191 {
192   TestData data;
193   GstFlowReturn res;
194   GstClockID id, tid;
195   GstBuffer *in_buf, *out_buf;
196   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
197   GstRTCPPacket rtcp_packet;
198   int i;
199   guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
200   gint32 packetslost;
201   guint8 fractionlost;
202
203   setup_testharness (&data, FALSE);
204
205   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_MSECOND);
206
207   for (i = 0; i < 5; i++) {
208     GST_DEBUG ("Push %i", i);
209     in_buf =
210         generate_test_buffer (i * 20 * GST_MSECOND, FALSE, i, i * 20,
211         0x01BADBAD);
212     res = gst_pad_push (data.src, in_buf);
213     fail_unless (res == GST_FLOW_OK || res == GST_FLOW_FLUSHING);
214
215
216     gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
217     tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
218     gst_clock_id_unref (id);
219     if (tid)
220       gst_clock_id_unref (tid);
221
222     in_buf =
223         generate_test_buffer (i * 20 * GST_MSECOND, FALSE, i, i * 20,
224         0xDEADBEEF);
225     res = gst_pad_push (data.src, in_buf);
226     fail_unless (res == GST_FLOW_OK || res == GST_FLOW_FLUSHING);
227
228     gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
229     tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
230     GST_DEBUG ("pushed %i", i);
231     gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
232         gst_clock_id_get_time (id));
233     gst_clock_id_unref (id);
234     if (tid)
235       gst_clock_id_unref (tid);
236   }
237
238   out_buf = g_async_queue_try_pop (data.rtcp_queue);
239   if (out_buf)
240     gst_buffer_unref (out_buf);
241
242   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
243       gst_clock_id_get_time (id) + (5 * GST_SECOND));
244   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
245   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
246   gst_clock_id_unref (id);
247   gst_clock_id_unref (tid);
248
249   out_buf = g_async_queue_pop (data.rtcp_queue);
250   g_assert (out_buf != NULL);
251   g_assert (gst_rtcp_buffer_validate (out_buf));
252   gst_rtcp_buffer_map (out_buf, GST_MAP_READ, &rtcp);
253   g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
254   g_assert (gst_rtcp_packet_get_type (&rtcp_packet) == GST_RTCP_TYPE_RR);
255   g_assert_cmpint (gst_rtcp_packet_get_rb_count (&rtcp_packet), ==, 2);
256
257   gst_rtcp_packet_get_rb (&rtcp_packet, 0, &ssrc, &fractionlost, &packetslost,
258       &exthighestseq, &jitter, &lsr, &dlsr);
259
260   g_assert_cmpint (ssrc, ==, 0x01BADBAD);
261
262   gst_rtcp_packet_get_rb (&rtcp_packet, 1, &ssrc, &fractionlost, &packetslost,
263       &exthighestseq, &jitter, &lsr, &dlsr);
264   g_assert_cmpint (ssrc, ==, 0xDEADBEEF);
265   gst_rtcp_buffer_unmap (&rtcp);
266   gst_buffer_unref (out_buf);
267
268   destroy_testharness (&data);
269 }
270
271 GST_END_TEST;
272
273 /* This verifies that rtpsession will correctly place RBs round-robin
274  * across multiple SRs when there are too many senders that their RBs
275  * do not fit in one SR */
276 GST_START_TEST (test_multiple_senders_roundrobin_rbs)
277 {
278   TestData data;
279   GstFlowReturn res;
280   GstClockID id, tid;
281   GstBuffer *buf;
282   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
283   GstRTCPPacket rtcp_packet;
284   GstClockTime time;
285   gint queue_length;
286   gint i, j, k;
287   guint32 ssrc;
288   GHashTable *sr_ssrcs, *rb_ssrcs, *tmp_set;
289
290   setup_testharness (&data, TRUE);
291
292   /* only the RTCP thread waits on the clock */
293   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
294
295   for (i = 0; i < 2; i++) {     /* cycles between SR reports */
296     for (j = 0; j < 5; j++) {   /* packets per ssrc */
297       gint seq = (i * 5) + j;
298       GST_DEBUG ("Push %i", seq);
299
300       gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock),
301           200 * GST_MSECOND);
302
303       for (k = 0; k < 35; k++) {        /* number of ssrcs */
304         buf =
305             generate_test_buffer (seq * 200 * GST_MSECOND, FALSE, seq,
306             seq * 200, 10000 + k);
307         res = gst_pad_push (data.src, buf);
308         fail_unless (res == GST_FLOW_OK || res == GST_FLOW_FLUSHING);
309       }
310
311       GST_DEBUG ("pushed %i", seq);
312     }
313
314     queue_length = g_async_queue_length (data.rtcp_queue);
315
316     do {
317       /* crank the RTCP pad thread */
318       time = gst_clock_id_get_time (id);
319       GST_DEBUG ("Advancing time to %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
320       gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), time);
321       tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
322       fail_unless_equals_pointer (tid, id);
323       gst_clock_id_unref (id);
324       gst_clock_id_unref (tid);
325
326       /* wait for the RTCP pad thread to output its data
327        * and start waiting on the next timeout */
328       gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock),
329           &id);
330
331       /* and retry as long as there are no new RTCP packets out,
332        * because the RTCP thread may randomly decide to reschedule
333        * the RTCP timeout for later */
334     } while (g_async_queue_length (data.rtcp_queue) == queue_length);
335
336     GST_DEBUG ("RTCP timeout processed");
337   }
338   gst_clock_id_unref (id);
339
340   sr_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
341   rb_ssrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
342       (GDestroyNotify) g_hash_table_unref);
343
344   /* verify the rtcp packets */
345   for (i = 0; i < 2 * 35; i++) {
346     guint expected_rb_count = (i < 35) ? GST_RTCP_MAX_RB_COUNT :
347         (35 - GST_RTCP_MAX_RB_COUNT - 1);
348
349     GST_DEBUG ("pop %d", i);
350
351     buf = g_async_queue_pop (data.rtcp_queue);
352     g_assert (buf != NULL);
353     g_assert (gst_rtcp_buffer_validate (buf));
354
355     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
356     g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
357     g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
358         GST_RTCP_TYPE_SR);
359
360     gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL, NULL,
361         NULL);
362     g_assert_cmpint (ssrc, >=, 10000);
363     g_assert_cmpint (ssrc, <=, 10035);
364     g_hash_table_add (sr_ssrcs, GUINT_TO_POINTER (ssrc));
365
366     /* inspect the RBs */
367     g_assert_cmpint (gst_rtcp_packet_get_rb_count (&rtcp_packet), ==,
368         expected_rb_count);
369
370     if (i < 35) {
371       tmp_set = g_hash_table_new (g_direct_hash, g_direct_equal);
372       g_hash_table_insert (rb_ssrcs, GUINT_TO_POINTER (ssrc), tmp_set);
373     } else {
374       tmp_set = g_hash_table_lookup (rb_ssrcs, GUINT_TO_POINTER (ssrc));
375       g_assert (tmp_set);
376     }
377
378     for (j = 0; j < expected_rb_count; j++) {
379       gst_rtcp_packet_get_rb (&rtcp_packet, j, &ssrc, NULL, NULL,
380           NULL, NULL, NULL, NULL);
381       g_assert_cmpint (ssrc, >=, 10000);
382       g_assert_cmpint (ssrc, <=, 10035);
383       g_hash_table_add (tmp_set, GUINT_TO_POINTER (ssrc));
384     }
385
386     gst_rtcp_buffer_unmap (&rtcp);
387     gst_buffer_unref (buf);
388
389     /* cycle done, verify all ssrcs have issued SR reports */
390     if ((i + 1) == 35 || (i + 1) == (2 * 35)) {
391       g_assert_cmpint (g_hash_table_size (sr_ssrcs), ==, 35);
392       g_hash_table_remove_all (sr_ssrcs);
393     }
394   }
395
396   /* now verify all other ssrcs have been reported on each ssrc's SR */
397   g_assert_cmpint (g_hash_table_size (rb_ssrcs), ==, 35);
398   for (i = 10000; i < 10035; i++) {
399     tmp_set = g_hash_table_lookup (rb_ssrcs, GUINT_TO_POINTER (i));
400     g_assert (tmp_set);
401     /* SR contains RBs for each other ssrc except the ssrc of the SR */
402     g_assert_cmpint (g_hash_table_size (tmp_set), ==, 34);
403     g_assert (!g_hash_table_contains (tmp_set, GUINT_TO_POINTER (i)));
404   }
405
406   g_hash_table_unref (sr_ssrcs);
407   g_hash_table_unref (rb_ssrcs);
408
409   destroy_testharness (&data);
410 }
411
412 GST_END_TEST;
413
414 static void
415 crank_rtcp_thread (TestData * data, GstClockTime * time, GstClockID * id)
416 {
417   gint queue_length;
418   GstClockID *tid;
419
420   queue_length = g_async_queue_length (data->rtcp_queue);
421   do {
422     *time = gst_clock_id_get_time (*id);
423     GST_DEBUG ("Advancing time to %" GST_TIME_FORMAT, GST_TIME_ARGS (*time));
424     if (*time > gst_clock_get_time (data->clock))
425       gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), *time);
426     tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data->clock));
427     fail_unless_equals_pointer (tid, *id);
428
429     gst_clock_id_unref (tid);
430     gst_clock_id_unref (*id);
431     *id = NULL;
432
433     /* wait for the RTCP pad thread to output its data
434      * and start waiting on the next timeout */
435     gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data->clock), id);
436
437     /* and retry as long as there are no new RTCP packets out,
438      * because the RTCP thread may randomly decide to reschedule
439      * the RTCP timeout for later */
440   } while (g_async_queue_length (data->rtcp_queue) == queue_length);
441 }
442
443 GST_START_TEST (test_internal_sources_timeout)
444 {
445   TestData data;
446   GstClockID id;
447   GstClockTime time;
448   GObject *internal_session;
449   guint internal_ssrc;
450   guint32 ssrc;
451   GstBuffer *buf;
452   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
453   GstRTCPPacket rtcp_packet;
454   GstFlowReturn res;
455   gint i, j;
456   GstCaps *caps;
457
458   setup_testharness (&data, TRUE);
459   g_object_get (data.session, "internal-session", &internal_session, NULL);
460   g_object_set (internal_session, "internal-ssrc", 0xDEADBEEF, NULL);
461
462   /* only the RTCP thread waits on the clock */
463   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
464
465   /* crank the RTCP pad thread until it creates a RR for its internal-ssrc
466    * source, since we have not pushed any RTP packets and it doesn't have
467    * any other source available */
468   crank_rtcp_thread (&data, &time, &id);
469
470   g_object_get (internal_session, "internal-ssrc", &internal_ssrc, NULL);
471   g_assert_cmpint (internal_ssrc, ==, 0xDEADBEEF);
472
473   /* verify that rtpsession has sent RR for an internally-created
474    * RTPSource that is using the internal-ssrc */
475   buf = g_async_queue_pop (data.rtcp_queue);
476   g_assert (buf != NULL);
477   g_assert (gst_rtcp_buffer_validate (buf));
478   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
479   g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
480   g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
481       GST_RTCP_TYPE_RR);
482   ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
483   g_assert_cmpint (ssrc, ==, internal_ssrc);
484   gst_rtcp_buffer_unmap (&rtcp);
485   gst_buffer_unref (buf);
486
487   /* ok, now let's push some RTP packets */
488   caps =
489       gst_caps_new_simple ("application/x-rtp", "ssrc", G_TYPE_UINT, 0x01BADBAD,
490       NULL);
491   gst_pad_set_caps (data.src, caps);
492   gst_caps_unref (caps);
493
494   for (i = 1; i < 4; i++) {
495     gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock),
496         200 * GST_MSECOND);
497     buf =
498         generate_test_buffer (time + i * 200 * GST_MSECOND, FALSE, i, i * 200,
499         0x01BADBAD);
500     res = gst_pad_push (data.src, buf);
501     fail_unless (res == GST_FLOW_OK || res == GST_FLOW_FLUSHING);
502   }
503
504   /* internal ssrc must have changed already */
505   g_object_get (internal_session, "internal-ssrc", &internal_ssrc, NULL);
506   g_assert_cmpint (ssrc, !=, internal_ssrc);
507   g_assert_cmpint (internal_ssrc, ==, 0x01BADBAD);
508
509   /* wait for SR */
510   crank_rtcp_thread (&data, &time, &id);
511
512   /* verify SR and RR */
513   j = 0;
514   for (i = 0; i < 2; i++) {
515     buf = g_async_queue_pop (data.rtcp_queue);
516     g_assert (buf != NULL);
517     g_assert (gst_rtcp_buffer_validate (buf));
518     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
519     g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
520     if (gst_rtcp_packet_get_type (&rtcp_packet) == GST_RTCP_TYPE_SR) {
521       gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL, NULL,
522           NULL);
523       g_assert_cmpint (ssrc, ==, internal_ssrc);
524       g_assert_cmpint (ssrc, ==, 0x01BADBAD);
525       j |= 0x1;
526     } else if (gst_rtcp_packet_get_type (&rtcp_packet) == GST_RTCP_TYPE_RR) {
527       ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
528       g_assert_cmpint (ssrc, !=, internal_ssrc);
529       g_assert_cmpint (ssrc, ==, 0xDEADBEEF);
530       j |= 0x2;
531     }
532     gst_rtcp_buffer_unmap (&rtcp);
533     gst_buffer_unref (buf);
534   }
535   g_assert_cmpint (j, ==, 0x3); /* verify we got both SR and RR */
536
537   /* go 30 seconds in the future and observe both sources timing out:
538    * 0xDEADBEEF -> BYE, 0x01BADBAD -> becomes receiver only */
539   gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), 30 * GST_SECOND);
540   crank_rtcp_thread (&data, &time, &id);
541
542   /* verify BYE and RR */
543   j = 0;
544   for (i = 0; i < 2; i++) {
545     buf = g_async_queue_pop (data.rtcp_queue);
546     g_assert (buf != NULL);
547     g_assert (gst_rtcp_buffer_validate (buf));
548     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
549
550     g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
551     g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
552         GST_RTCP_TYPE_RR);
553     ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
554     if (ssrc == 0x01BADBAD) {
555       j |= 0x1;
556       g_assert_cmpint (ssrc, ==, internal_ssrc);
557       /* 2 => RR, SDES. There is no BYE here */
558       g_assert_cmpint (gst_rtcp_buffer_get_packet_count (&rtcp), ==, 2);
559     } else if (ssrc == 0xDEADBEEF) {
560       j |= 0x2;
561       g_assert_cmpint (ssrc, !=, internal_ssrc);
562       /* 3 => RR, SDES, BYE */
563       g_assert_cmpint (gst_rtcp_buffer_get_packet_count (&rtcp), ==, 3);
564       g_assert (gst_rtcp_packet_move_to_next (&rtcp_packet));
565       g_assert (gst_rtcp_packet_move_to_next (&rtcp_packet));
566       g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
567           GST_RTCP_TYPE_BYE);
568     }
569
570     gst_rtcp_buffer_unmap (&rtcp);
571     gst_buffer_unref (buf);
572   }
573   g_assert_cmpint (j, ==, 0x3); /* verify we got both BYE and RR */
574   gst_clock_id_unref (id);
575
576   g_object_unref (internal_session);
577   destroy_testharness (&data);
578 }
579
580 GST_END_TEST;
581
582 typedef struct
583 {
584   guint8 subtype;
585   guint32 ssrc;
586   gchar *name;
587   GstBuffer *data;
588 } RTCPAppResult;
589
590 static void
591 on_app_rtcp_cb (GObject * session, guint subtype, guint ssrc,
592     const gchar * name, GstBuffer * data, RTCPAppResult * result)
593 {
594   result->subtype = subtype;
595   result->ssrc = ssrc;
596   result->name = g_strdup (name);
597   result->data = data ? gst_buffer_ref (data) : NULL;
598 }
599
600 GST_START_TEST (test_receive_rtcp_app_packet)
601 {
602   GstHarness *h;
603   GstBuffer *buffer;
604   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
605   GstRTCPPacket packet;
606   RTCPAppResult result = { 0 };
607   GstElement *internal_session;
608   guint8 data[] = { 0x11, 0x22, 0x33, 0x44 };
609
610   h = gst_harness_new_with_padnames ("rtpsession", "recv_rtcp_sink", NULL);
611   g_object_get (h->element, "internal-session", &internal_session, NULL);
612
613   g_signal_connect (internal_session, "on-app-rtcp",
614       G_CALLBACK (on_app_rtcp_cb), &result);
615
616   /* Push APP buffer with no data */
617   buffer = gst_rtcp_buffer_new (1000);
618   fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
619   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet));
620   gst_rtcp_packet_app_set_subtype (&packet, 21);
621   gst_rtcp_packet_app_set_ssrc (&packet, 0x11111111);
622   gst_rtcp_packet_app_set_name (&packet, "Test");
623   gst_rtcp_buffer_unmap (&rtcp);
624
625   gst_harness_set_src_caps_str (h, "application/x-rtcp");
626   fail_unless_equals_int (gst_harness_push (h, buffer), GST_FLOW_OK);
627
628   fail_unless_equals_int (result.subtype, 21);
629   fail_unless_equals_int (result.ssrc, 0x11111111);
630   fail_unless_equals_string (result.name, "Test");
631   fail_unless_equals_pointer (result.data, NULL);
632
633   g_free (result.name);
634
635   /* Push APP buffer with data */
636   memset (&result, 0, sizeof (result));
637   buffer = gst_rtcp_buffer_new (1000);
638   fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
639   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet));
640   gst_rtcp_packet_app_set_subtype (&packet, 22);
641   gst_rtcp_packet_app_set_ssrc (&packet, 0x22222222);
642   gst_rtcp_packet_app_set_name (&packet, "Test");
643   gst_rtcp_packet_app_set_data_length (&packet, sizeof (data) / 4);
644   memcpy (gst_rtcp_packet_app_get_data (&packet), data, sizeof (data));
645   gst_rtcp_buffer_unmap (&rtcp);
646
647   fail_unless_equals_int (gst_harness_push (h, buffer), GST_FLOW_OK);
648
649   fail_unless_equals_int (result.subtype, 22);
650   fail_unless_equals_int (result.ssrc, 0x22222222);
651   fail_unless_equals_string (result.name, "Test");
652   fail_unless (gst_buffer_memcmp (result.data, 0, data, sizeof (data)) == 0);
653
654   g_free (result.name);
655   gst_buffer_unref (result.data);
656
657   gst_object_unref (internal_session);
658   gst_harness_teardown (h);
659 }
660
661 GST_END_TEST;
662
663 static void
664 stats_test_cb (GObject * object, GParamSpec * spec, gpointer data)
665 {
666   guint num_sources = 0;
667   gboolean *cb_called = data;
668   g_assert (*cb_called == FALSE);
669   *cb_called = TRUE;
670
671   /* We should be able to get a rtpsession property
672      without introducing the deadlock */
673   g_object_get (object, "num-sources", &num_sources, NULL);
674 }
675
676 GST_START_TEST (test_dont_lock_on_stats)
677 {
678   GstHarness *h_rtcp;
679   GstHarness *h_send;
680   GstClock *clock = gst_test_clock_new ();
681   GstTestClock *testclock = GST_TEST_CLOCK (clock);
682   gboolean cb_called = FALSE;
683
684   /* use testclock as the systemclock to capture the rtcp thread waits */
685   gst_system_clock_set_default (GST_CLOCK (testclock));
686
687   h_rtcp =
688       gst_harness_new_with_padnames ("rtpsession", "recv_rtcp_sink",
689       "send_rtcp_src");
690   h_send =
691       gst_harness_new_with_element (h_rtcp->element, "send_rtp_sink",
692       "send_rtp_src");
693
694   /* connect to the stats-reporting */
695   g_signal_connect (h_rtcp->element, "notify::stats",
696       G_CALLBACK (stats_test_cb), &cb_called);
697
698   /* "crank" and check the stats */
699   g_assert (gst_test_clock_crank (testclock));
700   gst_buffer_unref (gst_harness_pull (h_rtcp));
701   fail_unless (cb_called);
702
703   gst_harness_teardown (h_send);
704   gst_harness_teardown (h_rtcp);
705   gst_object_unref (clock);
706 }
707
708 GST_END_TEST;
709
710 static void
711 suspicious_bye_cb (GObject * object, GParamSpec * spec, gpointer data)
712 {
713   GValueArray *stats_arr;
714   GstStructure *stats, *internal_stats;
715   gboolean *cb_called = data;
716   gboolean internal = FALSE, sent_bye = TRUE;
717   guint ssrc = 0;
718   guint i;
719
720   g_assert (*cb_called == FALSE);
721   *cb_called = TRUE;
722
723   g_object_get (object, "stats", &stats, NULL);
724   stats_arr =
725       g_value_get_boxed (gst_structure_get_value (stats, "source-stats"));
726   g_assert (stats_arr != NULL);
727   fail_unless (stats_arr->n_values >= 1);
728
729   for (i = 0; i < stats_arr->n_values; i++) {
730     internal_stats = g_value_get_boxed (g_value_array_get_nth (stats_arr, i));
731     g_assert (internal_stats != NULL);
732
733     gst_structure_get (internal_stats,
734         "ssrc", G_TYPE_UINT, &ssrc,
735         "internal", G_TYPE_BOOLEAN, &internal,
736         "received-bye", G_TYPE_BOOLEAN, &sent_bye, NULL);
737
738     if (ssrc == 0xDEADBEEF) {
739       fail_unless (internal);
740       fail_unless (!sent_bye);
741       break;
742     }
743   }
744   fail_unless_equals_int (ssrc, 0xDEADBEEF);
745
746   gst_structure_free (stats);
747 }
748
749 static GstBuffer *
750 create_bye_rtcp (guint32 ssrc)
751 {
752   GstRTCPPacket packet;
753   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
754   GSocketAddress *saddr;
755   GstBuffer *buffer = gst_rtcp_buffer_new (1000);
756
757   fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
758   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_BYE, &packet));
759   gst_rtcp_packet_bye_add_ssrc (&packet, ssrc);
760   gst_rtcp_buffer_unmap (&rtcp);
761
762   /* Need to add meta to trigger collision detection */
763   saddr = g_inet_socket_address_new_from_string ("127.0.0.1", 3490);
764   gst_buffer_add_net_address_meta (buffer, saddr);
765   g_object_unref (saddr);
766   return buffer;
767 }
768
769 GST_START_TEST (test_ignore_suspicious_bye)
770 {
771   GstHarness *h_rtcp = NULL;
772   GstHarness *h_send = NULL;
773   gboolean cb_called = FALSE;
774   GstTestClock *testclock = GST_TEST_CLOCK (gst_test_clock_new ());
775
776   /* use testclock as the systemclock to capture the rtcp thread waits */
777   gst_system_clock_set_default (GST_CLOCK (testclock));
778
779   h_rtcp =
780       gst_harness_new_with_padnames ("rtpsession", "recv_rtcp_sink",
781       "send_rtcp_src");
782   h_send =
783       gst_harness_new_with_element (h_rtcp->element, "send_rtp_sink",
784       "send_rtp_src");
785
786   /* connect to the stats-reporting */
787   g_signal_connect (h_rtcp->element, "notify::stats",
788       G_CALLBACK (suspicious_bye_cb), &cb_called);
789
790   /* Push RTP buffer making our internal SSRC=0xDEADBEEF */
791   gst_harness_set_src_caps_str (h_send,
792       "application/x-rtp,ssrc=(uint)0xDEADBEEF,"
793       "clock-rate=90000,seqnum-offset=(uint)12345");
794   gst_harness_push (h_send,
795       generate_test_buffer (0, FALSE, 12345, 0, 0xDEADBEEF));
796
797   /* Push BYE RTCP with internal SSRC (0xDEADBEEF) */
798   gst_harness_set_src_caps_str (h_rtcp, "application/x-rtcp");
799   gst_harness_push (h_rtcp, create_bye_rtcp (0xDEADBEEF));
800
801   /* "crank" and check the stats */
802   g_assert (gst_test_clock_crank (testclock));
803   gst_buffer_unref (gst_harness_pull (h_rtcp));
804   fail_unless (cb_called);
805
806   gst_harness_teardown (h_send);
807   gst_harness_teardown (h_rtcp);
808   gst_object_unref (testclock);
809 }
810
811 GST_END_TEST;
812
813 static Suite *
814 rtpsession_suite (void)
815 {
816   Suite *s = suite_create ("rtpsession");
817   TCase *tc_chain = tcase_create ("general");
818
819   suite_add_tcase (s, tc_chain);
820   tcase_add_test (tc_chain, test_multiple_ssrc_rr);
821   tcase_add_test (tc_chain, test_multiple_senders_roundrobin_rbs);
822   tcase_add_test (tc_chain, test_internal_sources_timeout);
823   tcase_add_test (tc_chain, test_receive_rtcp_app_packet);
824   tcase_add_test (tc_chain, test_dont_lock_on_stats);
825   tcase_add_test (tc_chain, test_ignore_suspicious_bye);
826
827   return s;
828 }
829
830 GST_CHECK_MAIN (rtpsession);