6c8236b92db393f39d2136b3df4c12e3a1ffd685
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @short_description: Special clock that synchronizes to a remote time
29  *                     provider.
30  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
31  *
32  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
33  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
34  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
35  *
36  * A new clock is created with gst_net_client_clock_new() or
37  * gst_ntp_clock_new(), which takes the address and port of the remote time
38  * provider along with a name and an initial time.
39  *
40  * This clock will poll the time provider and will update its calibration
41  * parameters based on the local and remote observations.
42  *
43  * The "round-trip" property limits the maximum round trip packets can take.
44  *
45  * Various parameters of the clock can be configured with the parent #GstClock
46  * "timeout", "window-size" and "window-threshold" object properties.
47  *
48  * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
49  * gst_pipeline_use_clock().
50  *
51  * If you set a #GstBus on the clock via the "bus" object property, it will
52  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
53  * statistics about clock accuracy and network traffic.
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include "gstnettimepacket.h"
61 #include "gstntppacket.h"
62 #include "gstnetclientclock.h"
63
64 #include <gio/gio.h>
65
66 #include <string.h>
67
68 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
69 #define GST_CAT_DEFAULT (ncc_debug)
70
71 typedef struct
72 {
73   GstClock *clock;              /* GstNetClientInternalClock */
74
75   GList *clocks;                /* GstNetClientClocks */
76
77   GstClockID remove_id;
78 } ClockCache;
79
80 G_LOCK_DEFINE_STATIC (clocks_lock);
81 static GList *clocks = NULL;
82
83 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
84   (gst_net_client_internal_clock_get_type())
85 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
86   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
87 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
88   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
89 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
90   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
92   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93
94 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
95 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
96
97 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
98
99 #define DEFAULT_ADDRESS         "127.0.0.1"
100 #define DEFAULT_PORT            5637
101 #define DEFAULT_TIMEOUT         GST_SECOND
102 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
103 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
104  * more often than 1/20th second (arbitrarily, to spread observations a little) */
105 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
106 #define DEFAULT_BASE_TIME       0
107
108 /* Maximum number of clock updates we can skip before updating */
109 #define MAX_SKIPPED_UPDATES 5
110
111 #define MEDIAN_PRE_FILTERING_WINDOW 9
112
113 enum
114 {
115   PROP_0,
116   PROP_ADDRESS,
117   PROP_PORT,
118   PROP_ROUNDTRIP_LIMIT,
119   PROP_MINIMUM_UPDATE_INTERVAL,
120   PROP_BUS,
121   PROP_BASE_TIME,
122   PROP_INTERNAL_CLOCK,
123   PROP_IS_NTP
124 };
125
126 struct _GstNetClientInternalClock
127 {
128   GstSystemClock clock;
129
130   GThread *thread;
131
132   GSocket *socket;
133   GSocketAddress *servaddr;
134   GCancellable *cancel;
135   gboolean made_cancel_fd;
136
137   GstClockTime timeout_expiration;
138   GstClockTime roundtrip_limit;
139   GstClockTime rtt_avg;
140   GstClockTime minimum_update_interval;
141   GstClockTime last_remote_poll_interval;
142   guint skipped_updates;
143   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
144   gint last_rtts_missing;
145
146   gchar *address;
147   gint port;
148   gboolean is_ntp;
149
150   /* Protected by OBJECT_LOCK */
151   GList *busses;
152 };
153
154 struct _GstNetClientInternalClockClass
155 {
156   GstSystemClockClass parent_class;
157 };
158
159 #define _do_init \
160   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
161 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
162     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
163
164 static void gst_net_client_internal_clock_finalize (GObject * object);
165 static void gst_net_client_internal_clock_set_property (GObject * object,
166     guint prop_id, const GValue * value, GParamSpec * pspec);
167 static void gst_net_client_internal_clock_get_property (GObject * object,
168     guint prop_id, GValue * value, GParamSpec * pspec);
169 static void gst_net_client_internal_clock_constructed (GObject * object);
170
171 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
172     self);
173 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
174     self);
175
176 static void
177 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
178     klass)
179 {
180   GObjectClass *gobject_class;
181
182   gobject_class = G_OBJECT_CLASS (klass);
183
184   gobject_class->finalize = gst_net_client_internal_clock_finalize;
185   gobject_class->get_property = gst_net_client_internal_clock_get_property;
186   gobject_class->set_property = gst_net_client_internal_clock_set_property;
187   gobject_class->constructed = gst_net_client_internal_clock_constructed;
188
189   g_object_class_install_property (gobject_class, PROP_ADDRESS,
190       g_param_spec_string ("address", "address",
191           "The IP address of the machine providing a time server",
192           DEFAULT_ADDRESS,
193           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
194   g_object_class_install_property (gobject_class, PROP_PORT,
195       g_param_spec_int ("port", "port",
196           "The port on which the remote server is listening", 0, G_MAXUINT16,
197           DEFAULT_PORT,
198           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_IS_NTP,
200       g_param_spec_boolean ("is-ntp", "Is NTP",
201           "The clock is using the NTPv4 protocol", FALSE,
202           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
203 }
204
205 static void
206 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
207 {
208   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
209
210   self->port = DEFAULT_PORT;
211   self->address = g_strdup (DEFAULT_ADDRESS);
212   self->is_ntp = FALSE;
213
214   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
215
216   self->thread = NULL;
217
218   self->servaddr = NULL;
219   self->rtt_avg = GST_CLOCK_TIME_NONE;
220   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
221   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
222   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
223   self->skipped_updates = 0;
224   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
225 }
226
227 static void
228 gst_net_client_internal_clock_finalize (GObject * object)
229 {
230   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
231
232   if (self->thread) {
233     gst_net_client_internal_clock_stop (self);
234   }
235
236   g_free (self->address);
237   self->address = NULL;
238
239   if (self->servaddr != NULL) {
240     g_object_unref (self->servaddr);
241     self->servaddr = NULL;
242   }
243
244   if (self->socket != NULL) {
245     if (!g_socket_close (self->socket, NULL))
246       GST_ERROR_OBJECT (self, "Failed to close socket");
247     g_object_unref (self->socket);
248     self->socket = NULL;
249   }
250
251   g_warn_if_fail (self->busses == NULL);
252
253   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
254       (object);
255 }
256
257 static void
258 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
259     const GValue * value, GParamSpec * pspec)
260 {
261   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
262
263   switch (prop_id) {
264     case PROP_ADDRESS:
265       GST_OBJECT_LOCK (self);
266       g_free (self->address);
267       self->address = g_value_dup_string (value);
268       if (self->address == NULL)
269         self->address = g_strdup (DEFAULT_ADDRESS);
270       GST_OBJECT_UNLOCK (self);
271       break;
272     case PROP_PORT:
273       GST_OBJECT_LOCK (self);
274       self->port = g_value_get_int (value);
275       GST_OBJECT_UNLOCK (self);
276       break;
277     case PROP_IS_NTP:
278       GST_OBJECT_LOCK (self);
279       self->is_ntp = g_value_get_boolean (value);
280       GST_OBJECT_UNLOCK (self);
281       break;
282     default:
283       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
284       break;
285   }
286 }
287
288 static void
289 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
290     GValue * value, GParamSpec * pspec)
291 {
292   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
293
294   switch (prop_id) {
295     case PROP_ADDRESS:
296       GST_OBJECT_LOCK (self);
297       g_value_set_string (value, self->address);
298       GST_OBJECT_UNLOCK (self);
299       break;
300     case PROP_PORT:
301       g_value_set_int (value, self->port);
302       break;
303     case PROP_IS_NTP:
304       g_value_set_boolean (value, self->is_ntp);
305       break;
306     default:
307       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
308       break;
309   }
310 }
311
312 static void
313 gst_net_client_internal_clock_constructed (GObject * object)
314 {
315   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
316
317   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
318       (object);
319
320   if (!gst_net_client_internal_clock_start (self)) {
321     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
322   }
323
324   /* all systems go, cap'n */
325 }
326
327 static gint
328 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
329 {
330   if (*a < *b)
331     return -1;
332   else if (*a > *b)
333     return 1;
334   return 0;
335 }
336
337 static void
338 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
339     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
340     GstClockTime local_2)
341 {
342   GstClockTime current_timeout = 0;
343   GstClockTime local_avg, remote_avg;
344   gdouble r_squared;
345   GstClock *clock;
346   GstClockTime rtt, rtt_limit, min_update_interval;
347   /* Use for discont tracking */
348   GstClockTime time_before = 0;
349   GstClockTime min_guess = 0;
350   GstClockTimeDiff time_discont = 0;
351   gboolean synched, now_synched;
352   GstClockTime internal_time, external_time, rate_num, rate_den;
353   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
354       orig_rate_den;
355   GstClockTime max_discont;
356   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
357   GstClockTime median;
358   gint i;
359
360   GST_OBJECT_LOCK (self);
361   rtt_limit = self->roundtrip_limit;
362
363   /* If the server told us a poll interval and it's bigger than the
364    * one configured via the property, use the server's */
365   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
366       self->last_remote_poll_interval > self->minimum_update_interval)
367     min_update_interval = self->last_remote_poll_interval;
368   else
369     min_update_interval = self->minimum_update_interval;
370   GST_OBJECT_UNLOCK (self);
371
372   if (local_2 < local_1) {
373     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
374         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
375         GST_TIME_ARGS (local_2));
376     goto bogus_observation;
377   }
378
379   if (remote_2 < remote_1) {
380     GST_LOG_OBJECT (self,
381         "Dropping observation: remote receive time %" GST_TIME_FORMAT
382         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
383         GST_TIME_ARGS (remote_2));
384     goto bogus_observation;
385   }
386
387   /* The round trip time is (assuming symmetric path delays)
388    * delta = (local_2 - local_1) - (remote_2 - remote_1)
389    */
390
391   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
392
393   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
394     GST_LOG_OBJECT (self,
395         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
396         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
397     goto bogus_observation;
398   }
399
400   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
401     self->last_rtts[i - 1] = self->last_rtts[i];
402   self->last_rtts[i - 1] = rtt;
403
404   if (self->last_rtts_missing) {
405     self->last_rtts_missing--;
406   } else {
407     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
408     g_qsort_with_data (&last_rtts,
409         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
410         (GCompareDataFunc) compare_clock_time, NULL);
411
412     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
413
414     /* FIXME: We might want to use something else here, like only allowing
415      * things in the interquartile range, or also filtering away delays that
416      * are too small compared to the median. This here worked well enough
417      * in tests so far.
418      */
419     if (rtt > 2 * median) {
420       GST_LOG_OBJECT (self,
421           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
422           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
423       goto bogus_observation;
424     }
425   }
426
427   /* Track an average round trip time, for a bit of smoothing */
428   /* Always update before discarding a sample, so genuine changes in
429    * the network get picked up, eventually */
430   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
431     self->rtt_avg = rtt;
432   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
433     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
434   else
435     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
436
437   if (rtt > 2 * self->rtt_avg) {
438     GST_LOG_OBJECT (self,
439         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
440         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
441     goto bogus_observation;
442   }
443
444   /* The difference between the local and remote clock (again assuming
445    * symmetric path delays):
446    *
447    * local_1 + delta / 2 - remote_1 = theta
448    * or
449    * local_2 - delta / 2 - remote_2 = theta
450    *
451    * which gives after some simple algebraic transformations:
452    *
453    *         (remote_1 - local_1) + (remote_2 - local_2)
454    * theta = -------------------------------------------
455    *                              2
456    *
457    *
458    * Thus remote time at local_avg is equal to:
459    *
460    * local_avg + theta =
461    *
462    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
463    * ----------------- + -------------------------------------------
464    *         2                                2
465    *
466    * =
467    *
468    * remote_1 + remote_2
469    * ------------------- = remote_avg
470    *          2
471    *
472    * We use this for our clock estimation, i.e. local_avg at remote clock
473    * being the same as remote_avg.
474    */
475
476   local_avg = (local_2 + local_1) / 2;
477   remote_avg = (remote_2 + remote_1) / 2;
478
479   GST_LOG_OBJECT (self,
480       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
481       G_GUINT64_FORMAT " remoteavg %" G_GUINT64_FORMAT " localavg %"
482       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
483       remote_2, remote_avg, local_avg, local_2);
484
485   clock = GST_CLOCK_CAST (self);
486
487   /* Store what the clock produced as 'now' before this update */
488   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
489       &orig_external_time, &orig_rate_num, &orig_rate_den);
490   internal_time = orig_internal_time;
491   external_time = orig_external_time;
492   rate_num = orig_rate_num;
493   rate_den = orig_rate_den;
494
495   min_guess =
496       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
497       internal_time, external_time, rate_num, rate_den);
498   time_before =
499       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
500       internal_time, external_time, rate_num, rate_den);
501
502   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
503    * but this value seems to work fine */
504   max_discont = self->rtt_avg / 4;
505
506   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
507   synched =
508       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
509       && GST_CLOCK_DIFF (time_before,
510           remote_avg) < (GstClockTimeDiff) (max_discont));
511
512   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
513           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
514           &rate_num, &rate_den)) {
515
516     /* Now compare the difference (discont) in the clock
517      * after this observation */
518     time_discont = GST_CLOCK_DIFF (time_before,
519         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
520             internal_time, external_time, rate_num, rate_den));
521
522     /* If we were in sync with the remote clock, clamp the allowed
523      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
524      * of remote time correctly windowed the actual remote time observation */
525     if (synched && ABS (time_discont) > max_discont) {
526       GstClockTimeDiff offset;
527       GST_DEBUG_OBJECT (clock,
528           "Too large a discont, clamping to 1/4 average RTT = %"
529           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
530       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
531         offset = max_discont - time_discont;
532         if (-offset > external_time)
533           external_time = 0;
534         else
535           external_time += offset;
536       } else {                  /* Too large a backward step - add a +ve offset */
537         offset = -(max_discont + time_discont);
538         external_time += offset;
539       }
540
541       time_discont += offset;
542     }
543
544     /* Check if the new clock params would have made our observation within range */
545     now_synched =
546         (GST_CLOCK_DIFF (remote_avg,
547             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
548                 local_1, internal_time, external_time, rate_num,
549                 rate_den)) < (GstClockTimeDiff) (max_discont))
550         &&
551         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
552             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
553                 rate_num, rate_den),
554             remote_avg) < (GstClockTimeDiff) (max_discont));
555
556     /* Only update the clock if we had synch or just gained it */
557     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
558       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
559           external_time, rate_num, rate_den);
560       /* ghetto formula - shorter timeout for bad correlations */
561       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
562       current_timeout =
563           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
564       self->skipped_updates = 0;
565
566       /* FIXME: When do we consider the clock absolutely not synced anymore? */
567       gst_clock_set_synced (GST_CLOCK (self), TRUE);
568     } else {
569       /* Restore original calibration vars for the report, we're not changing the clock */
570       internal_time = orig_internal_time;
571       external_time = orig_external_time;
572       rate_num = orig_rate_num;
573       rate_den = orig_rate_den;
574       time_discont = 0;
575       self->skipped_updates++;
576     }
577   }
578
579   /* Limit the polling to at most one per minimum_update_interval */
580   if (rtt < min_update_interval)
581     current_timeout = MAX (min_update_interval - rtt, current_timeout);
582
583   GST_OBJECT_LOCK (self);
584   if (self->busses) {
585     GstStructure *s;
586     GstMessage *msg;
587     GList *l;
588
589     /* Output a stats message, whether we updated the clock or not */
590     s = gst_structure_new ("gst-netclock-statistics",
591         "synchronised", G_TYPE_BOOLEAN, synched,
592         "rtt", G_TYPE_UINT64, rtt,
593         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
594         "local", G_TYPE_UINT64, local_avg,
595         "remote", G_TYPE_UINT64, remote_avg,
596         "discontinuity", G_TYPE_INT64, time_discont,
597         "remote-min-estimate", G_TYPE_UINT64, min_guess,
598         "remote-max-estimate", G_TYPE_UINT64, time_before,
599         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
600             min_guess), "remote-max-error", G_TYPE_INT64,
601         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
602         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
603         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
604         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
605         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
606         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
607         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
608         GST_CLOCK_DIFF (internal_time, external_time), NULL);
609     msg = gst_message_new_element (GST_OBJECT (self), s);
610
611     for (l = self->busses; l; l = l->next)
612       gst_bus_post (l->data, gst_message_ref (msg));
613     gst_message_unref (msg);
614   }
615   GST_OBJECT_UNLOCK (self);
616
617   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
618   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
619
620   return;
621
622 bogus_observation:
623   /* Schedule a new packet again soon */
624   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
625   return;
626 }
627
628 static gpointer
629 gst_net_client_internal_clock_thread (gpointer data)
630 {
631   GstNetClientInternalClock *self = data;
632   GSocket *socket = self->socket;
633   GError *err = NULL;
634
635   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
636
637   g_socket_set_blocking (socket, TRUE);
638   g_socket_set_timeout (socket, 0);
639
640   while (!g_cancellable_is_cancelled (self->cancel)) {
641     GstClockTime expiration_time = self->timeout_expiration;
642     GstClockTime now = gst_util_get_timestamp ();
643     gint64 socket_timeout;
644
645     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
646       socket_timeout = 0;
647     } else {
648       socket_timeout = (expiration_time - now) / GST_USECOND;
649     }
650
651     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
652
653     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
654             self->cancel, &err)) {
655       /* cancelled, timeout or error */
656       if (err->code == G_IO_ERROR_CANCELLED) {
657         GST_INFO_OBJECT (self, "cancelled");
658         g_clear_error (&err);
659         break;
660       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
661         /* timed out, let's send another packet */
662         GST_DEBUG_OBJECT (self, "timed out");
663
664         if (self->is_ntp) {
665           GstNtpPacket *packet;
666
667           packet = gst_ntp_packet_new (NULL, NULL);
668
669           packet->transmit_time =
670               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
671
672           GST_DEBUG_OBJECT (self,
673               "sending packet, local time = %" GST_TIME_FORMAT,
674               GST_TIME_ARGS (packet->transmit_time));
675
676           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
677
678           g_free (packet);
679         } else {
680           GstNetTimePacket *packet;
681
682           packet = gst_net_time_packet_new (NULL);
683
684           packet->local_time =
685               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
686
687           GST_DEBUG_OBJECT (self,
688               "sending packet, local time = %" GST_TIME_FORMAT,
689               GST_TIME_ARGS (packet->local_time));
690
691           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
692
693           g_free (packet);
694         }
695
696         /* reset timeout (but are expecting a response sooner anyway) */
697         self->timeout_expiration =
698             gst_util_get_timestamp () +
699             gst_clock_get_timeout (GST_CLOCK_CAST (self));
700       } else {
701         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
702         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
703       }
704       g_clear_error (&err);
705     } else {
706       GstClockTime new_local;
707
708       /* got packet */
709
710       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
711
712       if (self->is_ntp) {
713         GstNtpPacket *packet;
714
715         packet = gst_ntp_packet_receive (socket, NULL, &err);
716
717         if (packet != NULL) {
718           GST_LOG_OBJECT (self, "got packet back");
719           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
720               GST_TIME_ARGS (packet->origin_time));
721           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
722               GST_TIME_ARGS (packet->receive_time));
723           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
724               GST_TIME_ARGS (packet->transmit_time));
725           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
726               GST_TIME_ARGS (new_local));
727           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
728               GST_TIME_ARGS (packet->poll_interval));
729
730           /* Remember the last poll interval we ever got from the server */
731           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
732             self->last_remote_poll_interval = packet->poll_interval;
733
734           /* observe_times will reset the timeout */
735           gst_net_client_internal_clock_observe_times (self,
736               packet->origin_time, packet->receive_time, packet->transmit_time,
737               new_local);
738
739           g_free (packet);
740         } else if (err != NULL) {
741           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
742               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
743             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
744             g_clear_error (&err);
745             break;
746           } else if (g_error_matches (err, GST_NTP_ERROR,
747                   GST_NTP_ERROR_KOD_RATE)) {
748             GST_WARNING_OBJECT (self, "need to limit rate");
749
750             /* If the server did not tell us a poll interval before, double
751              * our minimum poll interval. Otherwise we assume that the server
752              * already told us something sensible and that this error here
753              * was just a spurious error */
754             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
755               self->minimum_update_interval *= 2;
756
757             /* And wait a bit before we send the next packet instead of
758              * sending it immediately */
759             self->timeout_expiration =
760                 gst_util_get_timestamp () +
761                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
762           } else {
763             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
764           }
765           g_clear_error (&err);
766         }
767       } else {
768         GstNetTimePacket *packet;
769
770         packet = gst_net_time_packet_receive (socket, NULL, &err);
771
772         if (packet != NULL) {
773           GST_LOG_OBJECT (self, "got packet back");
774           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
775               GST_TIME_ARGS (packet->local_time));
776           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
777               GST_TIME_ARGS (packet->remote_time));
778           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
779               GST_TIME_ARGS (new_local));
780
781           /* observe_times will reset the timeout */
782           gst_net_client_internal_clock_observe_times (self, packet->local_time,
783               packet->remote_time, packet->remote_time, new_local);
784
785           g_free (packet);
786         } else if (err != NULL) {
787           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
788           g_clear_error (&err);
789         }
790       }
791     }
792   }
793   GST_INFO_OBJECT (self, "shutting down net client clock thread");
794   return NULL;
795 }
796
797 static gboolean
798 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
799 {
800   GSocketAddress *servaddr;
801   GSocketAddress *myaddr;
802   GSocketAddress *anyaddr;
803   GInetAddress *inetaddr;
804   GSocket *socket;
805   GError *error = NULL;
806   GSocketFamily family;
807   GPollFD dummy_pollfd;
808   GResolver *resolver = NULL;
809   GError *err = NULL;
810
811   g_return_val_if_fail (self->address != NULL, FALSE);
812   g_return_val_if_fail (self->servaddr == NULL, FALSE);
813
814   /* create target address */
815   inetaddr = g_inet_address_new_from_string (self->address);
816   if (inetaddr == NULL) {
817     GList *results;
818
819     resolver = g_resolver_get_default ();
820
821     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
822     if (!results)
823       goto failed_to_resolve;
824
825     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
826     g_resolver_free_addresses (results);
827     g_object_unref (resolver);
828   }
829
830   family = g_inet_address_get_family (inetaddr);
831
832   servaddr = g_inet_socket_address_new (inetaddr, self->port);
833   g_object_unref (inetaddr);
834
835   g_assert (servaddr != NULL);
836
837   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
838       self->port);
839
840   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
841       G_SOCKET_PROTOCOL_UDP, &error);
842
843   if (socket == NULL)
844     goto no_socket;
845
846   GST_DEBUG_OBJECT (self, "binding socket");
847   inetaddr = g_inet_address_new_any (family);
848   anyaddr = g_inet_socket_address_new (inetaddr, 0);
849   g_socket_bind (socket, anyaddr, TRUE, &error);
850   g_object_unref (anyaddr);
851   g_object_unref (inetaddr);
852
853   if (error != NULL)
854     goto bind_error;
855
856   /* check address we're bound to, mostly for debugging purposes */
857   myaddr = g_socket_get_local_address (socket, &error);
858
859   if (myaddr == NULL)
860     goto getsockname_error;
861
862   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
863       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
864
865   g_object_unref (myaddr);
866
867   self->cancel = g_cancellable_new ();
868   self->made_cancel_fd =
869       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
870
871   self->socket = socket;
872   self->servaddr = G_SOCKET_ADDRESS (servaddr);
873
874   self->thread = g_thread_try_new ("GstNetClientInternalClock",
875       gst_net_client_internal_clock_thread, self, &error);
876
877   if (error != NULL)
878     goto no_thread;
879
880   return TRUE;
881
882   /* ERRORS */
883 no_socket:
884   {
885     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
886     g_error_free (error);
887     return FALSE;
888   }
889 bind_error:
890   {
891     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
892     g_error_free (error);
893     g_object_unref (socket);
894     return FALSE;
895   }
896 getsockname_error:
897   {
898     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
899     g_error_free (error);
900     g_object_unref (socket);
901     return FALSE;
902   }
903 failed_to_resolve:
904   {
905     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
906         self->address, err->message);
907     g_clear_error (&err);
908     g_object_unref (resolver);
909     return FALSE;
910   }
911 no_thread:
912   {
913     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
914     g_object_unref (self->servaddr);
915     self->servaddr = NULL;
916     g_object_unref (self->socket);
917     self->socket = NULL;
918     g_error_free (error);
919     return FALSE;
920   }
921 }
922
923 static void
924 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
925 {
926   if (self->thread == NULL)
927     return;
928
929   GST_INFO_OBJECT (self, "stopping...");
930   g_cancellable_cancel (self->cancel);
931
932   g_thread_join (self->thread);
933   self->thread = NULL;
934
935   if (self->made_cancel_fd)
936     g_cancellable_release_fd (self->cancel);
937
938   g_object_unref (self->cancel);
939   self->cancel = NULL;
940
941   g_object_unref (self->servaddr);
942   self->servaddr = NULL;
943
944   g_object_unref (self->socket);
945   self->socket = NULL;
946
947   GST_INFO_OBJECT (self, "stopped");
948 }
949
950 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
951   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
952
953 struct _GstNetClientClockPrivate
954 {
955   GstClock *internal_clock;
956
957   GstClockTime roundtrip_limit;
958   GstClockTime minimum_update_interval;
959
960   GstClockTime base_time, internal_base_time;
961
962   gchar *address;
963   gint port;
964
965   GstBus *bus;
966
967   gboolean is_ntp;
968
969   gulong synced_id;
970 };
971
972 G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK);
973
974 static void gst_net_client_clock_finalize (GObject * object);
975 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
976     const GValue * value, GParamSpec * pspec);
977 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
978     GValue * value, GParamSpec * pspec);
979 static void gst_net_client_clock_constructed (GObject * object);
980
981 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
982
983 static void
984 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
985 {
986   GObjectClass *gobject_class;
987   GstClockClass *clock_class;
988
989   gobject_class = G_OBJECT_CLASS (klass);
990   clock_class = GST_CLOCK_CLASS (klass);
991
992   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
993
994   gobject_class->finalize = gst_net_client_clock_finalize;
995   gobject_class->get_property = gst_net_client_clock_get_property;
996   gobject_class->set_property = gst_net_client_clock_set_property;
997   gobject_class->constructed = gst_net_client_clock_constructed;
998
999   g_object_class_install_property (gobject_class, PROP_ADDRESS,
1000       g_param_spec_string ("address", "address",
1001           "The IP address of the machine providing a time server",
1002           DEFAULT_ADDRESS,
1003           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1004   g_object_class_install_property (gobject_class, PROP_PORT,
1005       g_param_spec_int ("port", "port",
1006           "The port on which the remote server is listening", 0, G_MAXUINT16,
1007           DEFAULT_PORT,
1008           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1009   g_object_class_install_property (gobject_class, PROP_BUS,
1010       g_param_spec_object ("bus", "bus",
1011           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1012           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1013
1014   /**
1015    * GstNetClientInternalClock::round-trip-limit:
1016    *
1017    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1018    * value, all packets with a round-trip interval larger than this limit will be
1019    * ignored. This is useful for networks with severe and fluctuating transport
1020    * delays. Filtering out these packets increases stability of the synchronization.
1021    * On the other hand, the lower the limit, the higher the amount of filtered
1022    * packets. Empirical tests are typically necessary to estimate a good value
1023    * for the limit.
1024    * If the property is set to zero, the limit is disabled.
1025    *
1026    * Since: 1.4
1027    */
1028   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1029       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1030           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1031           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1032           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1033
1034   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1035       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1036           "Minimum polling interval for packets, in nanoseconds"
1037           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1038           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1039
1040   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1041       g_param_spec_uint64 ("base-time", "Base Time",
1042           "Initial time that is reported before synchronization", 0,
1043           G_MAXUINT64, DEFAULT_BASE_TIME,
1044           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1045
1046   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1047       g_param_spec_object ("internal-clock", "Internal Clock",
1048           "Internal clock that directly slaved to the remote clock",
1049           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1050
1051   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1052 }
1053
1054 static void
1055 gst_net_client_clock_init (GstNetClientClock * self)
1056 {
1057   GstNetClientClockPrivate *priv;
1058   GstClock *clock;
1059
1060   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
1061
1062   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1063   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1064
1065   priv->port = DEFAULT_PORT;
1066   priv->address = g_strdup (DEFAULT_ADDRESS);
1067
1068   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1069   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1070
1071   clock = gst_system_clock_obtain ();
1072   priv->base_time = DEFAULT_BASE_TIME;
1073   priv->internal_base_time = gst_clock_get_time (clock);
1074   gst_object_unref (clock);
1075 }
1076
1077 /* Must be called with clocks_lock */
1078 static void
1079 update_clock_cache (ClockCache * cache)
1080 {
1081   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1082   GList *l, *busses = NULL;
1083
1084   GST_OBJECT_LOCK (cache->clock);
1085   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1086       (GDestroyNotify) gst_object_unref);
1087
1088   for (l = cache->clocks; l; l = l->next) {
1089     GstNetClientClock *clock = l->data;
1090
1091     if (clock->priv->bus)
1092       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1093
1094     if (roundtrip_limit == 0)
1095       roundtrip_limit = clock->priv->roundtrip_limit;
1096     else
1097       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1098
1099     if (minimum_update_interval == 0)
1100       minimum_update_interval = clock->priv->minimum_update_interval;
1101     else
1102       minimum_update_interval =
1103           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1104   }
1105   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1106   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1107       roundtrip_limit;
1108   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1109       minimum_update_interval;
1110
1111   GST_OBJECT_UNLOCK (cache->clock);
1112 }
1113
1114 static gboolean
1115 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1116     gpointer user_data)
1117 {
1118   ClockCache *cache = user_data;
1119
1120   G_LOCK (clocks_lock);
1121   if (!cache->clocks) {
1122     gst_clock_id_unref (cache->remove_id);
1123     gst_object_unref (cache->clock);
1124     clocks = g_list_remove (clocks, cache);
1125     g_free (cache);
1126   }
1127   G_UNLOCK (clocks_lock);
1128
1129   return TRUE;
1130 }
1131
1132 static void
1133 gst_net_client_clock_finalize (GObject * object)
1134 {
1135   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1136   GList *l;
1137
1138   if (self->priv->synced_id)
1139     g_signal_handler_disconnect (self->priv->internal_clock,
1140         self->priv->synced_id);
1141   self->priv->synced_id = 0;
1142
1143   G_LOCK (clocks_lock);
1144   for (l = clocks; l; l = l->next) {
1145     ClockCache *cache = l->data;
1146
1147     if (cache->clock == self->priv->internal_clock) {
1148       cache->clocks = g_list_remove (cache->clocks, self);
1149
1150       if (cache->clocks) {
1151         update_clock_cache (cache);
1152       } else {
1153         GstClock *sysclock = gst_system_clock_obtain ();
1154         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1155
1156         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1157         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1158             NULL);
1159         gst_object_unref (sysclock);
1160       }
1161       break;
1162     }
1163   }
1164   G_UNLOCK (clocks_lock);
1165
1166   g_free (self->priv->address);
1167   self->priv->address = NULL;
1168
1169   if (self->priv->bus != NULL) {
1170     gst_object_unref (self->priv->bus);
1171     self->priv->bus = NULL;
1172   }
1173
1174   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1175 }
1176
1177 static void
1178 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1179     const GValue * value, GParamSpec * pspec)
1180 {
1181   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1182   gboolean update = FALSE;
1183
1184   switch (prop_id) {
1185     case PROP_ADDRESS:
1186       GST_OBJECT_LOCK (self);
1187       g_free (self->priv->address);
1188       self->priv->address = g_value_dup_string (value);
1189       if (self->priv->address == NULL)
1190         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1191       GST_OBJECT_UNLOCK (self);
1192       break;
1193     case PROP_PORT:
1194       GST_OBJECT_LOCK (self);
1195       self->priv->port = g_value_get_int (value);
1196       GST_OBJECT_UNLOCK (self);
1197       break;
1198     case PROP_ROUNDTRIP_LIMIT:
1199       GST_OBJECT_LOCK (self);
1200       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1201       GST_OBJECT_UNLOCK (self);
1202       update = TRUE;
1203       break;
1204     case PROP_MINIMUM_UPDATE_INTERVAL:
1205       GST_OBJECT_LOCK (self);
1206       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1207       GST_OBJECT_UNLOCK (self);
1208       update = TRUE;
1209       break;
1210     case PROP_BUS:
1211       GST_OBJECT_LOCK (self);
1212       if (self->priv->bus)
1213         gst_object_unref (self->priv->bus);
1214       self->priv->bus = g_value_dup_object (value);
1215       GST_OBJECT_UNLOCK (self);
1216       update = TRUE;
1217       break;
1218     case PROP_BASE_TIME:{
1219       GstClock *clock;
1220
1221       self->priv->base_time = g_value_get_uint64 (value);
1222       clock = gst_system_clock_obtain ();
1223       self->priv->internal_base_time = gst_clock_get_time (clock);
1224       gst_object_unref (clock);
1225       break;
1226     }
1227     default:
1228       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1229       break;
1230   }
1231
1232   if (update && self->priv->internal_clock) {
1233     GList *l;
1234
1235     G_LOCK (clocks_lock);
1236     for (l = clocks; l; l = l->next) {
1237       ClockCache *cache = l->data;
1238
1239       if (cache->clock == self->priv->internal_clock) {
1240         update_clock_cache (cache);
1241       }
1242     }
1243     G_UNLOCK (clocks_lock);
1244   }
1245 }
1246
1247 static void
1248 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1249     GValue * value, GParamSpec * pspec)
1250 {
1251   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1252
1253   switch (prop_id) {
1254     case PROP_ADDRESS:
1255       GST_OBJECT_LOCK (self);
1256       g_value_set_string (value, self->priv->address);
1257       GST_OBJECT_UNLOCK (self);
1258       break;
1259     case PROP_PORT:
1260       g_value_set_int (value, self->priv->port);
1261       break;
1262     case PROP_ROUNDTRIP_LIMIT:
1263       GST_OBJECT_LOCK (self);
1264       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1265       GST_OBJECT_UNLOCK (self);
1266       break;
1267     case PROP_MINIMUM_UPDATE_INTERVAL:
1268       GST_OBJECT_LOCK (self);
1269       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1270       GST_OBJECT_UNLOCK (self);
1271       break;
1272     case PROP_BUS:
1273       GST_OBJECT_LOCK (self);
1274       g_value_set_object (value, self->priv->bus);
1275       GST_OBJECT_UNLOCK (self);
1276       break;
1277     case PROP_BASE_TIME:
1278       g_value_set_uint64 (value, self->priv->base_time);
1279       break;
1280     case PROP_INTERNAL_CLOCK:
1281       g_value_set_object (value, self->priv->internal_clock);
1282       break;
1283     default:
1284       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1285       break;
1286   }
1287 }
1288
1289 static void
1290 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1291     GstClock * self)
1292 {
1293   gst_clock_set_synced (self, synced);
1294 }
1295
1296 static void
1297 gst_net_client_clock_constructed (GObject * object)
1298 {
1299   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1300   GstClock *internal_clock;
1301   GList *l;
1302   ClockCache *cache = NULL;
1303
1304   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1305
1306   G_LOCK (clocks_lock);
1307   for (l = clocks; l; l = l->next) {
1308     ClockCache *tmp = l->data;
1309     GstNetClientInternalClock *internal_clock =
1310         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1311
1312     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1313         internal_clock->port == self->priv->port) {
1314       cache = tmp;
1315
1316       if (cache->remove_id) {
1317         gst_clock_id_unschedule (cache->remove_id);
1318         cache->remove_id = NULL;
1319       }
1320       break;
1321     }
1322   }
1323
1324   if (!cache) {
1325     cache = g_new0 (ClockCache, 1);
1326
1327     cache->clock =
1328         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1329         self->priv->address, "port", self->priv->port, "is-ntp",
1330         self->priv->is_ntp, NULL);
1331     clocks = g_list_prepend (clocks, cache);
1332   }
1333
1334   cache->clocks = g_list_prepend (cache->clocks, self);
1335
1336   GST_OBJECT_LOCK (cache->clock);
1337   if (gst_clock_is_synced (cache->clock))
1338     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1339   self->priv->synced_id =
1340       g_signal_connect (cache->clock, "synced",
1341       G_CALLBACK (gst_net_client_clock_synced_cb), self);
1342   GST_OBJECT_UNLOCK (cache->clock);
1343
1344   G_UNLOCK (clocks_lock);
1345
1346   self->priv->internal_clock = internal_clock = cache->clock;
1347
1348   /* all systems go, cap'n */
1349 }
1350
1351 static GstClockTime
1352 gst_net_client_clock_get_internal_time (GstClock * clock)
1353 {
1354   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1355
1356   if (!gst_clock_is_synced (self->priv->internal_clock)) {
1357     GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1358     return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1359         self->priv->internal_base_time, self->priv->base_time, 1, 1);
1360   }
1361
1362   return gst_clock_get_time (self->priv->internal_clock);
1363 }
1364
1365 /**
1366  * gst_net_client_clock_new:
1367  * @name: a name for the clock
1368  * @remote_address: the address or hostname of the remote clock provider
1369  * @remote_port: the port of the remote clock provider
1370  * @base_time: initial time of the clock
1371  *
1372  * Create a new #GstNetClientInternalClock that will report the time
1373  * provided by the #GstNetTimeProvider on @remote_address and 
1374  * @remote_port.
1375  *
1376  * Returns: a new #GstClock that receives a time from the remote
1377  * clock.
1378  */
1379 GstClock *
1380 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1381     gint remote_port, GstClockTime base_time)
1382 {
1383   GstClock *ret;
1384
1385   g_return_val_if_fail (remote_address != NULL, NULL);
1386   g_return_val_if_fail (remote_port > 0, NULL);
1387   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1388   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1389
1390   ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
1391       "port", remote_port, "base-time", base_time, NULL);
1392
1393   return ret;
1394 }
1395
1396 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1397
1398 static void
1399 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1400 {
1401 }
1402
1403 static void
1404 gst_ntp_clock_init (GstNtpClock * self)
1405 {
1406   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1407 }
1408
1409 /**
1410  * gst_ntp_clock_new:
1411  * @name: a name for the clock
1412  * @remote_address: the address or hostname of the remote clock provider
1413  * @remote_port: the port of the remote clock provider
1414  * @base_time: initial time of the clock
1415  *
1416  * Create a new #GstNtpClock that will report the time provided by
1417  * the NTPv4 server on @remote_address and @remote_port.
1418  *
1419  * Returns: a new #GstClock that receives a time from the remote
1420  * clock.
1421  *
1422  * Since: 1.6
1423  */
1424 GstClock *
1425 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1426     gint remote_port, GstClockTime base_time)
1427 {
1428   GstClock *ret;
1429
1430   g_return_val_if_fail (remote_address != NULL, NULL);
1431   g_return_val_if_fail (remote_port > 0, NULL);
1432   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1433   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1434
1435   ret = g_object_new (GST_TYPE_NTP_CLOCK, "address", remote_address,
1436       "port", remote_port, "base-time", base_time, NULL);
1437
1438   return ret;
1439 }