2 * gstrtponviftimestamp.h
4 * Copyright (C) 2014 Axis Communications AB
5 * Author: Guillaume Desmottes <guillaume.desmottes@collabora.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
29 #include <gst/rtp/gstrtpbuffer.h>
31 #include "gstrtponviftimestamp.h"
33 #define GST_NTP_OFFSET_EVENT_NAME "GstNtpOffset"
35 #define DEFAULT_NTP_OFFSET GST_CLOCK_TIME_NONE
36 #define DEFAULT_CSEQ 0
37 #define DEFAULT_SET_E_BIT FALSE
38 #define DEFAULT_SET_T_BIT FALSE
39 #define DEFAULT_DROP_OUT_OF_SEGMENT TRUE
41 GST_DEBUG_CATEGORY_STATIC (rtponviftimestamp_debug);
42 #define GST_CAT_DEFAULT (rtponviftimestamp_debug)
44 static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad,
45 GstObject * parent, GstBuffer * buf);
46 static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad,
47 GstObject * parent, GstBufferList * list);
49 static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
51 static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
52 GstBufferList * list);
54 static GstStaticPadTemplate sink_template_factory =
55 GST_STATIC_PAD_TEMPLATE ("sink",
58 GST_STATIC_CAPS ("application/x-rtp")
61 static GstStaticPadTemplate src_template_factory =
62 GST_STATIC_PAD_TEMPLATE ("src",
65 GST_STATIC_CAPS ("application/x-rtp")
75 PROP_DROP_OUT_OF_SEGMENT
78 /*static guint gst_rtp_onvif_timestamp_signals[LAST_SIGNAL] = { 0 }; */
80 G_DEFINE_TYPE (GstRtpOnvifTimestamp, gst_rtp_onvif_timestamp, GST_TYPE_ELEMENT);
81 GST_ELEMENT_REGISTER_DEFINE (rtponviftimestamp, "rtponviftimestamp",
82 GST_RANK_NONE, GST_TYPE_RTP_ONVIF_TIMESTAMP);
85 gst_rtp_onvif_timestamp_get_property (GObject * object,
86 guint prop_id, GValue * value, GParamSpec * pspec)
88 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
92 g_value_set_uint64 (value, self->prop_ntp_offset);
95 g_value_set_uint (value, self->prop_cseq);
98 g_value_set_boolean (value, self->prop_set_e_bit);
101 g_value_set_boolean (value, self->prop_set_t_bit);
103 case PROP_DROP_OUT_OF_SEGMENT:
104 g_value_set_boolean (value, self->prop_drop_out_of_segment);
107 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
113 gst_rtp_onvif_timestamp_set_property (GObject * object,
114 guint prop_id, const GValue * value, GParamSpec * pspec)
116 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
119 case PROP_NTP_OFFSET:
120 self->prop_ntp_offset = g_value_get_uint64 (value);
123 self->prop_cseq = g_value_get_uint (value);
126 self->prop_set_e_bit = g_value_get_boolean (value);
129 self->prop_set_t_bit = g_value_get_boolean (value);
131 case PROP_DROP_OUT_OF_SEGMENT:
132 self->prop_drop_out_of_segment = g_value_get_boolean (value);
135 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
140 /* send cached buffer or list, and events, if present */
142 send_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
144 GstFlowReturn ret = GST_FLOW_OK;
146 g_assert (!(self->buffer && self->list));
149 GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer);
150 ret = handle_and_push_buffer (self, self->buffer);
154 GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list);
155 ret = handle_and_push_buffer_list (self, self->list);
159 if (ret != GST_FLOW_OK)
162 while (!g_queue_is_empty (self->event_queue)) {
165 event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
166 GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event);
167 (void) gst_pad_send_event (self->sinkpad, event);
175 purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
177 g_assert (!(self->buffer && self->list));
180 GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer);
181 gst_buffer_unref (self->buffer);
185 GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list);
186 gst_buffer_list_unref (self->list);
190 while (!g_queue_is_empty (self->event_queue)) {
193 event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
194 gst_event_unref (event);
198 static GstStateChangeReturn
199 gst_rtp_onvif_timestamp_change_state (GstElement * element,
200 GstStateChange transition)
202 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (element);
203 GstStateChangeReturn ret;
205 switch (transition) {
206 case GST_STATE_CHANGE_READY_TO_PAUSED:
207 self->ntp_offset = self->prop_ntp_offset;
208 GST_DEBUG_OBJECT (self, "ntp-offset: %" GST_TIME_FORMAT,
209 GST_TIME_ARGS (self->ntp_offset));
210 self->set_d_bit = TRUE;
211 self->set_e_bit = FALSE;
212 self->set_t_bit = FALSE;
218 ret = GST_ELEMENT_CLASS (gst_rtp_onvif_timestamp_parent_class)->change_state
219 (element, transition);
221 if (ret == GST_STATE_CHANGE_FAILURE)
224 switch (transition) {
225 case GST_STATE_CHANGE_PAUSED_TO_READY:
226 purge_cached_buffer_and_events (self);
227 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
237 gst_rtp_onvif_timestamp_finalize (GObject * object)
239 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
241 g_queue_free (self->event_queue);
243 G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object);
247 gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass)
249 GObjectClass *gobject_class;
250 GstElementClass *gstelement_class;
252 gobject_class = G_OBJECT_CLASS (klass);
253 gstelement_class = GST_ELEMENT_CLASS (klass);
255 gobject_class->get_property = gst_rtp_onvif_timestamp_get_property;
256 gobject_class->set_property = gst_rtp_onvif_timestamp_set_property;
257 gobject_class->finalize = gst_rtp_onvif_timestamp_finalize;
259 g_object_class_install_property (gobject_class, PROP_NTP_OFFSET,
260 g_param_spec_uint64 ("ntp-offset", "NTP offset",
261 "Offset between the pipeline running time and the absolute UTC time, "
262 "in nano-seconds since 1900 (-1 for automatic computation)",
264 DEFAULT_NTP_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266 g_object_class_install_property (gobject_class, PROP_CSEQ,
267 g_param_spec_uint ("cseq", "CSeq",
268 "The RTSP CSeq which initiated the playback",
270 DEFAULT_CSEQ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
272 g_object_class_install_property (gobject_class, PROP_SET_E_BIT,
273 g_param_spec_boolean ("set-e-bit", "Set 'E' bit",
274 "If the element should set the 'E' bit as defined in the ONVIF RTP "
275 "extension. This increases latency by one packet",
276 DEFAULT_SET_E_BIT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278 g_object_class_install_property (gobject_class, PROP_SET_T_BIT,
279 g_param_spec_boolean ("set-t-bit", "Set 'T' bit",
280 "If the element should set the 'T' bit as defined in the ONVIF RTP "
281 "extension. This increases latency by one packet",
282 DEFAULT_SET_T_BIT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
284 g_object_class_install_property (gobject_class, PROP_DROP_OUT_OF_SEGMENT,
285 g_param_spec_boolean ("drop-out-of-segment", "Drop out of segment",
286 "Whether the element should drop buffers that fall outside the segment, "
287 "not part of the specification but allows full reverse playback.",
288 DEFAULT_DROP_OUT_OF_SEGMENT,
289 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 gst_element_class_add_static_pad_template (gstelement_class,
293 &sink_template_factory);
294 gst_element_class_add_static_pad_template (gstelement_class,
295 &src_template_factory);
297 gst_element_class_set_static_metadata (gstelement_class,
298 "ONVIF NTP timestamps RTP extension", "Effect/RTP",
299 "Add absolute timestamps and flags of recorded data in a playback "
300 "session", "Guillaume Desmottes <guillaume.desmottes@collabora.com>");
302 gstelement_class->change_state =
303 GST_DEBUG_FUNCPTR (gst_rtp_onvif_timestamp_change_state);
305 GST_DEBUG_CATEGORY_INIT (rtponviftimestamp_debug, "rtponviftimestamp",
306 0, "ONVIF NTP timestamps RTP extension");
310 parse_event_ntp_offset (GstRtpOnvifTimestamp * self, GstEvent * event,
311 GstClockTime * offset, gboolean * discont)
313 const GstStructure *structure = gst_event_get_structure (event);
314 GstClockTime event_offset;
315 gboolean event_discont;
317 if (!gst_structure_get_clock_time (structure, "ntp-offset", &event_offset)) {
318 GST_ERROR_OBJECT (self, "no ntp-offset in %" GST_PTR_FORMAT, event);
321 if (!gst_structure_get_boolean (structure, "discont", &event_discont)) {
322 GST_ERROR_OBJECT (self, "no discontinue in %" GST_PTR_FORMAT, event);
327 *offset = event_offset;
330 *discont = event_discont;
336 gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent,
339 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
340 gboolean drop = FALSE;
343 GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event));
345 /* handle serialized events, which, should not be enqueued */
346 switch (GST_EVENT_TYPE (event)) {
347 case GST_EVENT_CUSTOM_DOWNSTREAM:
348 /* if the "set-e-bit" property is set, an offset event might mark the
349 * stream as discontinued. We need to check if the currently cached buffer
350 * needs the e-bit before it's pushed */
351 if (self->buffer != NULL && self->prop_set_e_bit &&
352 gst_event_has_name (event, GST_NTP_OFFSET_EVENT_NAME)) {
354 if (parse_event_ntp_offset (self, event, NULL, &discont)) {
355 GST_DEBUG_OBJECT (self, "stream %s discontinued",
356 (discont ? "is" : "is not"));
357 self->set_e_bit = discont;
369 /* Push pending buffers, if any */
370 self->set_e_bit = TRUE;
371 if (self->prop_set_t_bit)
372 self->set_t_bit = TRUE;
373 res = send_cached_buffer_and_events (self);
374 if (res != GST_FLOW_OK) {
381 case GST_EVENT_FLUSH_STOP:
382 purge_cached_buffer_and_events (self);
383 self->set_d_bit = TRUE;
384 self->set_e_bit = FALSE;
385 self->set_t_bit = FALSE;
386 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
392 /* enqueue serialized events if there is a cached buffer */
393 if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) {
394 GST_DEBUG ("enqueueing serialized event");
395 g_queue_push_tail (self->event_queue, event);
400 /* handle rest of the events */
401 switch (GST_EVENT_TYPE (event)) {
402 case GST_EVENT_CUSTOM_DOWNSTREAM:
403 /* update the ntp-offset after any cached buffer/buffer list has been
404 * pushed. the d-bit of the next buffer/buffer list should be set if
405 * the stream is discontinued */
406 if (gst_event_has_name (event, GST_NTP_OFFSET_EVENT_NAME)) {
409 if (parse_event_ntp_offset (self, event, &offset, &discont)) {
410 GST_DEBUG_OBJECT (self, "new ntp-offset: %" GST_TIME_FORMAT
411 ", stream %s discontinued", GST_TIME_ARGS (offset),
412 (discont ? "is" : "is not"));
413 self->ntp_offset = offset;
414 self->set_d_bit = discont;
421 case GST_EVENT_SEGMENT:
422 gst_event_copy_segment (event, &self->segment);
430 gst_event_unref (event);
432 ret = gst_pad_event_default (pad, parent, event);
438 gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self)
441 gst_pad_new_from_static_template (&sink_template_factory, "sink");
442 gst_pad_set_chain_function (self->sinkpad, gst_rtp_onvif_timestamp_chain);
443 gst_pad_set_chain_list_function (self->sinkpad,
444 gst_rtp_onvif_timestamp_chain_list);
445 gst_pad_set_event_function (self->sinkpad,
446 gst_rtp_onvif_timestamp_sink_event);
447 gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
448 GST_PAD_SET_PROXY_CAPS (self->sinkpad);
449 GST_PAD_SET_PROXY_ALLOCATION (self->sinkpad);
452 gst_pad_new_from_static_template (&src_template_factory, "src");
453 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
455 self->prop_ntp_offset = DEFAULT_NTP_OFFSET;
456 self->prop_set_e_bit = DEFAULT_SET_E_BIT;
457 self->prop_set_t_bit = DEFAULT_SET_T_BIT;
458 self->prop_drop_out_of_segment = DEFAULT_DROP_OUT_OF_SEGMENT;
460 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
462 self->event_queue = g_queue_new ();
467 #define EXTENSION_ID 0xABAC
468 #define EXTENSION_SIZE 3
471 handle_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf)
473 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
480 if (!GST_CLOCK_TIME_IS_VALID (self->ntp_offset)) {
481 GstClock *clock = gst_element_get_clock (GST_ELEMENT (self));
484 GstClockTime clock_time = gst_clock_get_time (clock);
485 guint64 real_time = g_get_real_time ();
486 GstClockTime running_time = clock_time -
487 gst_element_get_base_time (GST_ELEMENT (self));
489 /* convert microseconds to nanoseconds */
492 /* add constant to convert from 1970 based time to 1900 based time */
493 real_time += (G_GUINT64_CONSTANT (2208988800) * GST_SECOND);
495 self->ntp_offset = real_time - running_time;
497 GST_DEBUG_OBJECT (self, "new ntp-offset: %" GST_TIME_FORMAT,
498 GST_TIME_ARGS (self->ntp_offset));
500 gst_object_unref (clock);
502 GST_ELEMENT_ERROR (self, STREAM, FAILED, ("No ntp-offset present"),
503 ("Can not guess ntp-offset with no clock."));
504 /* Received a buffer in PAUSED, so we can't guess the match
505 * between the running time and the NTP clock yet.
511 if (self->segment.format != GST_FORMAT_TIME) {
512 GST_ELEMENT_ERROR (self, STREAM, FAILED,
513 ("did not receive a time segment yet"), (NULL));
517 if (!gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp)) {
518 GST_ELEMENT_ERROR (self, STREAM, FAILED,
519 ("Failed to map RTP buffer"), (NULL));
523 if (!gst_rtp_buffer_set_extension_data (&rtp, EXTENSION_ID, EXTENSION_SIZE)) {
524 GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to set extension data"),
526 gst_rtp_buffer_unmap (&rtp);
530 if (!gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer) & data,
532 GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to get extension data"),
534 gst_rtp_buffer_unmap (&rtp);
539 if (GST_BUFFER_PTS_IS_VALID (buf)) {
540 time = gst_segment_to_stream_time (&self->segment, GST_FORMAT_TIME,
541 GST_BUFFER_PTS (buf));
542 } else if (GST_BUFFER_DTS_IS_VALID (buf)) {
543 time = gst_segment_to_stream_time (&self->segment, GST_FORMAT_TIME,
544 GST_BUFFER_DTS (buf));
546 GST_INFO_OBJECT (self,
547 "Buffer doesn't contain any valid DTS or PTS timestamp");
551 if (self->prop_drop_out_of_segment && time == GST_CLOCK_TIME_NONE) {
552 GST_ERROR_OBJECT (self, "Failed to get stream time");
553 gst_rtp_buffer_unmap (&rtp);
557 /* add the offset (in seconds) */
558 if (time != GST_CLOCK_TIME_NONE) {
559 time += self->ntp_offset;
560 /* convert to NTP time. upper 32 bits should contain the seconds
561 * and the lower 32 bits, the fractions of a second. */
562 time = gst_util_uint64_scale (time, (G_GINT64_CONSTANT (1) << 32),
566 GST_DEBUG_OBJECT (self, "timestamp: %" G_GUINT64_FORMAT, time);
568 GST_WRITE_UINT64_BE (data, time);
570 /* The next byte is composed of: C E D T mbz (4 bits) */
572 /* Set C if the buffer does *not* have the DELTA_UNIT flag as it means
573 * that's a key frame (or 'clean point'). */
574 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
575 GST_DEBUG_OBJECT (self, "set C flag");
579 /* Set E if this the last buffer of a contiguous section of recording */
580 if (self->set_e_bit) {
581 GST_DEBUG_OBJECT (self, "set E flag");
583 self->set_e_bit = FALSE;
586 /* Set D if the buffer has the DISCONT flag */
587 if (self->set_d_bit || GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT)) {
588 GST_DEBUG_OBJECT (self, "set D flag");
590 self->set_d_bit = FALSE;
593 /* Set T if we have received EOS */
594 if (self->set_t_bit) {
595 GST_DEBUG_OBJECT (self, "set T flag");
597 self->set_t_bit = FALSE;
600 GST_WRITE_UINT8 (data + 8, field);
602 /* CSeq (low-order byte) */
603 GST_WRITE_UINT8 (data + 9, (guchar) self->prop_cseq);
605 memset (data + 10, 0, 3);
608 gst_rtp_buffer_unmap (&rtp);
612 /* @buf: (transfer full) */
614 handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf)
616 if (!handle_buffer (self, buf)) {
617 gst_buffer_unref (buf);
618 return GST_FLOW_ERROR;
621 return gst_pad_push (self->srcpad, buf);
625 gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent,
628 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
629 GstFlowReturn result = GST_FLOW_OK;
631 if (!self->prop_set_e_bit && !self->prop_set_t_bit) {
632 /* Modify and push this buffer right away */
633 return handle_and_push_buffer (self, buf);
636 /* send any previously cached item(s), this leaves an empty queue */
637 result = send_cached_buffer_and_events (self);
639 /* enqueue the new item, as the only item in the queue */
644 /* @buf: (transfer full) */
646 handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, GstBufferList * list)
650 /* Set the extension on the *first* buffer */
651 buf = gst_buffer_list_get (list, 0);
652 if (!handle_buffer (self, buf)) {
653 gst_buffer_list_unref (list);
654 return GST_FLOW_ERROR;
657 return gst_pad_push_list (self->srcpad, list);
660 /* gst_pad_chain_list_default() refs the buffer when passing it to the chain
661 * function, making it not writable. We implement our own chain_list function
662 * to avoid having to copy each buffer. */
664 gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent,
665 GstBufferList * list)
667 GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
668 GstFlowReturn result = GST_FLOW_OK;
670 if (!self->prop_set_e_bit && !self->prop_set_t_bit) {
671 return handle_and_push_buffer_list (self, list);
674 /* send any previously cached item(s), this leaves an empty queue */
675 result = send_cached_buffer_and_events (self);
677 /* enqueue the new item, as the only item in the queue */