ptp: Work around bug in ptpd in default configuration
[platform/upstream/gstreamer.git] / subprojects / gstreamer / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @title: GstPtpClock
23  * @short_description: Special clock that synchronizes to a remote time
24  *                     provider via PTP (IEEE1588:2008).
25  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
26  *
27  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
28  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
29  * clock in some specific domain.
30  *
31  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
32  * a helper process to do the actual communication via the PTP ports. This is
33  * required as PTP listens on ports < 1024 and thus requires special
34  * privileges. Once this helper process is started, the main process will
35  * synchronize to all PTP domains that are detected on the selected
36  * interfaces.
37  *
38  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
39  * time from a master clock inside a specific PTP domain. This clock will only
40  * return valid timestamps once the timestamps in the PTP domain are known. To
41  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
42  * signal and gst_clock_is_synced().
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #include "gstptp_private.h"
59
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #define WIN32_LEAN_AND_MEAN
65 #include <windows.h>
66 #include <processthreadsapi.h>  /* GetCurrentProcessId */
67 #endif
68 #include <sys/types.h>
69
70 #ifdef HAVE_UNISTD_H
71 #include <unistd.h>
72 #elif defined(G_OS_WIN32)
73 #include <io.h>
74 #endif
75
76 #include <gst/base/base.h>
77
78 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
79 #define GST_CAT_DEFAULT (ptp_debug)
80
81 /* IEEE 1588 7.7.3.1 */
82 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
83
84 /* Use a running average for calculating the mean path delay instead
85  * of just using the last measurement. Enabling this helps in unreliable
86  * networks, like wifi, with often changing delays
87  *
88  * Undef for following IEEE1588-2008 by the letter
89  */
90 #define USE_RUNNING_AVERAGE_DELAY 1
91
92 /* Filter out any measurements that are above a certain threshold compared to
93  * previous measurements. Enabling this helps filtering out outliers that
94  * happen fairly often in unreliable networks, like wifi.
95  *
96  * Undef for following IEEE1588-2008 by the letter
97  */
98 #define USE_MEASUREMENT_FILTERING 1
99
100 /* Select the first clock from which we capture a SYNC message as the master
101  * clock of the domain until we are ready to run the best master clock
102  * algorithm. This allows faster syncing but might mean a change of the master
103  * clock in the beginning. As all clocks in a domain are supposed to use the
104  * same time, this shouldn't be much of a problem.
105  *
106  * Undef for following IEEE1588-2008 by the letter
107  */
108 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
109
110 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
111  * afterwards. This allows better synchronization in networks with varying
112  * delays, as for every other SYNC message we would have to assume that it's
113  * the average of what we saw before. But that might be completely off
114  */
115 #define USE_ONLY_SYNC_WITH_DELAY 1
116
117 /* Filter out delay measurements that are too far away from the median of the
118  * last delay measurements, currently those that are more than 2 times as big.
119  * This increases accuracy a lot on wifi.
120  */
121 #define USE_MEDIAN_PRE_FILTERING 1
122 #define MEDIAN_PRE_FILTERING_WINDOW 9
123
124 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
125 #define MAX_SKIPPED_UPDATES 5
126
127 typedef enum
128 {
129   PTP_MESSAGE_TYPE_SYNC = 0x0,
130   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
131   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
132   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
133   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
134   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
135   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
136   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
137   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
138   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
139 } PtpMessageType;
140
141 typedef struct
142 {
143   guint64 seconds_field;        /* 48 bits valid */
144   guint32 nanoseconds_field;
145 } PtpTimestamp;
146
147 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
148 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
149 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
150
151 typedef struct
152 {
153   guint64 clock_identity;
154   guint16 port_number;
155 } PtpClockIdentity;
156
157 static gint
158 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
159 {
160   if (a->clock_identity < b->clock_identity)
161     return -1;
162   else if (a->clock_identity > b->clock_identity)
163     return 1;
164
165   if (a->port_number < b->port_number)
166     return -1;
167   else if (a->port_number > b->port_number)
168     return 1;
169
170   return 0;
171 }
172
173 typedef struct
174 {
175   guint8 clock_class;
176   guint8 clock_accuracy;
177   guint16 offset_scaled_log_variance;
178 } PtpClockQuality;
179
180 typedef struct
181 {
182   guint8 transport_specific;
183   PtpMessageType message_type;
184   /* guint8 reserved; */
185   guint8 version_ptp;
186   guint16 message_length;
187   guint8 domain_number;
188   /* guint8 reserved; */
189   guint16 flag_field;
190   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
191   /* guint32 reserved; */
192   PtpClockIdentity source_port_identity;
193   guint16 sequence_id;
194   guint8 control_field;
195   gint8 log_message_interval;
196
197   union
198   {
199     struct
200     {
201       PtpTimestamp origin_timestamp;
202       gint16 current_utc_offset;
203       /* guint8 reserved; */
204       guint8 grandmaster_priority_1;
205       PtpClockQuality grandmaster_clock_quality;
206       guint8 grandmaster_priority_2;
207       guint64 grandmaster_identity;
208       guint16 steps_removed;
209       guint8 time_source;
210     } announce;
211
212     struct
213     {
214       PtpTimestamp origin_timestamp;
215     } sync;
216
217     struct
218     {
219       PtpTimestamp precise_origin_timestamp;
220     } follow_up;
221
222     struct
223     {
224       PtpTimestamp origin_timestamp;
225     } delay_req;
226
227     struct
228     {
229       PtpTimestamp receive_timestamp;
230       PtpClockIdentity requesting_port_identity;
231     } delay_resp;
232
233   } message_specific;
234 } PtpMessage;
235
236 static GMutex ptp_lock;
237 static GCond ptp_cond;
238 static gboolean initted = FALSE;
239 #ifdef HAVE_PTP
240 static gboolean supported = TRUE;
241 #else
242 static gboolean supported = FALSE;
243 #endif
244 static GPid ptp_helper_pid;
245 static GThread *ptp_helper_thread;
246 static GMainContext *main_context;
247 static GMainLoop *main_loop;
248 static GIOChannel *stdin_channel, *stdout_channel;
249 static GRand *delay_req_rand;
250 static GstClock *observation_system_clock;
251 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
252
253 typedef struct
254 {
255   GstClockTime receive_time;
256
257   PtpClockIdentity master_clock_identity;
258
259   guint8 grandmaster_priority_1;
260   PtpClockQuality grandmaster_clock_quality;
261   guint8 grandmaster_priority_2;
262   guint64 grandmaster_identity;
263   guint16 steps_removed;
264   guint8 time_source;
265
266   guint16 sequence_id;
267 } PtpAnnounceMessage;
268
269 typedef struct
270 {
271   PtpClockIdentity master_clock_identity;
272
273   GstClockTime announce_interval;       /* last interval we received */
274   GQueue announce_messages;
275 } PtpAnnounceSender;
276
277 typedef struct
278 {
279   guint domain;
280   PtpClockIdentity master_clock_identity;
281
282   guint16 sync_seqnum;
283   GstClockTime sync_recv_time_local;    /* t2 */
284   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
285   GstClockTime follow_up_recv_time_local;
286
287   GSource *timeout_source;
288   guint16 delay_req_seqnum;
289   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
290   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
291   GstClockTime delay_resp_recv_time_local;
292
293   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
294   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
295 } PtpPendingSync;
296
297 static void
298 ptp_pending_sync_free (PtpPendingSync * sync)
299 {
300   if (sync->timeout_source) {
301     g_source_destroy (sync->timeout_source);
302     g_source_unref (sync->timeout_source);
303   }
304   g_free (sync);
305 }
306
307 typedef struct
308 {
309   guint domain;
310
311   GstClockTime last_ptp_time;
312   GstClockTime last_local_time;
313   gint skipped_updates;
314
315   /* Used for selecting the master/grandmaster */
316   GList *announce_senders;
317
318   /* Last selected master clock */
319   gboolean have_master_clock;
320   PtpClockIdentity master_clock_identity;
321   guint64 grandmaster_identity;
322
323   /* Last SYNC or FOLLOW_UP timestamp we received */
324   GstClockTime last_ptp_sync_time;
325   GstClockTime sync_interval;
326
327   GstClockTime mean_path_delay;
328   GstClockTime last_delay_req, min_delay_req_interval;
329   guint16 last_delay_req_seqnum;
330
331   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
332   gint last_path_delays_missing;
333
334   GQueue pending_syncs;
335
336   GstClock *domain_clock;
337 } PtpDomainData;
338
339 static GList *domain_data;
340 static GMutex domain_clocks_lock;
341 static GList *domain_clocks;
342
343 /* Protected by PTP lock */
344 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
345 static GHookList domain_stats_hooks;
346 static gint domain_stats_n_hooks;
347 static gboolean domain_stats_hooks_initted = FALSE;
348
349 /* Only ever accessed from the PTP thread */
350 /* PTPD in hybrid mode (default) sends multicast PTP messages with an invalid
351  * logMessageInterval. We work around this here and warn once */
352 static gboolean ptpd_hybrid_workaround_warned_once = FALSE;
353
354 /* Converts log2 seconds to GstClockTime */
355 static GstClockTime
356 log2_to_clock_time (gint l)
357 {
358   if (l < 0)
359     return GST_SECOND >> (-l);
360   else
361     return GST_SECOND << l;
362 }
363
364 static void
365 dump_ptp_message (PtpMessage * msg)
366 {
367   GST_TRACE ("PTP message:");
368   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
369   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
370   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
371   GST_TRACE ("\tmessage_length: %u", msg->message_length);
372   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
373   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
374   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
375       (msg->correction_field / 65536),
376       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
377   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
378       msg->source_port_identity.clock_identity,
379       msg->source_port_identity.port_number);
380   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
381   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
382   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
383       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
384
385   switch (msg->message_type) {
386     case PTP_MESSAGE_TYPE_ANNOUNCE:
387       GST_TRACE ("\tANNOUNCE:");
388       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
389           msg->message_specific.announce.origin_timestamp.seconds_field,
390           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
391       GST_TRACE ("\t\tcurrent_utc_offset: %d",
392           msg->message_specific.announce.current_utc_offset);
393       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
394           msg->message_specific.announce.grandmaster_priority_1);
395       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
396           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
397           msg->message_specific.announce.
398           grandmaster_clock_quality.clock_accuracy,
399           msg->message_specific.announce.
400           grandmaster_clock_quality.offset_scaled_log_variance);
401       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
402           msg->message_specific.announce.grandmaster_priority_2);
403       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
404           msg->message_specific.announce.grandmaster_identity);
405       GST_TRACE ("\t\tsteps_removed: %u",
406           msg->message_specific.announce.steps_removed);
407       GST_TRACE ("\t\ttime_source: 0x%02x",
408           msg->message_specific.announce.time_source);
409       break;
410     case PTP_MESSAGE_TYPE_SYNC:
411       GST_TRACE ("\tSYNC:");
412       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
413           msg->message_specific.sync.origin_timestamp.seconds_field,
414           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
415       break;
416     case PTP_MESSAGE_TYPE_FOLLOW_UP:
417       GST_TRACE ("\tFOLLOW_UP:");
418       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
419           msg->message_specific.follow_up.
420           precise_origin_timestamp.seconds_field,
421           msg->message_specific.follow_up.
422           precise_origin_timestamp.nanoseconds_field);
423       break;
424     case PTP_MESSAGE_TYPE_DELAY_REQ:
425       GST_TRACE ("\tDELAY_REQ:");
426       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
427           msg->message_specific.delay_req.origin_timestamp.seconds_field,
428           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
429       break;
430     case PTP_MESSAGE_TYPE_DELAY_RESP:
431       GST_TRACE ("\tDELAY_RESP:");
432       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
433           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
434           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
435       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
436           "x %u",
437           msg->message_specific.delay_resp.
438           requesting_port_identity.clock_identity,
439           msg->message_specific.delay_resp.
440           requesting_port_identity.port_number);
441       break;
442     default:
443       break;
444   }
445   GST_TRACE (" ");
446 }
447
448 /* IEEE 1588-2008 5.3.3 */
449 static gboolean
450 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
451 {
452   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
453
454   timestamp->seconds_field =
455       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
456       gst_byte_reader_get_uint16_be_unchecked (reader);
457   timestamp->nanoseconds_field =
458       gst_byte_reader_get_uint32_be_unchecked (reader);
459
460   if (timestamp->nanoseconds_field >= 1000000000)
461     return FALSE;
462
463   return TRUE;
464 }
465
466 /* IEEE 1588-2008 13.3 */
467 static gboolean
468 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
469 {
470   guint8 b;
471
472   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
473
474   b = gst_byte_reader_get_uint8_unchecked (reader);
475   msg->transport_specific = b >> 4;
476   msg->message_type = b & 0x0f;
477
478   b = gst_byte_reader_get_uint8_unchecked (reader);
479   msg->version_ptp = b & 0x0f;
480   if (msg->version_ptp != 2) {
481     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
482     return FALSE;
483   }
484
485   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
486   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
487     GST_WARNING ("Not enough data (%u < %u)",
488         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
489     return FALSE;
490   }
491
492   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
493   gst_byte_reader_skip_unchecked (reader, 1);
494
495   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
496   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
497   gst_byte_reader_skip_unchecked (reader, 4);
498
499   msg->source_port_identity.clock_identity =
500       gst_byte_reader_get_uint64_be_unchecked (reader);
501   msg->source_port_identity.port_number =
502       gst_byte_reader_get_uint16_be_unchecked (reader);
503
504   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
505   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
506   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
507
508   return TRUE;
509 }
510
511 /* IEEE 1588-2008 13.5 */
512 static gboolean
513 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
514 {
515   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
516
517   if (gst_byte_reader_get_remaining (reader) < 20)
518     return FALSE;
519
520   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
521           reader))
522     return FALSE;
523
524   msg->message_specific.announce.current_utc_offset =
525       gst_byte_reader_get_uint16_be_unchecked (reader);
526   gst_byte_reader_skip_unchecked (reader, 1);
527
528   msg->message_specific.announce.grandmaster_priority_1 =
529       gst_byte_reader_get_uint8_unchecked (reader);
530   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
531       gst_byte_reader_get_uint8_unchecked (reader);
532   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
533       gst_byte_reader_get_uint8_unchecked (reader);
534   msg->message_specific.announce.
535       grandmaster_clock_quality.offset_scaled_log_variance =
536       gst_byte_reader_get_uint16_be_unchecked (reader);
537   msg->message_specific.announce.grandmaster_priority_2 =
538       gst_byte_reader_get_uint8_unchecked (reader);
539   msg->message_specific.announce.grandmaster_identity =
540       gst_byte_reader_get_uint64_be_unchecked (reader);
541   msg->message_specific.announce.steps_removed =
542       gst_byte_reader_get_uint16_be_unchecked (reader);
543   msg->message_specific.announce.time_source =
544       gst_byte_reader_get_uint8_unchecked (reader);
545
546   return TRUE;
547 }
548
549 /* IEEE 1588-2008 13.6 */
550 static gboolean
551 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
552 {
553   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
554
555   if (gst_byte_reader_get_remaining (reader) < 10)
556     return FALSE;
557
558   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
559           reader))
560     return FALSE;
561
562   return TRUE;
563 }
564
565 /* IEEE 1588-2008 13.6 */
566 static gboolean
567 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
568 {
569   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
570
571   if (gst_byte_reader_get_remaining (reader) < 10)
572     return FALSE;
573
574   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
575           reader))
576     return FALSE;
577
578   return TRUE;
579 }
580
581 /* IEEE 1588-2008 13.7 */
582 static gboolean
583 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
584 {
585   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
586
587   if (gst_byte_reader_get_remaining (reader) < 10)
588     return FALSE;
589
590   if (!parse_ptp_timestamp (&msg->message_specific.
591           follow_up.precise_origin_timestamp, reader))
592     return FALSE;
593
594   return TRUE;
595 }
596
597 /* IEEE 1588-2008 13.8 */
598 static gboolean
599 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
600 {
601   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
602       FALSE);
603
604   if (gst_byte_reader_get_remaining (reader) < 20)
605     return FALSE;
606
607   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
608           reader))
609     return FALSE;
610
611   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
612       gst_byte_reader_get_uint64_be_unchecked (reader);
613   msg->message_specific.delay_resp.requesting_port_identity.port_number =
614       gst_byte_reader_get_uint16_be_unchecked (reader);
615
616   return TRUE;
617 }
618
619 static gboolean
620 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
621 {
622   GstByteReader reader;
623   gboolean ret = FALSE;
624
625   gst_byte_reader_init (&reader, data, size);
626
627   if (!parse_ptp_message_header (msg, &reader)) {
628     GST_WARNING ("Failed to parse PTP message header");
629     return FALSE;
630   }
631
632   switch (msg->message_type) {
633     case PTP_MESSAGE_TYPE_SYNC:
634       ret = parse_ptp_message_sync (msg, &reader);
635       break;
636     case PTP_MESSAGE_TYPE_FOLLOW_UP:
637       ret = parse_ptp_message_follow_up (msg, &reader);
638       break;
639     case PTP_MESSAGE_TYPE_DELAY_REQ:
640       ret = parse_ptp_message_delay_req (msg, &reader);
641       break;
642     case PTP_MESSAGE_TYPE_DELAY_RESP:
643       ret = parse_ptp_message_delay_resp (msg, &reader);
644       break;
645     case PTP_MESSAGE_TYPE_ANNOUNCE:
646       ret = parse_ptp_message_announce (msg, &reader);
647       break;
648     default:
649       /* ignore for now */
650       break;
651   }
652
653   return ret;
654 }
655
656 static gint
657 compare_announce_message (const PtpAnnounceMessage * a,
658     const PtpAnnounceMessage * b)
659 {
660   /* IEEE 1588 Figure 27 */
661   if (a->grandmaster_identity == b->grandmaster_identity) {
662     if (a->steps_removed + 1 < b->steps_removed)
663       return -1;
664     else if (a->steps_removed > b->steps_removed + 1)
665       return 1;
666
667     /* Error cases are filtered out earlier */
668     if (a->steps_removed < b->steps_removed)
669       return -1;
670     else if (a->steps_removed > b->steps_removed)
671       return 1;
672
673     /* Error cases are filtered out earlier */
674     if (a->master_clock_identity.clock_identity <
675         b->master_clock_identity.clock_identity)
676       return -1;
677     else if (a->master_clock_identity.clock_identity >
678         b->master_clock_identity.clock_identity)
679       return 1;
680
681     /* Error cases are filtered out earlier */
682     if (a->master_clock_identity.port_number <
683         b->master_clock_identity.port_number)
684       return -1;
685     else if (a->master_clock_identity.port_number >
686         b->master_clock_identity.port_number)
687       return 1;
688     else
689       g_assert_not_reached ();
690
691     return 0;
692   }
693
694   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
695     return -1;
696   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
697     return 1;
698
699   if (a->grandmaster_clock_quality.clock_class <
700       b->grandmaster_clock_quality.clock_class)
701     return -1;
702   else if (a->grandmaster_clock_quality.clock_class >
703       b->grandmaster_clock_quality.clock_class)
704     return 1;
705
706   if (a->grandmaster_clock_quality.clock_accuracy <
707       b->grandmaster_clock_quality.clock_accuracy)
708     return -1;
709   else if (a->grandmaster_clock_quality.clock_accuracy >
710       b->grandmaster_clock_quality.clock_accuracy)
711     return 1;
712
713   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
714       b->grandmaster_clock_quality.offset_scaled_log_variance)
715     return -1;
716   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
717       b->grandmaster_clock_quality.offset_scaled_log_variance)
718     return 1;
719
720   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
721     return -1;
722   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
723     return 1;
724
725   if (a->grandmaster_identity < b->grandmaster_identity)
726     return -1;
727   else if (a->grandmaster_identity > b->grandmaster_identity)
728     return 1;
729   else
730     g_assert_not_reached ();
731
732   return 0;
733 }
734
735 static void
736 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
737 {
738   GList *qualified_messages = NULL;
739   GList *l, *m;
740   PtpAnnounceMessage *best = NULL;
741
742   /* IEEE 1588 9.3.2.5 */
743   for (l = domain->announce_senders; l; l = l->next) {
744     PtpAnnounceSender *sender = l->data;
745     GstClockTime window = 4 * sender->announce_interval;
746     gint count = 0;
747
748     for (m = sender->announce_messages.head; m; m = m->next) {
749       PtpAnnounceMessage *msg = m->data;
750
751       if (now - msg->receive_time <= window)
752         count++;
753     }
754
755     /* Only include the newest message of announce senders that had at least 2
756      * announce messages in the last 4 announce intervals. Which also means
757      * that we wait at least 4 announce intervals before we select a master
758      * clock. Until then we just report based on the newest SYNC we received
759      */
760     if (count >= 2) {
761       qualified_messages =
762           g_list_prepend (qualified_messages,
763           g_queue_peek_tail (&sender->announce_messages));
764     }
765   }
766
767   if (!qualified_messages) {
768     GST_DEBUG
769         ("No qualified announce messages for domain %u, can't select a master clock",
770         domain->domain);
771     domain->have_master_clock = FALSE;
772     return;
773   }
774
775   for (l = qualified_messages; l; l = l->next) {
776     PtpAnnounceMessage *msg = l->data;
777
778     if (!best || compare_announce_message (msg, best) < 0)
779       best = msg;
780   }
781   g_clear_pointer (&qualified_messages, g_list_free);
782
783   if (domain->have_master_clock
784       && compare_clock_identity (&domain->master_clock_identity,
785           &best->master_clock_identity) == 0) {
786     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
787   } else {
788     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
789         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
790         domain->domain, best->master_clock_identity.clock_identity,
791         best->master_clock_identity.port_number, best->grandmaster_identity);
792
793     domain->have_master_clock = TRUE;
794     domain->grandmaster_identity = best->grandmaster_identity;
795
796     /* Opportunistic master clock selection likely gave us the same master
797      * clock before, no need to reset all statistics */
798     if (compare_clock_identity (&domain->master_clock_identity,
799             &best->master_clock_identity) != 0) {
800       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
801           sizeof (PtpClockIdentity));
802       domain->mean_path_delay = 0;
803       domain->last_delay_req = 0;
804       domain->last_path_delays_missing = 9;
805       domain->min_delay_req_interval = 0;
806       domain->sync_interval = 0;
807       domain->last_ptp_sync_time = 0;
808       domain->skipped_updates = 0;
809       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
810           NULL);
811       g_queue_clear (&domain->pending_syncs);
812     }
813
814     if (g_atomic_int_get (&domain_stats_n_hooks)) {
815       GstStructure *stats =
816           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
817           "domain", G_TYPE_UINT, domain->domain,
818           "master-clock-id", G_TYPE_UINT64,
819           domain->master_clock_identity.clock_identity,
820           "master-clock-port", G_TYPE_UINT,
821           domain->master_clock_identity.port_number,
822           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
823           NULL);
824       emit_ptp_statistics (domain->domain, stats);
825       gst_structure_free (stats);
826     }
827   }
828 }
829
830 static void
831 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
832 {
833   GList *l;
834   PtpDomainData *domain = NULL;
835   PtpAnnounceSender *sender = NULL;
836   PtpAnnounceMessage *announce;
837
838   /* IEEE1588 9.3.2.2 e)
839    * Don't consider messages with the alternate master flag set
840    */
841   if ((msg->flag_field & 0x0100))
842     return;
843
844   /* IEEE 1588 9.3.2.5 d)
845    * Don't consider announce messages with steps_removed>=255
846    */
847   if (msg->message_specific.announce.steps_removed >= 255)
848     return;
849
850   for (l = domain_data; l; l = l->next) {
851     PtpDomainData *tmp = l->data;
852
853     if (tmp->domain == msg->domain_number) {
854       domain = tmp;
855       break;
856     }
857   }
858
859   if (!domain) {
860     gchar *clock_name;
861
862     domain = g_new0 (PtpDomainData, 1);
863     domain->domain = msg->domain_number;
864     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
865     domain->domain_clock =
866         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
867     gst_object_ref_sink (domain->domain_clock);
868     g_free (clock_name);
869     g_queue_init (&domain->pending_syncs);
870     domain->last_path_delays_missing = 9;
871     domain_data = g_list_prepend (domain_data, domain);
872
873     g_mutex_lock (&domain_clocks_lock);
874     domain_clocks = g_list_prepend (domain_clocks, domain);
875     g_mutex_unlock (&domain_clocks_lock);
876
877     if (g_atomic_int_get (&domain_stats_n_hooks)) {
878       GstStructure *stats =
879           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
880           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
881           domain->domain_clock, NULL);
882       emit_ptp_statistics (domain->domain, stats);
883       gst_structure_free (stats);
884     }
885   }
886
887   for (l = domain->announce_senders; l; l = l->next) {
888     PtpAnnounceSender *tmp = l->data;
889
890     if (compare_clock_identity (&tmp->master_clock_identity,
891             &msg->source_port_identity) == 0) {
892       sender = tmp;
893       break;
894     }
895   }
896
897   if (!sender) {
898     sender = g_new0 (PtpAnnounceSender, 1);
899
900     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
901         sizeof (PtpClockIdentity));
902     g_queue_init (&sender->announce_messages);
903     domain->announce_senders =
904         g_list_prepend (domain->announce_senders, sender);
905   }
906
907   for (l = sender->announce_messages.head; l; l = l->next) {
908     PtpAnnounceMessage *tmp = l->data;
909
910     /* IEEE 1588 9.3.2.5 c)
911      * Don't consider identical messages, i.e. duplicates
912      */
913     if (tmp->sequence_id == msg->sequence_id)
914       return;
915   }
916
917   if (msg->log_message_interval == 0x7f) {
918     sender->announce_interval = 2 * GST_SECOND;
919
920     if (!ptpd_hybrid_workaround_warned_once) {
921       GST_WARNING ("Working around ptpd bug: ptpd sends multicast PTP packets "
922           "with invalid logMessageInterval");
923       ptpd_hybrid_workaround_warned_once = TRUE;
924     }
925   } else {
926     sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
927   }
928
929   announce = g_new0 (PtpAnnounceMessage, 1);
930   announce->receive_time = receive_time;
931   announce->sequence_id = msg->sequence_id;
932   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
933       sizeof (PtpClockIdentity));
934   announce->grandmaster_identity =
935       msg->message_specific.announce.grandmaster_identity;
936   announce->grandmaster_priority_1 =
937       msg->message_specific.announce.grandmaster_priority_1;
938   announce->grandmaster_clock_quality.clock_class =
939       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
940   announce->grandmaster_clock_quality.clock_accuracy =
941       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
942   announce->grandmaster_clock_quality.offset_scaled_log_variance =
943       msg->message_specific.announce.
944       grandmaster_clock_quality.offset_scaled_log_variance;
945   announce->grandmaster_priority_2 =
946       msg->message_specific.announce.grandmaster_priority_2;
947   announce->steps_removed = msg->message_specific.announce.steps_removed;
948   announce->time_source = msg->message_specific.announce.time_source;
949   g_queue_push_tail (&sender->announce_messages, announce);
950
951   select_best_master_clock (domain, receive_time);
952 }
953
954 static gboolean
955 send_delay_req_timeout (PtpPendingSync * sync)
956 {
957   StdIOHeader header = { 0, };
958   guint8 delay_req[44];
959   GstByteWriter writer;
960   GIOStatus status;
961   gsize written;
962   GError *err = NULL;
963
964   header.type = TYPE_EVENT;
965   header.size = 44;
966
967   GST_TRACE ("Sending delay_req to domain %u", sync->domain);
968
969   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
970   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
971   gst_byte_writer_put_uint8_unchecked (&writer, 2);
972   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
973   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
974   gst_byte_writer_put_uint8_unchecked (&writer, 0);
975   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
976   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
977   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
978   gst_byte_writer_put_uint64_be_unchecked (&writer,
979       ptp_clock_id.clock_identity);
980   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
981   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
982   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
983   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
984   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
985   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
986
987   status =
988       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
989       sizeof (header), &written, &err);
990   if (status == G_IO_STATUS_ERROR) {
991     g_warning ("Failed to write to stdout: %s", err->message);
992     g_clear_error (&err);
993     return G_SOURCE_REMOVE;
994   } else if (status == G_IO_STATUS_EOF) {
995     g_message ("EOF on stdout");
996     g_main_loop_quit (main_loop);
997     return G_SOURCE_REMOVE;
998   } else if (status != G_IO_STATUS_NORMAL) {
999     g_warning ("Unexpected stdout write status: %d", status);
1000     g_main_loop_quit (main_loop);
1001     return G_SOURCE_REMOVE;
1002   } else if (written != sizeof (header)) {
1003     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1004     g_main_loop_quit (main_loop);
1005     return G_SOURCE_REMOVE;
1006   }
1007
1008   sync->delay_req_send_time_local =
1009       gst_clock_get_time (observation_system_clock);
1010
1011   status =
1012       g_io_channel_write_chars (stdout_channel,
1013       (const gchar *) delay_req, 44, &written, &err);
1014   if (status == G_IO_STATUS_ERROR) {
1015     g_warning ("Failed to write to stdout: %s", err->message);
1016     g_clear_error (&err);
1017     g_main_loop_quit (main_loop);
1018     return G_SOURCE_REMOVE;
1019   } else if (status == G_IO_STATUS_EOF) {
1020     g_message ("EOF on stdout");
1021     g_main_loop_quit (main_loop);
1022     return G_SOURCE_REMOVE;
1023   } else if (status != G_IO_STATUS_NORMAL) {
1024     g_warning ("Unexpected stdout write status: %d", status);
1025     g_main_loop_quit (main_loop);
1026     return G_SOURCE_REMOVE;
1027   } else if (written != 44) {
1028     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1029     g_main_loop_quit (main_loop);
1030     return G_SOURCE_REMOVE;
1031   }
1032
1033   return G_SOURCE_REMOVE;
1034 }
1035
1036 static gboolean
1037 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1038 {
1039   GstClockTime now = gst_clock_get_time (observation_system_clock);
1040   guint timeout;
1041   GSource *timeout_source;
1042
1043   if (domain->last_delay_req != 0
1044       && domain->last_delay_req + domain->min_delay_req_interval > now) {
1045     GST_TRACE ("Too soon to send new DELAY_REQ");
1046     return FALSE;
1047   }
1048
1049   domain->last_delay_req = now;
1050   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1051
1052   /* IEEE 1588 9.5.11.2 */
1053   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1054     timeout = 0;
1055   else
1056     timeout =
1057         g_rand_int_range (delay_req_rand, 0,
1058         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1059
1060   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1061   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1062   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1063       sync, NULL);
1064   g_source_attach (timeout_source, main_context);
1065
1066   return TRUE;
1067 }
1068
1069 /* Filtering of outliers for RTT and time calculations inspired
1070  * by the code from gstnetclientclock.c
1071  */
1072 static void
1073 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1074 {
1075   GstClockTime internal_time, external_time, rate_num, rate_den;
1076   GstClockTime corrected_ptp_time, corrected_local_time;
1077   gdouble r_squared = 0.0;
1078   gboolean synced;
1079   GstClockTimeDiff discont = 0;
1080   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1081 #ifdef USE_MEASUREMENT_FILTERING
1082   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1083       orig_rate_den;
1084   GstClockTime new_estimated_ptp_time;
1085   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1086   gboolean now_synced;
1087 #endif
1088 #ifdef USE_ONLY_SYNC_WITH_DELAY
1089   GstClockTime mean_path_delay;
1090 #endif
1091
1092   GST_TRACE ("Updating PTP time");
1093
1094 #ifdef USE_ONLY_SYNC_WITH_DELAY
1095   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE) {
1096     GST_TRACE ("Not updating - no delay_req sent");
1097     return;
1098   }
1099
1100   /* IEEE 1588 11.3 */
1101   mean_path_delay =
1102       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1103       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1104       (sync->correction_field_sync + sync->correction_field_delay +
1105           32768) / 65536) / 2;
1106 #endif
1107
1108   /* IEEE 1588 11.2 */
1109   corrected_ptp_time =
1110       sync->sync_send_time_remote +
1111       (sync->correction_field_sync + 32768) / 65536;
1112
1113 #ifdef USE_ONLY_SYNC_WITH_DELAY
1114   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1115 #else
1116   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1117 #endif
1118
1119 #ifdef USE_MEASUREMENT_FILTERING
1120   /* We check this here and when updating the mean path delay, because
1121    * we can get here without a delay response too. The tolerance on
1122    * accepting follow-up after a sync is high, because a PTP server
1123    * doesn't have to prioritise sending FOLLOW_UP - its purpose is
1124    * just to give us the accurate timestamp of the preceding SYNC.
1125    *
1126    * For that reason also allow at least 100ms delay in case of delays smaller
1127    * than 5ms. */
1128   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1129       && sync->follow_up_recv_time_local >
1130       sync->sync_recv_time_local + MAX (100 * GST_MSECOND,
1131           20 * domain->mean_path_delay)) {
1132     GstClockTimeDiff delay =
1133         sync->follow_up_recv_time_local - sync->sync_recv_time_local;
1134     GST_WARNING ("Sync-follow-up delay for domain %u too big: %"
1135         GST_STIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1136         domain->domain, GST_STIME_ARGS (delay),
1137         GST_TIME_ARGS (domain->mean_path_delay));
1138     synced = FALSE;
1139     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1140         &internal_time, &external_time, &rate_num, &rate_den);
1141     goto out;
1142   }
1143 #endif
1144
1145   /* Set an initial local-remote relation */
1146   if (domain->last_ptp_time == 0)
1147     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1148         corrected_ptp_time, 1, 1);
1149
1150 #ifdef USE_MEASUREMENT_FILTERING
1151   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1152    * estimate with our present knowledge about the clock
1153    */
1154   /* Store what the clock produced as 'now' before this update */
1155   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1156       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1157   internal_time = orig_internal_time;
1158   external_time = orig_external_time;
1159   rate_num = orig_rate_num;
1160   rate_den = orig_rate_den;
1161
1162   /* 3/4 RTT window around the estimation */
1163   max_discont = domain->mean_path_delay * 3 / 2;
1164
1165   /* Check if the estimated sync time is inside our window */
1166   estimated_ptp_time_min = corrected_local_time - max_discont;
1167   estimated_ptp_time_min =
1168       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1169       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1170   estimated_ptp_time_max = corrected_local_time + max_discont;
1171   estimated_ptp_time_max =
1172       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1173       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1174
1175   synced = (estimated_ptp_time_min < corrected_ptp_time
1176       && corrected_ptp_time < estimated_ptp_time_max);
1177
1178   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1179       GST_TIME_FORMAT, domain->domain,
1180       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1181
1182   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1183       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1184       GST_TIME_ARGS (corrected_ptp_time),
1185       GST_TIME_ARGS (estimated_ptp_time_max));
1186
1187   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1188           corrected_local_time, corrected_ptp_time, &r_squared,
1189           &internal_time, &external_time, &rate_num, &rate_den)) {
1190     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1191
1192     /* Old estimated PTP time based on receive time and path delay */
1193     estimated_ptp_time = corrected_local_time;
1194     estimated_ptp_time =
1195         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1196         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1197         orig_external_time, orig_rate_num, orig_rate_den);
1198
1199     /* New estimated PTP time based on receive time and path delay */
1200     new_estimated_ptp_time = corrected_local_time;
1201     new_estimated_ptp_time =
1202         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1203         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1204         external_time, rate_num, rate_den);
1205
1206     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1207     if (synced && ABS (discont) > max_discont) {
1208       GstClockTimeDiff offset;
1209       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1210           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1211           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1212           GST_TIME_ARGS (max_discont));
1213       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1214         offset = max_discont - discont;
1215         if (-offset > external_time)
1216           external_time = 0;
1217         else
1218           external_time += offset;
1219       } else {                  /* Too large a backward step - add a +ve offset */
1220         offset = -(max_discont + discont);
1221         external_time += offset;
1222       }
1223
1224       discont += offset;
1225     } else {
1226       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1227           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1228           GST_TIME_ARGS (max_discont));
1229     }
1230
1231     /* Check if the estimated sync time is now (still) inside our window */
1232     estimated_ptp_time_min = corrected_local_time - max_discont;
1233     estimated_ptp_time_min =
1234         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1235         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1236         external_time, rate_num, rate_den);
1237     estimated_ptp_time_max = corrected_local_time + max_discont;
1238     estimated_ptp_time_max =
1239         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1240         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1241         external_time, rate_num, rate_den);
1242
1243     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1244         && corrected_ptp_time < estimated_ptp_time_max);
1245
1246     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1247         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1248         GST_TIME_ARGS (corrected_ptp_time),
1249         GST_TIME_ARGS (estimated_ptp_time_max));
1250
1251     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1252       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1253           internal_time, external_time, rate_num, rate_den);
1254       domain->skipped_updates = 0;
1255
1256       domain->last_ptp_time = corrected_ptp_time;
1257       domain->last_local_time = corrected_local_time;
1258     } else {
1259       domain->skipped_updates++;
1260     }
1261   } else {
1262     domain->last_ptp_time = corrected_ptp_time;
1263     domain->last_local_time = corrected_local_time;
1264   }
1265
1266 #else
1267   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1268       GST_TIME_FORMAT, domain->domain,
1269       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1270
1271   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1272       &internal_time, &external_time, &rate_num, &rate_den);
1273
1274   estimated_ptp_time = corrected_local_time;
1275   estimated_ptp_time =
1276       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1277       (domain->domain_clock), estimated_ptp_time, internal_time,
1278       external_time, rate_num, rate_den);
1279
1280   gst_clock_add_observation (domain->domain_clock,
1281       corrected_local_time, corrected_ptp_time, &r_squared);
1282
1283   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1284       &internal_time, &external_time, &rate_num, &rate_den);
1285
1286   synced = TRUE;
1287   domain->last_ptp_time = corrected_ptp_time;
1288   domain->last_local_time = corrected_local_time;
1289 #endif
1290
1291 #ifdef USE_MEASUREMENT_FILTERING
1292 out:
1293 #endif
1294   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1295     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1296         "domain", G_TYPE_UINT, domain->domain,
1297         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1298         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1299         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1300         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1301         "discontinuity", G_TYPE_INT64, discont,
1302         "synced", G_TYPE_BOOLEAN, synced,
1303         "r-squared", G_TYPE_DOUBLE, r_squared,
1304         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1305         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1306         "rate-num", G_TYPE_UINT64, rate_num,
1307         "rate-den", G_TYPE_UINT64, rate_den,
1308         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1309         NULL);
1310     emit_ptp_statistics (domain->domain, stats);
1311     gst_structure_free (stats);
1312   }
1313
1314 }
1315
1316 #ifdef USE_MEDIAN_PRE_FILTERING
1317 static gint
1318 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1319 {
1320   if (*a < *b)
1321     return -1;
1322   else if (*a > *b)
1323     return 1;
1324   return 0;
1325 }
1326 #endif
1327
1328 static gboolean
1329 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1330 {
1331 #ifdef USE_MEDIAN_PRE_FILTERING
1332   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1333   GstClockTime median;
1334   gint i;
1335 #endif
1336
1337   GstClockTime mean_path_delay, delay_req_delay = 0;
1338   gboolean ret;
1339
1340   /* IEEE 1588 11.3 */
1341   mean_path_delay =
1342       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1343       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1344       (sync->correction_field_sync + sync->correction_field_delay +
1345           32768) / 65536) / 2;
1346
1347 #ifdef USE_MEDIAN_PRE_FILTERING
1348   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1349     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1350   domain->last_path_delays[i - 1] = mean_path_delay;
1351
1352   if (domain->last_path_delays_missing) {
1353     domain->last_path_delays_missing--;
1354   } else {
1355     memcpy (&last_path_delays, &domain->last_path_delays,
1356         sizeof (last_path_delays));
1357     g_qsort_with_data (&last_path_delays,
1358         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1359         (GCompareDataFunc) compare_clock_time, NULL);
1360
1361     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1362
1363     /* FIXME: We might want to use something else here, like only allowing
1364      * things in the interquartile range, or also filtering away delays that
1365      * are too small compared to the median. This here worked well enough
1366      * in tests so far.
1367      */
1368     if (mean_path_delay > 2 * median) {
1369       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1370           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1371           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1372       ret = FALSE;
1373       goto out;
1374     }
1375   }
1376 #endif
1377
1378 #ifdef USE_RUNNING_AVERAGE_DELAY
1379   /* Track an average round trip time, for a bit of smoothing */
1380   /* Always update before discarding a sample, so genuine changes in
1381    * the network get picked up, eventually */
1382   if (domain->mean_path_delay == 0)
1383     domain->mean_path_delay = mean_path_delay;
1384   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1385     domain->mean_path_delay =
1386         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1387   else
1388     domain->mean_path_delay =
1389         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1390 #else
1391   domain->mean_path_delay = mean_path_delay;
1392 #endif
1393
1394 #ifdef USE_MEASUREMENT_FILTERING
1395   /* The tolerance on accepting follow-up after a sync is high, because
1396    * a PTP server doesn't have to prioritise sending FOLLOW_UP - its purpose is
1397    * just to give us the accurate timestamp of the preceding SYNC.
1398    *
1399    * For that reason also allow at least 100ms delay in case of delays smaller
1400    * than 5ms. */
1401   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1402       domain->mean_path_delay != 0
1403       && sync->follow_up_recv_time_local >
1404       sync->sync_recv_time_local + MAX (100 * GST_MSECOND,
1405           20 * domain->mean_path_delay)) {
1406     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1407         " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")", domain->domain,
1408         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1409             sync->sync_recv_time_local),
1410         GST_TIME_ARGS (domain->mean_path_delay));
1411     ret = FALSE;
1412     goto out;
1413   }
1414
1415   if (mean_path_delay > 2 * domain->mean_path_delay) {
1416     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1417         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1418         GST_TIME_ARGS (mean_path_delay),
1419         GST_TIME_ARGS (domain->mean_path_delay));
1420     ret = FALSE;
1421     goto out;
1422   }
1423 #endif
1424
1425   delay_req_delay =
1426       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1427
1428 #ifdef USE_MEASUREMENT_FILTERING
1429   /* delay_req_delay is a RTT, so 2 times the path delay is what we'd
1430    * hope for, but some PTP systems don't prioritise sending DELAY_RESP,
1431    * but they must still have placed an accurate reception timestamp.
1432    * That means we should be quite tolerant about late DELAY_RESP, and
1433    * mostly rely on filtering out jumps in the mean-path-delay elsewhere.
1434    *
1435    * For that reason also allow at least 100ms delay in case of delays smaller
1436    * than 5ms. */
1437   if (delay_req_delay > MAX (100 * GST_MSECOND, 20 * domain->mean_path_delay)) {
1438     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1439         GST_TIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1440         domain->domain, GST_TIME_ARGS (delay_req_delay),
1441         GST_TIME_ARGS (domain->mean_path_delay));
1442     ret = FALSE;
1443     goto out;
1444   }
1445 #endif
1446
1447   ret = TRUE;
1448
1449   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1450       GST_TIME_FORMAT ")", domain->domain,
1451       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1452   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1453       domain->domain, GST_TIME_ARGS (delay_req_delay));
1454
1455 #if defined(USE_MEASUREMENT_FILTERING) || defined(USE_MEDIAN_PRE_FILTERING)
1456 out:
1457 #endif
1458   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1459     GstStructure *stats =
1460         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1461         "domain", G_TYPE_UINT, domain->domain,
1462         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1463         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1464         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1465     emit_ptp_statistics (domain->domain, stats);
1466     gst_structure_free (stats);
1467   }
1468
1469   return ret;
1470 }
1471
1472 static void
1473 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1474 {
1475   GList *l;
1476   PtpDomainData *domain = NULL;
1477   PtpPendingSync *sync = NULL;
1478
1479   /* Don't consider messages with the alternate master flag set */
1480   if ((msg->flag_field & 0x0100)) {
1481     GST_TRACE ("Ignoring sync message with alternate-master flag");
1482     return;
1483   }
1484
1485   for (l = domain_data; l; l = l->next) {
1486     PtpDomainData *tmp = l->data;
1487
1488     if (msg->domain_number == tmp->domain) {
1489       domain = tmp;
1490       break;
1491     }
1492   }
1493
1494   if (!domain) {
1495     gchar *clock_name;
1496
1497     domain = g_new0 (PtpDomainData, 1);
1498     domain->domain = msg->domain_number;
1499     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1500     domain->domain_clock =
1501         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1502     gst_object_ref_sink (domain->domain_clock);
1503     g_free (clock_name);
1504     g_queue_init (&domain->pending_syncs);
1505     domain->last_path_delays_missing = 9;
1506     domain_data = g_list_prepend (domain_data, domain);
1507
1508     g_mutex_lock (&domain_clocks_lock);
1509     domain_clocks = g_list_prepend (domain_clocks, domain);
1510     g_mutex_unlock (&domain_clocks_lock);
1511   }
1512
1513   /* If we have a master clock, ignore this message if it's not coming from there */
1514   if (domain->have_master_clock
1515       && compare_clock_identity (&domain->master_clock_identity,
1516           &msg->source_port_identity) != 0)
1517     return;
1518
1519 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1520   /* Opportunistic selection of master clock */
1521   if (!domain->have_master_clock)
1522     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1523         sizeof (PtpClockIdentity));
1524 #else
1525   if (!domain->have_master_clock)
1526     return;
1527 #endif
1528
1529   if (msg->log_message_interval == 0x7f) {
1530     domain->sync_interval = GST_SECOND;
1531
1532     if (!ptpd_hybrid_workaround_warned_once) {
1533       GST_WARNING ("Working around ptpd bug: ptpd sends multicast PTP packets "
1534           "with invalid logMessageInterval");
1535       ptpd_hybrid_workaround_warned_once = TRUE;
1536     }
1537   } else {
1538     domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1539   }
1540
1541   /* Check if duplicated */
1542   for (l = domain->pending_syncs.head; l; l = l->next) {
1543     PtpPendingSync *tmp = l->data;
1544
1545     if (tmp->sync_seqnum == msg->sequence_id)
1546       return;
1547   }
1548
1549   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1550       GST_CLOCK_TIME_NONE / GST_SECOND) {
1551     GST_FIXME ("Unsupported sync message seconds field value: %"
1552         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1553         msg->message_specific.sync.origin_timestamp.seconds_field,
1554         GST_CLOCK_TIME_NONE / GST_SECOND);
1555     return;
1556   }
1557
1558   sync = g_new0 (PtpPendingSync, 1);
1559   sync->domain = domain->domain;
1560   sync->sync_seqnum = msg->sequence_id;
1561   sync->sync_recv_time_local = receive_time;
1562   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1563   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1564   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1565   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1566   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1567
1568   /* 0.5 correction factor for division later */
1569   sync->correction_field_sync = msg->correction_field;
1570
1571   if ((msg->flag_field & 0x0200)) {
1572     /* Wait for FOLLOW_UP */
1573     GST_TRACE ("Waiting for FOLLOW_UP msg");
1574   } else {
1575     sync->sync_send_time_remote =
1576         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1577         sync.origin_timestamp);
1578
1579     if (domain->last_ptp_sync_time != 0
1580         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1581       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1582           GST_TIME_FORMAT, domain->domain,
1583           GST_TIME_ARGS (domain->last_ptp_sync_time),
1584           GST_TIME_ARGS (sync->sync_send_time_remote));
1585       ptp_pending_sync_free (sync);
1586       sync = NULL;
1587       return;
1588     }
1589     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1590
1591     if (send_delay_req (domain, sync)) {
1592       /* Sent delay request */
1593     } else {
1594       update_ptp_time (domain, sync);
1595       ptp_pending_sync_free (sync);
1596       sync = NULL;
1597     }
1598   }
1599
1600   if (sync)
1601     g_queue_push_tail (&domain->pending_syncs, sync);
1602 }
1603
1604 static void
1605 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1606 {
1607   GList *l;
1608   PtpDomainData *domain = NULL;
1609   PtpPendingSync *sync = NULL;
1610
1611   GST_TRACE ("Processing FOLLOW_UP message");
1612
1613   /* Don't consider messages with the alternate master flag set */
1614   if ((msg->flag_field & 0x0100)) {
1615     GST_TRACE ("Ignoring FOLLOW_UP with alternate-master flag");
1616     return;
1617   }
1618
1619   for (l = domain_data; l; l = l->next) {
1620     PtpDomainData *tmp = l->data;
1621
1622     if (msg->domain_number == tmp->domain) {
1623       domain = tmp;
1624       break;
1625     }
1626   }
1627
1628   if (!domain) {
1629     GST_TRACE ("No domain match for FOLLOW_UP msg");
1630     return;
1631   }
1632
1633   /* If we have a master clock, ignore this message if it's not coming from there */
1634   if (domain->have_master_clock
1635       && compare_clock_identity (&domain->master_clock_identity,
1636           &msg->source_port_identity) != 0) {
1637     GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring");
1638     return;
1639   }
1640
1641   /* Check if we know about this one */
1642   for (l = domain->pending_syncs.head; l; l = l->next) {
1643     PtpPendingSync *tmp = l->data;
1644
1645     if (tmp->sync_seqnum == msg->sequence_id) {
1646       sync = tmp;
1647       break;
1648     }
1649   }
1650
1651   if (!sync) {
1652     GST_TRACE ("Ignoring FOLLOW_UP with no pending SYNC");
1653     return;
1654   }
1655
1656   /* Got a FOLLOW_UP for this already */
1657   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE) {
1658     GST_TRACE ("Got repeat FOLLOW_UP. Ignoring");
1659     return;
1660   }
1661
1662   if (sync->sync_recv_time_local >= receive_time) {
1663     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1664         GST_TIME_FORMAT, domain->domain,
1665         GST_TIME_ARGS (sync->sync_recv_time_local),
1666         GST_TIME_ARGS (receive_time));
1667     g_queue_remove (&domain->pending_syncs, sync);
1668     ptp_pending_sync_free (sync);
1669     return;
1670   }
1671
1672   sync->correction_field_sync += msg->correction_field;
1673   sync->sync_send_time_remote =
1674       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1675       follow_up.precise_origin_timestamp);
1676   sync->follow_up_recv_time_local = receive_time;
1677
1678   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1679     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1680         GST_TIME_FORMAT, domain->domain,
1681         GST_TIME_ARGS (domain->last_ptp_sync_time),
1682         GST_TIME_ARGS (sync->sync_send_time_remote));
1683     g_queue_remove (&domain->pending_syncs, sync);
1684     ptp_pending_sync_free (sync);
1685     sync = NULL;
1686     return;
1687   }
1688   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1689
1690   if (send_delay_req (domain, sync)) {
1691     /* Sent delay request */
1692   } else {
1693     update_ptp_time (domain, sync);
1694     g_queue_remove (&domain->pending_syncs, sync);
1695     ptp_pending_sync_free (sync);
1696     sync = NULL;
1697   }
1698 }
1699
1700 static void
1701 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1702 {
1703   GList *l;
1704   PtpDomainData *domain = NULL;
1705   PtpPendingSync *sync = NULL;
1706
1707   /* Don't consider messages with the alternate master flag set */
1708   if ((msg->flag_field & 0x0100))
1709     return;
1710
1711   for (l = domain_data; l; l = l->next) {
1712     PtpDomainData *tmp = l->data;
1713
1714     if (msg->domain_number == tmp->domain) {
1715       domain = tmp;
1716       break;
1717     }
1718   }
1719
1720   if (!domain)
1721     return;
1722
1723   /* If we have a master clock, ignore this message if it's not coming from there */
1724   if (domain->have_master_clock
1725       && compare_clock_identity (&domain->master_clock_identity,
1726           &msg->source_port_identity) != 0)
1727     return;
1728
1729   /* Not for us */
1730   if (msg->message_specific.delay_resp.
1731       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1732       || msg->message_specific.delay_resp.
1733       requesting_port_identity.port_number != ptp_clock_id.port_number)
1734     return;
1735
1736   if (msg->log_message_interval == 0x7f) {
1737     domain->min_delay_req_interval = GST_SECOND;
1738
1739     if (!ptpd_hybrid_workaround_warned_once) {
1740       GST_WARNING ("Working around ptpd bug: ptpd sends multicast PTP packets "
1741           "with invalid logMessageInterval");
1742       ptpd_hybrid_workaround_warned_once = TRUE;
1743     }
1744   } else {
1745     domain->min_delay_req_interval =
1746         log2_to_clock_time (msg->log_message_interval);
1747   }
1748
1749   /* Check if we know about this one */
1750   for (l = domain->pending_syncs.head; l; l = l->next) {
1751     PtpPendingSync *tmp = l->data;
1752
1753     if (tmp->delay_req_seqnum == msg->sequence_id) {
1754       sync = tmp;
1755       break;
1756     }
1757   }
1758
1759   if (!sync)
1760     return;
1761
1762   /* Got a DELAY_RESP for this already */
1763   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1764     return;
1765
1766   if (sync->delay_req_send_time_local > receive_time) {
1767     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1768         GST_TIME_FORMAT, domain->domain,
1769         GST_TIME_ARGS (sync->delay_req_send_time_local),
1770         GST_TIME_ARGS (receive_time));
1771     g_queue_remove (&domain->pending_syncs, sync);
1772     ptp_pending_sync_free (sync);
1773     return;
1774   }
1775
1776   sync->correction_field_delay = msg->correction_field;
1777
1778   sync->delay_req_recv_time_remote =
1779       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1780       delay_resp.receive_timestamp);
1781   sync->delay_resp_recv_time_local = receive_time;
1782
1783   if (domain->mean_path_delay != 0
1784       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1785     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1786         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1787         GST_TIME_ARGS (sync->sync_send_time_remote),
1788         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1789     g_queue_remove (&domain->pending_syncs, sync);
1790     ptp_pending_sync_free (sync);
1791     return;
1792   }
1793
1794   if (update_mean_path_delay (domain, sync))
1795     update_ptp_time (domain, sync);
1796   g_queue_remove (&domain->pending_syncs, sync);
1797   ptp_pending_sync_free (sync);
1798 }
1799
1800 static void
1801 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1802 {
1803   /* Ignore our own messages */
1804   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1805       msg->source_port_identity.port_number == ptp_clock_id.port_number) {
1806     GST_TRACE ("Ignoring our own message");
1807     return;
1808   }
1809
1810   GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT,
1811       msg->message_type, GST_TIME_ARGS (receive_time));
1812   switch (msg->message_type) {
1813     case PTP_MESSAGE_TYPE_ANNOUNCE:
1814       handle_announce_message (msg, receive_time);
1815       break;
1816     case PTP_MESSAGE_TYPE_SYNC:
1817       handle_sync_message (msg, receive_time);
1818       break;
1819     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1820       handle_follow_up_message (msg, receive_time);
1821       break;
1822     case PTP_MESSAGE_TYPE_DELAY_RESP:
1823       handle_delay_resp_message (msg, receive_time);
1824       break;
1825     default:
1826       break;
1827   }
1828 }
1829
1830 static gboolean
1831 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1832     gpointer user_data)
1833 {
1834   GIOStatus status;
1835   StdIOHeader header;
1836   gchar buffer[8192];
1837   GError *err = NULL;
1838   gsize read;
1839
1840   if ((condition & G_IO_STATUS_EOF)) {
1841     GST_ERROR ("Got EOF on stdin");
1842     g_main_loop_quit (main_loop);
1843     return G_SOURCE_REMOVE;
1844   }
1845
1846   status =
1847       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1848       &read, &err);
1849   if (status == G_IO_STATUS_ERROR) {
1850     GST_ERROR ("Failed to read from stdin: %s", err->message);
1851     g_clear_error (&err);
1852     g_main_loop_quit (main_loop);
1853     return G_SOURCE_REMOVE;
1854   } else if (status == G_IO_STATUS_EOF) {
1855     GST_ERROR ("Got EOF on stdin");
1856     g_main_loop_quit (main_loop);
1857     return G_SOURCE_REMOVE;
1858   } else if (status != G_IO_STATUS_NORMAL) {
1859     GST_ERROR ("Unexpected stdin read status: %d", status);
1860     g_main_loop_quit (main_loop);
1861     return G_SOURCE_REMOVE;
1862   } else if (read != sizeof (header)) {
1863     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1864     g_main_loop_quit (main_loop);
1865     return G_SOURCE_REMOVE;
1866   } else if (header.size > 8192) {
1867     GST_ERROR ("Unexpected size: %u", header.size);
1868     g_main_loop_quit (main_loop);
1869     return G_SOURCE_REMOVE;
1870   }
1871
1872   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1873   if (status == G_IO_STATUS_ERROR) {
1874     GST_ERROR ("Failed to read from stdin: %s", err->message);
1875     g_clear_error (&err);
1876     g_main_loop_quit (main_loop);
1877     return G_SOURCE_REMOVE;
1878   } else if (status == G_IO_STATUS_EOF) {
1879     GST_ERROR ("EOF on stdin");
1880     g_main_loop_quit (main_loop);
1881     return G_SOURCE_REMOVE;
1882   } else if (status != G_IO_STATUS_NORMAL) {
1883     GST_ERROR ("Unexpected stdin read status: %d", status);
1884     g_main_loop_quit (main_loop);
1885     return G_SOURCE_REMOVE;
1886   } else if (read != header.size) {
1887     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1888     g_main_loop_quit (main_loop);
1889     return G_SOURCE_REMOVE;
1890   }
1891
1892   switch (header.type) {
1893     case TYPE_EVENT:
1894     case TYPE_GENERAL:{
1895       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1896       PtpMessage msg;
1897
1898       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1899         dump_ptp_message (&msg);
1900         handle_ptp_message (&msg, receive_time);
1901       }
1902       break;
1903     }
1904     default:
1905     case TYPE_CLOCK_ID:{
1906       if (header.size != 8) {
1907         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1908         g_main_loop_quit (main_loop);
1909         return G_SOURCE_REMOVE;
1910       }
1911       g_mutex_lock (&ptp_lock);
1912       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1913 #ifdef G_OS_WIN32
1914       ptp_clock_id.port_number = (guint16) GetCurrentProcessId ();
1915 #else
1916       ptp_clock_id.port_number = getpid ();
1917 #endif
1918       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1919           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1920       g_cond_signal (&ptp_cond);
1921       g_mutex_unlock (&ptp_lock);
1922       break;
1923     }
1924   }
1925
1926   return G_SOURCE_CONTINUE;
1927 }
1928
1929 /* Cleanup all announce messages and announce message senders
1930  * that are timed out by now, and clean up all pending syncs
1931  * that are missing their FOLLOW_UP or DELAY_RESP */
1932 static gboolean
1933 cleanup_cb (gpointer data)
1934 {
1935   GstClockTime now = gst_clock_get_time (observation_system_clock);
1936   GList *l, *m, *n;
1937
1938   for (l = domain_data; l; l = l->next) {
1939     PtpDomainData *domain = l->data;
1940
1941     for (n = domain->announce_senders; n;) {
1942       PtpAnnounceSender *sender = n->data;
1943       gboolean timed_out = TRUE;
1944
1945       /* Keep only 5 messages per sender around */
1946       while (g_queue_get_length (&sender->announce_messages) > 5) {
1947         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1948         g_free (msg);
1949       }
1950
1951       for (m = sender->announce_messages.head; m; m = m->next) {
1952         PtpAnnounceMessage *msg = m->data;
1953
1954         if (msg->receive_time +
1955             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1956           timed_out = FALSE;
1957           break;
1958         }
1959       }
1960
1961       if (timed_out) {
1962         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1963             sender->master_clock_identity.clock_identity,
1964             sender->master_clock_identity.port_number);
1965         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1966         g_queue_clear (&sender->announce_messages);
1967       }
1968
1969       if (g_queue_get_length (&sender->announce_messages) == 0) {
1970         GList *tmp = n->next;
1971
1972         if (compare_clock_identity (&sender->master_clock_identity,
1973                 &domain->master_clock_identity) == 0)
1974           GST_WARNING ("currently selected master clock timed out");
1975         g_free (sender);
1976         domain->announce_senders =
1977             g_list_delete_link (domain->announce_senders, n);
1978         n = tmp;
1979       } else {
1980         n = n->next;
1981       }
1982     }
1983     select_best_master_clock (domain, now);
1984
1985     /* Clean up any pending syncs */
1986     for (n = domain->pending_syncs.head; n;) {
1987       PtpPendingSync *sync = n->data;
1988       gboolean timed_out = FALSE;
1989
1990       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1991        * and pending delay reqs after 4 delay req intervals or 10 seconds
1992        */
1993       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1994           ((domain->min_delay_req_interval != 0
1995                   && sync->delay_req_send_time_local +
1996                   4 * domain->min_delay_req_interval < now)
1997               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1998         timed_out = TRUE;
1999       } else if ((domain->sync_interval != 0
2000               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
2001           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
2002         timed_out = TRUE;
2003       }
2004
2005       if (timed_out) {
2006         GList *tmp = n->next;
2007         ptp_pending_sync_free (sync);
2008         g_queue_delete_link (&domain->pending_syncs, n);
2009         n = tmp;
2010       } else {
2011         n = n->next;
2012       }
2013     }
2014   }
2015
2016   return G_SOURCE_CONTINUE;
2017 }
2018
2019 static gpointer
2020 ptp_helper_main (gpointer data)
2021 {
2022   GSource *cleanup_source;
2023
2024   GST_DEBUG ("Starting PTP helper loop");
2025
2026   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
2027   cleanup_source = g_timeout_source_new_seconds (5);
2028   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
2029   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
2030   g_source_attach (cleanup_source, main_context);
2031   g_source_unref (cleanup_source);
2032
2033   g_main_loop_run (main_loop);
2034   GST_DEBUG ("Stopped PTP helper loop");
2035
2036   g_mutex_lock (&ptp_lock);
2037   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2038   ptp_clock_id.port_number = 0;
2039   initted = FALSE;
2040   g_cond_signal (&ptp_cond);
2041   g_mutex_unlock (&ptp_lock);
2042
2043   return NULL;
2044 }
2045
2046 /**
2047  * gst_ptp_is_supported:
2048  *
2049  * Check if PTP clocks are generally supported on this system, and if previous
2050  * initializations did not fail.
2051  *
2052  * Returns: %TRUE if PTP clocks are generally supported on this system, and
2053  * previous initializations did not fail.
2054  *
2055  * Since: 1.6
2056  */
2057 gboolean
2058 gst_ptp_is_supported (void)
2059 {
2060   return supported;
2061 }
2062
2063 /**
2064  * gst_ptp_is_initialized:
2065  *
2066  * Check if the GStreamer PTP clock subsystem is initialized.
2067  *
2068  * Returns: %TRUE if the GStreamer PTP clock subsystem is initialized.
2069  *
2070  * Since: 1.6
2071  */
2072 gboolean
2073 gst_ptp_is_initialized (void)
2074 {
2075   return initted;
2076 }
2077
2078 /**
2079  * gst_ptp_init:
2080  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
2081  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
2082  *
2083  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
2084  * slave-only mode for all domains on the given @interfaces with the
2085  * given @clock_id.
2086  *
2087  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
2088  * generated from the MAC address of the first network interface.
2089  *
2090  * This function is automatically called by gst_ptp_clock_new() with default
2091  * parameters if it wasn't called before.
2092  *
2093  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
2094  *
2095  * Since: 1.6
2096  */
2097 gboolean
2098 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2099 {
2100   gboolean ret;
2101   const gchar *env;
2102   gchar **argv = NULL;
2103   gint argc, argc_c;
2104   gint fd_r, fd_w;
2105   GError *err = NULL;
2106   GSource *stdin_source;
2107
2108   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2109
2110   g_mutex_lock (&ptp_lock);
2111   if (!supported) {
2112     GST_ERROR ("PTP not supported");
2113     ret = FALSE;
2114     goto done;
2115   }
2116
2117   if (initted) {
2118     GST_DEBUG ("PTP already initialized");
2119     ret = TRUE;
2120     goto done;
2121   }
2122
2123   if (ptp_helper_pid) {
2124     GST_DEBUG ("PTP currently initializing");
2125     goto wait;
2126   }
2127
2128   if (!domain_stats_hooks_initted) {
2129     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2130     domain_stats_hooks_initted = TRUE;
2131   }
2132
2133   argc = 1;
2134   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2135     argc += 2;
2136   if (interfaces != NULL)
2137     argc += 2 * g_strv_length (interfaces);
2138
2139   argv = g_new0 (gchar *, argc + 2);
2140   argc_c = 0;
2141
2142   env = g_getenv ("GST_PTP_HELPER_1_0");
2143   if (env == NULL)
2144     env = g_getenv ("GST_PTP_HELPER");
2145   if (env != NULL && *env != '\0') {
2146     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2147     argv[argc_c++] = g_strdup (env);
2148   } else {
2149     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2150   }
2151
2152   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2153     argv[argc_c++] = g_strdup ("-c");
2154     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2155   }
2156
2157   if (interfaces != NULL) {
2158     gchar **ptr = interfaces;
2159
2160     while (*ptr) {
2161       argv[argc_c++] = g_strdup ("-i");
2162       argv[argc_c++] = g_strdup (*ptr);
2163       ptr++;
2164     }
2165   }
2166
2167   main_context = g_main_context_new ();
2168   main_loop = g_main_loop_new (main_context, FALSE);
2169
2170   ptp_helper_thread =
2171       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2172   if (!ptp_helper_thread) {
2173     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2174     g_clear_error (&err);
2175     ret = FALSE;
2176     goto done;
2177   }
2178
2179   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2180           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2181     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2182     g_clear_error (&err);
2183     ret = FALSE;
2184     supported = FALSE;
2185     goto done;
2186   }
2187
2188   stdin_channel = g_io_channel_unix_new (fd_r);
2189   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2190   g_io_channel_set_buffered (stdin_channel, FALSE);
2191   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2192   stdin_source =
2193       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2194   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2195   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2196       NULL);
2197   g_source_attach (stdin_source, main_context);
2198   g_source_unref (stdin_source);
2199
2200   /* Create stdout channel */
2201   stdout_channel = g_io_channel_unix_new (fd_w);
2202   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2203   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2204   g_io_channel_set_buffered (stdout_channel, FALSE);
2205
2206   delay_req_rand = g_rand_new ();
2207   observation_system_clock =
2208       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2209       NULL);
2210   gst_object_ref_sink (observation_system_clock);
2211
2212   initted = TRUE;
2213
2214 wait:
2215   GST_DEBUG ("Waiting for PTP to be initialized");
2216
2217   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2218     g_cond_wait (&ptp_cond, &ptp_lock);
2219
2220   ret = initted;
2221   if (ret) {
2222     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2223         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2224   } else {
2225     GST_ERROR ("Failed to initialize");
2226     supported = FALSE;
2227   }
2228
2229 done:
2230   g_strfreev (argv);
2231
2232   if (!ret) {
2233     if (ptp_helper_pid) {
2234 #ifndef G_OS_WIN32
2235       kill (ptp_helper_pid, SIGKILL);
2236       waitpid (ptp_helper_pid, NULL, 0);
2237 #else
2238       TerminateProcess (ptp_helper_pid, 1);
2239       WaitForSingleObject (ptp_helper_pid, INFINITE);
2240 #endif
2241       g_spawn_close_pid (ptp_helper_pid);
2242     }
2243     ptp_helper_pid = 0;
2244
2245     if (stdin_channel)
2246       g_io_channel_unref (stdin_channel);
2247     stdin_channel = NULL;
2248     if (stdout_channel)
2249       g_io_channel_unref (stdout_channel);
2250     stdout_channel = NULL;
2251
2252     if (main_loop && ptp_helper_thread) {
2253       g_main_loop_quit (main_loop);
2254       g_thread_join (ptp_helper_thread);
2255     }
2256     ptp_helper_thread = NULL;
2257     if (main_loop)
2258       g_main_loop_unref (main_loop);
2259     main_loop = NULL;
2260     if (main_context)
2261       g_main_context_unref (main_context);
2262     main_context = NULL;
2263
2264     if (delay_req_rand)
2265       g_rand_free (delay_req_rand);
2266     delay_req_rand = NULL;
2267
2268     if (observation_system_clock)
2269       gst_object_unref (observation_system_clock);
2270     observation_system_clock = NULL;
2271   }
2272
2273   g_mutex_unlock (&ptp_lock);
2274
2275   return ret;
2276 }
2277
2278 /**
2279  * gst_ptp_deinit:
2280  *
2281  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2282  * are any remaining GstPtpClock instances, they won't be further synchronized
2283  * to the PTP network clock.
2284  *
2285  * Since: 1.6
2286  */
2287 void
2288 gst_ptp_deinit (void)
2289 {
2290   GList *l, *m;
2291
2292   g_mutex_lock (&ptp_lock);
2293
2294   if (ptp_helper_pid) {
2295 #ifndef G_OS_WIN32
2296     kill (ptp_helper_pid, SIGKILL);
2297     waitpid (ptp_helper_pid, NULL, 0);
2298 #else
2299     TerminateProcess (ptp_helper_pid, 1);
2300     WaitForSingleObject (ptp_helper_pid, INFINITE);
2301 #endif
2302     g_spawn_close_pid (ptp_helper_pid);
2303   }
2304   ptp_helper_pid = 0;
2305
2306   if (stdin_channel)
2307     g_io_channel_unref (stdin_channel);
2308   stdin_channel = NULL;
2309   if (stdout_channel)
2310     g_io_channel_unref (stdout_channel);
2311   stdout_channel = NULL;
2312
2313   if (main_loop && ptp_helper_thread) {
2314     GThread *tmp = ptp_helper_thread;
2315     ptp_helper_thread = NULL;
2316     g_mutex_unlock (&ptp_lock);
2317     g_main_loop_quit (main_loop);
2318     g_thread_join (tmp);
2319     g_mutex_lock (&ptp_lock);
2320   }
2321   if (main_loop)
2322     g_main_loop_unref (main_loop);
2323   main_loop = NULL;
2324   if (main_context)
2325     g_main_context_unref (main_context);
2326   main_context = NULL;
2327
2328   if (delay_req_rand)
2329     g_rand_free (delay_req_rand);
2330   delay_req_rand = NULL;
2331   if (observation_system_clock)
2332     gst_object_unref (observation_system_clock);
2333   observation_system_clock = NULL;
2334
2335   for (l = domain_data; l; l = l->next) {
2336     PtpDomainData *domain = l->data;
2337
2338     for (m = domain->announce_senders; m; m = m->next) {
2339       PtpAnnounceSender *sender = m->data;
2340
2341       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2342       g_queue_clear (&sender->announce_messages);
2343       g_free (sender);
2344     }
2345     g_list_free (domain->announce_senders);
2346
2347     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2348         NULL);
2349     g_queue_clear (&domain->pending_syncs);
2350     gst_object_unref (domain->domain_clock);
2351     g_free (domain);
2352   }
2353   g_list_free (domain_data);
2354   domain_data = NULL;
2355   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2356   g_list_free (domain_clocks);
2357   domain_clocks = NULL;
2358
2359   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2360   ptp_clock_id.port_number = 0;
2361
2362   initted = FALSE;
2363
2364   g_mutex_unlock (&ptp_lock);
2365 }
2366
2367 #define DEFAULT_DOMAIN 0
2368
2369 enum
2370 {
2371   PROP_0,
2372   PROP_DOMAIN,
2373   PROP_INTERNAL_CLOCK,
2374   PROP_MASTER_CLOCK_ID,
2375   PROP_GRANDMASTER_CLOCK_ID
2376 };
2377
2378 struct _GstPtpClockPrivate
2379 {
2380   guint domain;
2381   GstClock *domain_clock;
2382   gulong domain_stats_id;
2383 };
2384
2385 #define gst_ptp_clock_parent_class parent_class
2386 G_DEFINE_TYPE_WITH_PRIVATE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2387
2388 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2389     const GValue * value, GParamSpec * pspec);
2390 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2391     GValue * value, GParamSpec * pspec);
2392 static void gst_ptp_clock_finalize (GObject * object);
2393
2394 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2395
2396 static void
2397 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2398 {
2399   GObjectClass *gobject_class;
2400   GstClockClass *clock_class;
2401
2402   gobject_class = G_OBJECT_CLASS (klass);
2403   clock_class = GST_CLOCK_CLASS (klass);
2404
2405   gobject_class->finalize = gst_ptp_clock_finalize;
2406   gobject_class->get_property = gst_ptp_clock_get_property;
2407   gobject_class->set_property = gst_ptp_clock_set_property;
2408
2409   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2410       g_param_spec_uint ("domain", "Domain",
2411           "The PTP domain", 0, G_MAXUINT8,
2412           DEFAULT_DOMAIN,
2413           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2414
2415   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2416       g_param_spec_object ("internal-clock", "Internal Clock",
2417           "Internal clock", GST_TYPE_CLOCK,
2418           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2419
2420   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
2421       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
2422           "Master Clock ID", 0, G_MAXUINT64, 0,
2423           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2424
2425   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
2426       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
2427           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
2428           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2429
2430   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2431 }
2432
2433 static void
2434 gst_ptp_clock_init (GstPtpClock * self)
2435 {
2436   GstPtpClockPrivate *priv;
2437
2438   self->priv = priv = gst_ptp_clock_get_instance_private (self);
2439
2440   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2441   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2442
2443   priv->domain = DEFAULT_DOMAIN;
2444 }
2445
2446 static gboolean
2447 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2448 {
2449   gboolean got_clock = TRUE;
2450
2451   if (G_UNLIKELY (!self->priv->domain_clock)) {
2452     g_mutex_lock (&domain_clocks_lock);
2453     if (!self->priv->domain_clock) {
2454       GList *l;
2455
2456       got_clock = FALSE;
2457
2458       for (l = domain_clocks; l; l = l->next) {
2459         PtpDomainData *clock_data = l->data;
2460
2461         if (clock_data->domain == self->priv->domain &&
2462             clock_data->have_master_clock && clock_data->last_ptp_time != 0) {
2463           GST_DEBUG ("Switching domain clock on domain %d", clock_data->domain);
2464           self->priv->domain_clock = clock_data->domain_clock;
2465           got_clock = TRUE;
2466           break;
2467         }
2468       }
2469     }
2470     g_mutex_unlock (&domain_clocks_lock);
2471     if (got_clock) {
2472       g_object_notify (G_OBJECT (self), "internal-clock");
2473       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2474     }
2475   }
2476
2477   return got_clock;
2478 }
2479
2480 static gboolean
2481 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2482     gpointer user_data)
2483 {
2484   GstPtpClock *self = user_data;
2485
2486   if (domain != self->priv->domain
2487       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2488     return TRUE;
2489
2490   /* Let's set our internal clock */
2491   if (!gst_ptp_clock_ensure_domain_clock (self))
2492     return TRUE;
2493
2494   self->priv->domain_stats_id = 0;
2495
2496   return FALSE;
2497 }
2498
2499 static void
2500 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2501     const GValue * value, GParamSpec * pspec)
2502 {
2503   GstPtpClock *self = GST_PTP_CLOCK (object);
2504
2505   switch (prop_id) {
2506     case PROP_DOMAIN:
2507       self->priv->domain = g_value_get_uint (value);
2508       gst_ptp_clock_ensure_domain_clock (self);
2509       if (!self->priv->domain_clock)
2510         self->priv->domain_stats_id =
2511             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2512             NULL);
2513       break;
2514     default:
2515       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2516       break;
2517   }
2518 }
2519
2520 static void
2521 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2522     GValue * value, GParamSpec * pspec)
2523 {
2524   GstPtpClock *self = GST_PTP_CLOCK (object);
2525
2526   switch (prop_id) {
2527     case PROP_DOMAIN:
2528       g_value_set_uint (value, self->priv->domain);
2529       break;
2530     case PROP_INTERNAL_CLOCK:
2531       gst_ptp_clock_ensure_domain_clock (self);
2532       g_value_set_object (value, self->priv->domain_clock);
2533       break;
2534     case PROP_MASTER_CLOCK_ID:
2535     case PROP_GRANDMASTER_CLOCK_ID:{
2536       GList *l;
2537
2538       g_mutex_lock (&domain_clocks_lock);
2539       g_value_set_uint64 (value, 0);
2540
2541       for (l = domain_clocks; l; l = l->next) {
2542         PtpDomainData *clock_data = l->data;
2543
2544         if (clock_data->domain == self->priv->domain) {
2545           if (prop_id == PROP_MASTER_CLOCK_ID)
2546             g_value_set_uint64 (value,
2547                 clock_data->master_clock_identity.clock_identity);
2548           else
2549             g_value_set_uint64 (value, clock_data->grandmaster_identity);
2550           break;
2551         }
2552       }
2553       g_mutex_unlock (&domain_clocks_lock);
2554       break;
2555     }
2556     default:
2557       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2558       break;
2559   }
2560 }
2561
2562 static void
2563 gst_ptp_clock_finalize (GObject * object)
2564 {
2565   GstPtpClock *self = GST_PTP_CLOCK (object);
2566
2567   if (self->priv->domain_stats_id)
2568     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2569
2570   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2571 }
2572
2573 static GstClockTime
2574 gst_ptp_clock_get_internal_time (GstClock * clock)
2575 {
2576   GstPtpClock *self = GST_PTP_CLOCK (clock);
2577
2578   gst_ptp_clock_ensure_domain_clock (self);
2579
2580   if (!self->priv->domain_clock) {
2581     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2582         self->priv->domain);
2583     return GST_CLOCK_TIME_NONE;
2584   }
2585
2586   return gst_clock_get_time (self->priv->domain_clock);
2587 }
2588
2589 /**
2590  * gst_ptp_clock_new:
2591  * @name: Name of the clock
2592  * @domain: PTP domain
2593  *
2594  * Creates a new PTP clock instance that exports the PTP time of the master
2595  * clock in @domain. This clock can be slaved to other clocks as needed.
2596  *
2597  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2598  * default parameters.
2599  *
2600  * This clock only returns valid timestamps after it received the first
2601  * times from the PTP master clock on the network. Once this happens the
2602  * GstPtpClock::internal-clock property will become non-NULL. You can
2603  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2604  * gst_clock_is_synced().
2605  *
2606  * Returns: (transfer full): A new #GstClock
2607  *
2608  * Since: 1.6
2609  */
2610 GstClock *
2611 gst_ptp_clock_new (const gchar * name, guint domain)
2612 {
2613   GstClock *clock;
2614
2615   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2616
2617   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2618     GST_ERROR ("Failed to initialize PTP");
2619     return NULL;
2620   }
2621
2622   clock = g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2623       NULL);
2624
2625   /* Clear floating flag */
2626   gst_object_ref_sink (clock);
2627
2628   return clock;
2629 }
2630
2631 typedef struct
2632 {
2633   guint8 domain;
2634   const GstStructure *stats;
2635 } DomainStatsMarshalData;
2636
2637 static void
2638 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2639 {
2640   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2641
2642   if (!callback (data->domain, data->stats, hook->data))
2643     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2644 }
2645
2646 static void
2647 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2648 {
2649   DomainStatsMarshalData data = { domain, stats };
2650
2651   g_mutex_lock (&ptp_lock);
2652   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2653       (GHookMarshaller) domain_stats_marshaller, &data);
2654   g_mutex_unlock (&ptp_lock);
2655 }
2656
2657 /**
2658  * gst_ptp_statistics_callback_add:
2659  * @callback: GstPtpStatisticsCallback to call
2660  * @user_data: Data to pass to the callback
2661  * @destroy_data: GDestroyNotify to destroy the data
2662  *
2663  * Installs a new statistics callback for gathering PTP statistics. See
2664  * GstPtpStatisticsCallback for a list of statistics that are provided.
2665  *
2666  * Returns: Id for the callback that can be passed to
2667  * gst_ptp_statistics_callback_remove()
2668  *
2669  * Since: 1.6
2670  */
2671 gulong
2672 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2673     gpointer user_data, GDestroyNotify destroy_data)
2674 {
2675   GHook *hook;
2676
2677   g_mutex_lock (&ptp_lock);
2678
2679   if (!domain_stats_hooks_initted) {
2680     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2681     domain_stats_hooks_initted = TRUE;
2682   }
2683
2684   hook = g_hook_alloc (&domain_stats_hooks);
2685   hook->func = callback;
2686   hook->data = user_data;
2687   hook->destroy = destroy_data;
2688   g_hook_prepend (&domain_stats_hooks, hook);
2689   g_atomic_int_add (&domain_stats_n_hooks, 1);
2690
2691   g_mutex_unlock (&ptp_lock);
2692
2693   return hook->hook_id;
2694 }
2695
2696 /**
2697  * gst_ptp_statistics_callback_remove:
2698  * @id: Callback id to remove
2699  *
2700  * Removes a PTP statistics callback that was previously added with
2701  * gst_ptp_statistics_callback_add().
2702  *
2703  * Since: 1.6
2704  */
2705 void
2706 gst_ptp_statistics_callback_remove (gulong id)
2707 {
2708   g_mutex_lock (&ptp_lock);
2709   if (g_hook_destroy (&domain_stats_hooks, id))
2710     g_atomic_int_add (&domain_stats_n_hooks, -1);
2711   g_mutex_unlock (&ptp_lock);
2712 }