cbbf627d50fb669bc925ff62ec5f3b2f2bba3bb3
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / rtmp2 / rtmp / rtmpclient.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <gio/gio.h>
28 #include <string.h>
29 #include "rtmpclient.h"
30 #include "rtmphandshake.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33
34 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
35 #define GST_CAT_DEFAULT gst_rtmp_client_debug_category
36
37 static void send_connect_done (const gchar * command_name, GPtrArray * args,
38     gpointer user_data);
39 static void create_stream_done (const gchar * command_name, GPtrArray * args,
40     gpointer user_data);
41 static void on_publish_or_play_status (const gchar * command_name,
42     GPtrArray * args, gpointer user_data);
43
44 static void
45 init_debug (void)
46 {
47   static gsize done = 0;
48   if (g_once_init_enter (&done)) {
49     GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
50         "rtmpclient", 0, "debug category for the rtmp client");
51     GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
52     GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
53     GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
54     g_once_init_leave (&done, 1);
55   }
56 }
57
58 static const gchar *scheme_strings[] = {
59   "rtmp",
60   "rtmps",
61   NULL
62 };
63
64 #define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
65
66 GType
67 gst_rtmp_scheme_get_type (void)
68 {
69   static gsize scheme_type = 0;
70   static const GEnumValue scheme[] = {
71     {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
72     {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
73     {0, NULL, NULL},
74   };
75
76   if (g_once_init_enter (&scheme_type)) {
77     GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
78     g_once_init_leave (&scheme_type, tmp);
79   }
80
81   return (GType) scheme_type;
82 }
83
84 GstRtmpScheme
85 gst_rtmp_scheme_from_string (const gchar * string)
86 {
87   if (string) {
88     gint value;
89
90     for (value = 0; value < NUM_SCHEMES; value++) {
91       if (strcmp (scheme_strings[value], string) == 0) {
92         return value;
93       }
94     }
95   }
96
97   return -1;
98 }
99
100 GstRtmpScheme
101 gst_rtmp_scheme_from_uri (const GstUri * uri)
102 {
103   const gchar *scheme = gst_uri_get_scheme (uri);
104   if (!scheme) {
105     return GST_RTMP_SCHEME_RTMP;
106   }
107
108   return gst_rtmp_scheme_from_string (scheme);
109 }
110
111 const gchar *
112 gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
113 {
114   if (scheme >= 0 && scheme < NUM_SCHEMES) {
115     return scheme_strings[scheme];
116   }
117
118   return "invalid";
119 }
120
121 const gchar *const *
122 gst_rtmp_scheme_get_strings (void)
123 {
124   return scheme_strings;
125 }
126
127 guint
128 gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
129 {
130   switch (scheme) {
131     case GST_RTMP_SCHEME_RTMP:
132       return 1935;
133
134     case GST_RTMP_SCHEME_RTMPS:
135       return 443;
136
137     default:
138       g_return_val_if_reached (0);
139   }
140 }
141
142 GType
143 gst_rtmp_authmod_get_type (void)
144 {
145   static gsize authmod_type = 0;
146   static const GEnumValue authmod[] = {
147     {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
148     {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
149     {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
150     {0, NULL, NULL},
151   };
152
153   if (g_once_init_enter (&authmod_type)) {
154     GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
155     g_once_init_leave (&authmod_type, tmp);
156   }
157
158   return (GType) authmod_type;
159 }
160
161 static const gchar *
162 gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
163 {
164   GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
165   GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
166   return ev ? ev->value_nick : "(unknown)";
167 }
168
169 GType
170 gst_rtmp_stop_commands_get_type (void)
171 {
172   static gsize stop_commands_type = 0;
173   static const GFlagsValue stop_commands[] = {
174     {GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
175     {GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
176     {GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
177     {GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
178     {0, NULL, NULL},
179   };
180
181   if (g_once_init_enter (&stop_commands_type)) {
182     GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
183     g_once_init_leave (&stop_commands_type, tmp);
184   }
185
186   return (GType) stop_commands_type;
187 }
188
189 void
190 gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
191 {
192   g_return_if_fail (dest);
193   g_return_if_fail (src);
194
195   dest->scheme = src->scheme;
196   dest->host = g_strdup (src->host);
197   dest->port = src->port;
198   dest->application = g_strdup (src->application);
199   dest->stream = g_strdup (src->stream);
200   dest->username = g_strdup (src->username);
201   dest->password = g_strdup (src->password);
202   dest->secure_token = g_strdup (src->secure_token);
203   dest->authmod = src->authmod;
204   dest->timeout = src->timeout;
205   dest->tls_flags = src->tls_flags;
206   dest->flash_ver = g_strdup (src->flash_ver);
207   dest->publish = src->publish;
208 }
209
210 void
211 gst_rtmp_location_clear (GstRtmpLocation * location)
212 {
213   g_return_if_fail (location);
214
215   g_clear_pointer (&location->host, g_free);
216   location->port = 0;
217   g_clear_pointer (&location->application, g_free);
218   g_clear_pointer (&location->stream, g_free);
219   g_clear_pointer (&location->username, g_free);
220   g_clear_pointer (&location->password, g_free);
221   g_clear_pointer (&location->secure_token, g_free);
222   g_clear_pointer (&location->flash_ver, g_free);
223   location->publish = FALSE;
224 }
225
226 gchar *
227 gst_rtmp_location_get_string (const GstRtmpLocation * location,
228     gboolean with_stream)
229 {
230   GstUri *uri;
231   gchar *base, *string;
232   const gchar *scheme_string;
233   guint default_port;
234
235   g_return_val_if_fail (location, NULL);
236
237   scheme_string = gst_rtmp_scheme_to_string (location->scheme);
238   default_port = gst_rtmp_scheme_get_default_port (location->scheme);
239
240   uri = gst_uri_new (scheme_string, NULL, location->host,
241       location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
242       NULL, NULL);
243   base = gst_uri_to_string (uri);
244
245   string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
246       location->stream, NULL);
247
248   g_free (base);
249   gst_uri_unref (uri);
250
251   return string;
252 }
253
254 /* Flag values for the audioCodecs property,
255  * rtmp_specification_1.0.pdf page 32 */
256 enum
257 {
258   SUPPORT_SND_NONE = 0x001,     /* Raw sound, no compression */
259   SUPPORT_SND_ADPCM = 0x002,    /* ADPCM compression */
260   SUPPORT_SND_MP3 = 0x004,      /* mp3 compression */
261   SUPPORT_SND_INTEL = 0x008,    /* Not used */
262   SUPPORT_SND_UNUSED = 0x010,   /* Not used */
263   SUPPORT_SND_NELLY8 = 0x020,   /* NellyMoser at 8-kHz compression */
264   SUPPORT_SND_NELLY = 0x040,    /* NellyMoser compression
265                                  * (5, 11, 22, and 44 kHz) */
266   SUPPORT_SND_G711A = 0x080,    /* G711A sound compression
267                                  * (Flash Media Server only) */
268   SUPPORT_SND_G711U = 0x100,    /* G711U sound compression
269                                  * (Flash Media Server only) */
270   SUPPORT_SND_NELLY16 = 0x200,  /* NellyMoser at 16-kHz compression */
271   SUPPORT_SND_AAC = 0x400,      /* Advanced audio coding (AAC) codec */
272   SUPPORT_SND_SPEEX = 0x800,    /* Speex Audio */
273   SUPPORT_SND_ALL = 0xFFF,      /* All RTMP-supported audio codecs */
274 };
275
276 /* audioCodecs value sent by libavformat. All "used" codecs. */
277 #define GST_RTMP_AUDIOCODECS \
278   (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
279 G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
280
281 /* Flag values for the videoCodecs property,
282  * rtmp_specification_1.0.pdf page 32 */
283 enum
284 {
285   SUPPORT_VID_UNUSED = 0x01,    /* Obsolete value */
286   SUPPORT_VID_JPEG = 0x02,      /* Obsolete value */
287   SUPPORT_VID_SORENSON = 0x04,  /* Sorenson Flash video */
288   SUPPORT_VID_HOMEBREW = 0x08,  /* V1 screen sharing */
289   SUPPORT_VID_VP6 = 0x10,       /* On2 video (Flash 8+) */
290   SUPPORT_VID_VP6ALPHA = 0x20,  /* On2 video with alpha channel */
291   SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
292   SUPPORT_VID_H264 = 0x80,      /* H264 video */
293   SUPPORT_VID_ALL = 0xFF,       /* All RTMP-supported video codecs */
294 };
295
296 /* videoCodecs value sent by libavformat. All non-obsolete codecs. */
297 #define GST_RTMP_VIDEOCODECS \
298   (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
299 G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252);  /* libavformat's magic number */
300
301 /* Flag values for the videoFunction property,
302  * rtmp_specification_1.0.pdf page 32 */
303 enum
304 {
305   /* Indicates that the client can perform frame-accurate seeks. */
306   SUPPORT_VID_CLIENT_SEEK = 1,
307 };
308
309 /* videoFunction value sent by libavformat */
310 #define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
311 G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1);  /* libavformat's magic number */
312
313 static void socket_connect (GTask * task);
314 static void socket_connect_done (GObject * source, GAsyncResult * result,
315     gpointer user_data);
316 static void handshake_done (GObject * source, GAsyncResult * result,
317     gpointer user_data);
318 static void send_connect (GTask * task);
319 static void send_stop (GstRtmpConnection * connection, const gchar * stream,
320     const GstRtmpStopCommands stop_commands);
321 static void send_secure_token_response (GTask * task,
322     GstRtmpConnection * connection, const gchar * challenge);
323 static void connection_error (GstRtmpConnection * connection,
324     gpointer user_data);
325
326 #define DEFAULT_TIMEOUT 5
327
328 typedef struct
329 {
330   GstRtmpLocation location;
331   gchar *auth_query;
332   GstRtmpConnection *connection;
333   gulong error_handler_id;
334 } ConnectTaskData;
335
336 static ConnectTaskData *
337 connect_task_data_new (const GstRtmpLocation * location)
338 {
339   ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
340   gst_rtmp_location_copy (&data->location, location);
341   return data;
342 }
343
344 static void
345 connect_task_data_free (gpointer ptr)
346 {
347   ConnectTaskData *data = ptr;
348   gst_rtmp_location_clear (&data->location);
349   g_clear_pointer (&data->auth_query, g_free);
350   if (data->error_handler_id) {
351     g_signal_handler_disconnect (data->connection, data->error_handler_id);
352   }
353   g_clear_object (&data->connection);
354   g_slice_free (ConnectTaskData, data);
355 }
356
357 static GRegex *auth_regex = NULL;
358
359 void
360 gst_rtmp_client_connect_async (const GstRtmpLocation * location,
361     GCancellable * cancellable, GAsyncReadyCallback callback,
362     gpointer user_data)
363 {
364   GTask *task;
365
366   init_debug ();
367
368   if (g_once_init_enter (&auth_regex)) {
369     GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
370         "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
371         "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
372     g_once_init_leave (&auth_regex, re);
373   }
374
375   task = g_task_new (NULL, cancellable, callback, user_data);
376
377   g_task_set_task_data (task, connect_task_data_new (location),
378       connect_task_data_free);
379
380   socket_connect (task);
381 }
382
383 static void
384 socket_connect (GTask * task)
385 {
386   ConnectTaskData *data = g_task_get_task_data (task);
387   GSocketConnectable *addr;
388   GSocketClient *socket_client;
389
390   if (data->location.timeout < 0) {
391     data->location.timeout = DEFAULT_TIMEOUT;
392   }
393
394   if (data->error_handler_id) {
395     g_signal_handler_disconnect (data->connection, data->error_handler_id);
396     data->error_handler_id = 0;
397   }
398
399   if (data->connection) {
400     gst_rtmp_connection_close (data->connection);
401     g_clear_object (&data->connection);
402   }
403
404   if (!data->location.host) {
405     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
406         "Host is not set");
407     g_object_unref (task);
408     return;
409   }
410
411   if (!data->location.port) {
412     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
413         "Port is not set");
414     g_object_unref (task);
415     return;
416   }
417
418   socket_client = g_socket_client_new ();
419   g_socket_client_set_timeout (socket_client, data->location.timeout);
420
421   switch (data->location.scheme) {
422     case GST_RTMP_SCHEME_RTMP:
423       break;
424
425     case GST_RTMP_SCHEME_RTMPS:
426       GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
427           data->location.tls_flags);
428       g_socket_client_set_tls (socket_client, TRUE);
429       G_GNUC_BEGIN_IGNORE_DEPRECATIONS;
430       g_socket_client_set_tls_validation_flags (socket_client,
431           data->location.tls_flags);
432       G_GNUC_END_IGNORE_DEPRECATIONS;
433       break;
434
435     default:
436       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
437           "Invalid scheme ID %d", data->location.scheme);
438       g_object_unref (socket_client);
439       g_object_unref (task);
440       return;
441   }
442
443   addr = g_network_address_new (data->location.host, data->location.port);
444
445   GST_DEBUG ("Starting socket connection");
446
447   g_socket_client_connect_async (socket_client, addr,
448       g_task_get_cancellable (task), socket_connect_done, task);
449   g_object_unref (addr);
450   g_object_unref (socket_client);
451 }
452
453 static void
454 socket_connect_done (GObject * source, GAsyncResult * result,
455     gpointer user_data)
456 {
457   GSocketClient *socket_client = G_SOCKET_CLIENT (source);
458   GSocketConnection *socket_connection;
459   GTask *task = user_data;
460   GError *error = NULL;
461
462   socket_connection =
463       g_socket_client_connect_finish (socket_client, result, &error);
464
465   if (g_task_return_error_if_cancelled (task)) {
466     GST_DEBUG ("Socket connection was cancelled");
467     g_object_unref (task);
468     return;
469   }
470
471   if (socket_connection == NULL) {
472     GST_ERROR ("Socket connection error");
473     g_task_return_error (task, error);
474     g_object_unref (task);
475     return;
476   }
477
478   GST_DEBUG ("Socket connection established");
479
480   gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
481       g_task_get_cancellable (task), handshake_done, task);
482   g_object_unref (socket_connection);
483 }
484
485
486 static void
487 handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
488 {
489   GIOStream *stream = G_IO_STREAM (source);
490   GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
491   GTask *task = user_data;
492   ConnectTaskData *data = g_task_get_task_data (task);
493   GError *error = NULL;
494   gboolean res;
495
496   res = gst_rtmp_client_handshake_finish (stream, result, &error);
497   if (!res) {
498     g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
499     g_task_return_error (task, error);
500     g_object_unref (task);
501     return;
502   }
503
504   data->connection = gst_rtmp_connection_new (socket_connection,
505       g_task_get_cancellable (task));
506   data->error_handler_id = g_signal_connect (data->connection,
507       "error", G_CALLBACK (connection_error), task);
508
509   send_connect (task);
510 }
511
512 static void
513 connection_error (GstRtmpConnection * connection, gpointer user_data)
514 {
515   GTask *task = user_data;
516   if (!g_task_had_error (task))
517     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
518         "error during connection attempt");
519 }
520
521 static gchar *
522 do_adobe_auth (const gchar * username, const gchar * password,
523     const gchar * salt, const gchar * opaque, const gchar * challenge)
524 {
525   guint8 hash[16];              /* MD5 digest */
526   gsize hashlen = sizeof hash;
527   gchar *challenge2, *auth_query;
528   GChecksum *md5;
529
530   g_return_val_if_fail (username, NULL);
531   g_return_val_if_fail (password, NULL);
532   g_return_val_if_fail (salt, NULL);
533
534   md5 = g_checksum_new (G_CHECKSUM_MD5);
535   g_checksum_update (md5, (guchar *) username, -1);
536   g_checksum_update (md5, (guchar *) salt, -1);
537   g_checksum_update (md5, (guchar *) password, -1);
538
539   g_checksum_get_digest (md5, hash, &hashlen);
540   g_warn_if_fail (hashlen == sizeof hash);
541
542   {
543     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
544     g_checksum_reset (md5);
545     g_checksum_update (md5, (guchar *) hashstr, -1);
546     g_free (hashstr);
547   }
548
549   if (opaque)
550     g_checksum_update (md5, (guchar *) opaque, -1);
551   else if (challenge)
552     g_checksum_update (md5, (guchar *) challenge, -1);
553
554   challenge2 = g_strdup_printf ("%08x", g_random_int ());
555   g_checksum_update (md5, (guchar *) challenge2, -1);
556
557   g_checksum_get_digest (md5, hash, &hashlen);
558   g_warn_if_fail (hashlen == sizeof hash);
559
560   {
561     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
562
563     if (opaque) {
564       auth_query =
565           g_strdup_printf
566           ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
567           username, challenge2, hashstr, opaque);
568     } else {
569       auth_query =
570           g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
571           "adobe", username, challenge2, hashstr);
572     }
573     g_free (hashstr);
574   }
575
576   g_checksum_free (md5);
577   g_free (challenge2);
578
579   return auth_query;
580 }
581
582 static void
583 send_connect (GTask * task)
584 {
585   ConnectTaskData *data = g_task_get_task_data (task);
586   GstAmfNode *node;
587   const gchar *app, *flash_ver;
588   gchar *uri, *appstr = NULL, *uristr = NULL;
589   gboolean publish;
590
591   node = gst_amf_node_new_object ();
592   app = data->location.application;
593   flash_ver = data->location.flash_ver;
594   publish = data->location.publish;
595   uri = gst_rtmp_location_get_string (&data->location, FALSE);
596
597   if (!app) {
598     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
599         "Application is not set");
600     g_object_unref (task);
601     goto out;
602   }
603
604   if (!flash_ver) {
605     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
606         "Flash version is not set");
607     g_object_unref (task);
608     goto out;
609   }
610
611   if (data->auth_query) {
612     const gchar *query = data->auth_query;
613     appstr = g_strdup_printf ("%s?%s", app, query);
614     uristr = g_strdup_printf ("%s?%s", uri, query);
615   } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
616     const gchar *user = data->location.username;
617     const gchar *authmod = "adobe";
618
619     if (!user) {
620       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
621           "no username for adobe authentication");
622       g_object_unref (task);
623       goto out;
624     }
625
626     if (!data->location.password) {
627       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
628           "no password for adobe authentication");
629       g_object_unref (task);
630       goto out;
631     }
632
633     appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
634     uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
635   } else {
636     appstr = g_strdup (app);
637     uristr = g_strdup (uri);
638   }
639
640   /* Arguments for the connect command.
641    * Most of these are described in rtmp_specification_1.0.pdf page 30 */
642
643   /* "The server application name the client is connected to." */
644   gst_amf_node_append_field_take_string (node, "app", appstr, -1);
645
646   if (publish) {
647     /* Undocumented. Sent by both libavformat and librtmp. */
648     gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
649   }
650
651   /* "Flash Player version. It is the same string as returned by the
652    * ApplicationScript getversion () function." */
653   gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
654
655   /* "URL of the source SWF file making the connection."
656    * XXX: libavformat sends "swfUrl" here, if provided. */
657
658   /* "URL of the Server. It has the following format.
659    * protocol://servername:port/appName/appInstance" */
660   gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
661
662   if (!publish) {
663     /* "True if proxy is being used." */
664     gst_amf_node_append_field_boolean (node, "fpad", FALSE);
665
666     /* Undocumented. Sent by libavformat. */
667     gst_amf_node_append_field_number (node, "capabilities",
668         15 /* libavformat's magic number */ );
669
670     /* "Indicates what audio codecs the client supports." */
671     gst_amf_node_append_field_number (node, "audioCodecs",
672         GST_RTMP_AUDIOCODECS);
673
674     /* "Indicates what video codecs are supported." */
675     gst_amf_node_append_field_number (node, "videoCodecs",
676         GST_RTMP_VIDEOCODECS);
677
678     /* "Indicates what special video functions are supported." */
679     gst_amf_node_append_field_number (node, "videoFunction",
680         GST_RTMP_VIDEOFUNCTION);
681
682     /* "URL of the web page from where the SWF file was loaded."
683      * XXX: libavformat sends "pageUrl" here, if provided. */
684   }
685
686   gst_rtmp_connection_send_command (data->connection, send_connect_done,
687       task, 0, "connect", node, NULL);
688
689 out:
690   gst_amf_node_free (node);
691   g_free (uri);
692 }
693
694 static void
695 send_connect_done (const gchar * command_name, GPtrArray * args,
696     gpointer user_data)
697 {
698   GTask *task = G_TASK (user_data);
699   ConnectTaskData *data = g_task_get_task_data (task);
700   const GstAmfNode *node, *optional_args;
701   const gchar *code;
702
703   if (g_task_return_error_if_cancelled (task)) {
704     g_object_unref (task);
705     return;
706   }
707
708   if (!args) {
709     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
710         "connect failed: %s", command_name);
711     g_object_unref (task);
712     return;
713   }
714
715   if (args->len < 2) {
716     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
717         "connect failed; not enough return arguments");
718     g_object_unref (task);
719     return;
720   }
721
722   optional_args = g_ptr_array_index (args, 1);
723
724   node = gst_amf_node_get_field (optional_args, "code");
725   code = node ? gst_amf_node_peek_string (node, NULL) : NULL;
726   if (!code) {
727     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
728         "result code missing from connect cmd result");
729     g_object_unref (task);
730     return;
731   }
732
733   GST_INFO ("connect result: %s", code);
734
735   if (g_str_equal (code, "NetConnection.Connect.Success")) {
736     node = gst_amf_node_get_field (optional_args, "secureToken");
737     send_secure_token_response (task, data->connection,
738         node ? gst_amf_node_peek_string (node, NULL) : NULL);
739     return;
740   }
741
742   if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
743     GstRtmpAuthmod authmod = data->location.authmod;
744     GMatchInfo *match_info;
745     const gchar *desc;
746     GstUri *query;
747
748     node = gst_amf_node_get_field (optional_args, "description");
749     desc = node ? gst_amf_node_peek_string (node, NULL) : NULL;
750     if (!desc) {
751       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
752           "Connect rejected; no description");
753       g_object_unref (task);
754       return;
755     }
756
757     GST_DEBUG ("connect result desc: %s", desc);
758
759     if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
760       if (strstr (desc, "authmod=adobe")) {
761         GST_INFO ("Reconnecting with authmod=adobe");
762         data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
763         socket_connect (task);
764         return;
765       }
766
767       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
768           "unhandled authentication mode: %s", desc);
769       g_object_unref (task);
770       return;
771     }
772
773     if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
774       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
775           "failed to parse auth rejection: %s", desc);
776       g_object_unref (task);
777       return;
778     }
779
780     {
781       gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
782       gchar *query_str = g_match_info_fetch_named (match_info, "query");
783       gboolean matches;
784
785       GST_INFO ("regex parsed auth: authmod=%s, query=%s",
786           GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
787       g_match_info_free (match_info);
788
789       switch (authmod) {
790         case GST_RTMP_AUTHMOD_ADOBE:
791           matches = g_str_equal (authmod_str, "adobe");
792           break;
793
794         default:
795           matches = FALSE;
796           break;
797       }
798
799       if (!matches) {
800         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
801             "server uses wrong authentication mode '%s'; expected %s",
802             GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
803         g_object_unref (task);
804         g_free (authmod_str);
805         g_free (query_str);
806         return;
807       }
808       g_free (authmod_str);
809
810       query = gst_uri_from_string (query_str);
811       if (!query) {
812         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
813             "failed to parse authentication query '%s'",
814             GST_STR_NULL (query_str));
815         g_object_unref (task);
816         g_free (query_str);
817         return;
818       }
819       g_free (query_str);
820     }
821
822     {
823       const gchar *reason = gst_uri_get_query_value (query, "reason");
824
825       if (!reason) {
826         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
827             "authentication failed; no reason: %s", desc);
828         g_object_unref (task);
829         gst_uri_unref (query);
830         return;
831       }
832
833       if (g_str_equal (reason, "authfailed")) {
834         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
835             "authentication failed! wrong credentials?");
836         g_object_unref (task);
837         gst_uri_unref (query);
838         return;
839       }
840
841       if (!g_str_equal (reason, "needauth")) {
842         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
843             "unhandled rejection reason '%s'", reason);
844         g_object_unref (task);
845         gst_uri_unref (query);
846         return;
847       }
848     }
849
850     g_warn_if_fail (!data->auth_query);
851     data->auth_query = do_adobe_auth (data->location.username,
852         data->location.password, gst_uri_get_query_value (query, "salt"),
853         gst_uri_get_query_value (query, "opaque"),
854         gst_uri_get_query_value (query, "challenge"));
855
856     gst_uri_unref (query);
857
858     if (!data->auth_query) {
859       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
860           "couldn't generate adobe style authentication query");
861       g_object_unref (task);
862       return;
863     }
864
865     socket_connect (task);
866     return;
867   }
868
869   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
870       "unhandled connect result code: %s", code);
871   g_object_unref (task);
872 }
873
874 /* prep key: pack 1st 16 chars into 4 LittleEndian ints */
875 static void
876 rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
877 {
878   gchar copy[17];
879
880   g_return_if_fail (key);
881   g_return_if_fail (out);
882
883   /* ensure we can read 16 bytes */
884   strncpy (copy, key, 16);
885   /* placate GCC 8 -Wstringop-truncation */
886   copy[16] = 0;
887
888   out[0] = GST_READ_UINT32_LE (copy);
889   out[1] = GST_READ_UINT32_LE (copy + 4);
890   out[2] = GST_READ_UINT32_LE (copy + 8);
891   out[3] = GST_READ_UINT32_LE (copy + 12);
892 }
893
894 /* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
895 static GArray *
896 rtmp_tea_decode_prep_text (const gchar * text)
897 {
898   GArray *arr;
899   gsize len, i;
900
901   g_return_val_if_fail (text, NULL);
902
903   len = strlen (text);
904   arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
905
906   for (i = 0; i < len; i += 8) {
907     gchar copy[9];
908     guchar chars[4];
909     gsize j;
910     guint32 val;
911
912     /* ensure we can read 8 bytes */
913     strncpy (copy, text + i, 8);
914     /* placate GCC 8 -Wstringop-truncation */
915     copy[8] = 0;
916
917     for (j = 0; j < 4; j++) {
918       gint hi, lo;
919
920       hi = g_ascii_xdigit_value (copy[2 * j]);
921       lo = g_ascii_xdigit_value (copy[2 * j + 1]);
922
923       chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
924     }
925
926     val = GST_READ_UINT32_LE (chars);
927     g_array_append_val (arr, val);
928   }
929
930   return arr;
931 }
932
933 /* return text from uint32s to chars */
934 static gchar *
935 rtmp_tea_decode_return_text (GArray * arr)
936 {
937 #if G_BYTE_ORDER != G_LITTLE_ENDIAN
938   gsize i;
939
940   g_return_val_if_fail (arr, NULL);
941
942   for (i = 0; i < arr->len; i++) {
943     guint32 *val = &g_array_index (arr, guint32, i);
944     *val = GUINT32_TO_LE (*val);
945   }
946 #endif
947
948   /* array is alredy zero-terminated */
949   return g_array_free (arr, FALSE);
950 }
951
952 /* http://www.movable-type.co.uk/scripts/tea-block.html */
953 static void
954 rtmp_tea_decode_btea (GArray * text, guint32 key[4])
955 {
956   guint32 *v, n, *k;
957   guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
958   guint32 p, q;
959
960   g_return_if_fail (text);
961   g_return_if_fail (text->len > 0);
962   g_return_if_fail (key);
963
964   v = (guint32 *) text->data;
965   n = text->len;
966   k = key;
967   z = v[n - 1];
968   y = v[0];
969   q = 6 + 52 / n;
970   sum = q * DELTA;
971
972 #define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
973
974   while (sum != 0) {
975     e = sum >> 2 & 3;
976     for (p = n - 1; p > 0; p--)
977       z = v[p - 1], y = v[p] -= MX;
978     z = v[n - 1];
979     y = v[0] -= MX;
980     sum -= DELTA;
981   }
982
983 #undef MX
984 }
985
986 /* taken from librtmp */
987 static gchar *
988 rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
989 {
990   guint32 key[4];
991   GArray *text;
992
993   rtmp_tea_decode_prep_key (bin_key, key);
994   text = rtmp_tea_decode_prep_text (hex_text);
995   rtmp_tea_decode_btea (text, key);
996   return rtmp_tea_decode_return_text (text);
997 }
998
999 static void
1000 send_secure_token_response (GTask * task, GstRtmpConnection * connection,
1001     const gchar * challenge)
1002 {
1003   ConnectTaskData *data = g_task_get_task_data (task);
1004   if (challenge) {
1005     GstAmfNode *node1;
1006     GstAmfNode *node2;
1007     gchar *response;
1008
1009     if (!data->location.secure_token || !data->location.secure_token[0]) {
1010       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
1011           "server requires secure token authentication");
1012       g_object_unref (task);
1013       return;
1014     }
1015
1016     response = rtmp_tea_decode (data->location.secure_token, challenge);
1017
1018     GST_DEBUG ("response: %s", response);
1019
1020     node1 = gst_amf_node_new_null ();
1021     node2 = gst_amf_node_new_take_string (response, -1);
1022     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1023         "secureTokenResponse", node1, node2, NULL);
1024     gst_amf_node_free (node1);
1025     gst_amf_node_free (node2);
1026   }
1027
1028   g_signal_handler_disconnect (connection, data->error_handler_id);
1029   data->error_handler_id = 0;
1030
1031   g_task_return_pointer (task, g_object_ref (connection),
1032       gst_rtmp_connection_close_and_unref);
1033   g_object_unref (task);
1034 }
1035
1036 GstRtmpConnection *
1037 gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
1038 {
1039   GTask *task = G_TASK (result);
1040   return g_task_propagate_pointer (task, error);
1041 }
1042
1043 static void send_create_stream (GTask * task);
1044 static void send_publish_or_play (GTask * task);
1045
1046 typedef struct
1047 {
1048   GstRtmpConnection *connection;
1049   gulong error_handler_id;
1050   gchar *stream;
1051   gboolean publish;
1052   guint32 id;
1053 } StreamTaskData;
1054
1055 static StreamTaskData *
1056 stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
1057     gboolean publish)
1058 {
1059   StreamTaskData *data = g_slice_new0 (StreamTaskData);
1060   data->connection = g_object_ref (connection);
1061   data->stream = g_strdup (stream);
1062   data->publish = publish;
1063   return data;
1064 }
1065
1066 static void
1067 stream_task_data_free (gpointer ptr)
1068 {
1069   StreamTaskData *data = ptr;
1070   g_clear_pointer (&data->stream, g_free);
1071   if (data->error_handler_id) {
1072     g_signal_handler_disconnect (data->connection, data->error_handler_id);
1073   }
1074   g_clear_object (&data->connection);
1075   g_slice_free (StreamTaskData, data);
1076 }
1077
1078 static void
1079 start_stream (GstRtmpConnection * connection, const gchar * stream,
1080     gboolean publish, GCancellable * cancellable,
1081     GAsyncReadyCallback callback, gpointer user_data)
1082 {
1083   GTask *task;
1084   StreamTaskData *data;
1085
1086   init_debug ();
1087
1088   task = g_task_new (connection, cancellable, callback, user_data);
1089
1090   if (!stream) {
1091     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
1092         "Stream is not set");
1093     g_object_unref (task);
1094     return;
1095   }
1096
1097   data = stream_task_data_new (connection, stream, publish);
1098   g_task_set_task_data (task, data, stream_task_data_free);
1099
1100   data->error_handler_id = g_signal_connect (connection,
1101       "error", G_CALLBACK (connection_error), task);
1102
1103   send_create_stream (task);
1104 }
1105
1106 void
1107 gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
1108     const gchar * stream, GCancellable * cancellable,
1109     GAsyncReadyCallback callback, gpointer user_data)
1110 {
1111   start_stream (connection, stream, TRUE, cancellable, callback, user_data);
1112 }
1113
1114 void
1115 gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
1116     const gchar * stream, GCancellable * cancellable,
1117     GAsyncReadyCallback callback, gpointer user_data)
1118 {
1119   start_stream (connection, stream, FALSE, cancellable, callback, user_data);
1120 }
1121
1122 static void
1123 send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
1124     guint32 ms)
1125 {
1126   GstRtmpUserControl uc = {
1127     .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
1128     .param = stream,
1129     .param2 = ms,
1130   };
1131
1132   gst_rtmp_connection_queue_message (connection,
1133       gst_rtmp_message_new_user_control (&uc));
1134 }
1135
1136 static void
1137 send_create_stream (GTask * task)
1138 {
1139   GstRtmpConnection *connection = g_task_get_source_object (task);
1140   StreamTaskData *data = g_task_get_task_data (task);
1141   GstAmfNode *command_object, *stream_name;
1142
1143   command_object = gst_amf_node_new_null ();
1144   stream_name = gst_amf_node_new_string (data->stream, -1);
1145
1146   if (data->publish) {
1147     /* Not part of RTMP documentation */
1148     GST_DEBUG ("Releasing stream '%s'", data->stream);
1149     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1150         "releaseStream", command_object, stream_name, NULL);
1151     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1152         "FCPublish", command_object, stream_name, NULL);
1153   } else {
1154     /* Matches librtmp */
1155     gst_rtmp_connection_request_window_size (connection,
1156         GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
1157     send_set_buffer_length (connection, 0, 300);
1158   }
1159
1160   GST_INFO ("Creating stream '%s'", data->stream);
1161   gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
1162       "createStream", command_object, NULL);
1163
1164   gst_amf_node_free (stream_name);
1165   gst_amf_node_free (command_object);
1166 }
1167
1168 static void
1169 create_stream_done (const gchar * command_name, GPtrArray * args,
1170     gpointer user_data)
1171 {
1172   GTask *task = G_TASK (user_data);
1173   StreamTaskData *data = g_task_get_task_data (task);
1174   GstAmfNode *result;
1175
1176   if (g_task_return_error_if_cancelled (task)) {
1177     g_object_unref (task);
1178     return;
1179   }
1180
1181   if (!args) {
1182     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1183         "createStream failed: %s", command_name);
1184     g_object_unref (task);
1185     return;
1186   }
1187
1188   if (args->len < 2) {
1189     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1190         "createStream failed; not enough return arguments");
1191     g_object_unref (task);
1192     return;
1193   }
1194
1195   result = g_ptr_array_index (args, 1);
1196   if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
1197     GString *error_dump = g_string_new ("");
1198
1199     gst_amf_node_dump (result, -1, error_dump);
1200
1201     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1202         "createStream failed: %s", error_dump->str);
1203     g_object_unref (task);
1204
1205     g_string_free (error_dump, TRUE);
1206     return;
1207   }
1208
1209   data->id = gst_amf_node_get_number (result);
1210   GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
1211
1212   if (data->id == 0) {
1213     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
1214         "createStream returned ID 0");
1215     g_object_unref (task);
1216     return;
1217   }
1218
1219   send_publish_or_play (task);
1220 }
1221
1222 static void
1223 send_publish_or_play (GTask * task)
1224 {
1225   GstRtmpConnection *connection = g_task_get_source_object (task);
1226   StreamTaskData *data = g_task_get_task_data (task);
1227   const gchar *command = data->publish ? "publish" : "play";
1228   GstAmfNode *command_object, *stream_name, *argument;
1229
1230   command_object = gst_amf_node_new_null ();
1231   stream_name = gst_amf_node_new_string (data->stream, -1);
1232
1233   if (data->publish) {
1234     /* publishing type (live, record, append) */
1235     argument = gst_amf_node_new_string ("live", -1);
1236   } else {
1237     /* "Start" argument: -2 = live or recording, -1 = only live
1238        0 or positive = only recording, seek to X seconds */
1239     argument = gst_amf_node_new_number (-2);
1240   }
1241
1242   GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
1243       command, data->stream, data->id);
1244   gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
1245       task, data->id, "onStatus");
1246   gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
1247       command, command_object, stream_name, argument, NULL);
1248
1249   if (!data->publish) {
1250     /* Matches librtmp */
1251     send_set_buffer_length (connection, data->id, 30000);
1252   }
1253
1254   gst_amf_node_free (command_object);
1255   gst_amf_node_free (stream_name);
1256   gst_amf_node_free (argument);
1257 }
1258
1259 static void
1260 on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
1261     gpointer user_data)
1262 {
1263   GTask *task = G_TASK (user_data);
1264   GstRtmpConnection *connection = g_task_get_source_object (task);
1265   StreamTaskData *data = g_task_get_task_data (task);
1266   const gchar *command = data->publish ? "publish" : "play", *code = NULL;
1267   GString *info_dump;
1268
1269   if (g_task_return_error_if_cancelled (task)) {
1270     g_object_unref (task);
1271     return;
1272   }
1273
1274   if (!args) {
1275     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1276         "%s failed: %s", command, command_name);
1277     g_object_unref (task);
1278     return;
1279   }
1280
1281   if (args->len < 2) {
1282     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1283         "%s failed; not enough return arguments", command);
1284     g_object_unref (task);
1285     return;
1286   }
1287
1288   {
1289     const GstAmfNode *info_object, *code_object;
1290     info_object = g_ptr_array_index (args, 1);
1291     code_object = gst_amf_node_get_field (info_object, "code");
1292
1293     if (code_object) {
1294       code = gst_amf_node_peek_string (code_object, NULL);
1295     }
1296
1297     info_dump = g_string_new ("");
1298     gst_amf_node_dump (info_object, -1, info_dump);
1299   }
1300
1301   if (data->publish) {
1302     if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
1303       GST_INFO ("publish success: %s", info_dump->str);
1304       g_task_return_boolean (task, TRUE);
1305       goto out;
1306     }
1307
1308     if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
1309       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
1310           "publish denied: stream already exists: %s", info_dump->str);
1311       goto out;
1312     }
1313
1314     if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
1315       g_task_return_new_error (task, G_IO_ERROR,
1316           G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
1317       goto out;
1318     }
1319   } else {
1320     if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
1321         g_strcmp0 (code, "NetStream.Play.PublishNotify") == 0 ||
1322         g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
1323       GST_INFO ("play success: %s", info_dump->str);
1324       g_task_return_boolean (task, TRUE);
1325       goto out;
1326     }
1327
1328     if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
1329       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
1330           "play denied: stream not found: %s", info_dump->str);
1331       goto out;
1332     }
1333   }
1334
1335   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1336       "unhandled %s result: %s", command, info_dump->str);
1337
1338 out:
1339   g_string_free (info_dump, TRUE);
1340
1341   g_signal_handler_disconnect (connection, data->error_handler_id);
1342   data->error_handler_id = 0;
1343
1344   g_object_unref (task);
1345 }
1346
1347 static gboolean
1348 start_stream_finish (GstRtmpConnection * connection,
1349     GAsyncResult * result, guint32 * stream_id, GError ** error)
1350 {
1351   GTask *task;
1352   StreamTaskData *data;
1353
1354   g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
1355
1356   task = G_TASK (result);
1357
1358   if (!g_task_propagate_boolean (G_TASK (result), error)) {
1359     return FALSE;
1360   }
1361
1362   data = g_task_get_task_data (task);
1363
1364   if (stream_id) {
1365     *stream_id = data->id;
1366   }
1367
1368   return TRUE;
1369 }
1370
1371 gboolean
1372 gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
1373     GAsyncResult * result, guint32 * stream_id, GError ** error)
1374 {
1375   return start_stream_finish (connection, result, stream_id, error);
1376 }
1377
1378 gboolean
1379 gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
1380     GAsyncResult * result, guint32 * stream_id, GError ** error)
1381 {
1382   return start_stream_finish (connection, result, stream_id, error);
1383 }
1384
1385 void
1386 gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
1387     const gchar * stream, const GstRtmpStopCommands stop_commands)
1388 {
1389   send_stop (connection, stream, stop_commands);
1390 }
1391
1392 static void
1393 send_stop (GstRtmpConnection * connection, const gchar * stream,
1394     const GstRtmpStopCommands stop_commands)
1395 {
1396   GstAmfNode *command_object, *stream_name;
1397
1398   command_object = gst_amf_node_new_null ();
1399   stream_name = gst_amf_node_new_string (stream, -1);
1400
1401   if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
1402     GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
1403     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1404         "FCUnpublish", command_object, stream_name, NULL);
1405   }
1406   if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
1407     GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
1408     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1409         "closeStream", command_object, stream_name, NULL);
1410   }
1411   if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
1412     GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
1413     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1414         "deleteStream", command_object, stream_name, NULL);
1415   }
1416
1417   gst_amf_node_free (stream_name);
1418   gst_amf_node_free (command_object);
1419 }