1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
38 #define READ_SIZE 8192
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
42 struct _GstRtmpConnection
44 GObject parent_instance;
46 /* should be properties */
47 gboolean input_paused;
52 GSocketConnection *connection;
53 GCancellable *cancellable;
54 GSocketClient *socket_client;
55 GAsyncQueue *output_queue;
56 GMainContext *main_context;
58 GCancellable *outer_cancellable;
59 gulong cancel_handler_id;
61 GSource *input_source;
62 GByteArray *input_bytes;
63 guint input_needed_bytes;
64 GstRtmpChunkStreams *input_streams, *output_streams;
66 GList *expected_commands;
67 guint transaction_count;
69 GstRtmpConnectionMessageFunc input_handler;
70 gpointer input_handler_user_data;
71 GDestroyNotify input_handler_user_data_destroy;
73 GstRtmpConnectionFunc output_handler;
74 gpointer output_handler_user_data;
75 GDestroyNotify output_handler_user_data_destroy;
79 /* Protects the values below during concurrent access.
80 * - Taken by the loop thread when writing, but not reading.
81 * - Taken by other threads when reading (calling get_stats).
85 /* RTMP configuration */
86 guint32 in_chunk_size;
87 guint32 out_chunk_size, out_chunk_size_pending;
88 guint32 in_window_ack_size;
89 guint32 out_window_ack_size, out_window_ack_size_pending;
91 guint64 in_bytes_total;
92 guint64 out_bytes_total;
93 guint64 in_bytes_acked;
94 guint64 out_bytes_acked;
100 GObjectClass parent_class;
101 } GstRtmpConnectionClass;
105 static void gst_rtmp_connection_dispose (GObject * object);
106 static void gst_rtmp_connection_finalize (GObject * object);
107 static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
108 GCancellable * cancellable);
109 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
110 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
112 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
113 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
114 GAsyncResult * result, gpointer user_data);
115 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
117 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
118 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
119 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
120 connection, GstBuffer * buffer);
121 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
122 connection, GstBuffer * buffer);
123 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
125 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
127 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
129 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
130 guint32 in_chunk_size);
131 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
133 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
134 self, guint32 in_chunk_size);
136 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
138 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
142 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
145 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
149 gdouble transaction_id;
150 GstRtmpCommandCallback func;
155 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
158 Transaction *data = g_slice_new (Transaction);
159 data->transaction_id = transaction_id;
161 data->user_data = user_data;
166 transaction_free (gpointer ptr)
168 Transaction *data = ptr;
169 g_slice_free (Transaction, data);
176 GstRtmpCommandCallback func;
180 static ExpectedCommand *
181 expected_command_new (guint32 stream_id, const gchar * command_name,
182 GstRtmpCommandCallback func, gpointer user_data)
184 ExpectedCommand *data = g_slice_new (ExpectedCommand);
185 data->stream_id = stream_id;
186 data->command_name = g_strdup (command_name);
188 data->user_data = user_data;
193 expected_command_free (gpointer ptr)
195 ExpectedCommand *data = ptr;
196 g_free (data->command_name);
197 g_slice_free (ExpectedCommand, data);
203 SIGNAL_STREAM_CONTROL,
208 static guint signals[N_SIGNALS] = { 0, };
212 static GstMemory *set_data_frame_value;
215 init_set_data_frame_value (void)
217 GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
218 GBytes *bytes = gst_amf_node_serialize (node);
220 const gchar *data = g_bytes_get_data (bytes, &size);
222 set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
223 (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
224 GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
225 GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
227 gst_amf_node_free (node);
230 /* class initialization */
232 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
234 GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
235 "rtmpconnection", 0, "debug category for GstRtmpConnection class");
236 init_set_data_frame_value ());
239 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
241 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
243 gobject_class->dispose = gst_rtmp_connection_dispose;
244 gobject_class->finalize = gst_rtmp_connection_finalize;
246 signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
247 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
249 signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
250 G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
251 G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
253 GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
257 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
259 rtmpconnection->cancellable = g_cancellable_new ();
260 rtmpconnection->output_queue =
261 g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
262 rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
263 rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
265 rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
266 rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
268 rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
269 rtmpconnection->input_needed_bytes = 1;
271 g_mutex_init (&rtmpconnection->stats_lock);
275 gst_rtmp_connection_dispose (GObject * object)
277 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
278 GST_DEBUG_OBJECT (rtmpconnection, "dispose");
280 /* clean up as possible. may be called multiple times */
282 gst_rtmp_connection_close (rtmpconnection);
283 g_cancellable_cancel (rtmpconnection->cancellable);
284 gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
285 gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
286 gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
288 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
292 gst_rtmp_connection_finalize (GObject * object)
294 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
295 GST_DEBUG_OBJECT (rtmpconnection, "finalize");
297 /* clean up object here */
299 g_mutex_clear (&rtmpconnection->stats_lock);
300 g_clear_object (&rtmpconnection->cancellable);
301 g_clear_object (&rtmpconnection->connection);
302 g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
303 g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
304 g_clear_pointer (&rtmpconnection->output_streams,
305 gst_rtmp_chunk_streams_free);
306 g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
307 g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
308 g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
310 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
314 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
316 return g_socket_connection_get_socket (sc->connection);
320 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
321 GSocketConnection * connection)
325 sc->thread = g_thread_ref (g_thread_self ());
326 sc->main_context = g_main_context_ref_thread_default ();
327 sc->connection = g_object_ref (connection);
329 /* refs the socket because it's creating an input stream, which holds a ref */
330 is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
331 /* refs the socket because it's creating a socket source */
332 g_warn_if_fail (!sc->input_source);
334 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
336 g_source_set_callback (sc->input_source,
337 (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
339 g_source_attach (sc->input_source, sc->main_context);
343 gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
344 GCancellable * cancellable)
346 g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
347 g_clear_object (&self->outer_cancellable);
348 self->cancel_handler_id = 0;
350 if (cancellable == NULL)
353 self->outer_cancellable = g_object_ref (cancellable);
354 self->cancel_handler_id =
355 g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
356 g_object_ref (self->cancellable), g_object_unref);
361 gst_rtmp_connection_new (GSocketConnection * connection,
362 GCancellable * cancellable)
364 GstRtmpConnection *sc;
366 sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
368 gst_rtmp_connection_set_socket_connection (sc, connection);
369 gst_rtmp_connection_set_cancellable (sc, cancellable);
375 cancel_all_commands (GstRtmpConnection * self)
379 for (l = self->transactions; l; l = g_list_next (l)) {
380 Transaction *cc = l->data;
381 GST_LOG_OBJECT (self, "calling transaction callback %s",
382 GST_DEBUG_FUNCPTR_NAME (cc->func));
383 cc->func ("<cancelled>", NULL, cc->user_data);
385 g_list_free_full (self->transactions, transaction_free);
386 self->transactions = NULL;
388 for (l = self->expected_commands; l; l = g_list_next (l)) {
389 ExpectedCommand *cc = l->data;
390 GST_LOG_OBJECT (self, "calling expected command callback %s",
391 GST_DEBUG_FUNCPTR_NAME (cc->func));
392 cc->func ("<cancelled>", NULL, cc->user_data);
394 g_list_free_full (self->expected_commands, expected_command_free);
395 self->expected_commands = NULL;
399 gst_rtmp_connection_close (GstRtmpConnection * self)
401 if (self->thread != g_thread_self ()) {
402 GST_ERROR_OBJECT (self, "Called from wrong thread");
405 g_cancellable_cancel (self->cancellable);
406 cancel_all_commands (self);
408 if (self->input_source) {
409 g_source_destroy (self->input_source);
410 g_clear_pointer (&self->input_source, g_source_unref);
413 if (self->connection) {
414 g_io_stream_close_async (G_IO_STREAM (self->connection),
415 G_PRIORITY_DEFAULT, NULL, NULL, NULL);
420 gst_rtmp_connection_close_and_unref (gpointer ptr)
422 GstRtmpConnection *connection;
424 g_return_if_fail (ptr);
426 connection = GST_RTMP_CONNECTION (ptr);
427 gst_rtmp_connection_close (connection);
428 g_object_unref (connection);
432 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
433 GstRtmpConnectionMessageFunc callback, gpointer user_data,
434 GDestroyNotify user_data_destroy)
436 if (sc->input_handler_user_data_destroy) {
437 sc->input_handler_user_data_destroy (sc->input_handler_user_data);
440 sc->input_handler = callback;
441 sc->input_handler_user_data = user_data;
442 sc->input_handler_user_data_destroy = user_data_destroy;
446 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
447 GstRtmpConnectionFunc callback, gpointer user_data,
448 GDestroyNotify user_data_destroy)
450 if (sc->output_handler_user_data_destroy) {
451 sc->output_handler_user_data_destroy (sc->output_handler_user_data);
454 sc->output_handler = callback;
455 sc->output_handler_user_data = user_data;
456 sc->output_handler_user_data_destroy = user_data_destroy;
460 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
462 GstRtmpConnection *sc = user_data;
465 GError *error = NULL;
466 guint64 bytes_since_ack;
468 GST_TRACE_OBJECT (sc, "input ready");
470 oldsize = sc->input_bytes->len;
471 g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
473 g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
474 sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
475 g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
478 gint code = error->code;
480 if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
481 code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
483 GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
484 code, error->message);
485 g_error_free (error);
486 return G_SOURCE_CONTINUE;
489 GST_ERROR_OBJECT (sc, "read error: %s %d %s",
490 g_quark_to_string (error->domain), code, error->message);
491 g_error_free (error);
492 } else if (ret == 0) {
493 GST_INFO_OBJECT (sc, "read EOF");
497 gst_rtmp_connection_emit_error (sc);
498 return G_SOURCE_REMOVE;
501 GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
503 g_mutex_lock (&sc->stats_lock);
504 sc->in_bytes_total += ret;
505 g_mutex_unlock (&sc->stats_lock);
507 bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
508 if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
509 gst_rtmp_connection_send_ack (sc);
512 gst_rtmp_connection_try_read (sc);
513 return G_SOURCE_CONTINUE;
517 gst_rtmp_connection_start_write (GstRtmpConnection * self)
520 GstBuffer *message, *chunks;
522 GstRtmpChunkStream *cstream;
528 message = g_async_queue_try_pop (self->output_queue);
533 meta = gst_buffer_get_rtmp_meta (message);
535 GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
539 if (gst_rtmp_message_is_protocol_control (message)) {
540 if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
541 GST_ERROR_OBJECT (self,
542 "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
547 cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
549 GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
554 chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
555 self->out_chunk_size);
557 GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
561 self->writing = TRUE;
562 if (self->output_handler) {
563 self->output_handler (self, self->output_handler_user_data);
566 os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
567 gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
568 self->cancellable, gst_rtmp_connection_write_buffer_done,
569 g_object_ref (self));
571 gst_buffer_unref (chunks);
574 gst_buffer_unref (message);
578 gst_rtmp_connection_emit_error (GstRtmpConnection * self)
584 GST_INFO_OBJECT (self, "connection error");
587 cancel_all_commands (self);
589 g_signal_emit (self, signals[SIGNAL_ERROR], 0);
593 gst_rtmp_connection_write_buffer_done (GObject * obj,
594 GAsyncResult * result, gpointer user_data)
596 GOutputStream *os = G_OUTPUT_STREAM (obj);
597 GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
598 gsize bytes_written = 0;
599 GError *error = NULL;
602 self->writing = FALSE;
604 res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
605 &bytes_written, &error);
607 g_mutex_lock (&self->stats_lock);
608 self->out_bytes_total += bytes_written;
609 g_mutex_unlock (&self->stats_lock);
612 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
613 GST_INFO_OBJECT (self,
614 "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
616 GST_ERROR_OBJECT (self,
617 "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
618 error->message, bytes_written);
620 gst_rtmp_connection_emit_error (self);
621 g_error_free (error);
622 g_object_unref (self);
626 GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
629 gst_rtmp_connection_apply_protocol_control (self);
630 gst_rtmp_connection_start_write (self);
631 g_object_unref (self);
635 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
638 g_return_if_fail (needed_bytes > 0);
639 connection->input_needed_bytes = needed_bytes;
640 gst_rtmp_connection_try_read (connection);
644 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
646 guint need = connection->input_needed_bytes,
647 len = connection->input_bytes->len;
650 GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
654 GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
655 gst_rtmp_connection_do_read (connection);
659 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
662 g_return_if_fail (size <= sc->input_bytes->len);
665 *outbytes = g_bytes_new (sc->input_bytes->data, size);
668 g_byte_array_remove_range (sc->input_bytes, 0, size);
672 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
674 GByteArray *input_bytes = sc->input_bytes;
675 gsize needed_bytes = 1;
678 GstRtmpChunkStream *cstream;
679 guint32 chunk_stream_id, header_size, next_size;
682 chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
685 if (!chunk_stream_id) {
686 needed_bytes = input_bytes->len + 1;
690 cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
691 header_size = gst_rtmp_chunk_stream_parse_header (cstream,
692 input_bytes->data, input_bytes->len);
694 if (input_bytes->len < header_size) {
695 needed_bytes = header_size;
699 next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
700 sc->in_chunk_size, &data);
702 if (input_bytes->len < header_size + next_size) {
703 needed_bytes = header_size + next_size;
707 memcpy (data, input_bytes->data + header_size, next_size);
708 gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
710 next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
713 if (next_size == 0) {
714 GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
715 gst_rtmp_connection_handle_message (sc, buffer);
716 gst_buffer_unref (buffer);
720 gst_rtmp_connection_start_read (sc, needed_bytes);
724 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
726 if (gst_rtmp_message_is_protocol_control (buffer)) {
727 gst_rtmp_connection_handle_protocol_control (sc, buffer);
731 if (gst_rtmp_message_is_user_control (buffer)) {
732 gst_rtmp_connection_handle_user_control (sc, buffer);
736 switch (gst_rtmp_message_get_type (buffer)) {
737 case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
738 gst_rtmp_connection_handle_cm (sc, buffer);
741 case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
742 gst_rtmp_connection_handle_aggregate (sc, buffer);
746 if (sc->input_handler) {
747 sc->input_handler (sc, buffer, sc->input_handler_user_data);
754 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
760 guint32 first_ts = 0;
762 meta = gst_buffer_get_rtmp_meta (buffer);
763 g_return_if_fail (meta);
765 gst_buffer_map (buffer, &map, GST_MAP_READ);
766 GST_TRACE_OBJECT (connection, "got aggregate message");
768 /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
769 * The payload is part of a FLV file.
771 * WARNING: This spec defines the payload to use an "RTMP message format"
772 * which misidentifies the format of the timestamps and omits the size of the
775 while (pos < map.size) {
776 gsize remaining = map.size - pos;
777 GstBuffer *submessage;
778 GstRtmpMeta *submeta;
779 GstRtmpFlvTagHeader header;
781 if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
782 GST_ERROR_OBJECT (connection,
783 "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
784 GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
788 if (remaining < header.total_size) {
789 GST_ERROR_OBJECT (connection,
790 "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
791 ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
795 submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
796 GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
797 pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
799 GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
800 GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
801 GST_BUFFER_OFFSET_END (submessage) =
802 GST_BUFFER_OFFSET (submessage) + header.total_size;
804 submeta = gst_buffer_get_rtmp_meta (submessage);
807 submeta->type = header.type;
808 submeta->size = header.payload_size;
811 first_ts = header.timestamp;
813 guint32 ts_offset = header.timestamp - first_ts;
815 submeta->ts_delta += ts_offset;
816 GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
817 GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
820 gst_rtmp_buffer_dump (submessage, "<<< submessage");
821 gst_rtmp_connection_handle_message (connection, submessage);
822 gst_buffer_unref (submessage);
824 pos += header.total_size;
827 gst_buffer_unmap (buffer, &map);
831 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
834 GstRtmpProtocolControl pc;
836 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
837 GST_ERROR_OBJECT (connection, "can't parse protocol control message");
841 GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
842 gst_rtmp_message_type_get_nick (pc.type));
845 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
846 GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
848 gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
851 case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
852 GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
853 G_GUINT32_FORMAT, pc.param);
856 case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
857 GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
859 gst_rtmp_connection_handle_ack (connection, pc.param);
862 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
863 GST_INFO_OBJECT (connection,
864 "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
865 gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
868 case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
869 GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
870 ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
871 /* FIXME this is not correct, but close enough */
872 gst_rtmp_connection_request_window_size (connection, pc.param);
876 GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
877 pc.type, gst_rtmp_message_type_get_nick (pc.type));
883 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
886 GstRtmpUserControl uc;
888 if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
889 GST_ERROR_OBJECT (connection, "can't parse user control message");
893 GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
894 gst_rtmp_user_control_type_get_nick (uc.type));
897 case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
898 case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
899 case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
900 case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
901 GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
902 gst_rtmp_user_control_type_get_nick (uc.type));
903 g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
907 case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
908 GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
909 G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
912 case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
913 GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
915 gst_rtmp_connection_send_ping_response (connection, uc.param);
918 case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
919 GST_DEBUG_OBJECT (connection,
920 "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
923 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
924 GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
928 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
929 GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
934 GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
935 uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
941 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
944 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
945 GST_ERROR_OBJECT (self,
946 "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
951 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
952 GST_ERROR_OBJECT (self,
953 "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
958 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
959 GST_WARNING_OBJECT (self,
960 "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
963 g_mutex_lock (&self->stats_lock);
964 self->in_chunk_size = chunk_size;
965 g_mutex_unlock (&self->stats_lock);
969 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
971 guint64 last_ack, new_ack;
972 guint32 last_ack_low, last_ack_high;
974 last_ack = self->out_bytes_acked;
975 last_ack_low = last_ack & G_MAXUINT32;
976 last_ack_high = (last_ack >> 32) & G_MAXUINT32;
978 if (bytes < last_ack_low) {
979 GST_WARNING_OBJECT (self,
980 "Acknowledgement bytes regression, assuming rollover: %"
981 G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
985 new_ack = (((guint64) last_ack_high) << 32) | bytes;
987 GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
990 g_mutex_lock (&self->stats_lock);
991 self->out_bytes_acked = new_ack;
992 g_mutex_unlock (&self->stats_lock);
996 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
997 guint32 window_ack_size)
999 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1000 GST_WARNING_OBJECT (self,
1001 "peer requested small window ack size %" G_GUINT32_FORMAT,
1005 g_mutex_lock (&self->stats_lock);
1006 self->in_window_ack_size = window_ack_size;
1007 g_mutex_unlock (&self->stats_lock);
1011 is_command_response (const gchar * command_name)
1013 return g_strcmp0 (command_name, "_result") == 0 ||
1014 g_strcmp0 (command_name, "_error") == 0;
1018 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1021 gchar *command_name;
1022 gdouble transaction_id;
1025 meta = gst_buffer_get_rtmp_meta (buffer);
1026 g_return_if_fail (meta);
1030 gst_buffer_map (buffer, &map, GST_MAP_READ);
1031 args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1033 gst_buffer_unmap (buffer, &map);
1040 if (!isfinite (transaction_id) || transaction_id < 0 ||
1041 transaction_id > G_MAXUINT) {
1042 GST_WARNING_OBJECT (sc,
1043 "Server sent command \"%s\" with extreme transaction ID %.0f",
1044 GST_STR_NULL (command_name), transaction_id);
1045 } else if (transaction_id > sc->transaction_count) {
1046 GST_WARNING_OBJECT (sc,
1047 "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1048 GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1049 sc->transaction_count = transaction_id;
1052 GST_DEBUG_OBJECT (sc,
1053 "got control message \"%s\" transaction %.0f size %"
1054 G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1057 if (is_command_response (command_name)) {
1058 if (transaction_id != 0) {
1061 for (l = sc->transactions; l; l = g_list_next (l)) {
1062 Transaction *t = l->data;
1064 if (t->transaction_id != transaction_id) {
1068 GST_LOG_OBJECT (sc, "calling transaction callback %s",
1069 GST_DEBUG_FUNCPTR_NAME (t->func));
1070 sc->transactions = g_list_remove_link (sc->transactions, l);
1071 t->func (command_name, args, t->user_data);
1072 g_list_free_full (l, transaction_free);
1076 GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1077 GST_STR_NULL (command_name));
1082 if (transaction_id != 0) {
1083 GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1084 GST_STR_NULL (command_name));
1087 for (l = sc->expected_commands; l; l = g_list_next (l)) {
1088 ExpectedCommand *ec = l->data;
1090 if (ec->stream_id != meta->mstream) {
1094 if (g_strcmp0 (ec->command_name, command_name)) {
1098 GST_LOG_OBJECT (sc, "calling expected command callback %s",
1099 GST_DEBUG_FUNCPTR_NAME (ec->func));
1100 sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1101 ec->func (command_name, args, ec->user_data);
1102 g_list_free_full (l, expected_command_free);
1107 g_free (command_name);
1108 g_ptr_array_unref (args);
1112 start_write (gpointer user_data)
1114 GstRtmpConnection *sc = user_data;
1115 gst_rtmp_connection_start_write (sc);
1116 return G_SOURCE_REMOVE;
1120 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1122 g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1123 g_return_if_fail (GST_IS_BUFFER (buffer));
1125 g_async_queue_push (self->output_queue, buffer);
1126 g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1127 start_write, g_object_ref (self), g_object_unref);
1131 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1133 return g_async_queue_length (connection->output_queue);
1137 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1138 GstRtmpCommandCallback response_command, gpointer user_data,
1139 guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1143 gdouble transaction_id = 0;
1149 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1151 if (connection->thread != g_thread_self ()) {
1152 GST_ERROR_OBJECT (connection, "Called from wrong thread");
1155 GST_DEBUG_OBJECT (connection,
1156 "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1157 command_name, stream_id);
1159 if (response_command) {
1162 transaction_id = ++connection->transaction_count;
1164 GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1165 GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1167 t = transaction_new (transaction_id, response_command, user_data);
1169 connection->transactions = g_list_append (connection->transactions, t);
1172 va_start (ap, argument);
1173 payload = gst_amf_serialize_command_valist (transaction_id,
1174 command_name, argument, ap);
1177 data = g_bytes_unref_to_data (payload, &size);
1178 buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1179 3, stream_id, data, size);
1181 gst_rtmp_connection_queue_message (connection, buffer);
1182 return transaction_id;
1186 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1187 GstRtmpCommandCallback response_command, gpointer user_data,
1188 guint32 stream_id, const gchar * command_name)
1190 ExpectedCommand *ec;
1192 g_return_if_fail (response_command);
1193 g_return_if_fail (command_name);
1194 g_return_if_fail (!is_command_response (command_name));
1196 GST_LOG_OBJECT (connection,
1197 "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1198 GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1200 ec = expected_command_new (stream_id, command_name, response_command,
1203 connection->expected_commands =
1204 g_list_append (connection->expected_commands, ec);
1208 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1210 guint64 in_bytes_total = connection->in_bytes_total;
1211 GstRtmpProtocolControl pc = {
1212 .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1213 .param = (guint32) in_bytes_total,
1216 gst_rtmp_connection_queue_message (connection,
1217 gst_rtmp_message_new_protocol_control (&pc));
1219 g_mutex_lock (&connection->stats_lock);
1220 connection->in_bytes_acked = in_bytes_total;
1221 g_mutex_unlock (&connection->stats_lock);
1225 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1228 GstRtmpUserControl uc = {
1229 .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1230 .param = event_data,
1233 gst_rtmp_connection_queue_message (connection,
1234 gst_rtmp_message_new_user_control (&uc));
1238 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1241 GstRtmpProtocolControl pc = {
1242 .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1243 .param = chunk_size,
1246 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1248 gst_rtmp_connection_queue_message (connection,
1249 gst_rtmp_message_new_protocol_control (&pc));
1253 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1254 guint32 window_ack_size)
1256 GstRtmpProtocolControl pc = {
1257 .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1258 .param = window_ack_size,
1261 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1263 gst_rtmp_connection_queue_message (connection,
1264 gst_rtmp_message_new_protocol_control (&pc));
1268 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1271 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1272 g_return_if_fail (GST_IS_BUFFER (buffer));
1274 gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1275 gst_rtmp_connection_queue_message (connection, buffer);
1279 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1282 GstRtmpProtocolControl pc;
1284 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1285 GST_ERROR_OBJECT (self, "can't parse protocol control message");
1290 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1291 guint32 chunk_size = pc.param;
1293 GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1296 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1297 GST_ERROR_OBJECT (self,
1298 "requested chunk size %" G_GUINT32_FORMAT " is too small",
1303 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1304 GST_ERROR_OBJECT (self,
1305 "requested chunk size %" G_GUINT32_FORMAT " is too large",
1310 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1311 GST_WARNING_OBJECT (self,
1312 "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1315 self->out_chunk_size_pending = pc.param;
1319 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1320 guint32 window_ack_size = pc.param;
1322 GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1325 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1326 GST_WARNING_OBJECT (self,
1327 "requesting small window ack size %" G_GUINT32_FORMAT,
1331 self->out_window_ack_size_pending = window_ack_size;
1343 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1345 guint32 chunk_size, window_ack_size;
1347 chunk_size = self->out_chunk_size_pending;
1349 self->out_chunk_size_pending = 0;
1351 g_mutex_lock (&self->stats_lock);
1352 self->out_chunk_size = chunk_size;
1353 g_mutex_unlock (&self->stats_lock);
1355 GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1358 window_ack_size = self->out_window_ack_size_pending;
1359 if (window_ack_size) {
1360 self->out_window_ack_size_pending = 0;
1362 g_mutex_lock (&self->stats_lock);
1363 self->out_window_ack_size = window_ack_size;
1364 g_mutex_unlock (&self->stats_lock);
1366 GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1371 static GstStructure *
1372 get_stats (GstRtmpConnection * self)
1374 return gst_structure_new ("GstRtmpConnectionStats",
1375 "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1376 "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1377 "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1378 "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1379 "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1380 "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1381 "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1382 "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1386 gst_rtmp_connection_get_null_stats (void)
1388 return get_stats (NULL);
1392 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1396 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1398 g_mutex_lock (&self->stats_lock);
1399 s = get_stats (self);
1400 g_mutex_unlock (&self->stats_lock);