ptpclock: Use the current path delay for calculation the local/remote clock times
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @short_description: Special clock that synchronizes to a remote time
23  *                     provider via PTP (IEEE1588:2008).
24  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
25  *
26  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
27  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
28  * clock in some specific domain.
29  *
30  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
31  * a helper process to do the actual communication via the PTP ports. This is
32  * required as PTP listens on ports < 1024 and thus requires special
33  * privileges. Once this helper process is started, the main process will
34  * synchronize to all PTP domains that are detected on the selected
35  * interfaces.
36  *
37  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
38  * time from a master clock inside a specific PTP domain. This clock will only
39  * return valid timestamps once the timestamps in the PTP domain are known. To
40  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
41  * signal and gst_clock_is_synced().
42  *
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #ifdef HAVE_PTP
59
60 #include "gstptp_private.h"
61
62 #include <sys/wait.h>
63 #include <sys/types.h>
64 #include <unistd.h>
65
66 #include <gst/base/base.h>
67
68 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
69 #define GST_CAT_DEFAULT (ptp_debug)
70
71 /* IEEE 1588 7.7.3.1 */
72 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
73
74 /* Use a running average for calculating the mean path delay instead
75  * of just using the last measurement. Enabling this helps in unreliable
76  * networks, like wifi, with often changing delays
77  *
78  * Undef for following IEEE1588-2008 by the letter
79  */
80 #define USE_RUNNING_AVERAGE_DELAY 1
81
82 /* Filter out any measurements that are above a certain threshold compared to
83  * previous measurements. Enabling this helps filtering out outliers that
84  * happen fairly often in unreliable networks, like wifi.
85  *
86  * Undef for following IEEE1588-2008 by the letter
87  */
88 #define USE_MEASUREMENT_FILTERING 1
89
90 /* Select the first clock from which we capture a SYNC message as the master
91  * clock of the domain until we are ready to run the best master clock
92  * algorithm. This allows faster syncing but might mean a change of the master
93  * clock in the beginning. As all clocks in a domain are supposed to use the
94  * same time, this shouldn't be much of a problem.
95  *
96  * Undef for following IEEE1588-2008 by the letter
97  */
98 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
99
100 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
101  * afterwards. This allows better synchronization in networks with varying
102  * delays, as for every other SYNC message we would have to assume that it's
103  * the average of what we saw before. But that might be completely off
104  */
105 #define USE_ONLY_SYNC_WITH_DELAY 1
106
107 /* Filter out delay measurements that are too far away from the median of the
108  * last delay measurements, currently those that are more than 2 times as big.
109  * This increases accuracy a lot on wifi.
110  */
111 #define USE_MEDIAN_PRE_FILTERING 1
112 #define MEDIAN_PRE_FILTERING_WINDOW 9
113
114 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
115 #define MAX_SKIPPED_UPDATES 5
116
117 typedef enum
118 {
119   PTP_MESSAGE_TYPE_SYNC = 0x0,
120   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
121   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
122   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
123   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
124   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
125   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
126   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
127   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
128   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
129 } PtpMessageType;
130
131 typedef struct
132 {
133   guint64 seconds_field;        /* 48 bits valid */
134   guint32 nanoseconds_field;
135 } PtpTimestamp;
136
137 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
138 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
139 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
140
141 typedef struct
142 {
143   guint64 clock_identity;
144   guint16 port_number;
145 } PtpClockIdentity;
146
147 static gint
148 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
149 {
150   if (a->clock_identity < b->clock_identity)
151     return -1;
152   else if (a->clock_identity > b->clock_identity)
153     return 1;
154
155   if (a->port_number < b->port_number)
156     return -1;
157   else if (a->port_number > b->port_number)
158     return 1;
159
160   return 0;
161 }
162
163 typedef struct
164 {
165   guint8 clock_class;
166   guint8 clock_accuracy;
167   guint16 offset_scaled_log_variance;
168 } PtpClockQuality;
169
170 typedef struct
171 {
172   guint8 transport_specific;
173   PtpMessageType message_type;
174   /* guint8 reserved; */
175   guint8 version_ptp;
176   guint16 message_length;
177   guint8 domain_number;
178   /* guint8 reserved; */
179   guint16 flag_field;
180   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
181   /* guint32 reserved; */
182   PtpClockIdentity source_port_identity;
183   guint16 sequence_id;
184   guint8 control_field;
185   gint8 log_message_interval;
186
187   union
188   {
189     struct
190     {
191       PtpTimestamp origin_timestamp;
192       gint16 current_utc_offset;
193       /* guint8 reserved; */
194       guint8 grandmaster_priority_1;
195       PtpClockQuality grandmaster_clock_quality;
196       guint8 grandmaster_priority_2;
197       guint64 grandmaster_identity;
198       guint16 steps_removed;
199       guint8 time_source;
200     } announce;
201
202     struct
203     {
204       PtpTimestamp origin_timestamp;
205     } sync;
206
207     struct
208     {
209       PtpTimestamp precise_origin_timestamp;
210     } follow_up;
211
212     struct
213     {
214       PtpTimestamp origin_timestamp;
215     } delay_req;
216
217     struct
218     {
219       PtpTimestamp receive_timestamp;
220       PtpClockIdentity requesting_port_identity;
221     } delay_resp;
222
223   } message_specific;
224 } PtpMessage;
225
226 static GMutex ptp_lock;
227 static GCond ptp_cond;
228 static gboolean initted = FALSE;
229 static gboolean supported = TRUE;
230 static GPid ptp_helper_pid;
231 static GThread *ptp_helper_thread;
232 static GMainContext *main_context;
233 static GMainLoop *main_loop;
234 static GIOChannel *stdin_channel, *stdout_channel;
235 static GRand *delay_req_rand;
236 static GstClock *observation_system_clock;
237 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
238
239 typedef struct
240 {
241   GstClockTime receive_time;
242
243   PtpClockIdentity master_clock_identity;
244
245   guint8 grandmaster_priority_1;
246   PtpClockQuality grandmaster_clock_quality;
247   guint8 grandmaster_priority_2;
248   guint64 grandmaster_identity;
249   guint16 steps_removed;
250   guint8 time_source;
251
252   guint16 sequence_id;
253 } PtpAnnounceMessage;
254
255 typedef struct
256 {
257   PtpClockIdentity master_clock_identity;
258
259   GstClockTime announce_interval;       /* last interval we received */
260   GQueue announce_messages;
261 } PtpAnnounceSender;
262
263 typedef struct
264 {
265   guint domain;
266   PtpClockIdentity master_clock_identity;
267
268   guint16 sync_seqnum;
269   GstClockTime sync_recv_time_local;    /* t2 */
270   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
271   GstClockTime follow_up_recv_time_local;
272
273   GSource *timeout_source;
274   guint16 delay_req_seqnum;
275   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
276   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
277   GstClockTime delay_resp_recv_time_local;
278
279   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
280   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
281 } PtpPendingSync;
282
283 static void
284 ptp_pending_sync_free (PtpPendingSync * sync)
285 {
286   if (sync->timeout_source)
287     g_source_destroy (sync->timeout_source);
288   g_free (sync);
289 }
290
291 typedef struct
292 {
293   guint domain;
294
295   GstClockTime last_ptp_time;
296   GstClockTime last_local_time;
297   gint skipped_updates;
298
299   /* Used for selecting the master/grandmaster */
300   GList *announce_senders;
301
302   /* Last selected master clock */
303   gboolean have_master_clock;
304   PtpClockIdentity master_clock_identity;
305   guint64 grandmaster_identity;
306
307   /* Last SYNC or FOLLOW_UP timestamp we received */
308   GstClockTime last_ptp_sync_time;
309   GstClockTime sync_interval;
310
311   GstClockTime mean_path_delay;
312   GstClockTime last_delay_req, min_delay_req_interval;
313   guint16 last_delay_req_seqnum;
314
315   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
316   gint last_path_delays_missing;
317
318   GQueue pending_syncs;
319
320   GstClock *domain_clock;
321 } PtpDomainData;
322
323 static GList *domain_data;
324 static GMutex domain_clocks_lock;
325 static GList *domain_clocks;
326
327 /* Protected by PTP lock */
328 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
329 static GHookList domain_stats_hooks;
330 static gint domain_stats_n_hooks;
331 static gboolean domain_stats_hooks_initted = FALSE;
332
333 /* Converts log2 seconds to GstClockTime */
334 static GstClockTime
335 log2_to_clock_time (gint l)
336 {
337   if (l < 0)
338     return GST_SECOND >> (-l);
339   else
340     return GST_SECOND << l;
341 }
342
343 static void
344 dump_ptp_message (PtpMessage * msg)
345 {
346   GST_TRACE ("PTP message:");
347   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
348   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
349   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
350   GST_TRACE ("\tmessage_length: %u", msg->message_length);
351   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
352   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
353   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
354       (msg->correction_field / 65536),
355       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
356   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
357       msg->source_port_identity.clock_identity,
358       msg->source_port_identity.port_number);
359   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
360   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
361   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
362       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
363
364   switch (msg->message_type) {
365     case PTP_MESSAGE_TYPE_ANNOUNCE:
366       GST_TRACE ("\tANNOUNCE:");
367       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
368           msg->message_specific.announce.origin_timestamp.seconds_field,
369           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
370       GST_TRACE ("\t\tcurrent_utc_offset: %d",
371           msg->message_specific.announce.current_utc_offset);
372       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
373           msg->message_specific.announce.grandmaster_priority_1);
374       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
375           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
376           msg->message_specific.announce.
377           grandmaster_clock_quality.clock_accuracy,
378           msg->message_specific.announce.
379           grandmaster_clock_quality.offset_scaled_log_variance);
380       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
381           msg->message_specific.announce.grandmaster_priority_2);
382       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
383           msg->message_specific.announce.grandmaster_identity);
384       GST_TRACE ("\t\tsteps_removed: %u",
385           msg->message_specific.announce.steps_removed);
386       GST_TRACE ("\t\ttime_source: 0x%02x",
387           msg->message_specific.announce.time_source);
388       break;
389     case PTP_MESSAGE_TYPE_SYNC:
390       GST_TRACE ("\tSYNC:");
391       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
392           msg->message_specific.sync.origin_timestamp.seconds_field,
393           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
394       break;
395     case PTP_MESSAGE_TYPE_FOLLOW_UP:
396       GST_TRACE ("\tFOLLOW_UP:");
397       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
398           msg->message_specific.follow_up.
399           precise_origin_timestamp.seconds_field,
400           msg->message_specific.follow_up.
401           precise_origin_timestamp.nanoseconds_field);
402       break;
403     case PTP_MESSAGE_TYPE_DELAY_REQ:
404       GST_TRACE ("\tDELAY_REQ:");
405       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
406           msg->message_specific.delay_req.origin_timestamp.seconds_field,
407           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
408       break;
409     case PTP_MESSAGE_TYPE_DELAY_RESP:
410       GST_TRACE ("\tDELAY_RESP:");
411       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
412           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
413           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
414       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
415           "x %u",
416           msg->message_specific.delay_resp.
417           requesting_port_identity.clock_identity,
418           msg->message_specific.delay_resp.
419           requesting_port_identity.port_number);
420       break;
421     default:
422       break;
423   }
424   GST_TRACE (" ");
425 }
426
427 /* IEEE 1588-2008 5.3.3 */
428 static gboolean
429 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
430 {
431   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
432
433   timestamp->seconds_field =
434       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
435       gst_byte_reader_get_uint16_be_unchecked (reader);
436   timestamp->nanoseconds_field =
437       gst_byte_reader_get_uint32_be_unchecked (reader);
438
439   if (timestamp->nanoseconds_field >= 1000000000)
440     return FALSE;
441
442   return TRUE;
443 }
444
445 /* IEEE 1588-2008 13.3 */
446 static gboolean
447 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
448 {
449   guint8 b;
450
451   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
452
453   b = gst_byte_reader_get_uint8_unchecked (reader);
454   msg->transport_specific = b >> 4;
455   msg->message_type = b & 0x0f;
456
457   b = gst_byte_reader_get_uint8_unchecked (reader);
458   msg->version_ptp = b & 0x0f;
459   if (msg->version_ptp != 2) {
460     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
461     return FALSE;
462   }
463
464   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
465   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
466     GST_WARNING ("Not enough data (%u < %u)",
467         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
468     return FALSE;
469   }
470
471   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
472   gst_byte_reader_skip_unchecked (reader, 1);
473
474   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
475   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
476   gst_byte_reader_skip_unchecked (reader, 4);
477
478   msg->source_port_identity.clock_identity =
479       gst_byte_reader_get_uint64_be_unchecked (reader);
480   msg->source_port_identity.port_number =
481       gst_byte_reader_get_uint16_be_unchecked (reader);
482
483   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
484   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
485   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
486
487   return TRUE;
488 }
489
490 /* IEEE 1588-2008 13.5 */
491 static gboolean
492 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
493 {
494   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
495
496   if (gst_byte_reader_get_remaining (reader) < 20)
497     return FALSE;
498
499   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
500           reader))
501     return FALSE;
502
503   msg->message_specific.announce.current_utc_offset =
504       gst_byte_reader_get_uint16_be_unchecked (reader);
505   gst_byte_reader_skip_unchecked (reader, 1);
506
507   msg->message_specific.announce.grandmaster_priority_1 =
508       gst_byte_reader_get_uint8_unchecked (reader);
509   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
510       gst_byte_reader_get_uint8_unchecked (reader);
511   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
512       gst_byte_reader_get_uint8_unchecked (reader);
513   msg->message_specific.announce.
514       grandmaster_clock_quality.offset_scaled_log_variance =
515       gst_byte_reader_get_uint16_be_unchecked (reader);
516   msg->message_specific.announce.grandmaster_priority_2 =
517       gst_byte_reader_get_uint8_unchecked (reader);
518   msg->message_specific.announce.grandmaster_identity =
519       gst_byte_reader_get_uint64_be_unchecked (reader);
520   msg->message_specific.announce.steps_removed =
521       gst_byte_reader_get_uint16_be_unchecked (reader);
522   msg->message_specific.announce.time_source =
523       gst_byte_reader_get_uint8_unchecked (reader);
524
525   return TRUE;
526 }
527
528 /* IEEE 1588-2008 13.6 */
529 static gboolean
530 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
531 {
532   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
533
534   if (gst_byte_reader_get_remaining (reader) < 10)
535     return FALSE;
536
537   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
538           reader))
539     return FALSE;
540
541   return TRUE;
542 }
543
544 /* IEEE 1588-2008 13.6 */
545 static gboolean
546 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
547 {
548   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
549
550   if (gst_byte_reader_get_remaining (reader) < 10)
551     return FALSE;
552
553   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
554           reader))
555     return FALSE;
556
557   return TRUE;
558 }
559
560 /* IEEE 1588-2008 13.7 */
561 static gboolean
562 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
563 {
564   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
565
566   if (gst_byte_reader_get_remaining (reader) < 10)
567     return FALSE;
568
569   if (!parse_ptp_timestamp (&msg->message_specific.
570           follow_up.precise_origin_timestamp, reader))
571     return FALSE;
572
573   return TRUE;
574 }
575
576 /* IEEE 1588-2008 13.8 */
577 static gboolean
578 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
579 {
580   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
581       FALSE);
582
583   if (gst_byte_reader_get_remaining (reader) < 20)
584     return FALSE;
585
586   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
587           reader))
588     return FALSE;
589
590   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
591       gst_byte_reader_get_uint64_be_unchecked (reader);
592   msg->message_specific.delay_resp.requesting_port_identity.port_number =
593       gst_byte_reader_get_uint16_be_unchecked (reader);
594
595   return TRUE;
596 }
597
598 static gboolean
599 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
600 {
601   GstByteReader reader;
602   gboolean ret = FALSE;
603
604   gst_byte_reader_init (&reader, data, size);
605
606   if (!parse_ptp_message_header (msg, &reader)) {
607     GST_WARNING ("Failed to parse PTP message header");
608     return FALSE;
609   }
610
611   switch (msg->message_type) {
612     case PTP_MESSAGE_TYPE_SYNC:
613       ret = parse_ptp_message_sync (msg, &reader);
614       break;
615     case PTP_MESSAGE_TYPE_FOLLOW_UP:
616       ret = parse_ptp_message_follow_up (msg, &reader);
617       break;
618     case PTP_MESSAGE_TYPE_DELAY_REQ:
619       ret = parse_ptp_message_delay_req (msg, &reader);
620       break;
621     case PTP_MESSAGE_TYPE_DELAY_RESP:
622       ret = parse_ptp_message_delay_resp (msg, &reader);
623       break;
624     case PTP_MESSAGE_TYPE_ANNOUNCE:
625       ret = parse_ptp_message_announce (msg, &reader);
626       break;
627     default:
628       /* ignore for now */
629       break;
630   }
631
632   return ret;
633 }
634
635 static gint
636 compare_announce_message (const PtpAnnounceMessage * a,
637     const PtpAnnounceMessage * b)
638 {
639   /* IEEE 1588 Figure 27 */
640   if (a->grandmaster_identity == b->grandmaster_identity) {
641     if (a->steps_removed + 1 < b->steps_removed)
642       return -1;
643     else if (a->steps_removed > b->steps_removed + 1)
644       return 1;
645
646     /* Error cases are filtered out earlier */
647     if (a->steps_removed < b->steps_removed)
648       return -1;
649     else if (a->steps_removed > b->steps_removed)
650       return 1;
651
652     /* Error cases are filtered out earlier */
653     if (a->master_clock_identity.clock_identity <
654         b->master_clock_identity.clock_identity)
655       return -1;
656     else if (a->master_clock_identity.clock_identity >
657         b->master_clock_identity.clock_identity)
658       return 1;
659
660     /* Error cases are filtered out earlier */
661     if (a->master_clock_identity.port_number <
662         b->master_clock_identity.port_number)
663       return -1;
664     else if (a->master_clock_identity.port_number >
665         b->master_clock_identity.port_number)
666       return 1;
667     else
668       g_assert_not_reached ();
669
670     return 0;
671   }
672
673   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
674     return -1;
675   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
676     return 1;
677
678   if (a->grandmaster_clock_quality.clock_class <
679       b->grandmaster_clock_quality.clock_class)
680     return -1;
681   else if (a->grandmaster_clock_quality.clock_class >
682       b->grandmaster_clock_quality.clock_class)
683     return 1;
684
685   if (a->grandmaster_clock_quality.clock_accuracy <
686       b->grandmaster_clock_quality.clock_accuracy)
687     return -1;
688   else if (a->grandmaster_clock_quality.clock_accuracy >
689       b->grandmaster_clock_quality.clock_accuracy)
690     return 1;
691
692   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
693       b->grandmaster_clock_quality.offset_scaled_log_variance)
694     return -1;
695   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
696       b->grandmaster_clock_quality.offset_scaled_log_variance)
697     return 1;
698
699   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
700     return -1;
701   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
702     return 1;
703
704   if (a->grandmaster_identity < b->grandmaster_identity)
705     return -1;
706   else if (a->grandmaster_identity > b->grandmaster_identity)
707     return 1;
708   else
709     g_assert_not_reached ();
710
711   return 0;
712 }
713
714 static void
715 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
716 {
717   GList *qualified_messages = NULL;
718   GList *l, *m;
719   PtpAnnounceMessage *best = NULL;
720
721   /* IEEE 1588 9.3.2.5 */
722   for (l = domain->announce_senders; l; l = l->next) {
723     PtpAnnounceSender *sender = l->data;
724     GstClockTime window = 4 * sender->announce_interval;
725     gint count = 0;
726
727     for (m = sender->announce_messages.head; m; m = m->next) {
728       PtpAnnounceMessage *msg = m->data;
729
730       if (now - msg->receive_time <= window)
731         count++;
732     }
733
734     /* Only include the newest message of announce senders that had at least 2
735      * announce messages in the last 4 announce intervals. Which also means
736      * that we wait at least 4 announce intervals before we select a master
737      * clock. Until then we just report based on the newest SYNC we received
738      */
739     if (count >= 2) {
740       qualified_messages =
741           g_list_prepend (qualified_messages,
742           g_queue_peek_tail (&sender->announce_messages));
743     }
744   }
745
746   if (!qualified_messages) {
747     GST_DEBUG
748         ("No qualified announce messages for domain %u, can't select a master clock",
749         domain->domain);
750     domain->have_master_clock = FALSE;
751     return;
752   }
753
754   for (l = qualified_messages; l; l = l->next) {
755     PtpAnnounceMessage *msg = l->data;
756
757     if (!best || compare_announce_message (msg, best) < 0)
758       best = msg;
759   }
760
761   if (domain->have_master_clock
762       && compare_clock_identity (&domain->master_clock_identity,
763           &best->master_clock_identity) == 0) {
764     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
765   } else {
766     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
767         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
768         domain->domain, best->master_clock_identity.clock_identity,
769         best->master_clock_identity.port_number, best->grandmaster_identity);
770
771     domain->have_master_clock = TRUE;
772     domain->grandmaster_identity = best->grandmaster_identity;
773
774     /* Opportunistic master clock selection likely gave us the same master
775      * clock before, no need to reset all statistics */
776     if (compare_clock_identity (&domain->master_clock_identity,
777             &best->master_clock_identity) != 0) {
778       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
779           sizeof (PtpClockIdentity));
780       domain->mean_path_delay = 0;
781       domain->last_delay_req = 0;
782       domain->last_path_delays_missing = 9;
783       domain->min_delay_req_interval = 0;
784       domain->sync_interval = 0;
785       domain->last_ptp_sync_time = 0;
786       domain->skipped_updates = 0;
787       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
788           NULL);
789       g_queue_clear (&domain->pending_syncs);
790     }
791
792     if (g_atomic_int_get (&domain_stats_n_hooks)) {
793       GstStructure *stats =
794           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
795           "domain", G_TYPE_UINT, domain->domain,
796           "master-clock-id", G_TYPE_UINT64,
797           domain->master_clock_identity.clock_identity,
798           "master-clock-port", G_TYPE_UINT,
799           domain->master_clock_identity.port_number,
800           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
801           NULL);
802       emit_ptp_statistics (domain->domain, stats);
803       gst_structure_free (stats);
804     }
805   }
806 }
807
808 static void
809 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
810 {
811   GList *l;
812   PtpDomainData *domain = NULL;
813   PtpAnnounceSender *sender = NULL;
814   PtpAnnounceMessage *announce;
815
816   /* IEEE1588 9.3.2.2 e)
817    * Don't consider messages with the alternate master flag set
818    */
819   if ((msg->flag_field & 0x0100))
820     return;
821
822   /* IEEE 1588 9.3.2.5 d)
823    * Don't consider announce messages with steps_removed>=255
824    */
825   if (msg->message_specific.announce.steps_removed >= 255)
826     return;
827
828   for (l = domain_data; l; l = l->next) {
829     PtpDomainData *tmp = l->data;
830
831     if (tmp->domain == msg->domain_number) {
832       domain = tmp;
833       break;
834     }
835   }
836
837   if (!domain) {
838     gchar *clock_name;
839
840     domain = g_new0 (PtpDomainData, 1);
841     domain->domain = msg->domain_number;
842     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
843     domain->domain_clock =
844         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
845     g_free (clock_name);
846     g_queue_init (&domain->pending_syncs);
847     domain->last_path_delays_missing = 9;
848     domain_data = g_list_prepend (domain_data, domain);
849
850     g_mutex_lock (&domain_clocks_lock);
851     domain_clocks = g_list_prepend (domain_clocks, domain);
852     g_mutex_unlock (&domain_clocks_lock);
853
854     if (g_atomic_int_get (&domain_stats_n_hooks)) {
855       GstStructure *stats =
856           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
857           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
858           domain->domain_clock, NULL);
859       emit_ptp_statistics (domain->domain, stats);
860       gst_structure_free (stats);
861     }
862   }
863
864   for (l = domain->announce_senders; l; l = l->next) {
865     PtpAnnounceSender *tmp = l->data;
866
867     if (compare_clock_identity (&tmp->master_clock_identity,
868             &msg->source_port_identity) == 0) {
869       sender = tmp;
870       break;
871     }
872   }
873
874   if (!sender) {
875     sender = g_new0 (PtpAnnounceSender, 1);
876
877     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
878         sizeof (PtpClockIdentity));
879     g_queue_init (&sender->announce_messages);
880     domain->announce_senders =
881         g_list_prepend (domain->announce_senders, sender);
882   }
883
884   for (l = sender->announce_messages.head; l; l = l->next) {
885     PtpAnnounceMessage *tmp = l->data;
886
887     /* IEEE 1588 9.3.2.5 c)
888      * Don't consider identical messages, i.e. duplicates
889      */
890     if (tmp->sequence_id == msg->sequence_id)
891       return;
892   }
893
894   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
895
896   announce = g_new0 (PtpAnnounceMessage, 1);
897   announce->receive_time = receive_time;
898   announce->sequence_id = msg->sequence_id;
899   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
900       sizeof (PtpClockIdentity));
901   announce->grandmaster_identity =
902       msg->message_specific.announce.grandmaster_identity;
903   announce->grandmaster_priority_1 =
904       msg->message_specific.announce.grandmaster_priority_1;
905   announce->grandmaster_clock_quality.clock_class =
906       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
907   announce->grandmaster_clock_quality.clock_accuracy =
908       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
909   announce->grandmaster_clock_quality.offset_scaled_log_variance =
910       msg->message_specific.announce.
911       grandmaster_clock_quality.offset_scaled_log_variance;
912   announce->grandmaster_priority_2 =
913       msg->message_specific.announce.grandmaster_priority_2;
914   announce->steps_removed = msg->message_specific.announce.steps_removed;
915   announce->time_source = msg->message_specific.announce.time_source;
916   g_queue_push_tail (&sender->announce_messages, announce);
917
918   select_best_master_clock (domain, receive_time);
919 }
920
921 static gboolean
922 send_delay_req_timeout (PtpPendingSync * sync)
923 {
924   StdIOHeader header = { 0, };
925   guint8 delay_req[44];
926   GstByteWriter writer;
927   GIOStatus status;
928   gsize written;
929   GError *err = NULL;
930
931   header.type = TYPE_EVENT;
932   header.size = 44;
933
934   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
935   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
936   gst_byte_writer_put_uint8_unchecked (&writer, 2);
937   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
938   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
939   gst_byte_writer_put_uint8_unchecked (&writer, 0);
940   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
941   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
942   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
943   gst_byte_writer_put_uint64_be_unchecked (&writer,
944       ptp_clock_id.clock_identity);
945   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
946   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
947   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
948   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
949   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
950   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
951
952   status =
953       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
954       sizeof (header), &written, &err);
955   if (status == G_IO_STATUS_ERROR) {
956     g_warning ("Failed to write to stdout: %s", err->message);
957     return G_SOURCE_REMOVE;
958   } else if (status == G_IO_STATUS_EOF) {
959     g_message ("EOF on stdout");
960     g_main_loop_quit (main_loop);
961     return G_SOURCE_REMOVE;
962   } else if (status != G_IO_STATUS_NORMAL) {
963     g_warning ("Unexpected stdout write status: %d", status);
964     g_main_loop_quit (main_loop);
965     return G_SOURCE_REMOVE;
966   } else if (written != sizeof (header)) {
967     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
968     g_main_loop_quit (main_loop);
969     return G_SOURCE_REMOVE;
970   }
971
972   sync->delay_req_send_time_local =
973       gst_clock_get_time (observation_system_clock);
974
975   status =
976       g_io_channel_write_chars (stdout_channel,
977       (const gchar *) delay_req, 44, &written, &err);
978   if (status == G_IO_STATUS_ERROR) {
979     g_warning ("Failed to write to stdout: %s", err->message);
980     g_main_loop_quit (main_loop);
981     return G_SOURCE_REMOVE;
982   } else if (status == G_IO_STATUS_EOF) {
983     g_message ("EOF on stdout");
984     g_main_loop_quit (main_loop);
985     return G_SOURCE_REMOVE;
986   } else if (status != G_IO_STATUS_NORMAL) {
987     g_warning ("Unexpected stdout write status: %d", status);
988     g_main_loop_quit (main_loop);
989     return G_SOURCE_REMOVE;
990   } else if (written != 44) {
991     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
992     g_main_loop_quit (main_loop);
993     return G_SOURCE_REMOVE;
994   }
995
996   return G_SOURCE_REMOVE;
997 }
998
999 static gboolean
1000 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1001 {
1002   GstClockTime now = gst_clock_get_time (observation_system_clock);
1003   guint timeout;
1004   GSource *timeout_source;
1005
1006   if (domain->last_delay_req != 0
1007       && domain->last_delay_req + domain->min_delay_req_interval > now)
1008     return FALSE;
1009
1010   domain->last_delay_req = now;
1011   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1012
1013   /* IEEE 1588 9.5.11.2 */
1014   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1015     timeout = 0;
1016   else
1017     timeout =
1018         g_rand_int_range (delay_req_rand, 0,
1019         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1020
1021   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1022   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1023   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1024       sync, NULL);
1025   g_source_attach (timeout_source, main_context);
1026
1027   return TRUE;
1028 }
1029
1030 /* Filtering of outliers for RTT and time calculations inspired
1031  * by the code from gstnetclientclock.c
1032  */
1033 static void
1034 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1035 {
1036   GstClockTime internal_time, external_time, rate_num, rate_den;
1037   GstClockTime corrected_ptp_time, corrected_local_time;
1038   gdouble r_squared = 0.0;
1039   gboolean synced;
1040   GstClockTimeDiff discont = 0;
1041   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1042 #ifdef USE_MEASUREMENT_FILTERING
1043   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1044       orig_rate_den;
1045   GstClockTime new_estimated_ptp_time;
1046   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1047   gboolean now_synced;
1048 #endif
1049
1050 #ifdef USE_ONLY_SYNC_WITH_DELAY
1051   GstClockTime mean_path_delay;
1052
1053   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1054     return;
1055
1056   /* IEEE 1588 11.3 */
1057   mean_path_delay =
1058       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1059       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1060       (sync->correction_field_sync + sync->correction_field_delay +
1061           32768) / 65536) / 2;
1062 #endif
1063
1064   /* IEEE 1588 11.2 */
1065   corrected_ptp_time =
1066       sync->sync_send_time_remote +
1067       (sync->correction_field_sync + 32768) / 65536;
1068
1069 #ifdef USE_ONLY_SYNC_WITH_DELAY
1070   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1071 #else
1072   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1073 #endif
1074
1075 #ifdef USE_MEASUREMENT_FILTERING
1076   /* We check this here and when updating the mean path delay, because
1077    * we can get here without a delay response too */
1078   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1079       && sync->follow_up_recv_time_local >
1080       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1081     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1082         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1083         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1084         GST_TIME_ARGS (domain->mean_path_delay));
1085     synced = FALSE;
1086     goto out;
1087   }
1088 #endif
1089
1090   /* Set an initial local-remote relation */
1091   if (domain->last_ptp_time == 0)
1092     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1093         corrected_ptp_time, 1, 1);
1094
1095 #ifdef USE_MEASUREMENT_FILTERING
1096   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1097    * estimate with our present knowledge about the clock
1098    */
1099   /* Store what the clock produced as 'now' before this update */
1100   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1101       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1102   internal_time = orig_internal_time;
1103   external_time = orig_external_time;
1104   rate_num = orig_rate_num;
1105   rate_den = orig_rate_den;
1106
1107   /* 3/4 RTT window around the estimation */
1108   max_discont = domain->mean_path_delay * 3 / 2;
1109
1110   /* Check if the estimated sync time is inside our window */
1111   estimated_ptp_time_min = corrected_local_time - max_discont;
1112   estimated_ptp_time_min =
1113       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1114       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1115   estimated_ptp_time_max = corrected_local_time + max_discont;
1116   estimated_ptp_time_max =
1117       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1118       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1119
1120   synced = (estimated_ptp_time_min < corrected_ptp_time
1121       && corrected_ptp_time < estimated_ptp_time_max);
1122
1123   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1124       GST_TIME_FORMAT, domain->domain,
1125       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1126
1127   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1128       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1129       GST_TIME_ARGS (corrected_ptp_time),
1130       GST_TIME_ARGS (estimated_ptp_time_max));
1131
1132   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1133           corrected_local_time, corrected_ptp_time, &r_squared,
1134           &internal_time, &external_time, &rate_num, &rate_den)) {
1135     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1136
1137     /* Old estimated PTP time based on receive time and path delay */
1138     estimated_ptp_time = corrected_local_time;
1139     estimated_ptp_time =
1140         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1141         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1142         orig_external_time, orig_rate_num, orig_rate_den);
1143
1144     /* New estimated PTP time based on receive time and path delay */
1145     new_estimated_ptp_time = corrected_local_time;
1146     new_estimated_ptp_time =
1147         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1148         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1149         external_time, rate_num, rate_den);
1150
1151     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1152     if (synced && ABS (discont) > max_discont) {
1153       GstClockTimeDiff offset;
1154       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1155           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1156           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1157           GST_TIME_ARGS (max_discont));
1158       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1159         offset = max_discont - discont;
1160         if (-offset > external_time)
1161           external_time = 0;
1162         else
1163           external_time += offset;
1164       } else {                  /* Too large a backward step - add a +ve offset */
1165         offset = -(max_discont + discont);
1166         external_time += offset;
1167       }
1168
1169       discont += offset;
1170     } else {
1171       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1172           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1173           GST_TIME_ARGS (max_discont));
1174     }
1175
1176     /* Check if the estimated sync time is now (still) inside our window */
1177     estimated_ptp_time_min = corrected_local_time - max_discont;
1178     estimated_ptp_time_min =
1179         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1180         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1181         external_time, rate_num, rate_den);
1182     estimated_ptp_time_max = corrected_local_time + max_discont;
1183     estimated_ptp_time_max =
1184         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1185         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1186         external_time, rate_num, rate_den);
1187
1188     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1189         && corrected_ptp_time < estimated_ptp_time_max);
1190
1191     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1192         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1193         GST_TIME_ARGS (corrected_ptp_time),
1194         GST_TIME_ARGS (estimated_ptp_time_max));
1195
1196     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1197       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1198           internal_time, external_time, rate_num, rate_den);
1199       domain->skipped_updates = 0;
1200
1201       domain->last_ptp_time = corrected_ptp_time;
1202       domain->last_local_time = corrected_local_time;
1203     } else {
1204       domain->skipped_updates++;
1205     }
1206   } else {
1207     domain->last_ptp_time = corrected_ptp_time;
1208     domain->last_local_time = corrected_local_time;
1209   }
1210
1211 #else
1212   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1213       GST_TIME_FORMAT, domain->domain,
1214       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1215
1216   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1217       &internal_time, &external_time, &rate_num, &rate_den);
1218
1219   estimated_ptp_time = corrected_local_time;
1220   estimated_ptp_time =
1221       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1222       (domain->domain_clock), estimated_ptp_time, internal_time,
1223       external_time, rate_num, rate_den);
1224
1225   gst_clock_add_observation (domain->domain_clock,
1226       corrected_local_time, corrected_ptp_time, &r_squared);
1227
1228   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1229       &internal_time, &external_time, &rate_num, &rate_den);
1230
1231   synced = TRUE;
1232   domain->last_ptp_time = corrected_ptp_time;
1233   domain->last_local_time = corrected_local_time;
1234 #endif
1235
1236 #ifdef USE_MEASUREMENT_FILTERING
1237 out:
1238 #endif
1239   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1240     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1241         "domain", G_TYPE_UINT, domain->domain,
1242         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1243         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1244         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1245         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1246         "discontinuity", G_TYPE_INT64, discont,
1247         "synced", G_TYPE_BOOLEAN, synced,
1248         "r-squared", G_TYPE_DOUBLE, r_squared,
1249         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1250         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1251         "rate-num", G_TYPE_UINT64, rate_num,
1252         "rate-den", G_TYPE_UINT64, rate_den,
1253         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1254         NULL);
1255     emit_ptp_statistics (domain->domain, stats);
1256     gst_structure_free (stats);
1257   }
1258
1259 }
1260
1261 #ifdef USE_MEDIAN_PRE_FILTERING
1262 static gint
1263 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1264 {
1265   if (*a < *b)
1266     return -1;
1267   else if (*a > *b)
1268     return 1;
1269   return 0;
1270 }
1271 #endif
1272
1273 static gboolean
1274 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1275 {
1276 #ifdef USE_MEDIAN_PRE_FILTERING
1277   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1278   GstClockTime median;
1279   gint i;
1280 #endif
1281
1282   GstClockTime mean_path_delay, delay_req_delay = 0;
1283   gboolean ret;
1284
1285   /* IEEE 1588 11.3 */
1286   mean_path_delay =
1287       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1288       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1289       (sync->correction_field_sync + sync->correction_field_delay +
1290           32768) / 65536) / 2;
1291
1292 #ifdef USE_MEDIAN_PRE_FILTERING
1293   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1294     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1295   domain->last_path_delays[i - 1] = mean_path_delay;
1296
1297   if (domain->last_path_delays_missing) {
1298     domain->last_path_delays_missing--;
1299   } else {
1300     memcpy (&last_path_delays, &domain->last_path_delays,
1301         sizeof (last_path_delays));
1302     g_qsort_with_data (&last_path_delays,
1303         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1304         (GCompareDataFunc) compare_clock_time, NULL);
1305
1306     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1307
1308     /* FIXME: We might want to use something else here, like only allowing
1309      * things in the interquartile range, or also filtering away delays that
1310      * are too small compared to the median. This here worked well enough
1311      * in tests so far.
1312      */
1313     if (mean_path_delay > 2 * median) {
1314       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1315           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1316           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1317       ret = FALSE;
1318       goto out;
1319     }
1320   }
1321 #endif
1322
1323 #ifdef USE_RUNNING_AVERAGE_DELAY
1324   /* Track an average round trip time, for a bit of smoothing */
1325   /* Always update before discarding a sample, so genuine changes in
1326    * the network get picked up, eventually */
1327   if (domain->mean_path_delay == 0)
1328     domain->mean_path_delay = mean_path_delay;
1329   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1330     domain->mean_path_delay =
1331         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1332   else
1333     domain->mean_path_delay =
1334         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1335 #else
1336   domain->mean_path_delay = mean_path_delay;
1337 #endif
1338
1339 #ifdef USE_MEASUREMENT_FILTERING
1340   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1341       domain->mean_path_delay != 0
1342       && sync->follow_up_recv_time_local >
1343       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1344     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1345         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1346         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1347             sync->sync_recv_time_local),
1348         GST_TIME_ARGS (domain->mean_path_delay));
1349     ret = FALSE;
1350     goto out;
1351   }
1352
1353   if (mean_path_delay > 2 * domain->mean_path_delay) {
1354     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1355         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1356         GST_TIME_ARGS (mean_path_delay),
1357         GST_TIME_ARGS (domain->mean_path_delay));
1358     ret = FALSE;
1359     goto out;
1360   }
1361 #endif
1362
1363   delay_req_delay =
1364       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1365
1366 #ifdef USE_MEASUREMENT_FILTERING
1367   /* delay_req_delay is a RTT, so 2 times the path delay */
1368   if (delay_req_delay > 4 * domain->mean_path_delay) {
1369     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1370         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1371         GST_TIME_ARGS (delay_req_delay),
1372         GST_TIME_ARGS (domain->mean_path_delay));
1373     ret = FALSE;
1374     goto out;
1375   }
1376 #endif
1377
1378   ret = TRUE;
1379
1380   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1381       GST_TIME_FORMAT ")", domain->domain,
1382       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1383   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1384       domain->domain, GST_TIME_ARGS (delay_req_delay));
1385
1386 #ifdef USE_MEASUREMENT_FILTERING
1387 out:
1388 #endif
1389   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1390     GstStructure *stats =
1391         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1392         "domain", G_TYPE_UINT, domain->domain,
1393         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1394         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1395         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1396     emit_ptp_statistics (domain->domain, stats);
1397     gst_structure_free (stats);
1398   }
1399
1400   return ret;
1401 }
1402
1403 static void
1404 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1405 {
1406   GList *l;
1407   PtpDomainData *domain = NULL;
1408   PtpPendingSync *sync = NULL;
1409
1410   /* Don't consider messages with the alternate master flag set */
1411   if ((msg->flag_field & 0x0100))
1412     return;
1413
1414   for (l = domain_data; l; l = l->next) {
1415     PtpDomainData *tmp = l->data;
1416
1417     if (msg->domain_number == tmp->domain) {
1418       domain = tmp;
1419       break;
1420     }
1421   }
1422
1423   if (!domain) {
1424     gchar *clock_name;
1425
1426     domain = g_new0 (PtpDomainData, 1);
1427     domain->domain = msg->domain_number;
1428     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1429     domain->domain_clock =
1430         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1431     g_free (clock_name);
1432     g_queue_init (&domain->pending_syncs);
1433     domain->last_path_delays_missing = 9;
1434     domain_data = g_list_prepend (domain_data, domain);
1435
1436     g_mutex_lock (&domain_clocks_lock);
1437     domain_clocks = g_list_prepend (domain_clocks, domain);
1438     g_mutex_unlock (&domain_clocks_lock);
1439   }
1440
1441   /* If we have a master clock, ignore this message if it's not coming from there */
1442   if (domain->have_master_clock
1443       && compare_clock_identity (&domain->master_clock_identity,
1444           &msg->source_port_identity) != 0)
1445     return;
1446
1447 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1448   /* Opportunistic selection of master clock */
1449   if (!domain->have_master_clock)
1450     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1451         sizeof (PtpClockIdentity));
1452 #else
1453   if (!domain->have_master_clock)
1454     return;
1455 #endif
1456
1457   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1458
1459   /* Check if duplicated */
1460   for (l = domain->pending_syncs.head; l; l = l->next) {
1461     PtpPendingSync *tmp = l->data;
1462
1463     if (tmp->sync_seqnum == msg->sequence_id)
1464       return;
1465   }
1466
1467   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1468       GST_CLOCK_TIME_NONE / GST_SECOND) {
1469     GST_FIXME ("Unsupported sync message seconds field value: %"
1470         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1471         msg->message_specific.sync.origin_timestamp.seconds_field,
1472         GST_CLOCK_TIME_NONE / GST_SECOND);
1473     return;
1474   }
1475
1476   sync = g_new0 (PtpPendingSync, 1);
1477   sync->domain = domain->domain;
1478   sync->sync_seqnum = msg->sequence_id;
1479   sync->sync_recv_time_local = receive_time;
1480   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1481   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1482   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1483   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1484   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1485
1486   /* 0.5 correction factor for division later */
1487   sync->correction_field_sync = msg->correction_field;
1488
1489   if ((msg->flag_field & 0x0200)) {
1490     /* Wait for FOLLOW_UP */
1491   } else {
1492     sync->sync_send_time_remote =
1493         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1494         sync.origin_timestamp);
1495
1496     if (domain->last_ptp_sync_time != 0
1497         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1498       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1499           GST_TIME_FORMAT, domain->domain,
1500           GST_TIME_ARGS (domain->last_ptp_sync_time),
1501           GST_TIME_ARGS (sync->sync_send_time_remote));
1502       ptp_pending_sync_free (sync);
1503       sync = NULL;
1504       return;
1505     }
1506     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1507
1508     if (send_delay_req (domain, sync)) {
1509       /* Sent delay request */
1510     } else {
1511       update_ptp_time (domain, sync);
1512       ptp_pending_sync_free (sync);
1513       sync = NULL;
1514     }
1515   }
1516
1517   if (sync)
1518     g_queue_push_tail (&domain->pending_syncs, sync);
1519 }
1520
1521 static void
1522 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1523 {
1524   GList *l;
1525   PtpDomainData *domain = NULL;
1526   PtpPendingSync *sync = NULL;
1527
1528   /* Don't consider messages with the alternate master flag set */
1529   if ((msg->flag_field & 0x0100))
1530     return;
1531
1532   for (l = domain_data; l; l = l->next) {
1533     PtpDomainData *tmp = l->data;
1534
1535     if (msg->domain_number == tmp->domain) {
1536       domain = tmp;
1537       break;
1538     }
1539   }
1540
1541   if (!domain)
1542     return;
1543
1544   /* If we have a master clock, ignore this message if it's not coming from there */
1545   if (domain->have_master_clock
1546       && compare_clock_identity (&domain->master_clock_identity,
1547           &msg->source_port_identity) != 0)
1548     return;
1549
1550   /* Check if we know about this one */
1551   for (l = domain->pending_syncs.head; l; l = l->next) {
1552     PtpPendingSync *tmp = l->data;
1553
1554     if (tmp->sync_seqnum == msg->sequence_id) {
1555       sync = tmp;
1556       break;
1557     }
1558   }
1559
1560   if (!sync)
1561     return;
1562
1563   /* Got a FOLLOW_UP for this already */
1564   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1565     return;
1566
1567   if (sync->sync_recv_time_local >= receive_time) {
1568     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1569         GST_TIME_FORMAT, domain->domain,
1570         GST_TIME_ARGS (sync->sync_recv_time_local),
1571         GST_TIME_ARGS (receive_time));
1572     g_queue_remove (&domain->pending_syncs, sync);
1573     ptp_pending_sync_free (sync);
1574     return;
1575   }
1576
1577   sync->correction_field_sync += msg->correction_field;
1578   sync->sync_send_time_remote =
1579       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1580       follow_up.precise_origin_timestamp);
1581   sync->follow_up_recv_time_local = receive_time;
1582
1583   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1584     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1585         GST_TIME_FORMAT, domain->domain,
1586         GST_TIME_ARGS (domain->last_ptp_sync_time),
1587         GST_TIME_ARGS (sync->sync_send_time_remote));
1588     g_queue_remove (&domain->pending_syncs, sync);
1589     ptp_pending_sync_free (sync);
1590     sync = NULL;
1591     return;
1592   }
1593   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1594
1595   if (send_delay_req (domain, sync)) {
1596     /* Sent delay request */
1597   } else {
1598     update_ptp_time (domain, sync);
1599     g_queue_remove (&domain->pending_syncs, sync);
1600     ptp_pending_sync_free (sync);
1601     sync = NULL;
1602   }
1603 }
1604
1605 static void
1606 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1607 {
1608   GList *l;
1609   PtpDomainData *domain = NULL;
1610   PtpPendingSync *sync = NULL;
1611
1612   /* Don't consider messages with the alternate master flag set */
1613   if ((msg->flag_field & 0x0100))
1614     return;
1615
1616   for (l = domain_data; l; l = l->next) {
1617     PtpDomainData *tmp = l->data;
1618
1619     if (msg->domain_number == tmp->domain) {
1620       domain = tmp;
1621       break;
1622     }
1623   }
1624
1625   if (!domain)
1626     return;
1627
1628   /* If we have a master clock, ignore this message if it's not coming from there */
1629   if (domain->have_master_clock
1630       && compare_clock_identity (&domain->master_clock_identity,
1631           &msg->source_port_identity) != 0)
1632     return;
1633
1634   /* Not for us */
1635   if (msg->message_specific.delay_resp.
1636       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1637       || msg->message_specific.delay_resp.
1638       requesting_port_identity.port_number != ptp_clock_id.port_number)
1639     return;
1640
1641   domain->min_delay_req_interval =
1642       log2_to_clock_time (msg->log_message_interval);
1643
1644   /* Check if we know about this one */
1645   for (l = domain->pending_syncs.head; l; l = l->next) {
1646     PtpPendingSync *tmp = l->data;
1647
1648     if (tmp->delay_req_seqnum == msg->sequence_id) {
1649       sync = tmp;
1650       break;
1651     }
1652   }
1653
1654   if (!sync)
1655     return;
1656
1657   /* Got a DELAY_RESP for this already */
1658   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1659     return;
1660
1661   if (sync->delay_req_send_time_local > receive_time) {
1662     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1663         GST_TIME_FORMAT, domain->domain,
1664         GST_TIME_ARGS (sync->delay_req_send_time_local),
1665         GST_TIME_ARGS (receive_time));
1666     g_queue_remove (&domain->pending_syncs, sync);
1667     ptp_pending_sync_free (sync);
1668     return;
1669   }
1670
1671   sync->correction_field_delay = msg->correction_field;
1672
1673   sync->delay_req_recv_time_remote =
1674       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1675       delay_resp.receive_timestamp);
1676   sync->delay_resp_recv_time_local = receive_time;
1677
1678   if (domain->mean_path_delay != 0
1679       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1680     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1681         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1682         GST_TIME_ARGS (sync->sync_send_time_remote),
1683         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1684     g_queue_remove (&domain->pending_syncs, sync);
1685     ptp_pending_sync_free (sync);
1686     return;
1687   }
1688
1689   if (update_mean_path_delay (domain, sync))
1690     update_ptp_time (domain, sync);
1691   g_queue_remove (&domain->pending_syncs, sync);
1692   ptp_pending_sync_free (sync);
1693 }
1694
1695 static void
1696 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1697 {
1698   /* Ignore our own messages */
1699   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1700       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1701     return;
1702
1703   switch (msg->message_type) {
1704     case PTP_MESSAGE_TYPE_ANNOUNCE:
1705       handle_announce_message (msg, receive_time);
1706       break;
1707     case PTP_MESSAGE_TYPE_SYNC:
1708       handle_sync_message (msg, receive_time);
1709       break;
1710     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1711       handle_follow_up_message (msg, receive_time);
1712       break;
1713     case PTP_MESSAGE_TYPE_DELAY_RESP:
1714       handle_delay_resp_message (msg, receive_time);
1715       break;
1716     default:
1717       break;
1718   }
1719 }
1720
1721 static gboolean
1722 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1723     gpointer user_data)
1724 {
1725   GIOStatus status;
1726   StdIOHeader header;
1727   gchar buffer[8192];
1728   GError *err = NULL;
1729   gsize read;
1730
1731   if ((condition & G_IO_STATUS_EOF)) {
1732     GST_ERROR ("Got EOF on stdin");
1733     g_main_loop_quit (main_loop);
1734     return G_SOURCE_REMOVE;
1735   }
1736
1737   status =
1738       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1739       &read, &err);
1740   if (status == G_IO_STATUS_ERROR) {
1741     GST_ERROR ("Failed to read from stdin: %s", err->message);
1742     g_main_loop_quit (main_loop);
1743     return G_SOURCE_REMOVE;
1744   } else if (status == G_IO_STATUS_EOF) {
1745     GST_ERROR ("Got EOF on stdin");
1746     g_main_loop_quit (main_loop);
1747     return G_SOURCE_REMOVE;
1748   } else if (status != G_IO_STATUS_NORMAL) {
1749     GST_ERROR ("Unexpected stdin read status: %d", status);
1750     g_main_loop_quit (main_loop);
1751     return G_SOURCE_REMOVE;
1752   } else if (read != sizeof (header)) {
1753     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1754     g_main_loop_quit (main_loop);
1755     return G_SOURCE_REMOVE;
1756   } else if (header.size > 8192) {
1757     GST_ERROR ("Unexpected size: %u", header.size);
1758     g_main_loop_quit (main_loop);
1759     return G_SOURCE_REMOVE;
1760   }
1761
1762   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1763   if (status == G_IO_STATUS_ERROR) {
1764     GST_ERROR ("Failed to read from stdin: %s", err->message);
1765     g_main_loop_quit (main_loop);
1766     return G_SOURCE_REMOVE;
1767   } else if (status == G_IO_STATUS_EOF) {
1768     GST_ERROR ("EOF on stdin");
1769     g_main_loop_quit (main_loop);
1770     return G_SOURCE_REMOVE;
1771   } else if (status != G_IO_STATUS_NORMAL) {
1772     GST_ERROR ("Unexpected stdin read status: %d", status);
1773     g_main_loop_quit (main_loop);
1774     return G_SOURCE_REMOVE;
1775   } else if (read != header.size) {
1776     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1777     g_main_loop_quit (main_loop);
1778     return G_SOURCE_REMOVE;
1779   }
1780
1781   switch (header.type) {
1782     case TYPE_EVENT:
1783     case TYPE_GENERAL:{
1784       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1785       PtpMessage msg;
1786
1787       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1788         dump_ptp_message (&msg);
1789         handle_ptp_message (&msg, receive_time);
1790       }
1791       break;
1792     }
1793     default:
1794     case TYPE_CLOCK_ID:{
1795       if (header.size != 8) {
1796         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1797         g_main_loop_quit (main_loop);
1798         return G_SOURCE_REMOVE;
1799       }
1800       g_mutex_lock (&ptp_lock);
1801       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1802       ptp_clock_id.port_number = getpid ();
1803       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1804           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1805       g_cond_signal (&ptp_cond);
1806       g_mutex_unlock (&ptp_lock);
1807       break;
1808     }
1809   }
1810
1811   return G_SOURCE_CONTINUE;
1812 }
1813
1814 /* Cleanup all announce messages and announce message senders
1815  * that are timed out by now, and clean up all pending syncs
1816  * that are missing their FOLLOW_UP or DELAY_RESP */
1817 static gboolean
1818 cleanup_cb (gpointer data)
1819 {
1820   GstClockTime now = gst_clock_get_time (observation_system_clock);
1821   GList *l, *m, *n;
1822
1823   for (l = domain_data; l; l = l->next) {
1824     PtpDomainData *domain = l->data;
1825
1826     for (n = domain->announce_senders; n;) {
1827       PtpAnnounceSender *sender = n->data;
1828       gboolean timed_out = TRUE;
1829
1830       /* Keep only 5 messages per sender around */
1831       while (g_queue_get_length (&sender->announce_messages) > 5) {
1832         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1833         g_free (msg);
1834       }
1835
1836       for (m = sender->announce_messages.head; m; m = m->next) {
1837         PtpAnnounceMessage *msg = m->data;
1838
1839         if (msg->receive_time +
1840             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1841           timed_out = FALSE;
1842           break;
1843         }
1844       }
1845
1846       if (timed_out) {
1847         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1848             sender->master_clock_identity.clock_identity,
1849             sender->master_clock_identity.port_number);
1850         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1851         g_queue_clear (&sender->announce_messages);
1852       }
1853
1854       if (g_queue_get_length (&sender->announce_messages) == 0) {
1855         GList *tmp = n->next;
1856
1857         if (compare_clock_identity (&sender->master_clock_identity,
1858                 &domain->master_clock_identity) == 0)
1859           GST_WARNING ("currently selected master clock timed out");
1860         g_free (sender);
1861         domain->announce_senders =
1862             g_list_delete_link (domain->announce_senders, n);
1863         n = tmp;
1864       } else {
1865         n = n->next;
1866       }
1867     }
1868     select_best_master_clock (domain, now);
1869
1870     /* Clean up any pending syncs */
1871     for (n = domain->pending_syncs.head; n;) {
1872       PtpPendingSync *sync = n->data;
1873       gboolean timed_out = FALSE;
1874
1875       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1876        * and pending delay reqs after 4 delay req intervals or 10 seconds
1877        */
1878       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1879           ((domain->min_delay_req_interval != 0
1880                   && sync->delay_req_send_time_local +
1881                   4 * domain->min_delay_req_interval < now)
1882               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1883         timed_out = TRUE;
1884       } else if ((domain->sync_interval != 0
1885               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1886           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1887         timed_out = TRUE;
1888       }
1889
1890       if (timed_out) {
1891         GList *tmp = n->next;
1892         ptp_pending_sync_free (sync);
1893         g_queue_delete_link (&domain->pending_syncs, n);
1894         n = tmp;
1895       } else {
1896         n = n->next;
1897       }
1898     }
1899   }
1900
1901   return G_SOURCE_CONTINUE;
1902 }
1903
1904 static gpointer
1905 ptp_helper_main (gpointer data)
1906 {
1907   GSource *cleanup_source;
1908
1909   GST_DEBUG ("Starting PTP helper loop");
1910
1911   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1912   cleanup_source = g_timeout_source_new_seconds (5);
1913   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1914   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1915   g_source_attach (cleanup_source, main_context);
1916   g_source_unref (cleanup_source);
1917
1918   g_main_loop_run (main_loop);
1919   GST_DEBUG ("Stopped PTP helper loop");
1920
1921   g_mutex_lock (&ptp_lock);
1922   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1923   ptp_clock_id.port_number = 0;
1924   initted = FALSE;
1925   g_cond_signal (&ptp_cond);
1926   g_mutex_unlock (&ptp_lock);
1927
1928   return NULL;
1929 }
1930
1931 /**
1932  * gst_ptp_is_supported:
1933  *
1934  * Check if PTP clocks are generally supported on this system, and if previous
1935  * initializations did not fail.
1936  *
1937  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1938  * previous initializations did not fail.
1939  *
1940  * Since: 1.6
1941  */
1942 gboolean
1943 gst_ptp_is_supported (void)
1944 {
1945   return supported;
1946 }
1947
1948 /**
1949  * gst_ptp_is_initialized:
1950  *
1951  * Check if the GStreamer PTP clock subsystem is initialized.
1952  *
1953  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1954  *
1955  * Since: 1.6
1956  */
1957 gboolean
1958 gst_ptp_is_initialized (void)
1959 {
1960   return initted;
1961 }
1962
1963 /**
1964  * gst_ptp_init:
1965  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1966  * @interfaces: (transfer none) (array zero-terminated=1): network interfaces to run the clock on
1967  *
1968  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1969  * slave-only mode for all domains on the given @interfaces with the
1970  * given @clock_id.
1971  *
1972  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1973  * generated from the MAC address of the first network interface.
1974  *
1975  *
1976  * This function is automatically called by gst_ptp_clock_new() with default
1977  * parameters if it wasn't called before.
1978  *
1979  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
1980  *
1981  * Since: 1.6
1982  */
1983 gboolean
1984 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
1985 {
1986   gboolean ret;
1987   const gchar *env;
1988   gchar **argv = NULL;
1989   gint argc, argc_c;
1990   gint fd_r, fd_w;
1991   GError *err = NULL;
1992   GSource *stdin_source;
1993
1994   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
1995
1996   g_mutex_lock (&ptp_lock);
1997   if (!supported) {
1998     GST_ERROR ("PTP not supported");
1999     ret = FALSE;
2000     goto done;
2001   }
2002
2003   if (initted) {
2004     GST_DEBUG ("PTP already initialized");
2005     ret = TRUE;
2006     goto done;
2007   }
2008
2009   if (ptp_helper_pid) {
2010     GST_DEBUG ("PTP currently initializing");
2011     goto wait;
2012   }
2013
2014   if (!domain_stats_hooks_initted) {
2015     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2016     domain_stats_hooks_initted = TRUE;
2017   }
2018
2019   argc = 1;
2020   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2021     argc += 2;
2022   if (interfaces != NULL)
2023     argc += 2 * g_strv_length (interfaces);
2024
2025   argv = g_new0 (gchar *, argc + 2);
2026   argc_c = 0;
2027
2028   env = g_getenv ("GST_PTP_HELPER_1_0");
2029   if (env == NULL)
2030     env = g_getenv ("GST_PTP_HELPER");
2031   if (env != NULL && *env != '\0') {
2032     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2033     argv[argc_c++] = g_strdup (env);
2034   } else {
2035     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2036   }
2037
2038   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2039     argv[argc_c++] = g_strdup ("-c");
2040     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2041   }
2042
2043   if (interfaces != NULL) {
2044     gchar **ptr = interfaces;
2045
2046     while (*ptr) {
2047       argv[argc_c++] = g_strdup ("-i");
2048       argv[argc_c++] = g_strdup (*ptr);
2049       ptr++;
2050     }
2051   }
2052
2053   main_context = g_main_context_new ();
2054   main_loop = g_main_loop_new (main_context, FALSE);
2055
2056   ptp_helper_thread =
2057       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2058   if (!ptp_helper_thread) {
2059     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2060     g_clear_error (&err);
2061     ret = FALSE;
2062     goto done;
2063   }
2064
2065   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2066           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2067     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2068     g_clear_error (&err);
2069     ret = FALSE;
2070     supported = FALSE;
2071     goto done;
2072   }
2073
2074   stdin_channel = g_io_channel_unix_new (fd_r);
2075   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2076   g_io_channel_set_buffered (stdin_channel, FALSE);
2077   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2078   stdin_source =
2079       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2080   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2081   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2082       NULL);
2083   g_source_attach (stdin_source, main_context);
2084   g_source_unref (stdin_source);
2085
2086   /* Create stdout channel */
2087   stdout_channel = g_io_channel_unix_new (fd_w);
2088   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2089   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2090   g_io_channel_set_buffered (stdout_channel, FALSE);
2091
2092   delay_req_rand = g_rand_new ();
2093   observation_system_clock =
2094       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2095       NULL);
2096
2097   initted = TRUE;
2098
2099 wait:
2100   GST_DEBUG ("Waiting for PTP to be initialized");
2101
2102   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2103     g_cond_wait (&ptp_cond, &ptp_lock);
2104
2105   ret = initted;
2106   if (ret) {
2107     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2108         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2109   } else {
2110     GST_ERROR ("Failed to initialize");
2111     supported = FALSE;
2112   }
2113
2114 done:
2115   g_strfreev (argv);
2116
2117   if (!ret) {
2118     if (ptp_helper_pid) {
2119       kill (ptp_helper_pid, SIGKILL);
2120       waitpid (ptp_helper_pid, NULL, 0);
2121       g_spawn_close_pid (ptp_helper_pid);
2122     }
2123     ptp_helper_pid = 0;
2124
2125     if (stdin_channel)
2126       g_io_channel_unref (stdin_channel);
2127     stdin_channel = NULL;
2128     if (stdout_channel)
2129       g_io_channel_unref (stdout_channel);
2130     stdout_channel = NULL;
2131
2132     if (main_loop && ptp_helper_thread) {
2133       g_main_loop_quit (main_loop);
2134       g_thread_join (ptp_helper_thread);
2135     }
2136     ptp_helper_thread = NULL;
2137     if (main_loop)
2138       g_main_loop_unref (main_loop);
2139     main_loop = NULL;
2140     if (main_context)
2141       g_main_context_unref (main_context);
2142     main_context = NULL;
2143
2144     if (delay_req_rand)
2145       g_rand_free (delay_req_rand);
2146     delay_req_rand = NULL;
2147
2148     if (observation_system_clock)
2149       gst_object_unref (observation_system_clock);
2150     observation_system_clock = NULL;
2151   }
2152
2153   g_mutex_unlock (&ptp_lock);
2154
2155   return ret;
2156 }
2157
2158 /**
2159  * gst_ptp_deinit:
2160  *
2161  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2162  * are any remaining GstPtpClock instances, they won't be further synchronized
2163  * to the PTP network clock.
2164  *
2165  * Since: 1.6
2166  */
2167 void
2168 gst_ptp_deinit (void)
2169 {
2170   GList *l, *m;
2171
2172   g_mutex_lock (&ptp_lock);
2173
2174   if (ptp_helper_pid) {
2175     kill (ptp_helper_pid, SIGKILL);
2176     waitpid (ptp_helper_pid, NULL, 0);
2177     g_spawn_close_pid (ptp_helper_pid);
2178   }
2179   ptp_helper_pid = 0;
2180
2181   if (stdin_channel)
2182     g_io_channel_unref (stdin_channel);
2183   stdin_channel = NULL;
2184   if (stdout_channel)
2185     g_io_channel_unref (stdout_channel);
2186   stdout_channel = NULL;
2187
2188   if (main_loop && ptp_helper_thread) {
2189     GThread *tmp = ptp_helper_thread;
2190     ptp_helper_thread = NULL;
2191     g_mutex_unlock (&ptp_lock);
2192     g_main_loop_quit (main_loop);
2193     g_thread_join (tmp);
2194     g_mutex_lock (&ptp_lock);
2195   }
2196   if (main_loop)
2197     g_main_loop_unref (main_loop);
2198   main_loop = NULL;
2199   if (main_context)
2200     g_main_context_unref (main_context);
2201   main_context = NULL;
2202
2203   if (delay_req_rand)
2204     g_rand_free (delay_req_rand);
2205   delay_req_rand = NULL;
2206   if (observation_system_clock)
2207     gst_object_unref (observation_system_clock);
2208   observation_system_clock = NULL;
2209
2210   for (l = domain_data; l; l = l->next) {
2211     PtpDomainData *domain = l->data;
2212
2213     for (m = domain->announce_senders; m; m = m->next) {
2214       PtpAnnounceSender *sender = m->data;
2215
2216       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2217       g_queue_clear (&sender->announce_messages);
2218       g_free (sender);
2219     }
2220     g_list_free (domain->announce_senders);
2221
2222     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2223         NULL);
2224     g_queue_clear (&domain->pending_syncs);
2225     gst_object_unref (domain->domain_clock);
2226     g_free (domain);
2227   }
2228   g_list_free (domain_data);
2229   domain_data = NULL;
2230   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2231   g_list_free (domain_clocks);
2232   domain_clocks = NULL;
2233
2234   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2235   ptp_clock_id.port_number = 0;
2236
2237   initted = FALSE;
2238
2239   g_mutex_unlock (&ptp_lock);
2240 }
2241
2242 #define DEFAULT_DOMAIN 0
2243
2244 enum
2245 {
2246   PROP_0,
2247   PROP_DOMAIN,
2248   PROP_INTERNAL_CLOCK
2249 };
2250
2251 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2252   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2253
2254 struct _GstPtpClockPrivate
2255 {
2256   guint domain;
2257   GstClock *domain_clock;
2258   gulong domain_stats_id;
2259 };
2260
2261 #define gst_ptp_clock_parent_class parent_class
2262 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2263
2264 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2265     const GValue * value, GParamSpec * pspec);
2266 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2267     GValue * value, GParamSpec * pspec);
2268 static void gst_ptp_clock_finalize (GObject * object);
2269
2270 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2271
2272 static void
2273 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2274 {
2275   GObjectClass *gobject_class;
2276   GstClockClass *clock_class;
2277
2278   gobject_class = G_OBJECT_CLASS (klass);
2279   clock_class = GST_CLOCK_CLASS (klass);
2280
2281   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2282
2283   gobject_class->finalize = gst_ptp_clock_finalize;
2284   gobject_class->get_property = gst_ptp_clock_get_property;
2285   gobject_class->set_property = gst_ptp_clock_set_property;
2286
2287   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2288       g_param_spec_uint ("domain", "Domain",
2289           "The PTP domain", 0, G_MAXUINT8,
2290           DEFAULT_DOMAIN,
2291           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2292
2293   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2294       g_param_spec_object ("internal-clock", "Internal Clock",
2295           "Internal clock", GST_TYPE_CLOCK,
2296           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2297
2298   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2299 }
2300
2301 static void
2302 gst_ptp_clock_init (GstPtpClock * self)
2303 {
2304   GstPtpClockPrivate *priv;
2305
2306   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2307
2308   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2309   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2310
2311   priv->domain = DEFAULT_DOMAIN;
2312 }
2313
2314 static gboolean
2315 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2316 {
2317   gboolean got_clock = TRUE;
2318
2319   if (G_UNLIKELY (!self->priv->domain_clock)) {
2320     g_mutex_lock (&domain_clocks_lock);
2321     if (!self->priv->domain_clock) {
2322       GList *l;
2323
2324       got_clock = FALSE;
2325
2326       for (l = domain_clocks; l; l = l->next) {
2327         PtpDomainData *clock_data = l->data;
2328
2329         if (clock_data->domain == self->priv->domain
2330             && clock_data->last_ptp_time != 0) {
2331           self->priv->domain_clock = clock_data->domain_clock;
2332           got_clock = TRUE;
2333           break;
2334         }
2335       }
2336     }
2337     g_mutex_unlock (&domain_clocks_lock);
2338     if (got_clock) {
2339       g_object_notify (G_OBJECT (self), "internal-clock");
2340       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2341     }
2342   }
2343
2344   return got_clock;
2345 }
2346
2347 static gboolean
2348 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2349     gpointer user_data)
2350 {
2351   GstPtpClock *self = user_data;
2352
2353   if (domain != self->priv->domain
2354       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2355     return TRUE;
2356
2357   /* Let's set our internal clock */
2358   if (!gst_ptp_clock_ensure_domain_clock (self))
2359     return TRUE;
2360
2361   self->priv->domain_stats_id = 0;
2362
2363   return FALSE;
2364 }
2365
2366 static void
2367 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2368     const GValue * value, GParamSpec * pspec)
2369 {
2370   GstPtpClock *self = GST_PTP_CLOCK (object);
2371
2372   switch (prop_id) {
2373     case PROP_DOMAIN:
2374       self->priv->domain = g_value_get_uint (value);
2375       gst_ptp_clock_ensure_domain_clock (self);
2376       if (!self->priv->domain_clock)
2377         self->priv->domain_stats_id =
2378             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2379             NULL);
2380       break;
2381     default:
2382       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2383       break;
2384   }
2385 }
2386
2387 static void
2388 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2389     GValue * value, GParamSpec * pspec)
2390 {
2391   GstPtpClock *self = GST_PTP_CLOCK (object);
2392
2393   switch (prop_id) {
2394     case PROP_DOMAIN:
2395       g_value_set_uint (value, self->priv->domain);
2396       break;
2397     case PROP_INTERNAL_CLOCK:
2398       gst_ptp_clock_ensure_domain_clock (self);
2399       g_value_set_object (value, self->priv->domain_clock);
2400       break;
2401     default:
2402       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2403       break;
2404   }
2405 }
2406
2407 static void
2408 gst_ptp_clock_finalize (GObject * object)
2409 {
2410   GstPtpClock *self = GST_PTP_CLOCK (object);
2411
2412   if (self->priv->domain_stats_id)
2413     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2414
2415   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2416 }
2417
2418 static GstClockTime
2419 gst_ptp_clock_get_internal_time (GstClock * clock)
2420 {
2421   GstPtpClock *self = GST_PTP_CLOCK (clock);
2422
2423   gst_ptp_clock_ensure_domain_clock (self);
2424
2425   if (!self->priv->domain_clock) {
2426     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2427         self->priv->domain);
2428     return GST_CLOCK_TIME_NONE;
2429   }
2430
2431   return gst_clock_get_time (self->priv->domain_clock);
2432 }
2433
2434 /**
2435  * gst_ptp_clock_new:
2436  * @name: Name of the clock
2437  * @domain: PTP domain
2438  *
2439  * Creates a new PTP clock instance that exports the PTP time of the master
2440  * clock in @domain. This clock can be slaved to other clocks as needed.
2441  *
2442  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2443  * default parameters.
2444  *
2445  *
2446  * This clock only returns valid timestamps after it received the first
2447  * times from the PTP master clock on the network. Once this happens the
2448  * GstPtpClock::internal-clock property will become non-NULL. You can
2449  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2450  * gst_clock_is_synced().
2451  *
2452  * Since: 1.6
2453  */
2454 GstClock *
2455 gst_ptp_clock_new (const gchar * name, guint domain)
2456 {
2457   g_return_val_if_fail (name != NULL, NULL);
2458   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2459
2460   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2461     GST_ERROR ("Failed to initialize PTP");
2462     return NULL;
2463   }
2464
2465   return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2466       NULL);
2467 }
2468
2469 typedef struct
2470 {
2471   guint8 domain;
2472   const GstStructure *stats;
2473 } DomainStatsMarshalData;
2474
2475 static void
2476 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2477 {
2478   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2479
2480   if (!callback (data->domain, data->stats, hook->data))
2481     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2482 }
2483
2484 static void
2485 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2486 {
2487   DomainStatsMarshalData data = { domain, stats };
2488
2489   g_mutex_lock (&ptp_lock);
2490   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2491       (GHookMarshaller) domain_stats_marshaller, &data);
2492   g_mutex_unlock (&ptp_lock);
2493 }
2494
2495 /**
2496  * gst_ptp_statistics_callback_add:
2497  * @callback: GstPtpStatisticsCallback to call
2498  * @user_data: Data to pass to the callback
2499  * @destroy_data: GDestroyNotify to destroy the data
2500  *
2501  * Installs a new statistics callback for gathering PTP statistics. See
2502  * GstPtpStatisticsCallback for a list of statistics that are provided.
2503  *
2504  * Returns: Id for the callback that can be passed to
2505  * gst_ptp_statistics_callback_remove()
2506  *
2507  * Since: 1.6
2508  */
2509 gulong
2510 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2511     gpointer user_data, GDestroyNotify destroy_data)
2512 {
2513   GHook *hook;
2514
2515   g_mutex_lock (&ptp_lock);
2516
2517   if (!domain_stats_hooks_initted) {
2518     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2519     domain_stats_hooks_initted = TRUE;
2520   }
2521
2522   hook = g_hook_alloc (&domain_stats_hooks);
2523   hook->func = callback;
2524   hook->data = user_data;
2525   hook->destroy = destroy_data;
2526   g_hook_prepend (&domain_stats_hooks, hook);
2527   g_atomic_int_add (&domain_stats_n_hooks, 1);
2528
2529   g_mutex_unlock (&ptp_lock);
2530
2531   return hook->hook_id;
2532 }
2533
2534 /**
2535  * gst_ptp_statistics_callback_remove:
2536  * @id: Callback id to remove
2537  *
2538  * Removes a PTP statistics callback that was previously added with
2539  * gst_ptp_statistics_callback_add().
2540  *
2541  * Since: 1.6
2542  */
2543 void
2544 gst_ptp_statistics_callback_remove (gulong id)
2545 {
2546   g_mutex_lock (&ptp_lock);
2547   if (g_hook_destroy (&domain_stats_hooks, id))
2548     g_atomic_int_add (&domain_stats_n_hooks, -1);
2549   g_mutex_unlock (&ptp_lock);
2550 }
2551
2552 #else /* HAVE_PTP */
2553
2554 GType
2555 gst_ptp_clock_get_type (void)
2556 {
2557   return G_TYPE_INVALID;
2558 }
2559
2560 gboolean
2561 gst_ptp_is_supported (void)
2562 {
2563   return FALSE;
2564 }
2565
2566 gboolean
2567 gst_ptp_is_initialized (void)
2568 {
2569   return FALSE;
2570 }
2571
2572 gboolean
2573 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2574 {
2575   return FALSE;
2576 }
2577
2578 void
2579 gst_ptp_deinit (void)
2580 {
2581 }
2582
2583 GstClock *
2584 gst_ptp_clock_new (const gchar * name, guint domain)
2585 {
2586   return NULL;
2587 }
2588
2589 gulong
2590 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2591     gpointer user_data, GDestroyNotify destroy_data)
2592 {
2593   return 0;
2594 }
2595
2596 void
2597 gst_ptp_statistics_callback_remove (gulong id)
2598 {
2599   return;
2600 }
2601
2602 #endif