net: GST_EXPORT -> GST_NET_API
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @title: GstNetClientClock
29  * @short_description: Special clock that synchronizes to a remote time
30  *                     provider.
31  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
32  *
33  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
34  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
35  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
36  *
37  * A new clock is created with gst_net_client_clock_new() or
38  * gst_ntp_clock_new(), which takes the address and port of the remote time
39  * provider along with a name and an initial time.
40  *
41  * This clock will poll the time provider and will update its calibration
42  * parameters based on the local and remote observations.
43  *
44  * The "round-trip" property limits the maximum round trip packets can take.
45  *
46  * Various parameters of the clock can be configured with the parent #GstClock
47  * "timeout", "window-size" and "window-threshold" object properties.
48  *
49  * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
50  * gst_pipeline_use_clock().
51  *
52  * If you set a #GstBus on the clock via the "bus" object property, it will
53  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
54  * statistics about clock accuracy and network traffic.
55  */
56
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
60
61 #include "gstnettimepacket.h"
62 #include "gstntppacket.h"
63 #include "gstnetclientclock.h"
64 #include "gstnetutils.h"
65
66 #include <gio/gio.h>
67
68 #include <string.h>
69
70 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
71 #define GST_CAT_DEFAULT (ncc_debug)
72
73 typedef struct
74 {
75   GstClock *clock;              /* GstNetClientInternalClock */
76
77   GList *clocks;                /* GstNetClientClocks */
78
79   GstClockID remove_id;
80 } ClockCache;
81
82 G_LOCK_DEFINE_STATIC (clocks_lock);
83 static GList *clocks = NULL;
84
85 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
86   (gst_net_client_internal_clock_get_type())
87 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
88   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
89 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
90   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
92   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
94   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
95
96 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
97 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
98
99 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
100
101 #define DEFAULT_ADDRESS         "127.0.0.1"
102 #define DEFAULT_PORT            5637
103 #define DEFAULT_TIMEOUT         GST_SECOND
104 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
105 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
106  * more often than 1/20th second (arbitrarily, to spread observations a little) */
107 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
108 #define DEFAULT_BASE_TIME       0
109 #define DEFAULT_QOS_DSCP        -1
110
111 /* Maximum number of clock updates we can skip before updating */
112 #define MAX_SKIPPED_UPDATES 5
113
114 #define MEDIAN_PRE_FILTERING_WINDOW 9
115
116 enum
117 {
118   PROP_0,
119   PROP_ADDRESS,
120   PROP_PORT,
121   PROP_ROUNDTRIP_LIMIT,
122   PROP_MINIMUM_UPDATE_INTERVAL,
123   PROP_BUS,
124   PROP_BASE_TIME,
125   PROP_INTERNAL_CLOCK,
126   PROP_IS_NTP,
127   PROP_QOS_DSCP
128 };
129
130 struct _GstNetClientInternalClock
131 {
132   GstSystemClock clock;
133
134   GThread *thread;
135
136   GSocket *socket;
137   GSocketAddress *servaddr;
138   GCancellable *cancel;
139   gboolean made_cancel_fd;
140
141   GstClockTime timeout_expiration;
142   GstClockTime roundtrip_limit;
143   GstClockTime rtt_avg;
144   GstClockTime minimum_update_interval;
145   GstClockTime last_remote_poll_interval;
146   guint skipped_updates;
147   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
148   gint last_rtts_missing;
149
150   gchar *address;
151   gint port;
152   gboolean is_ntp;
153   gint qos_dscp;
154
155   /* Protected by OBJECT_LOCK */
156   GList *busses;
157 };
158
159 struct _GstNetClientInternalClockClass
160 {
161   GstSystemClockClass parent_class;
162 };
163
164 #define _do_init \
165   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
166 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
167     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
168
169 static void gst_net_client_internal_clock_finalize (GObject * object);
170 static void gst_net_client_internal_clock_set_property (GObject * object,
171     guint prop_id, const GValue * value, GParamSpec * pspec);
172 static void gst_net_client_internal_clock_get_property (GObject * object,
173     guint prop_id, GValue * value, GParamSpec * pspec);
174 static void gst_net_client_internal_clock_constructed (GObject * object);
175
176 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
177     self);
178 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
179     self);
180
181 static void
182 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
183     klass)
184 {
185   GObjectClass *gobject_class;
186
187   gobject_class = G_OBJECT_CLASS (klass);
188
189   gobject_class->finalize = gst_net_client_internal_clock_finalize;
190   gobject_class->get_property = gst_net_client_internal_clock_get_property;
191   gobject_class->set_property = gst_net_client_internal_clock_set_property;
192   gobject_class->constructed = gst_net_client_internal_clock_constructed;
193
194   g_object_class_install_property (gobject_class, PROP_ADDRESS,
195       g_param_spec_string ("address", "address",
196           "The IP address of the machine providing a time server",
197           DEFAULT_ADDRESS,
198           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_PORT,
200       g_param_spec_int ("port", "port",
201           "The port on which the remote server is listening", 0, G_MAXUINT16,
202           DEFAULT_PORT,
203           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
204   g_object_class_install_property (gobject_class, PROP_IS_NTP,
205       g_param_spec_boolean ("is-ntp", "Is NTP",
206           "The clock is using the NTPv4 protocol", FALSE,
207           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
208 }
209
210 static void
211 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
212 {
213   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
214
215   self->port = DEFAULT_PORT;
216   self->address = g_strdup (DEFAULT_ADDRESS);
217   self->is_ntp = FALSE;
218   self->qos_dscp = DEFAULT_QOS_DSCP;
219
220   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
221
222   self->thread = NULL;
223
224   self->servaddr = NULL;
225   self->rtt_avg = GST_CLOCK_TIME_NONE;
226   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
227   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
228   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
229   self->skipped_updates = 0;
230   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
231 }
232
233 static void
234 gst_net_client_internal_clock_finalize (GObject * object)
235 {
236   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
237
238   if (self->thread) {
239     gst_net_client_internal_clock_stop (self);
240   }
241
242   g_free (self->address);
243   self->address = NULL;
244
245   if (self->servaddr != NULL) {
246     g_object_unref (self->servaddr);
247     self->servaddr = NULL;
248   }
249
250   if (self->socket != NULL) {
251     if (!g_socket_close (self->socket, NULL))
252       GST_ERROR_OBJECT (self, "Failed to close socket");
253     g_object_unref (self->socket);
254     self->socket = NULL;
255   }
256
257   g_warn_if_fail (self->busses == NULL);
258
259   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
260       (object);
261 }
262
263 static void
264 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
265     const GValue * value, GParamSpec * pspec)
266 {
267   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
268
269   switch (prop_id) {
270     case PROP_ADDRESS:
271       GST_OBJECT_LOCK (self);
272       g_free (self->address);
273       self->address = g_value_dup_string (value);
274       if (self->address == NULL)
275         self->address = g_strdup (DEFAULT_ADDRESS);
276       GST_OBJECT_UNLOCK (self);
277       break;
278     case PROP_PORT:
279       GST_OBJECT_LOCK (self);
280       self->port = g_value_get_int (value);
281       GST_OBJECT_UNLOCK (self);
282       break;
283     case PROP_IS_NTP:
284       GST_OBJECT_LOCK (self);
285       self->is_ntp = g_value_get_boolean (value);
286       GST_OBJECT_UNLOCK (self);
287       break;
288     default:
289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
290       break;
291   }
292 }
293
294 static void
295 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec)
297 {
298   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
299
300   switch (prop_id) {
301     case PROP_ADDRESS:
302       GST_OBJECT_LOCK (self);
303       g_value_set_string (value, self->address);
304       GST_OBJECT_UNLOCK (self);
305       break;
306     case PROP_PORT:
307       g_value_set_int (value, self->port);
308       break;
309     case PROP_IS_NTP:
310       g_value_set_boolean (value, self->is_ntp);
311       break;
312     default:
313       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
314       break;
315   }
316 }
317
318 static void
319 gst_net_client_internal_clock_constructed (GObject * object)
320 {
321   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
322
323   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
324       (object);
325
326   if (!gst_net_client_internal_clock_start (self)) {
327     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
328   }
329
330   /* all systems go, cap'n */
331 }
332
333 static gint
334 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
335 {
336   if (*a < *b)
337     return -1;
338   else if (*a > *b)
339     return 1;
340   return 0;
341 }
342
343 static void
344 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
345     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
346     GstClockTime local_2)
347 {
348   GstClockTime current_timeout = 0;
349   GstClockTime local_avg, remote_avg;
350   gdouble r_squared;
351   GstClock *clock;
352   GstClockTime rtt, rtt_limit, min_update_interval;
353   /* Use for discont tracking */
354   GstClockTime time_before = 0;
355   GstClockTime min_guess = 0;
356   GstClockTimeDiff time_discont = 0;
357   gboolean synched, now_synched;
358   GstClockTime internal_time, external_time, rate_num, rate_den;
359   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
360       orig_rate_den;
361   GstClockTime max_discont;
362   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
363   GstClockTime median;
364   gint i;
365
366   GST_OBJECT_LOCK (self);
367   rtt_limit = self->roundtrip_limit;
368
369   GST_LOG_OBJECT (self,
370       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
371       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
372       remote_2, local_2);
373
374   /* If the server told us a poll interval and it's bigger than the
375    * one configured via the property, use the server's */
376   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
377       self->last_remote_poll_interval > self->minimum_update_interval)
378     min_update_interval = self->last_remote_poll_interval;
379   else
380     min_update_interval = self->minimum_update_interval;
381   GST_OBJECT_UNLOCK (self);
382
383   if (local_2 < local_1) {
384     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
385         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
386         GST_TIME_ARGS (local_2));
387     goto bogus_observation;
388   }
389
390   if (remote_2 < remote_1) {
391     GST_LOG_OBJECT (self,
392         "Dropping observation: remote receive time %" GST_TIME_FORMAT
393         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
394         GST_TIME_ARGS (remote_2));
395     goto bogus_observation;
396   }
397
398   /* The round trip time is (assuming symmetric path delays)
399    * delta = (local_2 - local_1) - (remote_2 - remote_1)
400    */
401
402   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
403
404   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
405     GST_LOG_OBJECT (self,
406         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
407         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
408     goto bogus_observation;
409   }
410
411   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
412     self->last_rtts[i - 1] = self->last_rtts[i];
413   self->last_rtts[i - 1] = rtt;
414
415   if (self->last_rtts_missing) {
416     self->last_rtts_missing--;
417   } else {
418     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
419     g_qsort_with_data (&last_rtts,
420         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
421         (GCompareDataFunc) compare_clock_time, NULL);
422
423     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
424
425     /* FIXME: We might want to use something else here, like only allowing
426      * things in the interquartile range, or also filtering away delays that
427      * are too small compared to the median. This here worked well enough
428      * in tests so far.
429      */
430     if (rtt > 2 * median) {
431       GST_LOG_OBJECT (self,
432           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
433           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
434       goto bogus_observation;
435     }
436   }
437
438   /* Track an average round trip time, for a bit of smoothing */
439   /* Always update before discarding a sample, so genuine changes in
440    * the network get picked up, eventually */
441   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
442     self->rtt_avg = rtt;
443   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
444     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
445   else
446     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
447
448   if (rtt > 2 * self->rtt_avg) {
449     GST_LOG_OBJECT (self,
450         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
451         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
452     goto bogus_observation;
453   }
454
455   /* The difference between the local and remote clock (again assuming
456    * symmetric path delays):
457    *
458    * local_1 + delta / 2 - remote_1 = theta
459    * or
460    * local_2 - delta / 2 - remote_2 = theta
461    *
462    * which gives after some simple algebraic transformations:
463    *
464    *         (remote_1 - local_1) + (remote_2 - local_2)
465    * theta = -------------------------------------------
466    *                              2
467    *
468    *
469    * Thus remote time at local_avg is equal to:
470    *
471    * local_avg + theta =
472    *
473    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
474    * ----------------- + -------------------------------------------
475    *         2                                2
476    *
477    * =
478    *
479    * remote_1 + remote_2
480    * ------------------- = remote_avg
481    *          2
482    *
483    * We use this for our clock estimation, i.e. local_avg at remote clock
484    * being the same as remote_avg.
485    */
486
487   local_avg = (local_2 + local_1) / 2;
488   remote_avg = (remote_2 + remote_1) / 2;
489
490   GST_LOG_OBJECT (self,
491       "remoteavg %" G_GUINT64_FORMAT " localavg %" G_GUINT64_FORMAT,
492       remote_avg, local_avg);
493
494   clock = GST_CLOCK_CAST (self);
495
496   /* Store what the clock produced as 'now' before this update */
497   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
498       &orig_external_time, &orig_rate_num, &orig_rate_den);
499   internal_time = orig_internal_time;
500   external_time = orig_external_time;
501   rate_num = orig_rate_num;
502   rate_den = orig_rate_den;
503
504   min_guess =
505       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
506       internal_time, external_time, rate_num, rate_den);
507   time_before =
508       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
509       internal_time, external_time, rate_num, rate_den);
510
511   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
512    * but this value seems to work fine */
513   max_discont = self->rtt_avg / 4;
514
515   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
516   synched =
517       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
518       && GST_CLOCK_DIFF (time_before,
519           remote_avg) < (GstClockTimeDiff) (max_discont));
520
521   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
522           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
523           &rate_num, &rate_den)) {
524
525     /* Now compare the difference (discont) in the clock
526      * after this observation */
527     time_discont = GST_CLOCK_DIFF (time_before,
528         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
529             internal_time, external_time, rate_num, rate_den));
530
531     /* If we were in sync with the remote clock, clamp the allowed
532      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
533      * of remote time correctly windowed the actual remote time observation */
534     if (synched && ABS (time_discont) > max_discont) {
535       GstClockTimeDiff offset;
536       GST_DEBUG_OBJECT (clock,
537           "Too large a discont, clamping to 1/4 average RTT = %"
538           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
539       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
540         offset = max_discont - time_discont;
541         if (-offset > external_time)
542           external_time = 0;
543         else
544           external_time += offset;
545       } else {                  /* Too large a backward step - add a +ve offset */
546         offset = -(max_discont + time_discont);
547         external_time += offset;
548       }
549
550       time_discont += offset;
551     }
552
553     /* Check if the new clock params would have made our observation within range */
554     now_synched =
555         (GST_CLOCK_DIFF (remote_avg,
556             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
557                 local_1, internal_time, external_time, rate_num,
558                 rate_den)) < (GstClockTimeDiff) (max_discont))
559         &&
560         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
561             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
562                 rate_num, rate_den),
563             remote_avg) < (GstClockTimeDiff) (max_discont));
564
565     /* Only update the clock if we had synch or just gained it */
566     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
567       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
568           external_time, rate_num, rate_den);
569       /* ghetto formula - shorter timeout for bad correlations */
570       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
571       current_timeout =
572           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
573       self->skipped_updates = 0;
574
575       /* FIXME: When do we consider the clock absolutely not synced anymore? */
576       gst_clock_set_synced (GST_CLOCK (self), TRUE);
577     } else {
578       /* Restore original calibration vars for the report, we're not changing the clock */
579       internal_time = orig_internal_time;
580       external_time = orig_external_time;
581       rate_num = orig_rate_num;
582       rate_den = orig_rate_den;
583       time_discont = 0;
584       self->skipped_updates++;
585     }
586   }
587
588   /* Limit the polling to at most one per minimum_update_interval */
589   if (rtt < min_update_interval)
590     current_timeout = MAX (min_update_interval - rtt, current_timeout);
591
592   GST_OBJECT_LOCK (self);
593   if (self->busses) {
594     GstStructure *s;
595     GstMessage *msg;
596     GList *l;
597
598     /* Output a stats message, whether we updated the clock or not */
599     s = gst_structure_new ("gst-netclock-statistics",
600         "synchronised", G_TYPE_BOOLEAN, synched,
601         "rtt", G_TYPE_UINT64, rtt,
602         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
603         "local", G_TYPE_UINT64, local_avg,
604         "remote", G_TYPE_UINT64, remote_avg,
605         "discontinuity", G_TYPE_INT64, time_discont,
606         "remote-min-estimate", G_TYPE_UINT64, min_guess,
607         "remote-max-estimate", G_TYPE_UINT64, time_before,
608         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
609             min_guess), "remote-max-error", G_TYPE_INT64,
610         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
611         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
612         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
613         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
614         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
615         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
616         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
617         GST_CLOCK_DIFF (internal_time, external_time), NULL);
618     msg = gst_message_new_element (GST_OBJECT (self), s);
619
620     for (l = self->busses; l; l = l->next)
621       gst_bus_post (l->data, gst_message_ref (msg));
622     gst_message_unref (msg);
623   }
624   GST_OBJECT_UNLOCK (self);
625
626   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
627   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
628
629   return;
630
631 bogus_observation:
632   /* Schedule a new packet again soon */
633   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
634   return;
635 }
636
637 static gpointer
638 gst_net_client_internal_clock_thread (gpointer data)
639 {
640   GstNetClientInternalClock *self = data;
641   GSocket *socket = self->socket;
642   GError *err = NULL;
643   gint cur_qos_dscp = DEFAULT_QOS_DSCP;
644
645   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
646
647   g_socket_set_blocking (socket, TRUE);
648   g_socket_set_timeout (socket, 0);
649
650   while (!g_cancellable_is_cancelled (self->cancel)) {
651     GstClockTime expiration_time = self->timeout_expiration;
652     GstClockTime now = gst_util_get_timestamp ();
653     gint64 socket_timeout;
654
655     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
656       socket_timeout = 0;
657     } else {
658       socket_timeout = (expiration_time - now) / GST_USECOND;
659     }
660
661     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
662
663     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
664             self->cancel, &err)) {
665       /* cancelled, timeout or error */
666       if (err->code == G_IO_ERROR_CANCELLED) {
667         GST_INFO_OBJECT (self, "cancelled");
668         g_clear_error (&err);
669         break;
670       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
671         gint new_qos_dscp;
672
673         /* timed out, let's send another packet */
674         GST_DEBUG_OBJECT (self, "timed out");
675
676         /* before next sending check if need to change QoS */
677         new_qos_dscp = self->qos_dscp;
678         if (cur_qos_dscp != new_qos_dscp &&
679             gst_net_utils_set_socket_dscp (socket, new_qos_dscp)) {
680           GST_DEBUG_OBJECT (self, "changed QoS DSCP to: %d", new_qos_dscp);
681           cur_qos_dscp = new_qos_dscp;
682         }
683
684         if (self->is_ntp) {
685           GstNtpPacket *packet;
686
687           packet = gst_ntp_packet_new (NULL, NULL);
688
689           packet->transmit_time =
690               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
691
692           GST_DEBUG_OBJECT (self,
693               "sending packet, local time = %" GST_TIME_FORMAT,
694               GST_TIME_ARGS (packet->transmit_time));
695
696           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
697
698           g_free (packet);
699         } else {
700           GstNetTimePacket *packet;
701
702           packet = gst_net_time_packet_new (NULL);
703
704           packet->local_time =
705               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
706
707           GST_DEBUG_OBJECT (self,
708               "sending packet, local time = %" GST_TIME_FORMAT,
709               GST_TIME_ARGS (packet->local_time));
710
711           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
712
713           g_free (packet);
714         }
715
716         /* reset timeout (but are expecting a response sooner anyway) */
717         self->timeout_expiration =
718             gst_util_get_timestamp () +
719             gst_clock_get_timeout (GST_CLOCK_CAST (self));
720       } else {
721         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
722         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
723       }
724       g_clear_error (&err);
725     } else {
726       GstClockTime new_local;
727
728       /* got packet */
729
730       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
731
732       if (self->is_ntp) {
733         GstNtpPacket *packet;
734
735         packet = gst_ntp_packet_receive (socket, NULL, &err);
736
737         if (packet != NULL) {
738           GST_LOG_OBJECT (self, "got packet back");
739           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
740               GST_TIME_ARGS (packet->origin_time));
741           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
742               GST_TIME_ARGS (packet->receive_time));
743           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
744               GST_TIME_ARGS (packet->transmit_time));
745           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
746               GST_TIME_ARGS (new_local));
747           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
748               GST_TIME_ARGS (packet->poll_interval));
749
750           /* Remember the last poll interval we ever got from the server */
751           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
752             self->last_remote_poll_interval = packet->poll_interval;
753
754           /* observe_times will reset the timeout */
755           gst_net_client_internal_clock_observe_times (self,
756               packet->origin_time, packet->receive_time, packet->transmit_time,
757               new_local);
758
759           g_free (packet);
760         } else if (err != NULL) {
761           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
762               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
763             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
764             g_clear_error (&err);
765             break;
766           } else if (g_error_matches (err, GST_NTP_ERROR,
767                   GST_NTP_ERROR_KOD_RATE)) {
768             GST_WARNING_OBJECT (self, "need to limit rate");
769
770             /* If the server did not tell us a poll interval before, double
771              * our minimum poll interval. Otherwise we assume that the server
772              * already told us something sensible and that this error here
773              * was just a spurious error */
774             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
775               self->minimum_update_interval *= 2;
776
777             /* And wait a bit before we send the next packet instead of
778              * sending it immediately */
779             self->timeout_expiration =
780                 gst_util_get_timestamp () +
781                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
782           } else {
783             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
784           }
785           g_clear_error (&err);
786         }
787       } else {
788         GstNetTimePacket *packet;
789
790         packet = gst_net_time_packet_receive (socket, NULL, &err);
791
792         if (packet != NULL) {
793           GST_LOG_OBJECT (self, "got packet back");
794           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
795               GST_TIME_ARGS (packet->local_time));
796           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
797               GST_TIME_ARGS (packet->remote_time));
798           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
799               GST_TIME_ARGS (new_local));
800
801           /* observe_times will reset the timeout */
802           gst_net_client_internal_clock_observe_times (self, packet->local_time,
803               packet->remote_time, packet->remote_time, new_local);
804
805           g_free (packet);
806         } else if (err != NULL) {
807           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
808           g_clear_error (&err);
809         }
810       }
811     }
812   }
813   GST_INFO_OBJECT (self, "shutting down net client clock thread");
814   return NULL;
815 }
816
817 static gboolean
818 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
819 {
820   GSocketAddress *servaddr;
821   GSocketAddress *myaddr;
822   GSocketAddress *anyaddr;
823   GInetAddress *inetaddr;
824   GSocket *socket;
825   GError *error = NULL;
826   GSocketFamily family;
827   GPollFD dummy_pollfd;
828   GResolver *resolver = NULL;
829   GError *err = NULL;
830
831   g_return_val_if_fail (self->address != NULL, FALSE);
832   g_return_val_if_fail (self->servaddr == NULL, FALSE);
833
834   /* create target address */
835   inetaddr = g_inet_address_new_from_string (self->address);
836   if (inetaddr == NULL) {
837     GList *results;
838
839     resolver = g_resolver_get_default ();
840
841     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
842     if (!results)
843       goto failed_to_resolve;
844
845     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
846     g_resolver_free_addresses (results);
847     g_object_unref (resolver);
848   }
849
850   family = g_inet_address_get_family (inetaddr);
851
852   servaddr = g_inet_socket_address_new (inetaddr, self->port);
853   g_object_unref (inetaddr);
854
855   g_assert (servaddr != NULL);
856
857   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
858       self->port);
859
860   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
861       G_SOCKET_PROTOCOL_UDP, &error);
862
863   if (socket == NULL)
864     goto no_socket;
865
866   GST_DEBUG_OBJECT (self, "binding socket");
867   inetaddr = g_inet_address_new_any (family);
868   anyaddr = g_inet_socket_address_new (inetaddr, 0);
869   g_socket_bind (socket, anyaddr, TRUE, &error);
870   g_object_unref (anyaddr);
871   g_object_unref (inetaddr);
872
873   if (error != NULL)
874     goto bind_error;
875
876   /* check address we're bound to, mostly for debugging purposes */
877   myaddr = g_socket_get_local_address (socket, &error);
878
879   if (myaddr == NULL)
880     goto getsockname_error;
881
882   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
883       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
884
885   g_object_unref (myaddr);
886
887   self->cancel = g_cancellable_new ();
888   self->made_cancel_fd =
889       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
890
891   self->socket = socket;
892   self->servaddr = G_SOCKET_ADDRESS (servaddr);
893
894   self->thread = g_thread_try_new ("GstNetClientInternalClock",
895       gst_net_client_internal_clock_thread, self, &error);
896
897   if (error != NULL)
898     goto no_thread;
899
900   return TRUE;
901
902   /* ERRORS */
903 no_socket:
904   {
905     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
906     g_error_free (error);
907     return FALSE;
908   }
909 bind_error:
910   {
911     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
912     g_error_free (error);
913     g_object_unref (socket);
914     return FALSE;
915   }
916 getsockname_error:
917   {
918     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
919     g_error_free (error);
920     g_object_unref (socket);
921     return FALSE;
922   }
923 failed_to_resolve:
924   {
925     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
926         self->address, err->message);
927     g_clear_error (&err);
928     g_object_unref (resolver);
929     return FALSE;
930   }
931 no_thread:
932   {
933     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
934     g_object_unref (self->servaddr);
935     self->servaddr = NULL;
936     g_object_unref (self->socket);
937     self->socket = NULL;
938     g_error_free (error);
939     return FALSE;
940   }
941 }
942
943 static void
944 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
945 {
946   if (self->thread == NULL)
947     return;
948
949   GST_INFO_OBJECT (self, "stopping...");
950   g_cancellable_cancel (self->cancel);
951
952   g_thread_join (self->thread);
953   self->thread = NULL;
954
955   if (self->made_cancel_fd)
956     g_cancellable_release_fd (self->cancel);
957
958   g_object_unref (self->cancel);
959   self->cancel = NULL;
960
961   g_object_unref (self->servaddr);
962   self->servaddr = NULL;
963
964   g_object_unref (self->socket);
965   self->socket = NULL;
966
967   GST_INFO_OBJECT (self, "stopped");
968 }
969
970 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
971   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
972
973 struct _GstNetClientClockPrivate
974 {
975   GstClock *internal_clock;
976
977   GstClockTime roundtrip_limit;
978   GstClockTime minimum_update_interval;
979
980   GstClockTime base_time, internal_base_time;
981
982   gchar *address;
983   gint port;
984   gint qos_dscp;
985
986   GstBus *bus;
987
988   gboolean is_ntp;
989
990   gulong synced_id;
991 };
992
993 G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK);
994
995 static void gst_net_client_clock_finalize (GObject * object);
996 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
997     const GValue * value, GParamSpec * pspec);
998 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
999     GValue * value, GParamSpec * pspec);
1000 static void gst_net_client_clock_constructed (GObject * object);
1001
1002 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
1003
1004 static void
1005 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
1006 {
1007   GObjectClass *gobject_class;
1008   GstClockClass *clock_class;
1009
1010   gobject_class = G_OBJECT_CLASS (klass);
1011   clock_class = GST_CLOCK_CLASS (klass);
1012
1013   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
1014
1015   gobject_class->finalize = gst_net_client_clock_finalize;
1016   gobject_class->get_property = gst_net_client_clock_get_property;
1017   gobject_class->set_property = gst_net_client_clock_set_property;
1018   gobject_class->constructed = gst_net_client_clock_constructed;
1019
1020   g_object_class_install_property (gobject_class, PROP_ADDRESS,
1021       g_param_spec_string ("address", "address",
1022           "The IP address of the machine providing a time server",
1023           DEFAULT_ADDRESS,
1024           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1025   g_object_class_install_property (gobject_class, PROP_PORT,
1026       g_param_spec_int ("port", "port",
1027           "The port on which the remote server is listening", 0, G_MAXUINT16,
1028           DEFAULT_PORT,
1029           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1030   g_object_class_install_property (gobject_class, PROP_BUS,
1031       g_param_spec_object ("bus", "bus",
1032           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1033           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1034
1035   /**
1036    * GstNetClientInternalClock::round-trip-limit:
1037    *
1038    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1039    * value, all packets with a round-trip interval larger than this limit will be
1040    * ignored. This is useful for networks with severe and fluctuating transport
1041    * delays. Filtering out these packets increases stability of the synchronization.
1042    * On the other hand, the lower the limit, the higher the amount of filtered
1043    * packets. Empirical tests are typically necessary to estimate a good value
1044    * for the limit.
1045    * If the property is set to zero, the limit is disabled.
1046    *
1047    * Since: 1.4
1048    */
1049   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1050       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1051           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1052           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1053           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1054
1055   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1056       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1057           "Minimum polling interval for packets, in nanoseconds"
1058           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1059           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1060
1061   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1062       g_param_spec_uint64 ("base-time", "Base Time",
1063           "Initial time that is reported before synchronization", 0,
1064           G_MAXUINT64, DEFAULT_BASE_TIME,
1065           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1066
1067   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1068       g_param_spec_object ("internal-clock", "Internal Clock",
1069           "Internal clock that directly slaved to the remote clock",
1070           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1071
1072   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
1073       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
1074           "Quality of Service, differentiated services code point (-1 default)",
1075           -1, 63, DEFAULT_QOS_DSCP,
1076           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1077
1078   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1079 }
1080
1081 static void
1082 gst_net_client_clock_init (GstNetClientClock * self)
1083 {
1084   GstNetClientClockPrivate *priv;
1085   GstClock *clock;
1086
1087   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
1088
1089   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1090   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1091
1092   priv->port = DEFAULT_PORT;
1093   priv->address = g_strdup (DEFAULT_ADDRESS);
1094   priv->qos_dscp = DEFAULT_QOS_DSCP;
1095
1096   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1097   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1098
1099   clock = gst_system_clock_obtain ();
1100   priv->base_time = DEFAULT_BASE_TIME;
1101   priv->internal_base_time = gst_clock_get_time (clock);
1102   gst_object_unref (clock);
1103 }
1104
1105 /* Must be called with clocks_lock */
1106 static void
1107 update_clock_cache (ClockCache * cache)
1108 {
1109   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1110   GList *l, *busses = NULL;
1111   gint qos_dscp = DEFAULT_QOS_DSCP;
1112
1113   GST_OBJECT_LOCK (cache->clock);
1114   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1115       (GDestroyNotify) gst_object_unref);
1116
1117   for (l = cache->clocks; l; l = l->next) {
1118     GstNetClientClock *clock = l->data;
1119
1120     if (clock->priv->bus)
1121       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1122
1123     if (roundtrip_limit == 0)
1124       roundtrip_limit = clock->priv->roundtrip_limit;
1125     else
1126       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1127
1128     if (minimum_update_interval == 0)
1129       minimum_update_interval = clock->priv->minimum_update_interval;
1130     else
1131       minimum_update_interval =
1132           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1133
1134     qos_dscp = MAX (qos_dscp, clock->priv->qos_dscp);
1135   }
1136   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1137   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1138       roundtrip_limit;
1139   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1140       minimum_update_interval;
1141   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->qos_dscp = qos_dscp;
1142
1143   GST_OBJECT_UNLOCK (cache->clock);
1144 }
1145
1146 static gboolean
1147 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1148     gpointer user_data)
1149 {
1150   ClockCache *cache = user_data;
1151
1152   G_LOCK (clocks_lock);
1153   if (!cache->clocks) {
1154     gst_clock_id_unref (cache->remove_id);
1155     gst_object_unref (cache->clock);
1156     clocks = g_list_remove (clocks, cache);
1157     g_free (cache);
1158   }
1159   G_UNLOCK (clocks_lock);
1160
1161   return TRUE;
1162 }
1163
1164 static void
1165 gst_net_client_clock_finalize (GObject * object)
1166 {
1167   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1168   GList *l;
1169
1170   if (self->priv->synced_id)
1171     g_signal_handler_disconnect (self->priv->internal_clock,
1172         self->priv->synced_id);
1173   self->priv->synced_id = 0;
1174
1175   G_LOCK (clocks_lock);
1176   for (l = clocks; l; l = l->next) {
1177     ClockCache *cache = l->data;
1178
1179     if (cache->clock == self->priv->internal_clock) {
1180       cache->clocks = g_list_remove (cache->clocks, self);
1181
1182       if (cache->clocks) {
1183         update_clock_cache (cache);
1184       } else {
1185         GstClock *sysclock = gst_system_clock_obtain ();
1186         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1187
1188         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1189         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1190             NULL);
1191         gst_object_unref (sysclock);
1192       }
1193       break;
1194     }
1195   }
1196   G_UNLOCK (clocks_lock);
1197
1198   g_free (self->priv->address);
1199   self->priv->address = NULL;
1200
1201   if (self->priv->bus != NULL) {
1202     gst_object_unref (self->priv->bus);
1203     self->priv->bus = NULL;
1204   }
1205
1206   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1207 }
1208
1209 static void
1210 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1211     const GValue * value, GParamSpec * pspec)
1212 {
1213   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1214   gboolean update = FALSE;
1215
1216   switch (prop_id) {
1217     case PROP_ADDRESS:
1218       GST_OBJECT_LOCK (self);
1219       g_free (self->priv->address);
1220       self->priv->address = g_value_dup_string (value);
1221       if (self->priv->address == NULL)
1222         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1223       GST_OBJECT_UNLOCK (self);
1224       break;
1225     case PROP_PORT:
1226       GST_OBJECT_LOCK (self);
1227       self->priv->port = g_value_get_int (value);
1228       GST_OBJECT_UNLOCK (self);
1229       break;
1230     case PROP_ROUNDTRIP_LIMIT:
1231       GST_OBJECT_LOCK (self);
1232       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1233       GST_OBJECT_UNLOCK (self);
1234       update = TRUE;
1235       break;
1236     case PROP_MINIMUM_UPDATE_INTERVAL:
1237       GST_OBJECT_LOCK (self);
1238       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1239       GST_OBJECT_UNLOCK (self);
1240       update = TRUE;
1241       break;
1242     case PROP_BUS:
1243       GST_OBJECT_LOCK (self);
1244       if (self->priv->bus)
1245         gst_object_unref (self->priv->bus);
1246       self->priv->bus = g_value_dup_object (value);
1247       GST_OBJECT_UNLOCK (self);
1248       update = TRUE;
1249       break;
1250     case PROP_BASE_TIME:{
1251       GstClock *clock;
1252
1253       self->priv->base_time = g_value_get_uint64 (value);
1254       clock = gst_system_clock_obtain ();
1255       self->priv->internal_base_time = gst_clock_get_time (clock);
1256       gst_object_unref (clock);
1257       break;
1258     }
1259     case PROP_QOS_DSCP:
1260       GST_OBJECT_LOCK (self);
1261       self->priv->qos_dscp = g_value_get_int (value);
1262       GST_OBJECT_UNLOCK (self);
1263       update = TRUE;
1264       break;
1265     default:
1266       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1267       break;
1268   }
1269
1270   if (update && self->priv->internal_clock) {
1271     GList *l;
1272
1273     G_LOCK (clocks_lock);
1274     for (l = clocks; l; l = l->next) {
1275       ClockCache *cache = l->data;
1276
1277       if (cache->clock == self->priv->internal_clock) {
1278         update_clock_cache (cache);
1279       }
1280     }
1281     G_UNLOCK (clocks_lock);
1282   }
1283 }
1284
1285 static void
1286 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1287     GValue * value, GParamSpec * pspec)
1288 {
1289   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1290
1291   switch (prop_id) {
1292     case PROP_ADDRESS:
1293       GST_OBJECT_LOCK (self);
1294       g_value_set_string (value, self->priv->address);
1295       GST_OBJECT_UNLOCK (self);
1296       break;
1297     case PROP_PORT:
1298       g_value_set_int (value, self->priv->port);
1299       break;
1300     case PROP_ROUNDTRIP_LIMIT:
1301       GST_OBJECT_LOCK (self);
1302       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1303       GST_OBJECT_UNLOCK (self);
1304       break;
1305     case PROP_MINIMUM_UPDATE_INTERVAL:
1306       GST_OBJECT_LOCK (self);
1307       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1308       GST_OBJECT_UNLOCK (self);
1309       break;
1310     case PROP_BUS:
1311       GST_OBJECT_LOCK (self);
1312       g_value_set_object (value, self->priv->bus);
1313       GST_OBJECT_UNLOCK (self);
1314       break;
1315     case PROP_BASE_TIME:
1316       g_value_set_uint64 (value, self->priv->base_time);
1317       break;
1318     case PROP_INTERNAL_CLOCK:
1319       g_value_set_object (value, self->priv->internal_clock);
1320       break;
1321     case PROP_QOS_DSCP:
1322       GST_OBJECT_LOCK (self);
1323       g_value_set_int (value, self->priv->qos_dscp);
1324       GST_OBJECT_UNLOCK (self);
1325       break;
1326     default:
1327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1328       break;
1329   }
1330 }
1331
1332 static void
1333 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1334     GstClock * self)
1335 {
1336   gst_clock_set_synced (self, synced);
1337 }
1338
1339 static void
1340 gst_net_client_clock_constructed (GObject * object)
1341 {
1342   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1343   GstClock *internal_clock;
1344   GList *l;
1345   ClockCache *cache = NULL;
1346
1347   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1348
1349   G_LOCK (clocks_lock);
1350   for (l = clocks; l; l = l->next) {
1351     ClockCache *tmp = l->data;
1352     GstNetClientInternalClock *internal_clock =
1353         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1354
1355     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1356         internal_clock->port == self->priv->port) {
1357       cache = tmp;
1358
1359       if (cache->remove_id) {
1360         gst_clock_id_unschedule (cache->remove_id);
1361         cache->remove_id = NULL;
1362       }
1363       break;
1364     }
1365   }
1366
1367   if (!cache) {
1368     cache = g_new0 (ClockCache, 1);
1369
1370     cache->clock =
1371         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1372         self->priv->address, "port", self->priv->port, "is-ntp",
1373         self->priv->is_ntp, NULL);
1374     gst_object_ref_sink (cache->clock);
1375     clocks = g_list_prepend (clocks, cache);
1376
1377     /* Not actually leaked but is cached for a while before being disposed,
1378      * see gst_net_client_clock_finalize, so pretend it is to not confuse
1379      * tests. */
1380     GST_OBJECT_FLAG_SET (cache->clock, GST_OBJECT_FLAG_MAY_BE_LEAKED);
1381   }
1382
1383   cache->clocks = g_list_prepend (cache->clocks, self);
1384
1385   GST_OBJECT_LOCK (cache->clock);
1386   if (gst_clock_is_synced (cache->clock))
1387     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1388   self->priv->synced_id =
1389       g_signal_connect (cache->clock, "synced",
1390       G_CALLBACK (gst_net_client_clock_synced_cb), self);
1391   GST_OBJECT_UNLOCK (cache->clock);
1392
1393   G_UNLOCK (clocks_lock);
1394
1395   self->priv->internal_clock = internal_clock = cache->clock;
1396
1397   /* all systems go, cap'n */
1398 }
1399
1400 static GstClockTime
1401 gst_net_client_clock_get_internal_time (GstClock * clock)
1402 {
1403   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1404
1405   if (!gst_clock_is_synced (self->priv->internal_clock)) {
1406     GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1407     return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1408         self->priv->internal_base_time, self->priv->base_time, 1, 1);
1409   }
1410
1411   return gst_clock_get_time (self->priv->internal_clock);
1412 }
1413
1414 /**
1415  * gst_net_client_clock_new:
1416  * @name: a name for the clock
1417  * @remote_address: the address or hostname of the remote clock provider
1418  * @remote_port: the port of the remote clock provider
1419  * @base_time: initial time of the clock
1420  *
1421  * Create a new #GstNetClientInternalClock that will report the time
1422  * provided by the #GstNetTimeProvider on @remote_address and
1423  * @remote_port.
1424  *
1425  * Returns: (transfer full): a new #GstClock that receives a time from the remote
1426  * clock.
1427  */
1428 GstClock *
1429 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1430     gint remote_port, GstClockTime base_time)
1431 {
1432   GstClock *ret;
1433
1434   g_return_val_if_fail (remote_address != NULL, NULL);
1435   g_return_val_if_fail (remote_port > 0, NULL);
1436   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1437   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1438
1439   ret =
1440       g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "name", name, "address",
1441       remote_address, "port", remote_port, "base-time", base_time, NULL);
1442
1443   /* Clear floating flag */
1444   gst_object_ref_sink (ret);
1445
1446   return ret;
1447 }
1448
1449 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1450
1451 static void
1452 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1453 {
1454 }
1455
1456 static void
1457 gst_ntp_clock_init (GstNtpClock * self)
1458 {
1459   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1460 }
1461
1462 /**
1463  * gst_ntp_clock_new:
1464  * @name: a name for the clock
1465  * @remote_address: the address or hostname of the remote clock provider
1466  * @remote_port: the port of the remote clock provider
1467  * @base_time: initial time of the clock
1468  *
1469  * Create a new #GstNtpClock that will report the time provided by
1470  * the NTPv4 server on @remote_address and @remote_port.
1471  *
1472  * Returns: (transfer full): a new #GstClock that receives a time from the remote
1473  * clock.
1474  *
1475  * Since: 1.6
1476  */
1477 GstClock *
1478 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1479     gint remote_port, GstClockTime base_time)
1480 {
1481   GstClock *ret;
1482
1483   g_return_val_if_fail (remote_address != NULL, NULL);
1484   g_return_val_if_fail (remote_port > 0, NULL);
1485   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1486   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1487
1488   ret =
1489       g_object_new (GST_TYPE_NTP_CLOCK, "name", name, "address", remote_address,
1490       "port", remote_port, "base-time", base_time, NULL);
1491
1492   gst_object_ref_sink (ret);
1493
1494   return ret;
1495 }