GMutex lock;
GCond cond;
gboolean flushing;
+ gboolean clear_to_send;
};
G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
self->flushing = FALSE;
+ self->clear_to_send = FALSE;
}
static void gst_sctp_enc_finalize (GObject * object);
{
GstSctpEnc *self = GST_SCTP_ENC (parent);
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+ GstSctpEncPad *sctpenc_pad_next = NULL;
GstMapInfo map;
guint32 ppid;
gboolean ordered;
GstFlowReturn flow_ret = GST_FLOW_ERROR;
const guint8 *data;
guint32 length;
+ gboolean clear_to_send;
GST_OBJECT_LOCK (self);
if (self->src_ret != GST_FLOW_OK) {
data = map.data;
length = map.size;
+ GST_OBJECT_LOCK (self);
+ clear_to_send = g_queue_is_empty (&self->pending_pads);
+ g_queue_push_tail (&self->pending_pads, sctpenc_pad);
+ GST_OBJECT_UNLOCK (self);
+
g_mutex_lock (&sctpenc_pad->lock);
+
+ if (clear_to_send) {
+ sctpenc_pad->clear_to_send = TRUE;
+ }
+
+ while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) {
+ g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock);
+ }
+
while (!sctpenc_pad->flushing) {
guint32 bytes_sent;
length -= bytes_sent;
/* The buffer was probably full. Retry in a while */
- GST_OBJECT_LOCK (self);
- g_queue_push_tail (&self->pending_pads, sctpenc_pad);
- GST_OBJECT_UNLOCK (self);
-
g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
- GST_OBJECT_LOCK (self);
- g_queue_remove (&self->pending_pads, sctpenc_pad);
- GST_OBJECT_UNLOCK (self);
} else if (bytes_sent == length) {
GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
sctpenc_pad->bytes_sent += bytes_sent;
flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
out:
+ sctpenc_pad->clear_to_send = FALSE;
g_mutex_unlock (&sctpenc_pad->lock);
+ GST_OBJECT_LOCK (self);
+ g_queue_remove (&self->pending_pads, sctpenc_pad);
+ sctpenc_pad_next = g_queue_peek_head (&self->pending_pads);
+ GST_OBJECT_UNLOCK (self);
+
+ if (sctpenc_pad_next) {
+ g_mutex_lock (&sctpenc_pad_next->lock);
+ sctpenc_pad_next->clear_to_send = TRUE;
+ g_cond_signal (&sctpenc_pad_next->cond);
+ g_mutex_unlock (&sctpenc_pad_next->lock);
+ }
+
gst_buffer_unmap (buffer, &map);
error:
gst_buffer_unref (buffer);
GstSctpEnc *self = user_data;
GstBuffer *gstbuf;
GstDataQueueItem *item;
- GList *pending_pads, *l;
GstSctpEncPad *sctpenc_pad;
GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
}
- /* Wake up pads in the order they waited, oldest pad first */
+ /* Wake up the oldest pad which is the one that needs to finish first */
GST_OBJECT_LOCK (self);
- pending_pads = NULL;
- while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
- pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
- }
- GST_OBJECT_UNLOCK (self);
+ sctpenc_pad = g_queue_peek_head (&self->pending_pads);
+ if (sctpenc_pad) {
+ gst_object_ref (sctpenc_pad);
+
+ GST_OBJECT_UNLOCK (self);
- for (l = pending_pads; l; l = l->next) {
- sctpenc_pad = l->data;
g_mutex_lock (&sctpenc_pad->lock);
g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock);
+
+ gst_object_unref (sctpenc_pad);
+ } else {
+ GST_OBJECT_UNLOCK (self);
}
- g_list_free (pending_pads);
}
static void