GstRtmpLocation location;
gboolean async_connect;
GstStructure *stats;
+ guint idle_timeout;
/* If both self->lock and OBJECT_LOCK are needed,
* self->lock must be taken first */
GCond cond;
gboolean running, flushing;
+ gboolean timeout;
+ gboolean started;
GstTask *task;
GRecMutex task_lock;
PROP_FLASH_VERSION,
PROP_ASYNC_CONNECT,
PROP_STATS,
+ PROP_IDLE_TIMEOUT,
};
+#define DEFAULT_IDLE_TIMEOUT 0
+
/* pad templates */
static GstStaticPadTemplate gst_rtmp2_src_src_template =
g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
+ g_param_spec_uint ("idle-timeout", "Idle timeout",
+ "The maximum allowed time in seconds for valid packets not to arrive "
+ "from the peer (0 = no timeout)",
+ 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
"debug category for rtmp2src element");
}
gst_rtmp2_src_init (GstRtmp2Src * self)
{
self->async_connect = TRUE;
+ self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
self->async_connect = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (self);
break;
+ case PROP_IDLE_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ self->idle_timeout = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
case PROP_STATS:
g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
break;
+ case PROP_IDLE_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_uint (value, self->idle_timeout);
+ GST_OBJECT_UNLOCK (self);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
self->stream_id = 0;
self->sent_header = FALSE;
self->last_ts = GST_CLOCK_TIME_NONE;
+ self->timeout = FALSE;
+ self->started = FALSE;
if (async) {
gst_task_start (self->task);
return TRUE;
}
+static gboolean
+on_timeout (GstRtmp2Src * self)
+{
+ g_mutex_lock (&self->lock);
+ self->timeout = TRUE;
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ return G_SOURCE_REMOVE;
+}
+
static GstFlowReturn
gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer ** outbuf)
GstBuffer *message, *buffer;
GstRtmpMeta *meta;
guint32 timestamp = 0;
+ GSource *timeout = NULL;
+ GstFlowReturn ret = GST_FLOW_OK;
static const guint8 flv_header_data[] = {
0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
gst_task_start (self->task);
}
+ /* wait until GMainLoop begins running so that we can attach
+ * timeout source safely.
+ * If the task stopped meanwhile, "running" will be FALSE
+ * than stop_task() will wake up us as well
+ */
+ while ((!self->started && self->running) && (!self->loop
+ || !g_main_loop_is_running (self->loop)))
+ g_cond_wait (&self->cond, &self->lock);
+
+ GST_OBJECT_LOCK (self);
+ if (self->idle_timeout && self->context) {
+ timeout = g_timeout_source_new_seconds (self->idle_timeout);
+
+ g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
+ g_source_attach (timeout, self->context);
+ }
+ GST_OBJECT_UNLOCK (self);
+
while (!self->message) {
if (!self->running) {
- g_mutex_unlock (&self->lock);
- return GST_FLOW_EOS;
+ ret = GST_FLOW_EOS;
+ goto out;
}
if (self->flushing) {
- g_mutex_unlock (&self->lock);
- return GST_FLOW_FLUSHING;
+ ret = GST_FLOW_FLUSHING;
+ goto out;
+ }
+ if (self->timeout) {
+ GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
+ ret = GST_FLOW_EOS;
+ goto out;
}
g_cond_wait (&self->cond, &self->lock);
}
+ if (timeout) {
+ g_source_destroy (timeout);
+ g_source_unref (timeout);
+ }
+
message = self->message;
self->message = NULL;
g_cond_signal (&self->cond);
gst_buffer_unref (message);
return GST_FLOW_OK;
+
+out:
+ g_mutex_unlock (&self->lock);
+ if (timeout) {
+ g_source_destroy (timeout);
+ g_source_unref (timeout);
+ }
+
+ return ret;
+}
+
+static gboolean
+main_loop_running_cb (GstRtmp2Src * self)
+{
+ GST_TRACE_OBJECT (self, "Main loop running now");
+
+ g_mutex_lock (&self->lock);
+ self->started = TRUE;
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ return G_SOURCE_REMOVE;
}
/* Mainloop task */
GMainContext *context;
GMainLoop *loop;
GTask *connector;
+ GSource *source;
GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
g_mutex_lock (&self->lock);
context = self->context = g_main_context_new ();
g_main_context_push_thread_default (context);
loop = self->loop = g_main_loop_new (context, TRUE);
+
+ source = g_idle_source_new ();
+ g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
+ NULL);
+ g_source_attach (source, self->context);
+ g_source_unref (source);
+
connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
g_clear_pointer (&self->stats, gst_structure_free);