1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
38 #define READ_SIZE 8192
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
42 struct _GstRtmpConnection
44 GObject parent_instance;
46 /* should be properties */
47 gboolean input_paused;
52 GSocketConnection *connection;
53 GCancellable *cancellable;
54 GSocketClient *socket_client;
55 GAsyncQueue *output_queue;
56 GMainContext *main_context;
58 GCancellable *outer_cancellable;
59 gulong cancel_handler_id;
61 GSource *input_source;
62 GByteArray *input_bytes;
63 guint input_needed_bytes;
64 GstRtmpChunkStreams *input_streams, *output_streams;
66 GList *expected_commands;
67 guint transaction_count;
69 GstRtmpConnectionMessageFunc input_handler;
70 gpointer input_handler_user_data;
71 GDestroyNotify input_handler_user_data_destroy;
73 GstRtmpConnectionFunc output_handler;
74 gpointer output_handler_user_data;
75 GDestroyNotify output_handler_user_data_destroy;
79 /* Protects the values below during concurrent access.
80 * - Taken by the loop thread when writing, but not reading.
81 * - Taken by other threads when reading (calling get_stats).
85 /* RTMP configuration */
86 guint32 in_chunk_size;
87 guint32 out_chunk_size, out_chunk_size_pending;
88 guint32 in_window_ack_size;
89 guint32 out_window_ack_size, out_window_ack_size_pending;
91 guint64 in_bytes_total;
92 guint64 out_bytes_total;
93 guint64 in_bytes_acked;
94 guint64 out_bytes_acked;
100 GObjectClass parent_class;
101 } GstRtmpConnectionClass;
105 static void gst_rtmp_connection_dispose (GObject * object);
106 static void gst_rtmp_connection_finalize (GObject * object);
107 static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
108 GCancellable * cancellable);
109 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self,
111 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
113 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
114 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
115 GAsyncResult * result, gpointer user_data);
116 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
118 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
119 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
120 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
121 connection, GstBuffer * buffer);
122 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
123 connection, GstBuffer * buffer);
124 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
126 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
128 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
130 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
131 guint32 in_chunk_size);
132 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
134 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
135 self, guint32 in_chunk_size);
137 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
139 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
143 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
146 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
150 gdouble transaction_id;
151 GstRtmpCommandCallback func;
156 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
159 Transaction *data = g_slice_new (Transaction);
160 data->transaction_id = transaction_id;
162 data->user_data = user_data;
167 transaction_free (gpointer ptr)
169 Transaction *data = ptr;
170 g_slice_free (Transaction, data);
177 GstRtmpCommandCallback func;
181 static ExpectedCommand *
182 expected_command_new (guint32 stream_id, const gchar * command_name,
183 GstRtmpCommandCallback func, gpointer user_data)
185 ExpectedCommand *data = g_slice_new (ExpectedCommand);
186 data->stream_id = stream_id;
187 data->command_name = g_strdup (command_name);
189 data->user_data = user_data;
194 expected_command_free (gpointer ptr)
196 ExpectedCommand *data = ptr;
197 g_free (data->command_name);
198 g_slice_free (ExpectedCommand, data);
204 SIGNAL_STREAM_CONTROL,
209 static guint signals[N_SIGNALS] = { 0, };
213 static GstMemory *set_data_frame_value;
216 init_set_data_frame_value (void)
218 GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
219 GBytes *bytes = gst_amf_node_serialize (node);
221 const gchar *data = g_bytes_get_data (bytes, &size);
223 set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
224 (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
225 GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
226 GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
228 gst_amf_node_free (node);
231 /* class initialization */
233 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
235 GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
236 "rtmpconnection", 0, "debug category for GstRtmpConnection class");
237 init_set_data_frame_value ());
240 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
242 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
244 gobject_class->dispose = gst_rtmp_connection_dispose;
245 gobject_class->finalize = gst_rtmp_connection_finalize;
247 signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
248 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_ERROR);
250 signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
251 G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
252 G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
254 GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
258 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
260 rtmpconnection->cancellable = g_cancellable_new ();
261 rtmpconnection->output_queue =
262 g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
263 rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
264 rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
266 rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
267 rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
269 rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
270 rtmpconnection->input_needed_bytes = 1;
272 g_mutex_init (&rtmpconnection->stats_lock);
276 gst_rtmp_connection_dispose (GObject * object)
278 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
279 GST_DEBUG_OBJECT (rtmpconnection, "dispose");
281 /* clean up as possible. may be called multiple times */
283 gst_rtmp_connection_close (rtmpconnection);
284 g_cancellable_cancel (rtmpconnection->cancellable);
285 gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
286 gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
287 gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
289 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
293 gst_rtmp_connection_finalize (GObject * object)
295 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
296 GST_DEBUG_OBJECT (rtmpconnection, "finalize");
298 /* clean up object here */
300 g_mutex_clear (&rtmpconnection->stats_lock);
301 g_clear_object (&rtmpconnection->cancellable);
302 g_clear_object (&rtmpconnection->connection);
303 g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
304 g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
305 g_clear_pointer (&rtmpconnection->output_streams,
306 gst_rtmp_chunk_streams_free);
307 g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
308 g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
309 g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
311 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
315 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
317 return g_socket_connection_get_socket (sc->connection);
321 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
322 GSocketConnection * connection)
326 sc->thread = g_thread_ref (g_thread_self ());
327 sc->main_context = g_main_context_ref_thread_default ();
328 sc->connection = g_object_ref (connection);
330 /* refs the socket because it's creating an input stream, which holds a ref */
331 is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
332 /* refs the socket because it's creating a socket source */
333 g_warn_if_fail (!sc->input_source);
335 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
337 g_source_set_callback (sc->input_source,
338 (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
340 g_source_attach (sc->input_source, sc->main_context);
344 gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
345 GCancellable * cancellable)
347 g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
348 g_clear_object (&self->outer_cancellable);
349 self->cancel_handler_id = 0;
351 if (cancellable == NULL)
354 self->outer_cancellable = g_object_ref (cancellable);
355 self->cancel_handler_id =
356 g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
357 g_object_ref (self->cancellable), g_object_unref);
362 gst_rtmp_connection_new (GSocketConnection * connection,
363 GCancellable * cancellable)
365 GstRtmpConnection *sc;
367 sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
369 gst_rtmp_connection_set_socket_connection (sc, connection);
370 gst_rtmp_connection_set_cancellable (sc, cancellable);
376 cancel_all_commands (GstRtmpConnection * self, const gchar * reason)
380 for (l = self->transactions; l; l = g_list_next (l)) {
381 Transaction *cc = l->data;
382 GST_LOG_OBJECT (self, "calling transaction callback %s",
383 GST_DEBUG_FUNCPTR_NAME (cc->func));
384 cc->func (reason, NULL, cc->user_data);
386 g_list_free_full (self->transactions, transaction_free);
387 self->transactions = NULL;
389 for (l = self->expected_commands; l; l = g_list_next (l)) {
390 ExpectedCommand *cc = l->data;
391 GST_LOG_OBJECT (self, "calling expected command callback %s",
392 GST_DEBUG_FUNCPTR_NAME (cc->func));
393 cc->func (reason, NULL, cc->user_data);
395 g_list_free_full (self->expected_commands, expected_command_free);
396 self->expected_commands = NULL;
400 gst_rtmp_connection_close (GstRtmpConnection * self)
402 if (self->thread != g_thread_self ()) {
403 GST_ERROR_OBJECT (self, "Called from wrong thread");
406 g_cancellable_cancel (self->cancellable);
407 cancel_all_commands (self, "connection closed locally");
409 if (self->input_source) {
410 g_source_destroy (self->input_source);
411 g_clear_pointer (&self->input_source, g_source_unref);
414 if (self->connection) {
415 g_io_stream_close_async (G_IO_STREAM (self->connection),
416 G_PRIORITY_DEFAULT, NULL, NULL, NULL);
421 gst_rtmp_connection_close_and_unref (gpointer ptr)
423 GstRtmpConnection *connection;
425 g_return_if_fail (ptr);
427 connection = GST_RTMP_CONNECTION (ptr);
428 gst_rtmp_connection_close (connection);
429 g_object_unref (connection);
433 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
434 GstRtmpConnectionMessageFunc callback, gpointer user_data,
435 GDestroyNotify user_data_destroy)
437 if (sc->input_handler_user_data_destroy) {
438 sc->input_handler_user_data_destroy (sc->input_handler_user_data);
441 sc->input_handler = callback;
442 sc->input_handler_user_data = user_data;
443 sc->input_handler_user_data_destroy = user_data_destroy;
447 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
448 GstRtmpConnectionFunc callback, gpointer user_data,
449 GDestroyNotify user_data_destroy)
451 if (sc->output_handler_user_data_destroy) {
452 sc->output_handler_user_data_destroy (sc->output_handler_user_data);
455 sc->output_handler = callback;
456 sc->output_handler_user_data = user_data;
457 sc->output_handler_user_data_destroy = user_data_destroy;
461 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
463 GstRtmpConnection *sc = user_data;
466 GError *error = NULL;
467 guint64 bytes_since_ack;
469 GST_TRACE_OBJECT (sc, "input ready");
471 oldsize = sc->input_bytes->len;
472 g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
474 g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
475 sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
476 g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
479 error = g_error_new (G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED,
480 "connection closed remotely");
485 gint code = error->code;
487 if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
488 code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
490 GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
491 code, error->message);
492 g_error_free (error);
493 return G_SOURCE_CONTINUE;
496 GST_ERROR_OBJECT (sc, "read error: %s %d %s",
497 g_quark_to_string (error->domain), code, error->message);
499 gst_rtmp_connection_emit_error (sc, error);
500 return G_SOURCE_REMOVE;
503 GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
505 g_mutex_lock (&sc->stats_lock);
506 sc->in_bytes_total += ret;
507 g_mutex_unlock (&sc->stats_lock);
509 bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
510 if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
511 gst_rtmp_connection_send_ack (sc);
514 gst_rtmp_connection_try_read (sc);
515 return G_SOURCE_CONTINUE;
519 gst_rtmp_connection_start_write (GstRtmpConnection * self)
522 GstBuffer *message, *chunks;
524 GstRtmpChunkStream *cstream;
530 message = g_async_queue_try_pop (self->output_queue);
535 meta = gst_buffer_get_rtmp_meta (message);
537 GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
541 if (gst_rtmp_message_is_protocol_control (message)) {
542 if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
543 GST_ERROR_OBJECT (self,
544 "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
549 cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
551 GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
556 chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
557 self->out_chunk_size);
559 GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
563 self->writing = TRUE;
564 if (self->output_handler) {
565 self->output_handler (self, self->output_handler_user_data);
568 os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
569 gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
570 self->cancellable, gst_rtmp_connection_write_buffer_done,
571 g_object_ref (self));
573 gst_buffer_unref (chunks);
576 gst_buffer_unref (message);
580 gst_rtmp_connection_emit_error (GstRtmpConnection * self, GError * error)
584 cancel_all_commands (self, error->message);
585 g_signal_emit (self, signals[SIGNAL_ERROR], 0, error);
588 g_error_free (error);
592 gst_rtmp_connection_write_buffer_done (GObject * obj,
593 GAsyncResult * result, gpointer user_data)
595 GOutputStream *os = G_OUTPUT_STREAM (obj);
596 GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
597 gsize bytes_written = 0;
598 GError *error = NULL;
601 self->writing = FALSE;
603 res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
604 &bytes_written, &error);
606 g_mutex_lock (&self->stats_lock);
607 self->out_bytes_total += bytes_written;
608 g_mutex_unlock (&self->stats_lock);
611 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
612 GST_INFO_OBJECT (self,
613 "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
615 GST_ERROR_OBJECT (self,
616 "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
617 error->message, bytes_written);
619 gst_rtmp_connection_emit_error (self, error);
620 g_object_unref (self);
624 GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
627 gst_rtmp_connection_apply_protocol_control (self);
628 gst_rtmp_connection_start_write (self);
629 g_object_unref (self);
633 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
636 g_return_if_fail (needed_bytes > 0);
637 connection->input_needed_bytes = needed_bytes;
638 gst_rtmp_connection_try_read (connection);
642 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
644 guint need = connection->input_needed_bytes,
645 len = connection->input_bytes->len;
648 GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
652 GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
653 gst_rtmp_connection_do_read (connection);
657 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
660 g_return_if_fail (size <= sc->input_bytes->len);
663 *outbytes = g_bytes_new (sc->input_bytes->data, size);
666 g_byte_array_remove_range (sc->input_bytes, 0, size);
670 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
672 GByteArray *input_bytes = sc->input_bytes;
673 gsize needed_bytes = 1;
676 GstRtmpChunkStream *cstream;
677 guint32 chunk_stream_id, header_size, next_size;
680 chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
683 if (!chunk_stream_id) {
684 needed_bytes = input_bytes->len + 1;
688 cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
689 header_size = gst_rtmp_chunk_stream_parse_header (cstream,
690 input_bytes->data, input_bytes->len);
692 if (input_bytes->len < header_size) {
693 needed_bytes = header_size;
697 next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
698 sc->in_chunk_size, &data);
700 if (input_bytes->len < header_size + next_size) {
701 needed_bytes = header_size + next_size;
705 memcpy (data, input_bytes->data + header_size, next_size);
706 gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
708 next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
711 if (next_size == 0) {
712 GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
713 gst_rtmp_connection_handle_message (sc, buffer);
714 gst_buffer_unref (buffer);
718 gst_rtmp_connection_start_read (sc, needed_bytes);
722 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
724 if (gst_rtmp_message_is_protocol_control (buffer)) {
725 gst_rtmp_connection_handle_protocol_control (sc, buffer);
729 if (gst_rtmp_message_is_user_control (buffer)) {
730 gst_rtmp_connection_handle_user_control (sc, buffer);
734 switch (gst_rtmp_message_get_type (buffer)) {
735 case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
736 gst_rtmp_connection_handle_cm (sc, buffer);
739 case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
740 gst_rtmp_connection_handle_aggregate (sc, buffer);
744 if (sc->input_handler) {
745 sc->input_handler (sc, buffer, sc->input_handler_user_data);
752 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
758 guint32 first_ts = 0;
760 meta = gst_buffer_get_rtmp_meta (buffer);
761 g_return_if_fail (meta);
763 gst_buffer_map (buffer, &map, GST_MAP_READ);
764 GST_TRACE_OBJECT (connection, "got aggregate message");
766 /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
767 * The payload is part of a FLV file.
769 * WARNING: This spec defines the payload to use an "RTMP message format"
770 * which misidentifies the format of the timestamps and omits the size of the
773 while (pos < map.size) {
774 gsize remaining = map.size - pos;
775 GstBuffer *submessage;
776 GstRtmpMeta *submeta;
777 GstRtmpFlvTagHeader header;
779 if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
780 GST_ERROR_OBJECT (connection,
781 "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
782 GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
786 if (remaining < header.total_size) {
787 GST_ERROR_OBJECT (connection,
788 "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
789 ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
793 submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
794 GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
795 pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
797 GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
798 GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
799 GST_BUFFER_OFFSET_END (submessage) =
800 GST_BUFFER_OFFSET (submessage) + header.total_size;
802 submeta = gst_buffer_get_rtmp_meta (submessage);
805 submeta->type = header.type;
806 submeta->size = header.payload_size;
809 first_ts = header.timestamp;
811 guint32 ts_offset = header.timestamp - first_ts;
813 submeta->ts_delta += ts_offset;
814 GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
815 GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
818 gst_rtmp_buffer_dump (submessage, "<<< submessage");
819 gst_rtmp_connection_handle_message (connection, submessage);
820 gst_buffer_unref (submessage);
822 pos += header.total_size;
825 gst_buffer_unmap (buffer, &map);
829 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
832 GstRtmpProtocolControl pc;
834 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
835 GST_ERROR_OBJECT (connection, "can't parse protocol control message");
839 GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
840 gst_rtmp_message_type_get_nick (pc.type));
843 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
844 GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
846 gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
849 case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
850 GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
851 G_GUINT32_FORMAT, pc.param);
854 case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
855 GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
857 gst_rtmp_connection_handle_ack (connection, pc.param);
860 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
861 GST_INFO_OBJECT (connection,
862 "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
863 gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
866 case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
867 GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
868 ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
869 /* FIXME this is not correct, but close enough */
870 gst_rtmp_connection_request_window_size (connection, pc.param);
874 GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
875 pc.type, gst_rtmp_message_type_get_nick (pc.type));
881 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
884 GstRtmpUserControl uc;
886 if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
887 GST_ERROR_OBJECT (connection, "can't parse user control message");
891 GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
892 gst_rtmp_user_control_type_get_nick (uc.type));
895 case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
896 case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
897 case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
898 case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
899 GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
900 gst_rtmp_user_control_type_get_nick (uc.type));
901 g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
905 case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
906 GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
907 G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
910 case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
911 GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
913 gst_rtmp_connection_send_ping_response (connection, uc.param);
916 case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
917 GST_DEBUG_OBJECT (connection,
918 "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
921 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
922 GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
926 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
927 GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
932 GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
933 uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
939 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
942 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
943 GST_ERROR_OBJECT (self,
944 "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
949 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
950 GST_ERROR_OBJECT (self,
951 "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
956 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
957 GST_WARNING_OBJECT (self,
958 "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
961 g_mutex_lock (&self->stats_lock);
962 self->in_chunk_size = chunk_size;
963 g_mutex_unlock (&self->stats_lock);
967 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
969 guint64 last_ack, new_ack;
970 guint32 last_ack_low, last_ack_high;
972 last_ack = self->out_bytes_acked;
973 last_ack_low = last_ack & G_MAXUINT32;
974 last_ack_high = (last_ack >> 32) & G_MAXUINT32;
976 if (bytes < last_ack_low) {
977 GST_WARNING_OBJECT (self,
978 "Acknowledgement bytes regression, assuming rollover: %"
979 G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
983 new_ack = (((guint64) last_ack_high) << 32) | bytes;
985 GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
988 g_mutex_lock (&self->stats_lock);
989 self->out_bytes_acked = new_ack;
990 g_mutex_unlock (&self->stats_lock);
994 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
995 guint32 window_ack_size)
997 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
998 GST_WARNING_OBJECT (self,
999 "peer requested small window ack size %" G_GUINT32_FORMAT,
1003 g_mutex_lock (&self->stats_lock);
1004 self->in_window_ack_size = window_ack_size;
1005 g_mutex_unlock (&self->stats_lock);
1009 is_command_response (const gchar * command_name)
1011 return g_strcmp0 (command_name, "_result") == 0 ||
1012 g_strcmp0 (command_name, "_error") == 0;
1016 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1019 gchar *command_name;
1020 gdouble transaction_id;
1023 meta = gst_buffer_get_rtmp_meta (buffer);
1024 g_return_if_fail (meta);
1028 gst_buffer_map (buffer, &map, GST_MAP_READ);
1029 args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1031 gst_buffer_unmap (buffer, &map);
1038 if (!isfinite (transaction_id) || transaction_id < 0 ||
1039 transaction_id > G_MAXUINT) {
1040 GST_WARNING_OBJECT (sc,
1041 "Server sent command \"%s\" with extreme transaction ID %.0f",
1042 GST_STR_NULL (command_name), transaction_id);
1043 } else if (transaction_id > sc->transaction_count) {
1044 GST_WARNING_OBJECT (sc,
1045 "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1046 GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1047 sc->transaction_count = transaction_id;
1050 GST_DEBUG_OBJECT (sc,
1051 "got control message \"%s\" transaction %.0f size %"
1052 G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1055 if (is_command_response (command_name)) {
1056 if (transaction_id != 0) {
1059 for (l = sc->transactions; l; l = g_list_next (l)) {
1060 Transaction *t = l->data;
1062 if (t->transaction_id != transaction_id) {
1066 GST_LOG_OBJECT (sc, "calling transaction callback %s",
1067 GST_DEBUG_FUNCPTR_NAME (t->func));
1068 sc->transactions = g_list_remove_link (sc->transactions, l);
1069 t->func (command_name, args, t->user_data);
1070 g_list_free_full (l, transaction_free);
1074 GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1075 GST_STR_NULL (command_name));
1080 if (transaction_id != 0) {
1081 GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1082 GST_STR_NULL (command_name));
1085 for (l = sc->expected_commands; l; l = g_list_next (l)) {
1086 ExpectedCommand *ec = l->data;
1088 if (ec->stream_id != meta->mstream) {
1092 if (g_strcmp0 (ec->command_name, command_name)) {
1096 GST_LOG_OBJECT (sc, "calling expected command callback %s",
1097 GST_DEBUG_FUNCPTR_NAME (ec->func));
1098 sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1099 ec->func (command_name, args, ec->user_data);
1100 g_list_free_full (l, expected_command_free);
1105 g_free (command_name);
1106 g_ptr_array_unref (args);
1110 start_write (gpointer user_data)
1112 GstRtmpConnection *sc = user_data;
1113 gst_rtmp_connection_start_write (sc);
1114 return G_SOURCE_REMOVE;
1118 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1120 g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1121 g_return_if_fail (GST_IS_BUFFER (buffer));
1123 g_async_queue_push (self->output_queue, buffer);
1124 g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1125 start_write, g_object_ref (self), g_object_unref);
1129 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1131 return g_async_queue_length (connection->output_queue);
1135 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1136 GstRtmpCommandCallback response_command, gpointer user_data,
1137 guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1141 gdouble transaction_id = 0;
1147 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1149 if (connection->thread != g_thread_self ()) {
1150 GST_ERROR_OBJECT (connection, "Called from wrong thread");
1153 GST_DEBUG_OBJECT (connection,
1154 "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1155 command_name, stream_id);
1157 if (response_command) {
1160 transaction_id = ++connection->transaction_count;
1162 GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1163 GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1165 t = transaction_new (transaction_id, response_command, user_data);
1167 connection->transactions = g_list_append (connection->transactions, t);
1170 va_start (ap, argument);
1171 payload = gst_amf_serialize_command_valist (transaction_id,
1172 command_name, argument, ap);
1175 data = g_bytes_unref_to_data (payload, &size);
1176 buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1177 3, stream_id, data, size);
1179 gst_rtmp_connection_queue_message (connection, buffer);
1180 return transaction_id;
1184 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1185 GstRtmpCommandCallback response_command, gpointer user_data,
1186 guint32 stream_id, const gchar * command_name)
1188 ExpectedCommand *ec;
1190 g_return_if_fail (response_command);
1191 g_return_if_fail (command_name);
1192 g_return_if_fail (!is_command_response (command_name));
1194 GST_LOG_OBJECT (connection,
1195 "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1196 GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1198 ec = expected_command_new (stream_id, command_name, response_command,
1201 connection->expected_commands =
1202 g_list_append (connection->expected_commands, ec);
1206 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1208 guint64 in_bytes_total = connection->in_bytes_total;
1209 GstRtmpProtocolControl pc = {
1210 .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1211 .param = (guint32) in_bytes_total,
1214 gst_rtmp_connection_queue_message (connection,
1215 gst_rtmp_message_new_protocol_control (&pc));
1217 g_mutex_lock (&connection->stats_lock);
1218 connection->in_bytes_acked = in_bytes_total;
1219 g_mutex_unlock (&connection->stats_lock);
1223 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1226 GstRtmpUserControl uc = {
1227 .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1228 .param = event_data,
1231 gst_rtmp_connection_queue_message (connection,
1232 gst_rtmp_message_new_user_control (&uc));
1236 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1239 GstRtmpProtocolControl pc = {
1240 .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1241 .param = chunk_size,
1244 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1246 gst_rtmp_connection_queue_message (connection,
1247 gst_rtmp_message_new_protocol_control (&pc));
1251 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1252 guint32 window_ack_size)
1254 GstRtmpProtocolControl pc = {
1255 .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1256 .param = window_ack_size,
1259 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1261 gst_rtmp_connection_queue_message (connection,
1262 gst_rtmp_message_new_protocol_control (&pc));
1266 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1269 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1270 g_return_if_fail (GST_IS_BUFFER (buffer));
1272 gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1273 gst_rtmp_connection_queue_message (connection, buffer);
1277 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1280 GstRtmpProtocolControl pc;
1282 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1283 GST_ERROR_OBJECT (self, "can't parse protocol control message");
1288 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1289 guint32 chunk_size = pc.param;
1291 GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1294 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1295 GST_ERROR_OBJECT (self,
1296 "requested chunk size %" G_GUINT32_FORMAT " is too small",
1301 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1302 GST_ERROR_OBJECT (self,
1303 "requested chunk size %" G_GUINT32_FORMAT " is too large",
1308 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1309 GST_WARNING_OBJECT (self,
1310 "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1313 self->out_chunk_size_pending = pc.param;
1317 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1318 guint32 window_ack_size = pc.param;
1320 GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1323 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1324 GST_WARNING_OBJECT (self,
1325 "requesting small window ack size %" G_GUINT32_FORMAT,
1329 self->out_window_ack_size_pending = window_ack_size;
1341 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1343 guint32 chunk_size, window_ack_size;
1345 chunk_size = self->out_chunk_size_pending;
1347 self->out_chunk_size_pending = 0;
1349 g_mutex_lock (&self->stats_lock);
1350 self->out_chunk_size = chunk_size;
1351 g_mutex_unlock (&self->stats_lock);
1353 GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1356 window_ack_size = self->out_window_ack_size_pending;
1357 if (window_ack_size) {
1358 self->out_window_ack_size_pending = 0;
1360 g_mutex_lock (&self->stats_lock);
1361 self->out_window_ack_size = window_ack_size;
1362 g_mutex_unlock (&self->stats_lock);
1364 GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1369 static GstStructure *
1370 get_stats (GstRtmpConnection * self)
1372 return gst_structure_new ("GstRtmpConnectionStats",
1373 "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1374 "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1375 "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1376 "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1377 "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1378 "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1379 "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1380 "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1384 gst_rtmp_connection_get_null_stats (void)
1386 return get_stats (NULL);
1390 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1394 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1396 g_mutex_lock (&self->stats_lock);
1397 s = get_stats (self);
1398 g_mutex_unlock (&self->stats_lock);