struct _GstStreamConsistency
{
- gboolean flushing;
- gboolean segment;
- gboolean eos;
- gulong probeid;
- GstPad *pad;
+ /* FIXME: do we want to track some states per pad? */
+ volatile gboolean flushing;
+ volatile gboolean newsegment;
+ volatile gboolean eos;
+ volatile gboolean expect_flush;
+ GstObject *parent;
+ GList *pads;
};
-
+ typedef struct _GstStreamConsistencyProbe
+ {
+ GstPad *pad;
+ gulong probeid;
+ } GstStreamConsistencyProbe;
+
static gboolean
-source_pad_data_cb (GstPad * pad, GstMiniObject * data,
+source_pad_data_cb (GstPad * pad, GstPadProbeInfo * info,
GstStreamConsistency * consist)
{
+ GstMiniObject *data = GST_PAD_PROBE_INFO_DATA (info);
+
+ GST_DEBUG_OBJECT (pad, "%p: %d %d %d %d", consist, consist->flushing,
+ consist->newsegment, consist->eos, consist->expect_flush);
+
if (GST_IS_BUFFER (data)) {
- GST_DEBUG_OBJECT (pad, "Buffer %" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (GST_BUFFER (data))));
+ GST_DEBUG_OBJECT (pad,
+ "Buffer pts %" GST_TIME_FORMAT ", dts %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_PTS (GST_BUFFER_CAST (data))),
+ GST_TIME_ARGS (GST_BUFFER_DTS (GST_BUFFER_CAST (data))));
/* If an EOS went through, a buffer would be invalid */
fail_if (consist->eos, "Buffer received after EOS");
- /* Buffers need to be preceded by a newsegment event */
- fail_unless (consist->newsegment, "Buffer received without newsegment");
+ /* Buffers need to be preceded by a segment event */
+ fail_unless (consist->segment, "Buffer received without segment");
} else if (GST_IS_EVENT (data)) {
GstEvent *event = (GstEvent *) data;
fail_unless (consist->flushing,
"Received a FLUSH_STOP without a FLUSH_START");
fail_if (consist->eos, "Received a FLUSH_STOP after an EOS");
- consist->flushing = FALSE;
+ consist->flushing = consist->expect_flush = FALSE;
break;
- case GST_EVENT_NEWSEGMENT:
+ case GST_EVENT_SEGMENT:
+ fail_if ((consist->expect_flush && consist->flushing),
- "Received NEWSEGMENT while in a flushing seek");
- consist->newsegment = TRUE;
++ "Received SEGMENT while in a flushing seek");
+ consist->segment = TRUE;
consist->eos = FALSE;
break;
case GST_EVENT_EOS:
return TRUE;
}
-sink_pad_data_cb (GstPad * pad, GstMiniObject * data,
+ static gboolean
- case GST_EVENT_NEWSEGMENT:
++sink_pad_data_cb (GstPad * pad, GstPadProbeInfo * info,
+ GstStreamConsistency * consist)
+ {
++ GstMiniObject *data = GST_PAD_PROBE_INFO_DATA (info);
++
+ GST_DEBUG_OBJECT (pad, "%p: %d %d %d %d", consist, consist->flushing,
+ consist->newsegment, consist->eos, consist->expect_flush);
+
+ if (GST_IS_BUFFER (data)) {
+ GST_DEBUG_OBJECT (pad, "Buffer %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (GST_BUFFER (data))));
+ /* If an EOS went through, a buffer would be invalid */
+ fail_if (consist->eos, "Buffer received after EOS");
+ /* Buffers need to be preceded by a newsegment event */
+ fail_unless (consist->newsegment, "Buffer received without newsegment");
+ } else if (GST_IS_EVENT (data)) {
+ GstEvent *event = (GstEvent *) data;
+
+ GST_DEBUG_OBJECT (pad, "%s", GST_EVENT_TYPE_NAME (event));
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEEK:
+ {
+ GstSeekFlags flags;
+
+ gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL,
+ NULL);
+ consist->expect_flush =
+ ((flags & GST_SEEK_FLAG_FLUSH) == GST_SEEK_FLAG_FLUSH);
+ break;
+ }
- "Received NEWSEGMENT while in a flushing seek");
- consist->newsegment = TRUE;
++ case GST_EVENT_SEGMENT:
+ fail_if ((consist->expect_flush && consist->flushing),
- gst_pad_add_data_probe (pad, (GCallback) source_pad_data_cb, consist);
++ "Received SEGMENT while in a flushing seek");
++ consist->segment = TRUE;
+ consist->eos = FALSE;
+ break;
+ default:
+ /* FIXME : Figure out what to do for other events */
+ break;
+ }
+ }
+
+ return TRUE;
+ }
+
+ static void
+ add_pad (GstStreamConsistency * consist, GstPad * pad)
+ {
+ GstStreamConsistencyProbe *p;
+ GstPadDirection dir;
+
+ p = g_new0 (GstStreamConsistencyProbe, 1);
+ p->pad = g_object_ref (pad);
+ dir = gst_pad_get_direction (pad);
+ if (dir == GST_PAD_SRC) {
++
+ p->probeid =
- gst_pad_add_data_probe (pad, (GCallback) sink_pad_data_cb, consist);
++ gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
++ (GstPadProbeCallback) source_pad_data_cb, consist, NULL);
++
+ } else if (dir == GST_PAD_SINK) {
+ p->probeid =
++ gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
++ (GstPadProbeCallback) sink_pad_data_cb, consist, NULL);
+ }
+ consist->pads = g_list_prepend (consist->pads, p);
+ }
+
/**
* gst_consistency_checker_new:
* @pad: The #GstPad on which the dataflow will be checked.
void
gst_consistency_checker_free (GstStreamConsistency * consist)
{
- gst_pad_remove_data_probe (p->pad, p->probeid);
+ GList *node;
+ GstStreamConsistencyProbe *p;
+
+ /* Remove the data probe */
+ gst_pad_remove_probe (consist->pad, consist->probeid);
+ g_object_unref (consist->pad);
++
+ /* Remove the data probes */
+ for (node = consist->pads; node; node = g_list_next (node)) {
+ p = (GstStreamConsistencyProbe *) node->data;
++ gst_pad_remove_probe (p->pad, p->probeid);
+ g_object_unref (p->pad);
+ g_free (p);
+ }
+ g_list_free (consist->pads);
g_free (consist);
}