ptpclock: Allow at least 100ms delay between Sync/Follow_Up and Delay_Req/Delay_Resp...
[platform/upstream/gstreamer.git] / subprojects / gstreamer / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @title: GstPtpClock
23  * @short_description: Special clock that synchronizes to a remote time
24  *                     provider via PTP (IEEE1588:2008).
25  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
26  *
27  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
28  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
29  * clock in some specific domain.
30  *
31  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
32  * a helper process to do the actual communication via the PTP ports. This is
33  * required as PTP listens on ports < 1024 and thus requires special
34  * privileges. Once this helper process is started, the main process will
35  * synchronize to all PTP domains that are detected on the selected
36  * interfaces.
37  *
38  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
39  * time from a master clock inside a specific PTP domain. This clock will only
40  * return valid timestamps once the timestamps in the PTP domain are known. To
41  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
42  * signal and gst_clock_is_synced().
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #include "gstptp_private.h"
59
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #define WIN32_LEAN_AND_MEAN
65 #include <windows.h>
66 #include <processthreadsapi.h>  /* GetCurrentProcessId */
67 #endif
68 #include <sys/types.h>
69
70 #ifdef HAVE_UNISTD_H
71 #include <unistd.h>
72 #elif defined(G_OS_WIN32)
73 #include <io.h>
74 #endif
75
76 #include <gst/base/base.h>
77
78 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
79 #define GST_CAT_DEFAULT (ptp_debug)
80
81 /* IEEE 1588 7.7.3.1 */
82 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
83
84 /* Use a running average for calculating the mean path delay instead
85  * of just using the last measurement. Enabling this helps in unreliable
86  * networks, like wifi, with often changing delays
87  *
88  * Undef for following IEEE1588-2008 by the letter
89  */
90 #define USE_RUNNING_AVERAGE_DELAY 1
91
92 /* Filter out any measurements that are above a certain threshold compared to
93  * previous measurements. Enabling this helps filtering out outliers that
94  * happen fairly often in unreliable networks, like wifi.
95  *
96  * Undef for following IEEE1588-2008 by the letter
97  */
98 #define USE_MEASUREMENT_FILTERING 1
99
100 /* Select the first clock from which we capture a SYNC message as the master
101  * clock of the domain until we are ready to run the best master clock
102  * algorithm. This allows faster syncing but might mean a change of the master
103  * clock in the beginning. As all clocks in a domain are supposed to use the
104  * same time, this shouldn't be much of a problem.
105  *
106  * Undef for following IEEE1588-2008 by the letter
107  */
108 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
109
110 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
111  * afterwards. This allows better synchronization in networks with varying
112  * delays, as for every other SYNC message we would have to assume that it's
113  * the average of what we saw before. But that might be completely off
114  */
115 #define USE_ONLY_SYNC_WITH_DELAY 1
116
117 /* Filter out delay measurements that are too far away from the median of the
118  * last delay measurements, currently those that are more than 2 times as big.
119  * This increases accuracy a lot on wifi.
120  */
121 #define USE_MEDIAN_PRE_FILTERING 1
122 #define MEDIAN_PRE_FILTERING_WINDOW 9
123
124 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
125 #define MAX_SKIPPED_UPDATES 5
126
127 typedef enum
128 {
129   PTP_MESSAGE_TYPE_SYNC = 0x0,
130   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
131   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
132   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
133   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
134   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
135   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
136   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
137   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
138   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
139 } PtpMessageType;
140
141 typedef struct
142 {
143   guint64 seconds_field;        /* 48 bits valid */
144   guint32 nanoseconds_field;
145 } PtpTimestamp;
146
147 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
148 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
149 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
150
151 typedef struct
152 {
153   guint64 clock_identity;
154   guint16 port_number;
155 } PtpClockIdentity;
156
157 static gint
158 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
159 {
160   if (a->clock_identity < b->clock_identity)
161     return -1;
162   else if (a->clock_identity > b->clock_identity)
163     return 1;
164
165   if (a->port_number < b->port_number)
166     return -1;
167   else if (a->port_number > b->port_number)
168     return 1;
169
170   return 0;
171 }
172
173 typedef struct
174 {
175   guint8 clock_class;
176   guint8 clock_accuracy;
177   guint16 offset_scaled_log_variance;
178 } PtpClockQuality;
179
180 typedef struct
181 {
182   guint8 transport_specific;
183   PtpMessageType message_type;
184   /* guint8 reserved; */
185   guint8 version_ptp;
186   guint16 message_length;
187   guint8 domain_number;
188   /* guint8 reserved; */
189   guint16 flag_field;
190   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
191   /* guint32 reserved; */
192   PtpClockIdentity source_port_identity;
193   guint16 sequence_id;
194   guint8 control_field;
195   gint8 log_message_interval;
196
197   union
198   {
199     struct
200     {
201       PtpTimestamp origin_timestamp;
202       gint16 current_utc_offset;
203       /* guint8 reserved; */
204       guint8 grandmaster_priority_1;
205       PtpClockQuality grandmaster_clock_quality;
206       guint8 grandmaster_priority_2;
207       guint64 grandmaster_identity;
208       guint16 steps_removed;
209       guint8 time_source;
210     } announce;
211
212     struct
213     {
214       PtpTimestamp origin_timestamp;
215     } sync;
216
217     struct
218     {
219       PtpTimestamp precise_origin_timestamp;
220     } follow_up;
221
222     struct
223     {
224       PtpTimestamp origin_timestamp;
225     } delay_req;
226
227     struct
228     {
229       PtpTimestamp receive_timestamp;
230       PtpClockIdentity requesting_port_identity;
231     } delay_resp;
232
233   } message_specific;
234 } PtpMessage;
235
236 static GMutex ptp_lock;
237 static GCond ptp_cond;
238 static gboolean initted = FALSE;
239 #ifdef HAVE_PTP
240 static gboolean supported = TRUE;
241 #else
242 static gboolean supported = FALSE;
243 #endif
244 static GPid ptp_helper_pid;
245 static GThread *ptp_helper_thread;
246 static GMainContext *main_context;
247 static GMainLoop *main_loop;
248 static GIOChannel *stdin_channel, *stdout_channel;
249 static GRand *delay_req_rand;
250 static GstClock *observation_system_clock;
251 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
252
253 typedef struct
254 {
255   GstClockTime receive_time;
256
257   PtpClockIdentity master_clock_identity;
258
259   guint8 grandmaster_priority_1;
260   PtpClockQuality grandmaster_clock_quality;
261   guint8 grandmaster_priority_2;
262   guint64 grandmaster_identity;
263   guint16 steps_removed;
264   guint8 time_source;
265
266   guint16 sequence_id;
267 } PtpAnnounceMessage;
268
269 typedef struct
270 {
271   PtpClockIdentity master_clock_identity;
272
273   GstClockTime announce_interval;       /* last interval we received */
274   GQueue announce_messages;
275 } PtpAnnounceSender;
276
277 typedef struct
278 {
279   guint domain;
280   PtpClockIdentity master_clock_identity;
281
282   guint16 sync_seqnum;
283   GstClockTime sync_recv_time_local;    /* t2 */
284   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
285   GstClockTime follow_up_recv_time_local;
286
287   GSource *timeout_source;
288   guint16 delay_req_seqnum;
289   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
290   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
291   GstClockTime delay_resp_recv_time_local;
292
293   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
294   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
295 } PtpPendingSync;
296
297 static void
298 ptp_pending_sync_free (PtpPendingSync * sync)
299 {
300   if (sync->timeout_source) {
301     g_source_destroy (sync->timeout_source);
302     g_source_unref (sync->timeout_source);
303   }
304   g_free (sync);
305 }
306
307 typedef struct
308 {
309   guint domain;
310
311   GstClockTime last_ptp_time;
312   GstClockTime last_local_time;
313   gint skipped_updates;
314
315   /* Used for selecting the master/grandmaster */
316   GList *announce_senders;
317
318   /* Last selected master clock */
319   gboolean have_master_clock;
320   PtpClockIdentity master_clock_identity;
321   guint64 grandmaster_identity;
322
323   /* Last SYNC or FOLLOW_UP timestamp we received */
324   GstClockTime last_ptp_sync_time;
325   GstClockTime sync_interval;
326
327   GstClockTime mean_path_delay;
328   GstClockTime last_delay_req, min_delay_req_interval;
329   guint16 last_delay_req_seqnum;
330
331   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
332   gint last_path_delays_missing;
333
334   GQueue pending_syncs;
335
336   GstClock *domain_clock;
337 } PtpDomainData;
338
339 static GList *domain_data;
340 static GMutex domain_clocks_lock;
341 static GList *domain_clocks;
342
343 /* Protected by PTP lock */
344 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
345 static GHookList domain_stats_hooks;
346 static gint domain_stats_n_hooks;
347 static gboolean domain_stats_hooks_initted = FALSE;
348
349 /* Converts log2 seconds to GstClockTime */
350 static GstClockTime
351 log2_to_clock_time (gint l)
352 {
353   if (l < 0)
354     return GST_SECOND >> (-l);
355   else
356     return GST_SECOND << l;
357 }
358
359 static void
360 dump_ptp_message (PtpMessage * msg)
361 {
362   GST_TRACE ("PTP message:");
363   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
364   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
365   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
366   GST_TRACE ("\tmessage_length: %u", msg->message_length);
367   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
368   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
369   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
370       (msg->correction_field / 65536),
371       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
372   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
373       msg->source_port_identity.clock_identity,
374       msg->source_port_identity.port_number);
375   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
376   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
377   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
378       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
379
380   switch (msg->message_type) {
381     case PTP_MESSAGE_TYPE_ANNOUNCE:
382       GST_TRACE ("\tANNOUNCE:");
383       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
384           msg->message_specific.announce.origin_timestamp.seconds_field,
385           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
386       GST_TRACE ("\t\tcurrent_utc_offset: %d",
387           msg->message_specific.announce.current_utc_offset);
388       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
389           msg->message_specific.announce.grandmaster_priority_1);
390       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
391           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
392           msg->message_specific.announce.
393           grandmaster_clock_quality.clock_accuracy,
394           msg->message_specific.announce.
395           grandmaster_clock_quality.offset_scaled_log_variance);
396       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
397           msg->message_specific.announce.grandmaster_priority_2);
398       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
399           msg->message_specific.announce.grandmaster_identity);
400       GST_TRACE ("\t\tsteps_removed: %u",
401           msg->message_specific.announce.steps_removed);
402       GST_TRACE ("\t\ttime_source: 0x%02x",
403           msg->message_specific.announce.time_source);
404       break;
405     case PTP_MESSAGE_TYPE_SYNC:
406       GST_TRACE ("\tSYNC:");
407       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
408           msg->message_specific.sync.origin_timestamp.seconds_field,
409           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
410       break;
411     case PTP_MESSAGE_TYPE_FOLLOW_UP:
412       GST_TRACE ("\tFOLLOW_UP:");
413       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
414           msg->message_specific.follow_up.
415           precise_origin_timestamp.seconds_field,
416           msg->message_specific.follow_up.
417           precise_origin_timestamp.nanoseconds_field);
418       break;
419     case PTP_MESSAGE_TYPE_DELAY_REQ:
420       GST_TRACE ("\tDELAY_REQ:");
421       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
422           msg->message_specific.delay_req.origin_timestamp.seconds_field,
423           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
424       break;
425     case PTP_MESSAGE_TYPE_DELAY_RESP:
426       GST_TRACE ("\tDELAY_RESP:");
427       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
428           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
429           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
430       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
431           "x %u",
432           msg->message_specific.delay_resp.
433           requesting_port_identity.clock_identity,
434           msg->message_specific.delay_resp.
435           requesting_port_identity.port_number);
436       break;
437     default:
438       break;
439   }
440   GST_TRACE (" ");
441 }
442
443 /* IEEE 1588-2008 5.3.3 */
444 static gboolean
445 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
446 {
447   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
448
449   timestamp->seconds_field =
450       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
451       gst_byte_reader_get_uint16_be_unchecked (reader);
452   timestamp->nanoseconds_field =
453       gst_byte_reader_get_uint32_be_unchecked (reader);
454
455   if (timestamp->nanoseconds_field >= 1000000000)
456     return FALSE;
457
458   return TRUE;
459 }
460
461 /* IEEE 1588-2008 13.3 */
462 static gboolean
463 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
464 {
465   guint8 b;
466
467   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
468
469   b = gst_byte_reader_get_uint8_unchecked (reader);
470   msg->transport_specific = b >> 4;
471   msg->message_type = b & 0x0f;
472
473   b = gst_byte_reader_get_uint8_unchecked (reader);
474   msg->version_ptp = b & 0x0f;
475   if (msg->version_ptp != 2) {
476     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
477     return FALSE;
478   }
479
480   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
481   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
482     GST_WARNING ("Not enough data (%u < %u)",
483         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
484     return FALSE;
485   }
486
487   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
488   gst_byte_reader_skip_unchecked (reader, 1);
489
490   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
491   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
492   gst_byte_reader_skip_unchecked (reader, 4);
493
494   msg->source_port_identity.clock_identity =
495       gst_byte_reader_get_uint64_be_unchecked (reader);
496   msg->source_port_identity.port_number =
497       gst_byte_reader_get_uint16_be_unchecked (reader);
498
499   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
500   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
501   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
502
503   return TRUE;
504 }
505
506 /* IEEE 1588-2008 13.5 */
507 static gboolean
508 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
509 {
510   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
511
512   if (gst_byte_reader_get_remaining (reader) < 20)
513     return FALSE;
514
515   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
516           reader))
517     return FALSE;
518
519   msg->message_specific.announce.current_utc_offset =
520       gst_byte_reader_get_uint16_be_unchecked (reader);
521   gst_byte_reader_skip_unchecked (reader, 1);
522
523   msg->message_specific.announce.grandmaster_priority_1 =
524       gst_byte_reader_get_uint8_unchecked (reader);
525   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
526       gst_byte_reader_get_uint8_unchecked (reader);
527   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
528       gst_byte_reader_get_uint8_unchecked (reader);
529   msg->message_specific.announce.
530       grandmaster_clock_quality.offset_scaled_log_variance =
531       gst_byte_reader_get_uint16_be_unchecked (reader);
532   msg->message_specific.announce.grandmaster_priority_2 =
533       gst_byte_reader_get_uint8_unchecked (reader);
534   msg->message_specific.announce.grandmaster_identity =
535       gst_byte_reader_get_uint64_be_unchecked (reader);
536   msg->message_specific.announce.steps_removed =
537       gst_byte_reader_get_uint16_be_unchecked (reader);
538   msg->message_specific.announce.time_source =
539       gst_byte_reader_get_uint8_unchecked (reader);
540
541   return TRUE;
542 }
543
544 /* IEEE 1588-2008 13.6 */
545 static gboolean
546 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
547 {
548   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
549
550   if (gst_byte_reader_get_remaining (reader) < 10)
551     return FALSE;
552
553   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
554           reader))
555     return FALSE;
556
557   return TRUE;
558 }
559
560 /* IEEE 1588-2008 13.6 */
561 static gboolean
562 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
563 {
564   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
565
566   if (gst_byte_reader_get_remaining (reader) < 10)
567     return FALSE;
568
569   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
570           reader))
571     return FALSE;
572
573   return TRUE;
574 }
575
576 /* IEEE 1588-2008 13.7 */
577 static gboolean
578 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
579 {
580   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
581
582   if (gst_byte_reader_get_remaining (reader) < 10)
583     return FALSE;
584
585   if (!parse_ptp_timestamp (&msg->message_specific.
586           follow_up.precise_origin_timestamp, reader))
587     return FALSE;
588
589   return TRUE;
590 }
591
592 /* IEEE 1588-2008 13.8 */
593 static gboolean
594 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
595 {
596   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
597       FALSE);
598
599   if (gst_byte_reader_get_remaining (reader) < 20)
600     return FALSE;
601
602   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
603           reader))
604     return FALSE;
605
606   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
607       gst_byte_reader_get_uint64_be_unchecked (reader);
608   msg->message_specific.delay_resp.requesting_port_identity.port_number =
609       gst_byte_reader_get_uint16_be_unchecked (reader);
610
611   return TRUE;
612 }
613
614 static gboolean
615 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
616 {
617   GstByteReader reader;
618   gboolean ret = FALSE;
619
620   gst_byte_reader_init (&reader, data, size);
621
622   if (!parse_ptp_message_header (msg, &reader)) {
623     GST_WARNING ("Failed to parse PTP message header");
624     return FALSE;
625   }
626
627   switch (msg->message_type) {
628     case PTP_MESSAGE_TYPE_SYNC:
629       ret = parse_ptp_message_sync (msg, &reader);
630       break;
631     case PTP_MESSAGE_TYPE_FOLLOW_UP:
632       ret = parse_ptp_message_follow_up (msg, &reader);
633       break;
634     case PTP_MESSAGE_TYPE_DELAY_REQ:
635       ret = parse_ptp_message_delay_req (msg, &reader);
636       break;
637     case PTP_MESSAGE_TYPE_DELAY_RESP:
638       ret = parse_ptp_message_delay_resp (msg, &reader);
639       break;
640     case PTP_MESSAGE_TYPE_ANNOUNCE:
641       ret = parse_ptp_message_announce (msg, &reader);
642       break;
643     default:
644       /* ignore for now */
645       break;
646   }
647
648   return ret;
649 }
650
651 static gint
652 compare_announce_message (const PtpAnnounceMessage * a,
653     const PtpAnnounceMessage * b)
654 {
655   /* IEEE 1588 Figure 27 */
656   if (a->grandmaster_identity == b->grandmaster_identity) {
657     if (a->steps_removed + 1 < b->steps_removed)
658       return -1;
659     else if (a->steps_removed > b->steps_removed + 1)
660       return 1;
661
662     /* Error cases are filtered out earlier */
663     if (a->steps_removed < b->steps_removed)
664       return -1;
665     else if (a->steps_removed > b->steps_removed)
666       return 1;
667
668     /* Error cases are filtered out earlier */
669     if (a->master_clock_identity.clock_identity <
670         b->master_clock_identity.clock_identity)
671       return -1;
672     else if (a->master_clock_identity.clock_identity >
673         b->master_clock_identity.clock_identity)
674       return 1;
675
676     /* Error cases are filtered out earlier */
677     if (a->master_clock_identity.port_number <
678         b->master_clock_identity.port_number)
679       return -1;
680     else if (a->master_clock_identity.port_number >
681         b->master_clock_identity.port_number)
682       return 1;
683     else
684       g_assert_not_reached ();
685
686     return 0;
687   }
688
689   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
690     return -1;
691   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
692     return 1;
693
694   if (a->grandmaster_clock_quality.clock_class <
695       b->grandmaster_clock_quality.clock_class)
696     return -1;
697   else if (a->grandmaster_clock_quality.clock_class >
698       b->grandmaster_clock_quality.clock_class)
699     return 1;
700
701   if (a->grandmaster_clock_quality.clock_accuracy <
702       b->grandmaster_clock_quality.clock_accuracy)
703     return -1;
704   else if (a->grandmaster_clock_quality.clock_accuracy >
705       b->grandmaster_clock_quality.clock_accuracy)
706     return 1;
707
708   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
709       b->grandmaster_clock_quality.offset_scaled_log_variance)
710     return -1;
711   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
712       b->grandmaster_clock_quality.offset_scaled_log_variance)
713     return 1;
714
715   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
716     return -1;
717   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
718     return 1;
719
720   if (a->grandmaster_identity < b->grandmaster_identity)
721     return -1;
722   else if (a->grandmaster_identity > b->grandmaster_identity)
723     return 1;
724   else
725     g_assert_not_reached ();
726
727   return 0;
728 }
729
730 static void
731 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
732 {
733   GList *qualified_messages = NULL;
734   GList *l, *m;
735   PtpAnnounceMessage *best = NULL;
736
737   /* IEEE 1588 9.3.2.5 */
738   for (l = domain->announce_senders; l; l = l->next) {
739     PtpAnnounceSender *sender = l->data;
740     GstClockTime window = 4 * sender->announce_interval;
741     gint count = 0;
742
743     for (m = sender->announce_messages.head; m; m = m->next) {
744       PtpAnnounceMessage *msg = m->data;
745
746       if (now - msg->receive_time <= window)
747         count++;
748     }
749
750     /* Only include the newest message of announce senders that had at least 2
751      * announce messages in the last 4 announce intervals. Which also means
752      * that we wait at least 4 announce intervals before we select a master
753      * clock. Until then we just report based on the newest SYNC we received
754      */
755     if (count >= 2) {
756       qualified_messages =
757           g_list_prepend (qualified_messages,
758           g_queue_peek_tail (&sender->announce_messages));
759     }
760   }
761
762   if (!qualified_messages) {
763     GST_DEBUG
764         ("No qualified announce messages for domain %u, can't select a master clock",
765         domain->domain);
766     domain->have_master_clock = FALSE;
767     return;
768   }
769
770   for (l = qualified_messages; l; l = l->next) {
771     PtpAnnounceMessage *msg = l->data;
772
773     if (!best || compare_announce_message (msg, best) < 0)
774       best = msg;
775   }
776   g_clear_pointer (&qualified_messages, g_list_free);
777
778   if (domain->have_master_clock
779       && compare_clock_identity (&domain->master_clock_identity,
780           &best->master_clock_identity) == 0) {
781     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
782   } else {
783     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
784         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
785         domain->domain, best->master_clock_identity.clock_identity,
786         best->master_clock_identity.port_number, best->grandmaster_identity);
787
788     domain->have_master_clock = TRUE;
789     domain->grandmaster_identity = best->grandmaster_identity;
790
791     /* Opportunistic master clock selection likely gave us the same master
792      * clock before, no need to reset all statistics */
793     if (compare_clock_identity (&domain->master_clock_identity,
794             &best->master_clock_identity) != 0) {
795       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
796           sizeof (PtpClockIdentity));
797       domain->mean_path_delay = 0;
798       domain->last_delay_req = 0;
799       domain->last_path_delays_missing = 9;
800       domain->min_delay_req_interval = 0;
801       domain->sync_interval = 0;
802       domain->last_ptp_sync_time = 0;
803       domain->skipped_updates = 0;
804       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
805           NULL);
806       g_queue_clear (&domain->pending_syncs);
807     }
808
809     if (g_atomic_int_get (&domain_stats_n_hooks)) {
810       GstStructure *stats =
811           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
812           "domain", G_TYPE_UINT, domain->domain,
813           "master-clock-id", G_TYPE_UINT64,
814           domain->master_clock_identity.clock_identity,
815           "master-clock-port", G_TYPE_UINT,
816           domain->master_clock_identity.port_number,
817           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
818           NULL);
819       emit_ptp_statistics (domain->domain, stats);
820       gst_structure_free (stats);
821     }
822   }
823 }
824
825 static void
826 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
827 {
828   GList *l;
829   PtpDomainData *domain = NULL;
830   PtpAnnounceSender *sender = NULL;
831   PtpAnnounceMessage *announce;
832
833   /* IEEE1588 9.3.2.2 e)
834    * Don't consider messages with the alternate master flag set
835    */
836   if ((msg->flag_field & 0x0100))
837     return;
838
839   /* IEEE 1588 9.3.2.5 d)
840    * Don't consider announce messages with steps_removed>=255
841    */
842   if (msg->message_specific.announce.steps_removed >= 255)
843     return;
844
845   for (l = domain_data; l; l = l->next) {
846     PtpDomainData *tmp = l->data;
847
848     if (tmp->domain == msg->domain_number) {
849       domain = tmp;
850       break;
851     }
852   }
853
854   if (!domain) {
855     gchar *clock_name;
856
857     domain = g_new0 (PtpDomainData, 1);
858     domain->domain = msg->domain_number;
859     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
860     domain->domain_clock =
861         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
862     gst_object_ref_sink (domain->domain_clock);
863     g_free (clock_name);
864     g_queue_init (&domain->pending_syncs);
865     domain->last_path_delays_missing = 9;
866     domain_data = g_list_prepend (domain_data, domain);
867
868     g_mutex_lock (&domain_clocks_lock);
869     domain_clocks = g_list_prepend (domain_clocks, domain);
870     g_mutex_unlock (&domain_clocks_lock);
871
872     if (g_atomic_int_get (&domain_stats_n_hooks)) {
873       GstStructure *stats =
874           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
875           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
876           domain->domain_clock, NULL);
877       emit_ptp_statistics (domain->domain, stats);
878       gst_structure_free (stats);
879     }
880   }
881
882   for (l = domain->announce_senders; l; l = l->next) {
883     PtpAnnounceSender *tmp = l->data;
884
885     if (compare_clock_identity (&tmp->master_clock_identity,
886             &msg->source_port_identity) == 0) {
887       sender = tmp;
888       break;
889     }
890   }
891
892   if (!sender) {
893     sender = g_new0 (PtpAnnounceSender, 1);
894
895     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
896         sizeof (PtpClockIdentity));
897     g_queue_init (&sender->announce_messages);
898     domain->announce_senders =
899         g_list_prepend (domain->announce_senders, sender);
900   }
901
902   for (l = sender->announce_messages.head; l; l = l->next) {
903     PtpAnnounceMessage *tmp = l->data;
904
905     /* IEEE 1588 9.3.2.5 c)
906      * Don't consider identical messages, i.e. duplicates
907      */
908     if (tmp->sequence_id == msg->sequence_id)
909       return;
910   }
911
912   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
913
914   announce = g_new0 (PtpAnnounceMessage, 1);
915   announce->receive_time = receive_time;
916   announce->sequence_id = msg->sequence_id;
917   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
918       sizeof (PtpClockIdentity));
919   announce->grandmaster_identity =
920       msg->message_specific.announce.grandmaster_identity;
921   announce->grandmaster_priority_1 =
922       msg->message_specific.announce.grandmaster_priority_1;
923   announce->grandmaster_clock_quality.clock_class =
924       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
925   announce->grandmaster_clock_quality.clock_accuracy =
926       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
927   announce->grandmaster_clock_quality.offset_scaled_log_variance =
928       msg->message_specific.announce.
929       grandmaster_clock_quality.offset_scaled_log_variance;
930   announce->grandmaster_priority_2 =
931       msg->message_specific.announce.grandmaster_priority_2;
932   announce->steps_removed = msg->message_specific.announce.steps_removed;
933   announce->time_source = msg->message_specific.announce.time_source;
934   g_queue_push_tail (&sender->announce_messages, announce);
935
936   select_best_master_clock (domain, receive_time);
937 }
938
939 static gboolean
940 send_delay_req_timeout (PtpPendingSync * sync)
941 {
942   StdIOHeader header = { 0, };
943   guint8 delay_req[44];
944   GstByteWriter writer;
945   GIOStatus status;
946   gsize written;
947   GError *err = NULL;
948
949   header.type = TYPE_EVENT;
950   header.size = 44;
951
952   GST_TRACE ("Sending delay_req to domain %u", sync->domain);
953
954   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
955   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
956   gst_byte_writer_put_uint8_unchecked (&writer, 2);
957   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
958   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
959   gst_byte_writer_put_uint8_unchecked (&writer, 0);
960   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
961   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
962   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
963   gst_byte_writer_put_uint64_be_unchecked (&writer,
964       ptp_clock_id.clock_identity);
965   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
966   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
967   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
968   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
969   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
970   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
971
972   status =
973       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
974       sizeof (header), &written, &err);
975   if (status == G_IO_STATUS_ERROR) {
976     g_warning ("Failed to write to stdout: %s", err->message);
977     g_clear_error (&err);
978     return G_SOURCE_REMOVE;
979   } else if (status == G_IO_STATUS_EOF) {
980     g_message ("EOF on stdout");
981     g_main_loop_quit (main_loop);
982     return G_SOURCE_REMOVE;
983   } else if (status != G_IO_STATUS_NORMAL) {
984     g_warning ("Unexpected stdout write status: %d", status);
985     g_main_loop_quit (main_loop);
986     return G_SOURCE_REMOVE;
987   } else if (written != sizeof (header)) {
988     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
989     g_main_loop_quit (main_loop);
990     return G_SOURCE_REMOVE;
991   }
992
993   sync->delay_req_send_time_local =
994       gst_clock_get_time (observation_system_clock);
995
996   status =
997       g_io_channel_write_chars (stdout_channel,
998       (const gchar *) delay_req, 44, &written, &err);
999   if (status == G_IO_STATUS_ERROR) {
1000     g_warning ("Failed to write to stdout: %s", err->message);
1001     g_clear_error (&err);
1002     g_main_loop_quit (main_loop);
1003     return G_SOURCE_REMOVE;
1004   } else if (status == G_IO_STATUS_EOF) {
1005     g_message ("EOF on stdout");
1006     g_main_loop_quit (main_loop);
1007     return G_SOURCE_REMOVE;
1008   } else if (status != G_IO_STATUS_NORMAL) {
1009     g_warning ("Unexpected stdout write status: %d", status);
1010     g_main_loop_quit (main_loop);
1011     return G_SOURCE_REMOVE;
1012   } else if (written != 44) {
1013     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1014     g_main_loop_quit (main_loop);
1015     return G_SOURCE_REMOVE;
1016   }
1017
1018   return G_SOURCE_REMOVE;
1019 }
1020
1021 static gboolean
1022 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1023 {
1024   GstClockTime now = gst_clock_get_time (observation_system_clock);
1025   guint timeout;
1026   GSource *timeout_source;
1027
1028   if (domain->last_delay_req != 0
1029       && domain->last_delay_req + domain->min_delay_req_interval > now) {
1030     GST_TRACE ("Too soon to send new DELAY_REQ");
1031     return FALSE;
1032   }
1033
1034   domain->last_delay_req = now;
1035   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1036
1037   /* IEEE 1588 9.5.11.2 */
1038   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1039     timeout = 0;
1040   else
1041     timeout =
1042         g_rand_int_range (delay_req_rand, 0,
1043         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1044
1045   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1046   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1047   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1048       sync, NULL);
1049   g_source_attach (timeout_source, main_context);
1050
1051   return TRUE;
1052 }
1053
1054 /* Filtering of outliers for RTT and time calculations inspired
1055  * by the code from gstnetclientclock.c
1056  */
1057 static void
1058 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1059 {
1060   GstClockTime internal_time, external_time, rate_num, rate_den;
1061   GstClockTime corrected_ptp_time, corrected_local_time;
1062   gdouble r_squared = 0.0;
1063   gboolean synced;
1064   GstClockTimeDiff discont = 0;
1065   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1066 #ifdef USE_MEASUREMENT_FILTERING
1067   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1068       orig_rate_den;
1069   GstClockTime new_estimated_ptp_time;
1070   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1071   gboolean now_synced;
1072 #endif
1073 #ifdef USE_ONLY_SYNC_WITH_DELAY
1074   GstClockTime mean_path_delay;
1075 #endif
1076
1077   GST_TRACE ("Updating PTP time");
1078
1079 #ifdef USE_ONLY_SYNC_WITH_DELAY
1080   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE) {
1081     GST_TRACE ("Not updating - no delay_req sent");
1082     return;
1083   }
1084
1085   /* IEEE 1588 11.3 */
1086   mean_path_delay =
1087       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1088       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1089       (sync->correction_field_sync + sync->correction_field_delay +
1090           32768) / 65536) / 2;
1091 #endif
1092
1093   /* IEEE 1588 11.2 */
1094   corrected_ptp_time =
1095       sync->sync_send_time_remote +
1096       (sync->correction_field_sync + 32768) / 65536;
1097
1098 #ifdef USE_ONLY_SYNC_WITH_DELAY
1099   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1100 #else
1101   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1102 #endif
1103
1104 #ifdef USE_MEASUREMENT_FILTERING
1105   /* We check this here and when updating the mean path delay, because
1106    * we can get here without a delay response too. The tolerance on
1107    * accepting follow-up after a sync is high, because a PTP server
1108    * doesn't have to prioritise sending FOLLOW_UP - its purpose is
1109    * just to give us the accurate timestamp of the preceding SYNC.
1110    *
1111    * For that reason also allow at least 100ms delay in case of delays smaller
1112    * than 5ms. */
1113   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1114       && sync->follow_up_recv_time_local >
1115       sync->sync_recv_time_local + MAX (100 * GST_MSECOND,
1116           20 * domain->mean_path_delay)) {
1117     GstClockTimeDiff delay =
1118         sync->follow_up_recv_time_local - sync->sync_recv_time_local;
1119     GST_WARNING ("Sync-follow-up delay for domain %u too big: %"
1120         GST_STIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1121         domain->domain, GST_STIME_ARGS (delay),
1122         GST_TIME_ARGS (domain->mean_path_delay));
1123     synced = FALSE;
1124     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1125         &internal_time, &external_time, &rate_num, &rate_den);
1126     goto out;
1127   }
1128 #endif
1129
1130   /* Set an initial local-remote relation */
1131   if (domain->last_ptp_time == 0)
1132     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1133         corrected_ptp_time, 1, 1);
1134
1135 #ifdef USE_MEASUREMENT_FILTERING
1136   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1137    * estimate with our present knowledge about the clock
1138    */
1139   /* Store what the clock produced as 'now' before this update */
1140   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1141       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1142   internal_time = orig_internal_time;
1143   external_time = orig_external_time;
1144   rate_num = orig_rate_num;
1145   rate_den = orig_rate_den;
1146
1147   /* 3/4 RTT window around the estimation */
1148   max_discont = domain->mean_path_delay * 3 / 2;
1149
1150   /* Check if the estimated sync time is inside our window */
1151   estimated_ptp_time_min = corrected_local_time - max_discont;
1152   estimated_ptp_time_min =
1153       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1154       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1155   estimated_ptp_time_max = corrected_local_time + max_discont;
1156   estimated_ptp_time_max =
1157       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1158       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1159
1160   synced = (estimated_ptp_time_min < corrected_ptp_time
1161       && corrected_ptp_time < estimated_ptp_time_max);
1162
1163   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1164       GST_TIME_FORMAT, domain->domain,
1165       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1166
1167   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1168       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1169       GST_TIME_ARGS (corrected_ptp_time),
1170       GST_TIME_ARGS (estimated_ptp_time_max));
1171
1172   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1173           corrected_local_time, corrected_ptp_time, &r_squared,
1174           &internal_time, &external_time, &rate_num, &rate_den)) {
1175     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1176
1177     /* Old estimated PTP time based on receive time and path delay */
1178     estimated_ptp_time = corrected_local_time;
1179     estimated_ptp_time =
1180         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1181         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1182         orig_external_time, orig_rate_num, orig_rate_den);
1183
1184     /* New estimated PTP time based on receive time and path delay */
1185     new_estimated_ptp_time = corrected_local_time;
1186     new_estimated_ptp_time =
1187         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1188         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1189         external_time, rate_num, rate_den);
1190
1191     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1192     if (synced && ABS (discont) > max_discont) {
1193       GstClockTimeDiff offset;
1194       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1195           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1196           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1197           GST_TIME_ARGS (max_discont));
1198       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1199         offset = max_discont - discont;
1200         if (-offset > external_time)
1201           external_time = 0;
1202         else
1203           external_time += offset;
1204       } else {                  /* Too large a backward step - add a +ve offset */
1205         offset = -(max_discont + discont);
1206         external_time += offset;
1207       }
1208
1209       discont += offset;
1210     } else {
1211       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1212           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1213           GST_TIME_ARGS (max_discont));
1214     }
1215
1216     /* Check if the estimated sync time is now (still) inside our window */
1217     estimated_ptp_time_min = corrected_local_time - max_discont;
1218     estimated_ptp_time_min =
1219         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1220         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1221         external_time, rate_num, rate_den);
1222     estimated_ptp_time_max = corrected_local_time + max_discont;
1223     estimated_ptp_time_max =
1224         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1225         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1226         external_time, rate_num, rate_den);
1227
1228     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1229         && corrected_ptp_time < estimated_ptp_time_max);
1230
1231     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1232         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1233         GST_TIME_ARGS (corrected_ptp_time),
1234         GST_TIME_ARGS (estimated_ptp_time_max));
1235
1236     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1237       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1238           internal_time, external_time, rate_num, rate_den);
1239       domain->skipped_updates = 0;
1240
1241       domain->last_ptp_time = corrected_ptp_time;
1242       domain->last_local_time = corrected_local_time;
1243     } else {
1244       domain->skipped_updates++;
1245     }
1246   } else {
1247     domain->last_ptp_time = corrected_ptp_time;
1248     domain->last_local_time = corrected_local_time;
1249   }
1250
1251 #else
1252   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1253       GST_TIME_FORMAT, domain->domain,
1254       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1255
1256   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1257       &internal_time, &external_time, &rate_num, &rate_den);
1258
1259   estimated_ptp_time = corrected_local_time;
1260   estimated_ptp_time =
1261       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1262       (domain->domain_clock), estimated_ptp_time, internal_time,
1263       external_time, rate_num, rate_den);
1264
1265   gst_clock_add_observation (domain->domain_clock,
1266       corrected_local_time, corrected_ptp_time, &r_squared);
1267
1268   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1269       &internal_time, &external_time, &rate_num, &rate_den);
1270
1271   synced = TRUE;
1272   domain->last_ptp_time = corrected_ptp_time;
1273   domain->last_local_time = corrected_local_time;
1274 #endif
1275
1276 #ifdef USE_MEASUREMENT_FILTERING
1277 out:
1278 #endif
1279   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1280     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1281         "domain", G_TYPE_UINT, domain->domain,
1282         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1283         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1284         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1285         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1286         "discontinuity", G_TYPE_INT64, discont,
1287         "synced", G_TYPE_BOOLEAN, synced,
1288         "r-squared", G_TYPE_DOUBLE, r_squared,
1289         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1290         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1291         "rate-num", G_TYPE_UINT64, rate_num,
1292         "rate-den", G_TYPE_UINT64, rate_den,
1293         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1294         NULL);
1295     emit_ptp_statistics (domain->domain, stats);
1296     gst_structure_free (stats);
1297   }
1298
1299 }
1300
1301 #ifdef USE_MEDIAN_PRE_FILTERING
1302 static gint
1303 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1304 {
1305   if (*a < *b)
1306     return -1;
1307   else if (*a > *b)
1308     return 1;
1309   return 0;
1310 }
1311 #endif
1312
1313 static gboolean
1314 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1315 {
1316 #ifdef USE_MEDIAN_PRE_FILTERING
1317   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1318   GstClockTime median;
1319   gint i;
1320 #endif
1321
1322   GstClockTime mean_path_delay, delay_req_delay = 0;
1323   gboolean ret;
1324
1325   /* IEEE 1588 11.3 */
1326   mean_path_delay =
1327       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1328       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1329       (sync->correction_field_sync + sync->correction_field_delay +
1330           32768) / 65536) / 2;
1331
1332 #ifdef USE_MEDIAN_PRE_FILTERING
1333   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1334     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1335   domain->last_path_delays[i - 1] = mean_path_delay;
1336
1337   if (domain->last_path_delays_missing) {
1338     domain->last_path_delays_missing--;
1339   } else {
1340     memcpy (&last_path_delays, &domain->last_path_delays,
1341         sizeof (last_path_delays));
1342     g_qsort_with_data (&last_path_delays,
1343         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1344         (GCompareDataFunc) compare_clock_time, NULL);
1345
1346     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1347
1348     /* FIXME: We might want to use something else here, like only allowing
1349      * things in the interquartile range, or also filtering away delays that
1350      * are too small compared to the median. This here worked well enough
1351      * in tests so far.
1352      */
1353     if (mean_path_delay > 2 * median) {
1354       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1355           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1356           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1357       ret = FALSE;
1358       goto out;
1359     }
1360   }
1361 #endif
1362
1363 #ifdef USE_RUNNING_AVERAGE_DELAY
1364   /* Track an average round trip time, for a bit of smoothing */
1365   /* Always update before discarding a sample, so genuine changes in
1366    * the network get picked up, eventually */
1367   if (domain->mean_path_delay == 0)
1368     domain->mean_path_delay = mean_path_delay;
1369   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1370     domain->mean_path_delay =
1371         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1372   else
1373     domain->mean_path_delay =
1374         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1375 #else
1376   domain->mean_path_delay = mean_path_delay;
1377 #endif
1378
1379 #ifdef USE_MEASUREMENT_FILTERING
1380   /* The tolerance on accepting follow-up after a sync is high, because
1381    * a PTP server doesn't have to prioritise sending FOLLOW_UP - its purpose is
1382    * just to give us the accurate timestamp of the preceding SYNC.
1383    *
1384    * For that reason also allow at least 100ms delay in case of delays smaller
1385    * than 5ms. */
1386   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1387       domain->mean_path_delay != 0
1388       && sync->follow_up_recv_time_local >
1389       MAX (100 * GST_MSECOND,
1390           sync->sync_recv_time_local + 20 * domain->mean_path_delay)) {
1391     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1392         " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")", domain->domain,
1393         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1394             sync->sync_recv_time_local),
1395         GST_TIME_ARGS (domain->mean_path_delay));
1396     ret = FALSE;
1397     goto out;
1398   }
1399
1400   if (mean_path_delay > 2 * domain->mean_path_delay) {
1401     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1402         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1403         GST_TIME_ARGS (mean_path_delay),
1404         GST_TIME_ARGS (domain->mean_path_delay));
1405     ret = FALSE;
1406     goto out;
1407   }
1408 #endif
1409
1410   delay_req_delay =
1411       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1412
1413 #ifdef USE_MEASUREMENT_FILTERING
1414   /* delay_req_delay is a RTT, so 2 times the path delay is what we'd
1415    * hope for, but some PTP systems don't prioritise sending DELAY_RESP,
1416    * but they must still have placed an accurate reception timestamp.
1417    * That means we should be quite tolerant about late DELAY_RESP, and
1418    * mostly rely on filtering out jumps in the mean-path-delay elsewhere.
1419    *
1420    * For that reason also allow at least 100ms delay in case of delays smaller
1421    * than 5ms. */
1422   if (delay_req_delay > MAX (100 * GST_MSECOND, 20 * domain->mean_path_delay)) {
1423     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1424         GST_TIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1425         domain->domain, GST_TIME_ARGS (delay_req_delay),
1426         GST_TIME_ARGS (domain->mean_path_delay));
1427     ret = FALSE;
1428     goto out;
1429   }
1430 #endif
1431
1432   ret = TRUE;
1433
1434   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1435       GST_TIME_FORMAT ")", domain->domain,
1436       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1437   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1438       domain->domain, GST_TIME_ARGS (delay_req_delay));
1439
1440 #if defined(USE_MEASUREMENT_FILTERING) || defined(USE_MEDIAN_PRE_FILTERING)
1441 out:
1442 #endif
1443   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1444     GstStructure *stats =
1445         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1446         "domain", G_TYPE_UINT, domain->domain,
1447         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1448         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1449         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1450     emit_ptp_statistics (domain->domain, stats);
1451     gst_structure_free (stats);
1452   }
1453
1454   return ret;
1455 }
1456
1457 static void
1458 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1459 {
1460   GList *l;
1461   PtpDomainData *domain = NULL;
1462   PtpPendingSync *sync = NULL;
1463
1464   /* Don't consider messages with the alternate master flag set */
1465   if ((msg->flag_field & 0x0100)) {
1466     GST_TRACE ("Ignoring sync message with alternate-master flag");
1467     return;
1468   }
1469
1470   for (l = domain_data; l; l = l->next) {
1471     PtpDomainData *tmp = l->data;
1472
1473     if (msg->domain_number == tmp->domain) {
1474       domain = tmp;
1475       break;
1476     }
1477   }
1478
1479   if (!domain) {
1480     gchar *clock_name;
1481
1482     domain = g_new0 (PtpDomainData, 1);
1483     domain->domain = msg->domain_number;
1484     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1485     domain->domain_clock =
1486         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1487     gst_object_ref_sink (domain->domain_clock);
1488     g_free (clock_name);
1489     g_queue_init (&domain->pending_syncs);
1490     domain->last_path_delays_missing = 9;
1491     domain_data = g_list_prepend (domain_data, domain);
1492
1493     g_mutex_lock (&domain_clocks_lock);
1494     domain_clocks = g_list_prepend (domain_clocks, domain);
1495     g_mutex_unlock (&domain_clocks_lock);
1496   }
1497
1498   /* If we have a master clock, ignore this message if it's not coming from there */
1499   if (domain->have_master_clock
1500       && compare_clock_identity (&domain->master_clock_identity,
1501           &msg->source_port_identity) != 0)
1502     return;
1503
1504 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1505   /* Opportunistic selection of master clock */
1506   if (!domain->have_master_clock)
1507     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1508         sizeof (PtpClockIdentity));
1509 #else
1510   if (!domain->have_master_clock)
1511     return;
1512 #endif
1513
1514   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1515
1516   /* Check if duplicated */
1517   for (l = domain->pending_syncs.head; l; l = l->next) {
1518     PtpPendingSync *tmp = l->data;
1519
1520     if (tmp->sync_seqnum == msg->sequence_id)
1521       return;
1522   }
1523
1524   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1525       GST_CLOCK_TIME_NONE / GST_SECOND) {
1526     GST_FIXME ("Unsupported sync message seconds field value: %"
1527         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1528         msg->message_specific.sync.origin_timestamp.seconds_field,
1529         GST_CLOCK_TIME_NONE / GST_SECOND);
1530     return;
1531   }
1532
1533   sync = g_new0 (PtpPendingSync, 1);
1534   sync->domain = domain->domain;
1535   sync->sync_seqnum = msg->sequence_id;
1536   sync->sync_recv_time_local = receive_time;
1537   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1538   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1539   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1540   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1541   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1542
1543   /* 0.5 correction factor for division later */
1544   sync->correction_field_sync = msg->correction_field;
1545
1546   if ((msg->flag_field & 0x0200)) {
1547     /* Wait for FOLLOW_UP */
1548     GST_TRACE ("Waiting for FOLLOW_UP msg");
1549   } else {
1550     sync->sync_send_time_remote =
1551         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1552         sync.origin_timestamp);
1553
1554     if (domain->last_ptp_sync_time != 0
1555         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1556       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1557           GST_TIME_FORMAT, domain->domain,
1558           GST_TIME_ARGS (domain->last_ptp_sync_time),
1559           GST_TIME_ARGS (sync->sync_send_time_remote));
1560       ptp_pending_sync_free (sync);
1561       sync = NULL;
1562       return;
1563     }
1564     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1565
1566     if (send_delay_req (domain, sync)) {
1567       /* Sent delay request */
1568     } else {
1569       update_ptp_time (domain, sync);
1570       ptp_pending_sync_free (sync);
1571       sync = NULL;
1572     }
1573   }
1574
1575   if (sync)
1576     g_queue_push_tail (&domain->pending_syncs, sync);
1577 }
1578
1579 static void
1580 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1581 {
1582   GList *l;
1583   PtpDomainData *domain = NULL;
1584   PtpPendingSync *sync = NULL;
1585
1586   GST_TRACE ("Processing FOLLOW_UP message");
1587
1588   /* Don't consider messages with the alternate master flag set */
1589   if ((msg->flag_field & 0x0100)) {
1590     GST_TRACE ("Ignoring FOLLOW_UP with alternate-master flag");
1591     return;
1592   }
1593
1594   for (l = domain_data; l; l = l->next) {
1595     PtpDomainData *tmp = l->data;
1596
1597     if (msg->domain_number == tmp->domain) {
1598       domain = tmp;
1599       break;
1600     }
1601   }
1602
1603   if (!domain) {
1604     GST_TRACE ("No domain match for FOLLOW_UP msg");
1605     return;
1606   }
1607
1608   /* If we have a master clock, ignore this message if it's not coming from there */
1609   if (domain->have_master_clock
1610       && compare_clock_identity (&domain->master_clock_identity,
1611           &msg->source_port_identity) != 0) {
1612     GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring");
1613     return;
1614   }
1615
1616   /* Check if we know about this one */
1617   for (l = domain->pending_syncs.head; l; l = l->next) {
1618     PtpPendingSync *tmp = l->data;
1619
1620     if (tmp->sync_seqnum == msg->sequence_id) {
1621       sync = tmp;
1622       break;
1623     }
1624   }
1625
1626   if (!sync) {
1627     GST_TRACE ("Ignoring FOLLOW_UP with no pending SYNC");
1628     return;
1629   }
1630
1631   /* Got a FOLLOW_UP for this already */
1632   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE) {
1633     GST_TRACE ("Got repeat FOLLOW_UP. Ignoring");
1634     return;
1635   }
1636
1637   if (sync->sync_recv_time_local >= receive_time) {
1638     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1639         GST_TIME_FORMAT, domain->domain,
1640         GST_TIME_ARGS (sync->sync_recv_time_local),
1641         GST_TIME_ARGS (receive_time));
1642     g_queue_remove (&domain->pending_syncs, sync);
1643     ptp_pending_sync_free (sync);
1644     return;
1645   }
1646
1647   sync->correction_field_sync += msg->correction_field;
1648   sync->sync_send_time_remote =
1649       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1650       follow_up.precise_origin_timestamp);
1651   sync->follow_up_recv_time_local = receive_time;
1652
1653   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1654     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1655         GST_TIME_FORMAT, domain->domain,
1656         GST_TIME_ARGS (domain->last_ptp_sync_time),
1657         GST_TIME_ARGS (sync->sync_send_time_remote));
1658     g_queue_remove (&domain->pending_syncs, sync);
1659     ptp_pending_sync_free (sync);
1660     sync = NULL;
1661     return;
1662   }
1663   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1664
1665   if (send_delay_req (domain, sync)) {
1666     /* Sent delay request */
1667   } else {
1668     update_ptp_time (domain, sync);
1669     g_queue_remove (&domain->pending_syncs, sync);
1670     ptp_pending_sync_free (sync);
1671     sync = NULL;
1672   }
1673 }
1674
1675 static void
1676 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1677 {
1678   GList *l;
1679   PtpDomainData *domain = NULL;
1680   PtpPendingSync *sync = NULL;
1681
1682   /* Don't consider messages with the alternate master flag set */
1683   if ((msg->flag_field & 0x0100))
1684     return;
1685
1686   for (l = domain_data; l; l = l->next) {
1687     PtpDomainData *tmp = l->data;
1688
1689     if (msg->domain_number == tmp->domain) {
1690       domain = tmp;
1691       break;
1692     }
1693   }
1694
1695   if (!domain)
1696     return;
1697
1698   /* If we have a master clock, ignore this message if it's not coming from there */
1699   if (domain->have_master_clock
1700       && compare_clock_identity (&domain->master_clock_identity,
1701           &msg->source_port_identity) != 0)
1702     return;
1703
1704   /* Not for us */
1705   if (msg->message_specific.delay_resp.
1706       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1707       || msg->message_specific.delay_resp.
1708       requesting_port_identity.port_number != ptp_clock_id.port_number)
1709     return;
1710
1711   domain->min_delay_req_interval =
1712       log2_to_clock_time (msg->log_message_interval);
1713
1714   /* Check if we know about this one */
1715   for (l = domain->pending_syncs.head; l; l = l->next) {
1716     PtpPendingSync *tmp = l->data;
1717
1718     if (tmp->delay_req_seqnum == msg->sequence_id) {
1719       sync = tmp;
1720       break;
1721     }
1722   }
1723
1724   if (!sync)
1725     return;
1726
1727   /* Got a DELAY_RESP for this already */
1728   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1729     return;
1730
1731   if (sync->delay_req_send_time_local > receive_time) {
1732     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1733         GST_TIME_FORMAT, domain->domain,
1734         GST_TIME_ARGS (sync->delay_req_send_time_local),
1735         GST_TIME_ARGS (receive_time));
1736     g_queue_remove (&domain->pending_syncs, sync);
1737     ptp_pending_sync_free (sync);
1738     return;
1739   }
1740
1741   sync->correction_field_delay = msg->correction_field;
1742
1743   sync->delay_req_recv_time_remote =
1744       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1745       delay_resp.receive_timestamp);
1746   sync->delay_resp_recv_time_local = receive_time;
1747
1748   if (domain->mean_path_delay != 0
1749       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1750     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1751         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1752         GST_TIME_ARGS (sync->sync_send_time_remote),
1753         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1754     g_queue_remove (&domain->pending_syncs, sync);
1755     ptp_pending_sync_free (sync);
1756     return;
1757   }
1758
1759   if (update_mean_path_delay (domain, sync))
1760     update_ptp_time (domain, sync);
1761   g_queue_remove (&domain->pending_syncs, sync);
1762   ptp_pending_sync_free (sync);
1763 }
1764
1765 static void
1766 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1767 {
1768   /* Ignore our own messages */
1769   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1770       msg->source_port_identity.port_number == ptp_clock_id.port_number) {
1771     GST_TRACE ("Ignoring our own message");
1772     return;
1773   }
1774
1775   GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT,
1776       msg->message_type, GST_TIME_ARGS (receive_time));
1777   switch (msg->message_type) {
1778     case PTP_MESSAGE_TYPE_ANNOUNCE:
1779       handle_announce_message (msg, receive_time);
1780       break;
1781     case PTP_MESSAGE_TYPE_SYNC:
1782       handle_sync_message (msg, receive_time);
1783       break;
1784     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1785       handle_follow_up_message (msg, receive_time);
1786       break;
1787     case PTP_MESSAGE_TYPE_DELAY_RESP:
1788       handle_delay_resp_message (msg, receive_time);
1789       break;
1790     default:
1791       break;
1792   }
1793 }
1794
1795 static gboolean
1796 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1797     gpointer user_data)
1798 {
1799   GIOStatus status;
1800   StdIOHeader header;
1801   gchar buffer[8192];
1802   GError *err = NULL;
1803   gsize read;
1804
1805   if ((condition & G_IO_STATUS_EOF)) {
1806     GST_ERROR ("Got EOF on stdin");
1807     g_main_loop_quit (main_loop);
1808     return G_SOURCE_REMOVE;
1809   }
1810
1811   status =
1812       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1813       &read, &err);
1814   if (status == G_IO_STATUS_ERROR) {
1815     GST_ERROR ("Failed to read from stdin: %s", err->message);
1816     g_clear_error (&err);
1817     g_main_loop_quit (main_loop);
1818     return G_SOURCE_REMOVE;
1819   } else if (status == G_IO_STATUS_EOF) {
1820     GST_ERROR ("Got EOF on stdin");
1821     g_main_loop_quit (main_loop);
1822     return G_SOURCE_REMOVE;
1823   } else if (status != G_IO_STATUS_NORMAL) {
1824     GST_ERROR ("Unexpected stdin read status: %d", status);
1825     g_main_loop_quit (main_loop);
1826     return G_SOURCE_REMOVE;
1827   } else if (read != sizeof (header)) {
1828     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1829     g_main_loop_quit (main_loop);
1830     return G_SOURCE_REMOVE;
1831   } else if (header.size > 8192) {
1832     GST_ERROR ("Unexpected size: %u", header.size);
1833     g_main_loop_quit (main_loop);
1834     return G_SOURCE_REMOVE;
1835   }
1836
1837   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1838   if (status == G_IO_STATUS_ERROR) {
1839     GST_ERROR ("Failed to read from stdin: %s", err->message);
1840     g_clear_error (&err);
1841     g_main_loop_quit (main_loop);
1842     return G_SOURCE_REMOVE;
1843   } else if (status == G_IO_STATUS_EOF) {
1844     GST_ERROR ("EOF on stdin");
1845     g_main_loop_quit (main_loop);
1846     return G_SOURCE_REMOVE;
1847   } else if (status != G_IO_STATUS_NORMAL) {
1848     GST_ERROR ("Unexpected stdin read status: %d", status);
1849     g_main_loop_quit (main_loop);
1850     return G_SOURCE_REMOVE;
1851   } else if (read != header.size) {
1852     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1853     g_main_loop_quit (main_loop);
1854     return G_SOURCE_REMOVE;
1855   }
1856
1857   switch (header.type) {
1858     case TYPE_EVENT:
1859     case TYPE_GENERAL:{
1860       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1861       PtpMessage msg;
1862
1863       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1864         dump_ptp_message (&msg);
1865         handle_ptp_message (&msg, receive_time);
1866       }
1867       break;
1868     }
1869     default:
1870     case TYPE_CLOCK_ID:{
1871       if (header.size != 8) {
1872         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1873         g_main_loop_quit (main_loop);
1874         return G_SOURCE_REMOVE;
1875       }
1876       g_mutex_lock (&ptp_lock);
1877       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1878 #ifdef G_OS_WIN32
1879       ptp_clock_id.port_number = (guint16) GetCurrentProcessId ();
1880 #else
1881       ptp_clock_id.port_number = getpid ();
1882 #endif
1883       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1884           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1885       g_cond_signal (&ptp_cond);
1886       g_mutex_unlock (&ptp_lock);
1887       break;
1888     }
1889   }
1890
1891   return G_SOURCE_CONTINUE;
1892 }
1893
1894 /* Cleanup all announce messages and announce message senders
1895  * that are timed out by now, and clean up all pending syncs
1896  * that are missing their FOLLOW_UP or DELAY_RESP */
1897 static gboolean
1898 cleanup_cb (gpointer data)
1899 {
1900   GstClockTime now = gst_clock_get_time (observation_system_clock);
1901   GList *l, *m, *n;
1902
1903   for (l = domain_data; l; l = l->next) {
1904     PtpDomainData *domain = l->data;
1905
1906     for (n = domain->announce_senders; n;) {
1907       PtpAnnounceSender *sender = n->data;
1908       gboolean timed_out = TRUE;
1909
1910       /* Keep only 5 messages per sender around */
1911       while (g_queue_get_length (&sender->announce_messages) > 5) {
1912         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1913         g_free (msg);
1914       }
1915
1916       for (m = sender->announce_messages.head; m; m = m->next) {
1917         PtpAnnounceMessage *msg = m->data;
1918
1919         if (msg->receive_time +
1920             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1921           timed_out = FALSE;
1922           break;
1923         }
1924       }
1925
1926       if (timed_out) {
1927         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1928             sender->master_clock_identity.clock_identity,
1929             sender->master_clock_identity.port_number);
1930         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1931         g_queue_clear (&sender->announce_messages);
1932       }
1933
1934       if (g_queue_get_length (&sender->announce_messages) == 0) {
1935         GList *tmp = n->next;
1936
1937         if (compare_clock_identity (&sender->master_clock_identity,
1938                 &domain->master_clock_identity) == 0)
1939           GST_WARNING ("currently selected master clock timed out");
1940         g_free (sender);
1941         domain->announce_senders =
1942             g_list_delete_link (domain->announce_senders, n);
1943         n = tmp;
1944       } else {
1945         n = n->next;
1946       }
1947     }
1948     select_best_master_clock (domain, now);
1949
1950     /* Clean up any pending syncs */
1951     for (n = domain->pending_syncs.head; n;) {
1952       PtpPendingSync *sync = n->data;
1953       gboolean timed_out = FALSE;
1954
1955       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1956        * and pending delay reqs after 4 delay req intervals or 10 seconds
1957        */
1958       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1959           ((domain->min_delay_req_interval != 0
1960                   && sync->delay_req_send_time_local +
1961                   4 * domain->min_delay_req_interval < now)
1962               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1963         timed_out = TRUE;
1964       } else if ((domain->sync_interval != 0
1965               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1966           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1967         timed_out = TRUE;
1968       }
1969
1970       if (timed_out) {
1971         GList *tmp = n->next;
1972         ptp_pending_sync_free (sync);
1973         g_queue_delete_link (&domain->pending_syncs, n);
1974         n = tmp;
1975       } else {
1976         n = n->next;
1977       }
1978     }
1979   }
1980
1981   return G_SOURCE_CONTINUE;
1982 }
1983
1984 static gpointer
1985 ptp_helper_main (gpointer data)
1986 {
1987   GSource *cleanup_source;
1988
1989   GST_DEBUG ("Starting PTP helper loop");
1990
1991   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1992   cleanup_source = g_timeout_source_new_seconds (5);
1993   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1994   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1995   g_source_attach (cleanup_source, main_context);
1996   g_source_unref (cleanup_source);
1997
1998   g_main_loop_run (main_loop);
1999   GST_DEBUG ("Stopped PTP helper loop");
2000
2001   g_mutex_lock (&ptp_lock);
2002   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2003   ptp_clock_id.port_number = 0;
2004   initted = FALSE;
2005   g_cond_signal (&ptp_cond);
2006   g_mutex_unlock (&ptp_lock);
2007
2008   return NULL;
2009 }
2010
2011 /**
2012  * gst_ptp_is_supported:
2013  *
2014  * Check if PTP clocks are generally supported on this system, and if previous
2015  * initializations did not fail.
2016  *
2017  * Returns: %TRUE if PTP clocks are generally supported on this system, and
2018  * previous initializations did not fail.
2019  *
2020  * Since: 1.6
2021  */
2022 gboolean
2023 gst_ptp_is_supported (void)
2024 {
2025   return supported;
2026 }
2027
2028 /**
2029  * gst_ptp_is_initialized:
2030  *
2031  * Check if the GStreamer PTP clock subsystem is initialized.
2032  *
2033  * Returns: %TRUE if the GStreamer PTP clock subsystem is initialized.
2034  *
2035  * Since: 1.6
2036  */
2037 gboolean
2038 gst_ptp_is_initialized (void)
2039 {
2040   return initted;
2041 }
2042
2043 /**
2044  * gst_ptp_init:
2045  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
2046  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
2047  *
2048  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
2049  * slave-only mode for all domains on the given @interfaces with the
2050  * given @clock_id.
2051  *
2052  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
2053  * generated from the MAC address of the first network interface.
2054  *
2055  * This function is automatically called by gst_ptp_clock_new() with default
2056  * parameters if it wasn't called before.
2057  *
2058  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
2059  *
2060  * Since: 1.6
2061  */
2062 gboolean
2063 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2064 {
2065   gboolean ret;
2066   const gchar *env;
2067   gchar **argv = NULL;
2068   gint argc, argc_c;
2069   gint fd_r, fd_w;
2070   GError *err = NULL;
2071   GSource *stdin_source;
2072
2073   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2074
2075   g_mutex_lock (&ptp_lock);
2076   if (!supported) {
2077     GST_ERROR ("PTP not supported");
2078     ret = FALSE;
2079     goto done;
2080   }
2081
2082   if (initted) {
2083     GST_DEBUG ("PTP already initialized");
2084     ret = TRUE;
2085     goto done;
2086   }
2087
2088   if (ptp_helper_pid) {
2089     GST_DEBUG ("PTP currently initializing");
2090     goto wait;
2091   }
2092
2093   if (!domain_stats_hooks_initted) {
2094     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2095     domain_stats_hooks_initted = TRUE;
2096   }
2097
2098   argc = 1;
2099   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2100     argc += 2;
2101   if (interfaces != NULL)
2102     argc += 2 * g_strv_length (interfaces);
2103
2104   argv = g_new0 (gchar *, argc + 2);
2105   argc_c = 0;
2106
2107   env = g_getenv ("GST_PTP_HELPER_1_0");
2108   if (env == NULL)
2109     env = g_getenv ("GST_PTP_HELPER");
2110   if (env != NULL && *env != '\0') {
2111     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2112     argv[argc_c++] = g_strdup (env);
2113   } else {
2114     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2115   }
2116
2117   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2118     argv[argc_c++] = g_strdup ("-c");
2119     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2120   }
2121
2122   if (interfaces != NULL) {
2123     gchar **ptr = interfaces;
2124
2125     while (*ptr) {
2126       argv[argc_c++] = g_strdup ("-i");
2127       argv[argc_c++] = g_strdup (*ptr);
2128       ptr++;
2129     }
2130   }
2131
2132   main_context = g_main_context_new ();
2133   main_loop = g_main_loop_new (main_context, FALSE);
2134
2135   ptp_helper_thread =
2136       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2137   if (!ptp_helper_thread) {
2138     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2139     g_clear_error (&err);
2140     ret = FALSE;
2141     goto done;
2142   }
2143
2144   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2145           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2146     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2147     g_clear_error (&err);
2148     ret = FALSE;
2149     supported = FALSE;
2150     goto done;
2151   }
2152
2153   stdin_channel = g_io_channel_unix_new (fd_r);
2154   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2155   g_io_channel_set_buffered (stdin_channel, FALSE);
2156   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2157   stdin_source =
2158       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2159   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2160   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2161       NULL);
2162   g_source_attach (stdin_source, main_context);
2163   g_source_unref (stdin_source);
2164
2165   /* Create stdout channel */
2166   stdout_channel = g_io_channel_unix_new (fd_w);
2167   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2168   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2169   g_io_channel_set_buffered (stdout_channel, FALSE);
2170
2171   delay_req_rand = g_rand_new ();
2172   observation_system_clock =
2173       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2174       NULL);
2175   gst_object_ref_sink (observation_system_clock);
2176
2177   initted = TRUE;
2178
2179 wait:
2180   GST_DEBUG ("Waiting for PTP to be initialized");
2181
2182   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2183     g_cond_wait (&ptp_cond, &ptp_lock);
2184
2185   ret = initted;
2186   if (ret) {
2187     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2188         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2189   } else {
2190     GST_ERROR ("Failed to initialize");
2191     supported = FALSE;
2192   }
2193
2194 done:
2195   g_strfreev (argv);
2196
2197   if (!ret) {
2198     if (ptp_helper_pid) {
2199 #ifndef G_OS_WIN32
2200       kill (ptp_helper_pid, SIGKILL);
2201       waitpid (ptp_helper_pid, NULL, 0);
2202 #else
2203       TerminateProcess (ptp_helper_pid, 1);
2204       WaitForSingleObject (ptp_helper_pid, INFINITE);
2205 #endif
2206       g_spawn_close_pid (ptp_helper_pid);
2207     }
2208     ptp_helper_pid = 0;
2209
2210     if (stdin_channel)
2211       g_io_channel_unref (stdin_channel);
2212     stdin_channel = NULL;
2213     if (stdout_channel)
2214       g_io_channel_unref (stdout_channel);
2215     stdout_channel = NULL;
2216
2217     if (main_loop && ptp_helper_thread) {
2218       g_main_loop_quit (main_loop);
2219       g_thread_join (ptp_helper_thread);
2220     }
2221     ptp_helper_thread = NULL;
2222     if (main_loop)
2223       g_main_loop_unref (main_loop);
2224     main_loop = NULL;
2225     if (main_context)
2226       g_main_context_unref (main_context);
2227     main_context = NULL;
2228
2229     if (delay_req_rand)
2230       g_rand_free (delay_req_rand);
2231     delay_req_rand = NULL;
2232
2233     if (observation_system_clock)
2234       gst_object_unref (observation_system_clock);
2235     observation_system_clock = NULL;
2236   }
2237
2238   g_mutex_unlock (&ptp_lock);
2239
2240   return ret;
2241 }
2242
2243 /**
2244  * gst_ptp_deinit:
2245  *
2246  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2247  * are any remaining GstPtpClock instances, they won't be further synchronized
2248  * to the PTP network clock.
2249  *
2250  * Since: 1.6
2251  */
2252 void
2253 gst_ptp_deinit (void)
2254 {
2255   GList *l, *m;
2256
2257   g_mutex_lock (&ptp_lock);
2258
2259   if (ptp_helper_pid) {
2260 #ifndef G_OS_WIN32
2261     kill (ptp_helper_pid, SIGKILL);
2262     waitpid (ptp_helper_pid, NULL, 0);
2263 #else
2264     TerminateProcess (ptp_helper_pid, 1);
2265     WaitForSingleObject (ptp_helper_pid, INFINITE);
2266 #endif
2267     g_spawn_close_pid (ptp_helper_pid);
2268   }
2269   ptp_helper_pid = 0;
2270
2271   if (stdin_channel)
2272     g_io_channel_unref (stdin_channel);
2273   stdin_channel = NULL;
2274   if (stdout_channel)
2275     g_io_channel_unref (stdout_channel);
2276   stdout_channel = NULL;
2277
2278   if (main_loop && ptp_helper_thread) {
2279     GThread *tmp = ptp_helper_thread;
2280     ptp_helper_thread = NULL;
2281     g_mutex_unlock (&ptp_lock);
2282     g_main_loop_quit (main_loop);
2283     g_thread_join (tmp);
2284     g_mutex_lock (&ptp_lock);
2285   }
2286   if (main_loop)
2287     g_main_loop_unref (main_loop);
2288   main_loop = NULL;
2289   if (main_context)
2290     g_main_context_unref (main_context);
2291   main_context = NULL;
2292
2293   if (delay_req_rand)
2294     g_rand_free (delay_req_rand);
2295   delay_req_rand = NULL;
2296   if (observation_system_clock)
2297     gst_object_unref (observation_system_clock);
2298   observation_system_clock = NULL;
2299
2300   for (l = domain_data; l; l = l->next) {
2301     PtpDomainData *domain = l->data;
2302
2303     for (m = domain->announce_senders; m; m = m->next) {
2304       PtpAnnounceSender *sender = m->data;
2305
2306       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2307       g_queue_clear (&sender->announce_messages);
2308       g_free (sender);
2309     }
2310     g_list_free (domain->announce_senders);
2311
2312     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2313         NULL);
2314     g_queue_clear (&domain->pending_syncs);
2315     gst_object_unref (domain->domain_clock);
2316     g_free (domain);
2317   }
2318   g_list_free (domain_data);
2319   domain_data = NULL;
2320   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2321   g_list_free (domain_clocks);
2322   domain_clocks = NULL;
2323
2324   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2325   ptp_clock_id.port_number = 0;
2326
2327   initted = FALSE;
2328
2329   g_mutex_unlock (&ptp_lock);
2330 }
2331
2332 #define DEFAULT_DOMAIN 0
2333
2334 enum
2335 {
2336   PROP_0,
2337   PROP_DOMAIN,
2338   PROP_INTERNAL_CLOCK,
2339   PROP_MASTER_CLOCK_ID,
2340   PROP_GRANDMASTER_CLOCK_ID
2341 };
2342
2343 struct _GstPtpClockPrivate
2344 {
2345   guint domain;
2346   GstClock *domain_clock;
2347   gulong domain_stats_id;
2348 };
2349
2350 #define gst_ptp_clock_parent_class parent_class
2351 G_DEFINE_TYPE_WITH_PRIVATE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2352
2353 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2354     const GValue * value, GParamSpec * pspec);
2355 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2356     GValue * value, GParamSpec * pspec);
2357 static void gst_ptp_clock_finalize (GObject * object);
2358
2359 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2360
2361 static void
2362 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2363 {
2364   GObjectClass *gobject_class;
2365   GstClockClass *clock_class;
2366
2367   gobject_class = G_OBJECT_CLASS (klass);
2368   clock_class = GST_CLOCK_CLASS (klass);
2369
2370   gobject_class->finalize = gst_ptp_clock_finalize;
2371   gobject_class->get_property = gst_ptp_clock_get_property;
2372   gobject_class->set_property = gst_ptp_clock_set_property;
2373
2374   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2375       g_param_spec_uint ("domain", "Domain",
2376           "The PTP domain", 0, G_MAXUINT8,
2377           DEFAULT_DOMAIN,
2378           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2379
2380   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2381       g_param_spec_object ("internal-clock", "Internal Clock",
2382           "Internal clock", GST_TYPE_CLOCK,
2383           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2384
2385   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
2386       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
2387           "Master Clock ID", 0, G_MAXUINT64, 0,
2388           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2389
2390   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
2391       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
2392           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
2393           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2394
2395   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2396 }
2397
2398 static void
2399 gst_ptp_clock_init (GstPtpClock * self)
2400 {
2401   GstPtpClockPrivate *priv;
2402
2403   self->priv = priv = gst_ptp_clock_get_instance_private (self);
2404
2405   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2406   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2407
2408   priv->domain = DEFAULT_DOMAIN;
2409 }
2410
2411 static gboolean
2412 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2413 {
2414   gboolean got_clock = TRUE;
2415
2416   if (G_UNLIKELY (!self->priv->domain_clock)) {
2417     g_mutex_lock (&domain_clocks_lock);
2418     if (!self->priv->domain_clock) {
2419       GList *l;
2420
2421       got_clock = FALSE;
2422
2423       for (l = domain_clocks; l; l = l->next) {
2424         PtpDomainData *clock_data = l->data;
2425
2426         if (clock_data->domain == self->priv->domain &&
2427             clock_data->have_master_clock && clock_data->last_ptp_time != 0) {
2428           GST_DEBUG ("Switching domain clock on domain %d", clock_data->domain);
2429           self->priv->domain_clock = clock_data->domain_clock;
2430           got_clock = TRUE;
2431           break;
2432         }
2433       }
2434     }
2435     g_mutex_unlock (&domain_clocks_lock);
2436     if (got_clock) {
2437       g_object_notify (G_OBJECT (self), "internal-clock");
2438       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2439     }
2440   }
2441
2442   return got_clock;
2443 }
2444
2445 static gboolean
2446 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2447     gpointer user_data)
2448 {
2449   GstPtpClock *self = user_data;
2450
2451   if (domain != self->priv->domain
2452       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2453     return TRUE;
2454
2455   /* Let's set our internal clock */
2456   if (!gst_ptp_clock_ensure_domain_clock (self))
2457     return TRUE;
2458
2459   self->priv->domain_stats_id = 0;
2460
2461   return FALSE;
2462 }
2463
2464 static void
2465 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2466     const GValue * value, GParamSpec * pspec)
2467 {
2468   GstPtpClock *self = GST_PTP_CLOCK (object);
2469
2470   switch (prop_id) {
2471     case PROP_DOMAIN:
2472       self->priv->domain = g_value_get_uint (value);
2473       gst_ptp_clock_ensure_domain_clock (self);
2474       if (!self->priv->domain_clock)
2475         self->priv->domain_stats_id =
2476             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2477             NULL);
2478       break;
2479     default:
2480       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2481       break;
2482   }
2483 }
2484
2485 static void
2486 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2487     GValue * value, GParamSpec * pspec)
2488 {
2489   GstPtpClock *self = GST_PTP_CLOCK (object);
2490
2491   switch (prop_id) {
2492     case PROP_DOMAIN:
2493       g_value_set_uint (value, self->priv->domain);
2494       break;
2495     case PROP_INTERNAL_CLOCK:
2496       gst_ptp_clock_ensure_domain_clock (self);
2497       g_value_set_object (value, self->priv->domain_clock);
2498       break;
2499     case PROP_MASTER_CLOCK_ID:
2500     case PROP_GRANDMASTER_CLOCK_ID:{
2501       GList *l;
2502
2503       g_mutex_lock (&domain_clocks_lock);
2504       g_value_set_uint64 (value, 0);
2505
2506       for (l = domain_clocks; l; l = l->next) {
2507         PtpDomainData *clock_data = l->data;
2508
2509         if (clock_data->domain == self->priv->domain) {
2510           if (prop_id == PROP_MASTER_CLOCK_ID)
2511             g_value_set_uint64 (value,
2512                 clock_data->master_clock_identity.clock_identity);
2513           else
2514             g_value_set_uint64 (value, clock_data->grandmaster_identity);
2515           break;
2516         }
2517       }
2518       g_mutex_unlock (&domain_clocks_lock);
2519       break;
2520     }
2521     default:
2522       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2523       break;
2524   }
2525 }
2526
2527 static void
2528 gst_ptp_clock_finalize (GObject * object)
2529 {
2530   GstPtpClock *self = GST_PTP_CLOCK (object);
2531
2532   if (self->priv->domain_stats_id)
2533     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2534
2535   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2536 }
2537
2538 static GstClockTime
2539 gst_ptp_clock_get_internal_time (GstClock * clock)
2540 {
2541   GstPtpClock *self = GST_PTP_CLOCK (clock);
2542
2543   gst_ptp_clock_ensure_domain_clock (self);
2544
2545   if (!self->priv->domain_clock) {
2546     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2547         self->priv->domain);
2548     return GST_CLOCK_TIME_NONE;
2549   }
2550
2551   return gst_clock_get_time (self->priv->domain_clock);
2552 }
2553
2554 /**
2555  * gst_ptp_clock_new:
2556  * @name: Name of the clock
2557  * @domain: PTP domain
2558  *
2559  * Creates a new PTP clock instance that exports the PTP time of the master
2560  * clock in @domain. This clock can be slaved to other clocks as needed.
2561  *
2562  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2563  * default parameters.
2564  *
2565  * This clock only returns valid timestamps after it received the first
2566  * times from the PTP master clock on the network. Once this happens the
2567  * GstPtpClock::internal-clock property will become non-NULL. You can
2568  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2569  * gst_clock_is_synced().
2570  *
2571  * Returns: (transfer full): A new #GstClock
2572  *
2573  * Since: 1.6
2574  */
2575 GstClock *
2576 gst_ptp_clock_new (const gchar * name, guint domain)
2577 {
2578   GstClock *clock;
2579
2580   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2581
2582   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2583     GST_ERROR ("Failed to initialize PTP");
2584     return NULL;
2585   }
2586
2587   clock = g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2588       NULL);
2589
2590   /* Clear floating flag */
2591   gst_object_ref_sink (clock);
2592
2593   return clock;
2594 }
2595
2596 typedef struct
2597 {
2598   guint8 domain;
2599   const GstStructure *stats;
2600 } DomainStatsMarshalData;
2601
2602 static void
2603 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2604 {
2605   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2606
2607   if (!callback (data->domain, data->stats, hook->data))
2608     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2609 }
2610
2611 static void
2612 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2613 {
2614   DomainStatsMarshalData data = { domain, stats };
2615
2616   g_mutex_lock (&ptp_lock);
2617   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2618       (GHookMarshaller) domain_stats_marshaller, &data);
2619   g_mutex_unlock (&ptp_lock);
2620 }
2621
2622 /**
2623  * gst_ptp_statistics_callback_add:
2624  * @callback: GstPtpStatisticsCallback to call
2625  * @user_data: Data to pass to the callback
2626  * @destroy_data: GDestroyNotify to destroy the data
2627  *
2628  * Installs a new statistics callback for gathering PTP statistics. See
2629  * GstPtpStatisticsCallback for a list of statistics that are provided.
2630  *
2631  * Returns: Id for the callback that can be passed to
2632  * gst_ptp_statistics_callback_remove()
2633  *
2634  * Since: 1.6
2635  */
2636 gulong
2637 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2638     gpointer user_data, GDestroyNotify destroy_data)
2639 {
2640   GHook *hook;
2641
2642   g_mutex_lock (&ptp_lock);
2643
2644   if (!domain_stats_hooks_initted) {
2645     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2646     domain_stats_hooks_initted = TRUE;
2647   }
2648
2649   hook = g_hook_alloc (&domain_stats_hooks);
2650   hook->func = callback;
2651   hook->data = user_data;
2652   hook->destroy = destroy_data;
2653   g_hook_prepend (&domain_stats_hooks, hook);
2654   g_atomic_int_add (&domain_stats_n_hooks, 1);
2655
2656   g_mutex_unlock (&ptp_lock);
2657
2658   return hook->hook_id;
2659 }
2660
2661 /**
2662  * gst_ptp_statistics_callback_remove:
2663  * @id: Callback id to remove
2664  *
2665  * Removes a PTP statistics callback that was previously added with
2666  * gst_ptp_statistics_callback_add().
2667  *
2668  * Since: 1.6
2669  */
2670 void
2671 gst_ptp_statistics_callback_remove (gulong id)
2672 {
2673   g_mutex_lock (&ptp_lock);
2674   if (g_hook_destroy (&domain_stats_hooks, id))
2675     g_atomic_int_add (&domain_stats_n_hooks, -1);
2676   g_mutex_unlock (&ptp_lock);
2677 }