ptpclock: Fix error leak during failures
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @short_description: Special clock that synchronizes to a remote time
23  *                     provider via PTP (IEEE1588:2008).
24  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
25  *
26  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
27  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
28  * clock in some specific domain.
29  *
30  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
31  * a helper process to do the actual communication via the PTP ports. This is
32  * required as PTP listens on ports < 1024 and thus requires special
33  * privileges. Once this helper process is started, the main process will
34  * synchronize to all PTP domains that are detected on the selected
35  * interfaces.
36  *
37  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
38  * time from a master clock inside a specific PTP domain. This clock will only
39  * return valid timestamps once the timestamps in the PTP domain are known. To
40  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
41  * signal and gst_clock_is_synced().
42  *
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #include "gstptp_private.h"
59
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #include <windows.h>
65 #endif
66 #include <sys/types.h>
67 #include <unistd.h>
68
69 #include <gst/base/base.h>
70
71 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
72 #define GST_CAT_DEFAULT (ptp_debug)
73
74 /* IEEE 1588 7.7.3.1 */
75 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
76
77 /* Use a running average for calculating the mean path delay instead
78  * of just using the last measurement. Enabling this helps in unreliable
79  * networks, like wifi, with often changing delays
80  *
81  * Undef for following IEEE1588-2008 by the letter
82  */
83 #define USE_RUNNING_AVERAGE_DELAY 1
84
85 /* Filter out any measurements that are above a certain threshold compared to
86  * previous measurements. Enabling this helps filtering out outliers that
87  * happen fairly often in unreliable networks, like wifi.
88  *
89  * Undef for following IEEE1588-2008 by the letter
90  */
91 #define USE_MEASUREMENT_FILTERING 1
92
93 /* Select the first clock from which we capture a SYNC message as the master
94  * clock of the domain until we are ready to run the best master clock
95  * algorithm. This allows faster syncing but might mean a change of the master
96  * clock in the beginning. As all clocks in a domain are supposed to use the
97  * same time, this shouldn't be much of a problem.
98  *
99  * Undef for following IEEE1588-2008 by the letter
100  */
101 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
102
103 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
104  * afterwards. This allows better synchronization in networks with varying
105  * delays, as for every other SYNC message we would have to assume that it's
106  * the average of what we saw before. But that might be completely off
107  */
108 #define USE_ONLY_SYNC_WITH_DELAY 1
109
110 /* Filter out delay measurements that are too far away from the median of the
111  * last delay measurements, currently those that are more than 2 times as big.
112  * This increases accuracy a lot on wifi.
113  */
114 #define USE_MEDIAN_PRE_FILTERING 1
115 #define MEDIAN_PRE_FILTERING_WINDOW 9
116
117 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
118 #define MAX_SKIPPED_UPDATES 5
119
120 typedef enum
121 {
122   PTP_MESSAGE_TYPE_SYNC = 0x0,
123   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
124   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
125   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
126   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
127   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
128   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
129   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
130   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
131   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
132 } PtpMessageType;
133
134 typedef struct
135 {
136   guint64 seconds_field;        /* 48 bits valid */
137   guint32 nanoseconds_field;
138 } PtpTimestamp;
139
140 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
141 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
142 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
143
144 typedef struct
145 {
146   guint64 clock_identity;
147   guint16 port_number;
148 } PtpClockIdentity;
149
150 static gint
151 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
152 {
153   if (a->clock_identity < b->clock_identity)
154     return -1;
155   else if (a->clock_identity > b->clock_identity)
156     return 1;
157
158   if (a->port_number < b->port_number)
159     return -1;
160   else if (a->port_number > b->port_number)
161     return 1;
162
163   return 0;
164 }
165
166 typedef struct
167 {
168   guint8 clock_class;
169   guint8 clock_accuracy;
170   guint16 offset_scaled_log_variance;
171 } PtpClockQuality;
172
173 typedef struct
174 {
175   guint8 transport_specific;
176   PtpMessageType message_type;
177   /* guint8 reserved; */
178   guint8 version_ptp;
179   guint16 message_length;
180   guint8 domain_number;
181   /* guint8 reserved; */
182   guint16 flag_field;
183   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
184   /* guint32 reserved; */
185   PtpClockIdentity source_port_identity;
186   guint16 sequence_id;
187   guint8 control_field;
188   gint8 log_message_interval;
189
190   union
191   {
192     struct
193     {
194       PtpTimestamp origin_timestamp;
195       gint16 current_utc_offset;
196       /* guint8 reserved; */
197       guint8 grandmaster_priority_1;
198       PtpClockQuality grandmaster_clock_quality;
199       guint8 grandmaster_priority_2;
200       guint64 grandmaster_identity;
201       guint16 steps_removed;
202       guint8 time_source;
203     } announce;
204
205     struct
206     {
207       PtpTimestamp origin_timestamp;
208     } sync;
209
210     struct
211     {
212       PtpTimestamp precise_origin_timestamp;
213     } follow_up;
214
215     struct
216     {
217       PtpTimestamp origin_timestamp;
218     } delay_req;
219
220     struct
221     {
222       PtpTimestamp receive_timestamp;
223       PtpClockIdentity requesting_port_identity;
224     } delay_resp;
225
226   } message_specific;
227 } PtpMessage;
228
229 static GMutex ptp_lock;
230 static GCond ptp_cond;
231 static gboolean initted = FALSE;
232 #ifdef HAVE_PTP
233 static gboolean supported = TRUE;
234 #else
235 static gboolean supported = FALSE;
236 #endif
237 static GPid ptp_helper_pid;
238 static GThread *ptp_helper_thread;
239 static GMainContext *main_context;
240 static GMainLoop *main_loop;
241 static GIOChannel *stdin_channel, *stdout_channel;
242 static GRand *delay_req_rand;
243 static GstClock *observation_system_clock;
244 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
245
246 typedef struct
247 {
248   GstClockTime receive_time;
249
250   PtpClockIdentity master_clock_identity;
251
252   guint8 grandmaster_priority_1;
253   PtpClockQuality grandmaster_clock_quality;
254   guint8 grandmaster_priority_2;
255   guint64 grandmaster_identity;
256   guint16 steps_removed;
257   guint8 time_source;
258
259   guint16 sequence_id;
260 } PtpAnnounceMessage;
261
262 typedef struct
263 {
264   PtpClockIdentity master_clock_identity;
265
266   GstClockTime announce_interval;       /* last interval we received */
267   GQueue announce_messages;
268 } PtpAnnounceSender;
269
270 typedef struct
271 {
272   guint domain;
273   PtpClockIdentity master_clock_identity;
274
275   guint16 sync_seqnum;
276   GstClockTime sync_recv_time_local;    /* t2 */
277   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
278   GstClockTime follow_up_recv_time_local;
279
280   GSource *timeout_source;
281   guint16 delay_req_seqnum;
282   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
283   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
284   GstClockTime delay_resp_recv_time_local;
285
286   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
287   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
288 } PtpPendingSync;
289
290 static void
291 ptp_pending_sync_free (PtpPendingSync * sync)
292 {
293   if (sync->timeout_source)
294     g_source_destroy (sync->timeout_source);
295   g_free (sync);
296 }
297
298 typedef struct
299 {
300   guint domain;
301
302   GstClockTime last_ptp_time;
303   GstClockTime last_local_time;
304   gint skipped_updates;
305
306   /* Used for selecting the master/grandmaster */
307   GList *announce_senders;
308
309   /* Last selected master clock */
310   gboolean have_master_clock;
311   PtpClockIdentity master_clock_identity;
312   guint64 grandmaster_identity;
313
314   /* Last SYNC or FOLLOW_UP timestamp we received */
315   GstClockTime last_ptp_sync_time;
316   GstClockTime sync_interval;
317
318   GstClockTime mean_path_delay;
319   GstClockTime last_delay_req, min_delay_req_interval;
320   guint16 last_delay_req_seqnum;
321
322   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
323   gint last_path_delays_missing;
324
325   GQueue pending_syncs;
326
327   GstClock *domain_clock;
328 } PtpDomainData;
329
330 static GList *domain_data;
331 static GMutex domain_clocks_lock;
332 static GList *domain_clocks;
333
334 /* Protected by PTP lock */
335 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
336 static GHookList domain_stats_hooks;
337 static gint domain_stats_n_hooks;
338 static gboolean domain_stats_hooks_initted = FALSE;
339
340 /* Converts log2 seconds to GstClockTime */
341 static GstClockTime
342 log2_to_clock_time (gint l)
343 {
344   if (l < 0)
345     return GST_SECOND >> (-l);
346   else
347     return GST_SECOND << l;
348 }
349
350 static void
351 dump_ptp_message (PtpMessage * msg)
352 {
353   GST_TRACE ("PTP message:");
354   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
355   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
356   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
357   GST_TRACE ("\tmessage_length: %u", msg->message_length);
358   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
359   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
360   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
361       (msg->correction_field / 65536),
362       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
363   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
364       msg->source_port_identity.clock_identity,
365       msg->source_port_identity.port_number);
366   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
367   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
368   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
369       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
370
371   switch (msg->message_type) {
372     case PTP_MESSAGE_TYPE_ANNOUNCE:
373       GST_TRACE ("\tANNOUNCE:");
374       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
375           msg->message_specific.announce.origin_timestamp.seconds_field,
376           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
377       GST_TRACE ("\t\tcurrent_utc_offset: %d",
378           msg->message_specific.announce.current_utc_offset);
379       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
380           msg->message_specific.announce.grandmaster_priority_1);
381       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
382           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
383           msg->message_specific.announce.
384           grandmaster_clock_quality.clock_accuracy,
385           msg->message_specific.announce.
386           grandmaster_clock_quality.offset_scaled_log_variance);
387       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
388           msg->message_specific.announce.grandmaster_priority_2);
389       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
390           msg->message_specific.announce.grandmaster_identity);
391       GST_TRACE ("\t\tsteps_removed: %u",
392           msg->message_specific.announce.steps_removed);
393       GST_TRACE ("\t\ttime_source: 0x%02x",
394           msg->message_specific.announce.time_source);
395       break;
396     case PTP_MESSAGE_TYPE_SYNC:
397       GST_TRACE ("\tSYNC:");
398       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
399           msg->message_specific.sync.origin_timestamp.seconds_field,
400           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
401       break;
402     case PTP_MESSAGE_TYPE_FOLLOW_UP:
403       GST_TRACE ("\tFOLLOW_UP:");
404       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
405           msg->message_specific.follow_up.
406           precise_origin_timestamp.seconds_field,
407           msg->message_specific.follow_up.
408           precise_origin_timestamp.nanoseconds_field);
409       break;
410     case PTP_MESSAGE_TYPE_DELAY_REQ:
411       GST_TRACE ("\tDELAY_REQ:");
412       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
413           msg->message_specific.delay_req.origin_timestamp.seconds_field,
414           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
415       break;
416     case PTP_MESSAGE_TYPE_DELAY_RESP:
417       GST_TRACE ("\tDELAY_RESP:");
418       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
419           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
420           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
421       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
422           "x %u",
423           msg->message_specific.delay_resp.
424           requesting_port_identity.clock_identity,
425           msg->message_specific.delay_resp.
426           requesting_port_identity.port_number);
427       break;
428     default:
429       break;
430   }
431   GST_TRACE (" ");
432 }
433
434 /* IEEE 1588-2008 5.3.3 */
435 static gboolean
436 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
437 {
438   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
439
440   timestamp->seconds_field =
441       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
442       gst_byte_reader_get_uint16_be_unchecked (reader);
443   timestamp->nanoseconds_field =
444       gst_byte_reader_get_uint32_be_unchecked (reader);
445
446   if (timestamp->nanoseconds_field >= 1000000000)
447     return FALSE;
448
449   return TRUE;
450 }
451
452 /* IEEE 1588-2008 13.3 */
453 static gboolean
454 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
455 {
456   guint8 b;
457
458   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
459
460   b = gst_byte_reader_get_uint8_unchecked (reader);
461   msg->transport_specific = b >> 4;
462   msg->message_type = b & 0x0f;
463
464   b = gst_byte_reader_get_uint8_unchecked (reader);
465   msg->version_ptp = b & 0x0f;
466   if (msg->version_ptp != 2) {
467     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
468     return FALSE;
469   }
470
471   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
472   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
473     GST_WARNING ("Not enough data (%u < %u)",
474         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
475     return FALSE;
476   }
477
478   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
479   gst_byte_reader_skip_unchecked (reader, 1);
480
481   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
482   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
483   gst_byte_reader_skip_unchecked (reader, 4);
484
485   msg->source_port_identity.clock_identity =
486       gst_byte_reader_get_uint64_be_unchecked (reader);
487   msg->source_port_identity.port_number =
488       gst_byte_reader_get_uint16_be_unchecked (reader);
489
490   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
491   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
492   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
493
494   return TRUE;
495 }
496
497 /* IEEE 1588-2008 13.5 */
498 static gboolean
499 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
500 {
501   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
502
503   if (gst_byte_reader_get_remaining (reader) < 20)
504     return FALSE;
505
506   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
507           reader))
508     return FALSE;
509
510   msg->message_specific.announce.current_utc_offset =
511       gst_byte_reader_get_uint16_be_unchecked (reader);
512   gst_byte_reader_skip_unchecked (reader, 1);
513
514   msg->message_specific.announce.grandmaster_priority_1 =
515       gst_byte_reader_get_uint8_unchecked (reader);
516   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
517       gst_byte_reader_get_uint8_unchecked (reader);
518   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
519       gst_byte_reader_get_uint8_unchecked (reader);
520   msg->message_specific.announce.
521       grandmaster_clock_quality.offset_scaled_log_variance =
522       gst_byte_reader_get_uint16_be_unchecked (reader);
523   msg->message_specific.announce.grandmaster_priority_2 =
524       gst_byte_reader_get_uint8_unchecked (reader);
525   msg->message_specific.announce.grandmaster_identity =
526       gst_byte_reader_get_uint64_be_unchecked (reader);
527   msg->message_specific.announce.steps_removed =
528       gst_byte_reader_get_uint16_be_unchecked (reader);
529   msg->message_specific.announce.time_source =
530       gst_byte_reader_get_uint8_unchecked (reader);
531
532   return TRUE;
533 }
534
535 /* IEEE 1588-2008 13.6 */
536 static gboolean
537 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
538 {
539   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
540
541   if (gst_byte_reader_get_remaining (reader) < 10)
542     return FALSE;
543
544   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
545           reader))
546     return FALSE;
547
548   return TRUE;
549 }
550
551 /* IEEE 1588-2008 13.6 */
552 static gboolean
553 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
554 {
555   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
556
557   if (gst_byte_reader_get_remaining (reader) < 10)
558     return FALSE;
559
560   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
561           reader))
562     return FALSE;
563
564   return TRUE;
565 }
566
567 /* IEEE 1588-2008 13.7 */
568 static gboolean
569 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
570 {
571   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
572
573   if (gst_byte_reader_get_remaining (reader) < 10)
574     return FALSE;
575
576   if (!parse_ptp_timestamp (&msg->message_specific.
577           follow_up.precise_origin_timestamp, reader))
578     return FALSE;
579
580   return TRUE;
581 }
582
583 /* IEEE 1588-2008 13.8 */
584 static gboolean
585 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
586 {
587   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
588       FALSE);
589
590   if (gst_byte_reader_get_remaining (reader) < 20)
591     return FALSE;
592
593   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
594           reader))
595     return FALSE;
596
597   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
598       gst_byte_reader_get_uint64_be_unchecked (reader);
599   msg->message_specific.delay_resp.requesting_port_identity.port_number =
600       gst_byte_reader_get_uint16_be_unchecked (reader);
601
602   return TRUE;
603 }
604
605 static gboolean
606 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
607 {
608   GstByteReader reader;
609   gboolean ret = FALSE;
610
611   gst_byte_reader_init (&reader, data, size);
612
613   if (!parse_ptp_message_header (msg, &reader)) {
614     GST_WARNING ("Failed to parse PTP message header");
615     return FALSE;
616   }
617
618   switch (msg->message_type) {
619     case PTP_MESSAGE_TYPE_SYNC:
620       ret = parse_ptp_message_sync (msg, &reader);
621       break;
622     case PTP_MESSAGE_TYPE_FOLLOW_UP:
623       ret = parse_ptp_message_follow_up (msg, &reader);
624       break;
625     case PTP_MESSAGE_TYPE_DELAY_REQ:
626       ret = parse_ptp_message_delay_req (msg, &reader);
627       break;
628     case PTP_MESSAGE_TYPE_DELAY_RESP:
629       ret = parse_ptp_message_delay_resp (msg, &reader);
630       break;
631     case PTP_MESSAGE_TYPE_ANNOUNCE:
632       ret = parse_ptp_message_announce (msg, &reader);
633       break;
634     default:
635       /* ignore for now */
636       break;
637   }
638
639   return ret;
640 }
641
642 static gint
643 compare_announce_message (const PtpAnnounceMessage * a,
644     const PtpAnnounceMessage * b)
645 {
646   /* IEEE 1588 Figure 27 */
647   if (a->grandmaster_identity == b->grandmaster_identity) {
648     if (a->steps_removed + 1 < b->steps_removed)
649       return -1;
650     else if (a->steps_removed > b->steps_removed + 1)
651       return 1;
652
653     /* Error cases are filtered out earlier */
654     if (a->steps_removed < b->steps_removed)
655       return -1;
656     else if (a->steps_removed > b->steps_removed)
657       return 1;
658
659     /* Error cases are filtered out earlier */
660     if (a->master_clock_identity.clock_identity <
661         b->master_clock_identity.clock_identity)
662       return -1;
663     else if (a->master_clock_identity.clock_identity >
664         b->master_clock_identity.clock_identity)
665       return 1;
666
667     /* Error cases are filtered out earlier */
668     if (a->master_clock_identity.port_number <
669         b->master_clock_identity.port_number)
670       return -1;
671     else if (a->master_clock_identity.port_number >
672         b->master_clock_identity.port_number)
673       return 1;
674     else
675       g_assert_not_reached ();
676
677     return 0;
678   }
679
680   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
681     return -1;
682   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
683     return 1;
684
685   if (a->grandmaster_clock_quality.clock_class <
686       b->grandmaster_clock_quality.clock_class)
687     return -1;
688   else if (a->grandmaster_clock_quality.clock_class >
689       b->grandmaster_clock_quality.clock_class)
690     return 1;
691
692   if (a->grandmaster_clock_quality.clock_accuracy <
693       b->grandmaster_clock_quality.clock_accuracy)
694     return -1;
695   else if (a->grandmaster_clock_quality.clock_accuracy >
696       b->grandmaster_clock_quality.clock_accuracy)
697     return 1;
698
699   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
700       b->grandmaster_clock_quality.offset_scaled_log_variance)
701     return -1;
702   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
703       b->grandmaster_clock_quality.offset_scaled_log_variance)
704     return 1;
705
706   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
707     return -1;
708   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
709     return 1;
710
711   if (a->grandmaster_identity < b->grandmaster_identity)
712     return -1;
713   else if (a->grandmaster_identity > b->grandmaster_identity)
714     return 1;
715   else
716     g_assert_not_reached ();
717
718   return 0;
719 }
720
721 static void
722 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
723 {
724   GList *qualified_messages = NULL;
725   GList *l, *m;
726   PtpAnnounceMessage *best = NULL;
727
728   /* IEEE 1588 9.3.2.5 */
729   for (l = domain->announce_senders; l; l = l->next) {
730     PtpAnnounceSender *sender = l->data;
731     GstClockTime window = 4 * sender->announce_interval;
732     gint count = 0;
733
734     for (m = sender->announce_messages.head; m; m = m->next) {
735       PtpAnnounceMessage *msg = m->data;
736
737       if (now - msg->receive_time <= window)
738         count++;
739     }
740
741     /* Only include the newest message of announce senders that had at least 2
742      * announce messages in the last 4 announce intervals. Which also means
743      * that we wait at least 4 announce intervals before we select a master
744      * clock. Until then we just report based on the newest SYNC we received
745      */
746     if (count >= 2) {
747       qualified_messages =
748           g_list_prepend (qualified_messages,
749           g_queue_peek_tail (&sender->announce_messages));
750     }
751   }
752
753   if (!qualified_messages) {
754     GST_DEBUG
755         ("No qualified announce messages for domain %u, can't select a master clock",
756         domain->domain);
757     domain->have_master_clock = FALSE;
758     return;
759   }
760
761   for (l = qualified_messages; l; l = l->next) {
762     PtpAnnounceMessage *msg = l->data;
763
764     if (!best || compare_announce_message (msg, best) < 0)
765       best = msg;
766   }
767
768   if (domain->have_master_clock
769       && compare_clock_identity (&domain->master_clock_identity,
770           &best->master_clock_identity) == 0) {
771     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
772   } else {
773     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
774         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
775         domain->domain, best->master_clock_identity.clock_identity,
776         best->master_clock_identity.port_number, best->grandmaster_identity);
777
778     domain->have_master_clock = TRUE;
779     domain->grandmaster_identity = best->grandmaster_identity;
780
781     /* Opportunistic master clock selection likely gave us the same master
782      * clock before, no need to reset all statistics */
783     if (compare_clock_identity (&domain->master_clock_identity,
784             &best->master_clock_identity) != 0) {
785       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
786           sizeof (PtpClockIdentity));
787       domain->mean_path_delay = 0;
788       domain->last_delay_req = 0;
789       domain->last_path_delays_missing = 9;
790       domain->min_delay_req_interval = 0;
791       domain->sync_interval = 0;
792       domain->last_ptp_sync_time = 0;
793       domain->skipped_updates = 0;
794       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
795           NULL);
796       g_queue_clear (&domain->pending_syncs);
797     }
798
799     if (g_atomic_int_get (&domain_stats_n_hooks)) {
800       GstStructure *stats =
801           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
802           "domain", G_TYPE_UINT, domain->domain,
803           "master-clock-id", G_TYPE_UINT64,
804           domain->master_clock_identity.clock_identity,
805           "master-clock-port", G_TYPE_UINT,
806           domain->master_clock_identity.port_number,
807           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
808           NULL);
809       emit_ptp_statistics (domain->domain, stats);
810       gst_structure_free (stats);
811     }
812   }
813 }
814
815 static void
816 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
817 {
818   GList *l;
819   PtpDomainData *domain = NULL;
820   PtpAnnounceSender *sender = NULL;
821   PtpAnnounceMessage *announce;
822
823   /* IEEE1588 9.3.2.2 e)
824    * Don't consider messages with the alternate master flag set
825    */
826   if ((msg->flag_field & 0x0100))
827     return;
828
829   /* IEEE 1588 9.3.2.5 d)
830    * Don't consider announce messages with steps_removed>=255
831    */
832   if (msg->message_specific.announce.steps_removed >= 255)
833     return;
834
835   for (l = domain_data; l; l = l->next) {
836     PtpDomainData *tmp = l->data;
837
838     if (tmp->domain == msg->domain_number) {
839       domain = tmp;
840       break;
841     }
842   }
843
844   if (!domain) {
845     gchar *clock_name;
846
847     domain = g_new0 (PtpDomainData, 1);
848     domain->domain = msg->domain_number;
849     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
850     domain->domain_clock =
851         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
852     g_free (clock_name);
853     g_queue_init (&domain->pending_syncs);
854     domain->last_path_delays_missing = 9;
855     domain_data = g_list_prepend (domain_data, domain);
856
857     g_mutex_lock (&domain_clocks_lock);
858     domain_clocks = g_list_prepend (domain_clocks, domain);
859     g_mutex_unlock (&domain_clocks_lock);
860
861     if (g_atomic_int_get (&domain_stats_n_hooks)) {
862       GstStructure *stats =
863           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
864           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
865           domain->domain_clock, NULL);
866       emit_ptp_statistics (domain->domain, stats);
867       gst_structure_free (stats);
868     }
869   }
870
871   for (l = domain->announce_senders; l; l = l->next) {
872     PtpAnnounceSender *tmp = l->data;
873
874     if (compare_clock_identity (&tmp->master_clock_identity,
875             &msg->source_port_identity) == 0) {
876       sender = tmp;
877       break;
878     }
879   }
880
881   if (!sender) {
882     sender = g_new0 (PtpAnnounceSender, 1);
883
884     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
885         sizeof (PtpClockIdentity));
886     g_queue_init (&sender->announce_messages);
887     domain->announce_senders =
888         g_list_prepend (domain->announce_senders, sender);
889   }
890
891   for (l = sender->announce_messages.head; l; l = l->next) {
892     PtpAnnounceMessage *tmp = l->data;
893
894     /* IEEE 1588 9.3.2.5 c)
895      * Don't consider identical messages, i.e. duplicates
896      */
897     if (tmp->sequence_id == msg->sequence_id)
898       return;
899   }
900
901   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
902
903   announce = g_new0 (PtpAnnounceMessage, 1);
904   announce->receive_time = receive_time;
905   announce->sequence_id = msg->sequence_id;
906   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
907       sizeof (PtpClockIdentity));
908   announce->grandmaster_identity =
909       msg->message_specific.announce.grandmaster_identity;
910   announce->grandmaster_priority_1 =
911       msg->message_specific.announce.grandmaster_priority_1;
912   announce->grandmaster_clock_quality.clock_class =
913       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
914   announce->grandmaster_clock_quality.clock_accuracy =
915       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
916   announce->grandmaster_clock_quality.offset_scaled_log_variance =
917       msg->message_specific.announce.
918       grandmaster_clock_quality.offset_scaled_log_variance;
919   announce->grandmaster_priority_2 =
920       msg->message_specific.announce.grandmaster_priority_2;
921   announce->steps_removed = msg->message_specific.announce.steps_removed;
922   announce->time_source = msg->message_specific.announce.time_source;
923   g_queue_push_tail (&sender->announce_messages, announce);
924
925   select_best_master_clock (domain, receive_time);
926 }
927
928 static gboolean
929 send_delay_req_timeout (PtpPendingSync * sync)
930 {
931   StdIOHeader header = { 0, };
932   guint8 delay_req[44];
933   GstByteWriter writer;
934   GIOStatus status;
935   gsize written;
936   GError *err = NULL;
937
938   header.type = TYPE_EVENT;
939   header.size = 44;
940
941   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
942   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
943   gst_byte_writer_put_uint8_unchecked (&writer, 2);
944   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
945   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
946   gst_byte_writer_put_uint8_unchecked (&writer, 0);
947   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
948   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
949   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
950   gst_byte_writer_put_uint64_be_unchecked (&writer,
951       ptp_clock_id.clock_identity);
952   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
953   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
954   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
955   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
956   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
957   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
958
959   status =
960       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
961       sizeof (header), &written, &err);
962   if (status == G_IO_STATUS_ERROR) {
963     g_warning ("Failed to write to stdout: %s", err->message);
964     g_clear_error (&err);
965     return G_SOURCE_REMOVE;
966   } else if (status == G_IO_STATUS_EOF) {
967     g_message ("EOF on stdout");
968     g_main_loop_quit (main_loop);
969     return G_SOURCE_REMOVE;
970   } else if (status != G_IO_STATUS_NORMAL) {
971     g_warning ("Unexpected stdout write status: %d", status);
972     g_main_loop_quit (main_loop);
973     return G_SOURCE_REMOVE;
974   } else if (written != sizeof (header)) {
975     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
976     g_main_loop_quit (main_loop);
977     return G_SOURCE_REMOVE;
978   }
979
980   sync->delay_req_send_time_local =
981       gst_clock_get_time (observation_system_clock);
982
983   status =
984       g_io_channel_write_chars (stdout_channel,
985       (const gchar *) delay_req, 44, &written, &err);
986   if (status == G_IO_STATUS_ERROR) {
987     g_warning ("Failed to write to stdout: %s", err->message);
988     g_clear_error (&err);
989     g_main_loop_quit (main_loop);
990     return G_SOURCE_REMOVE;
991   } else if (status == G_IO_STATUS_EOF) {
992     g_message ("EOF on stdout");
993     g_main_loop_quit (main_loop);
994     return G_SOURCE_REMOVE;
995   } else if (status != G_IO_STATUS_NORMAL) {
996     g_warning ("Unexpected stdout write status: %d", status);
997     g_main_loop_quit (main_loop);
998     return G_SOURCE_REMOVE;
999   } else if (written != 44) {
1000     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1001     g_main_loop_quit (main_loop);
1002     return G_SOURCE_REMOVE;
1003   }
1004
1005   return G_SOURCE_REMOVE;
1006 }
1007
1008 static gboolean
1009 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1010 {
1011   GstClockTime now = gst_clock_get_time (observation_system_clock);
1012   guint timeout;
1013   GSource *timeout_source;
1014
1015   if (domain->last_delay_req != 0
1016       && domain->last_delay_req + domain->min_delay_req_interval > now)
1017     return FALSE;
1018
1019   domain->last_delay_req = now;
1020   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1021
1022   /* IEEE 1588 9.5.11.2 */
1023   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1024     timeout = 0;
1025   else
1026     timeout =
1027         g_rand_int_range (delay_req_rand, 0,
1028         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1029
1030   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1031   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1032   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1033       sync, NULL);
1034   g_source_attach (timeout_source, main_context);
1035
1036   return TRUE;
1037 }
1038
1039 /* Filtering of outliers for RTT and time calculations inspired
1040  * by the code from gstnetclientclock.c
1041  */
1042 static void
1043 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1044 {
1045   GstClockTime internal_time, external_time, rate_num, rate_den;
1046   GstClockTime corrected_ptp_time, corrected_local_time;
1047   gdouble r_squared = 0.0;
1048   gboolean synced;
1049   GstClockTimeDiff discont = 0;
1050   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1051 #ifdef USE_MEASUREMENT_FILTERING
1052   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1053       orig_rate_den;
1054   GstClockTime new_estimated_ptp_time;
1055   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1056   gboolean now_synced;
1057 #endif
1058
1059 #ifdef USE_ONLY_SYNC_WITH_DELAY
1060   GstClockTime mean_path_delay;
1061
1062   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1063     return;
1064
1065   /* IEEE 1588 11.3 */
1066   mean_path_delay =
1067       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1068       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1069       (sync->correction_field_sync + sync->correction_field_delay +
1070           32768) / 65536) / 2;
1071 #endif
1072
1073   /* IEEE 1588 11.2 */
1074   corrected_ptp_time =
1075       sync->sync_send_time_remote +
1076       (sync->correction_field_sync + 32768) / 65536;
1077
1078 #ifdef USE_ONLY_SYNC_WITH_DELAY
1079   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1080 #else
1081   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1082 #endif
1083
1084 #ifdef USE_MEASUREMENT_FILTERING
1085   /* We check this here and when updating the mean path delay, because
1086    * we can get here without a delay response too */
1087   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1088       && sync->follow_up_recv_time_local >
1089       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1090     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1091         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1092         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1093         GST_TIME_ARGS (domain->mean_path_delay));
1094     synced = FALSE;
1095     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1096         &internal_time, &external_time, &rate_num, &rate_den);
1097     goto out;
1098   }
1099 #endif
1100
1101   /* Set an initial local-remote relation */
1102   if (domain->last_ptp_time == 0)
1103     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1104         corrected_ptp_time, 1, 1);
1105
1106 #ifdef USE_MEASUREMENT_FILTERING
1107   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1108    * estimate with our present knowledge about the clock
1109    */
1110   /* Store what the clock produced as 'now' before this update */
1111   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1112       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1113   internal_time = orig_internal_time;
1114   external_time = orig_external_time;
1115   rate_num = orig_rate_num;
1116   rate_den = orig_rate_den;
1117
1118   /* 3/4 RTT window around the estimation */
1119   max_discont = domain->mean_path_delay * 3 / 2;
1120
1121   /* Check if the estimated sync time is inside our window */
1122   estimated_ptp_time_min = corrected_local_time - max_discont;
1123   estimated_ptp_time_min =
1124       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1125       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1126   estimated_ptp_time_max = corrected_local_time + max_discont;
1127   estimated_ptp_time_max =
1128       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1129       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1130
1131   synced = (estimated_ptp_time_min < corrected_ptp_time
1132       && corrected_ptp_time < estimated_ptp_time_max);
1133
1134   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1135       GST_TIME_FORMAT, domain->domain,
1136       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1137
1138   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1139       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1140       GST_TIME_ARGS (corrected_ptp_time),
1141       GST_TIME_ARGS (estimated_ptp_time_max));
1142
1143   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1144           corrected_local_time, corrected_ptp_time, &r_squared,
1145           &internal_time, &external_time, &rate_num, &rate_den)) {
1146     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1147
1148     /* Old estimated PTP time based on receive time and path delay */
1149     estimated_ptp_time = corrected_local_time;
1150     estimated_ptp_time =
1151         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1152         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1153         orig_external_time, orig_rate_num, orig_rate_den);
1154
1155     /* New estimated PTP time based on receive time and path delay */
1156     new_estimated_ptp_time = corrected_local_time;
1157     new_estimated_ptp_time =
1158         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1159         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1160         external_time, rate_num, rate_den);
1161
1162     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1163     if (synced && ABS (discont) > max_discont) {
1164       GstClockTimeDiff offset;
1165       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1166           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1167           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1168           GST_TIME_ARGS (max_discont));
1169       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1170         offset = max_discont - discont;
1171         if (-offset > external_time)
1172           external_time = 0;
1173         else
1174           external_time += offset;
1175       } else {                  /* Too large a backward step - add a +ve offset */
1176         offset = -(max_discont + discont);
1177         external_time += offset;
1178       }
1179
1180       discont += offset;
1181     } else {
1182       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1183           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1184           GST_TIME_ARGS (max_discont));
1185     }
1186
1187     /* Check if the estimated sync time is now (still) inside our window */
1188     estimated_ptp_time_min = corrected_local_time - max_discont;
1189     estimated_ptp_time_min =
1190         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1191         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1192         external_time, rate_num, rate_den);
1193     estimated_ptp_time_max = corrected_local_time + max_discont;
1194     estimated_ptp_time_max =
1195         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1196         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1197         external_time, rate_num, rate_den);
1198
1199     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1200         && corrected_ptp_time < estimated_ptp_time_max);
1201
1202     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1203         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1204         GST_TIME_ARGS (corrected_ptp_time),
1205         GST_TIME_ARGS (estimated_ptp_time_max));
1206
1207     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1208       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1209           internal_time, external_time, rate_num, rate_den);
1210       domain->skipped_updates = 0;
1211
1212       domain->last_ptp_time = corrected_ptp_time;
1213       domain->last_local_time = corrected_local_time;
1214     } else {
1215       domain->skipped_updates++;
1216     }
1217   } else {
1218     domain->last_ptp_time = corrected_ptp_time;
1219     domain->last_local_time = corrected_local_time;
1220   }
1221
1222 #else
1223   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1224       GST_TIME_FORMAT, domain->domain,
1225       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1226
1227   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1228       &internal_time, &external_time, &rate_num, &rate_den);
1229
1230   estimated_ptp_time = corrected_local_time;
1231   estimated_ptp_time =
1232       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1233       (domain->domain_clock), estimated_ptp_time, internal_time,
1234       external_time, rate_num, rate_den);
1235
1236   gst_clock_add_observation (domain->domain_clock,
1237       corrected_local_time, corrected_ptp_time, &r_squared);
1238
1239   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1240       &internal_time, &external_time, &rate_num, &rate_den);
1241
1242   synced = TRUE;
1243   domain->last_ptp_time = corrected_ptp_time;
1244   domain->last_local_time = corrected_local_time;
1245 #endif
1246
1247 #ifdef USE_MEASUREMENT_FILTERING
1248 out:
1249 #endif
1250   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1251     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1252         "domain", G_TYPE_UINT, domain->domain,
1253         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1254         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1255         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1256         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1257         "discontinuity", G_TYPE_INT64, discont,
1258         "synced", G_TYPE_BOOLEAN, synced,
1259         "r-squared", G_TYPE_DOUBLE, r_squared,
1260         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1261         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1262         "rate-num", G_TYPE_UINT64, rate_num,
1263         "rate-den", G_TYPE_UINT64, rate_den,
1264         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1265         NULL);
1266     emit_ptp_statistics (domain->domain, stats);
1267     gst_structure_free (stats);
1268   }
1269
1270 }
1271
1272 #ifdef USE_MEDIAN_PRE_FILTERING
1273 static gint
1274 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1275 {
1276   if (*a < *b)
1277     return -1;
1278   else if (*a > *b)
1279     return 1;
1280   return 0;
1281 }
1282 #endif
1283
1284 static gboolean
1285 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1286 {
1287 #ifdef USE_MEDIAN_PRE_FILTERING
1288   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1289   GstClockTime median;
1290   gint i;
1291 #endif
1292
1293   GstClockTime mean_path_delay, delay_req_delay = 0;
1294   gboolean ret;
1295
1296   /* IEEE 1588 11.3 */
1297   mean_path_delay =
1298       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1299       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1300       (sync->correction_field_sync + sync->correction_field_delay +
1301           32768) / 65536) / 2;
1302
1303 #ifdef USE_MEDIAN_PRE_FILTERING
1304   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1305     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1306   domain->last_path_delays[i - 1] = mean_path_delay;
1307
1308   if (domain->last_path_delays_missing) {
1309     domain->last_path_delays_missing--;
1310   } else {
1311     memcpy (&last_path_delays, &domain->last_path_delays,
1312         sizeof (last_path_delays));
1313     g_qsort_with_data (&last_path_delays,
1314         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1315         (GCompareDataFunc) compare_clock_time, NULL);
1316
1317     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1318
1319     /* FIXME: We might want to use something else here, like only allowing
1320      * things in the interquartile range, or also filtering away delays that
1321      * are too small compared to the median. This here worked well enough
1322      * in tests so far.
1323      */
1324     if (mean_path_delay > 2 * median) {
1325       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1326           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1327           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1328       ret = FALSE;
1329       goto out;
1330     }
1331   }
1332 #endif
1333
1334 #ifdef USE_RUNNING_AVERAGE_DELAY
1335   /* Track an average round trip time, for a bit of smoothing */
1336   /* Always update before discarding a sample, so genuine changes in
1337    * the network get picked up, eventually */
1338   if (domain->mean_path_delay == 0)
1339     domain->mean_path_delay = mean_path_delay;
1340   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1341     domain->mean_path_delay =
1342         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1343   else
1344     domain->mean_path_delay =
1345         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1346 #else
1347   domain->mean_path_delay = mean_path_delay;
1348 #endif
1349
1350 #ifdef USE_MEASUREMENT_FILTERING
1351   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1352       domain->mean_path_delay != 0
1353       && sync->follow_up_recv_time_local >
1354       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1355     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1356         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1357         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1358             sync->sync_recv_time_local),
1359         GST_TIME_ARGS (domain->mean_path_delay));
1360     ret = FALSE;
1361     goto out;
1362   }
1363
1364   if (mean_path_delay > 2 * domain->mean_path_delay) {
1365     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1366         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1367         GST_TIME_ARGS (mean_path_delay),
1368         GST_TIME_ARGS (domain->mean_path_delay));
1369     ret = FALSE;
1370     goto out;
1371   }
1372 #endif
1373
1374   delay_req_delay =
1375       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1376
1377 #ifdef USE_MEASUREMENT_FILTERING
1378   /* delay_req_delay is a RTT, so 2 times the path delay */
1379   if (delay_req_delay > 4 * domain->mean_path_delay) {
1380     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1381         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1382         GST_TIME_ARGS (delay_req_delay),
1383         GST_TIME_ARGS (domain->mean_path_delay));
1384     ret = FALSE;
1385     goto out;
1386   }
1387 #endif
1388
1389   ret = TRUE;
1390
1391   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1392       GST_TIME_FORMAT ")", domain->domain,
1393       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1394   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1395       domain->domain, GST_TIME_ARGS (delay_req_delay));
1396
1397 #ifdef USE_MEASUREMENT_FILTERING
1398 out:
1399 #endif
1400   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1401     GstStructure *stats =
1402         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1403         "domain", G_TYPE_UINT, domain->domain,
1404         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1405         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1406         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1407     emit_ptp_statistics (domain->domain, stats);
1408     gst_structure_free (stats);
1409   }
1410
1411   return ret;
1412 }
1413
1414 static void
1415 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1416 {
1417   GList *l;
1418   PtpDomainData *domain = NULL;
1419   PtpPendingSync *sync = NULL;
1420
1421   /* Don't consider messages with the alternate master flag set */
1422   if ((msg->flag_field & 0x0100))
1423     return;
1424
1425   for (l = domain_data; l; l = l->next) {
1426     PtpDomainData *tmp = l->data;
1427
1428     if (msg->domain_number == tmp->domain) {
1429       domain = tmp;
1430       break;
1431     }
1432   }
1433
1434   if (!domain) {
1435     gchar *clock_name;
1436
1437     domain = g_new0 (PtpDomainData, 1);
1438     domain->domain = msg->domain_number;
1439     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1440     domain->domain_clock =
1441         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1442     g_free (clock_name);
1443     g_queue_init (&domain->pending_syncs);
1444     domain->last_path_delays_missing = 9;
1445     domain_data = g_list_prepend (domain_data, domain);
1446
1447     g_mutex_lock (&domain_clocks_lock);
1448     domain_clocks = g_list_prepend (domain_clocks, domain);
1449     g_mutex_unlock (&domain_clocks_lock);
1450   }
1451
1452   /* If we have a master clock, ignore this message if it's not coming from there */
1453   if (domain->have_master_clock
1454       && compare_clock_identity (&domain->master_clock_identity,
1455           &msg->source_port_identity) != 0)
1456     return;
1457
1458 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1459   /* Opportunistic selection of master clock */
1460   if (!domain->have_master_clock)
1461     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1462         sizeof (PtpClockIdentity));
1463 #else
1464   if (!domain->have_master_clock)
1465     return;
1466 #endif
1467
1468   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1469
1470   /* Check if duplicated */
1471   for (l = domain->pending_syncs.head; l; l = l->next) {
1472     PtpPendingSync *tmp = l->data;
1473
1474     if (tmp->sync_seqnum == msg->sequence_id)
1475       return;
1476   }
1477
1478   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1479       GST_CLOCK_TIME_NONE / GST_SECOND) {
1480     GST_FIXME ("Unsupported sync message seconds field value: %"
1481         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1482         msg->message_specific.sync.origin_timestamp.seconds_field,
1483         GST_CLOCK_TIME_NONE / GST_SECOND);
1484     return;
1485   }
1486
1487   sync = g_new0 (PtpPendingSync, 1);
1488   sync->domain = domain->domain;
1489   sync->sync_seqnum = msg->sequence_id;
1490   sync->sync_recv_time_local = receive_time;
1491   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1492   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1493   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1494   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1495   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1496
1497   /* 0.5 correction factor for division later */
1498   sync->correction_field_sync = msg->correction_field;
1499
1500   if ((msg->flag_field & 0x0200)) {
1501     /* Wait for FOLLOW_UP */
1502   } else {
1503     sync->sync_send_time_remote =
1504         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1505         sync.origin_timestamp);
1506
1507     if (domain->last_ptp_sync_time != 0
1508         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1509       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1510           GST_TIME_FORMAT, domain->domain,
1511           GST_TIME_ARGS (domain->last_ptp_sync_time),
1512           GST_TIME_ARGS (sync->sync_send_time_remote));
1513       ptp_pending_sync_free (sync);
1514       sync = NULL;
1515       return;
1516     }
1517     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1518
1519     if (send_delay_req (domain, sync)) {
1520       /* Sent delay request */
1521     } else {
1522       update_ptp_time (domain, sync);
1523       ptp_pending_sync_free (sync);
1524       sync = NULL;
1525     }
1526   }
1527
1528   if (sync)
1529     g_queue_push_tail (&domain->pending_syncs, sync);
1530 }
1531
1532 static void
1533 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1534 {
1535   GList *l;
1536   PtpDomainData *domain = NULL;
1537   PtpPendingSync *sync = NULL;
1538
1539   /* Don't consider messages with the alternate master flag set */
1540   if ((msg->flag_field & 0x0100))
1541     return;
1542
1543   for (l = domain_data; l; l = l->next) {
1544     PtpDomainData *tmp = l->data;
1545
1546     if (msg->domain_number == tmp->domain) {
1547       domain = tmp;
1548       break;
1549     }
1550   }
1551
1552   if (!domain)
1553     return;
1554
1555   /* If we have a master clock, ignore this message if it's not coming from there */
1556   if (domain->have_master_clock
1557       && compare_clock_identity (&domain->master_clock_identity,
1558           &msg->source_port_identity) != 0)
1559     return;
1560
1561   /* Check if we know about this one */
1562   for (l = domain->pending_syncs.head; l; l = l->next) {
1563     PtpPendingSync *tmp = l->data;
1564
1565     if (tmp->sync_seqnum == msg->sequence_id) {
1566       sync = tmp;
1567       break;
1568     }
1569   }
1570
1571   if (!sync)
1572     return;
1573
1574   /* Got a FOLLOW_UP for this already */
1575   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1576     return;
1577
1578   if (sync->sync_recv_time_local >= receive_time) {
1579     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1580         GST_TIME_FORMAT, domain->domain,
1581         GST_TIME_ARGS (sync->sync_recv_time_local),
1582         GST_TIME_ARGS (receive_time));
1583     g_queue_remove (&domain->pending_syncs, sync);
1584     ptp_pending_sync_free (sync);
1585     return;
1586   }
1587
1588   sync->correction_field_sync += msg->correction_field;
1589   sync->sync_send_time_remote =
1590       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1591       follow_up.precise_origin_timestamp);
1592   sync->follow_up_recv_time_local = receive_time;
1593
1594   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1595     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1596         GST_TIME_FORMAT, domain->domain,
1597         GST_TIME_ARGS (domain->last_ptp_sync_time),
1598         GST_TIME_ARGS (sync->sync_send_time_remote));
1599     g_queue_remove (&domain->pending_syncs, sync);
1600     ptp_pending_sync_free (sync);
1601     sync = NULL;
1602     return;
1603   }
1604   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1605
1606   if (send_delay_req (domain, sync)) {
1607     /* Sent delay request */
1608   } else {
1609     update_ptp_time (domain, sync);
1610     g_queue_remove (&domain->pending_syncs, sync);
1611     ptp_pending_sync_free (sync);
1612     sync = NULL;
1613   }
1614 }
1615
1616 static void
1617 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1618 {
1619   GList *l;
1620   PtpDomainData *domain = NULL;
1621   PtpPendingSync *sync = NULL;
1622
1623   /* Don't consider messages with the alternate master flag set */
1624   if ((msg->flag_field & 0x0100))
1625     return;
1626
1627   for (l = domain_data; l; l = l->next) {
1628     PtpDomainData *tmp = l->data;
1629
1630     if (msg->domain_number == tmp->domain) {
1631       domain = tmp;
1632       break;
1633     }
1634   }
1635
1636   if (!domain)
1637     return;
1638
1639   /* If we have a master clock, ignore this message if it's not coming from there */
1640   if (domain->have_master_clock
1641       && compare_clock_identity (&domain->master_clock_identity,
1642           &msg->source_port_identity) != 0)
1643     return;
1644
1645   /* Not for us */
1646   if (msg->message_specific.delay_resp.
1647       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1648       || msg->message_specific.delay_resp.
1649       requesting_port_identity.port_number != ptp_clock_id.port_number)
1650     return;
1651
1652   domain->min_delay_req_interval =
1653       log2_to_clock_time (msg->log_message_interval);
1654
1655   /* Check if we know about this one */
1656   for (l = domain->pending_syncs.head; l; l = l->next) {
1657     PtpPendingSync *tmp = l->data;
1658
1659     if (tmp->delay_req_seqnum == msg->sequence_id) {
1660       sync = tmp;
1661       break;
1662     }
1663   }
1664
1665   if (!sync)
1666     return;
1667
1668   /* Got a DELAY_RESP for this already */
1669   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1670     return;
1671
1672   if (sync->delay_req_send_time_local > receive_time) {
1673     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1674         GST_TIME_FORMAT, domain->domain,
1675         GST_TIME_ARGS (sync->delay_req_send_time_local),
1676         GST_TIME_ARGS (receive_time));
1677     g_queue_remove (&domain->pending_syncs, sync);
1678     ptp_pending_sync_free (sync);
1679     return;
1680   }
1681
1682   sync->correction_field_delay = msg->correction_field;
1683
1684   sync->delay_req_recv_time_remote =
1685       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1686       delay_resp.receive_timestamp);
1687   sync->delay_resp_recv_time_local = receive_time;
1688
1689   if (domain->mean_path_delay != 0
1690       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1691     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1692         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1693         GST_TIME_ARGS (sync->sync_send_time_remote),
1694         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1695     g_queue_remove (&domain->pending_syncs, sync);
1696     ptp_pending_sync_free (sync);
1697     return;
1698   }
1699
1700   if (update_mean_path_delay (domain, sync))
1701     update_ptp_time (domain, sync);
1702   g_queue_remove (&domain->pending_syncs, sync);
1703   ptp_pending_sync_free (sync);
1704 }
1705
1706 static void
1707 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1708 {
1709   /* Ignore our own messages */
1710   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1711       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1712     return;
1713
1714   switch (msg->message_type) {
1715     case PTP_MESSAGE_TYPE_ANNOUNCE:
1716       handle_announce_message (msg, receive_time);
1717       break;
1718     case PTP_MESSAGE_TYPE_SYNC:
1719       handle_sync_message (msg, receive_time);
1720       break;
1721     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1722       handle_follow_up_message (msg, receive_time);
1723       break;
1724     case PTP_MESSAGE_TYPE_DELAY_RESP:
1725       handle_delay_resp_message (msg, receive_time);
1726       break;
1727     default:
1728       break;
1729   }
1730 }
1731
1732 static gboolean
1733 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1734     gpointer user_data)
1735 {
1736   GIOStatus status;
1737   StdIOHeader header;
1738   gchar buffer[8192];
1739   GError *err = NULL;
1740   gsize read;
1741
1742   if ((condition & G_IO_STATUS_EOF)) {
1743     GST_ERROR ("Got EOF on stdin");
1744     g_main_loop_quit (main_loop);
1745     return G_SOURCE_REMOVE;
1746   }
1747
1748   status =
1749       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1750       &read, &err);
1751   if (status == G_IO_STATUS_ERROR) {
1752     GST_ERROR ("Failed to read from stdin: %s", err->message);
1753     g_clear_error (&err);
1754     g_main_loop_quit (main_loop);
1755     return G_SOURCE_REMOVE;
1756   } else if (status == G_IO_STATUS_EOF) {
1757     GST_ERROR ("Got EOF on stdin");
1758     g_main_loop_quit (main_loop);
1759     return G_SOURCE_REMOVE;
1760   } else if (status != G_IO_STATUS_NORMAL) {
1761     GST_ERROR ("Unexpected stdin read status: %d", status);
1762     g_main_loop_quit (main_loop);
1763     return G_SOURCE_REMOVE;
1764   } else if (read != sizeof (header)) {
1765     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1766     g_main_loop_quit (main_loop);
1767     return G_SOURCE_REMOVE;
1768   } else if (header.size > 8192) {
1769     GST_ERROR ("Unexpected size: %u", header.size);
1770     g_main_loop_quit (main_loop);
1771     return G_SOURCE_REMOVE;
1772   }
1773
1774   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1775   if (status == G_IO_STATUS_ERROR) {
1776     GST_ERROR ("Failed to read from stdin: %s", err->message);
1777     g_clear_error (&err);
1778     g_main_loop_quit (main_loop);
1779     return G_SOURCE_REMOVE;
1780   } else if (status == G_IO_STATUS_EOF) {
1781     GST_ERROR ("EOF on stdin");
1782     g_main_loop_quit (main_loop);
1783     return G_SOURCE_REMOVE;
1784   } else if (status != G_IO_STATUS_NORMAL) {
1785     GST_ERROR ("Unexpected stdin read status: %d", status);
1786     g_main_loop_quit (main_loop);
1787     return G_SOURCE_REMOVE;
1788   } else if (read != header.size) {
1789     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1790     g_main_loop_quit (main_loop);
1791     return G_SOURCE_REMOVE;
1792   }
1793
1794   switch (header.type) {
1795     case TYPE_EVENT:
1796     case TYPE_GENERAL:{
1797       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1798       PtpMessage msg;
1799
1800       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1801         dump_ptp_message (&msg);
1802         handle_ptp_message (&msg, receive_time);
1803       }
1804       break;
1805     }
1806     default:
1807     case TYPE_CLOCK_ID:{
1808       if (header.size != 8) {
1809         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1810         g_main_loop_quit (main_loop);
1811         return G_SOURCE_REMOVE;
1812       }
1813       g_mutex_lock (&ptp_lock);
1814       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1815       ptp_clock_id.port_number = getpid ();
1816       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1817           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1818       g_cond_signal (&ptp_cond);
1819       g_mutex_unlock (&ptp_lock);
1820       break;
1821     }
1822   }
1823
1824   return G_SOURCE_CONTINUE;
1825 }
1826
1827 /* Cleanup all announce messages and announce message senders
1828  * that are timed out by now, and clean up all pending syncs
1829  * that are missing their FOLLOW_UP or DELAY_RESP */
1830 static gboolean
1831 cleanup_cb (gpointer data)
1832 {
1833   GstClockTime now = gst_clock_get_time (observation_system_clock);
1834   GList *l, *m, *n;
1835
1836   for (l = domain_data; l; l = l->next) {
1837     PtpDomainData *domain = l->data;
1838
1839     for (n = domain->announce_senders; n;) {
1840       PtpAnnounceSender *sender = n->data;
1841       gboolean timed_out = TRUE;
1842
1843       /* Keep only 5 messages per sender around */
1844       while (g_queue_get_length (&sender->announce_messages) > 5) {
1845         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1846         g_free (msg);
1847       }
1848
1849       for (m = sender->announce_messages.head; m; m = m->next) {
1850         PtpAnnounceMessage *msg = m->data;
1851
1852         if (msg->receive_time +
1853             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1854           timed_out = FALSE;
1855           break;
1856         }
1857       }
1858
1859       if (timed_out) {
1860         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1861             sender->master_clock_identity.clock_identity,
1862             sender->master_clock_identity.port_number);
1863         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1864         g_queue_clear (&sender->announce_messages);
1865       }
1866
1867       if (g_queue_get_length (&sender->announce_messages) == 0) {
1868         GList *tmp = n->next;
1869
1870         if (compare_clock_identity (&sender->master_clock_identity,
1871                 &domain->master_clock_identity) == 0)
1872           GST_WARNING ("currently selected master clock timed out");
1873         g_free (sender);
1874         domain->announce_senders =
1875             g_list_delete_link (domain->announce_senders, n);
1876         n = tmp;
1877       } else {
1878         n = n->next;
1879       }
1880     }
1881     select_best_master_clock (domain, now);
1882
1883     /* Clean up any pending syncs */
1884     for (n = domain->pending_syncs.head; n;) {
1885       PtpPendingSync *sync = n->data;
1886       gboolean timed_out = FALSE;
1887
1888       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1889        * and pending delay reqs after 4 delay req intervals or 10 seconds
1890        */
1891       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1892           ((domain->min_delay_req_interval != 0
1893                   && sync->delay_req_send_time_local +
1894                   4 * domain->min_delay_req_interval < now)
1895               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1896         timed_out = TRUE;
1897       } else if ((domain->sync_interval != 0
1898               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1899           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1900         timed_out = TRUE;
1901       }
1902
1903       if (timed_out) {
1904         GList *tmp = n->next;
1905         ptp_pending_sync_free (sync);
1906         g_queue_delete_link (&domain->pending_syncs, n);
1907         n = tmp;
1908       } else {
1909         n = n->next;
1910       }
1911     }
1912   }
1913
1914   return G_SOURCE_CONTINUE;
1915 }
1916
1917 static gpointer
1918 ptp_helper_main (gpointer data)
1919 {
1920   GSource *cleanup_source;
1921
1922   GST_DEBUG ("Starting PTP helper loop");
1923
1924   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1925   cleanup_source = g_timeout_source_new_seconds (5);
1926   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1927   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1928   g_source_attach (cleanup_source, main_context);
1929   g_source_unref (cleanup_source);
1930
1931   g_main_loop_run (main_loop);
1932   GST_DEBUG ("Stopped PTP helper loop");
1933
1934   g_mutex_lock (&ptp_lock);
1935   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1936   ptp_clock_id.port_number = 0;
1937   initted = FALSE;
1938   g_cond_signal (&ptp_cond);
1939   g_mutex_unlock (&ptp_lock);
1940
1941   return NULL;
1942 }
1943
1944 /**
1945  * gst_ptp_is_supported:
1946  *
1947  * Check if PTP clocks are generally supported on this system, and if previous
1948  * initializations did not fail.
1949  *
1950  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1951  * previous initializations did not fail.
1952  *
1953  * Since: 1.6
1954  */
1955 gboolean
1956 gst_ptp_is_supported (void)
1957 {
1958   return supported;
1959 }
1960
1961 /**
1962  * gst_ptp_is_initialized:
1963  *
1964  * Check if the GStreamer PTP clock subsystem is initialized.
1965  *
1966  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1967  *
1968  * Since: 1.6
1969  */
1970 gboolean
1971 gst_ptp_is_initialized (void)
1972 {
1973   return initted;
1974 }
1975
1976 /**
1977  * gst_ptp_init:
1978  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1979  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
1980  *
1981  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1982  * slave-only mode for all domains on the given @interfaces with the
1983  * given @clock_id.
1984  *
1985  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1986  * generated from the MAC address of the first network interface.
1987  *
1988  *
1989  * This function is automatically called by gst_ptp_clock_new() with default
1990  * parameters if it wasn't called before.
1991  *
1992  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
1993  *
1994  * Since: 1.6
1995  */
1996 gboolean
1997 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
1998 {
1999   gboolean ret;
2000   const gchar *env;
2001   gchar **argv = NULL;
2002   gint argc, argc_c;
2003   gint fd_r, fd_w;
2004   GError *err = NULL;
2005   GSource *stdin_source;
2006
2007   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2008
2009   g_mutex_lock (&ptp_lock);
2010   if (!supported) {
2011     GST_ERROR ("PTP not supported");
2012     ret = FALSE;
2013     goto done;
2014   }
2015
2016   if (initted) {
2017     GST_DEBUG ("PTP already initialized");
2018     ret = TRUE;
2019     goto done;
2020   }
2021
2022   if (ptp_helper_pid) {
2023     GST_DEBUG ("PTP currently initializing");
2024     goto wait;
2025   }
2026
2027   if (!domain_stats_hooks_initted) {
2028     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2029     domain_stats_hooks_initted = TRUE;
2030   }
2031
2032   argc = 1;
2033   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2034     argc += 2;
2035   if (interfaces != NULL)
2036     argc += 2 * g_strv_length (interfaces);
2037
2038   argv = g_new0 (gchar *, argc + 2);
2039   argc_c = 0;
2040
2041   env = g_getenv ("GST_PTP_HELPER_1_0");
2042   if (env == NULL)
2043     env = g_getenv ("GST_PTP_HELPER");
2044   if (env != NULL && *env != '\0') {
2045     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2046     argv[argc_c++] = g_strdup (env);
2047   } else {
2048     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2049   }
2050
2051   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2052     argv[argc_c++] = g_strdup ("-c");
2053     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2054   }
2055
2056   if (interfaces != NULL) {
2057     gchar **ptr = interfaces;
2058
2059     while (*ptr) {
2060       argv[argc_c++] = g_strdup ("-i");
2061       argv[argc_c++] = g_strdup (*ptr);
2062       ptr++;
2063     }
2064   }
2065
2066   main_context = g_main_context_new ();
2067   main_loop = g_main_loop_new (main_context, FALSE);
2068
2069   ptp_helper_thread =
2070       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2071   if (!ptp_helper_thread) {
2072     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2073     g_clear_error (&err);
2074     ret = FALSE;
2075     goto done;
2076   }
2077
2078   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2079           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2080     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2081     g_clear_error (&err);
2082     ret = FALSE;
2083     supported = FALSE;
2084     goto done;
2085   }
2086
2087   stdin_channel = g_io_channel_unix_new (fd_r);
2088   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2089   g_io_channel_set_buffered (stdin_channel, FALSE);
2090   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2091   stdin_source =
2092       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2093   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2094   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2095       NULL);
2096   g_source_attach (stdin_source, main_context);
2097   g_source_unref (stdin_source);
2098
2099   /* Create stdout channel */
2100   stdout_channel = g_io_channel_unix_new (fd_w);
2101   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2102   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2103   g_io_channel_set_buffered (stdout_channel, FALSE);
2104
2105   delay_req_rand = g_rand_new ();
2106   observation_system_clock =
2107       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2108       NULL);
2109
2110   initted = TRUE;
2111
2112 wait:
2113   GST_DEBUG ("Waiting for PTP to be initialized");
2114
2115   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2116     g_cond_wait (&ptp_cond, &ptp_lock);
2117
2118   ret = initted;
2119   if (ret) {
2120     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2121         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2122   } else {
2123     GST_ERROR ("Failed to initialize");
2124     supported = FALSE;
2125   }
2126
2127 done:
2128   g_strfreev (argv);
2129
2130   if (!ret) {
2131     if (ptp_helper_pid) {
2132 #ifndef G_OS_WIN32
2133       kill (ptp_helper_pid, SIGKILL);
2134       waitpid (ptp_helper_pid, NULL, 0);
2135 #else
2136       TerminateProcess (ptp_helper_pid, 1);
2137       WaitForSingleObject (ptp_helper_pid, INFINITE);
2138 #endif
2139       g_spawn_close_pid (ptp_helper_pid);
2140     }
2141     ptp_helper_pid = 0;
2142
2143     if (stdin_channel)
2144       g_io_channel_unref (stdin_channel);
2145     stdin_channel = NULL;
2146     if (stdout_channel)
2147       g_io_channel_unref (stdout_channel);
2148     stdout_channel = NULL;
2149
2150     if (main_loop && ptp_helper_thread) {
2151       g_main_loop_quit (main_loop);
2152       g_thread_join (ptp_helper_thread);
2153     }
2154     ptp_helper_thread = NULL;
2155     if (main_loop)
2156       g_main_loop_unref (main_loop);
2157     main_loop = NULL;
2158     if (main_context)
2159       g_main_context_unref (main_context);
2160     main_context = NULL;
2161
2162     if (delay_req_rand)
2163       g_rand_free (delay_req_rand);
2164     delay_req_rand = NULL;
2165
2166     if (observation_system_clock)
2167       gst_object_unref (observation_system_clock);
2168     observation_system_clock = NULL;
2169   }
2170
2171   g_mutex_unlock (&ptp_lock);
2172
2173   return ret;
2174 }
2175
2176 /**
2177  * gst_ptp_deinit:
2178  *
2179  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2180  * are any remaining GstPtpClock instances, they won't be further synchronized
2181  * to the PTP network clock.
2182  *
2183  * Since: 1.6
2184  */
2185 void
2186 gst_ptp_deinit (void)
2187 {
2188   GList *l, *m;
2189
2190   g_mutex_lock (&ptp_lock);
2191
2192   if (ptp_helper_pid) {
2193 #ifndef G_OS_WIN32
2194     kill (ptp_helper_pid, SIGKILL);
2195     waitpid (ptp_helper_pid, NULL, 0);
2196 #else
2197     TerminateProcess (ptp_helper_pid, 1);
2198     WaitForSingleObject (ptp_helper_pid, INFINITE);
2199 #endif
2200     g_spawn_close_pid (ptp_helper_pid);
2201   }
2202   ptp_helper_pid = 0;
2203
2204   if (stdin_channel)
2205     g_io_channel_unref (stdin_channel);
2206   stdin_channel = NULL;
2207   if (stdout_channel)
2208     g_io_channel_unref (stdout_channel);
2209   stdout_channel = NULL;
2210
2211   if (main_loop && ptp_helper_thread) {
2212     GThread *tmp = ptp_helper_thread;
2213     ptp_helper_thread = NULL;
2214     g_mutex_unlock (&ptp_lock);
2215     g_main_loop_quit (main_loop);
2216     g_thread_join (tmp);
2217     g_mutex_lock (&ptp_lock);
2218   }
2219   if (main_loop)
2220     g_main_loop_unref (main_loop);
2221   main_loop = NULL;
2222   if (main_context)
2223     g_main_context_unref (main_context);
2224   main_context = NULL;
2225
2226   if (delay_req_rand)
2227     g_rand_free (delay_req_rand);
2228   delay_req_rand = NULL;
2229   if (observation_system_clock)
2230     gst_object_unref (observation_system_clock);
2231   observation_system_clock = NULL;
2232
2233   for (l = domain_data; l; l = l->next) {
2234     PtpDomainData *domain = l->data;
2235
2236     for (m = domain->announce_senders; m; m = m->next) {
2237       PtpAnnounceSender *sender = m->data;
2238
2239       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2240       g_queue_clear (&sender->announce_messages);
2241       g_free (sender);
2242     }
2243     g_list_free (domain->announce_senders);
2244
2245     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2246         NULL);
2247     g_queue_clear (&domain->pending_syncs);
2248     gst_object_unref (domain->domain_clock);
2249     g_free (domain);
2250   }
2251   g_list_free (domain_data);
2252   domain_data = NULL;
2253   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2254   g_list_free (domain_clocks);
2255   domain_clocks = NULL;
2256
2257   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2258   ptp_clock_id.port_number = 0;
2259
2260   initted = FALSE;
2261
2262   g_mutex_unlock (&ptp_lock);
2263 }
2264
2265 #define DEFAULT_DOMAIN 0
2266
2267 enum
2268 {
2269   PROP_0,
2270   PROP_DOMAIN,
2271   PROP_INTERNAL_CLOCK
2272 };
2273
2274 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2275   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2276
2277 struct _GstPtpClockPrivate
2278 {
2279   guint domain;
2280   GstClock *domain_clock;
2281   gulong domain_stats_id;
2282 };
2283
2284 #define gst_ptp_clock_parent_class parent_class
2285 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2286
2287 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2288     const GValue * value, GParamSpec * pspec);
2289 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2290     GValue * value, GParamSpec * pspec);
2291 static void gst_ptp_clock_finalize (GObject * object);
2292
2293 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2294
2295 static void
2296 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2297 {
2298   GObjectClass *gobject_class;
2299   GstClockClass *clock_class;
2300
2301   gobject_class = G_OBJECT_CLASS (klass);
2302   clock_class = GST_CLOCK_CLASS (klass);
2303
2304   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2305
2306   gobject_class->finalize = gst_ptp_clock_finalize;
2307   gobject_class->get_property = gst_ptp_clock_get_property;
2308   gobject_class->set_property = gst_ptp_clock_set_property;
2309
2310   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2311       g_param_spec_uint ("domain", "Domain",
2312           "The PTP domain", 0, G_MAXUINT8,
2313           DEFAULT_DOMAIN,
2314           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2315
2316   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2317       g_param_spec_object ("internal-clock", "Internal Clock",
2318           "Internal clock", GST_TYPE_CLOCK,
2319           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2320
2321   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2322 }
2323
2324 static void
2325 gst_ptp_clock_init (GstPtpClock * self)
2326 {
2327   GstPtpClockPrivate *priv;
2328
2329   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2330
2331   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2332   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2333
2334   priv->domain = DEFAULT_DOMAIN;
2335 }
2336
2337 static gboolean
2338 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2339 {
2340   gboolean got_clock = TRUE;
2341
2342   if (G_UNLIKELY (!self->priv->domain_clock)) {
2343     g_mutex_lock (&domain_clocks_lock);
2344     if (!self->priv->domain_clock) {
2345       GList *l;
2346
2347       got_clock = FALSE;
2348
2349       for (l = domain_clocks; l; l = l->next) {
2350         PtpDomainData *clock_data = l->data;
2351
2352         if (clock_data->domain == self->priv->domain
2353             && clock_data->last_ptp_time != 0) {
2354           self->priv->domain_clock = clock_data->domain_clock;
2355           got_clock = TRUE;
2356           break;
2357         }
2358       }
2359     }
2360     g_mutex_unlock (&domain_clocks_lock);
2361     if (got_clock) {
2362       g_object_notify (G_OBJECT (self), "internal-clock");
2363       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2364     }
2365   }
2366
2367   return got_clock;
2368 }
2369
2370 static gboolean
2371 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2372     gpointer user_data)
2373 {
2374   GstPtpClock *self = user_data;
2375
2376   if (domain != self->priv->domain
2377       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2378     return TRUE;
2379
2380   /* Let's set our internal clock */
2381   if (!gst_ptp_clock_ensure_domain_clock (self))
2382     return TRUE;
2383
2384   self->priv->domain_stats_id = 0;
2385
2386   return FALSE;
2387 }
2388
2389 static void
2390 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2391     const GValue * value, GParamSpec * pspec)
2392 {
2393   GstPtpClock *self = GST_PTP_CLOCK (object);
2394
2395   switch (prop_id) {
2396     case PROP_DOMAIN:
2397       self->priv->domain = g_value_get_uint (value);
2398       gst_ptp_clock_ensure_domain_clock (self);
2399       if (!self->priv->domain_clock)
2400         self->priv->domain_stats_id =
2401             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2402             NULL);
2403       break;
2404     default:
2405       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2406       break;
2407   }
2408 }
2409
2410 static void
2411 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2412     GValue * value, GParamSpec * pspec)
2413 {
2414   GstPtpClock *self = GST_PTP_CLOCK (object);
2415
2416   switch (prop_id) {
2417     case PROP_DOMAIN:
2418       g_value_set_uint (value, self->priv->domain);
2419       break;
2420     case PROP_INTERNAL_CLOCK:
2421       gst_ptp_clock_ensure_domain_clock (self);
2422       g_value_set_object (value, self->priv->domain_clock);
2423       break;
2424     default:
2425       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2426       break;
2427   }
2428 }
2429
2430 static void
2431 gst_ptp_clock_finalize (GObject * object)
2432 {
2433   GstPtpClock *self = GST_PTP_CLOCK (object);
2434
2435   if (self->priv->domain_stats_id)
2436     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2437
2438   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2439 }
2440
2441 static GstClockTime
2442 gst_ptp_clock_get_internal_time (GstClock * clock)
2443 {
2444   GstPtpClock *self = GST_PTP_CLOCK (clock);
2445
2446   gst_ptp_clock_ensure_domain_clock (self);
2447
2448   if (!self->priv->domain_clock) {
2449     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2450         self->priv->domain);
2451     return GST_CLOCK_TIME_NONE;
2452   }
2453
2454   return gst_clock_get_time (self->priv->domain_clock);
2455 }
2456
2457 /**
2458  * gst_ptp_clock_new:
2459  * @name: Name of the clock
2460  * @domain: PTP domain
2461  *
2462  * Creates a new PTP clock instance that exports the PTP time of the master
2463  * clock in @domain. This clock can be slaved to other clocks as needed.
2464  *
2465  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2466  * default parameters.
2467  *
2468  *
2469  * This clock only returns valid timestamps after it received the first
2470  * times from the PTP master clock on the network. Once this happens the
2471  * GstPtpClock::internal-clock property will become non-NULL. You can
2472  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2473  * gst_clock_is_synced().
2474  *
2475  * Since: 1.6
2476  */
2477 GstClock *
2478 gst_ptp_clock_new (const gchar * name, guint domain)
2479 {
2480   g_return_val_if_fail (name != NULL, NULL);
2481   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2482
2483   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2484     GST_ERROR ("Failed to initialize PTP");
2485     return NULL;
2486   }
2487
2488   return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2489       NULL);
2490 }
2491
2492 typedef struct
2493 {
2494   guint8 domain;
2495   const GstStructure *stats;
2496 } DomainStatsMarshalData;
2497
2498 static void
2499 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2500 {
2501   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2502
2503   if (!callback (data->domain, data->stats, hook->data))
2504     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2505 }
2506
2507 static void
2508 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2509 {
2510   DomainStatsMarshalData data = { domain, stats };
2511
2512   g_mutex_lock (&ptp_lock);
2513   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2514       (GHookMarshaller) domain_stats_marshaller, &data);
2515   g_mutex_unlock (&ptp_lock);
2516 }
2517
2518 /**
2519  * gst_ptp_statistics_callback_add:
2520  * @callback: GstPtpStatisticsCallback to call
2521  * @user_data: Data to pass to the callback
2522  * @destroy_data: GDestroyNotify to destroy the data
2523  *
2524  * Installs a new statistics callback for gathering PTP statistics. See
2525  * GstPtpStatisticsCallback for a list of statistics that are provided.
2526  *
2527  * Returns: Id for the callback that can be passed to
2528  * gst_ptp_statistics_callback_remove()
2529  *
2530  * Since: 1.6
2531  */
2532 gulong
2533 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2534     gpointer user_data, GDestroyNotify destroy_data)
2535 {
2536   GHook *hook;
2537
2538   g_mutex_lock (&ptp_lock);
2539
2540   if (!domain_stats_hooks_initted) {
2541     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2542     domain_stats_hooks_initted = TRUE;
2543   }
2544
2545   hook = g_hook_alloc (&domain_stats_hooks);
2546   hook->func = callback;
2547   hook->data = user_data;
2548   hook->destroy = destroy_data;
2549   g_hook_prepend (&domain_stats_hooks, hook);
2550   g_atomic_int_add (&domain_stats_n_hooks, 1);
2551
2552   g_mutex_unlock (&ptp_lock);
2553
2554   return hook->hook_id;
2555 }
2556
2557 /**
2558  * gst_ptp_statistics_callback_remove:
2559  * @id: Callback id to remove
2560  *
2561  * Removes a PTP statistics callback that was previously added with
2562  * gst_ptp_statistics_callback_add().
2563  *
2564  * Since: 1.6
2565  */
2566 void
2567 gst_ptp_statistics_callback_remove (gulong id)
2568 {
2569   g_mutex_lock (&ptp_lock);
2570   if (g_hook_destroy (&domain_stats_hooks, id))
2571     g_atomic_int_add (&domain_stats_n_hooks, -1);
2572   g_mutex_unlock (&ptp_lock);
2573 }