guint coutl;
} DecodeCtx;
+typedef struct
+{
+ /* If %TRUE we only own data and none of the
+ * other fields
+ */
+ gboolean borrowed;
+
+ /* Header or full message */
+ guint8 *data;
+ guint data_size;
+ gboolean data_is_data_header;
+
+ /* Payload following data, if any */
+ guint8 *body_data;
+ guint body_data_size;
+ /* or */
+ GstBuffer *body_buffer;
+
+ /* DATA packet header statically allocated for above */
+ guint8 data_header[4];
+
+ /* all below only for async writing */
+
+ guint data_offset; /* == data_size when done */
+ guint body_offset; /* into body_data or the buffer */
+
+ /* ID of the message for notification */
+ guint id;
+} GstRTSPSerializedMessage;
+
+static void
+gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
+{
+ if (!msg->borrowed) {
+ g_free (msg->body_data);
+ gst_buffer_replace (&msg->body_buffer, NULL);
+ }
+ g_free (msg->data);
+}
+
#ifdef MSG_NOSIGNAL
#define SEND_FLAGS MSG_NOSIGNAL
#else
}
}
+/* NOTE: This changes the values of vectors if multiple iterations are needed!
+ *
+ * Depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
+ */
+#if GLIB_CHECK_VERSION(2, 59, 0)
+static GstRTSPResult
+writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
+ gsize * bytes_written, gboolean block, GCancellable * cancellable)
+{
+ gsize _bytes_written = 0;
+ gsize written;
+ GError *err = NULL;
+
+ while (n_vectors > 0) {
+ gboolean res;
+
+ if (block)
+ res = g_output_stream_writev (stream, vectors, n_vectors, &written,
+ cancellable, &err);
+ else
+ res =
+ g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
+ (stream), vectors, n_vectors, &written, cancellable, &err);
+ _bytes_written += written;
+ if (G_UNLIKELY (!res))
+ goto error;
+
+ /* skip vectors that have been written in full */
+ while (written >= vectors[0].size) {
+ written -= vectors[0].size;
+ ++vectors;
+ --n_vectors;
+ }
+
+ /* skip partially written vector data */
+ if (written > 0) {
+ vectors[0].size -= written;
+ vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
+ }
+ }
+
+ *bytes_written = _bytes_written;
+
+ return GST_RTSP_OK;
+
+ /* ERRORS */
+error:
+ {
+ *bytes_written = _bytes_written;
+
+ if (G_UNLIKELY (written == 0))
+ return GST_RTSP_EEOF;
+
+ GST_DEBUG ("%s", err->message);
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ g_clear_error (&err);
+ return GST_RTSP_EINTR;
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&err);
+ return GST_RTSP_EINTR;
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+ g_clear_error (&err);
+ return GST_RTSP_ETIMEOUT;
+ }
+ g_clear_error (&err);
+ return GST_RTSP_ESYS;
+ }
+}
+#else
+static GstRTSPResult
+writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
+ gsize * bytes_written, gboolean block, GCancellable * cancellable)
+{
+ gsize _bytes_written = 0;
+ guint written;
+ gint i;
+ GstRTSPResult res;
+
+ for (i = 0; i < n_vectors; i++) {
+ written = 0;
+ res =
+ write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
+ block, cancellable);
+ _bytes_written += written;
+ if (G_UNLIKELY (res != GST_RTSP_OK))
+ break;
+ }
+
+ *bytes_written = _bytes_written;
+
+ return res;
+}
+#endif
+
static gint
fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
gboolean block, GError ** err)
*
* Returns: #GST_RTSP_OK on success.
*/
+/* FIXME 2.0: This should've been static! */
GstRTSPResult
gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
guint size, GTimeVal * timeout)
return res;
}
-static GString *
-message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
+static gboolean
+serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
+ GstRTSPSerializedMessage * serialized_message)
{
GString *str = NULL;
- str = g_string_new ("");
+ memset (serialized_message, 0, sizeof (*serialized_message));
+
+ /* Initially we borrow the body_data / body_buffer fields from
+ * the message */
+ serialized_message->borrowed = TRUE;
switch (message->type) {
case GST_RTSP_MESSAGE_REQUEST:
+ str = g_string_new ("");
+
/* create request string, add CSeq */
g_string_append_printf (str, "%s %s RTSP/%s\r\n"
"CSeq: %d\r\n",
add_auth_header (conn, message);
break;
case GST_RTSP_MESSAGE_RESPONSE:
+ str = g_string_new ("");
+
/* create response string */
g_string_append_printf (str, "RTSP/%s %d %s\r\n",
gst_rtsp_version_as_text (message->type_data.response.version),
message->type_data.response.code, message->type_data.response.reason);
break;
case GST_RTSP_MESSAGE_HTTP_REQUEST:
+ str = g_string_new ("");
+
/* create request string */
g_string_append_printf (str, "%s %s HTTP/%s\r\n",
gst_rtsp_method_as_text (message->type_data.request.method),
add_auth_header (conn, message);
break;
case GST_RTSP_MESSAGE_HTTP_RESPONSE:
+ str = g_string_new ("");
+
/* create response string */
g_string_append_printf (str, "HTTP/%s %d %s\r\n",
gst_rtsp_version_as_text (message->type_data.request.version),
break;
case GST_RTSP_MESSAGE_DATA:
{
- guint8 data_header[4];
+ guint8 *data_header = serialized_message->data_header;
/* prepare data header */
data_header[0] = '$';
data_header[2] = (message->body_size >> 8) & 0xff;
data_header[3] = message->body_size & 0xff;
- /* create string with header and data */
- str = g_string_append_len (str, (gchar *) data_header, 4);
- str =
- g_string_append_len (str, (gchar *) message->body,
- message->body_size);
+ /* create serialized message with header and data */
+ serialized_message->data_is_data_header = TRUE;
+ serialized_message->data_size = 4;
+
+ if (message->body) {
+ serialized_message->body_data = message->body;
+ serialized_message->body_data_size = message->body_size;
+ } else {
+ g_assert (message->body_buffer != NULL);
+ serialized_message->body_buffer = message->body_buffer;
+ }
break;
}
default:
g_string_free (str, TRUE);
- g_return_val_if_reached (NULL);
+ g_return_val_if_reached (FALSE);
break;
}
if (message->type != GST_RTSP_MESSAGE_DATA) {
gchar date_string[100];
+ g_assert (str != NULL);
+
gen_date_string (date_string, sizeof (date_string));
/* add date header */
gst_rtsp_message_append_headers (message, str);
/* append Content-Length and body if needed */
- if (message->body != NULL && message->body_size > 0) {
+ if (message->body_size > 0) {
gchar *len;
len = g_strdup_printf ("%d", message->body_size);
g_free (len);
/* header ends here */
g_string_append (str, "\r\n");
- str =
- g_string_append_len (str, (gchar *) message->body,
- message->body_size);
+
+ if (message->body) {
+ serialized_message->body_data = message->body;
+ serialized_message->body_data_size = message->body_size;
+ } else {
+ g_assert (message->body_buffer != NULL);
+ serialized_message->body_buffer = message->body_buffer;
+ }
} else {
/* just end headers */
g_string_append (str, "\r\n");
}
+
+ serialized_message->data_size = str->len;
+ serialized_message->data = (guint8 *) g_string_free (str, FALSE);
}
- return str;
+ return TRUE;
}
/**
gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
GTimeVal * timeout)
{
- GString *string = NULL;
+ g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+ return gst_rtsp_connection_send_messages (conn, message, 1, timeout);
+}
+
+/**
+ * gst_rtsp_connection_send_messages:
+ * @conn: a #GstRTSPConnection
+ * @messages: (array length=n_messages): the messages to send
+ * @n_messages: the number of messages to send
+ * @timeout: a timeout value or %NULL
+ *
+ * Attempt to send @messages to the connected @conn, blocking up to
+ * the specified @timeout. @timeout can be %NULL, in which case this function
+ * might block forever.
+ *
+ * This function can be cancelled with gst_rtsp_connection_flush().
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 1.16
+ */
+GstRTSPResult
+gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
+ GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
+{
+ GstClockTime to;
GstRTSPResult res;
- gchar *str;
- gsize len;
+ GstRTSPSerializedMessage *serialized_messages;
+ GOutputVector *vectors;
+ GstMapInfo *map_infos;
+ guint n_vectors, n_memories;
+ gint i, j, k;
+ gsize bytes_to_write, bytes_written;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
- g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
+
+ serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
+ memset (serialized_messages, 0,
+ sizeof (GstRTSPSerializedMessage) * n_messages);
+
+ for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
+ i++) {
+ if (G_UNLIKELY (!serialize_message (conn, &messages[i],
+ &serialized_messages[i])))
+ goto no_message;
+
+ if (conn->tunneled) {
+ gint state = 0, save = 0;
+ gchar *base64_buffer, *out_buffer;
+ gsize written = 0;
+ gsize in_length;
+
+ in_length = serialized_messages[i].data_size;
+ if (serialized_messages[i].body_data)
+ in_length += serialized_messages[i].body_data_size;
+ else if (serialized_messages[i].body_buffer)
+ in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
+
+ in_length = (in_length / 3 + 1) * 4 + 4 + 1;
+ base64_buffer = out_buffer = g_malloc0 (in_length);
+
+ written =
+ g_base64_encode_step (serialized_messages[i].data_is_data_header ?
+ serialized_messages[i].data_header : serialized_messages[i].data,
+ serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
+ out_buffer += written;
+
+ if (serialized_messages[i].body_data) {
+ written =
+ g_base64_encode_step (serialized_messages[i].body_data,
+ serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
+ &save);
+ out_buffer += written;
+ } else if (serialized_messages[i].body_buffer) {
+ guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+
+ for (j = 0; j < n; j++) {
+ GstMemory *mem =
+ gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
+ GstMapInfo map;
+
+ gst_memory_map (mem, &map, GST_MAP_READ);
+
+ written = g_base64_encode_step (map.data, map.size,
+ FALSE, out_buffer, &state, &save);
+ out_buffer += written;
+
+ gst_memory_unmap (mem, &map);
+ }
+ }
- if (G_UNLIKELY (!(string = message_to_string (conn, message))))
- goto no_message;
+ written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
+ out_buffer += written;
- if (conn->tunneled) {
- str = g_base64_encode ((const guchar *) string->str, string->len);
- g_string_free (string, TRUE);
- len = strlen (str);
- } else {
- str = string->str;
- len = string->len;
- g_string_free (string, FALSE);
+ gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+ memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
+
+ serialized_messages[i].data = (guint8 *) base64_buffer;
+ serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1;
+ n_vectors++;
+ } else {
+ n_vectors++;
+ if (serialized_messages[i].body_data) {
+ n_vectors++;
+ } else if (serialized_messages[i].body_buffer) {
+ n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
+ n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
+ }
+ }
}
- /* write request */
- res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
+ vectors = g_newa (GOutputVector, n_vectors);
+ map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+
+ for (i = 0, j = 0, k = 0; i < n_messages; i++) {
+ vectors[j].buffer = serialized_messages[i].data_is_data_header ?
+ serialized_messages[i].data_header : serialized_messages[i].data;
+ vectors[j].size = serialized_messages[i].data_size;
+ bytes_to_write += vectors[j].size;
+ j++;
+
+ if (serialized_messages[i].body_data) {
+ vectors[j].buffer = serialized_messages[i].body_data;
+ vectors[j].size = serialized_messages[i].body_data_size;
+ bytes_to_write += vectors[j].size;
+ j++;
+ } else if (serialized_messages[i].body_buffer) {
+ gint l, n;
+
+ n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+ for (l = 0; l < n; l++) {
+ GstMemory *mem =
+ gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
+
+ gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+ vectors[j].buffer = map_infos[k].data;
+ vectors[j].size = map_infos[k].size;
+ bytes_to_write += vectors[j].size;
+
+ k++;
+ j++;
+ }
+ }
+ }
+
+ /* write request: this is synchronous */
+ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
+
+ g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
+ res =
+ writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
+ TRUE, conn->cancellable);
+ g_socket_set_timeout (conn->write_socket, 0);
+
+ g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
- g_free (str);
+ /* free everything */
+ for (i = 0, k = 0; i < n_messages; i++) {
+ if (serialized_messages[i].body_buffer) {
+ gint l, n;
+
+ n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+ for (l = 0; l < n; l++) {
+ GstMemory *mem =
+ gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
+
+ gst_memory_unmap (mem, &map_infos[k]);
+ k++;
+ }
+ }
+
+ g_free (serialized_messages[i].data);
+ }
return res;
no_message:
{
+ for (i = 0; i < n_messages; i++) {
+ gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+ }
g_warning ("Wrong message");
return GST_RTSP_EINVAL;
}
#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
#define WRITE_COND (G_IO_OUT | WRITE_ERR)
-typedef struct
-{
- guint8 *data;
- guint size;
- guint id;
-} GstRTSPRec;
-
/* async functions */
struct _GstRTSPWatch
{
GMutex mutex;
GstQueueArray *messages;
gsize messages_bytes;
- guint8 *write_data;
- guint write_off;
- guint write_size;
- guint write_id;
+ guint messages_count;
+
gsize max_bytes;
guint max_messages;
GCond queue_not_full;
};
#define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
- ((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages))
+ ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
static gboolean
gst_rtsp_source_prepare (GSource * source, gint * timeout)
GstRTSPConnection *conn = watch->conn;
/* if this connection was already closed, stop now */
- if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream)
+ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
+ !watch->messages)
goto eof;
g_mutex_lock (&watch->mutex);
do {
- if (watch->write_data == NULL) {
- GstRTSPRec *rec_ptr, rec;
-
- /* get a new message from the queue */
- rec_ptr = gst_queue_array_pop_head_struct (watch->messages);
- if (rec_ptr == NULL) {
- if (watch->writesrc) {
- if (!g_source_is_destroyed ((GSource *) watch))
- g_source_remove_child_source ((GSource *) watch, watch->writesrc);
- g_source_unref (watch->writesrc);
- watch->writesrc = NULL;
- /* we create and add the write source again when we actually have
- * something to write */
-
- /* since write source is now removed we add read source on the write
- * socket instead to be able to detect when client closes get channel
- * in tunneled mode */
- if (watch->conn->control_stream) {
- watch->controlsrc =
- g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
- (watch->conn->control_stream), NULL);
- g_source_set_callback (watch->controlsrc,
- (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
- NULL);
- g_source_add_child_source ((GSource *) watch, watch->controlsrc);
- } else {
- watch->controlsrc = NULL;
+ guint n_messages = gst_queue_array_get_length (watch->messages);
+ GOutputVector *vectors;
+ GstMapInfo *map_infos;
+ guint *ids;
+ gsize bytes_to_write, bytes_written;
+ guint n_vectors, n_memories, n_ids, drop_messages;
+ gint i, j, k, l;
+ GstRTSPSerializedMessage *msg;
+
+ /* if this connection was already closed, stop now */
+ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
+ !watch->messages) {
+ g_mutex_unlock (&watch->mutex);
+ goto eof;
+ }
+
+ if (n_messages == 0) {
+ if (watch->writesrc) {
+ if (!g_source_is_destroyed ((GSource *) watch))
+ g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+ g_source_unref (watch->writesrc);
+ watch->writesrc = NULL;
+ /* we create and add the write source again when we actually have
+ * something to write */
+
+ /* since write source is now removed we add read source on the write
+ * socket instead to be able to detect when client closes get channel
+ * in tunneled mode */
+ if (watch->conn->control_stream) {
+ watch->controlsrc =
+ g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+ (watch->conn->control_stream), NULL);
+ g_source_set_callback (watch->controlsrc,
+ (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
+ NULL);
+ g_source_add_child_source ((GSource *) watch, watch->controlsrc);
+ } else {
+ watch->controlsrc = NULL;
+ }
+ }
+ break;
+ }
+
+ for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
+ msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+ if (msg->id != 0)
+ n_ids++;
+
+ if (msg->data_offset < msg->data_size)
+ n_vectors++;
+
+ if (msg->body_data && msg->body_offset < msg->body_data_size) {
+ n_vectors++;
+ } else if (msg->body_buffer) {
+ guint m, n;
+ guint offset = 0;
+
+ n = gst_buffer_n_memory (msg->body_buffer);
+ for (m = 0; m < n; m++) {
+ GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
+
+ /* Skip all memories we already wrote */
+ if (offset + mem->size < msg->body_offset) {
+ offset += mem->size;
+ continue;
}
+ offset += mem->size;
+
+ n_memories++;
+ n_vectors++;
}
- break;
}
+ }
- rec = *rec_ptr;
- watch->messages_bytes -= rec.size;
+ vectors = g_newa (GOutputVector, n_vectors);
+ map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+ ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
+ if (ids)
+ memset (ids, 0, sizeof (guint) * (n_ids + 1));
+
+ for (i = 0, j = 0, k = 0, l = 0, bytes_to_write = 0; i < n_messages; i++) {
+ msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+
+ if (msg->data_offset < msg->data_size) {
+ vectors[j].buffer = (msg->data_is_data_header ?
+ msg->data_header : msg->data) + msg->data_offset;
+ vectors[j].size = msg->data_size - msg->data_offset;
+ bytes_to_write += vectors[j].size;
+ j++;
+ }
+
+ if (msg->body_data) {
+ if (msg->body_offset < msg->body_data_size) {
+ vectors[j].buffer = msg->body_data + msg->body_offset;
+ vectors[j].size = msg->body_data_size - msg->body_offset;
+ bytes_to_write += vectors[j].size;
+ j++;
+ }
+ } else if (msg->body_buffer) {
+ guint m, n;
+ guint offset = 0;
+
+ n = gst_buffer_n_memory (msg->body_buffer);
+ for (m = 0; m < n; m++) {
+ GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
+ guint off;
+
+ /* Skip all memories we already wrote */
+ if (offset + mem->size < msg->body_offset) {
+ offset += mem->size;
+ continue;
+ }
+
+ if (offset < msg->body_offset) {
+ off = msg->body_offset - offset;
+ } else {
+ offset += mem->size;
+ off = 0;
+ }
+
+ g_assert (off < mem->size);
+
+ gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+ vectors[j].buffer = map_infos[k].data + off;
+ vectors[j].size = map_infos[k].size - off;
+ bytes_to_write += vectors[j].size;
+
+ k++;
+ j++;
+ }
+ }
+ }
- watch->write_off = 0;
- watch->write_data = rec.data;
- watch->write_size = rec.size;
- watch->write_id = rec.id;
+ res =
+ writev_bytes (watch->conn->output_stream, vectors, n_vectors,
+ &bytes_written, FALSE, watch->conn->cancellable);
+ g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+
+ /* First unmap all memories here, this simplifies the code below
+ * as we don't have to skip all memories that were already written
+ * before */
+ for (k = 0; k < n_memories; k++) {
+ gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
}
- res = write_bytes (conn->output_stream, watch->write_data,
- &watch->write_off, watch->write_size, FALSE, conn->cancellable);
+ if (bytes_written == bytes_to_write) {
+ /* fast path, just unmap all memories, free memory, drop all messages and notify them */
+ l = 0;
+ while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+ if (msg->id) {
+ ids[l] = msg->id;
+ l++;
+ }
+
+ gst_rtsp_serialized_message_clear (msg);
+ }
+
+ g_assert (watch->messages_bytes >= bytes_written);
+ watch->messages_bytes -= bytes_written;
+ } else if (bytes_written > 0) {
+ /* not done, let's skip all messages that were sent already and free them */
+ for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
+ msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+
+ if (bytes_written >= 0) {
+ if (bytes_written >= msg->data_size) {
+ guint body_size;
+
+ /* all data of this message is sent, check body and otherwise
+ * skip the whole message for next time */
+ msg->data_offset = msg->data_size;
+ bytes_written -= msg->data_size;
+
+ if (msg->body_data) {
+ body_size = msg->body_data_size;
+
+ } else if (msg->body_buffer) {
+ body_size = gst_buffer_get_size (msg->body_buffer);
+ } else {
+ body_size = 0;
+ }
+
+ if (bytes_written >= body_size) {
+ /* body written, drop this message */
+ msg->body_offset = body_size;
+ bytes_written -= body_size;
+ drop_messages++;
+
+ if (msg->id) {
+ ids[l] = msg->id;
+ l++;
+ }
+
+ gst_rtsp_serialized_message_clear (msg);
+ } else {
+ msg->body_offset = bytes_written;
+ bytes_written = 0;
+ }
+ } else {
+ /* Need to continue sending from the data of this message */
+ msg->data_offset = bytes_written;
+ bytes_written = 0;
+ }
+ }
+ }
+
+ while (drop_messages > 0) {
+ msg = gst_queue_array_pop_head_struct (watch->messages);
+ g_assert (msg);
+ drop_messages--;
+ }
+
+ g_assert (watch->messages_bytes >= bytes_written);
+ watch->messages_bytes -= bytes_written;
+ }
if (!IS_BACKLOG_FULL (watch))
g_cond_signal (&watch->queue_not_full);
g_mutex_unlock (&watch->mutex);
- if (res == GST_RTSP_EINTR)
+ /* notify all messages that were successfully written */
+ if (ids) {
+ while (*ids) {
+ /* only decrease the counter for messages that have an id. Only
+ * the last message of a messages chunk is counted */
+ watch->messages_count--;
+
+ if (watch->funcs.message_sent)
+ watch->funcs.message_sent (watch, *ids, watch->user_data);
+ ids++;
+ }
+ }
+
+ if (res == GST_RTSP_EINTR) {
goto write_blocked;
- else if (G_LIKELY (res == GST_RTSP_OK)) {
- if (watch->funcs.message_sent)
- watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
- } else {
+ } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
goto write_error;
}
g_mutex_lock (&watch->mutex);
-
- g_free (watch->write_data);
- watch->write_data = NULL;
} while (TRUE);
g_mutex_unlock (&watch->mutex);
}
write_error:
{
- if (watch->funcs.error_full)
- watch->funcs.error_full (watch, res, NULL,
- watch->write_id, watch->user_data);
- else if (watch->funcs.error)
+ if (watch->funcs.error_full) {
+ guint i, n_messages;
+
+ n_messages = gst_queue_array_get_length (watch->messages);
+ for (i = 0; i < n_messages; i++) {
+ GstRTSPSerializedMessage *msg =
+ gst_queue_array_peek_nth_struct (watch->messages, i);
+ if (msg->id)
+ watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
+ }
+ } else if (watch->funcs.error) {
watch->funcs.error (watch, res, watch->user_data);
+ }
return FALSE;
}
}
-static void
-gst_rtsp_rec_clear (gpointer data)
-{
- GstRTSPRec *rec = data;
-
- g_free (rec->data);
-}
-
static void
gst_rtsp_source_finalize (GSource * source)
{
GstRTSPWatch *watch = (GstRTSPWatch *) source;
- GstRTSPRec *rec;
+ GstRTSPSerializedMessage *msg;
if (watch->notify)
watch->notify (watch->user_data);
build_reset (&watch->builder);
gst_rtsp_message_unset (&watch->message);
- while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
- gst_rtsp_rec_clear (rec);
+ while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+ gst_rtsp_serialized_message_clear (msg);
+ }
gst_queue_array_free (watch->messages);
watch->messages = NULL;
watch->messages_bytes = 0;
+ watch->messages_count = 0;
- g_free (watch->write_data);
g_cond_clear (&watch->queue_not_full);
if (watch->readsrc)
result->builder.state = STATE_START;
g_mutex_init (&result->mutex);
- result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPRec), 10);
+ result->messages =
+ gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
g_cond_init (&result->queue_not_full);
gst_rtsp_watch_reset (result);
g_mutex_unlock (&watch->mutex);
}
-/**
- * gst_rtsp_watch_write_data:
- * @watch: a #GstRTSPWatch
- * @data: (array length=size) (transfer full): the data to queue
- * @size: the size of @data
- * @id: (out) (allow-none): location for a message ID or %NULL
- *
- * Write @data using the connection of the @watch. If it cannot be sent
- * immediately, it will be queued for transmission in @watch. The contents of
- * @message will then be serialized and transmitted when the connection of the
- * @watch becomes writable. In case the @message is queued, the ID returned in
- * @id will be non-zero and used as the ID argument in the message_sent
- * callback.
- *
- * This function will take ownership of @data and g_free() it after use.
- *
- * If the amount of queued data exceeds the limits set with
- * gst_rtsp_watch_set_send_backlog(), this function will return
- * #GST_RTSP_ENOMEM.
- *
- * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
- * are reached. #GST_RTSP_EINTR when @watch was flushing.
- */
-GstRTSPResult
-gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
- guint size, guint * id)
+static GstRTSPResult
+gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
+ GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
{
GstRTSPResult res;
- GstRTSPRec rec;
- guint off = 0;
GMainContext *context = NULL;
+ gint i;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
- g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
- g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
+ g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
g_mutex_lock (&watch->mutex);
if (watch->flushing)
goto flushing;
/* try to send the message synchronously first */
- if (gst_queue_array_get_length (watch->messages) == 0
- && watch->write_data == NULL) {
+ if (gst_queue_array_get_length (watch->messages) == 0) {
+ gint j, k;
+ GOutputVector *vectors;
+ GstMapInfo *map_infos;
+ gsize bytes_to_write, bytes_written;
+ guint n_vectors, n_memories, drop_messages;
+
+ for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
+ n_vectors++;
+ if (messages[i].body_data) {
+ n_vectors++;
+ } else if (messages[i].body_buffer) {
+ n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
+ n_memories += gst_buffer_n_memory (messages[i].body_buffer);
+ }
+ }
+
+ vectors = g_newa (GOutputVector, n_vectors);
+ map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+
+ for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
+ vectors[j].buffer = messages[i].data_is_data_header ?
+ messages[i].data_header : messages[i].data;
+ vectors[j].size = messages[i].data_size;
+ bytes_to_write += vectors[j].size;
+ j++;
+
+ if (messages[i].body_data) {
+ vectors[j].buffer = messages[i].body_data;
+ vectors[j].size = messages[i].body_data_size;
+ bytes_to_write += vectors[j].size;
+ j++;
+ } else if (messages[i].body_buffer) {
+ gint l, n;
+
+ n = gst_buffer_n_memory (messages[i].body_buffer);
+ for (l = 0; l < n; l++) {
+ GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
+
+ gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+ vectors[j].buffer = map_infos[k].data;
+ vectors[j].size = map_infos[k].size;
+ bytes_to_write += vectors[j].size;
+
+ k++;
+ j++;
+ }
+ }
+ }
+
res =
- write_bytes (watch->conn->output_stream, data, &off, size, FALSE,
- watch->conn->cancellable);
+ writev_bytes (watch->conn->output_stream, vectors, n_vectors,
+ &bytes_written, FALSE, watch->conn->cancellable);
+ g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+
+ /* At this point we sent everything we could without blocking or
+ * error and updated the offsets inside the message accordingly */
+
+ /* First of all unmap all memories. This simplifies the code below */
+ for (k = 0; k < n_memories; k++) {
+ gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
+ }
+
if (res != GST_RTSP_EINTR) {
+ /* actual error or done completely */
if (id != NULL)
*id = 0;
- g_free ((gpointer) data);
+
+ /* free everything */
+ for (i = 0, k = 0; i < n_messages; i++) {
+ gst_rtsp_serialized_message_clear (&messages[i]);
+ }
+
goto done;
}
+
+ /* not done, let's skip all messages that were sent already and free them */
+ for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
+ if (bytes_written >= 0) {
+ if (bytes_written >= messages[i].data_size) {
+ guint body_size;
+
+ /* all data of this message is sent, check body and otherwise
+ * skip the whole message for next time */
+ messages[i].data_offset = messages[i].data_size;
+ bytes_written -= messages[i].data_size;
+
+ if (messages[i].body_data) {
+ body_size = messages[i].body_data_size;
+
+ } else if (messages[i].body_buffer) {
+ body_size = gst_buffer_get_size (messages[i].body_buffer);
+ } else {
+ body_size = 0;
+ }
+
+ if (bytes_written >= body_size) {
+ /* body written, drop this message */
+ messages[i].body_offset = body_size;
+ bytes_written -= body_size;
+ drop_messages++;
+
+ gst_rtsp_serialized_message_clear (&messages[i]);
+ } else {
+ messages[i].body_offset = bytes_written;
+ bytes_written = 0;
+ }
+ } else {
+ /* Need to continue sending from the data of this message */
+ messages[i].data_offset = bytes_written;
+ bytes_written = 0;
+ }
+ }
+ }
+
+ g_assert (n_messages > drop_messages);
+
+ messages += drop_messages;
+ n_messages -= drop_messages;
}
/* check limits */
if (IS_BACKLOG_FULL (watch))
goto too_much_backlog;
- /* make a record with the data and id for sending async */
- memset (&rec, 0, sizeof (rec));
- if (off == 0) {
- rec.data = (guint8 *) data;
- rec.size = size;
- } else {
- rec.data = g_memdup (data + off, size - off);
- rec.size = size - off;
- g_free ((gpointer) data);
- }
+ for (i = 0; i < n_messages; i++) {
+ GstRTSPSerializedMessage local_message;
- do {
- /* make sure rec->id is never 0 */
- rec.id = ++watch->id;
- } while (G_UNLIKELY (rec.id == 0));
+ /* make a record with the data and id for sending async */
+ local_message = messages[i];
+
+ /* copy the body data or take an additional reference to the body buffer
+ * we don't own them here */
+ if (local_message.body_data) {
+ local_message.body_data =
+ g_memdup (local_message.body_data, local_message.body_data_size);
+ } else if (local_message.body_buffer) {
+ gst_buffer_ref (local_message.body_buffer);
+ }
+ local_message.borrowed = FALSE;
+
+ /* set an id for the very last message */
+ if (i == n_messages - 1) {
+ do {
+ /* make sure rec->id is never 0 */
+ local_message.id = ++watch->id;
+ } while (G_UNLIKELY (local_message.id == 0));
- /* add the record to a queue. */
- gst_queue_array_push_tail_struct (watch->messages, &rec);
- watch->messages_bytes += rec.size;
+ if (id != NULL)
+ *id = local_message.id;
+ } else {
+ local_message.id = 0;
+ }
+
+ /* add the record to a queue. */
+ gst_queue_array_push_tail_struct (watch->messages, &local_message);
+ watch->messages_bytes +=
+ (local_message.data_size - local_message.data_offset);
+ if (local_message.body_data)
+ watch->messages_bytes +=
+ (local_message.body_data_size - local_message.body_offset);
+ else if (local_message.body_buffer)
+ watch->messages_bytes +=
+ (gst_buffer_get_size (local_message.body_buffer) -
+ local_message.body_offset);
+ }
+ /* each message chunks is one unit */
+ watch->messages_count++;
/* make sure the main context will now also check for writability on the
* socket */
(GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
g_source_add_child_source ((GSource *) watch, watch->writesrc);
}
-
- if (id != NULL)
- *id = rec.id;
res = GST_RTSP_OK;
done:
{
GST_DEBUG ("we are flushing");
g_mutex_unlock (&watch->mutex);
- g_free ((gpointer) data);
+ for (i = 0; i < n_messages; i++) {
+ gst_rtsp_serialized_message_clear (&messages[i]);
+ }
return GST_RTSP_EINTR;
}
too_much_backlog:
{
GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
- watch->messages_bytes, watch->max_messages,
- gst_queue_array_get_length (watch->messages));
+ watch->messages_bytes, watch->max_messages, watch->messages_count);
g_mutex_unlock (&watch->mutex);
- g_free ((gpointer) data);
+ for (i = 0; i < n_messages; i++) {
+ gst_rtsp_serialized_message_clear (&messages[i]);
+ }
return GST_RTSP_ENOMEM;
}
+
+ return GST_RTSP_OK;
+}
+
+/**
+ * gst_rtsp_watch_write_data:
+ * @watch: a #GstRTSPWatch
+ * @data: (array length=size) (transfer full): the data to queue
+ * @size: the size of @data
+ * @id: (out) (allow-none): location for a message ID or %NULL
+ *
+ * Write @data using the connection of the @watch. If it cannot be sent
+ * immediately, it will be queued for transmission in @watch. The contents of
+ * @message will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @message is queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback.
+ *
+ * This function will take ownership of @data and g_free() it after use.
+ *
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
+ *
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached. #GST_RTSP_EINTR when @watch was flushing.
+ */
+/* FIXME 2.0: This should've been static! */
+GstRTSPResult
+gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
+ guint size, guint * id)
+{
+ GstRTSPSerializedMessage serialized_message;
+
+ memset (&serialized_message, 0, sizeof (serialized_message));
+ serialized_message.data = (guint8 *) data;
+ serialized_message.data_size = size;
+
+ return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
+ 1, id);
}
/**
gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
guint * id)
{
- GString *str;
- guint size;
-
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
- /* make a record with the message as a string and id */
- str = message_to_string (watch->conn, message);
- size = str->len;
- return gst_rtsp_watch_write_data (watch,
- (guint8 *) g_string_free (str, FALSE), size, id);
+ return gst_rtsp_watch_send_messages (watch, message, 1, id);
+}
+
+/**
+ * gst_rtsp_watch_send_messages:
+ * @watch: a #GstRTSPWatch
+ * @messages: (array length=n_messages): the messages to send
+ * @n_messages: the number of messages to send
+ * @id: (out) (allow-none): location for a message ID or %NULL
+ *
+ * Sends @messages using the connection of the @watch. If they cannot be sent
+ * immediately, they will be queued for transmission in @watch. The contents of
+ * @messages will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @messages are queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback once the last message is sent. The callback will only be called
+ * once for the last message.
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 1.16
+ */
+GstRTSPResult
+gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
+ guint n_messages, guint * id)
+{
+ GstRTSPSerializedMessage *serialized_messages;
+ gint i;
+
+ g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
+
+ serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
+ memset (serialized_messages, 0,
+ sizeof (GstRTSPSerializedMessage) * n_messages);
+
+ for (i = 0; i < n_messages; i++) {
+ if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
+ goto error;
+ }
+
+ return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
+ n_messages, id);
+
+error:
+ for (i = 0; i < n_messages; i++) {
+ gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+ }
+
+ return GST_RTSP_EINVAL;
}
/**
watch->flushing = flushing;
g_cond_signal (&watch->queue_not_full);
if (flushing) {
- GstRTSPRec *rec;
+ GstRTSPSerializedMessage *msg;
- while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
- gst_rtsp_rec_clear (rec);
+ while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+ gst_rtsp_serialized_message_clear (msg);
+ }
}
g_mutex_unlock (&watch->mutex);
}