From: Sebastian Dröge Date: Mon, 17 Sep 2018 14:03:45 +0000 (+0300) Subject: rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffe... X-Git-Tag: 1.19.3~511^2~1254 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f90dac8d4813b604f87b10cd959d0e5f4d9b16a2;p=platform%2Fupstream%2Fgstreamer.git rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages By doing so we can send a whole GstBufferList and each memory in the contained buffers without copying into a single memory area and with a single writev() call. This improves performance considerably for high-packet-rate streams. This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 to be efficient, otherwise each chunk of memory is a separate write() call. https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/370 --- diff --git a/docs/libs/gst-plugins-base-libs-sections.txt b/docs/libs/gst-plugins-base-libs-sections.txt index 46fb30999b..6bbad3b481 100644 --- a/docs/libs/gst-plugins-base-libs-sections.txt +++ b/docs/libs/gst-plugins-base-libs-sections.txt @@ -1840,6 +1840,7 @@ gst_rtsp_connection_write gst_rtsp_connection_poll gst_rtsp_connection_send +gst_rtsp_connection_send_messages gst_rtsp_connection_receive gst_rtsp_connection_next_timeout @@ -1892,6 +1893,7 @@ gst_rtsp_watch_unref gst_rtsp_watch_attach gst_rtsp_watch_reset gst_rtsp_watch_send_message +gst_rtsp_watch_send_messages gst_rtsp_watch_write_data gst_rtsp_watch_get_send_backlog gst_rtsp_watch_set_send_backlog diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 48e17d65e6..322e77b8b7 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -89,6 +89,46 @@ typedef struct guint coutl; } DecodeCtx; +typedef struct +{ + /* If %TRUE we only own data and none of the + * other fields + */ + gboolean borrowed; + + /* Header or full message */ + guint8 *data; + guint data_size; + gboolean data_is_data_header; + + /* Payload following data, if any */ + guint8 *body_data; + guint body_data_size; + /* or */ + GstBuffer *body_buffer; + + /* DATA packet header statically allocated for above */ + guint8 data_header[4]; + + /* all below only for async writing */ + + guint data_offset; /* == data_size when done */ + guint body_offset; /* into body_data or the buffer */ + + /* ID of the message for notification */ + guint id; +} GstRTSPSerializedMessage; + +static void +gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg) +{ + if (!msg->borrowed) { + g_free (msg->body_data); + gst_buffer_replace (&msg->body_buffer, NULL); + } + g_free (msg->data); +} + #ifdef MSG_NOSIGNAL #define SEND_FLAGS MSG_NOSIGNAL #else @@ -1205,6 +1245,100 @@ error: } } +/* NOTE: This changes the values of vectors if multiple iterations are needed! + * + * Depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 + */ +#if GLIB_CHECK_VERSION(2, 59, 0) +static GstRTSPResult +writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors, + gsize * bytes_written, gboolean block, GCancellable * cancellable) +{ + gsize _bytes_written = 0; + gsize written; + GError *err = NULL; + + while (n_vectors > 0) { + gboolean res; + + if (block) + res = g_output_stream_writev (stream, vectors, n_vectors, &written, + cancellable, &err); + else + res = + g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM + (stream), vectors, n_vectors, &written, cancellable, &err); + _bytes_written += written; + if (G_UNLIKELY (!res)) + goto error; + + /* skip vectors that have been written in full */ + while (written >= vectors[0].size) { + written -= vectors[0].size; + ++vectors; + --n_vectors; + } + + /* skip partially written vector data */ + if (written > 0) { + vectors[0].size -= written; + vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written; + } + } + + *bytes_written = _bytes_written; + + return GST_RTSP_OK; + + /* ERRORS */ +error: + { + *bytes_written = _bytes_written; + + if (G_UNLIKELY (written == 0)) + return GST_RTSP_EEOF; + + GST_DEBUG ("%s", err->message); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + return GST_RTSP_ETIMEOUT; + } + g_clear_error (&err); + return GST_RTSP_ESYS; + } +} +#else +static GstRTSPResult +writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors, + gsize * bytes_written, gboolean block, GCancellable * cancellable) +{ + gsize _bytes_written = 0; + guint written; + gint i; + GstRTSPResult res; + + for (i = 0; i < n_vectors; i++) { + written = 0; + res = + write_bytes (stream, vectors[i].buffer, &written, vectors[i].size, + block, cancellable); + _bytes_written += written; + if (G_UNLIKELY (res != GST_RTSP_OK)) + break; + } + + *bytes_written = _bytes_written; + + return res; +} +#endif + static gint fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, gboolean block, GError ** err) @@ -1465,6 +1599,7 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size, * * Returns: #GST_RTSP_OK on success. */ +/* FIXME 2.0: This should've been static! */ GstRTSPResult gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) @@ -1490,15 +1625,22 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, return res; } -static GString * -message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) +static gboolean +serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message, + GstRTSPSerializedMessage * serialized_message) { GString *str = NULL; - str = g_string_new (""); + memset (serialized_message, 0, sizeof (*serialized_message)); + + /* Initially we borrow the body_data / body_buffer fields from + * the message */ + serialized_message->borrowed = TRUE; switch (message->type) { case GST_RTSP_MESSAGE_REQUEST: + str = g_string_new (""); + /* create request string, add CSeq */ g_string_append_printf (str, "%s %s RTSP/%s\r\n" "CSeq: %d\r\n", @@ -1516,12 +1658,16 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) add_auth_header (conn, message); break; case GST_RTSP_MESSAGE_RESPONSE: + str = g_string_new (""); + /* create response string */ g_string_append_printf (str, "RTSP/%s %d %s\r\n", gst_rtsp_version_as_text (message->type_data.response.version), message->type_data.response.code, message->type_data.response.reason); break; case GST_RTSP_MESSAGE_HTTP_REQUEST: + str = g_string_new (""); + /* create request string */ g_string_append_printf (str, "%s %s HTTP/%s\r\n", gst_rtsp_method_as_text (message->type_data.request.method), @@ -1531,6 +1677,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) add_auth_header (conn, message); break; case GST_RTSP_MESSAGE_HTTP_RESPONSE: + str = g_string_new (""); + /* create response string */ g_string_append_printf (str, "HTTP/%s %d %s\r\n", gst_rtsp_version_as_text (message->type_data.request.version), @@ -1538,7 +1686,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) break; case GST_RTSP_MESSAGE_DATA: { - guint8 data_header[4]; + guint8 *data_header = serialized_message->data_header; /* prepare data header */ data_header[0] = '$'; @@ -1546,16 +1694,22 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) data_header[2] = (message->body_size >> 8) & 0xff; data_header[3] = message->body_size & 0xff; - /* create string with header and data */ - str = g_string_append_len (str, (gchar *) data_header, 4); - str = - g_string_append_len (str, (gchar *) message->body, - message->body_size); + /* create serialized message with header and data */ + serialized_message->data_is_data_header = TRUE; + serialized_message->data_size = 4; + + if (message->body) { + serialized_message->body_data = message->body; + serialized_message->body_data_size = message->body_size; + } else { + g_assert (message->body_buffer != NULL); + serialized_message->body_buffer = message->body_buffer; + } break; } default: g_string_free (str, TRUE); - g_return_val_if_reached (NULL); + g_return_val_if_reached (FALSE); break; } @@ -1563,6 +1717,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) if (message->type != GST_RTSP_MESSAGE_DATA) { gchar date_string[100]; + g_assert (str != NULL); + gen_date_string (date_string, sizeof (date_string)); /* add date header */ @@ -1573,7 +1729,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) gst_rtsp_message_append_headers (message, str); /* append Content-Length and body if needed */ - if (message->body != NULL && message->body_size > 0) { + if (message->body_size > 0) { gchar *len; len = g_strdup_printf ("%d", message->body_size); @@ -1582,16 +1738,24 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) g_free (len); /* header ends here */ g_string_append (str, "\r\n"); - str = - g_string_append_len (str, (gchar *) message->body, - message->body_size); + + if (message->body) { + serialized_message->body_data = message->body; + serialized_message->body_data_size = message->body_size; + } else { + g_assert (message->body_buffer != NULL); + serialized_message->body_buffer = message->body_buffer; + } } else { /* just end headers */ g_string_append (str, "\r\n"); } + + serialized_message->data_size = str->len; + serialized_message->data = (guint8 *) g_string_free (str, FALSE); } - return str; + return TRUE; } /** @@ -1612,36 +1776,190 @@ GstRTSPResult gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) { - GString *string = NULL; + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); + + return gst_rtsp_connection_send_messages (conn, message, 1, timeout); +} + +/** + * gst_rtsp_connection_send_messages: + * @conn: a #GstRTSPConnection + * @messages: (array length=n_messages): the messages to send + * @n_messages: the number of messages to send + * @timeout: a timeout value or %NULL + * + * Attempt to send @messages to the connected @conn, blocking up to + * the specified @timeout. @timeout can be %NULL, in which case this function + * might block forever. + * + * This function can be cancelled with gst_rtsp_connection_flush(). + * + * Returns: #GST_RTSP_OK on success. + * + * Since: 1.16 + */ +GstRTSPResult +gst_rtsp_connection_send_messages (GstRTSPConnection * conn, + GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout) +{ + GstClockTime to; GstRTSPResult res; - gchar *str; - gsize len; + GstRTSPSerializedMessage *serialized_messages; + GOutputVector *vectors; + GstMapInfo *map_infos; + guint n_vectors, n_memories; + gint i, j, k; + gsize bytes_to_write, bytes_written; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL); + + serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages); + memset (serialized_messages, 0, + sizeof (GstRTSPSerializedMessage) * n_messages); + + for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages; + i++) { + if (G_UNLIKELY (!serialize_message (conn, &messages[i], + &serialized_messages[i]))) + goto no_message; + + if (conn->tunneled) { + gint state = 0, save = 0; + gchar *base64_buffer, *out_buffer; + gsize written = 0; + gsize in_length; + + in_length = serialized_messages[i].data_size; + if (serialized_messages[i].body_data) + in_length += serialized_messages[i].body_data_size; + else if (serialized_messages[i].body_buffer) + in_length += gst_buffer_get_size (serialized_messages[i].body_buffer); + + in_length = (in_length / 3 + 1) * 4 + 4 + 1; + base64_buffer = out_buffer = g_malloc0 (in_length); + + written = + g_base64_encode_step (serialized_messages[i].data_is_data_header ? + serialized_messages[i].data_header : serialized_messages[i].data, + serialized_messages[i].data_size, FALSE, out_buffer, &state, &save); + out_buffer += written; + + if (serialized_messages[i].body_data) { + written = + g_base64_encode_step (serialized_messages[i].body_data, + serialized_messages[i].body_data_size, FALSE, out_buffer, &state, + &save); + out_buffer += written; + } else if (serialized_messages[i].body_buffer) { + guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer); + + for (j = 0; j < n; j++) { + GstMemory *mem = + gst_buffer_peek_memory (serialized_messages[i].body_buffer, j); + GstMapInfo map; + + gst_memory_map (mem, &map, GST_MAP_READ); + + written = g_base64_encode_step (map.data, map.size, + FALSE, out_buffer, &state, &save); + out_buffer += written; + + gst_memory_unmap (mem, &map); + } + } - if (G_UNLIKELY (!(string = message_to_string (conn, message)))) - goto no_message; + written = g_base64_encode_close (FALSE, out_buffer, &state, &save); + out_buffer += written; - if (conn->tunneled) { - str = g_base64_encode ((const guchar *) string->str, string->len); - g_string_free (string, TRUE); - len = strlen (str); - } else { - str = string->str; - len = string->len; - g_string_free (string, FALSE); + gst_rtsp_serialized_message_clear (&serialized_messages[i]); + memset (&serialized_messages[i], 0, sizeof (serialized_messages[i])); + + serialized_messages[i].data = (guint8 *) base64_buffer; + serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1; + n_vectors++; + } else { + n_vectors++; + if (serialized_messages[i].body_data) { + n_vectors++; + } else if (serialized_messages[i].body_buffer) { + n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer); + n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer); + } + } } - /* write request */ - res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout); + vectors = g_newa (GOutputVector, n_vectors); + map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; + + for (i = 0, j = 0, k = 0; i < n_messages; i++) { + vectors[j].buffer = serialized_messages[i].data_is_data_header ? + serialized_messages[i].data_header : serialized_messages[i].data; + vectors[j].size = serialized_messages[i].data_size; + bytes_to_write += vectors[j].size; + j++; + + if (serialized_messages[i].body_data) { + vectors[j].buffer = serialized_messages[i].body_data; + vectors[j].size = serialized_messages[i].body_data_size; + bytes_to_write += vectors[j].size; + j++; + } else if (serialized_messages[i].body_buffer) { + gint l, n; + + n = gst_buffer_n_memory (serialized_messages[i].body_buffer); + for (l = 0; l < n; l++) { + GstMemory *mem = + gst_buffer_peek_memory (serialized_messages[i].body_buffer, l); + + gst_memory_map (mem, &map_infos[k], GST_MAP_READ); + vectors[j].buffer = map_infos[k].data; + vectors[j].size = map_infos[k].size; + bytes_to_write += vectors[j].size; + + k++; + j++; + } + } + } + + /* write request: this is synchronous */ + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; + + g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND); + res = + writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written, + TRUE, conn->cancellable); + g_socket_set_timeout (conn->write_socket, 0); + + g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); - g_free (str); + /* free everything */ + for (i = 0, k = 0; i < n_messages; i++) { + if (serialized_messages[i].body_buffer) { + gint l, n; + + n = gst_buffer_n_memory (serialized_messages[i].body_buffer); + for (l = 0; l < n; l++) { + GstMemory *mem = + gst_buffer_peek_memory (serialized_messages[i].body_buffer, l); + + gst_memory_unmap (mem, &map_infos[k]); + k++; + } + } + + g_free (serialized_messages[i].data); + } return res; no_message: { + for (i = 0; i < n_messages; i++) { + gst_rtsp_serialized_message_clear (&serialized_messages[i]); + } g_warning ("Wrong message"); return GST_RTSP_EINVAL; } @@ -3136,13 +3454,6 @@ gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn) #define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) #define WRITE_COND (G_IO_OUT | WRITE_ERR) -typedef struct -{ - guint8 *data; - guint size; - guint id; -} GstRTSPRec; - /* async functions */ struct _GstRTSPWatch { @@ -3164,10 +3475,8 @@ struct _GstRTSPWatch GMutex mutex; GstQueueArray *messages; gsize messages_bytes; - guint8 *write_data; - guint write_off; - guint write_size; - guint write_id; + guint messages_count; + gsize max_bytes; guint max_messages; GCond queue_not_full; @@ -3180,7 +3489,7 @@ struct _GstRTSPWatch }; #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \ - ((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages)) + ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages)) static gboolean gst_rtsp_source_prepare (GSource * source, gint * timeout) @@ -3426,71 +3735,251 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, GstRTSPConnection *conn = watch->conn; /* if this connection was already closed, stop now */ - if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream) + if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream || + !watch->messages) goto eof; g_mutex_lock (&watch->mutex); do { - if (watch->write_data == NULL) { - GstRTSPRec *rec_ptr, rec; - - /* get a new message from the queue */ - rec_ptr = gst_queue_array_pop_head_struct (watch->messages); - if (rec_ptr == NULL) { - if (watch->writesrc) { - if (!g_source_is_destroyed ((GSource *) watch)) - g_source_remove_child_source ((GSource *) watch, watch->writesrc); - g_source_unref (watch->writesrc); - watch->writesrc = NULL; - /* we create and add the write source again when we actually have - * something to write */ - - /* since write source is now removed we add read source on the write - * socket instead to be able to detect when client closes get channel - * in tunneled mode */ - if (watch->conn->control_stream) { - watch->controlsrc = - g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM - (watch->conn->control_stream), NULL); - g_source_set_callback (watch->controlsrc, - (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, - NULL); - g_source_add_child_source ((GSource *) watch, watch->controlsrc); - } else { - watch->controlsrc = NULL; + guint n_messages = gst_queue_array_get_length (watch->messages); + GOutputVector *vectors; + GstMapInfo *map_infos; + guint *ids; + gsize bytes_to_write, bytes_written; + guint n_vectors, n_memories, n_ids, drop_messages; + gint i, j, k, l; + GstRTSPSerializedMessage *msg; + + /* if this connection was already closed, stop now */ + if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream || + !watch->messages) { + g_mutex_unlock (&watch->mutex); + goto eof; + } + + if (n_messages == 0) { + if (watch->writesrc) { + if (!g_source_is_destroyed ((GSource *) watch)) + g_source_remove_child_source ((GSource *) watch, watch->writesrc); + g_source_unref (watch->writesrc); + watch->writesrc = NULL; + /* we create and add the write source again when we actually have + * something to write */ + + /* since write source is now removed we add read source on the write + * socket instead to be able to detect when client closes get channel + * in tunneled mode */ + if (watch->conn->control_stream) { + watch->controlsrc = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM + (watch->conn->control_stream), NULL); + g_source_set_callback (watch->controlsrc, + (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, + NULL); + g_source_add_child_source ((GSource *) watch, watch->controlsrc); + } else { + watch->controlsrc = NULL; + } + } + break; + } + + for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) { + msg = gst_queue_array_peek_nth_struct (watch->messages, i); + if (msg->id != 0) + n_ids++; + + if (msg->data_offset < msg->data_size) + n_vectors++; + + if (msg->body_data && msg->body_offset < msg->body_data_size) { + n_vectors++; + } else if (msg->body_buffer) { + guint m, n; + guint offset = 0; + + n = gst_buffer_n_memory (msg->body_buffer); + for (m = 0; m < n; m++) { + GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m); + + /* Skip all memories we already wrote */ + if (offset + mem->size < msg->body_offset) { + offset += mem->size; + continue; } + offset += mem->size; + + n_memories++; + n_vectors++; } - break; } + } - rec = *rec_ptr; - watch->messages_bytes -= rec.size; + vectors = g_newa (GOutputVector, n_vectors); + map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; + ids = n_ids ? g_newa (guint, n_ids + 1) : NULL; + if (ids) + memset (ids, 0, sizeof (guint) * (n_ids + 1)); + + for (i = 0, j = 0, k = 0, l = 0, bytes_to_write = 0; i < n_messages; i++) { + msg = gst_queue_array_peek_nth_struct (watch->messages, i); + + if (msg->data_offset < msg->data_size) { + vectors[j].buffer = (msg->data_is_data_header ? + msg->data_header : msg->data) + msg->data_offset; + vectors[j].size = msg->data_size - msg->data_offset; + bytes_to_write += vectors[j].size; + j++; + } + + if (msg->body_data) { + if (msg->body_offset < msg->body_data_size) { + vectors[j].buffer = msg->body_data + msg->body_offset; + vectors[j].size = msg->body_data_size - msg->body_offset; + bytes_to_write += vectors[j].size; + j++; + } + } else if (msg->body_buffer) { + guint m, n; + guint offset = 0; + + n = gst_buffer_n_memory (msg->body_buffer); + for (m = 0; m < n; m++) { + GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m); + guint off; + + /* Skip all memories we already wrote */ + if (offset + mem->size < msg->body_offset) { + offset += mem->size; + continue; + } + + if (offset < msg->body_offset) { + off = msg->body_offset - offset; + } else { + offset += mem->size; + off = 0; + } + + g_assert (off < mem->size); + + gst_memory_map (mem, &map_infos[k], GST_MAP_READ); + vectors[j].buffer = map_infos[k].data + off; + vectors[j].size = map_infos[k].size - off; + bytes_to_write += vectors[j].size; + + k++; + j++; + } + } + } - watch->write_off = 0; - watch->write_data = rec.data; - watch->write_size = rec.size; - watch->write_id = rec.id; + res = + writev_bytes (watch->conn->output_stream, vectors, n_vectors, + &bytes_written, FALSE, watch->conn->cancellable); + g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); + + /* First unmap all memories here, this simplifies the code below + * as we don't have to skip all memories that were already written + * before */ + for (k = 0; k < n_memories; k++) { + gst_memory_unmap (map_infos[k].memory, &map_infos[k]); } - res = write_bytes (conn->output_stream, watch->write_data, - &watch->write_off, watch->write_size, FALSE, conn->cancellable); + if (bytes_written == bytes_to_write) { + /* fast path, just unmap all memories, free memory, drop all messages and notify them */ + l = 0; + while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { + if (msg->id) { + ids[l] = msg->id; + l++; + } + + gst_rtsp_serialized_message_clear (msg); + } + + g_assert (watch->messages_bytes >= bytes_written); + watch->messages_bytes -= bytes_written; + } else if (bytes_written > 0) { + /* not done, let's skip all messages that were sent already and free them */ + for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) { + msg = gst_queue_array_peek_nth_struct (watch->messages, i); + + if (bytes_written >= 0) { + if (bytes_written >= msg->data_size) { + guint body_size; + + /* all data of this message is sent, check body and otherwise + * skip the whole message for next time */ + msg->data_offset = msg->data_size; + bytes_written -= msg->data_size; + + if (msg->body_data) { + body_size = msg->body_data_size; + + } else if (msg->body_buffer) { + body_size = gst_buffer_get_size (msg->body_buffer); + } else { + body_size = 0; + } + + if (bytes_written >= body_size) { + /* body written, drop this message */ + msg->body_offset = body_size; + bytes_written -= body_size; + drop_messages++; + + if (msg->id) { + ids[l] = msg->id; + l++; + } + + gst_rtsp_serialized_message_clear (msg); + } else { + msg->body_offset = bytes_written; + bytes_written = 0; + } + } else { + /* Need to continue sending from the data of this message */ + msg->data_offset = bytes_written; + bytes_written = 0; + } + } + } + + while (drop_messages > 0) { + msg = gst_queue_array_pop_head_struct (watch->messages); + g_assert (msg); + drop_messages--; + } + + g_assert (watch->messages_bytes >= bytes_written); + watch->messages_bytes -= bytes_written; + } if (!IS_BACKLOG_FULL (watch)) g_cond_signal (&watch->queue_not_full); g_mutex_unlock (&watch->mutex); - if (res == GST_RTSP_EINTR) + /* notify all messages that were successfully written */ + if (ids) { + while (*ids) { + /* only decrease the counter for messages that have an id. Only + * the last message of a messages chunk is counted */ + watch->messages_count--; + + if (watch->funcs.message_sent) + watch->funcs.message_sent (watch, *ids, watch->user_data); + ids++; + } + } + + if (res == GST_RTSP_EINTR) { goto write_blocked; - else if (G_LIKELY (res == GST_RTSP_OK)) { - if (watch->funcs.message_sent) - watch->funcs.message_sent (watch, watch->write_id, watch->user_data); - } else { + } else if (G_UNLIKELY (res != GST_RTSP_OK)) { goto write_error; } g_mutex_lock (&watch->mutex); - - g_free (watch->write_data); - watch->write_data = NULL; } while (TRUE); g_mutex_unlock (&watch->mutex); @@ -3504,29 +3993,29 @@ eof: } write_error: { - if (watch->funcs.error_full) - watch->funcs.error_full (watch, res, NULL, - watch->write_id, watch->user_data); - else if (watch->funcs.error) + if (watch->funcs.error_full) { + guint i, n_messages; + + n_messages = gst_queue_array_get_length (watch->messages); + for (i = 0; i < n_messages; i++) { + GstRTSPSerializedMessage *msg = + gst_queue_array_peek_nth_struct (watch->messages, i); + if (msg->id) + watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data); + } + } else if (watch->funcs.error) { watch->funcs.error (watch, res, watch->user_data); + } return FALSE; } } -static void -gst_rtsp_rec_clear (gpointer data) -{ - GstRTSPRec *rec = data; - - g_free (rec->data); -} - static void gst_rtsp_source_finalize (GSource * source) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GstRTSPRec *rec; + GstRTSPSerializedMessage *msg; if (watch->notify) watch->notify (watch->user_data); @@ -3534,13 +4023,14 @@ gst_rtsp_source_finalize (GSource * source) build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); - while ((rec = gst_queue_array_pop_head_struct (watch->messages))) - gst_rtsp_rec_clear (rec); + while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { + gst_rtsp_serialized_message_clear (msg); + } gst_queue_array_free (watch->messages); watch->messages = NULL; watch->messages_bytes = 0; + watch->messages_count = 0; - g_free (watch->write_data); g_cond_clear (&watch->queue_not_full); if (watch->readsrc) @@ -3598,7 +4088,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->builder.state = STATE_START; g_mutex_init (&result->mutex); - result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPRec), 10); + result->messages = + gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10); g_cond_init (&result->queue_not_full); gst_rtsp_watch_reset (result); @@ -3756,83 +4247,190 @@ gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch, g_mutex_unlock (&watch->mutex); } -/** - * gst_rtsp_watch_write_data: - * @watch: a #GstRTSPWatch - * @data: (array length=size) (transfer full): the data to queue - * @size: the size of @data - * @id: (out) (allow-none): location for a message ID or %NULL - * - * Write @data using the connection of the @watch. If it cannot be sent - * immediately, it will be queued for transmission in @watch. The contents of - * @message will then be serialized and transmitted when the connection of the - * @watch becomes writable. In case the @message is queued, the ID returned in - * @id will be non-zero and used as the ID argument in the message_sent - * callback. - * - * This function will take ownership of @data and g_free() it after use. - * - * If the amount of queued data exceeds the limits set with - * gst_rtsp_watch_set_send_backlog(), this function will return - * #GST_RTSP_ENOMEM. - * - * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits - * are reached. #GST_RTSP_EINTR when @watch was flushing. - */ -GstRTSPResult -gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, - guint size, guint * id) +static GstRTSPResult +gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch, + GstRTSPSerializedMessage * messages, guint n_messages, guint * id) { GstRTSPResult res; - GstRTSPRec rec; - guint off = 0; GMainContext *context = NULL; + gint i; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); + g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL); g_mutex_lock (&watch->mutex); if (watch->flushing) goto flushing; /* try to send the message synchronously first */ - if (gst_queue_array_get_length (watch->messages) == 0 - && watch->write_data == NULL) { + if (gst_queue_array_get_length (watch->messages) == 0) { + gint j, k; + GOutputVector *vectors; + GstMapInfo *map_infos; + gsize bytes_to_write, bytes_written; + guint n_vectors, n_memories, drop_messages; + + for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) { + n_vectors++; + if (messages[i].body_data) { + n_vectors++; + } else if (messages[i].body_buffer) { + n_vectors += gst_buffer_n_memory (messages[i].body_buffer); + n_memories += gst_buffer_n_memory (messages[i].body_buffer); + } + } + + vectors = g_newa (GOutputVector, n_vectors); + map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; + + for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) { + vectors[j].buffer = messages[i].data_is_data_header ? + messages[i].data_header : messages[i].data; + vectors[j].size = messages[i].data_size; + bytes_to_write += vectors[j].size; + j++; + + if (messages[i].body_data) { + vectors[j].buffer = messages[i].body_data; + vectors[j].size = messages[i].body_data_size; + bytes_to_write += vectors[j].size; + j++; + } else if (messages[i].body_buffer) { + gint l, n; + + n = gst_buffer_n_memory (messages[i].body_buffer); + for (l = 0; l < n; l++) { + GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l); + + gst_memory_map (mem, &map_infos[k], GST_MAP_READ); + vectors[j].buffer = map_infos[k].data; + vectors[j].size = map_infos[k].size; + bytes_to_write += vectors[j].size; + + k++; + j++; + } + } + } + res = - write_bytes (watch->conn->output_stream, data, &off, size, FALSE, - watch->conn->cancellable); + writev_bytes (watch->conn->output_stream, vectors, n_vectors, + &bytes_written, FALSE, watch->conn->cancellable); + g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); + + /* At this point we sent everything we could without blocking or + * error and updated the offsets inside the message accordingly */ + + /* First of all unmap all memories. This simplifies the code below */ + for (k = 0; k < n_memories; k++) { + gst_memory_unmap (map_infos[k].memory, &map_infos[k]); + } + if (res != GST_RTSP_EINTR) { + /* actual error or done completely */ if (id != NULL) *id = 0; - g_free ((gpointer) data); + + /* free everything */ + for (i = 0, k = 0; i < n_messages; i++) { + gst_rtsp_serialized_message_clear (&messages[i]); + } + goto done; } + + /* not done, let's skip all messages that were sent already and free them */ + for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) { + if (bytes_written >= 0) { + if (bytes_written >= messages[i].data_size) { + guint body_size; + + /* all data of this message is sent, check body and otherwise + * skip the whole message for next time */ + messages[i].data_offset = messages[i].data_size; + bytes_written -= messages[i].data_size; + + if (messages[i].body_data) { + body_size = messages[i].body_data_size; + + } else if (messages[i].body_buffer) { + body_size = gst_buffer_get_size (messages[i].body_buffer); + } else { + body_size = 0; + } + + if (bytes_written >= body_size) { + /* body written, drop this message */ + messages[i].body_offset = body_size; + bytes_written -= body_size; + drop_messages++; + + gst_rtsp_serialized_message_clear (&messages[i]); + } else { + messages[i].body_offset = bytes_written; + bytes_written = 0; + } + } else { + /* Need to continue sending from the data of this message */ + messages[i].data_offset = bytes_written; + bytes_written = 0; + } + } + } + + g_assert (n_messages > drop_messages); + + messages += drop_messages; + n_messages -= drop_messages; } /* check limits */ if (IS_BACKLOG_FULL (watch)) goto too_much_backlog; - /* make a record with the data and id for sending async */ - memset (&rec, 0, sizeof (rec)); - if (off == 0) { - rec.data = (guint8 *) data; - rec.size = size; - } else { - rec.data = g_memdup (data + off, size - off); - rec.size = size - off; - g_free ((gpointer) data); - } + for (i = 0; i < n_messages; i++) { + GstRTSPSerializedMessage local_message; - do { - /* make sure rec->id is never 0 */ - rec.id = ++watch->id; - } while (G_UNLIKELY (rec.id == 0)); + /* make a record with the data and id for sending async */ + local_message = messages[i]; + + /* copy the body data or take an additional reference to the body buffer + * we don't own them here */ + if (local_message.body_data) { + local_message.body_data = + g_memdup (local_message.body_data, local_message.body_data_size); + } else if (local_message.body_buffer) { + gst_buffer_ref (local_message.body_buffer); + } + local_message.borrowed = FALSE; + + /* set an id for the very last message */ + if (i == n_messages - 1) { + do { + /* make sure rec->id is never 0 */ + local_message.id = ++watch->id; + } while (G_UNLIKELY (local_message.id == 0)); - /* add the record to a queue. */ - gst_queue_array_push_tail_struct (watch->messages, &rec); - watch->messages_bytes += rec.size; + if (id != NULL) + *id = local_message.id; + } else { + local_message.id = 0; + } + + /* add the record to a queue. */ + gst_queue_array_push_tail_struct (watch->messages, &local_message); + watch->messages_bytes += + (local_message.data_size - local_message.data_offset); + if (local_message.body_data) + watch->messages_bytes += + (local_message.body_data_size - local_message.body_offset); + else if (local_message.body_buffer) + watch->messages_bytes += + (gst_buffer_get_size (local_message.body_buffer) - + local_message.body_offset); + } + /* each message chunks is one unit */ + watch->messages_count++; /* make sure the main context will now also check for writability on the * socket */ @@ -3853,9 +4451,6 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL); g_source_add_child_source ((GSource *) watch, watch->writesrc); } - - if (id != NULL) - *id = rec.id; res = GST_RTSP_OK; done: @@ -3871,19 +4466,62 @@ flushing: { GST_DEBUG ("we are flushing"); g_mutex_unlock (&watch->mutex); - g_free ((gpointer) data); + for (i = 0; i < n_messages; i++) { + gst_rtsp_serialized_message_clear (&messages[i]); + } return GST_RTSP_EINTR; } too_much_backlog: { GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, - watch->messages_bytes, watch->max_messages, - gst_queue_array_get_length (watch->messages)); + watch->messages_bytes, watch->max_messages, watch->messages_count); g_mutex_unlock (&watch->mutex); - g_free ((gpointer) data); + for (i = 0; i < n_messages; i++) { + gst_rtsp_serialized_message_clear (&messages[i]); + } return GST_RTSP_ENOMEM; } + + return GST_RTSP_OK; +} + +/** + * gst_rtsp_watch_write_data: + * @watch: a #GstRTSPWatch + * @data: (array length=size) (transfer full): the data to queue + * @size: the size of @data + * @id: (out) (allow-none): location for a message ID or %NULL + * + * Write @data using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. + * + * This function will take ownership of @data and g_free() it after use. + * + * If the amount of queued data exceeds the limits set with + * gst_rtsp_watch_set_send_backlog(), this function will return + * #GST_RTSP_ENOMEM. + * + * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits + * are reached. #GST_RTSP_EINTR when @watch was flushing. + */ +/* FIXME 2.0: This should've been static! */ +GstRTSPResult +gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, + guint size, guint * id) +{ + GstRTSPSerializedMessage serialized_message; + + memset (&serialized_message, 0, sizeof (serialized_message)); + serialized_message.data = (guint8 *) data; + serialized_message.data_size = size; + + return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message, + 1, id); } /** @@ -3905,17 +4543,59 @@ GstRTSPResult gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, guint * id) { - GString *str; - guint size; - g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - /* make a record with the message as a string and id */ - str = message_to_string (watch->conn, message); - size = str->len; - return gst_rtsp_watch_write_data (watch, - (guint8 *) g_string_free (str, FALSE), size, id); + return gst_rtsp_watch_send_messages (watch, message, 1, id); +} + +/** + * gst_rtsp_watch_send_messages: + * @watch: a #GstRTSPWatch + * @messages: (array length=n_messages): the messages to send + * @n_messages: the number of messages to send + * @id: (out) (allow-none): location for a message ID or %NULL + * + * Sends @messages using the connection of the @watch. If they cannot be sent + * immediately, they will be queued for transmission in @watch. The contents of + * @messages will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @messages are queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback once the last message is sent. The callback will only be called + * once for the last message. + * + * Returns: #GST_RTSP_OK on success. + * + * Since: 1.16 + */ +GstRTSPResult +gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages, + guint n_messages, guint * id) +{ + GstRTSPSerializedMessage *serialized_messages; + gint i; + + g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL); + + serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages); + memset (serialized_messages, 0, + sizeof (GstRTSPSerializedMessage) * n_messages); + + for (i = 0; i < n_messages; i++) { + if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i])) + goto error; + } + + return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages, + n_messages, id); + +error: + for (i = 0; i < n_messages; i++) { + gst_rtsp_serialized_message_clear (&serialized_messages[i]); + } + + return GST_RTSP_EINVAL; } /** @@ -4005,10 +4685,11 @@ gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing) watch->flushing = flushing; g_cond_signal (&watch->queue_not_full); if (flushing) { - GstRTSPRec *rec; + GstRTSPSerializedMessage *msg; - while ((rec = gst_queue_array_pop_head_struct (watch->messages))) - gst_rtsp_rec_clear (rec); + while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { + gst_rtsp_serialized_message_clear (msg); + } } g_mutex_unlock (&watch->mutex); } diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index adea8af0b7..b27f813105 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -136,6 +136,10 @@ GST_RTSP_API GstRTSPResult gst_rtsp_connection_send (GstRTSPConnection *conn, GstRTSPMessage *message, GTimeVal *timeout); +GST_RTSP_API +GstRTSPResult gst_rtsp_connection_send_messages (GstRTSPConnection *conn, GstRTSPMessage *messages, guint n_messages, + GTimeVal *timeout); + GST_RTSP_API GstRTSPResult gst_rtsp_connection_receive (GstRTSPConnection *conn, GstRTSPMessage *message, GTimeVal *timeout); @@ -313,6 +317,12 @@ GstRTSPResult gst_rtsp_watch_send_message (GstRTSPWatch *watch, GstRTSPMessage *message, guint *id); +GST_RTSP_API +GstRTSPResult gst_rtsp_watch_send_messages (GstRTSPWatch *watch, + GstRTSPMessage *messages, + guint n_messages, + guint *id); + GST_RTSP_API GstRTSPResult gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal *timeout);