queue->cur_level.time, \
queue->max_level.time, \
(guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
- queue->writing_pos - queue->max_reading_pos : \
+ queue->current->writing_pos - queue->current->max_reading_pos : \
queue->queue->length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
G_OBJECT_CLASS (parent_class)->finalize (object);
}
+static void
+debug_ranges (GstQueue2 * queue)
+{
+ GstQueue2Range *walk;
+
+ for (walk = queue->ranges; walk; walk = walk->next) {
+ GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
+ walk->offset, walk->writing_pos);
+ }
+}
+
+/* clear all the downloaded ranges */
+static void
+clean_ranges (GstQueue2 * queue)
+{
+ GST_DEBUG_OBJECT (queue, "clean queue ranges");
+
+ g_slice_free_chain (GstQueue2Range, queue->ranges, next);
+ queue->ranges = NULL;
+ queue->current = NULL;
+}
+
+/* find a range that contains @offset or NULL when nothing does */
+static GstQueue2Range *
+find_range (GstQueue2 * queue, guint64 offset, guint64 length)
+{
+ GstQueue2Range *range, *walk;
+
+ /* first do a quick check for the current range */
+ for (walk = queue->ranges; walk; walk = walk->next) {
+ if (offset >= walk->offset && offset <= walk->writing_pos) {
+ /* we can reuse an existing range */
+ range = walk;
+ break;
+ }
+ }
+ return range;
+}
+
+/* make a new range for @offset or reuse an existing range */
+static GstQueue2Range *
+add_range (GstQueue2 * queue, guint64 offset)
+{
+ GstQueue2Range *range, *prev, *next;
+
+ GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
+
+ if ((range = find_range (queue, offset, 0))) {
+ GST_DEBUG_OBJECT (queue,
+ "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
+ range->writing_pos);
+ range->writing_pos = offset;
+ } else {
+ GST_DEBUG_OBJECT (queue,
+ "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
+
+ range = g_slice_new0 (GstQueue2Range);
+ range->offset = offset;
+ range->writing_pos = offset;
+ range->reading_pos = offset;
+ range->max_reading_pos = offset;
+
+ /* insert sorted */
+ prev = NULL;
+ next = queue->ranges;
+ while (next) {
+ if (next->offset > offset) {
+ /* insert before next */
+ GST_DEBUG_OBJECT (queue,
+ "insert before range %p, offset %" G_GUINT64_FORMAT, next,
+ next->offset);
+ break;
+ }
+ /* try next */
+ prev = next;
+ next = next->next;
+ }
+ range->next = next;
+ if (prev)
+ prev->next = range;
+ else
+ queue->ranges = range;
+ }
+ debug_ranges (queue);
+
+ return range;
+}
+
+
+/* clear and init the download ranges for offset 0 */
+static void
+init_ranges (GstQueue2 * queue)
+{
+ GST_DEBUG_OBJECT (queue, "init queue ranges");
+
+ /* get rid of all the current ranges */
+ clean_ranges (queue);
+ /* make a range for offset 0 */
+ queue->current = add_range (queue, 0);
+}
+
static gboolean
gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
{
if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
buffering_left =
(gdouble) ((duration -
- queue->writing_pos) * 1000) / queue->byte_in_rate;
+ queue->current->writing_pos) * 1000) / queue->byte_in_rate;
} else {
buffering_left = G_MAXINT64;
}
guint size;
guint8 *data;
int ret;
+ guint64 writing_pos, max_reading_pos;
+ GstQueue2Range *next;
+
+ writing_pos = queue->current->writing_pos;
+ max_reading_pos = queue->current->max_reading_pos;
#ifdef HAVE_FSEEKO
- fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET);
+ fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET);
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
- lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET);
+ lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET);
#else
- fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
+ fseek (queue->temp_file, writing_pos, SEEK_SET);
#endif
data = GST_BUFFER_DATA (buffer);
/* FIXME do something useful here */
GST_ERROR_OBJECT (queue, "fwrite returned error");
}
- queue->writing_pos += size;
+ writing_pos += size;
- if (queue->writing_pos > queue->max_reading_pos)
- queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
+ if (writing_pos > max_reading_pos)
+ queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
+
+ /* try to merge with next range */
+ while ((next = queue->current->next)) {
+ GST_DEBUG_OBJECT (queue,
+ "cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos,
+ next->offset);
+ if (writing_pos < next->offset)
+ break;
+
+ GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
+ next->writing_pos);
+ /* we ran over the offset of the next group */
+ queue->current->writing_pos = writing_pos = next->writing_pos;
+
+ /* remove the group */
+ queue->current->next = next->next;
+ g_slice_free (GstQueue2Range, next);
+
+ debug_ranges (queue);
+ }
+ queue->current->writing_pos = writing_pos;
+}
+
+static gboolean
+perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
+{
+ GstEvent *event;
+ gboolean res;
+
+ GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
+
+ event =
+ gst_event_new_seek (1.0, GST_FORMAT_BYTES,
+ GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
+ GST_SEEK_TYPE_NONE, -1);
+
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ res = gst_pad_push_event (queue->sinkpad, event);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+
+ if (res) {
+ queue->current = add_range (queue, offset);
+ }
+ return res;
}
/* see if there is enough data in the file to read a full buffer */
static gboolean
gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
- GST_DEBUG_OBJECT (queue,
- "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
- length, queue->writing_pos);
- if (queue->is_eos)
- return TRUE;
+ GstQueue2Range *range;
- if (offset + length < queue->writing_pos)
- return TRUE;
+ GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset,
+ length);
+
+ if ((range = find_range (queue, offset, length))) {
+ if (queue->current != range) {
+ GST_DEBUG_OBJECT (queue, "switching ranges");
+ perform_seek_to_offset (queue, range->writing_pos);
+ }
+ /* we have a range for offset */
+ GST_DEBUG_OBJECT (queue,
+ "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
+ G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
+
+ if (queue->is_eos)
+ return TRUE;
+
+ if (offset + length < range->writing_pos)
+ return TRUE;
+
+ } else {
+ /* we don't have the range, see how far away we are */
+ perform_seek_to_offset (queue, offset);
+ }
return FALSE;
}
{
size_t res;
GstBuffer *buf;
+ guint64 reading_pos, max_reading_pos, writing_pos;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
*buffer = buf;
- queue->reading_pos = offset + length;
- queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
+ reading_pos = queue->current->reading_pos;
+ writing_pos = queue->current->writing_pos;
+ max_reading_pos = queue->current->max_reading_pos;
+
+ reading_pos = offset + length;
+ max_reading_pos = MAX (max_reading_pos, reading_pos);
- if (queue->writing_pos > queue->max_reading_pos)
- queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
+ if (writing_pos > max_reading_pos)
+ queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
+ queue->current->reading_pos = reading_pos;
+ queue->current->max_reading_pos = max_reading_pos;
+
return GST_FLOW_OK;
/* ERRORS */
} else {
GstFlowReturn ret;
GstBuffer *buffer;
+ guint64 reading_pos;
+
+ reading_pos = queue->current->reading_pos;
ret =
- gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
+ gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
&buffer);
switch (ret) {
case GST_FLOW_OK:
gint fd = -1;
gchar *name = NULL;
+ if (queue->temp_file)
+ goto already_opened;
+
GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
/* we have two cases:
if (queue->temp_file == NULL)
goto open_failed;
}
+ GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
- queue->writing_pos = 0;
- queue->reading_pos = 0;
- queue->max_reading_pos = 0;
+ init_ranges (queue);
return TRUE;
/* ERRORS */
+already_opened:
+ {
+ GST_DEBUG_OBJECT (queue, "temp file was already open");
+ return TRUE;
+ }
no_directory:
{
GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
remove (queue->temp_location);
queue->temp_file = NULL;
+ clean_ranges (queue);
}
static void
queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
- queue->writing_pos = 0;
- queue->reading_pos = 0;
- queue->max_reading_pos = 0;
+ init_ranges (queue);
}
static void
case GST_EVENT_EOS:
/* Zero the thresholds, this makes sure the queue is completely
* filled and we can read all data from the queue. */
+ GST_DEBUG_OBJECT (queue, "we have EOS");
queue->is_eos = TRUE;
break;
case GST_EVENT_NEWSEGMENT:
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
- /* forward event */
- gst_pad_push_event (queue->srcpad, event);
+ if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+ /* forward event */
+ gst_pad_push_event (queue->srcpad, event);
- /* now unblock the chain function */
- GST_QUEUE2_MUTEX_LOCK (queue);
- queue->srcresult = GST_FLOW_WRONG_STATE;
- /* unblock the loop and chain functions */
- g_cond_signal (queue->item_add);
- g_cond_signal (queue->item_del);
- GST_QUEUE2_MUTEX_UNLOCK (queue);
+ /* now unblock the chain function */
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->srcresult = GST_FLOW_WRONG_STATE;
+ /* unblock the loop and chain functions */
+ g_cond_signal (queue->item_add);
+ g_cond_signal (queue->item_del);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
- /* make sure it pauses, this should happen since we sent
- * flush_start downstream. */
- gst_pad_pause_task (queue->srcpad);
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+ /* make sure it pauses, this should happen since we sent
+ * flush_start downstream. */
+ gst_pad_pause_task (queue->srcpad);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+ }
goto done;
}
case GST_EVENT_FLUSH_STOP:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
- /* forward event */
- gst_pad_push_event (queue->srcpad, event);
- GST_QUEUE2_MUTEX_LOCK (queue);
- gst_queue2_locked_flush (queue);
- queue->srcresult = GST_FLOW_OK;
- queue->is_eos = FALSE;
- queue->unexpected = FALSE;
- /* reset rate counters */
- reset_rate_timer (queue);
- gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
- queue->srcpad);
- GST_QUEUE2_MUTEX_UNLOCK (queue);
+ if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+ /* forward event */
+ gst_pad_push_event (queue->srcpad, event);
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ gst_queue2_locked_flush (queue);
+ queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ /* reset rate counters */
+ reset_rate_timer (queue);
+ gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
+ queue->srcpad);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ } else {
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->segment_event_received = FALSE;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ }
goto done;
}
default:
return FALSE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- return queue->writing_pos == queue->max_reading_pos;
+ return queue->current->writing_pos == queue->current->max_reading_pos;
} else {
if (queue->queue->length == 0)
return TRUE;
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
- /* when using a temp file, we unblock the pending read */
+ /* when using a temp file, we eat the event */
res = TRUE;
gst_event_unref (event);
}
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
gint64 start, stop;
+ guint64 writing_pos;
gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
&duration))
goto peer_failed;
+ writing_pos = queue->current->writing_pos;
+
GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
- G_GINT64_FORMAT, duration, queue->writing_pos);
+ G_GINT64_FORMAT, duration, writing_pos);
start = 0;
/* get our available data relative to the duration */
if (duration != -1)
- stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration;
+ stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
else
stop = -1;
break;
}
case GST_FORMAT_BYTES:
start = 0;
- stop = queue->writing_pos;
+ stop = queue->current->writing_pos;
break;
default:
start = -1;
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
- offset = (offset == -1) ? queue->reading_pos : offset;
+ offset = (offset == -1) ? queue->current->reading_pos : offset;
/* function will block when the range is not yet available */
ret = gst_queue2_create_read (queue, offset, length, buffer);
if (active) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ /* open the temp file now */
+ result = gst_queue2_open_temp_location_file (queue);
+
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
- result = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);