cce8952f8eec7eca4cfaaa0082215a7346a13817
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @short_description: Special clock that synchronizes to a remote time
29  *                     provider.
30  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
31  *
32  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
33  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
34  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
35  *
36  * A new clock is created with gst_net_client_clock_new() or
37  * gst_ntp_clock_new(), which takes the address and port of the remote time
38  * provider along with a name and an initial time.
39  *
40  * This clock will poll the time provider and will update its calibration
41  * parameters based on the local and remote observations.
42  *
43  * The "round-trip" property limits the maximum round trip packets can take.
44  *
45  * Various parameters of the clock can be configured with the parent #GstClock
46  * "timeout", "window-size" and "window-threshold" object properties.
47  *
48  * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
49  * gst_pipeline_use_clock().
50  *
51  * If you set a #GstBus on the clock via the "bus" object property, it will
52  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
53  * statistics about clock accuracy and network traffic.
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include "gstnettimepacket.h"
61 #include "gstntppacket.h"
62 #include "gstnetclientclock.h"
63
64 #include <gio/gio.h>
65
66 #include <string.h>
67
68 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
69 #define GST_CAT_DEFAULT (ncc_debug)
70
71 typedef struct
72 {
73   GstClock *clock;              /* GstNetClientInternalClock */
74
75   GList *clocks;                /* GstNetClientClocks */
76
77   GstClockID remove_id;
78 } ClockCache;
79
80 G_LOCK_DEFINE_STATIC (clocks_lock);
81 static GList *clocks = NULL;
82
83 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
84   (gst_net_client_internal_clock_get_type())
85 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
86   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
87 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
88   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
89 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
90   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
92   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93
94 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
95 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
96
97 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
98
99 #define DEFAULT_ADDRESS         "127.0.0.1"
100 #define DEFAULT_PORT            5637
101 #define DEFAULT_TIMEOUT         GST_SECOND
102 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
103 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
104  * more often than 1/20th second (arbitrarily, to spread observations a little) */
105 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
106 #define DEFAULT_BASE_TIME       0
107
108 /* Maximum number of clock updates we can skip before updating */
109 #define MAX_SKIPPED_UPDATES 5
110
111 #define MEDIAN_PRE_FILTERING_WINDOW 9
112
113 enum
114 {
115   PROP_0,
116   PROP_ADDRESS,
117   PROP_PORT,
118   PROP_ROUNDTRIP_LIMIT,
119   PROP_MINIMUM_UPDATE_INTERVAL,
120   PROP_BUS,
121   PROP_BASE_TIME,
122   PROP_INTERNAL_CLOCK,
123   PROP_IS_NTP
124 };
125
126 struct _GstNetClientInternalClock
127 {
128   GstSystemClock clock;
129
130   GThread *thread;
131
132   GSocket *socket;
133   GSocketAddress *servaddr;
134   GCancellable *cancel;
135   gboolean made_cancel_fd;
136
137   GstClockTime timeout_expiration;
138   GstClockTime roundtrip_limit;
139   GstClockTime rtt_avg;
140   GstClockTime minimum_update_interval;
141   GstClockTime last_remote_poll_interval;
142   guint skipped_updates;
143   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
144   gint last_rtts_missing;
145
146   gchar *address;
147   gint port;
148   gboolean is_ntp;
149
150   /* Protected by OBJECT_LOCK */
151   GList *busses;
152 };
153
154 struct _GstNetClientInternalClockClass
155 {
156   GstSystemClockClass parent_class;
157 };
158
159 #define _do_init \
160   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
161 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
162     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
163
164 static void gst_net_client_internal_clock_finalize (GObject * object);
165 static void gst_net_client_internal_clock_set_property (GObject * object,
166     guint prop_id, const GValue * value, GParamSpec * pspec);
167 static void gst_net_client_internal_clock_get_property (GObject * object,
168     guint prop_id, GValue * value, GParamSpec * pspec);
169 static void gst_net_client_internal_clock_constructed (GObject * object);
170
171 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
172     self);
173 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
174     self);
175
176 static void
177 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
178     klass)
179 {
180   GObjectClass *gobject_class;
181
182   gobject_class = G_OBJECT_CLASS (klass);
183
184   gobject_class->finalize = gst_net_client_internal_clock_finalize;
185   gobject_class->get_property = gst_net_client_internal_clock_get_property;
186   gobject_class->set_property = gst_net_client_internal_clock_set_property;
187   gobject_class->constructed = gst_net_client_internal_clock_constructed;
188
189   g_object_class_install_property (gobject_class, PROP_ADDRESS,
190       g_param_spec_string ("address", "address",
191           "The IP address of the machine providing a time server",
192           DEFAULT_ADDRESS,
193           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
194   g_object_class_install_property (gobject_class, PROP_PORT,
195       g_param_spec_int ("port", "port",
196           "The port on which the remote server is listening", 0, G_MAXUINT16,
197           DEFAULT_PORT,
198           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_IS_NTP,
200       g_param_spec_boolean ("is-ntp", "Is NTP",
201           "The clock is using the NTPv4 protocol", FALSE,
202           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
203 }
204
205 static void
206 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
207 {
208   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
209
210   self->port = DEFAULT_PORT;
211   self->address = g_strdup (DEFAULT_ADDRESS);
212   self->is_ntp = FALSE;
213
214   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
215
216   self->thread = NULL;
217
218   self->servaddr = NULL;
219   self->rtt_avg = GST_CLOCK_TIME_NONE;
220   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
221   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
222   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
223   self->skipped_updates = 0;
224   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
225 }
226
227 static void
228 gst_net_client_internal_clock_finalize (GObject * object)
229 {
230   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
231
232   if (self->thread) {
233     gst_net_client_internal_clock_stop (self);
234   }
235
236   g_free (self->address);
237   self->address = NULL;
238
239   if (self->servaddr != NULL) {
240     g_object_unref (self->servaddr);
241     self->servaddr = NULL;
242   }
243
244   if (self->socket != NULL) {
245     if (!g_socket_close (self->socket, NULL))
246       GST_ERROR_OBJECT (self, "Failed to close socket");
247     g_object_unref (self->socket);
248     self->socket = NULL;
249   }
250
251   g_warn_if_fail (self->busses == NULL);
252
253   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
254       (object);
255 }
256
257 static void
258 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
259     const GValue * value, GParamSpec * pspec)
260 {
261   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
262
263   switch (prop_id) {
264     case PROP_ADDRESS:
265       GST_OBJECT_LOCK (self);
266       g_free (self->address);
267       self->address = g_value_dup_string (value);
268       if (self->address == NULL)
269         self->address = g_strdup (DEFAULT_ADDRESS);
270       GST_OBJECT_UNLOCK (self);
271       break;
272     case PROP_PORT:
273       GST_OBJECT_LOCK (self);
274       self->port = g_value_get_int (value);
275       GST_OBJECT_UNLOCK (self);
276       break;
277     case PROP_IS_NTP:
278       GST_OBJECT_LOCK (self);
279       self->is_ntp = g_value_get_boolean (value);
280       GST_OBJECT_UNLOCK (self);
281       break;
282     default:
283       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
284       break;
285   }
286 }
287
288 static void
289 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
290     GValue * value, GParamSpec * pspec)
291 {
292   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
293
294   switch (prop_id) {
295     case PROP_ADDRESS:
296       GST_OBJECT_LOCK (self);
297       g_value_set_string (value, self->address);
298       GST_OBJECT_UNLOCK (self);
299       break;
300     case PROP_PORT:
301       g_value_set_int (value, self->port);
302       break;
303     case PROP_IS_NTP:
304       g_value_set_boolean (value, self->is_ntp);
305       break;
306     default:
307       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
308       break;
309   }
310 }
311
312 static void
313 gst_net_client_internal_clock_constructed (GObject * object)
314 {
315   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
316
317   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
318       (object);
319
320   if (!gst_net_client_internal_clock_start (self)) {
321     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
322   }
323
324   /* all systems go, cap'n */
325 }
326
327 static gint
328 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
329 {
330   if (*a < *b)
331     return -1;
332   else if (*a > *b)
333     return 1;
334   return 0;
335 }
336
337 static void
338 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
339     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
340     GstClockTime local_2)
341 {
342   GstClockTime current_timeout = 0;
343   GstClockTime local_avg, remote_avg;
344   gdouble r_squared;
345   GstClock *clock;
346   GstClockTime rtt, rtt_limit, min_update_interval;
347   /* Use for discont tracking */
348   GstClockTime time_before = 0;
349   GstClockTime min_guess = 0;
350   GstClockTimeDiff time_discont = 0;
351   gboolean synched, now_synched;
352   GstClockTime internal_time, external_time, rate_num, rate_den;
353   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
354       orig_rate_den;
355   GstClockTime max_discont;
356   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
357   GstClockTime median;
358   gint i;
359
360   GST_OBJECT_LOCK (self);
361   rtt_limit = self->roundtrip_limit;
362
363   GST_LOG_OBJECT (self,
364       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
365       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
366       remote_2, local_2);
367
368   /* If the server told us a poll interval and it's bigger than the
369    * one configured via the property, use the server's */
370   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
371       self->last_remote_poll_interval > self->minimum_update_interval)
372     min_update_interval = self->last_remote_poll_interval;
373   else
374     min_update_interval = self->minimum_update_interval;
375   GST_OBJECT_UNLOCK (self);
376
377   if (local_2 < local_1) {
378     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
379         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
380         GST_TIME_ARGS (local_2));
381     goto bogus_observation;
382   }
383
384   if (remote_2 < remote_1) {
385     GST_LOG_OBJECT (self,
386         "Dropping observation: remote receive time %" GST_TIME_FORMAT
387         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
388         GST_TIME_ARGS (remote_2));
389     goto bogus_observation;
390   }
391
392   /* The round trip time is (assuming symmetric path delays)
393    * delta = (local_2 - local_1) - (remote_2 - remote_1)
394    */
395
396   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
397
398   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
399     GST_LOG_OBJECT (self,
400         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
401         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
402     goto bogus_observation;
403   }
404
405   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
406     self->last_rtts[i - 1] = self->last_rtts[i];
407   self->last_rtts[i - 1] = rtt;
408
409   if (self->last_rtts_missing) {
410     self->last_rtts_missing--;
411   } else {
412     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
413     g_qsort_with_data (&last_rtts,
414         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
415         (GCompareDataFunc) compare_clock_time, NULL);
416
417     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
418
419     /* FIXME: We might want to use something else here, like only allowing
420      * things in the interquartile range, or also filtering away delays that
421      * are too small compared to the median. This here worked well enough
422      * in tests so far.
423      */
424     if (rtt > 2 * median) {
425       GST_LOG_OBJECT (self,
426           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
427           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
428       goto bogus_observation;
429     }
430   }
431
432   /* Track an average round trip time, for a bit of smoothing */
433   /* Always update before discarding a sample, so genuine changes in
434    * the network get picked up, eventually */
435   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
436     self->rtt_avg = rtt;
437   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
438     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
439   else
440     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
441
442   if (rtt > 2 * self->rtt_avg) {
443     GST_LOG_OBJECT (self,
444         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
445         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
446     goto bogus_observation;
447   }
448
449   /* The difference between the local and remote clock (again assuming
450    * symmetric path delays):
451    *
452    * local_1 + delta / 2 - remote_1 = theta
453    * or
454    * local_2 - delta / 2 - remote_2 = theta
455    *
456    * which gives after some simple algebraic transformations:
457    *
458    *         (remote_1 - local_1) + (remote_2 - local_2)
459    * theta = -------------------------------------------
460    *                              2
461    *
462    *
463    * Thus remote time at local_avg is equal to:
464    *
465    * local_avg + theta =
466    *
467    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
468    * ----------------- + -------------------------------------------
469    *         2                                2
470    *
471    * =
472    *
473    * remote_1 + remote_2
474    * ------------------- = remote_avg
475    *          2
476    *
477    * We use this for our clock estimation, i.e. local_avg at remote clock
478    * being the same as remote_avg.
479    */
480
481   local_avg = (local_2 + local_1) / 2;
482   remote_avg = (remote_2 + remote_1) / 2;
483
484   GST_LOG_OBJECT (self,
485       "remoteavg %" G_GUINT64_FORMAT " localavg %" G_GUINT64_FORMAT,
486       remote_avg, local_avg);
487
488   clock = GST_CLOCK_CAST (self);
489
490   /* Store what the clock produced as 'now' before this update */
491   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
492       &orig_external_time, &orig_rate_num, &orig_rate_den);
493   internal_time = orig_internal_time;
494   external_time = orig_external_time;
495   rate_num = orig_rate_num;
496   rate_den = orig_rate_den;
497
498   min_guess =
499       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
500       internal_time, external_time, rate_num, rate_den);
501   time_before =
502       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
503       internal_time, external_time, rate_num, rate_den);
504
505   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
506    * but this value seems to work fine */
507   max_discont = self->rtt_avg / 4;
508
509   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
510   synched =
511       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
512       && GST_CLOCK_DIFF (time_before,
513           remote_avg) < (GstClockTimeDiff) (max_discont));
514
515   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
516           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
517           &rate_num, &rate_den)) {
518
519     /* Now compare the difference (discont) in the clock
520      * after this observation */
521     time_discont = GST_CLOCK_DIFF (time_before,
522         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
523             internal_time, external_time, rate_num, rate_den));
524
525     /* If we were in sync with the remote clock, clamp the allowed
526      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
527      * of remote time correctly windowed the actual remote time observation */
528     if (synched && ABS (time_discont) > max_discont) {
529       GstClockTimeDiff offset;
530       GST_DEBUG_OBJECT (clock,
531           "Too large a discont, clamping to 1/4 average RTT = %"
532           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
533       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
534         offset = max_discont - time_discont;
535         if (-offset > external_time)
536           external_time = 0;
537         else
538           external_time += offset;
539       } else {                  /* Too large a backward step - add a +ve offset */
540         offset = -(max_discont + time_discont);
541         external_time += offset;
542       }
543
544       time_discont += offset;
545     }
546
547     /* Check if the new clock params would have made our observation within range */
548     now_synched =
549         (GST_CLOCK_DIFF (remote_avg,
550             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
551                 local_1, internal_time, external_time, rate_num,
552                 rate_den)) < (GstClockTimeDiff) (max_discont))
553         &&
554         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
555             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
556                 rate_num, rate_den),
557             remote_avg) < (GstClockTimeDiff) (max_discont));
558
559     /* Only update the clock if we had synch or just gained it */
560     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
561       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
562           external_time, rate_num, rate_den);
563       /* ghetto formula - shorter timeout for bad correlations */
564       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
565       current_timeout =
566           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
567       self->skipped_updates = 0;
568
569       /* FIXME: When do we consider the clock absolutely not synced anymore? */
570       gst_clock_set_synced (GST_CLOCK (self), TRUE);
571     } else {
572       /* Restore original calibration vars for the report, we're not changing the clock */
573       internal_time = orig_internal_time;
574       external_time = orig_external_time;
575       rate_num = orig_rate_num;
576       rate_den = orig_rate_den;
577       time_discont = 0;
578       self->skipped_updates++;
579     }
580   }
581
582   /* Limit the polling to at most one per minimum_update_interval */
583   if (rtt < min_update_interval)
584     current_timeout = MAX (min_update_interval - rtt, current_timeout);
585
586   GST_OBJECT_LOCK (self);
587   if (self->busses) {
588     GstStructure *s;
589     GstMessage *msg;
590     GList *l;
591
592     /* Output a stats message, whether we updated the clock or not */
593     s = gst_structure_new ("gst-netclock-statistics",
594         "synchronised", G_TYPE_BOOLEAN, synched,
595         "rtt", G_TYPE_UINT64, rtt,
596         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
597         "local", G_TYPE_UINT64, local_avg,
598         "remote", G_TYPE_UINT64, remote_avg,
599         "discontinuity", G_TYPE_INT64, time_discont,
600         "remote-min-estimate", G_TYPE_UINT64, min_guess,
601         "remote-max-estimate", G_TYPE_UINT64, time_before,
602         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
603             min_guess), "remote-max-error", G_TYPE_INT64,
604         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
605         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
606         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
607         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
608         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
609         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
610         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
611         GST_CLOCK_DIFF (internal_time, external_time), NULL);
612     msg = gst_message_new_element (GST_OBJECT (self), s);
613
614     for (l = self->busses; l; l = l->next)
615       gst_bus_post (l->data, gst_message_ref (msg));
616     gst_message_unref (msg);
617   }
618   GST_OBJECT_UNLOCK (self);
619
620   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
621   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
622
623   return;
624
625 bogus_observation:
626   /* Schedule a new packet again soon */
627   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
628   return;
629 }
630
631 static gpointer
632 gst_net_client_internal_clock_thread (gpointer data)
633 {
634   GstNetClientInternalClock *self = data;
635   GSocket *socket = self->socket;
636   GError *err = NULL;
637
638   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
639
640   g_socket_set_blocking (socket, TRUE);
641   g_socket_set_timeout (socket, 0);
642
643   while (!g_cancellable_is_cancelled (self->cancel)) {
644     GstClockTime expiration_time = self->timeout_expiration;
645     GstClockTime now = gst_util_get_timestamp ();
646     gint64 socket_timeout;
647
648     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
649       socket_timeout = 0;
650     } else {
651       socket_timeout = (expiration_time - now) / GST_USECOND;
652     }
653
654     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
655
656     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
657             self->cancel, &err)) {
658       /* cancelled, timeout or error */
659       if (err->code == G_IO_ERROR_CANCELLED) {
660         GST_INFO_OBJECT (self, "cancelled");
661         g_clear_error (&err);
662         break;
663       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
664         /* timed out, let's send another packet */
665         GST_DEBUG_OBJECT (self, "timed out");
666
667         if (self->is_ntp) {
668           GstNtpPacket *packet;
669
670           packet = gst_ntp_packet_new (NULL, NULL);
671
672           packet->transmit_time =
673               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
674
675           GST_DEBUG_OBJECT (self,
676               "sending packet, local time = %" GST_TIME_FORMAT,
677               GST_TIME_ARGS (packet->transmit_time));
678
679           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
680
681           g_free (packet);
682         } else {
683           GstNetTimePacket *packet;
684
685           packet = gst_net_time_packet_new (NULL);
686
687           packet->local_time =
688               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
689
690           GST_DEBUG_OBJECT (self,
691               "sending packet, local time = %" GST_TIME_FORMAT,
692               GST_TIME_ARGS (packet->local_time));
693
694           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
695
696           g_free (packet);
697         }
698
699         /* reset timeout (but are expecting a response sooner anyway) */
700         self->timeout_expiration =
701             gst_util_get_timestamp () +
702             gst_clock_get_timeout (GST_CLOCK_CAST (self));
703       } else {
704         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
705         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
706       }
707       g_clear_error (&err);
708     } else {
709       GstClockTime new_local;
710
711       /* got packet */
712
713       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
714
715       if (self->is_ntp) {
716         GstNtpPacket *packet;
717
718         packet = gst_ntp_packet_receive (socket, NULL, &err);
719
720         if (packet != NULL) {
721           GST_LOG_OBJECT (self, "got packet back");
722           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
723               GST_TIME_ARGS (packet->origin_time));
724           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
725               GST_TIME_ARGS (packet->receive_time));
726           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
727               GST_TIME_ARGS (packet->transmit_time));
728           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
729               GST_TIME_ARGS (new_local));
730           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
731               GST_TIME_ARGS (packet->poll_interval));
732
733           /* Remember the last poll interval we ever got from the server */
734           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
735             self->last_remote_poll_interval = packet->poll_interval;
736
737           /* observe_times will reset the timeout */
738           gst_net_client_internal_clock_observe_times (self,
739               packet->origin_time, packet->receive_time, packet->transmit_time,
740               new_local);
741
742           g_free (packet);
743         } else if (err != NULL) {
744           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
745               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
746             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
747             g_clear_error (&err);
748             break;
749           } else if (g_error_matches (err, GST_NTP_ERROR,
750                   GST_NTP_ERROR_KOD_RATE)) {
751             GST_WARNING_OBJECT (self, "need to limit rate");
752
753             /* If the server did not tell us a poll interval before, double
754              * our minimum poll interval. Otherwise we assume that the server
755              * already told us something sensible and that this error here
756              * was just a spurious error */
757             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
758               self->minimum_update_interval *= 2;
759
760             /* And wait a bit before we send the next packet instead of
761              * sending it immediately */
762             self->timeout_expiration =
763                 gst_util_get_timestamp () +
764                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
765           } else {
766             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
767           }
768           g_clear_error (&err);
769         }
770       } else {
771         GstNetTimePacket *packet;
772
773         packet = gst_net_time_packet_receive (socket, NULL, &err);
774
775         if (packet != NULL) {
776           GST_LOG_OBJECT (self, "got packet back");
777           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
778               GST_TIME_ARGS (packet->local_time));
779           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
780               GST_TIME_ARGS (packet->remote_time));
781           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
782               GST_TIME_ARGS (new_local));
783
784           /* observe_times will reset the timeout */
785           gst_net_client_internal_clock_observe_times (self, packet->local_time,
786               packet->remote_time, packet->remote_time, new_local);
787
788           g_free (packet);
789         } else if (err != NULL) {
790           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
791           g_clear_error (&err);
792         }
793       }
794     }
795   }
796   GST_INFO_OBJECT (self, "shutting down net client clock thread");
797   return NULL;
798 }
799
800 static gboolean
801 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
802 {
803   GSocketAddress *servaddr;
804   GSocketAddress *myaddr;
805   GSocketAddress *anyaddr;
806   GInetAddress *inetaddr;
807   GSocket *socket;
808   GError *error = NULL;
809   GSocketFamily family;
810   GPollFD dummy_pollfd;
811   GResolver *resolver = NULL;
812   GError *err = NULL;
813
814   g_return_val_if_fail (self->address != NULL, FALSE);
815   g_return_val_if_fail (self->servaddr == NULL, FALSE);
816
817   /* create target address */
818   inetaddr = g_inet_address_new_from_string (self->address);
819   if (inetaddr == NULL) {
820     GList *results;
821
822     resolver = g_resolver_get_default ();
823
824     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
825     if (!results)
826       goto failed_to_resolve;
827
828     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
829     g_resolver_free_addresses (results);
830     g_object_unref (resolver);
831   }
832
833   family = g_inet_address_get_family (inetaddr);
834
835   servaddr = g_inet_socket_address_new (inetaddr, self->port);
836   g_object_unref (inetaddr);
837
838   g_assert (servaddr != NULL);
839
840   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
841       self->port);
842
843   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
844       G_SOCKET_PROTOCOL_UDP, &error);
845
846   if (socket == NULL)
847     goto no_socket;
848
849   GST_DEBUG_OBJECT (self, "binding socket");
850   inetaddr = g_inet_address_new_any (family);
851   anyaddr = g_inet_socket_address_new (inetaddr, 0);
852   g_socket_bind (socket, anyaddr, TRUE, &error);
853   g_object_unref (anyaddr);
854   g_object_unref (inetaddr);
855
856   if (error != NULL)
857     goto bind_error;
858
859   /* check address we're bound to, mostly for debugging purposes */
860   myaddr = g_socket_get_local_address (socket, &error);
861
862   if (myaddr == NULL)
863     goto getsockname_error;
864
865   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
866       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
867
868   g_object_unref (myaddr);
869
870   self->cancel = g_cancellable_new ();
871   self->made_cancel_fd =
872       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
873
874   self->socket = socket;
875   self->servaddr = G_SOCKET_ADDRESS (servaddr);
876
877   self->thread = g_thread_try_new ("GstNetClientInternalClock",
878       gst_net_client_internal_clock_thread, self, &error);
879
880   if (error != NULL)
881     goto no_thread;
882
883   return TRUE;
884
885   /* ERRORS */
886 no_socket:
887   {
888     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
889     g_error_free (error);
890     return FALSE;
891   }
892 bind_error:
893   {
894     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
895     g_error_free (error);
896     g_object_unref (socket);
897     return FALSE;
898   }
899 getsockname_error:
900   {
901     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
902     g_error_free (error);
903     g_object_unref (socket);
904     return FALSE;
905   }
906 failed_to_resolve:
907   {
908     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
909         self->address, err->message);
910     g_clear_error (&err);
911     g_object_unref (resolver);
912     return FALSE;
913   }
914 no_thread:
915   {
916     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
917     g_object_unref (self->servaddr);
918     self->servaddr = NULL;
919     g_object_unref (self->socket);
920     self->socket = NULL;
921     g_error_free (error);
922     return FALSE;
923   }
924 }
925
926 static void
927 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
928 {
929   if (self->thread == NULL)
930     return;
931
932   GST_INFO_OBJECT (self, "stopping...");
933   g_cancellable_cancel (self->cancel);
934
935   g_thread_join (self->thread);
936   self->thread = NULL;
937
938   if (self->made_cancel_fd)
939     g_cancellable_release_fd (self->cancel);
940
941   g_object_unref (self->cancel);
942   self->cancel = NULL;
943
944   g_object_unref (self->servaddr);
945   self->servaddr = NULL;
946
947   g_object_unref (self->socket);
948   self->socket = NULL;
949
950   GST_INFO_OBJECT (self, "stopped");
951 }
952
953 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
954   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
955
956 struct _GstNetClientClockPrivate
957 {
958   GstClock *internal_clock;
959
960   GstClockTime roundtrip_limit;
961   GstClockTime minimum_update_interval;
962
963   GstClockTime base_time, internal_base_time;
964
965   gchar *address;
966   gint port;
967
968   GstBus *bus;
969
970   gboolean is_ntp;
971
972   gulong synced_id;
973 };
974
975 G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK);
976
977 static void gst_net_client_clock_finalize (GObject * object);
978 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
979     const GValue * value, GParamSpec * pspec);
980 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
981     GValue * value, GParamSpec * pspec);
982 static void gst_net_client_clock_constructed (GObject * object);
983
984 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
985
986 static void
987 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
988 {
989   GObjectClass *gobject_class;
990   GstClockClass *clock_class;
991
992   gobject_class = G_OBJECT_CLASS (klass);
993   clock_class = GST_CLOCK_CLASS (klass);
994
995   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
996
997   gobject_class->finalize = gst_net_client_clock_finalize;
998   gobject_class->get_property = gst_net_client_clock_get_property;
999   gobject_class->set_property = gst_net_client_clock_set_property;
1000   gobject_class->constructed = gst_net_client_clock_constructed;
1001
1002   g_object_class_install_property (gobject_class, PROP_ADDRESS,
1003       g_param_spec_string ("address", "address",
1004           "The IP address of the machine providing a time server",
1005           DEFAULT_ADDRESS,
1006           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1007   g_object_class_install_property (gobject_class, PROP_PORT,
1008       g_param_spec_int ("port", "port",
1009           "The port on which the remote server is listening", 0, G_MAXUINT16,
1010           DEFAULT_PORT,
1011           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1012   g_object_class_install_property (gobject_class, PROP_BUS,
1013       g_param_spec_object ("bus", "bus",
1014           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1015           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1016
1017   /**
1018    * GstNetClientInternalClock::round-trip-limit:
1019    *
1020    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1021    * value, all packets with a round-trip interval larger than this limit will be
1022    * ignored. This is useful for networks with severe and fluctuating transport
1023    * delays. Filtering out these packets increases stability of the synchronization.
1024    * On the other hand, the lower the limit, the higher the amount of filtered
1025    * packets. Empirical tests are typically necessary to estimate a good value
1026    * for the limit.
1027    * If the property is set to zero, the limit is disabled.
1028    *
1029    * Since: 1.4
1030    */
1031   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1032       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1033           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1034           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1035           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1036
1037   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1038       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1039           "Minimum polling interval for packets, in nanoseconds"
1040           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1041           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1042
1043   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1044       g_param_spec_uint64 ("base-time", "Base Time",
1045           "Initial time that is reported before synchronization", 0,
1046           G_MAXUINT64, DEFAULT_BASE_TIME,
1047           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1048
1049   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1050       g_param_spec_object ("internal-clock", "Internal Clock",
1051           "Internal clock that directly slaved to the remote clock",
1052           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1053
1054   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1055 }
1056
1057 static void
1058 gst_net_client_clock_init (GstNetClientClock * self)
1059 {
1060   GstNetClientClockPrivate *priv;
1061   GstClock *clock;
1062
1063   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
1064
1065   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1066   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1067
1068   priv->port = DEFAULT_PORT;
1069   priv->address = g_strdup (DEFAULT_ADDRESS);
1070
1071   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1072   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1073
1074   clock = gst_system_clock_obtain ();
1075   priv->base_time = DEFAULT_BASE_TIME;
1076   priv->internal_base_time = gst_clock_get_time (clock);
1077   gst_object_unref (clock);
1078 }
1079
1080 /* Must be called with clocks_lock */
1081 static void
1082 update_clock_cache (ClockCache * cache)
1083 {
1084   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1085   GList *l, *busses = NULL;
1086
1087   GST_OBJECT_LOCK (cache->clock);
1088   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1089       (GDestroyNotify) gst_object_unref);
1090
1091   for (l = cache->clocks; l; l = l->next) {
1092     GstNetClientClock *clock = l->data;
1093
1094     if (clock->priv->bus)
1095       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1096
1097     if (roundtrip_limit == 0)
1098       roundtrip_limit = clock->priv->roundtrip_limit;
1099     else
1100       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1101
1102     if (minimum_update_interval == 0)
1103       minimum_update_interval = clock->priv->minimum_update_interval;
1104     else
1105       minimum_update_interval =
1106           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1107   }
1108   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1109   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1110       roundtrip_limit;
1111   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1112       minimum_update_interval;
1113
1114   GST_OBJECT_UNLOCK (cache->clock);
1115 }
1116
1117 static gboolean
1118 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1119     gpointer user_data)
1120 {
1121   ClockCache *cache = user_data;
1122
1123   G_LOCK (clocks_lock);
1124   if (!cache->clocks) {
1125     gst_clock_id_unref (cache->remove_id);
1126     gst_object_unref (cache->clock);
1127     clocks = g_list_remove (clocks, cache);
1128     g_free (cache);
1129   }
1130   G_UNLOCK (clocks_lock);
1131
1132   return TRUE;
1133 }
1134
1135 static void
1136 gst_net_client_clock_finalize (GObject * object)
1137 {
1138   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1139   GList *l;
1140
1141   if (self->priv->synced_id)
1142     g_signal_handler_disconnect (self->priv->internal_clock,
1143         self->priv->synced_id);
1144   self->priv->synced_id = 0;
1145
1146   G_LOCK (clocks_lock);
1147   for (l = clocks; l; l = l->next) {
1148     ClockCache *cache = l->data;
1149
1150     if (cache->clock == self->priv->internal_clock) {
1151       cache->clocks = g_list_remove (cache->clocks, self);
1152
1153       if (cache->clocks) {
1154         update_clock_cache (cache);
1155       } else {
1156         GstClock *sysclock = gst_system_clock_obtain ();
1157         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1158
1159         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1160         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1161             NULL);
1162         gst_object_unref (sysclock);
1163       }
1164       break;
1165     }
1166   }
1167   G_UNLOCK (clocks_lock);
1168
1169   g_free (self->priv->address);
1170   self->priv->address = NULL;
1171
1172   if (self->priv->bus != NULL) {
1173     gst_object_unref (self->priv->bus);
1174     self->priv->bus = NULL;
1175   }
1176
1177   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1178 }
1179
1180 static void
1181 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1182     const GValue * value, GParamSpec * pspec)
1183 {
1184   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1185   gboolean update = FALSE;
1186
1187   switch (prop_id) {
1188     case PROP_ADDRESS:
1189       GST_OBJECT_LOCK (self);
1190       g_free (self->priv->address);
1191       self->priv->address = g_value_dup_string (value);
1192       if (self->priv->address == NULL)
1193         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1194       GST_OBJECT_UNLOCK (self);
1195       break;
1196     case PROP_PORT:
1197       GST_OBJECT_LOCK (self);
1198       self->priv->port = g_value_get_int (value);
1199       GST_OBJECT_UNLOCK (self);
1200       break;
1201     case PROP_ROUNDTRIP_LIMIT:
1202       GST_OBJECT_LOCK (self);
1203       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1204       GST_OBJECT_UNLOCK (self);
1205       update = TRUE;
1206       break;
1207     case PROP_MINIMUM_UPDATE_INTERVAL:
1208       GST_OBJECT_LOCK (self);
1209       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1210       GST_OBJECT_UNLOCK (self);
1211       update = TRUE;
1212       break;
1213     case PROP_BUS:
1214       GST_OBJECT_LOCK (self);
1215       if (self->priv->bus)
1216         gst_object_unref (self->priv->bus);
1217       self->priv->bus = g_value_dup_object (value);
1218       GST_OBJECT_UNLOCK (self);
1219       update = TRUE;
1220       break;
1221     case PROP_BASE_TIME:{
1222       GstClock *clock;
1223
1224       self->priv->base_time = g_value_get_uint64 (value);
1225       clock = gst_system_clock_obtain ();
1226       self->priv->internal_base_time = gst_clock_get_time (clock);
1227       gst_object_unref (clock);
1228       break;
1229     }
1230     default:
1231       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1232       break;
1233   }
1234
1235   if (update && self->priv->internal_clock) {
1236     GList *l;
1237
1238     G_LOCK (clocks_lock);
1239     for (l = clocks; l; l = l->next) {
1240       ClockCache *cache = l->data;
1241
1242       if (cache->clock == self->priv->internal_clock) {
1243         update_clock_cache (cache);
1244       }
1245     }
1246     G_UNLOCK (clocks_lock);
1247   }
1248 }
1249
1250 static void
1251 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1252     GValue * value, GParamSpec * pspec)
1253 {
1254   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1255
1256   switch (prop_id) {
1257     case PROP_ADDRESS:
1258       GST_OBJECT_LOCK (self);
1259       g_value_set_string (value, self->priv->address);
1260       GST_OBJECT_UNLOCK (self);
1261       break;
1262     case PROP_PORT:
1263       g_value_set_int (value, self->priv->port);
1264       break;
1265     case PROP_ROUNDTRIP_LIMIT:
1266       GST_OBJECT_LOCK (self);
1267       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1268       GST_OBJECT_UNLOCK (self);
1269       break;
1270     case PROP_MINIMUM_UPDATE_INTERVAL:
1271       GST_OBJECT_LOCK (self);
1272       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1273       GST_OBJECT_UNLOCK (self);
1274       break;
1275     case PROP_BUS:
1276       GST_OBJECT_LOCK (self);
1277       g_value_set_object (value, self->priv->bus);
1278       GST_OBJECT_UNLOCK (self);
1279       break;
1280     case PROP_BASE_TIME:
1281       g_value_set_uint64 (value, self->priv->base_time);
1282       break;
1283     case PROP_INTERNAL_CLOCK:
1284       g_value_set_object (value, self->priv->internal_clock);
1285       break;
1286     default:
1287       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1288       break;
1289   }
1290 }
1291
1292 static void
1293 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1294     GstClock * self)
1295 {
1296   gst_clock_set_synced (self, synced);
1297 }
1298
1299 static void
1300 gst_net_client_clock_constructed (GObject * object)
1301 {
1302   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1303   GstClock *internal_clock;
1304   GList *l;
1305   ClockCache *cache = NULL;
1306
1307   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1308
1309   G_LOCK (clocks_lock);
1310   for (l = clocks; l; l = l->next) {
1311     ClockCache *tmp = l->data;
1312     GstNetClientInternalClock *internal_clock =
1313         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1314
1315     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1316         internal_clock->port == self->priv->port) {
1317       cache = tmp;
1318
1319       if (cache->remove_id) {
1320         gst_clock_id_unschedule (cache->remove_id);
1321         cache->remove_id = NULL;
1322       }
1323       break;
1324     }
1325   }
1326
1327   if (!cache) {
1328     cache = g_new0 (ClockCache, 1);
1329
1330     cache->clock =
1331         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1332         self->priv->address, "port", self->priv->port, "is-ntp",
1333         self->priv->is_ntp, NULL);
1334     clocks = g_list_prepend (clocks, cache);
1335
1336     /* Not actually leaked but is cached for a while before being disposed,
1337      * see gst_net_client_clock_finalize, so pretend it is to not confuse
1338      * tests. */
1339     GST_OBJECT_FLAG_SET (cache->clock, GST_OBJECT_FLAG_MAY_BE_LEAKED);
1340   }
1341
1342   cache->clocks = g_list_prepend (cache->clocks, self);
1343
1344   GST_OBJECT_LOCK (cache->clock);
1345   if (gst_clock_is_synced (cache->clock))
1346     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1347   self->priv->synced_id =
1348       g_signal_connect (cache->clock, "synced",
1349       G_CALLBACK (gst_net_client_clock_synced_cb), self);
1350   GST_OBJECT_UNLOCK (cache->clock);
1351
1352   G_UNLOCK (clocks_lock);
1353
1354   self->priv->internal_clock = internal_clock = cache->clock;
1355
1356   /* all systems go, cap'n */
1357 }
1358
1359 static GstClockTime
1360 gst_net_client_clock_get_internal_time (GstClock * clock)
1361 {
1362   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1363
1364   if (!gst_clock_is_synced (self->priv->internal_clock)) {
1365     GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1366     return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1367         self->priv->internal_base_time, self->priv->base_time, 1, 1);
1368   }
1369
1370   return gst_clock_get_time (self->priv->internal_clock);
1371 }
1372
1373 /**
1374  * gst_net_client_clock_new:
1375  * @name: a name for the clock
1376  * @remote_address: the address or hostname of the remote clock provider
1377  * @remote_port: the port of the remote clock provider
1378  * @base_time: initial time of the clock
1379  *
1380  * Create a new #GstNetClientInternalClock that will report the time
1381  * provided by the #GstNetTimeProvider on @remote_address and 
1382  * @remote_port.
1383  *
1384  * Returns: a new #GstClock that receives a time from the remote
1385  * clock.
1386  */
1387 GstClock *
1388 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1389     gint remote_port, GstClockTime base_time)
1390 {
1391   GstClock *ret;
1392
1393   g_return_val_if_fail (remote_address != NULL, NULL);
1394   g_return_val_if_fail (remote_port > 0, NULL);
1395   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1396   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1397
1398   ret =
1399       g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "name", name, "address",
1400       remote_address, "port", remote_port, "base-time", base_time, NULL);
1401
1402   return ret;
1403 }
1404
1405 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1406
1407 static void
1408 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1409 {
1410 }
1411
1412 static void
1413 gst_ntp_clock_init (GstNtpClock * self)
1414 {
1415   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1416 }
1417
1418 /**
1419  * gst_ntp_clock_new:
1420  * @name: a name for the clock
1421  * @remote_address: the address or hostname of the remote clock provider
1422  * @remote_port: the port of the remote clock provider
1423  * @base_time: initial time of the clock
1424  *
1425  * Create a new #GstNtpClock that will report the time provided by
1426  * the NTPv4 server on @remote_address and @remote_port.
1427  *
1428  * Returns: a new #GstClock that receives a time from the remote
1429  * clock.
1430  *
1431  * Since: 1.6
1432  */
1433 GstClock *
1434 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1435     gint remote_port, GstClockTime base_time)
1436 {
1437   GstClock *ret;
1438
1439   g_return_val_if_fail (remote_address != NULL, NULL);
1440   g_return_val_if_fail (remote_port > 0, NULL);
1441   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1442   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1443
1444   ret =
1445       g_object_new (GST_TYPE_NTP_CLOCK, "name", name, "address", remote_address,
1446       "port", remote_port, "base-time", base_time, NULL);
1447
1448   return ret;
1449 }