rtpjitterbuffer: serialize events in the buffer
[platform/upstream/gst-plugins-good.git] / tests / check / elements / rtpjitterbuffer.c
1 /* GStreamer
2  *
3  * Copyright (C) 2009 Nokia Corporation and its subsidary(-ies)
4  *               contact: <stefan.kost@nokia.com>
5  * Copyright (C) 2012 Cisco Systems, Inc
6  *               Authors: Kelley Rogers <kelro@cisco.com>
7  *               Havard Graff <hgraff@cisco.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #include <gst/check/gstcheck.h>
26 #include <gst/check/gsttestclock.h>
27
28 #include <gst/rtp/gstrtpbuffer.h>
29
30 /* For ease of programming we use globals to keep refs for our floating
31  * src and sink pads we create; otherwise we always have to do get_pad,
32  * get_peer, and then remove references in every test function */
33 static GstPad *mysrcpad, *mysinkpad;
34 /* we also have a list of src buffers */
35 static GList *inbuffers = NULL;
36 static gint num_dropped = 0;
37
38 #define RTP_CAPS_STRING    \
39     "application/x-rtp, "               \
40     "media = (string)audio, "           \
41     "payload = (int) 0, "               \
42     "clock-rate = (int) 8000, "         \
43     "encoding-name = (string)PCMU"
44
45 #define RTP_FRAME_SIZE 20
46
47 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
48     GST_PAD_SINK,
49     GST_PAD_ALWAYS,
50     GST_STATIC_CAPS ("application/x-rtp")
51     );
52 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
53     GST_PAD_SRC,
54     GST_PAD_ALWAYS,
55     GST_STATIC_CAPS ("application/x-rtp, "
56         "clock-rate = (int) [ 1, 2147483647 ]")
57     );
58
59 static void
60 buffer_dropped (gpointer data, GstMiniObject * obj)
61 {
62   GST_DEBUG ("dropping buffer %p", obj);
63   num_dropped++;
64 }
65
66 static GstElement *
67 setup_jitterbuffer (gint num_buffers)
68 {
69   GstElement *jitterbuffer;
70   GstClock *clock;
71   GstBuffer *buffer;
72   GstCaps *caps;
73   /* a 20 sample audio block (2,5 ms) generated with
74    * gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 !
75    *    "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay !
76    *     fakesink dump=1
77    */
78   guint8 in[] = {               /* first 4 bytes are rtp-header, next 4 bytes are timestamp */
79     0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b,
80     0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
81     0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
82   };
83   GstClockTime ts = G_GUINT64_CONSTANT (0);
84   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
85   /*guint latency = GST_TIME_AS_MSECONDS (num_buffers * tso); */
86   gint i;
87
88   GST_DEBUG ("setup_jitterbuffer");
89   jitterbuffer = gst_check_setup_element ("rtpjitterbuffer");
90   /* we need a clock here */
91   clock = gst_system_clock_obtain ();
92   gst_element_set_clock (jitterbuffer, clock);
93   gst_object_unref (clock);
94   /* setup latency */
95   /* latency would be 7 for 3 buffers here, default is 200
96      g_object_set (G_OBJECT (jitterbuffer), "latency", latency, NULL);
97      GST_INFO_OBJECT (jitterbuffer, "set latency to %u ms", latency);
98    */
99
100   mysrcpad = gst_check_setup_src_pad (jitterbuffer, &srctemplate);
101   mysinkpad = gst_check_setup_sink_pad (jitterbuffer, &sinktemplate);
102   gst_pad_set_active (mysrcpad, TRUE);
103   gst_pad_set_active (mysinkpad, TRUE);
104
105   /* create n buffers */
106   caps = gst_caps_from_string (RTP_CAPS_STRING);
107   gst_check_setup_events (mysrcpad, jitterbuffer, caps, GST_FORMAT_TIME);
108   gst_caps_unref (caps);
109
110   for (i = 0; i < num_buffers; i++) {
111     buffer = gst_buffer_new_and_alloc (sizeof (in));
112     gst_buffer_fill (buffer, 0, in, sizeof (in));
113     GST_BUFFER_DTS (buffer) = ts;
114     GST_BUFFER_PTS (buffer) = ts;
115     GST_BUFFER_DURATION (buffer) = tso;
116     gst_mini_object_weak_ref (GST_MINI_OBJECT (buffer), buffer_dropped, NULL);
117     GST_DEBUG ("created buffer: %p", buffer);
118
119     if (!i)
120       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
121
122     inbuffers = g_list_append (inbuffers, buffer);
123
124     /* hackish way to update the rtp header */
125     in[1] = 0x00;
126     in[3]++;                    /* seqnumber */
127     in[7] += RTP_FRAME_SIZE;    /* inc. timestamp with framesize */
128     ts += tso;
129   }
130   num_dropped = 0;
131
132   return jitterbuffer;
133 }
134
135 static GstStateChangeReturn
136 start_jitterbuffer (GstElement * jitterbuffer)
137 {
138   GstStateChangeReturn ret;
139   GstClockTime now;
140   GstClock *clock;
141
142   clock = gst_element_get_clock (jitterbuffer);
143   now = gst_clock_get_time (clock);
144   gst_object_unref (clock);
145
146   gst_element_set_base_time (jitterbuffer, now);
147   ret = gst_element_set_state (jitterbuffer, GST_STATE_PLAYING);
148
149   return ret;
150 }
151
152 static void
153 cleanup_jitterbuffer (GstElement * jitterbuffer)
154 {
155   GST_DEBUG ("cleanup_jitterbuffer");
156
157   g_list_foreach (buffers, (GFunc) gst_mini_object_unref, NULL);
158   g_list_free (buffers);
159   buffers = NULL;
160
161   g_list_free (inbuffers);
162   inbuffers = NULL;
163
164   gst_pad_set_active (mysrcpad, FALSE);
165   gst_pad_set_active (mysinkpad, FALSE);
166   gst_check_teardown_src_pad (jitterbuffer);
167   gst_check_teardown_sink_pad (jitterbuffer);
168   gst_check_teardown_element (jitterbuffer);
169 }
170
171 static void
172 check_jitterbuffer_results (GstElement * jitterbuffer, gint num_buffers)
173 {
174   GstBuffer *buffer;
175   GList *node;
176   GstClockTime ts = G_GUINT64_CONSTANT (0);
177   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
178   GstMapInfo map;
179   guint16 prev_sn = 0, cur_sn;
180   guint32 prev_ts = 0, cur_ts;
181
182   /* sleep for twice the latency */
183   g_usleep (400 * 1000);
184
185   GST_INFO ("of %d buffer %d/%d received/dropped", num_buffers,
186       g_list_length (buffers), num_dropped);
187   /* if this fails, not all buffers have been processed */
188   fail_unless_equals_int ((g_list_length (buffers) + num_dropped), num_buffers);
189
190   /* check the buffer list */
191   fail_unless_equals_int (g_list_length (buffers), num_buffers);
192   for (node = buffers; node; node = g_list_next (node)) {
193     fail_if ((buffer = (GstBuffer *) node->data) == NULL);
194     fail_if (GST_BUFFER_PTS (buffer) != ts);
195     fail_if (GST_BUFFER_DTS (buffer) != ts);
196     gst_buffer_map (buffer, &map, GST_MAP_READ);
197     cur_sn = ((guint16) map.data[2] << 8) | map.data[3];
198     cur_ts = ((guint32) map.data[4] << 24) | ((guint32) map.data[5] << 16) |
199         ((guint32) map.data[6] << 8) | map.data[7];
200     gst_buffer_unmap (buffer, &map);
201
202     if (node != buffers) {
203       fail_unless (cur_sn > prev_sn);
204       fail_unless (cur_ts > prev_ts);
205
206       prev_sn = cur_sn;
207       prev_ts = cur_ts;
208     }
209     ts += tso;
210   }
211 }
212
213 GST_START_TEST (test_push_forward_seq)
214 {
215   GstElement *jitterbuffer;
216   const guint num_buffers = 3;
217   GstBuffer *buffer;
218   GList *node;
219
220   jitterbuffer = setup_jitterbuffer (num_buffers);
221   fail_unless (start_jitterbuffer (jitterbuffer)
222       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
223
224   /* push buffers: 0,1,2, */
225   for (node = inbuffers; node; node = g_list_next (node)) {
226     buffer = (GstBuffer *) node->data;
227     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
228   }
229
230   /* check the buffer list */
231   check_jitterbuffer_results (jitterbuffer, num_buffers);
232
233   /* cleanup */
234   cleanup_jitterbuffer (jitterbuffer);
235 }
236
237 GST_END_TEST;
238
239 GST_START_TEST (test_push_backward_seq)
240 {
241   GstElement *jitterbuffer;
242   const guint num_buffers = 4;
243   GstBuffer *buffer;
244   GList *node;
245
246   jitterbuffer = setup_jitterbuffer (num_buffers);
247   fail_unless (start_jitterbuffer (jitterbuffer)
248       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
249
250   /* push buffers: 0,3,2,1 */
251   buffer = (GstBuffer *) inbuffers->data;
252   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
253   for (node = g_list_last (inbuffers); node != inbuffers;
254       node = g_list_previous (node)) {
255     buffer = (GstBuffer *) node->data;
256     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
257   }
258
259   /* check the buffer list */
260   check_jitterbuffer_results (jitterbuffer, num_buffers);
261
262   /* cleanup */
263   cleanup_jitterbuffer (jitterbuffer);
264 }
265
266 GST_END_TEST;
267
268 GST_START_TEST (test_push_unordered)
269 {
270   GstElement *jitterbuffer;
271   const guint num_buffers = 4;
272   GstBuffer *buffer;
273
274   jitterbuffer = setup_jitterbuffer (num_buffers);
275   fail_unless (start_jitterbuffer (jitterbuffer)
276       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
277
278   /* push buffers; 0,2,1,3 */
279   buffer = (GstBuffer *) inbuffers->data;
280   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
281   buffer = g_list_nth_data (inbuffers, 2);
282   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
283   buffer = g_list_nth_data (inbuffers, 1);
284   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
285   buffer = g_list_nth_data (inbuffers, 3);
286   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
287
288   /* check the buffer list */
289   check_jitterbuffer_results (jitterbuffer, num_buffers);
290
291   /* cleanup */
292   cleanup_jitterbuffer (jitterbuffer);
293 }
294
295 GST_END_TEST;
296
297 GST_START_TEST (test_basetime)
298 {
299   GstElement *jitterbuffer;
300   const guint num_buffers = 3;
301   GstBuffer *buffer;
302   GList *node;
303   GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
304
305   jitterbuffer = setup_jitterbuffer (num_buffers);
306   fail_unless (start_jitterbuffer (jitterbuffer)
307       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
308
309   /* push buffers: 2,1,0 */
310   for (node = g_list_last (inbuffers); node; node = g_list_previous (node)) {
311     buffer = (GstBuffer *) node->data;
312     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
313   }
314
315   /* sleep for twice the latency */
316   g_usleep (400 * 1000);
317
318   /* if this fails, not all buffers have been processed */
319   fail_unless_equals_int ((g_list_length (buffers) + num_dropped), num_buffers);
320
321   buffer = (GstBuffer *) buffers->data;
322   fail_unless (GST_BUFFER_DTS (buffer) != (num_buffers * tso));
323   fail_unless (GST_BUFFER_PTS (buffer) != (num_buffers * tso));
324
325   /* cleanup */
326   cleanup_jitterbuffer (jitterbuffer);
327 }
328
329 GST_END_TEST;
330
331 static GstCaps *
332 request_pt_map (GstElement * jitterbuffer, guint pt)
333 {
334   fail_unless (pt == 0);
335
336   return gst_caps_from_string (RTP_CAPS_STRING);
337 }
338
339 GST_START_TEST (test_clear_pt_map)
340 {
341   GstElement *jitterbuffer;
342   const guint num_buffers = 10;
343   gint i;
344   GstBuffer *buffer;
345   GList *node;
346
347   jitterbuffer = setup_jitterbuffer (num_buffers);
348   fail_unless (start_jitterbuffer (jitterbuffer)
349       == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
350
351   g_signal_connect (jitterbuffer, "request-pt-map", (GCallback)
352       request_pt_map, NULL);
353
354   /* push buffers: 0,1,2, */
355   for (node = inbuffers, i = 0; node && i < 3; node = g_list_next (node), i++) {
356     buffer = (GstBuffer *) node->data;
357     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
358   }
359
360   g_usleep (400 * 1000);
361
362   g_signal_emit_by_name (jitterbuffer, "clear-pt-map", NULL);
363
364   for (; node && i < 10; node = g_list_next (node), i++) {
365     buffer = (GstBuffer *) node->data;
366     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
367   }
368
369   /* check the buffer list */
370   check_jitterbuffer_results (jitterbuffer, num_buffers);
371
372   /* cleanup */
373   cleanup_jitterbuffer (jitterbuffer);
374 }
375
376 GST_END_TEST;
377 static const guint payload_size = 160;
378 static const guint clock_rate = 8000;
379 static const guint pcmu_payload_type = 0;
380 static const guint test_ssrc = 0x01BADBAD;
381
382 typedef struct
383 {
384   GstElement *jitter_buffer;
385   GstPad *test_sink_pad, *test_src_pad;
386   GstClock *clock;
387   GAsyncQueue *buf_queue;
388   GAsyncQueue *sink_event_queue;
389   GAsyncQueue *src_event_queue;
390   gint lost_event_count;
391   gint rtx_event_count;
392 } TestData;
393
394 static GstCaps *
395 generate_caps (void)
396 {
397   return gst_caps_new_simple ("application/x-rtp",
398       "media", G_TYPE_STRING, "audio",
399       "clock-rate", G_TYPE_INT, clock_rate,
400       "encoding-name", G_TYPE_STRING, "PCMU",
401       "payload", G_TYPE_INT, pcmu_payload_type,
402       "ssrc", G_TYPE_UINT, test_ssrc, NULL);
403 }
404
405 static GstBuffer *
406 generate_test_buffer (GstClockTime gst_ts,
407     gboolean marker_bit, guint seq_num, guint32 rtp_ts)
408 {
409   GstBuffer *buf;
410   guint8 *payload;
411   guint i;
412   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
413
414   buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
415   GST_BUFFER_DTS (buf) = gst_ts;
416   GST_BUFFER_PTS (buf) = gst_ts;
417
418   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
419   gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type);
420   gst_rtp_buffer_set_marker (&rtp, marker_bit);
421   gst_rtp_buffer_set_seq (&rtp, seq_num);
422   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
423   gst_rtp_buffer_set_ssrc (&rtp, test_ssrc);
424
425   payload = gst_rtp_buffer_get_payload (&rtp);
426   for (i = 0; i < payload_size; i++)
427     payload[i] = 0xff;
428
429   gst_rtp_buffer_unmap (&rtp);
430
431   return buf;
432 }
433
434 static GstFlowReturn
435 test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
436 {
437   TestData *data = gst_pad_get_element_private (pad);
438   g_async_queue_push (data->buf_queue, buffer);
439   return GST_FLOW_OK;
440 }
441
442 static gboolean
443 test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
444 {
445   TestData *data = gst_pad_get_element_private (pad);
446   const GstStructure *structure = gst_event_get_structure (event);
447
448   GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
449
450   if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0) {
451     data->lost_event_count++;
452     GST_DEBUG ("lost event count %d", data->lost_event_count);
453   }
454
455   g_async_queue_push (data->sink_event_queue, event);
456   return TRUE;
457 }
458
459 static gboolean
460 test_src_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
461 {
462   TestData *data = gst_pad_get_element_private (pad);
463   const GstStructure *structure = gst_event_get_structure (event);
464
465   GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
466
467   if (structure
468       && strcmp (gst_structure_get_name (structure),
469           "GstRTPRetransmissionRequest") == 0) {
470     data->rtx_event_count++;
471     GST_DEBUG ("rtx event count %d", data->rtx_event_count);
472   }
473
474   g_async_queue_push (data->src_event_queue, event);
475   return TRUE;
476 }
477
478 static void
479 setup_testharness (TestData * data)
480 {
481   GstPad *jb_sink_pad, *jb_src_pad;
482   GstSegment seg;
483   GstMiniObject *obj;
484
485   /* create the testclock */
486   data->clock = gst_test_clock_new ();
487   g_assert (data->clock);
488   gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
489
490   /* rig up the jitter buffer */
491   data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
492   g_assert (data->jitter_buffer);
493   gst_element_set_clock (data->jitter_buffer, data->clock);
494   g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL);
495   g_assert_cmpint (gst_element_set_state (data->jitter_buffer,
496           GST_STATE_PLAYING), !=, GST_STATE_CHANGE_FAILURE);
497
498   /* set up the buf and event queues */
499   data->buf_queue =
500       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
501   data->sink_event_queue =
502       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
503   data->src_event_queue =
504       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
505
506   data->lost_event_count = 0;
507   data->rtx_event_count = 0;
508
509   /* link in the test source-pad */
510   data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC);
511   gst_pad_set_element_private (data->test_src_pad, data);
512   gst_pad_set_event_function (data->test_src_pad, test_src_pad_event_cb);
513   jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink");
514   g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==,
515       GST_PAD_LINK_OK);
516   gst_object_unref (jb_sink_pad);
517
518   /* link in the test sink-pad */
519   data->test_sink_pad = gst_pad_new ("sink", GST_PAD_SINK);
520   gst_pad_set_element_private (data->test_sink_pad, data);
521   gst_pad_set_caps (data->test_sink_pad, generate_caps ());
522   gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb);
523   gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb);
524   jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src");
525   g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==,
526       GST_PAD_LINK_OK);
527   gst_object_unref (jb_src_pad);
528
529   g_assert (gst_pad_set_active (data->test_src_pad, TRUE));
530   g_assert (gst_pad_set_active (data->test_sink_pad, TRUE));
531
532   gst_segment_init (&seg, GST_FORMAT_TIME);
533
534   gst_pad_push_event (data->test_src_pad,
535       gst_event_new_stream_start ("stream0"));
536   gst_pad_set_caps (data->test_src_pad, generate_caps ());
537   gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
538
539   obj = g_async_queue_pop (data->sink_event_queue);
540   gst_mini_object_unref (obj);
541   obj = g_async_queue_pop (data->sink_event_queue);
542   gst_mini_object_unref (obj);
543   obj = g_async_queue_pop (data->sink_event_queue);
544   gst_mini_object_unref (obj);
545 }
546
547 static void
548 destroy_testharness (TestData * data)
549 {
550   /* clean up */
551   g_assert_cmpint (gst_element_set_state (data->jitter_buffer, GST_STATE_NULL),
552       ==, GST_STATE_CHANGE_SUCCESS);
553   gst_object_unref (data->jitter_buffer);
554   data->jitter_buffer = NULL;
555
556   gst_object_unref (data->test_src_pad);
557   data->test_src_pad = NULL;
558
559   gst_object_unref (data->test_sink_pad);
560   data->test_sink_pad = NULL;
561
562   gst_object_unref (data->clock);
563   data->clock = NULL;
564
565   g_async_queue_unref (data->buf_queue);
566   data->buf_queue = NULL;
567
568   g_async_queue_unref (data->sink_event_queue);
569   data->sink_event_queue = NULL;
570   g_async_queue_unref (data->src_event_queue);
571   data->src_event_queue = NULL;
572
573   data->lost_event_count = 0;
574 }
575
576 static void
577 verify_lost_event (GstEvent * event, guint32 expected_seqnum,
578     GstClockTime expected_timestamp, GstClockTime expected_duration,
579     gboolean expected_late)
580 {
581   const GstStructure *s = gst_event_get_structure (event);
582   const GValue *value;
583   guint32 seqnum;
584   GstClockTime timestamp;
585   GstClockTime duration;
586   gboolean late;
587
588   g_assert (gst_structure_get_uint (s, "seqnum", &seqnum));
589
590   value = gst_structure_get_value (s, "timestamp");
591   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
592   timestamp = g_value_get_uint64 (value);
593
594   value = gst_structure_get_value (s, "duration");
595   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
596   duration = g_value_get_uint64 (value);
597
598   g_assert (gst_structure_get_boolean (s, "late", &late));
599
600   g_assert_cmpint (seqnum, ==, expected_seqnum);
601   g_assert_cmpint (timestamp, ==, expected_timestamp);
602   g_assert_cmpint (duration, ==, expected_duration);
603   g_assert (late == expected_late);
604 }
605
606 static void
607 verify_rtx_event (GstEvent * event, guint32 expected_seqnum,
608     GstClockTime expected_timestamp, guint expected_delay,
609     GstClockTime expected_spacing)
610 {
611   const GstStructure *s = gst_event_get_structure (event);
612   const GValue *value;
613   guint32 seqnum;
614   GstClockTime timestamp, spacing;
615   guint delay;
616
617   g_assert (gst_structure_get_uint (s, "seqnum", &seqnum));
618
619   value = gst_structure_get_value (s, "running-time");
620   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
621   timestamp = g_value_get_uint64 (value);
622
623   value = gst_structure_get_value (s, "delay");
624   g_assert (value && G_VALUE_HOLDS_UINT (value));
625   delay = g_value_get_uint (value);
626
627   value = gst_structure_get_value (s, "packet-spacing");
628   g_assert (value && G_VALUE_HOLDS_UINT64 (value));
629   spacing = g_value_get_uint64 (value);
630
631   g_assert_cmpint (seqnum, ==, expected_seqnum);
632   g_assert_cmpint (timestamp, ==, expected_timestamp);
633   g_assert_cmpint (delay, ==, expected_delay);
634   g_assert_cmpint (spacing, ==, expected_spacing);
635 }
636
637 GST_START_TEST (test_only_one_lost_event_on_large_gaps)
638 {
639   TestData data;
640   GstClockID id, test_id;
641   GstBuffer *in_buf, *out_buf;
642   GstEvent *out_event;
643   gint jb_latency_ms = 200;
644   guint buffer_size_ms = (payload_size * 1000) / clock_rate;
645   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
646
647   setup_testharness (&data);
648
649   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
650
651   /* push the first buffer in */
652   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
653   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
654   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
655
656   /* wait for the first buffer to be synced to timestamp + latency */
657   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
658
659   /* increase the time to timestamp + latency and release the wait */
660   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
661       jb_latency_ms * GST_MSECOND);
662   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
663       == id);
664
665   /* check for the buffer coming out that was pushed in */
666   out_buf = g_async_queue_pop (data.buf_queue);
667   g_assert (out_buf != NULL);
668   g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, 0);
669   g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, 0);
670
671   /* move time ahead 10 seconds */
672   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
673
674   /* wait a bit */
675   g_usleep (G_USEC_PER_SEC / 10);
676
677   /* check that no buffers have been pushed out and no pending waits */
678   g_assert_cmpint (g_async_queue_length (data.buf_queue), ==, 0);
679   g_assert (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock),
680           &id) == FALSE);
681
682   /* a buffer now arrives perfectly on time */
683   in_buf = generate_test_buffer (10 * GST_SECOND, FALSE, 500, 500 * 160);
684   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
685   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
686
687   /* release the wait */
688   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
689   gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20);
690   test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
691   g_assert (id == test_id);
692
693   /* we should now receive a packet-lost-event for buffers 1 through 489 */
694   out_event = g_async_queue_pop (data.sink_event_queue);
695   g_assert (out_event != NULL);
696   g_assert_cmpint (data.lost_event_count, ==, 1);
697   verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490,
698       TRUE);
699
700   /* churn through sync_times until the new buffer gets pushed out */
701   while (g_async_queue_length (data.buf_queue) < 1) {
702     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
703       GstClockTime t = gst_clock_id_get_time (id);
704       if (t > gst_clock_get_time (data.clock)) {
705         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
706       }
707       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
708     }
709   }
710
711   out_buf = g_async_queue_pop (data.buf_queue);
712   g_assert (out_buf != NULL);
713   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
714   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
715   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500);
716   gst_rtp_buffer_unmap (&rtp);
717   g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, (10 * GST_SECOND));
718   g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, (10 * GST_SECOND));
719
720   /* we get as many lost events as the the number of buffers the jitterbuffer
721    * is able to wait for (+ the one we already got) */
722   g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms);
723
724   destroy_testharness (&data);
725 }
726
727 GST_END_TEST;
728
729 GST_START_TEST (test_two_lost_one_arrives_in_time)
730 {
731   TestData data;
732   GstClockID id;
733   GstBuffer *in_buf, *out_buf;
734   GstEvent *out_event;
735   gint jb_latency_ms = 100;
736   GstClockTime buffer_time, now;
737   gint b;
738   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
739
740   setup_testharness (&data);
741
742   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
743
744   /* push the first buffer in */
745   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
746   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
747   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
748   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
749   now = jb_latency_ms * GST_MSECOND;
750   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), now);
751   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
752       == id);
753   out_buf = g_async_queue_pop (data.buf_queue);
754   g_assert (out_buf != NULL);
755
756   /* push some buffers arriving in perfect time! */
757   for (b = 1; b < 3; b++) {
758     buffer_time = b * GST_MSECOND * 20;
759     in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
760     gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), now + buffer_time);
761     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
762
763     /* check for the buffer coming out that was pushed in */
764     out_buf = g_async_queue_pop (data.buf_queue);
765     g_assert (out_buf != NULL);
766     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, buffer_time);
767     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, buffer_time);
768   }
769
770   /* hop over 2 packets and make another one (gap of 2) */
771   b = 5;
772   buffer_time = b * GST_MSECOND * 20;
773   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
774   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
775
776   /* verify that the jitterbuffer now wait for the latest moment it can push */
777   /* the first lost buffer (buffer 3) out on (buffer-timestamp (60) + latency (10) = 70) */
778   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
779   g_assert_cmpint (gst_clock_id_get_time (id), ==,
780       (3 * GST_MSECOND * 20) + (jb_latency_ms * GST_MSECOND));
781
782   /* let the time expire... */
783   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
784       gst_clock_id_get_time (id));
785   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
786       == id);
787
788   /* we should now receive a packet-lost-event for buffer 3 */
789   out_event = g_async_queue_pop (data.sink_event_queue);
790   g_assert (out_event != NULL);
791   g_assert_cmpint (data.lost_event_count, ==, 1);
792   verify_lost_event (out_event, 3, 3 * GST_MSECOND * 20, GST_MSECOND * 20,
793       FALSE);
794
795   /* buffer 4 now arrives just in time (time is 70, buffer 4 expires at 90) */
796   b = 4;
797   buffer_time = b * GST_MSECOND * 20;
798   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
799   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
800
801   /* verify that buffer 4 made it through! */
802   out_buf = g_async_queue_pop (data.buf_queue);
803   g_assert (out_buf != NULL);
804   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
805   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
806   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 4);
807   gst_rtp_buffer_unmap (&rtp);
808
809   /* and see that buffer 5 now arrives in a normal fashion */
810   out_buf = g_async_queue_pop (data.buf_queue);
811   g_assert (out_buf != NULL);
812   g_assert (!GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
813   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
814   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
815   gst_rtp_buffer_unmap (&rtp);
816
817   /* should still have only seen 1 packet lost event */
818   g_assert_cmpint (data.lost_event_count, ==, 1);
819
820   destroy_testharness (&data);
821 }
822
823 GST_END_TEST;
824
825 GST_START_TEST (test_late_packets_still_makes_lost_events)
826 {
827   TestData data;
828   GstClockID id;
829   GstBuffer *in_buf, *out_buf;
830   GstEvent *out_event;
831   gint jb_latency_ms = 10;
832   GstClockTime buffer_time;
833   gint b;
834   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
835
836   setup_testharness (&data);
837
838   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
839
840   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
841
842   /* push the first buffer in */
843   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
844   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
845
846   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
847   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
848       == id);
849   out_buf = g_async_queue_pop (data.buf_queue);
850   g_assert (out_buf != NULL);
851
852   /* push some buffers in! */
853   for (b = 1; b < 3; b++) {
854     buffer_time = b * GST_MSECOND * 20;
855     in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
856     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
857
858     /* check for the buffer coming out that was pushed in */
859     out_buf = g_async_queue_pop (data.buf_queue);
860     g_assert (out_buf != NULL);
861     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, buffer_time);
862     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, buffer_time);
863   }
864
865   /* hop over 2 packets and make another one (gap of 2) */
866   b = 5;
867   buffer_time = b * GST_MSECOND * 20;
868   in_buf = generate_test_buffer (buffer_time, TRUE, b, b * 160);
869   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
870
871   /* we should now receive a packet-lost-event for buffer 3 and 4 */
872   out_event = g_async_queue_pop (data.sink_event_queue);
873   g_assert (out_event != NULL);
874   g_assert_cmpint (data.lost_event_count, ==, 1);
875   verify_lost_event (out_event, 3, 3 * GST_MSECOND * 20, GST_MSECOND * 20 * 2,
876       TRUE);
877
878   /* verify that buffer 5 made it through! */
879   out_buf = g_async_queue_pop (data.buf_queue);
880   g_assert (out_buf != NULL);
881   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
882   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
883   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
884   gst_rtp_buffer_unmap (&rtp);
885
886   /* should still have only seen 1 packet lost event */
887   g_assert_cmpint (data.lost_event_count, ==, 1);
888
889   destroy_testharness (&data);
890 }
891
892 GST_END_TEST;
893
894 GST_START_TEST (test_all_packets_are_timestamped_zero)
895 {
896   TestData data;
897   GstClockID id;
898   GstBuffer *in_buf, *out_buf;
899   GstEvent *out_event;
900   gint jb_latency_ms = 10;
901   gint b;
902   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
903
904   setup_testharness (&data);
905
906   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
907
908   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 10 * GST_SECOND);
909
910   /* push the first buffer in */
911   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
912   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
913
914   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
915   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
916       == id);
917   out_buf = g_async_queue_pop (data.buf_queue);
918   g_assert (out_buf != NULL);
919
920   /* push some buffers in! */
921   for (b = 1; b < 3; b++) {
922     in_buf = generate_test_buffer (0, TRUE, b, 0);
923     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
924
925     /* check for the buffer coming out that was pushed in */
926     out_buf = g_async_queue_pop (data.buf_queue);
927     g_assert (out_buf != NULL);
928     g_assert_cmpint (GST_BUFFER_DTS (out_buf), ==, 0);
929     g_assert_cmpint (GST_BUFFER_PTS (out_buf), ==, 0);
930   }
931
932   /* hop over 2 packets and make another one (gap of 2) */
933   b = 5;
934   in_buf = generate_test_buffer (0, TRUE, b, 0);
935   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
936
937   /* we should now receive a packet-lost-event for buffer 3 and 4 */
938   out_event = g_async_queue_pop (data.sink_event_queue);
939   g_assert (out_event != NULL);
940   verify_lost_event (out_event, 3, 0, 0, FALSE);
941
942   out_event = g_async_queue_pop (data.sink_event_queue);
943   g_assert (out_event != NULL);
944   verify_lost_event (out_event, 4, 0, 0, FALSE);
945
946   g_assert_cmpint (data.lost_event_count, ==, 2);
947
948   /* verify that buffer 5 made it through! */
949   out_buf = g_async_queue_pop (data.buf_queue);
950   g_assert (out_buf != NULL);
951   g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
952   gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
953   g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 5);
954   gst_rtp_buffer_unmap (&rtp);
955
956   /* should still have only seen 2 packet lost events */
957   g_assert_cmpint (data.lost_event_count, ==, 2);
958
959   destroy_testharness (&data);
960 }
961
962 GST_END_TEST;
963
964 GST_START_TEST (test_rtx_expected_next)
965 {
966   TestData data;
967   GstClockID id, tid;
968   GstBuffer *in_buf, *out_buf;
969   GstEvent *out_event;
970   gint jb_latency_ms = 200;
971
972   setup_testharness (&data);
973   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
974   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
975   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
976
977   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
978
979   /* push the first buffer in */
980   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
981   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
982
983   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
984
985   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
986
987   /* put second buffer, the jitterbuffer should now know that the packet spacing
988    * is 20ms and should ask for retransmission of seqnum 2 in 20ms */
989   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
990   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
991
992   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
993   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 60 * GST_MSECOND);
994   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
995       == id);
996
997   out_event = g_async_queue_pop (data.src_event_queue);
998   g_assert (out_event != NULL);
999   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 20, 20 * GST_MSECOND);
1000
1001   /* now we wait for the next timeout */
1002   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1003   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 100 * GST_MSECOND);
1004   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1005   g_assert (id == tid);
1006
1007   out_event = g_async_queue_pop (data.src_event_queue);
1008   g_assert (out_event != NULL);
1009   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 60, 20 * GST_MSECOND);
1010
1011   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1012   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 140 * GST_MSECOND);
1013   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1014   g_assert (id == tid);
1015
1016   out_event = g_async_queue_pop (data.src_event_queue);
1017   g_assert (out_event != NULL);
1018   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 100, 20 * GST_MSECOND);
1019
1020   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1021   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 200 * GST_MSECOND);
1022   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1023   g_assert (id == tid);
1024
1025   out_buf = g_async_queue_pop (data.buf_queue);
1026   g_assert (out_buf != NULL);
1027
1028
1029   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1030   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 260 * GST_MSECOND);
1031   g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
1032       == id);
1033
1034   /* we should now receive a packet-lost-event for buffer 2 */
1035   out_event = g_async_queue_pop (data.sink_event_queue);
1036   g_assert (out_event != NULL);
1037   verify_lost_event (out_event, 2, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1038
1039   destroy_testharness (&data);
1040 }
1041
1042 GST_END_TEST;
1043
1044 GST_START_TEST (test_rtx_two_missing)
1045 {
1046   TestData data;
1047   GstClockID id, tid;
1048   GstBuffer *in_buf, *out_buf;
1049   GstEvent *out_event;
1050   gint jb_latency_ms = 200;
1051   gint i;
1052   GstStructure *rtx_stats;
1053   const GValue *rtx_stat;
1054   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1055
1056   setup_testharness (&data);
1057   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
1058   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
1059   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
1060
1061   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
1062
1063   /* push the first buffer in */
1064   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
1065   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1066
1067   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
1068
1069   /* put second buffer, the jitterbuffer should now know that the packet spacing
1070    * is 20ms and should ask for retransmission of seqnum 2 at 60ms */
1071   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
1072   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1073
1074   /* push buffer 4, 2 and 3 are missing now, we should get retransmission events
1075    * for 3 at 100ms*/
1076   in_buf = generate_test_buffer (80 * GST_MSECOND, TRUE, 4, 4 * 160);
1077   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1078
1079   /* wait for first retransmission request */
1080   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 60 * GST_MSECOND);
1081   do {
1082     gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1083   } while (id !=
1084       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)));
1085
1086   /* we should have 2 events now, one for 2 and another for 3 */
1087   out_event = g_async_queue_pop (data.src_event_queue);
1088   g_assert (out_event != NULL);
1089   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 20, 20 * GST_MSECOND);
1090   out_event = g_async_queue_pop (data.src_event_queue);
1091   g_assert (out_event != NULL);
1092   verify_rtx_event (out_event, 3, 60 * GST_MSECOND, 0, 20 * GST_MSECOND);
1093
1094   /* now we wait for the next timeout */
1095   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1096   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 100 * GST_MSECOND);
1097   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1098   g_assert (id == tid);
1099
1100   /* we should have 2 events now, one for 2 and another for 3 */
1101   out_event = g_async_queue_pop (data.src_event_queue);
1102   g_assert (out_event != NULL);
1103   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 60, 20 * GST_MSECOND);
1104   out_event = g_async_queue_pop (data.src_event_queue);
1105   g_assert (out_event != NULL);
1106   verify_rtx_event (out_event, 3, 60 * GST_MSECOND, 40, 20 * GST_MSECOND);
1107
1108   /* make buffer 3 */
1109   in_buf = generate_test_buffer (60 * GST_MSECOND, TRUE, 3, 3 * 160);
1110   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1111
1112   /* make more buffers */
1113   for (i = 5; i < 15; i++) {
1114     in_buf = generate_test_buffer (i * 20 * GST_MSECOND, TRUE, i, i * 160);
1115     g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1116   }
1117
1118   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1119   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 140 * GST_MSECOND);
1120   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1121   g_assert (id == tid);
1122
1123   /* now we only get requests for 2 */
1124   out_event = g_async_queue_pop (data.src_event_queue);
1125   g_assert (out_event != NULL);
1126   verify_rtx_event (out_event, 2, 40 * GST_MSECOND, 100, 20 * GST_MSECOND);
1127
1128   /* this is when buffer 0 deadline expires */
1129   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1130   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 200 * GST_MSECOND);
1131   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1132   g_assert (id == tid);
1133
1134   for (i = 0; i < 2; i++) {
1135     GST_DEBUG ("popping %d", i);
1136     out_buf = g_async_queue_pop (data.buf_queue);
1137     g_assert (out_buf != NULL);
1138     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1139     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1140     gst_rtp_buffer_unmap (&rtp);
1141   }
1142
1143   /* this is when 2 is lost */
1144   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1145   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 240 * GST_MSECOND);
1146   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1147   g_assert (id == tid);
1148
1149   /* we should now receive a packet-lost-event for buffer 2 */
1150   out_event = g_async_queue_pop (data.sink_event_queue);
1151   g_assert (out_event != NULL);
1152   verify_lost_event (out_event, 2, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1153
1154   /* verify that buffers made it through! */
1155   for (i = 3; i < 15; i++) {
1156     GST_DEBUG ("popping %d", i);
1157     out_buf = g_async_queue_pop (data.buf_queue);
1158     g_assert (out_buf != NULL);
1159     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1160     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1161     gst_rtp_buffer_unmap (&rtp);
1162   }
1163   /* should still have only seen 1 packet lost events */
1164   g_assert_cmpint (data.lost_event_count, ==, 1);
1165
1166   g_object_get (data.jitter_buffer, "stats", &rtx_stats, NULL);
1167
1168   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-count");
1169   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 5);
1170
1171   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-success-count");
1172   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 1);
1173
1174   rtx_stat = gst_structure_get_value (rtx_stats, "rtx-rtt");
1175   g_assert_cmpuint (g_value_get_uint64 (rtx_stat), ==, 0);
1176
1177   destroy_testharness (&data);
1178 }
1179
1180 GST_END_TEST;
1181
1182 GST_START_TEST (test_rtx_packet_delay)
1183 {
1184   TestData data;
1185   GstClockID id, tid;
1186   GstBuffer *in_buf, *out_buf;
1187   GstEvent *out_event;
1188   gint jb_latency_ms = 200;
1189   gint i;
1190   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1191
1192   setup_testharness (&data);
1193   g_object_set (data.jitter_buffer, "do-retransmission", TRUE, NULL);
1194   g_object_set (data.jitter_buffer, "latency", jb_latency_ms, NULL);
1195   g_object_set (data.jitter_buffer, "rtx-retry-period", 120, NULL);
1196
1197   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 0);
1198
1199   /* push the first buffer in */
1200   in_buf = generate_test_buffer (0 * GST_MSECOND, TRUE, 0, 0);
1201   GST_BUFFER_FLAG_SET (in_buf, GST_BUFFER_FLAG_DISCONT);
1202   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1203
1204   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 20 * GST_MSECOND);
1205
1206   /* put second buffer, the jitterbuffer should now know that the packet spacing
1207    * is 20ms and should ask for retransmission of seqnum 2 at 60ms */
1208   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 1, 160);
1209   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1210
1211   /* push buffer 8, 2 -> 7 are missing now. note that the rtp time is the same
1212    * as packet 1 because it was part of a fragmented payload. This means that
1213    * the estimate for 2 could be refined now to 20ms. also packet 2, 3 and 4 are
1214    * exceeding the max allowed reorder distance and should request a
1215    * retransmission right away */
1216   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 8, 8 * 160);
1217   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1218
1219   /* we should now receive retransmission requests for 2 -> 5 */
1220   out_event = g_async_queue_pop (data.src_event_queue);
1221   g_assert (out_event != NULL);
1222   verify_rtx_event (out_event, 2, 20 * GST_MSECOND, 40, 20 * GST_MSECOND);
1223
1224   for (i = 3; i < 5; i++) {
1225     GST_DEBUG ("popping %d", i);
1226     out_event = g_async_queue_pop (data.src_event_queue);
1227     g_assert (out_event != NULL);
1228     verify_rtx_event (out_event, i, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1229   }
1230   g_assert_cmpint (data.rtx_event_count, ==, 3);
1231
1232   /* push 9, this should immediately request retransmission of 5 */
1233   in_buf = generate_test_buffer (20 * GST_MSECOND, TRUE, 9, 9 * 160);
1234   g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
1235
1236   /* we should now receive retransmission requests for 5 */
1237   out_event = g_async_queue_pop (data.src_event_queue);
1238   g_assert (out_event != NULL);
1239   verify_rtx_event (out_event, 5, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1240
1241   /* wait for timeout for rtx 6 -> 7 */
1242   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1243   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1244   g_assert (id == tid);
1245
1246   for (i = 6; i < 8; i++) {
1247     GST_DEBUG ("popping %d", i);
1248     out_event = g_async_queue_pop (data.src_event_queue);
1249     g_assert (out_event != NULL);
1250     verify_rtx_event (out_event, i, 20 * GST_MSECOND, 0, 20 * GST_MSECOND);
1251   }
1252
1253   /* churn through sync_times until the new buffer gets pushed out */
1254   while (g_async_queue_length (data.buf_queue) < 1) {
1255     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
1256       GstClockTime t = gst_clock_id_get_time (id);
1257       if (t >= 240 * GST_MSECOND)
1258         break;
1259       if (t > gst_clock_get_time (data.clock)) {
1260         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
1261       }
1262       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1263     }
1264   }
1265
1266   /* verify that buffer 0 and 1 made it through! */
1267   for (i = 0; i < 2; i++) {
1268     out_buf = g_async_queue_pop (data.buf_queue);
1269     g_assert (out_buf != NULL);
1270     if (i == 0)
1271       g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
1272     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1273     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1274     gst_rtp_buffer_unmap (&rtp);
1275   }
1276
1277   /* churn through sync_times until the next buffer gets pushed out */
1278   while (g_async_queue_length (data.buf_queue) < 1) {
1279     if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
1280       GstClockTime t = gst_clock_id_get_time (id);
1281       if (t >= 240 * GST_MSECOND)
1282         break;
1283       if (t > gst_clock_get_time (data.clock)) {
1284         gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
1285       }
1286       gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1287     }
1288   }
1289
1290   for (i = 2; i < 8; i++) {
1291     GST_DEBUG ("popping lost event %d", i);
1292     out_event = g_async_queue_pop (data.sink_event_queue);
1293     g_assert (out_event != NULL);
1294     verify_lost_event (out_event, i, 20 * GST_MSECOND, 0, FALSE);
1295   }
1296
1297   /* verify that buffer 8 made it through! */
1298   for (i = 8; i < 10; i++) {
1299     GST_DEBUG ("popping buffer %d", i);
1300     out_buf = g_async_queue_pop (data.buf_queue);
1301     g_assert (out_buf != NULL);
1302     if (i == 8)
1303       g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
1304     gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
1305     g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, i);
1306     gst_rtp_buffer_unmap (&rtp);
1307   }
1308
1309   GST_DEBUG ("waiting for 240ms");
1310   gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
1311   gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), 240 * GST_MSECOND);
1312   tid = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
1313   g_assert (id == tid);
1314
1315   GST_DEBUG ("popping lost event 10");
1316   out_event = g_async_queue_pop (data.sink_event_queue);
1317   g_assert (out_event != NULL);
1318   verify_lost_event (out_event, 10, 40 * GST_MSECOND, 20 * GST_MSECOND, FALSE);
1319
1320   /* should have seen 6 packet lost events */
1321   g_assert_cmpint (data.lost_event_count, ==, 7);
1322   g_assert_cmpint (data.rtx_event_count, ==, 26);
1323
1324   destroy_testharness (&data);
1325 }
1326
1327 GST_END_TEST;
1328
1329 static Suite *
1330 rtpjitterbuffer_suite (void)
1331 {
1332   Suite *s = suite_create ("rtpjitterbuffer");
1333   TCase *tc_chain = tcase_create ("general");
1334
1335   suite_add_tcase (s, tc_chain);
1336   tcase_add_test (tc_chain, test_push_forward_seq);
1337   tcase_add_test (tc_chain, test_push_backward_seq);
1338   tcase_add_test (tc_chain, test_push_unordered);
1339   tcase_add_test (tc_chain, test_basetime);
1340   tcase_add_test (tc_chain, test_clear_pt_map);
1341   tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps);
1342   tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time);
1343   tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events);
1344   tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero);
1345   tcase_add_test (tc_chain, test_rtx_expected_next);
1346   tcase_add_test (tc_chain, test_rtx_two_missing);
1347   tcase_add_test (tc_chain, test_rtx_packet_delay);
1348
1349   return s;
1350 }
1351
1352 GST_CHECK_MAIN (rtpjitterbuffer);