ptpclock: Fix documentation a bit
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @short_description: Special clock that synchronizes to a remote time
23  *                     provider via PTP (IEEE1588:2008).
24  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
25  *
26  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
27  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
28  * clock in some specific domain.
29  *
30  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
31  * a helper process to do the actual communication via the PTP ports. This is
32  * required as PTP listens on ports < 1024 and thus requires special
33  * privileges. Once this helper process is started, the main process will
34  * synchronize to all PTP domains that are detected on the selected
35  * interfaces.
36  *
37  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
38  * time from a master clock inside a specific PTP domain. This clock will only
39  * return valid timestamps once the timestamps in the PTP domain are known. To
40  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
41  * signal and gst_clock_is_synced().
42  *
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #ifdef HAVE_PTP
59
60 #include "gstptp_private.h"
61
62 #include <sys/wait.h>
63 #include <sys/types.h>
64 #include <unistd.h>
65
66 #include <gst/base/base.h>
67
68 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
69 #define GST_CAT_DEFAULT (ptp_debug)
70
71 /* IEEE 1588 7.7.3.1 */
72 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
73
74 /* Use a running average for calculating the mean path delay instead
75  * of just using the last measurement. Enabling this helps in unreliable
76  * networks, like wifi, with often changing delays
77  *
78  * Undef for following IEEE1588-2008 by the letter
79  */
80 #define USE_RUNNING_AVERAGE_DELAY 1
81
82 /* Filter out any measurements that are above a certain threshold compared to
83  * previous measurements. Enabling this helps filtering out outliers that
84  * happen fairly often in unreliable networks, like wifi.
85  *
86  * Undef for following IEEE1588-2008 by the letter
87  */
88 #define USE_MEASUREMENT_FILTERING 1
89
90 /* Select the first clock from which we capture a SYNC message as the master
91  * clock of the domain until we are ready to run the best master clock
92  * algorithm. This allows faster syncing but might mean a change of the master
93  * clock in the beginning. As all clocks in a domain are supposed to use the
94  * same time, this shouldn't be much of a problem.
95  *
96  * Undef for following IEEE1588-2008 by the letter
97  */
98 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
99
100 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
101  * afterwards. This allows better synchronization in networks with varying
102  * delays, as for every other SYNC message we would have to assume that it's
103  * the average of what we saw before. But that might be completely off
104  */
105 #define USE_ONLY_SYNC_WITH_DELAY 1
106
107 /* Filter out delay measurements that are too far away from the median of the
108  * last delay measurements, currently those that are more than 2 times as big.
109  * This increases accuracy a lot on wifi.
110  */
111 #define USE_MEDIAN_PRE_FILTERING 1
112 #define MEDIAN_PRE_FILTERING_WINDOW 9
113
114 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
115 #define MAX_SKIPPED_UPDATES 5
116
117 typedef enum
118 {
119   PTP_MESSAGE_TYPE_SYNC = 0x0,
120   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
121   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
122   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
123   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
124   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
125   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
126   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
127   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
128   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
129 } PtpMessageType;
130
131 typedef struct
132 {
133   guint64 seconds_field;        /* 48 bits valid */
134   guint32 nanoseconds_field;
135 } PtpTimestamp;
136
137 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
138 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
139 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
140
141 typedef struct
142 {
143   guint64 clock_identity;
144   guint16 port_number;
145 } PtpClockIdentity;
146
147 static gint
148 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
149 {
150   if (a->clock_identity < b->clock_identity)
151     return -1;
152   else if (a->clock_identity > b->clock_identity)
153     return 1;
154
155   if (a->port_number < b->port_number)
156     return -1;
157   else if (a->port_number > b->port_number)
158     return 1;
159
160   return 0;
161 }
162
163 typedef struct
164 {
165   guint8 clock_class;
166   guint8 clock_accuracy;
167   guint16 offset_scaled_log_variance;
168 } PtpClockQuality;
169
170 typedef struct
171 {
172   guint8 transport_specific;
173   PtpMessageType message_type;
174   /* guint8 reserved; */
175   guint8 version_ptp;
176   guint16 message_length;
177   guint8 domain_number;
178   /* guint8 reserved; */
179   guint16 flag_field;
180   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
181   /* guint32 reserved; */
182   PtpClockIdentity source_port_identity;
183   guint16 sequence_id;
184   guint8 control_field;
185   gint8 log_message_interval;
186
187   union
188   {
189     struct
190     {
191       PtpTimestamp origin_timestamp;
192       gint16 current_utc_offset;
193       /* guint8 reserved; */
194       guint8 grandmaster_priority_1;
195       PtpClockQuality grandmaster_clock_quality;
196       guint8 grandmaster_priority_2;
197       guint64 grandmaster_identity;
198       guint16 steps_removed;
199       guint8 time_source;
200     } announce;
201
202     struct
203     {
204       PtpTimestamp origin_timestamp;
205     } sync;
206
207     struct
208     {
209       PtpTimestamp precise_origin_timestamp;
210     } follow_up;
211
212     struct
213     {
214       PtpTimestamp origin_timestamp;
215     } delay_req;
216
217     struct
218     {
219       PtpTimestamp receive_timestamp;
220       PtpClockIdentity requesting_port_identity;
221     } delay_resp;
222
223   } message_specific;
224 } PtpMessage;
225
226 static GMutex ptp_lock;
227 static GCond ptp_cond;
228 static gboolean initted = FALSE;
229 static gboolean supported = TRUE;
230 static GPid ptp_helper_pid;
231 static GThread *ptp_helper_thread;
232 static GMainContext *main_context;
233 static GMainLoop *main_loop;
234 static GIOChannel *stdin_channel, *stdout_channel;
235 static GRand *delay_req_rand;
236 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
237
238 typedef struct
239 {
240   GstClockTime receive_time;
241
242   PtpClockIdentity master_clock_identity;
243
244   guint8 grandmaster_priority_1;
245   PtpClockQuality grandmaster_clock_quality;
246   guint8 grandmaster_priority_2;
247   guint64 grandmaster_identity;
248   guint16 steps_removed;
249   guint8 time_source;
250
251   guint16 sequence_id;
252 } PtpAnnounceMessage;
253
254 typedef struct
255 {
256   PtpClockIdentity master_clock_identity;
257
258   GstClockTime announce_interval;       /* last interval we received */
259   GQueue announce_messages;
260 } PtpAnnounceSender;
261
262 typedef struct
263 {
264   guint domain;
265   PtpClockIdentity master_clock_identity;
266
267   guint16 sync_seqnum;
268   GstClockTime sync_recv_time_local;    /* t2 */
269   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
270   GstClockTime follow_up_recv_time_local;
271
272   GSource *timeout_source;
273   guint16 delay_req_seqnum;
274   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
275   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
276   GstClockTime delay_resp_recv_time_local;
277
278   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
279   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
280 } PtpPendingSync;
281
282 static void
283 ptp_pending_sync_free (PtpPendingSync * sync)
284 {
285   if (sync->timeout_source)
286     g_source_destroy (sync->timeout_source);
287   g_free (sync);
288 }
289
290 typedef struct
291 {
292   guint domain;
293
294   GstClockTime last_ptp_time;
295   GstClockTime last_local_time;
296   gint skipped_updates;
297
298   /* Used for selecting the master/grandmaster */
299   GList *announce_senders;
300
301   /* Last selected master clock */
302   gboolean have_master_clock;
303   PtpClockIdentity master_clock_identity;
304   guint64 grandmaster_identity;
305
306   /* Last SYNC or FOLLOW_UP timestamp we received */
307   GstClockTime last_ptp_sync_time;
308   GstClockTime sync_interval;
309
310   GstClockTime mean_path_delay;
311   GstClockTime last_delay_req, min_delay_req_interval;
312   guint16 last_delay_req_seqnum;
313
314   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
315   gint last_path_delays_missing;
316
317   GQueue pending_syncs;
318
319   GstClock *domain_clock;
320 } PtpDomainData;
321
322 static GList *domain_data;
323 static GMutex domain_clocks_lock;
324 static GList *domain_clocks;
325
326 /* Protected by PTP lock */
327 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
328 static GHookList domain_stats_hooks;
329 static gint domain_stats_n_hooks;
330 static gboolean domain_stats_hooks_initted = FALSE;
331
332 /* Converts log2 seconds to GstClockTime */
333 static GstClockTime
334 log2_to_clock_time (gint l)
335 {
336   if (l < 0)
337     return GST_SECOND >> (-l);
338   else
339     return GST_SECOND << l;
340 }
341
342 static void
343 dump_ptp_message (PtpMessage * msg)
344 {
345   GST_TRACE ("PTP message:");
346   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
347   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
348   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
349   GST_TRACE ("\tmessage_length: %u", msg->message_length);
350   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
351   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
352   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
353       (msg->correction_field / 65536),
354       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
355   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
356       msg->source_port_identity.clock_identity,
357       msg->source_port_identity.port_number);
358   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
359   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
360   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
361       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
362
363   switch (msg->message_type) {
364     case PTP_MESSAGE_TYPE_ANNOUNCE:
365       GST_TRACE ("\tANNOUNCE:");
366       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
367           msg->message_specific.announce.origin_timestamp.seconds_field,
368           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
369       GST_TRACE ("\t\tcurrent_utc_offset: %d",
370           msg->message_specific.announce.current_utc_offset);
371       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
372           msg->message_specific.announce.grandmaster_priority_1);
373       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
374           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
375           msg->message_specific.announce.
376           grandmaster_clock_quality.clock_accuracy,
377           msg->message_specific.announce.
378           grandmaster_clock_quality.offset_scaled_log_variance);
379       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
380           msg->message_specific.announce.grandmaster_priority_2);
381       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
382           msg->message_specific.announce.grandmaster_identity);
383       GST_TRACE ("\t\tsteps_removed: %u",
384           msg->message_specific.announce.steps_removed);
385       GST_TRACE ("\t\ttime_source: 0x%02x",
386           msg->message_specific.announce.time_source);
387       break;
388     case PTP_MESSAGE_TYPE_SYNC:
389       GST_TRACE ("\tSYNC:");
390       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
391           msg->message_specific.sync.origin_timestamp.seconds_field,
392           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
393       break;
394     case PTP_MESSAGE_TYPE_FOLLOW_UP:
395       GST_TRACE ("\tFOLLOW_UP:");
396       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
397           msg->message_specific.follow_up.
398           precise_origin_timestamp.seconds_field,
399           msg->message_specific.follow_up.
400           precise_origin_timestamp.nanoseconds_field);
401       break;
402     case PTP_MESSAGE_TYPE_DELAY_REQ:
403       GST_TRACE ("\tDELAY_REQ:");
404       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
405           msg->message_specific.delay_req.origin_timestamp.seconds_field,
406           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
407       break;
408     case PTP_MESSAGE_TYPE_DELAY_RESP:
409       GST_TRACE ("\tDELAY_RESP:");
410       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
411           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
412           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
413       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
414           "x %u",
415           msg->message_specific.delay_resp.
416           requesting_port_identity.clock_identity,
417           msg->message_specific.delay_resp.
418           requesting_port_identity.port_number);
419       break;
420     default:
421       break;
422   }
423   GST_TRACE (" ");
424 }
425
426 /* IEEE 1588-2008 5.3.3 */
427 static gboolean
428 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
429 {
430   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
431
432   timestamp->seconds_field =
433       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
434       gst_byte_reader_get_uint16_be_unchecked (reader);
435   timestamp->nanoseconds_field =
436       gst_byte_reader_get_uint32_be_unchecked (reader);
437
438   if (timestamp->nanoseconds_field >= 1000000000)
439     return FALSE;
440
441   return TRUE;
442 }
443
444 /* IEEE 1588-2008 13.3 */
445 static gboolean
446 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
447 {
448   guint8 b;
449
450   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
451
452   b = gst_byte_reader_get_uint8_unchecked (reader);
453   msg->transport_specific = b >> 4;
454   msg->message_type = b & 0x0f;
455
456   b = gst_byte_reader_get_uint8_unchecked (reader);
457   msg->version_ptp = b & 0x0f;
458   if (msg->version_ptp != 2) {
459     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
460     return FALSE;
461   }
462
463   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
464   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
465     GST_WARNING ("Not enough data (%u < %u)",
466         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
467     return FALSE;
468   }
469
470   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
471   gst_byte_reader_skip_unchecked (reader, 1);
472
473   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
474   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
475   gst_byte_reader_skip_unchecked (reader, 4);
476
477   msg->source_port_identity.clock_identity =
478       gst_byte_reader_get_uint64_be_unchecked (reader);
479   msg->source_port_identity.port_number =
480       gst_byte_reader_get_uint16_be_unchecked (reader);
481
482   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
483   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
484   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
485
486   return TRUE;
487 }
488
489 /* IEEE 1588-2008 13.5 */
490 static gboolean
491 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
492 {
493   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
494
495   if (gst_byte_reader_get_remaining (reader) < 20)
496     return FALSE;
497
498   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
499           reader))
500     return FALSE;
501
502   msg->message_specific.announce.current_utc_offset =
503       gst_byte_reader_get_uint16_be_unchecked (reader);
504   gst_byte_reader_skip_unchecked (reader, 1);
505
506   msg->message_specific.announce.grandmaster_priority_1 =
507       gst_byte_reader_get_uint8_unchecked (reader);
508   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
509       gst_byte_reader_get_uint8_unchecked (reader);
510   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
511       gst_byte_reader_get_uint8_unchecked (reader);
512   msg->message_specific.announce.
513       grandmaster_clock_quality.offset_scaled_log_variance =
514       gst_byte_reader_get_uint16_be_unchecked (reader);
515   msg->message_specific.announce.grandmaster_priority_2 =
516       gst_byte_reader_get_uint8_unchecked (reader);
517   msg->message_specific.announce.grandmaster_identity =
518       gst_byte_reader_get_uint64_be_unchecked (reader);
519   msg->message_specific.announce.steps_removed =
520       gst_byte_reader_get_uint16_be_unchecked (reader);
521   msg->message_specific.announce.time_source =
522       gst_byte_reader_get_uint8_unchecked (reader);
523
524   return TRUE;
525 }
526
527 /* IEEE 1588-2008 13.6 */
528 static gboolean
529 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
530 {
531   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
532
533   if (gst_byte_reader_get_remaining (reader) < 10)
534     return FALSE;
535
536   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
537           reader))
538     return FALSE;
539
540   return TRUE;
541 }
542
543 /* IEEE 1588-2008 13.6 */
544 static gboolean
545 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
546 {
547   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
548
549   if (gst_byte_reader_get_remaining (reader) < 10)
550     return FALSE;
551
552   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
553           reader))
554     return FALSE;
555
556   return TRUE;
557 }
558
559 /* IEEE 1588-2008 13.7 */
560 static gboolean
561 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
562 {
563   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
564
565   if (gst_byte_reader_get_remaining (reader) < 10)
566     return FALSE;
567
568   if (!parse_ptp_timestamp (&msg->message_specific.
569           follow_up.precise_origin_timestamp, reader))
570     return FALSE;
571
572   return TRUE;
573 }
574
575 /* IEEE 1588-2008 13.8 */
576 static gboolean
577 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
578 {
579   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
580       FALSE);
581
582   if (gst_byte_reader_get_remaining (reader) < 20)
583     return FALSE;
584
585   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
586           reader))
587     return FALSE;
588
589   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
590       gst_byte_reader_get_uint64_be_unchecked (reader);
591   msg->message_specific.delay_resp.requesting_port_identity.port_number =
592       gst_byte_reader_get_uint16_be_unchecked (reader);
593
594   return TRUE;
595 }
596
597 static gboolean
598 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
599 {
600   GstByteReader reader;
601   gboolean ret = FALSE;
602
603   gst_byte_reader_init (&reader, data, size);
604
605   if (!parse_ptp_message_header (msg, &reader)) {
606     GST_WARNING ("Failed to parse PTP message header");
607     return FALSE;
608   }
609
610   switch (msg->message_type) {
611     case PTP_MESSAGE_TYPE_SYNC:
612       ret = parse_ptp_message_sync (msg, &reader);
613       break;
614     case PTP_MESSAGE_TYPE_FOLLOW_UP:
615       ret = parse_ptp_message_follow_up (msg, &reader);
616       break;
617     case PTP_MESSAGE_TYPE_DELAY_REQ:
618       ret = parse_ptp_message_delay_req (msg, &reader);
619       break;
620     case PTP_MESSAGE_TYPE_DELAY_RESP:
621       ret = parse_ptp_message_delay_resp (msg, &reader);
622       break;
623     case PTP_MESSAGE_TYPE_ANNOUNCE:
624       ret = parse_ptp_message_announce (msg, &reader);
625       break;
626     default:
627       /* ignore for now */
628       break;
629   }
630
631   return ret;
632 }
633
634 static gint
635 compare_announce_message (const PtpAnnounceMessage * a,
636     const PtpAnnounceMessage * b)
637 {
638   /* IEEE 1588 Figure 27 */
639   if (a->grandmaster_identity == b->grandmaster_identity) {
640     if (a->steps_removed + 1 < b->steps_removed)
641       return -1;
642     else if (a->steps_removed > b->steps_removed + 1)
643       return 1;
644
645     /* Error cases are filtered out earlier */
646     if (a->steps_removed < b->steps_removed)
647       return -1;
648     else if (a->steps_removed > b->steps_removed)
649       return 1;
650
651     /* Error cases are filtered out earlier */
652     if (a->master_clock_identity.clock_identity <
653         b->master_clock_identity.clock_identity)
654       return -1;
655     else if (a->master_clock_identity.clock_identity >
656         b->master_clock_identity.clock_identity)
657       return 1;
658
659     /* Error cases are filtered out earlier */
660     if (a->master_clock_identity.port_number <
661         b->master_clock_identity.port_number)
662       return -1;
663     else if (a->master_clock_identity.port_number >
664         b->master_clock_identity.port_number)
665       return 1;
666     else
667       g_assert_not_reached ();
668
669     return 0;
670   }
671
672   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
673     return -1;
674   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
675     return 1;
676
677   if (a->grandmaster_clock_quality.clock_class <
678       b->grandmaster_clock_quality.clock_class)
679     return -1;
680   else if (a->grandmaster_clock_quality.clock_class >
681       b->grandmaster_clock_quality.clock_class)
682     return 1;
683
684   if (a->grandmaster_clock_quality.clock_accuracy <
685       b->grandmaster_clock_quality.clock_accuracy)
686     return -1;
687   else if (a->grandmaster_clock_quality.clock_accuracy >
688       b->grandmaster_clock_quality.clock_accuracy)
689     return 1;
690
691   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
692       b->grandmaster_clock_quality.offset_scaled_log_variance)
693     return -1;
694   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
695       b->grandmaster_clock_quality.offset_scaled_log_variance)
696     return 1;
697
698   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
699     return -1;
700   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
701     return 1;
702
703   if (a->grandmaster_identity < b->grandmaster_identity)
704     return -1;
705   else if (a->grandmaster_identity > b->grandmaster_identity)
706     return 1;
707   else
708     g_assert_not_reached ();
709
710   return 0;
711 }
712
713 static void
714 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
715 {
716   GList *qualified_messages = NULL;
717   GList *l, *m;
718   PtpAnnounceMessage *best = NULL;
719
720   /* IEEE 1588 9.3.2.5 */
721   for (l = domain->announce_senders; l; l = l->next) {
722     PtpAnnounceSender *sender = l->data;
723     GstClockTime window = 4 * sender->announce_interval;
724     gint count = 0;
725
726     for (m = sender->announce_messages.head; m; m = m->next) {
727       PtpAnnounceMessage *msg = m->data;
728
729       if (now - msg->receive_time <= window)
730         count++;
731     }
732
733     /* Only include the newest message of announce senders that had at least 2
734      * announce messages in the last 4 announce intervals. Which also means
735      * that we wait at least 4 announce intervals before we select a master
736      * clock. Until then we just report based on the newest SYNC we received
737      */
738     if (count >= 2) {
739       qualified_messages =
740           g_list_prepend (qualified_messages,
741           g_queue_peek_tail (&sender->announce_messages));
742     }
743   }
744
745   if (!qualified_messages) {
746     GST_DEBUG
747         ("No qualified announce messages for domain %u, can't select a master clock",
748         domain->domain);
749     domain->have_master_clock = FALSE;
750     return;
751   }
752
753   for (l = qualified_messages; l; l = l->next) {
754     PtpAnnounceMessage *msg = l->data;
755
756     if (!best || compare_announce_message (msg, best) < 0)
757       best = msg;
758   }
759
760   if (domain->have_master_clock
761       && compare_clock_identity (&domain->master_clock_identity,
762           &best->master_clock_identity) == 0) {
763     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
764   } else {
765     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
766         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
767         domain->domain, best->master_clock_identity.clock_identity,
768         best->master_clock_identity.port_number, best->grandmaster_identity);
769
770     domain->have_master_clock = TRUE;
771     domain->grandmaster_identity = best->grandmaster_identity;
772
773     /* Opportunistic master clock selection likely gave us the same master
774      * clock before, no need to reset all statistics */
775     if (compare_clock_identity (&domain->master_clock_identity,
776             &best->master_clock_identity) != 0) {
777       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
778           sizeof (PtpClockIdentity));
779       domain->mean_path_delay = 0;
780       domain->last_delay_req = 0;
781       domain->last_path_delays_missing = 9;
782       domain->min_delay_req_interval = 0;
783       domain->sync_interval = 0;
784       domain->last_ptp_sync_time = 0;
785       domain->skipped_updates = 0;
786       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
787           NULL);
788       g_queue_clear (&domain->pending_syncs);
789     }
790
791     if (g_atomic_int_get (&domain_stats_n_hooks)) {
792       GstStructure *stats =
793           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
794           "domain", G_TYPE_UINT, domain->domain,
795           "master-clock-id", G_TYPE_UINT64,
796           domain->master_clock_identity.clock_identity,
797           "master-clock-port", G_TYPE_UINT,
798           domain->master_clock_identity.port_number,
799           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
800           NULL);
801       emit_ptp_statistics (domain->domain, stats);
802       gst_structure_free (stats);
803     }
804   }
805 }
806
807 static void
808 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
809 {
810   GList *l;
811   PtpDomainData *domain = NULL;
812   PtpAnnounceSender *sender = NULL;
813   PtpAnnounceMessage *announce;
814
815   /* IEEE1588 9.3.2.2 e)
816    * Don't consider messages with the alternate master flag set
817    */
818   if ((msg->flag_field & 0x0100))
819     return;
820
821   /* IEEE 1588 9.3.2.5 d)
822    * Don't consider announce messages with steps_removed>=255
823    */
824   if (msg->message_specific.announce.steps_removed >= 255)
825     return;
826
827   for (l = domain_data; l; l = l->next) {
828     PtpDomainData *tmp = l->data;
829
830     if (tmp->domain == msg->domain_number) {
831       domain = tmp;
832       break;
833     }
834   }
835
836   if (!domain) {
837     gchar *clock_name;
838
839     domain = g_new0 (PtpDomainData, 1);
840     domain->domain = msg->domain_number;
841     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
842     domain->domain_clock =
843         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
844     g_free (clock_name);
845     g_queue_init (&domain->pending_syncs);
846     domain->last_path_delays_missing = 9;
847     domain_data = g_list_prepend (domain_data, domain);
848
849     g_mutex_lock (&domain_clocks_lock);
850     domain_clocks = g_list_prepend (domain_clocks, domain);
851     g_mutex_unlock (&domain_clocks_lock);
852
853     if (g_atomic_int_get (&domain_stats_n_hooks)) {
854       GstStructure *stats =
855           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
856           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
857           domain->domain_clock, NULL);
858       emit_ptp_statistics (domain->domain, stats);
859       gst_structure_free (stats);
860     }
861   }
862
863   for (l = domain->announce_senders; l; l = l->next) {
864     PtpAnnounceSender *tmp = l->data;
865
866     if (compare_clock_identity (&tmp->master_clock_identity,
867             &msg->source_port_identity) == 0) {
868       sender = tmp;
869       break;
870     }
871   }
872
873   if (!sender) {
874     sender = g_new0 (PtpAnnounceSender, 1);
875
876     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
877         sizeof (PtpClockIdentity));
878     g_queue_init (&sender->announce_messages);
879     domain->announce_senders =
880         g_list_prepend (domain->announce_senders, sender);
881   }
882
883   for (l = sender->announce_messages.head; l; l = l->next) {
884     PtpAnnounceMessage *tmp = l->data;
885
886     /* IEEE 1588 9.3.2.5 c)
887      * Don't consider identical messages, i.e. duplicates
888      */
889     if (tmp->sequence_id == msg->sequence_id)
890       return;
891   }
892
893   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
894
895   announce = g_new0 (PtpAnnounceMessage, 1);
896   announce->receive_time = receive_time;
897   announce->sequence_id = msg->sequence_id;
898   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
899       sizeof (PtpClockIdentity));
900   announce->grandmaster_identity =
901       msg->message_specific.announce.grandmaster_identity;
902   announce->grandmaster_priority_1 =
903       msg->message_specific.announce.grandmaster_priority_1;
904   announce->grandmaster_clock_quality.clock_class =
905       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
906   announce->grandmaster_clock_quality.clock_accuracy =
907       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
908   announce->grandmaster_clock_quality.offset_scaled_log_variance =
909       msg->message_specific.announce.
910       grandmaster_clock_quality.offset_scaled_log_variance;
911   announce->grandmaster_priority_2 =
912       msg->message_specific.announce.grandmaster_priority_2;
913   announce->steps_removed = msg->message_specific.announce.steps_removed;
914   announce->time_source = msg->message_specific.announce.time_source;
915   g_queue_push_tail (&sender->announce_messages, announce);
916
917   select_best_master_clock (domain, receive_time);
918 }
919
920 static gboolean
921 send_delay_req_timeout (PtpPendingSync * sync)
922 {
923   StdIOHeader header = { 0, };
924   guint8 delay_req[44];
925   GstByteWriter writer;
926   GIOStatus status;
927   gsize written;
928   GError *err = NULL;
929
930   header.type = TYPE_EVENT;
931   header.size = 44;
932
933   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
934   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
935   gst_byte_writer_put_uint8_unchecked (&writer, 2);
936   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
937   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
938   gst_byte_writer_put_uint8_unchecked (&writer, 0);
939   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
940   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
941   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
942   gst_byte_writer_put_uint64_be_unchecked (&writer,
943       ptp_clock_id.clock_identity);
944   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
945   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
946   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
947   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
948   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
949   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
950
951   status =
952       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
953       sizeof (header), &written, &err);
954   if (status == G_IO_STATUS_ERROR) {
955     g_warning ("Failed to write to stdout: %s", err->message);
956     return G_SOURCE_REMOVE;
957   } else if (status == G_IO_STATUS_EOF) {
958     g_message ("EOF on stdout");
959     g_main_loop_quit (main_loop);
960     return G_SOURCE_REMOVE;
961   } else if (status != G_IO_STATUS_NORMAL) {
962     g_warning ("Unexpected stdout write status: %d", status);
963     g_main_loop_quit (main_loop);
964     return G_SOURCE_REMOVE;
965   } else if (written != sizeof (header)) {
966     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
967     g_main_loop_quit (main_loop);
968     return G_SOURCE_REMOVE;
969   }
970
971   sync->delay_req_send_time_local = gst_util_get_timestamp ();
972
973   status =
974       g_io_channel_write_chars (stdout_channel,
975       (const gchar *) delay_req, 44, &written, &err);
976   if (status == G_IO_STATUS_ERROR) {
977     g_warning ("Failed to write to stdout: %s", err->message);
978     g_main_loop_quit (main_loop);
979     return G_SOURCE_REMOVE;
980   } else if (status == G_IO_STATUS_EOF) {
981     g_message ("EOF on stdout");
982     g_main_loop_quit (main_loop);
983     return G_SOURCE_REMOVE;
984   } else if (status != G_IO_STATUS_NORMAL) {
985     g_warning ("Unexpected stdout write status: %d", status);
986     g_main_loop_quit (main_loop);
987     return G_SOURCE_REMOVE;
988   } else if (written != 44) {
989     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
990     g_main_loop_quit (main_loop);
991     return G_SOURCE_REMOVE;
992   }
993
994   return G_SOURCE_REMOVE;
995 }
996
997 static gboolean
998 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
999 {
1000   GstClockTime now = gst_util_get_timestamp ();
1001   guint timeout;
1002   GSource *timeout_source;
1003
1004   if (domain->last_delay_req != 0
1005       && domain->last_delay_req + domain->min_delay_req_interval > now)
1006     return FALSE;
1007
1008   domain->last_delay_req = now;
1009   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1010
1011   /* IEEE 1588 9.5.11.2 */
1012   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1013     timeout = 0;
1014   else
1015     timeout =
1016         g_rand_int_range (delay_req_rand, 0,
1017         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1018
1019   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1020   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1021   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1022       sync, NULL);
1023   g_source_attach (timeout_source, main_context);
1024
1025   return TRUE;
1026 }
1027
1028 /* Filtering of outliers for RTT and time calculations inspired
1029  * by the code from gstnetclientclock.c
1030  */
1031 static void
1032 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1033 {
1034   GstClockTime internal_time, external_time, rate_num, rate_den;
1035   GstClockTime corrected_ptp_time, corrected_local_time;
1036   gdouble r_squared = 0.0;
1037   gboolean synced;
1038   GstClockTimeDiff discont = 0;
1039   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1040 #ifdef USE_MEASUREMENT_FILTERING
1041   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1042       orig_rate_den;
1043   GstClockTime new_estimated_ptp_time;
1044   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1045   gboolean now_synced;
1046 #endif
1047
1048 #ifdef USE_ONLY_SYNC_WITH_DELAY
1049   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1050     return;
1051 #endif
1052
1053   /* IEEE 1588 11.2 */
1054   corrected_ptp_time =
1055       sync->sync_send_time_remote +
1056       (sync->correction_field_sync + 32768) / 65536;
1057   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1058
1059 #ifdef USE_MEASUREMENT_FILTERING
1060   /* We check this here and when updating the mean path delay, because
1061    * we can get here without a delay response too */
1062   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1063       && sync->follow_up_recv_time_local >
1064       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1065     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1066         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1067         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1068         GST_TIME_ARGS (domain->mean_path_delay));
1069     synced = FALSE;
1070     goto out;
1071   }
1072 #endif
1073
1074   /* Set an initial local-remote relation */
1075   if (domain->last_ptp_time == 0)
1076     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1077         corrected_ptp_time, 1, 1);
1078
1079 #ifdef USE_MEASUREMENT_FILTERING
1080   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1081    * estimate with our present knowledge about the clock
1082    */
1083   /* Store what the clock produced as 'now' before this update */
1084   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1085       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1086   internal_time = orig_internal_time;
1087   external_time = orig_external_time;
1088   rate_num = orig_rate_num;
1089   rate_den = orig_rate_den;
1090
1091   /* 3/4 RTT window around the estimation */
1092   max_discont = domain->mean_path_delay * 3 / 2;
1093
1094   /* Check if the estimated sync time is inside our window */
1095   estimated_ptp_time_min = corrected_local_time - max_discont;
1096   estimated_ptp_time_min =
1097       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1098       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1099   estimated_ptp_time_max = corrected_local_time + max_discont;
1100   estimated_ptp_time_max =
1101       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1102       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1103
1104   synced = (estimated_ptp_time_min < corrected_ptp_time
1105       && corrected_ptp_time < estimated_ptp_time_max);
1106
1107   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1108       GST_TIME_FORMAT, domain->domain,
1109       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1110
1111   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1112       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1113       GST_TIME_ARGS (corrected_ptp_time),
1114       GST_TIME_ARGS (estimated_ptp_time_max));
1115
1116   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1117           corrected_local_time, corrected_ptp_time, &r_squared,
1118           &internal_time, &external_time, &rate_num, &rate_den)) {
1119     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1120
1121     /* Old estimated PTP time based on receive time and path delay */
1122     estimated_ptp_time = corrected_local_time;
1123     estimated_ptp_time =
1124         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1125         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1126         orig_external_time, orig_rate_num, orig_rate_den);
1127
1128     /* New estimated PTP time based on receive time and path delay */
1129     new_estimated_ptp_time = corrected_local_time;
1130     new_estimated_ptp_time =
1131         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1132         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1133         external_time, rate_num, rate_den);
1134
1135     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1136     if (synced && ABS (discont) > max_discont) {
1137       GstClockTimeDiff offset;
1138       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1139           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1140           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1141           GST_TIME_ARGS (max_discont));
1142       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1143         offset = max_discont - discont;
1144         if (-offset > external_time)
1145           external_time = 0;
1146         else
1147           external_time += offset;
1148       } else {                  /* Too large a backward step - add a +ve offset */
1149         offset = -(max_discont + discont);
1150         external_time += offset;
1151       }
1152
1153       discont += offset;
1154     } else {
1155       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1156           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1157           GST_TIME_ARGS (max_discont));
1158     }
1159
1160     /* Check if the estimated sync time is now (still) inside our window */
1161     estimated_ptp_time_min = corrected_local_time - max_discont;
1162     estimated_ptp_time_min =
1163         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1164         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1165         external_time, rate_num, rate_den);
1166     estimated_ptp_time_max = corrected_local_time + max_discont;
1167     estimated_ptp_time_max =
1168         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1169         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1170         external_time, rate_num, rate_den);
1171
1172     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1173         && corrected_ptp_time < estimated_ptp_time_max);
1174
1175     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1176         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1177         GST_TIME_ARGS (corrected_ptp_time),
1178         GST_TIME_ARGS (estimated_ptp_time_max));
1179
1180     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1181       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1182           internal_time, external_time, rate_num, rate_den);
1183       domain->skipped_updates = 0;
1184
1185       domain->last_ptp_time = corrected_ptp_time;
1186       domain->last_local_time = corrected_local_time;
1187     } else {
1188       domain->skipped_updates++;
1189     }
1190   } else {
1191     domain->last_ptp_time = corrected_ptp_time;
1192     domain->last_local_time = corrected_local_time;
1193   }
1194
1195 #else
1196   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1197       GST_TIME_FORMAT, domain->domain,
1198       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1199
1200   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1201       &internal_time, &external_time, &rate_num, &rate_den);
1202
1203   estimated_ptp_time = corrected_local_time;
1204   estimated_ptp_time =
1205       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1206       (domain->domain_clock), estimated_ptp_time, internal_time,
1207       external_time, rate_num, rate_den);
1208
1209   gst_clock_add_observation (domain->domain_clock,
1210       corrected_local_time, corrected_ptp_time, &r_squared);
1211
1212   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1213       &internal_time, &external_time, &rate_num, &rate_den);
1214
1215   synced = TRUE;
1216   domain->last_ptp_time = corrected_ptp_time;
1217   domain->last_local_time = corrected_local_time;
1218 #endif
1219
1220 #ifdef USE_MEASUREMENT_FILTERING
1221 out:
1222 #endif
1223   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1224     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1225         "domain", G_TYPE_UINT, domain->domain,
1226         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1227         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1228         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1229         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1230         "discontinuity", G_TYPE_INT64, discont,
1231         "synced", G_TYPE_BOOLEAN, synced,
1232         "r-squared", G_TYPE_DOUBLE, r_squared,
1233         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1234         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1235         "rate-num", G_TYPE_UINT64, rate_num,
1236         "rate-den", G_TYPE_UINT64, rate_den,
1237         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1238         NULL);
1239     emit_ptp_statistics (domain->domain, stats);
1240     gst_structure_free (stats);
1241   }
1242
1243 }
1244
1245 #ifdef USE_MEDIAN_PRE_FILTERING
1246 static gint
1247 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1248 {
1249   if (*a < *b)
1250     return -1;
1251   else if (*a > *b)
1252     return 1;
1253   return 0;
1254 }
1255 #endif
1256
1257 static gboolean
1258 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1259 {
1260 #ifdef USE_MEDIAN_PRE_FILTERING
1261   GstClockTime last_path_delays[G_N_ELEMENTS (domain->last_path_delays)];
1262   GstClockTime median;
1263   gint i;
1264 #endif
1265
1266   GstClockTime mean_path_delay, delay_req_delay = 0;
1267   gboolean ret;
1268
1269   /* IEEE 1588 11.3 */
1270   mean_path_delay =
1271       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1272       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1273       (sync->correction_field_sync + sync->correction_field_delay +
1274           32768) / 65536) / 2;
1275
1276 #ifdef USE_MEDIAN_PRE_FILTERING
1277   for (i = 1; i < G_N_ELEMENTS (domain->last_path_delays); i++)
1278     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1279   domain->last_path_delays[i - 1] = mean_path_delay;
1280
1281   if (domain->last_path_delays_missing) {
1282     domain->last_path_delays_missing--;
1283   } else {
1284     memcpy (&last_path_delays, &domain->last_path_delays,
1285         sizeof (last_path_delays));
1286     g_qsort_with_data (&last_path_delays,
1287         G_N_ELEMENTS (domain->last_path_delays), sizeof (GstClockTime),
1288         (GCompareDataFunc) compare_clock_time, NULL);
1289
1290     median = last_path_delays[G_N_ELEMENTS (last_path_delays) / 2];
1291
1292     /* FIXME: We might want to use something else here, like only allowing
1293      * things in the interquartile range, or also filtering away delays that
1294      * are too small compared to the median. This here worked well enough
1295      * in tests so far.
1296      */
1297     if (mean_path_delay > 2 * median) {
1298       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1299           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1300           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1301       ret = FALSE;
1302       goto out;
1303     }
1304   }
1305 #endif
1306
1307 #ifdef USE_RUNNING_AVERAGE_DELAY
1308   /* Track an average round trip time, for a bit of smoothing */
1309   /* Always update before discarding a sample, so genuine changes in
1310    * the network get picked up, eventually */
1311   if (domain->mean_path_delay == 0)
1312     domain->mean_path_delay = mean_path_delay;
1313   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1314     domain->mean_path_delay =
1315         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1316   else
1317     domain->mean_path_delay =
1318         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1319 #else
1320   domain->mean_path_delay = mean_path_delay;
1321 #endif
1322
1323 #ifdef USE_MEASUREMENT_FILTERING
1324   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1325       domain->mean_path_delay != 0
1326       && sync->follow_up_recv_time_local >
1327       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1328     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1329         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1330         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1331             sync->sync_recv_time_local),
1332         GST_TIME_ARGS (domain->mean_path_delay));
1333     ret = FALSE;
1334     goto out;
1335   }
1336
1337   if (mean_path_delay > 2 * domain->mean_path_delay) {
1338     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1339         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1340         GST_TIME_ARGS (mean_path_delay),
1341         GST_TIME_ARGS (domain->mean_path_delay));
1342     ret = FALSE;
1343     goto out;
1344   }
1345 #endif
1346
1347   delay_req_delay =
1348       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1349
1350 #ifdef USE_MEASUREMENT_FILTERING
1351   /* delay_req_delay is a RTT, so 2 times the path delay */
1352   if (delay_req_delay > 4 * domain->mean_path_delay) {
1353     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1354         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1355         GST_TIME_ARGS (delay_req_delay),
1356         GST_TIME_ARGS (domain->mean_path_delay));
1357     ret = FALSE;
1358     goto out;
1359   }
1360 #endif
1361
1362   ret = TRUE;
1363
1364   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1365       GST_TIME_FORMAT ")", domain->domain,
1366       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1367   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1368       domain->domain, GST_TIME_ARGS (delay_req_delay));
1369
1370 #ifdef USE_MEASUREMENT_FILTERING
1371 out:
1372 #endif
1373   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1374     GstStructure *stats =
1375         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1376         "domain", G_TYPE_UINT, domain->domain,
1377         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1378         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1379         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1380     emit_ptp_statistics (domain->domain, stats);
1381     gst_structure_free (stats);
1382   }
1383
1384   return ret;
1385 }
1386
1387 static void
1388 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1389 {
1390   GList *l;
1391   PtpDomainData *domain = NULL;
1392   PtpPendingSync *sync = NULL;
1393
1394   /* Don't consider messages with the alternate master flag set */
1395   if ((msg->flag_field & 0x0100))
1396     return;
1397
1398   for (l = domain_data; l; l = l->next) {
1399     PtpDomainData *tmp = l->data;
1400
1401     if (msg->domain_number == tmp->domain) {
1402       domain = tmp;
1403       break;
1404     }
1405   }
1406
1407   if (!domain) {
1408     gchar *clock_name;
1409
1410     domain = g_new0 (PtpDomainData, 1);
1411     domain->domain = msg->domain_number;
1412     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1413     domain->domain_clock =
1414         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1415     g_free (clock_name);
1416     g_queue_init (&domain->pending_syncs);
1417     domain->last_path_delays_missing = 9;
1418     domain_data = g_list_prepend (domain_data, domain);
1419
1420     g_mutex_lock (&domain_clocks_lock);
1421     domain_clocks = g_list_prepend (domain_clocks, domain);
1422     g_mutex_unlock (&domain_clocks_lock);
1423   }
1424
1425   /* If we have a master clock, ignore this message if it's not coming from there */
1426   if (domain->have_master_clock
1427       && compare_clock_identity (&domain->master_clock_identity,
1428           &msg->source_port_identity) != 0)
1429     return;
1430
1431 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1432   /* Opportunistic selection of master clock */
1433   if (!domain->have_master_clock)
1434     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1435         sizeof (PtpClockIdentity));
1436 #else
1437   if (!domain->have_master_clock)
1438     return;
1439 #endif
1440
1441   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1442
1443   /* Check if duplicated */
1444   for (l = domain->pending_syncs.head; l; l = l->next) {
1445     PtpPendingSync *tmp = l->data;
1446
1447     if (tmp->sync_seqnum == msg->sequence_id)
1448       return;
1449   }
1450
1451   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1452       GST_CLOCK_TIME_NONE / GST_SECOND) {
1453     GST_FIXME ("Unsupported sync message seconds field value: %"
1454         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1455         msg->message_specific.sync.origin_timestamp.seconds_field,
1456         GST_CLOCK_TIME_NONE / GST_SECOND);
1457     return;
1458   }
1459
1460   sync = g_new0 (PtpPendingSync, 1);
1461   sync->domain = domain->domain;
1462   sync->sync_seqnum = msg->sequence_id;
1463   sync->sync_recv_time_local = receive_time;
1464   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1465   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1466   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1467   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1468   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1469
1470   /* 0.5 correction factor for division later */
1471   sync->correction_field_sync = msg->correction_field;
1472
1473   if ((msg->flag_field & 0x0200)) {
1474     /* Wait for FOLLOW_UP */
1475   } else {
1476     sync->sync_send_time_remote =
1477         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1478         sync.origin_timestamp);
1479
1480     if (domain->last_ptp_sync_time != 0
1481         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1482       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1483           GST_TIME_FORMAT, domain->domain,
1484           GST_TIME_ARGS (domain->last_ptp_sync_time),
1485           GST_TIME_ARGS (sync->sync_send_time_remote));
1486       ptp_pending_sync_free (sync);
1487       sync = NULL;
1488       return;
1489     }
1490     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1491
1492     if (send_delay_req (domain, sync)) {
1493       /* Sent delay request */
1494     } else {
1495       update_ptp_time (domain, sync);
1496       ptp_pending_sync_free (sync);
1497       sync = NULL;
1498     }
1499   }
1500
1501   if (sync)
1502     g_queue_push_tail (&domain->pending_syncs, sync);
1503 }
1504
1505 static void
1506 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1507 {
1508   GList *l;
1509   PtpDomainData *domain = NULL;
1510   PtpPendingSync *sync = NULL;
1511
1512   /* Don't consider messages with the alternate master flag set */
1513   if ((msg->flag_field & 0x0100))
1514     return;
1515
1516   for (l = domain_data; l; l = l->next) {
1517     PtpDomainData *tmp = l->data;
1518
1519     if (msg->domain_number == tmp->domain) {
1520       domain = tmp;
1521       break;
1522     }
1523   }
1524
1525   if (!domain)
1526     return;
1527
1528   /* If we have a master clock, ignore this message if it's not coming from there */
1529   if (domain->have_master_clock
1530       && compare_clock_identity (&domain->master_clock_identity,
1531           &msg->source_port_identity) != 0)
1532     return;
1533
1534   /* Check if we know about this one */
1535   for (l = domain->pending_syncs.head; l; l = l->next) {
1536     PtpPendingSync *tmp = l->data;
1537
1538     if (tmp->sync_seqnum == msg->sequence_id) {
1539       sync = tmp;
1540       break;
1541     }
1542   }
1543
1544   if (!sync)
1545     return;
1546
1547   /* Got a FOLLOW_UP for this already */
1548   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1549     return;
1550
1551   if (sync->sync_recv_time_local >= receive_time) {
1552     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1553         GST_TIME_FORMAT, domain->domain,
1554         GST_TIME_ARGS (sync->sync_recv_time_local),
1555         GST_TIME_ARGS (receive_time));
1556     g_queue_remove (&domain->pending_syncs, sync);
1557     ptp_pending_sync_free (sync);
1558     return;
1559   }
1560
1561   sync->correction_field_sync += msg->correction_field;
1562   sync->sync_send_time_remote =
1563       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1564       follow_up.precise_origin_timestamp);
1565   sync->follow_up_recv_time_local = receive_time;
1566
1567   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1568     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1569         GST_TIME_FORMAT, domain->domain,
1570         GST_TIME_ARGS (domain->last_ptp_sync_time),
1571         GST_TIME_ARGS (sync->sync_send_time_remote));
1572     g_queue_remove (&domain->pending_syncs, sync);
1573     ptp_pending_sync_free (sync);
1574     sync = NULL;
1575     return;
1576   }
1577   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1578
1579   if (send_delay_req (domain, sync)) {
1580     /* Sent delay request */
1581   } else {
1582     update_ptp_time (domain, sync);
1583     g_queue_remove (&domain->pending_syncs, sync);
1584     ptp_pending_sync_free (sync);
1585     sync = NULL;
1586   }
1587 }
1588
1589 static void
1590 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1591 {
1592   GList *l;
1593   PtpDomainData *domain = NULL;
1594   PtpPendingSync *sync = NULL;
1595
1596   /* Don't consider messages with the alternate master flag set */
1597   if ((msg->flag_field & 0x0100))
1598     return;
1599
1600   for (l = domain_data; l; l = l->next) {
1601     PtpDomainData *tmp = l->data;
1602
1603     if (msg->domain_number == tmp->domain) {
1604       domain = tmp;
1605       break;
1606     }
1607   }
1608
1609   if (!domain)
1610     return;
1611
1612   /* If we have a master clock, ignore this message if it's not coming from there */
1613   if (domain->have_master_clock
1614       && compare_clock_identity (&domain->master_clock_identity,
1615           &msg->source_port_identity) != 0)
1616     return;
1617
1618   /* Not for us */
1619   if (msg->message_specific.delay_resp.
1620       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1621       || msg->message_specific.delay_resp.
1622       requesting_port_identity.port_number != ptp_clock_id.port_number)
1623     return;
1624
1625   domain->min_delay_req_interval =
1626       log2_to_clock_time (msg->log_message_interval);
1627
1628   /* Check if we know about this one */
1629   for (l = domain->pending_syncs.head; l; l = l->next) {
1630     PtpPendingSync *tmp = l->data;
1631
1632     if (tmp->delay_req_seqnum == msg->sequence_id) {
1633       sync = tmp;
1634       break;
1635     }
1636   }
1637
1638   if (!sync)
1639     return;
1640
1641   /* Got a DELAY_RESP for this already */
1642   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1643     return;
1644
1645   if (sync->delay_req_send_time_local > receive_time) {
1646     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1647         GST_TIME_FORMAT, domain->domain,
1648         GST_TIME_ARGS (sync->delay_req_send_time_local),
1649         GST_TIME_ARGS (receive_time));
1650     g_queue_remove (&domain->pending_syncs, sync);
1651     ptp_pending_sync_free (sync);
1652     return;
1653   }
1654
1655   sync->correction_field_delay = msg->correction_field;
1656
1657   sync->delay_req_recv_time_remote =
1658       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1659       delay_resp.receive_timestamp);
1660   sync->delay_resp_recv_time_local = receive_time;
1661
1662   if (domain->mean_path_delay != 0
1663       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1664     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1665         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1666         GST_TIME_ARGS (sync->sync_send_time_remote),
1667         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1668     g_queue_remove (&domain->pending_syncs, sync);
1669     ptp_pending_sync_free (sync);
1670     return;
1671   }
1672
1673   if (update_mean_path_delay (domain, sync))
1674     update_ptp_time (domain, sync);
1675   g_queue_remove (&domain->pending_syncs, sync);
1676   ptp_pending_sync_free (sync);
1677 }
1678
1679 static void
1680 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1681 {
1682   /* Ignore our own messages */
1683   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1684       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1685     return;
1686
1687   switch (msg->message_type) {
1688     case PTP_MESSAGE_TYPE_ANNOUNCE:
1689       handle_announce_message (msg, receive_time);
1690       break;
1691     case PTP_MESSAGE_TYPE_SYNC:
1692       handle_sync_message (msg, receive_time);
1693       break;
1694     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1695       handle_follow_up_message (msg, receive_time);
1696       break;
1697     case PTP_MESSAGE_TYPE_DELAY_RESP:
1698       handle_delay_resp_message (msg, receive_time);
1699       break;
1700     default:
1701       break;
1702   }
1703 }
1704
1705 static gboolean
1706 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1707     gpointer user_data)
1708 {
1709   GIOStatus status;
1710   StdIOHeader header;
1711   gchar buffer[8192];
1712   GError *err = NULL;
1713   gsize read;
1714
1715   if ((condition & G_IO_STATUS_EOF)) {
1716     GST_ERROR ("Got EOF on stdin");
1717     g_main_loop_quit (main_loop);
1718     return G_SOURCE_REMOVE;
1719   }
1720
1721   status =
1722       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1723       &read, &err);
1724   if (status == G_IO_STATUS_ERROR) {
1725     GST_ERROR ("Failed to read from stdin: %s", err->message);
1726     g_main_loop_quit (main_loop);
1727     return G_SOURCE_REMOVE;
1728   } else if (status == G_IO_STATUS_EOF) {
1729     GST_ERROR ("Got EOF on stdin");
1730     g_main_loop_quit (main_loop);
1731     return G_SOURCE_REMOVE;
1732   } else if (status != G_IO_STATUS_NORMAL) {
1733     GST_ERROR ("Unexpected stdin read status: %d", status);
1734     g_main_loop_quit (main_loop);
1735     return G_SOURCE_REMOVE;
1736   } else if (read != sizeof (header)) {
1737     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1738     g_main_loop_quit (main_loop);
1739     return G_SOURCE_REMOVE;
1740   } else if (header.size > 8192) {
1741     GST_ERROR ("Unexpected size: %u", header.size);
1742     g_main_loop_quit (main_loop);
1743     return G_SOURCE_REMOVE;
1744   }
1745
1746   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1747   if (status == G_IO_STATUS_ERROR) {
1748     GST_ERROR ("Failed to read from stdin: %s", err->message);
1749     g_main_loop_quit (main_loop);
1750     return G_SOURCE_REMOVE;
1751   } else if (status == G_IO_STATUS_EOF) {
1752     GST_ERROR ("EOF on stdin");
1753     g_main_loop_quit (main_loop);
1754     return G_SOURCE_REMOVE;
1755   } else if (status != G_IO_STATUS_NORMAL) {
1756     GST_ERROR ("Unexpected stdin read status: %d", status);
1757     g_main_loop_quit (main_loop);
1758     return G_SOURCE_REMOVE;
1759   } else if (read != header.size) {
1760     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1761     g_main_loop_quit (main_loop);
1762     return G_SOURCE_REMOVE;
1763   }
1764
1765   switch (header.type) {
1766     case TYPE_EVENT:
1767     case TYPE_GENERAL:{
1768       GstClockTime receive_time = gst_util_get_timestamp ();
1769       PtpMessage msg;
1770
1771       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1772         dump_ptp_message (&msg);
1773         handle_ptp_message (&msg, receive_time);
1774       }
1775       break;
1776     }
1777     default:
1778     case TYPE_CLOCK_ID:{
1779       if (header.size != 8) {
1780         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1781         g_main_loop_quit (main_loop);
1782         return G_SOURCE_REMOVE;
1783       }
1784       g_mutex_lock (&ptp_lock);
1785       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1786       ptp_clock_id.port_number = getpid ();
1787       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1788           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1789       g_cond_signal (&ptp_cond);
1790       g_mutex_unlock (&ptp_lock);
1791       break;
1792     }
1793   }
1794
1795   return G_SOURCE_CONTINUE;
1796 }
1797
1798 /* Cleanup all announce messages and announce message senders
1799  * that are timed out by now, and clean up all pending syncs
1800  * that are missing their FOLLOW_UP or DELAY_RESP */
1801 static gboolean
1802 cleanup_cb (gpointer data)
1803 {
1804   GstClockTime now = gst_util_get_timestamp ();
1805   GList *l, *m, *n;
1806
1807   for (l = domain_data; l; l = l->next) {
1808     PtpDomainData *domain = l->data;
1809
1810     for (n = domain->announce_senders; n;) {
1811       PtpAnnounceSender *sender = n->data;
1812       gboolean timed_out = TRUE;
1813
1814       /* Keep only 5 messages per sender around */
1815       while (g_queue_get_length (&sender->announce_messages) > 5) {
1816         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1817         g_free (msg);
1818       }
1819
1820       for (m = sender->announce_messages.head; m; m = m->next) {
1821         PtpAnnounceMessage *msg = m->data;
1822
1823         if (msg->receive_time +
1824             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1825           timed_out = FALSE;
1826           break;
1827         }
1828       }
1829
1830       if (timed_out) {
1831         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1832             sender->master_clock_identity.clock_identity,
1833             sender->master_clock_identity.port_number);
1834         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1835         g_queue_clear (&sender->announce_messages);
1836       }
1837
1838       if (g_queue_get_length (&sender->announce_messages) == 0) {
1839         GList *tmp = n->next;
1840
1841         if (compare_clock_identity (&sender->master_clock_identity,
1842                 &domain->master_clock_identity) == 0)
1843           GST_WARNING ("currently selected master clock timed out");
1844         g_free (sender);
1845         domain->announce_senders =
1846             g_list_delete_link (domain->announce_senders, n);
1847         n = tmp;
1848       } else {
1849         n = n->next;
1850       }
1851     }
1852     select_best_master_clock (domain, now);
1853
1854     /* Clean up any pending syncs */
1855     for (n = domain->pending_syncs.head; n;) {
1856       PtpPendingSync *sync = n->data;
1857       gboolean timed_out = FALSE;
1858
1859       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1860        * and pending delay reqs after 4 delay req intervals or 10 seconds
1861        */
1862       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1863           ((domain->min_delay_req_interval != 0
1864                   && sync->delay_req_send_time_local +
1865                   4 * domain->min_delay_req_interval < now)
1866               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1867         timed_out = TRUE;
1868       } else if ((domain->sync_interval != 0
1869               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1870           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1871         timed_out = TRUE;
1872       }
1873
1874       if (timed_out) {
1875         GList *tmp = n->next;
1876         ptp_pending_sync_free (sync);
1877         g_queue_delete_link (&domain->pending_syncs, n);
1878         n = tmp;
1879       } else {
1880         n = n->next;
1881       }
1882     }
1883   }
1884
1885   return G_SOURCE_CONTINUE;
1886 }
1887
1888 static gpointer
1889 ptp_helper_main (gpointer data)
1890 {
1891   GSource *cleanup_source;
1892
1893   GST_DEBUG ("Starting PTP helper loop");
1894
1895   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1896   cleanup_source = g_timeout_source_new_seconds (5);
1897   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1898   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1899   g_source_attach (cleanup_source, main_context);
1900   g_source_unref (cleanup_source);
1901
1902   g_main_loop_run (main_loop);
1903   GST_DEBUG ("Stopped PTP helper loop");
1904
1905   g_mutex_lock (&ptp_lock);
1906   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1907   ptp_clock_id.port_number = 0;
1908   initted = FALSE;
1909   g_cond_signal (&ptp_cond);
1910   g_mutex_unlock (&ptp_lock);
1911
1912   return NULL;
1913 }
1914
1915 /**
1916  * gst_ptp_is_supported:
1917  *
1918  * Check if PTP clocks are generally supported on this system, and if previous
1919  * initializations did not fail.
1920  *
1921  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1922  * previous initializations did not fail.
1923  *
1924  * Since: 1.6
1925  */
1926 gboolean
1927 gst_ptp_is_supported (void)
1928 {
1929   return supported;
1930 }
1931
1932 /**
1933  * gst_ptp_is_initialized:
1934  *
1935  * Check if the GStreamer PTP clock subsystem is initialized.
1936  *
1937  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1938  *
1939  * Since: 1.6
1940  */
1941 gboolean
1942 gst_ptp_is_initialized (void)
1943 {
1944   return initted;
1945 }
1946
1947 /**
1948  * gst_ptp_init:
1949  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1950  * @interfaces: (transfer none) (array zero-terminated=1): network interfaces to run the clock on
1951  *
1952  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1953  * slave-only mode for all domains on the given @interfaces with the
1954  * given @clock_id.
1955  *
1956  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1957  * generated from the MAC address of the first network interface.
1958  *
1959  *
1960  * This function is automatically called by gst_ptp_clock_new() with default
1961  * parameters if it wasn't called before.
1962  *
1963  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
1964  *
1965  * Since: 1.6
1966  */
1967 gboolean
1968 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
1969 {
1970   gboolean ret;
1971   const gchar *env;
1972   gchar **argv = NULL;
1973   gint argc, argc_c;
1974   gint fd_r, fd_w;
1975   GError *err = NULL;
1976   GSource *stdin_source;
1977
1978   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
1979
1980   g_mutex_lock (&ptp_lock);
1981   if (!supported) {
1982     GST_ERROR ("PTP not supported");
1983     ret = FALSE;
1984     goto done;
1985   }
1986
1987   if (initted) {
1988     GST_DEBUG ("PTP already initialized");
1989     ret = TRUE;
1990     goto done;
1991   }
1992
1993   if (ptp_helper_pid) {
1994     GST_DEBUG ("PTP currently initializing");
1995     goto wait;
1996   }
1997
1998   if (!domain_stats_hooks_initted) {
1999     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2000     domain_stats_hooks_initted = TRUE;
2001   }
2002
2003   argc = 1;
2004   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2005     argc += 2;
2006   if (interfaces != NULL)
2007     argc += 2 * g_strv_length (interfaces);
2008
2009   argv = g_new0 (gchar *, argc + 2);
2010   argc_c = 0;
2011
2012   env = g_getenv ("GST_PTP_HELPER_1_0");
2013   if (env == NULL)
2014     env = g_getenv ("GST_PTP_HELPER");
2015   if (env != NULL && *env != '\0') {
2016     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2017     argv[argc_c++] = g_strdup (env);
2018   } else {
2019     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2020   }
2021
2022   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2023     argv[argc_c++] = g_strdup ("-c");
2024     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2025   }
2026
2027   if (interfaces != NULL) {
2028     gchar **ptr = interfaces;
2029
2030     while (*ptr) {
2031       argv[argc_c++] = g_strdup ("-i");
2032       argv[argc_c++] = g_strdup (*ptr);
2033       ptr++;
2034     }
2035   }
2036
2037   main_context = g_main_context_new ();
2038   main_loop = g_main_loop_new (main_context, FALSE);
2039
2040   ptp_helper_thread =
2041       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2042   if (!ptp_helper_thread) {
2043     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2044     g_clear_error (&err);
2045     ret = FALSE;
2046     goto done;
2047   }
2048
2049   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2050           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2051     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2052     g_clear_error (&err);
2053     ret = FALSE;
2054     supported = FALSE;
2055     goto done;
2056   }
2057
2058   stdin_channel = g_io_channel_unix_new (fd_r);
2059   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2060   g_io_channel_set_buffered (stdin_channel, FALSE);
2061   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2062   stdin_source =
2063       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2064   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2065   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2066       NULL);
2067   g_source_attach (stdin_source, main_context);
2068   g_source_unref (stdin_source);
2069
2070   /* Create stdout channel */
2071   stdout_channel = g_io_channel_unix_new (fd_w);
2072   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2073   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2074   g_io_channel_set_buffered (stdout_channel, FALSE);
2075
2076   delay_req_rand = g_rand_new ();
2077
2078   initted = TRUE;
2079
2080 wait:
2081   GST_DEBUG ("Waiting for PTP to be initialized");
2082
2083   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2084     g_cond_wait (&ptp_cond, &ptp_lock);
2085
2086   ret = initted;
2087   if (ret) {
2088     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2089         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2090   } else {
2091     GST_ERROR ("Failed to initialize");
2092     supported = FALSE;
2093   }
2094
2095 done:
2096   g_strfreev (argv);
2097
2098   if (!ret) {
2099     if (ptp_helper_pid) {
2100       kill (ptp_helper_pid, SIGKILL);
2101       waitpid (ptp_helper_pid, NULL, 0);
2102       g_spawn_close_pid (ptp_helper_pid);
2103     }
2104     ptp_helper_pid = 0;
2105
2106     if (stdin_channel)
2107       g_io_channel_unref (stdin_channel);
2108     stdin_channel = NULL;
2109     if (stdout_channel)
2110       g_io_channel_unref (stdout_channel);
2111     stdout_channel = NULL;
2112
2113     if (main_loop && ptp_helper_thread) {
2114       g_main_loop_quit (main_loop);
2115       g_thread_join (ptp_helper_thread);
2116     }
2117     ptp_helper_thread = NULL;
2118     if (main_loop)
2119       g_main_loop_unref (main_loop);
2120     main_loop = NULL;
2121     if (main_context)
2122       g_main_context_unref (main_context);
2123     main_context = NULL;
2124
2125     if (delay_req_rand)
2126       g_rand_free (delay_req_rand);
2127     delay_req_rand = NULL;
2128   }
2129
2130   g_mutex_unlock (&ptp_lock);
2131
2132   return ret;
2133 }
2134
2135 /**
2136  * gst_ptp_deinit:
2137  *
2138  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2139  * are any remaining GstPtpClock instances, they won't be further synchronized
2140  * to the PTP network clock.
2141  *
2142  * Since: 1.6
2143  */
2144 void
2145 gst_ptp_deinit (void)
2146 {
2147   GList *l, *m;
2148
2149   g_mutex_lock (&ptp_lock);
2150
2151   if (ptp_helper_pid) {
2152     kill (ptp_helper_pid, SIGKILL);
2153     waitpid (ptp_helper_pid, NULL, 0);
2154     g_spawn_close_pid (ptp_helper_pid);
2155   }
2156   ptp_helper_pid = 0;
2157
2158   if (stdin_channel)
2159     g_io_channel_unref (stdin_channel);
2160   stdin_channel = NULL;
2161   if (stdout_channel)
2162     g_io_channel_unref (stdout_channel);
2163   stdout_channel = NULL;
2164
2165   if (main_loop && ptp_helper_thread) {
2166     GThread *tmp = ptp_helper_thread;
2167     ptp_helper_thread = NULL;
2168     g_mutex_unlock (&ptp_lock);
2169     g_main_loop_quit (main_loop);
2170     g_thread_join (tmp);
2171     g_mutex_lock (&ptp_lock);
2172   }
2173   if (main_loop)
2174     g_main_loop_unref (main_loop);
2175   main_loop = NULL;
2176   if (main_context)
2177     g_main_context_unref (main_context);
2178   main_context = NULL;
2179
2180   if (delay_req_rand)
2181     g_rand_free (delay_req_rand);
2182   delay_req_rand = NULL;
2183
2184   for (l = domain_data; l; l = l->next) {
2185     PtpDomainData *domain = l->data;
2186
2187     for (m = domain->announce_senders; m; m = m->next) {
2188       PtpAnnounceSender *sender = m->data;
2189
2190       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2191       g_queue_clear (&sender->announce_messages);
2192       g_free (sender);
2193     }
2194     g_list_free (domain->announce_senders);
2195
2196     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2197         NULL);
2198     g_queue_clear (&domain->pending_syncs);
2199     gst_object_unref (domain->domain_clock);
2200     g_free (domain);
2201   }
2202   g_list_free (domain_data);
2203   domain_data = NULL;
2204   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2205   g_list_free (domain_clocks);
2206   domain_clocks = NULL;
2207
2208   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2209   ptp_clock_id.port_number = 0;
2210
2211   initted = FALSE;
2212
2213   g_mutex_unlock (&ptp_lock);
2214 }
2215
2216 #define DEFAULT_DOMAIN 0
2217
2218 enum
2219 {
2220   PROP_0,
2221   PROP_DOMAIN,
2222   PROP_INTERNAL_CLOCK
2223 };
2224
2225 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2226   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2227
2228 struct _GstPtpClockPrivate
2229 {
2230   guint domain;
2231   GstClock *domain_clock;
2232   gulong domain_stats_id;
2233 };
2234
2235 #define gst_ptp_clock_parent_class parent_class
2236 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2237
2238 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2239     const GValue * value, GParamSpec * pspec);
2240 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2241     GValue * value, GParamSpec * pspec);
2242 static void gst_ptp_clock_finalize (GObject * object);
2243
2244 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2245
2246 static void
2247 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2248 {
2249   GObjectClass *gobject_class;
2250   GstClockClass *clock_class;
2251
2252   gobject_class = G_OBJECT_CLASS (klass);
2253   clock_class = GST_CLOCK_CLASS (klass);
2254
2255   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2256
2257   gobject_class->finalize = gst_ptp_clock_finalize;
2258   gobject_class->get_property = gst_ptp_clock_get_property;
2259   gobject_class->set_property = gst_ptp_clock_set_property;
2260
2261   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2262       g_param_spec_uint ("domain", "Domain",
2263           "The PTP domain", 0, G_MAXUINT8,
2264           DEFAULT_DOMAIN,
2265           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2266
2267   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2268       g_param_spec_object ("internal-clock", "Internal Clock",
2269           "Internal clock", GST_TYPE_CLOCK,
2270           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2271
2272   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2273 }
2274
2275 static void
2276 gst_ptp_clock_init (GstPtpClock * self)
2277 {
2278   GstPtpClockPrivate *priv;
2279
2280   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2281
2282   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2283   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2284
2285   priv->domain = DEFAULT_DOMAIN;
2286 }
2287
2288 static gboolean
2289 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2290 {
2291   gboolean got_clock = TRUE;
2292
2293   if (G_UNLIKELY (!self->priv->domain_clock)) {
2294     g_mutex_lock (&domain_clocks_lock);
2295     if (!self->priv->domain_clock) {
2296       GList *l;
2297
2298       got_clock = FALSE;
2299
2300       for (l = domain_clocks; l; l = l->next) {
2301         PtpDomainData *clock_data = l->data;
2302
2303         if (clock_data->domain == self->priv->domain
2304             && clock_data->last_ptp_time != 0) {
2305           self->priv->domain_clock = clock_data->domain_clock;
2306           got_clock = TRUE;
2307           break;
2308         }
2309       }
2310     }
2311     g_mutex_unlock (&domain_clocks_lock);
2312     if (got_clock) {
2313       g_object_notify (G_OBJECT (self), "internal-clock");
2314       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2315     }
2316   }
2317
2318   return got_clock;
2319 }
2320
2321 static gboolean
2322 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2323     gpointer user_data)
2324 {
2325   GstPtpClock *self = user_data;
2326
2327   if (domain != self->priv->domain
2328       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2329     return TRUE;
2330
2331   /* Let's set our internal clock */
2332   if (!gst_ptp_clock_ensure_domain_clock (self))
2333     return TRUE;
2334
2335   self->priv->domain_stats_id = 0;
2336
2337   return FALSE;
2338 }
2339
2340 static void
2341 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2342     const GValue * value, GParamSpec * pspec)
2343 {
2344   GstPtpClock *self = GST_PTP_CLOCK (object);
2345
2346   switch (prop_id) {
2347     case PROP_DOMAIN:
2348       self->priv->domain = g_value_get_uint (value);
2349       gst_ptp_clock_ensure_domain_clock (self);
2350       if (!self->priv->domain_clock)
2351         self->priv->domain_stats_id =
2352             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2353             NULL);
2354       break;
2355     default:
2356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2357       break;
2358   }
2359 }
2360
2361 static void
2362 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2363     GValue * value, GParamSpec * pspec)
2364 {
2365   GstPtpClock *self = GST_PTP_CLOCK (object);
2366
2367   switch (prop_id) {
2368     case PROP_DOMAIN:
2369       g_value_set_uint (value, self->priv->domain);
2370       break;
2371     case PROP_INTERNAL_CLOCK:
2372       gst_ptp_clock_ensure_domain_clock (self);
2373       g_value_set_object (value, self->priv->domain_clock);
2374       break;
2375     default:
2376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2377       break;
2378   }
2379 }
2380
2381 static void
2382 gst_ptp_clock_finalize (GObject * object)
2383 {
2384   GstPtpClock *self = GST_PTP_CLOCK (object);
2385
2386   if (self->priv->domain_stats_id)
2387     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2388
2389   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2390 }
2391
2392 static GstClockTime
2393 gst_ptp_clock_get_internal_time (GstClock * clock)
2394 {
2395   GstPtpClock *self = GST_PTP_CLOCK (clock);
2396
2397   gst_ptp_clock_ensure_domain_clock (self);
2398
2399   if (!self->priv->domain_clock) {
2400     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2401         self->priv->domain);
2402     return GST_CLOCK_TIME_NONE;
2403   }
2404
2405   return gst_clock_get_time (self->priv->domain_clock);
2406 }
2407
2408 /**
2409  * gst_ptp_clock_new:
2410  * @name: Name of the clock
2411  * @domain: PTP domain
2412  *
2413  * Creates a new PTP clock instance that exports the PTP time of the master
2414  * clock in @domain. This clock can be slaved to other clocks as needed.
2415  *
2416  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2417  * default parameters.
2418  *
2419  *
2420  * This clock only returns valid timestamps after it received the first
2421  * times from the PTP master clock on the network. Once this happens the
2422  * GstPtpClock::internal-clock property will become non-NULL. You can
2423  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2424  * gst_clock_is_synced().
2425  *
2426  * Since: 1.6
2427  */
2428 GstClock *
2429 gst_ptp_clock_new (const gchar * name, guint domain)
2430 {
2431   g_return_val_if_fail (name != NULL, NULL);
2432   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2433
2434   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2435     GST_ERROR ("Failed to initialize PTP");
2436     return NULL;
2437   }
2438
2439   return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2440       NULL);
2441 }
2442
2443 typedef struct
2444 {
2445   guint8 domain;
2446   const GstStructure *stats;
2447 } DomainStatsMarshalData;
2448
2449 static void
2450 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2451 {
2452   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2453
2454   if (!callback (data->domain, data->stats, hook->data))
2455     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2456 }
2457
2458 static void
2459 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2460 {
2461   DomainStatsMarshalData data = { domain, stats };
2462
2463   g_mutex_lock (&ptp_lock);
2464   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2465       (GHookMarshaller) domain_stats_marshaller, &data);
2466   g_mutex_unlock (&ptp_lock);
2467 }
2468
2469 /**
2470  * gst_ptp_statistics_callback_add:
2471  * @callback: GstPtpStatisticsCallback to call
2472  * @user_data: Data to pass to the callback
2473  * @destroy_data: GDestroyNotify to destroy the data
2474  *
2475  * Installs a new statistics callback for gathering PTP statistics. See
2476  * GstPtpStatisticsCallback for a list of statistics that are provided.
2477  *
2478  * Returns: Id for the callback that can be passed to
2479  * gst_ptp_statistics_callback_remove()
2480  *
2481  * Since: 1.6
2482  */
2483 gulong
2484 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2485     gpointer user_data, GDestroyNotify destroy_data)
2486 {
2487   GHook *hook;
2488
2489   g_mutex_lock (&ptp_lock);
2490
2491   if (!domain_stats_hooks_initted) {
2492     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2493     domain_stats_hooks_initted = TRUE;
2494   }
2495
2496   hook = g_hook_alloc (&domain_stats_hooks);
2497   hook->func = callback;
2498   hook->data = user_data;
2499   hook->destroy = destroy_data;
2500   g_hook_prepend (&domain_stats_hooks, hook);
2501   g_atomic_int_add (&domain_stats_n_hooks, 1);
2502
2503   g_mutex_unlock (&ptp_lock);
2504
2505   return hook->hook_id;
2506 }
2507
2508 /**
2509  * gst_ptp_statistics_callback_remove:
2510  * @id: Callback id to remove
2511  *
2512  * Removes a PTP statistics callback that was previously added with
2513  * gst_ptp_statistics_callback_add().
2514  *
2515  * Since: 1.6
2516  */
2517 void
2518 gst_ptp_statistics_callback_remove (gulong id)
2519 {
2520   g_mutex_lock (&ptp_lock);
2521   if (g_hook_destroy (&domain_stats_hooks, id))
2522     g_atomic_int_add (&domain_stats_n_hooks, -1);
2523   g_mutex_unlock (&ptp_lock);
2524 }
2525
2526 #else /* HAVE_PTP */
2527
2528 GType
2529 gst_ptp_clock_get_type (void)
2530 {
2531   return G_TYPE_INVALID;
2532 }
2533
2534 gboolean
2535 gst_ptp_is_supported (void)
2536 {
2537   return FALSE;
2538 }
2539
2540 gboolean
2541 gst_ptp_is_initialized (void)
2542 {
2543   return FALSE;
2544 }
2545
2546 gboolean
2547 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2548 {
2549   return FALSE;
2550 }
2551
2552 void
2553 gst_ptp_deinit (void)
2554 {
2555 }
2556
2557 GstClock *
2558 gst_ptp_clock_new (const gchar * name, guint domain)
2559 {
2560   return NULL;
2561 }
2562
2563 gulong
2564 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2565     gpointer user_data, GDestroyNotify destroy_data)
2566 {
2567   return 0;
2568 }
2569
2570 void
2571 gst_ptp_statistics_callback_remove (gulong id)
2572 {
2573   return;
2574 }
2575
2576 #endif