rtpsession: Try media_ssrc if no src can be found for PLI sender_ssrc
[platform/upstream/gst-plugins-good.git] / tests / check / elements / rtpsession.c
1 /* GStreamer
2  *
3  * unit test for gstrtpsession
4  *
5  * Copyright (C) <2009> Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2013 Collabora Ltd.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 #define GLIB_DISABLE_DEPRECATION_WARNINGS
24
25 #include <gst/check/gstharness.h>
26 #include <gst/check/gstcheck.h>
27 #include <gst/check/gsttestclock.h>
28 #include <gst/check/gstharness.h>
29
30 #include <gst/rtp/gstrtpbuffer.h>
31 #include <gst/rtp/gstrtcpbuffer.h>
32 #include <gst/net/gstnetaddressmeta.h>
33 #include <gst/video/video.h>
34
35 #define TEST_BUF_CLOCK_RATE 8000
36 #define TEST_BUF_PT 0
37 #define TEST_BUF_SSRC 0x01BADBAD
38 #define TEST_BUF_MS  20
39 #define TEST_BUF_DURATION (TEST_BUF_MS * GST_MSECOND)
40 #define TEST_BUF_SIZE (64000 * TEST_BUF_MS / 1000)
41 #define TEST_RTP_TS_DURATION (TEST_BUF_CLOCK_RATE * TEST_BUF_MS / 1000)
42
43 static GstCaps *
44 generate_caps (void)
45 {
46   return gst_caps_new_simple ("application/x-rtp",
47       "clock-rate", G_TYPE_INT, TEST_BUF_CLOCK_RATE,
48       "payload", G_TYPE_INT, TEST_BUF_PT, NULL);
49 }
50
51 static GstBuffer *
52 generate_test_buffer_full (GstClockTime dts,
53     guint seq_num, guint32 rtp_ts, guint ssrc)
54 {
55   GstBuffer *buf;
56   guint8 *payload;
57   guint i;
58   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
59
60   buf = gst_rtp_buffer_new_allocate (TEST_BUF_SIZE, 0, 0);
61   GST_BUFFER_DTS (buf) = dts;
62
63   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
64   gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
65   gst_rtp_buffer_set_seq (&rtp, seq_num);
66   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
67   gst_rtp_buffer_set_ssrc (&rtp, ssrc);
68
69   payload = gst_rtp_buffer_get_payload (&rtp);
70   for (i = 0; i < TEST_BUF_SIZE; i++)
71     payload[i] = 0xff;
72
73   gst_rtp_buffer_unmap (&rtp);
74
75   return buf;
76 }
77
78 static GstBuffer *
79 generate_test_buffer (guint seq_num, guint ssrc)
80 {
81   return generate_test_buffer_full (seq_num * TEST_BUF_DURATION,
82       seq_num, seq_num * TEST_RTP_TS_DURATION, ssrc);
83 }
84
85 typedef struct
86 {
87   GstHarness *send_rtp_h;
88   GstHarness *recv_rtp_h;
89   GstHarness *rtcp_h;
90
91   GstElement *session;
92   GObject *internal_session;
93   GstTestClock *testclock;
94   GstCaps *caps;
95 } SessionHarness;
96
97 static GstCaps *
98 _pt_map_requested (GstElement * element, guint pt, gpointer data)
99 {
100   SessionHarness *h = data;
101   return gst_caps_copy (h->caps);
102 }
103
104 static SessionHarness *
105 session_harness_new (void)
106 {
107   SessionHarness *h = g_new0 (SessionHarness, 1);
108   h->caps = generate_caps ();
109
110   h->testclock = GST_TEST_CLOCK_CAST (gst_test_clock_new ());
111   gst_system_clock_set_default (GST_CLOCK_CAST (h->testclock));
112
113   h->session = gst_element_factory_make ("rtpsession", NULL);
114   gst_element_set_clock (h->session, GST_CLOCK_CAST (h->testclock));
115
116   h->send_rtp_h = gst_harness_new_with_element (h->session,
117       "send_rtp_sink", "send_rtp_src");
118   gst_harness_set_src_caps (h->send_rtp_h, gst_caps_copy (h->caps));
119
120   h->recv_rtp_h = gst_harness_new_with_element (h->session,
121       "recv_rtp_sink", "recv_rtp_src");
122   gst_harness_set_src_caps (h->recv_rtp_h, gst_caps_copy (h->caps));
123
124   h->rtcp_h = gst_harness_new_with_element (h->session,
125       "recv_rtcp_sink", "send_rtcp_src");
126   gst_harness_set_src_caps_str (h->rtcp_h, "application/x-rtcp");
127
128   g_signal_connect (h->session, "request-pt-map",
129       (GCallback) _pt_map_requested, h);
130
131   g_object_get (h->session, "internal-session", &h->internal_session, NULL);
132
133   return h;
134 }
135
136 static void
137 session_harness_free (SessionHarness * h)
138 {
139   gst_system_clock_set_default (NULL);
140
141   gst_caps_unref (h->caps);
142   gst_object_unref (h->testclock);
143
144   gst_harness_teardown (h->rtcp_h);
145   gst_harness_teardown (h->recv_rtp_h);
146   gst_harness_teardown (h->send_rtp_h);
147
148   g_object_unref (h->internal_session);
149   gst_object_unref (h->session);
150   g_free (h);
151 }
152
153 static GstFlowReturn
154 session_harness_send_rtp (SessionHarness * h, GstBuffer * buf)
155 {
156   return gst_harness_push (h->send_rtp_h, buf);
157 }
158
159 static GstFlowReturn
160 session_harness_recv_rtp (SessionHarness * h, GstBuffer * buf)
161 {
162   return gst_harness_push (h->recv_rtp_h, buf);
163 }
164
165 static GstFlowReturn
166 session_harness_recv_rtcp (SessionHarness * h, GstBuffer * buf)
167 {
168   return gst_harness_push (h->rtcp_h, buf);
169 }
170
171 static GstBuffer *
172 session_harness_pull_rtcp (SessionHarness * h)
173 {
174   return gst_harness_pull (h->rtcp_h);
175 }
176
177 static void
178 session_harness_crank_clock (SessionHarness * h)
179 {
180   gst_test_clock_crank (h->testclock);
181 }
182
183 static gboolean
184 session_harness_advance_and_crank (SessionHarness * h, GstClockTime delta)
185 {
186   GstClockID res, pending;
187   gboolean result;
188   gst_test_clock_wait_for_next_pending_id (h->testclock, &pending);
189   gst_test_clock_advance_time (h->testclock, delta);
190   res = gst_test_clock_process_next_clock_id (h->testclock);
191   if (res == pending)
192     result = TRUE;
193   else
194     result = FALSE;
195   if (res)
196     gst_clock_id_unref (res);
197   gst_clock_id_unref (pending);
198   return result;
199 }
200
201 static void
202 session_harness_produce_rtcp (SessionHarness * h, gint num_rtcp_packets)
203 {
204   /* due to randomness in rescheduling of RTCP timeout, we need to
205      keep cranking until we have the desired amount of packets */
206   while (gst_harness_buffers_in_queue (h->rtcp_h) < num_rtcp_packets)
207     session_harness_crank_clock (h);
208 }
209
210 GST_START_TEST (test_multiple_ssrc_rr)
211 {
212   SessionHarness *h = session_harness_new ();
213   GstFlowReturn res;
214   GstBuffer *in_buf, *out_buf;
215   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
216   GstRTCPPacket rtcp_packet;
217   gint i, j;
218
219   guint ssrcs[] = {
220     0x01BADBAD,
221     0xDEADBEEF,
222   };
223
224   /* receive buffers with multiple ssrcs */
225   for (i = 0; i < 2; i++) {
226     for (j = 0; j < G_N_ELEMENTS (ssrcs); j++) {
227       in_buf = generate_test_buffer (i, ssrcs[j]);
228       res = session_harness_recv_rtp (h, in_buf);
229       fail_unless_equals_int (GST_FLOW_OK, res);
230     }
231   }
232
233   /* crank the rtcp-thread and pull out the rtcp-packet we have generated */
234   session_harness_crank_clock (h);
235   out_buf = session_harness_pull_rtcp (h);
236
237   /* verify we have report blocks for both ssrcs */
238   g_assert (out_buf != NULL);
239   fail_unless (gst_rtcp_buffer_validate (out_buf));
240   gst_rtcp_buffer_map (out_buf, GST_MAP_READ, &rtcp);
241   g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
242   fail_unless_equals_int (GST_RTCP_TYPE_RR,
243       gst_rtcp_packet_get_type (&rtcp_packet));
244
245   fail_unless_equals_int (G_N_ELEMENTS (ssrcs),
246       gst_rtcp_packet_get_rb_count (&rtcp_packet));
247
248   for (j = 0; j < G_N_ELEMENTS (ssrcs); j++) {
249     guint32 ssrc;
250     gst_rtcp_packet_get_rb (&rtcp_packet, j, &ssrc,
251         NULL, NULL, NULL, NULL, NULL, NULL);
252     fail_unless_equals_int (ssrcs[j], ssrc);
253   }
254
255   gst_rtcp_buffer_unmap (&rtcp);
256   gst_buffer_unref (out_buf);
257
258   session_harness_free (h);
259 }
260
261 GST_END_TEST;
262
263 /* This verifies that rtpsession will correctly place RBs round-robin
264  * across multiple RRs when there are too many senders that their RBs
265  * do not fit in one RR */
266 GST_START_TEST (test_multiple_senders_roundrobin_rbs)
267 {
268   SessionHarness *h = session_harness_new ();
269   GstFlowReturn res;
270   GstBuffer *buf;
271   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
272   GstRTCPPacket rtcp_packet;
273   gint i, j, k;
274   guint32 ssrc;
275   GHashTable *rb_ssrcs, *tmp_set;
276
277   g_object_set (h->internal_session, "internal-ssrc", 0xDEADBEEF, NULL);
278
279   for (i = 0; i < 2; i++) {     /* cycles between RR reports */
280     for (j = 0; j < 5; j++) {   /* packets per ssrc */
281       gint seq = (i * 5) + j;
282       for (k = 0; k < 35; k++) {        /* number of ssrcs */
283         buf = generate_test_buffer (seq, 10000 + k);
284         res = session_harness_recv_rtp (h, buf);
285         fail_unless_equals_int (GST_FLOW_OK, res);
286       }
287     }
288   }
289
290   rb_ssrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
291       (GDestroyNotify) g_hash_table_unref);
292
293   /* verify the rtcp packets */
294   for (i = 0; i < 2; i++) {
295     guint expected_rb_count = (i < 1) ? GST_RTCP_MAX_RB_COUNT :
296         (35 - GST_RTCP_MAX_RB_COUNT);
297
298     session_harness_produce_rtcp (h, 1);
299     buf = session_harness_pull_rtcp (h);
300     g_assert (buf != NULL);
301     fail_unless (gst_rtcp_buffer_validate (buf));
302
303     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
304     fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
305     fail_unless_equals_int (GST_RTCP_TYPE_RR,
306         gst_rtcp_packet_get_type (&rtcp_packet));
307
308     ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
309     fail_unless_equals_int (0xDEADBEEF, ssrc);
310
311     /* inspect the RBs */
312     fail_unless_equals_int (expected_rb_count,
313         gst_rtcp_packet_get_rb_count (&rtcp_packet));
314
315     if (i == 0) {
316       tmp_set = g_hash_table_new (g_direct_hash, g_direct_equal);
317       g_hash_table_insert (rb_ssrcs, GUINT_TO_POINTER (ssrc), tmp_set);
318     } else {
319       tmp_set = g_hash_table_lookup (rb_ssrcs, GUINT_TO_POINTER (ssrc));
320       g_assert (tmp_set);
321     }
322
323     for (j = 0; j < expected_rb_count; j++) {
324       gst_rtcp_packet_get_rb (&rtcp_packet, j, &ssrc, NULL, NULL,
325           NULL, NULL, NULL, NULL);
326       g_assert_cmpint (ssrc, >=, 10000);
327       g_assert_cmpint (ssrc, <=, 10035);
328       g_hash_table_add (tmp_set, GUINT_TO_POINTER (ssrc));
329     }
330
331     gst_rtcp_buffer_unmap (&rtcp);
332     gst_buffer_unref (buf);
333   }
334
335   /* now verify all received ssrcs have been reported */
336   fail_unless_equals_int (1, g_hash_table_size (rb_ssrcs));
337   tmp_set = g_hash_table_lookup (rb_ssrcs, GUINT_TO_POINTER (0xDEADBEEF));
338   g_assert (tmp_set);
339   fail_unless_equals_int (35, g_hash_table_size (tmp_set));
340
341   g_hash_table_unref (rb_ssrcs);
342   session_harness_free (h);
343 }
344
345 GST_END_TEST;
346
347 GST_START_TEST (test_no_rbs_for_internal_senders)
348 {
349   SessionHarness *h = session_harness_new ();
350   GstFlowReturn res;
351   GstBuffer *buf;
352   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
353   GstRTCPPacket rtcp_packet;
354   gint i, j, k;
355   guint32 ssrc;
356   GHashTable *sr_ssrcs;
357   GHashTable *rb_ssrcs, *tmp_set;
358
359   /* Push RTP from our send SSRCs */
360   for (j = 0; j < 5; j++) {     /* packets per ssrc */
361     for (k = 0; k < 2; k++) {   /* number of ssrcs */
362       buf = generate_test_buffer (j, 10000 + k);
363       res = session_harness_send_rtp (h, buf);
364       fail_unless_equals_int (GST_FLOW_OK, res);
365     }
366   }
367
368   /* crank the RTCP pad thread */
369   session_harness_crank_clock (h);
370
371   sr_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
372
373   /* verify the rtcp packets */
374   for (i = 0; i < 2; i++) {
375     buf = session_harness_pull_rtcp (h);
376     g_assert (buf != NULL);
377     g_assert (gst_rtcp_buffer_validate (buf));
378
379     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
380     g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
381     fail_unless_equals_int (GST_RTCP_TYPE_SR,
382         gst_rtcp_packet_get_type (&rtcp_packet));
383
384     gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL,
385         NULL, NULL);
386     g_assert_cmpint (ssrc, >=, 10000);
387     g_assert_cmpint (ssrc, <=, 10001);
388     g_hash_table_add (sr_ssrcs, GUINT_TO_POINTER (ssrc));
389
390     /* There should be no RBs as there are no remote senders */
391     fail_unless_equals_int (0, gst_rtcp_packet_get_rb_count (&rtcp_packet));
392
393     gst_rtcp_buffer_unmap (&rtcp);
394     gst_buffer_unref (buf);
395   }
396
397   /* Ensure both internal senders generated RTCP */
398   fail_unless_equals_int (2, g_hash_table_size (sr_ssrcs));
399   g_hash_table_unref (sr_ssrcs);
400
401   /* Generate RTP from remote side */
402   for (j = 0; j < 5; j++) {     /* packets per ssrc */
403     for (k = 0; k < 2; k++) {   /* number of ssrcs */
404       buf = generate_test_buffer (j, 20000 + k);
405       res = session_harness_recv_rtp (h, buf);
406       fail_unless_equals_int (GST_FLOW_OK, res);
407     }
408   }
409
410   sr_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
411   rb_ssrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
412       (GDestroyNotify) g_hash_table_unref);
413
414   /* verify the rtcp packets */
415   for (i = 0; i < 2; i++) {
416     session_harness_produce_rtcp (h, 1);
417     buf = session_harness_pull_rtcp (h);
418     g_assert (buf != NULL);
419     g_assert (gst_rtcp_buffer_validate (buf));
420
421     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
422     g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
423     fail_unless_equals_int (GST_RTCP_TYPE_SR,
424         gst_rtcp_packet_get_type (&rtcp_packet));
425
426     gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL,
427         NULL, NULL);
428     g_assert_cmpint (ssrc, >=, 10000);
429     g_assert_cmpint (ssrc, <=, 10001);
430     g_hash_table_add (sr_ssrcs, GUINT_TO_POINTER (ssrc));
431
432     /* There should be 2 RBs: one for each remote sender */
433     fail_unless_equals_int (2, gst_rtcp_packet_get_rb_count (&rtcp_packet));
434
435     tmp_set = g_hash_table_new (g_direct_hash, g_direct_equal);
436     g_hash_table_insert (rb_ssrcs, GUINT_TO_POINTER (ssrc), tmp_set);
437
438     for (j = 0; j < 2; j++) {
439       gst_rtcp_packet_get_rb (&rtcp_packet, j, &ssrc, NULL, NULL,
440           NULL, NULL, NULL, NULL);
441       g_assert_cmpint (ssrc, >=, 20000);
442       g_assert_cmpint (ssrc, <=, 20001);
443       g_hash_table_add (tmp_set, GUINT_TO_POINTER (ssrc));
444     }
445
446     gst_rtcp_buffer_unmap (&rtcp);
447     gst_buffer_unref (buf);
448   }
449
450   /* now verify all received ssrcs have been reported */
451   fail_unless_equals_int (2, g_hash_table_size (sr_ssrcs));
452   fail_unless_equals_int (2, g_hash_table_size (rb_ssrcs));
453   for (i = 10000; i < 10002; i++) {
454     tmp_set = g_hash_table_lookup (rb_ssrcs, GUINT_TO_POINTER (i));
455     g_assert (tmp_set);
456     fail_unless_equals_int (2, g_hash_table_size (tmp_set));
457   }
458
459   g_hash_table_unref (rb_ssrcs);
460   g_hash_table_unref (sr_ssrcs);
461
462   session_harness_free (h);
463 }
464
465 GST_END_TEST;
466
467 GST_START_TEST (test_internal_sources_timeout)
468 {
469   SessionHarness *h = session_harness_new ();
470   guint internal_ssrc;
471   guint32 ssrc;
472   GstBuffer *buf;
473   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
474   GstRTCPPacket rtcp_packet;
475   GstRTCPType rtcp_type;
476   GstFlowReturn res;
477   gint i, j;
478   GstCaps *caps;
479   gboolean seen_bye;
480
481   g_object_set (h->internal_session, "internal-ssrc", 0xDEADBEEF, NULL);
482   g_object_get (h->internal_session, "internal-ssrc", &internal_ssrc, NULL);
483   fail_unless_equals_int (0xDEADBEEF, internal_ssrc);
484
485   for (i = 1; i < 4; i++) {
486     buf = generate_test_buffer (i, 0xBEEFDEAD);
487     res = session_harness_recv_rtp (h, buf);
488     fail_unless_equals_int (GST_FLOW_OK, res);
489   }
490
491   /* verify that rtpsession has sent RR for an internally-created
492    * RTPSource that is using the internal-ssrc */
493   session_harness_produce_rtcp (h, 1);
494   buf = session_harness_pull_rtcp (h);
495
496   fail_unless (buf != NULL);
497   fail_unless (gst_rtcp_buffer_validate (buf));
498   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
499   fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
500   fail_unless_equals_int (GST_RTCP_TYPE_RR,
501       gst_rtcp_packet_get_type (&rtcp_packet));
502   ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
503   fail_unless_equals_int (ssrc, internal_ssrc);
504   gst_rtcp_buffer_unmap (&rtcp);
505   gst_buffer_unref (buf);
506
507   /* ok, now let's push some RTP packets */
508   caps = gst_caps_new_simple ("application/x-rtp",
509       "ssrc", G_TYPE_UINT, 0x01BADBAD, NULL);
510   gst_harness_set_src_caps (h->send_rtp_h, caps);
511
512   for (i = 1; i < 4; i++) {
513     buf = generate_test_buffer (i, 0x01BADBAD);
514     res = session_harness_send_rtp (h, buf);
515     fail_unless_equals_int (GST_FLOW_OK, res);
516   }
517
518   /* internal ssrc must have changed already */
519   g_object_get (h->internal_session, "internal-ssrc", &internal_ssrc, NULL);
520   fail_unless (internal_ssrc != ssrc);
521   fail_unless_equals_int (0x01BADBAD, internal_ssrc);
522
523   /* verify SR and RR */
524   j = 0;
525   for (i = 0; i < 2; i++) {
526     session_harness_produce_rtcp (h, 1);
527     buf = session_harness_pull_rtcp (h);
528     g_assert (buf != NULL);
529     fail_unless (gst_rtcp_buffer_validate (buf));
530     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
531     fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
532     rtcp_type = gst_rtcp_packet_get_type (&rtcp_packet);
533
534     if (rtcp_type == GST_RTCP_TYPE_SR) {
535       gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL, NULL,
536           NULL);
537       fail_unless_equals_int (internal_ssrc, ssrc);
538       fail_unless_equals_int (0x01BADBAD, ssrc);
539       j |= 0x1;
540     } else if (rtcp_type == GST_RTCP_TYPE_RR) {
541       ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
542       fail_unless (internal_ssrc != ssrc);
543       fail_unless_equals_int (0xDEADBEEF, ssrc);
544       j |= 0x2;
545     }
546     gst_rtcp_buffer_unmap (&rtcp);
547     gst_buffer_unref (buf);
548   }
549   fail_unless_equals_int (0x3, j);      /* verify we got both SR and RR */
550
551   /* go 30 seconds in the future and observe both sources timing out:
552    * 0xDEADBEEF -> BYE, 0x01BADBAD -> becomes receiver only */
553   fail_unless (session_harness_advance_and_crank (h, 30 * GST_SECOND));
554
555   /* verify BYE and RR */
556   j = 0;
557   seen_bye = FALSE;
558   while (!seen_bye) {
559     session_harness_produce_rtcp (h, 1);
560     buf = session_harness_pull_rtcp (h);
561     fail_unless (buf != NULL);
562     fail_unless (gst_rtcp_buffer_validate (buf));
563     gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
564     fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
565     rtcp_type = gst_rtcp_packet_get_type (&rtcp_packet);
566
567     if (rtcp_type == GST_RTCP_TYPE_RR) {
568       ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
569       if (ssrc == 0x01BADBAD) {
570         j |= 0x1;
571         fail_unless_equals_int (internal_ssrc, ssrc);
572         /* 2 => RR, SDES. There is no BYE here */
573         fail_unless_equals_int (2, gst_rtcp_buffer_get_packet_count (&rtcp));
574       } else if (ssrc == 0xDEADBEEF) {
575         j |= 0x2;
576         g_assert_cmpint (ssrc, !=, internal_ssrc);
577         /* 3 => RR, SDES, BYE */
578         if (gst_rtcp_buffer_get_packet_count (&rtcp) == 3) {
579           fail_unless (gst_rtcp_packet_move_to_next (&rtcp_packet));
580           fail_unless (gst_rtcp_packet_move_to_next (&rtcp_packet));
581           fail_unless_equals_int (GST_RTCP_TYPE_BYE,
582               gst_rtcp_packet_get_type (&rtcp_packet));
583           seen_bye = TRUE;
584         }
585       }
586     }
587     gst_rtcp_buffer_unmap (&rtcp);
588     gst_buffer_unref (buf);
589   }
590   fail_unless_equals_int (0x3, j);      /* verify we got both BYE and RR */
591
592   session_harness_free (h);
593 }
594
595 GST_END_TEST;
596
597 typedef struct
598 {
599   guint8 subtype;
600   guint32 ssrc;
601   gchar *name;
602   GstBuffer *data;
603 } RTCPAppResult;
604
605 static void
606 on_app_rtcp_cb (GObject * session, guint subtype, guint ssrc,
607     const gchar * name, GstBuffer * data, RTCPAppResult * result)
608 {
609   result->subtype = subtype;
610   result->ssrc = ssrc;
611   result->name = g_strdup (name);
612   result->data = data ? gst_buffer_ref (data) : NULL;
613 }
614
615 GST_START_TEST (test_receive_rtcp_app_packet)
616 {
617   SessionHarness *h = session_harness_new ();
618   GstBuffer *buf;
619   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
620   GstRTCPPacket packet;
621   RTCPAppResult result = { 0 };
622   guint8 data[] = { 0x11, 0x22, 0x33, 0x44 };
623
624   g_signal_connect (h->internal_session, "on-app-rtcp",
625       G_CALLBACK (on_app_rtcp_cb), &result);
626
627   /* Push APP buffer with no data */
628   buf = gst_rtcp_buffer_new (1000);
629   fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp));
630   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet));
631   gst_rtcp_packet_app_set_subtype (&packet, 21);
632   gst_rtcp_packet_app_set_ssrc (&packet, 0x11111111);
633   gst_rtcp_packet_app_set_name (&packet, "Test");
634   gst_rtcp_buffer_unmap (&rtcp);
635
636   fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtcp (h, buf));
637
638   fail_unless_equals_int (21, result.subtype);
639   fail_unless_equals_int (0x11111111, result.ssrc);
640   fail_unless_equals_string ("Test", result.name);
641   fail_unless_equals_pointer (NULL, result.data);
642
643   g_free (result.name);
644
645   /* Push APP buffer with data */
646   memset (&result, 0, sizeof (result));
647   buf = gst_rtcp_buffer_new (1000);
648   fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp));
649   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet));
650   gst_rtcp_packet_app_set_subtype (&packet, 22);
651   gst_rtcp_packet_app_set_ssrc (&packet, 0x22222222);
652   gst_rtcp_packet_app_set_name (&packet, "Test");
653   gst_rtcp_packet_app_set_data_length (&packet, sizeof (data) / 4);
654   memcpy (gst_rtcp_packet_app_get_data (&packet), data, sizeof (data));
655   gst_rtcp_buffer_unmap (&rtcp);
656
657   fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtcp (h, buf));
658
659   fail_unless_equals_int (22, result.subtype);
660   fail_unless_equals_int (0x22222222, result.ssrc);
661   fail_unless_equals_string ("Test", result.name);
662   fail_unless (gst_buffer_memcmp (result.data, 0, data, sizeof (data)) == 0);
663
664   g_free (result.name);
665   gst_buffer_unref (result.data);
666
667   session_harness_free (h);
668 }
669
670 GST_END_TEST;
671
672 static void
673 stats_test_cb (GObject * object, GParamSpec * spec, gpointer data)
674 {
675   guint num_sources = 0;
676   gboolean *cb_called = data;
677   g_assert (*cb_called == FALSE);
678   *cb_called = TRUE;
679
680   /* We should be able to get a rtpsession property
681      without introducing the deadlock */
682   g_object_get (object, "num-sources", &num_sources, NULL);
683 }
684
685 GST_START_TEST (test_dont_lock_on_stats)
686 {
687   GstHarness *h_rtcp;
688   GstHarness *h_send;
689   GstClock *clock = gst_test_clock_new ();
690   GstTestClock *testclock = GST_TEST_CLOCK (clock);
691   gboolean cb_called = FALSE;
692
693   /* use testclock as the systemclock to capture the rtcp thread waits */
694   gst_system_clock_set_default (GST_CLOCK (testclock));
695
696   h_rtcp =
697       gst_harness_new_with_padnames ("rtpsession", "recv_rtcp_sink",
698       "send_rtcp_src");
699   h_send =
700       gst_harness_new_with_element (h_rtcp->element, "send_rtp_sink",
701       "send_rtp_src");
702
703   /* connect to the stats-reporting */
704   g_signal_connect (h_rtcp->element, "notify::stats",
705       G_CALLBACK (stats_test_cb), &cb_called);
706
707   /* "crank" and check the stats */
708   g_assert (gst_test_clock_crank (testclock));
709   gst_buffer_unref (gst_harness_pull (h_rtcp));
710   fail_unless (cb_called);
711
712   gst_harness_teardown (h_send);
713   gst_harness_teardown (h_rtcp);
714   gst_object_unref (clock);
715 }
716
717 GST_END_TEST;
718
719 static void
720 suspicious_bye_cb (GObject * object, GParamSpec * spec, gpointer data)
721 {
722   GValueArray *stats_arr;
723   GstStructure *stats, *internal_stats;
724   gboolean *cb_called = data;
725   gboolean internal = FALSE, sent_bye = TRUE;
726   guint ssrc = 0;
727   guint i;
728
729   g_assert (*cb_called == FALSE);
730   *cb_called = TRUE;
731
732   g_object_get (object, "stats", &stats, NULL);
733   stats_arr =
734       g_value_get_boxed (gst_structure_get_value (stats, "source-stats"));
735   g_assert (stats_arr != NULL);
736   fail_unless (stats_arr->n_values >= 1);
737
738   for (i = 0; i < stats_arr->n_values; i++) {
739     internal_stats = g_value_get_boxed (g_value_array_get_nth (stats_arr, i));
740     g_assert (internal_stats != NULL);
741
742     gst_structure_get (internal_stats,
743         "ssrc", G_TYPE_UINT, &ssrc,
744         "internal", G_TYPE_BOOLEAN, &internal,
745         "received-bye", G_TYPE_BOOLEAN, &sent_bye, NULL);
746
747     if (ssrc == 0xDEADBEEF) {
748       fail_unless (internal);
749       fail_unless (!sent_bye);
750       break;
751     }
752   }
753   fail_unless_equals_int (ssrc, 0xDEADBEEF);
754
755   gst_structure_free (stats);
756 }
757
758 static GstBuffer *
759 create_bye_rtcp (guint32 ssrc)
760 {
761   GstRTCPPacket packet;
762   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
763   GSocketAddress *saddr;
764   GstBuffer *buffer = gst_rtcp_buffer_new (1000);
765
766   fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
767   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_BYE, &packet));
768   gst_rtcp_packet_bye_add_ssrc (&packet, ssrc);
769   gst_rtcp_buffer_unmap (&rtcp);
770
771   /* Need to add meta to trigger collision detection */
772   saddr = g_inet_socket_address_new_from_string ("127.0.0.1", 3490);
773   gst_buffer_add_net_address_meta (buffer, saddr);
774   g_object_unref (saddr);
775   return buffer;
776 }
777
778 GST_START_TEST (test_ignore_suspicious_bye)
779 {
780   SessionHarness *h = session_harness_new ();
781   gboolean cb_called = FALSE;
782
783   /* connect to the stats-reporting */
784   g_signal_connect (h->session, "notify::stats",
785       G_CALLBACK (suspicious_bye_cb), &cb_called);
786
787   /* Push RTP buffer making our internal SSRC=0xDEADBEEF */
788   fail_unless_equals_int (GST_FLOW_OK,
789       session_harness_send_rtp (h, generate_test_buffer (0, 0xDEADBEEF)));
790
791   /* Receive BYE RTCP referencing our internal SSRC(!?!) (0xDEADBEEF) */
792   fail_unless_equals_int (GST_FLOW_OK,
793       session_harness_recv_rtcp (h, create_bye_rtcp (0xDEADBEEF)));
794
795   /* "crank" and check the stats */
796   session_harness_crank_clock (h);
797   gst_buffer_unref (session_harness_pull_rtcp (h));
798   fail_unless (cb_called);
799
800   session_harness_free (h);
801 }
802
803 GST_END_TEST;
804
805 static GstBuffer *
806 create_buffer (guint8 * data, gsize size)
807 {
808   return gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
809       data, size, 0, size, NULL, NULL);
810 }
811
812 GST_START_TEST (test_receive_regular_pli)
813 {
814   SessionHarness *h = session_harness_new ();
815   GstEvent *ev;
816
817   /* PLI packet */
818   guint8 rtcp_pkt[] = {
819     0x81,                       /* PLI */
820     0xce,                       /* Type 206 Application layer feedback */
821     0x00, 0x02,                 /* Length */
822     0x37, 0x56, 0x93, 0xed,     /* Sender SSRC */
823     0x37, 0x56, 0x93, 0xed      /* Media SSRC */
824   };
825
826   fail_unless_equals_int (GST_FLOW_OK,
827       session_harness_send_rtp (h, generate_test_buffer (0, 928420845)));
828
829   session_harness_recv_rtcp (h, create_buffer (rtcp_pkt, sizeof (rtcp_pkt)));
830   fail_unless_equals_int (3,
831       gst_harness_upstream_events_received (h->send_rtp_h));
832
833   /* Remove the first 2 reconfigure events */
834   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
835   fail_unless_equals_int (GST_EVENT_RECONFIGURE, GST_EVENT_TYPE (ev));
836   gst_event_unref (ev);
837   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
838   fail_unless_equals_int (GST_EVENT_RECONFIGURE, GST_EVENT_TYPE (ev));
839   gst_event_unref (ev);
840
841   /* Then pull and check the force key-unit event */
842   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
843   fail_unless_equals_int (GST_EVENT_CUSTOM_UPSTREAM, GST_EVENT_TYPE (ev));
844   fail_unless (gst_video_event_is_force_key_unit (ev));
845   gst_event_unref (ev);
846
847   session_harness_free (h);
848 }
849
850 GST_END_TEST;
851
852 GST_START_TEST (test_receive_pli_no_sender_ssrc)
853 {
854   SessionHarness *h = session_harness_new ();
855   GstEvent *ev;
856
857   /* PLI packet */
858   guint8 rtcp_pkt[] = {
859     0x81,                       /* PLI */
860     0xce,                       /* Type 206 Application layer feedback */
861     0x00, 0x02,                 /* Length */
862     0x00, 0x00, 0x00, 0x00,     /* Sender SSRC */
863     0x37, 0x56, 0x93, 0xed      /* Media SSRC */
864   };
865
866   fail_unless_equals_int (GST_FLOW_OK,
867       session_harness_send_rtp (h, generate_test_buffer (0, 928420845)));
868
869   session_harness_recv_rtcp (h, create_buffer (rtcp_pkt, sizeof (rtcp_pkt)));
870   fail_unless_equals_int (3,
871       gst_harness_upstream_events_received (h->send_rtp_h));
872
873   /* Remove the first 2 reconfigure events */
874   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
875   fail_unless_equals_int (GST_EVENT_RECONFIGURE, GST_EVENT_TYPE (ev));
876   gst_event_unref (ev);
877   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
878   fail_unless_equals_int (GST_EVENT_RECONFIGURE, GST_EVENT_TYPE (ev));
879   gst_event_unref (ev);
880
881   /* Then pull and check the force key-unit event */
882   fail_unless ((ev = gst_harness_pull_upstream_event (h->send_rtp_h)) != NULL);
883   fail_unless_equals_int (GST_EVENT_CUSTOM_UPSTREAM, GST_EVENT_TYPE (ev));
884   fail_unless (gst_video_event_is_force_key_unit (ev));
885   gst_event_unref (ev);
886
887   session_harness_free (h);
888 }
889
890 GST_END_TEST;
891
892 GST_START_TEST (test_illegal_rtcp_fb_packet)
893 {
894   SessionHarness *h = session_harness_new ();
895   GstBuffer *buf;
896   /* Zero length RTCP feedback packet (reduced size) */
897   const guint8 rtcp_zero_fb_pkt[] = { 0x8f, 0xce, 0x00, 0x00 };
898
899   g_object_set (h->internal_session, "internal-ssrc", 0xDEADBEEF, NULL);
900
901   buf = gst_buffer_new_and_alloc (sizeof (rtcp_zero_fb_pkt));
902   gst_buffer_fill (buf, 0, rtcp_zero_fb_pkt, sizeof (rtcp_zero_fb_pkt));
903   GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) = G_GUINT64_CONSTANT (0);
904
905   /* Push the packet, this did previously crash because length of packet was
906    * never validated. */
907   fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtcp (h, buf));
908
909   session_harness_free (h);
910 }
911
912 GST_END_TEST;
913
914 typedef struct
915 {
916   GCond *cond;
917   GMutex *mutex;
918   gboolean fired;
919 } FeedbackRTCPCallbackData;
920
921 static void
922 feedback_rtcp_cb (GstElement * element, guint fbtype, guint fmt,
923     guint sender_ssrc, guint media_ssrc, GstBuffer * fci,
924     FeedbackRTCPCallbackData * cb_data)
925 {
926   g_mutex_lock (cb_data->mutex);
927   cb_data->fired = TRUE;
928   g_cond_wait (cb_data->cond, cb_data->mutex);
929   g_mutex_unlock (cb_data->mutex);
930 }
931
932 static void *
933 send_feedback_rtcp (SessionHarness * h)
934 {
935   GstRTCPPacket packet;
936   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
937   GstBuffer *buffer = gst_rtcp_buffer_new (1000);
938
939   fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
940   fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_PSFB, &packet));
941   gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_PSFB_TYPE_PLI);
942   gst_rtcp_packet_fb_set_fci_length (&packet, 0);
943   gst_rtcp_packet_fb_set_media_ssrc (&packet, 0xABE2B0B);
944   gst_rtcp_packet_fb_set_media_ssrc (&packet, 0xDEADBEEF);
945   gst_rtcp_buffer_unmap (&rtcp);
946   fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtcp (h, buffer));
947
948   return NULL;
949 }
950
951 GST_START_TEST (test_feedback_rtcp_race)
952 {
953   SessionHarness *h = session_harness_new ();
954
955   GCond cond;
956   GMutex mutex;
957   FeedbackRTCPCallbackData cb_data;
958   GThread *send_rtcp_thread;
959
960   g_cond_init (&cond);
961   g_mutex_init (&mutex);
962   cb_data.cond = &cond;
963   cb_data.mutex = &mutex;
964   cb_data.fired = FALSE;
965   g_signal_connect (h->internal_session, "on-feedback-rtcp",
966       G_CALLBACK (feedback_rtcp_cb), &cb_data);
967
968   /* Push RTP buffer making external source with SSRC=0xDEADBEEF */
969   fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtp (h,
970           generate_test_buffer (0, 0xDEADBEEF)));
971
972   /* Push feedback RTCP with media SSRC=0xDEADBEEF */
973   send_rtcp_thread = g_thread_new (NULL, (GThreadFunc) send_feedback_rtcp, h);
974
975   /* Waiting for feedback RTCP callback to fire */
976   while (!cb_data.fired)
977     g_usleep (G_USEC_PER_SEC / 100);
978
979   /* While send_rtcp_thread thread is waiting for our signal
980      advance the clock by 30sec triggering removal of 0xDEADBEEF,
981      as if the source was inactive for too long */
982   session_harness_advance_and_crank (h, GST_SECOND * 30);
983   gst_buffer_unref (session_harness_pull_rtcp (h));
984
985   /* Let send_rtcp_thread finish */
986   g_mutex_lock (&mutex);
987   g_cond_signal (&cond);
988   g_mutex_unlock (&mutex);
989   g_thread_join (send_rtcp_thread);
990
991   session_harness_free (h);
992 }
993
994 GST_END_TEST;
995
996 static Suite *
997 rtpsession_suite (void)
998 {
999   Suite *s = suite_create ("rtpsession");
1000   TCase *tc_chain = tcase_create ("general");
1001
1002   suite_add_tcase (s, tc_chain);
1003   tcase_add_test (tc_chain, test_multiple_ssrc_rr);
1004   tcase_add_test (tc_chain, test_multiple_senders_roundrobin_rbs);
1005   tcase_add_test (tc_chain, test_no_rbs_for_internal_senders);
1006   tcase_add_test (tc_chain, test_internal_sources_timeout);
1007   tcase_add_test (tc_chain, test_receive_rtcp_app_packet);
1008   tcase_add_test (tc_chain, test_dont_lock_on_stats);
1009   tcase_add_test (tc_chain, test_ignore_suspicious_bye);
1010   tcase_add_test (tc_chain, test_illegal_rtcp_fb_packet);
1011   tcase_add_test (tc_chain, test_feedback_rtcp_race);
1012   tcase_add_test (tc_chain, test_receive_regular_pli);
1013   tcase_add_test (tc_chain, test_receive_pli_no_sender_ssrc);
1014   return s;
1015 }
1016
1017 GST_CHECK_MAIN (rtpsession);