static void
gst_my_filter_loop (GstMyFilter * filter)
{
+ GstFlowReturn ret;
guint64 len;
GstFormat fmt = GST_FORMAT_BYTES;
GstBuffer *buf = NULL;
- if (!gst_pad_query_position (filter->sinkpad, &fmt, NULL, &len)) {
+ if (!gst_pad_query_duration (filter->sinkpad, &fmt, &len)) {
+ GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
goto stop;
- } else if (filter->offset >= len) {
- gst_pad_push_event (filter->sinkpad, gst_event_new (GST_EVENT_EOS));
- } else if (gst_pad_pull_range (filter->sinkpad, filter->offset,
- BLOCKSIZE, &buf) != GST_FLOW_OK ||
- gst_pad_push (filter->sinkpad, buf) != GST_FLOW_OK) {
+ }
+
+ if (filter->offset >= len) {
+ GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
+ gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
+ goto stop;
+ }
+
+ /* now, read BLOCKSIZE bytes from byte offset filter->offset */
+ ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
+ BLOCKSIZE, &buf);
+
+ if (ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
goto stop;
- } else {
- filter->offset += BLOCKSIZE;
- return;
}
+ /* now push buffer downstream */
+ ret = gst_pad_push (filter->srcpad, buf);
+
+ buf = NULL; /* gst_pad_push() took ownership of buffer */
+
+ if (ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
+ goto stop;
+ }
+
+ /* everything is fine, increase offset and wait for us to be called again */
+ filter->offset += BLOCKSIZE;
+ return;
+
stop:
+ GST_DEBUG_OBJECT (filter, "pausing task");
gst_pad_pause_task (filter->sinkpad);
}
<!-- example-end task.c j -->