From 73e27fb0173a9d5464597f0469c50f2a3294652d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 15 Jun 2010 16:12:02 +0200 Subject: [PATCH] queue2; cleanups and fixes Make a macro for some frequent checks Emit the removed signal in all cases when we remove something --- plugins/elements/gstqueue2.c | 103 ++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 56 deletions(-) diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index 07bd69d..c79c327 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -97,6 +97,7 @@ enum #define DEFAULT_BUFFER_SIZE 4096 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL) #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */ +#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue)) /* default property values */ #define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */ @@ -148,7 +149,7 @@ enum queue->max_level.bytes, \ queue->cur_level.time, \ queue->max_level.time, \ - (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ + (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \ queue->current->writing_pos - queue->current->max_reading_pos : \ queue->queue->length)) @@ -792,7 +793,7 @@ update_buffering (GstQueue2 * queue) queue->buffering_percent = percent; - if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue)) { GstFormat fmt = GST_FORMAT_BYTES; gint64 duration; @@ -1105,11 +1106,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) /* we don't have the range, see how far away we are, FIXME, find a good * threshold based on the incomming rate. */ if (!queue->is_eos && queue->current) { - if (QUEUE_IS_USING_RING_BUFFER (queue) && (offset < queue->current->offset - || offset > - queue->current->writing_pos + queue->max_level.bytes - - queue->cur_level.bytes)) { - perform_seek_to_offset (queue, offset); + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + if (offset < queue->current->offset || offset > + queue->current->writing_pos + queue->max_level.bytes - + queue->cur_level.bytes) { + perform_seek_to_offset (queue, offset); + } } else if (offset < queue->current->writing_pos + 200000) { update_cur_pos (queue, queue->current, offset + length); GST_INFO_OBJECT (queue, "wait for data"); @@ -1174,6 +1176,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, guint8 *data; guint64 file_offset; guint block_length, remaining, read_length; + gint64 read_return; /* allocate the output buffer of the requested size */ buf = gst_buffer_new_and_alloc (length); @@ -1221,21 +1224,15 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, /* while we still have data to read, we loop */ while (read_length > 0) { - gint64 read_return; - read_return = gst_queue2_read_data_at_offset (queue, file_offset, block_length, data); - if (read_return < 0) { - gst_buffer_unref (buf); - return read_return; - } + if (read_return < 0) + goto read_error; - if (QUEUE_IS_USING_RING_BUFFER (queue)) { - file_offset = (file_offset + read_return) % queue->ring_buffer_max_size; - } else { - file_offset += read_return; - } + file_offset += read_return; + if (QUEUE_IS_USING_RING_BUFFER (queue)) + file_offset %= queue->ring_buffer_max_size; data += read_return; read_length -= read_return; @@ -1261,6 +1258,12 @@ out_flushing: GST_DEBUG_OBJECT (queue, "we are flushing"); return GST_FLOW_WRONG_STATE; } +read_error: + { + GST_DEBUG_OBJECT (queue, "we have a read error"); + gst_buffer_unref (buf); + return read_return; + } } /* should be called with QUEUE_LOCK */ @@ -1282,6 +1285,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue) ret = gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE, &buffer); + switch (ret) { case GST_FLOW_OK: item = GST_MINI_OBJECT_CAST (buffer); @@ -1419,7 +1423,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue) static void gst_queue2_locked_flush (GstQueue2 * queue) { - if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue)) { gst_queue2_flush_temp_file (queue); } else { while (!g_queue_is_empty (queue->queue)) { @@ -1493,15 +1497,16 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) rem = buffer; - rb_space = - queue->ring_buffer_max_size - (queue->current->writing_pos - - queue->current->reading_pos); - while (rb_space <= 0) { - GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); + do { rb_space = queue->ring_buffer_max_size - (queue->current->writing_pos - queue->current->reading_pos); - } + + if (rb_space > 0) + break; + + GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); + } while (TRUE); /* loop if we can't write the whole buffer at once */ do { @@ -1718,8 +1723,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) size = GST_BUFFER_SIZE (buffer); /* add buffer to the statistics */ - if (!(QUEUE_IS_USING_TEMP_FILE (queue) - || QUEUE_IS_USING_RING_BUFFER (queue))) { + if (QUEUE_IS_USING_QUEUE (queue)) { queue->cur_level.buffers++; queue->cur_level.bytes += size; } @@ -1753,8 +1757,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) apply_segment (queue, event, &queue->sink_segment); /* This is our first new segment, we hold it * as we can't save it on the temp file */ - if (QUEUE_IS_USING_RING_BUFFER (queue) - || QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue)) { if (queue->segment_event_received) goto unexpected_event; @@ -1769,8 +1772,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) queue->unexpected = FALSE; break; default: - if (QUEUE_IS_USING_RING_BUFFER (queue) - || QUEUE_IS_USING_TEMP_FILE (queue)) + if (!QUEUE_IS_USING_QUEUE (queue)) goto unexpected_event; break; } @@ -1785,8 +1787,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) /* update the buffering status */ update_buffering (queue); - if (!(QUEUE_IS_USING_TEMP_FILE (queue) - || QUEUE_IS_USING_RING_BUFFER (queue))) + if (QUEUE_IS_USING_QUEUE (queue)) g_queue_push_tail (queue->queue, item); else gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); @@ -1815,7 +1816,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue) { GstMiniObject *item; - if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue)) + if (!QUEUE_IS_USING_QUEUE (queue)) item = gst_queue2_read_item_from_file (queue); else item = g_queue_pop_head (queue->queue); @@ -1833,8 +1834,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved buffer %p from queue", buffer); - if (!(QUEUE_IS_USING_TEMP_FILE (queue) - || QUEUE_IS_USING_RING_BUFFER (queue))) { + if (QUEUE_IS_USING_QUEUE (queue)) { queue->cur_level.buffers--; queue->cur_level.bytes -= size; } @@ -1863,14 +1863,13 @@ gst_queue2_locked_dequeue (GstQueue2 * queue) default: break; } - GST_QUEUE2_SIGNAL_DEL (queue); } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", item, GST_OBJECT_NAME (queue)); item = NULL; - GST_QUEUE2_SIGNAL_DEL (queue); } + GST_QUEUE2_SIGNAL_DEL (queue); return item; @@ -1893,8 +1892,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) case GST_EVENT_FLUSH_START: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); - if (!QUEUE_IS_USING_RING_BUFFER (queue) - || !QUEUE_IS_USING_TEMP_FILE (queue)) { + if (QUEUE_IS_USING_QUEUE (queue)) { /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -1918,8 +1916,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); - if (!QUEUE_IS_USING_RING_BUFFER (queue) - || !QUEUE_IS_USING_TEMP_FILE (queue)) { + if (QUEUE_IS_USING_QUEUE (queue)) { /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -1985,8 +1982,7 @@ gst_queue2_is_empty (GstQueue2 * queue) if (queue->is_eos) return FALSE; - if ((QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) - && queue->current) { + if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) { return queue->current->writing_pos <= queue->current->max_reading_pos; } else { if (queue->queue->length == 0) @@ -2282,8 +2278,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: - if (QUEUE_IS_USING_RING_BUFFER (queue) - || !QUEUE_IS_USING_TEMP_FILE (queue)) { + if (QUEUE_IS_USING_QUEUE (queue)) { /* just forward upstream */ res = gst_pad_push_event (queue->sinkpad, event); } else { @@ -2300,8 +2295,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) } break; case GST_EVENT_FLUSH_STOP: - if (QUEUE_IS_USING_RING_BUFFER (queue) - || !QUEUE_IS_USING_TEMP_FILE (queue)) { + if (QUEUE_IS_USING_QUEUE (queue)) { /* just forward upstream */ res = gst_pad_push_event (queue->sinkpad, event); } else { @@ -2391,8 +2385,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (queue, "query buffering"); /* FIXME - is this condition correct? what should ring buffer do? */ - if (!(QUEUE_IS_USING_RING_BUFFER (queue) - || QUEUE_IS_USING_TEMP_FILE (queue))) { + if (QUEUE_IS_USING_QUEUE (queue)) { /* no temp file, just forward to the peer */ if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) goto peer_failed; @@ -2546,7 +2539,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad) queue = GST_QUEUE2 (gst_pad_get_parent (pad)); /* we can operate in pull mode when we are using a tempfile */ - ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue); + ret = !QUEUE_IS_USING_QUEUE (queue); gst_object_unref (GST_OBJECT (queue)); @@ -2634,7 +2627,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) queue = GST_QUEUE2 (gst_pad_get_parent (pad)); if (active) { - if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue)) { /* open the temp file now */ result = gst_queue2_open_temp_location_file (queue); @@ -2682,8 +2675,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - if (QUEUE_IS_USING_RING_BUFFER (queue) - || QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue)) { if (!gst_queue2_open_temp_location_file (queue)) ret = GST_STATE_CHANGE_FAILURE; } @@ -2708,8 +2700,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - if (QUEUE_IS_USING_RING_BUFFER (queue) - || QUEUE_IS_USING_TEMP_FILE (queue)) + if (!QUEUE_IS_USING_QUEUE (queue)) gst_queue2_close_temp_location_file (queue); if (queue->starting_segment != NULL) { gst_event_unref (queue->starting_segment); -- 2.7.4