gstrtpsession: improve stats about rtx requests
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpsession
22  * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
23  *
24  * The RTP session manager models participants with unique SSRC in an RTP
25  * session. This session can be used to send and receive RTP and RTCP packets.
26  * Based on what REQUEST pads are requested from the session manager, specific
27  * functionality can be activated.
28  *
29  * The session manager currently implements RFC 3550 including:
30  * <itemizedlist>
31  *   <listitem>
32  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
33  *   </listitem>
34  *   <listitem>
35  *     <para>Maintainance of the SSRC participant database.</para>
36  *   </listitem>
37  *   <listitem>
38  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
39  *   </listitem>
40  *   <listitem>
41  *     <para>Scheduling of RR/SR RTCP packets.</para>
42  *   </listitem>
43  *   <listitem>
44  *     <para>Support for multiple sender SSRC.</para>
45  *   </listitem>
46  * </itemizedlist>
47  *
48  * The rtpsession will not demux packets based on SSRC or payload type, nor will
49  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
50  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
51  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
52  * combines all these features in one element.
53  *
54  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
55  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
56  * will be processed in the session and after being validated forwarded on the
57  * recv_rtp_src pad.
58  *
59  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
60  * which will automatically create a sync_src pad. Packets received on the RTCP
61  * pad will be used by the session manager to update the stats and database of
62  * the other participants. SR packets will be forwarded on the sync_src pad
63  * so that they can be used to perform inter-stream synchronisation when needed.
64  *
65  * If you want the session manager to generate and send RTCP packets, request
66  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
67  * that should be sent to all participants in the session.
68  *
69  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
70  * automatically create a send_rtp_src pad. The session manager will
71  * forward the packets on the send_rtp_src pad after updating its internal state.
72  *
73  * The session manager needs the clock-rate of the payload types it is handling
74  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
75  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
76  * signal.
77  *
78  * <refsect2>
79  * <title>Example pipelines</title>
80  * |[
81  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
82  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
83  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
84  * configured based on some negotiation process such as RTSP for this pipeline
85  * to work correctly.
86  * |[
87  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
88  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
89  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
90  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
91  * decoder and display. Receive RTCP packets from port 5001 and process them in
92  * the session manager.
93  * Note that the application/x-rtp caps on udpsrc should be
94  * configured based on some negotiation process such as RTSP for this pipeline
95  * to work correctly.
96  * |[
97  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
98  * ]| Send theora RTP packets through the session manager and out on UDP port
99  * 5000.
100  * |[
101  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
102  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
103  * ]| Send theora RTP packets through the session manager and out on UDP port
104  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
105  * correctly because the second udpsink will not preroll correctly (no RTCP
106  * packets are sent in the PAUSED state). Applications should manually set and
107  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
108  * </refsect2>
109  */
110
111 #ifdef HAVE_CONFIG_H
112 #include "config.h"
113 #endif
114
115 #include <gst/rtp/gstrtpbuffer.h>
116
117 #include <gst/glib-compat-private.h>
118
119 #include "gstrtpsession.h"
120 #include "rtpsession.h"
121
122 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
123 #define GST_CAT_DEFAULT gst_rtp_session_debug
124
125 GType
126 gst_rtp_ntp_time_source_get_type (void)
127 {
128   static GType type = 0;
129   static const GEnumValue values[] = {
130     {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
131     {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
132     {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
133           "Running time based on pipeline clock",
134         "running-time"},
135     {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
136     {0, NULL, NULL},
137   };
138
139   if (!type) {
140     type = g_enum_register_static ("GstRtpNtpTimeSource", values);
141   }
142   return type;
143 }
144
145 /* sink pads */
146 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
147 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
148     GST_PAD_SINK,
149     GST_PAD_REQUEST,
150     GST_STATIC_CAPS ("application/x-rtp")
151     );
152
153 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
154 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
155     GST_PAD_SINK,
156     GST_PAD_REQUEST,
157     GST_STATIC_CAPS ("application/x-rtcp")
158     );
159
160 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS ("application/x-rtp")
165     );
166
167 /* src pads */
168 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
169 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
170     GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     GST_STATIC_CAPS ("application/x-rtp")
173     );
174
175 static GstStaticPadTemplate rtpsession_sync_src_template =
176 GST_STATIC_PAD_TEMPLATE ("sync_src",
177     GST_PAD_SRC,
178     GST_PAD_SOMETIMES,
179     GST_STATIC_CAPS ("application/x-rtcp")
180     );
181
182 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
183 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
184     GST_PAD_SRC,
185     GST_PAD_SOMETIMES,
186     GST_STATIC_CAPS ("application/x-rtp")
187     );
188
189 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
190 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
191     GST_PAD_SRC,
192     GST_PAD_REQUEST,
193     GST_STATIC_CAPS ("application/x-rtcp")
194     );
195
196 /* signals and args */
197 enum
198 {
199   SIGNAL_REQUEST_PT_MAP,
200   SIGNAL_CLEAR_PT_MAP,
201
202   SIGNAL_ON_NEW_SSRC,
203   SIGNAL_ON_SSRC_COLLISION,
204   SIGNAL_ON_SSRC_VALIDATED,
205   SIGNAL_ON_SSRC_ACTIVE,
206   SIGNAL_ON_SSRC_SDES,
207   SIGNAL_ON_BYE_SSRC,
208   SIGNAL_ON_BYE_TIMEOUT,
209   SIGNAL_ON_TIMEOUT,
210   SIGNAL_ON_SENDER_TIMEOUT,
211   SIGNAL_ON_NEW_SENDER_SSRC,
212   SIGNAL_ON_SENDER_SSRC_ACTIVE,
213   LAST_SIGNAL
214 };
215
216 #define DEFAULT_BANDWIDTH            0
217 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
218 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
219 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
220 #define DEFAULT_SDES                 NULL
221 #define DEFAULT_NUM_SOURCES          0
222 #define DEFAULT_NUM_ACTIVE_SOURCES   0
223 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
224 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
225 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
226 #define DEFAULT_MAX_DROPOUT_TIME     60000
227 #define DEFAULT_MAX_MISORDER_TIME    2000
228 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
229 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
230 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
231
232 enum
233 {
234   PROP_0,
235   PROP_BANDWIDTH,
236   PROP_RTCP_FRACTION,
237   PROP_RTCP_RR_BANDWIDTH,
238   PROP_RTCP_RS_BANDWIDTH,
239   PROP_SDES,
240   PROP_NUM_SOURCES,
241   PROP_NUM_ACTIVE_SOURCES,
242   PROP_INTERNAL_SESSION,
243   PROP_USE_PIPELINE_CLOCK,
244   PROP_RTCP_MIN_INTERVAL,
245   PROP_PROBATION,
246   PROP_MAX_DROPOUT_TIME,
247   PROP_MAX_MISORDER_TIME,
248   PROP_STATS,
249   PROP_RTP_PROFILE,
250   PROP_NTP_TIME_SOURCE,
251   PROP_RTCP_SYNC_SEND_TIME
252 };
253
254 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->priv->lock)
255 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
256
257 #define GST_RTP_SESSION_WAIT(sess)   g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock)
258 #define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond)
259
260 struct _GstRtpSessionPrivate
261 {
262   GMutex lock;
263   GCond cond;
264   GstClock *sysclock;
265
266   RTPSession *session;
267
268   /* thread for sending out RTCP */
269   GstClockID id;
270   gboolean stop_thread;
271   GThread *thread;
272   gboolean thread_stopped;
273   gboolean wait_send;
274
275   /* caps mapping */
276   GHashTable *ptmap;
277
278   GstClockTime send_latency;
279
280   gboolean use_pipeline_clock;
281   GstRtpNtpTimeSource ntp_time_source;
282   gboolean rtcp_sync_send_time;
283
284   guint recv_rtx_req_count;
285   guint sent_rtx_req_count;
286 };
287
288 /* callbacks to handle actions from the session manager */
289 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
290     RTPSource * src, GstBuffer * buffer, gpointer user_data);
291 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
292     RTPSource * src, gpointer data, gpointer user_data);
293 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
294     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
295 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
296     GstBuffer * buffer, gpointer user_data);
297 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
298     gpointer user_data);
299 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
300 static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
301     gboolean all_headers, gpointer user_data);
302 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
303     gpointer user_data);
304 static void gst_rtp_session_notify_nack (RTPSession * sess,
305     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
306 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
307 static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
308     gpointer user_data);
309
310 static RTPSessionCallbacks callbacks = {
311   gst_rtp_session_process_rtp,
312   gst_rtp_session_send_rtp,
313   gst_rtp_session_sync_rtcp,
314   gst_rtp_session_send_rtcp,
315   gst_rtp_session_clock_rate,
316   gst_rtp_session_reconsider,
317   gst_rtp_session_request_key_unit,
318   gst_rtp_session_request_time,
319   gst_rtp_session_notify_nack,
320   gst_rtp_session_reconfigure,
321   gst_rtp_session_notify_early_rtcp
322 };
323
324 /* GObject vmethods */
325 static void gst_rtp_session_finalize (GObject * object);
326 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
327     const GValue * value, GParamSpec * pspec);
328 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
329     GValue * value, GParamSpec * pspec);
330
331 /* GstElement vmethods */
332 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
333     GstStateChange transition);
334 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
335     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
336 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
337
338 static gboolean gst_rtp_session_sink_setcaps (GstPad * pad,
339     GstRtpSession * rtpsession, GstCaps * caps);
340 static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
341     GstRtpSession * rtpsession, GstCaps * caps);
342
343 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
344
345 static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
346
347 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
348
349 static void
350 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
351 {
352   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
353       src->ssrc);
354 }
355
356 static void
357 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
358 {
359   GstPad *send_rtp_sink;
360
361   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
362       src->ssrc);
363
364   GST_RTP_SESSION_LOCK (sess);
365   if ((send_rtp_sink = sess->send_rtp_sink))
366     gst_object_ref (send_rtp_sink);
367   GST_RTP_SESSION_UNLOCK (sess);
368
369   if (send_rtp_sink) {
370     GstStructure *structure;
371     GstEvent *event;
372     RTPSource *internal_src;
373     guint32 suggested_ssrc;
374
375     structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
376         (guint) src->ssrc, NULL);
377
378     /* if there is no source using the suggested ssrc, most probably because
379      * this ssrc has just collided, suggest upstream to use it */
380     suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
381     internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
382     if (!internal_src)
383       gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
384           (guint) suggested_ssrc, NULL);
385     else
386       g_object_unref (internal_src);
387
388     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
389     gst_pad_push_event (send_rtp_sink, event);
390     gst_object_unref (send_rtp_sink);
391   }
392 }
393
394 static void
395 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
396 {
397   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
398       src->ssrc);
399 }
400
401 static void
402 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
403 {
404   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
405       src->ssrc);
406 }
407
408 static void
409 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
410 {
411   GstStructure *s;
412   GstMessage *m;
413
414   /* convert the new SDES info into a message */
415   RTP_SESSION_LOCK (session);
416   g_object_get (src, "sdes", &s, NULL);
417   RTP_SESSION_UNLOCK (session);
418
419   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
420   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
421
422   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
423       src->ssrc);
424 }
425
426 static void
427 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
428 {
429   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
430       src->ssrc);
431 }
432
433 static void
434 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
435 {
436   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
437       src->ssrc);
438 }
439
440 static void
441 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
442 {
443   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
444       src->ssrc);
445 }
446
447 static void
448 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
449 {
450   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
451       src->ssrc);
452 }
453
454 static void
455 on_new_sender_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
456 {
457   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
458       src->ssrc);
459 }
460
461 static void
462 on_sender_ssrc_active (RTPSession * session, RTPSource * src,
463     GstRtpSession * sess)
464 {
465   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
466       src->ssrc);
467 }
468
469 static void
470 on_notify_stats (RTPSession * session, GParamSpec * spec,
471     GstRtpSession * rtpsession)
472 {
473   g_object_notify (G_OBJECT (rtpsession), "stats");
474 }
475
476 #define gst_rtp_session_parent_class parent_class
477 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
478
479 static void
480 gst_rtp_session_class_init (GstRtpSessionClass * klass)
481 {
482   GObjectClass *gobject_class;
483   GstElementClass *gstelement_class;
484
485   gobject_class = (GObjectClass *) klass;
486   gstelement_class = (GstElementClass *) klass;
487
488   gobject_class->finalize = gst_rtp_session_finalize;
489   gobject_class->set_property = gst_rtp_session_set_property;
490   gobject_class->get_property = gst_rtp_session_get_property;
491
492   /**
493    * GstRtpSession::request-pt-map:
494    * @sess: the object which received the signal
495    * @pt: the pt
496    *
497    * Request the payload type as #GstCaps for @pt.
498    */
499   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
500       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
501       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
502       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
503   /**
504    * GstRtpSession::clear-pt-map:
505    * @sess: the object which received the signal
506    *
507    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
508    */
509   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
510       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
511       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
512       G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
513       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE);
514
515   /**
516    * GstRtpSession::on-new-ssrc:
517    * @sess: the object which received the signal
518    * @ssrc: the SSRC
519    *
520    * Notify of a new SSRC that entered @session.
521    */
522   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
523       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
524       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
525       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
526   /**
527    * GstRtpSession::on-ssrc_collision:
528    * @sess: the object which received the signal
529    * @ssrc: the SSRC
530    *
531    * Notify when we have an SSRC collision
532    */
533   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
534       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
535       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
536           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
537       G_TYPE_NONE, 1, G_TYPE_UINT);
538   /**
539    * GstRtpSession::on-ssrc_validated:
540    * @sess: the object which received the signal
541    * @ssrc: the SSRC
542    *
543    * Notify of a new SSRC that became validated.
544    */
545   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
546       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
547       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
548           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
549       G_TYPE_NONE, 1, G_TYPE_UINT);
550   /**
551    * GstRtpSession::on-ssrc-active:
552    * @sess: the object which received the signal
553    * @ssrc: the SSRC
554    *
555    * Notify of a SSRC that is active, i.e., sending RTCP.
556    */
557   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
558       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
559       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
560           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
561       G_TYPE_NONE, 1, G_TYPE_UINT);
562   /**
563    * GstRtpSession::on-ssrc-sdes:
564    * @session: the object which received the signal
565    * @src: the SSRC
566    *
567    * Notify that a new SDES was received for SSRC.
568    */
569   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
570       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
571       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
572       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
573
574   /**
575    * GstRtpSession::on-bye-ssrc:
576    * @sess: the object which received the signal
577    * @ssrc: the SSRC
578    *
579    * Notify of an SSRC that became inactive because of a BYE packet.
580    */
581   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
582       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
583       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
584       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
585   /**
586    * GstRtpSession::on-bye-timeout:
587    * @sess: the object which received the signal
588    * @ssrc: the SSRC
589    *
590    * Notify of an SSRC that has timed out because of BYE
591    */
592   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
593       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
594       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
595       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
596   /**
597    * GstRtpSession::on-timeout:
598    * @sess: the object which received the signal
599    * @ssrc: the SSRC
600    *
601    * Notify of an SSRC that has timed out
602    */
603   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
604       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
605       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
606       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
607   /**
608    * GstRtpSession::on-sender-timeout:
609    * @sess: the object which received the signal
610    * @ssrc: the SSRC
611    *
612    * Notify of a sender SSRC that has timed out and became a receiver
613    */
614   gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
615       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
616       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
617           on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
618       G_TYPE_NONE, 1, G_TYPE_UINT);
619
620   /**
621    * GstRtpSession::on-new-sender-ssrc:
622    * @sess: the object which received the signal
623    * @ssrc: the sender SSRC
624    *
625    * Notify of a new sender SSRC that entered @session.
626    *
627    * Since: 1.8
628    */
629   gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
630       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
631       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
632       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
633
634   /**
635    * GstRtpSession::on-sender-ssrc-active:
636    * @sess: the object which received the signal
637    * @ssrc: the sender SSRC
638    *
639    * Notify of a sender SSRC that is active, i.e., sending RTCP.
640    *
641    * Since: 1.8
642    */
643   gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
644       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
645       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
646           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
647       G_TYPE_NONE, 1, G_TYPE_UINT);
648
649   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
650       g_param_spec_double ("bandwidth", "Bandwidth",
651           "The bandwidth of the session in bytes per second (0 for auto-discover)",
652           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
653           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
654
655   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
656       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
657           "The RTCP bandwidth of the session in bytes per second "
658           "(or as a real fraction of the RTP bandwidth if < 1.0)",
659           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
660           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
661
662   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
663       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
664           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
665           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
666           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
667
668   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
669       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
670           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
671           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
672           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
673
674   g_object_class_install_property (gobject_class, PROP_SDES,
675       g_param_spec_boxed ("sdes", "SDES",
676           "The SDES items of this session",
677           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
678
679   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
680       g_param_spec_uint ("num-sources", "Num Sources",
681           "The number of sources in the session", 0, G_MAXUINT,
682           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
683
684   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
685       g_param_spec_uint ("num-active-sources", "Num Active Sources",
686           "The number of active sources in the session", 0, G_MAXUINT,
687           DEFAULT_NUM_ACTIVE_SOURCES,
688           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
689
690   g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
691       g_param_spec_object ("internal-session", "Internal Session",
692           "The internal RTPSession object", RTP_TYPE_SESSION,
693           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
694
695   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
696       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
697           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
698           "(DEPRECATED: Use ntp-time-source property)",
699           DEFAULT_USE_PIPELINE_CLOCK,
700           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
701
702   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
703       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
704           "Minimum interval between Regular RTCP packet (in ns)",
705           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707
708   g_object_class_install_property (gobject_class, PROP_PROBATION,
709       g_param_spec_uint ("probation", "Number of probations",
710           "Consecutive packet sequence numbers to accept the source",
711           0, G_MAXUINT, DEFAULT_PROBATION,
712           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
713
714   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
715       g_param_spec_uint ("max-dropout-time", "Max dropout time",
716           "The maximum time (milliseconds) of missing packets tolerated.",
717           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
718           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
719
720   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
721       g_param_spec_uint ("max-misorder-time", "Max misorder time",
722           "The maximum time (milliseconds) of misordered packets tolerated.",
723           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
724           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
725
726   /**
727    * GstRtpSession::stats:
728    *
729    * Various session statistics. This property returns a GstStructure
730    * with name application/x-rtp-session-stats with the following fields:
731    *
732    *  "recv-rtx-req-count  G_TYPE_UINT   The number of retransmission event
733    *      received from downstream (in receiver mode) (Since 1.16)
734    *  "sent-rtx-req-count" G_TYPE_UINT   The number of retransmission event
735    *      sent downstream (in sender mode) (Since 1.16)
736    *  "rtx-count"          G_TYPE_UINT   DEPRECATED Since 1.16, same as
737    *      "recv-rtx-req-count".
738    *  "rtx-drop-count"     G_TYPE_UINT   The number of retransmission events
739    *      dropped (due to bandwidth constraints)
740    *  "sent-nack-count"    G_TYPE_UINT   Number of NACKs sent
741    *  "recv-nack-count"    G_TYPE_UINT   Number of NACKs received
742    *  "source-stats"       G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
743    *      RTP sources (Since 1.8)
744    *
745    * Since: 1.4
746    */
747   g_object_class_install_property (gobject_class, PROP_STATS,
748       g_param_spec_boxed ("stats", "Statistics",
749           "Various statistics", GST_TYPE_STRUCTURE,
750           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
751
752   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
753       g_param_spec_enum ("rtp-profile", "RTP Profile",
754           "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
755           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
756
757   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
758       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
759           "NTP time source for RTCP packets",
760           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
761           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
762
763   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
764       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
765           "Use send time or capture time for RTCP sync "
766           "(TRUE = send time, FALSE = capture time)",
767           DEFAULT_RTCP_SYNC_SEND_TIME,
768           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
769
770   gstelement_class->change_state =
771       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
772   gstelement_class->request_new_pad =
773       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
774   gstelement_class->release_pad =
775       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
776
777   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
778
779   /* sink pads */
780   gst_element_class_add_static_pad_template (gstelement_class,
781       &rtpsession_recv_rtp_sink_template);
782   gst_element_class_add_static_pad_template (gstelement_class,
783       &rtpsession_recv_rtcp_sink_template);
784   gst_element_class_add_static_pad_template (gstelement_class,
785       &rtpsession_send_rtp_sink_template);
786
787   /* src pads */
788   gst_element_class_add_static_pad_template (gstelement_class,
789       &rtpsession_recv_rtp_src_template);
790   gst_element_class_add_static_pad_template (gstelement_class,
791       &rtpsession_sync_src_template);
792   gst_element_class_add_static_pad_template (gstelement_class,
793       &rtpsession_send_rtp_src_template);
794   gst_element_class_add_static_pad_template (gstelement_class,
795       &rtpsession_send_rtcp_src_template);
796
797   gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
798       "Filter/Network/RTP",
799       "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
800
801   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
802       "rtpsession", 0, "RTP Session");
803 }
804
805 static void
806 gst_rtp_session_init (GstRtpSession * rtpsession)
807 {
808   rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
809   g_mutex_init (&rtpsession->priv->lock);
810   g_cond_init (&rtpsession->priv->cond);
811   rtpsession->priv->sysclock = gst_system_clock_obtain ();
812   rtpsession->priv->session = rtp_session_new ();
813   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
814   rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
815
816   /* configure callbacks */
817   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
818   /* configure signals */
819   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
820       (GCallback) on_new_ssrc, rtpsession);
821   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
822       (GCallback) on_ssrc_collision, rtpsession);
823   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
824       (GCallback) on_ssrc_validated, rtpsession);
825   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
826       (GCallback) on_ssrc_active, rtpsession);
827   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
828       (GCallback) on_ssrc_sdes, rtpsession);
829   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
830       (GCallback) on_bye_ssrc, rtpsession);
831   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
832       (GCallback) on_bye_timeout, rtpsession);
833   g_signal_connect (rtpsession->priv->session, "on-timeout",
834       (GCallback) on_timeout, rtpsession);
835   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
836       (GCallback) on_sender_timeout, rtpsession);
837   g_signal_connect (rtpsession->priv->session, "on-new-sender-ssrc",
838       (GCallback) on_new_sender_ssrc, rtpsession);
839   g_signal_connect (rtpsession->priv->session, "on-sender-ssrc-active",
840       (GCallback) on_sender_ssrc_active, rtpsession);
841   g_signal_connect (rtpsession->priv->session, "notify::stats",
842       (GCallback) on_notify_stats, rtpsession);
843   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
844       (GDestroyNotify) gst_caps_unref);
845
846   rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
847
848   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
849   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
850
851   rtpsession->priv->thread_stopped = TRUE;
852
853   rtpsession->priv->recv_rtx_req_count = 0;
854   rtpsession->priv->sent_rtx_req_count = 0;
855
856   rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
857 }
858
859 static void
860 gst_rtp_session_finalize (GObject * object)
861 {
862   GstRtpSession *rtpsession;
863
864   rtpsession = GST_RTP_SESSION (object);
865
866   g_hash_table_destroy (rtpsession->priv->ptmap);
867   g_mutex_clear (&rtpsession->priv->lock);
868   g_cond_clear (&rtpsession->priv->cond);
869   g_object_unref (rtpsession->priv->sysclock);
870   g_object_unref (rtpsession->priv->session);
871
872   G_OBJECT_CLASS (parent_class)->finalize (object);
873 }
874
875 static void
876 gst_rtp_session_set_property (GObject * object, guint prop_id,
877     const GValue * value, GParamSpec * pspec)
878 {
879   GstRtpSession *rtpsession;
880   GstRtpSessionPrivate *priv;
881
882   rtpsession = GST_RTP_SESSION (object);
883   priv = rtpsession->priv;
884
885   switch (prop_id) {
886     case PROP_BANDWIDTH:
887       g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
888       break;
889     case PROP_RTCP_FRACTION:
890       g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
891       break;
892     case PROP_RTCP_RR_BANDWIDTH:
893       g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
894           value);
895       break;
896     case PROP_RTCP_RS_BANDWIDTH:
897       g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
898           value);
899       break;
900     case PROP_SDES:
901       rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
902       break;
903     case PROP_USE_PIPELINE_CLOCK:
904       priv->use_pipeline_clock = g_value_get_boolean (value);
905       break;
906     case PROP_RTCP_MIN_INTERVAL:
907       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
908           value);
909       break;
910     case PROP_PROBATION:
911       g_object_set_property (G_OBJECT (priv->session), "probation", value);
912       break;
913     case PROP_MAX_DROPOUT_TIME:
914       g_object_set_property (G_OBJECT (priv->session), "max-dropout-time",
915           value);
916       break;
917     case PROP_MAX_MISORDER_TIME:
918       g_object_set_property (G_OBJECT (priv->session), "max-misorder-time",
919           value);
920       break;
921     case PROP_RTP_PROFILE:
922       g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
923       break;
924     case PROP_NTP_TIME_SOURCE:
925       priv->ntp_time_source = g_value_get_enum (value);
926       break;
927     case PROP_RTCP_SYNC_SEND_TIME:
928       priv->rtcp_sync_send_time = g_value_get_boolean (value);
929       break;
930     default:
931       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
932       break;
933   }
934 }
935
936 static void
937 gst_rtp_session_get_property (GObject * object, guint prop_id,
938     GValue * value, GParamSpec * pspec)
939 {
940   GstRtpSession *rtpsession;
941   GstRtpSessionPrivate *priv;
942
943   rtpsession = GST_RTP_SESSION (object);
944   priv = rtpsession->priv;
945
946   switch (prop_id) {
947     case PROP_BANDWIDTH:
948       g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
949       break;
950     case PROP_RTCP_FRACTION:
951       g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
952       break;
953     case PROP_RTCP_RR_BANDWIDTH:
954       g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
955           value);
956       break;
957     case PROP_RTCP_RS_BANDWIDTH:
958       g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
959           value);
960       break;
961     case PROP_SDES:
962       g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
963       break;
964     case PROP_NUM_SOURCES:
965       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
966       break;
967     case PROP_NUM_ACTIVE_SOURCES:
968       g_value_set_uint (value,
969           rtp_session_get_num_active_sources (priv->session));
970       break;
971     case PROP_INTERNAL_SESSION:
972       g_value_set_object (value, priv->session);
973       break;
974     case PROP_USE_PIPELINE_CLOCK:
975       g_value_set_boolean (value, priv->use_pipeline_clock);
976       break;
977     case PROP_RTCP_MIN_INTERVAL:
978       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
979           value);
980       break;
981     case PROP_PROBATION:
982       g_object_get_property (G_OBJECT (priv->session), "probation", value);
983       break;
984     case PROP_MAX_DROPOUT_TIME:
985       g_object_get_property (G_OBJECT (priv->session), "max-dropout-time",
986           value);
987       break;
988     case PROP_MAX_MISORDER_TIME:
989       g_object_get_property (G_OBJECT (priv->session), "max-misorder-time",
990           value);
991       break;
992     case PROP_STATS:
993       g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
994       break;
995     case PROP_RTP_PROFILE:
996       g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
997       break;
998     case PROP_NTP_TIME_SOURCE:
999       g_value_set_enum (value, priv->ntp_time_source);
1000       break;
1001     case PROP_RTCP_SYNC_SEND_TIME:
1002       g_value_set_boolean (value, priv->rtcp_sync_send_time);
1003       break;
1004     default:
1005       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1006       break;
1007   }
1008 }
1009
1010 static GstStructure *
1011 gst_rtp_session_create_stats (GstRtpSession * rtpsession)
1012 {
1013   GstStructure *s;
1014
1015   g_object_get (rtpsession->priv->session, "stats", &s, NULL);
1016   gst_structure_set (s, "rtx-count", G_TYPE_UINT,
1017       rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
1018       rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
1019       rtpsession->priv->sent_rtx_req_count, NULL);
1020
1021   return s;
1022 }
1023
1024 static void
1025 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
1026     guint64 * ntpnstime)
1027 {
1028   guint64 ntpns = -1;
1029   GstClock *clock;
1030   GstClockTime base_time, rt, clock_time;
1031
1032   GST_OBJECT_LOCK (rtpsession);
1033   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
1034     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
1035     gst_object_ref (clock);
1036     GST_OBJECT_UNLOCK (rtpsession);
1037
1038     /* get current clock time and convert to running time */
1039     clock_time = gst_clock_get_time (clock);
1040     rt = clock_time - base_time;
1041
1042     if (rtpsession->priv->use_pipeline_clock) {
1043       ntpns = rt;
1044       /* add constant to convert from 1970 based time to 1900 based time */
1045       ntpns += (2208988800LL * GST_SECOND);
1046     } else {
1047       switch (rtpsession->priv->ntp_time_source) {
1048         case GST_RTP_NTP_TIME_SOURCE_NTP:
1049         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1050           GTimeVal current;
1051
1052           /* get current NTP time */
1053           g_get_current_time (&current);
1054           ntpns = GST_TIMEVAL_TO_TIME (current);
1055
1056           /* add constant to convert from 1970 based time to 1900 based time */
1057           if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1058             ntpns += (2208988800LL * GST_SECOND);
1059           break;
1060         }
1061         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1062           ntpns = rt;
1063           break;
1064         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1065           ntpns = clock_time;
1066           break;
1067         default:
1068           ntpns = -1;
1069           g_assert_not_reached ();
1070           break;
1071       }
1072     }
1073
1074     gst_object_unref (clock);
1075   } else {
1076     GST_OBJECT_UNLOCK (rtpsession);
1077     rt = -1;
1078     ntpns = -1;
1079   }
1080   if (running_time)
1081     *running_time = rt;
1082   if (ntpnstime)
1083     *ntpnstime = ntpns;
1084 }
1085
1086 /* must be called with GST_RTP_SESSION_LOCK */
1087 static void
1088 signal_waiting_rtcp_thread_unlocked (GstRtpSession * rtpsession)
1089 {
1090   if (rtpsession->priv->wait_send) {
1091     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1092     rtpsession->priv->wait_send = FALSE;
1093     GST_RTP_SESSION_SIGNAL (rtpsession);
1094   }
1095 }
1096
1097 static void
1098 rtcp_thread (GstRtpSession * rtpsession)
1099 {
1100   GstClockID id;
1101   GstClockTime current_time;
1102   GstClockTime next_timeout;
1103   guint64 ntpnstime;
1104   GstClockTime running_time;
1105   RTPSession *session;
1106   GstClock *sysclock;
1107
1108   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
1109
1110   GST_RTP_SESSION_LOCK (rtpsession);
1111
1112   while (rtpsession->priv->wait_send) {
1113     GST_LOG_OBJECT (rtpsession, "waiting for getting started");
1114     GST_RTP_SESSION_WAIT (rtpsession);
1115     GST_LOG_OBJECT (rtpsession, "signaled...");
1116   }
1117
1118   sysclock = rtpsession->priv->sysclock;
1119   current_time = gst_clock_get_time (sysclock);
1120
1121   session = rtpsession->priv->session;
1122
1123   GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
1124       GST_TIME_ARGS (current_time));
1125   session->start_time = current_time;
1126
1127   while (!rtpsession->priv->stop_thread) {
1128     GstClockReturn res;
1129
1130     /* get initial estimate */
1131     next_timeout = rtp_session_next_timeout (session, current_time);
1132
1133     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
1134         GST_TIME_ARGS (next_timeout));
1135
1136     /* leave if no more timeouts, the session ended */
1137     if (next_timeout == GST_CLOCK_TIME_NONE)
1138       break;
1139
1140     id = rtpsession->priv->id =
1141         gst_clock_new_single_shot_id (sysclock, next_timeout);
1142     GST_RTP_SESSION_UNLOCK (rtpsession);
1143
1144     res = gst_clock_id_wait (id, NULL);
1145
1146     GST_RTP_SESSION_LOCK (rtpsession);
1147     gst_clock_id_unref (id);
1148     rtpsession->priv->id = NULL;
1149
1150     if (rtpsession->priv->stop_thread)
1151       break;
1152
1153     /* update current time */
1154     current_time = gst_clock_get_time (sysclock);
1155
1156     /* get current NTP time */
1157     get_current_times (rtpsession, &running_time, &ntpnstime);
1158
1159     /* we get unlocked because we need to perform reconsideration, don't perform
1160      * the timeout but get a new reporting estimate. */
1161     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
1162         res, GST_TIME_ARGS (current_time));
1163
1164     /* perform actions, we ignore result. Release lock because it might push. */
1165     GST_RTP_SESSION_UNLOCK (rtpsession);
1166     rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
1167     GST_RTP_SESSION_LOCK (rtpsession);
1168   }
1169   /* mark the thread as stopped now */
1170   rtpsession->priv->thread_stopped = TRUE;
1171   GST_RTP_SESSION_UNLOCK (rtpsession);
1172
1173   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
1174 }
1175
1176 static gboolean
1177 start_rtcp_thread (GstRtpSession * rtpsession)
1178 {
1179   GError *error = NULL;
1180   gboolean res;
1181
1182   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
1183
1184   GST_RTP_SESSION_LOCK (rtpsession);
1185   rtpsession->priv->stop_thread = FALSE;
1186   if (rtpsession->priv->thread_stopped) {
1187     /* if the thread stopped, and we still have a handle to the thread, join it
1188      * now. We can safely join with the lock held, the thread will not take it
1189      * anymore. */
1190     if (rtpsession->priv->thread)
1191       g_thread_join (rtpsession->priv->thread);
1192     /* only create a new thread if the old one was stopped. Otherwise we can
1193      * just reuse the currently running one. */
1194     rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
1195         (GThreadFunc) rtcp_thread, rtpsession, &error);
1196     rtpsession->priv->thread_stopped = FALSE;
1197   }
1198   GST_RTP_SESSION_UNLOCK (rtpsession);
1199
1200   if (error != NULL) {
1201     res = FALSE;
1202     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
1203     g_error_free (error);
1204   } else {
1205     res = TRUE;
1206   }
1207   return res;
1208 }
1209
1210 static void
1211 stop_rtcp_thread (GstRtpSession * rtpsession)
1212 {
1213   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
1214
1215   GST_RTP_SESSION_LOCK (rtpsession);
1216   rtpsession->priv->stop_thread = TRUE;
1217   signal_waiting_rtcp_thread_unlocked (rtpsession);
1218   if (rtpsession->priv->id)
1219     gst_clock_id_unschedule (rtpsession->priv->id);
1220   GST_RTP_SESSION_UNLOCK (rtpsession);
1221 }
1222
1223 static void
1224 join_rtcp_thread (GstRtpSession * rtpsession)
1225 {
1226   GST_RTP_SESSION_LOCK (rtpsession);
1227   /* don't try to join when we have no thread */
1228   if (rtpsession->priv->thread != NULL) {
1229     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
1230     GST_RTP_SESSION_UNLOCK (rtpsession);
1231
1232     g_thread_join (rtpsession->priv->thread);
1233
1234     GST_RTP_SESSION_LOCK (rtpsession);
1235     /* after the join, take the lock and clear the thread structure. The caller
1236      * is supposed to not concurrently call start and join. */
1237     rtpsession->priv->thread = NULL;
1238   }
1239   GST_RTP_SESSION_UNLOCK (rtpsession);
1240 }
1241
1242 static GstStateChangeReturn
1243 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
1244 {
1245   GstStateChangeReturn res;
1246   GstRtpSession *rtpsession;
1247
1248   rtpsession = GST_RTP_SESSION (element);
1249
1250   switch (transition) {
1251     case GST_STATE_CHANGE_NULL_TO_READY:
1252       break;
1253     case GST_STATE_CHANGE_READY_TO_PAUSED:
1254       GST_RTP_SESSION_LOCK (rtpsession);
1255       rtpsession->priv->wait_send = TRUE;
1256       GST_RTP_SESSION_UNLOCK (rtpsession);
1257       break;
1258     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1259       break;
1260     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1261     case GST_STATE_CHANGE_PAUSED_TO_READY:
1262       /* no need to join yet, we might want to continue later. Also, the
1263        * dataflow could block downstream so that a join could just block
1264        * forever. */
1265       stop_rtcp_thread (rtpsession);
1266       break;
1267     default:
1268       break;
1269   }
1270
1271   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1272
1273   switch (transition) {
1274     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1275       if (!start_rtcp_thread (rtpsession))
1276         goto failed_thread;
1277       break;
1278     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1279       break;
1280     case GST_STATE_CHANGE_PAUSED_TO_READY:
1281       /* downstream is now releasing the dataflow and we can join. */
1282       join_rtcp_thread (rtpsession);
1283       rtp_session_reset (rtpsession->priv->session);
1284       break;
1285     case GST_STATE_CHANGE_READY_TO_NULL:
1286       break;
1287     default:
1288       break;
1289   }
1290   return res;
1291
1292   /* ERRORS */
1293 failed_thread:
1294   {
1295     return GST_STATE_CHANGE_FAILURE;
1296   }
1297 }
1298
1299 static gboolean
1300 return_true (gpointer key, gpointer value, gpointer user_data)
1301 {
1302   return TRUE;
1303 }
1304
1305 static void
1306 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1307 {
1308   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1309 }
1310
1311 /* called when the session manager has an RTP packet or a list of packets
1312  * ready for further processing */
1313 static GstFlowReturn
1314 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1315     GstBuffer * buffer, gpointer user_data)
1316 {
1317   GstFlowReturn result;
1318   GstRtpSession *rtpsession;
1319   GstPad *rtp_src;
1320
1321   rtpsession = GST_RTP_SESSION (user_data);
1322
1323   GST_RTP_SESSION_LOCK (rtpsession);
1324   if ((rtp_src = rtpsession->recv_rtp_src))
1325     gst_object_ref (rtp_src);
1326   GST_RTP_SESSION_UNLOCK (rtpsession);
1327
1328   if (rtp_src) {
1329     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1330     result = gst_pad_push (rtp_src, buffer);
1331     gst_object_unref (rtp_src);
1332   } else {
1333     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1334     gst_buffer_unref (buffer);
1335     result = GST_FLOW_OK;
1336   }
1337   return result;
1338 }
1339
1340 /* called when the session manager has an RTP packet ready for further
1341  * sending */
1342 static GstFlowReturn
1343 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1344     gpointer data, gpointer user_data)
1345 {
1346   GstFlowReturn result;
1347   GstRtpSession *rtpsession;
1348   GstPad *rtp_src;
1349
1350   rtpsession = GST_RTP_SESSION (user_data);
1351
1352   GST_RTP_SESSION_LOCK (rtpsession);
1353   if ((rtp_src = rtpsession->send_rtp_src))
1354     gst_object_ref (rtp_src);
1355   signal_waiting_rtcp_thread_unlocked (rtpsession);
1356   GST_RTP_SESSION_UNLOCK (rtpsession);
1357
1358   if (rtp_src) {
1359     if (GST_IS_BUFFER (data)) {
1360       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1361       result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1362     } else {
1363       GST_LOG_OBJECT (rtpsession, "sending RTP list");
1364       result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1365     }
1366     gst_object_unref (rtp_src);
1367   } else {
1368     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1369     result = GST_FLOW_OK;
1370   }
1371   return result;
1372 }
1373
1374 static void
1375 do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
1376 {
1377   GstCaps *caps;
1378   GstSegment seg;
1379   GstEvent *event;
1380   gchar *stream_id;
1381   gboolean have_group_id;
1382   guint group_id;
1383
1384   stream_id =
1385       g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
1386       g_random_int (), g_random_int ());
1387
1388   GST_RTP_SESSION_LOCK (rtpsession);
1389   if (rtpsession->recv_rtp_sink) {
1390     event =
1391         gst_pad_get_sticky_event (rtpsession->recv_rtp_sink,
1392         GST_EVENT_STREAM_START, 0);
1393     if (event) {
1394       if (gst_event_parse_group_id (event, &group_id))
1395         have_group_id = TRUE;
1396       else
1397         have_group_id = FALSE;
1398       gst_event_unref (event);
1399     } else {
1400       have_group_id = TRUE;
1401       group_id = gst_util_group_id_next ();
1402     }
1403   } else {
1404     have_group_id = TRUE;
1405     group_id = gst_util_group_id_next ();
1406   }
1407   GST_RTP_SESSION_UNLOCK (rtpsession);
1408
1409   event = gst_event_new_stream_start (stream_id);
1410   rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
1411   gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1412   if (have_group_id)
1413     gst_event_set_group_id (event, group_id);
1414   gst_pad_push_event (srcpad, event);
1415   g_free (stream_id);
1416
1417   caps = gst_caps_new_empty_simple ("application/x-rtcp");
1418   gst_pad_set_caps (srcpad, caps);
1419   gst_caps_unref (caps);
1420
1421   gst_segment_init (&seg, GST_FORMAT_TIME);
1422   event = gst_event_new_segment (&seg);
1423   gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1424   gst_pad_push_event (srcpad, event);
1425 }
1426
1427 /* called when the session manager has an RTCP packet ready for further
1428  * sending. The eos flag is set when an EOS event should be sent downstream as
1429  * well. */
1430 static GstFlowReturn
1431 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1432     GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
1433 {
1434   GstFlowReturn result;
1435   GstRtpSession *rtpsession;
1436   GstPad *rtcp_src;
1437
1438   rtpsession = GST_RTP_SESSION (user_data);
1439
1440   GST_RTP_SESSION_LOCK (rtpsession);
1441   if (rtpsession->priv->stop_thread)
1442     goto stopping;
1443
1444   if ((rtcp_src = rtpsession->send_rtcp_src)) {
1445     gst_object_ref (rtcp_src);
1446     GST_RTP_SESSION_UNLOCK (rtpsession);
1447
1448     /* set rtcp caps on output pad */
1449     if (!gst_pad_has_current_caps (rtcp_src))
1450       do_rtcp_events (rtpsession, rtcp_src);
1451
1452     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1453     result = gst_pad_push (rtcp_src, buffer);
1454
1455     /* Forward send an EOS on the RTCP sink if we received an EOS on the
1456      * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
1457      * case the EOS event would already have been sent */
1458     if (all_sources_bye && rtpsession->send_rtp_sink &&
1459         GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
1460       GstEvent *event;
1461
1462       GST_LOG_OBJECT (rtpsession, "sending EOS");
1463
1464       event = gst_event_new_eos ();
1465       gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1466       gst_pad_push_event (rtcp_src, event);
1467     }
1468     gst_object_unref (rtcp_src);
1469   } else {
1470     GST_RTP_SESSION_UNLOCK (rtpsession);
1471
1472     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1473     gst_buffer_unref (buffer);
1474     result = GST_FLOW_OK;
1475   }
1476   return result;
1477
1478   /* ERRORS */
1479 stopping:
1480   {
1481     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1482     gst_buffer_unref (buffer);
1483     GST_RTP_SESSION_UNLOCK (rtpsession);
1484     return GST_FLOW_OK;
1485   }
1486 }
1487
1488 /* called when the session manager has an SR RTCP packet ready for handling
1489  * inter stream synchronisation */
1490 static GstFlowReturn
1491 gst_rtp_session_sync_rtcp (RTPSession * sess,
1492     GstBuffer * buffer, gpointer user_data)
1493 {
1494   GstFlowReturn result;
1495   GstRtpSession *rtpsession;
1496   GstPad *sync_src;
1497
1498   rtpsession = GST_RTP_SESSION (user_data);
1499
1500   GST_RTP_SESSION_LOCK (rtpsession);
1501   if (rtpsession->priv->stop_thread)
1502     goto stopping;
1503
1504   if ((sync_src = rtpsession->sync_src)) {
1505     gst_object_ref (sync_src);
1506     GST_RTP_SESSION_UNLOCK (rtpsession);
1507
1508     /* set rtcp caps on output pad, this happens
1509      * when we receive RTCP muxed with RTP according
1510      * to RFC5761. Otherwise we would have forwarded
1511      * the events from the recv_rtcp_sink pad already
1512      */
1513     if (!gst_pad_has_current_caps (sync_src))
1514       do_rtcp_events (rtpsession, sync_src);
1515
1516     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1517     result = gst_pad_push (sync_src, buffer);
1518     gst_object_unref (sync_src);
1519   } else {
1520     GST_RTP_SESSION_UNLOCK (rtpsession);
1521
1522     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1523     gst_buffer_unref (buffer);
1524     result = GST_FLOW_OK;
1525   }
1526   return result;
1527
1528   /* ERRORS */
1529 stopping:
1530   {
1531     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1532     gst_buffer_unref (buffer);
1533     GST_RTP_SESSION_UNLOCK (rtpsession);
1534     return GST_FLOW_OK;
1535   }
1536 }
1537
1538 static void
1539 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1540 {
1541   GstRtpSessionPrivate *priv;
1542   const GstStructure *s;
1543   gint payload;
1544
1545   priv = rtpsession->priv;
1546
1547   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1548
1549   s = gst_caps_get_structure (caps, 0);
1550   if (!gst_structure_get_int (s, "payload", &payload))
1551     return;
1552
1553   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1554     return;
1555
1556   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1557       gst_caps_ref (caps));
1558 }
1559
1560 static GstCaps *
1561 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1562 {
1563   GstCaps *caps = NULL;
1564   GValue args[2] = { {0}, {0} };
1565   GValue ret = { 0 };
1566
1567   GST_RTP_SESSION_LOCK (rtpsession);
1568   caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1569       GINT_TO_POINTER (payload));
1570   if (caps) {
1571     gst_caps_ref (caps);
1572     goto done;
1573   }
1574
1575   /* not found in the cache, try to get it with a signal */
1576   g_value_init (&args[0], GST_TYPE_ELEMENT);
1577   g_value_set_object (&args[0], rtpsession);
1578   g_value_init (&args[1], G_TYPE_UINT);
1579   g_value_set_uint (&args[1], payload);
1580
1581   g_value_init (&ret, GST_TYPE_CAPS);
1582   g_value_set_boxed (&ret, NULL);
1583
1584   GST_RTP_SESSION_UNLOCK (rtpsession);
1585
1586   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1587       &ret);
1588
1589   GST_RTP_SESSION_LOCK (rtpsession);
1590
1591   g_value_unset (&args[0]);
1592   g_value_unset (&args[1]);
1593   caps = (GstCaps *) g_value_dup_boxed (&ret);
1594   g_value_unset (&ret);
1595   if (!caps)
1596     goto no_caps;
1597
1598   gst_rtp_session_cache_caps (rtpsession, caps);
1599
1600 done:
1601   GST_RTP_SESSION_UNLOCK (rtpsession);
1602
1603   return caps;
1604
1605 no_caps:
1606   {
1607     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1608     goto done;
1609   }
1610 }
1611
1612 /* called when the session manager needs the clock rate */
1613 static gint
1614 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1615     gpointer user_data)
1616 {
1617   gint result = -1;
1618   GstRtpSession *rtpsession;
1619   GstCaps *caps;
1620   const GstStructure *s;
1621
1622   rtpsession = GST_RTP_SESSION_CAST (user_data);
1623
1624   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1625
1626   if (!caps)
1627     goto done;
1628
1629   s = gst_caps_get_structure (caps, 0);
1630   if (!gst_structure_get_int (s, "clock-rate", &result))
1631     goto no_clock_rate;
1632
1633   gst_caps_unref (caps);
1634
1635   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1636
1637 done:
1638
1639   return result;
1640
1641   /* ERRORS */
1642 no_clock_rate:
1643   {
1644     gst_caps_unref (caps);
1645     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1646     goto done;
1647   }
1648 }
1649
1650 /* called when the session manager asks us to reconsider the timeout */
1651 static void
1652 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1653 {
1654   GstRtpSession *rtpsession;
1655
1656   rtpsession = GST_RTP_SESSION_CAST (user_data);
1657
1658   GST_RTP_SESSION_LOCK (rtpsession);
1659   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1660   if (rtpsession->priv->id)
1661     gst_clock_id_unschedule (rtpsession->priv->id);
1662   GST_RTP_SESSION_UNLOCK (rtpsession);
1663 }
1664
1665 static gboolean
1666 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
1667     GstEvent * event)
1668 {
1669   GstRtpSession *rtpsession;
1670   gboolean ret = FALSE;
1671
1672   rtpsession = GST_RTP_SESSION (parent);
1673
1674   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1675       GST_EVENT_TYPE_NAME (event));
1676
1677   switch (GST_EVENT_TYPE (event)) {
1678     case GST_EVENT_CAPS:
1679     {
1680       GstCaps *caps;
1681
1682       /* process */
1683       gst_event_parse_caps (event, &caps);
1684       gst_rtp_session_sink_setcaps (pad, rtpsession, caps);
1685       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1686       break;
1687     }
1688     case GST_EVENT_FLUSH_STOP:
1689       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1690       rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
1691       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1692       break;
1693     case GST_EVENT_SEGMENT:
1694     {
1695       GstSegment *segment, in_segment;
1696
1697       segment = &rtpsession->recv_rtp_seg;
1698
1699       /* the newsegment event is needed to convert the RTP timestamp to
1700        * running_time, which is needed to generate a mapping from RTP to NTP
1701        * timestamps in SR reports */
1702       gst_event_copy_segment (event, &in_segment);
1703       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
1704           &in_segment);
1705
1706       /* accept upstream */
1707       gst_segment_copy_into (&in_segment, segment);
1708
1709       /* push event forward */
1710       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1711       break;
1712     }
1713     case GST_EVENT_EOS:
1714     {
1715       GstPad *rtcp_src;
1716
1717       ret =
1718           gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
1719
1720       GST_RTP_SESSION_LOCK (rtpsession);
1721       if ((rtcp_src = rtpsession->send_rtcp_src))
1722         gst_object_ref (rtcp_src);
1723       GST_RTP_SESSION_UNLOCK (rtpsession);
1724
1725       gst_event_unref (event);
1726
1727       if (rtcp_src) {
1728         event = gst_event_new_eos ();
1729         if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
1730           gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1731         ret = gst_pad_push_event (rtcp_src, event);
1732         gst_object_unref (rtcp_src);
1733       } else {
1734         ret = TRUE;
1735       }
1736       break;
1737     }
1738     default:
1739       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1740       break;
1741   }
1742
1743   return ret;
1744
1745 }
1746
1747 static gboolean
1748 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1749     guint32 ssrc, guint payload, gboolean all_headers, gint count)
1750 {
1751   GstCaps *caps;
1752
1753   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1754
1755   if (caps) {
1756     const GstStructure *s = gst_caps_get_structure (caps, 0);
1757     gboolean pli;
1758     gboolean fir;
1759
1760     pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1761     fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1762
1763     /* Google Talk uses FIR for repair, so send it even if we just want a
1764      * regular PLI */
1765     if (!pli &&
1766         gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1767       fir = TRUE;
1768
1769     gst_caps_unref (caps);
1770
1771     if (pli || fir)
1772       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1773           fir, count);
1774   }
1775
1776   return FALSE;
1777 }
1778
1779 static gboolean
1780 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
1781     GstEvent * event)
1782 {
1783   GstRtpSession *rtpsession;
1784   gboolean forward = TRUE;
1785   gboolean ret = TRUE;
1786   const GstStructure *s;
1787   guint32 ssrc;
1788   guint pt;
1789
1790   rtpsession = GST_RTP_SESSION (parent);
1791
1792   switch (GST_EVENT_TYPE (event)) {
1793     case GST_EVENT_CUSTOM_UPSTREAM:
1794       s = gst_event_get_structure (event);
1795       if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1796           gst_structure_get_uint (s, "ssrc", &ssrc) &&
1797           gst_structure_get_uint (s, "payload", &pt)) {
1798         gboolean all_headers = FALSE;
1799         gint count = -1;
1800
1801         gst_structure_get_boolean (s, "all-headers", &all_headers);
1802         if (gst_structure_get_int (s, "count", &count) && count < 0)
1803           count += G_MAXINT;    /* Make sure count is positive if present */
1804         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1805                 all_headers, count))
1806           forward = FALSE;
1807       } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
1808         GstClockTime running_time;
1809         guint seqnum, delay, deadline, max_delay, avg_rtt;
1810
1811         GST_RTP_SESSION_LOCK (rtpsession);
1812         rtpsession->priv->recv_rtx_req_count++;
1813         GST_RTP_SESSION_UNLOCK (rtpsession);
1814
1815         if (!gst_structure_get_clock_time (s, "running-time", &running_time))
1816           running_time = -1;
1817         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
1818           ssrc = -1;
1819         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
1820           seqnum = -1;
1821         if (!gst_structure_get_uint (s, "delay", &delay))
1822           delay = 0;
1823         if (!gst_structure_get_uint (s, "deadline", &deadline))
1824           deadline = 100;
1825         if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
1826           avg_rtt = 40;
1827
1828         /* remaining time to receive the packet */
1829         max_delay = deadline;
1830         if (max_delay > delay)
1831           max_delay -= delay;
1832         /* estimated RTT */
1833         if (max_delay > avg_rtt)
1834           max_delay -= avg_rtt;
1835         else
1836           max_delay = 0;
1837
1838         if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
1839                 max_delay * GST_MSECOND))
1840           forward = FALSE;
1841       }
1842       break;
1843     default:
1844       break;
1845   }
1846
1847   if (forward) {
1848     GstPad *recv_rtp_sink;
1849
1850     GST_RTP_SESSION_LOCK (rtpsession);
1851     if ((recv_rtp_sink = rtpsession->recv_rtp_sink))
1852       gst_object_ref (recv_rtp_sink);
1853     GST_RTP_SESSION_UNLOCK (rtpsession);
1854
1855     if (recv_rtp_sink) {
1856       ret = gst_pad_push_event (recv_rtp_sink, event);
1857       gst_object_unref (recv_rtp_sink);
1858     } else
1859       gst_event_unref (event);
1860   } else {
1861     gst_event_unref (event);
1862   }
1863
1864   return ret;
1865 }
1866
1867
1868 static GstIterator *
1869 gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
1870 {
1871   GstRtpSession *rtpsession;
1872   GstPad *otherpad = NULL;
1873   GstIterator *it = NULL;
1874
1875   rtpsession = GST_RTP_SESSION (parent);
1876
1877   GST_RTP_SESSION_LOCK (rtpsession);
1878   if (pad == rtpsession->recv_rtp_src) {
1879     otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1880   } else if (pad == rtpsession->recv_rtp_sink) {
1881     otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1882   } else if (pad == rtpsession->send_rtp_src) {
1883     otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1884   } else if (pad == rtpsession->send_rtp_sink) {
1885     otherpad = gst_object_ref (rtpsession->send_rtp_src);
1886   }
1887   GST_RTP_SESSION_UNLOCK (rtpsession);
1888
1889   if (otherpad) {
1890     GValue val = { 0, };
1891
1892     g_value_init (&val, GST_TYPE_PAD);
1893     g_value_set_object (&val, otherpad);
1894     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1895     g_value_unset (&val);
1896     gst_object_unref (otherpad);
1897   } else {
1898     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1899   }
1900
1901   return it;
1902 }
1903
1904 static gboolean
1905 gst_rtp_session_sink_setcaps (GstPad * pad, GstRtpSession * rtpsession,
1906     GstCaps * caps)
1907 {
1908   GST_RTP_SESSION_LOCK (rtpsession);
1909   gst_rtp_session_cache_caps (rtpsession, caps);
1910   GST_RTP_SESSION_UNLOCK (rtpsession);
1911
1912   return TRUE;
1913 }
1914
1915 /* receive a packet from a sender, send it to the RTP session manager and
1916  * forward the packet on the rtp_src pad
1917  */
1918 static GstFlowReturn
1919 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
1920     GstBuffer * buffer)
1921 {
1922   GstRtpSession *rtpsession;
1923   GstRtpSessionPrivate *priv;
1924   GstFlowReturn ret;
1925   GstClockTime current_time, running_time;
1926   GstClockTime timestamp;
1927   guint64 ntpnstime;
1928
1929   rtpsession = GST_RTP_SESSION (parent);
1930   priv = rtpsession->priv;
1931
1932   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1933
1934   GST_RTP_SESSION_LOCK (rtpsession);
1935   signal_waiting_rtcp_thread_unlocked (rtpsession);
1936   GST_RTP_SESSION_UNLOCK (rtpsession);
1937
1938   /* get NTP time when this packet was captured, this depends on the timestamp. */
1939   timestamp = GST_BUFFER_PTS (buffer);
1940   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1941     /* convert to running time using the segment values */
1942     running_time =
1943         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1944         timestamp);
1945     ntpnstime = GST_CLOCK_TIME_NONE;
1946   } else {
1947     get_current_times (rtpsession, &running_time, &ntpnstime);
1948   }
1949   current_time = gst_clock_get_time (priv->sysclock);
1950
1951   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1952       running_time, ntpnstime);
1953   if (ret != GST_FLOW_OK)
1954     goto push_error;
1955
1956 done:
1957
1958   return ret;
1959
1960   /* ERRORS */
1961 push_error:
1962   {
1963     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1964         gst_flow_get_name (ret));
1965     goto done;
1966   }
1967 }
1968
1969 static gboolean
1970 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
1971     GstEvent * event)
1972 {
1973   GstRtpSession *rtpsession;
1974   gboolean ret = FALSE;
1975
1976   rtpsession = GST_RTP_SESSION (parent);
1977
1978   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1979       GST_EVENT_TYPE_NAME (event));
1980
1981   switch (GST_EVENT_TYPE (event)) {
1982     case GST_EVENT_SEGMENT:
1983       /* Make sure that the sync_src pad has caps before the segment event.
1984        * Otherwise we might get a segment event before caps from the receive
1985        * RTCP pad, and then later when receiving RTCP packets will set caps.
1986        * This will results in a sticky event misordering warning
1987        */
1988       if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
1989         GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
1990         gst_pad_set_caps (rtpsession->sync_src, caps);
1991         gst_caps_unref (caps);
1992       }
1993       /* fall through */
1994     default:
1995       ret = gst_pad_push_event (rtpsession->sync_src, event);
1996       break;
1997   }
1998
1999   return ret;
2000 }
2001
2002 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
2003  * forward the SR packets to the sync_src pad.
2004  */
2005 static GstFlowReturn
2006 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
2007     GstBuffer * buffer)
2008 {
2009   GstRtpSession *rtpsession;
2010   GstRtpSessionPrivate *priv;
2011   GstClockTime current_time;
2012   GstClockTime running_time;
2013   guint64 ntpnstime;
2014
2015   rtpsession = GST_RTP_SESSION (parent);
2016   priv = rtpsession->priv;
2017
2018   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
2019
2020   GST_RTP_SESSION_LOCK (rtpsession);
2021   signal_waiting_rtcp_thread_unlocked (rtpsession);
2022   GST_RTP_SESSION_UNLOCK (rtpsession);
2023
2024   current_time = gst_clock_get_time (priv->sysclock);
2025   get_current_times (rtpsession, &running_time, &ntpnstime);
2026
2027   rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
2028       ntpnstime);
2029
2030   return GST_FLOW_OK;           /* always return OK */
2031 }
2032
2033 static gboolean
2034 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent,
2035     GstQuery * query)
2036 {
2037   GstRtpSession *rtpsession;
2038   gboolean ret = FALSE;
2039
2040   rtpsession = GST_RTP_SESSION (parent);
2041
2042   GST_DEBUG_OBJECT (rtpsession, "received QUERY %s",
2043       GST_QUERY_TYPE_NAME (query));
2044
2045   switch (GST_QUERY_TYPE (query)) {
2046     case GST_QUERY_LATENCY:
2047       ret = TRUE;
2048       /* use the defaults for the latency query. */
2049       gst_query_set_latency (query, FALSE, 0, -1);
2050       break;
2051     default:
2052       /* other queries simply fail for now */
2053       break;
2054   }
2055
2056   return ret;
2057 }
2058
2059 static gboolean
2060 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent,
2061     GstEvent * event)
2062 {
2063   GstRtpSession *rtpsession;
2064   gboolean ret = TRUE;
2065
2066   rtpsession = GST_RTP_SESSION (parent);
2067   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2068       GST_EVENT_TYPE_NAME (event));
2069
2070   switch (GST_EVENT_TYPE (event)) {
2071     case GST_EVENT_SEEK:
2072     case GST_EVENT_LATENCY:
2073       gst_event_unref (event);
2074       ret = TRUE;
2075       break;
2076     default:
2077       /* other events simply fail for now */
2078       gst_event_unref (event);
2079       ret = FALSE;
2080       break;
2081   }
2082
2083   return ret;
2084 }
2085
2086
2087 static gboolean
2088 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
2089     GstEvent * event)
2090 {
2091   GstRtpSession *rtpsession;
2092   gboolean ret = FALSE;
2093
2094   rtpsession = GST_RTP_SESSION (parent);
2095
2096   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2097       GST_EVENT_TYPE_NAME (event));
2098
2099   switch (GST_EVENT_TYPE (event)) {
2100     case GST_EVENT_CAPS:
2101     {
2102       GstCaps *caps;
2103
2104       /* process */
2105       gst_event_parse_caps (event, &caps);
2106       gst_rtp_session_setcaps_send_rtp (pad, rtpsession, caps);
2107       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2108       break;
2109     }
2110     case GST_EVENT_FLUSH_STOP:
2111       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
2112       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2113       break;
2114     case GST_EVENT_SEGMENT:{
2115       GstSegment *segment, in_segment;
2116
2117       segment = &rtpsession->send_rtp_seg;
2118
2119       /* the newsegment event is needed to convert the RTP timestamp to
2120        * running_time, which is needed to generate a mapping from RTP to NTP
2121        * timestamps in SR reports */
2122       gst_event_copy_segment (event, &in_segment);
2123       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
2124           &in_segment);
2125
2126       /* accept upstream */
2127       gst_segment_copy_into (&in_segment, segment);
2128
2129       /* push event forward */
2130       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2131       break;
2132     }
2133     case GST_EVENT_EOS:{
2134       GstClockTime current_time;
2135
2136       /* push downstream FIXME, we are not supposed to leave the session just
2137        * because we stop sending. */
2138       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2139       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
2140
2141       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
2142       rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
2143       rtp_session_schedule_bye (rtpsession->priv->session, current_time);
2144       break;
2145     }
2146     default:{
2147       GstPad *send_rtp_src;
2148
2149       GST_RTP_SESSION_LOCK (rtpsession);
2150       if ((send_rtp_src = rtpsession->send_rtp_src))
2151         gst_object_ref (send_rtp_src);
2152       GST_RTP_SESSION_UNLOCK (rtpsession);
2153
2154       if (send_rtp_src) {
2155         ret = gst_pad_push_event (send_rtp_src, event);
2156         gst_object_unref (send_rtp_src);
2157       } else
2158         gst_event_unref (event);
2159
2160       break;
2161     }
2162   }
2163
2164   return ret;
2165 }
2166
2167 static gboolean
2168 gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent,
2169     GstEvent * event)
2170 {
2171   GstRtpSession *rtpsession;
2172   gboolean ret = FALSE;
2173
2174   rtpsession = GST_RTP_SESSION (parent);
2175
2176   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2177       GST_EVENT_TYPE_NAME (event));
2178
2179   switch (GST_EVENT_TYPE (event)) {
2180     case GST_EVENT_LATENCY:
2181       /* save the latency, we need this to know when an RTP packet will be
2182        * rendered by the sink */
2183       gst_event_parse_latency (event, &rtpsession->priv->send_latency);
2184
2185       ret = gst_pad_event_default (pad, parent, event);
2186       break;
2187     default:
2188       ret = gst_pad_event_default (pad, parent, event);
2189       break;
2190   }
2191   return ret;
2192 }
2193
2194 static GstCaps *
2195 gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2196     GstCaps * filter)
2197 {
2198   GstRtpSessionPrivate *priv;
2199   GstCaps *result;
2200   GstStructure *s1, *s2;
2201   guint ssrc;
2202   gboolean is_random;
2203
2204   priv = rtpsession->priv;
2205
2206   ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
2207
2208   /* we can basically accept anything but we prefer to receive packets with our
2209    * internal SSRC so that we don't have to patch it. Create a structure with
2210    * the SSRC and another one without.
2211    * Only do this if the session actually decided on an ssrc already,
2212    * otherwise we give upstream the opportunity to select an ssrc itself */
2213   if (!is_random) {
2214     s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
2215         NULL);
2216     s2 = gst_structure_new_empty ("application/x-rtp");
2217
2218     result = gst_caps_new_full (s1, s2, NULL);
2219   } else {
2220     result = gst_caps_new_empty_simple ("application/x-rtp");
2221   }
2222
2223   if (filter) {
2224     GstCaps *caps = result;
2225
2226     result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
2227     gst_caps_unref (caps);
2228   }
2229
2230   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
2231
2232   return result;
2233 }
2234
2235 static gboolean
2236 gst_rtp_session_query_send_rtp (GstPad * pad, GstObject * parent,
2237     GstQuery * query)
2238 {
2239   gboolean res = FALSE;
2240   GstRtpSession *rtpsession;
2241
2242   rtpsession = GST_RTP_SESSION (parent);
2243
2244   switch (GST_QUERY_TYPE (query)) {
2245     case GST_QUERY_CAPS:
2246     {
2247       GstCaps *filter, *caps;
2248
2249       gst_query_parse_caps (query, &filter);
2250       caps = gst_rtp_session_getcaps_send_rtp (pad, rtpsession, filter);
2251       gst_query_set_caps_result (query, caps);
2252       gst_caps_unref (caps);
2253       res = TRUE;
2254       break;
2255     }
2256     default:
2257       res = gst_pad_query_default (pad, parent, query);
2258       break;
2259   }
2260
2261   return res;
2262 }
2263
2264 static gboolean
2265 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2266     GstCaps * caps)
2267 {
2268   GstRtpSessionPrivate *priv;
2269
2270   priv = rtpsession->priv;
2271
2272   rtp_session_update_send_caps (priv->session, caps);
2273
2274   return TRUE;
2275 }
2276
2277 /* Receive an RTP packet or a list of packets to be sent to the receivers,
2278  * send to RTP session manager and forward to send_rtp_src.
2279  */
2280 static GstFlowReturn
2281 gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
2282     gpointer data, gboolean is_list)
2283 {
2284   GstRtpSessionPrivate *priv;
2285   GstFlowReturn ret;
2286   GstClockTime timestamp, running_time;
2287   GstClockTime current_time;
2288
2289   priv = rtpsession->priv;
2290
2291   GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
2292
2293   /* get NTP time when this packet was captured, this depends on the timestamp. */
2294   if (is_list) {
2295     GstBuffer *buffer = NULL;
2296
2297     /* All groups in a list have the same timestamp.
2298      * So, just take it from the first group. */
2299     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
2300     if (buffer)
2301       timestamp = GST_BUFFER_PTS (buffer);
2302     else
2303       timestamp = -1;
2304   } else {
2305     timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
2306   }
2307
2308   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2309     /* convert to running time using the segment start value. */
2310     running_time =
2311         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
2312         timestamp);
2313     if (priv->rtcp_sync_send_time)
2314       running_time += priv->send_latency;
2315   } else {
2316     /* no timestamp. */
2317     running_time = -1;
2318   }
2319
2320   current_time = gst_clock_get_time (priv->sysclock);
2321   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
2322       running_time);
2323   if (ret != GST_FLOW_OK)
2324     goto push_error;
2325
2326 done:
2327
2328   return ret;
2329
2330   /* ERRORS */
2331 push_error:
2332   {
2333     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2334         gst_flow_get_name (ret));
2335     goto done;
2336   }
2337 }
2338
2339 static GstFlowReturn
2340 gst_rtp_session_chain_send_rtp (GstPad * pad, GstObject * parent,
2341     GstBuffer * buffer)
2342 {
2343   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2344
2345   return gst_rtp_session_chain_send_rtp_common (rtpsession, buffer, FALSE);
2346 }
2347
2348 static GstFlowReturn
2349 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstObject * parent,
2350     GstBufferList * list)
2351 {
2352   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2353
2354   return gst_rtp_session_chain_send_rtp_common (rtpsession, list, TRUE);
2355 }
2356
2357 /* Create sinkpad to receive RTP packets from senders. This will also create a
2358  * srcpad for the RTP packets.
2359  */
2360 static GstPad *
2361 create_recv_rtp_sink (GstRtpSession * rtpsession)
2362 {
2363   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
2364
2365   rtpsession->recv_rtp_sink =
2366       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
2367       "recv_rtp_sink");
2368   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
2369       gst_rtp_session_chain_recv_rtp);
2370   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
2371       gst_rtp_session_event_recv_rtp_sink);
2372   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
2373       gst_rtp_session_iterate_internal_links);
2374   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
2375   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
2376   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2377       rtpsession->recv_rtp_sink);
2378
2379   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
2380   rtpsession->recv_rtp_src =
2381       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
2382       "recv_rtp_src");
2383   gst_pad_set_event_function (rtpsession->recv_rtp_src,
2384       gst_rtp_session_event_recv_rtp_src);
2385   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
2386       gst_rtp_session_iterate_internal_links);
2387   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
2388   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
2389   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
2390
2391   return rtpsession->recv_rtp_sink;
2392 }
2393
2394 /* Remove sinkpad to receive RTP packets from senders. This will also remove
2395  * the srcpad for the RTP packets.
2396  */
2397 static void
2398 remove_recv_rtp_sink (GstRtpSession * rtpsession)
2399 {
2400   GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
2401
2402   /* deactivate from source to sink */
2403   gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
2404   gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
2405
2406   /* remove pads */
2407   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2408       rtpsession->recv_rtp_sink);
2409   rtpsession->recv_rtp_sink = NULL;
2410
2411   GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
2412   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2413       rtpsession->recv_rtp_src);
2414   rtpsession->recv_rtp_src = NULL;
2415 }
2416
2417 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
2418  * sync_src pad for the SR packets.
2419  */
2420 static GstPad *
2421 create_recv_rtcp_sink (GstRtpSession * rtpsession)
2422 {
2423   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
2424
2425   rtpsession->recv_rtcp_sink =
2426       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
2427       "recv_rtcp_sink");
2428   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
2429       gst_rtp_session_chain_recv_rtcp);
2430   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
2431       gst_rtp_session_event_recv_rtcp_sink);
2432   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
2433       gst_rtp_session_iterate_internal_links);
2434   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
2435   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2436       rtpsession->recv_rtcp_sink);
2437
2438   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
2439   rtpsession->sync_src =
2440       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
2441       "sync_src");
2442   gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
2443       gst_rtp_session_iterate_internal_links);
2444   gst_pad_use_fixed_caps (rtpsession->sync_src);
2445   gst_pad_set_active (rtpsession->sync_src, TRUE);
2446   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2447
2448   return rtpsession->recv_rtcp_sink;
2449 }
2450
2451 static void
2452 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
2453 {
2454   GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
2455
2456   gst_pad_set_active (rtpsession->sync_src, FALSE);
2457   gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
2458
2459   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2460       rtpsession->recv_rtcp_sink);
2461   rtpsession->recv_rtcp_sink = NULL;
2462
2463   GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
2464   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2465   rtpsession->sync_src = NULL;
2466 }
2467
2468 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
2469  * send_rtp_src pad.
2470  */
2471 static GstPad *
2472 create_send_rtp_sink (GstRtpSession * rtpsession)
2473 {
2474   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2475
2476   rtpsession->send_rtp_sink =
2477       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2478       "send_rtp_sink");
2479   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2480       gst_rtp_session_chain_send_rtp);
2481   gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2482       gst_rtp_session_chain_send_rtp_list);
2483   gst_pad_set_query_function (rtpsession->send_rtp_sink,
2484       gst_rtp_session_query_send_rtp);
2485   gst_pad_set_event_function (rtpsession->send_rtp_sink,
2486       gst_rtp_session_event_send_rtp_sink);
2487   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2488       gst_rtp_session_iterate_internal_links);
2489   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
2490   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
2491   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2492   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2493       rtpsession->send_rtp_sink);
2494
2495   rtpsession->send_rtp_src =
2496       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2497       "send_rtp_src");
2498   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2499       gst_rtp_session_iterate_internal_links);
2500   gst_pad_set_event_function (rtpsession->send_rtp_src,
2501       gst_rtp_session_event_send_rtp_src);
2502   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
2503   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2504   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2505
2506   return rtpsession->send_rtp_sink;
2507 }
2508
2509 static void
2510 remove_send_rtp_sink (GstRtpSession * rtpsession)
2511 {
2512   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2513
2514   gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2515   gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2516
2517   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2518       rtpsession->send_rtp_sink);
2519   rtpsession->send_rtp_sink = NULL;
2520
2521   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2522       rtpsession->send_rtp_src);
2523   rtpsession->send_rtp_src = NULL;
2524 }
2525
2526 /* Create a srcpad with the RTCP packets to send out.
2527  * This pad will be driven by the RTP session manager when it wants to send out
2528  * RTCP packets.
2529  */
2530 static GstPad *
2531 create_send_rtcp_src (GstRtpSession * rtpsession)
2532 {
2533   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2534
2535   rtpsession->send_rtcp_src =
2536       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2537       "send_rtcp_src");
2538   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2539   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2540   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2541       gst_rtp_session_iterate_internal_links);
2542   gst_pad_set_query_function (rtpsession->send_rtcp_src,
2543       gst_rtp_session_query_send_rtcp_src);
2544   gst_pad_set_event_function (rtpsession->send_rtcp_src,
2545       gst_rtp_session_event_send_rtcp_src);
2546   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2547       rtpsession->send_rtcp_src);
2548
2549   return rtpsession->send_rtcp_src;
2550 }
2551
2552 static void
2553 remove_send_rtcp_src (GstRtpSession * rtpsession)
2554 {
2555   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2556
2557   gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2558
2559   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2560       rtpsession->send_rtcp_src);
2561   rtpsession->send_rtcp_src = NULL;
2562 }
2563
2564 static GstPad *
2565 gst_rtp_session_request_new_pad (GstElement * element,
2566     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2567 {
2568   GstRtpSession *rtpsession;
2569   GstElementClass *klass;
2570   GstPad *result;
2571
2572   g_return_val_if_fail (templ != NULL, NULL);
2573   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2574
2575   rtpsession = GST_RTP_SESSION (element);
2576   klass = GST_ELEMENT_GET_CLASS (element);
2577
2578   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2579
2580   GST_RTP_SESSION_LOCK (rtpsession);
2581
2582   /* figure out the template */
2583   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2584     if (rtpsession->recv_rtp_sink != NULL)
2585       goto exists;
2586
2587     result = create_recv_rtp_sink (rtpsession);
2588   } else if (templ == gst_element_class_get_pad_template (klass,
2589           "recv_rtcp_sink")) {
2590     if (rtpsession->recv_rtcp_sink != NULL)
2591       goto exists;
2592
2593     result = create_recv_rtcp_sink (rtpsession);
2594   } else if (templ == gst_element_class_get_pad_template (klass,
2595           "send_rtp_sink")) {
2596     if (rtpsession->send_rtp_sink != NULL)
2597       goto exists;
2598
2599     result = create_send_rtp_sink (rtpsession);
2600   } else if (templ == gst_element_class_get_pad_template (klass,
2601           "send_rtcp_src")) {
2602     if (rtpsession->send_rtcp_src != NULL)
2603       goto exists;
2604
2605     result = create_send_rtcp_src (rtpsession);
2606   } else
2607     goto wrong_template;
2608
2609   GST_RTP_SESSION_UNLOCK (rtpsession);
2610
2611   return result;
2612
2613   /* ERRORS */
2614 wrong_template:
2615   {
2616     GST_RTP_SESSION_UNLOCK (rtpsession);
2617     g_warning ("rtpsession: this is not our template");
2618     return NULL;
2619   }
2620 exists:
2621   {
2622     GST_RTP_SESSION_UNLOCK (rtpsession);
2623     g_warning ("rtpsession: pad already requested");
2624     return NULL;
2625   }
2626 }
2627
2628 static void
2629 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2630 {
2631   GstRtpSession *rtpsession;
2632
2633   g_return_if_fail (GST_IS_RTP_SESSION (element));
2634   g_return_if_fail (GST_IS_PAD (pad));
2635
2636   rtpsession = GST_RTP_SESSION (element);
2637
2638   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2639
2640   GST_RTP_SESSION_LOCK (rtpsession);
2641
2642   if (rtpsession->recv_rtp_sink == pad) {
2643     remove_recv_rtp_sink (rtpsession);
2644   } else if (rtpsession->recv_rtcp_sink == pad) {
2645     remove_recv_rtcp_sink (rtpsession);
2646   } else if (rtpsession->send_rtp_sink == pad) {
2647     remove_send_rtp_sink (rtpsession);
2648   } else if (rtpsession->send_rtcp_src == pad) {
2649     remove_send_rtcp_src (rtpsession);
2650   } else
2651     goto wrong_pad;
2652
2653   GST_RTP_SESSION_UNLOCK (rtpsession);
2654
2655   return;
2656
2657   /* ERRORS */
2658 wrong_pad:
2659   {
2660     GST_RTP_SESSION_UNLOCK (rtpsession);
2661     g_warning ("rtpsession: asked to release an unknown pad");
2662     return;
2663   }
2664 }
2665
2666 static void
2667 gst_rtp_session_request_key_unit (RTPSession * sess,
2668     guint32 ssrc, gboolean all_headers, gpointer user_data)
2669 {
2670   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2671   GstEvent *event;
2672   GstPad *send_rtp_sink;
2673
2674   GST_RTP_SESSION_LOCK (rtpsession);
2675   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2676     gst_object_ref (send_rtp_sink);
2677   GST_RTP_SESSION_UNLOCK (rtpsession);
2678
2679   if (send_rtp_sink) {
2680     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2681         gst_structure_new ("GstForceKeyUnit", "ssrc", G_TYPE_UINT, ssrc,
2682             "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2683     gst_pad_push_event (send_rtp_sink, event);
2684     gst_object_unref (send_rtp_sink);
2685   }
2686 }
2687
2688 static GstClockTime
2689 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2690 {
2691   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2692
2693   return gst_clock_get_time (rtpsession->priv->sysclock);
2694 }
2695
2696 static void
2697 gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
2698     guint16 blp, guint32 ssrc, gpointer user_data)
2699 {
2700   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2701   GstEvent *event;
2702   GstPad *send_rtp_sink;
2703
2704   GST_RTP_SESSION_LOCK (rtpsession);
2705   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2706     gst_object_ref (send_rtp_sink);
2707   GST_RTP_SESSION_UNLOCK (rtpsession);
2708
2709   if (send_rtp_sink) {
2710     while (TRUE) {
2711       event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2712           gst_structure_new ("GstRTPRetransmissionRequest",
2713               "seqnum", G_TYPE_UINT, (guint) seqnum,
2714               "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
2715       gst_pad_push_event (send_rtp_sink, event);
2716
2717       GST_RTP_SESSION_LOCK (rtpsession);
2718       rtpsession->priv->sent_rtx_req_count++;
2719       GST_RTP_SESSION_UNLOCK (rtpsession);
2720
2721       if (blp == 0)
2722         break;
2723
2724       seqnum++;
2725       while ((blp & 1) == 0) {
2726         seqnum++;
2727         blp >>= 1;
2728       }
2729       blp >>= 1;
2730     }
2731     gst_object_unref (send_rtp_sink);
2732   }
2733 }
2734
2735 static void
2736 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
2737 {
2738   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2739   GstPad *send_rtp_sink;
2740
2741   GST_RTP_SESSION_LOCK (rtpsession);
2742   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2743     gst_object_ref (send_rtp_sink);
2744   GST_RTP_SESSION_UNLOCK (rtpsession);
2745
2746   if (send_rtp_sink) {
2747     gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
2748     gst_object_unref (send_rtp_sink);
2749   }
2750 }
2751
2752 static void
2753 gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
2754 {
2755   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2756
2757   GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
2758   /* with an early RTCP request, we might have to start the RTCP thread */
2759   GST_RTP_SESSION_LOCK (rtpsession);
2760   signal_waiting_rtcp_thread_unlocked (rtpsession);
2761   GST_RTP_SESSION_UNLOCK (rtpsession);
2762 }