1528da2e7d422749d8552e823770af76a9ebd4af
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @short_description: Special clock that synchronizes to a remote time
23  *                     provider via PTP (IEEE1588:2008).
24  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
25  *
26  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
27  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
28  * clock in some specific domain.
29  *
30  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
31  * a helper process to do the actual communication via the PTP ports. This is
32  * required as PTP listens on ports < 1024 and thus requires special
33  * privileges. Once this helper process is started, the main process will
34  * synchronize to all PTP domains that are detected on the selected
35  * interfaces.
36  *
37  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
38  * time from a master clock inside a specific PTP domain. This clock will only
39  * return valid timestamps once the timestamps in the PTP domain are known. To
40  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
41  * signal and gst_clock_is_synced().
42  *
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include "gstptpclock.h"
57
58 #include "gstptp_private.h"
59
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #define WIN32_LEAN_AND_MEAN
65 #include <windows.h>
66 #endif
67 #include <sys/types.h>
68
69 #ifdef HAVE_UNISTD_H
70 #include <unistd.h>
71 #elif defined(G_OS_WIN32)
72 #include <io.h>
73 #endif
74
75 #include <gst/base/base.h>
76
77 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
78 #define GST_CAT_DEFAULT (ptp_debug)
79
80 /* IEEE 1588 7.7.3.1 */
81 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
82
83 /* Use a running average for calculating the mean path delay instead
84  * of just using the last measurement. Enabling this helps in unreliable
85  * networks, like wifi, with often changing delays
86  *
87  * Undef for following IEEE1588-2008 by the letter
88  */
89 #define USE_RUNNING_AVERAGE_DELAY 1
90
91 /* Filter out any measurements that are above a certain threshold compared to
92  * previous measurements. Enabling this helps filtering out outliers that
93  * happen fairly often in unreliable networks, like wifi.
94  *
95  * Undef for following IEEE1588-2008 by the letter
96  */
97 #define USE_MEASUREMENT_FILTERING 1
98
99 /* Select the first clock from which we capture a SYNC message as the master
100  * clock of the domain until we are ready to run the best master clock
101  * algorithm. This allows faster syncing but might mean a change of the master
102  * clock in the beginning. As all clocks in a domain are supposed to use the
103  * same time, this shouldn't be much of a problem.
104  *
105  * Undef for following IEEE1588-2008 by the letter
106  */
107 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
108
109 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
110  * afterwards. This allows better synchronization in networks with varying
111  * delays, as for every other SYNC message we would have to assume that it's
112  * the average of what we saw before. But that might be completely off
113  */
114 #define USE_ONLY_SYNC_WITH_DELAY 1
115
116 /* Filter out delay measurements that are too far away from the median of the
117  * last delay measurements, currently those that are more than 2 times as big.
118  * This increases accuracy a lot on wifi.
119  */
120 #define USE_MEDIAN_PRE_FILTERING 1
121 #define MEDIAN_PRE_FILTERING_WINDOW 9
122
123 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
124 #define MAX_SKIPPED_UPDATES 5
125
126 typedef enum
127 {
128   PTP_MESSAGE_TYPE_SYNC = 0x0,
129   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
130   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
131   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
132   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
133   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
134   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
135   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
136   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
137   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
138 } PtpMessageType;
139
140 typedef struct
141 {
142   guint64 seconds_field;        /* 48 bits valid */
143   guint32 nanoseconds_field;
144 } PtpTimestamp;
145
146 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
147 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
148 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
149
150 typedef struct
151 {
152   guint64 clock_identity;
153   guint16 port_number;
154 } PtpClockIdentity;
155
156 static gint
157 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
158 {
159   if (a->clock_identity < b->clock_identity)
160     return -1;
161   else if (a->clock_identity > b->clock_identity)
162     return 1;
163
164   if (a->port_number < b->port_number)
165     return -1;
166   else if (a->port_number > b->port_number)
167     return 1;
168
169   return 0;
170 }
171
172 typedef struct
173 {
174   guint8 clock_class;
175   guint8 clock_accuracy;
176   guint16 offset_scaled_log_variance;
177 } PtpClockQuality;
178
179 typedef struct
180 {
181   guint8 transport_specific;
182   PtpMessageType message_type;
183   /* guint8 reserved; */
184   guint8 version_ptp;
185   guint16 message_length;
186   guint8 domain_number;
187   /* guint8 reserved; */
188   guint16 flag_field;
189   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
190   /* guint32 reserved; */
191   PtpClockIdentity source_port_identity;
192   guint16 sequence_id;
193   guint8 control_field;
194   gint8 log_message_interval;
195
196   union
197   {
198     struct
199     {
200       PtpTimestamp origin_timestamp;
201       gint16 current_utc_offset;
202       /* guint8 reserved; */
203       guint8 grandmaster_priority_1;
204       PtpClockQuality grandmaster_clock_quality;
205       guint8 grandmaster_priority_2;
206       guint64 grandmaster_identity;
207       guint16 steps_removed;
208       guint8 time_source;
209     } announce;
210
211     struct
212     {
213       PtpTimestamp origin_timestamp;
214     } sync;
215
216     struct
217     {
218       PtpTimestamp precise_origin_timestamp;
219     } follow_up;
220
221     struct
222     {
223       PtpTimestamp origin_timestamp;
224     } delay_req;
225
226     struct
227     {
228       PtpTimestamp receive_timestamp;
229       PtpClockIdentity requesting_port_identity;
230     } delay_resp;
231
232   } message_specific;
233 } PtpMessage;
234
235 static GMutex ptp_lock;
236 static GCond ptp_cond;
237 static gboolean initted = FALSE;
238 #ifdef HAVE_PTP
239 static gboolean supported = TRUE;
240 #else
241 static gboolean supported = FALSE;
242 #endif
243 static GPid ptp_helper_pid;
244 static GThread *ptp_helper_thread;
245 static GMainContext *main_context;
246 static GMainLoop *main_loop;
247 static GIOChannel *stdin_channel, *stdout_channel;
248 static GRand *delay_req_rand;
249 static GstClock *observation_system_clock;
250 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
251
252 typedef struct
253 {
254   GstClockTime receive_time;
255
256   PtpClockIdentity master_clock_identity;
257
258   guint8 grandmaster_priority_1;
259   PtpClockQuality grandmaster_clock_quality;
260   guint8 grandmaster_priority_2;
261   guint64 grandmaster_identity;
262   guint16 steps_removed;
263   guint8 time_source;
264
265   guint16 sequence_id;
266 } PtpAnnounceMessage;
267
268 typedef struct
269 {
270   PtpClockIdentity master_clock_identity;
271
272   GstClockTime announce_interval;       /* last interval we received */
273   GQueue announce_messages;
274 } PtpAnnounceSender;
275
276 typedef struct
277 {
278   guint domain;
279   PtpClockIdentity master_clock_identity;
280
281   guint16 sync_seqnum;
282   GstClockTime sync_recv_time_local;    /* t2 */
283   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
284   GstClockTime follow_up_recv_time_local;
285
286   GSource *timeout_source;
287   guint16 delay_req_seqnum;
288   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
289   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
290   GstClockTime delay_resp_recv_time_local;
291
292   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
293   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
294 } PtpPendingSync;
295
296 static void
297 ptp_pending_sync_free (PtpPendingSync * sync)
298 {
299   if (sync->timeout_source)
300     g_source_destroy (sync->timeout_source);
301   g_free (sync);
302 }
303
304 typedef struct
305 {
306   guint domain;
307
308   GstClockTime last_ptp_time;
309   GstClockTime last_local_time;
310   gint skipped_updates;
311
312   /* Used for selecting the master/grandmaster */
313   GList *announce_senders;
314
315   /* Last selected master clock */
316   gboolean have_master_clock;
317   PtpClockIdentity master_clock_identity;
318   guint64 grandmaster_identity;
319
320   /* Last SYNC or FOLLOW_UP timestamp we received */
321   GstClockTime last_ptp_sync_time;
322   GstClockTime sync_interval;
323
324   GstClockTime mean_path_delay;
325   GstClockTime last_delay_req, min_delay_req_interval;
326   guint16 last_delay_req_seqnum;
327
328   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
329   gint last_path_delays_missing;
330
331   GQueue pending_syncs;
332
333   GstClock *domain_clock;
334 } PtpDomainData;
335
336 static GList *domain_data;
337 static GMutex domain_clocks_lock;
338 static GList *domain_clocks;
339
340 /* Protected by PTP lock */
341 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
342 static GHookList domain_stats_hooks;
343 static gint domain_stats_n_hooks;
344 static gboolean domain_stats_hooks_initted = FALSE;
345
346 /* Converts log2 seconds to GstClockTime */
347 static GstClockTime
348 log2_to_clock_time (gint l)
349 {
350   if (l < 0)
351     return GST_SECOND >> (-l);
352   else
353     return GST_SECOND << l;
354 }
355
356 static void
357 dump_ptp_message (PtpMessage * msg)
358 {
359   GST_TRACE ("PTP message:");
360   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
361   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
362   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
363   GST_TRACE ("\tmessage_length: %u", msg->message_length);
364   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
365   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
366   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
367       (msg->correction_field / 65536),
368       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
369   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
370       msg->source_port_identity.clock_identity,
371       msg->source_port_identity.port_number);
372   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
373   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
374   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
375       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
376
377   switch (msg->message_type) {
378     case PTP_MESSAGE_TYPE_ANNOUNCE:
379       GST_TRACE ("\tANNOUNCE:");
380       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
381           msg->message_specific.announce.origin_timestamp.seconds_field,
382           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
383       GST_TRACE ("\t\tcurrent_utc_offset: %d",
384           msg->message_specific.announce.current_utc_offset);
385       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
386           msg->message_specific.announce.grandmaster_priority_1);
387       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
388           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
389           msg->message_specific.announce.
390           grandmaster_clock_quality.clock_accuracy,
391           msg->message_specific.announce.
392           grandmaster_clock_quality.offset_scaled_log_variance);
393       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
394           msg->message_specific.announce.grandmaster_priority_2);
395       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
396           msg->message_specific.announce.grandmaster_identity);
397       GST_TRACE ("\t\tsteps_removed: %u",
398           msg->message_specific.announce.steps_removed);
399       GST_TRACE ("\t\ttime_source: 0x%02x",
400           msg->message_specific.announce.time_source);
401       break;
402     case PTP_MESSAGE_TYPE_SYNC:
403       GST_TRACE ("\tSYNC:");
404       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
405           msg->message_specific.sync.origin_timestamp.seconds_field,
406           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
407       break;
408     case PTP_MESSAGE_TYPE_FOLLOW_UP:
409       GST_TRACE ("\tFOLLOW_UP:");
410       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
411           msg->message_specific.follow_up.
412           precise_origin_timestamp.seconds_field,
413           msg->message_specific.follow_up.
414           precise_origin_timestamp.nanoseconds_field);
415       break;
416     case PTP_MESSAGE_TYPE_DELAY_REQ:
417       GST_TRACE ("\tDELAY_REQ:");
418       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
419           msg->message_specific.delay_req.origin_timestamp.seconds_field,
420           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
421       break;
422     case PTP_MESSAGE_TYPE_DELAY_RESP:
423       GST_TRACE ("\tDELAY_RESP:");
424       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
425           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
426           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
427       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
428           "x %u",
429           msg->message_specific.delay_resp.
430           requesting_port_identity.clock_identity,
431           msg->message_specific.delay_resp.
432           requesting_port_identity.port_number);
433       break;
434     default:
435       break;
436   }
437   GST_TRACE (" ");
438 }
439
440 /* IEEE 1588-2008 5.3.3 */
441 static gboolean
442 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
443 {
444   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
445
446   timestamp->seconds_field =
447       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
448       gst_byte_reader_get_uint16_be_unchecked (reader);
449   timestamp->nanoseconds_field =
450       gst_byte_reader_get_uint32_be_unchecked (reader);
451
452   if (timestamp->nanoseconds_field >= 1000000000)
453     return FALSE;
454
455   return TRUE;
456 }
457
458 /* IEEE 1588-2008 13.3 */
459 static gboolean
460 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
461 {
462   guint8 b;
463
464   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
465
466   b = gst_byte_reader_get_uint8_unchecked (reader);
467   msg->transport_specific = b >> 4;
468   msg->message_type = b & 0x0f;
469
470   b = gst_byte_reader_get_uint8_unchecked (reader);
471   msg->version_ptp = b & 0x0f;
472   if (msg->version_ptp != 2) {
473     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
474     return FALSE;
475   }
476
477   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
478   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
479     GST_WARNING ("Not enough data (%u < %u)",
480         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
481     return FALSE;
482   }
483
484   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
485   gst_byte_reader_skip_unchecked (reader, 1);
486
487   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
488   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
489   gst_byte_reader_skip_unchecked (reader, 4);
490
491   msg->source_port_identity.clock_identity =
492       gst_byte_reader_get_uint64_be_unchecked (reader);
493   msg->source_port_identity.port_number =
494       gst_byte_reader_get_uint16_be_unchecked (reader);
495
496   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
497   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
498   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
499
500   return TRUE;
501 }
502
503 /* IEEE 1588-2008 13.5 */
504 static gboolean
505 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
506 {
507   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
508
509   if (gst_byte_reader_get_remaining (reader) < 20)
510     return FALSE;
511
512   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
513           reader))
514     return FALSE;
515
516   msg->message_specific.announce.current_utc_offset =
517       gst_byte_reader_get_uint16_be_unchecked (reader);
518   gst_byte_reader_skip_unchecked (reader, 1);
519
520   msg->message_specific.announce.grandmaster_priority_1 =
521       gst_byte_reader_get_uint8_unchecked (reader);
522   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
523       gst_byte_reader_get_uint8_unchecked (reader);
524   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
525       gst_byte_reader_get_uint8_unchecked (reader);
526   msg->message_specific.announce.
527       grandmaster_clock_quality.offset_scaled_log_variance =
528       gst_byte_reader_get_uint16_be_unchecked (reader);
529   msg->message_specific.announce.grandmaster_priority_2 =
530       gst_byte_reader_get_uint8_unchecked (reader);
531   msg->message_specific.announce.grandmaster_identity =
532       gst_byte_reader_get_uint64_be_unchecked (reader);
533   msg->message_specific.announce.steps_removed =
534       gst_byte_reader_get_uint16_be_unchecked (reader);
535   msg->message_specific.announce.time_source =
536       gst_byte_reader_get_uint8_unchecked (reader);
537
538   return TRUE;
539 }
540
541 /* IEEE 1588-2008 13.6 */
542 static gboolean
543 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
544 {
545   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
546
547   if (gst_byte_reader_get_remaining (reader) < 10)
548     return FALSE;
549
550   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
551           reader))
552     return FALSE;
553
554   return TRUE;
555 }
556
557 /* IEEE 1588-2008 13.6 */
558 static gboolean
559 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
560 {
561   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
562
563   if (gst_byte_reader_get_remaining (reader) < 10)
564     return FALSE;
565
566   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
567           reader))
568     return FALSE;
569
570   return TRUE;
571 }
572
573 /* IEEE 1588-2008 13.7 */
574 static gboolean
575 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
576 {
577   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
578
579   if (gst_byte_reader_get_remaining (reader) < 10)
580     return FALSE;
581
582   if (!parse_ptp_timestamp (&msg->message_specific.
583           follow_up.precise_origin_timestamp, reader))
584     return FALSE;
585
586   return TRUE;
587 }
588
589 /* IEEE 1588-2008 13.8 */
590 static gboolean
591 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
592 {
593   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
594       FALSE);
595
596   if (gst_byte_reader_get_remaining (reader) < 20)
597     return FALSE;
598
599   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
600           reader))
601     return FALSE;
602
603   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
604       gst_byte_reader_get_uint64_be_unchecked (reader);
605   msg->message_specific.delay_resp.requesting_port_identity.port_number =
606       gst_byte_reader_get_uint16_be_unchecked (reader);
607
608   return TRUE;
609 }
610
611 static gboolean
612 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
613 {
614   GstByteReader reader;
615   gboolean ret = FALSE;
616
617   gst_byte_reader_init (&reader, data, size);
618
619   if (!parse_ptp_message_header (msg, &reader)) {
620     GST_WARNING ("Failed to parse PTP message header");
621     return FALSE;
622   }
623
624   switch (msg->message_type) {
625     case PTP_MESSAGE_TYPE_SYNC:
626       ret = parse_ptp_message_sync (msg, &reader);
627       break;
628     case PTP_MESSAGE_TYPE_FOLLOW_UP:
629       ret = parse_ptp_message_follow_up (msg, &reader);
630       break;
631     case PTP_MESSAGE_TYPE_DELAY_REQ:
632       ret = parse_ptp_message_delay_req (msg, &reader);
633       break;
634     case PTP_MESSAGE_TYPE_DELAY_RESP:
635       ret = parse_ptp_message_delay_resp (msg, &reader);
636       break;
637     case PTP_MESSAGE_TYPE_ANNOUNCE:
638       ret = parse_ptp_message_announce (msg, &reader);
639       break;
640     default:
641       /* ignore for now */
642       break;
643   }
644
645   return ret;
646 }
647
648 static gint
649 compare_announce_message (const PtpAnnounceMessage * a,
650     const PtpAnnounceMessage * b)
651 {
652   /* IEEE 1588 Figure 27 */
653   if (a->grandmaster_identity == b->grandmaster_identity) {
654     if (a->steps_removed + 1 < b->steps_removed)
655       return -1;
656     else if (a->steps_removed > b->steps_removed + 1)
657       return 1;
658
659     /* Error cases are filtered out earlier */
660     if (a->steps_removed < b->steps_removed)
661       return -1;
662     else if (a->steps_removed > b->steps_removed)
663       return 1;
664
665     /* Error cases are filtered out earlier */
666     if (a->master_clock_identity.clock_identity <
667         b->master_clock_identity.clock_identity)
668       return -1;
669     else if (a->master_clock_identity.clock_identity >
670         b->master_clock_identity.clock_identity)
671       return 1;
672
673     /* Error cases are filtered out earlier */
674     if (a->master_clock_identity.port_number <
675         b->master_clock_identity.port_number)
676       return -1;
677     else if (a->master_clock_identity.port_number >
678         b->master_clock_identity.port_number)
679       return 1;
680     else
681       g_assert_not_reached ();
682
683     return 0;
684   }
685
686   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
687     return -1;
688   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
689     return 1;
690
691   if (a->grandmaster_clock_quality.clock_class <
692       b->grandmaster_clock_quality.clock_class)
693     return -1;
694   else if (a->grandmaster_clock_quality.clock_class >
695       b->grandmaster_clock_quality.clock_class)
696     return 1;
697
698   if (a->grandmaster_clock_quality.clock_accuracy <
699       b->grandmaster_clock_quality.clock_accuracy)
700     return -1;
701   else if (a->grandmaster_clock_quality.clock_accuracy >
702       b->grandmaster_clock_quality.clock_accuracy)
703     return 1;
704
705   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
706       b->grandmaster_clock_quality.offset_scaled_log_variance)
707     return -1;
708   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
709       b->grandmaster_clock_quality.offset_scaled_log_variance)
710     return 1;
711
712   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
713     return -1;
714   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
715     return 1;
716
717   if (a->grandmaster_identity < b->grandmaster_identity)
718     return -1;
719   else if (a->grandmaster_identity > b->grandmaster_identity)
720     return 1;
721   else
722     g_assert_not_reached ();
723
724   return 0;
725 }
726
727 static void
728 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
729 {
730   GList *qualified_messages = NULL;
731   GList *l, *m;
732   PtpAnnounceMessage *best = NULL;
733
734   /* IEEE 1588 9.3.2.5 */
735   for (l = domain->announce_senders; l; l = l->next) {
736     PtpAnnounceSender *sender = l->data;
737     GstClockTime window = 4 * sender->announce_interval;
738     gint count = 0;
739
740     for (m = sender->announce_messages.head; m; m = m->next) {
741       PtpAnnounceMessage *msg = m->data;
742
743       if (now - msg->receive_time <= window)
744         count++;
745     }
746
747     /* Only include the newest message of announce senders that had at least 2
748      * announce messages in the last 4 announce intervals. Which also means
749      * that we wait at least 4 announce intervals before we select a master
750      * clock. Until then we just report based on the newest SYNC we received
751      */
752     if (count >= 2) {
753       qualified_messages =
754           g_list_prepend (qualified_messages,
755           g_queue_peek_tail (&sender->announce_messages));
756     }
757   }
758
759   if (!qualified_messages) {
760     GST_DEBUG
761         ("No qualified announce messages for domain %u, can't select a master clock",
762         domain->domain);
763     domain->have_master_clock = FALSE;
764     return;
765   }
766
767   for (l = qualified_messages; l; l = l->next) {
768     PtpAnnounceMessage *msg = l->data;
769
770     if (!best || compare_announce_message (msg, best) < 0)
771       best = msg;
772   }
773
774   if (domain->have_master_clock
775       && compare_clock_identity (&domain->master_clock_identity,
776           &best->master_clock_identity) == 0) {
777     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
778   } else {
779     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
780         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
781         domain->domain, best->master_clock_identity.clock_identity,
782         best->master_clock_identity.port_number, best->grandmaster_identity);
783
784     domain->have_master_clock = TRUE;
785     domain->grandmaster_identity = best->grandmaster_identity;
786
787     /* Opportunistic master clock selection likely gave us the same master
788      * clock before, no need to reset all statistics */
789     if (compare_clock_identity (&domain->master_clock_identity,
790             &best->master_clock_identity) != 0) {
791       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
792           sizeof (PtpClockIdentity));
793       domain->mean_path_delay = 0;
794       domain->last_delay_req = 0;
795       domain->last_path_delays_missing = 9;
796       domain->min_delay_req_interval = 0;
797       domain->sync_interval = 0;
798       domain->last_ptp_sync_time = 0;
799       domain->skipped_updates = 0;
800       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
801           NULL);
802       g_queue_clear (&domain->pending_syncs);
803     }
804
805     if (g_atomic_int_get (&domain_stats_n_hooks)) {
806       GstStructure *stats =
807           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
808           "domain", G_TYPE_UINT, domain->domain,
809           "master-clock-id", G_TYPE_UINT64,
810           domain->master_clock_identity.clock_identity,
811           "master-clock-port", G_TYPE_UINT,
812           domain->master_clock_identity.port_number,
813           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
814           NULL);
815       emit_ptp_statistics (domain->domain, stats);
816       gst_structure_free (stats);
817     }
818   }
819 }
820
821 static void
822 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
823 {
824   GList *l;
825   PtpDomainData *domain = NULL;
826   PtpAnnounceSender *sender = NULL;
827   PtpAnnounceMessage *announce;
828
829   /* IEEE1588 9.3.2.2 e)
830    * Don't consider messages with the alternate master flag set
831    */
832   if ((msg->flag_field & 0x0100))
833     return;
834
835   /* IEEE 1588 9.3.2.5 d)
836    * Don't consider announce messages with steps_removed>=255
837    */
838   if (msg->message_specific.announce.steps_removed >= 255)
839     return;
840
841   for (l = domain_data; l; l = l->next) {
842     PtpDomainData *tmp = l->data;
843
844     if (tmp->domain == msg->domain_number) {
845       domain = tmp;
846       break;
847     }
848   }
849
850   if (!domain) {
851     gchar *clock_name;
852
853     domain = g_new0 (PtpDomainData, 1);
854     domain->domain = msg->domain_number;
855     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
856     domain->domain_clock =
857         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
858     g_free (clock_name);
859     g_queue_init (&domain->pending_syncs);
860     domain->last_path_delays_missing = 9;
861     domain_data = g_list_prepend (domain_data, domain);
862
863     g_mutex_lock (&domain_clocks_lock);
864     domain_clocks = g_list_prepend (domain_clocks, domain);
865     g_mutex_unlock (&domain_clocks_lock);
866
867     if (g_atomic_int_get (&domain_stats_n_hooks)) {
868       GstStructure *stats =
869           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
870           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
871           domain->domain_clock, NULL);
872       emit_ptp_statistics (domain->domain, stats);
873       gst_structure_free (stats);
874     }
875   }
876
877   for (l = domain->announce_senders; l; l = l->next) {
878     PtpAnnounceSender *tmp = l->data;
879
880     if (compare_clock_identity (&tmp->master_clock_identity,
881             &msg->source_port_identity) == 0) {
882       sender = tmp;
883       break;
884     }
885   }
886
887   if (!sender) {
888     sender = g_new0 (PtpAnnounceSender, 1);
889
890     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
891         sizeof (PtpClockIdentity));
892     g_queue_init (&sender->announce_messages);
893     domain->announce_senders =
894         g_list_prepend (domain->announce_senders, sender);
895   }
896
897   for (l = sender->announce_messages.head; l; l = l->next) {
898     PtpAnnounceMessage *tmp = l->data;
899
900     /* IEEE 1588 9.3.2.5 c)
901      * Don't consider identical messages, i.e. duplicates
902      */
903     if (tmp->sequence_id == msg->sequence_id)
904       return;
905   }
906
907   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
908
909   announce = g_new0 (PtpAnnounceMessage, 1);
910   announce->receive_time = receive_time;
911   announce->sequence_id = msg->sequence_id;
912   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
913       sizeof (PtpClockIdentity));
914   announce->grandmaster_identity =
915       msg->message_specific.announce.grandmaster_identity;
916   announce->grandmaster_priority_1 =
917       msg->message_specific.announce.grandmaster_priority_1;
918   announce->grandmaster_clock_quality.clock_class =
919       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
920   announce->grandmaster_clock_quality.clock_accuracy =
921       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
922   announce->grandmaster_clock_quality.offset_scaled_log_variance =
923       msg->message_specific.announce.
924       grandmaster_clock_quality.offset_scaled_log_variance;
925   announce->grandmaster_priority_2 =
926       msg->message_specific.announce.grandmaster_priority_2;
927   announce->steps_removed = msg->message_specific.announce.steps_removed;
928   announce->time_source = msg->message_specific.announce.time_source;
929   g_queue_push_tail (&sender->announce_messages, announce);
930
931   select_best_master_clock (domain, receive_time);
932 }
933
934 static gboolean
935 send_delay_req_timeout (PtpPendingSync * sync)
936 {
937   StdIOHeader header = { 0, };
938   guint8 delay_req[44];
939   GstByteWriter writer;
940   GIOStatus status;
941   gsize written;
942   GError *err = NULL;
943
944   header.type = TYPE_EVENT;
945   header.size = 44;
946
947   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
948   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
949   gst_byte_writer_put_uint8_unchecked (&writer, 2);
950   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
951   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
952   gst_byte_writer_put_uint8_unchecked (&writer, 0);
953   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
954   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
955   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
956   gst_byte_writer_put_uint64_be_unchecked (&writer,
957       ptp_clock_id.clock_identity);
958   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
959   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
960   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
961   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
962   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
963   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
964
965   status =
966       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
967       sizeof (header), &written, &err);
968   if (status == G_IO_STATUS_ERROR) {
969     g_warning ("Failed to write to stdout: %s", err->message);
970     g_clear_error (&err);
971     return G_SOURCE_REMOVE;
972   } else if (status == G_IO_STATUS_EOF) {
973     g_message ("EOF on stdout");
974     g_main_loop_quit (main_loop);
975     return G_SOURCE_REMOVE;
976   } else if (status != G_IO_STATUS_NORMAL) {
977     g_warning ("Unexpected stdout write status: %d", status);
978     g_main_loop_quit (main_loop);
979     return G_SOURCE_REMOVE;
980   } else if (written != sizeof (header)) {
981     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
982     g_main_loop_quit (main_loop);
983     return G_SOURCE_REMOVE;
984   }
985
986   sync->delay_req_send_time_local =
987       gst_clock_get_time (observation_system_clock);
988
989   status =
990       g_io_channel_write_chars (stdout_channel,
991       (const gchar *) delay_req, 44, &written, &err);
992   if (status == G_IO_STATUS_ERROR) {
993     g_warning ("Failed to write to stdout: %s", err->message);
994     g_clear_error (&err);
995     g_main_loop_quit (main_loop);
996     return G_SOURCE_REMOVE;
997   } else if (status == G_IO_STATUS_EOF) {
998     g_message ("EOF on stdout");
999     g_main_loop_quit (main_loop);
1000     return G_SOURCE_REMOVE;
1001   } else if (status != G_IO_STATUS_NORMAL) {
1002     g_warning ("Unexpected stdout write status: %d", status);
1003     g_main_loop_quit (main_loop);
1004     return G_SOURCE_REMOVE;
1005   } else if (written != 44) {
1006     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1007     g_main_loop_quit (main_loop);
1008     return G_SOURCE_REMOVE;
1009   }
1010
1011   return G_SOURCE_REMOVE;
1012 }
1013
1014 static gboolean
1015 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1016 {
1017   GstClockTime now = gst_clock_get_time (observation_system_clock);
1018   guint timeout;
1019   GSource *timeout_source;
1020
1021   if (domain->last_delay_req != 0
1022       && domain->last_delay_req + domain->min_delay_req_interval > now)
1023     return FALSE;
1024
1025   domain->last_delay_req = now;
1026   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1027
1028   /* IEEE 1588 9.5.11.2 */
1029   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1030     timeout = 0;
1031   else
1032     timeout =
1033         g_rand_int_range (delay_req_rand, 0,
1034         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1035
1036   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1037   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1038   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1039       sync, NULL);
1040   g_source_attach (timeout_source, main_context);
1041
1042   return TRUE;
1043 }
1044
1045 /* Filtering of outliers for RTT and time calculations inspired
1046  * by the code from gstnetclientclock.c
1047  */
1048 static void
1049 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1050 {
1051   GstClockTime internal_time, external_time, rate_num, rate_den;
1052   GstClockTime corrected_ptp_time, corrected_local_time;
1053   gdouble r_squared = 0.0;
1054   gboolean synced;
1055   GstClockTimeDiff discont = 0;
1056   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1057 #ifdef USE_MEASUREMENT_FILTERING
1058   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1059       orig_rate_den;
1060   GstClockTime new_estimated_ptp_time;
1061   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1062   gboolean now_synced;
1063 #endif
1064
1065 #ifdef USE_ONLY_SYNC_WITH_DELAY
1066   GstClockTime mean_path_delay;
1067
1068   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1069     return;
1070
1071   /* IEEE 1588 11.3 */
1072   mean_path_delay =
1073       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1074       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1075       (sync->correction_field_sync + sync->correction_field_delay +
1076           32768) / 65536) / 2;
1077 #endif
1078
1079   /* IEEE 1588 11.2 */
1080   corrected_ptp_time =
1081       sync->sync_send_time_remote +
1082       (sync->correction_field_sync + 32768) / 65536;
1083
1084 #ifdef USE_ONLY_SYNC_WITH_DELAY
1085   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1086 #else
1087   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1088 #endif
1089
1090 #ifdef USE_MEASUREMENT_FILTERING
1091   /* We check this here and when updating the mean path delay, because
1092    * we can get here without a delay response too */
1093   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1094       && sync->follow_up_recv_time_local >
1095       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1096     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1097         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1098         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1099         GST_TIME_ARGS (domain->mean_path_delay));
1100     synced = FALSE;
1101     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1102         &internal_time, &external_time, &rate_num, &rate_den);
1103     goto out;
1104   }
1105 #endif
1106
1107   /* Set an initial local-remote relation */
1108   if (domain->last_ptp_time == 0)
1109     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1110         corrected_ptp_time, 1, 1);
1111
1112 #ifdef USE_MEASUREMENT_FILTERING
1113   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1114    * estimate with our present knowledge about the clock
1115    */
1116   /* Store what the clock produced as 'now' before this update */
1117   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1118       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1119   internal_time = orig_internal_time;
1120   external_time = orig_external_time;
1121   rate_num = orig_rate_num;
1122   rate_den = orig_rate_den;
1123
1124   /* 3/4 RTT window around the estimation */
1125   max_discont = domain->mean_path_delay * 3 / 2;
1126
1127   /* Check if the estimated sync time is inside our window */
1128   estimated_ptp_time_min = corrected_local_time - max_discont;
1129   estimated_ptp_time_min =
1130       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1131       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1132   estimated_ptp_time_max = corrected_local_time + max_discont;
1133   estimated_ptp_time_max =
1134       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1135       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1136
1137   synced = (estimated_ptp_time_min < corrected_ptp_time
1138       && corrected_ptp_time < estimated_ptp_time_max);
1139
1140   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1141       GST_TIME_FORMAT, domain->domain,
1142       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1143
1144   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1145       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1146       GST_TIME_ARGS (corrected_ptp_time),
1147       GST_TIME_ARGS (estimated_ptp_time_max));
1148
1149   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1150           corrected_local_time, corrected_ptp_time, &r_squared,
1151           &internal_time, &external_time, &rate_num, &rate_den)) {
1152     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1153
1154     /* Old estimated PTP time based on receive time and path delay */
1155     estimated_ptp_time = corrected_local_time;
1156     estimated_ptp_time =
1157         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1158         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1159         orig_external_time, orig_rate_num, orig_rate_den);
1160
1161     /* New estimated PTP time based on receive time and path delay */
1162     new_estimated_ptp_time = corrected_local_time;
1163     new_estimated_ptp_time =
1164         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1165         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1166         external_time, rate_num, rate_den);
1167
1168     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1169     if (synced && ABS (discont) > max_discont) {
1170       GstClockTimeDiff offset;
1171       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1172           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1173           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1174           GST_TIME_ARGS (max_discont));
1175       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1176         offset = max_discont - discont;
1177         if (-offset > external_time)
1178           external_time = 0;
1179         else
1180           external_time += offset;
1181       } else {                  /* Too large a backward step - add a +ve offset */
1182         offset = -(max_discont + discont);
1183         external_time += offset;
1184       }
1185
1186       discont += offset;
1187     } else {
1188       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1189           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1190           GST_TIME_ARGS (max_discont));
1191     }
1192
1193     /* Check if the estimated sync time is now (still) inside our window */
1194     estimated_ptp_time_min = corrected_local_time - max_discont;
1195     estimated_ptp_time_min =
1196         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1197         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1198         external_time, rate_num, rate_den);
1199     estimated_ptp_time_max = corrected_local_time + max_discont;
1200     estimated_ptp_time_max =
1201         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1202         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1203         external_time, rate_num, rate_den);
1204
1205     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1206         && corrected_ptp_time < estimated_ptp_time_max);
1207
1208     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1209         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1210         GST_TIME_ARGS (corrected_ptp_time),
1211         GST_TIME_ARGS (estimated_ptp_time_max));
1212
1213     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1214       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1215           internal_time, external_time, rate_num, rate_den);
1216       domain->skipped_updates = 0;
1217
1218       domain->last_ptp_time = corrected_ptp_time;
1219       domain->last_local_time = corrected_local_time;
1220     } else {
1221       domain->skipped_updates++;
1222     }
1223   } else {
1224     domain->last_ptp_time = corrected_ptp_time;
1225     domain->last_local_time = corrected_local_time;
1226   }
1227
1228 #else
1229   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1230       GST_TIME_FORMAT, domain->domain,
1231       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1232
1233   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1234       &internal_time, &external_time, &rate_num, &rate_den);
1235
1236   estimated_ptp_time = corrected_local_time;
1237   estimated_ptp_time =
1238       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1239       (domain->domain_clock), estimated_ptp_time, internal_time,
1240       external_time, rate_num, rate_den);
1241
1242   gst_clock_add_observation (domain->domain_clock,
1243       corrected_local_time, corrected_ptp_time, &r_squared);
1244
1245   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1246       &internal_time, &external_time, &rate_num, &rate_den);
1247
1248   synced = TRUE;
1249   domain->last_ptp_time = corrected_ptp_time;
1250   domain->last_local_time = corrected_local_time;
1251 #endif
1252
1253 #ifdef USE_MEASUREMENT_FILTERING
1254 out:
1255 #endif
1256   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1257     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1258         "domain", G_TYPE_UINT, domain->domain,
1259         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1260         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1261         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1262         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1263         "discontinuity", G_TYPE_INT64, discont,
1264         "synced", G_TYPE_BOOLEAN, synced,
1265         "r-squared", G_TYPE_DOUBLE, r_squared,
1266         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1267         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1268         "rate-num", G_TYPE_UINT64, rate_num,
1269         "rate-den", G_TYPE_UINT64, rate_den,
1270         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1271         NULL);
1272     emit_ptp_statistics (domain->domain, stats);
1273     gst_structure_free (stats);
1274   }
1275
1276 }
1277
1278 #ifdef USE_MEDIAN_PRE_FILTERING
1279 static gint
1280 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1281 {
1282   if (*a < *b)
1283     return -1;
1284   else if (*a > *b)
1285     return 1;
1286   return 0;
1287 }
1288 #endif
1289
1290 static gboolean
1291 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1292 {
1293 #ifdef USE_MEDIAN_PRE_FILTERING
1294   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1295   GstClockTime median;
1296   gint i;
1297 #endif
1298
1299   GstClockTime mean_path_delay, delay_req_delay = 0;
1300   gboolean ret;
1301
1302   /* IEEE 1588 11.3 */
1303   mean_path_delay =
1304       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1305       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1306       (sync->correction_field_sync + sync->correction_field_delay +
1307           32768) / 65536) / 2;
1308
1309 #ifdef USE_MEDIAN_PRE_FILTERING
1310   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1311     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1312   domain->last_path_delays[i - 1] = mean_path_delay;
1313
1314   if (domain->last_path_delays_missing) {
1315     domain->last_path_delays_missing--;
1316   } else {
1317     memcpy (&last_path_delays, &domain->last_path_delays,
1318         sizeof (last_path_delays));
1319     g_qsort_with_data (&last_path_delays,
1320         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1321         (GCompareDataFunc) compare_clock_time, NULL);
1322
1323     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1324
1325     /* FIXME: We might want to use something else here, like only allowing
1326      * things in the interquartile range, or also filtering away delays that
1327      * are too small compared to the median. This here worked well enough
1328      * in tests so far.
1329      */
1330     if (mean_path_delay > 2 * median) {
1331       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1332           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1333           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1334       ret = FALSE;
1335       goto out;
1336     }
1337   }
1338 #endif
1339
1340 #ifdef USE_RUNNING_AVERAGE_DELAY
1341   /* Track an average round trip time, for a bit of smoothing */
1342   /* Always update before discarding a sample, so genuine changes in
1343    * the network get picked up, eventually */
1344   if (domain->mean_path_delay == 0)
1345     domain->mean_path_delay = mean_path_delay;
1346   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1347     domain->mean_path_delay =
1348         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1349   else
1350     domain->mean_path_delay =
1351         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1352 #else
1353   domain->mean_path_delay = mean_path_delay;
1354 #endif
1355
1356 #ifdef USE_MEASUREMENT_FILTERING
1357   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1358       domain->mean_path_delay != 0
1359       && sync->follow_up_recv_time_local >
1360       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1361     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1362         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1363         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1364             sync->sync_recv_time_local),
1365         GST_TIME_ARGS (domain->mean_path_delay));
1366     ret = FALSE;
1367     goto out;
1368   }
1369
1370   if (mean_path_delay > 2 * domain->mean_path_delay) {
1371     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1372         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1373         GST_TIME_ARGS (mean_path_delay),
1374         GST_TIME_ARGS (domain->mean_path_delay));
1375     ret = FALSE;
1376     goto out;
1377   }
1378 #endif
1379
1380   delay_req_delay =
1381       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1382
1383 #ifdef USE_MEASUREMENT_FILTERING
1384   /* delay_req_delay is a RTT, so 2 times the path delay */
1385   if (delay_req_delay > 4 * domain->mean_path_delay) {
1386     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1387         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1388         GST_TIME_ARGS (delay_req_delay),
1389         GST_TIME_ARGS (domain->mean_path_delay));
1390     ret = FALSE;
1391     goto out;
1392   }
1393 #endif
1394
1395   ret = TRUE;
1396
1397   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1398       GST_TIME_FORMAT ")", domain->domain,
1399       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1400   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1401       domain->domain, GST_TIME_ARGS (delay_req_delay));
1402
1403 #ifdef USE_MEASUREMENT_FILTERING
1404 out:
1405 #endif
1406   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1407     GstStructure *stats =
1408         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1409         "domain", G_TYPE_UINT, domain->domain,
1410         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1411         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1412         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1413     emit_ptp_statistics (domain->domain, stats);
1414     gst_structure_free (stats);
1415   }
1416
1417   return ret;
1418 }
1419
1420 static void
1421 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1422 {
1423   GList *l;
1424   PtpDomainData *domain = NULL;
1425   PtpPendingSync *sync = NULL;
1426
1427   /* Don't consider messages with the alternate master flag set */
1428   if ((msg->flag_field & 0x0100))
1429     return;
1430
1431   for (l = domain_data; l; l = l->next) {
1432     PtpDomainData *tmp = l->data;
1433
1434     if (msg->domain_number == tmp->domain) {
1435       domain = tmp;
1436       break;
1437     }
1438   }
1439
1440   if (!domain) {
1441     gchar *clock_name;
1442
1443     domain = g_new0 (PtpDomainData, 1);
1444     domain->domain = msg->domain_number;
1445     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1446     domain->domain_clock =
1447         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1448     g_free (clock_name);
1449     g_queue_init (&domain->pending_syncs);
1450     domain->last_path_delays_missing = 9;
1451     domain_data = g_list_prepend (domain_data, domain);
1452
1453     g_mutex_lock (&domain_clocks_lock);
1454     domain_clocks = g_list_prepend (domain_clocks, domain);
1455     g_mutex_unlock (&domain_clocks_lock);
1456   }
1457
1458   /* If we have a master clock, ignore this message if it's not coming from there */
1459   if (domain->have_master_clock
1460       && compare_clock_identity (&domain->master_clock_identity,
1461           &msg->source_port_identity) != 0)
1462     return;
1463
1464 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1465   /* Opportunistic selection of master clock */
1466   if (!domain->have_master_clock)
1467     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1468         sizeof (PtpClockIdentity));
1469 #else
1470   if (!domain->have_master_clock)
1471     return;
1472 #endif
1473
1474   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1475
1476   /* Check if duplicated */
1477   for (l = domain->pending_syncs.head; l; l = l->next) {
1478     PtpPendingSync *tmp = l->data;
1479
1480     if (tmp->sync_seqnum == msg->sequence_id)
1481       return;
1482   }
1483
1484   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1485       GST_CLOCK_TIME_NONE / GST_SECOND) {
1486     GST_FIXME ("Unsupported sync message seconds field value: %"
1487         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1488         msg->message_specific.sync.origin_timestamp.seconds_field,
1489         GST_CLOCK_TIME_NONE / GST_SECOND);
1490     return;
1491   }
1492
1493   sync = g_new0 (PtpPendingSync, 1);
1494   sync->domain = domain->domain;
1495   sync->sync_seqnum = msg->sequence_id;
1496   sync->sync_recv_time_local = receive_time;
1497   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1498   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1499   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1500   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1501   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1502
1503   /* 0.5 correction factor for division later */
1504   sync->correction_field_sync = msg->correction_field;
1505
1506   if ((msg->flag_field & 0x0200)) {
1507     /* Wait for FOLLOW_UP */
1508   } else {
1509     sync->sync_send_time_remote =
1510         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1511         sync.origin_timestamp);
1512
1513     if (domain->last_ptp_sync_time != 0
1514         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1515       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1516           GST_TIME_FORMAT, domain->domain,
1517           GST_TIME_ARGS (domain->last_ptp_sync_time),
1518           GST_TIME_ARGS (sync->sync_send_time_remote));
1519       ptp_pending_sync_free (sync);
1520       sync = NULL;
1521       return;
1522     }
1523     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1524
1525     if (send_delay_req (domain, sync)) {
1526       /* Sent delay request */
1527     } else {
1528       update_ptp_time (domain, sync);
1529       ptp_pending_sync_free (sync);
1530       sync = NULL;
1531     }
1532   }
1533
1534   if (sync)
1535     g_queue_push_tail (&domain->pending_syncs, sync);
1536 }
1537
1538 static void
1539 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1540 {
1541   GList *l;
1542   PtpDomainData *domain = NULL;
1543   PtpPendingSync *sync = NULL;
1544
1545   /* Don't consider messages with the alternate master flag set */
1546   if ((msg->flag_field & 0x0100))
1547     return;
1548
1549   for (l = domain_data; l; l = l->next) {
1550     PtpDomainData *tmp = l->data;
1551
1552     if (msg->domain_number == tmp->domain) {
1553       domain = tmp;
1554       break;
1555     }
1556   }
1557
1558   if (!domain)
1559     return;
1560
1561   /* If we have a master clock, ignore this message if it's not coming from there */
1562   if (domain->have_master_clock
1563       && compare_clock_identity (&domain->master_clock_identity,
1564           &msg->source_port_identity) != 0)
1565     return;
1566
1567   /* Check if we know about this one */
1568   for (l = domain->pending_syncs.head; l; l = l->next) {
1569     PtpPendingSync *tmp = l->data;
1570
1571     if (tmp->sync_seqnum == msg->sequence_id) {
1572       sync = tmp;
1573       break;
1574     }
1575   }
1576
1577   if (!sync)
1578     return;
1579
1580   /* Got a FOLLOW_UP for this already */
1581   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1582     return;
1583
1584   if (sync->sync_recv_time_local >= receive_time) {
1585     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1586         GST_TIME_FORMAT, domain->domain,
1587         GST_TIME_ARGS (sync->sync_recv_time_local),
1588         GST_TIME_ARGS (receive_time));
1589     g_queue_remove (&domain->pending_syncs, sync);
1590     ptp_pending_sync_free (sync);
1591     return;
1592   }
1593
1594   sync->correction_field_sync += msg->correction_field;
1595   sync->sync_send_time_remote =
1596       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1597       follow_up.precise_origin_timestamp);
1598   sync->follow_up_recv_time_local = receive_time;
1599
1600   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1601     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1602         GST_TIME_FORMAT, domain->domain,
1603         GST_TIME_ARGS (domain->last_ptp_sync_time),
1604         GST_TIME_ARGS (sync->sync_send_time_remote));
1605     g_queue_remove (&domain->pending_syncs, sync);
1606     ptp_pending_sync_free (sync);
1607     sync = NULL;
1608     return;
1609   }
1610   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1611
1612   if (send_delay_req (domain, sync)) {
1613     /* Sent delay request */
1614   } else {
1615     update_ptp_time (domain, sync);
1616     g_queue_remove (&domain->pending_syncs, sync);
1617     ptp_pending_sync_free (sync);
1618     sync = NULL;
1619   }
1620 }
1621
1622 static void
1623 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1624 {
1625   GList *l;
1626   PtpDomainData *domain = NULL;
1627   PtpPendingSync *sync = NULL;
1628
1629   /* Don't consider messages with the alternate master flag set */
1630   if ((msg->flag_field & 0x0100))
1631     return;
1632
1633   for (l = domain_data; l; l = l->next) {
1634     PtpDomainData *tmp = l->data;
1635
1636     if (msg->domain_number == tmp->domain) {
1637       domain = tmp;
1638       break;
1639     }
1640   }
1641
1642   if (!domain)
1643     return;
1644
1645   /* If we have a master clock, ignore this message if it's not coming from there */
1646   if (domain->have_master_clock
1647       && compare_clock_identity (&domain->master_clock_identity,
1648           &msg->source_port_identity) != 0)
1649     return;
1650
1651   /* Not for us */
1652   if (msg->message_specific.delay_resp.
1653       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1654       || msg->message_specific.delay_resp.
1655       requesting_port_identity.port_number != ptp_clock_id.port_number)
1656     return;
1657
1658   domain->min_delay_req_interval =
1659       log2_to_clock_time (msg->log_message_interval);
1660
1661   /* Check if we know about this one */
1662   for (l = domain->pending_syncs.head; l; l = l->next) {
1663     PtpPendingSync *tmp = l->data;
1664
1665     if (tmp->delay_req_seqnum == msg->sequence_id) {
1666       sync = tmp;
1667       break;
1668     }
1669   }
1670
1671   if (!sync)
1672     return;
1673
1674   /* Got a DELAY_RESP for this already */
1675   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1676     return;
1677
1678   if (sync->delay_req_send_time_local > receive_time) {
1679     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1680         GST_TIME_FORMAT, domain->domain,
1681         GST_TIME_ARGS (sync->delay_req_send_time_local),
1682         GST_TIME_ARGS (receive_time));
1683     g_queue_remove (&domain->pending_syncs, sync);
1684     ptp_pending_sync_free (sync);
1685     return;
1686   }
1687
1688   sync->correction_field_delay = msg->correction_field;
1689
1690   sync->delay_req_recv_time_remote =
1691       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1692       delay_resp.receive_timestamp);
1693   sync->delay_resp_recv_time_local = receive_time;
1694
1695   if (domain->mean_path_delay != 0
1696       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1697     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1698         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1699         GST_TIME_ARGS (sync->sync_send_time_remote),
1700         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1701     g_queue_remove (&domain->pending_syncs, sync);
1702     ptp_pending_sync_free (sync);
1703     return;
1704   }
1705
1706   if (update_mean_path_delay (domain, sync))
1707     update_ptp_time (domain, sync);
1708   g_queue_remove (&domain->pending_syncs, sync);
1709   ptp_pending_sync_free (sync);
1710 }
1711
1712 static void
1713 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1714 {
1715   /* Ignore our own messages */
1716   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1717       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1718     return;
1719
1720   switch (msg->message_type) {
1721     case PTP_MESSAGE_TYPE_ANNOUNCE:
1722       handle_announce_message (msg, receive_time);
1723       break;
1724     case PTP_MESSAGE_TYPE_SYNC:
1725       handle_sync_message (msg, receive_time);
1726       break;
1727     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1728       handle_follow_up_message (msg, receive_time);
1729       break;
1730     case PTP_MESSAGE_TYPE_DELAY_RESP:
1731       handle_delay_resp_message (msg, receive_time);
1732       break;
1733     default:
1734       break;
1735   }
1736 }
1737
1738 static gboolean
1739 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1740     gpointer user_data)
1741 {
1742   GIOStatus status;
1743   StdIOHeader header;
1744   gchar buffer[8192];
1745   GError *err = NULL;
1746   gsize read;
1747
1748   if ((condition & G_IO_STATUS_EOF)) {
1749     GST_ERROR ("Got EOF on stdin");
1750     g_main_loop_quit (main_loop);
1751     return G_SOURCE_REMOVE;
1752   }
1753
1754   status =
1755       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1756       &read, &err);
1757   if (status == G_IO_STATUS_ERROR) {
1758     GST_ERROR ("Failed to read from stdin: %s", err->message);
1759     g_clear_error (&err);
1760     g_main_loop_quit (main_loop);
1761     return G_SOURCE_REMOVE;
1762   } else if (status == G_IO_STATUS_EOF) {
1763     GST_ERROR ("Got EOF on stdin");
1764     g_main_loop_quit (main_loop);
1765     return G_SOURCE_REMOVE;
1766   } else if (status != G_IO_STATUS_NORMAL) {
1767     GST_ERROR ("Unexpected stdin read status: %d", status);
1768     g_main_loop_quit (main_loop);
1769     return G_SOURCE_REMOVE;
1770   } else if (read != sizeof (header)) {
1771     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1772     g_main_loop_quit (main_loop);
1773     return G_SOURCE_REMOVE;
1774   } else if (header.size > 8192) {
1775     GST_ERROR ("Unexpected size: %u", header.size);
1776     g_main_loop_quit (main_loop);
1777     return G_SOURCE_REMOVE;
1778   }
1779
1780   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1781   if (status == G_IO_STATUS_ERROR) {
1782     GST_ERROR ("Failed to read from stdin: %s", err->message);
1783     g_clear_error (&err);
1784     g_main_loop_quit (main_loop);
1785     return G_SOURCE_REMOVE;
1786   } else if (status == G_IO_STATUS_EOF) {
1787     GST_ERROR ("EOF on stdin");
1788     g_main_loop_quit (main_loop);
1789     return G_SOURCE_REMOVE;
1790   } else if (status != G_IO_STATUS_NORMAL) {
1791     GST_ERROR ("Unexpected stdin read status: %d", status);
1792     g_main_loop_quit (main_loop);
1793     return G_SOURCE_REMOVE;
1794   } else if (read != header.size) {
1795     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1796     g_main_loop_quit (main_loop);
1797     return G_SOURCE_REMOVE;
1798   }
1799
1800   switch (header.type) {
1801     case TYPE_EVENT:
1802     case TYPE_GENERAL:{
1803       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1804       PtpMessage msg;
1805
1806       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1807         dump_ptp_message (&msg);
1808         handle_ptp_message (&msg, receive_time);
1809       }
1810       break;
1811     }
1812     default:
1813     case TYPE_CLOCK_ID:{
1814       if (header.size != 8) {
1815         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1816         g_main_loop_quit (main_loop);
1817         return G_SOURCE_REMOVE;
1818       }
1819       g_mutex_lock (&ptp_lock);
1820       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1821       ptp_clock_id.port_number = getpid ();
1822       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1823           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1824       g_cond_signal (&ptp_cond);
1825       g_mutex_unlock (&ptp_lock);
1826       break;
1827     }
1828   }
1829
1830   return G_SOURCE_CONTINUE;
1831 }
1832
1833 /* Cleanup all announce messages and announce message senders
1834  * that are timed out by now, and clean up all pending syncs
1835  * that are missing their FOLLOW_UP or DELAY_RESP */
1836 static gboolean
1837 cleanup_cb (gpointer data)
1838 {
1839   GstClockTime now = gst_clock_get_time (observation_system_clock);
1840   GList *l, *m, *n;
1841
1842   for (l = domain_data; l; l = l->next) {
1843     PtpDomainData *domain = l->data;
1844
1845     for (n = domain->announce_senders; n;) {
1846       PtpAnnounceSender *sender = n->data;
1847       gboolean timed_out = TRUE;
1848
1849       /* Keep only 5 messages per sender around */
1850       while (g_queue_get_length (&sender->announce_messages) > 5) {
1851         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1852         g_free (msg);
1853       }
1854
1855       for (m = sender->announce_messages.head; m; m = m->next) {
1856         PtpAnnounceMessage *msg = m->data;
1857
1858         if (msg->receive_time +
1859             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1860           timed_out = FALSE;
1861           break;
1862         }
1863       }
1864
1865       if (timed_out) {
1866         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1867             sender->master_clock_identity.clock_identity,
1868             sender->master_clock_identity.port_number);
1869         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1870         g_queue_clear (&sender->announce_messages);
1871       }
1872
1873       if (g_queue_get_length (&sender->announce_messages) == 0) {
1874         GList *tmp = n->next;
1875
1876         if (compare_clock_identity (&sender->master_clock_identity,
1877                 &domain->master_clock_identity) == 0)
1878           GST_WARNING ("currently selected master clock timed out");
1879         g_free (sender);
1880         domain->announce_senders =
1881             g_list_delete_link (domain->announce_senders, n);
1882         n = tmp;
1883       } else {
1884         n = n->next;
1885       }
1886     }
1887     select_best_master_clock (domain, now);
1888
1889     /* Clean up any pending syncs */
1890     for (n = domain->pending_syncs.head; n;) {
1891       PtpPendingSync *sync = n->data;
1892       gboolean timed_out = FALSE;
1893
1894       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1895        * and pending delay reqs after 4 delay req intervals or 10 seconds
1896        */
1897       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1898           ((domain->min_delay_req_interval != 0
1899                   && sync->delay_req_send_time_local +
1900                   4 * domain->min_delay_req_interval < now)
1901               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1902         timed_out = TRUE;
1903       } else if ((domain->sync_interval != 0
1904               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1905           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1906         timed_out = TRUE;
1907       }
1908
1909       if (timed_out) {
1910         GList *tmp = n->next;
1911         ptp_pending_sync_free (sync);
1912         g_queue_delete_link (&domain->pending_syncs, n);
1913         n = tmp;
1914       } else {
1915         n = n->next;
1916       }
1917     }
1918   }
1919
1920   return G_SOURCE_CONTINUE;
1921 }
1922
1923 static gpointer
1924 ptp_helper_main (gpointer data)
1925 {
1926   GSource *cleanup_source;
1927
1928   GST_DEBUG ("Starting PTP helper loop");
1929
1930   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1931   cleanup_source = g_timeout_source_new_seconds (5);
1932   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1933   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1934   g_source_attach (cleanup_source, main_context);
1935   g_source_unref (cleanup_source);
1936
1937   g_main_loop_run (main_loop);
1938   GST_DEBUG ("Stopped PTP helper loop");
1939
1940   g_mutex_lock (&ptp_lock);
1941   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1942   ptp_clock_id.port_number = 0;
1943   initted = FALSE;
1944   g_cond_signal (&ptp_cond);
1945   g_mutex_unlock (&ptp_lock);
1946
1947   return NULL;
1948 }
1949
1950 /**
1951  * gst_ptp_is_supported:
1952  *
1953  * Check if PTP clocks are generally supported on this system, and if previous
1954  * initializations did not fail.
1955  *
1956  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1957  * previous initializations did not fail.
1958  *
1959  * Since: 1.6
1960  */
1961 gboolean
1962 gst_ptp_is_supported (void)
1963 {
1964   return supported;
1965 }
1966
1967 /**
1968  * gst_ptp_is_initialized:
1969  *
1970  * Check if the GStreamer PTP clock subsystem is initialized.
1971  *
1972  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1973  *
1974  * Since: 1.6
1975  */
1976 gboolean
1977 gst_ptp_is_initialized (void)
1978 {
1979   return initted;
1980 }
1981
1982 /**
1983  * gst_ptp_init:
1984  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1985  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
1986  *
1987  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1988  * slave-only mode for all domains on the given @interfaces with the
1989  * given @clock_id.
1990  *
1991  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1992  * generated from the MAC address of the first network interface.
1993  *
1994  *
1995  * This function is automatically called by gst_ptp_clock_new() with default
1996  * parameters if it wasn't called before.
1997  *
1998  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
1999  *
2000  * Since: 1.6
2001  */
2002 gboolean
2003 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2004 {
2005   gboolean ret;
2006   const gchar *env;
2007   gchar **argv = NULL;
2008   gint argc, argc_c;
2009   gint fd_r, fd_w;
2010   GError *err = NULL;
2011   GSource *stdin_source;
2012
2013   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2014
2015   g_mutex_lock (&ptp_lock);
2016   if (!supported) {
2017     GST_ERROR ("PTP not supported");
2018     ret = FALSE;
2019     goto done;
2020   }
2021
2022   if (initted) {
2023     GST_DEBUG ("PTP already initialized");
2024     ret = TRUE;
2025     goto done;
2026   }
2027
2028   if (ptp_helper_pid) {
2029     GST_DEBUG ("PTP currently initializing");
2030     goto wait;
2031   }
2032
2033   if (!domain_stats_hooks_initted) {
2034     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2035     domain_stats_hooks_initted = TRUE;
2036   }
2037
2038   argc = 1;
2039   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2040     argc += 2;
2041   if (interfaces != NULL)
2042     argc += 2 * g_strv_length (interfaces);
2043
2044   argv = g_new0 (gchar *, argc + 2);
2045   argc_c = 0;
2046
2047   env = g_getenv ("GST_PTP_HELPER_1_0");
2048   if (env == NULL)
2049     env = g_getenv ("GST_PTP_HELPER");
2050   if (env != NULL && *env != '\0') {
2051     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2052     argv[argc_c++] = g_strdup (env);
2053   } else {
2054     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2055   }
2056
2057   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2058     argv[argc_c++] = g_strdup ("-c");
2059     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2060   }
2061
2062   if (interfaces != NULL) {
2063     gchar **ptr = interfaces;
2064
2065     while (*ptr) {
2066       argv[argc_c++] = g_strdup ("-i");
2067       argv[argc_c++] = g_strdup (*ptr);
2068       ptr++;
2069     }
2070   }
2071
2072   main_context = g_main_context_new ();
2073   main_loop = g_main_loop_new (main_context, FALSE);
2074
2075   ptp_helper_thread =
2076       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2077   if (!ptp_helper_thread) {
2078     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2079     g_clear_error (&err);
2080     ret = FALSE;
2081     goto done;
2082   }
2083
2084   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2085           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2086     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2087     g_clear_error (&err);
2088     ret = FALSE;
2089     supported = FALSE;
2090     goto done;
2091   }
2092
2093   stdin_channel = g_io_channel_unix_new (fd_r);
2094   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2095   g_io_channel_set_buffered (stdin_channel, FALSE);
2096   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2097   stdin_source =
2098       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2099   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2100   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2101       NULL);
2102   g_source_attach (stdin_source, main_context);
2103   g_source_unref (stdin_source);
2104
2105   /* Create stdout channel */
2106   stdout_channel = g_io_channel_unix_new (fd_w);
2107   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2108   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2109   g_io_channel_set_buffered (stdout_channel, FALSE);
2110
2111   delay_req_rand = g_rand_new ();
2112   observation_system_clock =
2113       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2114       NULL);
2115
2116   initted = TRUE;
2117
2118 wait:
2119   GST_DEBUG ("Waiting for PTP to be initialized");
2120
2121   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2122     g_cond_wait (&ptp_cond, &ptp_lock);
2123
2124   ret = initted;
2125   if (ret) {
2126     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2127         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2128   } else {
2129     GST_ERROR ("Failed to initialize");
2130     supported = FALSE;
2131   }
2132
2133 done:
2134   g_strfreev (argv);
2135
2136   if (!ret) {
2137     if (ptp_helper_pid) {
2138 #ifndef G_OS_WIN32
2139       kill (ptp_helper_pid, SIGKILL);
2140       waitpid (ptp_helper_pid, NULL, 0);
2141 #else
2142       TerminateProcess (ptp_helper_pid, 1);
2143       WaitForSingleObject (ptp_helper_pid, INFINITE);
2144 #endif
2145       g_spawn_close_pid (ptp_helper_pid);
2146     }
2147     ptp_helper_pid = 0;
2148
2149     if (stdin_channel)
2150       g_io_channel_unref (stdin_channel);
2151     stdin_channel = NULL;
2152     if (stdout_channel)
2153       g_io_channel_unref (stdout_channel);
2154     stdout_channel = NULL;
2155
2156     if (main_loop && ptp_helper_thread) {
2157       g_main_loop_quit (main_loop);
2158       g_thread_join (ptp_helper_thread);
2159     }
2160     ptp_helper_thread = NULL;
2161     if (main_loop)
2162       g_main_loop_unref (main_loop);
2163     main_loop = NULL;
2164     if (main_context)
2165       g_main_context_unref (main_context);
2166     main_context = NULL;
2167
2168     if (delay_req_rand)
2169       g_rand_free (delay_req_rand);
2170     delay_req_rand = NULL;
2171
2172     if (observation_system_clock)
2173       gst_object_unref (observation_system_clock);
2174     observation_system_clock = NULL;
2175   }
2176
2177   g_mutex_unlock (&ptp_lock);
2178
2179   return ret;
2180 }
2181
2182 /**
2183  * gst_ptp_deinit:
2184  *
2185  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2186  * are any remaining GstPtpClock instances, they won't be further synchronized
2187  * to the PTP network clock.
2188  *
2189  * Since: 1.6
2190  */
2191 void
2192 gst_ptp_deinit (void)
2193 {
2194   GList *l, *m;
2195
2196   g_mutex_lock (&ptp_lock);
2197
2198   if (ptp_helper_pid) {
2199 #ifndef G_OS_WIN32
2200     kill (ptp_helper_pid, SIGKILL);
2201     waitpid (ptp_helper_pid, NULL, 0);
2202 #else
2203     TerminateProcess (ptp_helper_pid, 1);
2204     WaitForSingleObject (ptp_helper_pid, INFINITE);
2205 #endif
2206     g_spawn_close_pid (ptp_helper_pid);
2207   }
2208   ptp_helper_pid = 0;
2209
2210   if (stdin_channel)
2211     g_io_channel_unref (stdin_channel);
2212   stdin_channel = NULL;
2213   if (stdout_channel)
2214     g_io_channel_unref (stdout_channel);
2215   stdout_channel = NULL;
2216
2217   if (main_loop && ptp_helper_thread) {
2218     GThread *tmp = ptp_helper_thread;
2219     ptp_helper_thread = NULL;
2220     g_mutex_unlock (&ptp_lock);
2221     g_main_loop_quit (main_loop);
2222     g_thread_join (tmp);
2223     g_mutex_lock (&ptp_lock);
2224   }
2225   if (main_loop)
2226     g_main_loop_unref (main_loop);
2227   main_loop = NULL;
2228   if (main_context)
2229     g_main_context_unref (main_context);
2230   main_context = NULL;
2231
2232   if (delay_req_rand)
2233     g_rand_free (delay_req_rand);
2234   delay_req_rand = NULL;
2235   if (observation_system_clock)
2236     gst_object_unref (observation_system_clock);
2237   observation_system_clock = NULL;
2238
2239   for (l = domain_data; l; l = l->next) {
2240     PtpDomainData *domain = l->data;
2241
2242     for (m = domain->announce_senders; m; m = m->next) {
2243       PtpAnnounceSender *sender = m->data;
2244
2245       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2246       g_queue_clear (&sender->announce_messages);
2247       g_free (sender);
2248     }
2249     g_list_free (domain->announce_senders);
2250
2251     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2252         NULL);
2253     g_queue_clear (&domain->pending_syncs);
2254     gst_object_unref (domain->domain_clock);
2255     g_free (domain);
2256   }
2257   g_list_free (domain_data);
2258   domain_data = NULL;
2259   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2260   g_list_free (domain_clocks);
2261   domain_clocks = NULL;
2262
2263   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2264   ptp_clock_id.port_number = 0;
2265
2266   initted = FALSE;
2267
2268   g_mutex_unlock (&ptp_lock);
2269 }
2270
2271 #define DEFAULT_DOMAIN 0
2272
2273 enum
2274 {
2275   PROP_0,
2276   PROP_DOMAIN,
2277   PROP_INTERNAL_CLOCK,
2278   PROP_MASTER_CLOCK_ID,
2279   PROP_GRANDMASTER_CLOCK_ID
2280 };
2281
2282 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2283   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2284
2285 struct _GstPtpClockPrivate
2286 {
2287   guint domain;
2288   GstClock *domain_clock;
2289   gulong domain_stats_id;
2290 };
2291
2292 #define gst_ptp_clock_parent_class parent_class
2293 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2294
2295 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2296     const GValue * value, GParamSpec * pspec);
2297 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2298     GValue * value, GParamSpec * pspec);
2299 static void gst_ptp_clock_finalize (GObject * object);
2300
2301 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2302
2303 static void
2304 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2305 {
2306   GObjectClass *gobject_class;
2307   GstClockClass *clock_class;
2308
2309   gobject_class = G_OBJECT_CLASS (klass);
2310   clock_class = GST_CLOCK_CLASS (klass);
2311
2312   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2313
2314   gobject_class->finalize = gst_ptp_clock_finalize;
2315   gobject_class->get_property = gst_ptp_clock_get_property;
2316   gobject_class->set_property = gst_ptp_clock_set_property;
2317
2318   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2319       g_param_spec_uint ("domain", "Domain",
2320           "The PTP domain", 0, G_MAXUINT8,
2321           DEFAULT_DOMAIN,
2322           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2323
2324   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2325       g_param_spec_object ("internal-clock", "Internal Clock",
2326           "Internal clock", GST_TYPE_CLOCK,
2327           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2328
2329   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
2330       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
2331           "Master Clock ID", 0, G_MAXUINT64, 0,
2332           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2333
2334   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
2335       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
2336           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
2337           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2338
2339   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2340 }
2341
2342 static void
2343 gst_ptp_clock_init (GstPtpClock * self)
2344 {
2345   GstPtpClockPrivate *priv;
2346
2347   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2348
2349   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2350   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2351
2352   priv->domain = DEFAULT_DOMAIN;
2353 }
2354
2355 static gboolean
2356 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2357 {
2358   gboolean got_clock = TRUE;
2359
2360   if (G_UNLIKELY (!self->priv->domain_clock)) {
2361     g_mutex_lock (&domain_clocks_lock);
2362     if (!self->priv->domain_clock) {
2363       GList *l;
2364
2365       got_clock = FALSE;
2366
2367       for (l = domain_clocks; l; l = l->next) {
2368         PtpDomainData *clock_data = l->data;
2369
2370         if (clock_data->domain == self->priv->domain
2371             && clock_data->last_ptp_time != 0) {
2372           self->priv->domain_clock = clock_data->domain_clock;
2373           got_clock = TRUE;
2374           break;
2375         }
2376       }
2377     }
2378     g_mutex_unlock (&domain_clocks_lock);
2379     if (got_clock) {
2380       g_object_notify (G_OBJECT (self), "internal-clock");
2381       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2382     }
2383   }
2384
2385   return got_clock;
2386 }
2387
2388 static gboolean
2389 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2390     gpointer user_data)
2391 {
2392   GstPtpClock *self = user_data;
2393
2394   if (domain != self->priv->domain
2395       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2396     return TRUE;
2397
2398   /* Let's set our internal clock */
2399   if (!gst_ptp_clock_ensure_domain_clock (self))
2400     return TRUE;
2401
2402   self->priv->domain_stats_id = 0;
2403
2404   return FALSE;
2405 }
2406
2407 static void
2408 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2409     const GValue * value, GParamSpec * pspec)
2410 {
2411   GstPtpClock *self = GST_PTP_CLOCK (object);
2412
2413   switch (prop_id) {
2414     case PROP_DOMAIN:
2415       self->priv->domain = g_value_get_uint (value);
2416       gst_ptp_clock_ensure_domain_clock (self);
2417       if (!self->priv->domain_clock)
2418         self->priv->domain_stats_id =
2419             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2420             NULL);
2421       break;
2422     default:
2423       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2424       break;
2425   }
2426 }
2427
2428 static void
2429 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2430     GValue * value, GParamSpec * pspec)
2431 {
2432   GstPtpClock *self = GST_PTP_CLOCK (object);
2433
2434   switch (prop_id) {
2435     case PROP_DOMAIN:
2436       g_value_set_uint (value, self->priv->domain);
2437       break;
2438     case PROP_INTERNAL_CLOCK:
2439       gst_ptp_clock_ensure_domain_clock (self);
2440       g_value_set_object (value, self->priv->domain_clock);
2441       break;
2442     case PROP_MASTER_CLOCK_ID:
2443     case PROP_GRANDMASTER_CLOCK_ID:{
2444       GList *l;
2445
2446       g_mutex_lock (&domain_clocks_lock);
2447       g_value_set_uint64 (value, 0);
2448
2449       for (l = domain_clocks; l; l = l->next) {
2450         PtpDomainData *clock_data = l->data;
2451
2452         if (clock_data->domain == self->priv->domain) {
2453           if (prop_id == PROP_MASTER_CLOCK_ID)
2454             g_value_set_uint64 (value,
2455                 clock_data->master_clock_identity.clock_identity);
2456           else
2457             g_value_set_uint64 (value, clock_data->grandmaster_identity);
2458           break;
2459         }
2460       }
2461       g_mutex_unlock (&domain_clocks_lock);
2462       break;
2463     }
2464     default:
2465       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2466       break;
2467   }
2468 }
2469
2470 static void
2471 gst_ptp_clock_finalize (GObject * object)
2472 {
2473   GstPtpClock *self = GST_PTP_CLOCK (object);
2474
2475   if (self->priv->domain_stats_id)
2476     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2477
2478   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2479 }
2480
2481 static GstClockTime
2482 gst_ptp_clock_get_internal_time (GstClock * clock)
2483 {
2484   GstPtpClock *self = GST_PTP_CLOCK (clock);
2485
2486   gst_ptp_clock_ensure_domain_clock (self);
2487
2488   if (!self->priv->domain_clock) {
2489     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2490         self->priv->domain);
2491     return GST_CLOCK_TIME_NONE;
2492   }
2493
2494   return gst_clock_get_time (self->priv->domain_clock);
2495 }
2496
2497 /**
2498  * gst_ptp_clock_new:
2499  * @name: Name of the clock
2500  * @domain: PTP domain
2501  *
2502  * Creates a new PTP clock instance that exports the PTP time of the master
2503  * clock in @domain. This clock can be slaved to other clocks as needed.
2504  *
2505  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2506  * default parameters.
2507  *
2508  *
2509  * This clock only returns valid timestamps after it received the first
2510  * times from the PTP master clock on the network. Once this happens the
2511  * GstPtpClock::internal-clock property will become non-NULL. You can
2512  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2513  * gst_clock_is_synced().
2514  *
2515  * Since: 1.6
2516  */
2517 GstClock *
2518 gst_ptp_clock_new (const gchar * name, guint domain)
2519 {
2520   g_return_val_if_fail (name != NULL, NULL);
2521   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2522
2523   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2524     GST_ERROR ("Failed to initialize PTP");
2525     return NULL;
2526   }
2527
2528   return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2529       NULL);
2530 }
2531
2532 typedef struct
2533 {
2534   guint8 domain;
2535   const GstStructure *stats;
2536 } DomainStatsMarshalData;
2537
2538 static void
2539 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2540 {
2541   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2542
2543   if (!callback (data->domain, data->stats, hook->data))
2544     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2545 }
2546
2547 static void
2548 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2549 {
2550   DomainStatsMarshalData data = { domain, stats };
2551
2552   g_mutex_lock (&ptp_lock);
2553   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2554       (GHookMarshaller) domain_stats_marshaller, &data);
2555   g_mutex_unlock (&ptp_lock);
2556 }
2557
2558 /**
2559  * gst_ptp_statistics_callback_add:
2560  * @callback: GstPtpStatisticsCallback to call
2561  * @user_data: Data to pass to the callback
2562  * @destroy_data: GDestroyNotify to destroy the data
2563  *
2564  * Installs a new statistics callback for gathering PTP statistics. See
2565  * GstPtpStatisticsCallback for a list of statistics that are provided.
2566  *
2567  * Returns: Id for the callback that can be passed to
2568  * gst_ptp_statistics_callback_remove()
2569  *
2570  * Since: 1.6
2571  */
2572 gulong
2573 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2574     gpointer user_data, GDestroyNotify destroy_data)
2575 {
2576   GHook *hook;
2577
2578   g_mutex_lock (&ptp_lock);
2579
2580   if (!domain_stats_hooks_initted) {
2581     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2582     domain_stats_hooks_initted = TRUE;
2583   }
2584
2585   hook = g_hook_alloc (&domain_stats_hooks);
2586   hook->func = callback;
2587   hook->data = user_data;
2588   hook->destroy = destroy_data;
2589   g_hook_prepend (&domain_stats_hooks, hook);
2590   g_atomic_int_add (&domain_stats_n_hooks, 1);
2591
2592   g_mutex_unlock (&ptp_lock);
2593
2594   return hook->hook_id;
2595 }
2596
2597 /**
2598  * gst_ptp_statistics_callback_remove:
2599  * @id: Callback id to remove
2600  *
2601  * Removes a PTP statistics callback that was previously added with
2602  * gst_ptp_statistics_callback_add().
2603  *
2604  * Since: 1.6
2605  */
2606 void
2607 gst_ptp_statistics_callback_remove (gulong id)
2608 {
2609   g_mutex_lock (&ptp_lock);
2610   if (g_hook_destroy (&domain_stats_hooks, id))
2611     g_atomic_int_add (&domain_stats_n_hooks, -1);
2612   g_mutex_unlock (&ptp_lock);
2613 }