2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2005 Andy Wingo <wingo@pobox.com>
5 * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
7 * gstnetclientclock.h: clock that synchronizes itself to a time provider over
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
26 * SECTION:gstnetclientclock
27 * @short_description: Special clock that synchronizes to a remote time
29 * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
31 * This object implements a custom #GstClock that synchronizes its time
32 * to a remote time provider such as #GstNetTimeProvider.
34 * A new clock is created with gst_net_client_clock_new() which takes the
35 * address and port of the remote time provider along with a name and
38 * This clock will poll the time provider and will update its calibration
39 * parameters based on the local and remote observations.
41 * The "round-trip" property limits the maximum round trip packets can take.
43 * Various parameters of the clock can be configured with the parent #GstClock
44 * "timeout", "window-size" and "window-threshold" object properties.
46 * A #GstNetClientClock is typically set on a #GstPipeline with
47 * gst_pipeline_use_clock().
49 * Last reviewed on 2005-11-23 (0.9.5)
56 #include "gstnettimepacket.h"
57 #include "gstnetclientclock.h"
61 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
62 #define GST_CAT_DEFAULT (ncc_debug)
64 #define DEFAULT_ADDRESS "127.0.0.1"
65 #define DEFAULT_PORT 5637
66 #define DEFAULT_TIMEOUT GST_SECOND
67 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
77 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \
78 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
80 struct _GstNetClientClockPrivate
85 GSocketAddress *servaddr;
88 GstClockTime timeout_expiration;
89 GstClockTime roundtrip_limit;
97 GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
98 #define gst_net_client_clock_parent_class parent_class
99 G_DEFINE_TYPE_WITH_CODE (GstNetClientClock, gst_net_client_clock,
100 GST_TYPE_SYSTEM_CLOCK, _do_init);
102 static void gst_net_client_clock_finalize (GObject * object);
103 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
104 const GValue * value, GParamSpec * pspec);
105 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
106 GValue * value, GParamSpec * pspec);
108 static void gst_net_client_clock_stop (GstNetClientClock * self);
111 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
113 GObjectClass *gobject_class;
115 gobject_class = G_OBJECT_CLASS (klass);
117 g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
119 gobject_class->finalize = gst_net_client_clock_finalize;
120 gobject_class->get_property = gst_net_client_clock_get_property;
121 gobject_class->set_property = gst_net_client_clock_set_property;
123 g_object_class_install_property (gobject_class, PROP_ADDRESS,
124 g_param_spec_string ("address", "address",
125 "The IP address of the machine providing a time server",
126 DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127 g_object_class_install_property (gobject_class, PROP_PORT,
128 g_param_spec_int ("port", "port",
129 "The port on which the remote server is listening", 0, G_MAXUINT16,
130 DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 * GstNetClientClock::roundtrip-limit:
135 * Maximum allowed round-trip for packets. If this property is set to a nonzero
136 * value, all packets with a round-trip interval larger than this limit will be
137 * ignored. This is useful for networks with severe and fluctuating transport
138 * delays. Filtering out these packets increases stability of the synchronization.
139 * On the other hand, the lower the limit, the higher the amount of filtered
140 * packets. Empirical tests are typically necessary to estimate a good value
142 * If the property is set to zero, the limit is disabled.
146 g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
147 g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
148 "Maximum tolerable round-trip interval for packets, in nanoseconds "
149 "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
150 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154 gst_net_client_clock_init (GstNetClientClock * self)
156 GstClock *clock = GST_CLOCK_CAST (self);
157 GstNetClientClockPrivate *priv;
159 self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
161 priv->port = DEFAULT_PORT;
162 priv->address = g_strdup (DEFAULT_ADDRESS);
164 gst_clock_set_timeout (clock, DEFAULT_TIMEOUT);
168 priv->servaddr = NULL;
169 priv->rtt_avg = GST_CLOCK_TIME_NONE;
170 priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
174 gst_net_client_clock_finalize (GObject * object)
176 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
178 if (self->priv->thread) {
179 gst_net_client_clock_stop (self);
182 g_free (self->priv->address);
183 self->priv->address = NULL;
185 if (self->priv->servaddr != NULL) {
186 g_object_unref (self->priv->servaddr);
187 self->priv->servaddr = NULL;
190 if (self->priv->socket != NULL) {
191 g_socket_close (self->priv->socket, NULL);
192 g_object_unref (self->priv->socket);
193 self->priv->socket = NULL;
196 G_OBJECT_CLASS (parent_class)->finalize (object);
200 gst_net_client_clock_set_property (GObject * object, guint prop_id,
201 const GValue * value, GParamSpec * pspec)
203 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
207 g_free (self->priv->address);
208 self->priv->address = g_value_dup_string (value);
209 if (self->priv->address == NULL)
210 self->priv->address = g_strdup (DEFAULT_ADDRESS);
213 self->priv->port = g_value_get_int (value);
215 case PROP_ROUNDTRIP_LIMIT:
216 self->priv->roundtrip_limit = g_value_get_uint64 (value);
219 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
225 gst_net_client_clock_get_property (GObject * object, guint prop_id,
226 GValue * value, GParamSpec * pspec)
228 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
232 g_value_set_string (value, self->priv->address);
235 g_value_set_int (value, self->priv->port);
237 case PROP_ROUNDTRIP_LIMIT:
238 g_value_set_uint64 (value, self->priv->roundtrip_limit);
241 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
247 gst_net_client_clock_observe_times (GstNetClientClock * self,
248 GstClockTime local_1, GstClockTime remote, GstClockTime local_2)
250 GstNetClientClockPrivate *priv = self->priv;
251 GstClockTime current_timeout;
252 GstClockTime local_avg;
257 if (local_2 < local_1) {
258 GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
259 " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
260 GST_TIME_ARGS (local_2));
261 goto bogus_observation;
264 rtt = GST_CLOCK_DIFF (local_1, local_2);
266 if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) {
267 GST_LOG_OBJECT (self,
268 "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
269 GST_TIME_FORMAT, GST_TIME_ARGS (rtt),
270 GST_TIME_ARGS (self->priv->roundtrip_limit));
271 goto bogus_observation;
274 /* Track an average round trip time, for a bit of smoothing */
275 /* Always update before discarding a sample, so genuine changes in
276 * the network get picked up, eventually */
277 if (priv->rtt_avg == GST_CLOCK_TIME_NONE)
279 else if (rtt < priv->rtt_avg) /* Shorter RTTs carry more weight than longer */
280 priv->rtt_avg = (3 * priv->rtt_avg + rtt) / 4;
282 priv->rtt_avg = (7 * priv->rtt_avg + rtt) / 8;
284 if (rtt > 2 * priv->rtt_avg) {
285 GST_LOG_OBJECT (self,
286 "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
287 GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (priv->rtt_avg));
288 goto bogus_observation;
291 local_avg = (local_2 + local_1) / 2;
293 GST_LOG_OBJECT (self, "local1 %" G_GUINT64_FORMAT " remote %" G_GUINT64_FORMAT
294 " localavg %" G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT,
295 local_1, remote, local_avg, local_2);
297 clock = GST_CLOCK_CAST (self);
299 if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote,
301 /* ghetto formula - shorter timeout for bad correlations */
302 current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
303 current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
308 GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
309 self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
315 GST_WARNING_OBJECT (self,
316 "bogus round-trip interval; too long round trip or "
317 "receive time < send time (%" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
318 " => %" GST_TIME_FORMAT ")", GST_TIME_ARGS (local_1),
319 GST_TIME_ARGS (local_2), GST_TIME_ARGS (rtt));
320 /* Schedule a new packet again soon */
321 self->priv->timeout_expiration =
322 gst_util_get_timestamp () + (GST_SECOND / 4);
328 gst_net_client_clock_thread (gpointer data)
330 GstNetClientClock *self = data;
331 GstNetTimePacket *packet;
332 GSocket *socket = self->priv->socket;
334 GstClock *clock = data;
336 GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
338 g_socket_set_blocking (socket, TRUE);
339 g_socket_set_timeout (socket, 0);
341 while (!g_cancellable_is_cancelled (self->priv->cancel)) {
342 GstClockTime expiration_time = self->priv->timeout_expiration;
343 GstClockTime now = gst_util_get_timestamp ();
344 gint64 socket_timeout;
346 if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
349 socket_timeout = (expiration_time - now) / GST_USECOND;
352 GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
354 if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
355 self->priv->cancel, &err)) {
356 /* cancelled, timeout or error */
357 if (err->code == G_IO_ERROR_CANCELLED) {
358 GST_INFO_OBJECT (self, "cancelled");
359 g_clear_error (&err);
361 } else if (err->code == G_IO_ERROR_TIMED_OUT) {
362 /* timed out, let's send another packet */
363 GST_DEBUG_OBJECT (self, "timed out");
365 packet = gst_net_time_packet_new (NULL);
367 packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
369 GST_DEBUG_OBJECT (self,
370 "sending packet, local time = %" GST_TIME_FORMAT,
371 GST_TIME_ARGS (packet->local_time));
373 gst_net_time_packet_send (packet, self->priv->socket,
374 self->priv->servaddr, NULL);
378 /* reset timeout (but are expecting a response sooner anyway) */
379 self->priv->timeout_expiration =
380 gst_util_get_timestamp () + gst_clock_get_timeout (clock);
382 GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
383 g_usleep (G_USEC_PER_SEC / 10); /* throttle */
385 g_clear_error (&err);
387 GstClockTime new_local;
391 new_local = gst_clock_get_internal_time (GST_CLOCK (self));
393 packet = gst_net_time_packet_receive (socket, NULL, &err);
395 if (packet != NULL) {
396 GST_LOG_OBJECT (self, "got packet back");
397 GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
398 GST_TIME_ARGS (packet->local_time));
399 GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
400 GST_TIME_ARGS (packet->remote_time));
401 GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
402 GST_TIME_ARGS (new_local));
404 /* observe_times will reset the timeout */
405 gst_net_client_clock_observe_times (self, packet->local_time,
406 packet->remote_time, new_local);
409 } else if (err != NULL) {
410 GST_WARNING_OBJECT (self, "receive error: %s", err->message);
411 g_clear_error (&err);
416 GST_INFO_OBJECT (self, "shutting down net client clock thread");
421 gst_net_client_clock_start (GstNetClientClock * self)
423 GSocketAddress *servaddr;
424 GSocketAddress *myaddr;
425 GSocketAddress *anyaddr;
426 GInetAddress *inetaddr;
428 GError *error = NULL;
429 GSocketFamily family;
431 g_return_val_if_fail (self->priv->address != NULL, FALSE);
432 g_return_val_if_fail (self->priv->servaddr == NULL, FALSE);
434 /* create target address */
435 inetaddr = g_inet_address_new_from_string (self->priv->address);
436 if (inetaddr == NULL)
439 family = g_inet_address_get_family (inetaddr);
441 servaddr = g_inet_socket_address_new (inetaddr, self->priv->port);
442 g_object_unref (inetaddr);
444 g_assert (servaddr != NULL);
446 GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address,
449 socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
450 G_SOCKET_PROTOCOL_UDP, &error);
455 GST_DEBUG_OBJECT (self, "binding socket");
456 inetaddr = g_inet_address_new_any (family);
457 anyaddr = g_inet_socket_address_new (inetaddr, 0);
458 g_socket_bind (socket, anyaddr, TRUE, &error);
459 g_object_unref (anyaddr);
460 g_object_unref (inetaddr);
465 /* check address we're bound to, mostly for debugging purposes */
466 myaddr = g_socket_get_local_address (socket, &error);
469 goto getsockname_error;
471 GST_DEBUG_OBJECT (self, "socket opened on UDP port %hd",
472 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
474 g_object_unref (myaddr);
476 self->priv->cancel = g_cancellable_new ();
477 self->priv->socket = socket;
478 self->priv->servaddr = G_SOCKET_ADDRESS (servaddr);
480 self->priv->thread = g_thread_try_new ("GstNetClientClock",
481 gst_net_client_clock_thread, self, &error);
491 GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
492 g_error_free (error);
497 GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
498 g_error_free (error);
499 g_object_unref (socket);
504 GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
505 g_error_free (error);
506 g_object_unref (socket);
511 GST_ERROR_OBJECT (self, "inet_address_new_from_string('%s') failed",
512 self->priv->address);
517 GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
518 g_object_unref (self->priv->servaddr);
519 self->priv->servaddr = NULL;
520 g_object_unref (self->priv->socket);
521 self->priv->socket = NULL;
522 g_error_free (error);
528 gst_net_client_clock_stop (GstNetClientClock * self)
530 if (self->priv->thread == NULL)
533 GST_INFO_OBJECT (self, "stopping...");
534 g_cancellable_cancel (self->priv->cancel);
536 g_thread_join (self->priv->thread);
537 self->priv->thread = NULL;
539 g_object_unref (self->priv->cancel);
540 self->priv->cancel = NULL;
542 g_object_unref (self->priv->servaddr);
543 self->priv->servaddr = NULL;
545 g_object_unref (self->priv->socket);
546 self->priv->socket = NULL;
548 GST_INFO_OBJECT (self, "stopped");
552 * gst_net_client_clock_new:
553 * @name: a name for the clock
554 * @remote_address: the address of the remote clock provider
555 * @remote_port: the port of the remote clock provider
556 * @base_time: initial time of the clock
558 * Create a new #GstNetClientClock that will report the time
559 * provided by the #GstNetTimeProvider on @remote_address and
562 * Returns: a new #GstClock that receives a time from the remote
566 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
567 gint remote_port, GstClockTime base_time)
569 /* FIXME: gst_net_client_clock_new() should be a thin wrapper for g_object_new() */
570 GstNetClientClock *ret;
571 GstClockTime internal;
573 g_return_val_if_fail (remote_address != NULL, NULL);
574 g_return_val_if_fail (remote_port > 0, NULL);
575 g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
576 g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
578 ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
579 "port", remote_port, NULL);
581 /* gst_clock_get_time() values are guaranteed to be increasing. because no one
582 * has called get_time on this clock yet we are free to adjust to any value
583 * without worrying about worrying about MAX() issues with the clock's
587 /* update our internal time so get_time() give something around base_time.
588 assume that the rate is 1 in the beginning. */
589 internal = gst_clock_get_internal_time (GST_CLOCK (ret));
590 gst_clock_set_calibration (GST_CLOCK (ret), internal, base_time, 1, 1);
593 GstClockTime now = gst_clock_get_time (GST_CLOCK (ret));
595 if (GST_CLOCK_DIFF (now, base_time) > 0 ||
596 GST_CLOCK_DIFF (now, base_time + GST_SECOND) < 0) {
597 g_warning ("unable to set the base time, expect sync problems!");
601 if (!gst_net_client_clock_start (ret))
604 /* all systems go, cap'n */
605 return (GstClock *) ret;
609 /* already printed a nice error */
610 gst_object_unref (ret);