2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2005 Andy Wingo <wingo@pobox.com>
5 * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
7 * gstnetclientclock.h: clock that synchronizes itself to a time provider over
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
26 * SECTION:gstnetclientclock
27 * @short_description: Special clock that synchronizes to a remote time
29 * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
31 * This object implements a custom #GstClock that synchronizes its time
32 * to a remote time provider such as #GstNetTimeProvider.
34 * A new clock is created with gst_net_client_clock_new() which takes the
35 * address and port of the remote time provider along with a name and
38 * This clock will poll the time provider and will update its calibration
39 * parameters based on the local and remote observations.
41 * Various parameters of the clock can be configured with the parent #GstClock
42 * "timeout", "window-size" and "window-threshold" object properties.
44 * A #GstNetClientClock is typically set on a #GstPipeline with
45 * gst_pipeline_use_clock().
47 * Last reviewed on 2005-11-23 (0.9.5)
54 #include "gstnettimepacket.h"
55 #include "gstnetclientclock.h"
59 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
60 #define GST_CAT_DEFAULT (ncc_debug)
62 #define DEFAULT_ADDRESS "127.0.0.1"
63 #define DEFAULT_PORT 5637
64 #define DEFAULT_TIMEOUT GST_SECOND
73 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \
74 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
76 struct _GstNetClientClockPrivate
81 GSocketAddress *servaddr;
84 GstClockTime timeout_expiration;
92 GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
93 #define gst_net_client_clock_parent_class parent_class
94 G_DEFINE_TYPE_WITH_CODE (GstNetClientClock, gst_net_client_clock,
95 GST_TYPE_SYSTEM_CLOCK, _do_init);
97 static void gst_net_client_clock_finalize (GObject * object);
98 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
99 const GValue * value, GParamSpec * pspec);
100 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
101 GValue * value, GParamSpec * pspec);
103 static void gst_net_client_clock_stop (GstNetClientClock * self);
106 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
108 GObjectClass *gobject_class;
110 gobject_class = G_OBJECT_CLASS (klass);
112 g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
114 gobject_class->finalize = gst_net_client_clock_finalize;
115 gobject_class->get_property = gst_net_client_clock_get_property;
116 gobject_class->set_property = gst_net_client_clock_set_property;
118 g_object_class_install_property (gobject_class, PROP_ADDRESS,
119 g_param_spec_string ("address", "address",
120 "The IP address of the machine providing a time server",
121 DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
122 g_object_class_install_property (gobject_class, PROP_PORT,
123 g_param_spec_int ("port", "port",
124 "The port on which the remote server is listening", 0, G_MAXUINT16,
125 DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
129 gst_net_client_clock_init (GstNetClientClock * self)
131 GstClock *clock = GST_CLOCK_CAST (self);
132 GstNetClientClockPrivate *priv;
134 self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
136 priv->port = DEFAULT_PORT;
137 priv->address = g_strdup (DEFAULT_ADDRESS);
139 gst_clock_set_timeout (clock, DEFAULT_TIMEOUT);
143 priv->servaddr = NULL;
144 priv->rtt_avg = GST_CLOCK_TIME_NONE;
148 gst_net_client_clock_finalize (GObject * object)
150 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
152 if (self->priv->thread) {
153 gst_net_client_clock_stop (self);
156 g_free (self->priv->address);
157 self->priv->address = NULL;
159 if (self->priv->servaddr != NULL) {
160 g_object_unref (self->priv->servaddr);
161 self->priv->servaddr = NULL;
164 if (self->priv->socket != NULL) {
165 g_socket_close (self->priv->socket, NULL);
166 g_object_unref (self->priv->socket);
167 self->priv->socket = NULL;
170 G_OBJECT_CLASS (parent_class)->finalize (object);
174 gst_net_client_clock_set_property (GObject * object, guint prop_id,
175 const GValue * value, GParamSpec * pspec)
177 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
181 g_free (self->priv->address);
182 self->priv->address = g_value_dup_string (value);
183 if (self->priv->address == NULL)
184 self->priv->address = g_strdup (DEFAULT_ADDRESS);
187 self->priv->port = g_value_get_int (value);
190 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
196 gst_net_client_clock_get_property (GObject * object, guint prop_id,
197 GValue * value, GParamSpec * pspec)
199 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
203 g_value_set_string (value, self->priv->address);
206 g_value_set_int (value, self->priv->port);
209 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
215 gst_net_client_clock_observe_times (GstNetClientClock * self,
216 GstClockTime local_1, GstClockTime remote, GstClockTime local_2)
218 GstNetClientClockPrivate *priv = self->priv;
219 GstClockTime current_timeout;
220 GstClockTime local_avg;
225 if (local_2 < local_1)
226 goto bogus_observation;
228 rtt = local_2 - local_1;
230 /* Track an average round trip time, for a bit of smoothing */
231 /* Always update before discarding a sample, so genuine changes in
232 * the network get picked up, eventually */
233 if (priv->rtt_avg == GST_CLOCK_TIME_NONE)
235 else if (rtt < priv->rtt_avg) // Shorter RTTs carry more weight than longer
236 priv->rtt_avg = (3 * priv->rtt_avg + rtt) / 4;
238 priv->rtt_avg = (7 * priv->rtt_avg + rtt) / 8;
240 if (rtt > 2 * priv->rtt_avg) {
241 GST_LOG_OBJECT (self,
242 "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
243 GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (priv->rtt_avg));
244 goto bogus_observation;
247 local_avg = (local_2 + local_1) / 2;
249 GST_LOG_OBJECT (self, "local1 %" G_GUINT64_FORMAT " remote %" G_GUINT64_FORMAT
250 " localavg %" G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT,
251 local_1, remote, local_avg, local_2);
253 clock = GST_CLOCK_CAST (self);
255 if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote,
257 /* ghetto formula - shorter timeout for bad correlations */
258 current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
259 current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
264 GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
265 self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
271 GST_WARNING_OBJECT (self, "time packet receive time < send time (%"
272 GST_TIME_FORMAT " < %" GST_TIME_FORMAT ") or too large",
273 GST_TIME_ARGS (local_1), GST_TIME_ARGS (local_2));
274 /* Schedule a new packet again soon */
275 self->priv->timeout_expiration =
276 gst_util_get_timestamp () + (GST_SECOND / 4);
282 gst_net_client_clock_thread (gpointer data)
284 GstNetClientClock *self = data;
285 GstNetTimePacket *packet;
286 GSocket *socket = self->priv->socket;
288 GstClock *clock = data;
290 GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
292 g_socket_set_blocking (socket, TRUE);
293 g_socket_set_timeout (socket, 0);
295 while (!g_cancellable_is_cancelled (self->priv->cancel)) {
296 GstClockTime expiration_time = self->priv->timeout_expiration;
297 GstClockTime now = gst_util_get_timestamp ();
298 gint64 socket_timeout;
300 if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
303 socket_timeout = (expiration_time - now) / GST_USECOND;
306 GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
308 if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
309 self->priv->cancel, &err)) {
310 /* cancelled, timeout or error */
311 if (err->code == G_IO_ERROR_CANCELLED) {
312 GST_INFO_OBJECT (self, "cancelled");
313 g_clear_error (&err);
315 } else if (err->code == G_IO_ERROR_TIMED_OUT) {
316 /* timed out, let's send another packet */
317 GST_DEBUG_OBJECT (self, "timed out");
319 packet = gst_net_time_packet_new (NULL);
321 packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
323 GST_DEBUG_OBJECT (self,
324 "sending packet, local time = %" GST_TIME_FORMAT,
325 GST_TIME_ARGS (packet->local_time));
327 gst_net_time_packet_send (packet, self->priv->socket,
328 self->priv->servaddr, NULL);
332 /* reset timeout (but are expecting a response sooner anyway) */
333 self->priv->timeout_expiration =
334 gst_util_get_timestamp () + gst_clock_get_timeout (clock);
336 GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
337 g_usleep (G_USEC_PER_SEC / 10); /* throttle */
339 g_clear_error (&err);
341 GstClockTime new_local;
345 new_local = gst_clock_get_internal_time (GST_CLOCK (self));
347 packet = gst_net_time_packet_receive (socket, NULL, &err);
349 if (packet != NULL) {
350 GST_LOG_OBJECT (self, "got packet back");
351 GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
352 GST_TIME_ARGS (packet->local_time));
353 GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
354 GST_TIME_ARGS (packet->remote_time));
355 GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
356 GST_TIME_ARGS (new_local));
358 /* observe_times will reset the timeout */
359 gst_net_client_clock_observe_times (self, packet->local_time,
360 packet->remote_time, new_local);
363 } else if (err != NULL) {
364 GST_WARNING_OBJECT (self, "receive error: %s", err->message);
365 g_clear_error (&err);
370 GST_INFO_OBJECT (self, "shutting down net client clock thread");
375 gst_net_client_clock_start (GstNetClientClock * self)
377 GSocketAddress *servaddr;
378 GSocketAddress *myaddr;
379 GSocketAddress *anyaddr;
380 GInetAddress *inetaddr;
382 GError *error = NULL;
383 GSocketFamily family;
385 g_return_val_if_fail (self->priv->address != NULL, FALSE);
386 g_return_val_if_fail (self->priv->servaddr == NULL, FALSE);
388 /* create target address */
389 inetaddr = g_inet_address_new_from_string (self->priv->address);
390 if (inetaddr == NULL)
393 family = g_inet_address_get_family (inetaddr);
395 servaddr = g_inet_socket_address_new (inetaddr, self->priv->port);
396 g_object_unref (inetaddr);
398 g_assert (servaddr != NULL);
400 GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address,
403 socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
404 G_SOCKET_PROTOCOL_UDP, &error);
409 GST_DEBUG_OBJECT (self, "binding socket");
410 inetaddr = g_inet_address_new_any (family);
411 anyaddr = g_inet_socket_address_new (inetaddr, 0);
412 g_socket_bind (socket, anyaddr, TRUE, &error);
413 g_object_unref (anyaddr);
414 g_object_unref (inetaddr);
419 /* check address we're bound to, mostly for debugging purposes */
420 myaddr = g_socket_get_local_address (socket, &error);
423 goto getsockname_error;
425 GST_DEBUG_OBJECT (self, "socket opened on UDP port %hd",
426 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
428 g_object_unref (myaddr);
430 self->priv->cancel = g_cancellable_new ();
431 self->priv->socket = socket;
432 self->priv->servaddr = G_SOCKET_ADDRESS (servaddr);
434 self->priv->thread = g_thread_try_new ("GstNetClientClock",
435 gst_net_client_clock_thread, self, &error);
445 GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
446 g_error_free (error);
451 GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
452 g_error_free (error);
453 g_object_unref (socket);
458 GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
459 g_error_free (error);
460 g_object_unref (socket);
465 GST_ERROR_OBJECT (self, "inet_address_new_from_string('%s') failed",
466 self->priv->address);
471 GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
472 g_object_unref (self->priv->servaddr);
473 self->priv->servaddr = NULL;
474 g_object_unref (self->priv->socket);
475 self->priv->socket = NULL;
476 g_error_free (error);
482 gst_net_client_clock_stop (GstNetClientClock * self)
484 if (self->priv->thread == NULL)
487 GST_INFO_OBJECT (self, "stopping...");
488 g_cancellable_cancel (self->priv->cancel);
490 g_thread_join (self->priv->thread);
491 self->priv->thread = NULL;
493 g_object_unref (self->priv->cancel);
494 self->priv->cancel = NULL;
496 g_object_unref (self->priv->servaddr);
497 self->priv->servaddr = NULL;
499 g_object_unref (self->priv->socket);
500 self->priv->socket = NULL;
502 GST_INFO_OBJECT (self, "stopped");
506 * gst_net_client_clock_new:
507 * @name: a name for the clock
508 * @remote_address: the address of the remote clock provider
509 * @remote_port: the port of the remote clock provider
510 * @base_time: initial time of the clock
512 * Create a new #GstNetClientClock that will report the time
513 * provided by the #GstNetTimeProvider on @remote_address and
516 * Returns: a new #GstClock that receives a time from the remote
520 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
521 gint remote_port, GstClockTime base_time)
523 /* FIXME: gst_net_client_clock_new() should be a thin wrapper for g_object_new() */
524 GstNetClientClock *ret;
525 GstClockTime internal;
527 g_return_val_if_fail (remote_address != NULL, NULL);
528 g_return_val_if_fail (remote_port > 0, NULL);
529 g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
530 g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
532 ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
533 "port", remote_port, NULL);
535 /* gst_clock_get_time() values are guaranteed to be increasing. because no one
536 * has called get_time on this clock yet we are free to adjust to any value
537 * without worrying about worrying about MAX() issues with the clock's
541 /* update our internal time so get_time() give something around base_time.
542 assume that the rate is 1 in the beginning. */
543 internal = gst_clock_get_internal_time (GST_CLOCK (ret));
544 gst_clock_set_calibration (GST_CLOCK (ret), internal, base_time, 1, 1);
547 GstClockTime now = gst_clock_get_time (GST_CLOCK (ret));
549 if (GST_CLOCK_DIFF (now, base_time) > 0 ||
550 GST_CLOCK_DIFF (now, base_time + GST_SECOND) < 0) {
551 g_warning ("unable to set the base time, expect sync problems!");
555 if (!gst_net_client_clock_start (ret))
558 /* all systems go, cap'n */
559 return (GstClock *) ret;
563 /* already printed a nice error */
564 gst_object_unref (ret);