1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
38 #define READ_SIZE 8192
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
42 struct _GstRtmpConnection
44 GObject parent_instance;
46 /* should be properties */
47 gboolean input_paused;
52 GSocketConnection *connection;
53 GCancellable *cancellable;
54 GSocketClient *socket_client;
55 GAsyncQueue *output_queue;
56 GMainContext *main_context;
58 GSource *input_source;
59 GByteArray *input_bytes;
60 guint input_needed_bytes;
61 GstRtmpChunkStreams *input_streams, *output_streams;
63 GList *expected_commands;
64 guint transaction_count;
66 GstRtmpConnectionMessageFunc input_handler;
67 gpointer input_handler_user_data;
68 GDestroyNotify input_handler_user_data_destroy;
70 GstRtmpConnectionFunc output_handler;
71 gpointer output_handler_user_data;
72 GDestroyNotify output_handler_user_data_destroy;
76 /* Protects the values below during concurrent access.
77 * - Taken by the loop thread when writing, but not reading.
78 * - Taken by other threads when reading (calling get_stats).
82 /* RTMP configuration */
83 guint32 in_chunk_size;
84 guint32 out_chunk_size, out_chunk_size_pending;
85 guint32 in_window_ack_size;
86 guint32 out_window_ack_size, out_window_ack_size_pending;
88 guint64 in_bytes_total;
89 guint64 out_bytes_total;
90 guint64 in_bytes_acked;
91 guint64 out_bytes_acked;
97 GObjectClass parent_class;
98 } GstRtmpConnectionClass;
102 static void gst_rtmp_connection_dispose (GObject * object);
103 static void gst_rtmp_connection_finalize (GObject * object);
104 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
105 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
107 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
108 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
109 GAsyncResult * result, gpointer user_data);
110 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
112 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
113 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
114 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
115 connection, GstBuffer * buffer);
116 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
117 connection, GstBuffer * buffer);
118 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
120 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
122 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
124 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
125 guint32 in_chunk_size);
126 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
128 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
129 self, guint32 in_chunk_size);
131 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
133 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
137 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
140 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
144 gdouble transaction_id;
145 GstRtmpCommandCallback func;
150 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
153 Transaction *data = g_slice_new (Transaction);
154 data->transaction_id = transaction_id;
156 data->user_data = user_data;
161 transaction_free (gpointer ptr)
163 Transaction *data = ptr;
164 g_slice_free (Transaction, data);
171 GstRtmpCommandCallback func;
175 static ExpectedCommand *
176 expected_command_new (guint32 stream_id, const gchar * command_name,
177 GstRtmpCommandCallback func, gpointer user_data)
179 ExpectedCommand *data = g_slice_new (ExpectedCommand);
180 data->stream_id = stream_id;
181 data->command_name = g_strdup (command_name);
183 data->user_data = user_data;
188 expected_command_free (gpointer ptr)
190 ExpectedCommand *data = ptr;
191 g_free (data->command_name);
192 g_slice_free (ExpectedCommand, data);
198 SIGNAL_STREAM_CONTROL,
203 static guint signals[N_SIGNALS] = { 0, };
207 static GstMemory *set_data_frame_value;
210 init_set_data_frame_value (void)
212 GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
213 GBytes *bytes = gst_amf_node_serialize (node);
215 const gchar *data = g_bytes_get_data (bytes, &size);
217 set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
218 (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
219 GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
220 GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
222 gst_amf_node_free (node);
225 /* class initialization */
227 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
229 GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
230 "rtmpconnection", 0, "debug category for GstRtmpConnection class");
231 init_set_data_frame_value ());
234 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
236 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
238 gobject_class->dispose = gst_rtmp_connection_dispose;
239 gobject_class->finalize = gst_rtmp_connection_finalize;
241 signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
242 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
244 signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
245 G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
246 G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
248 GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
252 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
254 rtmpconnection->output_queue =
255 g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
256 rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
257 rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
259 rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
260 rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
262 rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
263 rtmpconnection->input_needed_bytes = 1;
265 g_mutex_init (&rtmpconnection->stats_lock);
269 gst_rtmp_connection_dispose (GObject * object)
271 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
272 GST_DEBUG_OBJECT (rtmpconnection, "dispose");
274 /* clean up as possible. may be called multiple times */
276 gst_rtmp_connection_close (rtmpconnection);
277 g_cancellable_cancel (rtmpconnection->cancellable);
278 gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
279 gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
281 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
285 gst_rtmp_connection_finalize (GObject * object)
287 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
288 GST_DEBUG_OBJECT (rtmpconnection, "finalize");
290 /* clean up object here */
292 g_mutex_clear (&rtmpconnection->stats_lock);
293 g_clear_object (&rtmpconnection->cancellable);
294 g_clear_object (&rtmpconnection->connection);
295 g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
296 g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
297 g_clear_pointer (&rtmpconnection->output_streams,
298 gst_rtmp_chunk_streams_free);
299 g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
300 g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
301 g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
303 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
307 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
309 return g_socket_connection_get_socket (sc->connection);
313 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
314 GSocketConnection * connection)
318 sc->thread = g_thread_ref (g_thread_self ());
319 sc->main_context = g_main_context_ref_thread_default ();
320 sc->connection = g_object_ref (connection);
322 /* refs the socket because it's creating an input stream, which holds a ref */
323 is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
324 /* refs the socket because it's creating a socket source */
325 g_warn_if_fail (!sc->input_source);
327 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
329 g_source_set_callback (sc->input_source,
330 (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
332 g_source_attach (sc->input_source, sc->main_context);
336 gst_rtmp_connection_new (GSocketConnection * connection,
337 GCancellable * cancellable)
339 GstRtmpConnection *sc;
341 sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
343 sc->cancellable = g_object_ref (cancellable);
345 sc->cancellable = g_cancellable_new ();
347 gst_rtmp_connection_set_socket_connection (sc, connection);
353 cancel_all_commands (GstRtmpConnection * self)
357 for (l = self->transactions; l; l = g_list_next (l)) {
358 Transaction *cc = l->data;
359 GST_LOG_OBJECT (self, "calling transaction callback %s",
360 GST_DEBUG_FUNCPTR_NAME (cc->func));
361 cc->func ("<cancelled>", NULL, cc->user_data);
363 g_list_free_full (self->transactions, transaction_free);
364 self->transactions = NULL;
366 for (l = self->expected_commands; l; l = g_list_next (l)) {
367 ExpectedCommand *cc = l->data;
368 GST_LOG_OBJECT (self, "calling expected command callback %s",
369 GST_DEBUG_FUNCPTR_NAME (cc->func));
370 cc->func ("<cancelled>", NULL, cc->user_data);
372 g_list_free_full (self->expected_commands, expected_command_free);
373 self->expected_commands = NULL;
377 gst_rtmp_connection_close (GstRtmpConnection * self)
379 if (self->thread != g_thread_self ()) {
380 GST_ERROR_OBJECT (self, "Called from wrong thread");
383 g_cancellable_cancel (self->cancellable);
384 cancel_all_commands (self);
386 if (self->input_source) {
387 g_source_destroy (self->input_source);
388 g_clear_pointer (&self->input_source, g_source_unref);
391 if (self->connection) {
392 g_io_stream_close_async (G_IO_STREAM (self->connection),
393 G_PRIORITY_DEFAULT, NULL, NULL, NULL);
398 gst_rtmp_connection_close_and_unref (gpointer ptr)
400 GstRtmpConnection *connection;
402 g_return_if_fail (ptr);
404 connection = GST_RTMP_CONNECTION (ptr);
405 gst_rtmp_connection_close (connection);
406 g_object_unref (connection);
410 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
411 GstRtmpConnectionMessageFunc callback, gpointer user_data,
412 GDestroyNotify user_data_destroy)
414 if (sc->input_handler_user_data_destroy) {
415 sc->input_handler_user_data_destroy (sc->input_handler_user_data);
418 sc->input_handler = callback;
419 sc->input_handler_user_data = user_data;
420 sc->input_handler_user_data_destroy = user_data_destroy;
424 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
425 GstRtmpConnectionFunc callback, gpointer user_data,
426 GDestroyNotify user_data_destroy)
428 if (sc->output_handler_user_data_destroy) {
429 sc->output_handler_user_data_destroy (sc->output_handler_user_data);
432 sc->output_handler = callback;
433 sc->output_handler_user_data = user_data;
434 sc->output_handler_user_data_destroy = user_data_destroy;
438 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
440 GstRtmpConnection *sc = user_data;
443 GError *error = NULL;
444 guint64 bytes_since_ack;
446 GST_TRACE_OBJECT (sc, "input ready");
448 oldsize = sc->input_bytes->len;
449 g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
451 g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
452 sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
453 g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
456 gint code = error->code;
458 if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
459 code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
461 GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
462 code, error->message);
463 g_error_free (error);
464 return G_SOURCE_CONTINUE;
467 GST_ERROR_OBJECT (sc, "read error: %s %d %s",
468 g_quark_to_string (error->domain), code, error->message);
469 g_error_free (error);
470 } else if (ret == 0) {
471 GST_INFO_OBJECT (sc, "read EOF");
475 gst_rtmp_connection_emit_error (sc);
476 return G_SOURCE_REMOVE;
479 GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
481 g_mutex_lock (&sc->stats_lock);
482 sc->in_bytes_total += ret;
483 g_mutex_unlock (&sc->stats_lock);
485 bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
486 if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
487 gst_rtmp_connection_send_ack (sc);
490 gst_rtmp_connection_try_read (sc);
491 return G_SOURCE_CONTINUE;
495 gst_rtmp_connection_start_write (GstRtmpConnection * self)
498 GstBuffer *message, *chunks;
500 GstRtmpChunkStream *cstream;
506 message = g_async_queue_try_pop (self->output_queue);
511 meta = gst_buffer_get_rtmp_meta (message);
513 GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
517 if (gst_rtmp_message_is_protocol_control (message)) {
518 if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
519 GST_ERROR_OBJECT (self,
520 "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
525 cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
527 GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
532 chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
533 self->out_chunk_size);
535 GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
539 self->writing = TRUE;
540 if (self->output_handler) {
541 self->output_handler (self, self->output_handler_user_data);
544 os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
545 gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
546 self->cancellable, gst_rtmp_connection_write_buffer_done,
547 g_object_ref (self));
549 gst_buffer_unref (chunks);
552 gst_buffer_unref (message);
556 gst_rtmp_connection_emit_error (GstRtmpConnection * self)
562 GST_INFO_OBJECT (self, "connection error");
565 cancel_all_commands (self);
567 g_signal_emit (self, signals[SIGNAL_ERROR], 0);
571 gst_rtmp_connection_write_buffer_done (GObject * obj,
572 GAsyncResult * result, gpointer user_data)
574 GOutputStream *os = G_OUTPUT_STREAM (obj);
575 GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
576 gsize bytes_written = 0;
577 GError *error = NULL;
580 self->writing = FALSE;
582 res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
583 &bytes_written, &error);
585 g_mutex_lock (&self->stats_lock);
586 self->out_bytes_total += bytes_written;
587 g_mutex_unlock (&self->stats_lock);
590 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
591 GST_INFO_OBJECT (self,
592 "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
594 GST_ERROR_OBJECT (self,
595 "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
596 error->message, bytes_written);
598 gst_rtmp_connection_emit_error (self);
599 g_error_free (error);
600 g_object_unref (self);
604 GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
607 gst_rtmp_connection_apply_protocol_control (self);
608 gst_rtmp_connection_start_write (self);
609 g_object_unref (self);
613 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
616 g_return_if_fail (needed_bytes > 0);
617 connection->input_needed_bytes = needed_bytes;
618 gst_rtmp_connection_try_read (connection);
622 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
624 guint need = connection->input_needed_bytes,
625 len = connection->input_bytes->len;
628 GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
632 GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
633 gst_rtmp_connection_do_read (connection);
637 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
640 g_return_if_fail (size <= sc->input_bytes->len);
643 *outbytes = g_bytes_new (sc->input_bytes->data, size);
646 g_byte_array_remove_range (sc->input_bytes, 0, size);
650 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
652 GByteArray *input_bytes = sc->input_bytes;
653 gsize needed_bytes = 1;
656 GstRtmpChunkStream *cstream;
657 guint32 chunk_stream_id, header_size, next_size;
660 chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
663 if (!chunk_stream_id) {
664 needed_bytes = input_bytes->len + 1;
668 cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
669 header_size = gst_rtmp_chunk_stream_parse_header (cstream,
670 input_bytes->data, input_bytes->len);
672 if (input_bytes->len < header_size) {
673 needed_bytes = header_size;
677 next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
678 sc->in_chunk_size, &data);
680 if (input_bytes->len < header_size + next_size) {
681 needed_bytes = header_size + next_size;
685 memcpy (data, input_bytes->data + header_size, next_size);
686 gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
688 next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
691 if (next_size == 0) {
692 GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
693 gst_rtmp_connection_handle_message (sc, buffer);
694 gst_buffer_unref (buffer);
698 gst_rtmp_connection_start_read (sc, needed_bytes);
702 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
704 if (gst_rtmp_message_is_protocol_control (buffer)) {
705 gst_rtmp_connection_handle_protocol_control (sc, buffer);
709 if (gst_rtmp_message_is_user_control (buffer)) {
710 gst_rtmp_connection_handle_user_control (sc, buffer);
714 switch (gst_rtmp_message_get_type (buffer)) {
715 case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
716 gst_rtmp_connection_handle_cm (sc, buffer);
719 case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
720 gst_rtmp_connection_handle_aggregate (sc, buffer);
724 if (sc->input_handler) {
725 sc->input_handler (sc, buffer, sc->input_handler_user_data);
732 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
738 guint32 first_ts = 0;
740 meta = gst_buffer_get_rtmp_meta (buffer);
741 g_return_if_fail (meta);
743 gst_buffer_map (buffer, &map, GST_MAP_READ);
744 GST_TRACE_OBJECT (connection, "got aggregate message");
746 /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
747 * The payload is part of a FLV file.
749 * WARNING: This spec defines the payload to use an "RTMP message format"
750 * which misidentifies the format of the timestamps and omits the size of the
753 while (pos < map.size) {
754 gsize remaining = map.size - pos;
755 GstBuffer *submessage;
756 GstRtmpMeta *submeta;
757 GstRtmpFlvTagHeader header;
759 if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
760 GST_ERROR_OBJECT (connection,
761 "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
762 GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
766 if (remaining < header.total_size) {
767 GST_ERROR_OBJECT (connection,
768 "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
769 ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
773 submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
774 GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
775 pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
777 GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
778 GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
779 GST_BUFFER_OFFSET_END (submessage) =
780 GST_BUFFER_OFFSET (submessage) + header.total_size;
782 submeta = gst_buffer_get_rtmp_meta (submessage);
785 submeta->type = header.type;
786 submeta->size = header.payload_size;
789 first_ts = header.timestamp;
791 guint32 ts_offset = header.timestamp - first_ts;
793 submeta->ts_delta += ts_offset;
794 GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
795 GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
798 gst_rtmp_buffer_dump (submessage, "<<< submessage");
799 gst_rtmp_connection_handle_message (connection, submessage);
800 gst_buffer_unref (submessage);
802 pos += header.total_size;
805 gst_buffer_unmap (buffer, &map);
809 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
812 GstRtmpProtocolControl pc;
814 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
815 GST_ERROR_OBJECT (connection, "can't parse protocol control message");
819 GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
820 gst_rtmp_message_type_get_nick (pc.type));
823 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
824 GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
826 gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
829 case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
830 GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
831 G_GUINT32_FORMAT, pc.param);
834 case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
835 GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
837 gst_rtmp_connection_handle_ack (connection, pc.param);
840 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
841 GST_INFO_OBJECT (connection,
842 "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
843 gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
846 case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
847 GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
848 ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
849 /* FIXME this is not correct, but close enough */
850 gst_rtmp_connection_request_window_size (connection, pc.param);
854 GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
855 pc.type, gst_rtmp_message_type_get_nick (pc.type));
861 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
864 GstRtmpUserControl uc;
866 if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
867 GST_ERROR_OBJECT (connection, "can't parse user control message");
871 GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
872 gst_rtmp_user_control_type_get_nick (uc.type));
875 case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
876 case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
877 case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
878 case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
879 GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
880 gst_rtmp_user_control_type_get_nick (uc.type));
881 g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
885 case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
886 GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
887 G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
890 case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
891 GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
893 gst_rtmp_connection_send_ping_response (connection, uc.param);
896 case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
897 GST_DEBUG_OBJECT (connection,
898 "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
901 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
902 GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
906 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
907 GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
912 GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
913 uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
919 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
922 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
923 GST_ERROR_OBJECT (self,
924 "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
929 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
930 GST_ERROR_OBJECT (self,
931 "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
936 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
937 GST_WARNING_OBJECT (self,
938 "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
941 g_mutex_lock (&self->stats_lock);
942 self->in_chunk_size = chunk_size;
943 g_mutex_unlock (&self->stats_lock);
947 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
949 guint64 last_ack, new_ack;
950 guint32 last_ack_low, last_ack_high;
952 last_ack = self->out_bytes_acked;
953 last_ack_low = last_ack & G_MAXUINT32;
954 last_ack_high = (last_ack >> 32) & G_MAXUINT32;
956 if (bytes < last_ack_low) {
957 GST_WARNING_OBJECT (self,
958 "Acknowledgement bytes regression, assuming rollover: %"
959 G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
963 new_ack = (((guint64) last_ack_high) << 32) | bytes;
965 GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
968 g_mutex_lock (&self->stats_lock);
969 self->out_bytes_acked = new_ack;
970 g_mutex_unlock (&self->stats_lock);
974 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
975 guint32 window_ack_size)
977 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
978 GST_WARNING_OBJECT (self,
979 "peer requested small window ack size %" G_GUINT32_FORMAT,
983 g_mutex_lock (&self->stats_lock);
984 self->in_window_ack_size = window_ack_size;
985 g_mutex_unlock (&self->stats_lock);
989 is_command_response (const gchar * command_name)
991 return g_strcmp0 (command_name, "_result") == 0 ||
992 g_strcmp0 (command_name, "_error") == 0;
996 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1000 gdouble transaction_id;
1003 meta = gst_buffer_get_rtmp_meta (buffer);
1004 g_return_if_fail (meta);
1008 gst_buffer_map (buffer, &map, GST_MAP_READ);
1009 args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1011 gst_buffer_unmap (buffer, &map);
1018 if (!isfinite (transaction_id) || transaction_id < 0 ||
1019 transaction_id > G_MAXUINT) {
1020 GST_WARNING_OBJECT (sc,
1021 "Server sent command \"%s\" with extreme transaction ID %.0f",
1022 GST_STR_NULL (command_name), transaction_id);
1023 } else if (transaction_id > sc->transaction_count) {
1024 GST_WARNING_OBJECT (sc,
1025 "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1026 GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1027 sc->transaction_count = transaction_id;
1030 GST_DEBUG_OBJECT (sc,
1031 "got control message \"%s\" transaction %.0f size %"
1032 G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1035 if (is_command_response (command_name)) {
1036 if (transaction_id != 0) {
1039 for (l = sc->transactions; l; l = g_list_next (l)) {
1040 Transaction *t = l->data;
1042 if (t->transaction_id != transaction_id) {
1046 GST_LOG_OBJECT (sc, "calling transaction callback %s",
1047 GST_DEBUG_FUNCPTR_NAME (t->func));
1048 sc->transactions = g_list_remove_link (sc->transactions, l);
1049 t->func (command_name, args, t->user_data);
1050 g_list_free_full (l, transaction_free);
1054 GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1055 GST_STR_NULL (command_name));
1060 if (transaction_id != 0) {
1061 GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1062 GST_STR_NULL (command_name));
1065 for (l = sc->expected_commands; l; l = g_list_next (l)) {
1066 ExpectedCommand *ec = l->data;
1068 if (ec->stream_id != meta->mstream) {
1072 if (g_strcmp0 (ec->command_name, command_name)) {
1076 GST_LOG_OBJECT (sc, "calling expected command callback %s",
1077 GST_DEBUG_FUNCPTR_NAME (ec->func));
1078 sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1079 ec->func (command_name, args, ec->user_data);
1080 g_list_free_full (l, expected_command_free);
1085 g_free (command_name);
1086 g_ptr_array_unref (args);
1090 start_write (gpointer user_data)
1092 GstRtmpConnection *sc = user_data;
1093 gst_rtmp_connection_start_write (sc);
1094 return G_SOURCE_REMOVE;
1098 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1100 g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1101 g_return_if_fail (GST_IS_BUFFER (buffer));
1103 g_async_queue_push (self->output_queue, buffer);
1104 g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1105 start_write, g_object_ref (self), g_object_unref);
1109 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1111 return g_async_queue_length (connection->output_queue);
1115 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1116 GstRtmpCommandCallback response_command, gpointer user_data,
1117 guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1121 gdouble transaction_id = 0;
1127 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1129 if (connection->thread != g_thread_self ()) {
1130 GST_ERROR_OBJECT (connection, "Called from wrong thread");
1133 GST_DEBUG_OBJECT (connection,
1134 "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1135 command_name, stream_id);
1137 if (response_command) {
1140 transaction_id = ++connection->transaction_count;
1142 GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1143 GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1145 t = transaction_new (transaction_id, response_command, user_data);
1147 connection->transactions = g_list_append (connection->transactions, t);
1150 va_start (ap, argument);
1151 payload = gst_amf_serialize_command_valist (transaction_id,
1152 command_name, argument, ap);
1155 data = g_bytes_unref_to_data (payload, &size);
1156 buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1157 3, stream_id, data, size);
1159 gst_rtmp_connection_queue_message (connection, buffer);
1160 return transaction_id;
1164 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1165 GstRtmpCommandCallback response_command, gpointer user_data,
1166 guint32 stream_id, const gchar * command_name)
1168 ExpectedCommand *ec;
1170 g_return_if_fail (response_command);
1171 g_return_if_fail (command_name);
1172 g_return_if_fail (!is_command_response (command_name));
1174 GST_LOG_OBJECT (connection,
1175 "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1176 GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1178 ec = expected_command_new (stream_id, command_name, response_command,
1181 connection->expected_commands =
1182 g_list_append (connection->expected_commands, ec);
1186 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1188 guint64 in_bytes_total = connection->in_bytes_total;
1189 GstRtmpProtocolControl pc = {
1190 .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1191 .param = (guint32) in_bytes_total,
1194 gst_rtmp_connection_queue_message (connection,
1195 gst_rtmp_message_new_protocol_control (&pc));
1197 g_mutex_lock (&connection->stats_lock);
1198 connection->in_bytes_acked = in_bytes_total;
1199 g_mutex_unlock (&connection->stats_lock);
/* Queues a user control message of type PING_RESPONSE whose parameter is
 * event_data.
 *
 * NOTE(review): the declaration of event_data is not visible in this listing;
 * presumably it is the second parameter of this function - confirm. */
1203 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1206 GstRtmpUserControl uc = {
1207 .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1208 .param = event_data,
1211 gst_rtmp_connection_queue_message (connection,
1212 gst_rtmp_message_new_user_control (&uc));
/* Queues a SET_CHUNK_SIZE protocol control message with chunk_size as its
 * parameter.
 *
 * Nothing is queued when connection fails the GST_IS_RTMP_CONNECTION check.
 * This function only builds and queues the message; it does not itself
 * modify any chunk size field on the connection.
 *
 * NOTE(review): the declaration of chunk_size is not visible in this listing;
 * presumably it is the second parameter of this function - confirm. */
1216 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1219 GstRtmpProtocolControl pc = {
1220 .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1221 .param = chunk_size,
1224 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1226 gst_rtmp_connection_queue_message (connection,
1227 gst_rtmp_message_new_protocol_control (&pc));
/* Queues a WINDOW_ACK_SIZE protocol control message with window_ack_size as
 * its parameter.
 *
 * Nothing is queued when connection fails the GST_IS_RTMP_CONNECTION check.
 * This function only builds and queues the message; it does not itself
 * modify any window size field on the connection. */
1231 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1232 guint32 window_ack_size)
1234 GstRtmpProtocolControl pc = {
1235 .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1236 .param = window_ack_size,
1239 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1241 gst_rtmp_connection_queue_message (connection,
1242 gst_rtmp_message_new_protocol_control (&pc));
/* Prepends the set_data_frame_value memory to buffer and queues the buffer
 * as a message on connection.
 *
 * Nothing happens when connection fails GST_IS_RTMP_CONNECTION or buffer
 * fails GST_IS_BUFFER.
 *
 * NOTE(review): the declaration of buffer is not visible in this listing;
 * presumably it is the second parameter of this function - confirm. */
1246 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1249 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1250 g_return_if_fail (GST_IS_BUFFER (buffer));
/* A new reference to set_data_frame_value is taken for the buffer, so the
 * original reference stays with its owner. */
1252 gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1253 gst_rtmp_connection_queue_message (connection, buffer);
/* Parses buffer as a protocol control message and records the setting it
 * carries as a pending value on self:
 *
 *  - SET_CHUNK_SIZE: logs an error when the size is below
 *    GST_RTMP_MINIMUM_CHUNK_SIZE or above GST_RTMP_MAXIMUM_CHUNK_SIZE, logs a
 *    warning when it is below GST_RTMP_DEFAULT_CHUNK_SIZE, and stores it in
 *    out_chunk_size_pending.
 *  - WINDOW_ACK_SIZE: logs a warning when the size is below
 *    GST_RTMP_DEFAULT_WINDOW_ACK_SIZE and stores it in
 *    out_window_ack_size_pending.
 *
 * An error is logged when the buffer cannot be parsed.
 *
 * NOTE(review): the switch header, the return statements and the break
 * statements are not visible in this listing. Presumably the error paths
 * return early so the pending value is not stored - confirm against the
 * full file. */
1257 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1260 GstRtmpProtocolControl pc;
1262 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1263 GST_ERROR_OBJECT (self, "can't parse protocol control message");
1268 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1269 guint32 chunk_size = pc.param;
1271 GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
/* Out-of-range chunk sizes are reported as errors. */
1274 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1275 GST_ERROR_OBJECT (self,
1276 "requested chunk size %" G_GUINT32_FORMAT " is too small",
1281 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1282 GST_ERROR_OBJECT (self,
1283 "requested chunk size %" G_GUINT32_FORMAT " is too large",
/* Below the default is only a warning. */
1288 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1289 GST_WARNING_OBJECT (self,
1290 "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
/* Only the pending field is written here; out_chunk_size is not touched. */
1293 self->out_chunk_size_pending = pc.param;
1297 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1298 guint32 window_ack_size = pc.param;
1300 GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
/* No visible error check for the window size; a small value only warns. */
1303 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1304 GST_WARNING_OBJECT (self,
1305 "requesting small window ack size %" G_GUINT32_FORMAT,
1309 self->out_window_ack_size_pending = window_ack_size;
/* Moves the pending outgoing settings into the live fields.
 *
 * out_chunk_size_pending is copied to out_chunk_size and
 * out_window_ack_size_pending to out_window_ack_size. Each pending field is
 * reset to 0 and each live field is written while holding stats_lock.
 *
 * NOTE(review): the window-ack half is guarded by "if (window_ack_size)".
 * The matching guard for the chunk-size half is not visible in this listing;
 * presumably it exists - confirm against the full file. */
1321 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1323 guint32 chunk_size, window_ack_size;
1325 chunk_size = self->out_chunk_size_pending;
1327 self->out_chunk_size_pending = 0;
1329 g_mutex_lock (&self->stats_lock);
1330 self->out_chunk_size = chunk_size;
1331 g_mutex_unlock (&self->stats_lock);
1333 GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1336 window_ack_size = self->out_window_ack_size_pending;
/* A pending value of 0 means there is nothing to apply. */
1337 if (window_ack_size) {
1338 self->out_window_ack_size_pending = 0;
1340 g_mutex_lock (&self->stats_lock);
1341 self->out_window_ack_size = window_ack_size;
1342 g_mutex_unlock (&self->stats_lock);
1344 GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
/* Builds a new "GstRtmpConnectionStats" structure from the statistics
 * fields of self.
 *
 * The chunk sizes and window ack sizes are G_TYPE_UINT fields; the byte
 * totals and acked counters are G_TYPE_UINT64 fields. When self is NULL,
 * every field is set to 0.
 *
 * No locking is done here; gst_rtmp_connection_get_stats () holds
 * stats_lock around its call. */
1349 static GstStructure *
1350 get_stats (GstRtmpConnection * self)
1352 return gst_structure_new ("GstRtmpConnectionStats",
1353 "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1354 "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1355 "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1356 "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1357 "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1358 "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1359 "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1360 "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
/* Returns the structure built by get_stats () for a NULL connection, i.e. a
 * "GstRtmpConnectionStats" structure in which every field is 0. */
1364 gst_rtmp_connection_get_null_stats (void)
1366 return get_stats (NULL);
1370 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1374 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1376 g_mutex_lock (&self->stats_lock);
1377 s = get_stats (self);
1378 g_mutex_unlock (&self->stats_lock);