netclientclock: Destroy a cached clock 60 seconds after its last use
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @short_description: Special clock that synchronizes to a remote time
29  *                     provider.
30  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
31  *
32  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
33  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
34  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
35  *
36  * A new clock is created with gst_net_client_clock_new() or
37  * gst_ntp_clock_new(), which takes the address and port of the remote time
38  * provider along with a name and an initial time.
39  *
40  * This clock will poll the time provider and will update its calibration
41  * parameters based on the local and remote observations.
42  *
43  * The "round-trip" property limits the maximum round trip packets can take.
44  *
45  * Various parameters of the clock can be configured with the parent #GstClock
46  * "timeout", "window-size" and "window-threshold" object properties.
47  *
48  * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
49  * gst_pipeline_use_clock().
50  *
51  * If you set a #GstBus on the clock via the "bus" object property, it will
52  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
53  * statistics about clock accuracy and network traffic.
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include "gstnettimepacket.h"
61 #include "gstntppacket.h"
62 #include "gstnetclientclock.h"
63
64 #include <gio/gio.h>
65
66 #include <string.h>
67
68 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
69 #define GST_CAT_DEFAULT (ncc_debug)
70
71 typedef struct
72 {
73   GstClock *clock;              /* GstNetClientInternalClock */
74
75   GList *clocks;                /* GstNetClientClocks */
76
77   GstClockID remove_id;
78 } ClockCache;
79
80 G_LOCK_DEFINE_STATIC (clocks_lock);
81 static GList *clocks = NULL;
82
83 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
84   (gst_net_client_internal_clock_get_type())
85 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
86   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
87 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
88   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
89 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
90   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
92   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93
94 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
95 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
96
97 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
98
99 #define DEFAULT_ADDRESS         "127.0.0.1"
100 #define DEFAULT_PORT            5637
101 #define DEFAULT_TIMEOUT         GST_SECOND
102 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
103 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
104  * more often than 1/20th second (arbitrarily, to spread observations a little) */
105 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
106 #define DEFAULT_BASE_TIME       0
107
108 /* Maximum number of clock updates we can skip before updating */
109 #define MAX_SKIPPED_UPDATES 5
110
111 #define MEDIAN_PRE_FILTERING_WINDOW 9
112
113 enum
114 {
115   PROP_0,
116   PROP_ADDRESS,
117   PROP_PORT,
118   PROP_ROUNDTRIP_LIMIT,
119   PROP_MINIMUM_UPDATE_INTERVAL,
120   PROP_BUS,
121   PROP_BASE_TIME,
122   PROP_INTERNAL_CLOCK,
123   PROP_IS_NTP
124 };
125
126 struct _GstNetClientInternalClock
127 {
128   GstSystemClock clock;
129
130   GThread *thread;
131
132   GSocket *socket;
133   GSocketAddress *servaddr;
134   GCancellable *cancel;
135   gboolean made_cancel_fd;
136
137   GstClockTime timeout_expiration;
138   GstClockTime roundtrip_limit;
139   GstClockTime rtt_avg;
140   GstClockTime minimum_update_interval;
141   GstClockTime last_remote_poll_interval;
142   guint skipped_updates;
143   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
144   gint last_rtts_missing;
145
146   gchar *address;
147   gint port;
148   gboolean is_ntp;
149
150   /* Protected by OBJECT_LOCK */
151   GList *busses;
152 };
153
154 struct _GstNetClientInternalClockClass
155 {
156   GstSystemClockClass parent_class;
157 };
158
159 #define _do_init \
160   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
161 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
162     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
163
164 static void gst_net_client_internal_clock_finalize (GObject * object);
165 static void gst_net_client_internal_clock_set_property (GObject * object,
166     guint prop_id, const GValue * value, GParamSpec * pspec);
167 static void gst_net_client_internal_clock_get_property (GObject * object,
168     guint prop_id, GValue * value, GParamSpec * pspec);
169 static void gst_net_client_internal_clock_constructed (GObject * object);
170
171 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
172     self);
173 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
174     self);
175
176 static void
177 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
178     klass)
179 {
180   GObjectClass *gobject_class;
181
182   gobject_class = G_OBJECT_CLASS (klass);
183
184   gobject_class->finalize = gst_net_client_internal_clock_finalize;
185   gobject_class->get_property = gst_net_client_internal_clock_get_property;
186   gobject_class->set_property = gst_net_client_internal_clock_set_property;
187   gobject_class->constructed = gst_net_client_internal_clock_constructed;
188
189   g_object_class_install_property (gobject_class, PROP_ADDRESS,
190       g_param_spec_string ("address", "address",
191           "The IP address of the machine providing a time server",
192           DEFAULT_ADDRESS,
193           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
194   g_object_class_install_property (gobject_class, PROP_PORT,
195       g_param_spec_int ("port", "port",
196           "The port on which the remote server is listening", 0, G_MAXUINT16,
197           DEFAULT_PORT,
198           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_IS_NTP,
200       g_param_spec_boolean ("is-ntp", "Is NTP",
201           "The clock is using the NTPv4 protocol", FALSE,
202           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
203 }
204
205 static void
206 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
207 {
208   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
209
210   self->port = DEFAULT_PORT;
211   self->address = g_strdup (DEFAULT_ADDRESS);
212   self->is_ntp = FALSE;
213
214   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
215
216   self->thread = NULL;
217
218   self->servaddr = NULL;
219   self->rtt_avg = GST_CLOCK_TIME_NONE;
220   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
221   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
222   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
223   self->skipped_updates = 0;
224   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
225 }
226
227 static void
228 gst_net_client_internal_clock_finalize (GObject * object)
229 {
230   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
231
232   if (self->thread) {
233     gst_net_client_internal_clock_stop (self);
234   }
235
236   g_free (self->address);
237   self->address = NULL;
238
239   if (self->servaddr != NULL) {
240     g_object_unref (self->servaddr);
241     self->servaddr = NULL;
242   }
243
244   if (self->socket != NULL) {
245     g_socket_close (self->socket, NULL);
246     g_object_unref (self->socket);
247     self->socket = NULL;
248   }
249
250   g_warn_if_fail (self->busses == NULL);
251
252   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
253       (object);
254 }
255
256 static void
257 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
258     const GValue * value, GParamSpec * pspec)
259 {
260   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
261
262   switch (prop_id) {
263     case PROP_ADDRESS:
264       GST_OBJECT_LOCK (self);
265       g_free (self->address);
266       self->address = g_value_dup_string (value);
267       if (self->address == NULL)
268         self->address = g_strdup (DEFAULT_ADDRESS);
269       GST_OBJECT_UNLOCK (self);
270       break;
271     case PROP_PORT:
272       GST_OBJECT_LOCK (self);
273       self->port = g_value_get_int (value);
274       GST_OBJECT_UNLOCK (self);
275       break;
276     case PROP_IS_NTP:
277       GST_OBJECT_LOCK (self);
278       self->is_ntp = g_value_get_boolean (value);
279       GST_OBJECT_UNLOCK (self);
280       break;
281     default:
282       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283       break;
284   }
285 }
286
287 static void
288 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
289     GValue * value, GParamSpec * pspec)
290 {
291   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
292
293   switch (prop_id) {
294     case PROP_ADDRESS:
295       GST_OBJECT_LOCK (self);
296       g_value_set_string (value, self->address);
297       GST_OBJECT_UNLOCK (self);
298       break;
299     case PROP_PORT:
300       g_value_set_int (value, self->port);
301       break;
302     case PROP_IS_NTP:
303       g_value_set_boolean (value, self->is_ntp);
304       break;
305     default:
306       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
307       break;
308   }
309 }
310
311 static void
312 gst_net_client_internal_clock_constructed (GObject * object)
313 {
314   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
315
316   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
317       (object);
318
319   if (!gst_net_client_internal_clock_start (self)) {
320     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
321   }
322
323   /* all systems go, cap'n */
324 }
325
326 static gint
327 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
328 {
329   if (*a < *b)
330     return -1;
331   else if (*a > *b)
332     return 1;
333   return 0;
334 }
335
336 static void
337 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
338     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
339     GstClockTime local_2)
340 {
341   GstClockTime current_timeout = 0;
342   GstClockTime local_avg, remote_avg;
343   gdouble r_squared;
344   GstClock *clock;
345   GstClockTime rtt, rtt_limit, min_update_interval;
346   /* Use for discont tracking */
347   GstClockTime time_before = 0;
348   GstClockTime min_guess = 0;
349   GstClockTimeDiff time_discont = 0;
350   gboolean synched, now_synched;
351   GstClockTime internal_time, external_time, rate_num, rate_den;
352   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
353       orig_rate_den;
354   GstClockTime max_discont;
355   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
356   GstClockTime median;
357   gint i;
358
359   GST_OBJECT_LOCK (self);
360   rtt_limit = self->roundtrip_limit;
361
362   /* If the server told us a poll interval and it's bigger than the
363    * one configured via the property, use the server's */
364   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
365       self->last_remote_poll_interval > self->minimum_update_interval)
366     min_update_interval = self->last_remote_poll_interval;
367   else
368     min_update_interval = self->minimum_update_interval;
369   GST_OBJECT_UNLOCK (self);
370
371   if (local_2 < local_1) {
372     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
373         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
374         GST_TIME_ARGS (local_2));
375     goto bogus_observation;
376   }
377
378   if (remote_2 < remote_1) {
379     GST_LOG_OBJECT (self,
380         "Dropping observation: remote receive time %" GST_TIME_FORMAT
381         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
382         GST_TIME_ARGS (remote_2));
383     goto bogus_observation;
384   }
385
386   /* The round trip time is (assuming symmetric path delays)
387    * delta = (local_2 - local_1) - (remote_2 - remote_1)
388    */
389
390   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
391
392   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
393     GST_LOG_OBJECT (self,
394         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
395         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
396     goto bogus_observation;
397   }
398
399   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
400     self->last_rtts[i - 1] = self->last_rtts[i];
401   self->last_rtts[i - 1] = rtt;
402
403   if (self->last_rtts_missing) {
404     self->last_rtts_missing--;
405   } else {
406     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
407     g_qsort_with_data (&last_rtts,
408         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
409         (GCompareDataFunc) compare_clock_time, NULL);
410
411     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
412
413     /* FIXME: We might want to use something else here, like only allowing
414      * things in the interquartile range, or also filtering away delays that
415      * are too small compared to the median. This here worked well enough
416      * in tests so far.
417      */
418     if (rtt > 2 * median) {
419       GST_LOG_OBJECT (self,
420           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
421           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
422       goto bogus_observation;
423     }
424   }
425
426   /* Track an average round trip time, for a bit of smoothing */
427   /* Always update before discarding a sample, so genuine changes in
428    * the network get picked up, eventually */
429   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
430     self->rtt_avg = rtt;
431   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
432     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
433   else
434     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
435
436   if (rtt > 2 * self->rtt_avg) {
437     GST_LOG_OBJECT (self,
438         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
439         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
440     goto bogus_observation;
441   }
442
443   /* The difference between the local and remote clock (again assuming
444    * symmetric path delays):
445    *
446    * local_1 + delta / 2 - remote_1 = theta
447    * or
448    * local_2 - delta / 2 - remote_2 = theta
449    *
450    * which gives after some simple algebraic transformations:
451    *
452    *         (remote_1 - local_1) + (remote_2 - local_2)
453    * theta = -------------------------------------------
454    *                              2
455    *
456    *
457    * Thus remote time at local_avg is equal to:
458    *
459    * local_avg + theta =
460    *
461    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
462    * ----------------- + -------------------------------------------
463    *         2                                2
464    *
465    * =
466    *
467    * remote_1 + remote_2
468    * ------------------- = remote_avg
469    *          2
470    *
471    * We use this for our clock estimation, i.e. local_avg at remote clock
472    * being the same as remote_avg.
473    */
474
475   local_avg = (local_2 + local_1) / 2;
476   remote_avg = (remote_2 + remote_1) / 2;
477
478   GST_LOG_OBJECT (self,
479       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
480       G_GUINT64_FORMAT " remoteavg %" G_GUINT64_FORMAT " localavg %"
481       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
482       remote_2, remote_avg, local_avg, local_2);
483
484   clock = GST_CLOCK_CAST (self);
485
486   /* Store what the clock produced as 'now' before this update */
487   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
488       &orig_external_time, &orig_rate_num, &orig_rate_den);
489   internal_time = orig_internal_time;
490   external_time = orig_external_time;
491   rate_num = orig_rate_num;
492   rate_den = orig_rate_den;
493
494   min_guess =
495       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
496       internal_time, external_time, rate_num, rate_den);
497   time_before =
498       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
499       internal_time, external_time, rate_num, rate_den);
500
501   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
502    * but this value seems to work fine */
503   max_discont = self->rtt_avg / 4;
504
505   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
506   synched =
507       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
508       && GST_CLOCK_DIFF (time_before,
509           remote_avg) < (GstClockTimeDiff) (max_discont));
510
511   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
512           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
513           &rate_num, &rate_den)) {
514
515     /* Now compare the difference (discont) in the clock
516      * after this observation */
517     time_discont = GST_CLOCK_DIFF (time_before,
518         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
519             internal_time, external_time, rate_num, rate_den));
520
521     /* If we were in sync with the remote clock, clamp the allowed
522      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
523      * of remote time correctly windowed the actual remote time observation */
524     if (synched && ABS (time_discont) > max_discont) {
525       GstClockTimeDiff offset;
526       GST_DEBUG_OBJECT (clock,
527           "Too large a discont, clamping to 1/4 average RTT = %"
528           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
529       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
530         offset = max_discont - time_discont;
531         if (-offset > external_time)
532           external_time = 0;
533         else
534           external_time += offset;
535       } else {                  /* Too large a backward step - add a +ve offset */
536         offset = -(max_discont + time_discont);
537         external_time += offset;
538       }
539
540       time_discont += offset;
541     }
542
543     /* Check if the new clock params would have made our observation within range */
544     now_synched =
545         (GST_CLOCK_DIFF (remote_avg,
546             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
547                 local_1, internal_time, external_time, rate_num,
548                 rate_den)) < (GstClockTimeDiff) (max_discont))
549         &&
550         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
551             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
552                 rate_num, rate_den),
553             remote_avg) < (GstClockTimeDiff) (max_discont));
554
555     /* Only update the clock if we had synch or just gained it */
556     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
557       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
558           external_time, rate_num, rate_den);
559       /* ghetto formula - shorter timeout for bad correlations */
560       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
561       current_timeout =
562           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
563       self->skipped_updates = 0;
564
565       /* FIXME: When do we consider the clock absolutely not synced anymore? */
566       gst_clock_set_synced (GST_CLOCK (self), TRUE);
567     } else {
568       /* Restore original calibration vars for the report, we're not changing the clock */
569       internal_time = orig_internal_time;
570       external_time = orig_external_time;
571       rate_num = orig_rate_num;
572       rate_den = orig_rate_den;
573       time_discont = 0;
574       self->skipped_updates++;
575     }
576   }
577
578   /* Limit the polling to at most one per minimum_update_interval */
579   if (rtt < min_update_interval)
580     current_timeout = MAX (min_update_interval - rtt, current_timeout);
581
582   GST_OBJECT_LOCK (self);
583   if (self->busses) {
584     GstStructure *s;
585     GstMessage *msg;
586     GList *l;
587
588     /* Output a stats message, whether we updated the clock or not */
589     s = gst_structure_new ("gst-netclock-statistics",
590         "synchronised", G_TYPE_BOOLEAN, synched,
591         "rtt", G_TYPE_UINT64, rtt,
592         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
593         "local", G_TYPE_UINT64, local_avg,
594         "remote", G_TYPE_UINT64, remote_avg,
595         "discontinuity", G_TYPE_INT64, time_discont,
596         "remote-min-estimate", G_TYPE_UINT64, min_guess,
597         "remote-max-estimate", G_TYPE_UINT64, time_before,
598         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
599             min_guess), "remote-max-error", G_TYPE_INT64,
600         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
601         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
602         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
603         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
604         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
605         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
606         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
607         GST_CLOCK_DIFF (internal_time, external_time), NULL);
608     msg = gst_message_new_element (GST_OBJECT (self), s);
609
610     for (l = self->busses; l; l = l->next)
611       gst_bus_post (l->data, gst_message_ref (msg));
612     gst_message_unref (msg);
613   }
614   GST_OBJECT_UNLOCK (self);
615
616   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
617   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
618
619   return;
620
621 bogus_observation:
622   /* Schedule a new packet again soon */
623   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
624   return;
625 }
626
627 static gpointer
628 gst_net_client_internal_clock_thread (gpointer data)
629 {
630   GstNetClientInternalClock *self = data;
631   GSocket *socket = self->socket;
632   GError *err = NULL;
633
634   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
635
636   g_socket_set_blocking (socket, TRUE);
637   g_socket_set_timeout (socket, 0);
638
639   while (!g_cancellable_is_cancelled (self->cancel)) {
640     GstClockTime expiration_time = self->timeout_expiration;
641     GstClockTime now = gst_util_get_timestamp ();
642     gint64 socket_timeout;
643
644     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
645       socket_timeout = 0;
646     } else {
647       socket_timeout = (expiration_time - now) / GST_USECOND;
648     }
649
650     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
651
652     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
653             self->cancel, &err)) {
654       /* cancelled, timeout or error */
655       if (err->code == G_IO_ERROR_CANCELLED) {
656         GST_INFO_OBJECT (self, "cancelled");
657         g_clear_error (&err);
658         break;
659       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
660         /* timed out, let's send another packet */
661         GST_DEBUG_OBJECT (self, "timed out");
662
663         if (self->is_ntp) {
664           GstNtpPacket *packet;
665
666           packet = gst_ntp_packet_new (NULL, NULL);
667
668           packet->transmit_time =
669               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
670
671           GST_DEBUG_OBJECT (self,
672               "sending packet, local time = %" GST_TIME_FORMAT,
673               GST_TIME_ARGS (packet->transmit_time));
674
675           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
676
677           g_free (packet);
678         } else {
679           GstNetTimePacket *packet;
680
681           packet = gst_net_time_packet_new (NULL);
682
683           packet->local_time =
684               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
685
686           GST_DEBUG_OBJECT (self,
687               "sending packet, local time = %" GST_TIME_FORMAT,
688               GST_TIME_ARGS (packet->local_time));
689
690           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
691
692           g_free (packet);
693         }
694
695         /* reset timeout (but are expecting a response sooner anyway) */
696         self->timeout_expiration =
697             gst_util_get_timestamp () +
698             gst_clock_get_timeout (GST_CLOCK_CAST (self));
699       } else {
700         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
701         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
702       }
703       g_clear_error (&err);
704     } else {
705       GstClockTime new_local;
706
707       /* got packet */
708
709       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
710
711       if (self->is_ntp) {
712         GstNtpPacket *packet;
713
714         packet = gst_ntp_packet_receive (socket, NULL, &err);
715
716         if (packet != NULL) {
717           GST_LOG_OBJECT (self, "got packet back");
718           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
719               GST_TIME_ARGS (packet->origin_time));
720           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
721               GST_TIME_ARGS (packet->receive_time));
722           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
723               GST_TIME_ARGS (packet->transmit_time));
724           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
725               GST_TIME_ARGS (new_local));
726           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
727               GST_TIME_ARGS (packet->poll_interval));
728
729           /* Remember the last poll interval we ever got from the server */
730           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
731             self->last_remote_poll_interval = packet->poll_interval;
732
733           /* observe_times will reset the timeout */
734           gst_net_client_internal_clock_observe_times (self,
735               packet->origin_time, packet->receive_time, packet->transmit_time,
736               new_local);
737
738           g_free (packet);
739         } else if (err != NULL) {
740           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
741               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
742             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
743             break;
744           } else if (g_error_matches (err, GST_NTP_ERROR,
745                   GST_NTP_ERROR_KOD_RATE)) {
746             GST_WARNING_OBJECT (self, "need to limit rate");
747
748             /* If the server did not tell us a poll interval before, double
749              * our minimum poll interval. Otherwise we assume that the server
750              * already told us something sensible and that this error here
751              * was just a spurious error */
752             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
753               self->minimum_update_interval *= 2;
754
755             /* And wait a bit before we send the next packet instead of
756              * sending it immediately */
757             self->timeout_expiration =
758                 gst_util_get_timestamp () +
759                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
760           } else {
761             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
762           }
763           g_clear_error (&err);
764         }
765       } else {
766         GstNetTimePacket *packet;
767
768         packet = gst_net_time_packet_receive (socket, NULL, &err);
769
770         if (packet != NULL) {
771           GST_LOG_OBJECT (self, "got packet back");
772           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
773               GST_TIME_ARGS (packet->local_time));
774           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
775               GST_TIME_ARGS (packet->remote_time));
776           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
777               GST_TIME_ARGS (new_local));
778
779           /* observe_times will reset the timeout */
780           gst_net_client_internal_clock_observe_times (self, packet->local_time,
781               packet->remote_time, packet->remote_time, new_local);
782
783           g_free (packet);
784         } else if (err != NULL) {
785           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
786           g_clear_error (&err);
787         }
788       }
789     }
790   }
791   GST_INFO_OBJECT (self, "shutting down net client clock thread");
792   return NULL;
793 }
794
795 static gboolean
796 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
797 {
798   GSocketAddress *servaddr;
799   GSocketAddress *myaddr;
800   GSocketAddress *anyaddr;
801   GInetAddress *inetaddr;
802   GSocket *socket;
803   GError *error = NULL;
804   GSocketFamily family;
805   GPollFD dummy_pollfd;
806   GResolver *resolver = NULL;
807   GError *err = NULL;
808
809   g_return_val_if_fail (self->address != NULL, FALSE);
810   g_return_val_if_fail (self->servaddr == NULL, FALSE);
811
812   /* create target address */
813   inetaddr = g_inet_address_new_from_string (self->address);
814   if (inetaddr == NULL) {
815     GList *results;
816
817     resolver = g_resolver_get_default ();
818
819     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
820     if (!results)
821       goto failed_to_resolve;
822
823     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
824     g_resolver_free_addresses (results);
825     g_object_unref (resolver);
826   }
827
828   family = g_inet_address_get_family (inetaddr);
829
830   servaddr = g_inet_socket_address_new (inetaddr, self->port);
831   g_object_unref (inetaddr);
832
833   g_assert (servaddr != NULL);
834
835   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
836       self->port);
837
838   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
839       G_SOCKET_PROTOCOL_UDP, &error);
840
841   if (socket == NULL)
842     goto no_socket;
843
844   GST_DEBUG_OBJECT (self, "binding socket");
845   inetaddr = g_inet_address_new_any (family);
846   anyaddr = g_inet_socket_address_new (inetaddr, 0);
847   g_socket_bind (socket, anyaddr, TRUE, &error);
848   g_object_unref (anyaddr);
849   g_object_unref (inetaddr);
850
851   if (error != NULL)
852     goto bind_error;
853
854   /* check address we're bound to, mostly for debugging purposes */
855   myaddr = g_socket_get_local_address (socket, &error);
856
857   if (myaddr == NULL)
858     goto getsockname_error;
859
860   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
861       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
862
863   g_object_unref (myaddr);
864
865   self->cancel = g_cancellable_new ();
866   self->made_cancel_fd =
867       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
868
869   self->socket = socket;
870   self->servaddr = G_SOCKET_ADDRESS (servaddr);
871
872   self->thread = g_thread_try_new ("GstNetClientInternalClock",
873       gst_net_client_internal_clock_thread, self, &error);
874
875   if (error != NULL)
876     goto no_thread;
877
878   return TRUE;
879
880   /* ERRORS */
881 no_socket:
882   {
883     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
884     g_error_free (error);
885     return FALSE;
886   }
887 bind_error:
888   {
889     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
890     g_error_free (error);
891     g_object_unref (socket);
892     return FALSE;
893   }
894 getsockname_error:
895   {
896     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
897     g_error_free (error);
898     g_object_unref (socket);
899     return FALSE;
900   }
901 failed_to_resolve:
902   {
903     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
904         self->address, err->message);
905     g_clear_error (&err);
906     g_object_unref (resolver);
907     return FALSE;
908   }
909 no_thread:
910   {
911     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
912     g_object_unref (self->servaddr);
913     self->servaddr = NULL;
914     g_object_unref (self->socket);
915     self->socket = NULL;
916     g_error_free (error);
917     return FALSE;
918   }
919 }
920
921 static void
922 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
923 {
924   if (self->thread == NULL)
925     return;
926
927   GST_INFO_OBJECT (self, "stopping...");
928   g_cancellable_cancel (self->cancel);
929
930   g_thread_join (self->thread);
931   self->thread = NULL;
932
933   if (self->made_cancel_fd)
934     g_cancellable_release_fd (self->cancel);
935
936   g_object_unref (self->cancel);
937   self->cancel = NULL;
938
939   g_object_unref (self->servaddr);
940   self->servaddr = NULL;
941
942   g_object_unref (self->socket);
943   self->socket = NULL;
944
945   GST_INFO_OBJECT (self, "stopped");
946 }
947
948 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
949   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
950
951 struct _GstNetClientClockPrivate
952 {
953   GstClock *internal_clock;
954
955   GstClockTime roundtrip_limit;
956   GstClockTime minimum_update_interval;
957
958   GstClockTime base_time;
959
960   gchar *address;
961   gint port;
962
963   GstBus *bus;
964
965   gboolean is_ntp;
966
967   gulong synced_id;
968 };
969
970 G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK);
971
972 static void gst_net_client_clock_finalize (GObject * object);
973 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
974     const GValue * value, GParamSpec * pspec);
975 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
976     GValue * value, GParamSpec * pspec);
977 static void gst_net_client_clock_constructed (GObject * object);
978
979 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
980
981 static void
982 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
983 {
984   GObjectClass *gobject_class;
985   GstClockClass *clock_class;
986
987   gobject_class = G_OBJECT_CLASS (klass);
988   clock_class = GST_CLOCK_CLASS (klass);
989
990   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
991
992   gobject_class->finalize = gst_net_client_clock_finalize;
993   gobject_class->get_property = gst_net_client_clock_get_property;
994   gobject_class->set_property = gst_net_client_clock_set_property;
995   gobject_class->constructed = gst_net_client_clock_constructed;
996
997   g_object_class_install_property (gobject_class, PROP_ADDRESS,
998       g_param_spec_string ("address", "address",
999           "The IP address of the machine providing a time server",
1000           DEFAULT_ADDRESS,
1001           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1002   g_object_class_install_property (gobject_class, PROP_PORT,
1003       g_param_spec_int ("port", "port",
1004           "The port on which the remote server is listening", 0, G_MAXUINT16,
1005           DEFAULT_PORT,
1006           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1007   g_object_class_install_property (gobject_class, PROP_BUS,
1008       g_param_spec_object ("bus", "bus",
1009           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1010           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1011
1012   /**
1013    * GstNetClientInternalClock::round-trip-limit:
1014    *
1015    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1016    * value, all packets with a round-trip interval larger than this limit will be
1017    * ignored. This is useful for networks with severe and fluctuating transport
1018    * delays. Filtering out these packets increases stability of the synchronization.
1019    * On the other hand, the lower the limit, the higher the amount of filtered
1020    * packets. Empirical tests are typically necessary to estimate a good value
1021    * for the limit.
1022    * If the property is set to zero, the limit is disabled.
1023    *
1024    * Since: 1.4
1025    */
1026   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1027       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1028           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1029           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1030           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1031
1032   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1033       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1034           "Minimum polling interval for packets, in nanoseconds"
1035           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1036           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1037
1038   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1039       g_param_spec_uint64 ("base-time", "Base Time",
1040           "Initial time that is reported before synchronization", 0,
1041           G_MAXUINT64, DEFAULT_BASE_TIME,
1042           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1043
1044   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1045       g_param_spec_object ("internal-clock", "Internal Clock",
1046           "Internal clock that directly slaved to the remote clock",
1047           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1048
1049   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1050 }
1051
1052 static void
1053 gst_net_client_clock_init (GstNetClientClock * self)
1054 {
1055   GstNetClientClockPrivate *priv;
1056
1057   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
1058
1059   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1060   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1061
1062   priv->port = DEFAULT_PORT;
1063   priv->address = g_strdup (DEFAULT_ADDRESS);
1064
1065   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1066   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1067   priv->base_time = DEFAULT_BASE_TIME;
1068 }
1069
1070 /* Must be called with clocks_lock */
1071 static void
1072 update_clock_cache (ClockCache * cache)
1073 {
1074   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1075   GList *l, *busses = NULL;
1076
1077   GST_OBJECT_LOCK (cache->clock);
1078   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1079       (GDestroyNotify) gst_object_unref);
1080
1081   for (l = cache->clocks; l; l = l->next) {
1082     GstNetClientClock *clock = l->data;
1083
1084     if (clock->priv->bus)
1085       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1086
1087     if (roundtrip_limit == 0)
1088       roundtrip_limit = clock->priv->roundtrip_limit;
1089     else
1090       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1091
1092     if (minimum_update_interval == 0)
1093       minimum_update_interval = clock->priv->minimum_update_interval;
1094     else
1095       minimum_update_interval =
1096           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1097   }
1098   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1099   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1100       roundtrip_limit;
1101   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1102       minimum_update_interval;
1103
1104   GST_OBJECT_UNLOCK (cache->clock);
1105 }
1106
1107 static gboolean
1108 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1109     gpointer user_data)
1110 {
1111   ClockCache *cache = user_data;
1112
1113   G_LOCK (clocks_lock);
1114   if (!cache->clocks) {
1115     gst_clock_id_unref (cache->remove_id);
1116     gst_object_unref (cache->clock);
1117     g_free (cache);
1118     clocks = g_list_remove (clocks, cache);
1119   }
1120   G_UNLOCK (clocks_lock);
1121
1122   return TRUE;
1123 }
1124
1125 static void
1126 gst_net_client_clock_finalize (GObject * object)
1127 {
1128   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1129   GList *l;
1130
1131   if (self->priv->synced_id)
1132     g_signal_handler_disconnect (self, self->priv->synced_id);
1133   self->priv->synced_id = 0;
1134
1135   G_LOCK (clocks_lock);
1136   for (l = clocks; l; l = l->next) {
1137     ClockCache *cache = l->data;
1138
1139     if (cache->clock == self->priv->internal_clock) {
1140       cache->clocks = g_list_remove (cache->clocks, self);
1141
1142       if (cache->clocks) {
1143         update_clock_cache (cache);
1144       } else {
1145         GstClock *sysclock = gst_system_clock_obtain ();
1146         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1147
1148         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1149         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1150             NULL);
1151         gst_object_unref (sysclock);
1152       }
1153       break;
1154     }
1155   }
1156   G_UNLOCK (clocks_lock);
1157
1158   g_free (self->priv->address);
1159   self->priv->address = NULL;
1160
1161   if (self->priv->bus != NULL) {
1162     gst_object_unref (self->priv->bus);
1163     self->priv->bus = NULL;
1164   }
1165
1166   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1167 }
1168
1169 static void
1170 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1171     const GValue * value, GParamSpec * pspec)
1172 {
1173   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1174   gboolean update = FALSE;
1175
1176   switch (prop_id) {
1177     case PROP_ADDRESS:
1178       GST_OBJECT_LOCK (self);
1179       g_free (self->priv->address);
1180       self->priv->address = g_value_dup_string (value);
1181       if (self->priv->address == NULL)
1182         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1183       GST_OBJECT_UNLOCK (self);
1184       break;
1185     case PROP_PORT:
1186       GST_OBJECT_LOCK (self);
1187       self->priv->port = g_value_get_int (value);
1188       GST_OBJECT_UNLOCK (self);
1189       break;
1190     case PROP_ROUNDTRIP_LIMIT:
1191       GST_OBJECT_LOCK (self);
1192       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1193       GST_OBJECT_UNLOCK (self);
1194       update = TRUE;
1195       break;
1196     case PROP_MINIMUM_UPDATE_INTERVAL:
1197       GST_OBJECT_LOCK (self);
1198       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1199       GST_OBJECT_UNLOCK (self);
1200       update = TRUE;
1201       break;
1202     case PROP_BUS:
1203       GST_OBJECT_LOCK (self);
1204       if (self->priv->bus)
1205         gst_object_unref (self->priv->bus);
1206       self->priv->bus = g_value_dup_object (value);
1207       GST_OBJECT_UNLOCK (self);
1208       update = TRUE;
1209       break;
1210     case PROP_BASE_TIME:
1211       self->priv->base_time = g_value_get_uint64 (value);
1212       break;
1213     default:
1214       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1215       break;
1216   }
1217
1218   if (update && self->priv->internal_clock) {
1219     GList *l;
1220
1221     G_LOCK (clocks_lock);
1222     for (l = clocks; l; l = l->next) {
1223       ClockCache *cache = l->data;
1224
1225       if (cache->clock == self->priv->internal_clock) {
1226         update_clock_cache (cache);
1227       }
1228     }
1229     G_UNLOCK (clocks_lock);
1230   }
1231 }
1232
1233 static void
1234 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1235     GValue * value, GParamSpec * pspec)
1236 {
1237   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1238
1239   switch (prop_id) {
1240     case PROP_ADDRESS:
1241       GST_OBJECT_LOCK (self);
1242       g_value_set_string (value, self->priv->address);
1243       GST_OBJECT_UNLOCK (self);
1244       break;
1245     case PROP_PORT:
1246       g_value_set_int (value, self->priv->port);
1247       break;
1248     case PROP_ROUNDTRIP_LIMIT:
1249       GST_OBJECT_LOCK (self);
1250       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1251       GST_OBJECT_UNLOCK (self);
1252       break;
1253     case PROP_MINIMUM_UPDATE_INTERVAL:
1254       GST_OBJECT_LOCK (self);
1255       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1256       GST_OBJECT_UNLOCK (self);
1257       break;
1258     case PROP_BUS:
1259       GST_OBJECT_LOCK (self);
1260       g_value_set_object (value, self->priv->bus);
1261       GST_OBJECT_UNLOCK (self);
1262       break;
1263     case PROP_BASE_TIME:
1264       g_value_set_uint64 (value, self->priv->base_time);
1265       break;
1266     case PROP_INTERNAL_CLOCK:
1267       g_value_set_object (value, self->priv->internal_clock);
1268       break;
1269     default:
1270       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1271       break;
1272   }
1273 }
1274
1275 static void
1276 gst_net_client_clocked_synced_cb (GstClock * internal_clock, gboolean synced,
1277     GstClock * self)
1278 {
1279   gst_clock_set_synced (self, synced);
1280 }
1281
1282 static void
1283 gst_net_client_clock_constructed (GObject * object)
1284 {
1285   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1286   GstClock *internal_clock;
1287   GstClockTime internal;
1288   GList *l;
1289   ClockCache *cache = NULL;
1290
1291   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1292
1293   G_LOCK (clocks_lock);
1294   for (l = clocks; l; l = l->next) {
1295     ClockCache *tmp = l->data;
1296     GstNetClientInternalClock *internal_clock =
1297         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1298
1299     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1300         internal_clock->port == self->priv->port) {
1301       cache = tmp;
1302
1303       if (cache->remove_id) {
1304         gst_clock_id_unschedule (cache->remove_id);
1305         cache->remove_id = NULL;
1306       }
1307       break;
1308     }
1309   }
1310
1311   if (!cache) {
1312     cache = g_new0 (ClockCache, 1);
1313
1314     cache->clock =
1315         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1316         self->priv->address, "port", self->priv->port, "is-ntp",
1317         self->priv->is_ntp, NULL);
1318     clocks = g_list_prepend (clocks, cache);
1319   }
1320
1321   cache->clocks = g_list_prepend (cache->clocks, self);
1322
1323   GST_OBJECT_LOCK (cache->clock);
1324   if (gst_clock_is_synced (cache->clock))
1325     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1326   self->priv->synced_id =
1327       g_signal_connect (cache->clock, "synced",
1328       G_CALLBACK (gst_net_client_clocked_synced_cb), self);
1329   GST_OBJECT_UNLOCK (cache->clock);
1330
1331   G_UNLOCK (clocks_lock);
1332
1333   self->priv->internal_clock = internal_clock = cache->clock;
1334
1335   /* gst_clock_get_time() values are guaranteed to be increasing. because no one
1336    * has called get_time on this clock yet we are free to adjust to any value
1337    * without worrying about worrying about MAX() issues with the clock's
1338    * internal time.
1339    */
1340
1341   /* update our internal time so get_time() give something around base_time.
1342      assume that the rate is 1 in the beginning. */
1343   internal = gst_clock_get_internal_time (internal_clock);
1344   gst_clock_set_calibration (internal_clock, internal,
1345       self->priv->base_time, 1, 1);
1346
1347   {
1348     GstClockTime now = gst_clock_get_time (internal_clock);
1349
1350     if (GST_CLOCK_DIFF (now, self->priv->base_time) > 0 ||
1351         GST_CLOCK_DIFF (now, self->priv->base_time + GST_SECOND) < 0) {
1352       g_warning ("unable to set the base time, expect sync problems!");
1353     }
1354   }
1355
1356   /* all systems go, cap'n */
1357 }
1358
1359 static GstClockTime
1360 gst_net_client_clock_get_internal_time (GstClock * clock)
1361 {
1362   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1363
1364   return gst_clock_get_time (self->priv->internal_clock);
1365 }
1366
1367 /**
1368  * gst_net_client_clock_new:
1369  * @name: a name for the clock
1370  * @remote_address: the address or hostname of the remote clock provider
1371  * @remote_port: the port of the remote clock provider
1372  * @base_time: initial time of the clock
1373  *
1374  * Create a new #GstNetClientInternalClock that will report the time
1375  * provided by the #GstNetTimeProvider on @remote_address and 
1376  * @remote_port.
1377  *
1378  * Returns: a new #GstClock that receives a time from the remote
1379  * clock.
1380  */
1381 GstClock *
1382 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1383     gint remote_port, GstClockTime base_time)
1384 {
1385   GstClock *ret;
1386
1387   g_return_val_if_fail (remote_address != NULL, NULL);
1388   g_return_val_if_fail (remote_port > 0, NULL);
1389   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1390   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1391
1392   ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
1393       "port", remote_port, "base-time", base_time, NULL);
1394
1395   return ret;
1396 }
1397
1398 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_SYSTEM_CLOCK);
1399
1400 static void
1401 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1402 {
1403 }
1404
1405 static void
1406 gst_ntp_clock_init (GstNtpClock * self)
1407 {
1408   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1409 }
1410
1411 /**
1412  * gst_ntp_clock_new:
1413  * @name: a name for the clock
1414  * @remote_address: the address or hostname of the remote clock provider
1415  * @remote_port: the port of the remote clock provider
1416  * @base_time: initial time of the clock
1417  *
1418  * Create a new #GstNtpClock that will report the time provided by
1419  * the NTPv4 server on @remote_address and @remote_port.
1420  *
1421  * Returns: a new #GstClock that receives a time from the remote
1422  * clock.
1423  *
1424  * Since: 1.6
1425  */
1426 GstClock *
1427 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1428     gint remote_port, GstClockTime base_time)
1429 {
1430   GstClock *ret;
1431
1432   g_return_val_if_fail (remote_address != NULL, NULL);
1433   g_return_val_if_fail (remote_port > 0, NULL);
1434   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1435   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1436
1437   ret = g_object_new (GST_TYPE_NTP_CLOCK, "address", remote_address,
1438       "port", remote_port, "base-time", base_time, NULL);
1439
1440   return ret;
1441 }