ptpclock: do not require a name to create a clock
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @title: GstPtpClock
23  * @short_description: Special clock that synchronizes to a remote time
24  *                     provider via PTP (IEEE1588:2008).
25  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
26  *
27  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
28  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
29  * clock in some specific domain.
30  *
31  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
32  * a helper process to do the actual communication via the PTP ports. This is
33  * required as PTP listens on ports < 1024 and thus requires special
34  * privileges. Once this helper process is started, the main process will
35  * synchronize to all PTP domains that are detected on the selected
36  * interfaces.
37  *
38  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
39  * time from a master clock inside a specific PTP domain. This clock will only
40  * return valid timestamps once the timestamps in the PTP domain are known. To
41  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
42  * signal and gst_clock_is_synced().
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #include "gstptp_private.h"
59
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #define WIN32_LEAN_AND_MEAN
65 #include <windows.h>
66 #endif
67 #include <sys/types.h>
68
69 #ifdef HAVE_UNISTD_H
70 #include <unistd.h>
71 #elif defined(G_OS_WIN32)
72 #include <io.h>
73 #endif
74
75 #include <gst/base/base.h>
76
77 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
78 #define GST_CAT_DEFAULT (ptp_debug)
79
80 /* IEEE 1588 7.7.3.1 */
81 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
82
83 /* Use a running average for calculating the mean path delay instead
84  * of just using the last measurement. Enabling this helps in unreliable
85  * networks, like wifi, with often changing delays
86  *
87  * Undef for following IEEE1588-2008 by the letter
88  */
89 #define USE_RUNNING_AVERAGE_DELAY 1
90
91 /* Filter out any measurements that are above a certain threshold compared to
92  * previous measurements. Enabling this helps filtering out outliers that
93  * happen fairly often in unreliable networks, like wifi.
94  *
95  * Undef for following IEEE1588-2008 by the letter
96  */
97 #define USE_MEASUREMENT_FILTERING 1
98
99 /* Select the first clock from which we capture a SYNC message as the master
100  * clock of the domain until we are ready to run the best master clock
101  * algorithm. This allows faster syncing but might mean a change of the master
102  * clock in the beginning. As all clocks in a domain are supposed to use the
103  * same time, this shouldn't be much of a problem.
104  *
105  * Undef for following IEEE1588-2008 by the letter
106  */
107 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
108
109 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
110  * afterwards. This allows better synchronization in networks with varying
111  * delays, as for every other SYNC message we would have to assume that it's
112  * the average of what we saw before. But that might be completely off
113  */
114 #define USE_ONLY_SYNC_WITH_DELAY 1
115
116 /* Filter out delay measurements that are too far away from the median of the
117  * last delay measurements, currently those that are more than 2 times as big.
118  * This increases accuracy a lot on wifi.
119  */
120 #define USE_MEDIAN_PRE_FILTERING 1
121 #define MEDIAN_PRE_FILTERING_WINDOW 9
122
123 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
124 #define MAX_SKIPPED_UPDATES 5
125
126 typedef enum
127 {
128   PTP_MESSAGE_TYPE_SYNC = 0x0,
129   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
130   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
131   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
132   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
133   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
134   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
135   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
136   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
137   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
138 } PtpMessageType;
139
140 typedef struct
141 {
142   guint64 seconds_field;        /* 48 bits valid */
143   guint32 nanoseconds_field;
144 } PtpTimestamp;
145
146 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
147 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
148 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
149
150 typedef struct
151 {
152   guint64 clock_identity;
153   guint16 port_number;
154 } PtpClockIdentity;
155
156 static gint
157 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
158 {
159   if (a->clock_identity < b->clock_identity)
160     return -1;
161   else if (a->clock_identity > b->clock_identity)
162     return 1;
163
164   if (a->port_number < b->port_number)
165     return -1;
166   else if (a->port_number > b->port_number)
167     return 1;
168
169   return 0;
170 }
171
172 typedef struct
173 {
174   guint8 clock_class;
175   guint8 clock_accuracy;
176   guint16 offset_scaled_log_variance;
177 } PtpClockQuality;
178
179 typedef struct
180 {
181   guint8 transport_specific;
182   PtpMessageType message_type;
183   /* guint8 reserved; */
184   guint8 version_ptp;
185   guint16 message_length;
186   guint8 domain_number;
187   /* guint8 reserved; */
188   guint16 flag_field;
189   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
190   /* guint32 reserved; */
191   PtpClockIdentity source_port_identity;
192   guint16 sequence_id;
193   guint8 control_field;
194   gint8 log_message_interval;
195
196   union
197   {
198     struct
199     {
200       PtpTimestamp origin_timestamp;
201       gint16 current_utc_offset;
202       /* guint8 reserved; */
203       guint8 grandmaster_priority_1;
204       PtpClockQuality grandmaster_clock_quality;
205       guint8 grandmaster_priority_2;
206       guint64 grandmaster_identity;
207       guint16 steps_removed;
208       guint8 time_source;
209     } announce;
210
211     struct
212     {
213       PtpTimestamp origin_timestamp;
214     } sync;
215
216     struct
217     {
218       PtpTimestamp precise_origin_timestamp;
219     } follow_up;
220
221     struct
222     {
223       PtpTimestamp origin_timestamp;
224     } delay_req;
225
226     struct
227     {
228       PtpTimestamp receive_timestamp;
229       PtpClockIdentity requesting_port_identity;
230     } delay_resp;
231
232   } message_specific;
233 } PtpMessage;
234
235 static GMutex ptp_lock;
236 static GCond ptp_cond;
237 static gboolean initted = FALSE;
238 #ifdef HAVE_PTP
239 static gboolean supported = TRUE;
240 #else
241 static gboolean supported = FALSE;
242 #endif
243 static GPid ptp_helper_pid;
244 static GThread *ptp_helper_thread;
245 static GMainContext *main_context;
246 static GMainLoop *main_loop;
247 static GIOChannel *stdin_channel, *stdout_channel;
248 static GRand *delay_req_rand;
249 static GstClock *observation_system_clock;
250 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
251
252 typedef struct
253 {
254   GstClockTime receive_time;
255
256   PtpClockIdentity master_clock_identity;
257
258   guint8 grandmaster_priority_1;
259   PtpClockQuality grandmaster_clock_quality;
260   guint8 grandmaster_priority_2;
261   guint64 grandmaster_identity;
262   guint16 steps_removed;
263   guint8 time_source;
264
265   guint16 sequence_id;
266 } PtpAnnounceMessage;
267
268 typedef struct
269 {
270   PtpClockIdentity master_clock_identity;
271
272   GstClockTime announce_interval;       /* last interval we received */
273   GQueue announce_messages;
274 } PtpAnnounceSender;
275
276 typedef struct
277 {
278   guint domain;
279   PtpClockIdentity master_clock_identity;
280
281   guint16 sync_seqnum;
282   GstClockTime sync_recv_time_local;    /* t2 */
283   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
284   GstClockTime follow_up_recv_time_local;
285
286   GSource *timeout_source;
287   guint16 delay_req_seqnum;
288   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
289   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
290   GstClockTime delay_resp_recv_time_local;
291
292   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
293   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
294 } PtpPendingSync;
295
296 static void
297 ptp_pending_sync_free (PtpPendingSync * sync)
298 {
299   if (sync->timeout_source) {
300     g_source_destroy (sync->timeout_source);
301     g_source_unref (sync->timeout_source);
302   }
303   g_free (sync);
304 }
305
306 typedef struct
307 {
308   guint domain;
309
310   GstClockTime last_ptp_time;
311   GstClockTime last_local_time;
312   gint skipped_updates;
313
314   /* Used for selecting the master/grandmaster */
315   GList *announce_senders;
316
317   /* Last selected master clock */
318   gboolean have_master_clock;
319   PtpClockIdentity master_clock_identity;
320   guint64 grandmaster_identity;
321
322   /* Last SYNC or FOLLOW_UP timestamp we received */
323   GstClockTime last_ptp_sync_time;
324   GstClockTime sync_interval;
325
326   GstClockTime mean_path_delay;
327   GstClockTime last_delay_req, min_delay_req_interval;
328   guint16 last_delay_req_seqnum;
329
330   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
331   gint last_path_delays_missing;
332
333   GQueue pending_syncs;
334
335   GstClock *domain_clock;
336 } PtpDomainData;
337
338 static GList *domain_data;
339 static GMutex domain_clocks_lock;
340 static GList *domain_clocks;
341
342 /* Protected by PTP lock */
343 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
344 static GHookList domain_stats_hooks;
345 static gint domain_stats_n_hooks;
346 static gboolean domain_stats_hooks_initted = FALSE;
347
348 /* Converts log2 seconds to GstClockTime */
349 static GstClockTime
350 log2_to_clock_time (gint l)
351 {
352   if (l < 0)
353     return GST_SECOND >> (-l);
354   else
355     return GST_SECOND << l;
356 }
357
358 static void
359 dump_ptp_message (PtpMessage * msg)
360 {
361   GST_TRACE ("PTP message:");
362   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
363   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
364   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
365   GST_TRACE ("\tmessage_length: %u", msg->message_length);
366   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
367   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
368   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
369       (msg->correction_field / 65536),
370       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
371   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
372       msg->source_port_identity.clock_identity,
373       msg->source_port_identity.port_number);
374   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
375   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
376   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
377       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
378
379   switch (msg->message_type) {
380     case PTP_MESSAGE_TYPE_ANNOUNCE:
381       GST_TRACE ("\tANNOUNCE:");
382       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
383           msg->message_specific.announce.origin_timestamp.seconds_field,
384           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
385       GST_TRACE ("\t\tcurrent_utc_offset: %d",
386           msg->message_specific.announce.current_utc_offset);
387       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
388           msg->message_specific.announce.grandmaster_priority_1);
389       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
390           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
391           msg->message_specific.announce.
392           grandmaster_clock_quality.clock_accuracy,
393           msg->message_specific.announce.
394           grandmaster_clock_quality.offset_scaled_log_variance);
395       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
396           msg->message_specific.announce.grandmaster_priority_2);
397       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
398           msg->message_specific.announce.grandmaster_identity);
399       GST_TRACE ("\t\tsteps_removed: %u",
400           msg->message_specific.announce.steps_removed);
401       GST_TRACE ("\t\ttime_source: 0x%02x",
402           msg->message_specific.announce.time_source);
403       break;
404     case PTP_MESSAGE_TYPE_SYNC:
405       GST_TRACE ("\tSYNC:");
406       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
407           msg->message_specific.sync.origin_timestamp.seconds_field,
408           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
409       break;
410     case PTP_MESSAGE_TYPE_FOLLOW_UP:
411       GST_TRACE ("\tFOLLOW_UP:");
412       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
413           msg->message_specific.follow_up.
414           precise_origin_timestamp.seconds_field,
415           msg->message_specific.follow_up.
416           precise_origin_timestamp.nanoseconds_field);
417       break;
418     case PTP_MESSAGE_TYPE_DELAY_REQ:
419       GST_TRACE ("\tDELAY_REQ:");
420       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
421           msg->message_specific.delay_req.origin_timestamp.seconds_field,
422           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
423       break;
424     case PTP_MESSAGE_TYPE_DELAY_RESP:
425       GST_TRACE ("\tDELAY_RESP:");
426       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
427           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
428           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
429       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
430           "x %u",
431           msg->message_specific.delay_resp.
432           requesting_port_identity.clock_identity,
433           msg->message_specific.delay_resp.
434           requesting_port_identity.port_number);
435       break;
436     default:
437       break;
438   }
439   GST_TRACE (" ");
440 }
441
442 /* IEEE 1588-2008 5.3.3 */
443 static gboolean
444 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
445 {
446   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
447
448   timestamp->seconds_field =
449       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
450       gst_byte_reader_get_uint16_be_unchecked (reader);
451   timestamp->nanoseconds_field =
452       gst_byte_reader_get_uint32_be_unchecked (reader);
453
454   if (timestamp->nanoseconds_field >= 1000000000)
455     return FALSE;
456
457   return TRUE;
458 }
459
460 /* IEEE 1588-2008 13.3 */
461 static gboolean
462 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
463 {
464   guint8 b;
465
466   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
467
468   b = gst_byte_reader_get_uint8_unchecked (reader);
469   msg->transport_specific = b >> 4;
470   msg->message_type = b & 0x0f;
471
472   b = gst_byte_reader_get_uint8_unchecked (reader);
473   msg->version_ptp = b & 0x0f;
474   if (msg->version_ptp != 2) {
475     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
476     return FALSE;
477   }
478
479   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
480   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
481     GST_WARNING ("Not enough data (%u < %u)",
482         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
483     return FALSE;
484   }
485
486   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
487   gst_byte_reader_skip_unchecked (reader, 1);
488
489   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
490   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
491   gst_byte_reader_skip_unchecked (reader, 4);
492
493   msg->source_port_identity.clock_identity =
494       gst_byte_reader_get_uint64_be_unchecked (reader);
495   msg->source_port_identity.port_number =
496       gst_byte_reader_get_uint16_be_unchecked (reader);
497
498   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
499   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
500   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
501
502   return TRUE;
503 }
504
505 /* IEEE 1588-2008 13.5 */
506 static gboolean
507 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
508 {
509   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
510
511   if (gst_byte_reader_get_remaining (reader) < 20)
512     return FALSE;
513
514   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
515           reader))
516     return FALSE;
517
518   msg->message_specific.announce.current_utc_offset =
519       gst_byte_reader_get_uint16_be_unchecked (reader);
520   gst_byte_reader_skip_unchecked (reader, 1);
521
522   msg->message_specific.announce.grandmaster_priority_1 =
523       gst_byte_reader_get_uint8_unchecked (reader);
524   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
525       gst_byte_reader_get_uint8_unchecked (reader);
526   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
527       gst_byte_reader_get_uint8_unchecked (reader);
528   msg->message_specific.announce.
529       grandmaster_clock_quality.offset_scaled_log_variance =
530       gst_byte_reader_get_uint16_be_unchecked (reader);
531   msg->message_specific.announce.grandmaster_priority_2 =
532       gst_byte_reader_get_uint8_unchecked (reader);
533   msg->message_specific.announce.grandmaster_identity =
534       gst_byte_reader_get_uint64_be_unchecked (reader);
535   msg->message_specific.announce.steps_removed =
536       gst_byte_reader_get_uint16_be_unchecked (reader);
537   msg->message_specific.announce.time_source =
538       gst_byte_reader_get_uint8_unchecked (reader);
539
540   return TRUE;
541 }
542
543 /* IEEE 1588-2008 13.6 */
544 static gboolean
545 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
546 {
547   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
548
549   if (gst_byte_reader_get_remaining (reader) < 10)
550     return FALSE;
551
552   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
553           reader))
554     return FALSE;
555
556   return TRUE;
557 }
558
559 /* IEEE 1588-2008 13.6 */
560 static gboolean
561 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
562 {
563   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
564
565   if (gst_byte_reader_get_remaining (reader) < 10)
566     return FALSE;
567
568   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
569           reader))
570     return FALSE;
571
572   return TRUE;
573 }
574
575 /* IEEE 1588-2008 13.7 */
576 static gboolean
577 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
578 {
579   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
580
581   if (gst_byte_reader_get_remaining (reader) < 10)
582     return FALSE;
583
584   if (!parse_ptp_timestamp (&msg->message_specific.
585           follow_up.precise_origin_timestamp, reader))
586     return FALSE;
587
588   return TRUE;
589 }
590
591 /* IEEE 1588-2008 13.8 */
592 static gboolean
593 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
594 {
595   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
596       FALSE);
597
598   if (gst_byte_reader_get_remaining (reader) < 20)
599     return FALSE;
600
601   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
602           reader))
603     return FALSE;
604
605   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
606       gst_byte_reader_get_uint64_be_unchecked (reader);
607   msg->message_specific.delay_resp.requesting_port_identity.port_number =
608       gst_byte_reader_get_uint16_be_unchecked (reader);
609
610   return TRUE;
611 }
612
613 static gboolean
614 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
615 {
616   GstByteReader reader;
617   gboolean ret = FALSE;
618
619   gst_byte_reader_init (&reader, data, size);
620
621   if (!parse_ptp_message_header (msg, &reader)) {
622     GST_WARNING ("Failed to parse PTP message header");
623     return FALSE;
624   }
625
626   switch (msg->message_type) {
627     case PTP_MESSAGE_TYPE_SYNC:
628       ret = parse_ptp_message_sync (msg, &reader);
629       break;
630     case PTP_MESSAGE_TYPE_FOLLOW_UP:
631       ret = parse_ptp_message_follow_up (msg, &reader);
632       break;
633     case PTP_MESSAGE_TYPE_DELAY_REQ:
634       ret = parse_ptp_message_delay_req (msg, &reader);
635       break;
636     case PTP_MESSAGE_TYPE_DELAY_RESP:
637       ret = parse_ptp_message_delay_resp (msg, &reader);
638       break;
639     case PTP_MESSAGE_TYPE_ANNOUNCE:
640       ret = parse_ptp_message_announce (msg, &reader);
641       break;
642     default:
643       /* ignore for now */
644       break;
645   }
646
647   return ret;
648 }
649
650 static gint
651 compare_announce_message (const PtpAnnounceMessage * a,
652     const PtpAnnounceMessage * b)
653 {
654   /* IEEE 1588 Figure 27 */
655   if (a->grandmaster_identity == b->grandmaster_identity) {
656     if (a->steps_removed + 1 < b->steps_removed)
657       return -1;
658     else if (a->steps_removed > b->steps_removed + 1)
659       return 1;
660
661     /* Error cases are filtered out earlier */
662     if (a->steps_removed < b->steps_removed)
663       return -1;
664     else if (a->steps_removed > b->steps_removed)
665       return 1;
666
667     /* Error cases are filtered out earlier */
668     if (a->master_clock_identity.clock_identity <
669         b->master_clock_identity.clock_identity)
670       return -1;
671     else if (a->master_clock_identity.clock_identity >
672         b->master_clock_identity.clock_identity)
673       return 1;
674
675     /* Error cases are filtered out earlier */
676     if (a->master_clock_identity.port_number <
677         b->master_clock_identity.port_number)
678       return -1;
679     else if (a->master_clock_identity.port_number >
680         b->master_clock_identity.port_number)
681       return 1;
682     else
683       g_assert_not_reached ();
684
685     return 0;
686   }
687
688   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
689     return -1;
690   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
691     return 1;
692
693   if (a->grandmaster_clock_quality.clock_class <
694       b->grandmaster_clock_quality.clock_class)
695     return -1;
696   else if (a->grandmaster_clock_quality.clock_class >
697       b->grandmaster_clock_quality.clock_class)
698     return 1;
699
700   if (a->grandmaster_clock_quality.clock_accuracy <
701       b->grandmaster_clock_quality.clock_accuracy)
702     return -1;
703   else if (a->grandmaster_clock_quality.clock_accuracy >
704       b->grandmaster_clock_quality.clock_accuracy)
705     return 1;
706
707   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
708       b->grandmaster_clock_quality.offset_scaled_log_variance)
709     return -1;
710   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
711       b->grandmaster_clock_quality.offset_scaled_log_variance)
712     return 1;
713
714   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
715     return -1;
716   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
717     return 1;
718
719   if (a->grandmaster_identity < b->grandmaster_identity)
720     return -1;
721   else if (a->grandmaster_identity > b->grandmaster_identity)
722     return 1;
723   else
724     g_assert_not_reached ();
725
726   return 0;
727 }
728
729 static void
730 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
731 {
732   GList *qualified_messages = NULL;
733   GList *l, *m;
734   PtpAnnounceMessage *best = NULL;
735
736   /* IEEE 1588 9.3.2.5 */
737   for (l = domain->announce_senders; l; l = l->next) {
738     PtpAnnounceSender *sender = l->data;
739     GstClockTime window = 4 * sender->announce_interval;
740     gint count = 0;
741
742     for (m = sender->announce_messages.head; m; m = m->next) {
743       PtpAnnounceMessage *msg = m->data;
744
745       if (now - msg->receive_time <= window)
746         count++;
747     }
748
749     /* Only include the newest message of announce senders that had at least 2
750      * announce messages in the last 4 announce intervals. Which also means
751      * that we wait at least 4 announce intervals before we select a master
752      * clock. Until then we just report based on the newest SYNC we received
753      */
754     if (count >= 2) {
755       qualified_messages =
756           g_list_prepend (qualified_messages,
757           g_queue_peek_tail (&sender->announce_messages));
758     }
759   }
760
761   if (!qualified_messages) {
762     GST_DEBUG
763         ("No qualified announce messages for domain %u, can't select a master clock",
764         domain->domain);
765     domain->have_master_clock = FALSE;
766     return;
767   }
768
769   for (l = qualified_messages; l; l = l->next) {
770     PtpAnnounceMessage *msg = l->data;
771
772     if (!best || compare_announce_message (msg, best) < 0)
773       best = msg;
774   }
775
776   if (domain->have_master_clock
777       && compare_clock_identity (&domain->master_clock_identity,
778           &best->master_clock_identity) == 0) {
779     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
780   } else {
781     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
782         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
783         domain->domain, best->master_clock_identity.clock_identity,
784         best->master_clock_identity.port_number, best->grandmaster_identity);
785
786     domain->have_master_clock = TRUE;
787     domain->grandmaster_identity = best->grandmaster_identity;
788
789     /* Opportunistic master clock selection likely gave us the same master
790      * clock before, no need to reset all statistics */
791     if (compare_clock_identity (&domain->master_clock_identity,
792             &best->master_clock_identity) != 0) {
793       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
794           sizeof (PtpClockIdentity));
795       domain->mean_path_delay = 0;
796       domain->last_delay_req = 0;
797       domain->last_path_delays_missing = 9;
798       domain->min_delay_req_interval = 0;
799       domain->sync_interval = 0;
800       domain->last_ptp_sync_time = 0;
801       domain->skipped_updates = 0;
802       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
803           NULL);
804       g_queue_clear (&domain->pending_syncs);
805     }
806
807     if (g_atomic_int_get (&domain_stats_n_hooks)) {
808       GstStructure *stats =
809           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
810           "domain", G_TYPE_UINT, domain->domain,
811           "master-clock-id", G_TYPE_UINT64,
812           domain->master_clock_identity.clock_identity,
813           "master-clock-port", G_TYPE_UINT,
814           domain->master_clock_identity.port_number,
815           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
816           NULL);
817       emit_ptp_statistics (domain->domain, stats);
818       gst_structure_free (stats);
819     }
820   }
821 }
822
823 static void
824 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
825 {
826   GList *l;
827   PtpDomainData *domain = NULL;
828   PtpAnnounceSender *sender = NULL;
829   PtpAnnounceMessage *announce;
830
831   /* IEEE1588 9.3.2.2 e)
832    * Don't consider messages with the alternate master flag set
833    */
834   if ((msg->flag_field & 0x0100))
835     return;
836
837   /* IEEE 1588 9.3.2.5 d)
838    * Don't consider announce messages with steps_removed>=255
839    */
840   if (msg->message_specific.announce.steps_removed >= 255)
841     return;
842
843   for (l = domain_data; l; l = l->next) {
844     PtpDomainData *tmp = l->data;
845
846     if (tmp->domain == msg->domain_number) {
847       domain = tmp;
848       break;
849     }
850   }
851
852   if (!domain) {
853     gchar *clock_name;
854
855     domain = g_new0 (PtpDomainData, 1);
856     domain->domain = msg->domain_number;
857     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
858     domain->domain_clock =
859         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
860     gst_object_ref_sink (domain->domain_clock);
861     g_free (clock_name);
862     g_queue_init (&domain->pending_syncs);
863     domain->last_path_delays_missing = 9;
864     domain_data = g_list_prepend (domain_data, domain);
865
866     g_mutex_lock (&domain_clocks_lock);
867     domain_clocks = g_list_prepend (domain_clocks, domain);
868     g_mutex_unlock (&domain_clocks_lock);
869
870     if (g_atomic_int_get (&domain_stats_n_hooks)) {
871       GstStructure *stats =
872           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
873           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
874           domain->domain_clock, NULL);
875       emit_ptp_statistics (domain->domain, stats);
876       gst_structure_free (stats);
877     }
878   }
879
880   for (l = domain->announce_senders; l; l = l->next) {
881     PtpAnnounceSender *tmp = l->data;
882
883     if (compare_clock_identity (&tmp->master_clock_identity,
884             &msg->source_port_identity) == 0) {
885       sender = tmp;
886       break;
887     }
888   }
889
890   if (!sender) {
891     sender = g_new0 (PtpAnnounceSender, 1);
892
893     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
894         sizeof (PtpClockIdentity));
895     g_queue_init (&sender->announce_messages);
896     domain->announce_senders =
897         g_list_prepend (domain->announce_senders, sender);
898   }
899
900   for (l = sender->announce_messages.head; l; l = l->next) {
901     PtpAnnounceMessage *tmp = l->data;
902
903     /* IEEE 1588 9.3.2.5 c)
904      * Don't consider identical messages, i.e. duplicates
905      */
906     if (tmp->sequence_id == msg->sequence_id)
907       return;
908   }
909
910   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
911
912   announce = g_new0 (PtpAnnounceMessage, 1);
913   announce->receive_time = receive_time;
914   announce->sequence_id = msg->sequence_id;
915   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
916       sizeof (PtpClockIdentity));
917   announce->grandmaster_identity =
918       msg->message_specific.announce.grandmaster_identity;
919   announce->grandmaster_priority_1 =
920       msg->message_specific.announce.grandmaster_priority_1;
921   announce->grandmaster_clock_quality.clock_class =
922       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
923   announce->grandmaster_clock_quality.clock_accuracy =
924       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
925   announce->grandmaster_clock_quality.offset_scaled_log_variance =
926       msg->message_specific.announce.
927       grandmaster_clock_quality.offset_scaled_log_variance;
928   announce->grandmaster_priority_2 =
929       msg->message_specific.announce.grandmaster_priority_2;
930   announce->steps_removed = msg->message_specific.announce.steps_removed;
931   announce->time_source = msg->message_specific.announce.time_source;
932   g_queue_push_tail (&sender->announce_messages, announce);
933
934   select_best_master_clock (domain, receive_time);
935 }
936
937 static gboolean
938 send_delay_req_timeout (PtpPendingSync * sync)
939 {
940   StdIOHeader header = { 0, };
941   guint8 delay_req[44];
942   GstByteWriter writer;
943   GIOStatus status;
944   gsize written;
945   GError *err = NULL;
946
947   header.type = TYPE_EVENT;
948   header.size = 44;
949
950   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
951   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
952   gst_byte_writer_put_uint8_unchecked (&writer, 2);
953   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
954   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
955   gst_byte_writer_put_uint8_unchecked (&writer, 0);
956   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
957   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
958   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
959   gst_byte_writer_put_uint64_be_unchecked (&writer,
960       ptp_clock_id.clock_identity);
961   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
962   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
963   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
964   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
965   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
966   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
967
968   status =
969       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
970       sizeof (header), &written, &err);
971   if (status == G_IO_STATUS_ERROR) {
972     g_warning ("Failed to write to stdout: %s", err->message);
973     g_clear_error (&err);
974     return G_SOURCE_REMOVE;
975   } else if (status == G_IO_STATUS_EOF) {
976     g_message ("EOF on stdout");
977     g_main_loop_quit (main_loop);
978     return G_SOURCE_REMOVE;
979   } else if (status != G_IO_STATUS_NORMAL) {
980     g_warning ("Unexpected stdout write status: %d", status);
981     g_main_loop_quit (main_loop);
982     return G_SOURCE_REMOVE;
983   } else if (written != sizeof (header)) {
984     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
985     g_main_loop_quit (main_loop);
986     return G_SOURCE_REMOVE;
987   }
988
989   sync->delay_req_send_time_local =
990       gst_clock_get_time (observation_system_clock);
991
992   status =
993       g_io_channel_write_chars (stdout_channel,
994       (const gchar *) delay_req, 44, &written, &err);
995   if (status == G_IO_STATUS_ERROR) {
996     g_warning ("Failed to write to stdout: %s", err->message);
997     g_clear_error (&err);
998     g_main_loop_quit (main_loop);
999     return G_SOURCE_REMOVE;
1000   } else if (status == G_IO_STATUS_EOF) {
1001     g_message ("EOF on stdout");
1002     g_main_loop_quit (main_loop);
1003     return G_SOURCE_REMOVE;
1004   } else if (status != G_IO_STATUS_NORMAL) {
1005     g_warning ("Unexpected stdout write status: %d", status);
1006     g_main_loop_quit (main_loop);
1007     return G_SOURCE_REMOVE;
1008   } else if (written != 44) {
1009     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1010     g_main_loop_quit (main_loop);
1011     return G_SOURCE_REMOVE;
1012   }
1013
1014   return G_SOURCE_REMOVE;
1015 }
1016
1017 static gboolean
1018 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1019 {
1020   GstClockTime now = gst_clock_get_time (observation_system_clock);
1021   guint timeout;
1022   GSource *timeout_source;
1023
1024   if (domain->last_delay_req != 0
1025       && domain->last_delay_req + domain->min_delay_req_interval > now)
1026     return FALSE;
1027
1028   domain->last_delay_req = now;
1029   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1030
1031   /* IEEE 1588 9.5.11.2 */
1032   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1033     timeout = 0;
1034   else
1035     timeout =
1036         g_rand_int_range (delay_req_rand, 0,
1037         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1038
1039   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1040   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1041   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1042       sync, NULL);
1043   g_source_attach (timeout_source, main_context);
1044
1045   return TRUE;
1046 }
1047
1048 /* Filtering of outliers for RTT and time calculations inspired
1049  * by the code from gstnetclientclock.c
1050  */
1051 static void
1052 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1053 {
1054   GstClockTime internal_time, external_time, rate_num, rate_den;
1055   GstClockTime corrected_ptp_time, corrected_local_time;
1056   gdouble r_squared = 0.0;
1057   gboolean synced;
1058   GstClockTimeDiff discont = 0;
1059   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1060 #ifdef USE_MEASUREMENT_FILTERING
1061   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1062       orig_rate_den;
1063   GstClockTime new_estimated_ptp_time;
1064   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1065   gboolean now_synced;
1066 #endif
1067
1068 #ifdef USE_ONLY_SYNC_WITH_DELAY
1069   GstClockTime mean_path_delay;
1070
1071   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1072     return;
1073
1074   /* IEEE 1588 11.3 */
1075   mean_path_delay =
1076       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1077       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1078       (sync->correction_field_sync + sync->correction_field_delay +
1079           32768) / 65536) / 2;
1080 #endif
1081
1082   /* IEEE 1588 11.2 */
1083   corrected_ptp_time =
1084       sync->sync_send_time_remote +
1085       (sync->correction_field_sync + 32768) / 65536;
1086
1087 #ifdef USE_ONLY_SYNC_WITH_DELAY
1088   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1089 #else
1090   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1091 #endif
1092
1093 #ifdef USE_MEASUREMENT_FILTERING
1094   /* We check this here and when updating the mean path delay, because
1095    * we can get here without a delay response too */
1096   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1097       && sync->follow_up_recv_time_local >
1098       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1099     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1100         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1101         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1102         GST_TIME_ARGS (domain->mean_path_delay));
1103     synced = FALSE;
1104     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1105         &internal_time, &external_time, &rate_num, &rate_den);
1106     goto out;
1107   }
1108 #endif
1109
1110   /* Set an initial local-remote relation */
1111   if (domain->last_ptp_time == 0)
1112     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1113         corrected_ptp_time, 1, 1);
1114
1115 #ifdef USE_MEASUREMENT_FILTERING
1116   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1117    * estimate with our present knowledge about the clock
1118    */
1119   /* Store what the clock produced as 'now' before this update */
1120   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1121       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1122   internal_time = orig_internal_time;
1123   external_time = orig_external_time;
1124   rate_num = orig_rate_num;
1125   rate_den = orig_rate_den;
1126
1127   /* 3/4 RTT window around the estimation */
1128   max_discont = domain->mean_path_delay * 3 / 2;
1129
1130   /* Check if the estimated sync time is inside our window */
1131   estimated_ptp_time_min = corrected_local_time - max_discont;
1132   estimated_ptp_time_min =
1133       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1134       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1135   estimated_ptp_time_max = corrected_local_time + max_discont;
1136   estimated_ptp_time_max =
1137       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1138       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1139
1140   synced = (estimated_ptp_time_min < corrected_ptp_time
1141       && corrected_ptp_time < estimated_ptp_time_max);
1142
1143   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1144       GST_TIME_FORMAT, domain->domain,
1145       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1146
1147   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1148       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1149       GST_TIME_ARGS (corrected_ptp_time),
1150       GST_TIME_ARGS (estimated_ptp_time_max));
1151
1152   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1153           corrected_local_time, corrected_ptp_time, &r_squared,
1154           &internal_time, &external_time, &rate_num, &rate_den)) {
1155     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1156
1157     /* Old estimated PTP time based on receive time and path delay */
1158     estimated_ptp_time = corrected_local_time;
1159     estimated_ptp_time =
1160         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1161         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1162         orig_external_time, orig_rate_num, orig_rate_den);
1163
1164     /* New estimated PTP time based on receive time and path delay */
1165     new_estimated_ptp_time = corrected_local_time;
1166     new_estimated_ptp_time =
1167         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1168         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1169         external_time, rate_num, rate_den);
1170
1171     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1172     if (synced && ABS (discont) > max_discont) {
1173       GstClockTimeDiff offset;
1174       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1175           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1176           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1177           GST_TIME_ARGS (max_discont));
1178       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1179         offset = max_discont - discont;
1180         if (-offset > external_time)
1181           external_time = 0;
1182         else
1183           external_time += offset;
1184       } else {                  /* Too large a backward step - add a +ve offset */
1185         offset = -(max_discont + discont);
1186         external_time += offset;
1187       }
1188
1189       discont += offset;
1190     } else {
1191       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1192           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1193           GST_TIME_ARGS (max_discont));
1194     }
1195
1196     /* Check if the estimated sync time is now (still) inside our window */
1197     estimated_ptp_time_min = corrected_local_time - max_discont;
1198     estimated_ptp_time_min =
1199         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1200         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1201         external_time, rate_num, rate_den);
1202     estimated_ptp_time_max = corrected_local_time + max_discont;
1203     estimated_ptp_time_max =
1204         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1205         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1206         external_time, rate_num, rate_den);
1207
1208     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1209         && corrected_ptp_time < estimated_ptp_time_max);
1210
1211     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1212         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1213         GST_TIME_ARGS (corrected_ptp_time),
1214         GST_TIME_ARGS (estimated_ptp_time_max));
1215
1216     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1217       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1218           internal_time, external_time, rate_num, rate_den);
1219       domain->skipped_updates = 0;
1220
1221       domain->last_ptp_time = corrected_ptp_time;
1222       domain->last_local_time = corrected_local_time;
1223     } else {
1224       domain->skipped_updates++;
1225     }
1226   } else {
1227     domain->last_ptp_time = corrected_ptp_time;
1228     domain->last_local_time = corrected_local_time;
1229   }
1230
1231 #else
1232   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1233       GST_TIME_FORMAT, domain->domain,
1234       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1235
1236   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1237       &internal_time, &external_time, &rate_num, &rate_den);
1238
1239   estimated_ptp_time = corrected_local_time;
1240   estimated_ptp_time =
1241       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1242       (domain->domain_clock), estimated_ptp_time, internal_time,
1243       external_time, rate_num, rate_den);
1244
1245   gst_clock_add_observation (domain->domain_clock,
1246       corrected_local_time, corrected_ptp_time, &r_squared);
1247
1248   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1249       &internal_time, &external_time, &rate_num, &rate_den);
1250
1251   synced = TRUE;
1252   domain->last_ptp_time = corrected_ptp_time;
1253   domain->last_local_time = corrected_local_time;
1254 #endif
1255
1256 #ifdef USE_MEASUREMENT_FILTERING
1257 out:
1258 #endif
1259   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1260     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1261         "domain", G_TYPE_UINT, domain->domain,
1262         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1263         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1264         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1265         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1266         "discontinuity", G_TYPE_INT64, discont,
1267         "synced", G_TYPE_BOOLEAN, synced,
1268         "r-squared", G_TYPE_DOUBLE, r_squared,
1269         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1270         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1271         "rate-num", G_TYPE_UINT64, rate_num,
1272         "rate-den", G_TYPE_UINT64, rate_den,
1273         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1274         NULL);
1275     emit_ptp_statistics (domain->domain, stats);
1276     gst_structure_free (stats);
1277   }
1278
1279 }
1280
1281 #ifdef USE_MEDIAN_PRE_FILTERING
1282 static gint
1283 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1284 {
1285   if (*a < *b)
1286     return -1;
1287   else if (*a > *b)
1288     return 1;
1289   return 0;
1290 }
1291 #endif
1292
1293 static gboolean
1294 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1295 {
1296 #ifdef USE_MEDIAN_PRE_FILTERING
1297   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1298   GstClockTime median;
1299   gint i;
1300 #endif
1301
1302   GstClockTime mean_path_delay, delay_req_delay = 0;
1303   gboolean ret;
1304
1305   /* IEEE 1588 11.3 */
1306   mean_path_delay =
1307       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1308       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1309       (sync->correction_field_sync + sync->correction_field_delay +
1310           32768) / 65536) / 2;
1311
1312 #ifdef USE_MEDIAN_PRE_FILTERING
1313   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1314     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1315   domain->last_path_delays[i - 1] = mean_path_delay;
1316
1317   if (domain->last_path_delays_missing) {
1318     domain->last_path_delays_missing--;
1319   } else {
1320     memcpy (&last_path_delays, &domain->last_path_delays,
1321         sizeof (last_path_delays));
1322     g_qsort_with_data (&last_path_delays,
1323         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1324         (GCompareDataFunc) compare_clock_time, NULL);
1325
1326     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1327
1328     /* FIXME: We might want to use something else here, like only allowing
1329      * things in the interquartile range, or also filtering away delays that
1330      * are too small compared to the median. This here worked well enough
1331      * in tests so far.
1332      */
1333     if (mean_path_delay > 2 * median) {
1334       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1335           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1336           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1337       ret = FALSE;
1338       goto out;
1339     }
1340   }
1341 #endif
1342
1343 #ifdef USE_RUNNING_AVERAGE_DELAY
1344   /* Track an average round trip time, for a bit of smoothing */
1345   /* Always update before discarding a sample, so genuine changes in
1346    * the network get picked up, eventually */
1347   if (domain->mean_path_delay == 0)
1348     domain->mean_path_delay = mean_path_delay;
1349   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1350     domain->mean_path_delay =
1351         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1352   else
1353     domain->mean_path_delay =
1354         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1355 #else
1356   domain->mean_path_delay = mean_path_delay;
1357 #endif
1358
1359 #ifdef USE_MEASUREMENT_FILTERING
1360   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1361       domain->mean_path_delay != 0
1362       && sync->follow_up_recv_time_local >
1363       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1364     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1365         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1366         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1367             sync->sync_recv_time_local),
1368         GST_TIME_ARGS (domain->mean_path_delay));
1369     ret = FALSE;
1370     goto out;
1371   }
1372
1373   if (mean_path_delay > 2 * domain->mean_path_delay) {
1374     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1375         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1376         GST_TIME_ARGS (mean_path_delay),
1377         GST_TIME_ARGS (domain->mean_path_delay));
1378     ret = FALSE;
1379     goto out;
1380   }
1381 #endif
1382
1383   delay_req_delay =
1384       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1385
1386 #ifdef USE_MEASUREMENT_FILTERING
1387   /* delay_req_delay is a RTT, so 2 times the path delay */
1388   if (delay_req_delay > 4 * domain->mean_path_delay) {
1389     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1390         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1391         GST_TIME_ARGS (delay_req_delay),
1392         GST_TIME_ARGS (domain->mean_path_delay));
1393     ret = FALSE;
1394     goto out;
1395   }
1396 #endif
1397
1398   ret = TRUE;
1399
1400   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1401       GST_TIME_FORMAT ")", domain->domain,
1402       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1403   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1404       domain->domain, GST_TIME_ARGS (delay_req_delay));
1405
1406 #ifdef USE_MEASUREMENT_FILTERING
1407 out:
1408 #endif
1409   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1410     GstStructure *stats =
1411         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1412         "domain", G_TYPE_UINT, domain->domain,
1413         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1414         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1415         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1416     emit_ptp_statistics (domain->domain, stats);
1417     gst_structure_free (stats);
1418   }
1419
1420   return ret;
1421 }
1422
1423 static void
1424 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1425 {
1426   GList *l;
1427   PtpDomainData *domain = NULL;
1428   PtpPendingSync *sync = NULL;
1429
1430   /* Don't consider messages with the alternate master flag set */
1431   if ((msg->flag_field & 0x0100))
1432     return;
1433
1434   for (l = domain_data; l; l = l->next) {
1435     PtpDomainData *tmp = l->data;
1436
1437     if (msg->domain_number == tmp->domain) {
1438       domain = tmp;
1439       break;
1440     }
1441   }
1442
1443   if (!domain) {
1444     gchar *clock_name;
1445
1446     domain = g_new0 (PtpDomainData, 1);
1447     domain->domain = msg->domain_number;
1448     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1449     domain->domain_clock =
1450         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1451     gst_object_ref_sink (domain->domain_clock);
1452     g_free (clock_name);
1453     g_queue_init (&domain->pending_syncs);
1454     domain->last_path_delays_missing = 9;
1455     domain_data = g_list_prepend (domain_data, domain);
1456
1457     g_mutex_lock (&domain_clocks_lock);
1458     domain_clocks = g_list_prepend (domain_clocks, domain);
1459     g_mutex_unlock (&domain_clocks_lock);
1460   }
1461
1462   /* If we have a master clock, ignore this message if it's not coming from there */
1463   if (domain->have_master_clock
1464       && compare_clock_identity (&domain->master_clock_identity,
1465           &msg->source_port_identity) != 0)
1466     return;
1467
1468 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1469   /* Opportunistic selection of master clock */
1470   if (!domain->have_master_clock)
1471     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1472         sizeof (PtpClockIdentity));
1473 #else
1474   if (!domain->have_master_clock)
1475     return;
1476 #endif
1477
1478   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1479
1480   /* Check if duplicated */
1481   for (l = domain->pending_syncs.head; l; l = l->next) {
1482     PtpPendingSync *tmp = l->data;
1483
1484     if (tmp->sync_seqnum == msg->sequence_id)
1485       return;
1486   }
1487
1488   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1489       GST_CLOCK_TIME_NONE / GST_SECOND) {
1490     GST_FIXME ("Unsupported sync message seconds field value: %"
1491         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1492         msg->message_specific.sync.origin_timestamp.seconds_field,
1493         GST_CLOCK_TIME_NONE / GST_SECOND);
1494     return;
1495   }
1496
1497   sync = g_new0 (PtpPendingSync, 1);
1498   sync->domain = domain->domain;
1499   sync->sync_seqnum = msg->sequence_id;
1500   sync->sync_recv_time_local = receive_time;
1501   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1502   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1503   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1504   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1505   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1506
1507   /* 0.5 correction factor for division later */
1508   sync->correction_field_sync = msg->correction_field;
1509
1510   if ((msg->flag_field & 0x0200)) {
1511     /* Wait for FOLLOW_UP */
1512   } else {
1513     sync->sync_send_time_remote =
1514         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1515         sync.origin_timestamp);
1516
1517     if (domain->last_ptp_sync_time != 0
1518         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1519       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1520           GST_TIME_FORMAT, domain->domain,
1521           GST_TIME_ARGS (domain->last_ptp_sync_time),
1522           GST_TIME_ARGS (sync->sync_send_time_remote));
1523       ptp_pending_sync_free (sync);
1524       sync = NULL;
1525       return;
1526     }
1527     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1528
1529     if (send_delay_req (domain, sync)) {
1530       /* Sent delay request */
1531     } else {
1532       update_ptp_time (domain, sync);
1533       ptp_pending_sync_free (sync);
1534       sync = NULL;
1535     }
1536   }
1537
1538   if (sync)
1539     g_queue_push_tail (&domain->pending_syncs, sync);
1540 }
1541
1542 static void
1543 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1544 {
1545   GList *l;
1546   PtpDomainData *domain = NULL;
1547   PtpPendingSync *sync = NULL;
1548
1549   /* Don't consider messages with the alternate master flag set */
1550   if ((msg->flag_field & 0x0100))
1551     return;
1552
1553   for (l = domain_data; l; l = l->next) {
1554     PtpDomainData *tmp = l->data;
1555
1556     if (msg->domain_number == tmp->domain) {
1557       domain = tmp;
1558       break;
1559     }
1560   }
1561
1562   if (!domain)
1563     return;
1564
1565   /* If we have a master clock, ignore this message if it's not coming from there */
1566   if (domain->have_master_clock
1567       && compare_clock_identity (&domain->master_clock_identity,
1568           &msg->source_port_identity) != 0)
1569     return;
1570
1571   /* Check if we know about this one */
1572   for (l = domain->pending_syncs.head; l; l = l->next) {
1573     PtpPendingSync *tmp = l->data;
1574
1575     if (tmp->sync_seqnum == msg->sequence_id) {
1576       sync = tmp;
1577       break;
1578     }
1579   }
1580
1581   if (!sync)
1582     return;
1583
1584   /* Got a FOLLOW_UP for this already */
1585   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1586     return;
1587
1588   if (sync->sync_recv_time_local >= receive_time) {
1589     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1590         GST_TIME_FORMAT, domain->domain,
1591         GST_TIME_ARGS (sync->sync_recv_time_local),
1592         GST_TIME_ARGS (receive_time));
1593     g_queue_remove (&domain->pending_syncs, sync);
1594     ptp_pending_sync_free (sync);
1595     return;
1596   }
1597
1598   sync->correction_field_sync += msg->correction_field;
1599   sync->sync_send_time_remote =
1600       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1601       follow_up.precise_origin_timestamp);
1602   sync->follow_up_recv_time_local = receive_time;
1603
1604   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1605     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1606         GST_TIME_FORMAT, domain->domain,
1607         GST_TIME_ARGS (domain->last_ptp_sync_time),
1608         GST_TIME_ARGS (sync->sync_send_time_remote));
1609     g_queue_remove (&domain->pending_syncs, sync);
1610     ptp_pending_sync_free (sync);
1611     sync = NULL;
1612     return;
1613   }
1614   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1615
1616   if (send_delay_req (domain, sync)) {
1617     /* Sent delay request */
1618   } else {
1619     update_ptp_time (domain, sync);
1620     g_queue_remove (&domain->pending_syncs, sync);
1621     ptp_pending_sync_free (sync);
1622     sync = NULL;
1623   }
1624 }
1625
1626 static void
1627 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1628 {
1629   GList *l;
1630   PtpDomainData *domain = NULL;
1631   PtpPendingSync *sync = NULL;
1632
1633   /* Don't consider messages with the alternate master flag set */
1634   if ((msg->flag_field & 0x0100))
1635     return;
1636
1637   for (l = domain_data; l; l = l->next) {
1638     PtpDomainData *tmp = l->data;
1639
1640     if (msg->domain_number == tmp->domain) {
1641       domain = tmp;
1642       break;
1643     }
1644   }
1645
1646   if (!domain)
1647     return;
1648
1649   /* If we have a master clock, ignore this message if it's not coming from there */
1650   if (domain->have_master_clock
1651       && compare_clock_identity (&domain->master_clock_identity,
1652           &msg->source_port_identity) != 0)
1653     return;
1654
1655   /* Not for us */
1656   if (msg->message_specific.delay_resp.
1657       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1658       || msg->message_specific.delay_resp.
1659       requesting_port_identity.port_number != ptp_clock_id.port_number)
1660     return;
1661
1662   domain->min_delay_req_interval =
1663       log2_to_clock_time (msg->log_message_interval);
1664
1665   /* Check if we know about this one */
1666   for (l = domain->pending_syncs.head; l; l = l->next) {
1667     PtpPendingSync *tmp = l->data;
1668
1669     if (tmp->delay_req_seqnum == msg->sequence_id) {
1670       sync = tmp;
1671       break;
1672     }
1673   }
1674
1675   if (!sync)
1676     return;
1677
1678   /* Got a DELAY_RESP for this already */
1679   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1680     return;
1681
1682   if (sync->delay_req_send_time_local > receive_time) {
1683     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1684         GST_TIME_FORMAT, domain->domain,
1685         GST_TIME_ARGS (sync->delay_req_send_time_local),
1686         GST_TIME_ARGS (receive_time));
1687     g_queue_remove (&domain->pending_syncs, sync);
1688     ptp_pending_sync_free (sync);
1689     return;
1690   }
1691
1692   sync->correction_field_delay = msg->correction_field;
1693
1694   sync->delay_req_recv_time_remote =
1695       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1696       delay_resp.receive_timestamp);
1697   sync->delay_resp_recv_time_local = receive_time;
1698
1699   if (domain->mean_path_delay != 0
1700       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1701     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1702         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1703         GST_TIME_ARGS (sync->sync_send_time_remote),
1704         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1705     g_queue_remove (&domain->pending_syncs, sync);
1706     ptp_pending_sync_free (sync);
1707     return;
1708   }
1709
1710   if (update_mean_path_delay (domain, sync))
1711     update_ptp_time (domain, sync);
1712   g_queue_remove (&domain->pending_syncs, sync);
1713   ptp_pending_sync_free (sync);
1714 }
1715
1716 static void
1717 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1718 {
1719   /* Ignore our own messages */
1720   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1721       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1722     return;
1723
1724   switch (msg->message_type) {
1725     case PTP_MESSAGE_TYPE_ANNOUNCE:
1726       handle_announce_message (msg, receive_time);
1727       break;
1728     case PTP_MESSAGE_TYPE_SYNC:
1729       handle_sync_message (msg, receive_time);
1730       break;
1731     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1732       handle_follow_up_message (msg, receive_time);
1733       break;
1734     case PTP_MESSAGE_TYPE_DELAY_RESP:
1735       handle_delay_resp_message (msg, receive_time);
1736       break;
1737     default:
1738       break;
1739   }
1740 }
1741
1742 static gboolean
1743 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1744     gpointer user_data)
1745 {
1746   GIOStatus status;
1747   StdIOHeader header;
1748   gchar buffer[8192];
1749   GError *err = NULL;
1750   gsize read;
1751
1752   if ((condition & G_IO_STATUS_EOF)) {
1753     GST_ERROR ("Got EOF on stdin");
1754     g_main_loop_quit (main_loop);
1755     return G_SOURCE_REMOVE;
1756   }
1757
1758   status =
1759       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1760       &read, &err);
1761   if (status == G_IO_STATUS_ERROR) {
1762     GST_ERROR ("Failed to read from stdin: %s", err->message);
1763     g_clear_error (&err);
1764     g_main_loop_quit (main_loop);
1765     return G_SOURCE_REMOVE;
1766   } else if (status == G_IO_STATUS_EOF) {
1767     GST_ERROR ("Got EOF on stdin");
1768     g_main_loop_quit (main_loop);
1769     return G_SOURCE_REMOVE;
1770   } else if (status != G_IO_STATUS_NORMAL) {
1771     GST_ERROR ("Unexpected stdin read status: %d", status);
1772     g_main_loop_quit (main_loop);
1773     return G_SOURCE_REMOVE;
1774   } else if (read != sizeof (header)) {
1775     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1776     g_main_loop_quit (main_loop);
1777     return G_SOURCE_REMOVE;
1778   } else if (header.size > 8192) {
1779     GST_ERROR ("Unexpected size: %u", header.size);
1780     g_main_loop_quit (main_loop);
1781     return G_SOURCE_REMOVE;
1782   }
1783
1784   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1785   if (status == G_IO_STATUS_ERROR) {
1786     GST_ERROR ("Failed to read from stdin: %s", err->message);
1787     g_clear_error (&err);
1788     g_main_loop_quit (main_loop);
1789     return G_SOURCE_REMOVE;
1790   } else if (status == G_IO_STATUS_EOF) {
1791     GST_ERROR ("EOF on stdin");
1792     g_main_loop_quit (main_loop);
1793     return G_SOURCE_REMOVE;
1794   } else if (status != G_IO_STATUS_NORMAL) {
1795     GST_ERROR ("Unexpected stdin read status: %d", status);
1796     g_main_loop_quit (main_loop);
1797     return G_SOURCE_REMOVE;
1798   } else if (read != header.size) {
1799     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1800     g_main_loop_quit (main_loop);
1801     return G_SOURCE_REMOVE;
1802   }
1803
1804   switch (header.type) {
1805     case TYPE_EVENT:
1806     case TYPE_GENERAL:{
1807       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1808       PtpMessage msg;
1809
1810       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1811         dump_ptp_message (&msg);
1812         handle_ptp_message (&msg, receive_time);
1813       }
1814       break;
1815     }
1816     default:
1817     case TYPE_CLOCK_ID:{
1818       if (header.size != 8) {
1819         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1820         g_main_loop_quit (main_loop);
1821         return G_SOURCE_REMOVE;
1822       }
1823       g_mutex_lock (&ptp_lock);
1824       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1825       ptp_clock_id.port_number = getpid ();
1826       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1827           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1828       g_cond_signal (&ptp_cond);
1829       g_mutex_unlock (&ptp_lock);
1830       break;
1831     }
1832   }
1833
1834   return G_SOURCE_CONTINUE;
1835 }
1836
1837 /* Cleanup all announce messages and announce message senders
1838  * that are timed out by now, and clean up all pending syncs
1839  * that are missing their FOLLOW_UP or DELAY_RESP */
1840 static gboolean
1841 cleanup_cb (gpointer data)
1842 {
1843   GstClockTime now = gst_clock_get_time (observation_system_clock);
1844   GList *l, *m, *n;
1845
1846   for (l = domain_data; l; l = l->next) {
1847     PtpDomainData *domain = l->data;
1848
1849     for (n = domain->announce_senders; n;) {
1850       PtpAnnounceSender *sender = n->data;
1851       gboolean timed_out = TRUE;
1852
1853       /* Keep only 5 messages per sender around */
1854       while (g_queue_get_length (&sender->announce_messages) > 5) {
1855         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1856         g_free (msg);
1857       }
1858
1859       for (m = sender->announce_messages.head; m; m = m->next) {
1860         PtpAnnounceMessage *msg = m->data;
1861
1862         if (msg->receive_time +
1863             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1864           timed_out = FALSE;
1865           break;
1866         }
1867       }
1868
1869       if (timed_out) {
1870         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1871             sender->master_clock_identity.clock_identity,
1872             sender->master_clock_identity.port_number);
1873         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1874         g_queue_clear (&sender->announce_messages);
1875       }
1876
1877       if (g_queue_get_length (&sender->announce_messages) == 0) {
1878         GList *tmp = n->next;
1879
1880         if (compare_clock_identity (&sender->master_clock_identity,
1881                 &domain->master_clock_identity) == 0)
1882           GST_WARNING ("currently selected master clock timed out");
1883         g_free (sender);
1884         domain->announce_senders =
1885             g_list_delete_link (domain->announce_senders, n);
1886         n = tmp;
1887       } else {
1888         n = n->next;
1889       }
1890     }
1891     select_best_master_clock (domain, now);
1892
1893     /* Clean up any pending syncs */
1894     for (n = domain->pending_syncs.head; n;) {
1895       PtpPendingSync *sync = n->data;
1896       gboolean timed_out = FALSE;
1897
1898       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1899        * and pending delay reqs after 4 delay req intervals or 10 seconds
1900        */
1901       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1902           ((domain->min_delay_req_interval != 0
1903                   && sync->delay_req_send_time_local +
1904                   4 * domain->min_delay_req_interval < now)
1905               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1906         timed_out = TRUE;
1907       } else if ((domain->sync_interval != 0
1908               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1909           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1910         timed_out = TRUE;
1911       }
1912
1913       if (timed_out) {
1914         GList *tmp = n->next;
1915         ptp_pending_sync_free (sync);
1916         g_queue_delete_link (&domain->pending_syncs, n);
1917         n = tmp;
1918       } else {
1919         n = n->next;
1920       }
1921     }
1922   }
1923
1924   return G_SOURCE_CONTINUE;
1925 }
1926
1927 static gpointer
1928 ptp_helper_main (gpointer data)
1929 {
1930   GSource *cleanup_source;
1931
1932   GST_DEBUG ("Starting PTP helper loop");
1933
1934   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1935   cleanup_source = g_timeout_source_new_seconds (5);
1936   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1937   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1938   g_source_attach (cleanup_source, main_context);
1939   g_source_unref (cleanup_source);
1940
1941   g_main_loop_run (main_loop);
1942   GST_DEBUG ("Stopped PTP helper loop");
1943
1944   g_mutex_lock (&ptp_lock);
1945   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1946   ptp_clock_id.port_number = 0;
1947   initted = FALSE;
1948   g_cond_signal (&ptp_cond);
1949   g_mutex_unlock (&ptp_lock);
1950
1951   return NULL;
1952 }
1953
1954 /**
1955  * gst_ptp_is_supported:
1956  *
1957  * Check if PTP clocks are generally supported on this system, and if previous
1958  * initializations did not fail.
1959  *
1960  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1961  * previous initializations did not fail.
1962  *
1963  * Since: 1.6
1964  */
1965 gboolean
1966 gst_ptp_is_supported (void)
1967 {
1968   return supported;
1969 }
1970
1971 /**
1972  * gst_ptp_is_initialized:
1973  *
1974  * Check if the GStreamer PTP clock subsystem is initialized.
1975  *
1976  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1977  *
1978  * Since: 1.6
1979  */
1980 gboolean
1981 gst_ptp_is_initialized (void)
1982 {
1983   return initted;
1984 }
1985
1986 /**
1987  * gst_ptp_init:
1988  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1989  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
1990  *
1991  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1992  * slave-only mode for all domains on the given @interfaces with the
1993  * given @clock_id.
1994  *
1995  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1996  * generated from the MAC address of the first network interface.
1997  *
1998  * This function is automatically called by gst_ptp_clock_new() with default
1999  * parameters if it wasn't called before.
2000  *
2001  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
2002  *
2003  * Since: 1.6
2004  */
2005 gboolean
2006 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2007 {
2008   gboolean ret;
2009   const gchar *env;
2010   gchar **argv = NULL;
2011   gint argc, argc_c;
2012   gint fd_r, fd_w;
2013   GError *err = NULL;
2014   GSource *stdin_source;
2015
2016   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2017
2018   g_mutex_lock (&ptp_lock);
2019   if (!supported) {
2020     GST_ERROR ("PTP not supported");
2021     ret = FALSE;
2022     goto done;
2023   }
2024
2025   if (initted) {
2026     GST_DEBUG ("PTP already initialized");
2027     ret = TRUE;
2028     goto done;
2029   }
2030
2031   if (ptp_helper_pid) {
2032     GST_DEBUG ("PTP currently initializing");
2033     goto wait;
2034   }
2035
2036   if (!domain_stats_hooks_initted) {
2037     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2038     domain_stats_hooks_initted = TRUE;
2039   }
2040
2041   argc = 1;
2042   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2043     argc += 2;
2044   if (interfaces != NULL)
2045     argc += 2 * g_strv_length (interfaces);
2046
2047   argv = g_new0 (gchar *, argc + 2);
2048   argc_c = 0;
2049
2050   env = g_getenv ("GST_PTP_HELPER_1_0");
2051   if (env == NULL)
2052     env = g_getenv ("GST_PTP_HELPER");
2053   if (env != NULL && *env != '\0') {
2054     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2055     argv[argc_c++] = g_strdup (env);
2056   } else {
2057     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2058   }
2059
2060   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2061     argv[argc_c++] = g_strdup ("-c");
2062     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2063   }
2064
2065   if (interfaces != NULL) {
2066     gchar **ptr = interfaces;
2067
2068     while (*ptr) {
2069       argv[argc_c++] = g_strdup ("-i");
2070       argv[argc_c++] = g_strdup (*ptr);
2071       ptr++;
2072     }
2073   }
2074
2075   main_context = g_main_context_new ();
2076   main_loop = g_main_loop_new (main_context, FALSE);
2077
2078   ptp_helper_thread =
2079       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2080   if (!ptp_helper_thread) {
2081     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2082     g_clear_error (&err);
2083     ret = FALSE;
2084     goto done;
2085   }
2086
2087   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2088           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2089     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2090     g_clear_error (&err);
2091     ret = FALSE;
2092     supported = FALSE;
2093     goto done;
2094   }
2095
2096   stdin_channel = g_io_channel_unix_new (fd_r);
2097   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2098   g_io_channel_set_buffered (stdin_channel, FALSE);
2099   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2100   stdin_source =
2101       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2102   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2103   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2104       NULL);
2105   g_source_attach (stdin_source, main_context);
2106   g_source_unref (stdin_source);
2107
2108   /* Create stdout channel */
2109   stdout_channel = g_io_channel_unix_new (fd_w);
2110   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2111   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2112   g_io_channel_set_buffered (stdout_channel, FALSE);
2113
2114   delay_req_rand = g_rand_new ();
2115   observation_system_clock =
2116       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2117       NULL);
2118   gst_object_ref_sink (observation_system_clock);
2119
2120   initted = TRUE;
2121
2122 wait:
2123   GST_DEBUG ("Waiting for PTP to be initialized");
2124
2125   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2126     g_cond_wait (&ptp_cond, &ptp_lock);
2127
2128   ret = initted;
2129   if (ret) {
2130     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2131         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2132   } else {
2133     GST_ERROR ("Failed to initialize");
2134     supported = FALSE;
2135   }
2136
2137 done:
2138   g_strfreev (argv);
2139
2140   if (!ret) {
2141     if (ptp_helper_pid) {
2142 #ifndef G_OS_WIN32
2143       kill (ptp_helper_pid, SIGKILL);
2144       waitpid (ptp_helper_pid, NULL, 0);
2145 #else
2146       TerminateProcess (ptp_helper_pid, 1);
2147       WaitForSingleObject (ptp_helper_pid, INFINITE);
2148 #endif
2149       g_spawn_close_pid (ptp_helper_pid);
2150     }
2151     ptp_helper_pid = 0;
2152
2153     if (stdin_channel)
2154       g_io_channel_unref (stdin_channel);
2155     stdin_channel = NULL;
2156     if (stdout_channel)
2157       g_io_channel_unref (stdout_channel);
2158     stdout_channel = NULL;
2159
2160     if (main_loop && ptp_helper_thread) {
2161       g_main_loop_quit (main_loop);
2162       g_thread_join (ptp_helper_thread);
2163     }
2164     ptp_helper_thread = NULL;
2165     if (main_loop)
2166       g_main_loop_unref (main_loop);
2167     main_loop = NULL;
2168     if (main_context)
2169       g_main_context_unref (main_context);
2170     main_context = NULL;
2171
2172     if (delay_req_rand)
2173       g_rand_free (delay_req_rand);
2174     delay_req_rand = NULL;
2175
2176     if (observation_system_clock)
2177       gst_object_unref (observation_system_clock);
2178     observation_system_clock = NULL;
2179   }
2180
2181   g_mutex_unlock (&ptp_lock);
2182
2183   return ret;
2184 }
2185
2186 /**
2187  * gst_ptp_deinit:
2188  *
2189  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2190  * are any remaining GstPtpClock instances, they won't be further synchronized
2191  * to the PTP network clock.
2192  *
2193  * Since: 1.6
2194  */
2195 void
2196 gst_ptp_deinit (void)
2197 {
2198   GList *l, *m;
2199
2200   g_mutex_lock (&ptp_lock);
2201
2202   if (ptp_helper_pid) {
2203 #ifndef G_OS_WIN32
2204     kill (ptp_helper_pid, SIGKILL);
2205     waitpid (ptp_helper_pid, NULL, 0);
2206 #else
2207     TerminateProcess (ptp_helper_pid, 1);
2208     WaitForSingleObject (ptp_helper_pid, INFINITE);
2209 #endif
2210     g_spawn_close_pid (ptp_helper_pid);
2211   }
2212   ptp_helper_pid = 0;
2213
2214   if (stdin_channel)
2215     g_io_channel_unref (stdin_channel);
2216   stdin_channel = NULL;
2217   if (stdout_channel)
2218     g_io_channel_unref (stdout_channel);
2219   stdout_channel = NULL;
2220
2221   if (main_loop && ptp_helper_thread) {
2222     GThread *tmp = ptp_helper_thread;
2223     ptp_helper_thread = NULL;
2224     g_mutex_unlock (&ptp_lock);
2225     g_main_loop_quit (main_loop);
2226     g_thread_join (tmp);
2227     g_mutex_lock (&ptp_lock);
2228   }
2229   if (main_loop)
2230     g_main_loop_unref (main_loop);
2231   main_loop = NULL;
2232   if (main_context)
2233     g_main_context_unref (main_context);
2234   main_context = NULL;
2235
2236   if (delay_req_rand)
2237     g_rand_free (delay_req_rand);
2238   delay_req_rand = NULL;
2239   if (observation_system_clock)
2240     gst_object_unref (observation_system_clock);
2241   observation_system_clock = NULL;
2242
2243   for (l = domain_data; l; l = l->next) {
2244     PtpDomainData *domain = l->data;
2245
2246     for (m = domain->announce_senders; m; m = m->next) {
2247       PtpAnnounceSender *sender = m->data;
2248
2249       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2250       g_queue_clear (&sender->announce_messages);
2251       g_free (sender);
2252     }
2253     g_list_free (domain->announce_senders);
2254
2255     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2256         NULL);
2257     g_queue_clear (&domain->pending_syncs);
2258     gst_object_unref (domain->domain_clock);
2259     g_free (domain);
2260   }
2261   g_list_free (domain_data);
2262   domain_data = NULL;
2263   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2264   g_list_free (domain_clocks);
2265   domain_clocks = NULL;
2266
2267   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2268   ptp_clock_id.port_number = 0;
2269
2270   initted = FALSE;
2271
2272   g_mutex_unlock (&ptp_lock);
2273 }
2274
2275 #define DEFAULT_DOMAIN 0
2276
2277 enum
2278 {
2279   PROP_0,
2280   PROP_DOMAIN,
2281   PROP_INTERNAL_CLOCK,
2282   PROP_MASTER_CLOCK_ID,
2283   PROP_GRANDMASTER_CLOCK_ID
2284 };
2285
2286 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2287   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2288
2289 struct _GstPtpClockPrivate
2290 {
2291   guint domain;
2292   GstClock *domain_clock;
2293   gulong domain_stats_id;
2294 };
2295
2296 #define gst_ptp_clock_parent_class parent_class
2297 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2298
2299 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2300     const GValue * value, GParamSpec * pspec);
2301 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2302     GValue * value, GParamSpec * pspec);
2303 static void gst_ptp_clock_finalize (GObject * object);
2304
2305 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2306
2307 static void
2308 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2309 {
2310   GObjectClass *gobject_class;
2311   GstClockClass *clock_class;
2312
2313   gobject_class = G_OBJECT_CLASS (klass);
2314   clock_class = GST_CLOCK_CLASS (klass);
2315
2316   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2317
2318   gobject_class->finalize = gst_ptp_clock_finalize;
2319   gobject_class->get_property = gst_ptp_clock_get_property;
2320   gobject_class->set_property = gst_ptp_clock_set_property;
2321
2322   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2323       g_param_spec_uint ("domain", "Domain",
2324           "The PTP domain", 0, G_MAXUINT8,
2325           DEFAULT_DOMAIN,
2326           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2327
2328   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2329       g_param_spec_object ("internal-clock", "Internal Clock",
2330           "Internal clock", GST_TYPE_CLOCK,
2331           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2332
2333   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
2334       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
2335           "Master Clock ID", 0, G_MAXUINT64, 0,
2336           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2337
2338   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
2339       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
2340           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
2341           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2342
2343   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2344 }
2345
2346 static void
2347 gst_ptp_clock_init (GstPtpClock * self)
2348 {
2349   GstPtpClockPrivate *priv;
2350
2351   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2352
2353   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2354   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2355
2356   priv->domain = DEFAULT_DOMAIN;
2357 }
2358
2359 static gboolean
2360 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2361 {
2362   gboolean got_clock = TRUE;
2363
2364   if (G_UNLIKELY (!self->priv->domain_clock)) {
2365     g_mutex_lock (&domain_clocks_lock);
2366     if (!self->priv->domain_clock) {
2367       GList *l;
2368
2369       got_clock = FALSE;
2370
2371       for (l = domain_clocks; l; l = l->next) {
2372         PtpDomainData *clock_data = l->data;
2373
2374         if (clock_data->domain == self->priv->domain
2375             && clock_data->last_ptp_time != 0) {
2376           self->priv->domain_clock = clock_data->domain_clock;
2377           got_clock = TRUE;
2378           break;
2379         }
2380       }
2381     }
2382     g_mutex_unlock (&domain_clocks_lock);
2383     if (got_clock) {
2384       g_object_notify (G_OBJECT (self), "internal-clock");
2385       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2386     }
2387   }
2388
2389   return got_clock;
2390 }
2391
2392 static gboolean
2393 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2394     gpointer user_data)
2395 {
2396   GstPtpClock *self = user_data;
2397
2398   if (domain != self->priv->domain
2399       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2400     return TRUE;
2401
2402   /* Let's set our internal clock */
2403   if (!gst_ptp_clock_ensure_domain_clock (self))
2404     return TRUE;
2405
2406   self->priv->domain_stats_id = 0;
2407
2408   return FALSE;
2409 }
2410
2411 static void
2412 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2413     const GValue * value, GParamSpec * pspec)
2414 {
2415   GstPtpClock *self = GST_PTP_CLOCK (object);
2416
2417   switch (prop_id) {
2418     case PROP_DOMAIN:
2419       self->priv->domain = g_value_get_uint (value);
2420       gst_ptp_clock_ensure_domain_clock (self);
2421       if (!self->priv->domain_clock)
2422         self->priv->domain_stats_id =
2423             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2424             NULL);
2425       break;
2426     default:
2427       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2428       break;
2429   }
2430 }
2431
2432 static void
2433 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2434     GValue * value, GParamSpec * pspec)
2435 {
2436   GstPtpClock *self = GST_PTP_CLOCK (object);
2437
2438   switch (prop_id) {
2439     case PROP_DOMAIN:
2440       g_value_set_uint (value, self->priv->domain);
2441       break;
2442     case PROP_INTERNAL_CLOCK:
2443       gst_ptp_clock_ensure_domain_clock (self);
2444       g_value_set_object (value, self->priv->domain_clock);
2445       break;
2446     case PROP_MASTER_CLOCK_ID:
2447     case PROP_GRANDMASTER_CLOCK_ID:{
2448       GList *l;
2449
2450       g_mutex_lock (&domain_clocks_lock);
2451       g_value_set_uint64 (value, 0);
2452
2453       for (l = domain_clocks; l; l = l->next) {
2454         PtpDomainData *clock_data = l->data;
2455
2456         if (clock_data->domain == self->priv->domain) {
2457           if (prop_id == PROP_MASTER_CLOCK_ID)
2458             g_value_set_uint64 (value,
2459                 clock_data->master_clock_identity.clock_identity);
2460           else
2461             g_value_set_uint64 (value, clock_data->grandmaster_identity);
2462           break;
2463         }
2464       }
2465       g_mutex_unlock (&domain_clocks_lock);
2466       break;
2467     }
2468     default:
2469       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2470       break;
2471   }
2472 }
2473
2474 static void
2475 gst_ptp_clock_finalize (GObject * object)
2476 {
2477   GstPtpClock *self = GST_PTP_CLOCK (object);
2478
2479   if (self->priv->domain_stats_id)
2480     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2481
2482   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2483 }
2484
2485 static GstClockTime
2486 gst_ptp_clock_get_internal_time (GstClock * clock)
2487 {
2488   GstPtpClock *self = GST_PTP_CLOCK (clock);
2489
2490   gst_ptp_clock_ensure_domain_clock (self);
2491
2492   if (!self->priv->domain_clock) {
2493     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2494         self->priv->domain);
2495     return GST_CLOCK_TIME_NONE;
2496   }
2497
2498   return gst_clock_get_time (self->priv->domain_clock);
2499 }
2500
2501 /**
2502  * gst_ptp_clock_new:
2503  * @name: Name of the clock
2504  * @domain: PTP domain
2505  *
2506  * Creates a new PTP clock instance that exports the PTP time of the master
2507  * clock in @domain. This clock can be slaved to other clocks as needed.
2508  *
2509  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2510  * default parameters.
2511  *
2512  * This clock only returns valid timestamps after it received the first
2513  * times from the PTP master clock on the network. Once this happens the
2514  * GstPtpClock::internal-clock property will become non-NULL. You can
2515  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2516  * gst_clock_is_synced().
2517  *
2518  * Returns: (transfer full): A new #GstClock
2519  *
2520  * Since: 1.6
2521  */
2522 GstClock *
2523 gst_ptp_clock_new (const gchar * name, guint domain)
2524 {
2525   GstClock *clock;
2526
2527   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2528
2529   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2530     GST_ERROR ("Failed to initialize PTP");
2531     return NULL;
2532   }
2533
2534   clock = g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2535       NULL);
2536
2537   /* Clear floating flag */
2538   gst_object_ref_sink (clock);
2539
2540   return clock;
2541 }
2542
2543 typedef struct
2544 {
2545   guint8 domain;
2546   const GstStructure *stats;
2547 } DomainStatsMarshalData;
2548
2549 static void
2550 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2551 {
2552   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2553
2554   if (!callback (data->domain, data->stats, hook->data))
2555     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2556 }
2557
2558 static void
2559 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2560 {
2561   DomainStatsMarshalData data = { domain, stats };
2562
2563   g_mutex_lock (&ptp_lock);
2564   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2565       (GHookMarshaller) domain_stats_marshaller, &data);
2566   g_mutex_unlock (&ptp_lock);
2567 }
2568
2569 /**
2570  * gst_ptp_statistics_callback_add:
2571  * @callback: GstPtpStatisticsCallback to call
2572  * @user_data: Data to pass to the callback
2573  * @destroy_data: GDestroyNotify to destroy the data
2574  *
2575  * Installs a new statistics callback for gathering PTP statistics. See
2576  * GstPtpStatisticsCallback for a list of statistics that are provided.
2577  *
2578  * Returns: Id for the callback that can be passed to
2579  * gst_ptp_statistics_callback_remove()
2580  *
2581  * Since: 1.6
2582  */
2583 gulong
2584 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2585     gpointer user_data, GDestroyNotify destroy_data)
2586 {
2587   GHook *hook;
2588
2589   g_mutex_lock (&ptp_lock);
2590
2591   if (!domain_stats_hooks_initted) {
2592     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2593     domain_stats_hooks_initted = TRUE;
2594   }
2595
2596   hook = g_hook_alloc (&domain_stats_hooks);
2597   hook->func = callback;
2598   hook->data = user_data;
2599   hook->destroy = destroy_data;
2600   g_hook_prepend (&domain_stats_hooks, hook);
2601   g_atomic_int_add (&domain_stats_n_hooks, 1);
2602
2603   g_mutex_unlock (&ptp_lock);
2604
2605   return hook->hook_id;
2606 }
2607
2608 /**
2609  * gst_ptp_statistics_callback_remove:
2610  * @id: Callback id to remove
2611  *
2612  * Removes a PTP statistics callback that was previously added with
2613  * gst_ptp_statistics_callback_add().
2614  *
2615  * Since: 1.6
2616  */
2617 void
2618 gst_ptp_statistics_callback_remove (gulong id)
2619 {
2620   g_mutex_lock (&ptp_lock);
2621   if (g_hook_destroy (&domain_stats_hooks, id))
2622     g_atomic_int_add (&domain_stats_n_hooks, -1);
2623   g_mutex_unlock (&ptp_lock);
2624 }