tee: Implement allocation query aggregation
[platform/upstream/gstreamer.git] / plugins / elements / gstidentity.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2005 Wim Taymans <wim@fluendo.com>
5  *
6  * gstidentity.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:element-identity
25  * @title: identity
26  *
27  * Dummy element that passes incoming data through unmodified. It has some
28  * useful diagnostic functions, such as offset and timestamp checking.
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #  include "config.h"
33 #endif
34
35 #include <stdlib.h>
36 #include <string.h>
37
38 #include "gstelements_private.h"
39 #include "../../gst/gst-i18n-lib.h"
40 #include "gstidentity.h"
41
42 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
43     GST_PAD_SINK,
44     GST_PAD_ALWAYS,
45     GST_STATIC_CAPS_ANY);
46
47 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
48     GST_PAD_SRC,
49     GST_PAD_ALWAYS,
50     GST_STATIC_CAPS_ANY);
51
52 GST_DEBUG_CATEGORY_STATIC (gst_identity_debug);
53 #define GST_CAT_DEFAULT gst_identity_debug
54
55 /* Identity signals and args */
56 enum
57 {
58   SIGNAL_HANDOFF,
59   /* FILL ME */
60   LAST_SIGNAL
61 };
62
63 #define DEFAULT_SLEEP_TIME              0
64 #define DEFAULT_DUPLICATE               1
65 #define DEFAULT_ERROR_AFTER             -1
66 #define DEFAULT_DROP_PROBABILITY        0.0
67 #define DEFAULT_DROP_BUFFER_FLAGS       0
68 #define DEFAULT_DATARATE                0
69 #define DEFAULT_SILENT                  TRUE
70 #define DEFAULT_SINGLE_SEGMENT          FALSE
71 #define DEFAULT_DUMP                    FALSE
72 #define DEFAULT_SYNC                    FALSE
73 #define DEFAULT_CHECK_IMPERFECT_TIMESTAMP FALSE
74 #define DEFAULT_CHECK_IMPERFECT_OFFSET    FALSE
75 #define DEFAULT_SIGNAL_HANDOFFS           TRUE
76 #define DEFAULT_TS_OFFSET               0
77
78 enum
79 {
80   PROP_0,
81   PROP_SLEEP_TIME,
82   PROP_ERROR_AFTER,
83   PROP_DROP_PROBABILITY,
84   PROP_DROP_BUFFER_FLAGS,
85   PROP_DATARATE,
86   PROP_SILENT,
87   PROP_SINGLE_SEGMENT,
88   PROP_LAST_MESSAGE,
89   PROP_DUMP,
90   PROP_SYNC,
91   PROP_TS_OFFSET,
92   PROP_CHECK_IMPERFECT_TIMESTAMP,
93   PROP_CHECK_IMPERFECT_OFFSET,
94   PROP_SIGNAL_HANDOFFS
95 };
96
97
98 #define _do_init \
99     GST_DEBUG_CATEGORY_INIT (gst_identity_debug, "identity", 0, "identity element");
100 #define gst_identity_parent_class parent_class
101 G_DEFINE_TYPE_WITH_CODE (GstIdentity, gst_identity, GST_TYPE_BASE_TRANSFORM,
102     _do_init);
103
104 static void gst_identity_finalize (GObject * object);
105 static void gst_identity_set_property (GObject * object, guint prop_id,
106     const GValue * value, GParamSpec * pspec);
107 static void gst_identity_get_property (GObject * object, guint prop_id,
108     GValue * value, GParamSpec * pspec);
109
110 static gboolean gst_identity_sink_event (GstBaseTransform * trans,
111     GstEvent * event);
112 static GstFlowReturn gst_identity_transform_ip (GstBaseTransform * trans,
113     GstBuffer * buf);
114 static gboolean gst_identity_start (GstBaseTransform * trans);
115 static gboolean gst_identity_stop (GstBaseTransform * trans);
116 static GstStateChangeReturn gst_identity_change_state (GstElement * element,
117     GstStateChange transition);
118 static gboolean gst_identity_accept_caps (GstBaseTransform * base,
119     GstPadDirection direction, GstCaps * caps);
120 static gboolean gst_identity_query (GstBaseTransform * base,
121     GstPadDirection direction, GstQuery * query);
122
123 static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
124
125 static GParamSpec *pspec_last_message = NULL;
126
127 static void
128 gst_identity_finalize (GObject * object)
129 {
130   GstIdentity *identity;
131
132   identity = GST_IDENTITY (object);
133
134   g_free (identity->last_message);
135   g_cond_clear (&identity->blocked_cond);
136
137   G_OBJECT_CLASS (parent_class)->finalize (object);
138 }
139
140 static void
141 gst_identity_class_init (GstIdentityClass * klass)
142 {
143   GObjectClass *gobject_class;
144   GstElementClass *gstelement_class;
145   GstBaseTransformClass *gstbasetrans_class;
146
147   gobject_class = G_OBJECT_CLASS (klass);
148   gstelement_class = GST_ELEMENT_CLASS (klass);
149   gstbasetrans_class = GST_BASE_TRANSFORM_CLASS (klass);
150
151   gobject_class->set_property = gst_identity_set_property;
152   gobject_class->get_property = gst_identity_get_property;
153
154   g_object_class_install_property (gobject_class, PROP_SLEEP_TIME,
155       g_param_spec_uint ("sleep-time", "Sleep time",
156           "Microseconds to sleep between processing", 0, G_MAXUINT,
157           DEFAULT_SLEEP_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_ERROR_AFTER,
159       g_param_spec_int ("error-after", "Error After", "Error after N buffers",
160           G_MININT, G_MAXINT, DEFAULT_ERROR_AFTER,
161           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162   g_object_class_install_property (gobject_class, PROP_DROP_PROBABILITY,
163       g_param_spec_float ("drop-probability", "Drop Probability",
164           "The Probability a buffer is dropped", 0.0, 1.0,
165           DEFAULT_DROP_PROBABILITY,
166           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167
168   /**
169    * GstIdentity:drop-buffer-flags:
170    *
171    * Drop buffers with the given flags.
172    *
173    * Since: 1.8
174    **/
175   g_object_class_install_property (gobject_class, PROP_DROP_BUFFER_FLAGS,
176       g_param_spec_flags ("drop-buffer-flags", "Check flags to drop buffers",
177           "Drop buffers with the given flags",
178           GST_TYPE_BUFFER_FLAGS, DEFAULT_DROP_BUFFER_FLAGS,
179           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180   g_object_class_install_property (gobject_class, PROP_DATARATE,
181       g_param_spec_int ("datarate", "Datarate",
182           "(Re)timestamps buffers with number of bytes per second (0 = inactive)",
183           0, G_MAXINT, DEFAULT_DATARATE,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185   g_object_class_install_property (gobject_class, PROP_SILENT,
186       g_param_spec_boolean ("silent", "silent", "silent", DEFAULT_SILENT,
187           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188   g_object_class_install_property (gobject_class, PROP_SINGLE_SEGMENT,
189       g_param_spec_boolean ("single-segment", "Single Segment",
190           "Timestamp buffers and eat segments so as to appear as one segment",
191           DEFAULT_SINGLE_SEGMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192   pspec_last_message = g_param_spec_string ("last-message", "last-message",
193       "last-message", NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
194   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
195       pspec_last_message);
196   g_object_class_install_property (gobject_class, PROP_DUMP,
197       g_param_spec_boolean ("dump", "Dump", "Dump buffer contents to stdout",
198           DEFAULT_DUMP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_SYNC,
200       g_param_spec_boolean ("sync", "Synchronize",
201           "Synchronize to pipeline clock", DEFAULT_SYNC,
202           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
203   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
204       g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation",
205           "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
206           G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET,
207           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
208   g_object_class_install_property (gobject_class,
209       PROP_CHECK_IMPERFECT_TIMESTAMP,
210       g_param_spec_boolean ("check-imperfect-timestamp",
211           "Check for discontiguous timestamps",
212           "Send element messages if timestamps and durations do not match up",
213           DEFAULT_CHECK_IMPERFECT_TIMESTAMP,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215   g_object_class_install_property (gobject_class, PROP_CHECK_IMPERFECT_OFFSET,
216       g_param_spec_boolean ("check-imperfect-offset",
217           "Check for discontiguous offset",
218           "Send element messages if offset and offset_end do not match up",
219           DEFAULT_CHECK_IMPERFECT_OFFSET,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   /**
223    * GstIdentity:signal-handoffs
224    *
225    * If set to #TRUE, the identity will emit a handoff signal when handling a buffer.
226    * When set to #FALSE, no signal will be emitted, which might improve performance.
227    */
228   g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS,
229       g_param_spec_boolean ("signal-handoffs",
230           "Signal handoffs", "Send a signal before pushing the buffer",
231           DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   /**
234    * GstIdentity::handoff:
235    * @identity: the identity instance
236    * @buffer: the buffer that just has been received
237    * @pad: the pad that received it
238    *
239    * This signal gets emitted before passing the buffer downstream.
240    */
241   gst_identity_signals[SIGNAL_HANDOFF] =
242       g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
243       G_STRUCT_OFFSET (GstIdentityClass, handoff), NULL, NULL,
244       g_cclosure_marshal_generic, G_TYPE_NONE, 1,
245       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
246
247   gobject_class->finalize = gst_identity_finalize;
248
249   gst_element_class_set_static_metadata (gstelement_class,
250       "Identity",
251       "Generic",
252       "Pass data without modification", "Erik Walthinsen <omega@cse.ogi.edu>");
253   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
254   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
255
256   gstelement_class->change_state =
257       GST_DEBUG_FUNCPTR (gst_identity_change_state);
258
259   gstbasetrans_class->sink_event = GST_DEBUG_FUNCPTR (gst_identity_sink_event);
260   gstbasetrans_class->transform_ip =
261       GST_DEBUG_FUNCPTR (gst_identity_transform_ip);
262   gstbasetrans_class->start = GST_DEBUG_FUNCPTR (gst_identity_start);
263   gstbasetrans_class->stop = GST_DEBUG_FUNCPTR (gst_identity_stop);
264   gstbasetrans_class->accept_caps =
265       GST_DEBUG_FUNCPTR (gst_identity_accept_caps);
266   gstbasetrans_class->query = gst_identity_query;
267 }
268
269 static void
270 gst_identity_init (GstIdentity * identity)
271 {
272   identity->sleep_time = DEFAULT_SLEEP_TIME;
273   identity->error_after = DEFAULT_ERROR_AFTER;
274   identity->drop_probability = DEFAULT_DROP_PROBABILITY;
275   identity->drop_buffer_flags = DEFAULT_DROP_BUFFER_FLAGS;
276   identity->datarate = DEFAULT_DATARATE;
277   identity->silent = DEFAULT_SILENT;
278   identity->single_segment = DEFAULT_SINGLE_SEGMENT;
279   identity->sync = DEFAULT_SYNC;
280   identity->check_imperfect_timestamp = DEFAULT_CHECK_IMPERFECT_TIMESTAMP;
281   identity->check_imperfect_offset = DEFAULT_CHECK_IMPERFECT_OFFSET;
282   identity->dump = DEFAULT_DUMP;
283   identity->last_message = NULL;
284   identity->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS;
285   identity->ts_offset = DEFAULT_TS_OFFSET;
286   g_cond_init (&identity->blocked_cond);
287
288   gst_base_transform_set_gap_aware (GST_BASE_TRANSFORM_CAST (identity), TRUE);
289 }
290
291 static void
292 gst_identity_notify_last_message (GstIdentity * identity)
293 {
294   g_object_notify_by_pspec ((GObject *) identity, pspec_last_message);
295 }
296
297 static GstFlowReturn
298 gst_identity_do_sync (GstIdentity * identity, GstClockTime running_time)
299 {
300   GstFlowReturn ret = GST_FLOW_OK;
301
302   if (identity->sync &&
303       GST_BASE_TRANSFORM_CAST (identity)->segment.format == GST_FORMAT_TIME) {
304     GstClock *clock;
305
306     GST_OBJECT_LOCK (identity);
307
308     if (identity->flushing) {
309       GST_OBJECT_UNLOCK (identity);
310       return GST_FLOW_FLUSHING;
311     }
312
313     while (identity->blocked)
314       g_cond_wait (&identity->blocked_cond, GST_OBJECT_GET_LOCK (identity));
315
316     if (identity->flushing) {
317       GST_OBJECT_UNLOCK (identity);
318       return GST_FLOW_FLUSHING;
319     }
320
321     if ((clock = GST_ELEMENT (identity)->clock)) {
322       GstClockReturn cret;
323       GstClockTime timestamp;
324       GstClockTimeDiff ts_offset = identity->ts_offset;
325
326       timestamp = running_time + GST_ELEMENT (identity)->base_time +
327           identity->upstream_latency;
328       if (ts_offset < 0) {
329         ts_offset = -ts_offset;
330         if (ts_offset < timestamp)
331           timestamp -= ts_offset;
332         else
333           timestamp = 0;
334       } else
335         timestamp += ts_offset;
336
337       /* save id if we need to unlock */
338       identity->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
339       GST_OBJECT_UNLOCK (identity);
340
341       cret = gst_clock_id_wait (identity->clock_id, NULL);
342
343       GST_OBJECT_LOCK (identity);
344       if (identity->clock_id) {
345         gst_clock_id_unref (identity->clock_id);
346         identity->clock_id = NULL;
347       }
348       if (cret == GST_CLOCK_UNSCHEDULED || identity->flushing)
349         ret = GST_FLOW_FLUSHING;
350     }
351     GST_OBJECT_UNLOCK (identity);
352   }
353
354   return ret;
355 }
356
357 static gboolean
358 gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
359 {
360   GstIdentity *identity;
361   gboolean ret = TRUE;
362
363   identity = GST_IDENTITY (trans);
364
365   if (!identity->silent) {
366     const GstStructure *s;
367     const gchar *tstr;
368     gchar *sstr;
369
370     GST_OBJECT_LOCK (identity);
371     g_free (identity->last_message);
372
373     tstr = gst_event_type_get_name (GST_EVENT_TYPE (event));
374     if ((s = gst_event_get_structure (event)))
375       sstr = gst_structure_to_string (s);
376     else
377       sstr = g_strdup ("");
378
379     identity->last_message =
380         g_strdup_printf ("event   ******* (%s:%s) E (type: %s (%d), %s) %p",
381         GST_DEBUG_PAD_NAME (trans->sinkpad), tstr, GST_EVENT_TYPE (event),
382         sstr, event);
383     g_free (sstr);
384     GST_OBJECT_UNLOCK (identity);
385
386     gst_identity_notify_last_message (identity);
387   }
388
389   if (identity->single_segment && (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
390     if (!trans->have_segment) {
391       GstEvent *news;
392       GstSegment segment;
393
394       gst_event_copy_segment (event, &segment);
395       gst_event_copy_segment (event, &trans->segment);
396       trans->have_segment = TRUE;
397
398       /* This is the first segment, send out a (0, -1) segment */
399       gst_segment_init (&segment, segment.format);
400       news = gst_event_new_segment (&segment);
401
402       gst_pad_event_default (trans->sinkpad, GST_OBJECT_CAST (trans), news);
403     } else {
404       /* need to track segment for proper running time */
405       gst_event_copy_segment (event, &trans->segment);
406     }
407   }
408
409   if (GST_EVENT_TYPE (event) == GST_EVENT_GAP &&
410       trans->have_segment && trans->segment.format == GST_FORMAT_TIME) {
411     GstClockTime start, dur;
412
413     gst_event_parse_gap (event, &start, &dur);
414     if (GST_CLOCK_TIME_IS_VALID (start)) {
415       start = gst_segment_to_running_time (&trans->segment,
416           GST_FORMAT_TIME, start);
417
418       gst_identity_do_sync (identity, start);
419
420       /* also transform GAP timestamp similar to buffer timestamps */
421       if (identity->single_segment) {
422         gst_event_unref (event);
423         event = gst_event_new_gap (start, dur);
424       }
425     }
426   }
427
428   /* Reset previous timestamp, duration and offsets on SEGMENT
429    * to prevent false warnings when checking for perfect streams */
430   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
431     identity->prev_timestamp = identity->prev_duration = GST_CLOCK_TIME_NONE;
432     identity->prev_offset = identity->prev_offset_end = GST_BUFFER_OFFSET_NONE;
433   }
434
435   if (identity->single_segment && GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
436     /* eat up segments */
437     gst_event_unref (event);
438     ret = TRUE;
439   } else {
440     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START) {
441       GST_OBJECT_LOCK (identity);
442       identity->flushing = TRUE;
443       if (identity->clock_id) {
444         GST_DEBUG_OBJECT (identity, "unlock clock wait");
445         gst_clock_id_unschedule (identity->clock_id);
446       }
447       GST_OBJECT_UNLOCK (identity);
448     } else if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
449       GST_OBJECT_LOCK (identity);
450       identity->flushing = FALSE;
451       GST_OBJECT_UNLOCK (identity);
452     }
453
454     ret = GST_BASE_TRANSFORM_CLASS (parent_class)->sink_event (trans, event);
455   }
456
457   return ret;
458 }
459
460 static void
461 gst_identity_check_imperfect_timestamp (GstIdentity * identity, GstBuffer * buf)
462 {
463   GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buf);
464
465   /* invalid timestamp drops us out of check.  FIXME: maybe warn ? */
466   if (timestamp != GST_CLOCK_TIME_NONE) {
467     /* check if we had a previous buffer to compare to */
468     if (identity->prev_timestamp != GST_CLOCK_TIME_NONE &&
469         identity->prev_duration != GST_CLOCK_TIME_NONE) {
470       GstClockTime t_expected;
471       GstClockTimeDiff dt;
472
473       t_expected = identity->prev_timestamp + identity->prev_duration;
474       dt = GST_CLOCK_DIFF (t_expected, timestamp);
475       if (dt != 0) {
476         /*
477          * "imperfect-timestamp" bus message:
478          * @identity:        the identity instance
479          * @delta:           the GST_CLOCK_DIFF to the prev timestamp
480          * @prev-timestamp:  the previous buffer timestamp
481          * @prev-duration:   the previous buffer duration
482          * @prev-offset:     the previous buffer offset
483          * @prev-offset-end: the previous buffer offset end
484          * @cur-timestamp:   the current buffer timestamp
485          * @cur-duration:    the current buffer duration
486          * @cur-offset:      the current buffer offset
487          * @cur-offset-end:  the current buffer offset end
488          *
489          * This bus message gets emitted if the check-imperfect-timestamp
490          * property is set and there is a gap in time between the
491          * last buffer and the newly received buffer.
492          */
493         gst_element_post_message (GST_ELEMENT (identity),
494             gst_message_new_element (GST_OBJECT (identity),
495                 gst_structure_new ("imperfect-timestamp",
496                     "delta", G_TYPE_INT64, dt,
497                     "prev-timestamp", G_TYPE_UINT64,
498                     identity->prev_timestamp, "prev-duration", G_TYPE_UINT64,
499                     identity->prev_duration, "prev-offset", G_TYPE_UINT64,
500                     identity->prev_offset, "prev-offset-end", G_TYPE_UINT64,
501                     identity->prev_offset_end, "cur-timestamp", G_TYPE_UINT64,
502                     timestamp, "cur-duration", G_TYPE_UINT64,
503                     GST_BUFFER_DURATION (buf), "cur-offset", G_TYPE_UINT64,
504                     GST_BUFFER_OFFSET (buf), "cur-offset-end", G_TYPE_UINT64,
505                     GST_BUFFER_OFFSET_END (buf), NULL)));
506       }
507     } else {
508       GST_DEBUG_OBJECT (identity, "can't check data-contiguity, no "
509           "offset_end was set on previous buffer");
510     }
511   }
512 }
513
514 static void
515 gst_identity_check_imperfect_offset (GstIdentity * identity, GstBuffer * buf)
516 {
517   guint64 offset;
518
519   offset = GST_BUFFER_OFFSET (buf);
520
521   if (identity->prev_offset_end != offset &&
522       identity->prev_offset_end != GST_BUFFER_OFFSET_NONE &&
523       offset != GST_BUFFER_OFFSET_NONE) {
524     /*
525      * "imperfect-offset" bus message:
526      * @identity:        the identity instance
527      * @prev-timestamp:  the previous buffer timestamp
528      * @prev-duration:   the previous buffer duration
529      * @prev-offset:     the previous buffer offset
530      * @prev-offset-end: the previous buffer offset end
531      * @cur-timestamp:   the current buffer timestamp
532      * @cur-duration:    the current buffer duration
533      * @cur-offset:      the current buffer offset
534      * @cur-offset-end:  the current buffer offset end
535      *
536      * This bus message gets emitted if the check-imperfect-offset
537      * property is set and there is a gap in offsets between the
538      * last buffer and the newly received buffer.
539      */
540     gst_element_post_message (GST_ELEMENT (identity),
541         gst_message_new_element (GST_OBJECT (identity),
542             gst_structure_new ("imperfect-offset", "prev-timestamp",
543                 G_TYPE_UINT64, identity->prev_timestamp, "prev-duration",
544                 G_TYPE_UINT64, identity->prev_duration, "prev-offset",
545                 G_TYPE_UINT64, identity->prev_offset, "prev-offset-end",
546                 G_TYPE_UINT64, identity->prev_offset_end, "cur-timestamp",
547                 G_TYPE_UINT64, GST_BUFFER_TIMESTAMP (buf), "cur-duration",
548                 G_TYPE_UINT64, GST_BUFFER_DURATION (buf), "cur-offset",
549                 G_TYPE_UINT64, GST_BUFFER_OFFSET (buf), "cur-offset-end",
550                 G_TYPE_UINT64, GST_BUFFER_OFFSET_END (buf), NULL)));
551   } else {
552     GST_DEBUG_OBJECT (identity, "can't check offset contiguity, no offset "
553         "and/or offset_end were set on previous buffer");
554   }
555 }
556
557 static const gchar *
558 print_pretty_time (gchar * ts_str, gsize ts_str_len, GstClockTime ts)
559 {
560   if (ts == GST_CLOCK_TIME_NONE)
561     return "none";
562
563   g_snprintf (ts_str, ts_str_len, "%" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
564   return ts_str;
565 }
566
567 static void
568 gst_identity_update_last_message_for_buffer (GstIdentity * identity,
569     const gchar * action, GstBuffer * buf, gsize size)
570 {
571   gchar dts_str[64], pts_str[64], dur_str[64];
572   gchar *flag_str, *meta_str;
573
574   GST_OBJECT_LOCK (identity);
575
576   flag_str = gst_buffer_get_flags_string (buf);
577   meta_str = gst_buffer_get_meta_string (buf);
578
579   g_free (identity->last_message);
580   identity->last_message = g_strdup_printf ("%s   ******* (%s:%s) "
581       "(%" G_GSIZE_FORMAT " bytes, dts: %s, pts: %s, duration: %s, offset: %"
582       G_GINT64_FORMAT ", " "offset_end: % " G_GINT64_FORMAT
583       ", flags: %08x %s, meta: %s) %p", action,
584       GST_DEBUG_PAD_NAME (GST_BASE_TRANSFORM_CAST (identity)->sinkpad), size,
585       print_pretty_time (dts_str, sizeof (dts_str), GST_BUFFER_DTS (buf)),
586       print_pretty_time (pts_str, sizeof (pts_str), GST_BUFFER_PTS (buf)),
587       print_pretty_time (dur_str, sizeof (dur_str), GST_BUFFER_DURATION (buf)),
588       GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf),
589       GST_BUFFER_FLAGS (buf), flag_str, meta_str ? meta_str : "none", buf);
590   g_free (flag_str);
591
592   GST_OBJECT_UNLOCK (identity);
593
594   gst_identity_notify_last_message (identity);
595 }
596
597 static GstFlowReturn
598 gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
599 {
600   GstFlowReturn ret = GST_FLOW_OK;
601   GstIdentity *identity = GST_IDENTITY (trans);
602   GstClockTime rundts = GST_CLOCK_TIME_NONE;
603   GstClockTime runpts = GST_CLOCK_TIME_NONE;
604   GstClockTime ts, duration, runtimestamp;
605   gsize size;
606
607   size = gst_buffer_get_size (buf);
608
609   if (identity->check_imperfect_timestamp)
610     gst_identity_check_imperfect_timestamp (identity, buf);
611   if (identity->check_imperfect_offset)
612     gst_identity_check_imperfect_offset (identity, buf);
613
614   /* update prev values */
615   identity->prev_timestamp = GST_BUFFER_TIMESTAMP (buf);
616   identity->prev_duration = GST_BUFFER_DURATION (buf);
617   identity->prev_offset_end = GST_BUFFER_OFFSET_END (buf);
618   identity->prev_offset = GST_BUFFER_OFFSET (buf);
619
620   if (identity->error_after >= 0) {
621     identity->error_after--;
622     if (identity->error_after == 0)
623       goto error_after;
624   }
625
626   if (identity->drop_probability > 0.0) {
627     if ((gfloat) (1.0 * rand () / (RAND_MAX)) < identity->drop_probability)
628       goto dropped;
629   }
630
631   if (GST_BUFFER_FLAG_IS_SET (buf, identity->drop_buffer_flags))
632     goto dropped;
633
634   if (identity->dump) {
635     GstMapInfo info;
636
637     if (gst_buffer_map (buf, &info, GST_MAP_READ)) {
638       gst_util_dump_mem (info.data, info.size);
639       gst_buffer_unmap (buf, &info);
640     }
641   }
642
643   if (!identity->silent) {
644     gst_identity_update_last_message_for_buffer (identity, "chain", buf, size);
645   }
646
647   if (identity->datarate > 0) {
648     GstClockTime time = gst_util_uint64_scale_int (identity->offset,
649         GST_SECOND, identity->datarate);
650
651     GST_BUFFER_PTS (buf) = GST_BUFFER_DTS (buf) = time;
652     GST_BUFFER_DURATION (buf) = size * GST_SECOND / identity->datarate;
653   }
654
655   if (identity->signal_handoffs)
656     g_signal_emit (identity, gst_identity_signals[SIGNAL_HANDOFF], 0, buf);
657
658   if (trans->segment.format == GST_FORMAT_TIME) {
659     rundts = gst_segment_to_running_time (&trans->segment,
660         GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
661     runpts = gst_segment_to_running_time (&trans->segment,
662         GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
663   }
664
665   if (GST_CLOCK_TIME_IS_VALID (rundts))
666     runtimestamp = rundts;
667   else if (GST_CLOCK_TIME_IS_VALID (runpts))
668     runtimestamp = runpts;
669   else
670     runtimestamp = 0;
671   ret = gst_identity_do_sync (identity, runtimestamp);
672
673   identity->offset += size;
674
675   if (identity->sleep_time && ret == GST_FLOW_OK)
676     g_usleep (identity->sleep_time);
677
678   if (identity->single_segment && (trans->segment.format == GST_FORMAT_TIME)
679       && (ret == GST_FLOW_OK)) {
680     GST_BUFFER_DTS (buf) = rundts;
681     GST_BUFFER_PTS (buf) = runpts;
682     GST_BUFFER_OFFSET (buf) = GST_CLOCK_TIME_NONE;
683     GST_BUFFER_OFFSET_END (buf) = GST_CLOCK_TIME_NONE;
684   }
685
686   return ret;
687
688   /* ERRORS */
689 error_after:
690   {
691     GST_ELEMENT_ERROR (identity, CORE, FAILED,
692         (_("Failed after iterations as requested.")), (NULL));
693     return GST_FLOW_ERROR;
694   }
695 dropped:
696   {
697     if (!identity->silent) {
698       gst_identity_update_last_message_for_buffer (identity, "dropping", buf,
699           size);
700     }
701
702     ts = GST_BUFFER_TIMESTAMP (buf);
703     if (GST_CLOCK_TIME_IS_VALID (ts)) {
704       duration = GST_BUFFER_DURATION (buf);
705       gst_pad_push_event (GST_BASE_TRANSFORM_SRC_PAD (identity),
706           gst_event_new_gap (ts, duration));
707     }
708
709     /* return DROPPED to basetransform. */
710     return GST_BASE_TRANSFORM_FLOW_DROPPED;
711   }
712 }
713
714 static void
715 gst_identity_set_property (GObject * object, guint prop_id,
716     const GValue * value, GParamSpec * pspec)
717 {
718   GstIdentity *identity;
719
720   identity = GST_IDENTITY (object);
721
722   switch (prop_id) {
723     case PROP_SLEEP_TIME:
724       identity->sleep_time = g_value_get_uint (value);
725       break;
726     case PROP_SILENT:
727       identity->silent = g_value_get_boolean (value);
728       break;
729     case PROP_SINGLE_SEGMENT:
730       identity->single_segment = g_value_get_boolean (value);
731       break;
732     case PROP_DUMP:
733       identity->dump = g_value_get_boolean (value);
734       break;
735     case PROP_ERROR_AFTER:
736       identity->error_after = g_value_get_int (value);
737       break;
738     case PROP_DROP_PROBABILITY:
739       identity->drop_probability = g_value_get_float (value);
740       break;
741     case PROP_DROP_BUFFER_FLAGS:
742       identity->drop_buffer_flags = g_value_get_flags (value);
743       break;
744     case PROP_DATARATE:
745       identity->datarate = g_value_get_int (value);
746       break;
747     case PROP_SYNC:
748       identity->sync = g_value_get_boolean (value);
749       break;
750     case PROP_TS_OFFSET:
751       identity->ts_offset = g_value_get_int64 (value);
752       break;
753     case PROP_CHECK_IMPERFECT_TIMESTAMP:
754       identity->check_imperfect_timestamp = g_value_get_boolean (value);
755       break;
756     case PROP_CHECK_IMPERFECT_OFFSET:
757       identity->check_imperfect_offset = g_value_get_boolean (value);
758       break;
759     case PROP_SIGNAL_HANDOFFS:
760       identity->signal_handoffs = g_value_get_boolean (value);
761       break;
762     default:
763       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
764       break;
765   }
766   if (identity->datarate > 0 || identity->single_segment)
767     gst_base_transform_set_passthrough (GST_BASE_TRANSFORM (identity), FALSE);
768   else
769     gst_base_transform_set_passthrough (GST_BASE_TRANSFORM (identity), TRUE);
770 }
771
772 static void
773 gst_identity_get_property (GObject * object, guint prop_id, GValue * value,
774     GParamSpec * pspec)
775 {
776   GstIdentity *identity;
777
778   identity = GST_IDENTITY (object);
779
780   switch (prop_id) {
781     case PROP_SLEEP_TIME:
782       g_value_set_uint (value, identity->sleep_time);
783       break;
784     case PROP_ERROR_AFTER:
785       g_value_set_int (value, identity->error_after);
786       break;
787     case PROP_DROP_PROBABILITY:
788       g_value_set_float (value, identity->drop_probability);
789       break;
790     case PROP_DROP_BUFFER_FLAGS:
791       g_value_set_flags (value, identity->drop_buffer_flags);
792       break;
793     case PROP_DATARATE:
794       g_value_set_int (value, identity->datarate);
795       break;
796     case PROP_SILENT:
797       g_value_set_boolean (value, identity->silent);
798       break;
799     case PROP_SINGLE_SEGMENT:
800       g_value_set_boolean (value, identity->single_segment);
801       break;
802     case PROP_DUMP:
803       g_value_set_boolean (value, identity->dump);
804       break;
805     case PROP_LAST_MESSAGE:
806       GST_OBJECT_LOCK (identity);
807       g_value_set_string (value, identity->last_message);
808       GST_OBJECT_UNLOCK (identity);
809       break;
810     case PROP_SYNC:
811       g_value_set_boolean (value, identity->sync);
812       break;
813     case PROP_TS_OFFSET:
814       identity->ts_offset = g_value_get_int64 (value);
815       break;
816     case PROP_CHECK_IMPERFECT_TIMESTAMP:
817       g_value_set_boolean (value, identity->check_imperfect_timestamp);
818       break;
819     case PROP_CHECK_IMPERFECT_OFFSET:
820       g_value_set_boolean (value, identity->check_imperfect_offset);
821       break;
822     case PROP_SIGNAL_HANDOFFS:
823       g_value_set_boolean (value, identity->signal_handoffs);
824       break;
825     default:
826       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
827       break;
828   }
829 }
830
831 static gboolean
832 gst_identity_start (GstBaseTransform * trans)
833 {
834   GstIdentity *identity;
835
836   identity = GST_IDENTITY (trans);
837
838   identity->offset = 0;
839   identity->prev_timestamp = GST_CLOCK_TIME_NONE;
840   identity->prev_duration = GST_CLOCK_TIME_NONE;
841   identity->prev_offset_end = GST_BUFFER_OFFSET_NONE;
842   identity->prev_offset = GST_BUFFER_OFFSET_NONE;
843
844   return TRUE;
845 }
846
847 static gboolean
848 gst_identity_stop (GstBaseTransform * trans)
849 {
850   GstIdentity *identity;
851
852   identity = GST_IDENTITY (trans);
853
854   GST_OBJECT_LOCK (identity);
855   g_free (identity->last_message);
856   identity->last_message = NULL;
857   GST_OBJECT_UNLOCK (identity);
858
859   return TRUE;
860 }
861
862 static gboolean
863 gst_identity_accept_caps (GstBaseTransform * base,
864     GstPadDirection direction, GstCaps * caps)
865 {
866   gboolean ret;
867   GstPad *pad;
868
869   /* Proxy accept-caps */
870
871   if (direction == GST_PAD_SRC)
872     pad = GST_BASE_TRANSFORM_SINK_PAD (base);
873   else
874     pad = GST_BASE_TRANSFORM_SRC_PAD (base);
875
876   ret = gst_pad_peer_query_accept_caps (pad, caps);
877
878   return ret;
879 }
880
881 static gboolean
882 gst_identity_query (GstBaseTransform * base, GstPadDirection direction,
883     GstQuery * query)
884 {
885   GstIdentity *identity;
886   gboolean ret;
887
888   identity = GST_IDENTITY (base);
889
890   ret = GST_BASE_TRANSFORM_CLASS (parent_class)->query (base, direction, query);
891
892   if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
893     gboolean live = FALSE;
894     GstClockTime min = 0, max = 0;
895
896     if (ret) {
897       gst_query_parse_latency (query, &live, &min, &max);
898
899       if (identity->sync && max < min) {
900         GST_ELEMENT_WARNING (base, CORE, CLOCK, (NULL),
901             ("Impossible to configure latency before identity sync=true:"
902                 " max %" GST_TIME_FORMAT " < min %"
903                 GST_TIME_FORMAT ". Add queues or other buffering elements.",
904                 GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
905       }
906     }
907
908     /* Ignore the upstream latency if it is not live */
909     GST_OBJECT_LOCK (identity);
910     if (live)
911       identity->upstream_latency = min;
912     else
913       identity->upstream_latency = 0;
914     GST_OBJECT_UNLOCK (identity);
915
916     gst_query_set_latency (query, live || identity->sync, min, max);
917     ret = TRUE;
918   }
919   return ret;
920 }
921
922 static GstStateChangeReturn
923 gst_identity_change_state (GstElement * element, GstStateChange transition)
924 {
925   GstStateChangeReturn ret;
926   GstIdentity *identity = GST_IDENTITY (element);
927   gboolean no_preroll = FALSE;
928
929   switch (transition) {
930     case GST_STATE_CHANGE_NULL_TO_READY:
931       break;
932     case GST_STATE_CHANGE_READY_TO_PAUSED:
933       GST_OBJECT_LOCK (identity);
934       identity->flushing = FALSE;
935       identity->blocked = TRUE;
936       GST_OBJECT_UNLOCK (identity);
937       if (identity->sync)
938         no_preroll = TRUE;
939       break;
940     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
941       GST_OBJECT_LOCK (identity);
942       identity->blocked = FALSE;
943       g_cond_broadcast (&identity->blocked_cond);
944       GST_OBJECT_UNLOCK (identity);
945       break;
946     case GST_STATE_CHANGE_PAUSED_TO_READY:
947       GST_OBJECT_LOCK (identity);
948       identity->flushing = TRUE;
949       if (identity->clock_id) {
950         GST_DEBUG_OBJECT (identity, "unlock clock wait");
951         gst_clock_id_unschedule (identity->clock_id);
952       }
953       identity->blocked = FALSE;
954       g_cond_broadcast (&identity->blocked_cond);
955       GST_OBJECT_UNLOCK (identity);
956       break;
957     default:
958       break;
959   }
960
961   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
962
963   switch (transition) {
964     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
965       GST_OBJECT_LOCK (identity);
966       identity->upstream_latency = 0;
967       identity->blocked = TRUE;
968       GST_OBJECT_UNLOCK (identity);
969       if (identity->sync)
970         no_preroll = TRUE;
971       break;
972     case GST_STATE_CHANGE_PAUSED_TO_READY:
973       break;
974     case GST_STATE_CHANGE_READY_TO_NULL:
975       break;
976     default:
977       break;
978   }
979
980   if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
981     ret = GST_STATE_CHANGE_NO_PREROLL;
982
983   return ret;
984 }