netclock: Make the RTT average ignore large values more forcefully.
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  *
7  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
8  * the network
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 /**
26  * SECTION:gstnetclientclock
27  * @short_description: Special clock that synchronizes to a remote time
28  *                     provider.
29  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
30  *
31  * This object implements a custom #GstClock that synchronizes its time
32  * to a remote time provider such as #GstNetTimeProvider.
33  *
34  * A new clock is created with gst_net_client_clock_new() which takes the
35  * address and port of the remote time provider along with a name and
36  * an initial time.
37  *
38  * This clock will poll the time provider and will update its calibration
39  * parameters based on the local and remote observations.
40  *
41  * The "round-trip" property limits the maximum round trip packets can take.
42  *
43  * Various parameters of the clock can be configured with the parent #GstClock
44  * "timeout", "window-size" and "window-threshold" object properties.
45  *
46  * A #GstNetClientClock is typically set on a #GstPipeline with 
47  * gst_pipeline_use_clock().
48  *
49  * If you set a #GstBus on the clock via the "bus" object property, it will
50  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
51  * statistics about clock accuracy and network traffic.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstnettimepacket.h"
59 #include "gstnetclientclock.h"
60
61 #include <gio/gio.h>
62
63 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
64 #define GST_CAT_DEFAULT (ncc_debug)
65
66 #define DEFAULT_ADDRESS         "127.0.0.1"
67 #define DEFAULT_PORT            5637
68 #define DEFAULT_TIMEOUT         GST_SECOND
69 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
70 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
71  * more often than 1/20th second (arbitrarily, to spread observations a little) */
72 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
73
74 enum
75 {
76   PROP_0,
77   PROP_ADDRESS,
78   PROP_PORT,
79   PROP_ROUNDTRIP_LIMIT,
80   PROP_MINIMUM_UPDATE_INTERVAL,
81   PROP_BUS
82 };
83
84 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
85   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
86
87 struct _GstNetClientClockPrivate
88 {
89   GThread *thread;
90
91   GSocket *socket;
92   GSocketAddress *servaddr;
93   GCancellable *cancel;
94
95   GstClockTime timeout_expiration;
96   GstClockTime roundtrip_limit;
97   GstClockTime rtt_avg;
98   GstClockTime minimum_update_interval;
99
100   gchar *address;
101   gint port;
102
103   GstBus *bus;
104 };
105
106 #define _do_init \
107   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
108 #define gst_net_client_clock_parent_class parent_class
109 G_DEFINE_TYPE_WITH_CODE (GstNetClientClock, gst_net_client_clock,
110     GST_TYPE_SYSTEM_CLOCK, _do_init);
111
112 static void gst_net_client_clock_finalize (GObject * object);
113 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
114     const GValue * value, GParamSpec * pspec);
115 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
116     GValue * value, GParamSpec * pspec);
117
118 static void gst_net_client_clock_stop (GstNetClientClock * self);
119
120 static void
121 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
122 {
123   GObjectClass *gobject_class;
124
125   gobject_class = G_OBJECT_CLASS (klass);
126
127   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
128
129   gobject_class->finalize = gst_net_client_clock_finalize;
130   gobject_class->get_property = gst_net_client_clock_get_property;
131   gobject_class->set_property = gst_net_client_clock_set_property;
132
133   g_object_class_install_property (gobject_class, PROP_ADDRESS,
134       g_param_spec_string ("address", "address",
135           "The IP address of the machine providing a time server",
136           DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137   g_object_class_install_property (gobject_class, PROP_PORT,
138       g_param_spec_int ("port", "port",
139           "The port on which the remote server is listening", 0, G_MAXUINT16,
140           DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
141   g_object_class_install_property (gobject_class, PROP_BUS,
142       g_param_spec_object ("bus", "bus",
143           "A GstBus on which to send clock status information", GST_TYPE_BUS,
144           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
145
146   /**
147    * GstNetClientClock::round-trip-limit:
148    *
149    * Maximum allowed round-trip for packets. If this property is set to a nonzero
150    * value, all packets with a round-trip interval larger than this limit will be
151    * ignored. This is useful for networks with severe and fluctuating transport
152    * delays. Filtering out these packets increases stability of the synchronization.
153    * On the other hand, the lower the limit, the higher the amount of filtered
154    * packets. Empirical tests are typically necessary to estimate a good value
155    * for the limit.
156    * If the property is set to zero, the limit is disabled.
157    *
158    * Since: 1.4
159    */
160   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
161       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
162           "Maximum tolerable round-trip interval for packets, in nanoseconds "
163           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
164           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
165
166   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
167       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
168           "Minimum polling interval for packets, in nanoseconds"
169           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
170           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171 }
172
173 static void
174 gst_net_client_clock_init (GstNetClientClock * self)
175 {
176   GstClock *clock = GST_CLOCK_CAST (self);
177   GstNetClientClockPrivate *priv;
178
179   self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
180
181   priv->port = DEFAULT_PORT;
182   priv->address = g_strdup (DEFAULT_ADDRESS);
183
184   gst_clock_set_timeout (clock, DEFAULT_TIMEOUT);
185
186   priv->thread = NULL;
187
188   priv->servaddr = NULL;
189   priv->rtt_avg = GST_CLOCK_TIME_NONE;
190   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
191   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
192 }
193
194 static void
195 gst_net_client_clock_finalize (GObject * object)
196 {
197   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
198
199   if (self->priv->thread) {
200     gst_net_client_clock_stop (self);
201   }
202
203   g_free (self->priv->address);
204   self->priv->address = NULL;
205
206   if (self->priv->servaddr != NULL) {
207     g_object_unref (self->priv->servaddr);
208     self->priv->servaddr = NULL;
209   }
210
211   if (self->priv->socket != NULL) {
212     g_socket_close (self->priv->socket, NULL);
213     g_object_unref (self->priv->socket);
214     self->priv->socket = NULL;
215   }
216
217   if (self->priv->bus != NULL) {
218     gst_object_unref (self->priv->bus);
219     self->priv->bus = NULL;
220   }
221
222   G_OBJECT_CLASS (parent_class)->finalize (object);
223 }
224
225 static void
226 gst_net_client_clock_set_property (GObject * object, guint prop_id,
227     const GValue * value, GParamSpec * pspec)
228 {
229   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
230
231   switch (prop_id) {
232     case PROP_ADDRESS:
233       GST_OBJECT_LOCK (self);
234       g_free (self->priv->address);
235       self->priv->address = g_value_dup_string (value);
236       if (self->priv->address == NULL)
237         self->priv->address = g_strdup (DEFAULT_ADDRESS);
238       GST_OBJECT_UNLOCK (self);
239       break;
240     case PROP_PORT:
241       GST_OBJECT_LOCK (self);
242       self->priv->port = g_value_get_int (value);
243       GST_OBJECT_UNLOCK (self);
244       break;
245     case PROP_ROUNDTRIP_LIMIT:
246       GST_OBJECT_LOCK (self);
247       self->priv->roundtrip_limit = g_value_get_uint64 (value);
248       GST_OBJECT_UNLOCK (self);
249       break;
250     case PROP_MINIMUM_UPDATE_INTERVAL:
251       GST_OBJECT_LOCK (self);
252       self->priv->minimum_update_interval = g_value_get_uint64 (value);
253       GST_OBJECT_UNLOCK (self);
254       break;
255     case PROP_BUS:
256       GST_OBJECT_LOCK (self);
257       if (self->priv->bus)
258         gst_object_unref (self->priv->bus);
259       self->priv->bus = g_value_dup_object (value);
260       GST_OBJECT_UNLOCK (self);
261       break;
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264       break;
265   }
266 }
267
268 static void
269 gst_net_client_clock_get_property (GObject * object, guint prop_id,
270     GValue * value, GParamSpec * pspec)
271 {
272   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
273
274   switch (prop_id) {
275     case PROP_ADDRESS:
276       GST_OBJECT_LOCK (self);
277       g_value_set_string (value, self->priv->address);
278       GST_OBJECT_UNLOCK (self);
279       break;
280     case PROP_PORT:
281       g_value_set_int (value, self->priv->port);
282       break;
283     case PROP_ROUNDTRIP_LIMIT:
284       GST_OBJECT_LOCK (self);
285       g_value_set_uint64 (value, self->priv->roundtrip_limit);
286       GST_OBJECT_UNLOCK (self);
287       break;
288     case PROP_MINIMUM_UPDATE_INTERVAL:
289       GST_OBJECT_LOCK (self);
290       g_value_set_uint64 (value, self->priv->minimum_update_interval);
291       GST_OBJECT_UNLOCK (self);
292       break;
293     case PROP_BUS:
294       GST_OBJECT_LOCK (self);
295       g_value_set_object (value, self->priv->bus);
296       GST_OBJECT_UNLOCK (self);
297       break;
298     default:
299       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
300       break;
301   }
302 }
303
304 static void
305 gst_net_client_clock_observe_times (GstNetClientClock * self,
306     GstClockTime local_1, GstClockTime remote, GstClockTime local_2)
307 {
308   GstNetClientClockPrivate *priv = self->priv;
309   GstClockTime current_timeout;
310   GstClockTime local_avg;
311   gdouble r_squared;
312   GstClock *clock;
313   GstClockTime rtt, rtt_limit, min_update_interval;
314   GstBus *bus = NULL;
315   /* Use for discont tracking */
316   GstClockTime time_before = 0;
317   GstClockTime min_guess = 0;
318   GstClockTimeDiff time_discont = 0;
319   gboolean synched;
320   GstClockTime internal_time, external_time, rate_num, rate_den;
321   GstClockTime max_discont;
322
323   GST_OBJECT_LOCK (self);
324   rtt_limit = self->priv->roundtrip_limit;
325   min_update_interval = self->priv->minimum_update_interval;
326   if (self->priv->bus)
327     bus = gst_object_ref (self->priv->bus);
328   GST_OBJECT_UNLOCK (self);
329
330   if (local_2 < local_1) {
331     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
332         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
333         GST_TIME_ARGS (local_2));
334     goto bogus_observation;
335   }
336
337   rtt = GST_CLOCK_DIFF (local_1, local_2);
338
339   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
340     GST_LOG_OBJECT (self,
341         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
342         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
343     goto bogus_observation;
344   }
345
346   /* Track an average round trip time, for a bit of smoothing */
347   /* Always update before discarding a sample, so genuine changes in
348    * the network get picked up, eventually */
349   if (priv->rtt_avg == GST_CLOCK_TIME_NONE)
350     priv->rtt_avg = rtt;
351   else if (rtt < priv->rtt_avg) /* Shorter RTTs carry more weight than longer */
352     priv->rtt_avg = (3 * priv->rtt_avg + rtt) / 4;
353   else
354     priv->rtt_avg = (15 * priv->rtt_avg + rtt) / 16;
355
356   if (rtt > 2 * priv->rtt_avg) {
357     GST_LOG_OBJECT (self,
358         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
359         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (priv->rtt_avg));
360     goto bogus_observation;
361   }
362
363   local_avg = (local_2 + local_1) / 2;
364
365   GST_LOG_OBJECT (self, "local1 %" G_GUINT64_FORMAT " remote %" G_GUINT64_FORMAT
366       " localavg %" G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT,
367       local_1, remote, local_avg, local_2);
368
369   clock = GST_CLOCK_CAST (self);
370
371   /* Store what the clock produced as 'now' before this update */
372   gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time,
373       &rate_num, &rate_den);
374
375   min_guess =
376       gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1,
377       internal_time, external_time, rate_num, rate_den);
378   time_before =
379       gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
380       internal_time, external_time, rate_num, rate_den);
381
382   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
383    * but this value seems to work fine */
384   max_discont = priv->rtt_avg / 4;
385
386   /* If the remote observation was within 1/4 RTT of our min/max estimates, we're synched */
387   synched =
388       (GST_CLOCK_DIFF (remote, min_guess) < (GstClockTimeDiff) (max_discont)
389       && GST_CLOCK_DIFF (time_before,
390           remote) < (GstClockTimeDiff) (max_discont));
391
392   if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote,
393           &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) {
394
395     /* Now compare the difference (discont) in the clock
396      * after this observation */
397     time_discont = GST_CLOCK_DIFF (time_before,
398         gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
399             internal_time, external_time, rate_num, rate_den));
400
401     /* If we were in sync with the remote clock, clamp the allowed
402      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
403      * of remote time correctly windowed the actual remote time observation */
404     if (synched && ABS (time_discont) > max_discont) {
405       GstClockTimeDiff offset;
406       GST_DEBUG_OBJECT (clock,
407           "Too large a discont, clamping to 1/4 average RTT = %"
408           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
409       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
410         offset = max_discont - time_discont;
411         if (-offset > external_time)
412           external_time = 0;
413         else
414           external_time += offset;
415       } else {                  /* Too large a backward step - add a +ve offset */
416         offset = -(max_discont + time_discont);
417         external_time += offset;
418       }
419
420       time_discont += offset;
421     }
422
423     gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time,
424         rate_num, rate_den);
425
426     /* ghetto formula - shorter timeout for bad correlations */
427     current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
428     current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
429   } else {
430     /* No correlation, short timeout when starting up or lost sync */
431     current_timeout = 0;
432   }
433
434   /* Limit the polling to at most one per minimum_update_interval */
435   if (rtt < min_update_interval)
436     current_timeout = MAX (min_update_interval - rtt, current_timeout);
437
438   if (bus) {
439     GstStructure *s;
440     GstMessage *msg;
441
442     /* Output a stats message, whether we updated the clock or not */
443     s = gst_structure_new ("gst-netclock-statistics",
444         "synchronised", G_TYPE_BOOLEAN, synched,
445         "rtt", G_TYPE_UINT64, rtt,
446         "rtt-average", G_TYPE_UINT64, priv->rtt_avg,
447         "local", G_TYPE_UINT64, local_avg,
448         "remote", G_TYPE_UINT64, remote,
449         "discontinuity", G_TYPE_INT64, time_discont,
450         "remote-min-estimate", G_TYPE_UINT64, min_guess,
451         "remote-max-estimate", G_TYPE_UINT64, time_before,
452         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess),
453         "remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before),
454         "request-send", G_TYPE_UINT64, local_1,
455         "request-receive", G_TYPE_UINT64, local_2,
456         "r-squared", G_TYPE_DOUBLE, r_squared,
457         "timeout", G_TYPE_UINT64, current_timeout,
458         "internal-time", G_TYPE_UINT64, internal_time,
459         "external-time", G_TYPE_UINT64, external_time,
460         "rate-num", G_TYPE_UINT64, rate_num,
461         "rate-den", G_TYPE_UINT64, rate_den,
462         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
463         "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time,
464             external_time), NULL);
465     msg = gst_message_new_element (GST_OBJECT (self), s);
466     gst_bus_post (bus, msg);
467   }
468
469   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
470   self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
471
472   if (bus)
473     gst_object_unref (bus);
474   return;
475
476 bogus_observation:
477   if (bus)
478     gst_object_unref (bus);
479   /* Schedule a new packet again soon */
480   self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
481   return;
482 }
483
484 static gpointer
485 gst_net_client_clock_thread (gpointer data)
486 {
487   GstNetClientClock *self = data;
488   GstNetTimePacket *packet;
489   GSocket *socket = self->priv->socket;
490   GError *err = NULL;
491   GstClock *clock = data;
492
493   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
494
495   g_socket_set_blocking (socket, TRUE);
496   g_socket_set_timeout (socket, 0);
497
498   while (!g_cancellable_is_cancelled (self->priv->cancel)) {
499     GstClockTime expiration_time = self->priv->timeout_expiration;
500     GstClockTime now = gst_util_get_timestamp ();
501     gint64 socket_timeout;
502
503     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
504       socket_timeout = 0;
505     } else {
506       socket_timeout = (expiration_time - now) / GST_USECOND;
507     }
508
509     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
510
511     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
512             self->priv->cancel, &err)) {
513       /* cancelled, timeout or error */
514       if (err->code == G_IO_ERROR_CANCELLED) {
515         GST_INFO_OBJECT (self, "cancelled");
516         g_clear_error (&err);
517         break;
518       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
519         /* timed out, let's send another packet */
520         GST_DEBUG_OBJECT (self, "timed out");
521
522         packet = gst_net_time_packet_new (NULL);
523
524         packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
525
526         GST_DEBUG_OBJECT (self,
527             "sending packet, local time = %" GST_TIME_FORMAT,
528             GST_TIME_ARGS (packet->local_time));
529
530         gst_net_time_packet_send (packet, self->priv->socket,
531             self->priv->servaddr, NULL);
532
533         g_free (packet);
534
535         /* reset timeout (but are expecting a response sooner anyway) */
536         self->priv->timeout_expiration =
537             gst_util_get_timestamp () + gst_clock_get_timeout (clock);
538       } else {
539         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
540         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
541       }
542       g_clear_error (&err);
543     } else {
544       GstClockTime new_local;
545
546       /* got packet */
547
548       new_local = gst_clock_get_internal_time (GST_CLOCK (self));
549
550       packet = gst_net_time_packet_receive (socket, NULL, &err);
551
552       if (packet != NULL) {
553         GST_LOG_OBJECT (self, "got packet back");
554         GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
555             GST_TIME_ARGS (packet->local_time));
556         GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
557             GST_TIME_ARGS (packet->remote_time));
558         GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
559             GST_TIME_ARGS (new_local));
560
561         /* observe_times will reset the timeout */
562         gst_net_client_clock_observe_times (self, packet->local_time,
563             packet->remote_time, new_local);
564
565         g_free (packet);
566       } else if (err != NULL) {
567         GST_WARNING_OBJECT (self, "receive error: %s", err->message);
568         g_clear_error (&err);
569       }
570     }
571   }
572   GST_INFO_OBJECT (self, "shutting down net client clock thread");
573   return NULL;
574 }
575
576 static gboolean
577 gst_net_client_clock_start (GstNetClientClock * self)
578 {
579   GSocketAddress *servaddr;
580   GSocketAddress *myaddr;
581   GSocketAddress *anyaddr;
582   GInetAddress *inetaddr;
583   GSocket *socket;
584   GError *error = NULL;
585   GSocketFamily family;
586
587   g_return_val_if_fail (self->priv->address != NULL, FALSE);
588   g_return_val_if_fail (self->priv->servaddr == NULL, FALSE);
589
590   /* create target address */
591   inetaddr = g_inet_address_new_from_string (self->priv->address);
592   if (inetaddr == NULL)
593     goto bad_address;
594
595   family = g_inet_address_get_family (inetaddr);
596
597   servaddr = g_inet_socket_address_new (inetaddr, self->priv->port);
598   g_object_unref (inetaddr);
599
600   g_assert (servaddr != NULL);
601
602   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address,
603       self->priv->port);
604
605   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
606       G_SOCKET_PROTOCOL_UDP, &error);
607
608   if (socket == NULL)
609     goto no_socket;
610
611   GST_DEBUG_OBJECT (self, "binding socket");
612   inetaddr = g_inet_address_new_any (family);
613   anyaddr = g_inet_socket_address_new (inetaddr, 0);
614   g_socket_bind (socket, anyaddr, TRUE, &error);
615   g_object_unref (anyaddr);
616   g_object_unref (inetaddr);
617
618   if (error != NULL)
619     goto bind_error;
620
621   /* check address we're bound to, mostly for debugging purposes */
622   myaddr = g_socket_get_local_address (socket, &error);
623
624   if (myaddr == NULL)
625     goto getsockname_error;
626
627   GST_DEBUG_OBJECT (self, "socket opened on UDP port %hd",
628       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
629
630   g_object_unref (myaddr);
631
632   self->priv->cancel = g_cancellable_new ();
633   self->priv->socket = socket;
634   self->priv->servaddr = G_SOCKET_ADDRESS (servaddr);
635
636   self->priv->thread = g_thread_try_new ("GstNetClientClock",
637       gst_net_client_clock_thread, self, &error);
638
639   if (error != NULL)
640     goto no_thread;
641
642   return TRUE;
643
644   /* ERRORS */
645 no_socket:
646   {
647     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
648     g_error_free (error);
649     return FALSE;
650   }
651 bind_error:
652   {
653     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
654     g_error_free (error);
655     g_object_unref (socket);
656     return FALSE;
657   }
658 getsockname_error:
659   {
660     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
661     g_error_free (error);
662     g_object_unref (socket);
663     return FALSE;
664   }
665 bad_address:
666   {
667     GST_ERROR_OBJECT (self, "inet_address_new_from_string('%s') failed",
668         self->priv->address);
669     return FALSE;
670   }
671 no_thread:
672   {
673     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
674     g_object_unref (self->priv->servaddr);
675     self->priv->servaddr = NULL;
676     g_object_unref (self->priv->socket);
677     self->priv->socket = NULL;
678     g_error_free (error);
679     return FALSE;
680   }
681 }
682
683 static void
684 gst_net_client_clock_stop (GstNetClientClock * self)
685 {
686   if (self->priv->thread == NULL)
687     return;
688
689   GST_INFO_OBJECT (self, "stopping...");
690   g_cancellable_cancel (self->priv->cancel);
691
692   g_thread_join (self->priv->thread);
693   self->priv->thread = NULL;
694
695   g_object_unref (self->priv->cancel);
696   self->priv->cancel = NULL;
697
698   g_object_unref (self->priv->servaddr);
699   self->priv->servaddr = NULL;
700
701   g_object_unref (self->priv->socket);
702   self->priv->socket = NULL;
703
704   GST_INFO_OBJECT (self, "stopped");
705 }
706
707 /**
708  * gst_net_client_clock_new:
709  * @name: a name for the clock
710  * @remote_address: the address of the remote clock provider
711  * @remote_port: the port of the remote clock provider
712  * @base_time: initial time of the clock
713  *
714  * Create a new #GstNetClientClock that will report the time
715  * provided by the #GstNetTimeProvider on @remote_address and 
716  * @remote_port.
717  *
718  * Returns: a new #GstClock that receives a time from the remote
719  * clock.
720  */
721 GstClock *
722 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
723     gint remote_port, GstClockTime base_time)
724 {
725   /* FIXME: gst_net_client_clock_new() should be a thin wrapper for g_object_new() */
726   GstNetClientClock *ret;
727   GstClockTime internal;
728
729   g_return_val_if_fail (remote_address != NULL, NULL);
730   g_return_val_if_fail (remote_port > 0, NULL);
731   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
732   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
733
734   ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
735       "port", remote_port, NULL);
736
737   /* gst_clock_get_time() values are guaranteed to be increasing. because no one
738    * has called get_time on this clock yet we are free to adjust to any value
739    * without worrying about worrying about MAX() issues with the clock's
740    * internal time.
741    */
742
743   /* update our internal time so get_time() give something around base_time.
744      assume that the rate is 1 in the beginning. */
745   internal = gst_clock_get_internal_time (GST_CLOCK (ret));
746   gst_clock_set_calibration (GST_CLOCK (ret), internal, base_time, 1, 1);
747
748   {
749     GstClockTime now = gst_clock_get_time (GST_CLOCK (ret));
750
751     if (GST_CLOCK_DIFF (now, base_time) > 0 ||
752         GST_CLOCK_DIFF (now, base_time + GST_SECOND) < 0) {
753       g_warning ("unable to set the base time, expect sync problems!");
754     }
755   }
756
757   if (!gst_net_client_clock_start (ret))
758     goto failed_start;
759
760   /* all systems go, cap'n */
761   return (GstClock *) ret;
762
763 failed_start:
764   {
765     /* already printed a nice error */
766     gst_object_unref (ret);
767     return NULL;
768   }
769 }