Move GstAggregator from -bad to core
[platform/upstream/gstreamer.git] / plugins / elements / gstidentity.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2005 Wim Taymans <wim@fluendo.com>
5  *
6  * gstidentity.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:element-identity
25  * @title: identity
26  *
27  * Dummy element that passes incoming data through unmodified. It has some
28  * useful diagnostic functions, such as offset and timestamp checking.
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #  include "config.h"
33 #endif
34
35 #include <stdlib.h>
36 #include <string.h>
37
38 #include "gstelements_private.h"
39 #include "../../gst/gst-i18n-lib.h"
40 #include "gstidentity.h"
41
42 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
43     GST_PAD_SINK,
44     GST_PAD_ALWAYS,
45     GST_STATIC_CAPS_ANY);
46
47 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
48     GST_PAD_SRC,
49     GST_PAD_ALWAYS,
50     GST_STATIC_CAPS_ANY);
51
52 GST_DEBUG_CATEGORY_STATIC (gst_identity_debug);
53 #define GST_CAT_DEFAULT gst_identity_debug
54
55 /* Identity signals and args */
56 enum
57 {
58   SIGNAL_HANDOFF,
59   /* FILL ME */
60   LAST_SIGNAL
61 };
62
63 #define DEFAULT_SLEEP_TIME              0
64 #define DEFAULT_DUPLICATE               1
65 #define DEFAULT_ERROR_AFTER             -1
66 #define DEFAULT_DROP_PROBABILITY        0.0
67 #define DEFAULT_DROP_BUFFER_FLAGS       0
68 #define DEFAULT_DATARATE                0
69 #define DEFAULT_SILENT                  TRUE
70 #define DEFAULT_SINGLE_SEGMENT          FALSE
71 #define DEFAULT_DUMP                    FALSE
72 #define DEFAULT_SYNC                    FALSE
73 #define DEFAULT_CHECK_IMPERFECT_TIMESTAMP FALSE
74 #define DEFAULT_CHECK_IMPERFECT_OFFSET    FALSE
75 #define DEFAULT_SIGNAL_HANDOFFS           TRUE
76 #define DEFAULT_TS_OFFSET               0
77 #define DEFAULT_DROP_ALLOCATION         FALSE
78
79 enum
80 {
81   PROP_0,
82   PROP_SLEEP_TIME,
83   PROP_ERROR_AFTER,
84   PROP_DROP_PROBABILITY,
85   PROP_DROP_BUFFER_FLAGS,
86   PROP_DATARATE,
87   PROP_SILENT,
88   PROP_SINGLE_SEGMENT,
89   PROP_LAST_MESSAGE,
90   PROP_DUMP,
91   PROP_SYNC,
92   PROP_TS_OFFSET,
93   PROP_CHECK_IMPERFECT_TIMESTAMP,
94   PROP_CHECK_IMPERFECT_OFFSET,
95   PROP_SIGNAL_HANDOFFS,
96   PROP_DROP_ALLOCATION
97 };
98
99
100 #define _do_init \
101     GST_DEBUG_CATEGORY_INIT (gst_identity_debug, "identity", 0, "identity element");
102 #define gst_identity_parent_class parent_class
103 G_DEFINE_TYPE_WITH_CODE (GstIdentity, gst_identity, GST_TYPE_BASE_TRANSFORM,
104     _do_init);
105
106 static void gst_identity_finalize (GObject * object);
107 static void gst_identity_set_property (GObject * object, guint prop_id,
108     const GValue * value, GParamSpec * pspec);
109 static void gst_identity_get_property (GObject * object, guint prop_id,
110     GValue * value, GParamSpec * pspec);
111
112 static gboolean gst_identity_sink_event (GstBaseTransform * trans,
113     GstEvent * event);
114 static GstFlowReturn gst_identity_transform_ip (GstBaseTransform * trans,
115     GstBuffer * buf);
116 static gboolean gst_identity_start (GstBaseTransform * trans);
117 static gboolean gst_identity_stop (GstBaseTransform * trans);
118 static GstStateChangeReturn gst_identity_change_state (GstElement * element,
119     GstStateChange transition);
120 static gboolean gst_identity_accept_caps (GstBaseTransform * base,
121     GstPadDirection direction, GstCaps * caps);
122 static gboolean gst_identity_query (GstBaseTransform * base,
123     GstPadDirection direction, GstQuery * query);
124
125 static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
126
127 static GParamSpec *pspec_last_message = NULL;
128
129 static void
130 gst_identity_finalize (GObject * object)
131 {
132   GstIdentity *identity;
133
134   identity = GST_IDENTITY (object);
135
136   g_free (identity->last_message);
137   g_cond_clear (&identity->blocked_cond);
138
139   G_OBJECT_CLASS (parent_class)->finalize (object);
140 }
141
142 static void
143 gst_identity_class_init (GstIdentityClass * klass)
144 {
145   GObjectClass *gobject_class;
146   GstElementClass *gstelement_class;
147   GstBaseTransformClass *gstbasetrans_class;
148
149   gobject_class = G_OBJECT_CLASS (klass);
150   gstelement_class = GST_ELEMENT_CLASS (klass);
151   gstbasetrans_class = GST_BASE_TRANSFORM_CLASS (klass);
152
153   gobject_class->set_property = gst_identity_set_property;
154   gobject_class->get_property = gst_identity_get_property;
155
156   g_object_class_install_property (gobject_class, PROP_SLEEP_TIME,
157       g_param_spec_uint ("sleep-time", "Sleep time",
158           "Microseconds to sleep between processing", 0, G_MAXUINT,
159           DEFAULT_SLEEP_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160   g_object_class_install_property (gobject_class, PROP_ERROR_AFTER,
161       g_param_spec_int ("error-after", "Error After", "Error after N buffers",
162           G_MININT, G_MAXINT, DEFAULT_ERROR_AFTER,
163           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
164   g_object_class_install_property (gobject_class, PROP_DROP_PROBABILITY,
165       g_param_spec_float ("drop-probability", "Drop Probability",
166           "The Probability a buffer is dropped", 0.0, 1.0,
167           DEFAULT_DROP_PROBABILITY,
168           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
169
170   /**
171    * GstIdentity:drop-buffer-flags:
172    *
173    * Drop buffers with the given flags.
174    *
175    * Since: 1.8
176    **/
177   g_object_class_install_property (gobject_class, PROP_DROP_BUFFER_FLAGS,
178       g_param_spec_flags ("drop-buffer-flags", "Check flags to drop buffers",
179           "Drop buffers with the given flags",
180           GST_TYPE_BUFFER_FLAGS, DEFAULT_DROP_BUFFER_FLAGS,
181           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_DATARATE,
183       g_param_spec_int ("datarate", "Datarate",
184           "(Re)timestamps buffers with number of bytes per second (0 = inactive)",
185           0, G_MAXINT, DEFAULT_DATARATE,
186           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
187   g_object_class_install_property (gobject_class, PROP_SILENT,
188       g_param_spec_boolean ("silent", "silent", "silent", DEFAULT_SILENT,
189           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
190   g_object_class_install_property (gobject_class, PROP_SINGLE_SEGMENT,
191       g_param_spec_boolean ("single-segment", "Single Segment",
192           "Timestamp buffers and eat segments so as to appear as one segment",
193           DEFAULT_SINGLE_SEGMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
194   pspec_last_message = g_param_spec_string ("last-message", "last-message",
195       "last-message", NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
196   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
197       pspec_last_message);
198   g_object_class_install_property (gobject_class, PROP_DUMP,
199       g_param_spec_boolean ("dump", "Dump", "Dump buffer contents to stdout",
200           DEFAULT_DUMP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201   g_object_class_install_property (gobject_class, PROP_SYNC,
202       g_param_spec_boolean ("sync", "Synchronize",
203           "Synchronize to pipeline clock", DEFAULT_SYNC,
204           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
205   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
206       g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation",
207           "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
208           G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET,
209           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class,
211       PROP_CHECK_IMPERFECT_TIMESTAMP,
212       g_param_spec_boolean ("check-imperfect-timestamp",
213           "Check for discontiguous timestamps",
214           "Send element messages if timestamps and durations do not match up",
215           DEFAULT_CHECK_IMPERFECT_TIMESTAMP,
216           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217   g_object_class_install_property (gobject_class, PROP_CHECK_IMPERFECT_OFFSET,
218       g_param_spec_boolean ("check-imperfect-offset",
219           "Check for discontiguous offset",
220           "Send element messages if offset and offset_end do not match up",
221           DEFAULT_CHECK_IMPERFECT_OFFSET,
222           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
223
224   /**
225    * GstIdentity:signal-handoffs
226    *
227    * If set to %TRUE, the identity will emit a handoff signal when handling a buffer.
228    * When set to %FALSE, no signal will be emitted, which might improve performance.
229    */
230   g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS,
231       g_param_spec_boolean ("signal-handoffs",
232           "Signal handoffs", "Send a signal before pushing the buffer",
233           DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234
235   g_object_class_install_property (gobject_class, PROP_DROP_ALLOCATION,
236       g_param_spec_boolean ("drop-allocation", "Drop allocation query",
237           "Don't forward allocation queries", DEFAULT_DROP_ALLOCATION,
238           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
239
240   /**
241    * GstIdentity::handoff:
242    * @identity: the identity instance
243    * @buffer: the buffer that just has been received
244    * @pad: the pad that received it
245    *
246    * This signal gets emitted before passing the buffer downstream.
247    */
248   gst_identity_signals[SIGNAL_HANDOFF] =
249       g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
250       G_STRUCT_OFFSET (GstIdentityClass, handoff), NULL, NULL,
251       g_cclosure_marshal_generic, G_TYPE_NONE, 1,
252       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
253
254   gobject_class->finalize = gst_identity_finalize;
255
256   gst_element_class_set_static_metadata (gstelement_class,
257       "Identity",
258       "Generic",
259       "Pass data without modification", "Erik Walthinsen <omega@cse.ogi.edu>");
260   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
261   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
262
263   gstelement_class->change_state =
264       GST_DEBUG_FUNCPTR (gst_identity_change_state);
265
266   gstbasetrans_class->sink_event = GST_DEBUG_FUNCPTR (gst_identity_sink_event);
267   gstbasetrans_class->transform_ip =
268       GST_DEBUG_FUNCPTR (gst_identity_transform_ip);
269   gstbasetrans_class->start = GST_DEBUG_FUNCPTR (gst_identity_start);
270   gstbasetrans_class->stop = GST_DEBUG_FUNCPTR (gst_identity_stop);
271   gstbasetrans_class->accept_caps =
272       GST_DEBUG_FUNCPTR (gst_identity_accept_caps);
273   gstbasetrans_class->query = gst_identity_query;
274 }
275
276 static void
277 gst_identity_init (GstIdentity * identity)
278 {
279   identity->sleep_time = DEFAULT_SLEEP_TIME;
280   identity->error_after = DEFAULT_ERROR_AFTER;
281   identity->drop_probability = DEFAULT_DROP_PROBABILITY;
282   identity->drop_buffer_flags = DEFAULT_DROP_BUFFER_FLAGS;
283   identity->datarate = DEFAULT_DATARATE;
284   identity->silent = DEFAULT_SILENT;
285   identity->single_segment = DEFAULT_SINGLE_SEGMENT;
286   identity->sync = DEFAULT_SYNC;
287   identity->check_imperfect_timestamp = DEFAULT_CHECK_IMPERFECT_TIMESTAMP;
288   identity->check_imperfect_offset = DEFAULT_CHECK_IMPERFECT_OFFSET;
289   identity->dump = DEFAULT_DUMP;
290   identity->last_message = NULL;
291   identity->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS;
292   identity->ts_offset = DEFAULT_TS_OFFSET;
293   g_cond_init (&identity->blocked_cond);
294
295   gst_base_transform_set_gap_aware (GST_BASE_TRANSFORM_CAST (identity), TRUE);
296 }
297
298 static void
299 gst_identity_notify_last_message (GstIdentity * identity)
300 {
301   g_object_notify_by_pspec ((GObject *) identity, pspec_last_message);
302 }
303
304 static GstFlowReturn
305 gst_identity_do_sync (GstIdentity * identity, GstClockTime running_time)
306 {
307   GstFlowReturn ret = GST_FLOW_OK;
308
309   if (identity->sync &&
310       GST_BASE_TRANSFORM_CAST (identity)->segment.format == GST_FORMAT_TIME) {
311     GstClock *clock;
312
313     GST_OBJECT_LOCK (identity);
314
315     if (identity->flushing) {
316       GST_OBJECT_UNLOCK (identity);
317       return GST_FLOW_FLUSHING;
318     }
319
320     while (identity->blocked)
321       g_cond_wait (&identity->blocked_cond, GST_OBJECT_GET_LOCK (identity));
322
323     if (identity->flushing) {
324       GST_OBJECT_UNLOCK (identity);
325       return GST_FLOW_FLUSHING;
326     }
327
328     if ((clock = GST_ELEMENT (identity)->clock)) {
329       GstClockReturn cret;
330       GstClockTime timestamp;
331       GstClockTimeDiff ts_offset = identity->ts_offset;
332
333       timestamp = running_time + GST_ELEMENT (identity)->base_time +
334           identity->upstream_latency;
335       if (ts_offset < 0) {
336         ts_offset = -ts_offset;
337         if (ts_offset < timestamp)
338           timestamp -= ts_offset;
339         else
340           timestamp = 0;
341       } else
342         timestamp += ts_offset;
343
344       /* save id if we need to unlock */
345       identity->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
346       GST_OBJECT_UNLOCK (identity);
347
348       cret = gst_clock_id_wait (identity->clock_id, NULL);
349
350       GST_OBJECT_LOCK (identity);
351       if (identity->clock_id) {
352         gst_clock_id_unref (identity->clock_id);
353         identity->clock_id = NULL;
354       }
355       if (cret == GST_CLOCK_UNSCHEDULED || identity->flushing)
356         ret = GST_FLOW_FLUSHING;
357     }
358     GST_OBJECT_UNLOCK (identity);
359   }
360
361   return ret;
362 }
363
364 static gboolean
365 gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
366 {
367   GstIdentity *identity;
368   gboolean ret = TRUE;
369
370   identity = GST_IDENTITY (trans);
371
372   if (!identity->silent) {
373     const GstStructure *s;
374     const gchar *tstr;
375     gchar *sstr;
376
377     GST_OBJECT_LOCK (identity);
378     g_free (identity->last_message);
379
380     tstr = gst_event_type_get_name (GST_EVENT_TYPE (event));
381     if ((s = gst_event_get_structure (event)))
382       sstr = gst_structure_to_string (s);
383     else
384       sstr = g_strdup ("");
385
386     identity->last_message =
387         g_strdup_printf ("event   ******* (%s:%s) E (type: %s (%d), %s) %p",
388         GST_DEBUG_PAD_NAME (trans->sinkpad), tstr, GST_EVENT_TYPE (event),
389         sstr, event);
390     g_free (sstr);
391     GST_OBJECT_UNLOCK (identity);
392
393     gst_identity_notify_last_message (identity);
394   }
395
396   if (identity->single_segment && (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
397     if (!trans->have_segment) {
398       GstEvent *news;
399       GstSegment segment;
400
401       gst_event_copy_segment (event, &segment);
402       gst_event_copy_segment (event, &trans->segment);
403       trans->have_segment = TRUE;
404
405       /* This is the first segment, send out a (0, -1) segment */
406       gst_segment_init (&segment, segment.format);
407       news = gst_event_new_segment (&segment);
408
409       gst_pad_event_default (trans->sinkpad, GST_OBJECT_CAST (trans), news);
410     } else {
411       /* need to track segment for proper running time */
412       gst_event_copy_segment (event, &trans->segment);
413     }
414   }
415
416   if (GST_EVENT_TYPE (event) == GST_EVENT_GAP &&
417       trans->have_segment && trans->segment.format == GST_FORMAT_TIME) {
418     GstClockTime start, dur;
419
420     gst_event_parse_gap (event, &start, &dur);
421     if (GST_CLOCK_TIME_IS_VALID (start)) {
422       start = gst_segment_to_running_time (&trans->segment,
423           GST_FORMAT_TIME, start);
424
425       gst_identity_do_sync (identity, start);
426
427       /* also transform GAP timestamp similar to buffer timestamps */
428       if (identity->single_segment) {
429         gst_event_unref (event);
430         event = gst_event_new_gap (start, dur);
431       }
432     }
433   }
434
435   /* Reset previous timestamp, duration and offsets on SEGMENT
436    * to prevent false warnings when checking for perfect streams */
437   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
438     identity->prev_timestamp = identity->prev_duration = GST_CLOCK_TIME_NONE;
439     identity->prev_offset = identity->prev_offset_end = GST_BUFFER_OFFSET_NONE;
440   }
441
442   if (identity->single_segment && GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
443     /* eat up segments */
444     gst_event_unref (event);
445     ret = TRUE;
446   } else {
447     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START) {
448       GST_OBJECT_LOCK (identity);
449       identity->flushing = TRUE;
450       if (identity->clock_id) {
451         GST_DEBUG_OBJECT (identity, "unlock clock wait");
452         gst_clock_id_unschedule (identity->clock_id);
453       }
454       GST_OBJECT_UNLOCK (identity);
455     } else if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
456       GST_OBJECT_LOCK (identity);
457       identity->flushing = FALSE;
458       GST_OBJECT_UNLOCK (identity);
459     }
460
461     ret = GST_BASE_TRANSFORM_CLASS (parent_class)->sink_event (trans, event);
462   }
463
464   return ret;
465 }
466
467 static void
468 gst_identity_check_imperfect_timestamp (GstIdentity * identity, GstBuffer * buf)
469 {
470   GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buf);
471
472   /* invalid timestamp drops us out of check.  FIXME: maybe warn ? */
473   if (timestamp != GST_CLOCK_TIME_NONE) {
474     /* check if we had a previous buffer to compare to */
475     if (identity->prev_timestamp != GST_CLOCK_TIME_NONE &&
476         identity->prev_duration != GST_CLOCK_TIME_NONE) {
477       GstClockTime t_expected;
478       GstClockTimeDiff dt;
479
480       t_expected = identity->prev_timestamp + identity->prev_duration;
481       dt = GST_CLOCK_DIFF (t_expected, timestamp);
482       if (dt != 0) {
483         /*
484          * "imperfect-timestamp" bus message:
485          * @identity:        the identity instance
486          * @delta:           the GST_CLOCK_DIFF to the prev timestamp
487          * @prev-timestamp:  the previous buffer timestamp
488          * @prev-duration:   the previous buffer duration
489          * @prev-offset:     the previous buffer offset
490          * @prev-offset-end: the previous buffer offset end
491          * @cur-timestamp:   the current buffer timestamp
492          * @cur-duration:    the current buffer duration
493          * @cur-offset:      the current buffer offset
494          * @cur-offset-end:  the current buffer offset end
495          *
496          * This bus message gets emitted if the check-imperfect-timestamp
497          * property is set and there is a gap in time between the
498          * last buffer and the newly received buffer.
499          */
500         gst_element_post_message (GST_ELEMENT (identity),
501             gst_message_new_element (GST_OBJECT (identity),
502                 gst_structure_new ("imperfect-timestamp",
503                     "delta", G_TYPE_INT64, dt,
504                     "prev-timestamp", G_TYPE_UINT64,
505                     identity->prev_timestamp, "prev-duration", G_TYPE_UINT64,
506                     identity->prev_duration, "prev-offset", G_TYPE_UINT64,
507                     identity->prev_offset, "prev-offset-end", G_TYPE_UINT64,
508                     identity->prev_offset_end, "cur-timestamp", G_TYPE_UINT64,
509                     timestamp, "cur-duration", G_TYPE_UINT64,
510                     GST_BUFFER_DURATION (buf), "cur-offset", G_TYPE_UINT64,
511                     GST_BUFFER_OFFSET (buf), "cur-offset-end", G_TYPE_UINT64,
512                     GST_BUFFER_OFFSET_END (buf), NULL)));
513       }
514     } else {
515       GST_DEBUG_OBJECT (identity, "can't check data-contiguity, no "
516           "offset_end was set on previous buffer");
517     }
518   }
519 }
520
521 static void
522 gst_identity_check_imperfect_offset (GstIdentity * identity, GstBuffer * buf)
523 {
524   guint64 offset;
525
526   offset = GST_BUFFER_OFFSET (buf);
527
528   if (identity->prev_offset_end != offset &&
529       identity->prev_offset_end != GST_BUFFER_OFFSET_NONE &&
530       offset != GST_BUFFER_OFFSET_NONE) {
531     /*
532      * "imperfect-offset" bus message:
533      * @identity:        the identity instance
534      * @prev-timestamp:  the previous buffer timestamp
535      * @prev-duration:   the previous buffer duration
536      * @prev-offset:     the previous buffer offset
537      * @prev-offset-end: the previous buffer offset end
538      * @cur-timestamp:   the current buffer timestamp
539      * @cur-duration:    the current buffer duration
540      * @cur-offset:      the current buffer offset
541      * @cur-offset-end:  the current buffer offset end
542      *
543      * This bus message gets emitted if the check-imperfect-offset
544      * property is set and there is a gap in offsets between the
545      * last buffer and the newly received buffer.
546      */
547     gst_element_post_message (GST_ELEMENT (identity),
548         gst_message_new_element (GST_OBJECT (identity),
549             gst_structure_new ("imperfect-offset", "prev-timestamp",
550                 G_TYPE_UINT64, identity->prev_timestamp, "prev-duration",
551                 G_TYPE_UINT64, identity->prev_duration, "prev-offset",
552                 G_TYPE_UINT64, identity->prev_offset, "prev-offset-end",
553                 G_TYPE_UINT64, identity->prev_offset_end, "cur-timestamp",
554                 G_TYPE_UINT64, GST_BUFFER_TIMESTAMP (buf), "cur-duration",
555                 G_TYPE_UINT64, GST_BUFFER_DURATION (buf), "cur-offset",
556                 G_TYPE_UINT64, GST_BUFFER_OFFSET (buf), "cur-offset-end",
557                 G_TYPE_UINT64, GST_BUFFER_OFFSET_END (buf), NULL)));
558   } else {
559     GST_DEBUG_OBJECT (identity, "can't check offset contiguity, no offset "
560         "and/or offset_end were set on previous buffer");
561   }
562 }
563
564 static const gchar *
565 print_pretty_time (gchar * ts_str, gsize ts_str_len, GstClockTime ts)
566 {
567   if (ts == GST_CLOCK_TIME_NONE)
568     return "none";
569
570   g_snprintf (ts_str, ts_str_len, "%" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
571   return ts_str;
572 }
573
574 static void
575 gst_identity_update_last_message_for_buffer (GstIdentity * identity,
576     const gchar * action, GstBuffer * buf, gsize size)
577 {
578   gchar dts_str[64], pts_str[64], dur_str[64];
579   gchar *flag_str, *meta_str;
580
581   GST_OBJECT_LOCK (identity);
582
583   flag_str = gst_buffer_get_flags_string (buf);
584   meta_str = gst_buffer_get_meta_string (buf);
585
586   g_free (identity->last_message);
587   identity->last_message = g_strdup_printf ("%s   ******* (%s:%s) "
588       "(%" G_GSIZE_FORMAT " bytes, dts: %s, pts: %s, duration: %s, offset: %"
589       G_GINT64_FORMAT ", " "offset_end: % " G_GINT64_FORMAT
590       ", flags: %08x %s, meta: %s) %p", action,
591       GST_DEBUG_PAD_NAME (GST_BASE_TRANSFORM_CAST (identity)->sinkpad), size,
592       print_pretty_time (dts_str, sizeof (dts_str), GST_BUFFER_DTS (buf)),
593       print_pretty_time (pts_str, sizeof (pts_str), GST_BUFFER_PTS (buf)),
594       print_pretty_time (dur_str, sizeof (dur_str), GST_BUFFER_DURATION (buf)),
595       GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf),
596       GST_BUFFER_FLAGS (buf), flag_str, meta_str ? meta_str : "none", buf);
597   g_free (flag_str);
598
599   GST_OBJECT_UNLOCK (identity);
600
601   gst_identity_notify_last_message (identity);
602 }
603
604 static GstFlowReturn
605 gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
606 {
607   GstFlowReturn ret = GST_FLOW_OK;
608   GstIdentity *identity = GST_IDENTITY (trans);
609   GstClockTime rundts = GST_CLOCK_TIME_NONE;
610   GstClockTime runpts = GST_CLOCK_TIME_NONE;
611   GstClockTime ts, duration, runtimestamp;
612   gsize size;
613
614   size = gst_buffer_get_size (buf);
615
616   if (identity->check_imperfect_timestamp)
617     gst_identity_check_imperfect_timestamp (identity, buf);
618   if (identity->check_imperfect_offset)
619     gst_identity_check_imperfect_offset (identity, buf);
620
621   /* update prev values */
622   identity->prev_timestamp = GST_BUFFER_TIMESTAMP (buf);
623   identity->prev_duration = GST_BUFFER_DURATION (buf);
624   identity->prev_offset_end = GST_BUFFER_OFFSET_END (buf);
625   identity->prev_offset = GST_BUFFER_OFFSET (buf);
626
627   if (identity->error_after >= 0) {
628     identity->error_after--;
629     if (identity->error_after == 0)
630       goto error_after;
631   }
632
633   if (identity->drop_probability > 0.0) {
634     if ((gfloat) (1.0 * rand () / (RAND_MAX)) < identity->drop_probability)
635       goto dropped;
636   }
637
638   if (GST_BUFFER_FLAG_IS_SET (buf, identity->drop_buffer_flags))
639     goto dropped;
640
641   if (identity->dump) {
642     GstMapInfo info;
643
644     if (gst_buffer_map (buf, &info, GST_MAP_READ)) {
645       gst_util_dump_mem (info.data, info.size);
646       gst_buffer_unmap (buf, &info);
647     }
648   }
649
650   if (!identity->silent) {
651     gst_identity_update_last_message_for_buffer (identity, "chain", buf, size);
652   }
653
654   if (identity->datarate > 0) {
655     GstClockTime time = gst_util_uint64_scale_int (identity->offset,
656         GST_SECOND, identity->datarate);
657
658     GST_BUFFER_PTS (buf) = GST_BUFFER_DTS (buf) = time;
659     GST_BUFFER_DURATION (buf) = size * GST_SECOND / identity->datarate;
660   }
661
662   if (identity->signal_handoffs)
663     g_signal_emit (identity, gst_identity_signals[SIGNAL_HANDOFF], 0, buf);
664
665   if (trans->segment.format == GST_FORMAT_TIME) {
666     rundts = gst_segment_to_running_time (&trans->segment,
667         GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
668     runpts = gst_segment_to_running_time (&trans->segment,
669         GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
670   }
671
672   if (GST_CLOCK_TIME_IS_VALID (rundts))
673     runtimestamp = rundts;
674   else if (GST_CLOCK_TIME_IS_VALID (runpts))
675     runtimestamp = runpts;
676   else
677     runtimestamp = 0;
678   ret = gst_identity_do_sync (identity, runtimestamp);
679
680   identity->offset += size;
681
682   if (identity->sleep_time && ret == GST_FLOW_OK)
683     g_usleep (identity->sleep_time);
684
685   if (identity->single_segment && (trans->segment.format == GST_FORMAT_TIME)
686       && (ret == GST_FLOW_OK)) {
687     GST_BUFFER_DTS (buf) = rundts;
688     GST_BUFFER_PTS (buf) = runpts;
689     GST_BUFFER_OFFSET (buf) = GST_CLOCK_TIME_NONE;
690     GST_BUFFER_OFFSET_END (buf) = GST_CLOCK_TIME_NONE;
691   }
692
693   return ret;
694
695   /* ERRORS */
696 error_after:
697   {
698     GST_ELEMENT_ERROR (identity, CORE, FAILED,
699         (_("Failed after iterations as requested.")), (NULL));
700     return GST_FLOW_ERROR;
701   }
702 dropped:
703   {
704     if (!identity->silent) {
705       gst_identity_update_last_message_for_buffer (identity, "dropping", buf,
706           size);
707     }
708
709     ts = GST_BUFFER_TIMESTAMP (buf);
710     if (GST_CLOCK_TIME_IS_VALID (ts)) {
711       duration = GST_BUFFER_DURATION (buf);
712       gst_pad_push_event (GST_BASE_TRANSFORM_SRC_PAD (identity),
713           gst_event_new_gap (ts, duration));
714     }
715
716     /* return DROPPED to basetransform. */
717     return GST_BASE_TRANSFORM_FLOW_DROPPED;
718   }
719 }
720
721 static void
722 gst_identity_set_property (GObject * object, guint prop_id,
723     const GValue * value, GParamSpec * pspec)
724 {
725   GstIdentity *identity;
726
727   identity = GST_IDENTITY (object);
728
729   switch (prop_id) {
730     case PROP_SLEEP_TIME:
731       identity->sleep_time = g_value_get_uint (value);
732       break;
733     case PROP_SILENT:
734       identity->silent = g_value_get_boolean (value);
735       break;
736     case PROP_SINGLE_SEGMENT:
737       identity->single_segment = g_value_get_boolean (value);
738       break;
739     case PROP_DUMP:
740       identity->dump = g_value_get_boolean (value);
741       break;
742     case PROP_ERROR_AFTER:
743       identity->error_after = g_value_get_int (value);
744       break;
745     case PROP_DROP_PROBABILITY:
746       identity->drop_probability = g_value_get_float (value);
747       break;
748     case PROP_DROP_BUFFER_FLAGS:
749       identity->drop_buffer_flags = g_value_get_flags (value);
750       break;
751     case PROP_DATARATE:
752       identity->datarate = g_value_get_int (value);
753       break;
754     case PROP_SYNC:
755       identity->sync = g_value_get_boolean (value);
756       break;
757     case PROP_TS_OFFSET:
758       identity->ts_offset = g_value_get_int64 (value);
759       break;
760     case PROP_CHECK_IMPERFECT_TIMESTAMP:
761       identity->check_imperfect_timestamp = g_value_get_boolean (value);
762       break;
763     case PROP_CHECK_IMPERFECT_OFFSET:
764       identity->check_imperfect_offset = g_value_get_boolean (value);
765       break;
766     case PROP_SIGNAL_HANDOFFS:
767       identity->signal_handoffs = g_value_get_boolean (value);
768       break;
769     case PROP_DROP_ALLOCATION:
770       identity->drop_allocation = g_value_get_boolean (value);
771       break;
772     default:
773       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
774       break;
775   }
776   if (identity->datarate > 0 || identity->single_segment)
777     gst_base_transform_set_passthrough (GST_BASE_TRANSFORM (identity), FALSE);
778   else
779     gst_base_transform_set_passthrough (GST_BASE_TRANSFORM (identity), TRUE);
780 }
781
782 static void
783 gst_identity_get_property (GObject * object, guint prop_id, GValue * value,
784     GParamSpec * pspec)
785 {
786   GstIdentity *identity;
787
788   identity = GST_IDENTITY (object);
789
790   switch (prop_id) {
791     case PROP_SLEEP_TIME:
792       g_value_set_uint (value, identity->sleep_time);
793       break;
794     case PROP_ERROR_AFTER:
795       g_value_set_int (value, identity->error_after);
796       break;
797     case PROP_DROP_PROBABILITY:
798       g_value_set_float (value, identity->drop_probability);
799       break;
800     case PROP_DROP_BUFFER_FLAGS:
801       g_value_set_flags (value, identity->drop_buffer_flags);
802       break;
803     case PROP_DATARATE:
804       g_value_set_int (value, identity->datarate);
805       break;
806     case PROP_SILENT:
807       g_value_set_boolean (value, identity->silent);
808       break;
809     case PROP_SINGLE_SEGMENT:
810       g_value_set_boolean (value, identity->single_segment);
811       break;
812     case PROP_DUMP:
813       g_value_set_boolean (value, identity->dump);
814       break;
815     case PROP_LAST_MESSAGE:
816       GST_OBJECT_LOCK (identity);
817       g_value_set_string (value, identity->last_message);
818       GST_OBJECT_UNLOCK (identity);
819       break;
820     case PROP_SYNC:
821       g_value_set_boolean (value, identity->sync);
822       break;
823     case PROP_TS_OFFSET:
824       identity->ts_offset = g_value_get_int64 (value);
825       break;
826     case PROP_CHECK_IMPERFECT_TIMESTAMP:
827       g_value_set_boolean (value, identity->check_imperfect_timestamp);
828       break;
829     case PROP_CHECK_IMPERFECT_OFFSET:
830       g_value_set_boolean (value, identity->check_imperfect_offset);
831       break;
832     case PROP_SIGNAL_HANDOFFS:
833       g_value_set_boolean (value, identity->signal_handoffs);
834       break;
835     case PROP_DROP_ALLOCATION:
836       g_value_set_boolean (value, identity->drop_allocation);
837       break;
838     default:
839       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
840       break;
841   }
842 }
843
844 static gboolean
845 gst_identity_start (GstBaseTransform * trans)
846 {
847   GstIdentity *identity;
848
849   identity = GST_IDENTITY (trans);
850
851   identity->offset = 0;
852   identity->prev_timestamp = GST_CLOCK_TIME_NONE;
853   identity->prev_duration = GST_CLOCK_TIME_NONE;
854   identity->prev_offset_end = GST_BUFFER_OFFSET_NONE;
855   identity->prev_offset = GST_BUFFER_OFFSET_NONE;
856
857   return TRUE;
858 }
859
860 static gboolean
861 gst_identity_stop (GstBaseTransform * trans)
862 {
863   GstIdentity *identity;
864
865   identity = GST_IDENTITY (trans);
866
867   GST_OBJECT_LOCK (identity);
868   g_free (identity->last_message);
869   identity->last_message = NULL;
870   GST_OBJECT_UNLOCK (identity);
871
872   return TRUE;
873 }
874
875 static gboolean
876 gst_identity_accept_caps (GstBaseTransform * base,
877     GstPadDirection direction, GstCaps * caps)
878 {
879   gboolean ret;
880   GstPad *pad;
881
882   /* Proxy accept-caps */
883
884   if (direction == GST_PAD_SRC)
885     pad = GST_BASE_TRANSFORM_SINK_PAD (base);
886   else
887     pad = GST_BASE_TRANSFORM_SRC_PAD (base);
888
889   ret = gst_pad_peer_query_accept_caps (pad, caps);
890
891   return ret;
892 }
893
894 static gboolean
895 gst_identity_query (GstBaseTransform * base, GstPadDirection direction,
896     GstQuery * query)
897 {
898   GstIdentity *identity;
899   gboolean ret;
900
901   identity = GST_IDENTITY (base);
902
903   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION &&
904       identity->drop_allocation) {
905     GST_DEBUG_OBJECT (identity, "Dropping allocation query.");
906     return FALSE;
907   }
908
909   ret = GST_BASE_TRANSFORM_CLASS (parent_class)->query (base, direction, query);
910
911   if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
912     gboolean live = FALSE;
913     GstClockTime min = 0, max = 0;
914
915     if (ret) {
916       gst_query_parse_latency (query, &live, &min, &max);
917
918       if (identity->sync && max < min) {
919         GST_ELEMENT_WARNING (base, CORE, CLOCK, (NULL),
920             ("Impossible to configure latency before identity sync=true:"
921                 " max %" GST_TIME_FORMAT " < min %"
922                 GST_TIME_FORMAT ". Add queues or other buffering elements.",
923                 GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
924       }
925     }
926
927     /* Ignore the upstream latency if it is not live */
928     GST_OBJECT_LOCK (identity);
929     if (live)
930       identity->upstream_latency = min;
931     else
932       identity->upstream_latency = 0;
933     GST_OBJECT_UNLOCK (identity);
934
935     gst_query_set_latency (query, live || identity->sync, min, max);
936     ret = TRUE;
937   }
938   return ret;
939 }
940
941 static GstStateChangeReturn
942 gst_identity_change_state (GstElement * element, GstStateChange transition)
943 {
944   GstStateChangeReturn ret;
945   GstIdentity *identity = GST_IDENTITY (element);
946   gboolean no_preroll = FALSE;
947
948   switch (transition) {
949     case GST_STATE_CHANGE_NULL_TO_READY:
950       break;
951     case GST_STATE_CHANGE_READY_TO_PAUSED:
952       GST_OBJECT_LOCK (identity);
953       identity->flushing = FALSE;
954       identity->blocked = TRUE;
955       GST_OBJECT_UNLOCK (identity);
956       if (identity->sync)
957         no_preroll = TRUE;
958       break;
959     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
960       GST_OBJECT_LOCK (identity);
961       identity->blocked = FALSE;
962       g_cond_broadcast (&identity->blocked_cond);
963       GST_OBJECT_UNLOCK (identity);
964       break;
965     case GST_STATE_CHANGE_PAUSED_TO_READY:
966       GST_OBJECT_LOCK (identity);
967       identity->flushing = TRUE;
968       if (identity->clock_id) {
969         GST_DEBUG_OBJECT (identity, "unlock clock wait");
970         gst_clock_id_unschedule (identity->clock_id);
971       }
972       identity->blocked = FALSE;
973       g_cond_broadcast (&identity->blocked_cond);
974       GST_OBJECT_UNLOCK (identity);
975       break;
976     default:
977       break;
978   }
979
980   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
981
982   switch (transition) {
983     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
984       GST_OBJECT_LOCK (identity);
985       identity->upstream_latency = 0;
986       identity->blocked = TRUE;
987       GST_OBJECT_UNLOCK (identity);
988       if (identity->sync)
989         no_preroll = TRUE;
990       break;
991     case GST_STATE_CHANGE_PAUSED_TO_READY:
992       break;
993     case GST_STATE_CHANGE_READY_TO_NULL:
994       break;
995     default:
996       break;
997   }
998
999   if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
1000     ret = GST_STATE_CHANGE_NO_PREROLL;
1001
1002   return ret;
1003 }