/* returns:
* GST_RTSP_OK when a complete message was read.
- * GST_RTSP_EEOF: when the socket is closed
+ * GST_RTSP_EEOF: when the read socket is closed
* GST_RTSP_EINTR: when more data is needed.
* GST_RTSP_..: some other error occured.
*/
return GST_RTSP_OK;
}
-#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR)
-#define WRITE_COND (G_IO_OUT | G_IO_ERR)
+#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define READ_COND (G_IO_IN | READ_ERR)
+#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define WRITE_COND (G_IO_OUT | WRITE_ERR)
typedef struct
{
GPollFD readfd;
GPollFD writefd;
- gboolean write_added;
/* queued message for transmission */
guint id;
gpointer user_data G_GNUC_UNUSED)
{
GstRTSPWatch *watch = (GstRTSPWatch *) source;
- GstRTSPResult res;
+ GstRTSPResult res = GST_RTSP_ERROR;
+ gboolean keep_running = TRUE;
/* first read as much as we can */
if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
do {
+ if (watch->readfd.revents & READ_ERR)
+ goto read_error;
+
res = build_next (&watch->builder, &watch->message, watch->conn);
if (res == GST_RTSP_EINTR)
break;
- else if (G_UNLIKELY (res == GST_RTSP_EEOF))
- goto eof;
- else if (G_LIKELY (res == GST_RTSP_OK)) {
+ else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+ watch->readfd.events = 0;
+ watch->readfd.revents = 0;
+ g_source_remove_poll ((GSource *) watch, &watch->readfd);
+ /* When we are in tunnelled mode, the read socket can be closed and we
+ * should be prepared for a new POST method to reopen it */
+ if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
+ /* remove the read connection for the tunnel */
+ /* we accept a new POST request */
+ watch->conn->tstate = TUNNEL_STATE_GET;
+ /* and signal that we lost our tunnel */
+ if (watch->funcs.tunnel_lost)
+ res = watch->funcs.tunnel_lost (watch, watch->user_data);
+ goto read_done;
+ } else
+ goto eof;
+ } else if (G_LIKELY (res == GST_RTSP_OK)) {
if (!watch->conn->manual_http &&
watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (watch->conn->tstate == TUNNEL_STATE_NONE &&
watch->funcs.message_received (watch, &watch->message,
watch->user_data);
} else {
- if (watch->funcs.error_full)
- GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
- 0, watch->user_data), error);
- else
- goto error;
+ goto read_error;
}
read_done:
}
if (watch->writefd.revents & WRITE_COND) {
+ if (watch->writefd.revents & WRITE_ERR)
+ goto write_error;
+
g_mutex_lock (watch->mutex);
do {
if (watch->write_data == NULL) {
/* get a new message from the queue */
rec = g_queue_pop_tail (watch->messages);
if (rec == NULL)
- goto done;
+ break;
watch->write_off = 0;
watch->write_data = rec->data;
res = write_bytes (watch->writefd.fd, watch->write_data,
&watch->write_off, watch->write_size);
g_mutex_unlock (watch->mutex);
+
if (res == GST_RTSP_EINTR)
goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) {
if (watch->funcs.message_sent)
watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
} else {
- if (watch->funcs.error_full)
- GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
- watch->write_id, watch->user_data), error);
- else
- goto error;
+ goto write_error;
}
g_mutex_lock (watch->mutex);
watch->write_data = NULL;
} while (TRUE);
- done:
- if (watch->write_added) {
- g_source_remove_poll ((GSource *) watch, &watch->writefd);
- watch->write_added = FALSE;
- watch->writefd.revents = 0;
- }
+ watch->writefd.events = WRITE_ERR;
g_mutex_unlock (watch->mutex);
}
write_blocked:
- return TRUE;
+ return keep_running;
/* ERRORS */
eof:
{
if (watch->funcs.closed)
watch->funcs.closed (watch, watch->user_data);
+
+ /* always stop when the readfd returns EOF in non-tunneled mode */
return FALSE;
}
+read_error:
+ {
+ watch->readfd.events = 0;
+ watch->readfd.revents = 0;
+ g_source_remove_poll ((GSource *) watch, &watch->readfd);
+ keep_running = (watch->writefd.events != 0);
+
+ if (keep_running) {
+ if (watch->funcs.error_full)
+ GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
+ 0, watch->user_data), error);
+ else
+ goto error;
+ } else
+ goto eof;
+ }
+write_error:
+ {
+ watch->writefd.events = 0;
+ watch->writefd.revents = 0;
+ g_source_remove_poll ((GSource *) watch, &watch->writefd);
+ keep_running = (watch->readfd.events != 0);
+
+ if (keep_running) {
+ if (watch->funcs.error_full)
+ GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
+ watch->write_id, watch->user_data), error);
+ else
+ goto error;
+ } else
+ goto eof;
+ }
error:
{
if (watch->funcs.error)
watch->funcs.error (watch, res, watch->user_data);
- return FALSE;
+
+ return keep_running;
}
}
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL;
+ g_free (watch->write_data);
g_mutex_free (watch->mutex);
- g_free (watch->write_data);
-
if (watch->notify)
watch->notify (watch->user_data);
}
result->user_data = user_data;
result->notify = notify;
- /* only add the read fd, the write fd is only added when we have data
- * to send. */
- g_source_add_poll ((GSource *) result, &result->readfd);
-
return result;
}
watch->readfd.revents = 0;
watch->writefd.fd = watch->conn->writefd->fd;
- watch->writefd.events = WRITE_COND;
+ watch->writefd.events = WRITE_ERR;
watch->writefd.revents = 0;
- watch->write_added = FALSE;
- g_source_add_poll ((GSource *) watch, &watch->readfd);
+ if (watch->readfd.fd != -1)
+ g_source_add_poll ((GSource *) watch, &watch->readfd);
+ if (watch->writefd.fd != -1)
+ g_source_add_poll ((GSource *) watch, &watch->writefd);
}
/**
GstRTSPResult res;
GstRTSPRec *rec;
guint off = 0;
+ GMainContext *context = NULL;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
g_mutex_lock (watch->mutex);
+ /* try to send the message synchronously first */
if (watch->messages->length == 0) {
res = write_bytes (watch->writefd.fd, data, &off, size);
if (res != GST_RTSP_EINTR) {
}
}
- /* make a record with the data and id */
+ /* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
rec->data = (guint8 *) data;
/* make sure the main context will now also check for writability on the
* socket */
- if (!watch->write_added) {
- g_source_add_poll ((GSource *) watch, &watch->writefd);
- watch->write_added = TRUE;
+ if (watch->writefd.events != WRITE_COND) {
+ watch->writefd.events = WRITE_COND;
+ context = ((GSource *) watch)->context;
}
if (id != NULL)
done:
g_mutex_unlock (watch->mutex);
+
+ if (context)
+ g_main_context_wakeup (context);
+
return res;
}
guint size)
{
GstRTSPRec *rec;
+ GMainContext *context = NULL;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
/* make sure the main context will now also check for writability on the
* socket */
- if (!watch->write_added) {
- g_source_add_poll ((GSource *) watch, &watch->writefd);
- watch->write_added = TRUE;
+ if (watch->writefd.events != WRITE_COND) {
+ watch->writefd.events = WRITE_COND;
+ context = ((GSource *) watch)->context;
}
-
g_mutex_unlock (watch->mutex);
+
+ if (context)
+ g_main_context_wakeup (context);
+
return rec->id;
}
#endif /* GST_REMOVE_DEPRECATED */