cccombiner: fix emission of selected-samples in one case
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / closedcaption / gstcccombiner.c
1 /*
2  * GStreamer
3  * Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21
22 #ifdef HAVE_CONFIG_H
23 #  include <config.h>
24 #endif
25
26 #include <gst/gst.h>
27 #include <gst/base/base.h>
28 #include <gst/video/video.h>
29 #include <string.h>
30
31 #include "gstcccombiner.h"
32
33 GST_DEBUG_CATEGORY_STATIC (gst_cc_combiner_debug);
34 #define GST_CAT_DEFAULT gst_cc_combiner_debug
35
36 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
37     GST_PAD_SINK,
38     GST_PAD_ALWAYS,
39     GST_STATIC_CAPS_ANY);
40
41 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
42     GST_PAD_SRC,
43     GST_PAD_ALWAYS,
44     GST_STATIC_CAPS_ANY);
45
46 static GstStaticPadTemplate captiontemplate =
47     GST_STATIC_PAD_TEMPLATE ("caption",
48     GST_PAD_SINK,
49     GST_PAD_REQUEST,
50     GST_STATIC_CAPS
51     ("closedcaption/x-cea-608,format={ (string) raw, (string) s334-1a}; "
52         "closedcaption/x-cea-708,format={ (string) cc_data, (string) cdp }"));
53
54 #define parent_class gst_cc_combiner_parent_class
55 G_DEFINE_TYPE (GstCCCombiner, gst_cc_combiner, GST_TYPE_AGGREGATOR);
56 GST_ELEMENT_REGISTER_DEFINE (cccombiner, "cccombiner",
57     GST_RANK_NONE, GST_TYPE_CCCOMBINER);
58
59 enum
60 {
61   PROP_0,
62   PROP_SCHEDULE,
63   PROP_MAX_SCHEDULED,
64 };
65
66 #define DEFAULT_MAX_SCHEDULED 30
67 #define DEFAULT_SCHEDULE TRUE
68
69 typedef struct
70 {
71   GstVideoCaptionType caption_type;
72   GstBuffer *buffer;
73 } CaptionData;
74
75 typedef struct
76 {
77   GstBuffer *buffer;
78   GstClockTime running_time;
79   GstClockTime stream_time;
80 } CaptionQueueItem;
81
82 static void
83 caption_data_clear (CaptionData * data)
84 {
85   gst_buffer_unref (data->buffer);
86 }
87
88 static void
89 clear_scheduled (CaptionQueueItem * item)
90 {
91   gst_buffer_unref (item->buffer);
92 }
93
94 static void
95 gst_cc_combiner_finalize (GObject * object)
96 {
97   GstCCCombiner *self = GST_CCCOMBINER (object);
98
99   gst_queue_array_free (self->scheduled[0]);
100   gst_queue_array_free (self->scheduled[1]);
101   g_array_unref (self->current_frame_captions);
102   self->current_frame_captions = NULL;
103
104   G_OBJECT_CLASS (parent_class)->finalize (object);
105 }
106
107 #define GST_FLOW_NEED_DATA GST_FLOW_CUSTOM_SUCCESS
108
109 static const guint8 *
110 extract_cdp (const guint8 * cdp, guint cdp_len, guint * cc_data_len)
111 {
112   GstByteReader br;
113   guint16 u16;
114   guint8 u8;
115   guint8 flags;
116   guint len = 0;
117   const guint8 *cc_data = NULL;
118
119   *cc_data_len = 0;
120
121   /* Header + footer length */
122   if (cdp_len < 11) {
123     goto done;
124   }
125
126   gst_byte_reader_init (&br, cdp, cdp_len);
127   u16 = gst_byte_reader_get_uint16_be_unchecked (&br);
128   if (u16 != 0x9669) {
129     goto done;
130   }
131
132   u8 = gst_byte_reader_get_uint8_unchecked (&br);
133   if (u8 != cdp_len) {
134     goto done;
135   }
136
137   gst_byte_reader_skip_unchecked (&br, 1);
138
139   flags = gst_byte_reader_get_uint8_unchecked (&br);
140
141   /* No cc_data? */
142   if ((flags & 0x40) == 0) {
143     goto done;
144   }
145
146   /* cdp_hdr_sequence_cntr */
147   gst_byte_reader_skip_unchecked (&br, 2);
148
149   /* time_code_present */
150   if (flags & 0x80) {
151     if (gst_byte_reader_get_remaining (&br) < 5) {
152       goto done;
153     }
154     gst_byte_reader_skip_unchecked (&br, 5);
155   }
156
157   /* ccdata_present */
158   if (flags & 0x40) {
159     guint8 cc_count;
160
161     if (gst_byte_reader_get_remaining (&br) < 2) {
162       goto done;
163     }
164     u8 = gst_byte_reader_get_uint8_unchecked (&br);
165     if (u8 != 0x72) {
166       goto done;
167     }
168
169     cc_count = gst_byte_reader_get_uint8_unchecked (&br);
170     if ((cc_count & 0xe0) != 0xe0) {
171       goto done;
172     }
173     cc_count &= 0x1f;
174
175     if (cc_count == 0)
176       return 0;
177
178     len = 3 * cc_count;
179     if (gst_byte_reader_get_remaining (&br) < len)
180       goto done;
181
182     cc_data = gst_byte_reader_get_data_unchecked (&br, len);
183     *cc_data_len = len;
184   }
185
186 done:
187   return cc_data;
188 }
189
190 #define MAX_CDP_PACKET_LEN 256
191 #define MAX_CEA608_LEN 32
192
193 static const struct cdp_fps_entry cdp_fps_table[] = {
194   {0x1f, 24000, 1001, 25, 22, 3 /* FIXME: alternating max cea608 count! */ },
195   {0x2f, 24, 1, 25, 22, 2},
196   {0x3f, 25, 1, 24, 22, 2},
197   {0x4f, 30000, 1001, 20, 18, 2},
198   {0x5f, 30, 1, 20, 18, 2},
199   {0x6f, 50, 1, 12, 11, 1},
200   {0x7f, 60000, 1001, 10, 9, 1},
201   {0x8f, 60, 1, 10, 9, 1},
202 };
203 static const struct cdp_fps_entry null_fps_entry = { 0, 0, 0, 0 };
204
205 static const struct cdp_fps_entry *
206 cdp_fps_entry_from_fps (guint fps_n, guint fps_d)
207 {
208   int i;
209   for (i = 0; i < G_N_ELEMENTS (cdp_fps_table); i++) {
210     if (cdp_fps_table[i].fps_n == fps_n && cdp_fps_table[i].fps_d == fps_d)
211       return &cdp_fps_table[i];
212   }
213   return &null_fps_entry;
214 }
215
216
217 static GstBuffer *
218 make_cdp (GstCCCombiner * self, const guint8 * cc_data, guint cc_data_len,
219     const struct cdp_fps_entry *fps_entry, const GstVideoTimeCode * tc)
220 {
221   GstByteWriter bw;
222   guint8 flags, checksum;
223   guint i, len;
224   GstBuffer *ret = gst_buffer_new_allocate (NULL, MAX_CDP_PACKET_LEN, NULL);
225   GstMapInfo map;
226
227   gst_buffer_map (ret, &map, GST_MAP_WRITE);
228
229   gst_byte_writer_init_with_data (&bw, map.data, MAX_CDP_PACKET_LEN, FALSE);
230   gst_byte_writer_put_uint16_be_unchecked (&bw, 0x9669);
231   /* Write a length of 0 for now */
232   gst_byte_writer_put_uint8_unchecked (&bw, 0);
233
234   gst_byte_writer_put_uint8_unchecked (&bw, fps_entry->fps_idx);
235
236   /* caption_service_active */
237   flags = 0x02;
238
239   /* ccdata_present */
240   flags |= 0x40;
241
242   if (tc && tc->config.fps_n > 0)
243     flags |= 0x80;
244
245   /* reserved */
246   flags |= 0x01;
247
248   gst_byte_writer_put_uint8_unchecked (&bw, flags);
249
250   gst_byte_writer_put_uint16_be_unchecked (&bw, self->cdp_hdr_sequence_cntr);
251
252   if (tc && tc->config.fps_n > 0) {
253     guint8 u8;
254
255     gst_byte_writer_put_uint8_unchecked (&bw, 0x71);
256     /* reserved 11 - 2 bits */
257     u8 = 0xc0;
258     /* tens of hours - 2 bits */
259     u8 |= ((tc->hours / 10) & 0x3) << 4;
260     /* units of hours - 4 bits */
261     u8 |= (tc->hours % 10) & 0xf;
262     gst_byte_writer_put_uint8_unchecked (&bw, u8);
263
264     /* reserved 1 - 1 bit */
265     u8 = 0x80;
266     /* tens of minutes - 3 bits */
267     u8 |= ((tc->minutes / 10) & 0x7) << 4;
268     /* units of minutes - 4 bits */
269     u8 |= (tc->minutes % 10) & 0xf;
270     gst_byte_writer_put_uint8_unchecked (&bw, u8);
271
272     /* field flag - 1 bit */
273     u8 = tc->field_count < 2 ? 0x00 : 0x80;
274     /* tens of seconds - 3 bits */
275     u8 |= ((tc->seconds / 10) & 0x7) << 4;
276     /* units of seconds - 4 bits */
277     u8 |= (tc->seconds % 10) & 0xf;
278     gst_byte_writer_put_uint8_unchecked (&bw, u8);
279
280     /* drop frame flag - 1 bit */
281     u8 = (tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) ? 0x80 :
282         0x00;
283     /* reserved0 - 1 bit */
284     /* tens of frames - 2 bits */
285     u8 |= ((tc->frames / 10) & 0x3) << 4;
286     /* units of frames 4 bits */
287     u8 |= (tc->frames % 10) & 0xf;
288     gst_byte_writer_put_uint8_unchecked (&bw, u8);
289   }
290
291   gst_byte_writer_put_uint8_unchecked (&bw, 0x72);
292   gst_byte_writer_put_uint8_unchecked (&bw, 0xe0 | fps_entry->max_cc_count);
293   gst_byte_writer_put_data_unchecked (&bw, cc_data, cc_data_len);
294   while (fps_entry->max_cc_count > cc_data_len / 3) {
295     gst_byte_writer_put_uint8_unchecked (&bw, 0xfa);
296     gst_byte_writer_put_uint8_unchecked (&bw, 0x00);
297     gst_byte_writer_put_uint8_unchecked (&bw, 0x00);
298     cc_data_len += 3;
299   }
300
301   gst_byte_writer_put_uint8_unchecked (&bw, 0x74);
302   gst_byte_writer_put_uint16_be_unchecked (&bw, self->cdp_hdr_sequence_cntr);
303   self->cdp_hdr_sequence_cntr++;
304   /* We calculate the checksum afterwards */
305   gst_byte_writer_put_uint8_unchecked (&bw, 0);
306
307   len = gst_byte_writer_get_pos (&bw);
308   gst_byte_writer_set_pos (&bw, 2);
309   gst_byte_writer_put_uint8_unchecked (&bw, len);
310
311   checksum = 0;
312   for (i = 0; i < len; i++) {
313     checksum += map.data[i];
314   }
315   checksum &= 0xff;
316   checksum = 256 - checksum;
317   map.data[len - 1] = checksum;
318
319   gst_buffer_unmap (ret, &map);
320
321   gst_buffer_set_size (ret, len);
322
323   return ret;
324 }
325
326 static GstBuffer *
327 make_padding (GstCCCombiner * self, const GstVideoTimeCode * tc, guint field)
328 {
329   GstBuffer *ret = NULL;
330
331   switch (self->caption_type) {
332     case GST_VIDEO_CAPTION_TYPE_CEA708_CDP:
333     {
334       const guint8 cc_data[6] = { 0xfc, 0x80, 0x80, 0xf9, 0x80, 0x80 };
335
336       ret = make_cdp (self, cc_data, 6, self->cdp_fps_entry, tc);
337       break;
338     }
339     case GST_VIDEO_CAPTION_TYPE_CEA708_RAW:
340     {
341       GstMapInfo map;
342
343       ret = gst_buffer_new_allocate (NULL, 3, NULL);
344
345       gst_buffer_map (ret, &map, GST_MAP_WRITE);
346
347       map.data[0] = 0xfc | (field & 0x01);
348       map.data[1] = 0x80;
349       map.data[2] = 0x80;
350
351       gst_buffer_unmap (ret, &map);
352       break;
353     }
354     case GST_VIDEO_CAPTION_TYPE_CEA608_S334_1A:
355     {
356       GstMapInfo map;
357
358       ret = gst_buffer_new_allocate (NULL, 3, NULL);
359
360       gst_buffer_map (ret, &map, GST_MAP_WRITE);
361
362       map.data[0] = 0x80 | (field == 0 ? 0x01 : 0x00);
363       map.data[1] = 0x80;
364       map.data[2] = 0x80;
365
366       gst_buffer_unmap (ret, &map);
367       break;
368     }
369     case GST_VIDEO_CAPTION_TYPE_CEA608_RAW:
370     {
371       GstMapInfo map;
372
373       ret = gst_buffer_new_allocate (NULL, 2, NULL);
374
375       gst_buffer_map (ret, &map, GST_MAP_WRITE);
376
377       map.data[0] = 0x80;
378       map.data[1] = 0x80;
379
380       gst_buffer_unmap (ret, &map);
381       break;
382     }
383     default:
384       break;
385   }
386
387   return ret;
388 }
389
390 static void
391 queue_caption (GstCCCombiner * self, GstBuffer * scheduled, guint field)
392 {
393   GstAggregatorPad *caption_pad;
394   CaptionQueueItem item;
395
396   if (self->progressive && field == 1) {
397     gst_buffer_unref (scheduled);
398     return;
399   }
400
401   caption_pad =
402       GST_AGGREGATOR_PAD_CAST (gst_element_get_static_pad (GST_ELEMENT_CAST
403           (self), "caption"));
404
405   g_assert (gst_queue_array_get_length (self->scheduled[field]) <=
406       self->max_scheduled);
407
408   if (gst_queue_array_get_length (self->scheduled[field]) ==
409       self->max_scheduled) {
410     CaptionQueueItem *dropped =
411         gst_queue_array_pop_tail_struct (self->scheduled[field]);
412
413     GST_WARNING_OBJECT (self,
414         "scheduled queue runs too long, dropping %" GST_PTR_FORMAT, dropped);
415
416     gst_element_post_message (GST_ELEMENT_CAST (self),
417         gst_message_new_qos (GST_OBJECT_CAST (self), FALSE,
418             dropped->running_time, dropped->stream_time,
419             GST_BUFFER_PTS (dropped->buffer), GST_BUFFER_DURATION (dropped)));
420
421     gst_buffer_unref (dropped->buffer);
422   }
423
424   gst_object_unref (caption_pad);
425
426   item.buffer = scheduled;
427   item.running_time =
428       gst_segment_to_running_time (&caption_pad->segment, GST_FORMAT_TIME,
429       GST_BUFFER_PTS (scheduled));
430   item.stream_time =
431       gst_segment_to_stream_time (&caption_pad->segment, GST_FORMAT_TIME,
432       GST_BUFFER_PTS (scheduled));
433
434   gst_queue_array_push_tail_struct (self->scheduled[field], &item);
435 }
436
437 static void
438 schedule_cdp (GstCCCombiner * self, const GstVideoTimeCode * tc,
439     const guint8 * data, guint len, GstClockTime pts, GstClockTime duration)
440 {
441   const guint8 *cc_data;
442   guint cc_data_len;
443   gboolean inject = FALSE;
444
445   if ((cc_data = extract_cdp (data, len, &cc_data_len))) {
446     guint8 i;
447
448     for (i = 0; i < cc_data_len / 3; i++) {
449       gboolean cc_valid = (cc_data[i * 3] & 0x04) == 0x04;
450       guint8 cc_type = cc_data[i * 3] & 0x03;
451
452       if (!cc_valid)
453         continue;
454
455       if (cc_type == 0x00 || cc_type == 0x01) {
456         if (cc_data[i * 3 + 1] != 0x80 || cc_data[i * 3 + 2] != 0x80) {
457           inject = TRUE;
458           break;
459         }
460         continue;
461       } else {
462         inject = TRUE;
463         break;
464       }
465     }
466   }
467
468   if (inject) {
469     GstBuffer *buf =
470         make_cdp (self, cc_data, cc_data_len, self->cdp_fps_entry, tc);
471
472     /* We only set those for QoS reporting purposes */
473     GST_BUFFER_PTS (buf) = pts;
474     GST_BUFFER_DURATION (buf) = duration;
475
476     queue_caption (self, buf, 0);
477   }
478 }
479
480 static void
481 schedule_cea608_s334_1a (GstCCCombiner * self, guint8 * data, guint len,
482     GstClockTime pts, GstClockTime duration)
483 {
484   guint8 field0_data[3], field1_data[3];
485   guint field0_len = 0, field1_len = 0;
486   guint i;
487   gboolean field0_608 = FALSE, field1_608 = FALSE;
488
489   if (len % 3 != 0) {
490     GST_WARNING ("Invalid cc_data buffer size %u. Truncating to a multiple "
491         "of 3", len);
492     len = len - (len % 3);
493   }
494
495   for (i = 0; i < len / 3; i++) {
496     guint8 cc_type = data[i * 3] & 0x03;
497
498     if (cc_type == 0x01) {
499       if (field0_608)
500         continue;
501
502       field0_608 = TRUE;
503
504       if (data[i * 3 + 1] == 0x80 && data[i * 3 + 2] == 0x80)
505         continue;
506
507       field0_data[field0_len++] = data[i * 3];
508       field0_data[field0_len++] = data[i * 3 + 1];
509       field0_data[field0_len++] = data[i * 3 + 2];
510     } else if (cc_type == 0x00) {
511       if (field1_608)
512         continue;
513
514       field1_608 = TRUE;
515
516       if (data[i * 3 + 1] == 0x80 && data[i * 3 + 2] == 0x80)
517         continue;
518
519       field1_data[field1_len++] = data[i * 3];
520       field1_data[field1_len++] = data[i * 3 + 1];
521       field1_data[field1_len++] = data[i * 3 + 2];
522     } else {
523       break;
524     }
525   }
526
527   if (field0_len > 0) {
528     GstBuffer *buf = gst_buffer_new_allocate (NULL, field0_len, NULL);
529
530     gst_buffer_fill (buf, 0, field0_data, field0_len);
531     GST_BUFFER_PTS (buf) = pts;
532     GST_BUFFER_DURATION (buf) = duration;
533
534     queue_caption (self, buf, 0);
535   }
536
537   if (field1_len > 0) {
538     GstBuffer *buf = gst_buffer_new_allocate (NULL, field1_len, NULL);
539
540     gst_buffer_fill (buf, 0, field1_data, field1_len);
541     GST_BUFFER_PTS (buf) = pts;
542     GST_BUFFER_DURATION (buf) = duration;
543
544     queue_caption (self, buf, 1);
545   }
546 }
547
548 static void
549 schedule_cea708_raw (GstCCCombiner * self, guint8 * data, guint len,
550     GstClockTime pts, GstClockTime duration)
551 {
552   guint8 field0_data[MAX_CDP_PACKET_LEN], field1_data[3];
553   guint field0_len = 0, field1_len = 0;
554   guint i;
555   gboolean field0_608 = FALSE, field1_608 = FALSE;
556   gboolean started_ccp = FALSE;
557
558   if (len % 3 != 0) {
559     GST_WARNING ("Invalid cc_data buffer size %u. Truncating to a multiple "
560         "of 3", len);
561     len = len - (len % 3);
562   }
563
564   for (i = 0; i < len / 3; i++) {
565     gboolean cc_valid = (data[i * 3] & 0x04) == 0x04;
566     guint8 cc_type = data[i * 3] & 0x03;
567
568     if (!started_ccp) {
569       if (cc_type == 0x00) {
570         if (!cc_valid)
571           continue;
572
573         if (field0_608)
574           continue;
575
576         field0_608 = TRUE;
577
578         if (data[i * 3 + 1] == 0x80 && data[i * 3 + 2] == 0x80)
579           continue;
580
581         field0_data[field0_len++] = data[i * 3];
582         field0_data[field0_len++] = data[i * 3 + 1];
583         field0_data[field0_len++] = data[i * 3 + 2];
584       } else if (cc_type == 0x01) {
585         if (!cc_valid)
586           continue;
587
588         if (field1_608)
589           continue;
590
591         field1_608 = TRUE;
592
593         if (data[i * 3 + 1] == 0x80 && data[i * 3 + 2] == 0x80)
594           continue;
595
596         field1_data[field1_len++] = data[i * 3];
597         field1_data[field1_len++] = data[i * 3 + 1];
598         field1_data[field1_len++] = data[i * 3 + 2];
599       }
600
601       continue;
602     }
603
604     if (cc_type & 0x10)
605       started_ccp = TRUE;
606
607     if (!cc_valid)
608       continue;
609
610     if (cc_type == 0x00 || cc_type == 0x01)
611       continue;
612
613     field0_data[field0_len++] = data[i * 3];
614     field0_data[field0_len++] = data[i * 3 + 1];
615     field0_data[field0_len++] = data[i * 3 + 2];
616   }
617
618   if (field0_len > 0) {
619     GstBuffer *buf = gst_buffer_new_allocate (NULL, field0_len, NULL);
620
621     gst_buffer_fill (buf, 0, field0_data, field0_len);
622     GST_BUFFER_PTS (buf) = pts;
623     GST_BUFFER_DURATION (buf) = duration;
624
625     queue_caption (self, buf, 0);
626   }
627
628   if (field1_len > 0) {
629     GstBuffer *buf = gst_buffer_new_allocate (NULL, field1_len, NULL);
630
631     gst_buffer_fill (buf, 0, field1_data, field1_len);
632     GST_BUFFER_PTS (buf) = pts;
633     GST_BUFFER_DURATION (buf) = duration;
634
635     queue_caption (self, buf, 1);
636   }
637 }
638
639 static void
640 schedule_cea608_raw (GstCCCombiner * self, guint8 * data, guint len,
641     GstBuffer * buffer)
642 {
643   if (len < 2) {
644     return;
645   }
646
647   if (data[0] != 0x80 || data[1] != 0x80) {
648     queue_caption (self, gst_buffer_ref (buffer), 0);
649   }
650 }
651
652
653 static void
654 schedule_caption (GstCCCombiner * self, GstBuffer * caption_buf,
655     const GstVideoTimeCode * tc)
656 {
657   GstMapInfo map;
658   GstClockTime pts, duration;
659
660   pts = GST_BUFFER_PTS (caption_buf);
661   duration = GST_BUFFER_DURATION (caption_buf);
662
663   gst_buffer_map (caption_buf, &map, GST_MAP_READ);
664
665   switch (self->caption_type) {
666     case GST_VIDEO_CAPTION_TYPE_CEA708_CDP:
667       schedule_cdp (self, tc, map.data, map.size, pts, duration);
668       break;
669     case GST_VIDEO_CAPTION_TYPE_CEA708_RAW:
670       schedule_cea708_raw (self, map.data, map.size, pts, duration);
671       break;
672     case GST_VIDEO_CAPTION_TYPE_CEA608_S334_1A:
673       schedule_cea608_s334_1a (self, map.data, map.size, pts, duration);
674       break;
675     case GST_VIDEO_CAPTION_TYPE_CEA608_RAW:
676       schedule_cea608_raw (self, map.data, map.size, caption_buf);
677       break;
678     default:
679       break;
680   }
681
682   gst_buffer_unmap (caption_buf, &map);
683 }
684
685 static void
686 dequeue_caption (GstCCCombiner * self, const GstVideoTimeCode * tc, guint field,
687     gboolean drain)
688 {
689   CaptionQueueItem *scheduled;
690   CaptionData caption_data;
691
692   if ((scheduled = gst_queue_array_pop_head_struct (self->scheduled[field]))) {
693     caption_data.buffer = scheduled->buffer;
694     caption_data.caption_type = self->caption_type;
695     g_array_append_val (self->current_frame_captions, caption_data);
696   } else if (!drain) {
697     caption_data.caption_type = self->caption_type;
698     caption_data.buffer = make_padding (self, tc, field);
699     g_array_append_val (self->current_frame_captions, caption_data);
700   }
701 }
702
703 static GstFlowReturn
704 gst_cc_combiner_collect_captions (GstCCCombiner * self, gboolean timeout)
705 {
706   GstAggregatorPad *src_pad =
707       GST_AGGREGATOR_PAD (GST_AGGREGATOR_SRC_PAD (self));
708   GstAggregatorPad *caption_pad;
709   GstBuffer *video_buf;
710   GstVideoTimeCodeMeta *tc_meta;
711   GstVideoTimeCode *tc = NULL;
712   gboolean caption_pad_is_eos = FALSE;
713
714   g_assert (self->current_video_buffer != NULL);
715
716   caption_pad =
717       GST_AGGREGATOR_PAD_CAST (gst_element_get_static_pad (GST_ELEMENT_CAST
718           (self), "caption"));
719   /* No caption pad, forward buffer directly */
720   if (!caption_pad) {
721     GST_LOG_OBJECT (self, "No caption pad, passing through video");
722     video_buf = self->current_video_buffer;
723     gst_aggregator_selected_samples (GST_AGGREGATOR_CAST (self),
724         GST_BUFFER_PTS (video_buf), GST_BUFFER_DTS (video_buf),
725         GST_BUFFER_DURATION (video_buf), NULL);
726     self->current_video_buffer = NULL;
727     goto done;
728   }
729
730   tc_meta = gst_buffer_get_video_time_code_meta (self->current_video_buffer);
731
732   if (tc_meta) {
733     tc = &tc_meta->tc;
734   }
735
736   GST_LOG_OBJECT (self, "Trying to collect captions for queued video buffer");
737   do {
738     GstBuffer *caption_buf;
739     GstClockTime caption_time;
740     CaptionData caption_data;
741
742     caption_buf = gst_aggregator_pad_peek_buffer (caption_pad);
743     if (!caption_buf) {
744       if (gst_aggregator_pad_is_eos (caption_pad)) {
745         GST_DEBUG_OBJECT (self, "Caption pad is EOS, we're done");
746
747         caption_pad_is_eos = TRUE;
748         break;
749       } else if (!timeout) {
750         GST_DEBUG_OBJECT (self, "Need more caption data");
751         gst_object_unref (caption_pad);
752         return GST_FLOW_NEED_DATA;
753       } else {
754         GST_DEBUG_OBJECT (self, "No caption data on timeout");
755         break;
756       }
757     }
758
759     caption_time = GST_BUFFER_PTS (caption_buf);
760     if (!GST_CLOCK_TIME_IS_VALID (caption_time)) {
761       GST_ERROR_OBJECT (self, "Caption buffer without PTS");
762
763       gst_buffer_unref (caption_buf);
764       gst_object_unref (caption_pad);
765
766       return GST_FLOW_ERROR;
767     }
768
769     caption_time =
770         gst_segment_to_running_time (&caption_pad->segment, GST_FORMAT_TIME,
771         caption_time);
772
773     if (!GST_CLOCK_TIME_IS_VALID (caption_time)) {
774       GST_DEBUG_OBJECT (self, "Caption buffer outside segment, dropping");
775
776       gst_aggregator_pad_drop_buffer (caption_pad);
777       gst_buffer_unref (caption_buf);
778
779       continue;
780     }
781
782     if (gst_buffer_get_size (caption_buf) == 0 &&
783         GST_BUFFER_FLAG_IS_SET (caption_buf, GST_BUFFER_FLAG_GAP)) {
784       /* This is a gap, we can go ahead. We only consume it once its end point
785        * is behind the current video running time. Important to note that
786        * we can't deal with gaps with no duration (-1)
787        */
788       if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (caption_buf))) {
789         GST_ERROR_OBJECT (self, "GAP buffer without a duration");
790
791         gst_buffer_unref (caption_buf);
792         gst_object_unref (caption_pad);
793
794         return GST_FLOW_ERROR;
795       }
796
797       gst_buffer_unref (caption_buf);
798
799       if (caption_time + GST_BUFFER_DURATION (caption_buf) <
800           self->current_video_running_time_end) {
801         gst_aggregator_pad_drop_buffer (caption_pad);
802         continue;
803       } else {
804         break;
805       }
806     }
807
808     /* Collected all caption buffers for this video buffer */
809     if (caption_time >= self->current_video_running_time_end) {
810       gst_buffer_unref (caption_buf);
811       break;
812     } else if (!self->schedule) {
813       if (GST_CLOCK_TIME_IS_VALID (self->previous_video_running_time_end)) {
814         if (caption_time < self->previous_video_running_time_end) {
815           GST_WARNING_OBJECT (self,
816               "Caption buffer before end of last video frame, dropping");
817
818           gst_aggregator_pad_drop_buffer (caption_pad);
819           gst_buffer_unref (caption_buf);
820           continue;
821         }
822       } else if (caption_time < self->current_video_running_time) {
823         GST_WARNING_OBJECT (self,
824             "Caption buffer before current video frame, dropping");
825
826         gst_aggregator_pad_drop_buffer (caption_pad);
827         gst_buffer_unref (caption_buf);
828         continue;
829       }
830     }
831
832     /* This caption buffer has to be collected */
833     GST_LOG_OBJECT (self,
834         "Collecting caption buffer %p %" GST_TIME_FORMAT " for video buffer %p",
835         caption_buf, GST_TIME_ARGS (caption_time), self->current_video_buffer);
836
837     caption_data.caption_type = self->caption_type;
838
839     gst_aggregator_pad_drop_buffer (caption_pad);
840
841     if (!self->schedule) {
842       caption_data.buffer = caption_buf;
843       g_array_append_val (self->current_frame_captions, caption_data);
844     } else {
845       schedule_caption (self, caption_buf, tc);
846       gst_buffer_unref (caption_buf);
847     }
848   } while (TRUE);
849
850   /* FIXME pad correctly according to fps */
851   if (self->schedule) {
852     g_assert (self->current_frame_captions->len == 0);
853
854     switch (self->caption_type) {
855       case GST_VIDEO_CAPTION_TYPE_CEA708_CDP:
856       {
857         /* Only relevant in alternate and mixed mode, no need to look at the caps */
858         if (GST_BUFFER_FLAG_IS_SET (self->current_video_buffer,
859                 GST_VIDEO_BUFFER_FLAG_INTERLACED)) {
860           if (!GST_VIDEO_BUFFER_IS_BOTTOM_FIELD (self->current_video_buffer)) {
861             dequeue_caption (self, tc, 0, caption_pad_is_eos);
862           }
863         } else {
864           dequeue_caption (self, tc, 0, caption_pad_is_eos);
865         }
866         break;
867       }
868       case GST_VIDEO_CAPTION_TYPE_CEA708_RAW:
869       case GST_VIDEO_CAPTION_TYPE_CEA608_S334_1A:
870       {
871         if (self->progressive) {
872           dequeue_caption (self, tc, 0, caption_pad_is_eos);
873         } else if (GST_BUFFER_FLAG_IS_SET (self->current_video_buffer,
874                 GST_VIDEO_BUFFER_FLAG_INTERLACED) &&
875             GST_BUFFER_FLAG_IS_SET (self->current_video_buffer,
876                 GST_VIDEO_BUFFER_FLAG_ONEFIELD)) {
877           if (GST_VIDEO_BUFFER_IS_TOP_FIELD (self->current_video_buffer)) {
878             dequeue_caption (self, tc, 0, caption_pad_is_eos);
879           } else {
880             dequeue_caption (self, tc, 1, caption_pad_is_eos);
881           }
882         } else {
883           dequeue_caption (self, tc, 0, caption_pad_is_eos);
884           dequeue_caption (self, tc, 1, caption_pad_is_eos);
885         }
886         break;
887       }
888       case GST_VIDEO_CAPTION_TYPE_CEA608_RAW:
889       {
890         if (self->progressive) {
891           dequeue_caption (self, tc, 0, caption_pad_is_eos);
892         } else if (GST_BUFFER_FLAG_IS_SET (self->current_video_buffer,
893                 GST_VIDEO_BUFFER_FLAG_INTERLACED)) {
894           if (!GST_VIDEO_BUFFER_IS_BOTTOM_FIELD (self->current_video_buffer)) {
895             dequeue_caption (self, tc, 0, caption_pad_is_eos);
896           }
897         } else {
898           dequeue_caption (self, tc, 0, caption_pad_is_eos);
899         }
900         break;
901       }
902       default:
903         break;
904     }
905   }
906
907   gst_aggregator_selected_samples (GST_AGGREGATOR_CAST (self),
908       GST_BUFFER_PTS (self->current_video_buffer),
909       GST_BUFFER_DTS (self->current_video_buffer),
910       GST_BUFFER_DURATION (self->current_video_buffer), NULL);
911
912   GST_LOG_OBJECT (self, "Attaching %u captions to buffer %p",
913       self->current_frame_captions->len, self->current_video_buffer);
914
915   if (self->current_frame_captions->len > 0) {
916     guint i;
917
918     video_buf = gst_buffer_make_writable (self->current_video_buffer);
919     self->current_video_buffer = NULL;
920
921     for (i = 0; i < self->current_frame_captions->len; i++) {
922       CaptionData *caption_data =
923           &g_array_index (self->current_frame_captions, CaptionData, i);
924       GstMapInfo map;
925
926       gst_buffer_map (caption_data->buffer, &map, GST_MAP_READ);
927       gst_buffer_add_video_caption_meta (video_buf, caption_data->caption_type,
928           map.data, map.size);
929       gst_buffer_unmap (caption_data->buffer, &map);
930     }
931
932     g_array_set_size (self->current_frame_captions, 0);
933   } else {
934     GST_LOG_OBJECT (self, "No captions for buffer %p",
935         self->current_video_buffer);
936     video_buf = self->current_video_buffer;
937     self->current_video_buffer = NULL;
938   }
939
940   gst_object_unref (caption_pad);
941
942 done:
943   src_pad->segment.position =
944       GST_BUFFER_PTS (video_buf) + GST_BUFFER_DURATION (video_buf);
945
946   return gst_aggregator_finish_buffer (GST_AGGREGATOR_CAST (self), video_buf);
947 }
948
949 static GstFlowReturn
950 gst_cc_combiner_aggregate (GstAggregator * aggregator, gboolean timeout)
951 {
952   GstCCCombiner *self = GST_CCCOMBINER (aggregator);
953   GstFlowReturn flow_ret = GST_FLOW_OK;
954
955   /* If we have no current video buffer, queue one. If we have one but
956    * its end running time is not known yet, try to determine it from the
957    * next video buffer */
958   if (!self->current_video_buffer
959       || !GST_CLOCK_TIME_IS_VALID (self->current_video_running_time_end)) {
960     GstAggregatorPad *video_pad;
961     GstClockTime video_start;
962     GstBuffer *video_buf;
963
964     video_pad =
965         GST_AGGREGATOR_PAD_CAST (gst_element_get_static_pad (GST_ELEMENT_CAST
966             (aggregator), "sink"));
967     video_buf = gst_aggregator_pad_peek_buffer (video_pad);
968     if (!video_buf) {
969       if (gst_aggregator_pad_is_eos (video_pad)) {
970         GST_DEBUG_OBJECT (aggregator, "Video pad is EOS, we're done");
971
972         /* Assume that this buffer ends where it started +50ms (25fps) and handle it */
973         if (self->current_video_buffer) {
974           self->current_video_running_time_end =
975               self->current_video_running_time + 50 * GST_MSECOND;
976           flow_ret = gst_cc_combiner_collect_captions (self, timeout);
977         }
978
979         /* If we collected all captions for the remaining video frame we're
980          * done, otherwise get called another time and go directly into the
981          * outer branch for finishing the current video frame */
982         if (flow_ret == GST_FLOW_NEED_DATA)
983           flow_ret = GST_FLOW_OK;
984         else
985           flow_ret = GST_FLOW_EOS;
986       } else {
987         flow_ret = GST_FLOW_OK;
988       }
989
990       gst_object_unref (video_pad);
991       return flow_ret;
992     }
993
994     video_start = GST_BUFFER_PTS (video_buf);
995     if (!GST_CLOCK_TIME_IS_VALID (video_start)) {
996       gst_buffer_unref (video_buf);
997       gst_object_unref (video_pad);
998
999       GST_ERROR_OBJECT (aggregator, "Video buffer without PTS");
1000
1001       return GST_FLOW_ERROR;
1002     }
1003
1004     video_start =
1005         gst_segment_to_running_time (&video_pad->segment, GST_FORMAT_TIME,
1006         video_start);
1007     if (!GST_CLOCK_TIME_IS_VALID (video_start)) {
1008       GST_DEBUG_OBJECT (aggregator, "Buffer outside segment, dropping");
1009       gst_aggregator_pad_drop_buffer (video_pad);
1010       gst_buffer_unref (video_buf);
1011       gst_object_unref (video_pad);
1012       return GST_FLOW_OK;
1013     }
1014
1015     if (self->current_video_buffer) {
1016       /* If we already have a video buffer just update the current end running
1017        * time accordingly. That's what was missing and why we got here */
1018       self->current_video_running_time_end = video_start;
1019       gst_buffer_unref (video_buf);
1020       GST_LOG_OBJECT (self,
1021           "Determined end timestamp for video buffer: %p %" GST_TIME_FORMAT
1022           " - %" GST_TIME_FORMAT, self->current_video_buffer,
1023           GST_TIME_ARGS (self->current_video_running_time),
1024           GST_TIME_ARGS (self->current_video_running_time_end));
1025     } else {
1026       /* Otherwise we had no buffer queued currently. Let's do that now
1027        * so that we can collect captions for it */
1028       gst_buffer_replace (&self->current_video_buffer, video_buf);
1029       self->current_video_running_time = video_start;
1030       gst_aggregator_pad_drop_buffer (video_pad);
1031       gst_buffer_unref (video_buf);
1032
1033       if (GST_BUFFER_DURATION_IS_VALID (video_buf)) {
1034         GstClockTime end_time =
1035             GST_BUFFER_PTS (video_buf) + GST_BUFFER_DURATION (video_buf);
1036         if (video_pad->segment.stop != -1 && end_time > video_pad->segment.stop)
1037           end_time = video_pad->segment.stop;
1038         self->current_video_running_time_end =
1039             gst_segment_to_running_time (&video_pad->segment, GST_FORMAT_TIME,
1040             end_time);
1041       } else if (self->video_fps_n != 0 && self->video_fps_d != 0) {
1042         GstClockTime end_time =
1043             GST_BUFFER_PTS (video_buf) + gst_util_uint64_scale_int (GST_SECOND,
1044             self->video_fps_d, self->video_fps_n);
1045         if (video_pad->segment.stop != -1 && end_time > video_pad->segment.stop)
1046           end_time = video_pad->segment.stop;
1047         self->current_video_running_time_end =
1048             gst_segment_to_running_time (&video_pad->segment, GST_FORMAT_TIME,
1049             end_time);
1050       } else {
1051         self->current_video_running_time_end = GST_CLOCK_TIME_NONE;
1052       }
1053
1054       GST_LOG_OBJECT (self,
1055           "Queued new video buffer: %p %" GST_TIME_FORMAT " - %"
1056           GST_TIME_FORMAT, self->current_video_buffer,
1057           GST_TIME_ARGS (self->current_video_running_time),
1058           GST_TIME_ARGS (self->current_video_running_time_end));
1059     }
1060
1061     gst_object_unref (video_pad);
1062   }
1063
1064   /* At this point we have a video buffer queued and can start collecting
1065    * caption buffers for it */
1066   g_assert (self->current_video_buffer != NULL);
1067   g_assert (GST_CLOCK_TIME_IS_VALID (self->current_video_running_time));
1068   g_assert (GST_CLOCK_TIME_IS_VALID (self->current_video_running_time_end));
1069
1070   flow_ret = gst_cc_combiner_collect_captions (self, timeout);
1071
1072   /* Only if we collected all captions we replace the current video buffer
1073    * with NULL and continue with the next one on the next call */
1074   if (flow_ret == GST_FLOW_NEED_DATA) {
1075     flow_ret = GST_FLOW_OK;
1076   } else {
1077     gst_buffer_replace (&self->current_video_buffer, NULL);
1078     self->previous_video_running_time_end =
1079         self->current_video_running_time_end;
1080     self->current_video_running_time = self->current_video_running_time_end =
1081         GST_CLOCK_TIME_NONE;
1082   }
1083
1084   return flow_ret;
1085 }
1086
1087 static gboolean
1088 gst_cc_combiner_sink_event (GstAggregator * aggregator,
1089     GstAggregatorPad * agg_pad, GstEvent * event)
1090 {
1091   GstCCCombiner *self = GST_CCCOMBINER (aggregator);
1092
1093   switch (GST_EVENT_TYPE (event)) {
1094     case GST_EVENT_CAPS:{
1095       GstCaps *caps;
1096       GstStructure *s;
1097
1098       gst_event_parse_caps (event, &caps);
1099       s = gst_caps_get_structure (caps, 0);
1100
1101       if (strcmp (GST_OBJECT_NAME (agg_pad), "caption") == 0) {
1102         GstVideoCaptionType caption_type =
1103             gst_video_caption_type_from_caps (caps);
1104
1105         if (self->caption_type != GST_VIDEO_CAPTION_TYPE_UNKNOWN &&
1106             caption_type != self->caption_type) {
1107           GST_ERROR_OBJECT (self, "Changing caption type is not allowed");
1108
1109           GST_ELEMENT_ERROR (self, CORE, NEGOTIATION, (NULL),
1110               ("Changing caption type is not allowed"));
1111
1112           return FALSE;
1113         }
1114         self->caption_type = caption_type;
1115       } else {
1116         gint fps_n, fps_d;
1117         const gchar *interlace_mode;
1118
1119         fps_n = fps_d = 0;
1120
1121         gst_structure_get_fraction (s, "framerate", &fps_n, &fps_d);
1122
1123         interlace_mode = gst_structure_get_string (s, "interlace-mode");
1124
1125         self->progressive = !interlace_mode
1126             || !g_strcmp0 (interlace_mode, "progressive");
1127
1128         if (fps_n != self->video_fps_n || fps_d != self->video_fps_d) {
1129           GstClockTime latency;
1130
1131           latency = gst_util_uint64_scale (GST_SECOND, fps_d, fps_n);
1132           gst_aggregator_set_latency (aggregator, latency, latency);
1133         }
1134
1135         self->video_fps_n = fps_n;
1136         self->video_fps_d = fps_d;
1137
1138         self->cdp_fps_entry = cdp_fps_entry_from_fps (fps_n, fps_d);
1139
1140         gst_aggregator_set_src_caps (aggregator, caps);
1141       }
1142
1143       break;
1144     }
1145     case GST_EVENT_SEGMENT:{
1146       if (strcmp (GST_OBJECT_NAME (agg_pad), "sink") == 0) {
1147         const GstSegment *segment;
1148
1149         gst_event_parse_segment (event, &segment);
1150         gst_aggregator_update_segment (aggregator, segment);
1151       }
1152       break;
1153     }
1154     default:
1155       break;
1156   }
1157
1158   return GST_AGGREGATOR_CLASS (parent_class)->sink_event (aggregator, agg_pad,
1159       event);
1160 }
1161
1162 static gboolean
1163 gst_cc_combiner_stop (GstAggregator * aggregator)
1164 {
1165   GstCCCombiner *self = GST_CCCOMBINER (aggregator);
1166
1167   self->video_fps_n = self->video_fps_d = 0;
1168   self->current_video_running_time = self->current_video_running_time_end =
1169       self->previous_video_running_time_end = GST_CLOCK_TIME_NONE;
1170   gst_buffer_replace (&self->current_video_buffer, NULL);
1171
1172   g_array_set_size (self->current_frame_captions, 0);
1173   self->caption_type = GST_VIDEO_CAPTION_TYPE_UNKNOWN;
1174
1175   gst_queue_array_clear (self->scheduled[0]);
1176   gst_queue_array_clear (self->scheduled[1]);
1177   self->cdp_fps_entry = &null_fps_entry;
1178
1179   return TRUE;
1180 }
1181
1182 static GstFlowReturn
1183 gst_cc_combiner_flush (GstAggregator * aggregator)
1184 {
1185   GstCCCombiner *self = GST_CCCOMBINER (aggregator);
1186   GstAggregatorPad *src_pad =
1187       GST_AGGREGATOR_PAD (GST_AGGREGATOR_SRC_PAD (aggregator));
1188
1189   self->current_video_running_time = self->current_video_running_time_end =
1190       self->previous_video_running_time_end = GST_CLOCK_TIME_NONE;
1191   gst_buffer_replace (&self->current_video_buffer, NULL);
1192
1193   g_array_set_size (self->current_frame_captions, 0);
1194
1195   src_pad->segment.position = GST_CLOCK_TIME_NONE;
1196
1197   self->cdp_hdr_sequence_cntr = 0;
1198   gst_queue_array_clear (self->scheduled[0]);
1199   gst_queue_array_clear (self->scheduled[1]);
1200
1201   return GST_FLOW_OK;
1202 }
1203
1204 static GstAggregatorPad *
1205 gst_cc_combiner_create_new_pad (GstAggregator * aggregator,
1206     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1207 {
1208   GstCCCombiner *self = GST_CCCOMBINER (aggregator);
1209   GstAggregatorPad *agg_pad;
1210
1211   if (templ->direction != GST_PAD_SINK)
1212     return NULL;
1213
1214   if (templ->presence != GST_PAD_REQUEST)
1215     return NULL;
1216
1217   if (strcmp (templ->name_template, "caption") != 0)
1218     return NULL;
1219
1220   GST_OBJECT_LOCK (self);
1221   agg_pad = g_object_new (GST_TYPE_AGGREGATOR_PAD,
1222       "name", "caption", "direction", GST_PAD_SINK, "template", templ, NULL);
1223   self->caption_type = GST_VIDEO_CAPTION_TYPE_UNKNOWN;
1224   GST_OBJECT_UNLOCK (self);
1225
1226   return agg_pad;
1227 }
1228
1229 static gboolean
1230 gst_cc_combiner_src_query (GstAggregator * aggregator, GstQuery * query)
1231 {
1232   GstPad *video_sinkpad =
1233       gst_element_get_static_pad (GST_ELEMENT_CAST (aggregator), "sink");
1234   gboolean ret;
1235
1236   switch (GST_QUERY_TYPE (query)) {
1237     case GST_QUERY_POSITION:
1238     case GST_QUERY_DURATION:
1239     case GST_QUERY_URI:
1240     case GST_QUERY_CAPS:
1241     case GST_QUERY_ALLOCATION:
1242       ret = gst_pad_peer_query (video_sinkpad, query);
1243       break;
1244     case GST_QUERY_ACCEPT_CAPS:{
1245       GstCaps *caps;
1246       GstCaps *templ = gst_static_pad_template_get_caps (&srctemplate);
1247
1248       gst_query_parse_accept_caps (query, &caps);
1249       gst_query_set_accept_caps_result (query, gst_caps_is_subset (caps,
1250               templ));
1251       gst_caps_unref (templ);
1252       ret = TRUE;
1253       break;
1254     }
1255     default:
1256       ret = GST_AGGREGATOR_CLASS (parent_class)->src_query (aggregator, query);
1257       break;
1258   }
1259
1260   gst_object_unref (video_sinkpad);
1261
1262   return ret;
1263 }
1264
1265 static gboolean
1266 gst_cc_combiner_sink_query (GstAggregator * aggregator,
1267     GstAggregatorPad * aggpad, GstQuery * query)
1268 {
1269   GstPad *video_sinkpad =
1270       gst_element_get_static_pad (GST_ELEMENT_CAST (aggregator), "sink");
1271   GstPad *srcpad = GST_AGGREGATOR_SRC_PAD (aggregator);
1272
1273   gboolean ret;
1274
1275   switch (GST_QUERY_TYPE (query)) {
1276     case GST_QUERY_POSITION:
1277     case GST_QUERY_DURATION:
1278     case GST_QUERY_URI:
1279     case GST_QUERY_ALLOCATION:
1280       if (GST_PAD_CAST (aggpad) == video_sinkpad) {
1281         ret = gst_pad_peer_query (srcpad, query);
1282       } else {
1283         ret =
1284             GST_AGGREGATOR_CLASS (parent_class)->sink_query (aggregator,
1285             aggpad, query);
1286       }
1287       break;
1288     case GST_QUERY_CAPS:
1289       if (GST_PAD_CAST (aggpad) == video_sinkpad) {
1290         ret = gst_pad_peer_query (srcpad, query);
1291       } else {
1292         GstCaps *filter;
1293         GstCaps *templ = gst_static_pad_template_get_caps (&captiontemplate);
1294
1295         gst_query_parse_caps (query, &filter);
1296
1297         if (filter) {
1298           GstCaps *caps =
1299               gst_caps_intersect_full (filter, templ, GST_CAPS_INTERSECT_FIRST);
1300           gst_query_set_caps_result (query, caps);
1301           gst_caps_unref (caps);
1302         } else {
1303           gst_query_set_caps_result (query, templ);
1304         }
1305         gst_caps_unref (templ);
1306         ret = TRUE;
1307       }
1308       break;
1309     case GST_QUERY_ACCEPT_CAPS:
1310       if (GST_PAD_CAST (aggpad) == video_sinkpad) {
1311         ret = gst_pad_peer_query (srcpad, query);
1312       } else {
1313         GstCaps *caps;
1314         GstCaps *templ = gst_static_pad_template_get_caps (&captiontemplate);
1315
1316         gst_query_parse_accept_caps (query, &caps);
1317         gst_query_set_accept_caps_result (query, gst_caps_is_subset (caps,
1318                 templ));
1319         gst_caps_unref (templ);
1320         ret = TRUE;
1321       }
1322       break;
1323     default:
1324       ret = GST_AGGREGATOR_CLASS (parent_class)->sink_query (aggregator,
1325           aggpad, query);
1326       break;
1327   }
1328
1329   gst_object_unref (video_sinkpad);
1330
1331   return ret;
1332 }
1333
1334 static GstSample *
1335 gst_cc_combiner_peek_next_sample (GstAggregator * agg,
1336     GstAggregatorPad * aggpad)
1337 {
1338   GstAggregatorPad *caption_pad, *video_pad;
1339   GstCCCombiner *self = GST_CCCOMBINER (agg);
1340   GstSample *res = NULL;
1341
1342   caption_pad =
1343       GST_AGGREGATOR_PAD_CAST (gst_element_get_static_pad (GST_ELEMENT_CAST
1344           (self), "caption"));
1345   video_pad =
1346       GST_AGGREGATOR_PAD_CAST (gst_element_get_static_pad (GST_ELEMENT_CAST
1347           (self), "sink"));
1348
1349   if (aggpad == caption_pad) {
1350     if (self->current_frame_captions->len > 0) {
1351       GstCaps *caps = gst_pad_get_current_caps (GST_PAD (aggpad));
1352       GstBufferList *buflist = gst_buffer_list_new ();
1353       guint i;
1354
1355       for (i = 0; i < self->current_frame_captions->len; i++) {
1356         CaptionData *caption_data =
1357             &g_array_index (self->current_frame_captions, CaptionData, i);
1358         gst_buffer_list_add (buflist, gst_buffer_ref (caption_data->buffer));
1359       }
1360
1361       res = gst_sample_new (NULL, caps, &aggpad->segment, NULL);
1362       gst_caps_unref (caps);
1363
1364       gst_sample_set_buffer_list (res, buflist);
1365       gst_buffer_list_unref (buflist);
1366     }
1367   } else if (aggpad == video_pad) {
1368     if (self->current_video_buffer) {
1369       GstCaps *caps = gst_pad_get_current_caps (GST_PAD (aggpad));
1370       res = gst_sample_new (self->current_video_buffer,
1371           caps, &aggpad->segment, NULL);
1372       gst_caps_unref (caps);
1373     }
1374   }
1375
1376   if (caption_pad)
1377     gst_object_unref (caption_pad);
1378
1379   if (video_pad)
1380     gst_object_unref (video_pad);
1381
1382   return res;
1383 }
1384
1385 static GstStateChangeReturn
1386 gst_cc_combiner_change_state (GstElement * element, GstStateChange transition)
1387 {
1388   GstCCCombiner *self = GST_CCCOMBINER (element);
1389
1390   switch (transition) {
1391     case GST_STATE_CHANGE_READY_TO_PAUSED:
1392       self->schedule = self->prop_schedule;
1393       self->max_scheduled = self->prop_max_scheduled;
1394       break;
1395     default:
1396       break;
1397   }
1398
1399   return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1400 }
1401
1402 static void
1403 gst_cc_combiner_set_property (GObject * object, guint prop_id,
1404     const GValue * value, GParamSpec * pspec)
1405 {
1406   GstCCCombiner *self = GST_CCCOMBINER (object);
1407
1408   switch (prop_id) {
1409     case PROP_SCHEDULE:
1410       self->prop_schedule = g_value_get_boolean (value);
1411       break;
1412     case PROP_MAX_SCHEDULED:
1413       self->prop_max_scheduled = g_value_get_uint (value);
1414       break;
1415     default:
1416       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1417       break;
1418   }
1419 }
1420
1421 static void
1422 gst_cc_combiner_get_property (GObject * object, guint prop_id, GValue * value,
1423     GParamSpec * pspec)
1424 {
1425   GstCCCombiner *self = GST_CCCOMBINER (object);
1426
1427   switch (prop_id) {
1428     case PROP_SCHEDULE:
1429       g_value_set_boolean (value, self->prop_schedule);
1430       break;
1431     case PROP_MAX_SCHEDULED:
1432       g_value_set_uint (value, self->prop_max_scheduled);
1433       break;
1434     default:
1435       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1436       break;
1437   }
1438 }
1439
1440 static void
1441 gst_cc_combiner_class_init (GstCCCombinerClass * klass)
1442 {
1443   GObjectClass *gobject_class;
1444   GstElementClass *gstelement_class;
1445   GstAggregatorClass *aggregator_class;
1446
1447   gobject_class = (GObjectClass *) klass;
1448   gstelement_class = (GstElementClass *) klass;
1449   aggregator_class = (GstAggregatorClass *) klass;
1450
1451   gobject_class->finalize = gst_cc_combiner_finalize;
1452   gobject_class->set_property = gst_cc_combiner_set_property;
1453   gobject_class->get_property = gst_cc_combiner_get_property;
1454
1455   gst_element_class_set_static_metadata (gstelement_class,
1456       "Closed Caption Combiner",
1457       "Filter",
1458       "Combines GstVideoCaptionMeta with video input stream",
1459       "Sebastian Dröge <sebastian@centricular.com>");
1460
1461   /**
1462    * GstCCCombiner:schedule:
1463    *
1464    * Controls whether caption buffers should be smoothly scheduled
1465    * in order to have exactly one per output video buffer.
1466    *
1467    * This can involve rewriting input captions, for example when the
1468    * input is CDP sequence counters are rewritten, time codes are dropped
1469    * and potentially re-injected if the input video frame had a time code
1470    * meta.
1471    *
1472    * Caption buffers may also get split up in order to assign captions to
1473    * the correct field when the input is interlaced.
1474    *
1475    * This can also imply that the input will drift from synchronization,
1476    * when there isn't enough padding in the input stream to catch up. In
1477    * that case the element will start dropping old caption buffers once
1478    * the number of buffers in its internal queue reaches
1479    * #GstCCCombiner:max-scheduled.
1480    *
1481    * When this is set to %FALSE, the behaviour of this element is essentially
1482    * that of a funnel.
1483    *
1484    * Since: 1.20
1485    */
1486   g_object_class_install_property (G_OBJECT_CLASS (klass),
1487       PROP_SCHEDULE, g_param_spec_boolean ("schedule",
1488           "Schedule",
1489           "Schedule caption buffers so that exactly one is output per video frame",
1490           FALSE,
1491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1492           GST_PARAM_MUTABLE_READY));
1493
1494   /**
1495    * GstCCCombiner:max-scheduled:
1496    *
1497    * Controls the number of scheduled buffers after which the element
1498    * will start dropping old buffers from its internal queues. See
1499    * #GstCCCombiner:schedule.
1500    *
1501    * Since: 1.20
1502    */
1503   g_object_class_install_property (G_OBJECT_CLASS (klass),
1504       PROP_MAX_SCHEDULED, g_param_spec_uint ("max-scheduled",
1505           "Max Scheduled",
1506           "Maximum number of buffers to queue for scheduling", 0, G_MAXUINT,
1507           DEFAULT_MAX_SCHEDULED,
1508           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1509           GST_PARAM_MUTABLE_READY));
1510
1511   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
1512       &sinktemplate, GST_TYPE_AGGREGATOR_PAD);
1513   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
1514       &srctemplate, GST_TYPE_AGGREGATOR_PAD);
1515   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
1516       &captiontemplate, GST_TYPE_AGGREGATOR_PAD);
1517
1518   gstelement_class->change_state =
1519       GST_DEBUG_FUNCPTR (gst_cc_combiner_change_state);
1520
1521   aggregator_class->aggregate = gst_cc_combiner_aggregate;
1522   aggregator_class->stop = gst_cc_combiner_stop;
1523   aggregator_class->flush = gst_cc_combiner_flush;
1524   aggregator_class->create_new_pad = gst_cc_combiner_create_new_pad;
1525   aggregator_class->sink_event = gst_cc_combiner_sink_event;
1526   aggregator_class->negotiate = NULL;
1527   aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
1528   aggregator_class->src_query = gst_cc_combiner_src_query;
1529   aggregator_class->sink_query = gst_cc_combiner_sink_query;
1530   aggregator_class->peek_next_sample = gst_cc_combiner_peek_next_sample;
1531
1532   GST_DEBUG_CATEGORY_INIT (gst_cc_combiner_debug, "cccombiner",
1533       0, "Closed Caption combiner");
1534 }
1535
1536 static void
1537 gst_cc_combiner_init (GstCCCombiner * self)
1538 {
1539   GstPadTemplate *templ;
1540   GstAggregatorPad *agg_pad;
1541
1542   templ = gst_static_pad_template_get (&sinktemplate);
1543   agg_pad = g_object_new (GST_TYPE_AGGREGATOR_PAD,
1544       "name", "sink", "direction", GST_PAD_SINK, "template", templ, NULL);
1545   gst_object_unref (templ);
1546   gst_element_add_pad (GST_ELEMENT_CAST (self), GST_PAD_CAST (agg_pad));
1547
1548   self->current_frame_captions =
1549       g_array_new (FALSE, FALSE, sizeof (CaptionData));
1550   g_array_set_clear_func (self->current_frame_captions,
1551       (GDestroyNotify) caption_data_clear);
1552
1553   self->current_video_running_time = self->current_video_running_time_end =
1554       self->previous_video_running_time_end = GST_CLOCK_TIME_NONE;
1555
1556   self->caption_type = GST_VIDEO_CAPTION_TYPE_UNKNOWN;
1557
1558   self->prop_schedule = DEFAULT_SCHEDULE;
1559   self->prop_max_scheduled = DEFAULT_MAX_SCHEDULED;
1560   self->scheduled[0] =
1561       gst_queue_array_new_for_struct (sizeof (CaptionQueueItem), 0);
1562   self->scheduled[1] =
1563       gst_queue_array_new_for_struct (sizeof (CaptionQueueItem), 0);
1564   gst_queue_array_set_clear_func (self->scheduled[0],
1565       (GDestroyNotify) clear_scheduled);
1566   gst_queue_array_set_clear_func (self->scheduled[1],
1567       (GDestroyNotify) clear_scheduled);
1568   self->cdp_hdr_sequence_cntr = 0;
1569   self->cdp_fps_entry = &null_fps_entry;
1570 }