GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
#define GST_CAT_DEFAULT gst_base_ts_mux_debug
-#define COLLECT_DATA_PAD(collect_data) (((GstCollectData *)(collect_data))->pad)
+/* GstBaseTsMuxPad */
+
+G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
+
+/* Internals */
+
+static void
+gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
+{
+ pad->dts = GST_CLOCK_STIME_NONE;
+ pad->prog_id = -1;
+
+ if (pad->free_func)
+ pad->free_func (pad->prepare_data);
+ pad->prepare_data = NULL;
+ pad->prepare_func = NULL;
+ pad->free_func = NULL;
+
+ if (pad->codec_data)
+ gst_buffer_replace (&pad->codec_data, NULL);
+
+ /* reference owned elsewhere */
+ pad->stream = NULL;
+ pad->prog = NULL;
+
+ if (pad->language) {
+ g_free (pad->language);
+ pad->language = NULL;
+ }
+}
+
+/* GstAggregatorPad implementation */
+
+static GstFlowReturn
+gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
+{
+ gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (agg_pad));
+
+ return GST_FLOW_OK;
+}
+
+/* GObject implementation */
+
+static void
+gst_base_ts_mux_pad_dispose (GObject * obj)
+{
+ GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
+
+ gst_base_ts_mux_pad_reset (ts_pad);
+}
+
+static void
+gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
+
+ gobject_class->dispose = gst_base_ts_mux_pad_dispose;
+ gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
+}
+
+static void
+gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
+{
+}
+
+/* GstBaseTsMux */
enum
{
GstBuffer *buffer;
} StreamData;
-G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR);
/* Internals */
#define parent_class gst_base_ts_mux_parent_class
static void
-gst_base_ts_mux_pad_reset (GstBaseTsPadData * pad_data)
-{
- pad_data->dts = GST_CLOCK_STIME_NONE;
- pad_data->prog_id = -1;
-
- if (pad_data->free_func)
- pad_data->free_func (pad_data->prepare_data);
- pad_data->prepare_data = NULL;
- pad_data->prepare_func = NULL;
- pad_data->free_func = NULL;
-
- if (pad_data->codec_data)
- gst_buffer_replace (&pad_data->codec_data, NULL);
-
- /* reference owned elsewhere */
- pad_data->stream = NULL;
- pad_data->prog = NULL;
-
- if (pad_data->language) {
- g_free (pad_data->language);
- pad_data->language = NULL;
- }
-
-}
-
-static void
gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
{
GstBuffer *buf;
GValue value = { 0 };
GstCaps *caps;
- caps = gst_caps_make_writable (gst_pad_get_current_caps (mux->srcpad));
+ caps =
+ gst_caps_make_writable (gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD
+ (mux)));
structure = gst_caps_get_structure (caps, 0);
g_value_init (&array, GST_TYPE_ARRAY);
}
gst_structure_set_value (structure, "streamheader", &array);
- gst_pad_set_caps (mux->srcpad, caps);
+ gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
g_value_unset (&array);
gst_caps_unref (caps);
}
static void
-gst_base_ts_mux_prepare_srcpad (GstBaseTsMux * mux)
-{
- GstSegment seg;
- /* we are not going to seek */
- GstEvent *new_seg;
- gchar s_id[32];
- GstCaps *caps = gst_caps_new_simple ("video/mpegts",
- "systemstream", G_TYPE_BOOLEAN, TRUE,
- "packetsize", G_TYPE_INT, mux->packet_size,
- NULL);
-
- /* stream-start (FIXME: create id based on input ids) */
- g_snprintf (s_id, sizeof (s_id), "basetsmux-%08x", g_random_int ());
- gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id));
-
- gst_segment_init (&seg, GST_FORMAT_TIME);
- new_seg = gst_event_new_segment (&seg);
-
- /* Set caps on src pad from our template and push new segment */
- gst_pad_set_caps (mux->srcpad, caps);
- gst_caps_unref (caps);
-
- if (!gst_pad_push_event (mux->srcpad, new_seg)) {
- GST_WARNING_OBJECT (mux, "New segment event was not handled downstream");
- }
-}
-
-static void
gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
{
GstBuffer *buf;
- GSList *walk;
GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
mux->first = TRUE;
gst_event_replace (&mux->force_key_unit_event, NULL);
gst_buffer_replace (&mux->out_buffer, NULL);
- if (mux->collect) {
- GST_COLLECT_PADS_STREAM_LOCK (mux->collect);
- for (walk = mux->collect->data; walk != NULL; walk = g_slist_next (walk))
- gst_base_ts_mux_pad_reset ((GstBaseTsPadData *) walk->data);
- GST_COLLECT_PADS_STREAM_UNLOCK (mux->collect);
- }
-
if (alloc) {
g_assert (klass->create_ts_mux);
}
static GstFlowReturn
-gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
+gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
{
GstFlowReturn ret = GST_FLOW_ERROR;
GstCaps *caps;
guint8 color_spec = 0;
j2k_private_data *private_data = NULL;
- pad = ts_data->collect.pad;
+ pad = GST_PAD (ts_pad);
caps = gst_pad_get_current_caps (pad);
if (caps == NULL)
goto not_negotiated;
GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %"
- GST_PTR_FORMAT, ts_data->pid, caps);
+ GST_PTR_FORMAT, ts_pad->pid, caps);
s = gst_caps_get_structure (caps, 0);
GST_DEBUG_OBJECT (pad,
"we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
gst_buffer_get_size (codec_data));
- ts_data->codec_data = gst_buffer_ref (codec_data);
- ts_data->prepare_func = gst_base_ts_mux_prepare_aac;
+ ts_pad->codec_data = gst_buffer_ref (codec_data);
+ ts_pad->prepare_func = gst_base_ts_mux_prepare_aac;
} else {
- ts_data->codec_data = NULL;
+ ts_pad->codec_data = NULL;
}
break;
}
} else if (strcmp (mt, "application/x-teletext") == 0) {
st = TSMUX_ST_PS_TELETEXT;
/* needs a particularly sized layout */
- ts_data->prepare_func = gst_base_ts_mux_prepare_teletext;
+ ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
} else if (strcmp (mt, "audio/x-opus") == 0) {
guint8 channels, mapping_family, stream_count, coupled_count;
guint8 channel_mapping[256];
}
st = TSMUX_ST_PS_OPUS;
- ts_data->prepare_func = gst_base_ts_mux_prepare_opus;
+ ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
} else if (strcmp (mt, "meta/x-klv") == 0) {
st = TSMUX_ST_PS_KLV;
} else if (strcmp (mt, "image/x-jpc") == 0) {
goto not_negotiated;
}
st = TSMUX_ST_VIDEO_JP2K;
- ts_data->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
- ts_data->prepare_data = private_data;
- ts_data->free_func = gst_base_ts_mux_free_jpeg2000;
+ ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
+ ts_pad->prepare_data = private_data;
+ ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
} else {
GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
if (klass->handle_media_type) {
- st = klass->handle_media_type (mux, mt, ts_data);
+ st = klass->handle_media_type (mux, mt, ts_pad);
}
}
if (st != TSMUX_ST_RESERVED) {
- ts_data->stream = tsmux_create_stream (mux->tsmux, st, ts_data->pid,
- ts_data->language);
+ ts_pad->stream = tsmux_create_stream (mux->tsmux, st, ts_pad->pid,
+ ts_pad->language);
} else {
GST_DEBUG_OBJECT (pad, "Failed to determine stream type");
}
- if (ts_data->stream != NULL) {
+ if (ts_pad->stream != NULL) {
const char *interlace_mode = gst_structure_get_string (s, "interlace-mode");
- gst_structure_get_int (s, "rate", &ts_data->stream->audio_sampling);
- gst_structure_get_int (s, "channels", &ts_data->stream->audio_channels);
- gst_structure_get_int (s, "bitrate", &ts_data->stream->audio_bitrate);
+ gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
+ gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
+ gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
/* frame rate */
- gst_structure_get_fraction (s, "framerate", &ts_data->stream->num,
- &ts_data->stream->den);
+ gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
+ &ts_pad->stream->den);
/* Interlace mode */
- ts_data->stream->interlace_mode = FALSE;
+ ts_pad->stream->interlace_mode = FALSE;
if (interlace_mode) {
- ts_data->stream->interlace_mode =
+ ts_pad->stream->interlace_mode =
g_str_equal (interlace_mode, "interleaved");
}
/* Width and Height */
- gst_structure_get_int (s, "width", &ts_data->stream->horizontal_size);
- gst_structure_get_int (s, "height", &ts_data->stream->vertical_size);
+ gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
+ gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
- ts_data->stream->color_spec = color_spec;
- ts_data->stream->max_bitrate = max_rate;
- ts_data->stream->profile_and_level = profile | main_level;
+ ts_pad->stream->color_spec = color_spec;
+ ts_pad->stream->max_bitrate = max_rate;
+ ts_pad->stream->profile_and_level = profile | main_level;
- ts_data->stream->opus_channel_config_code = opus_channel_config_code;
+ ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
- tsmux_stream_set_buffer_release_func (ts_data->stream, release_buffer_cb);
- tsmux_program_add_stream (ts_data->prog, ts_data->stream);
+ tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
+ tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
ret = GST_FLOW_OK;
}
gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
{
GstFlowReturn ret = GST_FLOW_OK;
- GSList *walk = mux->collect->data;
+ GList *walk = GST_ELEMENT (mux)->sinkpads;
/* Create the streams */
while (walk) {
- GstCollectData *c_data = (GstCollectData *) walk->data;
- GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data;
+ GstPad *pad = GST_PAD (walk->data);
+ GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
gchar *name = NULL;
gchar *pcr_name;
- walk = g_slist_next (walk);
+ walk = g_list_next (walk);
- if (ts_data->prog_id == -1) {
- name = GST_PAD_NAME (c_data->pad);
+ if (ts_pad->prog_id == -1) {
+ name = GST_PAD_NAME (pad);
if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
name)) {
gint idx;
idx, name, DEFAULT_PROG_ID);
idx = DEFAULT_PROG_ID;
}
- ts_data->prog_id = idx;
+ ts_pad->prog_id = idx;
} else {
- ts_data->prog_id = DEFAULT_PROG_ID;
+ ts_pad->prog_id = DEFAULT_PROG_ID;
}
}
- ts_data->prog =
+ ts_pad->prog =
(TsMuxProgram *) g_hash_table_lookup (mux->programs,
- GINT_TO_POINTER (ts_data->prog_id));
- if (ts_data->prog == NULL) {
- ts_data->prog = tsmux_program_new (mux->tsmux, ts_data->prog_id);
- if (ts_data->prog == NULL)
+ GINT_TO_POINTER (ts_pad->prog_id));
+ if (ts_pad->prog == NULL) {
+ ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
+ if (ts_pad->prog == NULL)
goto no_program;
- tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
+ tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
g_hash_table_insert (mux->programs,
- GINT_TO_POINTER (ts_data->prog_id), ts_data->prog);
+ GINT_TO_POINTER (ts_pad->prog_id), ts_pad->prog);
/* Take the first stream of the program for the PCR */
- GST_DEBUG_OBJECT (COLLECT_DATA_PAD (ts_data),
+ GST_DEBUG_OBJECT (ts_pad,
"Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
- ts_data->pid, ts_data->prog_id);
+ ts_pad->pid, ts_pad->prog_id);
- tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream);
+ tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
}
- if (ts_data->stream == NULL) {
- ret = gst_base_ts_mux_create_stream (mux, ts_data);
+ if (ts_pad->stream == NULL) {
+ ret = gst_base_ts_mux_create_stream (mux, ts_pad);
if (ret != GST_FLOW_OK)
goto no_stream;
}
/* Check for user-specified PCR PID */
- pcr_name = g_strdup_printf ("PCR_%d", ts_data->prog->pgm_number);
+ pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
const gchar *sink_name =
gst_structure_get_string (mux->prog_map, pcr_name);
if (!g_strcmp0 (name, sink_name)) {
GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
- "program (prog_id = %d)", ts_data->pid, ts_data->prog->pgm_number);
- tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream);
+ "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
+ tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
}
}
g_free (pcr_name);
}
static GstFlowReturn
+finish_buffer_list (GstBaseTsMux * mux, GstBufferList * list)
+{
+ guint i;
+ guint l = gst_buffer_list_length (list);
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ for (i = 0; i < l; i++) {
+ GstBuffer *buf = gst_buffer_list_get (list, i);
+
+ ret =
+ gst_aggregator_finish_buffer (GST_AGGREGATOR (mux),
+ gst_buffer_ref (buf));
+
+ if (ret != GST_FLOW_OK)
+ break;
+ }
+
+ gst_buffer_list_unref (list);
+
+ return ret;
+}
+
+static GstFlowReturn
gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
{
GstBufferList *buffer_list;
/* no alignment, just push all available data */
if (align == 0) {
buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
- return gst_pad_push_list (mux->srcpad, buffer_list);
+ return finish_buffer_list (mux, buffer_list);
}
align *= packet_size;
gst_buffer_list_add (buffer_list, buf);
}
- return gst_pad_push_list (mux->srcpad, buffer_list);
+ return finish_buffer_list (mux, buffer_list);
}
static GstFlowReturn
klass->allocate_packet (mux, buf);
}
-/* GstElement implementation */
-
-static gboolean
-gst_base_ts_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
- GstEvent * event, gpointer user_data)
+static GstFlowReturn
+gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
+ GstAggregatorPad * agg_pad, GstBuffer * buf)
{
- GstBaseTsMux *mux = GST_BASE_TS_MUX (user_data);
- gboolean res = FALSE;
- gboolean forward = TRUE;
- GstBaseTsPadData *pad_data = (GstBaseTsPadData *) data;
-
-#ifndef GST_DISABLE_GST_DEBUG
- GstPad *pad;
-
- pad = data->pad;
-#endif
-
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_CUSTOM_DOWNSTREAM:
- {
- GstClockTime timestamp, stream_time, running_time;
- gboolean all_headers;
- guint count;
-
- if (!gst_video_event_is_force_key_unit (event))
- goto out;
-
- res = TRUE;
- forward = FALSE;
-
- gst_video_event_parse_downstream_force_key_unit (event,
- ×tamp, &stream_time, &running_time, &all_headers, &count);
- GST_INFO_OBJECT (pad, "have downstream force-key-unit event, "
- "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
- gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
-
- if (mux->force_key_unit_event != NULL) {
- GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
- "as an upstream force key unit is already queued");
- goto out;
- }
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
+ TsMuxProgram *prog;
+ gint64 pts = GST_CLOCK_STIME_NONE;
+ gint64 dts = GST_CLOCK_STIME_NONE;
+ gboolean delta = TRUE, header = FALSE;
+ StreamData *stream_data;
- if (!all_headers)
- goto out;
+ GST_DEBUG_OBJECT (mux, "Pads collected");
- mux->pending_key_unit_ts = running_time;
- gst_event_replace (&mux->force_key_unit_event, event);
- break;
+ if (G_UNLIKELY (mux->first)) {
+ ret = gst_base_ts_mux_create_streams (mux);
+ if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+ if (buf)
+ gst_buffer_unref (buf);
+ return ret;
}
- case GST_EVENT_TAG:{
- GstTagList *list;
- gchar *lang = NULL;
- GST_DEBUG_OBJECT (mux, "received tag event");
- gst_event_parse_tag (event, &list);
+ mux->first = FALSE;
+ }
- /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
- if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
- const gchar *lang_code;
+ prog = best->prog;
+ if (prog == NULL)
+ goto no_program;
- lang_code = gst_tag_get_language_code_iso_639_2B (lang);
- if (lang_code) {
- GST_DEBUG_OBJECT (pad, "Setting language to '%s'", lang_code);
+ g_assert (buf != NULL);
- g_free (pad_data->language);
- pad_data->language = g_strdup (lang_code);
- } else {
- GST_WARNING_OBJECT (pad, "Did not get language code for '%s'", lang);
- }
- g_free (lang);
- }
+ if (best->prepare_func) {
+ GstBuffer *tmp;
- /* handled this, don't want collectpads to forward it downstream */
- res = TRUE;
- forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
- break;
- }
- case GST_EVENT_STREAM_START:{
- GstStreamFlags flags;
+ tmp = best->prepare_func (buf, best, mux);
+ g_assert (tmp);
+ gst_buffer_unref (buf);
+ buf = tmp;
+ }
- gst_event_parse_stream_flags (event, &flags);
+ if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
+ GstEvent *event;
- /* Don't wait for data on sparse inputs like metadata streams */
- if ((flags & GST_STREAM_FLAG_SPARSE)) {
- GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
- gst_collect_pads_set_waiting (pads, data, FALSE);
- GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
- }
- break;
- }
- case GST_EVENT_FLUSH_STOP:{
+ event = check_pending_key_unit_event (mux->force_key_unit_event,
+ &agg_pad->segment, GST_BUFFER_PTS (buf),
+ GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
+ if (event) {
+ GstClockTime running_time;
+ guint count;
GList *cur;
- /* Send initial segments again after a flush-stop, and also resend the
- * header sections */
- mux->first = TRUE;
+ mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
+ gst_event_replace (&mux->force_key_unit_event, NULL);
+
+ gst_video_event_parse_downstream_force_key_unit (event,
+ NULL, NULL, &running_time, NULL, &count);
+
+ GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
+ "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
+ GST_TIME_ARGS (running_time), count);
+ gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
/* output PAT, SI tables */
tsmux_resend_pat (mux->tsmux);
tsmux_resend_pmt (program);
}
- break;
}
- default:
- break;
}
-out:
- if (!forward)
- gst_event_unref (event);
- else
- res = gst_collect_pads_event_default (pads, data, event, FALSE);
+ if (G_UNLIKELY (prog->pcr_stream == NULL)) {
+ /* Take the first data stream for the PCR */
+ GST_DEBUG_OBJECT (best,
+ "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
+ best->pid, best->prog_id);
- return res;
-}
+ /* Set the chosen PCR stream */
+ tsmux_program_set_pcr_stream (prog, best->stream);
+ }
-static gboolean
-gst_base_ts_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
- GstBaseTsMux *mux = GST_BASE_TS_MUX (parent);
- gboolean res = TRUE, forward = TRUE;
+ GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_CUSTOM_UPSTREAM:
- {
- GstIterator *iter;
- GstIteratorResult iter_ret;
- GstPad *sinkpad;
- GValue sinkpad_value = G_VALUE_INIT;
- GstClockTime running_time;
- gboolean all_headers, done, res = FALSE;
- guint count;
+ if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
+ pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
+ GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
+ G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
+ }
- if (!gst_video_event_is_force_key_unit (event))
- break;
+ if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
+ dts = GSTTIME_TO_MPEGTIME (best->dts);
+ GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
+ G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
+ }
- forward = FALSE;
+ /* should not have a DTS without PTS */
+ if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
+ GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
+ pts = dts;
+ }
- gst_video_event_parse_upstream_force_key_unit (event,
- &running_time, &all_headers, &count);
+ if (best->stream->is_video_stream) {
+ delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
+ header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
+ }
- GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
- "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
- gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
- all_headers, count);
+ if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
+ GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
- if (!all_headers)
- break;
+ gst_buffer_unref (buf);
+ return GST_FLOW_OK;
+ }
- mux->pending_key_unit_ts = running_time;
- gst_event_replace (&mux->force_key_unit_event, event);
+ GST_DEBUG_OBJECT (mux, "delta: %d", delta);
- iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
- done = FALSE;
- while (!done) {
- gboolean tmp;
+ stream_data = stream_data_new (buf);
+ tsmux_stream_add_data (best->stream, stream_data->map_info.data,
+ stream_data->map_info.size, stream_data, pts, dts, !delta);
- iter_ret = gst_iterator_next (iter, &sinkpad_value);
- sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
+ /* outgoing ts follows ts of PCR program stream */
+ if (prog->pcr_stream == best->stream) {
+ /* prefer DTS if present for PCR as it should be monotone */
+ mux->last_ts =
+ GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
+ GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
+ }
- switch (iter_ret) {
- case GST_ITERATOR_DONE:
- done = TRUE;
- break;
- case GST_ITERATOR_OK:
- GST_INFO_OBJECT (pad, "forwarding");
- tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
- GST_INFO_OBJECT (mux, "result %d", tmp);
- /* succeed if at least one pad succeeds */
- res |= tmp;
- break;
- case GST_ITERATOR_ERROR:
- done = TRUE;
- break;
- case GST_ITERATOR_RESYNC:
- break;
- }
- g_value_reset (&sinkpad_value);
- }
- g_value_unset (&sinkpad_value);
- gst_iterator_free (iter);
- break;
+ mux->is_delta = delta;
+ mux->is_header = header;
+ while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
+ if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
+ /* Failed writing data for some reason. Set appropriate error */
+ GST_DEBUG_OBJECT (mux, "Failed to write data packet");
+ GST_ELEMENT_ERROR (mux, STREAM, MUX,
+ ("Failed writing output data to stream %04x", best->stream->id),
+ (NULL));
+ goto write_fail;
}
- default:
- break;
}
+ /* flush packet cache */
+ return gst_base_ts_mux_push_packets (mux, FALSE);
- if (forward)
- res = gst_pad_event_default (pad, parent, event);
- else
- gst_event_unref (event);
-
- return res;
+ /* ERRORS */
+write_fail:
+ {
+ return mux->last_flow_ret;
+ }
+no_program:
+ {
+ if (buf)
+ gst_buffer_unref (buf);
+ GST_ELEMENT_ERROR (mux, STREAM, MUX,
+ ("Stream on pad %" GST_PTR_FORMAT
+ " is not associated with any program", best), (NULL));
+ return GST_FLOW_ERROR;
+ }
}
+/* GstElement implementation */
+
static GstPad *
gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
const gchar * name, const GstCaps * caps)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
gint pid = -1;
- gchar *pad_name = NULL;
GstPad *pad = NULL;
- GstBaseTsPadData *pad_data = NULL;
if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
if (tsmux_find_stream (mux->tsmux, pid))
pid = tsmux_get_new_pid (mux->tsmux);
}
- pad_name = g_strdup_printf ("sink_%d", pid);
- pad = gst_pad_new_from_template (templ, pad_name);
- g_free (pad_name);
-
- pad_data = (GstBaseTsPadData *)
- gst_collect_pads_add_pad (mux->collect, pad, sizeof (GstBaseTsPadData),
- (GstCollectDataDestroyNotify) (gst_base_ts_mux_pad_reset), TRUE);
- if (pad_data == NULL)
- goto pad_failure;
+ pad = (GstPad *)
+ GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
+ templ, name, caps);
- gst_base_ts_mux_pad_reset (pad_data);
- pad_data->pid = pid;
-
- if (G_UNLIKELY (!gst_element_add_pad (element, pad)))
- goto could_not_add;
+ gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
+ GST_BASE_TS_MUX_PAD (pad)->pid = pid;
return pad;
(NULL));
return NULL;
}
-could_not_add:
- {
- GST_ELEMENT_ERROR (element, STREAM, FAILED,
- ("Internal data stream error."), ("Could not add pad to element"));
- gst_collect_pads_remove_pad (mux->collect, pad);
- gst_object_unref (pad);
- return NULL;
- }
-pad_failure:
- {
- GST_ELEMENT_ERROR (element, STREAM, FAILED,
- ("Internal data stream error."), ("Could not add pad to collectpads"));
- gst_object_unref (pad);
- return NULL;
- }
}
-static void
-gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
+static gboolean
+gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
{
+ GstMpegtsSection *section;
GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
- GST_DEBUG_OBJECT (mux, "Pad %" GST_PTR_FORMAT " being released", pad);
+ section = gst_event_parse_mpegts_section (event);
+ gst_event_unref (event);
+
+ if (section) {
+ GST_DEBUG ("Received event with mpegts section");
+
+ /* TODO: Check that the section type is supported */
+ tsmux_add_mpegts_si_section (mux->tsmux, section);
- if (mux->collect) {
- gst_collect_pads_remove_pad (mux->collect, pad);
+ return TRUE;
}
- /* chain up */
- gst_element_remove_pad (element, pad);
+ return FALSE;
}
-static GstStateChangeReturn
-gst_base_ts_mux_change_state (GstElement * element, GstStateChange transition)
+/* GstAggregator implementation */
+
+static gboolean
+gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
+ GstEvent * event)
{
- GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
- GstStateChangeReturn ret;
+ GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
+ GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+ GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
+ gboolean res = FALSE;
+ gboolean forward = TRUE;
- switch (transition) {
- case GST_STATE_CHANGE_NULL_TO_READY:
- break;
- case GST_STATE_CHANGE_READY_TO_PAUSED:
- gst_collect_pads_start (mux->collect);
- break;
- case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- break;
- case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_collect_pads_stop (mux->collect);
- break;
- case GST_STATE_CHANGE_READY_TO_NULL:
- break;
- default:
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CUSTOM_DOWNSTREAM:
+ {
+ GstClockTime timestamp, stream_time, running_time;
+ gboolean all_headers;
+ guint count;
+
+ if (!gst_video_event_is_force_key_unit (event))
+ goto out;
+
+ res = TRUE;
+ forward = FALSE;
+
+ gst_video_event_parse_downstream_force_key_unit (event,
+ ×tamp, &stream_time, &running_time, &all_headers, &count);
+ GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
+ "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
+ gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
+
+ if (mux->force_key_unit_event != NULL) {
+ GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
+ "as an upstream force key unit is already queued");
+ goto out;
+ }
+
+ if (!all_headers)
+ goto out;
+
+ mux->pending_key_unit_ts = running_time;
+ gst_event_replace (&mux->force_key_unit_event, event);
break;
- }
+ }
+ case GST_EVENT_TAG:{
+ GstTagList *list;
+ gchar *lang = NULL;
+
+ GST_DEBUG_OBJECT (mux, "received tag event");
+ gst_event_parse_tag (event, &list);
+
+ /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
+ if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
+ const gchar *lang_code;
+
+ lang_code = gst_tag_get_language_code_iso_639_2B (lang);
+ if (lang_code) {
+ GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ g_free (ts_pad->language);
+ ts_pad->language = g_strdup (lang_code);
+ } else {
+ GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
+ lang);
+ }
+ g_free (lang);
+ }
- switch (transition) {
- case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ /* handled this, don't want collectpads to forward it downstream */
+ res = TRUE;
+ forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
break;
- case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_base_ts_mux_reset (mux, TRUE);
+ }
+ case GST_EVENT_STREAM_START:{
+ GstStreamFlags flags;
+
+ gst_event_parse_stream_flags (event, &flags);
+
+ /* Don't wait for data on sparse inputs like metadata streams */
+ /*
+ if ((flags & GST_STREAM_FLAG_SPARSE)) {
+ GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
+ gst_collect_pads_set_waiting (pads, data, FALSE);
+ GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
+ }
+ */
break;
- case GST_STATE_CHANGE_READY_TO_NULL:
+ }
+ case GST_EVENT_FLUSH_STOP:{
+ GList *cur;
+
+ /* Send initial segments again after a flush-stop, and also resend the
+ * header sections */
+ mux->first = TRUE;
+
+ /* output PAT, SI tables */
+ tsmux_resend_pat (mux->tsmux);
+ tsmux_resend_si (mux->tsmux);
+
+ /* output PMT for each program */
+ for (cur = mux->tsmux->programs; cur; cur = cur->next) {
+ TsMuxProgram *program = (TsMuxProgram *) cur->data;
+
+ tsmux_resend_pmt (program);
+ }
break;
+ }
default:
break;
}
- return ret;
+out:
+ if (!forward)
+ gst_event_unref (event);
+ else
+ res = agg_class->sink_event (agg, agg_pad, event);
+
+ return res;
}
static gboolean
-gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
+gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
{
- GstMpegtsSection *section;
- GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
+ GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
+ GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+ gboolean res = TRUE, forward = TRUE;
- section = gst_event_parse_mpegts_section (event);
- gst_event_unref (event);
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CUSTOM_UPSTREAM:
+ {
+ GstIterator *iter;
+ GstIteratorResult iter_ret;
+ GstPad *sinkpad;
+ GValue sinkpad_value = G_VALUE_INIT;
+ GstClockTime running_time;
+ gboolean all_headers, done, res = FALSE;
+ guint count;
- if (section) {
- GST_DEBUG ("Received event with mpegts section");
+ if (!gst_video_event_is_force_key_unit (event))
+ break;
- /* TODO: Check that the section type is supported */
- tsmux_add_mpegts_si_section (mux->tsmux, section);
+ forward = FALSE;
- return TRUE;
+ gst_video_event_parse_upstream_force_key_unit (event,
+ &running_time, &all_headers, &count);
+
+ GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
+ "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
+ gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
+ all_headers, count);
+
+ if (!all_headers)
+ break;
+
+ mux->pending_key_unit_ts = running_time;
+ gst_event_replace (&mux->force_key_unit_event, event);
+
+ iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
+ done = FALSE;
+ while (!done) {
+ gboolean tmp;
+
+ iter_ret = gst_iterator_next (iter, &sinkpad_value);
+ sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
+
+ switch (iter_ret) {
+ case GST_ITERATOR_DONE:
+ done = TRUE;
+ break;
+ case GST_ITERATOR_OK:
+ GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
+ tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
+ GST_INFO_OBJECT (mux, "result %d", tmp);
+ /* succeed if at least one pad succeeds */
+ res |= tmp;
+ break;
+ case GST_ITERATOR_ERROR:
+ done = TRUE;
+ break;
+ case GST_ITERATOR_RESYNC:
+ break;
+ }
+ g_value_reset (&sinkpad_value);
+ }
+ g_value_unset (&sinkpad_value);
+ gst_iterator_free (iter);
+ break;
+ }
+ default:
+ break;
}
- return FALSE;
-}
+ if (forward)
+ res = agg_class->src_event (agg, event);
+ else
+ gst_event_unref (event);
-/* CollectPads implementation */
+ return res;
+}
-static GstFlowReturn
-gst_base_ts_mux_clip_inc_running_time (GstCollectPads * pads,
- GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
- gpointer user_data)
+static GstBuffer *
+gst_base_ts_mux_clip (GstAggregator * agg,
+ GstAggregatorPad * agg_pad, GstBuffer * buf)
{
- GstBaseTsPadData *pad_data = (GstBaseTsPadData *) cdata;
+ GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
GstClockTime time;
+ GstBuffer *ret;
- *outbuf = buf;
+ ret = buf;
/* PTS */
time = GST_BUFFER_PTS (buf);
/* invalid left alone and passed */
if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
- time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
+ time =
+ gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
- GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
+ GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
gst_buffer_unref (buf);
- *outbuf = NULL;
+ ret = NULL;
goto beach;
} else {
- GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
+ GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " -> %"
GST_TIME_FORMAT " running time",
GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
- buf = *outbuf = gst_buffer_make_writable (buf);
- GST_BUFFER_PTS (*outbuf) = time;
+ buf = ret = gst_buffer_make_writable (buf);
+ GST_BUFFER_PTS (ret) = time;
}
}
gint sign;
gint64 dts;
- sign = gst_segment_to_running_time_full (&cdata->segment, GST_FORMAT_TIME,
+ sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
time, &time);
if (sign > 0)
else
dts = -((gint64) time);
- GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
+ GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
GST_STIME_ARGS (dts));
- if (GST_CLOCK_STIME_IS_VALID (pad_data->dts) && dts < pad_data->dts) {
+ if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
/* Ignore DTS going backward */
- GST_WARNING_OBJECT (cdata->pad, "ignoring DTS going backward");
- dts = pad_data->dts;
+ GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
+ dts = pad->dts;
}
- *outbuf = gst_buffer_make_writable (buf);
+ ret = gst_buffer_make_writable (buf);
if (sign > 0)
- GST_BUFFER_DTS (*outbuf) = time;
+ GST_BUFFER_DTS (ret) = time;
else
- GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+ GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
- pad_data->dts = dts;
+ pad->dts = dts;
} else {
- pad_data->dts = GST_CLOCK_STIME_NONE;
+ pad->dts = GST_CLOCK_STIME_NONE;
}
beach:
- return GST_FLOW_OK;
+ return ret;
}
static GstFlowReturn
-gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data,
- GstBuffer * buf, GstBaseTsMux * mux)
+gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
+ GstCaps ** ret)
{
- GstFlowReturn ret = GST_FLOW_OK;
- GstBaseTsPadData *best = (GstBaseTsPadData *) data;
- TsMuxProgram *prog;
- gint64 pts = GST_CLOCK_STIME_NONE;
- gint64 dts = GST_CLOCK_STIME_NONE;
- gboolean delta = TRUE, header = FALSE;
- StreamData *stream_data;
-
- GST_DEBUG_OBJECT (mux, "Pads collected");
-
- if (G_UNLIKELY (mux->first)) {
- ret = gst_base_ts_mux_create_streams (mux);
- if (G_UNLIKELY (ret != GST_FLOW_OK)) {
- if (buf)
- gst_buffer_unref (buf);
- return ret;
- }
-
- gst_base_ts_mux_prepare_srcpad (mux);
-
- mux->first = FALSE;
- }
-
- if (G_UNLIKELY (best == NULL)) {
- GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
- /* EOS */
- GST_INFO_OBJECT (mux, "EOS");
- /* drain some possibly cached data */
- if (klass->drain)
- klass->drain (mux);
- gst_base_ts_mux_push_packets (mux, TRUE);
- gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
-
- if (buf)
- gst_buffer_unref (buf);
-
- return GST_FLOW_OK;
- }
-
- prog = best->prog;
- if (prog == NULL)
- goto no_program;
+ GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+ GstStructure *s;
- g_assert (buf != NULL);
+ *ret = gst_caps_copy (caps);
+ s = gst_caps_get_structure (*ret, 0);
+ gst_structure_set (s, "packetsize", G_TYPE_INT, mux->packet_size, NULL);
- if (best->prepare_func) {
- GstBuffer *tmp;
+ return GST_FLOW_OK;
+}
- tmp = best->prepare_func (buf, best, mux);
- g_assert (tmp);
- gst_buffer_unref (buf);
- buf = tmp;
+static GstBaseTsMuxPad *
+gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
+{
+ GstBaseTsMuxPad *pad, *best = NULL;
+ GList *l;
+ GstBuffer *buffer;
+ GstClockTime best_ts = GST_CLOCK_TIME_NONE;
+
+ for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
+ pad = GST_BASE_TS_MUX_PAD (l->data);
+ buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (pad));
+ if (!buffer)
+ continue;
+ if (best_ts == GST_CLOCK_TIME_NONE) {
+ best = pad;
+ best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
+ } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
+ GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
+ if (t < best_ts) {
+ best = pad;
+ best_ts = t;
+ }
+ }
+ gst_buffer_unref (buffer);
}
+ GST_DEBUG_OBJECT (aggregator,
+ "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
+ GST_TIME_ARGS (best_ts), best);
- if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
- GstEvent *event;
-
- event = check_pending_key_unit_event (mux->force_key_unit_event,
- &best->collect.segment, GST_BUFFER_PTS (buf),
- GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
- if (event) {
- GstClockTime running_time;
- guint count;
- GList *cur;
-
- mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
- gst_event_replace (&mux->force_key_unit_event, NULL);
-
- gst_video_event_parse_downstream_force_key_unit (event,
- NULL, NULL, &running_time, NULL, &count);
-
- GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
- "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
- GST_TIME_ARGS (running_time), count);
- gst_pad_push_event (mux->srcpad, event);
+ return best;
+}
- /* output PAT, SI tables */
- tsmux_resend_pat (mux->tsmux);
- tsmux_resend_si (mux->tsmux);
+static gboolean
+gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
+{
+ GList *l;
- /* output PMT for each program */
- for (cur = mux->tsmux->programs; cur; cur = cur->next) {
- TsMuxProgram *program = (TsMuxProgram *) cur->data;
+ for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+ GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
- tsmux_resend_pmt (program);
- }
- }
+ if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
+ return FALSE;
}
+ return TRUE;
+}
- if (G_UNLIKELY (prog->pcr_stream == NULL)) {
- /* Take the first data stream for the PCR */
- GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
- "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
- best->pid, best->prog_id);
-
- /* Set the chosen PCR stream */
- tsmux_program_set_pcr_stream (prog, best->stream);
- }
- GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
- "Chose stream for output (PID: 0x%04x)", best->pid);
+static GstFlowReturn
+gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
+{
+ GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
- if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
- pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
- GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
- G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
- }
+ if (best) {
+ GstBuffer *buffer;
- if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
- dts = GSTTIME_TO_MPEGTIME (best->dts);
- GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
- G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
- }
+ buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
- /* should not have a DTS without PTS */
- if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
- GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
- pts = dts;
- }
+ ret =
+ gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
+ GST_AGGREGATOR_PAD (best), buffer);
- if (best->stream->is_video_stream) {
- delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
- header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
+ if (ret != GST_FLOW_OK)
+ goto done;
}
- if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
- GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
+ if (gst_base_ts_mux_are_all_pads_eos (mux)) {
+ GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
+ /* drain some possibly cached data */
+ if (klass->drain)
+ klass->drain (mux);
+ gst_base_ts_mux_push_packets (mux, TRUE);
- gst_buffer_unref (buf);
- return GST_FLOW_OK;
+ ret = GST_FLOW_EOS;
}
- GST_DEBUG_OBJECT (mux, "delta: %d", delta);
+done:
+ return ret;
+}
- stream_data = stream_data_new (buf);
- tsmux_stream_add_data (best->stream, stream_data->map_info.data,
- stream_data->map_info.size, stream_data, pts, dts, !delta);
+static gboolean
+gst_base_ts_mux_start (GstAggregator * agg)
+{
+ gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
- /* outgoing ts follows ts of PCR program stream */
- if (prog->pcr_stream == best->stream) {
- /* prefer DTS if present for PCR as it should be monotone */
- mux->last_ts =
- GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
- GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
- }
+ return TRUE;
+}
- mux->is_delta = delta;
- mux->is_header = header;
- while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
- if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
- /* Failed writing data for some reason. Set appropriate error */
- GST_DEBUG_OBJECT (mux, "Failed to write data packet");
- GST_ELEMENT_ERROR (mux, STREAM, MUX,
- ("Failed writing output data to stream %04x", best->stream->id),
- (NULL));
- goto write_fail;
- }
- }
- /* flush packet cache */
- return gst_base_ts_mux_push_packets (mux, FALSE);
+static gboolean
+gst_base_ts_mux_stop (GstAggregator * agg)
+{
+ gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), FALSE);
- /* ERRORS */
-write_fail:
- {
- return mux->last_flow_ret;
- }
-no_program:
- {
- if (buf)
- gst_buffer_unref (buf);
- GST_ELEMENT_ERROR (mux, STREAM, MUX,
- ("Stream on pad %" GST_PTR_FORMAT
- " is not associated with any program", COLLECT_DATA_PAD (best)),
- (NULL));
- return GST_FLOW_ERROR;
- }
+ return TRUE;
}
/* GObject implementation */
g_object_unref (mux->out_adapter);
mux->out_adapter = NULL;
}
- if (mux->collect) {
- gst_object_unref (mux->collect);
- mux->collect = NULL;
- }
if (mux->prog_map) {
gst_structure_free (mux->prog_map);
mux->prog_map = NULL;
const GValue * value, GParamSpec * pspec)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
- GSList *walk;
+ GList *walk;
switch (prop_id) {
case PROP_PROG_MAP:
tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
break;
case PROP_PMT_INTERVAL:
- walk = mux->collect->data;
+ walk = GST_ELEMENT (object)->sinkpads;
mux->pmt_interval = g_value_get_uint (value);
while (walk) {
- GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data;
+ GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
- tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
- walk = g_slist_next (walk);
+ tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
+ walk = g_list_next (walk);
}
break;
case PROP_ALIGNMENT:
TsMux *tsmux = tsmux_new ();
tsmux_set_write_func (tsmux, new_packet_cb, mux);
tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
+ tsmux_set_bitrate (tsmux, mux->bitrate);
return tsmux;
}
gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
{
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
gobject_class->constructed = gst_base_ts_mux_constructed;
gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
- gstelement_class->release_pad = gst_base_ts_mux_release_pad;
- gstelement_class->change_state = gst_base_ts_mux_change_state;
gstelement_class->send_event = gst_base_ts_mux_send_event;
+ gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;
+ gstagg_class->aggregate = gst_base_ts_mux_aggregate;
+ gstagg_class->clip = gst_base_ts_mux_clip;
+ gstagg_class->sink_event = gst_base_ts_mux_sink_event;
+ gstagg_class->src_event = gst_base_ts_mux_src_event;
+ gstagg_class->start = gst_base_ts_mux_start;
+ gstagg_class->stop = gst_base_ts_mux_stop;
+
klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
klass->output_packet = gst_base_ts_mux_default_output_packet;
" to achieve multiplex-wide constant bitrate",
0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
(GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+ gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+ &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
}
static void
gst_base_ts_mux_init (GstBaseTsMux * mux)
{
- mux->srcpad =
- gst_pad_new_from_static_template (&gst_base_ts_mux_src_factory, "src");
- gst_pad_use_fixed_caps (mux->srcpad);
- gst_pad_set_event_function (mux->srcpad,
- GST_DEBUG_FUNCPTR (gst_base_ts_mux_src_event));
- gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad);
-
- mux->collect = gst_collect_pads_new ();
- gst_collect_pads_set_buffer_function (mux->collect,
- (GstCollectPadsBufferFunction)
- GST_DEBUG_FUNCPTR (gst_base_ts_mux_collected_buffer), mux);
-
- gst_collect_pads_set_event_function (mux->collect,
- (GstCollectPadsEventFunction)
- GST_DEBUG_FUNCPTR (gst_base_ts_mux_sink_event), mux);
- gst_collect_pads_set_clip_function (mux->collect, (GstCollectPadsClipFunction)
- GST_DEBUG_FUNCPTR (gst_base_ts_mux_clip_inc_running_time), mux);
-
mux->out_adapter = gst_adapter_new ();
/* properties */
sinkpad = gst_element_get_request_pad (element, sinkname);
fail_if (sinkpad == NULL, "Could not get sink pad from %s",
GST_ELEMENT_NAME (element));
- /* references are owned by: 1) us, 2) tsmux, 3) collect pads */
- ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+ /* references are owned by: 1) us, 2) tsmux */
+ ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
"Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
gst_object_unref (sinkpad); /* because we got it higher up */
- /* references are owned by: 1) tsmux, 2) collect pads */
- ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
+ /* references are owned by: 1) tsmux */
+ ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 1);
if (padname)
*padname = g_strdup (GST_PAD_NAME (sinkpad));
/* clean up floating src pad */
if (!(sinkpad = gst_element_get_static_pad (element, sinkname)))
sinkpad = gst_element_get_request_pad (element, sinkname);
- /* pad refs held by 1) tsmux 2) collectpads and 3) us (through _get) */
- ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+ /* pad refs held by 1) tsmux 2) us (through _get) */
+ ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
srcpad = gst_pad_get_peer (sinkpad);
gst_pad_unlink (srcpad, sinkpad);
GST_DEBUG ("src %p", srcpad);
/* after unlinking, pad refs still held by
- * 1) tsmux and 2) collectpads and 3) us (through _get) */
- ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+ * 1) tsmux and 2) us (through _get) */
+ ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
gst_object_unref (sinkpad);
/* one more ref is held by element itself */
gint i;
gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0;
gchar *padname;
+ GstQuery *drain;
mux = setup_tsmux (srctemplate, sinkname, &padname);
ts += 40 * GST_MSECOND;
}
+ drain = gst_query_new_drain ();
+ gst_pad_peer_query (mysrcpad, drain);
+ gst_query_unref (drain);
+
if (check_func)
check_func (buffers);
GST_END_TEST;
-
-typedef struct _TestData
-{
- GstEvent *sink_event;
- gint src_events;
-} TestData;
-
-typedef struct _ThreadData
-{
- GstPad *pad;
- GstBuffer *buffer;
- GstFlowReturn flow_return;
- GThread *thread;
-} ThreadData;
-
-static gboolean
-src_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
- TestData *data = (TestData *) gst_pad_get_element_private (pad);
-
- if (event->type == GST_EVENT_CUSTOM_UPSTREAM)
- data->src_events += 1;
-
- gst_event_unref (event);
- return TRUE;
-}
-
-static gboolean
-sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
- TestData *data = (TestData *) gst_pad_get_element_private (pad);
-
- if (event->type == GST_EVENT_CUSTOM_DOWNSTREAM)
- data->sink_event = event;
-
- gst_event_unref (event);
- return TRUE;
-}
-
-static void
-link_sinks (GstElement * mpegtsmux,
- GstPad ** src1, GstPad ** src2, GstPad ** src3, TestData * test_data)
-{
- GstPad *mux_sink1, *mux_sink2, *mux_sink3;
-
- /* link 3 sink pads, 2 video 1 audio */
- *src1 = gst_pad_new_from_static_template (&video_src_template, "src1");
- gst_pad_set_active (*src1, TRUE);
- gst_pad_set_element_private (*src1, test_data);
- gst_pad_set_event_function (*src1, src_event);
- mux_sink1 = gst_element_get_request_pad (mpegtsmux, "sink_1");
- fail_unless (gst_pad_link (*src1, mux_sink1) == GST_PAD_LINK_OK);
-
- *src2 = gst_pad_new_from_static_template (&video_src_template, "src2");
- gst_pad_set_active (*src2, TRUE);
- gst_pad_set_element_private (*src2, test_data);
- gst_pad_set_event_function (*src2, src_event);
- mux_sink2 = gst_element_get_request_pad (mpegtsmux, "sink_2");
- fail_unless (gst_pad_link (*src2, mux_sink2) == GST_PAD_LINK_OK);
-
- *src3 = gst_pad_new_from_static_template (&audio_src_template, "src3");
- gst_pad_set_active (*src3, TRUE);
- gst_pad_set_element_private (*src3, test_data);
- gst_pad_set_event_function (*src3, src_event);
- mux_sink3 = gst_element_get_request_pad (mpegtsmux, "sink_3");
- fail_unless (gst_pad_link (*src3, mux_sink3) == GST_PAD_LINK_OK);
-
- gst_object_unref (mux_sink1);
- gst_object_unref (mux_sink2);
- gst_object_unref (mux_sink3);
-}
-
-static void
-link_src (GstElement * mpegtsmux, GstPad ** sink, TestData * test_data)
-{
- GstPad *mux_src;
-
- mux_src = gst_element_get_static_pad (mpegtsmux, "src");
- *sink = gst_pad_new_from_static_template (&sink_template, "sink");
- gst_pad_set_active (*sink, TRUE);
- gst_pad_set_event_function (*sink, sink_event);
- gst_pad_set_element_private (*sink, test_data);
- fail_unless (gst_pad_link (mux_src, *sink) == GST_PAD_LINK_OK);
-
- gst_object_unref (mux_src);
-}
-
-static void
-setup_caps (GstElement * mpegtsmux, GstPad * src1, GstPad * src2, GstPad * src3)
-{
- GstSegment segment;
- GstCaps *caps;
-
- gst_segment_init (&segment, GST_FORMAT_TIME);
-
- caps = gst_caps_new_simple ("video/x-h264",
- "stream-format", G_TYPE_STRING, "byte-stream",
- "alignment", G_TYPE_STRING, "nal", NULL);
- gst_pad_push_event (src1, gst_event_new_stream_start ("1"));
- gst_pad_push_event (src1, gst_event_new_caps (caps));
- gst_pad_push_event (src1, gst_event_new_segment (&segment));
- gst_pad_push_event (src2, gst_event_new_stream_start ("2"));
- gst_pad_push_event (src2, gst_event_new_caps (caps));
- gst_pad_push_event (src2, gst_event_new_segment (&segment));
- gst_caps_unref (caps);
- caps = gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 4,
- "stream-format", G_TYPE_STRING, "raw", "framed", G_TYPE_BOOLEAN, TRUE,
- NULL);
- gst_pad_push_event (src3, gst_event_new_stream_start ("3"));
- gst_pad_push_event (src3, gst_event_new_caps (caps));
- gst_pad_push_event (src3, gst_event_new_segment (&segment));
- gst_caps_unref (caps);
-}
-
-static gpointer
-pad_push_thread (gpointer user_data)
-{
- ThreadData *data = (ThreadData *) user_data;
-
- data->flow_return = gst_pad_push (data->pad, data->buffer);
-
- return NULL;
-}
-
-static ThreadData *
-pad_push (GstPad * pad, GstBuffer * buffer, GstClockTime timestamp)
-{
- ThreadData *data;
-
- data = g_new0 (ThreadData, 1);
- data->pad = pad;
- data->buffer = buffer;
- GST_BUFFER_TIMESTAMP (buffer) = timestamp;
- data->thread = g_thread_try_new ("gst-check", pad_push_thread, data, NULL);
-
- return data;
-}
-
-GST_START_TEST (test_force_key_unit_event_downstream)
-{
- GstElement *mpegtsmux;
- GstPad *sink;
- GstPad *src1;
- GstPad *src2;
- GstPad *src3;
- GstEvent *sink_event;
- GstClockTime timestamp, stream_time, running_time;
- gboolean all_headers = TRUE;
- gint count = 0;
- ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4;
- TestData test_data = { 0, };
-
- mpegtsmux = gst_check_setup_element ("mpegtsmux");
-
- link_src (mpegtsmux, &sink, &test_data);
- link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data);
- gst_element_set_state (mpegtsmux, GST_STATE_PLAYING);
- setup_caps (mpegtsmux, src1, src2, src3);
-
- /* send a force-key-unit event with running_time=2s */
- timestamp = stream_time = running_time = 2 * GST_SECOND;
- sink_event = gst_video_event_new_downstream_force_key_unit (timestamp,
- stream_time, running_time, all_headers, count);
-
- fail_unless (gst_pad_push_event (src1, sink_event));
- fail_unless (test_data.sink_event == NULL);
-
- /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when
- * the buffer with the requested running time is collected */
- thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND);
- thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND);
- thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND);
-
- g_thread_join (thread_data_1->thread);
- fail_unless (test_data.sink_event == NULL);
-
- /* push again on src1 so that the buffer on src2 is collected */
- thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND);
-
- g_thread_join (thread_data_2->thread);
- fail_unless (test_data.sink_event != NULL);
-
- gst_element_set_state (mpegtsmux, GST_STATE_NULL);
-
- g_thread_join (thread_data_3->thread);
- g_thread_join (thread_data_4->thread);
-
- g_free (thread_data_1);
- g_free (thread_data_2);
- g_free (thread_data_3);
- g_free (thread_data_4);
- gst_object_unref (src1);
- gst_object_unref (src2);
- gst_object_unref (src3);
- gst_object_unref (sink);
- gst_object_unref (mpegtsmux);
-}
-
-GST_END_TEST;
-
-GST_START_TEST (test_force_key_unit_event_upstream)
-{
- GstElement *mpegtsmux;
- GstPad *sink;
- GstPad *src1;
- GstPad *src2;
- GstPad *src3;
- GstClockTime timestamp, stream_time, running_time;
- gboolean all_headers = TRUE;
- gint count = 0;
- TestData test_data = { 0, };
- ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4;
- GstEvent *event;
-
- mpegtsmux = gst_check_setup_element ("mpegtsmux");
-
- link_src (mpegtsmux, &sink, &test_data);
- link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data);
- gst_element_set_state (mpegtsmux, GST_STATE_PLAYING);
- setup_caps (mpegtsmux, src1, src2, src3);
-
- /* send an upstream force-key-unit event with running_time=2s */
- timestamp = stream_time = running_time = 2 * GST_SECOND;
- event =
- gst_video_event_new_upstream_force_key_unit (running_time, TRUE, count);
- fail_unless (gst_pad_push_event (sink, event));
-
- fail_unless (test_data.sink_event == NULL);
- fail_unless_equals_int (test_data.src_events, 3);
-
- /* send downstream events with unrelated seqnums */
- event = gst_video_event_new_downstream_force_key_unit (timestamp,
- stream_time, running_time, all_headers, count);
- fail_unless (gst_pad_push_event (src1, event));
- event = gst_video_event_new_downstream_force_key_unit (timestamp,
- stream_time, running_time, all_headers, count);
- fail_unless (gst_pad_push_event (src2, event));
-
- /* events should be skipped */
- fail_unless (test_data.sink_event == NULL);
-
- /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when
- * the buffer with the requested running time is collected */
- thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND);
- thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND);
- thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND);
-
- g_thread_join (thread_data_1->thread);
- fail_unless (test_data.sink_event == NULL);
-
- /* push again on src1 so that the buffer on src2 is collected */
- thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND);
-
- g_thread_join (thread_data_2->thread);
- fail_unless (test_data.sink_event != NULL);
-
- gst_element_set_state (mpegtsmux, GST_STATE_NULL);
-
- g_thread_join (thread_data_3->thread);
- g_thread_join (thread_data_4->thread);
-
- g_free (thread_data_1);
- g_free (thread_data_2);
- g_free (thread_data_3);
- g_free (thread_data_4);
-
- gst_object_unref (src1);
- gst_object_unref (src2);
- gst_object_unref (src3);
- gst_object_unref (sink);
- gst_object_unref (mpegtsmux);
-}
-
-GST_END_TEST;
-
-static GstFlowReturn expected_flow;
-
-static GstFlowReturn
-flow_test_stat_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buffer)
-{
- gst_buffer_unref (buffer);
-
- GST_INFO ("returning flow %s (%d)", gst_flow_get_name (expected_flow),
- expected_flow);
- return expected_flow;
-}
-
-GST_START_TEST (test_propagate_flow_status)
-{
- GstElement *mux;
- gchar *padname;
- GstBuffer *inbuffer;
- GstCaps *caps;
- guint i;
-
- GstFlowReturn expected[] = { GST_FLOW_OK, GST_FLOW_FLUSHING, GST_FLOW_EOS,
- GST_FLOW_NOT_NEGOTIATED, GST_FLOW_ERROR, GST_FLOW_NOT_SUPPORTED
- };
-
- mux = setup_tsmux (&video_src_template, "sink_%d", &padname);
- gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func);
-
- fail_unless (gst_element_set_state (mux,
- GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
- "could not set to playing");
-
- caps = gst_caps_from_string (VIDEO_CAPS_STRING);
- gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME);
- gst_caps_unref (caps);
-
- for (i = 0; i < G_N_ELEMENTS (expected); ++i) {
- GstFlowReturn res;
-
- inbuffer = gst_buffer_new_and_alloc (1);
- ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1);
-
- expected_flow = expected[i];
- GST_INFO ("expecting flow %s (%d)", gst_flow_get_name (expected_flow),
- expected_flow);
-
- GST_BUFFER_TIMESTAMP (inbuffer) = i * GST_SECOND;
-
- res = gst_pad_push (mysrcpad, inbuffer);
-
- fail_unless_equals_int (res, expected[i]);
- }
-
- cleanup_tsmux (mux, padname);
- g_free (padname);
-}
-
-GST_END_TEST;
-
GST_START_TEST (test_multiple_state_change)
{
GstElement *mux;
size_t num_transitions_to_test = 10;
mux = setup_tsmux (&video_src_template, "sink_%d", &padname);
- gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func);
gst_segment_init (&segment, GST_FORMAT_TIME);
caps = gst_caps_from_string (VIDEO_CAPS_STRING);
gst_caps_unref (caps);
for (i = 0; i < num_transitions_to_test; ++i) {
+ GstQuery *drain;
GstState next_state = states[i % G_N_ELEMENTS (states)];
fail_unless (gst_element_set_state (mux,
next_state) == GST_STATE_CHANGE_SUCCESS,
inbuffer = gst_buffer_new_and_alloc (1);
ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1);
- expected_flow = GST_FLOW_OK;
GST_BUFFER_PTS (inbuffer) = 0;
fail_unless (GST_FLOW_OK == gst_pad_push (mysrcpad, inbuffer));
+
+ drain = gst_query_new_drain ();
+ gst_pad_peer_query (mysrcpad, drain);
+ gst_query_unref (drain);
}
}
tcase_add_test (tc_chain, test_audio);
tcase_add_test (tc_chain, test_video);
- tcase_add_test (tc_chain, test_force_key_unit_event_downstream);
- tcase_add_test (tc_chain, test_force_key_unit_event_upstream);
- tcase_add_test (tc_chain, test_propagate_flow_status);
tcase_add_test (tc_chain, test_multiple_state_change);
tcase_add_test (tc_chain, test_align);
tcase_add_test (tc_chain, test_keyframe_flag_propagation);