ptpclock: fix compilation
[platform/upstream/gstreamer.git] / libs / gst / net / gstptpclock.c
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @short_description: Special clock that synchronizes to a remote time
23  *                     provider via PTP (IEEE1588:2008).
24  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
25  *
26  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
27  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
28  * clock in some specific domain.
29  *
30  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
31  * a helper process to do the actual communication via the PTP ports. This is
32  * required as PTP listens on ports < 1024 and thus requires special
33  * privileges. Once this helper process is started, the main process will
34  * synchronize to all PTP domains that are detected on the selected
35  * interfaces.
36  *
37  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
38  * time from a master clock inside a specific PTP domain. This clock will only
39  * return valid timestamps once the timestamps in the PTP domain are known. To
40  * check this, the GstPtpClock::internal-clock property and the related
41  * notify::clock signal can be used. Once the internal clock is not NULL, the
42  * PTP domain's time is known. Alternatively you can wait for this with
43  * gst_ptp_clock_wait_ready().
44  *
45  *
46  * To gather statistics about the PTP clock synchronization,
47  * gst_ptp_statistics_callback_add() can be used. This gives the application
48  * the possibility to collect all kinds of statistics from the clock
49  * synchronization.
50  *
51  * Since: 1.6
52  *
53  */
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstptpclock.h"
59
60 #ifdef HAVE_PTP
61
62 #include "gstptp_private.h"
63
64 #include <sys/wait.h>
65 #include <sys/types.h>
66 #include <unistd.h>
67
68 #include <gst/base/base.h>
69
70 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
71 #define GST_CAT_DEFAULT (ptp_debug)
72
73 /* IEEE 1588 7.7.3.1 */
74 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
75
76 /* Use a running average for calculating the mean path delay instead
77  * of just using the last measurement. Enabling this helps in unreliable
78  * networks, like wifi, with often changing delays
79  *
80  * Undef for following IEEE1588-2008 by the letter
81  */
82 #define USE_RUNNING_AVERAGE_DELAY 1
83
84 /* Filter out any measurements that are above a certain threshold compared to
85  * previous measurements. Enabling this helps filtering out outliers that
86  * happen fairly often in unreliable networks, like wifi.
87  *
88  * Undef for following IEEE1588-2008 by the letter
89  */
90 #define USE_MEASUREMENT_FILTERING 1
91
92 /* Select the first clock from which we capture a SYNC message as the master
93  * clock of the domain until we are ready to run the best master clock
94  * algorithm. This allows faster syncing but might mean a change of the master
95  * clock in the beginning. As all clocks in a domain are supposed to use the
96  * same time, this shouldn't be much of a problem.
97  *
98  * Undef for following IEEE1588-2008 by the letter
99  */
100 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
101
102 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
103  * afterwards. This allows better synchronization in networks with varying
104  * delays, as for every other SYNC message we would have to assume that it's
105  * the average of what we saw before. But that might be completely off
106  */
107 #define USE_ONLY_SYNC_WITH_DELAY 1
108
109 /* Filter out delay measurements that are too far away from the median of the
110  * last delay measurements, currently those that are more than 2 times as big.
111  * This increases accuracy a lot on wifi.
112  */
113 #define USE_MEDIAN_PRE_FILTERING 1
114 #define MEDIAN_PRE_FILTERING_WINDOW 9
115
116 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
117 #define MAX_SKIPPED_UPDATES 5
118
119 typedef enum
120 {
121   PTP_MESSAGE_TYPE_SYNC = 0x0,
122   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
123   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
124   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
125   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
126   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
127   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
128   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
129   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
130   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
131 } PtpMessageType;
132
133 typedef struct
134 {
135   guint64 seconds_field;        /* 48 bits valid */
136   guint32 nanoseconds_field;
137 } PtpTimestamp;
138
139 #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
140 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
141 #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
142
143 typedef struct
144 {
145   guint64 clock_identity;
146   guint16 port_number;
147 } PtpClockIdentity;
148
149 static gint
150 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
151 {
152   if (a->clock_identity < b->clock_identity)
153     return -1;
154   else if (a->clock_identity > b->clock_identity)
155     return 1;
156
157   if (a->port_number < b->port_number)
158     return -1;
159   else if (a->port_number > b->port_number)
160     return 1;
161
162   return 0;
163 }
164
165 typedef struct
166 {
167   guint8 clock_class;
168   guint8 clock_accuracy;
169   guint16 offset_scaled_log_variance;
170 } PtpClockQuality;
171
172 typedef struct
173 {
174   guint8 transport_specific;
175   PtpMessageType message_type;
176   /* guint8 reserved; */
177   guint8 version_ptp;
178   guint16 message_length;
179   guint8 domain_number;
180   /* guint8 reserved; */
181   guint16 flag_field;
182   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
183   /* guint32 reserved; */
184   PtpClockIdentity source_port_identity;
185   guint16 sequence_id;
186   guint8 control_field;
187   gint8 log_message_interval;
188
189   union
190   {
191     struct
192     {
193       PtpTimestamp origin_timestamp;
194       gint16 current_utc_offset;
195       /* guint8 reserved; */
196       guint8 grandmaster_priority_1;
197       PtpClockQuality grandmaster_clock_quality;
198       guint8 grandmaster_priority_2;
199       guint64 grandmaster_identity;
200       guint16 steps_removed;
201       guint8 time_source;
202     } announce;
203
204     struct
205     {
206       PtpTimestamp origin_timestamp;
207     } sync;
208
209     struct
210     {
211       PtpTimestamp precise_origin_timestamp;
212     } follow_up;
213
214     struct
215     {
216       PtpTimestamp origin_timestamp;
217     } delay_req;
218
219     struct
220     {
221       PtpTimestamp receive_timestamp;
222       PtpClockIdentity requesting_port_identity;
223     } delay_resp;
224
225   } message_specific;
226 } PtpMessage;
227
228 static GMutex ptp_lock;
229 static GCond ptp_cond;
230 static gboolean initted = FALSE;
231 static gboolean supported = TRUE;
232 static GPid ptp_helper_pid;
233 static GThread *ptp_helper_thread;
234 static GMainContext *main_context;
235 static GMainLoop *main_loop;
236 static GIOChannel *stdin_channel, *stdout_channel;
237 static GRand *delay_req_rand;
238 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
239
240 typedef struct
241 {
242   GstClockTime receive_time;
243
244   PtpClockIdentity master_clock_identity;
245
246   guint8 grandmaster_priority_1;
247   PtpClockQuality grandmaster_clock_quality;
248   guint8 grandmaster_priority_2;
249   guint64 grandmaster_identity;
250   guint16 steps_removed;
251   guint8 time_source;
252
253   guint16 sequence_id;
254 } PtpAnnounceMessage;
255
256 typedef struct
257 {
258   PtpClockIdentity master_clock_identity;
259
260   GstClockTime announce_interval;       /* last interval we received */
261   GQueue announce_messages;
262 } PtpAnnounceSender;
263
264 typedef struct
265 {
266   guint domain;
267   PtpClockIdentity master_clock_identity;
268
269   guint16 sync_seqnum;
270   GstClockTime sync_recv_time_local;    /* t2 */
271   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
272   GstClockTime follow_up_recv_time_local;
273
274   GSource *timeout_source;
275   guint16 delay_req_seqnum;
276   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
277   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
278   GstClockTime delay_resp_recv_time_local;
279
280   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
281   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
282 } PtpPendingSync;
283
284 static void
285 ptp_pending_sync_free (PtpPendingSync * sync)
286 {
287   if (sync->timeout_source)
288     g_source_destroy (sync->timeout_source);
289   g_free (sync);
290 }
291
292 typedef struct
293 {
294   guint domain;
295
296   GstClockTime last_ptp_time;
297   GstClockTime last_local_time;
298   gint skipped_updates;
299
300   /* Used for selecting the master/grandmaster */
301   GList *announce_senders;
302
303   /* Last selected master clock */
304   gboolean have_master_clock;
305   PtpClockIdentity master_clock_identity;
306   guint64 grandmaster_identity;
307
308   /* Last SYNC or FOLLOW_UP timestamp we received */
309   GstClockTime last_ptp_sync_time;
310   GstClockTime sync_interval;
311
312   GstClockTime mean_path_delay;
313   GstClockTime last_delay_req, min_delay_req_interval;
314   guint16 last_delay_req_seqnum;
315
316   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
317   gint last_path_delays_missing;
318
319   GQueue pending_syncs;
320
321   GstClock *domain_clock;
322 } PtpDomainData;
323
324 static GList *domain_data;
325 static GMutex domain_clocks_lock;
326 static GList *domain_clocks;
327
328 /* Protected by PTP lock */
329 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
330 static GHookList domain_stats_hooks;
331 static gint domain_stats_n_hooks;
332 static gboolean domain_stats_hooks_initted = FALSE;
333
334 /* Converts log2 seconds to GstClockTime */
335 static GstClockTime
336 log2_to_clock_time (gint l)
337 {
338   if (l < 0)
339     return GST_SECOND >> (-l);
340   else
341     return GST_SECOND << l;
342 }
343
344 static void
345 dump_ptp_message (PtpMessage * msg)
346 {
347   GST_TRACE ("PTP message:");
348   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
349   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
350   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
351   GST_TRACE ("\tmessage_length: %u", msg->message_length);
352   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
353   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
354   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
355       (msg->correction_field / 65536),
356       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
357   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
358       msg->source_port_identity.clock_identity,
359       msg->source_port_identity.port_number);
360   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
361   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
362   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
363       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
364
365   switch (msg->message_type) {
366     case PTP_MESSAGE_TYPE_ANNOUNCE:
367       GST_TRACE ("\tANNOUNCE:");
368       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
369           msg->message_specific.announce.origin_timestamp.seconds_field,
370           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
371       GST_TRACE ("\t\tcurrent_utc_offset: %d",
372           msg->message_specific.announce.current_utc_offset);
373       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
374           msg->message_specific.announce.grandmaster_priority_1);
375       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
376           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
377           msg->message_specific.announce.
378           grandmaster_clock_quality.clock_accuracy,
379           msg->message_specific.announce.
380           grandmaster_clock_quality.offset_scaled_log_variance);
381       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
382           msg->message_specific.announce.grandmaster_priority_2);
383       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
384           msg->message_specific.announce.grandmaster_identity);
385       GST_TRACE ("\t\tsteps_removed: %u",
386           msg->message_specific.announce.steps_removed);
387       GST_TRACE ("\t\ttime_source: 0x%02x",
388           msg->message_specific.announce.time_source);
389       break;
390     case PTP_MESSAGE_TYPE_SYNC:
391       GST_TRACE ("\tSYNC:");
392       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
393           msg->message_specific.sync.origin_timestamp.seconds_field,
394           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
395       break;
396     case PTP_MESSAGE_TYPE_FOLLOW_UP:
397       GST_TRACE ("\tFOLLOW_UP:");
398       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
399           msg->message_specific.follow_up.
400           precise_origin_timestamp.seconds_field,
401           msg->message_specific.follow_up.
402           precise_origin_timestamp.nanoseconds_field);
403       break;
404     case PTP_MESSAGE_TYPE_DELAY_REQ:
405       GST_TRACE ("\tDELAY_REQ:");
406       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
407           msg->message_specific.delay_req.origin_timestamp.seconds_field,
408           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
409       break;
410     case PTP_MESSAGE_TYPE_DELAY_RESP:
411       GST_TRACE ("\tDELAY_RESP:");
412       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
413           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
414           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
415       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
416           "x %u",
417           msg->message_specific.delay_resp.
418           requesting_port_identity.clock_identity,
419           msg->message_specific.delay_resp.
420           requesting_port_identity.port_number);
421       break;
422     default:
423       break;
424   }
425   GST_TRACE (" ");
426 }
427
428 /* IEEE 1588-2008 5.3.3 */
429 static gboolean
430 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
431 {
432   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
433
434   timestamp->seconds_field =
435       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
436       gst_byte_reader_get_uint16_be_unchecked (reader);
437   timestamp->nanoseconds_field =
438       gst_byte_reader_get_uint32_be_unchecked (reader);
439
440   if (timestamp->nanoseconds_field >= 1000000000)
441     return FALSE;
442
443   return TRUE;
444 }
445
446 /* IEEE 1588-2008 13.3 */
447 static gboolean
448 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
449 {
450   guint8 b;
451
452   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
453
454   b = gst_byte_reader_get_uint8_unchecked (reader);
455   msg->transport_specific = b >> 4;
456   msg->message_type = b & 0x0f;
457
458   b = gst_byte_reader_get_uint8_unchecked (reader);
459   msg->version_ptp = b & 0x0f;
460   if (msg->version_ptp != 2) {
461     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
462     return FALSE;
463   }
464
465   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
466   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
467     GST_WARNING ("Not enough data (%u < %u)",
468         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
469     return FALSE;
470   }
471
472   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
473   gst_byte_reader_skip_unchecked (reader, 1);
474
475   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
476   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
477   gst_byte_reader_skip_unchecked (reader, 4);
478
479   msg->source_port_identity.clock_identity =
480       gst_byte_reader_get_uint64_be_unchecked (reader);
481   msg->source_port_identity.port_number =
482       gst_byte_reader_get_uint16_be_unchecked (reader);
483
484   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
485   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
486   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
487
488   return TRUE;
489 }
490
491 /* IEEE 1588-2008 13.5 */
492 static gboolean
493 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
494 {
495   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
496
497   if (gst_byte_reader_get_remaining (reader) < 20)
498     return FALSE;
499
500   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
501           reader))
502     return FALSE;
503
504   msg->message_specific.announce.current_utc_offset =
505       gst_byte_reader_get_uint16_be_unchecked (reader);
506   gst_byte_reader_skip_unchecked (reader, 1);
507
508   msg->message_specific.announce.grandmaster_priority_1 =
509       gst_byte_reader_get_uint8_unchecked (reader);
510   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
511       gst_byte_reader_get_uint8_unchecked (reader);
512   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
513       gst_byte_reader_get_uint8_unchecked (reader);
514   msg->message_specific.announce.
515       grandmaster_clock_quality.offset_scaled_log_variance =
516       gst_byte_reader_get_uint16_be_unchecked (reader);
517   msg->message_specific.announce.grandmaster_priority_2 =
518       gst_byte_reader_get_uint8_unchecked (reader);
519   msg->message_specific.announce.grandmaster_identity =
520       gst_byte_reader_get_uint64_be_unchecked (reader);
521   msg->message_specific.announce.steps_removed =
522       gst_byte_reader_get_uint16_be_unchecked (reader);
523   msg->message_specific.announce.time_source =
524       gst_byte_reader_get_uint8_unchecked (reader);
525
526   return TRUE;
527 }
528
529 /* IEEE 1588-2008 13.6 */
530 static gboolean
531 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
532 {
533   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
534
535   if (gst_byte_reader_get_remaining (reader) < 10)
536     return FALSE;
537
538   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
539           reader))
540     return FALSE;
541
542   return TRUE;
543 }
544
545 /* IEEE 1588-2008 13.6 */
546 static gboolean
547 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
548 {
549   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
550
551   if (gst_byte_reader_get_remaining (reader) < 10)
552     return FALSE;
553
554   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
555           reader))
556     return FALSE;
557
558   return TRUE;
559 }
560
561 /* IEEE 1588-2008 13.7 */
562 static gboolean
563 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
564 {
565   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
566
567   if (gst_byte_reader_get_remaining (reader) < 10)
568     return FALSE;
569
570   if (!parse_ptp_timestamp (&msg->message_specific.
571           follow_up.precise_origin_timestamp, reader))
572     return FALSE;
573
574   return TRUE;
575 }
576
577 /* IEEE 1588-2008 13.8 */
578 static gboolean
579 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
580 {
581   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
582       FALSE);
583
584   if (gst_byte_reader_get_remaining (reader) < 20)
585     return FALSE;
586
587   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
588           reader))
589     return FALSE;
590
591   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
592       gst_byte_reader_get_uint64_be_unchecked (reader);
593   msg->message_specific.delay_resp.requesting_port_identity.port_number =
594       gst_byte_reader_get_uint16_be_unchecked (reader);
595
596   return TRUE;
597 }
598
599 static gboolean
600 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
601 {
602   GstByteReader reader;
603   gboolean ret = FALSE;
604
605   gst_byte_reader_init (&reader, data, size);
606
607   if (!parse_ptp_message_header (msg, &reader)) {
608     GST_WARNING ("Failed to parse PTP message header");
609     return FALSE;
610   }
611
612   switch (msg->message_type) {
613     case PTP_MESSAGE_TYPE_SYNC:
614       ret = parse_ptp_message_sync (msg, &reader);
615       break;
616     case PTP_MESSAGE_TYPE_FOLLOW_UP:
617       ret = parse_ptp_message_follow_up (msg, &reader);
618       break;
619     case PTP_MESSAGE_TYPE_DELAY_REQ:
620       ret = parse_ptp_message_delay_req (msg, &reader);
621       break;
622     case PTP_MESSAGE_TYPE_DELAY_RESP:
623       ret = parse_ptp_message_delay_resp (msg, &reader);
624       break;
625     case PTP_MESSAGE_TYPE_ANNOUNCE:
626       ret = parse_ptp_message_announce (msg, &reader);
627       break;
628     default:
629       /* ignore for now */
630       break;
631   }
632
633   return ret;
634 }
635
636 static gint
637 compare_announce_message (const PtpAnnounceMessage * a,
638     const PtpAnnounceMessage * b)
639 {
640   /* IEEE 1588 Figure 27 */
641   if (a->grandmaster_identity == b->grandmaster_identity) {
642     if (a->steps_removed + 1 < b->steps_removed)
643       return -1;
644     else if (a->steps_removed > b->steps_removed + 1)
645       return 1;
646
647     /* Error cases are filtered out earlier */
648     if (a->steps_removed < b->steps_removed)
649       return -1;
650     else if (a->steps_removed > b->steps_removed)
651       return 1;
652
653     /* Error cases are filtered out earlier */
654     if (a->master_clock_identity.clock_identity <
655         b->master_clock_identity.clock_identity)
656       return -1;
657     else if (a->master_clock_identity.clock_identity >
658         b->master_clock_identity.clock_identity)
659       return 1;
660
661     /* Error cases are filtered out earlier */
662     if (a->master_clock_identity.port_number <
663         b->master_clock_identity.port_number)
664       return -1;
665     else if (a->master_clock_identity.port_number >
666         b->master_clock_identity.port_number)
667       return 1;
668     else
669       g_assert_not_reached ();
670
671     return 0;
672   }
673
674   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
675     return -1;
676   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
677     return 1;
678
679   if (a->grandmaster_clock_quality.clock_class <
680       b->grandmaster_clock_quality.clock_class)
681     return -1;
682   else if (a->grandmaster_clock_quality.clock_class >
683       b->grandmaster_clock_quality.clock_class)
684     return 1;
685
686   if (a->grandmaster_clock_quality.clock_accuracy <
687       b->grandmaster_clock_quality.clock_accuracy)
688     return -1;
689   else if (a->grandmaster_clock_quality.clock_accuracy >
690       b->grandmaster_clock_quality.clock_accuracy)
691     return 1;
692
693   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
694       b->grandmaster_clock_quality.offset_scaled_log_variance)
695     return -1;
696   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
697       b->grandmaster_clock_quality.offset_scaled_log_variance)
698     return 1;
699
700   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
701     return -1;
702   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
703     return 1;
704
705   if (a->grandmaster_identity < b->grandmaster_identity)
706     return -1;
707   else if (a->grandmaster_identity > b->grandmaster_identity)
708     return 1;
709   else
710     g_assert_not_reached ();
711
712   return 0;
713 }
714
715 static void
716 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
717 {
718   GList *qualified_messages = NULL;
719   GList *l, *m;
720   PtpAnnounceMessage *best = NULL;
721
722   /* IEEE 1588 9.3.2.5 */
723   for (l = domain->announce_senders; l; l = l->next) {
724     PtpAnnounceSender *sender = l->data;
725     GstClockTime window = 4 * sender->announce_interval;
726     gint count = 0;
727
728     for (m = sender->announce_messages.head; m; m = m->next) {
729       PtpAnnounceMessage *msg = m->data;
730
731       if (now - msg->receive_time <= window)
732         count++;
733     }
734
735     /* Only include the newest message of announce senders that had at least 2
736      * announce messages in the last 4 announce intervals. Which also means
737      * that we wait at least 4 announce intervals before we select a master
738      * clock. Until then we just report based on the newest SYNC we received
739      */
740     if (count >= 2) {
741       qualified_messages =
742           g_list_prepend (qualified_messages,
743           g_queue_peek_tail (&sender->announce_messages));
744     }
745   }
746
747   if (!qualified_messages) {
748     GST_DEBUG
749         ("No qualified announce messages for domain %u, can't select a master clock",
750         domain->domain);
751     domain->have_master_clock = FALSE;
752     return;
753   }
754
755   for (l = qualified_messages; l; l = l->next) {
756     PtpAnnounceMessage *msg = l->data;
757
758     if (!best || compare_announce_message (msg, best) < 0)
759       best = msg;
760   }
761
762   if (domain->have_master_clock
763       && compare_clock_identity (&domain->master_clock_identity,
764           &best->master_clock_identity) == 0) {
765     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
766   } else {
767     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
768         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
769         domain->domain, best->master_clock_identity.clock_identity,
770         best->master_clock_identity.port_number, best->grandmaster_identity);
771
772     domain->have_master_clock = TRUE;
773     domain->grandmaster_identity = best->grandmaster_identity;
774
775     /* Opportunistic master clock selection likely gave us the same master
776      * clock before, no need to reset all statistics */
777     if (compare_clock_identity (&domain->master_clock_identity,
778             &best->master_clock_identity) != 0) {
779       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
780           sizeof (PtpClockIdentity));
781       domain->mean_path_delay = 0;
782       domain->last_delay_req = 0;
783       domain->last_path_delays_missing = 9;
784       domain->min_delay_req_interval = 0;
785       domain->sync_interval = 0;
786       domain->last_ptp_sync_time = 0;
787       domain->skipped_updates = 0;
788       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
789           NULL);
790       g_queue_clear (&domain->pending_syncs);
791     }
792
793     if (g_atomic_int_get (&domain_stats_n_hooks)) {
794       GstStructure *stats =
795           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
796           "domain", G_TYPE_UINT, domain->domain,
797           "master-clock-id", G_TYPE_UINT64,
798           domain->master_clock_identity.clock_identity,
799           "master-clock-port", G_TYPE_UINT,
800           domain->master_clock_identity.port_number,
801           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
802           NULL);
803       emit_ptp_statistics (domain->domain, stats);
804       gst_structure_free (stats);
805     }
806   }
807 }
808
809 static void
810 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
811 {
812   GList *l;
813   PtpDomainData *domain = NULL;
814   PtpAnnounceSender *sender = NULL;
815   PtpAnnounceMessage *announce;
816
817   /* IEEE1588 9.3.2.2 e)
818    * Don't consider messages with the alternate master flag set
819    */
820   if ((msg->flag_field & 0x0100))
821     return;
822
823   /* IEEE 1588 9.3.2.5 d)
824    * Don't consider announce messages with steps_removed>=255
825    */
826   if (msg->message_specific.announce.steps_removed >= 255)
827     return;
828
829   for (l = domain_data; l; l = l->next) {
830     PtpDomainData *tmp = l->data;
831
832     if (tmp->domain == msg->domain_number) {
833       domain = tmp;
834       break;
835     }
836   }
837
838   if (!domain) {
839     gchar *clock_name;
840
841     domain = g_new0 (PtpDomainData, 1);
842     domain->domain = msg->domain_number;
843     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
844     domain->domain_clock =
845         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
846     g_free (clock_name);
847     g_queue_init (&domain->pending_syncs);
848     domain->last_path_delays_missing = 9;
849     domain_data = g_list_prepend (domain_data, domain);
850
851     g_mutex_lock (&domain_clocks_lock);
852     domain_clocks = g_list_prepend (domain_clocks, domain);
853     g_mutex_unlock (&domain_clocks_lock);
854
855     if (g_atomic_int_get (&domain_stats_n_hooks)) {
856       GstStructure *stats =
857           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
858           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
859           domain->domain_clock, NULL);
860       emit_ptp_statistics (domain->domain, stats);
861       gst_structure_free (stats);
862     }
863   }
864
865   for (l = domain->announce_senders; l; l = l->next) {
866     PtpAnnounceSender *tmp = l->data;
867
868     if (compare_clock_identity (&tmp->master_clock_identity,
869             &msg->source_port_identity) == 0) {
870       sender = tmp;
871       break;
872     }
873   }
874
875   if (!sender) {
876     sender = g_new0 (PtpAnnounceSender, 1);
877
878     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
879         sizeof (PtpClockIdentity));
880     g_queue_init (&sender->announce_messages);
881     domain->announce_senders =
882         g_list_prepend (domain->announce_senders, sender);
883   }
884
885   for (l = sender->announce_messages.head; l; l = l->next) {
886     PtpAnnounceMessage *tmp = l->data;
887
888     /* IEEE 1588 9.3.2.5 c)
889      * Don't consider identical messages, i.e. duplicates
890      */
891     if (tmp->sequence_id == msg->sequence_id)
892       return;
893   }
894
895   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
896
897   announce = g_new0 (PtpAnnounceMessage, 1);
898   announce->receive_time = receive_time;
899   announce->sequence_id = msg->sequence_id;
900   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
901       sizeof (PtpClockIdentity));
902   announce->grandmaster_identity =
903       msg->message_specific.announce.grandmaster_identity;
904   announce->grandmaster_priority_1 =
905       msg->message_specific.announce.grandmaster_priority_1;
906   announce->grandmaster_clock_quality.clock_class =
907       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
908   announce->grandmaster_clock_quality.clock_accuracy =
909       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
910   announce->grandmaster_clock_quality.offset_scaled_log_variance =
911       msg->message_specific.announce.
912       grandmaster_clock_quality.offset_scaled_log_variance;
913   announce->grandmaster_priority_2 =
914       msg->message_specific.announce.grandmaster_priority_2;
915   announce->steps_removed = msg->message_specific.announce.steps_removed;
916   announce->time_source = msg->message_specific.announce.time_source;
917   g_queue_push_tail (&sender->announce_messages, announce);
918
919   select_best_master_clock (domain, receive_time);
920 }
921
922 static gboolean
923 send_delay_req_timeout (PtpPendingSync * sync)
924 {
925   StdIOHeader header = { 0, };
926   guint8 delay_req[44];
927   GstByteWriter writer;
928   GIOStatus status;
929   gsize written;
930   GError *err = NULL;
931
932   header.type = TYPE_EVENT;
933   header.size = 44;
934
935   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
936   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
937   gst_byte_writer_put_uint8_unchecked (&writer, 2);
938   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
939   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
940   gst_byte_writer_put_uint8_unchecked (&writer, 0);
941   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
942   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
943   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
944   gst_byte_writer_put_uint64_be_unchecked (&writer,
945       ptp_clock_id.clock_identity);
946   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
947   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
948   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
949   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
950   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
951   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
952
953   status =
954       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
955       sizeof (header), &written, &err);
956   if (status == G_IO_STATUS_ERROR) {
957     g_warning ("Failed to write to stdout: %s", err->message);
958     return G_SOURCE_REMOVE;
959   } else if (status == G_IO_STATUS_EOF) {
960     g_message ("EOF on stdout");
961     g_main_loop_quit (main_loop);
962     return G_SOURCE_REMOVE;
963   } else if (status != G_IO_STATUS_NORMAL) {
964     g_warning ("Unexpected stdout write status: %d", status);
965     g_main_loop_quit (main_loop);
966     return G_SOURCE_REMOVE;
967   } else if (written != sizeof (header)) {
968     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
969     g_main_loop_quit (main_loop);
970     return G_SOURCE_REMOVE;
971   }
972
973   sync->delay_req_send_time_local = gst_util_get_timestamp ();
974
975   status =
976       g_io_channel_write_chars (stdout_channel,
977       (const gchar *) delay_req, 44, &written, &err);
978   if (status == G_IO_STATUS_ERROR) {
979     g_warning ("Failed to write to stdout: %s", err->message);
980     g_main_loop_quit (main_loop);
981     return G_SOURCE_REMOVE;
982   } else if (status == G_IO_STATUS_EOF) {
983     g_message ("EOF on stdout");
984     g_main_loop_quit (main_loop);
985     return G_SOURCE_REMOVE;
986   } else if (status != G_IO_STATUS_NORMAL) {
987     g_warning ("Unexpected stdout write status: %d", status);
988     g_main_loop_quit (main_loop);
989     return G_SOURCE_REMOVE;
990   } else if (written != 44) {
991     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
992     g_main_loop_quit (main_loop);
993     return G_SOURCE_REMOVE;
994   }
995
996   return G_SOURCE_REMOVE;
997 }
998
999 static gboolean
1000 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1001 {
1002   GstClockTime now = gst_util_get_timestamp ();
1003   guint timeout;
1004   GSource *timeout_source;
1005
1006   if (domain->last_delay_req != 0
1007       && domain->last_delay_req + domain->min_delay_req_interval > now)
1008     return FALSE;
1009
1010   domain->last_delay_req = now;
1011   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1012
1013   /* IEEE 1588 9.5.11.2 */
1014   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1015     timeout = 0;
1016   else
1017     timeout =
1018         g_rand_int_range (delay_req_rand, 0,
1019         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1020
1021   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1022   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1023   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1024       sync, NULL);
1025   g_source_attach (timeout_source, main_context);
1026
1027   return TRUE;
1028 }
1029
1030 /* Filtering of outliers for RTT and time calculations inspired
1031  * by the code from gstnetclientclock.c
1032  */
1033 static void
1034 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1035 {
1036   GstClockTime internal_time, external_time, rate_num, rate_den;
1037   GstClockTime corrected_ptp_time, corrected_local_time;
1038   gdouble r_squared = 0.0;
1039   gboolean synced;
1040   GstClockTimeDiff discont = 0;
1041   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1042 #ifdef USE_MEASUREMENT_FILTERING
1043   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1044       orig_rate_den;
1045   GstClockTime new_estimated_ptp_time;
1046   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1047   gboolean now_synced;
1048 #endif
1049
1050 #ifdef USE_ONLY_SYNC_WITH_DELAY
1051   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE)
1052     return;
1053 #endif
1054
1055   /* IEEE 1588 11.2 */
1056   corrected_ptp_time =
1057       sync->sync_send_time_remote +
1058       (sync->correction_field_sync + 32768) / 65536;
1059   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1060
1061 #ifdef USE_MEASUREMENT_FILTERING
1062   /* We check this here and when updating the mean path delay, because
1063    * we can get here without a delay response too */
1064   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1065       && sync->follow_up_recv_time_local >
1066       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1067     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1068         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1069         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1070         GST_TIME_ARGS (domain->mean_path_delay));
1071     synced = FALSE;
1072     goto out;
1073   }
1074 #endif
1075
1076   /* Set an initial local-remote relation */
1077   if (domain->last_ptp_time == 0)
1078     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1079         corrected_ptp_time, 1, 1);
1080
1081 #ifdef USE_MEASUREMENT_FILTERING
1082   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1083    * estimate with our present knowledge about the clock
1084    */
1085   /* Store what the clock produced as 'now' before this update */
1086   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1087       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1088   internal_time = orig_internal_time;
1089   external_time = orig_external_time;
1090   rate_num = orig_rate_num;
1091   rate_den = orig_rate_den;
1092
1093   /* 3/4 RTT window around the estimation */
1094   max_discont = domain->mean_path_delay * 3 / 2;
1095
1096   /* Check if the estimated sync time is inside our window */
1097   estimated_ptp_time_min = corrected_local_time - max_discont;
1098   estimated_ptp_time_min =
1099       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1100       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1101   estimated_ptp_time_max = corrected_local_time + max_discont;
1102   estimated_ptp_time_max =
1103       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1104       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1105
1106   synced = (estimated_ptp_time_min < corrected_ptp_time
1107       && corrected_ptp_time < estimated_ptp_time_max);
1108
1109   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1110       GST_TIME_FORMAT, domain->domain,
1111       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1112
1113   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1114       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1115       GST_TIME_ARGS (corrected_ptp_time),
1116       GST_TIME_ARGS (estimated_ptp_time_max));
1117
1118   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1119           corrected_local_time, corrected_ptp_time, &r_squared,
1120           &internal_time, &external_time, &rate_num, &rate_den)) {
1121     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1122
1123     /* Old estimated PTP time based on receive time and path delay */
1124     estimated_ptp_time = corrected_local_time;
1125     estimated_ptp_time =
1126         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1127         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1128         orig_external_time, orig_rate_num, orig_rate_den);
1129
1130     /* New estimated PTP time based on receive time and path delay */
1131     new_estimated_ptp_time = corrected_local_time;
1132     new_estimated_ptp_time =
1133         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1134         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1135         external_time, rate_num, rate_den);
1136
1137     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1138     if (synced && ABS (discont) > max_discont) {
1139       GstClockTimeDiff offset;
1140       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1141           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1142           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1143           GST_TIME_ARGS (max_discont));
1144       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1145         offset = max_discont - discont;
1146         if (-offset > external_time)
1147           external_time = 0;
1148         else
1149           external_time += offset;
1150       } else {                  /* Too large a backward step - add a +ve offset */
1151         offset = -(max_discont + discont);
1152         external_time += offset;
1153       }
1154
1155       discont += offset;
1156     } else {
1157       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1158           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1159           GST_TIME_ARGS (max_discont));
1160     }
1161
1162     /* Check if the estimated sync time is now (still) inside our window */
1163     estimated_ptp_time_min = corrected_local_time - max_discont;
1164     estimated_ptp_time_min =
1165         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1166         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1167         external_time, rate_num, rate_den);
1168     estimated_ptp_time_max = corrected_local_time + max_discont;
1169     estimated_ptp_time_max =
1170         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1171         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1172         external_time, rate_num, rate_den);
1173
1174     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1175         && corrected_ptp_time < estimated_ptp_time_max);
1176
1177     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1178         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1179         GST_TIME_ARGS (corrected_ptp_time),
1180         GST_TIME_ARGS (estimated_ptp_time_max));
1181
1182     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1183       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1184           internal_time, external_time, rate_num, rate_den);
1185       domain->skipped_updates = 0;
1186
1187       domain->last_ptp_time = corrected_ptp_time;
1188       domain->last_local_time = corrected_local_time;
1189     } else {
1190       domain->skipped_updates++;
1191     }
1192   } else {
1193     domain->last_ptp_time = corrected_ptp_time;
1194     domain->last_local_time = corrected_local_time;
1195   }
1196
1197 #else
1198   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1199       GST_TIME_FORMAT, domain->domain,
1200       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1201
1202   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1203       &internal_time, &external_time, &rate_num, &rate_den);
1204
1205   estimated_ptp_time = corrected_local_time;
1206   estimated_ptp_time =
1207       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1208       (domain->domain_clock), estimated_ptp_time, internal_time,
1209       external_time, rate_num, rate_den);
1210
1211   gst_clock_add_observation (domain->domain_clock,
1212       corrected_local_time, corrected_ptp_time, &r_squared);
1213
1214   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1215       &internal_time, &external_time, &rate_num, &rate_den);
1216
1217   synced = TRUE;
1218   domain->last_ptp_time = corrected_ptp_time;
1219   domain->last_local_time = corrected_local_time;
1220 #endif
1221
1222 #ifdef USE_MEASUREMENT_FILTERING
1223 out:
1224 #endif
1225   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1226     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1227         "domain", G_TYPE_UINT, domain->domain,
1228         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1229         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1230         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1231         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1232         "discontinuity", G_TYPE_INT64, discont,
1233         "synced", G_TYPE_BOOLEAN, synced,
1234         "r-squared", G_TYPE_DOUBLE, r_squared,
1235         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1236         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1237         "rate-num", G_TYPE_UINT64, rate_num,
1238         "rate-den", G_TYPE_UINT64, rate_den,
1239         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1240         NULL);
1241     emit_ptp_statistics (domain->domain, stats);
1242     gst_structure_free (stats);
1243   }
1244
1245 }
1246
1247 #ifdef USE_MEDIAN_PRE_FILTERING
1248 static gint
1249 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1250 {
1251   if (*a < *b)
1252     return -1;
1253   else if (*a > *b)
1254     return 1;
1255   return 0;
1256 }
1257 #endif
1258
1259 static gboolean
1260 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1261 {
1262 #ifdef USE_MEDIAN_PRE_FILTERING
1263   GstClockTime last_path_delays[G_N_ELEMENTS (domain->last_path_delays)];
1264   GstClockTime median;
1265   gint i;
1266 #endif
1267
1268   GstClockTime mean_path_delay, delay_req_delay = 0;
1269   gboolean ret;
1270
1271   /* IEEE 1588 11.3 */
1272   mean_path_delay =
1273       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1274       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1275       (sync->correction_field_sync + sync->correction_field_delay +
1276           32768) / 65536) / 2;
1277
1278 #ifdef USE_MEDIAN_PRE_FILTERING
1279   for (i = 1; i < G_N_ELEMENTS (domain->last_path_delays); i++)
1280     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1281   domain->last_path_delays[i - 1] = mean_path_delay;
1282
1283   if (domain->last_path_delays_missing) {
1284     domain->last_path_delays_missing--;
1285   } else {
1286     memcpy (&last_path_delays, &domain->last_path_delays,
1287         sizeof (last_path_delays));
1288     g_qsort_with_data (&last_path_delays,
1289         G_N_ELEMENTS (domain->last_path_delays), sizeof (GstClockTime),
1290         (GCompareDataFunc) compare_clock_time, NULL);
1291
1292     median = last_path_delays[G_N_ELEMENTS (last_path_delays) / 2];
1293
1294     /* FIXME: We might want to use something else here, like only allowing
1295      * things in the interquartile range, or also filtering away delays that
1296      * are too small compared to the median. This here worked well enough
1297      * in tests so far.
1298      */
1299     if (mean_path_delay > 2 * median) {
1300       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1301           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1302           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1303       ret = FALSE;
1304       goto out;
1305     }
1306   }
1307 #endif
1308
1309 #ifdef USE_RUNNING_AVERAGE_DELAY
1310   /* Track an average round trip time, for a bit of smoothing */
1311   /* Always update before discarding a sample, so genuine changes in
1312    * the network get picked up, eventually */
1313   if (domain->mean_path_delay == 0)
1314     domain->mean_path_delay = mean_path_delay;
1315   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1316     domain->mean_path_delay =
1317         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1318   else
1319     domain->mean_path_delay =
1320         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1321 #else
1322   domain->mean_path_delay = mean_path_delay;
1323 #endif
1324
1325 #ifdef USE_MEASUREMENT_FILTERING
1326   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1327       domain->mean_path_delay != 0
1328       && sync->follow_up_recv_time_local >
1329       sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
1330     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1331         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1332         GST_TIME_ARGS (sync->follow_up_recv_time_local),
1333         GST_TIME_ARGS (domain->mean_path_delay));
1334     ret = FALSE;
1335     goto out;
1336   }
1337
1338   if (mean_path_delay > 2 * domain->mean_path_delay) {
1339     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1340         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1341         GST_TIME_ARGS (mean_path_delay),
1342         GST_TIME_ARGS (domain->mean_path_delay));
1343     ret = FALSE;
1344     goto out;
1345   }
1346 #endif
1347
1348   delay_req_delay =
1349       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1350
1351 #ifdef USE_MEASUREMENT_FILTERING
1352   /* delay_req_delay is a RTT, so 2 times the path delay */
1353   if (delay_req_delay > 4 * domain->mean_path_delay) {
1354     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1355         GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
1356         GST_TIME_ARGS (delay_req_delay),
1357         GST_TIME_ARGS (domain->mean_path_delay));
1358     ret = FALSE;
1359     goto out;
1360   }
1361 #endif
1362
1363   ret = TRUE;
1364
1365   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1366       GST_TIME_FORMAT ")", domain->domain,
1367       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1368   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1369       domain->domain, GST_TIME_ARGS (delay_req_delay));
1370
1371 #ifdef USE_MEASUREMENT_FILTERING
1372 out:
1373 #endif
1374   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1375     GstStructure *stats =
1376         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1377         "domain", G_TYPE_UINT, domain->domain,
1378         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1379         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1380         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1381     emit_ptp_statistics (domain->domain, stats);
1382     gst_structure_free (stats);
1383   }
1384
1385   return ret;
1386 }
1387
1388 static void
1389 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1390 {
1391   GList *l;
1392   PtpDomainData *domain = NULL;
1393   PtpPendingSync *sync = NULL;
1394
1395   /* Don't consider messages with the alternate master flag set */
1396   if ((msg->flag_field & 0x0100))
1397     return;
1398
1399   for (l = domain_data; l; l = l->next) {
1400     PtpDomainData *tmp = l->data;
1401
1402     if (msg->domain_number == tmp->domain) {
1403       domain = tmp;
1404       break;
1405     }
1406   }
1407
1408   if (!domain) {
1409     gchar *clock_name;
1410
1411     domain = g_new0 (PtpDomainData, 1);
1412     domain->domain = msg->domain_number;
1413     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1414     domain->domain_clock =
1415         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1416     g_free (clock_name);
1417     g_queue_init (&domain->pending_syncs);
1418     domain->last_path_delays_missing = 9;
1419     domain_data = g_list_prepend (domain_data, domain);
1420
1421     g_mutex_lock (&domain_clocks_lock);
1422     domain_clocks = g_list_prepend (domain_clocks, domain);
1423     g_mutex_unlock (&domain_clocks_lock);
1424   }
1425
1426   /* If we have a master clock, ignore this message if it's not coming from there */
1427   if (domain->have_master_clock
1428       && compare_clock_identity (&domain->master_clock_identity,
1429           &msg->source_port_identity) != 0)
1430     return;
1431
1432 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1433   /* Opportunistic selection of master clock */
1434   if (!domain->have_master_clock)
1435     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1436         sizeof (PtpClockIdentity));
1437 #else
1438   if (!domain->have_master_clock)
1439     return;
1440 #endif
1441
1442   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1443
1444   /* Check if duplicated */
1445   for (l = domain->pending_syncs.head; l; l = l->next) {
1446     PtpPendingSync *tmp = l->data;
1447
1448     if (tmp->sync_seqnum == msg->sequence_id)
1449       return;
1450   }
1451
1452   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1453       GST_CLOCK_TIME_NONE / GST_SECOND) {
1454     GST_FIXME ("Unsupported sync message seconds field value: %"
1455         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1456         msg->message_specific.sync.origin_timestamp.seconds_field,
1457         GST_CLOCK_TIME_NONE / GST_SECOND);
1458     return;
1459   }
1460
1461   sync = g_new0 (PtpPendingSync, 1);
1462   sync->domain = domain->domain;
1463   sync->sync_seqnum = msg->sequence_id;
1464   sync->sync_recv_time_local = receive_time;
1465   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1466   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1467   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1468   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1469   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1470
1471   /* 0.5 correction factor for division later */
1472   sync->correction_field_sync = msg->correction_field;
1473
1474   if ((msg->flag_field & 0x0200)) {
1475     /* Wait for FOLLOW_UP */
1476   } else {
1477     sync->sync_send_time_remote =
1478         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1479         sync.origin_timestamp);
1480
1481     if (domain->last_ptp_sync_time != 0
1482         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1483       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1484           GST_TIME_FORMAT, domain->domain,
1485           GST_TIME_ARGS (domain->last_ptp_sync_time),
1486           GST_TIME_ARGS (sync->sync_send_time_remote));
1487       ptp_pending_sync_free (sync);
1488       sync = NULL;
1489       return;
1490     }
1491     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1492
1493     if (send_delay_req (domain, sync)) {
1494       /* Sent delay request */
1495     } else {
1496       update_ptp_time (domain, sync);
1497       ptp_pending_sync_free (sync);
1498       sync = NULL;
1499     }
1500   }
1501
1502   if (sync)
1503     g_queue_push_tail (&domain->pending_syncs, sync);
1504 }
1505
1506 static void
1507 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1508 {
1509   GList *l;
1510   PtpDomainData *domain = NULL;
1511   PtpPendingSync *sync = NULL;
1512
1513   /* Don't consider messages with the alternate master flag set */
1514   if ((msg->flag_field & 0x0100))
1515     return;
1516
1517   for (l = domain_data; l; l = l->next) {
1518     PtpDomainData *tmp = l->data;
1519
1520     if (msg->domain_number == tmp->domain) {
1521       domain = tmp;
1522       break;
1523     }
1524   }
1525
1526   if (!domain)
1527     return;
1528
1529   /* If we have a master clock, ignore this message if it's not coming from there */
1530   if (domain->have_master_clock
1531       && compare_clock_identity (&domain->master_clock_identity,
1532           &msg->source_port_identity) != 0)
1533     return;
1534
1535   /* Check if we know about this one */
1536   for (l = domain->pending_syncs.head; l; l = l->next) {
1537     PtpPendingSync *tmp = l->data;
1538
1539     if (tmp->sync_seqnum == msg->sequence_id) {
1540       sync = tmp;
1541       break;
1542     }
1543   }
1544
1545   if (!sync)
1546     return;
1547
1548   /* Got a FOLLOW_UP for this already */
1549   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
1550     return;
1551
1552   if (sync->sync_recv_time_local >= receive_time) {
1553     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1554         GST_TIME_FORMAT, domain->domain,
1555         GST_TIME_ARGS (sync->sync_recv_time_local),
1556         GST_TIME_ARGS (receive_time));
1557     g_queue_remove (&domain->pending_syncs, sync);
1558     ptp_pending_sync_free (sync);
1559     return;
1560   }
1561
1562   sync->correction_field_sync += msg->correction_field;
1563   sync->sync_send_time_remote =
1564       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1565       follow_up.precise_origin_timestamp);
1566   sync->follow_up_recv_time_local = receive_time;
1567
1568   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1569     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1570         GST_TIME_FORMAT, domain->domain,
1571         GST_TIME_ARGS (domain->last_ptp_sync_time),
1572         GST_TIME_ARGS (sync->sync_send_time_remote));
1573     g_queue_remove (&domain->pending_syncs, sync);
1574     ptp_pending_sync_free (sync);
1575     sync = NULL;
1576     return;
1577   }
1578   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1579
1580   if (send_delay_req (domain, sync)) {
1581     /* Sent delay request */
1582   } else {
1583     update_ptp_time (domain, sync);
1584     g_queue_remove (&domain->pending_syncs, sync);
1585     ptp_pending_sync_free (sync);
1586     sync = NULL;
1587   }
1588 }
1589
1590 static void
1591 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1592 {
1593   GList *l;
1594   PtpDomainData *domain = NULL;
1595   PtpPendingSync *sync = NULL;
1596
1597   /* Don't consider messages with the alternate master flag set */
1598   if ((msg->flag_field & 0x0100))
1599     return;
1600
1601   for (l = domain_data; l; l = l->next) {
1602     PtpDomainData *tmp = l->data;
1603
1604     if (msg->domain_number == tmp->domain) {
1605       domain = tmp;
1606       break;
1607     }
1608   }
1609
1610   if (!domain)
1611     return;
1612
1613   /* If we have a master clock, ignore this message if it's not coming from there */
1614   if (domain->have_master_clock
1615       && compare_clock_identity (&domain->master_clock_identity,
1616           &msg->source_port_identity) != 0)
1617     return;
1618
1619   /* Not for us */
1620   if (msg->message_specific.delay_resp.
1621       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1622       || msg->message_specific.delay_resp.
1623       requesting_port_identity.port_number != ptp_clock_id.port_number)
1624     return;
1625
1626   domain->min_delay_req_interval =
1627       log2_to_clock_time (msg->log_message_interval);
1628
1629   /* Check if we know about this one */
1630   for (l = domain->pending_syncs.head; l; l = l->next) {
1631     PtpPendingSync *tmp = l->data;
1632
1633     if (tmp->delay_req_seqnum == msg->sequence_id) {
1634       sync = tmp;
1635       break;
1636     }
1637   }
1638
1639   if (!sync)
1640     return;
1641
1642   /* Got a DELAY_RESP for this already */
1643   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1644     return;
1645
1646   if (sync->delay_req_send_time_local > receive_time) {
1647     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1648         GST_TIME_FORMAT, domain->domain,
1649         GST_TIME_ARGS (sync->delay_req_send_time_local),
1650         GST_TIME_ARGS (receive_time));
1651     g_queue_remove (&domain->pending_syncs, sync);
1652     ptp_pending_sync_free (sync);
1653     return;
1654   }
1655
1656   sync->correction_field_delay = msg->correction_field;
1657
1658   sync->delay_req_recv_time_remote =
1659       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1660       delay_resp.receive_timestamp);
1661   sync->delay_resp_recv_time_local = receive_time;
1662
1663   if (domain->mean_path_delay != 0
1664       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1665     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1666         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1667         GST_TIME_ARGS (sync->sync_send_time_remote),
1668         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1669     g_queue_remove (&domain->pending_syncs, sync);
1670     ptp_pending_sync_free (sync);
1671     return;
1672   }
1673
1674   if (update_mean_path_delay (domain, sync))
1675     update_ptp_time (domain, sync);
1676   g_queue_remove (&domain->pending_syncs, sync);
1677   ptp_pending_sync_free (sync);
1678 }
1679
1680 static void
1681 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1682 {
1683   /* Ignore our own messages */
1684   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1685       msg->source_port_identity.port_number == ptp_clock_id.port_number)
1686     return;
1687
1688   switch (msg->message_type) {
1689     case PTP_MESSAGE_TYPE_ANNOUNCE:
1690       handle_announce_message (msg, receive_time);
1691       break;
1692     case PTP_MESSAGE_TYPE_SYNC:
1693       handle_sync_message (msg, receive_time);
1694       break;
1695     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1696       handle_follow_up_message (msg, receive_time);
1697       break;
1698     case PTP_MESSAGE_TYPE_DELAY_RESP:
1699       handle_delay_resp_message (msg, receive_time);
1700       break;
1701     default:
1702       break;
1703   }
1704 }
1705
1706 static gboolean
1707 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1708     gpointer user_data)
1709 {
1710   GIOStatus status;
1711   StdIOHeader header;
1712   gchar buffer[8192];
1713   GError *err = NULL;
1714   gsize read;
1715
1716   if ((condition & G_IO_STATUS_EOF)) {
1717     GST_ERROR ("Got EOF on stdin");
1718     g_main_loop_quit (main_loop);
1719     return G_SOURCE_REMOVE;
1720   }
1721
1722   status =
1723       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1724       &read, &err);
1725   if (status == G_IO_STATUS_ERROR) {
1726     GST_ERROR ("Failed to read from stdin: %s", err->message);
1727     g_main_loop_quit (main_loop);
1728     return G_SOURCE_REMOVE;
1729   } else if (status == G_IO_STATUS_EOF) {
1730     GST_ERROR ("Got EOF on stdin");
1731     g_main_loop_quit (main_loop);
1732     return G_SOURCE_REMOVE;
1733   } else if (status != G_IO_STATUS_NORMAL) {
1734     GST_ERROR ("Unexpected stdin read status: %d", status);
1735     g_main_loop_quit (main_loop);
1736     return G_SOURCE_REMOVE;
1737   } else if (read != sizeof (header)) {
1738     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1739     g_main_loop_quit (main_loop);
1740     return G_SOURCE_REMOVE;
1741   } else if (header.size > 8192) {
1742     GST_ERROR ("Unexpected size: %u", header.size);
1743     g_main_loop_quit (main_loop);
1744     return G_SOURCE_REMOVE;
1745   }
1746
1747   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1748   if (status == G_IO_STATUS_ERROR) {
1749     GST_ERROR ("Failed to read from stdin: %s", err->message);
1750     g_main_loop_quit (main_loop);
1751     return G_SOURCE_REMOVE;
1752   } else if (status == G_IO_STATUS_EOF) {
1753     GST_ERROR ("EOF on stdin");
1754     g_main_loop_quit (main_loop);
1755     return G_SOURCE_REMOVE;
1756   } else if (status != G_IO_STATUS_NORMAL) {
1757     GST_ERROR ("Unexpected stdin read status: %d", status);
1758     g_main_loop_quit (main_loop);
1759     return G_SOURCE_REMOVE;
1760   } else if (read != header.size) {
1761     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1762     g_main_loop_quit (main_loop);
1763     return G_SOURCE_REMOVE;
1764   }
1765
1766   switch (header.type) {
1767     case TYPE_EVENT:
1768     case TYPE_GENERAL:{
1769       GstClockTime receive_time = gst_util_get_timestamp ();
1770       PtpMessage msg;
1771
1772       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1773         dump_ptp_message (&msg);
1774         handle_ptp_message (&msg, receive_time);
1775       }
1776       break;
1777     }
1778     default:
1779     case TYPE_CLOCK_ID:{
1780       if (header.size != 8) {
1781         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1782         g_main_loop_quit (main_loop);
1783         return G_SOURCE_REMOVE;
1784       }
1785       g_mutex_lock (&ptp_lock);
1786       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1787       ptp_clock_id.port_number = getpid ();
1788       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1789           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1790       g_cond_signal (&ptp_cond);
1791       g_mutex_unlock (&ptp_lock);
1792       break;
1793     }
1794   }
1795
1796   return G_SOURCE_CONTINUE;
1797 }
1798
1799 /* Cleanup all announce messages and announce message senders
1800  * that are timed out by now, and clean up all pending syncs
1801  * that are missing their FOLLOW_UP or DELAY_RESP */
1802 static gboolean
1803 cleanup_cb (gpointer data)
1804 {
1805   GstClockTime now = gst_util_get_timestamp ();
1806   GList *l, *m, *n;
1807
1808   for (l = domain_data; l; l = l->next) {
1809     PtpDomainData *domain = l->data;
1810
1811     for (n = domain->announce_senders; n;) {
1812       PtpAnnounceSender *sender = n->data;
1813       gboolean timed_out = TRUE;
1814
1815       /* Keep only 5 messages per sender around */
1816       while (g_queue_get_length (&sender->announce_messages) > 5) {
1817         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1818         g_free (msg);
1819       }
1820
1821       for (m = sender->announce_messages.head; m; m = m->next) {
1822         PtpAnnounceMessage *msg = m->data;
1823
1824         if (msg->receive_time +
1825             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1826           timed_out = FALSE;
1827           break;
1828         }
1829       }
1830
1831       if (timed_out) {
1832         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1833             sender->master_clock_identity.clock_identity,
1834             sender->master_clock_identity.port_number);
1835         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1836         g_queue_clear (&sender->announce_messages);
1837       }
1838
1839       if (g_queue_get_length (&sender->announce_messages) == 0) {
1840         GList *tmp = n->next;
1841
1842         if (compare_clock_identity (&sender->master_clock_identity,
1843                 &domain->master_clock_identity) == 0)
1844           GST_WARNING ("currently selected master clock timed out");
1845         g_free (sender);
1846         domain->announce_senders =
1847             g_list_delete_link (domain->announce_senders, n);
1848         n = tmp;
1849       } else {
1850         n = n->next;
1851       }
1852     }
1853     select_best_master_clock (domain, now);
1854
1855     /* Clean up any pending syncs */
1856     for (n = domain->pending_syncs.head; n;) {
1857       PtpPendingSync *sync = n->data;
1858       gboolean timed_out = FALSE;
1859
1860       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1861        * and pending delay reqs after 4 delay req intervals or 10 seconds
1862        */
1863       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1864           ((domain->min_delay_req_interval != 0
1865                   && sync->delay_req_send_time_local +
1866                   4 * domain->min_delay_req_interval < now)
1867               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1868         timed_out = TRUE;
1869       } else if ((domain->sync_interval != 0
1870               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1871           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1872         timed_out = TRUE;
1873       }
1874
1875       if (timed_out) {
1876         GList *tmp = n->next;
1877         ptp_pending_sync_free (sync);
1878         g_queue_delete_link (&domain->pending_syncs, n);
1879         n = tmp;
1880       } else {
1881         n = n->next;
1882       }
1883     }
1884   }
1885
1886   return G_SOURCE_CONTINUE;
1887 }
1888
1889 static gpointer
1890 ptp_helper_main (gpointer data)
1891 {
1892   GSource *cleanup_source;
1893
1894   GST_DEBUG ("Starting PTP helper loop");
1895
1896   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1897   cleanup_source = g_timeout_source_new_seconds (5);
1898   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1899   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1900   g_source_attach (cleanup_source, main_context);
1901   g_source_unref (cleanup_source);
1902
1903   g_main_loop_run (main_loop);
1904   GST_DEBUG ("Stopped PTP helper loop");
1905
1906   g_mutex_lock (&ptp_lock);
1907   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1908   ptp_clock_id.port_number = 0;
1909   initted = FALSE;
1910   g_cond_signal (&ptp_cond);
1911   g_mutex_unlock (&ptp_lock);
1912
1913   return NULL;
1914 }
1915
1916 /**
1917  * gst_ptp_is_supported:
1918  *
1919  * Check if PTP clocks are generally supported on this system, and if previous
1920  * initializations did not fail.
1921  *
1922  * Returns: %TRUE if PTP clocks are generally supported on this system, and
1923  * previous initializations did not fail.
1924  *
1925  * Since: 1.6
1926  */
1927 gboolean
1928 gst_ptp_is_supported (void)
1929 {
1930   return supported;
1931 }
1932
1933 /**
1934  * gst_ptp_is_initialized:
1935  *
1936  * Check if the GStreamer PTP clock subsystem is initialized.
1937  *
1938  * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
1939  *
1940  * Since: 1.6
1941  */
1942 gboolean
1943 gst_ptp_is_initialized (void)
1944 {
1945   return initted;
1946 }
1947
1948 /**
1949  * gst_ptp_init:
1950  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
1951  * @interfaces: (transfer none) (array zero-terminated=1): network interfaces to run the clock on
1952  *
1953  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
1954  * slave-only mode for all domains on the given @interfaces with the
1955  * given @clock_id.
1956  *
1957  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
1958  * generated from the MAC address of the first network interface.
1959  *
1960  *
1961  * This function is automatically called by gst_ptp_clock_new() with default
1962  * parameters if it wasn't called before.
1963  *
1964  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
1965  *
1966  * Since: 1.6
1967  */
1968 gboolean
1969 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
1970 {
1971   gboolean ret;
1972   const gchar *env;
1973   gchar **argv = NULL;
1974   gint argc, argc_c;
1975   gint fd_r, fd_w;
1976   GError *err = NULL;
1977   GSource *stdin_source;
1978
1979   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
1980
1981   g_mutex_lock (&ptp_lock);
1982   if (!supported) {
1983     GST_ERROR ("PTP not supported");
1984     ret = FALSE;
1985     goto done;
1986   }
1987
1988   if (initted) {
1989     GST_DEBUG ("PTP already initialized");
1990     ret = TRUE;
1991     goto done;
1992   }
1993
1994   if (ptp_helper_pid) {
1995     GST_DEBUG ("PTP currently initializing");
1996     goto wait;
1997   }
1998
1999   if (!domain_stats_hooks_initted) {
2000     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2001     domain_stats_hooks_initted = TRUE;
2002   }
2003
2004   argc = 1;
2005   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2006     argc += 2;
2007   if (interfaces != NULL)
2008     argc += 2 * g_strv_length (interfaces);
2009
2010   argv = g_new0 (gchar *, argc + 2);
2011   argc_c = 0;
2012
2013   env = g_getenv ("GST_PTP_HELPER_1_0");
2014   if (env == NULL)
2015     env = g_getenv ("GST_PTP_HELPER");
2016   if (env != NULL && *env != '\0') {
2017     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2018     argv[argc_c++] = g_strdup (env);
2019   } else {
2020     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2021   }
2022
2023   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2024     argv[argc_c++] = g_strdup ("-c");
2025     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2026   }
2027
2028   if (interfaces != NULL) {
2029     gchar **ptr = interfaces;
2030
2031     while (*ptr) {
2032       argv[argc_c++] = g_strdup ("-i");
2033       argv[argc_c++] = g_strdup (*ptr);
2034       ptr++;
2035     }
2036   }
2037
2038   main_context = g_main_context_new ();
2039   main_loop = g_main_loop_new (main_context, FALSE);
2040
2041   ptp_helper_thread =
2042       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2043   if (!ptp_helper_thread) {
2044     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2045     g_clear_error (&err);
2046     ret = FALSE;
2047     goto done;
2048   }
2049
2050   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2051           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2052     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2053     g_clear_error (&err);
2054     ret = FALSE;
2055     supported = FALSE;
2056     goto done;
2057   }
2058
2059   stdin_channel = g_io_channel_unix_new (fd_r);
2060   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2061   g_io_channel_set_buffered (stdin_channel, FALSE);
2062   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2063   stdin_source =
2064       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2065   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2066   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2067       NULL);
2068   g_source_attach (stdin_source, main_context);
2069   g_source_unref (stdin_source);
2070
2071   /* Create stdout channel */
2072   stdout_channel = g_io_channel_unix_new (fd_w);
2073   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2074   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2075   g_io_channel_set_buffered (stdout_channel, FALSE);
2076
2077   delay_req_rand = g_rand_new ();
2078
2079   initted = TRUE;
2080
2081 wait:
2082   GST_DEBUG ("Waiting for PTP to be initialized");
2083
2084   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2085     g_cond_wait (&ptp_cond, &ptp_lock);
2086
2087   ret = initted;
2088   if (ret) {
2089     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2090         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2091   } else {
2092     GST_ERROR ("Failed to initialize");
2093     supported = FALSE;
2094   }
2095
2096 done:
2097   g_strfreev (argv);
2098
2099   if (!ret) {
2100     if (ptp_helper_pid) {
2101       kill (ptp_helper_pid, SIGKILL);
2102       waitpid (ptp_helper_pid, NULL, 0);
2103       g_spawn_close_pid (ptp_helper_pid);
2104     }
2105     ptp_helper_pid = 0;
2106
2107     if (stdin_channel)
2108       g_io_channel_unref (stdin_channel);
2109     stdin_channel = NULL;
2110     if (stdout_channel)
2111       g_io_channel_unref (stdout_channel);
2112     stdout_channel = NULL;
2113
2114     if (main_loop && ptp_helper_thread) {
2115       g_main_loop_quit (main_loop);
2116       g_thread_join (ptp_helper_thread);
2117     }
2118     ptp_helper_thread = NULL;
2119     if (main_loop)
2120       g_main_loop_unref (main_loop);
2121     main_loop = NULL;
2122     if (main_context)
2123       g_main_context_unref (main_context);
2124     main_context = NULL;
2125
2126     if (delay_req_rand)
2127       g_rand_free (delay_req_rand);
2128     delay_req_rand = NULL;
2129   }
2130
2131   g_mutex_unlock (&ptp_lock);
2132
2133   return ret;
2134 }
2135
2136 /**
2137  * gst_ptp_deinit:
2138  *
2139  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2140  * are any remaining GstPtpClock instances, they won't be further synchronized
2141  * to the PTP network clock.
2142  *
2143  * Since: 1.6
2144  */
2145 void
2146 gst_ptp_deinit (void)
2147 {
2148   GList *l, *m;
2149
2150   g_mutex_lock (&ptp_lock);
2151
2152   if (ptp_helper_pid) {
2153     kill (ptp_helper_pid, SIGKILL);
2154     waitpid (ptp_helper_pid, NULL, 0);
2155     g_spawn_close_pid (ptp_helper_pid);
2156   }
2157   ptp_helper_pid = 0;
2158
2159   if (stdin_channel)
2160     g_io_channel_unref (stdin_channel);
2161   stdin_channel = NULL;
2162   if (stdout_channel)
2163     g_io_channel_unref (stdout_channel);
2164   stdout_channel = NULL;
2165
2166   if (main_loop && ptp_helper_thread) {
2167     GThread *tmp = ptp_helper_thread;
2168     ptp_helper_thread = NULL;
2169     g_mutex_unlock (&ptp_lock);
2170     g_main_loop_quit (main_loop);
2171     g_thread_join (tmp);
2172     g_mutex_lock (&ptp_lock);
2173   }
2174   if (main_loop)
2175     g_main_loop_unref (main_loop);
2176   main_loop = NULL;
2177   if (main_context)
2178     g_main_context_unref (main_context);
2179   main_context = NULL;
2180
2181   if (delay_req_rand)
2182     g_rand_free (delay_req_rand);
2183   delay_req_rand = NULL;
2184
2185   for (l = domain_data; l; l = l->next) {
2186     PtpDomainData *domain = l->data;
2187
2188     for (m = domain->announce_senders; m; m = m->next) {
2189       PtpAnnounceSender *sender = m->data;
2190
2191       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2192       g_queue_clear (&sender->announce_messages);
2193       g_free (sender);
2194     }
2195     g_list_free (domain->announce_senders);
2196
2197     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2198         NULL);
2199     g_queue_clear (&domain->pending_syncs);
2200     gst_object_unref (domain->domain_clock);
2201     g_free (domain);
2202   }
2203   g_list_free (domain_data);
2204   domain_data = NULL;
2205   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2206   g_list_free (domain_clocks);
2207   domain_clocks = NULL;
2208
2209   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2210   ptp_clock_id.port_number = 0;
2211
2212   initted = FALSE;
2213
2214   g_mutex_unlock (&ptp_lock);
2215 }
2216
2217 #define DEFAULT_DOMAIN 0
2218
2219 enum
2220 {
2221   PROP_0,
2222   PROP_DOMAIN,
2223   PROP_INTERNAL_CLOCK
2224 };
2225
2226 #define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
2227   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
2228
2229 struct _GstPtpClockPrivate
2230 {
2231   guint domain;
2232   GstClock *domain_clock;
2233   gulong domain_stats_id;
2234 };
2235
2236 #define gst_ptp_clock_parent_class parent_class
2237 G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2238
2239 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2240     const GValue * value, GParamSpec * pspec);
2241 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2242     GValue * value, GParamSpec * pspec);
2243 static void gst_ptp_clock_finalize (GObject * object);
2244
2245 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2246
2247 static void
2248 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2249 {
2250   GObjectClass *gobject_class;
2251   GstClockClass *clock_class;
2252
2253   gobject_class = G_OBJECT_CLASS (klass);
2254   clock_class = GST_CLOCK_CLASS (klass);
2255
2256   g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
2257
2258   gobject_class->finalize = gst_ptp_clock_finalize;
2259   gobject_class->get_property = gst_ptp_clock_get_property;
2260   gobject_class->set_property = gst_ptp_clock_set_property;
2261
2262   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2263       g_param_spec_uint ("domain", "Domain",
2264           "The PTP domain", 0, G_MAXUINT8,
2265           DEFAULT_DOMAIN,
2266           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2267
2268   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2269       g_param_spec_object ("internal-clock", "Internal Clock",
2270           "Internal clock", GST_TYPE_CLOCK,
2271           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2272
2273   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2274 }
2275
2276 static void
2277 gst_ptp_clock_init (GstPtpClock * self)
2278 {
2279   GstPtpClockPrivate *priv;
2280
2281   self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
2282
2283   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2284   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2285
2286   priv->domain = DEFAULT_DOMAIN;
2287 }
2288
2289 static gboolean
2290 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2291 {
2292   gboolean got_clock = TRUE;
2293
2294   if (G_UNLIKELY (!self->priv->domain_clock)) {
2295     g_mutex_lock (&domain_clocks_lock);
2296     if (!self->priv->domain_clock) {
2297       GList *l;
2298
2299       got_clock = FALSE;
2300
2301       for (l = domain_clocks; l; l = l->next) {
2302         PtpDomainData *clock_data = l->data;
2303
2304         if (clock_data->domain == self->priv->domain
2305             && clock_data->last_ptp_time != 0) {
2306           self->priv->domain_clock = clock_data->domain_clock;
2307           got_clock = TRUE;
2308           break;
2309         }
2310       }
2311     }
2312     g_mutex_unlock (&domain_clocks_lock);
2313     if (got_clock) {
2314       g_object_notify (G_OBJECT (self), "internal-clock");
2315       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2316     }
2317   }
2318
2319   return got_clock;
2320 }
2321
2322 static gboolean
2323 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2324     gpointer user_data)
2325 {
2326   GstPtpClock *self = user_data;
2327
2328   if (domain != self->priv->domain
2329       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2330     return TRUE;
2331
2332   /* Let's set our internal clock */
2333   if (!gst_ptp_clock_ensure_domain_clock (self))
2334     return TRUE;
2335
2336   self->priv->domain_stats_id = 0;
2337
2338   return FALSE;
2339 }
2340
2341 static void
2342 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2343     const GValue * value, GParamSpec * pspec)
2344 {
2345   GstPtpClock *self = GST_PTP_CLOCK (object);
2346
2347   switch (prop_id) {
2348     case PROP_DOMAIN:
2349       self->priv->domain = g_value_get_uint (value);
2350       gst_ptp_clock_ensure_domain_clock (self);
2351       if (!self->priv->domain_clock)
2352         self->priv->domain_stats_id =
2353             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2354             NULL);
2355       break;
2356     default:
2357       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2358       break;
2359   }
2360 }
2361
2362 static void
2363 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2364     GValue * value, GParamSpec * pspec)
2365 {
2366   GstPtpClock *self = GST_PTP_CLOCK (object);
2367
2368   switch (prop_id) {
2369     case PROP_DOMAIN:
2370       g_value_set_uint (value, self->priv->domain);
2371       break;
2372     case PROP_INTERNAL_CLOCK:
2373       gst_ptp_clock_ensure_domain_clock (self);
2374       g_value_set_object (value, self->priv->domain_clock);
2375       break;
2376     default:
2377       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2378       break;
2379   }
2380 }
2381
2382 static void
2383 gst_ptp_clock_finalize (GObject * object)
2384 {
2385   GstPtpClock *self = GST_PTP_CLOCK (object);
2386
2387   if (self->priv->domain_stats_id)
2388     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2389
2390   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2391 }
2392
2393 static GstClockTime
2394 gst_ptp_clock_get_internal_time (GstClock * clock)
2395 {
2396   GstPtpClock *self = GST_PTP_CLOCK (clock);
2397
2398   gst_ptp_clock_ensure_domain_clock (self);
2399
2400   if (!self->priv->domain_clock) {
2401     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2402         self->priv->domain);
2403     return GST_CLOCK_TIME_NONE;
2404   }
2405
2406   return gst_clock_get_time (self->priv->domain_clock);
2407 }
2408
2409 /**
2410  * gst_ptp_clock_new:
2411  * @name: Name of the clock
2412  * @domain: PTP domain
2413  *
2414  * Creates a new PTP clock instance that exports the PTP time of the master
2415  * clock in @domain. This clock can be slaved to other clocks as needed.
2416  *
2417  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2418  * default parameters.
2419  *
2420  *
2421  * This clock only returns valid timestamps after it received the first
2422  * times from the PTP master clock on the network. Once this happens the
2423  * GstPtpClock::internal-clock property will become non-NULL. You can connect
2424  * to the notify::internal-clock signal to get notified about this, or
2425  * alternatively use gst_ptp_clock_wait_ready() to wait for this to happen.
2426  *
2427  * Since: 1.6
2428  */
2429 GstClock *
2430 gst_ptp_clock_new (const gchar * name, guint domain)
2431 {
2432   g_return_val_if_fail (name != NULL, NULL);
2433   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2434
2435   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2436     GST_ERROR ("Failed to initialize PTP");
2437     return NULL;
2438   }
2439
2440   return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2441       NULL);
2442 }
2443
2444 typedef struct
2445 {
2446   guint8 domain;
2447   const GstStructure *stats;
2448 } DomainStatsMarshalData;
2449
2450 static void
2451 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2452 {
2453   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2454
2455   if (!callback (data->domain, data->stats, hook->data))
2456     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2457 }
2458
2459 static void
2460 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2461 {
2462   DomainStatsMarshalData data = { domain, stats };
2463
2464   g_mutex_lock (&ptp_lock);
2465   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2466       (GHookMarshaller) domain_stats_marshaller, &data);
2467   g_mutex_unlock (&ptp_lock);
2468 }
2469
2470 /**
2471  * gst_ptp_statistics_callback_add:
2472  * @callback: GstPtpStatisticsCallback to call
2473  * @user_data: Data to pass to the callback
2474  * @destroy_data: GDestroyNotify to destroy the data
2475  *
2476  * Installs a new statistics callback for gathering PTP statistics. See
2477  * GstPtpStatisticsCallback for a list of statistics that are provided.
2478  *
2479  * Returns: Id for the callback that can be passed to
2480  * gst_ptp_statistics_callback_remove()
2481  *
2482  * Since: 1.6
2483  */
2484 gulong
2485 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2486     gpointer user_data, GDestroyNotify destroy_data)
2487 {
2488   GHook *hook;
2489
2490   g_mutex_lock (&ptp_lock);
2491
2492   if (!domain_stats_hooks_initted) {
2493     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2494     domain_stats_hooks_initted = TRUE;
2495   }
2496
2497   hook = g_hook_alloc (&domain_stats_hooks);
2498   hook->func = callback;
2499   hook->data = user_data;
2500   hook->destroy = destroy_data;
2501   g_hook_prepend (&domain_stats_hooks, hook);
2502   g_atomic_int_add (&domain_stats_n_hooks, 1);
2503
2504   g_mutex_unlock (&ptp_lock);
2505
2506   return hook->hook_id;
2507 }
2508
2509 /**
2510  * gst_ptp_statistics_callback_remove:
2511  * @id: Callback id to remove
2512  *
2513  * Removes a PTP statistics callback that was previously added with
2514  * gst_ptp_statistics_callback_add().
2515  *
2516  * Since: 1.6
2517  */
2518 void
2519 gst_ptp_statistics_callback_remove (gulong id)
2520 {
2521   g_mutex_lock (&ptp_lock);
2522   if (g_hook_destroy (&domain_stats_hooks, id))
2523     g_atomic_int_add (&domain_stats_n_hooks, -1);
2524   g_mutex_unlock (&ptp_lock);
2525 }
2526
2527 #else /* HAVE_PTP */
2528
2529 GType
2530 gst_ptp_clock_get_type (void)
2531 {
2532   return G_TYPE_INVALID;
2533 }
2534
2535 gboolean
2536 gst_ptp_is_supported (void)
2537 {
2538   return FALSE;
2539 }
2540
2541 gboolean
2542 gst_ptp_is_initialized (void)
2543 {
2544   return FALSE;
2545 }
2546
2547 gboolean
2548 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2549 {
2550   return FALSE;
2551 }
2552
2553 void
2554 gst_ptp_deinit (void)
2555 {
2556 }
2557
2558 GstClock *
2559 gst_ptp_clock_new (const gchar * name, guint domain)
2560 {
2561   return NULL;
2562 }
2563
2564 gboolean
2565 gst_ptp_clock_wait_ready (GstPtpClock * self, GstClockTime timeout)
2566 {
2567   return FALSE;
2568 }
2569
2570 gulong
2571 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2572     gpointer user_data, GDestroyNotify destroy_data)
2573 {
2574   return 0;
2575 }
2576
2577 void
2578 gst_ptp_statistics_callback_remove (gulong id)
2579 {
2580   return;
2581 }
2582
2583 #endif