Merge commit 'f9207722ca8fd8dcc1e7215d8af85efe4debfdf4' into 0.11
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsource.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 #include <string.h>
20
21 #include <gst/rtp/gstrtpbuffer.h>
22 #include <gst/rtp/gstrtcpbuffer.h>
23
24 #include "rtpsource.h"
25
26 GST_DEBUG_CATEGORY_STATIC (rtp_source_debug);
27 #define GST_CAT_DEFAULT rtp_source_debug
28
29 #define RTP_MAX_PROBATION_LEN  32
30
31 /* signals and args */
32 enum
33 {
34   LAST_SIGNAL
35 };
36
37 #define DEFAULT_SSRC                 0
38 #define DEFAULT_IS_CSRC              FALSE
39 #define DEFAULT_IS_VALIDATED         FALSE
40 #define DEFAULT_IS_SENDER            FALSE
41 #define DEFAULT_SDES                 NULL
42
43 enum
44 {
45   PROP_0,
46   PROP_SSRC,
47   PROP_IS_CSRC,
48   PROP_IS_VALIDATED,
49   PROP_IS_SENDER,
50   PROP_SDES,
51   PROP_STATS,
52   PROP_LAST
53 };
54
55 /* GObject vmethods */
56 static void rtp_source_finalize (GObject * object);
57 static void rtp_source_set_property (GObject * object, guint prop_id,
58     const GValue * value, GParamSpec * pspec);
59 static void rtp_source_get_property (GObject * object, guint prop_id,
60     GValue * value, GParamSpec * pspec);
61
62 /* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */
63
64 G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT);
65
66 static void
67 rtp_source_class_init (RTPSourceClass * klass)
68 {
69   GObjectClass *gobject_class;
70
71   gobject_class = (GObjectClass *) klass;
72
73   gobject_class->finalize = rtp_source_finalize;
74
75   gobject_class->set_property = rtp_source_set_property;
76   gobject_class->get_property = rtp_source_get_property;
77
78   g_object_class_install_property (gobject_class, PROP_SSRC,
79       g_param_spec_uint ("ssrc", "SSRC",
80           "The SSRC of this source", 0, G_MAXUINT, DEFAULT_SSRC,
81           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
82
83   g_object_class_install_property (gobject_class, PROP_IS_CSRC,
84       g_param_spec_boolean ("is-csrc", "Is CSRC",
85           "If this SSRC is acting as a contributing source",
86           DEFAULT_IS_CSRC, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
87
88   g_object_class_install_property (gobject_class, PROP_IS_VALIDATED,
89       g_param_spec_boolean ("is-validated", "Is Validated",
90           "If this SSRC is validated", DEFAULT_IS_VALIDATED,
91           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
92
93   g_object_class_install_property (gobject_class, PROP_IS_SENDER,
94       g_param_spec_boolean ("is-sender", "Is Sender",
95           "If this SSRC is a sender", DEFAULT_IS_SENDER,
96           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
97
98   /**
99    * RTPSource::sdes
100    *
101    * The current SDES items of the source. Returns a structure with name
102    * application/x-rtp-source-sdes and may contain the following fields:
103    *
104    *  'cname'       G_TYPE_STRING  : The canonical name
105    *  'name'        G_TYPE_STRING  : The user name
106    *  'email'       G_TYPE_STRING  : The user's electronic mail address
107    *  'phone'       G_TYPE_STRING  : The user's phone number
108    *  'location'    G_TYPE_STRING  : The geographic user location
109    *  'tool'        G_TYPE_STRING  : The name of application or tool
110    *  'note'        G_TYPE_STRING  : A notice about the source
111    *
112    *  other fields may be present and these represent private items in
113    *  the SDES where the field name is the prefix.
114    */
115   g_object_class_install_property (gobject_class, PROP_SDES,
116       g_param_spec_boxed ("sdes", "SDES",
117           "The SDES information for this source",
118           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
119
120   /**
121    * RTPSource::stats
122    *
123    * The statistics of the source. This property returns a GstStructure with
124    * name application/x-rtp-source-stats with the following fields:
125    *
126    *  "ssrc"         G_TYPE_UINT     The SSRC of this source
127    *  "internal"     G_TYPE_BOOLEAN  If this source is the source of the session
128    *  "validated"    G_TYPE_BOOLEAN  If the source is validated
129    *  "received-bye" G_TYPE_BOOLEAN  If we received a BYE from this source
130    *  "is-csrc"      G_TYPE_BOOLEAN  If this source was found as CSRC
131    *  "is-sender"    G_TYPE_BOOLEAN  If this source is a sender
132    *  "seqnum-base"  G_TYPE_INT      first seqnum if known
133    *  "clock-rate"   G_TYPE_INT      the clock rate of the media
134    *
135    * The following two fields are only present when known.
136    *
137    *  "rtp-from"     G_TYPE_STRING   where we received the last RTP packet from
138    *  "rtcp-from"    G_TYPE_STRING   where we received the last RTCP packet from
139    *
140    * The following fields make sense for internal sources and will only increase
141    * when "is-sender" is TRUE:
142    *
143    *  "octets-sent"  G_TYPE_UINT64   number of bytes we sent
144    *  "packets-sent" G_TYPE_UINT64   number of packets we sent
145    *
146    * The following fields make sense for non-internal sources and will only
147    * increase when "is-sender" is TRUE.
148    *
149    *  "octets-received"  G_TYPE_UINT64  total number of bytes received
150    *  "packets-received" G_TYPE_UINT64  total number of packets received
151    *
152    * Following fields are updated when "is-sender" is TRUE.
153    *
154    *  "bitrate"      G_TYPE_UINT64   bitrate in bits per second
155    *  "jitter"       G_TYPE_UINT     estimated jitter
156    *  "packets-lost" G_TYPE_INT      estimated amount of packets lost
157    *
158    * The last SR report this source sent. This only updates when "is-sender" is
159    * TRUE.
160    *
161    *  "have-sr"         G_TYPE_BOOLEAN  the source has sent SR
162    *  "sr-ntptime"      G_TYPE_UINT64   ntptime of SR
163    *  "sr-rtptime"      G_TYPE_UINT     rtptime of SR
164    *  "sr-octet-count"  G_TYPE_UINT     the number of bytes in the SR
165    *  "sr-packet-count" G_TYPE_UINT     the number of packets in the SR
166    *
167    * The following fields are only present for non-internal sources and
168    * represent the content of the last RB packet that was sent to this source.
169    * These values are only updated when the source is sending.
170    *
171    *  "sent-rb"               G_TYPE_BOOLEAN  we have sent an RB
172    *  "sent-rb-fractionlost"  G_TYPE_UINT     calculated lost fraction
173    *  "sent-rb-packetslost"   G_TYPE_INT      lost packets
174    *  "sent-rb-exthighestseq" G_TYPE_UINT     last seen seqnum
175    *  "sent-rb-jitter"        G_TYPE_UINT     jitter
176    *  "sent-rb-lsr"           G_TYPE_UINT     last SR time
177    *  "sent-rb-dlsr"          G_TYPE_UINT     delay since last SR
178    *
179    * The following fields are only present for non-internal sources and
180    * represents the last RB that this source sent. This is only updated
181    * when the source is receiving data and sending RB blocks.
182    *
183    *  "have-rb"          G_TYPE_BOOLEAN  the source has sent RB
184    *  "rb-fractionlost"  G_TYPE_UINT     lost fraction
185    *  "rb-packetslost"   G_TYPE_INT      lost packets
186    *  "rb-exthighestseq" G_TYPE_UINT     highest received seqnum
187    *  "rb-jitter"        G_TYPE_UINT     reception jitter
188    *  "rb-lsr"           G_TYPE_UINT     last SR time
189    *  "rb-dlsr"          G_TYPE_UINT     delay since last SR
190    *
191    * The round trip of this source. This is calculated from the last RB
192    * values and the recption time of the last RB packet. Only present for
193    * non-internal sources.
194    *
195    *  "rb-round-trip"    G_TYPE_UINT     the round trip time in nanoseconds
196    */
197   g_object_class_install_property (gobject_class, PROP_STATS,
198       g_param_spec_boxed ("stats", "Stats",
199           "The stats of this source", GST_TYPE_STRUCTURE,
200           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
201
202   GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
203 }
204
205 /**
206  * rtp_source_reset:
207  * @src: an #RTPSource
208  *
209  * Reset the stats of @src.
210  */
211 void
212 rtp_source_reset (RTPSource * src)
213 {
214   src->received_bye = FALSE;
215
216   src->stats.cycles = -1;
217   src->stats.jitter = 0;
218   src->stats.transit = -1;
219   src->stats.curr_sr = 0;
220   src->stats.curr_rr = 0;
221 }
222
223 static void
224 rtp_source_init (RTPSource * src)
225 {
226   /* sources are initialy on probation until we receive enough valid RTP
227    * packets or a valid RTCP packet */
228   src->validated = FALSE;
229   src->internal = FALSE;
230   src->probation = RTP_DEFAULT_PROBATION;
231   src->closing = FALSE;
232
233   src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
234
235   src->payload = -1;
236   src->clock_rate = -1;
237   src->packets = g_queue_new ();
238   src->seqnum_base = -1;
239   src->last_rtptime = -1;
240
241   src->retained_feedback = g_queue_new ();
242
243   rtp_source_reset (src);
244 }
245
246 static void
247 rtp_conflicting_address_free (RTPConflictingAddress * addr)
248 {
249   g_object_unref (addr->address);
250   g_free (addr);
251 }
252
253 static void
254 rtp_source_finalize (GObject * object)
255 {
256   RTPSource *src;
257   GstBuffer *buffer;
258
259   src = RTP_SOURCE_CAST (object);
260
261   while ((buffer = g_queue_pop_head (src->packets)))
262     gst_buffer_unref (buffer);
263   g_queue_free (src->packets);
264
265   gst_structure_free (src->sdes);
266
267   g_free (src->bye_reason);
268
269   gst_caps_replace (&src->caps, NULL);
270
271   g_list_foreach (src->conflicting_addresses,
272       (GFunc) rtp_conflicting_address_free, NULL);
273   g_list_free (src->conflicting_addresses);
274
275   while ((buffer = g_queue_pop_head (src->retained_feedback)))
276     gst_buffer_unref (buffer);
277   g_queue_free (src->retained_feedback);
278
279   if (src->rtp_from)
280     g_object_unref (src->rtp_from);
281   if (src->rtcp_from)
282     g_object_unref (src->rtcp_from);
283
284   G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
285 }
286
287 static GstStructure *
288 rtp_source_create_stats (RTPSource * src)
289 {
290   GstStructure *s;
291   gboolean is_sender = src->is_sender;
292   gboolean internal = src->internal;
293   gchar *address_str;
294   gboolean have_rb;
295   guint8 fractionlost = 0;
296   gint32 packetslost = 0;
297   guint32 exthighestseq = 0;
298   guint32 jitter = 0;
299   guint32 lsr = 0;
300   guint32 dlsr = 0;
301   guint32 round_trip = 0;
302   gboolean have_sr;
303   GstClockTime time = 0;
304   guint64 ntptime = 0;
305   guint32 rtptime = 0;
306   guint32 packet_count = 0;
307   guint32 octet_count = 0;
308
309
310   /* common data for all types of sources */
311   s = gst_structure_new ("application/x-rtp-source-stats",
312       "ssrc", G_TYPE_UINT, (guint) src->ssrc,
313       "internal", G_TYPE_BOOLEAN, internal,
314       "validated", G_TYPE_BOOLEAN, src->validated,
315       "received-bye", G_TYPE_BOOLEAN, src->received_bye,
316       "is-csrc", G_TYPE_BOOLEAN, src->is_csrc,
317       "is-sender", G_TYPE_BOOLEAN, is_sender,
318       "seqnum-base", G_TYPE_INT, src->seqnum_base,
319       "clock-rate", G_TYPE_INT, src->clock_rate, NULL);
320
321   /* add address and port */
322   if (src->rtp_from) {
323     address_str = __g_socket_address_to_string (src->rtp_from);
324     gst_structure_set (s, "rtp-from", G_TYPE_STRING, address_str, NULL);
325     g_free (address_str);
326   }
327   if (src->rtcp_from) {
328     address_str = __g_socket_address_to_string (src->rtcp_from);
329     gst_structure_set (s, "rtcp-from", G_TYPE_STRING, address_str, NULL);
330     g_free (address_str);
331   }
332
333   gst_structure_set (s,
334       "octets-sent", G_TYPE_UINT64, src->stats.octets_sent,
335       "packets-sent", G_TYPE_UINT64, src->stats.packets_sent,
336       "octets-received", G_TYPE_UINT64, src->stats.octets_received,
337       "packets-received", G_TYPE_UINT64, src->stats.packets_received,
338       "bitrate", G_TYPE_UINT64, src->bitrate,
339       "packets-lost", G_TYPE_INT,
340       (gint) rtp_stats_get_packets_lost (&src->stats), "jitter", G_TYPE_UINT,
341       (guint) (src->stats.jitter >> 4), NULL);
342
343   /* get the last SR. */
344   have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime,
345       &packet_count, &octet_count);
346   gst_structure_set (s,
347       "have-sr", G_TYPE_BOOLEAN, have_sr,
348       "sr-ntptime", G_TYPE_UINT64, ntptime,
349       "sr-rtptime", G_TYPE_UINT, (guint) rtptime,
350       "sr-octet-count", G_TYPE_UINT, (guint) octet_count,
351       "sr-packet-count", G_TYPE_UINT, (guint) packet_count, NULL);
352
353   if (!internal) {
354     /* get the last RB we sent */
355     gst_structure_set (s,
356         "sent-rb", G_TYPE_BOOLEAN, src->last_rr.is_valid,
357         "sent-rb-fractionlost", G_TYPE_UINT, (guint) src->last_rr.fractionlost,
358         "sent-rb-packetslost", G_TYPE_INT, (gint) src->last_rr.packetslost,
359         "sent-rb-exthighestseq", G_TYPE_UINT,
360         (guint) src->last_rr.exthighestseq, "sent-rb-jitter", G_TYPE_UINT,
361         (guint) src->last_rr.jitter, "sent-rb-lsr", G_TYPE_UINT,
362         (guint) src->last_rr.lsr, "sent-rb-dlsr", G_TYPE_UINT,
363         (guint) src->last_rr.dlsr, NULL);
364
365     /* get the last RB */
366     have_rb = rtp_source_get_last_rb (src, &fractionlost, &packetslost,
367         &exthighestseq, &jitter, &lsr, &dlsr, &round_trip);
368
369     gst_structure_set (s,
370         "have-rb", G_TYPE_BOOLEAN, have_rb,
371         "rb-fractionlost", G_TYPE_UINT, (guint) fractionlost,
372         "rb-packetslost", G_TYPE_INT, (gint) packetslost,
373         "rb-exthighestseq", G_TYPE_UINT, (guint) exthighestseq,
374         "rb-jitter", G_TYPE_UINT, (guint) jitter,
375         "rb-lsr", G_TYPE_UINT, (guint) lsr,
376         "rb-dlsr", G_TYPE_UINT, (guint) dlsr,
377         "rb-round-trip", G_TYPE_UINT, (guint) round_trip, NULL);
378   }
379
380   return s;
381 }
382
383 /**
384  * rtp_source_get_sdes_struct:
385  * @src: an #RTPSource
386  *
387  * Get the SDES from @src. See the SDES property for more details.
388  *
389  * Returns: %GstStructure of type "application/x-rtp-source-sdes". The result is
390  * valid until the SDES items of @src are modified.
391  */
392 const GstStructure *
393 rtp_source_get_sdes_struct (RTPSource * src)
394 {
395   g_return_val_if_fail (RTP_IS_SOURCE (src), NULL);
396
397   return src->sdes;
398 }
399
400 static gboolean
401 sdes_struct_compare_func (GQuark field_id, const GValue * value,
402     gpointer user_data)
403 {
404   GstStructure *old;
405   const gchar *field;
406
407   old = GST_STRUCTURE (user_data);
408   field = g_quark_to_string (field_id);
409
410   if (!gst_structure_has_field (old, field))
411     return FALSE;
412
413   g_assert (G_VALUE_HOLDS_STRING (value));
414
415   return strcmp (g_value_get_string (value), gst_structure_get_string (old,
416           field)) == 0;
417 }
418
419 /**
420  * rtp_source_set_sdes:
421  * @src: an #RTPSource
422  * @sdes: the SDES structure
423  *
424  * Store the @sdes in @src. @sdes must be a structure of type
425  * "application/x-rtp-source-sdes", see the SDES property for more details.
426  *
427  * This function takes ownership of @sdes.
428  *
429  * Returns: %FALSE if the SDES was unchanged.
430  */
431 gboolean
432 rtp_source_set_sdes_struct (RTPSource * src, GstStructure * sdes)
433 {
434   gboolean changed;
435
436   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
437   g_return_val_if_fail (strcmp (gst_structure_get_name (sdes),
438           "application/x-rtp-source-sdes") == 0, FALSE);
439
440   changed = !gst_structure_foreach (sdes, sdes_struct_compare_func, src->sdes);
441
442   if (changed) {
443     gst_structure_free (src->sdes);
444     src->sdes = sdes;
445   } else {
446     gst_structure_free (sdes);
447   }
448
449   return changed;
450 }
451
452 static void
453 rtp_source_set_property (GObject * object, guint prop_id,
454     const GValue * value, GParamSpec * pspec)
455 {
456   RTPSource *src;
457
458   src = RTP_SOURCE (object);
459
460   switch (prop_id) {
461     case PROP_SSRC:
462       src->ssrc = g_value_get_uint (value);
463       break;
464     default:
465       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
466       break;
467   }
468 }
469
470 static void
471 rtp_source_get_property (GObject * object, guint prop_id,
472     GValue * value, GParamSpec * pspec)
473 {
474   RTPSource *src;
475
476   src = RTP_SOURCE (object);
477
478   switch (prop_id) {
479     case PROP_SSRC:
480       g_value_set_uint (value, rtp_source_get_ssrc (src));
481       break;
482     case PROP_IS_CSRC:
483       g_value_set_boolean (value, rtp_source_is_as_csrc (src));
484       break;
485     case PROP_IS_VALIDATED:
486       g_value_set_boolean (value, rtp_source_is_validated (src));
487       break;
488     case PROP_IS_SENDER:
489       g_value_set_boolean (value, rtp_source_is_sender (src));
490       break;
491     case PROP_SDES:
492       g_value_set_boxed (value, rtp_source_get_sdes_struct (src));
493       break;
494     case PROP_STATS:
495       g_value_take_boxed (value, rtp_source_create_stats (src));
496       break;
497     default:
498       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
499       break;
500   }
501 }
502
503 /**
504  * rtp_source_new:
505  * @ssrc: an SSRC
506  *
507  * Create a #RTPSource with @ssrc.
508  *
509  * Returns: a new #RTPSource. Use g_object_unref() after usage.
510  */
511 RTPSource *
512 rtp_source_new (guint32 ssrc)
513 {
514   RTPSource *src;
515
516   src = g_object_new (RTP_TYPE_SOURCE, NULL);
517   src->ssrc = ssrc;
518
519   return src;
520 }
521
522 /**
523  * rtp_source_set_callbacks:
524  * @src: an #RTPSource
525  * @cb: callback functions
526  * @user_data: user data
527  *
528  * Set the callbacks for the source.
529  */
530 void
531 rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb,
532     gpointer user_data)
533 {
534   g_return_if_fail (RTP_IS_SOURCE (src));
535
536   src->callbacks.push_rtp = cb->push_rtp;
537   src->callbacks.clock_rate = cb->clock_rate;
538   src->user_data = user_data;
539 }
540
541 /**
542  * rtp_source_get_ssrc:
543  * @src: an #RTPSource
544  *
545  * Get the SSRC of @source.
546  *
547  * Returns: the SSRC of src.
548  */
549 guint32
550 rtp_source_get_ssrc (RTPSource * src)
551 {
552   guint32 result;
553
554   g_return_val_if_fail (RTP_IS_SOURCE (src), 0);
555
556   result = src->ssrc;
557
558   return result;
559 }
560
561 /**
562  * rtp_source_set_as_csrc:
563  * @src: an #RTPSource
564  *
565  * Configure @src as a CSRC, this will also validate @src.
566  */
567 void
568 rtp_source_set_as_csrc (RTPSource * src)
569 {
570   g_return_if_fail (RTP_IS_SOURCE (src));
571
572   src->validated = TRUE;
573   src->is_csrc = TRUE;
574 }
575
576 /**
577  * rtp_source_is_as_csrc:
578  * @src: an #RTPSource
579  *
580  * Check if @src is a contributing source.
581  *
582  * Returns: %TRUE if @src is acting as a contributing source.
583  */
584 gboolean
585 rtp_source_is_as_csrc (RTPSource * src)
586 {
587   gboolean result;
588
589   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
590
591   result = src->is_csrc;
592
593   return result;
594 }
595
596 /**
597  * rtp_source_is_active:
598  * @src: an #RTPSource
599  *
600  * Check if @src is an active source. A source is active if it has been
601  * validated and has not yet received a BYE packet
602  *
603  * Returns: %TRUE if @src is an qactive source.
604  */
605 gboolean
606 rtp_source_is_active (RTPSource * src)
607 {
608   gboolean result;
609
610   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
611
612   result = RTP_SOURCE_IS_ACTIVE (src);
613
614   return result;
615 }
616
617 /**
618  * rtp_source_is_validated:
619  * @src: an #RTPSource
620  *
621  * Check if @src is a validated source.
622  *
623  * Returns: %TRUE if @src is a validated source.
624  */
625 gboolean
626 rtp_source_is_validated (RTPSource * src)
627 {
628   gboolean result;
629
630   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
631
632   result = src->validated;
633
634   return result;
635 }
636
637 /**
638  * rtp_source_is_sender:
639  * @src: an #RTPSource
640  *
641  * Check if @src is a sending source.
642  *
643  * Returns: %TRUE if @src is a sending source.
644  */
645 gboolean
646 rtp_source_is_sender (RTPSource * src)
647 {
648   gboolean result;
649
650   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
651
652   result = RTP_SOURCE_IS_SENDER (src);
653
654   return result;
655 }
656
657 /**
658  * rtp_source_received_bye:
659  * @src: an #RTPSource
660  *
661  * Check if @src has receoved a BYE packet.
662  *
663  * Returns: %TRUE if @src has received a BYE packet.
664  */
665 gboolean
666 rtp_source_received_bye (RTPSource * src)
667 {
668   gboolean result;
669
670   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
671
672   result = src->received_bye;
673
674   return result;
675 }
676
677
678 /**
679  * rtp_source_get_bye_reason:
680  * @src: an #RTPSource
681  *
682  * Get the BYE reason for @src. Check if the source receoved a BYE message first
683  * with rtp_source_received_bye().
684  *
685  * Returns: The BYE reason or NULL when no reason was given or the source did
686  * not receive a BYE message yet. g_fee() after usage.
687  */
688 gchar *
689 rtp_source_get_bye_reason (RTPSource * src)
690 {
691   gchar *result;
692
693   g_return_val_if_fail (RTP_IS_SOURCE (src), NULL);
694
695   result = g_strdup (src->bye_reason);
696
697   return result;
698 }
699
700 /**
701  * rtp_source_update_caps:
702  * @src: an #RTPSource
703  * @caps: a #GstCaps
704  *
705  * Parse @caps and store all relevant information in @source.
706  */
707 void
708 rtp_source_update_caps (RTPSource * src, GstCaps * caps)
709 {
710   GstStructure *s;
711   guint val;
712   gint ival;
713
714   /* nothing changed, return */
715   if (caps == NULL || src->caps == caps)
716     return;
717
718   s = gst_caps_get_structure (caps, 0);
719
720   if (gst_structure_get_int (s, "payload", &ival))
721     src->payload = ival;
722   else
723     src->payload = -1;
724   GST_DEBUG ("got payload %d", src->payload);
725
726   if (gst_structure_get_int (s, "clock-rate", &ival))
727     src->clock_rate = ival;
728   else
729     src->clock_rate = -1;
730
731   GST_DEBUG ("got clock-rate %d", src->clock_rate);
732
733   if (gst_structure_get_uint (s, "seqnum-base", &val))
734     src->seqnum_base = val;
735   else
736     src->seqnum_base = -1;
737
738   GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base);
739
740   gst_caps_replace (&src->caps, caps);
741 }
742
743 /**
744  * rtp_source_set_sdes_string:
745  * @src: an #RTPSource
746  * @type: the type of the SDES item
747  * @data: the SDES data
748  *
749  * Store an SDES item of @type in @src.
750  *
751  * Returns: %FALSE if the SDES item was unchanged or @type is unknown.
752  */
753 gboolean
754 rtp_source_set_sdes_string (RTPSource * src, GstRTCPSDESType type,
755     const gchar * data)
756 {
757   const gchar *old;
758   const gchar *field;
759
760   field = gst_rtcp_sdes_type_to_name (type);
761
762   if (gst_structure_has_field (src->sdes, field))
763     old = gst_structure_get_string (src->sdes, field);
764   else
765     old = NULL;
766
767   if (old == NULL && data == NULL)
768     return FALSE;
769
770   if (old != NULL && data != NULL && strcmp (old, data) == 0)
771     return FALSE;
772
773   if (data == NULL)
774     gst_structure_remove_field (src->sdes, field);
775   else
776     gst_structure_set (src->sdes, field, G_TYPE_STRING, data, NULL);
777
778   return TRUE;
779 }
780
781 /**
782  * rtp_source_get_sdes_string:
783  * @src: an #RTPSource
784  * @type: the type of the SDES item
785  *
786  * Get the SDES item of @type from @src.
787  *
788  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
789  * valid or the SDES item was unset. g_free() after usage.
790  */
791 gchar *
792 rtp_source_get_sdes_string (RTPSource * src, GstRTCPSDESType type)
793 {
794   gchar *result;
795   const gchar *type_name;
796
797   g_return_val_if_fail (RTP_IS_SOURCE (src), NULL);
798
799   if (type < 0 || type > GST_RTCP_SDES_PRIV - 1)
800     return NULL;
801
802   type_name = gst_rtcp_sdes_type_to_name (type);
803
804   if (!gst_structure_has_field (src->sdes, type_name))
805     return NULL;
806
807   result = g_strdup (gst_structure_get_string (src->sdes, type_name));
808
809   return result;
810 }
811
812 /**
813  * rtp_source_set_rtp_from:
814  * @src: an #RTPSource
815  * @address: the RTP address to set
816  *
817  * Set that @src is receiving RTP packets from @address. This is used for
818  * collistion checking.
819  */
820 void
821 rtp_source_set_rtp_from (RTPSource * src, GSocketAddress * address)
822 {
823   g_return_if_fail (RTP_IS_SOURCE (src));
824
825   if (src->rtp_from)
826     g_object_unref (src->rtp_from);
827   src->rtp_from = G_SOCKET_ADDRESS (g_object_ref (address));
828 }
829
830 /**
831  * rtp_source_set_rtcp_from:
832  * @src: an #RTPSource
833  * @address: the RTCP address to set
834  *
835  * Set that @src is receiving RTCP packets from @address. This is used for
836  * collistion checking.
837  */
838 void
839 rtp_source_set_rtcp_from (RTPSource * src, GSocketAddress * address)
840 {
841   g_return_if_fail (RTP_IS_SOURCE (src));
842
843   if (src->rtcp_from)
844     g_object_unref (src->rtcp_from);
845   src->rtcp_from = G_SOCKET_ADDRESS (g_object_ref (address));
846 }
847
848 static GstFlowReturn
849 push_packet (RTPSource * src, GstBuffer * buffer)
850 {
851   GstFlowReturn ret = GST_FLOW_OK;
852
853   /* push queued packets first if any */
854   while (!g_queue_is_empty (src->packets)) {
855     GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets));
856
857     GST_LOG ("pushing queued packet");
858     if (src->callbacks.push_rtp)
859       src->callbacks.push_rtp (src, buffer, src->user_data);
860     else
861       gst_buffer_unref (buffer);
862   }
863   GST_LOG ("pushing new packet");
864   /* push packet */
865   if (src->callbacks.push_rtp)
866     ret = src->callbacks.push_rtp (src, buffer, src->user_data);
867   else
868     gst_buffer_unref (buffer);
869
870   return ret;
871 }
872
873 static gint
874 get_clock_rate (RTPSource * src, guint8 payload)
875 {
876   if (src->payload == -1) {
877     /* first payload received, nothing was in the caps, lock on to this payload */
878     src->payload = payload;
879     GST_DEBUG ("first payload %d", payload);
880   } else if (payload != src->payload) {
881     /* we have a different payload than before, reset the clock-rate */
882     GST_DEBUG ("new payload %d", payload);
883     src->payload = payload;
884     src->clock_rate = -1;
885     src->stats.transit = -1;
886   }
887
888   if (src->clock_rate == -1) {
889     gint clock_rate = -1;
890
891     if (src->callbacks.clock_rate)
892       clock_rate = src->callbacks.clock_rate (src, payload, src->user_data);
893
894     GST_DEBUG ("got clock-rate %d", clock_rate);
895
896     src->clock_rate = clock_rate;
897   }
898   return src->clock_rate;
899 }
900
901 /* Jitter is the variation in the delay of received packets in a flow. It is
902  * measured by comparing the interval when RTP packets were sent to the interval
903  * at which they were received. For instance, if packet #1 and packet #2 leave
904  * 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10
905  * milliseconds. */
906 static void
907 calculate_jitter (RTPSource * src, GstBuffer * buffer,
908     RTPArrivalStats * arrival)
909 {
910   GstClockTime running_time;
911   guint32 rtparrival, transit, rtptime;
912   gint32 diff;
913   gint clock_rate;
914   guint8 pt;
915   GstRTPBuffer rtp = { NULL };
916
917   /* get arrival time */
918   if ((running_time = arrival->running_time) == GST_CLOCK_TIME_NONE)
919     goto no_time;
920
921   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
922   pt = gst_rtp_buffer_get_payload_type (&rtp);
923
924   GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt);
925
926   /* get clockrate */
927   if ((clock_rate = get_clock_rate (src, pt)) == -1) {
928     gst_rtp_buffer_unmap (&rtp);
929     goto no_clock_rate;
930   }
931
932   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
933
934   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
935    * care about the absolute value, just the difference. */
936   rtparrival = gst_util_uint64_scale_int (running_time, clock_rate, GST_SECOND);
937
938   /* transit time is difference with RTP timestamp */
939   transit = rtparrival - rtptime;
940
941   /* get ABS diff with previous transit time */
942   if (src->stats.transit != -1) {
943     if (transit > src->stats.transit)
944       diff = transit - src->stats.transit;
945     else
946       diff = src->stats.transit - transit;
947   } else
948     diff = 0;
949
950   src->stats.transit = transit;
951
952   /* update jitter, the value we store is scaled up so we can keep precision. */
953   src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4);
954
955   src->stats.prev_rtptime = src->stats.last_rtptime;
956   src->stats.last_rtptime = rtparrival;
957
958   GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f",
959       rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0);
960
961   gst_rtp_buffer_unmap (&rtp);
962   return;
963
964   /* ERRORS */
965 no_time:
966   {
967     GST_WARNING ("cannot get current running_time");
968     return;
969   }
970 no_clock_rate:
971   {
972     GST_WARNING ("cannot get clock-rate for pt %d", pt);
973     return;
974   }
975 }
976
977 static void
978 init_seq (RTPSource * src, guint16 seq)
979 {
980   src->stats.base_seq = seq;
981   src->stats.max_seq = seq;
982   src->stats.bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
983   src->stats.cycles = 0;
984   src->stats.packets_received = 0;
985   src->stats.octets_received = 0;
986   src->stats.bytes_received = 0;
987   src->stats.prev_received = 0;
988   src->stats.prev_expected = 0;
989
990   GST_DEBUG ("base_seq %d", seq);
991 }
992
993 #define BITRATE_INTERVAL (2 * GST_SECOND)
994
995 static void
996 do_bitrate_estimation (RTPSource * src, GstClockTime running_time,
997     guint64 * bytes_handled)
998 {
999   guint64 elapsed;
1000
1001   if (src->prev_rtime) {
1002     elapsed = running_time - src->prev_rtime;
1003
1004     if (elapsed > BITRATE_INTERVAL) {
1005       guint64 rate;
1006
1007       rate = gst_util_uint64_scale (*bytes_handled, 8 * GST_SECOND, elapsed);
1008
1009       GST_LOG ("Elapsed %" G_GUINT64_FORMAT ", bytes %" G_GUINT64_FORMAT
1010           ", rate %" G_GUINT64_FORMAT, elapsed, *bytes_handled, rate);
1011
1012       if (src->bitrate == 0)
1013         src->bitrate = rate;
1014       else
1015         src->bitrate = ((src->bitrate * 3) + rate) / 4;
1016
1017       src->prev_rtime = running_time;
1018       *bytes_handled = 0;
1019     }
1020   } else {
1021     GST_LOG ("Reset bitrate measurement");
1022     src->prev_rtime = running_time;
1023     src->bitrate = 0;
1024   }
1025 }
1026
1027 /**
1028  * rtp_source_process_rtp:
1029  * @src: an #RTPSource
1030  * @buffer: an RTP buffer
1031  *
1032  * Let @src handle the incomming RTP @buffer.
1033  *
1034  * Returns: a #GstFlowReturn.
1035  */
1036 GstFlowReturn
1037 rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
1038     RTPArrivalStats * arrival)
1039 {
1040   GstFlowReturn result = GST_FLOW_OK;
1041   guint16 seqnr, udelta;
1042   RTPSourceStats *stats;
1043   guint16 expected;
1044   GstRTPBuffer rtp = { NULL };
1045
1046   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
1047   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1048
1049   stats = &src->stats;
1050
1051   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
1052   seqnr = gst_rtp_buffer_get_seq (&rtp);
1053   gst_rtp_buffer_unmap (&rtp);
1054
1055   if (stats->cycles == -1) {
1056     GST_DEBUG ("received first buffer");
1057     /* first time we heard of this source */
1058     init_seq (src, seqnr);
1059     src->stats.max_seq = seqnr - 1;
1060     src->probation = RTP_DEFAULT_PROBATION;
1061   }
1062
1063   udelta = seqnr - stats->max_seq;
1064
1065   /* if we are still on probation, check seqnum */
1066   if (src->probation) {
1067     expected = src->stats.max_seq + 1;
1068
1069     /* when in probation, we require consecutive seqnums */
1070     if (seqnr == expected) {
1071       /* expected packet */
1072       GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
1073       src->probation--;
1074       src->stats.max_seq = seqnr;
1075       if (src->probation == 0) {
1076         GST_DEBUG ("probation done!");
1077         init_seq (src, seqnr);
1078       } else {
1079         GstBuffer *q;
1080
1081         GST_DEBUG ("probation %d: queue buffer", src->probation);
1082         /* when still in probation, keep packets in a list. */
1083         g_queue_push_tail (src->packets, buffer);
1084         /* remove packets from queue if there are too many */
1085         while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
1086           q = g_queue_pop_head (src->packets);
1087           gst_buffer_unref (q);
1088         }
1089         goto done;
1090       }
1091     } else {
1092       /* unexpected seqnum in probation */
1093       goto probation_seqnum;
1094     }
1095   } else if (udelta < RTP_MAX_DROPOUT) {
1096     /* in order, with permissible gap */
1097     if (seqnr < stats->max_seq) {
1098       /* sequence number wrapped - count another 64K cycle. */
1099       stats->cycles += RTP_SEQ_MOD;
1100     }
1101     stats->max_seq = seqnr;
1102   } else if (udelta <= RTP_SEQ_MOD - RTP_MAX_MISORDER) {
1103     /* the sequence number made a very large jump */
1104     if (seqnr == stats->bad_seq) {
1105       /* two sequential packets -- assume that the other side
1106        * restarted without telling us so just re-sync
1107        * (i.e., pretend this was the first packet).  */
1108       init_seq (src, seqnr);
1109     } else {
1110       /* unacceptable jump */
1111       stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
1112       goto bad_sequence;
1113     }
1114   } else {
1115     /* duplicate or reordered packet, will be filtered by jitterbuffer. */
1116     GST_WARNING ("duplicate or reordered packet");
1117   }
1118
1119   src->stats.octets_received += arrival->payload_len;
1120   src->stats.bytes_received += arrival->bytes;
1121   src->stats.packets_received++;
1122   /* for the bitrate estimation */
1123   src->bytes_received += arrival->payload_len;
1124   /* the source that sent the packet must be a sender */
1125   src->is_sender = TRUE;
1126   src->validated = TRUE;
1127
1128   do_bitrate_estimation (src, arrival->running_time, &src->bytes_received);
1129
1130   GST_LOG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
1131       seqnr, src->stats.packets_received, src->stats.octets_received);
1132
1133   /* calculate jitter for the stats */
1134   calculate_jitter (src, buffer, arrival);
1135
1136   /* we're ready to push the RTP packet now */
1137   result = push_packet (src, buffer);
1138
1139 done:
1140   return result;
1141
1142   /* ERRORS */
1143 bad_sequence:
1144   {
1145     GST_WARNING ("unacceptable seqnum received");
1146     gst_buffer_unref (buffer);
1147     return GST_FLOW_OK;
1148   }
1149 probation_seqnum:
1150   {
1151     GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
1152     src->probation = RTP_DEFAULT_PROBATION;
1153     src->stats.max_seq = seqnr;
1154     gst_buffer_unref (buffer);
1155     return GST_FLOW_OK;
1156   }
1157 }
1158
1159 /**
1160  * rtp_source_process_bye:
1161  * @src: an #RTPSource
1162  * @reason: the reason for leaving
1163  *
1164  * Notify @src that a BYE packet has been received. This will make the source
1165  * inactive.
1166  */
1167 void
1168 rtp_source_process_bye (RTPSource * src, const gchar * reason)
1169 {
1170   g_return_if_fail (RTP_IS_SOURCE (src));
1171
1172   GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc,
1173       GST_STR_NULL (reason));
1174
1175   /* copy the reason and mark as received_bye */
1176   g_free (src->bye_reason);
1177   src->bye_reason = g_strdup (reason);
1178   src->received_bye = TRUE;
1179 }
1180
1181 static gboolean
1182 set_ssrc (GstBuffer ** buffer, guint idx, RTPSource * src)
1183 {
1184   GstRTPBuffer rtp = { NULL };
1185
1186   *buffer = gst_buffer_make_writable (*buffer);
1187   gst_rtp_buffer_map (*buffer, GST_MAP_WRITE, &rtp);
1188   gst_rtp_buffer_set_ssrc (&rtp, src->ssrc);
1189   gst_rtp_buffer_unmap (&rtp);
1190   return TRUE;
1191 }
1192
1193 /**
1194  * rtp_source_send_rtp:
1195  * @src: an #RTPSource
1196  * @data: an RTP buffer or a list of RTP buffers
1197  * @is_list: if @data is a buffer or list
1198  * @running_time: the running time of @data
1199  *
1200  * Send @data (an RTP buffer or list of buffers) originating from @src.
1201  * This will make @src a sender. This function takes ownership of @data and
1202  * modifies the SSRC in the RTP packet to that of @src when needed.
1203  *
1204  * Returns: a #GstFlowReturn.
1205  */
1206 GstFlowReturn
1207 rtp_source_send_rtp (RTPSource * src, gpointer data, gboolean is_list,
1208     GstClockTime running_time)
1209 {
1210   GstFlowReturn result;
1211   guint len;
1212   guint32 rtptime;
1213   guint64 ext_rtptime;
1214   guint64 rt_diff, rtp_diff;
1215   GstBufferList *list = NULL;
1216   GstBuffer *buffer = NULL;
1217   guint packets;
1218   guint32 ssrc;
1219   GstRTPBuffer rtp = { NULL };
1220
1221   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
1222   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
1223
1224   if (is_list) {
1225     list = GST_BUFFER_LIST_CAST (data);
1226
1227     /* We can grab the caps from the first group, since all
1228      * groups of a buffer list have same caps. */
1229     buffer = gst_buffer_list_get (list, 0);
1230     if (!buffer)
1231       goto no_buffer;
1232   } else {
1233     buffer = GST_BUFFER_CAST (data);
1234   }
1235
1236   /* we are a sender now */
1237   src->is_sender = TRUE;
1238
1239   if (is_list) {
1240     gint i;
1241
1242     /* Each group makes up a network packet. */
1243     packets = gst_buffer_list_length (list);
1244     for (i = 0, len = 0; i < packets; i++) {
1245       gst_rtp_buffer_map (gst_buffer_list_get (list, i), GST_MAP_READ, &rtp);
1246       len += gst_rtp_buffer_get_payload_len (&rtp);
1247       gst_rtp_buffer_unmap (&rtp);
1248     }
1249     /* subsequent info taken from first list member */
1250     gst_rtp_buffer_map (gst_buffer_list_get (list, 0), GST_MAP_READ, &rtp);
1251   } else {
1252     packets = 1;
1253     gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
1254     len = gst_rtp_buffer_get_payload_len (&rtp);
1255   }
1256
1257   /* update stats for the SR */
1258   src->stats.packets_sent += packets;
1259   src->stats.octets_sent += len;
1260   src->bytes_sent += len;
1261
1262   do_bitrate_estimation (src, running_time, &src->bytes_sent);
1263
1264   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1265   ext_rtptime = src->last_rtptime;
1266   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
1267
1268   GST_LOG ("SSRC %08x, RTP %" G_GUINT64_FORMAT ", running_time %"
1269       GST_TIME_FORMAT, src->ssrc, ext_rtptime, GST_TIME_ARGS (running_time));
1270
1271   if (ext_rtptime > src->last_rtptime) {
1272     rtp_diff = ext_rtptime - src->last_rtptime;
1273     rt_diff = running_time - src->last_rtime;
1274
1275     /* calc the diff so we can detect drift at the sender. This can also be used
1276      * to guestimate the clock rate if the NTP time is locked to the RTP
1277      * timestamps (as is the case when the capture device is providing the clock). */
1278     GST_LOG ("SSRC %08x, diff RTP %" G_GUINT64_FORMAT ", diff running_time %"
1279         GST_TIME_FORMAT, src->ssrc, rtp_diff, GST_TIME_ARGS (rt_diff));
1280   }
1281
1282   /* we keep track of the last received RTP timestamp and the corresponding
1283    * buffer running_time so that we can use this info when constructing SR reports */
1284   src->last_rtime = running_time;
1285   src->last_rtptime = ext_rtptime;
1286
1287   /* push packet */
1288   if (!src->callbacks.push_rtp) {
1289     gst_rtp_buffer_unmap (&rtp);
1290     goto no_callback;
1291   }
1292
1293   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1294   gst_rtp_buffer_unmap (&rtp);
1295
1296   if (ssrc != src->ssrc) {
1297     /* the SSRC of the packet is not correct, make a writable buffer and
1298      * update the SSRC. This could involve a complete copy of the packet when
1299      * it is not writable. Usually the payloader will use caps negotiation to
1300      * get the correct SSRC from the session manager before pushing anything. */
1301
1302     /* FIXME, we don't want to warn yet because we can't inform any payloader
1303      * of the changes SSRC yet because we don't implement pad-alloc. */
1304     GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc,
1305         src->ssrc);
1306
1307     if (is_list) {
1308       list = gst_buffer_list_make_writable (list);
1309       gst_buffer_list_foreach (list, (GstBufferListFunc) set_ssrc, src);
1310     } else {
1311       set_ssrc (&buffer, 0, src);
1312     }
1313   }
1314   GST_LOG ("pushing RTP %s %" G_GUINT64_FORMAT, is_list ? "list" : "packet",
1315       src->stats.packets_sent);
1316
1317   result = src->callbacks.push_rtp (src, data, src->user_data);
1318
1319   return result;
1320
1321   /* ERRORS */
1322 no_buffer:
1323   {
1324     GST_WARNING ("no buffers in buffer list");
1325     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1326     return GST_FLOW_OK;
1327   }
1328 no_callback:
1329   {
1330     GST_WARNING ("no callback installed, dropping packet");
1331     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1332     return GST_FLOW_OK;
1333   }
1334 }
1335
1336 /**
1337  * rtp_source_process_sr:
1338  * @src: an #RTPSource
1339  * @time: time of packet arrival
1340  * @ntptime: the NTP time in 32.32 fixed point
1341  * @rtptime: the RTP time
1342  * @packet_count: the packet count
1343  * @octet_count: the octect count
1344  *
1345  * Update the sender report in @src.
1346  */
1347 void
1348 rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime,
1349     guint32 rtptime, guint32 packet_count, guint32 octet_count)
1350 {
1351   RTPSenderReport *curr;
1352   gint curridx;
1353
1354   g_return_if_fail (RTP_IS_SOURCE (src));
1355
1356   GST_DEBUG ("got SR packet: SSRC %08x, NTP %08x:%08x, RTP %" G_GUINT32_FORMAT
1357       ", PC %" G_GUINT32_FORMAT ", OC %" G_GUINT32_FORMAT, src->ssrc,
1358       (guint32) (ntptime >> 32), (guint32) (ntptime & 0xffffffff), rtptime,
1359       packet_count, octet_count);
1360
1361   curridx = src->stats.curr_sr ^ 1;
1362   curr = &src->stats.sr[curridx];
1363
1364   /* this is a sender now */
1365   src->is_sender = TRUE;
1366
1367   /* update current */
1368   curr->is_valid = TRUE;
1369   curr->ntptime = ntptime;
1370   curr->rtptime = rtptime;
1371   curr->packet_count = packet_count;
1372   curr->octet_count = octet_count;
1373   curr->time = time;
1374
1375   /* make current */
1376   src->stats.curr_sr = curridx;
1377
1378   src->stats.prev_rtcptime = src->stats.last_rtcptime;
1379   src->stats.last_rtcptime = time;
1380 }
1381
1382 /**
1383  * rtp_source_process_rb:
1384  * @src: an #RTPSource
1385  * @ntpnstime: the current time in nanoseconds since 1970
1386  * @fractionlost: fraction lost since last SR/RR
1387  * @packetslost: the cumululative number of packets lost
1388  * @exthighestseq: the extended last sequence number received
1389  * @jitter: the interarrival jitter
1390  * @lsr: the last SR packet from this source
1391  * @dlsr: the delay since last SR packet
1392  *
1393  * Update the report block in @src.
1394  */
1395 void
1396 rtp_source_process_rb (RTPSource * src, guint64 ntpnstime,
1397     guint8 fractionlost, gint32 packetslost, guint32 exthighestseq,
1398     guint32 jitter, guint32 lsr, guint32 dlsr)
1399 {
1400   RTPReceiverReport *curr;
1401   gint curridx;
1402   guint32 ntp, A;
1403   guint64 f_ntp;
1404
1405   g_return_if_fail (RTP_IS_SOURCE (src));
1406
1407   GST_DEBUG ("got RB packet: SSRC %08x, FL %2x, PL %d, HS %" G_GUINT32_FORMAT
1408       ", jitter %" G_GUINT32_FORMAT ", LSR %04x:%04x, DLSR %04x:%04x",
1409       src->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr >> 16,
1410       lsr & 0xffff, dlsr >> 16, dlsr & 0xffff);
1411
1412   curridx = src->stats.curr_rr ^ 1;
1413   curr = &src->stats.rr[curridx];
1414
1415   /* update current */
1416   curr->is_valid = TRUE;
1417   curr->fractionlost = fractionlost;
1418   curr->packetslost = packetslost;
1419   curr->exthighestseq = exthighestseq;
1420   curr->jitter = jitter;
1421   curr->lsr = lsr;
1422   curr->dlsr = dlsr;
1423
1424   /* convert the NTP time in nanoseconds to 32.32 fixed point */
1425   f_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
1426   /* calculate round trip, round the time up */
1427   ntp = ((f_ntp + 0xffff) >> 16) & 0xffffffff;
1428
1429   A = dlsr + lsr;
1430   if (A > 0 && ntp > A)
1431     A = ntp - A;
1432   else
1433     A = 0;
1434   curr->round_trip = A;
1435
1436   GST_DEBUG ("NTP %04x:%04x, round trip %04x:%04x", ntp >> 16, ntp & 0xffff,
1437       A >> 16, A & 0xffff);
1438
1439   /* make current */
1440   src->stats.curr_rr = curridx;
1441 }
1442
1443 /**
1444  * rtp_source_get_new_sr:
1445  * @src: an #RTPSource
1446  * @ntpnstime: the current time in nanoseconds since 1970
1447  * @running_time: the current running_time of the pipeline.
1448  * @ntptime: the NTP time in 32.32 fixed point
1449  * @rtptime: the RTP time corresponding to @ntptime
1450  * @packet_count: the packet count
1451  * @octet_count: the octect count
1452  *
1453  * Get new values to put into a new SR report from this source.
1454  *
1455  * @running_time and @ntpnstime are captured at the same time and represent the
1456  * running time of the pipeline clock and the absolute current system time in
1457  * nanoseconds respectively. Together with the last running_time and rtp timestamp
1458  * we have observed in the source, we can generate @ntptime and @rtptime for an SR
1459  * packet. @ntptime is basically the fixed point representation of @ntpnstime
1460  * and @rtptime the associated RTP timestamp.
1461  *
1462  * Returns: %TRUE on success.
1463  */
1464 gboolean
1465 rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
1466     GstClockTime running_time, guint64 * ntptime, guint32 * rtptime,
1467     guint32 * packet_count, guint32 * octet_count)
1468 {
1469   guint64 t_rtp;
1470   guint64 t_current_ntp;
1471   GstClockTimeDiff diff;
1472
1473   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
1474
1475   /* We last saw a buffer with last_rtptime at last_rtime. Given a running_time
1476    * and an NTP time, we can scale the RTP timestamps so that they match the
1477    * given NTP time.  for scaling, we assume that the slope of the rtptime vs
1478    * running_time vs ntptime curve is close to 1, which is certainly
1479    * sufficient for the frequency at which we report SR and the rate we send
1480    * out RTP packets. */
1481   t_rtp = src->last_rtptime;
1482
1483   GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %"
1484       G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp);
1485
1486   if (src->clock_rate != -1) {
1487     /* get the diff between the clock running_time and the buffer running_time.
1488      * This is the elapsed time, as measured against the pipeline clock, between
1489      * when the rtp timestamp was observed and the current running_time.
1490      *
1491      * We need to apply this diff to the RTP timestamp to get the RTP timestamp
1492      * for the given ntpnstime. */
1493     diff = GST_CLOCK_DIFF (src->last_rtime, running_time);
1494
1495     /* now translate the diff to RTP time, handle positive and negative cases.
1496      * If there is no diff, we already set rtptime correctly above. */
1497     if (diff > 0) {
1498       GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT,
1499           GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
1500       t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
1501     } else {
1502       diff = -diff;
1503       GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT,
1504           GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
1505       t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
1506     }
1507   } else {
1508     GST_WARNING ("no clock-rate, cannot interpolate rtp time");
1509   }
1510
1511   /* convert the NTP time in nanoseconds to 32.32 fixed point */
1512   t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
1513
1514   GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
1515       (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff),
1516       (guint32) t_rtp);
1517
1518   if (ntptime)
1519     *ntptime = t_current_ntp;
1520   if (rtptime)
1521     *rtptime = t_rtp;
1522   if (packet_count)
1523     *packet_count = src->stats.packets_sent;
1524   if (octet_count)
1525     *octet_count = src->stats.octets_sent;
1526
1527   return TRUE;
1528 }
1529
1530 /**
1531  * rtp_source_get_new_rb:
1532  * @src: an #RTPSource
1533  * @time: the current time of the system clock
1534  * @fractionlost: fraction lost since last SR/RR
1535  * @packetslost: the cumululative number of packets lost
1536  * @exthighestseq: the extended last sequence number received
1537  * @jitter: the interarrival jitter
1538  * @lsr: the last SR packet from this source
1539  * @dlsr: the delay since last SR packet
1540  *
1541  * Get new values to put into a new report block from this source.
1542  *
1543  * Returns: %TRUE on success.
1544  */
1545 gboolean
1546 rtp_source_get_new_rb (RTPSource * src, GstClockTime time,
1547     guint8 * fractionlost, gint32 * packetslost, guint32 * exthighestseq,
1548     guint32 * jitter, guint32 * lsr, guint32 * dlsr)
1549 {
1550   RTPSourceStats *stats;
1551   guint64 extended_max, expected;
1552   guint64 expected_interval, received_interval, ntptime;
1553   gint64 lost, lost_interval;
1554   guint32 fraction, LSR, DLSR;
1555   GstClockTime sr_time;
1556
1557   stats = &src->stats;
1558
1559   extended_max = stats->cycles + stats->max_seq;
1560   expected = extended_max - stats->base_seq + 1;
1561
1562   GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT
1563       ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT,
1564       extended_max, expected, stats->packets_received, stats->base_seq);
1565
1566   lost = expected - stats->packets_received;
1567   lost = CLAMP (lost, -0x800000, 0x7fffff);
1568
1569   expected_interval = expected - stats->prev_expected;
1570   stats->prev_expected = expected;
1571   received_interval = stats->packets_received - stats->prev_received;
1572   stats->prev_received = stats->packets_received;
1573
1574   lost_interval = expected_interval - received_interval;
1575
1576   if (expected_interval == 0 || lost_interval <= 0)
1577     fraction = 0;
1578   else
1579     fraction = (lost_interval << 8) / expected_interval;
1580
1581   GST_DEBUG ("add RR for SSRC %08x", src->ssrc);
1582   /* we scaled the jitter up for additional precision */
1583   GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT
1584       ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost,
1585       extended_max, stats->jitter >> 4);
1586
1587   if (rtp_source_get_last_sr (src, &sr_time, &ntptime, NULL, NULL, NULL)) {
1588     GstClockTime diff;
1589
1590     /* LSR is middle 32 bits of the last ntptime */
1591     LSR = (ntptime >> 16) & 0xffffffff;
1592     diff = time - sr_time;
1593     GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff));
1594     /* DLSR, delay since last SR is expressed in 1/65536 second units */
1595     DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND);
1596   } else {
1597     /* No valid SR received, LSR/DLSR are set to 0 then */
1598     GST_DEBUG ("no valid SR received");
1599     LSR = 0;
1600     DLSR = 0;
1601   }
1602   GST_DEBUG ("LSR %04x:%04x, DLSR %04x:%04x", LSR >> 16, LSR & 0xffff,
1603       DLSR >> 16, DLSR & 0xffff);
1604
1605   if (fractionlost)
1606     *fractionlost = fraction;
1607   if (packetslost)
1608     *packetslost = lost;
1609   if (exthighestseq)
1610     *exthighestseq = extended_max;
1611   if (jitter)
1612     *jitter = stats->jitter >> 4;
1613   if (lsr)
1614     *lsr = LSR;
1615   if (dlsr)
1616     *dlsr = DLSR;
1617
1618   return TRUE;
1619 }
1620
1621 /**
1622  * rtp_source_get_last_sr:
1623  * @src: an #RTPSource
1624  * @time: time of packet arrival
1625  * @ntptime: the NTP time in 32.32 fixed point
1626  * @rtptime: the RTP time
1627  * @packet_count: the packet count
1628  * @octet_count: the octect count
1629  *
1630  * Get the values of the last sender report as set with rtp_source_process_sr().
1631  *
1632  * Returns: %TRUE if there was a valid SR report.
1633  */
1634 gboolean
1635 rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime,
1636     guint32 * rtptime, guint32 * packet_count, guint32 * octet_count)
1637 {
1638   RTPSenderReport *curr;
1639
1640   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
1641
1642   curr = &src->stats.sr[src->stats.curr_sr];
1643   if (!curr->is_valid)
1644     return FALSE;
1645
1646   if (ntptime)
1647     *ntptime = curr->ntptime;
1648   if (rtptime)
1649     *rtptime = curr->rtptime;
1650   if (packet_count)
1651     *packet_count = curr->packet_count;
1652   if (octet_count)
1653     *octet_count = curr->octet_count;
1654   if (time)
1655     *time = curr->time;
1656
1657   return TRUE;
1658 }
1659
1660 /**
1661  * rtp_source_get_last_rb:
1662  * @src: an #RTPSource
1663  * @fractionlost: fraction lost since last SR/RR
1664  * @packetslost: the cumululative number of packets lost
1665  * @exthighestseq: the extended last sequence number received
1666  * @jitter: the interarrival jitter
1667  * @lsr: the last SR packet from this source
1668  * @dlsr: the delay since last SR packet
1669  * @round_trip: the round trip time
1670  *
1671  * Get the values of the last RB report set with rtp_source_process_rb().
1672  *
1673  * Returns: %TRUE if there was a valid SB report.
1674  */
1675 gboolean
1676 rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost,
1677     gint32 * packetslost, guint32 * exthighestseq, guint32 * jitter,
1678     guint32 * lsr, guint32 * dlsr, guint32 * round_trip)
1679 {
1680   RTPReceiverReport *curr;
1681
1682   g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
1683
1684   curr = &src->stats.rr[src->stats.curr_rr];
1685   if (!curr->is_valid)
1686     return FALSE;
1687
1688   if (fractionlost)
1689     *fractionlost = curr->fractionlost;
1690   if (packetslost)
1691     *packetslost = curr->packetslost;
1692   if (exthighestseq)
1693     *exthighestseq = curr->exthighestseq;
1694   if (jitter)
1695     *jitter = curr->jitter;
1696   if (lsr)
1697     *lsr = curr->lsr;
1698   if (dlsr)
1699     *dlsr = curr->dlsr;
1700   if (round_trip)
1701     *round_trip = curr->round_trip;
1702
1703   return TRUE;
1704 }
1705
1706 /**
1707  * rtp_source_find_conflicting_address:
1708  * @src: The source the packet came in
1709  * @address: address to check for
1710  * @time: The time when the packet that is possibly in conflict arrived
1711  *
1712  * Checks if an address which has a conflict is already known. If it is
1713  * a known conflict, remember the time
1714  *
1715  * Returns: TRUE if it was a known conflict, FALSE otherwise
1716  */
1717 gboolean
1718 rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address,
1719     GstClockTime time)
1720 {
1721   GList *item;
1722
1723   for (item = g_list_first (src->conflicting_addresses);
1724       item; item = g_list_next (item)) {
1725     RTPConflictingAddress *known_conflict = item->data;
1726
1727     if (__g_socket_address_equal (address, known_conflict->address)) {
1728       known_conflict->time = time;
1729       return TRUE;
1730     }
1731   }
1732
1733   return FALSE;
1734 }
1735
1736 /**
1737  * rtp_source_add_conflicting_address:
1738  * @src: The source the packet came in
1739  * @address: address to remember
1740  * @time: The time when the packet that is in conflict arrived
1741  *
1742  * Adds a new conflict address
1743  */
1744 void
1745 rtp_source_add_conflicting_address (RTPSource * src,
1746     GSocketAddress * address, GstClockTime time)
1747 {
1748   RTPConflictingAddress *new_conflict;
1749
1750   new_conflict = g_new0 (RTPConflictingAddress, 1);
1751
1752   new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
1753   new_conflict->time = time;
1754
1755   src->conflicting_addresses = g_list_prepend (src->conflicting_addresses,
1756       new_conflict);
1757 }
1758
1759 /**
1760  * rtp_source_timeout:
1761  * @src: The #RTPSource
1762  * @current_time: The current time
1763  * @collision_timeout: The amount of time after which a collision is timed out
1764  * @feedback_retention_window: The running time before which retained feedback
1765  * packets have to be discarded
1766  *
1767  * This is processed on each RTCP interval. It times out old collisions.
1768  * It also times out old retained feedback packets
1769  */
1770 void
1771 rtp_source_timeout (RTPSource * src, GstClockTime current_time,
1772     GstClockTime collision_timeout, GstClockTime feedback_retention_window)
1773 {
1774   GList *item;
1775   GstRTCPPacket *pkt;
1776
1777   item = g_list_first (src->conflicting_addresses);
1778   while (item) {
1779     RTPConflictingAddress *known_conflict = item->data;
1780     GList *next_item = g_list_next (item);
1781
1782     if (known_conflict->time < current_time - collision_timeout) {
1783       gchar *buf;
1784
1785       src->conflicting_addresses =
1786           g_list_delete_link (src->conflicting_addresses, item);
1787       buf = __g_socket_address_to_string (known_conflict->address);
1788       GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
1789       g_free (buf);
1790       g_object_unref (known_conflict->address);
1791       g_free (known_conflict);
1792     }
1793     item = next_item;
1794   }
1795
1796   /* Time out AVPF packets that are older than the desired length */
1797   while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
1798       GST_BUFFER_TIMESTAMP (pkt) < feedback_retention_window)
1799     gst_buffer_unref (g_queue_pop_tail (src->retained_feedback));
1800 }
1801
1802 static gint
1803 compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data)
1804 {
1805   const GstBuffer *bufa = a;
1806   const GstBuffer *bufb = b;
1807
1808   return GST_BUFFER_TIMESTAMP (bufa) - GST_BUFFER_TIMESTAMP (bufb);
1809 }
1810
1811 void
1812 rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet,
1813     GstClockTime running_time)
1814 {
1815   GstBuffer *buffer;
1816
1817   buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY,
1818       packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4);
1819
1820   GST_BUFFER_TIMESTAMP (buffer) = running_time;
1821
1822   g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL);
1823 }
1824
1825 gboolean
1826 rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data)
1827 {
1828   if (g_queue_find_custom (src->retained_feedback, data, func))
1829     return TRUE;
1830   else
1831     return FALSE;
1832 }