buf = gst_pad_pull (identity->sinkpad);
if (GST_IS_EVENT (buf)) {
- gst_pad_event_default (identity->sinkpad, GST_EVENT (buf));
- }
+ GstEvent *event = GST_EVENT (buf);
- gst_identity_chain (identity->sinkpad, buf);
+ if (GST_EVENT_IS_INTERRUPT (event)) {
+ gst_event_unref (event);
+ }
+ else {
+ gst_pad_event_default (identity->sinkpad, event);
+ }
+ }
+ else {
+ gst_identity_chain (identity->sinkpad, buf);
+ }
}
static void
GST_EVENT_SIZE,
GST_EVENT_RATE,
GST_EVENT_FILLER,
- GST_EVENT_TS_OFFSET
+ GST_EVENT_TS_OFFSET,
+ GST_EVENT_INTERRUPT
} GstEventType;
extern GType _gst_event_type;
#define GST_EVENT_TIMESTAMP(event) (GST_EVENT(event)->timestamp)
#define GST_EVENT_SRC(event) (GST_EVENT(event)->src)
+#define GST_EVENT_IS_INTERRUPT(event) (GST_EVENT_TYPE (event) == GST_EVENT_INTERRUPT)
+
#define GST_SEEK_FORMAT_SHIFT 0
#define GST_SEEK_METHOD_SHIFT 16
#define GST_SEEK_FLAGS_SHIFT 20
GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
- g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL);
+ g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK,
+ GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)));
peer = GST_RPAD_PEER (pad);
GST_PAD_NAME (pad), NULL);
}
else {
+restart:
if (peer->gethandler) {
GstBuffer *buf;
gboolean active = GST_PAD_IS_ACTIVE (peer);
if (buf) {
if (!gst_probe_dispatcher_dispatch (&peer->probedisp, GST_DATA (buf)))
- return NULL;
+ goto restart;
if (!GST_IS_EVENT (buf) && !active) {
g_warning ("pull on pad %s:%s but it is not active",
GST_DEBUG_PAD_NAME (peer));
- return NULL;
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
return buf;
}
NULL);
}
}
- return NULL;
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
/**
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
- return NULL;
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
GST_OPT_SCHEDULER_STATE_STOPPED,
GST_OPT_SCHEDULER_STATE_ERROR,
GST_OPT_SCHEDULER_STATE_RUNNING,
+ GST_OPT_SCHEDULER_STATE_INTERRUPTED
} GstOptSchedulerState;
struct _GstOptScheduler {
GST_DEBUG_PAD_NAME (pad), group);
buffer = GST_RPAD_GETFUNC (pad) (pad);
- if (buffer)
+ if (buffer) {
+ if (GST_EVENT_IS_INTERRUPT (buffer)) {
+ gst_event_unref (GST_EVENT (buffer));
+ break;
+ }
gst_pad_push (pad, buffer);
+ }
}
group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
else {
g_warning ("deadlock detected, disabling group %p", group);
group_error_handler (group);
- return NULL;
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
#endif
+ /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
+ * > * loop based element */
+ if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
+ }
if (GST_PAD_BUFLIST (srcpad)) {
buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
static gboolean
gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
+ GST_INFO (GST_CAT_SCHEDULING, "interrupt from \"%s\"",
+ GST_ELEMENT_NAME (element));
+
#ifdef USE_COTHREADS
do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context));
return FALSE;
#else
+ {
+ GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+
+ osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
+ }
return TRUE;
#endif
}
schedule_chain (chain);
scheduled = TRUE;
}
+
+ /* don't schedule any more chains when interrupted or in error */
+ if (osched->state != GST_OPT_SCHEDULER_STATE_RUNNING)
+ break;
}
/* at this point it's possible that the scheduler state is
buf = gst_pad_pull (identity->sinkpad);
if (GST_IS_EVENT (buf)) {
- gst_pad_event_default (identity->sinkpad, GST_EVENT (buf));
- }
+ GstEvent *event = GST_EVENT (buf);
- gst_identity_chain (identity->sinkpad, buf);
+ if (GST_EVENT_IS_INTERRUPT (event)) {
+ gst_event_unref (event);
+ }
+ else {
+ gst_pad_event_default (identity->sinkpad, event);
+ }
+ }
+ else {
+ gst_identity_chain (identity->sinkpad, buf);
+ }
}
static void
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
- return NULL;
+ return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {