rtpjitterbuffer: improve clear-pt-map handling
[platform/upstream/gst-plugins-good.git] / tests / check / elements / rtpjitterbuffer.c
1 /* GStreamer
2  *
3  * Copyright (C) 2009 Nokia Corporation and its subsidary(-ies)
4  *               contact: <stefan.kost@nokia.com>
5  * Copyright (C) 2012 Cisco Systems, Inc
6  *               Authors: Kelley Rogers <kelro@cisco.com>
7  *               Havard Graff <hgraff@cisco.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #include <gst/check/gstcheck.h>
26 #include <gst/check/gsttestclock.h>
27
28 #include <gst/rtp/gstrtpbuffer.h>
29
30 /* For ease of programming we use globals to keep refs for our floating
31  * src and sink pads we create; otherwise we always have to do get_pad,
32  * get_peer, and then remove references in every test function */
33 static GstPad *mysrcpad, *mysinkpad;
34 /* we also have a list of src buffers */
35 static GList *inbuffers = NULL;
36 static gint num_dropped = 0;
37
38 #define RTP_CAPS_STRING    \
39     "application/x-rtp, "               \
40     "media = (string)audio, "           \
41     "payload = (int) 0, "               \
42     "clock-rate = (int) 8000, "         \
43     "encoding-name = (string)PCMU"
44
45 #define RTP_FRAME_SIZE 20
46
47 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
48     GST_PAD_SINK,
49     GST_PAD_ALWAYS,
50     GST_STATIC_CAPS ("application/x-rtp")
51     );
52 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
53     GST_PAD_SRC,
54     GST_PAD_ALWAYS,
55     GST_STATIC_CAPS ("application/x-rtp, "
56         "clock-rate = (int) [ 1, 2147483647 ]")
57     );
58
59 static void
60 buffer_dropped (gpointer data, GstMiniObject * obj)
61 {
62   GST_DEBUG ("dropping buffer %p", obj);
63   num_dropped++;
64 }
65
66 static GstElement *
67 setup_jitterbuffer (gint num_buffers)
68 {
69   GstElement *jitterbuffer;
70   GstClock *clock;
71   GstBuffer *buffer;
72   GstCaps *caps;
73   /* a 20 sample audio block (2,5 ms) generated with
74    * gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 !
75    *    "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay !
76    *     fakesink dump=1
77    */
78   guint8 in[] = {               /* first 4 bytes are rtp-header, next 4 bytes are timestamp */
79     0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b,
80     0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
81     0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
82   };
83   GstClockTime ts = G_GUINT64_CONSTANT (0);
84   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
85   /*guint latency = GST_TIME_AS_MSECONDS (num_buffers * tso); */
86   gint i;
87
88   GST_DEBUG ("setup_jitterbuffer");
89   jitterbuffer = gst_check_setup_element ("rtpjitterbuffer");
90   /* we need a clock here */
91   clock = gst_system_clock_obtain ();
92   gst_element_set_clock (jitterbuffer, clock);
93   gst_object_unref (clock);
94   /* setup latency */
95   /* latency would be 7 for 3 buffers here, default is 200
96      g_object_set (G_OBJECT (jitterbuffer), "latency", latency, NULL);
97      GST_INFO_OBJECT (jitterbuffer, "set latency to %u ms", latency);
98    */
99
100   mysrcpad = gst_check_setup_src_pad (jitterbuffer, &srctemplate);
101   mysinkpad = gst_check_setup_sink_pad (jitterbuffer, &sinktemplate);
102   gst_pad_set_active (mysrcpad, TRUE);
103   gst_pad_set_active (mysinkpad, TRUE);
104
105   /* create n buffers */
106   caps = gst_caps_from_string (RTP_CAPS_STRING);
107   gst_check_setup_events (mysrcpad, jitterbuffer, caps, GST_FORMAT_TIME);
108   gst_caps_unref (caps);
109
110   for (i = 0; i < num_buffers; i++) {
111     buffer = gst_buffer_new_and_alloc (sizeof (in));
112     gst_buffer_fill (buffer, 0, in, sizeof (in));
113     GST_BUFFER_DTS (buffer) = ts;
114     GST_BUFFER_PTS (buffer) = ts;
115     GST_BUFFER_DURATION (buffer) = tso;
116     gst_mini_object_weak_ref (GST_MINI_OBJECT (buffer), buffer_dropped, NULL);
117     GST_DEBUG ("created buffer: %p", buffer);
118
119     if (!i)
120       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
121
122     inbuffers = g_list_append (inbuffers, buffer);
123
124     /* hackish way to update the rtp header */
125     in[1] = 0x00;
126     in[3]++;                    /* seqnumber */
127     in[7] += RTP_FRAME_SIZE;    /* inc. timestamp with framesize */
128     ts += tso;
129   }
130   num_dropped = 0;
131
132   return jitterbuffer;
133 }
134
135 static GstStateChangeReturn
136 start_jitterbuffer (GstElement * jitterbuffer)
137 {
138   GstStateChangeReturn ret;
139   GstClockTime now;
140   GstClock *clock;
141
142   clock = gst_element_get_clock (jitterbuffer);
143   now = gst_clock_get_time (clock);
144   gst_object_unref (clock);
145
146   gst_element_set_base_time (jitterbuffer, now);
147   ret = gst_element_set_state (jitterbuffer, GST_STATE_PLAYING);
148
149   return ret;
150 }
151
152 static void
153 cleanup_jitterbuffer (GstElement * jitterbuffer)
154 {
155   GST_DEBUG ("cleanup_jitterbuffer");
156
157   g_list_foreach (buffers, (GFunc) gst_mini_object_unref, NULL);
158   g_list_free (buffers);
159   buffers = NULL;
160
161   g_list_free (inbuffers);
162   inbuffers = NULL;
163
164   gst_pad_set_active (mysrcpad, FALSE);
165   gst_pad_set_active (mysinkpad, FALSE);
166   gst_check_teardown_src_pad (jitterbuffer);
167   gst_check_teardown_sink_pad (jitterbuffer);
168   gst_check_teardown_element (jitterbuffer);
169 }
170
171 static void
172 check_jitterbuffer_results (GstElement * jitterbuffer, gint num_buffers)
173 {
174   GstBuffer *buffer;
175   GList *node;
176   GstClockTime ts = G_GUINT64_CONSTANT (0);
177   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
178   GstMapInfo map;
179   guint16 prev_sn = 0, cur_sn;
180   guint32 prev_ts = 0, cur_ts;
181
182   /* sleep for twice the latency */
183   g_usleep (400 * 1000);
184
185   GST_INFO ("of %d buffer %d/%d received/dropped", num_buffers,
186       g_list_length (buffers), num_dropped);
187   /* if this fails, not all buffers have been processed */
188   fail_unless_equals_int ((g_list_length (buffers) + num_dropped), num_buffers);
189
190   /* check the buffer list */
191   fail_unless_equals_int (g_list_length (buffers), num_buffers);
192   for (node = buffers; node; node = g_list_next (node)) {
193     fail_if ((buffer = (GstBuffer *) node->data) == NULL);
194     fail_if (GST_BUFFER_PTS (buffer) != ts);
195     fail_if (GST_BUFFER_DTS (buffer) != ts);
196     gst_buffer_map (buffer, &map, GST_MAP_READ);
197     cur_sn = ((guint16) map.data[2] << 8) | map.data[3];
198     cur_ts = ((guint32) map.data[4] << 24) | ((guint32) map.data[5] << 16) |
199         ((guint32) map.data[6] << 8) | map.data[7];
200     gst_buffer_unmap (buffer, &map);
201
202     if (node != buffers) {
203       fail_unless (cur_sn > prev_sn);
204       fail_unless (cur_ts > prev_ts);
205
206       prev_sn = cur_sn;
207       prev_ts = cur_ts;
208     }
209     ts += tso;
210   }
211 }
212
213 GST_START_TEST (test_push_forward_seq)
214 {
215   GstElement *jitterbuffer;
216   const guint num_buffers = 3;
217   GstBuffer *buffer;
218   GList *node;
219
220   jitterbuffer = setup_jitterbuffer (num_buffers);
221   fail_unless (start_jitterbuffer (jitterbuffer)
222       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
223
224   /* push buffers: 0,1,2, */
225   for (node = inbuffers; node; node = g_list_next (node)) {
226     buffer = (GstBuffer *) node->data;
227     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
228   }
229
230   /* check the buffer list */
231   check_jitterbuffer_results (jitterbuffer, num_buffers);
232
233   /* cleanup */
234   cleanup_jitterbuffer (jitterbuffer);
235 }
236
237 GST_END_TEST;
238
239 GST_START_TEST (test_push_backward_seq)
240 {
241   GstElement *jitterbuffer;
242   const guint num_buffers = 4;
243   GstBuffer *buffer;
244   GList *node;
245
246   jitterbuffer = setup_jitterbuffer (num_buffers);
247   fail_unless (start_jitterbuffer (jitterbuffer)
248       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
249
250   /* push buffers: 0,3,2,1 */
251   buffer = (GstBuffer *) inbuffers->data;
252   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
253   for (node = g_list_last (inbuffers); node != inbuffers;
254       node = g_list_previous (node)) {
255     buffer = (GstBuffer *) node->data;
256     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
257   }
258
259   /* check the buffer list */
260   check_jitterbuffer_results (jitterbuffer, num_buffers);
261
262   /* cleanup */
263   cleanup_jitterbuffer (jitterbuffer);
264 }
265
266 GST_END_TEST;
267
268 GST_START_TEST (test_push_unordered)
269 {
270   GstElement *jitterbuffer;
271   const guint num_buffers = 4;
272   GstBuffer *buffer;
273
274   jitterbuffer = setup_jitterbuffer (num_buffers);
275   fail_unless (start_jitterbuffer (jitterbuffer)
276       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
277
278   /* push buffers; 0,2,1,3 */
279   buffer = (GstBuffer *) inbuffers->data;
280   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
281   buffer = g_list_nth_data (inbuffers, 2);
282   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
283   buffer = g_list_nth_data (inbuffers, 1);
284   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
285   buffer = g_list_nth_data (inbuffers, 3);
286   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
287
288   /* check the buffer list */
289   check_jitterbuffer_results (jitterbuffer, num_buffers);
290
291   /* cleanup */
292   cleanup_jitterbuffer (jitterbuffer);
293 }
294
295 GST_END_TEST;
296
297 GST_START_TEST (test_basetime)
298 {
299   GstElement *jitterbuffer;
300   const guint num_buffers = 3;
301   GstBuffer *buffer;
302   GList *node;
303   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
304
305   jitterbuffer = setup_jitterbuffer (num_buffers);
306   fail_unless (start_jitterbuffer (jitterbuffer)
307       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
308
309   /* push buffers: 2,1,0 */
310   for (node = g_list_last (inbuffers); node; node = g_list_previous (node)) {
311     buffer = (GstBuffer *) node->data;
312     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
313   }
314
315   /* sleep for twice the latency */
316   g_usleep (400 * 1000);
317
318   /* if this fails, not all buffers have been processed */
319   fail_unless_equals_int ((g_list_length (buffers) + num_dropped), num_buffers);
320
321   buffer = (GstBuffer *) buffers->data;
322   fail_unless (GST_BUFFER_DTS (buffer) != (num_buffers * tso));
323   fail_unless (GST_BUFFER_PTS (buffer) != (num_buffers * tso));
324
325   /* cleanup */
326   cleanup_jitterbuffer (jitterbuffer);
327 }
328
329 GST_END_TEST;
330
331 static GstCaps *
332 request_pt_map (GstElement * jitterbuffer, guint pt)
333 {
334   fail_unless (pt == 0);
335
336   return gst_caps_from_string (RTP_CAPS_STRING);
337 }
338
339 GST_START_TEST (test_clear_pt_map)
340 {
341   GstElement *jitterbuffer;
342   const guint num_buffers = 10;
343   gint i;
344   GstBuffer *buffer;
345   GList *node;
346
347   jitterbuffer = setup_jitterbuffer (num_buffers);
348   fail_unless (start_jitterbuffer (jitterbuffer)
349       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
350
351   g_signal_connect (jitterbuffer, "request-pt-map", (GCallback)
352       request_pt_map, NULL);
353
354   /* push buffers: 0,1,2, */
355   for (node = inbuffers, i = 0; node && i < 3; node = g_list_next (node), i++) {
356     buffer = (GstBuffer *) node->data;
357     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
358   }
359
360   g_usleep (400 * 1000);
361
362   g_signal_emit_by_name (jitterbuffer, "clear-pt-map", NULL);
363
364   for (; node && i < 10; node = g_list_next (node), i++) {
365     buffer = (GstBuffer *) node->data;
366     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
367   }
368
369   /* check the buffer list */
370   check_jitterbuffer_results (jitterbuffer, num_buffers);
371
372   /* cleanup */
373   cleanup_jitterbuffer (jitterbuffer);
374 }
375
376 GST_END_TEST;
377 static const guint payload_size = 160;
378 static const guint clock_rate = 8000;
379 static const guint pcmu_payload_type = 0;
380 static const guint test_ssrc = 0x01BADBAD;
381
382 typedef struct
383 {
384   GstElement *jitter_buffer;
385   GstPad *test_sink_pad, *test_src_pad;
386   GstClock *clock;
387   GAsyncQueue *buf_queue;
388   GAsyncQueue *sink_event_queue;
389   GAsyncQueue *src_event_queue;
390   gint lost_event_count;
391   gint rtx_event_count;
392 } TestData;
393
394 static GstCaps *
395 generate_caps (void)
396 {
397   return gst_caps_new_simple ("application/x-rtp",
398       "media", G_TYPE_STRING, "audio",
399       "clock-rate", G_TYPE_INT, clock_rate,
400       "encoding-name", G_TYPE_STRING, "PCMU",
401       "payload", G_TYPE_INT, pcmu_payload_type,
402       "ssrc", G_TYPE_UINT, test_ssrc, NULL);
403 }
404
405 static GstBuffer *
406 generate_test_buffer (GstClockTime gst_ts,
407     gboolean marker_bit, guint seq_num, guint32 rtp_ts)
408 {
409   GstBuffer *buf;
410   guint8 *payload;
411   guint i;
412   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
413
414   buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
415   GST_BUFFER_DTS (buf) = gst_ts;
416   GST_BUFFER_PTS (buf) = gst_ts;
417
418   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
419   gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type);
420   gst_rtp_buffer_set_marker (&rtp, marker_bit);
421   gst_rtp_buffer_set_seq (&rtp, seq_num);
422   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
423   gst_rtp_buffer_set_ssrc (&rtp, test_ssrc);
424
425   payload = gst_rtp_buffer_get_payload (&rtp);
426   for (i = 0; i < payload_size; i++)
427     payload[i] = 0xff;
428
429   gst_rtp_buffer_unmap (&rtp);
430
431   return buf;
432 }
433
434 static GstFlowReturn
435 test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
436 {
437   TestData *data = gst_pad_get_element_private (pad);
438   g_async_queue_push (data->buf_queue, buffer);
439   return GST_FLOW_OK;
440 }
441
442 static gboolean
443 test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
444 {
445   TestData *data = gst_pad_get_element_private (pad);
446   const GstStructure *structure = gst_event_get_structure (event);
447
448   GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
449
450   if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0) {
451     data->lost_event_count++;
452     GST_DEBUG ("lost event count %d", data->lost_event_count);
453   }
454
455   g_async_queue_push (data->sink_event_queue, event);
456   return TRUE;
457 }
458
459 static gboolean
460 test_src_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
461 {
462   TestData *data = gst_pad_get_element_private (pad);
463   const GstStructure *structure = gst_event_get_structure (event);
464
465   GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
466
467   if (structure
468       && strcmp (gst_structure_get_name (structure),
469           "GstRTPRetransmissionRequest") == 0) {
470     data->rtx_event_count++;
471     GST_DEBUG ("rtx event count %d", data->rtx_event_count);
472   }
473
474   g_async_queue_push (data->src_event_queue, event);
475   return TRUE;
476 }
477
478 static void
479 setup_testharness (TestData * data)
480 {
481   GstPad *jb_sink_pad, *jb_src_pad;
482   GstSegment seg;
483   GstMiniObject *obj;
484
485   /* create the testclock */
486   data->clock = gst_test_clock_new ();
487   g_assert (data->clock);
488   gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
489
490   /* rig up the jitter buffer */
491   data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
492   g_assert (data->jitter_buffer);
493   gst_element_set_clock (data->jitter_buffer, data->clock);
494   g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL);
495   g_assert_cmpint (gst_element_set_state (data->jitter_buffer,
496           GST_STATE_PLAYING), !=, GST_STATE_CHANGE_FAILURE);
497
498   /* set up the buf and event queues */
499   data->buf_queue =
500       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
501   data->sink_event_queue =
502       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
503   data->src_event_queue =
504       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
505
506   data->lost_event_count = 0;
507   data->rtx_event_count = 0;
508
509   /* link in the test source-pad */
510   data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC);
511   gst_pad_set_element_private (data->test_src_pad, data);
512   gst_pad_set_event_function (data->test_src_pad, test_src_pad_event_cb);
513   jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink");
514   g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==,
515       GST_PAD_LINK_OK);
516   gst_object_unref (jb_sink_pad);
517
518   /* link in the test sink-pad */
519   data->test_sink_pad = gst_pad_new ("sink", GST_PAD_SINK);
520   gst_pad_set_element_private (data->test_sink_pad, data);
521   gst_pad_set_caps (data->test_sink_pad, generate_caps ());
522   gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb);
523   gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb);
524   jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src");
525   g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==,
526       GST_PAD_LINK_OK);
527   gst_object_unref (jb_src_pad);
528
529   g_assert (gst_pad_set_active (data->test_src_pad, TRUE));
530   g_assert (gst_pad_set_active (data->test_sink_pad, TRUE));
531
532   gst_segment_init (&seg, GST_FORMAT_TIME);
533
534   gst_pad_push_event (data->test_src_pad,
535       gst_event_new_stream_start ("stream0"));
536   gst_pad_set_caps (data->test_src_pad, generate_caps ());
537   gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
538
539   while ((obj = g_async_queue_try_pop (data->sink_event_queue)))
540     gst_mini_object_unref (obj);
541 }
542
543 static void
544 destroy_testharness (TestData * data)
545 {
546   /* clean up */
547   g_assert_cmpint (gst_element_set_state (data->jitter_buffer, GST_STATE_NULL),
548       ==, GST_STATE_CHANGE_SUCCESS);
549   gst_object_unref (data->jitter_buffer);
550   data->jitter_buffer = NULL;
551
552   gst_object_unref (data->test_src_pad);
553   data->test_src_pad = NULL;
554
555   gst_object_unref (data->test_sink_pad);
556   data->test_sink_pad = NULL;
557
558   gst_object_unref (data->clock);
559   data->clock = NULL;
560
561   g_async_queue_unref (data->buf_queue);
562   data->buf_queue = NULL;
563
564   g_async_queue_unref (data->sink_event_queue);
565   data->sink_event_queue = NULL;
566   g_async_queue_unref (data->src_event_queue);
567   data->src_event_queue = NULL;
568
569   data->lost_event_count = 0;
570 }
571
572 static void
573 verify_lost_event (GstEvent * event, guint32 expected_seqnum,
574     GstClockTime expected_timestamp, GstClockTime expected_duration,
575     gboolean expected_late)
576 {
577   const GstStructure *s = gst_event_get_structure (event);
578   const GValue *value;
579   guint32 seqnum;
580   GstClockTime timestamp;
581   GstClockTime duration;
582   gboolean late;
583
584   g_assert (gst_structure_get_uint (s, "seqnum", &seqnum));
585
586   value = gst_structure_get_value (s, "timestamp");
587   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
588   timestamp = g_value_get_uint64 (value);
589
590   value = gst_structure_get_value (s, "duration");
591   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
592   duration = g_value_get_uint64 (value);
593
594   g_assert (gst_structure_get_boolean (s, "late", &late));
595
596   g_assert_cmpint (seqnum, ==, expected_seqnum);
597   g_assert_cmpint (timestamp, ==, expected_timestamp);
598   g_assert_cmpint (duration, ==, expected_duration);
599   g_assert (late == expected_late);
600 }
601
602 static void
603 verify_rtx_event (GstEvent * event, guint32 expected_seqnum,
604     GstClockTime expected_timestamp, guint expected_delay,
605     GstClockTime expected_spacing)
606 {
607   const GstStructure *s = gst_event_get_structure (event);
608   const GValue *value;
609   guint32 seqnum;
610   GstClockTime timestamp, spacing;
611   guint delay;
612
613   g_assert (gst_structure_get_uint (s, "seqnum", &seqnum));
614
615   value = gst_structure_get_value (s, "running-time");
616   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
617   timestamp = g_value_get_uint64 (value);
618
619   value = gst_structure_get_value (s, "delay");
620   g_assert (value && G_VALUE_HOLDS_UINT (value));
621   delay = g_value_get_uint (value);
622
623   value = gst_structure_get_value (s, "packet-spacing");
624   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
625   spacing = g_value_get_uint64 (value);
626
627   g_assert_cmpint (seqnum, ==, expected_seqnum);
628   g_assert_cmpint (timestamp, ==, expected_timestamp);
629   g_assert_cmpint (delay, ==, expected_delay);
630   g_assert_cmpint (spacing, ==, expected_spacing);
631 }
632
633 GST_START_TEST (test_only_one_lost_event_on_large_gaps)
634 {
635   TestData data;
636   GstClockID id, test_id;
637   GstBuffer *in_buf, *out_buf;
638   GstEvent *out_event;
639   gint jb_latency_ms = 200;
640   guint buffer_size_ms = (payload_size * 1000) / clock_rate;
641   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
642
643   setup_testharness (&data);
644
645   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
646
647   /* push the first buffer in */
648   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
649   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
650   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
651
652   /* wait for the first buffer to be synced to timestamp + latency */
653   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
654
655   /* increase the time to timestamp + latency and release the wait */
656   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
657       jb_latency_ms * GST_MSECOND);
658   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
659       == id);
660
661   /* check for the buffer coming out that was pushed in */
662   out_buf = g_async_queue_pop (data.buf_queue);
663   g_assert (out_buf != NULL);
664   g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, 0);
665   g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, 0);
666
667   /* move time ahead 10 seconds */
668   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
669
670   /* wait a bit */
671   g_usleep (G_USEC_PER_SEC / 10);
672
673   /* check that no buffers have been pushed out and no pending waits */
674   g_assert_cmpint (g_async_queue_length (data.buf_queue), ==, 0);
675   g_assert (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock),
676           &id) == FALSE);
677
678   /* a buffer now arrives perfectly on time */
679   in_buf = generate_test_buffer (10 * GST_SECOND, FALSE, 500, 500 * 160);
680   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
681   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
682
683   /* release the wait */
684   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
685   gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20);
686   test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
687   g_assert (id == test_id);
688
689   /* we should now receive a packet-lost-event for buffers 1 through 489 */
690   out_event = g_async_queue_pop (data.sink_event_queue);
691   g_assert (out_event != NULL);
692   g_assert_cmpint (data.lost_event_count, ==, 1);
693   verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490,
694       TRUE);
695
696   /* churn through sync_times until the new buffer gets pushed out */
697   while (g_async_queue_length (data.buf_queue) < 1) {
698     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
699       GstClockTime t = gst_clock_id_get_time (id);
700       if (t > gst_clock_get_time (data.clock)) {
701         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
702       }
703       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
704     }
705   }
706
707   out_buf = g_async_queue_pop (data.buf_queue);
708   g_assert (out_buf != NULL);
709   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
710   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
711   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500);
712   gst_rtp_buffer_unmap (&rtp);
713   g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, (10 * GST_SECOND));
714   g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, (10 * GST_SECOND));
715
716   /* we get as many lost events as the the number of buffers the jitterbuffer
717    * is able to wait for (+ the one we already got) */
718   g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms);
719
720   destroy_testharness (&data);
721 }
722
723 GST_END_TEST;
724
725 GST_START_TEST (test_two_lost_one_arrives_in_time)
726 {
727   TestData data;
728   GstClockID id;
729   GstBuffer *in_buf, *out_buf;
730   GstEvent *out_event;
731   gint jb_latency_ms = 100;
732   GstClockTime buffer_time, now;
733   gint b;
734   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
735
736   setup_testharness (&data);
737
738   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
739
740   /* push the first buffer in */
741   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
742   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
743   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
744   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
745   now = jb_latency_ms * GST_MSECOND;
746   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), now);
747   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
748       == id);
749   out_buf = g_async_queue_pop (data.buf_queue);
750   g_assert (out_buf != NULL);
751
752   /* push some buffers arriving in perfect time! */
753   for (b = 1; b < 3; b++) {
754     buffer_time = b * GST_MSECOND * 20;
755     in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
756     gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), now + buffer_time);
757     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
758
759     /* check for the buffer coming out that was pushed in */
760     out_buf = g_async_queue_pop (data.buf_queue);
761     g_assert (out_buf != NULL);
762     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, buffer_time);
763     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, buffer_time);
764   }
765
766   /* hop over 2 packets and make another one (gap of 2) */
767   b = 5;
768   buffer_time = b * GST_MSECOND * 20;
769   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
770   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
771
772   /* verify that the jitterbuffer now wait for the latest moment it can push */
773   /* the first lost buffer (buffer 3) out on (buffer-timestamp (60) + latency (10) = 70) */
774   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
775   g_assert_cmpint (gst_clock_id_get_time (id), ==,
776       (3 * GST_MSECOND * 20) + (jb_latency_ms * GST_MSECOND));
777
778   /* let the time expire... */
779   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
780       gst_clock_id_get_time (id));
781   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
782       == id);
783
784   /* we should now receive a packet-lost-event for buffer 3 */
785   out_event = g_async_queue_pop (data.sink_event_queue);
786   g_assert (out_event != NULL);
787   g_assert_cmpint (data.lost_event_count, ==, 1);
788   verify_lost_event (out_event, 3, 3 * GST_MSECOND * 20, GST_MSECOND * 20,
789       FALSE);
790
791   /* buffer 4 now arrives just in time (time is 70, buffer 4 expires at 90) */
792   b = 4;
793   buffer_time = b * GST_MSECOND * 20;
794   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
795   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
796
797   /* verify that buffer 4 made it through! */
798   out_buf = g_async_queue_pop (data.buf_queue);
799   g_assert (out_buf != NULL);
800   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
801   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
802   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 4);
803   gst_rtp_buffer_unmap (&rtp);
804
805   /* and see that buffer 5 now arrives in a normal fashion */
806   out_buf = g_async_queue_pop (data.buf_queue);
807   g_assert (out_buf != NULL);
808   g_assert (!GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
809   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
810   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
811   gst_rtp_buffer_unmap (&rtp);
812
813   /* should still have only seen 1 packet lost event */
814   g_assert_cmpint (data.lost_event_count, ==, 1);
815
816   destroy_testharness (&data);
817 }
818
819 GST_END_TEST;
820
821 GST_START_TEST (test_late_packets_still_makes_lost_events)
822 {
823   TestData data;
824   GstClockID id;
825   GstBuffer *in_buf, *out_buf;
826   GstEvent *out_event;
827   gint jb_latency_ms = 10;
828   GstClockTime buffer_time;
829   gint b;
830   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
831
832   setup_testharness (&data);
833
834   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
835
836   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
837
838   /* push the first buffer in */
839   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
840   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
841
842   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
843   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
844       == id);
845   out_buf = g_async_queue_pop (data.buf_queue);
846   g_assert (out_buf != NULL);
847
848   /* push some buffers in! */
849   for (b = 1; b < 3; b++) {
850     buffer_time = b * GST_MSECOND * 20;
851     in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
852     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
853
854     /* check for the buffer coming out that was pushed in */
855     out_buf = g_async_queue_pop (data.buf_queue);
856     g_assert (out_buf != NULL);
857     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, buffer_time);
858     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, buffer_time);
859   }
860
861   /* hop over 2 packets and make another one (gap of 2) */
862   b = 5;
863   buffer_time = b * GST_MSECOND * 20;
864   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
865   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
866
867   /* we should now receive a packet-lost-event for buffer 3 and 4 */
868   out_event = g_async_queue_pop (data.sink_event_queue);
869   g_assert (out_event != NULL);
870   g_assert_cmpint (data.lost_event_count, ==, 1);
871   verify_lost_event (out_event, 3, 3 * GST_MSECOND * 20, GST_MSECOND * 20 * 2,
872       TRUE);
873
874   /* verify that buffer 5 made it through! */
875   out_buf = g_async_queue_pop (data.buf_queue);
876   g_assert (out_buf != NULL);
877   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
878   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
879   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
880   gst_rtp_buffer_unmap (&rtp);
881
882   /* should still have only seen 1 packet lost event */
883   g_assert_cmpint (data.lost_event_count, ==, 1);
884
885   destroy_testharness (&data);
886 }
887
888 GST_END_TEST;
889
890 GST_START_TEST (test_all_packets_are_timestamped_zero)
891 {
892   TestData data;
893   GstClockID id;
894   GstBuffer *in_buf, *out_buf;
895   GstEvent *out_event;
896   gint jb_latency_ms = 10;
897   gint b;
898   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
899
900   setup_testharness (&data);
901
902   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
903
904   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
905
906   /* push the first buffer in */
907   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
908   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
909
910   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
911   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
912       == id);
913   out_buf = g_async_queue_pop (data.buf_queue);
914   g_assert (out_buf != NULL);
915
916   /* push some buffers in! */
917   for (b = 1; b < 3; b++) {
918     in_buf = generate_test_buffer (0, TRUE, b, 0);
919     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
920
921     /* check for the buffer coming out that was pushed in */
922     out_buf = g_async_queue_pop (data.buf_queue);
923     g_assert (out_buf != NULL);
924     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, 0);
925     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, 0);
926   }
927
928   /* hop over 2 packets and make another one (gap of 2) */
929   b = 5;
930   in_buf = generate_test_buffer (0, TRUE, b, 0);
931   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
932
933   /* we should now receive a packet-lost-event for buffer 3 and 4 */
934   out_event = g_async_queue_pop (data.sink_event_queue);
935   g_assert (out_event != NULL);
936   verify_lost_event (out_event, 3, 0, 0, FALSE);
937
938   out_event = g_async_queue_pop (data.sink_event_queue);
939   g_assert (out_event != NULL);
940   verify_lost_event (out_event, 4, 0, 0, FALSE);
941
942   g_assert_cmpint (data.lost_event_count, ==, 2);
943
944   /* verify that buffer 5 made it through! */
945   out_buf = g_async_queue_pop (data.buf_queue);
946   g_assert (out_buf != NULL);
947   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
948   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
949   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
950   gst_rtp_buffer_unmap (&rtp);
951
952   /* should still have only seen 2 packet lost events */
953   g_assert_cmpint (data.lost_event_count, ==, 2);
954
955   destroy_testharness (&data);
956 }
957
958 GST_END_TEST;
959
960 GST_START_TEST (test_rtx_expected_next)
961 {
962   TestData data;
963   GstClockID id, tid;
964   GstBuffer *in_buf, *out_buf;
965   GstEvent *out_event;
966   gint jb_latency_ms = 200;
967
968   setup_testharness (&data);
969   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
970   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
971   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
972
973   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
974
975   /* push the first buffer in */
976   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
977   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
978
979   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
980
981   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
982
983   /* put second buffer, the jitterbuffer should now know that the packet spacing
984    * is 20ms and should ask for retransmission of seqnum 2 in 20ms */
985   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
986   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
987
988   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
989   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 60 * GST_MSECOND);
990   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
991       == id);
992
993   out_event = g_async_queue_pop (data.src_event_queue);
994   g_assert (out_event != NULL);
995   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 20, 20 * GST_MSECOND);
996
997   /* now we wait for the next timeout */
998   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
999   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 100 * GST_MSECOND);
1000   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1001   g_assert (id == tid);
1002
1003   out_event = g_async_queue_pop (data.src_event_queue);
1004   g_assert (out_event != NULL);
1005   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 60, 20 * GST_MSECOND);
1006
1007   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1008   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 140 * GST_MSECOND);
1009   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1010   g_assert (id == tid);
1011
1012   out_event = g_async_queue_pop (data.src_event_queue);
1013   g_assert (out_event != NULL);
1014   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 100, 20 * GST_MSECOND);
1015
1016   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1017   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 200 * GST_MSECOND);
1018   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1019   g_assert (id == tid);
1020
1021   out_buf = g_async_queue_pop (data.buf_queue);
1022   g_assert (out_buf != NULL);
1023
1024
1025   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1026   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 260 * GST_MSECOND);
1027   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
1028       == id);
1029
1030   /* we should now receive a packet-lost-event for buffer 2 */
1031   out_event = g_async_queue_pop (data.sink_event_queue);
1032   g_assert (out_event != NULL);
1033   verify_lost_event (out_event, 2, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1034
1035   destroy_testharness (&data);
1036 }
1037
1038 GST_END_TEST;
1039
1040 GST_START_TEST (test_rtx_two_missing)
1041 {
1042   TestData data;
1043   GstClockID id, tid;
1044   GstBuffer *in_buf, *out_buf;
1045   GstEvent *out_event;
1046   gint jb_latency_ms = 200;
1047   gint i;
1048   GstStructure *rtx_stats;
1049   const GValue *rtx_stat;
1050   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1051
1052   setup_testharness (&data);
1053   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
1054   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
1055   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
1056
1057   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
1058
1059   /* push the first buffer in */
1060   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
1061   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1062
1063   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
1064
1065   /* put second buffer, the jitterbuffer should now know that the packet spacing
1066    * is 20ms and should ask for retransmission of seqnum 2 at 60ms */
1067   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
1068   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1069
1070   /* push buffer 4, 2 and 3 are missing now, we should get retransmission events
1071    * for 3 at 100ms*/
1072   in_buf = generate_test_buffer (80 * GST_MSECOND, TRUE, 4, 4 * 160);
1073   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1074
1075   /* wait for first retransmission request */
1076   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 60 * GST_MSECOND);
1077   do {
1078     gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1079   } while (id !=
1080       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)));
1081
1082   /* we should have 2 events now, one for 2 and another for 3 */
1083   out_event = g_async_queue_pop (data.src_event_queue);
1084   g_assert (out_event != NULL);
1085   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 20, 20 * GST_MSECOND);
1086   out_event = g_async_queue_pop (data.src_event_queue);
1087   g_assert (out_event != NULL);
1088   verify_rtx_event (out_event, 3, 60 * GST_MSECOND, 0, 20 * GST_MSECOND);
1089
1090   /* now we wait for the next timeout */
1091   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1092   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 100 * GST_MSECOND);
1093   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1094   g_assert (id == tid);
1095
1096   /* we should have 2 events now, one for 2 and another for 3 */
1097   out_event = g_async_queue_pop (data.src_event_queue);
1098   g_assert (out_event != NULL);
1099   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 60, 20 * GST_MSECOND);
1100   out_event = g_async_queue_pop (data.src_event_queue);
1101   g_assert (out_event != NULL);
1102   verify_rtx_event (out_event, 3, 60 * GST_MSECOND, 40, 20 * GST_MSECOND);
1103
1104   /* make buffer 3 */
1105   in_buf = generate_test_buffer (60 * GST_MSECOND, TRUE, 3, 3 * 160);
1106   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1107
1108   /* make more buffers */
1109   for (i = 5; i < 15; i++) {
1110     in_buf = generate_test_buffer (i * 20 * GST_MSECOND, TRUE, i, i * 160);
1111     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1112   }
1113
1114   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1115   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 140 * GST_MSECOND);
1116   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1117   g_assert (id == tid);
1118
1119   /* now we only get requests for 2 */
1120   out_event = g_async_queue_pop (data.src_event_queue);
1121   g_assert (out_event != NULL);
1122   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 100, 20 * GST_MSECOND);
1123
1124   /* this is when buffer 0 deadline expires */
1125   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1126   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 200 * GST_MSECOND);
1127   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1128   g_assert (id == tid);
1129
1130   for (i = 0; i < 2; i++) {
1131     GST_DEBUG ("popping %d", i);
1132     out_buf = g_async_queue_pop (data.buf_queue);
1133     g_assert (out_buf != NULL);
1134     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1135     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1136     gst_rtp_buffer_unmap (&rtp);
1137   }
1138
1139   /* this is when 2 is lost */
1140   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1141   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 240 * GST_MSECOND);
1142   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1143   g_assert (id == tid);
1144
1145   /* we should now receive a packet-lost-event for buffer 2 */
1146   out_event = g_async_queue_pop (data.sink_event_queue);
1147   g_assert (out_event != NULL);
1148   verify_lost_event (out_event, 2, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1149
1150   /* verify that buffers made it through! */
1151   for (i = 3; i < 15; i++) {
1152     GST_DEBUG ("popping %d", i);
1153     out_buf = g_async_queue_pop (data.buf_queue);
1154     g_assert (out_buf != NULL);
1155     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1156     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1157     gst_rtp_buffer_unmap (&rtp);
1158   }
1159   /* should still have only seen 1 packet lost events */
1160   g_assert_cmpint (data.lost_event_count, ==, 1);
1161
1162   g_object_get (data.jitter_buffer, "stats", &rtx_stats, NULL);
1163
1164   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-count");
1165   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 5);
1166
1167   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-success-count");
1168   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 1);
1169
1170   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-rtt");
1171   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 0);
1172
1173   destroy_testharness (&data);
1174 }
1175
1176 GST_END_TEST;
1177
1178 GST_START_TEST (test_rtx_packet_delay)
1179 {
1180   TestData data;
1181   GstClockID id, tid;
1182   GstBuffer *in_buf, *out_buf;
1183   GstEvent *out_event;
1184   gint jb_latency_ms = 200;
1185   gint i;
1186   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1187
1188   setup_testharness (&data);
1189   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
1190   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
1191   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
1192
1193   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
1194
1195   /* push the first buffer in */
1196   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
1197   GST_BUFFER_FLAG_SET (in_buf, GST_BUFFER_FLAG_DISCONT);
1198   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1199
1200   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
1201
1202   /* put second buffer, the jitterbuffer should now know that the packet spacing
1203    * is 20ms and should ask for retransmission of seqnum 2 at 60ms */
1204   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
1205   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1206
1207   /* push buffer 8, 2 -> 7 are missing now. note that the rtp time is the same
1208    * as packet 1 because it was part of a fragmented payload. This means that
1209    * the estimate for 2 could be refined now to 20ms. also packet 2, 3 and 4 are
1210    * exceeding the max allowed reorder distance and should request a
1211    * retransmission right away */
1212   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 8, 8 * 160);
1213   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1214
1215   /* we should now receive retransmission requests for 2 -> 5 */
1216   out_event = g_async_queue_pop (data.src_event_queue);
1217   g_assert (out_event != NULL);
1218   verify_rtx_event (out_event, 2, 20 * GST_MSECOND, 40, 20 * GST_MSECOND);
1219
1220   for (i = 3; i < 5; i++) {
1221     GST_DEBUG ("popping %d", i);
1222     out_event = g_async_queue_pop (data.src_event_queue);
1223     g_assert (out_event != NULL);
1224     verify_rtx_event (out_event, i, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1225   }
1226   g_assert_cmpint (data.rtx_event_count, ==, 3);
1227
1228   /* push 9, this should immediately request retransmission of 5 */
1229   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 9, 9 * 160);
1230   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1231
1232   /* we should now receive retransmission requests for 5 */
1233   out_event = g_async_queue_pop (data.src_event_queue);
1234   g_assert (out_event != NULL);
1235   verify_rtx_event (out_event, 5, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1236
1237   /* wait for timeout for rtx 6 -> 7 */
1238   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1239   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1240   g_assert (id == tid);
1241
1242   for (i = 6; i < 8; i++) {
1243     GST_DEBUG ("popping %d", i);
1244     out_event = g_async_queue_pop (data.src_event_queue);
1245     g_assert (out_event != NULL);
1246     verify_rtx_event (out_event, i, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1247   }
1248
1249   /* churn through sync_times until the new buffer gets pushed out */
1250   while (g_async_queue_length (data.buf_queue) < 1) {
1251     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
1252       GstClockTime t = gst_clock_id_get_time (id);
1253       if (t > gst_clock_get_time (data.clock)) {
1254         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
1255       }
1256       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1257     }
1258   }
1259
1260   /* verify that buffer 0 and 1 made it through! */
1261   for (i = 0; i < 2; i++) {
1262     out_buf = g_async_queue_pop (data.buf_queue);
1263     g_assert (out_buf != NULL);
1264     if (i == 0)
1265       g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
1266     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1267     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1268     gst_rtp_buffer_unmap (&rtp);
1269   }
1270
1271   /* churn through sync_times until the next buffer gets pushed out */
1272   while (g_async_queue_length (data.buf_queue) < 1) {
1273     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
1274       GstClockTime t = gst_clock_id_get_time (id);
1275       if (t >= 240 * GST_MSECOND)
1276         break;
1277       if (t > gst_clock_get_time (data.clock)) {
1278         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
1279       }
1280       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1281     }
1282   }
1283
1284   for (i = 2; i < 8; i++) {
1285     GST_DEBUG ("popping lost event %d", i);
1286     out_event = g_async_queue_pop (data.sink_event_queue);
1287     g_assert (out_event != NULL);
1288     verify_lost_event (out_event, i, 20 * GST_MSECOND, 0, FALSE);
1289   }
1290
1291   /* verify that buffer 8 made it through! */
1292   for (i = 8; i < 10; i++) {
1293     GST_DEBUG ("popping buffer %d", i);
1294     out_buf = g_async_queue_pop (data.buf_queue);
1295     g_assert (out_buf != NULL);
1296     if (i == 8)
1297       g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
1298     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1299     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1300     gst_rtp_buffer_unmap (&rtp);
1301   }
1302
1303   GST_DEBUG ("waiting for 240ms");
1304   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1305   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 240 * GST_MSECOND);
1306   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1307   g_assert (id == tid);
1308
1309   GST_DEBUG ("popping lost event 10");
1310   out_event = g_async_queue_pop (data.sink_event_queue);
1311   g_assert (out_event != NULL);
1312   verify_lost_event (out_event, 10, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1313
1314   /* should have seen 6 packet lost events */
1315   g_assert_cmpint (data.lost_event_count, ==, 7);
1316   g_assert_cmpint (data.rtx_event_count, ==, 26);
1317
1318   destroy_testharness (&data);
1319 }
1320
1321 GST_END_TEST;
1322
1323 static Suite *
1324 rtpjitterbuffer_suite (void)
1325 {
1326   Suite *s = suite_create ("rtpjitterbuffer");
1327   TCase *tc_chain = tcase_create ("general");
1328
1329   suite_add_tcase (s, tc_chain);
1330   tcase_add_test (tc_chain, test_push_forward_seq);
1331   tcase_add_test (tc_chain, test_push_backward_seq);
1332   tcase_add_test (tc_chain, test_push_unordered);
1333   tcase_add_test (tc_chain, test_basetime);
1334   tcase_add_test (tc_chain, test_clear_pt_map);
1335   tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps);
1336   tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time);
1337   tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events);
1338   tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero);
1339   tcase_add_test (tc_chain, test_rtx_expected_next);
1340   tcase_add_test (tc_chain, test_rtx_two_missing);
1341   tcase_add_test (tc_chain, test_rtx_packet_delay);
1342
1343   return s;
1344 }
1345
1346 GST_CHECK_MAIN (rtpjitterbuffer);