Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @title: GstNetClientClock
29  * @short_description: Special clock that synchronizes to a remote time
30  *                     provider.
31  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
32  *
33  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
34  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
35  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
36  *
37  * A new clock is created with gst_net_client_clock_new() or
38  * gst_ntp_clock_new(), which takes the address and port of the remote time
39  * provider along with a name and an initial time.
40  *
41  * This clock will poll the time provider and will update its calibration
42  * parameters based on the local and remote observations.
43  *
44  * The "round-trip" property limits the maximum round trip packets can take.
45  *
46  * Various parameters of the clock can be configured with the parent #GstClock
47  * "timeout", "window-size" and "window-threshold" object properties.
48  *
49  * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
50  * gst_pipeline_use_clock().
51  *
52  * If you set a #GstBus on the clock via the "bus" object property, it will
53  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
54  * statistics about clock accuracy and network traffic.
55  */
56
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
60
61 #include "gstnettimepacket.h"
62 #include "gstntppacket.h"
63 #include "gstnetclientclock.h"
64
65 #include <gio/gio.h>
66
67 #include <string.h>
68
69 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
70 #define GST_CAT_DEFAULT (ncc_debug)
71
72 typedef struct
73 {
74   GstClock *clock;              /* GstNetClientInternalClock */
75
76   GList *clocks;                /* GstNetClientClocks */
77
78   GstClockID remove_id;
79 } ClockCache;
80
81 G_LOCK_DEFINE_STATIC (clocks_lock);
82 static GList *clocks = NULL;
83
84 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
85   (gst_net_client_internal_clock_get_type())
86 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
87   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
88 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
89   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
90 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
91   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
92 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
93   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
94
95 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
96 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
97
98 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
99
100 #define DEFAULT_ADDRESS         "127.0.0.1"
101 #define DEFAULT_PORT            5637
102 #define DEFAULT_TIMEOUT         GST_SECOND
103 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
104 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
105  * more often than 1/20th second (arbitrarily, to spread observations a little) */
106 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
107 #define DEFAULT_BASE_TIME       0
108
109 /* Maximum number of clock updates we can skip before updating */
110 #define MAX_SKIPPED_UPDATES 5
111
112 #define MEDIAN_PRE_FILTERING_WINDOW 9
113
114 enum
115 {
116   PROP_0,
117   PROP_ADDRESS,
118   PROP_PORT,
119   PROP_ROUNDTRIP_LIMIT,
120   PROP_MINIMUM_UPDATE_INTERVAL,
121   PROP_BUS,
122   PROP_BASE_TIME,
123   PROP_INTERNAL_CLOCK,
124   PROP_IS_NTP
125 };
126
127 struct _GstNetClientInternalClock
128 {
129   GstSystemClock clock;
130
131   GThread *thread;
132
133   GSocket *socket;
134   GSocketAddress *servaddr;
135   GCancellable *cancel;
136   gboolean made_cancel_fd;
137
138   GstClockTime timeout_expiration;
139   GstClockTime roundtrip_limit;
140   GstClockTime rtt_avg;
141   GstClockTime minimum_update_interval;
142   GstClockTime last_remote_poll_interval;
143   guint skipped_updates;
144   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
145   gint last_rtts_missing;
146
147   gchar *address;
148   gint port;
149   gboolean is_ntp;
150
151   /* Protected by OBJECT_LOCK */
152   GList *busses;
153 };
154
155 struct _GstNetClientInternalClockClass
156 {
157   GstSystemClockClass parent_class;
158 };
159
160 #define _do_init \
161   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
162 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
163     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
164
165 static void gst_net_client_internal_clock_finalize (GObject * object);
166 static void gst_net_client_internal_clock_set_property (GObject * object,
167     guint prop_id, const GValue * value, GParamSpec * pspec);
168 static void gst_net_client_internal_clock_get_property (GObject * object,
169     guint prop_id, GValue * value, GParamSpec * pspec);
170 static void gst_net_client_internal_clock_constructed (GObject * object);
171
172 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
173     self);
174 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
175     self);
176
177 static void
178 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
179     klass)
180 {
181   GObjectClass *gobject_class;
182
183   gobject_class = G_OBJECT_CLASS (klass);
184
185   gobject_class->finalize = gst_net_client_internal_clock_finalize;
186   gobject_class->get_property = gst_net_client_internal_clock_get_property;
187   gobject_class->set_property = gst_net_client_internal_clock_set_property;
188   gobject_class->constructed = gst_net_client_internal_clock_constructed;
189
190   g_object_class_install_property (gobject_class, PROP_ADDRESS,
191       g_param_spec_string ("address", "address",
192           "The IP address of the machine providing a time server",
193           DEFAULT_ADDRESS,
194           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
195   g_object_class_install_property (gobject_class, PROP_PORT,
196       g_param_spec_int ("port", "port",
197           "The port on which the remote server is listening", 0, G_MAXUINT16,
198           DEFAULT_PORT,
199           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
200   g_object_class_install_property (gobject_class, PROP_IS_NTP,
201       g_param_spec_boolean ("is-ntp", "Is NTP",
202           "The clock is using the NTPv4 protocol", FALSE,
203           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
204 }
205
206 static void
207 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
208 {
209   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
210
211   self->port = DEFAULT_PORT;
212   self->address = g_strdup (DEFAULT_ADDRESS);
213   self->is_ntp = FALSE;
214
215   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
216
217   self->thread = NULL;
218
219   self->servaddr = NULL;
220   self->rtt_avg = GST_CLOCK_TIME_NONE;
221   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
222   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
223   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
224   self->skipped_updates = 0;
225   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
226 }
227
228 static void
229 gst_net_client_internal_clock_finalize (GObject * object)
230 {
231   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
232
233   if (self->thread) {
234     gst_net_client_internal_clock_stop (self);
235   }
236
237   g_free (self->address);
238   self->address = NULL;
239
240   if (self->servaddr != NULL) {
241     g_object_unref (self->servaddr);
242     self->servaddr = NULL;
243   }
244
245   if (self->socket != NULL) {
246     if (!g_socket_close (self->socket, NULL))
247       GST_ERROR_OBJECT (self, "Failed to close socket");
248     g_object_unref (self->socket);
249     self->socket = NULL;
250   }
251
252   g_warn_if_fail (self->busses == NULL);
253
254   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
255       (object);
256 }
257
258 static void
259 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
260     const GValue * value, GParamSpec * pspec)
261 {
262   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
263
264   switch (prop_id) {
265     case PROP_ADDRESS:
266       GST_OBJECT_LOCK (self);
267       g_free (self->address);
268       self->address = g_value_dup_string (value);
269       if (self->address == NULL)
270         self->address = g_strdup (DEFAULT_ADDRESS);
271       GST_OBJECT_UNLOCK (self);
272       break;
273     case PROP_PORT:
274       GST_OBJECT_LOCK (self);
275       self->port = g_value_get_int (value);
276       GST_OBJECT_UNLOCK (self);
277       break;
278     case PROP_IS_NTP:
279       GST_OBJECT_LOCK (self);
280       self->is_ntp = g_value_get_boolean (value);
281       GST_OBJECT_UNLOCK (self);
282       break;
283     default:
284       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
285       break;
286   }
287 }
288
289 static void
290 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
291     GValue * value, GParamSpec * pspec)
292 {
293   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
294
295   switch (prop_id) {
296     case PROP_ADDRESS:
297       GST_OBJECT_LOCK (self);
298       g_value_set_string (value, self->address);
299       GST_OBJECT_UNLOCK (self);
300       break;
301     case PROP_PORT:
302       g_value_set_int (value, self->port);
303       break;
304     case PROP_IS_NTP:
305       g_value_set_boolean (value, self->is_ntp);
306       break;
307     default:
308       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
309       break;
310   }
311 }
312
313 static void
314 gst_net_client_internal_clock_constructed (GObject * object)
315 {
316   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
317
318   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
319       (object);
320
321   if (!gst_net_client_internal_clock_start (self)) {
322     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
323   }
324
325   /* all systems go, cap'n */
326 }
327
328 static gint
329 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
330 {
331   if (*a < *b)
332     return -1;
333   else if (*a > *b)
334     return 1;
335   return 0;
336 }
337
338 static void
339 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
340     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
341     GstClockTime local_2)
342 {
343   GstClockTime current_timeout = 0;
344   GstClockTime local_avg, remote_avg;
345   gdouble r_squared;
346   GstClock *clock;
347   GstClockTime rtt, rtt_limit, min_update_interval;
348   /* Use for discont tracking */
349   GstClockTime time_before = 0;
350   GstClockTime min_guess = 0;
351   GstClockTimeDiff time_discont = 0;
352   gboolean synched, now_synched;
353   GstClockTime internal_time, external_time, rate_num, rate_den;
354   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
355       orig_rate_den;
356   GstClockTime max_discont;
357   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
358   GstClockTime median;
359   gint i;
360
361   GST_OBJECT_LOCK (self);
362   rtt_limit = self->roundtrip_limit;
363
364   GST_LOG_OBJECT (self,
365       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
366       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
367       remote_2, local_2);
368
369   /* If the server told us a poll interval and it's bigger than the
370    * one configured via the property, use the server's */
371   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
372       self->last_remote_poll_interval > self->minimum_update_interval)
373     min_update_interval = self->last_remote_poll_interval;
374   else
375     min_update_interval = self->minimum_update_interval;
376   GST_OBJECT_UNLOCK (self);
377
378   if (local_2 < local_1) {
379     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
380         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
381         GST_TIME_ARGS (local_2));
382     goto bogus_observation;
383   }
384
385   if (remote_2 < remote_1) {
386     GST_LOG_OBJECT (self,
387         "Dropping observation: remote receive time %" GST_TIME_FORMAT
388         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
389         GST_TIME_ARGS (remote_2));
390     goto bogus_observation;
391   }
392
393   /* The round trip time is (assuming symmetric path delays)
394    * delta = (local_2 - local_1) - (remote_2 - remote_1)
395    */
396
397   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
398
399   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
400     GST_LOG_OBJECT (self,
401         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
402         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
403     goto bogus_observation;
404   }
405
406   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
407     self->last_rtts[i - 1] = self->last_rtts[i];
408   self->last_rtts[i - 1] = rtt;
409
410   if (self->last_rtts_missing) {
411     self->last_rtts_missing--;
412   } else {
413     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
414     g_qsort_with_data (&last_rtts,
415         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
416         (GCompareDataFunc) compare_clock_time, NULL);
417
418     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
419
420     /* FIXME: We might want to use something else here, like only allowing
421      * things in the interquartile range, or also filtering away delays that
422      * are too small compared to the median. This here worked well enough
423      * in tests so far.
424      */
425     if (rtt > 2 * median) {
426       GST_LOG_OBJECT (self,
427           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
428           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
429       goto bogus_observation;
430     }
431   }
432
433   /* Track an average round trip time, for a bit of smoothing */
434   /* Always update before discarding a sample, so genuine changes in
435    * the network get picked up, eventually */
436   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
437     self->rtt_avg = rtt;
438   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
439     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
440   else
441     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
442
443   if (rtt > 2 * self->rtt_avg) {
444     GST_LOG_OBJECT (self,
445         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
446         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
447     goto bogus_observation;
448   }
449
450   /* The difference between the local and remote clock (again assuming
451    * symmetric path delays):
452    *
453    * local_1 + delta / 2 - remote_1 = theta
454    * or
455    * local_2 - delta / 2 - remote_2 = theta
456    *
457    * which gives after some simple algebraic transformations:
458    *
459    *         (remote_1 - local_1) + (remote_2 - local_2)
460    * theta = -------------------------------------------
461    *                              2
462    *
463    *
464    * Thus remote time at local_avg is equal to:
465    *
466    * local_avg + theta =
467    *
468    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
469    * ----------------- + -------------------------------------------
470    *         2                                2
471    *
472    * =
473    *
474    * remote_1 + remote_2
475    * ------------------- = remote_avg
476    *          2
477    *
478    * We use this for our clock estimation, i.e. local_avg at remote clock
479    * being the same as remote_avg.
480    */
481
482   local_avg = (local_2 + local_1) / 2;
483   remote_avg = (remote_2 + remote_1) / 2;
484
485   GST_LOG_OBJECT (self,
486       "remoteavg %" G_GUINT64_FORMAT " localavg %" G_GUINT64_FORMAT,
487       remote_avg, local_avg);
488
489   clock = GST_CLOCK_CAST (self);
490
491   /* Store what the clock produced as 'now' before this update */
492   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
493       &orig_external_time, &orig_rate_num, &orig_rate_den);
494   internal_time = orig_internal_time;
495   external_time = orig_external_time;
496   rate_num = orig_rate_num;
497   rate_den = orig_rate_den;
498
499   min_guess =
500       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
501       internal_time, external_time, rate_num, rate_den);
502   time_before =
503       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
504       internal_time, external_time, rate_num, rate_den);
505
506   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
507    * but this value seems to work fine */
508   max_discont = self->rtt_avg / 4;
509
510   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
511   synched =
512       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
513       && GST_CLOCK_DIFF (time_before,
514           remote_avg) < (GstClockTimeDiff) (max_discont));
515
516   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
517           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
518           &rate_num, &rate_den)) {
519
520     /* Now compare the difference (discont) in the clock
521      * after this observation */
522     time_discont = GST_CLOCK_DIFF (time_before,
523         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
524             internal_time, external_time, rate_num, rate_den));
525
526     /* If we were in sync with the remote clock, clamp the allowed
527      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
528      * of remote time correctly windowed the actual remote time observation */
529     if (synched && ABS (time_discont) > max_discont) {
530       GstClockTimeDiff offset;
531       GST_DEBUG_OBJECT (clock,
532           "Too large a discont, clamping to 1/4 average RTT = %"
533           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
534       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
535         offset = max_discont - time_discont;
536         if (-offset > external_time)
537           external_time = 0;
538         else
539           external_time += offset;
540       } else {                  /* Too large a backward step - add a +ve offset */
541         offset = -(max_discont + time_discont);
542         external_time += offset;
543       }
544
545       time_discont += offset;
546     }
547
548     /* Check if the new clock params would have made our observation within range */
549     now_synched =
550         (GST_CLOCK_DIFF (remote_avg,
551             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
552                 local_1, internal_time, external_time, rate_num,
553                 rate_den)) < (GstClockTimeDiff) (max_discont))
554         &&
555         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
556             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
557                 rate_num, rate_den),
558             remote_avg) < (GstClockTimeDiff) (max_discont));
559
560     /* Only update the clock if we had synch or just gained it */
561     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
562       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
563           external_time, rate_num, rate_den);
564       /* ghetto formula - shorter timeout for bad correlations */
565       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
566       current_timeout =
567           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
568       self->skipped_updates = 0;
569
570       /* FIXME: When do we consider the clock absolutely not synced anymore? */
571       gst_clock_set_synced (GST_CLOCK (self), TRUE);
572     } else {
573       /* Restore original calibration vars for the report, we're not changing the clock */
574       internal_time = orig_internal_time;
575       external_time = orig_external_time;
576       rate_num = orig_rate_num;
577       rate_den = orig_rate_den;
578       time_discont = 0;
579       self->skipped_updates++;
580     }
581   }
582
583   /* Limit the polling to at most one per minimum_update_interval */
584   if (rtt < min_update_interval)
585     current_timeout = MAX (min_update_interval - rtt, current_timeout);
586
587   GST_OBJECT_LOCK (self);
588   if (self->busses) {
589     GstStructure *s;
590     GstMessage *msg;
591     GList *l;
592
593     /* Output a stats message, whether we updated the clock or not */
594     s = gst_structure_new ("gst-netclock-statistics",
595         "synchronised", G_TYPE_BOOLEAN, synched,
596         "rtt", G_TYPE_UINT64, rtt,
597         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
598         "local", G_TYPE_UINT64, local_avg,
599         "remote", G_TYPE_UINT64, remote_avg,
600         "discontinuity", G_TYPE_INT64, time_discont,
601         "remote-min-estimate", G_TYPE_UINT64, min_guess,
602         "remote-max-estimate", G_TYPE_UINT64, time_before,
603         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
604             min_guess), "remote-max-error", G_TYPE_INT64,
605         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
606         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
607         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
608         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
609         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
610         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
611         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
612         GST_CLOCK_DIFF (internal_time, external_time), NULL);
613     msg = gst_message_new_element (GST_OBJECT (self), s);
614
615     for (l = self->busses; l; l = l->next)
616       gst_bus_post (l->data, gst_message_ref (msg));
617     gst_message_unref (msg);
618   }
619   GST_OBJECT_UNLOCK (self);
620
621   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
622   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
623
624   return;
625
626 bogus_observation:
627   /* Schedule a new packet again soon */
628   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
629   return;
630 }
631
632 static gpointer
633 gst_net_client_internal_clock_thread (gpointer data)
634 {
635   GstNetClientInternalClock *self = data;
636   GSocket *socket = self->socket;
637   GError *err = NULL;
638
639   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
640
641   g_socket_set_blocking (socket, TRUE);
642   g_socket_set_timeout (socket, 0);
643
644   while (!g_cancellable_is_cancelled (self->cancel)) {
645     GstClockTime expiration_time = self->timeout_expiration;
646     GstClockTime now = gst_util_get_timestamp ();
647     gint64 socket_timeout;
648
649     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
650       socket_timeout = 0;
651     } else {
652       socket_timeout = (expiration_time - now) / GST_USECOND;
653     }
654
655     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
656
657     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
658             self->cancel, &err)) {
659       /* cancelled, timeout or error */
660       if (err->code == G_IO_ERROR_CANCELLED) {
661         GST_INFO_OBJECT (self, "cancelled");
662         g_clear_error (&err);
663         break;
664       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
665         /* timed out, let's send another packet */
666         GST_DEBUG_OBJECT (self, "timed out");
667
668         if (self->is_ntp) {
669           GstNtpPacket *packet;
670
671           packet = gst_ntp_packet_new (NULL, NULL);
672
673           packet->transmit_time =
674               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
675
676           GST_DEBUG_OBJECT (self,
677               "sending packet, local time = %" GST_TIME_FORMAT,
678               GST_TIME_ARGS (packet->transmit_time));
679
680           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
681
682           g_free (packet);
683         } else {
684           GstNetTimePacket *packet;
685
686           packet = gst_net_time_packet_new (NULL);
687
688           packet->local_time =
689               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
690
691           GST_DEBUG_OBJECT (self,
692               "sending packet, local time = %" GST_TIME_FORMAT,
693               GST_TIME_ARGS (packet->local_time));
694
695           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
696
697           g_free (packet);
698         }
699
700         /* reset timeout (but are expecting a response sooner anyway) */
701         self->timeout_expiration =
702             gst_util_get_timestamp () +
703             gst_clock_get_timeout (GST_CLOCK_CAST (self));
704       } else {
705         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
706         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
707       }
708       g_clear_error (&err);
709     } else {
710       GstClockTime new_local;
711
712       /* got packet */
713
714       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
715
716       if (self->is_ntp) {
717         GstNtpPacket *packet;
718
719         packet = gst_ntp_packet_receive (socket, NULL, &err);
720
721         if (packet != NULL) {
722           GST_LOG_OBJECT (self, "got packet back");
723           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
724               GST_TIME_ARGS (packet->origin_time));
725           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
726               GST_TIME_ARGS (packet->receive_time));
727           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
728               GST_TIME_ARGS (packet->transmit_time));
729           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
730               GST_TIME_ARGS (new_local));
731           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
732               GST_TIME_ARGS (packet->poll_interval));
733
734           /* Remember the last poll interval we ever got from the server */
735           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
736             self->last_remote_poll_interval = packet->poll_interval;
737
738           /* observe_times will reset the timeout */
739           gst_net_client_internal_clock_observe_times (self,
740               packet->origin_time, packet->receive_time, packet->transmit_time,
741               new_local);
742
743           g_free (packet);
744         } else if (err != NULL) {
745           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
746               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
747             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
748             g_clear_error (&err);
749             break;
750           } else if (g_error_matches (err, GST_NTP_ERROR,
751                   GST_NTP_ERROR_KOD_RATE)) {
752             GST_WARNING_OBJECT (self, "need to limit rate");
753
754             /* If the server did not tell us a poll interval before, double
755              * our minimum poll interval. Otherwise we assume that the server
756              * already told us something sensible and that this error here
757              * was just a spurious error */
758             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
759               self->minimum_update_interval *= 2;
760
761             /* And wait a bit before we send the next packet instead of
762              * sending it immediately */
763             self->timeout_expiration =
764                 gst_util_get_timestamp () +
765                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
766           } else {
767             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
768           }
769           g_clear_error (&err);
770         }
771       } else {
772         GstNetTimePacket *packet;
773
774         packet = gst_net_time_packet_receive (socket, NULL, &err);
775
776         if (packet != NULL) {
777           GST_LOG_OBJECT (self, "got packet back");
778           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
779               GST_TIME_ARGS (packet->local_time));
780           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
781               GST_TIME_ARGS (packet->remote_time));
782           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
783               GST_TIME_ARGS (new_local));
784
785           /* observe_times will reset the timeout */
786           gst_net_client_internal_clock_observe_times (self, packet->local_time,
787               packet->remote_time, packet->remote_time, new_local);
788
789           g_free (packet);
790         } else if (err != NULL) {
791           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
792           g_clear_error (&err);
793         }
794       }
795     }
796   }
797   GST_INFO_OBJECT (self, "shutting down net client clock thread");
798   return NULL;
799 }
800
801 static gboolean
802 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
803 {
804   GSocketAddress *servaddr;
805   GSocketAddress *myaddr;
806   GSocketAddress *anyaddr;
807   GInetAddress *inetaddr;
808   GSocket *socket;
809   GError *error = NULL;
810   GSocketFamily family;
811   GPollFD dummy_pollfd;
812   GResolver *resolver = NULL;
813   GError *err = NULL;
814
815   g_return_val_if_fail (self->address != NULL, FALSE);
816   g_return_val_if_fail (self->servaddr == NULL, FALSE);
817
818   /* create target address */
819   inetaddr = g_inet_address_new_from_string (self->address);
820   if (inetaddr == NULL) {
821     GList *results;
822
823     resolver = g_resolver_get_default ();
824
825     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
826     if (!results)
827       goto failed_to_resolve;
828
829     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
830     g_resolver_free_addresses (results);
831     g_object_unref (resolver);
832   }
833
834   family = g_inet_address_get_family (inetaddr);
835
836   servaddr = g_inet_socket_address_new (inetaddr, self->port);
837   g_object_unref (inetaddr);
838
839   g_assert (servaddr != NULL);
840
841   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
842       self->port);
843
844   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
845       G_SOCKET_PROTOCOL_UDP, &error);
846
847   if (socket == NULL)
848     goto no_socket;
849
850   GST_DEBUG_OBJECT (self, "binding socket");
851   inetaddr = g_inet_address_new_any (family);
852   anyaddr = g_inet_socket_address_new (inetaddr, 0);
853   g_socket_bind (socket, anyaddr, TRUE, &error);
854   g_object_unref (anyaddr);
855   g_object_unref (inetaddr);
856
857   if (error != NULL)
858     goto bind_error;
859
860   /* check address we're bound to, mostly for debugging purposes */
861   myaddr = g_socket_get_local_address (socket, &error);
862
863   if (myaddr == NULL)
864     goto getsockname_error;
865
866   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
867       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
868
869   g_object_unref (myaddr);
870
871   self->cancel = g_cancellable_new ();
872   self->made_cancel_fd =
873       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
874
875   self->socket = socket;
876   self->servaddr = G_SOCKET_ADDRESS (servaddr);
877
878   self->thread = g_thread_try_new ("GstNetClientInternalClock",
879       gst_net_client_internal_clock_thread, self, &error);
880
881   if (error != NULL)
882     goto no_thread;
883
884   return TRUE;
885
886   /* ERRORS */
887 no_socket:
888   {
889     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
890     g_error_free (error);
891     return FALSE;
892   }
893 bind_error:
894   {
895     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
896     g_error_free (error);
897     g_object_unref (socket);
898     return FALSE;
899   }
900 getsockname_error:
901   {
902     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
903     g_error_free (error);
904     g_object_unref (socket);
905     return FALSE;
906   }
907 failed_to_resolve:
908   {
909     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
910         self->address, err->message);
911     g_clear_error (&err);
912     g_object_unref (resolver);
913     return FALSE;
914   }
915 no_thread:
916   {
917     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
918     g_object_unref (self->servaddr);
919     self->servaddr = NULL;
920     g_object_unref (self->socket);
921     self->socket = NULL;
922     g_error_free (error);
923     return FALSE;
924   }
925 }
926
927 static void
928 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
929 {
930   if (self->thread == NULL)
931     return;
932
933   GST_INFO_OBJECT (self, "stopping...");
934   g_cancellable_cancel (self->cancel);
935
936   g_thread_join (self->thread);
937   self->thread = NULL;
938
939   if (self->made_cancel_fd)
940     g_cancellable_release_fd (self->cancel);
941
942   g_object_unref (self->cancel);
943   self->cancel = NULL;
944
945   g_object_unref (self->servaddr);
946   self->servaddr = NULL;
947
948   g_object_unref (self->socket);
949   self->socket = NULL;
950
951   GST_INFO_OBJECT (self, "stopped");
952 }
953
954 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
955   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
956
957 struct _GstNetClientClockPrivate
958 {
959   GstClock *internal_clock;
960
961   GstClockTime roundtrip_limit;
962   GstClockTime minimum_update_interval;
963
964   GstClockTime base_time, internal_base_time;
965
966   gchar *address;
967   gint port;
968
969   GstBus *bus;
970
971   gboolean is_ntp;
972
973   gulong synced_id;
974 };
975
976 G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK);
977
978 static void gst_net_client_clock_finalize (GObject * object);
979 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
980     const GValue * value, GParamSpec * pspec);
981 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
982     GValue * value, GParamSpec * pspec);
983 static void gst_net_client_clock_constructed (GObject * object);
984
985 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
986
987 static void
988 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
989 {
990   GObjectClass *gobject_class;
991   GstClockClass *clock_class;
992
993   gobject_class = G_OBJECT_CLASS (klass);
994   clock_class = GST_CLOCK_CLASS (klass);
995
996   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
997
998   gobject_class->finalize = gst_net_client_clock_finalize;
999   gobject_class->get_property = gst_net_client_clock_get_property;
1000   gobject_class->set_property = gst_net_client_clock_set_property;
1001   gobject_class->constructed = gst_net_client_clock_constructed;
1002
1003   g_object_class_install_property (gobject_class, PROP_ADDRESS,
1004       g_param_spec_string ("address", "address",
1005           "The IP address of the machine providing a time server",
1006           DEFAULT_ADDRESS,
1007           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1008   g_object_class_install_property (gobject_class, PROP_PORT,
1009       g_param_spec_int ("port", "port",
1010           "The port on which the remote server is listening", 0, G_MAXUINT16,
1011           DEFAULT_PORT,
1012           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1013   g_object_class_install_property (gobject_class, PROP_BUS,
1014       g_param_spec_object ("bus", "bus",
1015           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1016           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1017
1018   /**
1019    * GstNetClientInternalClock::round-trip-limit:
1020    *
1021    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1022    * value, all packets with a round-trip interval larger than this limit will be
1023    * ignored. This is useful for networks with severe and fluctuating transport
1024    * delays. Filtering out these packets increases stability of the synchronization.
1025    * On the other hand, the lower the limit, the higher the amount of filtered
1026    * packets. Empirical tests are typically necessary to estimate a good value
1027    * for the limit.
1028    * If the property is set to zero, the limit is disabled.
1029    *
1030    * Since: 1.4
1031    */
1032   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1033       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1034           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1035           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1036           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1037
1038   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1039       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1040           "Minimum polling interval for packets, in nanoseconds"
1041           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1042           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1043
1044   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1045       g_param_spec_uint64 ("base-time", "Base Time",
1046           "Initial time that is reported before synchronization", 0,
1047           G_MAXUINT64, DEFAULT_BASE_TIME,
1048           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1049
1050   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1051       g_param_spec_object ("internal-clock", "Internal Clock",
1052           "Internal clock that directly slaved to the remote clock",
1053           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1054
1055   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1056 }
1057
1058 static void
1059 gst_net_client_clock_init (GstNetClientClock * self)
1060 {
1061   GstNetClientClockPrivate *priv;
1062   GstClock *clock;
1063
1064   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
1065
1066   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1067   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1068
1069   priv->port = DEFAULT_PORT;
1070   priv->address = g_strdup (DEFAULT_ADDRESS);
1071
1072   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1073   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1074
1075   clock = gst_system_clock_obtain ();
1076   priv->base_time = DEFAULT_BASE_TIME;
1077   priv->internal_base_time = gst_clock_get_time (clock);
1078   gst_object_unref (clock);
1079 }
1080
1081 /* Must be called with clocks_lock */
1082 static void
1083 update_clock_cache (ClockCache * cache)
1084 {
1085   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1086   GList *l, *busses = NULL;
1087
1088   GST_OBJECT_LOCK (cache->clock);
1089   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1090       (GDestroyNotify) gst_object_unref);
1091
1092   for (l = cache->clocks; l; l = l->next) {
1093     GstNetClientClock *clock = l->data;
1094
1095     if (clock->priv->bus)
1096       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1097
1098     if (roundtrip_limit == 0)
1099       roundtrip_limit = clock->priv->roundtrip_limit;
1100     else
1101       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1102
1103     if (minimum_update_interval == 0)
1104       minimum_update_interval = clock->priv->minimum_update_interval;
1105     else
1106       minimum_update_interval =
1107           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1108   }
1109   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1110   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1111       roundtrip_limit;
1112   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1113       minimum_update_interval;
1114
1115   GST_OBJECT_UNLOCK (cache->clock);
1116 }
1117
1118 static gboolean
1119 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1120     gpointer user_data)
1121 {
1122   ClockCache *cache = user_data;
1123
1124   G_LOCK (clocks_lock);
1125   if (!cache->clocks) {
1126     gst_clock_id_unref (cache->remove_id);
1127     gst_object_unref (cache->clock);
1128     clocks = g_list_remove (clocks, cache);
1129     g_free (cache);
1130   }
1131   G_UNLOCK (clocks_lock);
1132
1133   return TRUE;
1134 }
1135
1136 static void
1137 gst_net_client_clock_finalize (GObject * object)
1138 {
1139   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1140   GList *l;
1141
1142   if (self->priv->synced_id)
1143     g_signal_handler_disconnect (self->priv->internal_clock,
1144         self->priv->synced_id);
1145   self->priv->synced_id = 0;
1146
1147   G_LOCK (clocks_lock);
1148   for (l = clocks; l; l = l->next) {
1149     ClockCache *cache = l->data;
1150
1151     if (cache->clock == self->priv->internal_clock) {
1152       cache->clocks = g_list_remove (cache->clocks, self);
1153
1154       if (cache->clocks) {
1155         update_clock_cache (cache);
1156       } else {
1157         GstClock *sysclock = gst_system_clock_obtain ();
1158         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1159
1160         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1161         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1162             NULL);
1163         gst_object_unref (sysclock);
1164       }
1165       break;
1166     }
1167   }
1168   G_UNLOCK (clocks_lock);
1169
1170   g_free (self->priv->address);
1171   self->priv->address = NULL;
1172
1173   if (self->priv->bus != NULL) {
1174     gst_object_unref (self->priv->bus);
1175     self->priv->bus = NULL;
1176   }
1177
1178   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1179 }
1180
1181 static void
1182 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1183     const GValue * value, GParamSpec * pspec)
1184 {
1185   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1186   gboolean update = FALSE;
1187
1188   switch (prop_id) {
1189     case PROP_ADDRESS:
1190       GST_OBJECT_LOCK (self);
1191       g_free (self->priv->address);
1192       self->priv->address = g_value_dup_string (value);
1193       if (self->priv->address == NULL)
1194         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1195       GST_OBJECT_UNLOCK (self);
1196       break;
1197     case PROP_PORT:
1198       GST_OBJECT_LOCK (self);
1199       self->priv->port = g_value_get_int (value);
1200       GST_OBJECT_UNLOCK (self);
1201       break;
1202     case PROP_ROUNDTRIP_LIMIT:
1203       GST_OBJECT_LOCK (self);
1204       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1205       GST_OBJECT_UNLOCK (self);
1206       update = TRUE;
1207       break;
1208     case PROP_MINIMUM_UPDATE_INTERVAL:
1209       GST_OBJECT_LOCK (self);
1210       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1211       GST_OBJECT_UNLOCK (self);
1212       update = TRUE;
1213       break;
1214     case PROP_BUS:
1215       GST_OBJECT_LOCK (self);
1216       if (self->priv->bus)
1217         gst_object_unref (self->priv->bus);
1218       self->priv->bus = g_value_dup_object (value);
1219       GST_OBJECT_UNLOCK (self);
1220       update = TRUE;
1221       break;
1222     case PROP_BASE_TIME:{
1223       GstClock *clock;
1224
1225       self->priv->base_time = g_value_get_uint64 (value);
1226       clock = gst_system_clock_obtain ();
1227       self->priv->internal_base_time = gst_clock_get_time (clock);
1228       gst_object_unref (clock);
1229       break;
1230     }
1231     default:
1232       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1233       break;
1234   }
1235
1236   if (update && self->priv->internal_clock) {
1237     GList *l;
1238
1239     G_LOCK (clocks_lock);
1240     for (l = clocks; l; l = l->next) {
1241       ClockCache *cache = l->data;
1242
1243       if (cache->clock == self->priv->internal_clock) {
1244         update_clock_cache (cache);
1245       }
1246     }
1247     G_UNLOCK (clocks_lock);
1248   }
1249 }
1250
1251 static void
1252 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1253     GValue * value, GParamSpec * pspec)
1254 {
1255   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1256
1257   switch (prop_id) {
1258     case PROP_ADDRESS:
1259       GST_OBJECT_LOCK (self);
1260       g_value_set_string (value, self->priv->address);
1261       GST_OBJECT_UNLOCK (self);
1262       break;
1263     case PROP_PORT:
1264       g_value_set_int (value, self->priv->port);
1265       break;
1266     case PROP_ROUNDTRIP_LIMIT:
1267       GST_OBJECT_LOCK (self);
1268       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1269       GST_OBJECT_UNLOCK (self);
1270       break;
1271     case PROP_MINIMUM_UPDATE_INTERVAL:
1272       GST_OBJECT_LOCK (self);
1273       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1274       GST_OBJECT_UNLOCK (self);
1275       break;
1276     case PROP_BUS:
1277       GST_OBJECT_LOCK (self);
1278       g_value_set_object (value, self->priv->bus);
1279       GST_OBJECT_UNLOCK (self);
1280       break;
1281     case PROP_BASE_TIME:
1282       g_value_set_uint64 (value, self->priv->base_time);
1283       break;
1284     case PROP_INTERNAL_CLOCK:
1285       g_value_set_object (value, self->priv->internal_clock);
1286       break;
1287     default:
1288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1289       break;
1290   }
1291 }
1292
1293 static void
1294 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1295     GstClock * self)
1296 {
1297   gst_clock_set_synced (self, synced);
1298 }
1299
1300 static void
1301 gst_net_client_clock_constructed (GObject * object)
1302 {
1303   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1304   GstClock *internal_clock;
1305   GList *l;
1306   ClockCache *cache = NULL;
1307
1308   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1309
1310   G_LOCK (clocks_lock);
1311   for (l = clocks; l; l = l->next) {
1312     ClockCache *tmp = l->data;
1313     GstNetClientInternalClock *internal_clock =
1314         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1315
1316     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1317         internal_clock->port == self->priv->port) {
1318       cache = tmp;
1319
1320       if (cache->remove_id) {
1321         gst_clock_id_unschedule (cache->remove_id);
1322         cache->remove_id = NULL;
1323       }
1324       break;
1325     }
1326   }
1327
1328   if (!cache) {
1329     cache = g_new0 (ClockCache, 1);
1330
1331     cache->clock =
1332         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1333         self->priv->address, "port", self->priv->port, "is-ntp",
1334         self->priv->is_ntp, NULL);
1335     clocks = g_list_prepend (clocks, cache);
1336
1337     /* Not actually leaked but is cached for a while before being disposed,
1338      * see gst_net_client_clock_finalize, so pretend it is to not confuse
1339      * tests. */
1340     GST_OBJECT_FLAG_SET (cache->clock, GST_OBJECT_FLAG_MAY_BE_LEAKED);
1341   }
1342
1343   cache->clocks = g_list_prepend (cache->clocks, self);
1344
1345   GST_OBJECT_LOCK (cache->clock);
1346   if (gst_clock_is_synced (cache->clock))
1347     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1348   self->priv->synced_id =
1349       g_signal_connect (cache->clock, "synced",
1350       G_CALLBACK (gst_net_client_clock_synced_cb), self);
1351   GST_OBJECT_UNLOCK (cache->clock);
1352
1353   G_UNLOCK (clocks_lock);
1354
1355   self->priv->internal_clock = internal_clock = cache->clock;
1356
1357   /* all systems go, cap'n */
1358 }
1359
1360 static GstClockTime
1361 gst_net_client_clock_get_internal_time (GstClock * clock)
1362 {
1363   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1364
1365   if (!gst_clock_is_synced (self->priv->internal_clock)) {
1366     GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1367     return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1368         self->priv->internal_base_time, self->priv->base_time, 1, 1);
1369   }
1370
1371   return gst_clock_get_time (self->priv->internal_clock);
1372 }
1373
1374 /**
1375  * gst_net_client_clock_new:
1376  * @name: a name for the clock
1377  * @remote_address: the address or hostname of the remote clock provider
1378  * @remote_port: the port of the remote clock provider
1379  * @base_time: initial time of the clock
1380  *
1381  * Create a new #GstNetClientInternalClock that will report the time
1382  * provided by the #GstNetTimeProvider on @remote_address and
1383  * @remote_port.
1384  *
1385  * Returns: a new #GstClock that receives a time from the remote
1386  * clock.
1387  */
1388 GstClock *
1389 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1390     gint remote_port, GstClockTime base_time)
1391 {
1392   GstClock *ret;
1393
1394   g_return_val_if_fail (remote_address != NULL, NULL);
1395   g_return_val_if_fail (remote_port > 0, NULL);
1396   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1397   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1398
1399   ret =
1400       g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "name", name, "address",
1401       remote_address, "port", remote_port, "base-time", base_time, NULL);
1402
1403   return ret;
1404 }
1405
1406 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1407
1408 static void
1409 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1410 {
1411 }
1412
1413 static void
1414 gst_ntp_clock_init (GstNtpClock * self)
1415 {
1416   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1417 }
1418
1419 /**
1420  * gst_ntp_clock_new:
1421  * @name: a name for the clock
1422  * @remote_address: the address or hostname of the remote clock provider
1423  * @remote_port: the port of the remote clock provider
1424  * @base_time: initial time of the clock
1425  *
1426  * Create a new #GstNtpClock that will report the time provided by
1427  * the NTPv4 server on @remote_address and @remote_port.
1428  *
1429  * Returns: a new #GstClock that receives a time from the remote
1430  * clock.
1431  *
1432  * Since: 1.6
1433  */
1434 GstClock *
1435 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1436     gint remote_port, GstClockTime base_time)
1437 {
1438   GstClock *ret;
1439
1440   g_return_val_if_fail (remote_address != NULL, NULL);
1441   g_return_val_if_fail (remote_port > 0, NULL);
1442   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1443   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1444
1445   ret =
1446       g_object_new (GST_TYPE_NTP_CLOCK, "name", name, "address", remote_address,
1447       "port", remote_port, "base-time", base_time, NULL);
1448
1449   return ret;
1450 }