1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
29 #include "rtmpclient.h"
30 #include "rtmphandshake.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
34 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
35 #define GST_CAT_DEFAULT gst_rtmp_client_debug_category
37 static void send_connect_done (const gchar * command_name, GPtrArray * args,
39 static void create_stream_done (const gchar * command_name, GPtrArray * args,
41 static void on_publish_or_play_status (const gchar * command_name,
42 GPtrArray * args, gpointer user_data);
47 static gsize done = 0;
48 if (g_once_init_enter (&done)) {
49 GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
50 "rtmpclient", 0, "debug category for the rtmp client");
51 GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
52 GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
53 GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
54 g_once_init_leave (&done, 1);
58 static const gchar *scheme_strings[] = {
64 #define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
67 gst_rtmp_scheme_get_type (void)
69 static gsize scheme_type = 0;
70 static const GEnumValue scheme[] = {
71 {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
72 {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
76 if (g_once_init_enter (&scheme_type)) {
77 GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
78 g_once_init_leave (&scheme_type, tmp);
81 return (GType) scheme_type;
85 gst_rtmp_scheme_from_string (const gchar * string)
90 for (value = 0; value < NUM_SCHEMES; value++) {
91 if (strcmp (scheme_strings[value], string) == 0) {
101 gst_rtmp_scheme_from_uri (const GstUri * uri)
103 const gchar *scheme = gst_uri_get_scheme (uri);
105 return GST_RTMP_SCHEME_RTMP;
108 return gst_rtmp_scheme_from_string (scheme);
112 gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
114 if (scheme >= 0 && scheme < NUM_SCHEMES) {
115 return scheme_strings[scheme];
122 gst_rtmp_scheme_get_strings (void)
124 return scheme_strings;
128 gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
131 case GST_RTMP_SCHEME_RTMP:
134 case GST_RTMP_SCHEME_RTMPS:
138 g_return_val_if_reached (0);
143 gst_rtmp_authmod_get_type (void)
145 static gsize authmod_type = 0;
146 static const GEnumValue authmod[] = {
147 {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
148 {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
149 {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
153 if (g_once_init_enter (&authmod_type)) {
154 GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
155 g_once_init_leave (&authmod_type, tmp);
158 return (GType) authmod_type;
162 gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
164 GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
165 GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
166 return ev ? ev->value_nick : "(unknown)";
170 gst_rtmp_stop_commands_get_type (void)
172 static gsize stop_commands_type = 0;
173 static const GFlagsValue stop_commands[] = {
174 {GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
175 {GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
176 {GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
177 {GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
181 if (g_once_init_enter (&stop_commands_type)) {
182 GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
183 g_once_init_leave (&stop_commands_type, tmp);
186 return (GType) stop_commands_type;
190 gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
192 g_return_if_fail (dest);
193 g_return_if_fail (src);
195 dest->scheme = src->scheme;
196 dest->host = g_strdup (src->host);
197 dest->port = src->port;
198 dest->application = g_strdup (src->application);
199 dest->stream = g_strdup (src->stream);
200 dest->username = g_strdup (src->username);
201 dest->password = g_strdup (src->password);
202 dest->secure_token = g_strdup (src->secure_token);
203 dest->authmod = src->authmod;
204 dest->timeout = src->timeout;
205 dest->tls_flags = src->tls_flags;
206 dest->flash_ver = g_strdup (src->flash_ver);
207 dest->publish = src->publish;
211 gst_rtmp_location_clear (GstRtmpLocation * location)
213 g_return_if_fail (location);
215 g_clear_pointer (&location->host, g_free);
217 g_clear_pointer (&location->application, g_free);
218 g_clear_pointer (&location->stream, g_free);
219 g_clear_pointer (&location->username, g_free);
220 g_clear_pointer (&location->password, g_free);
221 g_clear_pointer (&location->secure_token, g_free);
222 g_clear_pointer (&location->flash_ver, g_free);
223 location->publish = FALSE;
227 gst_rtmp_location_get_string (const GstRtmpLocation * location,
228 gboolean with_stream)
231 gchar *base, *string;
232 const gchar *scheme_string;
235 g_return_val_if_fail (location, NULL);
237 scheme_string = gst_rtmp_scheme_to_string (location->scheme);
238 default_port = gst_rtmp_scheme_get_default_port (location->scheme);
240 uri = gst_uri_new (scheme_string, NULL, location->host,
241 location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
243 base = gst_uri_to_string (uri);
245 string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
246 location->stream, NULL);
254 /* Flag values for the audioCodecs property,
255 * rtmp_specification_1.0.pdf page 32 */
258 SUPPORT_SND_NONE = 0x001, /* Raw sound, no compression */
259 SUPPORT_SND_ADPCM = 0x002, /* ADPCM compression */
260 SUPPORT_SND_MP3 = 0x004, /* mp3 compression */
261 SUPPORT_SND_INTEL = 0x008, /* Not used */
262 SUPPORT_SND_UNUSED = 0x010, /* Not used */
263 SUPPORT_SND_NELLY8 = 0x020, /* NellyMoser at 8-kHz compression */
264 SUPPORT_SND_NELLY = 0x040, /* NellyMoser compression
265 * (5, 11, 22, and 44 kHz) */
266 SUPPORT_SND_G711A = 0x080, /* G711A sound compression
267 * (Flash Media Server only) */
268 SUPPORT_SND_G711U = 0x100, /* G711U sound compression
269 * (Flash Media Server only) */
270 SUPPORT_SND_NELLY16 = 0x200, /* NellyMoser at 16-kHz compression */
271 SUPPORT_SND_AAC = 0x400, /* Advanced audio coding (AAC) codec */
272 SUPPORT_SND_SPEEX = 0x800, /* Speex Audio */
273 SUPPORT_SND_ALL = 0xFFF, /* All RTMP-supported audio codecs */
276 /* audioCodecs value sent by libavformat. All "used" codecs. */
277 #define GST_RTMP_AUDIOCODECS \
278 (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
279 G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
281 /* Flag values for the videoCodecs property,
282 * rtmp_specification_1.0.pdf page 32 */
285 SUPPORT_VID_UNUSED = 0x01, /* Obsolete value */
286 SUPPORT_VID_JPEG = 0x02, /* Obsolete value */
287 SUPPORT_VID_SORENSON = 0x04, /* Sorenson Flash video */
288 SUPPORT_VID_HOMEBREW = 0x08, /* V1 screen sharing */
289 SUPPORT_VID_VP6 = 0x10, /* On2 video (Flash 8+) */
290 SUPPORT_VID_VP6ALPHA = 0x20, /* On2 video with alpha channel */
291 SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
292 SUPPORT_VID_H264 = 0x80, /* H264 video */
293 SUPPORT_VID_ALL = 0xFF, /* All RTMP-supported video codecs */
296 /* videoCodecs value sent by libavformat. All non-obsolete codecs. */
297 #define GST_RTMP_VIDEOCODECS \
298 (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
299 G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252); /* libavformat's magic number */
301 /* Flag values for the videoFunction property,
302 * rtmp_specification_1.0.pdf page 32 */
305 /* Indicates that the client can perform frame-accurate seeks. */
306 SUPPORT_VID_CLIENT_SEEK = 1,
309 /* videoFunction value sent by libavformat */
310 #define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
311 G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1); /* libavformat's magic number */
313 static void socket_connect (GTask * task);
314 static void socket_connect_done (GObject * source, GAsyncResult * result,
316 static void handshake_done (GObject * source, GAsyncResult * result,
318 static void send_connect (GTask * task);
319 static void send_stop (GstRtmpConnection * connection, const gchar * stream,
320 const GstRtmpStopCommands stop_commands);
321 static void send_secure_token_response (GTask * task,
322 GstRtmpConnection * connection, const gchar * challenge);
323 static void connection_error (GstRtmpConnection * connection,
326 #define DEFAULT_TIMEOUT 5
330 GstRtmpLocation location;
332 GstRtmpConnection *connection;
333 gulong error_handler_id;
336 static ConnectTaskData *
337 connect_task_data_new (const GstRtmpLocation * location)
339 ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
340 gst_rtmp_location_copy (&data->location, location);
345 connect_task_data_free (gpointer ptr)
347 ConnectTaskData *data = ptr;
348 gst_rtmp_location_clear (&data->location);
349 g_clear_pointer (&data->auth_query, g_free);
350 if (data->error_handler_id) {
351 g_signal_handler_disconnect (data->connection, data->error_handler_id);
353 g_clear_object (&data->connection);
354 g_slice_free (ConnectTaskData, data);
357 static GRegex *auth_regex = NULL;
360 gst_rtmp_client_connect_async (const GstRtmpLocation * location,
361 GCancellable * cancellable, GAsyncReadyCallback callback,
368 if (g_once_init_enter (&auth_regex)) {
369 GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
370 "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
371 "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
372 g_once_init_leave (&auth_regex, re);
375 task = g_task_new (NULL, cancellable, callback, user_data);
377 g_task_set_task_data (task, connect_task_data_new (location),
378 connect_task_data_free);
380 socket_connect (task);
384 socket_connect (GTask * task)
386 ConnectTaskData *data = g_task_get_task_data (task);
387 GSocketConnectable *addr;
388 GSocketClient *socket_client;
390 if (data->location.timeout < 0) {
391 data->location.timeout = DEFAULT_TIMEOUT;
394 if (data->error_handler_id) {
395 g_signal_handler_disconnect (data->connection, data->error_handler_id);
396 data->error_handler_id = 0;
399 if (data->connection) {
400 gst_rtmp_connection_close (data->connection);
401 g_clear_object (&data->connection);
404 if (!data->location.host) {
405 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
407 g_object_unref (task);
411 if (!data->location.port) {
412 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
414 g_object_unref (task);
418 socket_client = g_socket_client_new ();
419 g_socket_client_set_timeout (socket_client, data->location.timeout);
421 switch (data->location.scheme) {
422 case GST_RTMP_SCHEME_RTMP:
425 case GST_RTMP_SCHEME_RTMPS:
426 GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
427 data->location.tls_flags);
428 g_socket_client_set_tls (socket_client, TRUE);
429 G_GNUC_BEGIN_IGNORE_DEPRECATIONS;
430 g_socket_client_set_tls_validation_flags (socket_client,
431 data->location.tls_flags);
432 G_GNUC_END_IGNORE_DEPRECATIONS;
436 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
437 "Invalid scheme ID %d", data->location.scheme);
438 g_object_unref (socket_client);
439 g_object_unref (task);
443 addr = g_network_address_new (data->location.host, data->location.port);
445 GST_DEBUG ("Starting socket connection");
447 g_socket_client_connect_async (socket_client, addr,
448 g_task_get_cancellable (task), socket_connect_done, task);
449 g_object_unref (addr);
450 g_object_unref (socket_client);
454 socket_connect_done (GObject * source, GAsyncResult * result,
457 GSocketClient *socket_client = G_SOCKET_CLIENT (source);
458 GSocketConnection *socket_connection;
459 GTask *task = user_data;
460 GError *error = NULL;
463 g_socket_client_connect_finish (socket_client, result, &error);
465 if (g_task_return_error_if_cancelled (task)) {
466 GST_DEBUG ("Socket connection was cancelled");
467 g_object_unref (task);
471 if (socket_connection == NULL) {
472 GST_ERROR ("Socket connection error");
473 g_task_return_error (task, error);
474 g_object_unref (task);
478 GST_DEBUG ("Socket connection established");
480 gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
481 g_task_get_cancellable (task), handshake_done, task);
482 g_object_unref (socket_connection);
487 handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
489 GIOStream *stream = G_IO_STREAM (source);
490 GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
491 GTask *task = user_data;
492 ConnectTaskData *data = g_task_get_task_data (task);
493 GError *error = NULL;
496 res = gst_rtmp_client_handshake_finish (stream, result, &error);
498 g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
499 g_task_return_error (task, error);
500 g_object_unref (task);
504 data->connection = gst_rtmp_connection_new (socket_connection,
505 g_task_get_cancellable (task));
506 data->error_handler_id = g_signal_connect (data->connection,
507 "error", G_CALLBACK (connection_error), task);
513 connection_error (GstRtmpConnection * connection, gpointer user_data)
515 GTask *task = user_data;
516 if (!g_task_had_error (task))
517 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
518 "error during connection attempt");
522 do_adobe_auth (const gchar * username, const gchar * password,
523 const gchar * salt, const gchar * opaque, const gchar * challenge)
525 guint8 hash[16]; /* MD5 digest */
526 gsize hashlen = sizeof hash;
527 gchar *challenge2, *auth_query;
530 g_return_val_if_fail (username, NULL);
531 g_return_val_if_fail (password, NULL);
532 g_return_val_if_fail (salt, NULL);
534 md5 = g_checksum_new (G_CHECKSUM_MD5);
535 g_checksum_update (md5, (guchar *) username, -1);
536 g_checksum_update (md5, (guchar *) salt, -1);
537 g_checksum_update (md5, (guchar *) password, -1);
539 g_checksum_get_digest (md5, hash, &hashlen);
540 g_warn_if_fail (hashlen == sizeof hash);
543 gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
544 g_checksum_reset (md5);
545 g_checksum_update (md5, (guchar *) hashstr, -1);
550 g_checksum_update (md5, (guchar *) opaque, -1);
552 g_checksum_update (md5, (guchar *) challenge, -1);
554 challenge2 = g_strdup_printf ("%08x", g_random_int ());
555 g_checksum_update (md5, (guchar *) challenge2, -1);
557 g_checksum_get_digest (md5, hash, &hashlen);
558 g_warn_if_fail (hashlen == sizeof hash);
561 gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
566 ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
567 username, challenge2, hashstr, opaque);
570 g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
571 "adobe", username, challenge2, hashstr);
576 g_checksum_free (md5);
583 send_connect (GTask * task)
585 ConnectTaskData *data = g_task_get_task_data (task);
587 const gchar *app, *flash_ver;
588 gchar *uri, *appstr = NULL, *uristr = NULL;
591 node = gst_amf_node_new_object ();
592 app = data->location.application;
593 flash_ver = data->location.flash_ver;
594 publish = data->location.publish;
595 uri = gst_rtmp_location_get_string (&data->location, FALSE);
598 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
599 "Application is not set");
600 g_object_unref (task);
605 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
606 "Flash version is not set");
607 g_object_unref (task);
611 if (data->auth_query) {
612 const gchar *query = data->auth_query;
613 appstr = g_strdup_printf ("%s?%s", app, query);
614 uristr = g_strdup_printf ("%s?%s", uri, query);
615 } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
616 const gchar *user = data->location.username;
617 const gchar *authmod = "adobe";
620 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
621 "no username for adobe authentication");
622 g_object_unref (task);
626 if (!data->location.password) {
627 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
628 "no password for adobe authentication");
629 g_object_unref (task);
633 appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
634 uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
636 appstr = g_strdup (app);
637 uristr = g_strdup (uri);
640 /* Arguments for the connect command.
641 * Most of these are described in rtmp_specification_1.0.pdf page 30 */
643 /* "The server application name the client is connected to." */
644 gst_amf_node_append_field_take_string (node, "app", appstr, -1);
647 /* Undocumented. Sent by both libavformat and librtmp. */
648 gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
651 /* "Flash Player version. It is the same string as returned by the
652 * ApplicationScript getversion () function." */
653 gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
655 /* "URL of the source SWF file making the connection."
656 * XXX: libavformat sends "swfUrl" here, if provided. */
658 /* "URL of the Server. It has the following format.
659 * protocol://servername:port/appName/appInstance" */
660 gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
663 /* "True if proxy is being used." */
664 gst_amf_node_append_field_boolean (node, "fpad", FALSE);
666 /* Undocumented. Sent by libavformat. */
667 gst_amf_node_append_field_number (node, "capabilities",
668 15 /* libavformat's magic number */ );
670 /* "Indicates what audio codecs the client supports." */
671 gst_amf_node_append_field_number (node, "audioCodecs",
672 GST_RTMP_AUDIOCODECS);
674 /* "Indicates what video codecs are supported." */
675 gst_amf_node_append_field_number (node, "videoCodecs",
676 GST_RTMP_VIDEOCODECS);
678 /* "Indicates what special video functions are supported." */
679 gst_amf_node_append_field_number (node, "videoFunction",
680 GST_RTMP_VIDEOFUNCTION);
682 /* "URL of the web page from where the SWF file was loaded."
683 * XXX: libavformat sends "pageUrl" here, if provided. */
686 gst_rtmp_connection_send_command (data->connection, send_connect_done,
687 task, 0, "connect", node, NULL);
690 gst_amf_node_free (node);
695 send_connect_done (const gchar * command_name, GPtrArray * args,
698 GTask *task = G_TASK (user_data);
699 ConnectTaskData *data = g_task_get_task_data (task);
700 const GstAmfNode *node, *optional_args;
703 if (g_task_return_error_if_cancelled (task)) {
704 g_object_unref (task);
709 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
710 "connect failed: %s", command_name);
711 g_object_unref (task);
716 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
717 "connect failed; not enough return arguments");
718 g_object_unref (task);
722 optional_args = g_ptr_array_index (args, 1);
724 node = gst_amf_node_get_field (optional_args, "code");
725 code = node ? gst_amf_node_peek_string (node, NULL) : NULL;
727 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
728 "result code missing from connect cmd result");
729 g_object_unref (task);
733 GST_INFO ("connect result: %s", code);
735 if (g_str_equal (code, "NetConnection.Connect.Success")) {
736 node = gst_amf_node_get_field (optional_args, "secureToken");
737 send_secure_token_response (task, data->connection,
738 node ? gst_amf_node_peek_string (node, NULL) : NULL);
742 if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
743 GstRtmpAuthmod authmod = data->location.authmod;
744 GMatchInfo *match_info;
748 node = gst_amf_node_get_field (optional_args, "description");
749 desc = node ? gst_amf_node_peek_string (node, NULL) : NULL;
751 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
752 "Connect rejected; no description");
753 g_object_unref (task);
757 GST_DEBUG ("connect result desc: %s", desc);
759 if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
760 if (strstr (desc, "authmod=adobe")) {
761 GST_INFO ("Reconnecting with authmod=adobe");
762 data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
763 socket_connect (task);
767 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
768 "unhandled authentication mode: %s", desc);
769 g_object_unref (task);
773 if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
774 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
775 "failed to parse auth rejection: %s", desc);
776 g_object_unref (task);
781 gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
782 gchar *query_str = g_match_info_fetch_named (match_info, "query");
785 GST_INFO ("regex parsed auth: authmod=%s, query=%s",
786 GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
787 g_match_info_free (match_info);
790 case GST_RTMP_AUTHMOD_ADOBE:
791 matches = g_str_equal (authmod_str, "adobe");
800 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
801 "server uses wrong authentication mode '%s'; expected %s",
802 GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
803 g_object_unref (task);
804 g_free (authmod_str);
808 g_free (authmod_str);
810 query = gst_uri_from_string (query_str);
812 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
813 "failed to parse authentication query '%s'",
814 GST_STR_NULL (query_str));
815 g_object_unref (task);
823 const gchar *reason = gst_uri_get_query_value (query, "reason");
826 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
827 "authentication failed; no reason: %s", desc);
828 g_object_unref (task);
829 gst_uri_unref (query);
833 if (g_str_equal (reason, "authfailed")) {
834 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
835 "authentication failed! wrong credentials?");
836 g_object_unref (task);
837 gst_uri_unref (query);
841 if (!g_str_equal (reason, "needauth")) {
842 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
843 "unhandled rejection reason '%s'", reason);
844 g_object_unref (task);
845 gst_uri_unref (query);
850 g_warn_if_fail (!data->auth_query);
851 data->auth_query = do_adobe_auth (data->location.username,
852 data->location.password, gst_uri_get_query_value (query, "salt"),
853 gst_uri_get_query_value (query, "opaque"),
854 gst_uri_get_query_value (query, "challenge"));
856 gst_uri_unref (query);
858 if (!data->auth_query) {
859 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
860 "couldn't generate adobe style authentication query");
861 g_object_unref (task);
865 socket_connect (task);
869 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
870 "unhandled connect result code: %s", code);
871 g_object_unref (task);
874 /* prep key: pack 1st 16 chars into 4 LittleEndian ints */
876 rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
880 g_return_if_fail (key);
881 g_return_if_fail (out);
883 /* ensure we can read 16 bytes */
884 strncpy (copy, key, 16);
885 /* placate GCC 8 -Wstringop-truncation */
888 out[0] = GST_READ_UINT32_LE (copy);
889 out[1] = GST_READ_UINT32_LE (copy + 4);
890 out[2] = GST_READ_UINT32_LE (copy + 8);
891 out[3] = GST_READ_UINT32_LE (copy + 12);
894 /* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
896 rtmp_tea_decode_prep_text (const gchar * text)
901 g_return_val_if_fail (text, NULL);
904 arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
906 for (i = 0; i < len; i += 8) {
912 /* ensure we can read 8 bytes */
913 strncpy (copy, text + i, 8);
914 /* placate GCC 8 -Wstringop-truncation */
917 for (j = 0; j < 4; j++) {
920 hi = g_ascii_xdigit_value (copy[2 * j]);
921 lo = g_ascii_xdigit_value (copy[2 * j + 1]);
923 chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
926 val = GST_READ_UINT32_LE (chars);
927 g_array_append_val (arr, val);
933 /* return text from uint32s to chars */
935 rtmp_tea_decode_return_text (GArray * arr)
937 #if G_BYTE_ORDER != G_LITTLE_ENDIAN
940 g_return_val_if_fail (arr, NULL);
942 for (i = 0; i < arr->len; i++) {
943 guint32 *val = &g_array_index (arr, guint32, i);
944 *val = GUINT32_TO_LE (*val);
948 /* array is alredy zero-terminated */
949 return g_array_free (arr, FALSE);
952 /* http://www.movable-type.co.uk/scripts/tea-block.html */
954 rtmp_tea_decode_btea (GArray * text, guint32 key[4])
957 guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
960 g_return_if_fail (text);
961 g_return_if_fail (text->len > 0);
962 g_return_if_fail (key);
964 v = (guint32 *) text->data;
972 #define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
976 for (p = n - 1; p > 0; p--)
977 z = v[p - 1], y = v[p] -= MX;
986 /* taken from librtmp */
988 rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
993 rtmp_tea_decode_prep_key (bin_key, key);
994 text = rtmp_tea_decode_prep_text (hex_text);
995 rtmp_tea_decode_btea (text, key);
996 return rtmp_tea_decode_return_text (text);
1000 send_secure_token_response (GTask * task, GstRtmpConnection * connection,
1001 const gchar * challenge)
1003 ConnectTaskData *data = g_task_get_task_data (task);
1009 if (!data->location.secure_token || !data->location.secure_token[0]) {
1010 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
1011 "server requires secure token authentication");
1012 g_object_unref (task);
1016 response = rtmp_tea_decode (data->location.secure_token, challenge);
1018 GST_DEBUG ("response: %s", response);
1020 node1 = gst_amf_node_new_null ();
1021 node2 = gst_amf_node_new_take_string (response, -1);
1022 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1023 "secureTokenResponse", node1, node2, NULL);
1024 gst_amf_node_free (node1);
1025 gst_amf_node_free (node2);
1028 g_signal_handler_disconnect (connection, data->error_handler_id);
1029 data->error_handler_id = 0;
1031 g_task_return_pointer (task, g_object_ref (connection),
1032 gst_rtmp_connection_close_and_unref);
1033 g_object_unref (task);
1037 gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
1039 GTask *task = G_TASK (result);
1040 return g_task_propagate_pointer (task, error);
1043 static void send_create_stream (GTask * task);
1044 static void send_publish_or_play (GTask * task);
1048 GstRtmpConnection *connection;
1049 gulong error_handler_id;
1055 static StreamTaskData *
1056 stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
1059 StreamTaskData *data = g_slice_new0 (StreamTaskData);
1060 data->connection = g_object_ref (connection);
1061 data->stream = g_strdup (stream);
1062 data->publish = publish;
1067 stream_task_data_free (gpointer ptr)
1069 StreamTaskData *data = ptr;
1070 g_clear_pointer (&data->stream, g_free);
1071 if (data->error_handler_id) {
1072 g_signal_handler_disconnect (data->connection, data->error_handler_id);
1074 g_clear_object (&data->connection);
1075 g_slice_free (StreamTaskData, data);
1079 start_stream (GstRtmpConnection * connection, const gchar * stream,
1080 gboolean publish, GCancellable * cancellable,
1081 GAsyncReadyCallback callback, gpointer user_data)
1084 StreamTaskData *data;
1088 task = g_task_new (connection, cancellable, callback, user_data);
1091 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
1092 "Stream is not set");
1093 g_object_unref (task);
1097 data = stream_task_data_new (connection, stream, publish);
1098 g_task_set_task_data (task, data, stream_task_data_free);
1100 data->error_handler_id = g_signal_connect (connection,
1101 "error", G_CALLBACK (connection_error), task);
1103 send_create_stream (task);
1107 gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
1108 const gchar * stream, GCancellable * cancellable,
1109 GAsyncReadyCallback callback, gpointer user_data)
1111 start_stream (connection, stream, TRUE, cancellable, callback, user_data);
1115 gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
1116 const gchar * stream, GCancellable * cancellable,
1117 GAsyncReadyCallback callback, gpointer user_data)
1119 start_stream (connection, stream, FALSE, cancellable, callback, user_data);
1123 send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
1126 GstRtmpUserControl uc = {
1127 .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
1132 gst_rtmp_connection_queue_message (connection,
1133 gst_rtmp_message_new_user_control (&uc));
1137 send_create_stream (GTask * task)
1139 GstRtmpConnection *connection = g_task_get_source_object (task);
1140 StreamTaskData *data = g_task_get_task_data (task);
1141 GstAmfNode *command_object, *stream_name;
1143 command_object = gst_amf_node_new_null ();
1144 stream_name = gst_amf_node_new_string (data->stream, -1);
1146 if (data->publish) {
1147 /* Not part of RTMP documentation */
1148 GST_DEBUG ("Releasing stream '%s'", data->stream);
1149 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1150 "releaseStream", command_object, stream_name, NULL);
1151 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1152 "FCPublish", command_object, stream_name, NULL);
1154 /* Matches librtmp */
1155 gst_rtmp_connection_request_window_size (connection,
1156 GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
1157 send_set_buffer_length (connection, 0, 300);
1160 GST_INFO ("Creating stream '%s'", data->stream);
1161 gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
1162 "createStream", command_object, NULL);
1164 gst_amf_node_free (stream_name);
1165 gst_amf_node_free (command_object);
1169 create_stream_done (const gchar * command_name, GPtrArray * args,
1172 GTask *task = G_TASK (user_data);
1173 StreamTaskData *data = g_task_get_task_data (task);
1176 if (g_task_return_error_if_cancelled (task)) {
1177 g_object_unref (task);
1182 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1183 "createStream failed: %s", command_name);
1184 g_object_unref (task);
1188 if (args->len < 2) {
1189 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1190 "createStream failed; not enough return arguments");
1191 g_object_unref (task);
1195 result = g_ptr_array_index (args, 1);
1196 if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
1197 GString *error_dump = g_string_new ("");
1199 gst_amf_node_dump (result, -1, error_dump);
1201 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1202 "createStream failed: %s", error_dump->str);
1203 g_object_unref (task);
1205 g_string_free (error_dump, TRUE);
1209 data->id = gst_amf_node_get_number (result);
1210 GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
1212 if (data->id == 0) {
1213 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
1214 "createStream returned ID 0");
1215 g_object_unref (task);
1219 send_publish_or_play (task);
1223 send_publish_or_play (GTask * task)
1225 GstRtmpConnection *connection = g_task_get_source_object (task);
1226 StreamTaskData *data = g_task_get_task_data (task);
1227 const gchar *command = data->publish ? "publish" : "play";
1228 GstAmfNode *command_object, *stream_name, *argument;
1230 command_object = gst_amf_node_new_null ();
1231 stream_name = gst_amf_node_new_string (data->stream, -1);
1233 if (data->publish) {
1234 /* publishing type (live, record, append) */
1235 argument = gst_amf_node_new_string ("live", -1);
1237 /* "Start" argument: -2 = live or recording, -1 = only live
1238 0 or positive = only recording, seek to X seconds */
1239 argument = gst_amf_node_new_number (-2);
1242 GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
1243 command, data->stream, data->id);
1244 gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
1245 task, data->id, "onStatus");
1246 gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
1247 command, command_object, stream_name, argument, NULL);
1249 if (!data->publish) {
1250 /* Matches librtmp */
1251 send_set_buffer_length (connection, data->id, 30000);
1254 gst_amf_node_free (command_object);
1255 gst_amf_node_free (stream_name);
1256 gst_amf_node_free (argument);
1260 on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
1263 GTask *task = G_TASK (user_data);
1264 GstRtmpConnection *connection = g_task_get_source_object (task);
1265 StreamTaskData *data = g_task_get_task_data (task);
1266 const gchar *command = data->publish ? "publish" : "play", *code = NULL;
1269 if (g_task_return_error_if_cancelled (task)) {
1270 g_object_unref (task);
1275 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1276 "%s failed: %s", command, command_name);
1277 g_object_unref (task);
1281 if (args->len < 2) {
1282 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1283 "%s failed; not enough return arguments", command);
1284 g_object_unref (task);
1289 const GstAmfNode *info_object, *code_object;
1290 info_object = g_ptr_array_index (args, 1);
1291 code_object = gst_amf_node_get_field (info_object, "code");
1294 code = gst_amf_node_peek_string (code_object, NULL);
1297 info_dump = g_string_new ("");
1298 gst_amf_node_dump (info_object, -1, info_dump);
1301 if (data->publish) {
1302 if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
1303 GST_INFO ("publish success: %s", info_dump->str);
1304 g_task_return_boolean (task, TRUE);
1308 if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
1309 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
1310 "publish denied: stream already exists: %s", info_dump->str);
1314 if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
1315 g_task_return_new_error (task, G_IO_ERROR,
1316 G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
1320 if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
1321 g_strcmp0 (code, "NetStream.Play.PublishNotify") == 0 ||
1322 g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
1323 GST_INFO ("play success: %s", info_dump->str);
1324 g_task_return_boolean (task, TRUE);
1328 if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
1329 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
1330 "play denied: stream not found: %s", info_dump->str);
1335 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1336 "unhandled %s result: %s", command, info_dump->str);
1339 g_string_free (info_dump, TRUE);
1341 g_signal_handler_disconnect (connection, data->error_handler_id);
1342 data->error_handler_id = 0;
1344 g_object_unref (task);
1348 start_stream_finish (GstRtmpConnection * connection,
1349 GAsyncResult * result, guint32 * stream_id, GError ** error)
1352 StreamTaskData *data;
1354 g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
1356 task = G_TASK (result);
1358 if (!g_task_propagate_boolean (G_TASK (result), error)) {
1362 data = g_task_get_task_data (task);
1365 *stream_id = data->id;
1372 gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
1373 GAsyncResult * result, guint32 * stream_id, GError ** error)
1375 return start_stream_finish (connection, result, stream_id, error);
1379 gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
1380 GAsyncResult * result, guint32 * stream_id, GError ** error)
1382 return start_stream_finish (connection, result, stream_id, error);
1386 gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
1387 const gchar * stream, const GstRtmpStopCommands stop_commands)
1389 send_stop (connection, stream, stop_commands);
1393 send_stop (GstRtmpConnection * connection, const gchar * stream,
1394 const GstRtmpStopCommands stop_commands)
1396 GstAmfNode *command_object, *stream_name;
1398 command_object = gst_amf_node_new_null ();
1399 stream_name = gst_amf_node_new_string (stream, -1);
1401 if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
1402 GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
1403 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1404 "FCUnpublish", command_object, stream_name, NULL);
1406 if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
1407 GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
1408 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1409 "closeStream", command_object, stream_name, NULL);
1411 if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
1412 GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
1413 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1414 "deleteStream", command_object, stream_name, NULL);
1417 gst_amf_node_free (stream_name);
1418 gst_amf_node_free (command_object);