11210512c9a55f8638b1f30bc7d73f2647588769
[platform/upstream/gstreamer.git] / libs / gst / net / gstnetclientclock.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  *
7  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
8  * the network
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25 /**
26  * SECTION:gstnetclientclock
27  * @short_description: Special clock that synchronizes to a remote time
28  *                     provider.
29  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
30  *
31  * This object implements a custom #GstClock that synchronizes its time
32  * to a remote time provider such as #GstNetTimeProvider.
33  *
34  * A new clock is created with gst_net_client_clock_new() which takes the
35  * address and port of the remote time provider along with a name and
36  * an initial time.
37  *
38  * This clock will poll the time provider and will update its calibration
39  * parameters based on the local and remote observations.
40  *
41  * Various parameters of the clock can be configured with the parent #GstClock
42  * "timeout", "window-size" and "window-threshold" object properties.
43  *
44  * A #GstNetClientClock is typically set on a #GstPipeline with 
45  * gst_pipeline_use_clock().
46  *
47  * Last reviewed on 2005-11-23 (0.9.5)
48  */
49
50 #ifdef HAVE_CONFIG_H
51 #include "config.h"
52 #endif
53
54 #include "gstnettimepacket.h"
55 #include "gstnetclientclock.h"
56
57 #include <gio/gio.h>
58
59 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
60 #define GST_CAT_DEFAULT (ncc_debug)
61
62 #define DEFAULT_ADDRESS         "127.0.0.1"
63 #define DEFAULT_PORT            5637
64 #define DEFAULT_TIMEOUT         GST_SECOND
65
66 enum
67 {
68   PROP_0,
69   PROP_ADDRESS,
70   PROP_PORT
71 };
72
73 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
74   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate))
75
76 struct _GstNetClientClockPrivate
77 {
78   GThread *thread;
79
80   GSocket *socket;
81   GSocketAddress *servaddr;
82   GCancellable *cancel;
83
84   GstClockTime timeout_expiration;
85
86   gchar *address;
87   gint port;
88 };
89
90 #define _do_init \
91   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
92 #define gst_net_client_clock_parent_class parent_class
93 G_DEFINE_TYPE_WITH_CODE (GstNetClientClock, gst_net_client_clock,
94     GST_TYPE_SYSTEM_CLOCK, _do_init);
95
96 static void gst_net_client_clock_finalize (GObject * object);
97 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
98     const GValue * value, GParamSpec * pspec);
99 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
100     GValue * value, GParamSpec * pspec);
101
102 static void gst_net_client_clock_stop (GstNetClientClock * self);
103
104 static void
105 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
106 {
107   GObjectClass *gobject_class;
108
109   gobject_class = G_OBJECT_CLASS (klass);
110
111   g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate));
112
113   gobject_class->finalize = gst_net_client_clock_finalize;
114   gobject_class->get_property = gst_net_client_clock_get_property;
115   gobject_class->set_property = gst_net_client_clock_set_property;
116
117   g_object_class_install_property (gobject_class, PROP_ADDRESS,
118       g_param_spec_string ("address", "address",
119           "The IP address of the machine providing a time server",
120           DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121   g_object_class_install_property (gobject_class, PROP_PORT,
122       g_param_spec_int ("port", "port",
123           "The port on which the remote server is listening", 0, G_MAXUINT16,
124           DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
125 }
126
127 static void
128 gst_net_client_clock_init (GstNetClientClock * self)
129 {
130   GstClock *clock = GST_CLOCK_CAST (self);
131
132   self->priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
133
134   self->priv->port = DEFAULT_PORT;
135   self->priv->address = g_strdup (DEFAULT_ADDRESS);
136
137   clock->timeout = DEFAULT_TIMEOUT;
138
139   self->priv->thread = NULL;
140
141   self->priv->servaddr = NULL;
142 }
143
144 static void
145 gst_net_client_clock_finalize (GObject * object)
146 {
147   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
148
149   if (self->priv->thread) {
150     gst_net_client_clock_stop (self);
151   }
152
153   g_free (self->priv->address);
154   self->priv->address = NULL;
155
156   if (self->priv->servaddr != NULL) {
157     g_object_unref (self->priv->servaddr);
158     self->priv->servaddr = NULL;
159   }
160
161   if (self->priv->socket != NULL) {
162     g_socket_close (self->priv->socket, NULL);
163     g_object_unref (self->priv->socket);
164     self->priv->socket = NULL;
165   }
166
167   G_OBJECT_CLASS (parent_class)->finalize (object);
168 }
169
170 static void
171 gst_net_client_clock_set_property (GObject * object, guint prop_id,
172     const GValue * value, GParamSpec * pspec)
173 {
174   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
175
176   switch (prop_id) {
177     case PROP_ADDRESS:
178       g_free (self->priv->address);
179       self->priv->address = g_value_dup_string (value);
180       if (self->priv->address == NULL)
181         self->priv->address = g_strdup (DEFAULT_ADDRESS);
182       break;
183     case PROP_PORT:
184       self->priv->port = g_value_get_int (value);
185       break;
186     default:
187       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
188       break;
189   }
190 }
191
192 static void
193 gst_net_client_clock_get_property (GObject * object, guint prop_id,
194     GValue * value, GParamSpec * pspec)
195 {
196   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
197
198   switch (prop_id) {
199     case PROP_ADDRESS:
200       g_value_set_string (value, self->priv->address);
201       break;
202     case PROP_PORT:
203       g_value_set_int (value, self->priv->port);
204       break;
205     default:
206       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
207       break;
208   }
209 }
210
211 static void
212 gst_net_client_clock_observe_times (GstNetClientClock * self,
213     GstClockTime local_1, GstClockTime remote, GstClockTime local_2)
214 {
215   GstClockTime current_timeout;
216   GstClockTime local_avg;
217   gdouble r_squared;
218   GstClock *clock;
219
220   if (local_2 < local_1)
221     goto bogus_observation;
222
223   local_avg = (local_2 + local_1) / 2;
224
225   clock = GST_CLOCK_CAST (self);
226
227   gst_clock_add_observation (GST_CLOCK (self), local_avg, remote, &r_squared);
228
229   GST_CLOCK_SLAVE_LOCK (self);
230   if (clock->filling) {
231     current_timeout = 0;
232   } else {
233     /* geto formula */
234     current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
235     current_timeout = MIN (current_timeout, clock->timeout);
236   }
237   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
238   self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
239   GST_CLOCK_SLAVE_UNLOCK (clock);
240
241   return;
242
243 bogus_observation:
244   {
245     GST_WARNING_OBJECT (self, "time packet receive time < send time (%"
246         GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")", GST_TIME_ARGS (local_1),
247         GST_TIME_ARGS (local_2));
248     return;
249   }
250 }
251
252 typedef struct
253 {
254   GSource source;
255   GstNetClientClock *clock;
256   gboolean *p_timeout;
257 } GstNetClientClockTimeoutSource;
258
259 static gboolean
260 gst_net_client_clock_timeout_source_prepare (GSource * s, gint * p_timeout)
261 {
262   GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
263   GstClockTime expiration_time = source->clock->priv->timeout_expiration;
264   GstClockTime now = gst_util_get_timestamp ();
265
266   if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
267     *p_timeout = 0;
268     return TRUE;
269   }
270
271   *p_timeout = (expiration_time - now) / GST_MSECOND;
272   GST_TRACE_OBJECT (source->clock, "time out in %d ms please", *p_timeout);
273   return FALSE;
274 }
275
276 static gboolean
277 gst_net_client_clock_timeout_source_check (GSource * s)
278 {
279   GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
280
281   return (gst_util_get_timestamp () >= source->clock->priv->timeout_expiration);
282 }
283
284 static gboolean
285 gst_net_client_clock_timeout_source_dispatch (GSource * s, GSourceFunc cb,
286     gpointer data)
287 {
288   GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
289
290   GST_TRACE_OBJECT (source->clock, "timed out");
291   *source->p_timeout = TRUE;
292   return TRUE;
293 }
294
295 static gboolean
296 gst_net_client_clock_socket_cb (GSocket * socket, GIOCondition condition,
297     gpointer user_data)
298 {
299   GIOCondition *p_cond = user_data;
300
301   GST_TRACE ("socket %p I/O condition: 0x%02x", socket, condition);
302   *p_cond = condition;
303   return TRUE;
304 }
305
306 static gpointer
307 gst_net_client_clock_thread (gpointer data)
308 {
309   GstNetClientClock *self = data;
310   GstNetTimePacket *packet;
311   GMainContext *ctx;
312   GSourceFuncs funcs = { NULL, };
313   GSource *source;
314   GIOCondition cond;
315   gboolean timeout;
316   GSocket *socket = self->priv->socket;
317   GError *err = NULL;
318   GstClock *clock = data;
319
320   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
321
322   g_socket_set_blocking (socket, TRUE);
323   g_socket_set_timeout (socket, 0);
324
325   ctx = g_main_context_new ();
326
327   source = g_socket_create_source (socket, G_IO_IN, self->priv->cancel);
328   g_source_set_name (source, "GStreamer net client clock thread socket");
329   g_source_set_callback (source, (GSourceFunc) gst_net_client_clock_socket_cb,
330       &cond, NULL);
331   g_source_attach (source, ctx);
332   g_source_unref (source);
333
334   /* GSocket only support second granularity for timeouts, so roll our own
335    * timeout source (so we don't have to create a new source whenever the
336    * timeout changes, as we would have to do with the default timeout source) */
337   funcs.prepare = gst_net_client_clock_timeout_source_prepare;
338   funcs.check = gst_net_client_clock_timeout_source_check;
339   funcs.dispatch = gst_net_client_clock_timeout_source_dispatch;
340   funcs.finalize = NULL;
341   source = g_source_new (&funcs, sizeof (GstNetClientClockTimeoutSource));
342   ((GstNetClientClockTimeoutSource *) source)->clock = self;
343   ((GstNetClientClockTimeoutSource *) source)->p_timeout = &timeout;
344   g_source_set_name (source, "GStreamer net client clock timeout");
345   g_source_attach (source, ctx);
346   g_source_unref (source);
347
348   while (!g_cancellable_is_cancelled (self->priv->cancel)) {
349     cond = 0;
350     timeout = FALSE;
351     g_main_context_iteration (ctx, TRUE);
352
353     if (g_cancellable_is_cancelled (self->priv->cancel))
354       break;
355
356     if (timeout) {
357       /* timed out, let's send another packet */
358       GST_DEBUG_OBJECT (self, "timed out");
359
360       packet = gst_net_time_packet_new (NULL);
361
362       packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
363
364       GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT,
365           GST_TIME_ARGS (packet->local_time));
366
367       gst_net_time_packet_send (packet, self->priv->socket,
368           self->priv->servaddr, NULL);
369
370       g_free (packet);
371
372       /* reset timeout (but are expecting a response sooner anyway) */
373       self->priv->timeout_expiration =
374           gst_util_get_timestamp () + clock->timeout;
375       continue;
376     }
377
378     /* got data to read? */
379     if ((cond & G_IO_IN)) {
380       GstClockTime new_local;
381
382       new_local = gst_clock_get_internal_time (GST_CLOCK (self));
383
384       packet = gst_net_time_packet_receive (socket, NULL, &err);
385
386       if (err != NULL) {
387         GST_WARNING_OBJECT (self, "receive error: %s", err->message);
388         g_error_free (err);
389         err = NULL;
390         continue;
391       }
392
393       GST_LOG_OBJECT (self, "got packet back");
394       GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
395           GST_TIME_ARGS (packet->local_time));
396       GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
397           GST_TIME_ARGS (packet->remote_time));
398       GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
399           GST_TIME_ARGS (new_local));
400
401       /* observe_times will reset the timeout */
402       gst_net_client_clock_observe_times (self, packet->local_time,
403           packet->remote_time, new_local);
404
405       g_free (packet);
406       continue;
407     }
408
409     if ((cond & (G_IO_ERR | G_IO_HUP))) {
410       GST_DEBUG_OBJECT (self, "socket error?! %s", g_strerror (errno));
411       g_usleep (G_USEC_PER_SEC / 10);
412       continue;
413     }
414   }
415
416   GST_INFO_OBJECT (self, "shutting down net client clock thread");
417   g_main_context_unref (ctx);
418   return NULL;
419 }
420
421 static gboolean
422 gst_net_client_clock_start (GstNetClientClock * self)
423 {
424   GSocketAddress *servaddr;
425   GSocketAddress *myaddr;
426   GInetAddress *inetaddr;
427   GSocket *socket;
428   GError *error = NULL;
429
430   g_return_val_if_fail (self->priv->address != NULL, FALSE);
431   g_return_val_if_fail (self->priv->servaddr == NULL, FALSE);
432
433   socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
434       G_SOCKET_PROTOCOL_UDP, &error);
435
436   if (socket == NULL)
437     goto no_socket;
438
439   /* check address we're bound to, mostly for debugging purposes */
440   myaddr = g_socket_get_local_address (socket, &error);
441
442   if (myaddr == NULL)
443     goto getsockname_error;
444
445   GST_DEBUG_OBJECT (self, "socket opened on UDP port %hd",
446       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
447
448   g_object_unref (myaddr);
449
450   /* create target address */
451   inetaddr = g_inet_address_new_from_string (self->priv->address);
452
453   if (inetaddr == NULL)
454     goto bad_address;
455
456   servaddr = g_inet_socket_address_new (inetaddr, self->priv->port);
457   g_object_unref (inetaddr);
458
459   g_assert (servaddr != NULL);
460
461   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address,
462       self->priv->port);
463
464   self->priv->cancel = g_cancellable_new ();
465   self->priv->socket = socket;
466   self->priv->servaddr = G_SOCKET_ADDRESS (servaddr);
467
468 #if !GLIB_CHECK_VERSION (2, 31, 0)
469   self->priv->thread = g_thread_create (gst_net_client_clock_thread, self, TRUE,
470       &error);
471 #else
472   self->priv->thread = g_thread_try_new ("GstNetClientClock",
473       gst_net_client_clock_thread, self, &error);
474 #endif
475
476   if (error != NULL)
477     goto no_thread;
478
479   return TRUE;
480
481   /* ERRORS */
482 no_socket:
483   {
484     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
485     g_error_free (error);
486     return FALSE;
487   }
488 getsockname_error:
489   {
490     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
491     g_error_free (error);
492     g_object_unref (socket);
493     return FALSE;
494   }
495 bad_address:
496   {
497     GST_ERROR_OBJECT (self, "inet_address_new_from_string('%s') failed",
498         self->priv->address);
499     g_object_unref (socket);
500     return FALSE;
501   }
502 no_thread:
503   {
504     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
505     g_object_unref (self->priv->servaddr);
506     self->priv->servaddr = NULL;
507     g_object_unref (self->priv->socket);
508     self->priv->socket = NULL;
509     g_error_free (error);
510     return FALSE;
511   }
512 }
513
514 static void
515 gst_net_client_clock_stop (GstNetClientClock * self)
516 {
517   if (self->priv->thread == NULL)
518     return;
519
520   GST_INFO_OBJECT (self, "stopping...");
521   g_cancellable_cancel (self->priv->cancel);
522
523   g_thread_join (self->priv->thread);
524   self->priv->thread = NULL;
525
526   g_object_unref (self->priv->cancel);
527   self->priv->cancel = NULL;
528
529   g_object_unref (self->priv->servaddr);
530   self->priv->servaddr = NULL;
531
532   g_object_unref (self->priv->socket);
533   self->priv->socket = NULL;
534
535   GST_INFO_OBJECT (self, "stopped");
536 }
537
538 /**
539  * gst_net_client_clock_new:
540  * @name: a name for the clock
541  * @remote_address: the address of the remote clock provider
542  * @remote_port: the port of the remote clock provider
543  * @base_time: initial time of the clock
544  *
545  * Create a new #GstNetClientClock that will report the time
546  * provided by the #GstNetTimeProvider on @remote_address and 
547  * @remote_port.
548  *
549  * Returns: a new #GstClock that receives a time from the remote
550  * clock.
551  */
552 GstClock *
553 gst_net_client_clock_new (gchar * name, const gchar * remote_address,
554     gint remote_port, GstClockTime base_time)
555 {
556   /* FIXME: gst_net_client_clock_new() should be a thin wrapper for g_object_new() */
557   GstNetClientClock *ret;
558   GstClockTime internal;
559
560   g_return_val_if_fail (remote_address != NULL, NULL);
561   g_return_val_if_fail (remote_port > 0, NULL);
562   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
563   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
564
565   ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address,
566       "port", remote_port, NULL);
567
568   /* gst_clock_get_time() values are guaranteed to be increasing. because no one
569    * has called get_time on this clock yet we are free to adjust to any value
570    * without worrying about worrying about MAX() issues with the clock's
571    * internal time.
572    */
573
574   /* update our internal time so get_time() give something around base_time.
575      assume that the rate is 1 in the beginning. */
576   internal = gst_clock_get_internal_time (GST_CLOCK (ret));
577   gst_clock_set_calibration (GST_CLOCK (ret), internal, base_time, 1, 1);
578
579   {
580     GstClockTime now = gst_clock_get_time (GST_CLOCK (ret));
581
582     if (GST_CLOCK_DIFF (now, base_time) > 0 ||
583         GST_CLOCK_DIFF (now, base_time + GST_SECOND) < 0) {
584       g_warning ("unable to set the base time, expect sync problems!");
585     }
586   }
587
588   if (!gst_net_client_clock_start (ret))
589     goto failed_start;
590
591   /* all systems go, cap'n */
592   return (GstClock *) ret;
593
594 failed_start:
595   {
596     /* already printed a nice error */
597     gst_object_unref (ret);
598     return NULL;
599   }
600 }