rtmp2: Allow NULL flash version, omitting the field
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / rtmp2 / rtmp / rtmpclient.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <gio/gio.h>
28 #include <string.h>
29 #include "rtmpclient.h"
30 #include "rtmphandshake.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33
34 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
35 #define GST_CAT_DEFAULT gst_rtmp_client_debug_category
36
37 static void send_connect_done (const gchar * command_name, GPtrArray * args,
38     gpointer user_data);
39 static void create_stream_done (const gchar * command_name, GPtrArray * args,
40     gpointer user_data);
41 static void on_publish_or_play_status (const gchar * command_name,
42     GPtrArray * args, gpointer user_data);
43
44 static void
45 init_debug (void)
46 {
47   static gsize done = 0;
48   if (g_once_init_enter (&done)) {
49     GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
50         "rtmpclient", 0, "debug category for the rtmp client");
51     GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
52     GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
53     GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
54     g_once_init_leave (&done, 1);
55   }
56 }
57
58 static const gchar *scheme_strings[] = {
59   "rtmp",
60   "rtmps",
61   NULL
62 };
63
64 #define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
65
66 GType
67 gst_rtmp_scheme_get_type (void)
68 {
69   static gsize scheme_type = 0;
70   static const GEnumValue scheme[] = {
71     {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
72     {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
73     {0, NULL, NULL},
74   };
75
76   if (g_once_init_enter (&scheme_type)) {
77     GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
78     g_once_init_leave (&scheme_type, tmp);
79   }
80
81   return (GType) scheme_type;
82 }
83
84 GstRtmpScheme
85 gst_rtmp_scheme_from_string (const gchar * string)
86 {
87   if (string) {
88     gint value;
89
90     for (value = 0; value < NUM_SCHEMES; value++) {
91       if (strcmp (scheme_strings[value], string) == 0) {
92         return value;
93       }
94     }
95   }
96
97   return -1;
98 }
99
100 GstRtmpScheme
101 gst_rtmp_scheme_from_uri (const GstUri * uri)
102 {
103   const gchar *scheme = gst_uri_get_scheme (uri);
104   if (!scheme) {
105     return GST_RTMP_SCHEME_RTMP;
106   }
107
108   return gst_rtmp_scheme_from_string (scheme);
109 }
110
111 const gchar *
112 gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
113 {
114   if (scheme >= 0 && scheme < NUM_SCHEMES) {
115     return scheme_strings[scheme];
116   }
117
118   return "invalid";
119 }
120
121 const gchar *const *
122 gst_rtmp_scheme_get_strings (void)
123 {
124   return scheme_strings;
125 }
126
127 guint
128 gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
129 {
130   switch (scheme) {
131     case GST_RTMP_SCHEME_RTMP:
132       return 1935;
133
134     case GST_RTMP_SCHEME_RTMPS:
135       return 443;
136
137     default:
138       g_return_val_if_reached (0);
139   }
140 }
141
142 GType
143 gst_rtmp_authmod_get_type (void)
144 {
145   static gsize authmod_type = 0;
146   static const GEnumValue authmod[] = {
147     {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
148     {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
149     {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
150     {0, NULL, NULL},
151   };
152
153   if (g_once_init_enter (&authmod_type)) {
154     GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
155     g_once_init_leave (&authmod_type, tmp);
156   }
157
158   return (GType) authmod_type;
159 }
160
161 static const gchar *
162 gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
163 {
164   GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
165   GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
166   return ev ? ev->value_nick : "(unknown)";
167 }
168
169 GType
170 gst_rtmp_stop_commands_get_type (void)
171 {
172   static gsize stop_commands_type = 0;
173   static const GFlagsValue stop_commands[] = {
174     {GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
175     {GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
176     {GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
177     {GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
178     {0, NULL, NULL},
179   };
180
181   if (g_once_init_enter (&stop_commands_type)) {
182     GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
183     g_once_init_leave (&stop_commands_type, tmp);
184   }
185
186   return (GType) stop_commands_type;
187 }
188
189 void
190 gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
191 {
192   g_return_if_fail (dest);
193   g_return_if_fail (src);
194
195   dest->scheme = src->scheme;
196   dest->host = g_strdup (src->host);
197   dest->port = src->port;
198   dest->application = g_strdup (src->application);
199   dest->stream = g_strdup (src->stream);
200   dest->username = g_strdup (src->username);
201   dest->password = g_strdup (src->password);
202   dest->secure_token = g_strdup (src->secure_token);
203   dest->authmod = src->authmod;
204   dest->timeout = src->timeout;
205   dest->tls_flags = src->tls_flags;
206   dest->flash_ver = g_strdup (src->flash_ver);
207   dest->publish = src->publish;
208 }
209
210 void
211 gst_rtmp_location_clear (GstRtmpLocation * location)
212 {
213   g_return_if_fail (location);
214
215   g_clear_pointer (&location->host, g_free);
216   location->port = 0;
217   g_clear_pointer (&location->application, g_free);
218   g_clear_pointer (&location->stream, g_free);
219   g_clear_pointer (&location->username, g_free);
220   g_clear_pointer (&location->password, g_free);
221   g_clear_pointer (&location->secure_token, g_free);
222   g_clear_pointer (&location->flash_ver, g_free);
223   location->publish = FALSE;
224 }
225
226 gchar *
227 gst_rtmp_location_get_string (const GstRtmpLocation * location,
228     gboolean with_stream)
229 {
230   GstUri *uri;
231   gchar *base, *string;
232   const gchar *scheme_string;
233   guint default_port;
234
235   g_return_val_if_fail (location, NULL);
236
237   scheme_string = gst_rtmp_scheme_to_string (location->scheme);
238   default_port = gst_rtmp_scheme_get_default_port (location->scheme);
239
240   uri = gst_uri_new (scheme_string, NULL, location->host,
241       location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
242       NULL, NULL);
243   base = gst_uri_to_string (uri);
244
245   string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
246       location->stream, NULL);
247
248   g_free (base);
249   gst_uri_unref (uri);
250
251   return string;
252 }
253
254 /* Flag values for the audioCodecs property,
255  * rtmp_specification_1.0.pdf page 32 */
256 enum
257 {
258   SUPPORT_SND_NONE = 0x001,     /* Raw sound, no compression */
259   SUPPORT_SND_ADPCM = 0x002,    /* ADPCM compression */
260   SUPPORT_SND_MP3 = 0x004,      /* mp3 compression */
261   SUPPORT_SND_INTEL = 0x008,    /* Not used */
262   SUPPORT_SND_UNUSED = 0x010,   /* Not used */
263   SUPPORT_SND_NELLY8 = 0x020,   /* NellyMoser at 8-kHz compression */
264   SUPPORT_SND_NELLY = 0x040,    /* NellyMoser compression
265                                  * (5, 11, 22, and 44 kHz) */
266   SUPPORT_SND_G711A = 0x080,    /* G711A sound compression
267                                  * (Flash Media Server only) */
268   SUPPORT_SND_G711U = 0x100,    /* G711U sound compression
269                                  * (Flash Media Server only) */
270   SUPPORT_SND_NELLY16 = 0x200,  /* NellyMoser at 16-kHz compression */
271   SUPPORT_SND_AAC = 0x400,      /* Advanced audio coding (AAC) codec */
272   SUPPORT_SND_SPEEX = 0x800,    /* Speex Audio */
273   SUPPORT_SND_ALL = 0xFFF,      /* All RTMP-supported audio codecs */
274 };
275
276 /* audioCodecs value sent by libavformat. All "used" codecs. */
277 #define GST_RTMP_AUDIOCODECS \
278   (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
279 G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
280
281 /* Flag values for the videoCodecs property,
282  * rtmp_specification_1.0.pdf page 32 */
283 enum
284 {
285   SUPPORT_VID_UNUSED = 0x01,    /* Obsolete value */
286   SUPPORT_VID_JPEG = 0x02,      /* Obsolete value */
287   SUPPORT_VID_SORENSON = 0x04,  /* Sorenson Flash video */
288   SUPPORT_VID_HOMEBREW = 0x08,  /* V1 screen sharing */
289   SUPPORT_VID_VP6 = 0x10,       /* On2 video (Flash 8+) */
290   SUPPORT_VID_VP6ALPHA = 0x20,  /* On2 video with alpha channel */
291   SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
292   SUPPORT_VID_H264 = 0x80,      /* H264 video */
293   SUPPORT_VID_ALL = 0xFF,       /* All RTMP-supported video codecs */
294 };
295
296 /* videoCodecs value sent by libavformat. All non-obsolete codecs. */
297 #define GST_RTMP_VIDEOCODECS \
298   (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
299 G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252);  /* libavformat's magic number */
300
301 /* Flag values for the videoFunction property,
302  * rtmp_specification_1.0.pdf page 32 */
303 enum
304 {
305   /* Indicates that the client can perform frame-accurate seeks. */
306   SUPPORT_VID_CLIENT_SEEK = 1,
307 };
308
309 /* videoFunction value sent by libavformat */
310 #define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
311 G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1);  /* libavformat's magic number */
312
313 static void socket_connect (GTask * task);
314 static void socket_connect_done (GObject * source, GAsyncResult * result,
315     gpointer user_data);
316 static void handshake_done (GObject * source, GAsyncResult * result,
317     gpointer user_data);
318 static void send_connect (GTask * task);
319 static void send_stop (GstRtmpConnection * connection, const gchar * stream,
320     const GstRtmpStopCommands stop_commands);
321 static void send_secure_token_response (GTask * task,
322     GstRtmpConnection * connection, const gchar * challenge);
323 static void connection_error (GstRtmpConnection * connection,
324     const GError * error, gpointer user_data);
325
326 #define DEFAULT_TIMEOUT 5
327
328 typedef struct
329 {
330   GstRtmpLocation location;
331   gchar *auth_query;
332   GstRtmpConnection *connection;
333   gulong error_handler_id;
334 } ConnectTaskData;
335
336 static ConnectTaskData *
337 connect_task_data_new (const GstRtmpLocation * location)
338 {
339   ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
340   gst_rtmp_location_copy (&data->location, location);
341   return data;
342 }
343
344 static void
345 connect_task_data_free (gpointer ptr)
346 {
347   ConnectTaskData *data = ptr;
348   gst_rtmp_location_clear (&data->location);
349   g_clear_pointer (&data->auth_query, g_free);
350   if (data->error_handler_id) {
351     g_signal_handler_disconnect (data->connection, data->error_handler_id);
352   }
353   g_clear_object (&data->connection);
354   g_slice_free (ConnectTaskData, data);
355 }
356
357 static GRegex *auth_regex = NULL;
358
359 void
360 gst_rtmp_client_connect_async (const GstRtmpLocation * location,
361     GCancellable * cancellable, GAsyncReadyCallback callback,
362     gpointer user_data)
363 {
364   GTask *task;
365
366   init_debug ();
367
368   if (g_once_init_enter (&auth_regex)) {
369     GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
370         "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
371         "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
372     g_once_init_leave (&auth_regex, re);
373   }
374
375   task = g_task_new (NULL, cancellable, callback, user_data);
376
377   g_task_set_task_data (task, connect_task_data_new (location),
378       connect_task_data_free);
379
380   socket_connect (task);
381 }
382
383 static void
384 socket_connect (GTask * task)
385 {
386   ConnectTaskData *data = g_task_get_task_data (task);
387   GSocketConnectable *addr;
388   GSocketClient *socket_client;
389
390   if (data->location.timeout < 0) {
391     data->location.timeout = DEFAULT_TIMEOUT;
392   }
393
394   if (data->error_handler_id) {
395     g_signal_handler_disconnect (data->connection, data->error_handler_id);
396     data->error_handler_id = 0;
397   }
398
399   if (data->connection) {
400     gst_rtmp_connection_close (data->connection);
401     g_clear_object (&data->connection);
402   }
403
404   if (!data->location.host) {
405     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
406         "Host is not set");
407     g_object_unref (task);
408     return;
409   }
410
411   if (!data->location.port) {
412     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
413         "Port is not set");
414     g_object_unref (task);
415     return;
416   }
417
418   socket_client = g_socket_client_new ();
419   g_socket_client_set_timeout (socket_client, data->location.timeout);
420
421   switch (data->location.scheme) {
422     case GST_RTMP_SCHEME_RTMP:
423       break;
424
425     case GST_RTMP_SCHEME_RTMPS:
426       GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
427           data->location.tls_flags);
428       g_socket_client_set_tls (socket_client, TRUE);
429       G_GNUC_BEGIN_IGNORE_DEPRECATIONS;
430       g_socket_client_set_tls_validation_flags (socket_client,
431           data->location.tls_flags);
432       G_GNUC_END_IGNORE_DEPRECATIONS;
433       break;
434
435     default:
436       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
437           "Invalid scheme ID %d", data->location.scheme);
438       g_object_unref (socket_client);
439       g_object_unref (task);
440       return;
441   }
442
443   addr = g_network_address_new (data->location.host, data->location.port);
444
445   GST_DEBUG ("Starting socket connection");
446
447   g_socket_client_connect_async (socket_client, addr,
448       g_task_get_cancellable (task), socket_connect_done, task);
449   g_object_unref (addr);
450   g_object_unref (socket_client);
451 }
452
453 static void
454 socket_connect_done (GObject * source, GAsyncResult * result,
455     gpointer user_data)
456 {
457   GSocketClient *socket_client = G_SOCKET_CLIENT (source);
458   GSocketConnection *socket_connection;
459   GTask *task = user_data;
460   GError *error = NULL;
461
462   socket_connection =
463       g_socket_client_connect_finish (socket_client, result, &error);
464
465   if (g_task_return_error_if_cancelled (task)) {
466     GST_DEBUG ("Socket connection was cancelled");
467     g_object_unref (task);
468     return;
469   }
470
471   if (socket_connection == NULL) {
472     GST_ERROR ("Socket connection error");
473     g_task_return_error (task, error);
474     g_object_unref (task);
475     return;
476   }
477
478   GST_DEBUG ("Socket connection established");
479
480   gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
481       g_task_get_cancellable (task), handshake_done, task);
482   g_object_unref (socket_connection);
483 }
484
485
486 static void
487 handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
488 {
489   GIOStream *stream = G_IO_STREAM (source);
490   GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
491   GTask *task = user_data;
492   ConnectTaskData *data = g_task_get_task_data (task);
493   GError *error = NULL;
494   gboolean res;
495
496   res = gst_rtmp_client_handshake_finish (stream, result, &error);
497   if (!res) {
498     g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
499     g_task_return_error (task, error);
500     g_object_unref (task);
501     return;
502   }
503
504   data->connection = gst_rtmp_connection_new (socket_connection,
505       g_task_get_cancellable (task));
506   data->error_handler_id = g_signal_connect (data->connection,
507       "error", G_CALLBACK (connection_error), task);
508
509   send_connect (task);
510 }
511
512 static void
513 connection_error (GstRtmpConnection * connection, const GError * error,
514     gpointer user_data)
515 {
516   GTask *task = user_data;
517
518   if (!g_task_had_error (task))
519     g_task_return_error (task, g_error_copy (error));
520 }
521
522 static gchar *
523 do_adobe_auth (const gchar * username, const gchar * password,
524     const gchar * salt, const gchar * opaque, const gchar * challenge)
525 {
526   guint8 hash[16];              /* MD5 digest */
527   gsize hashlen = sizeof hash;
528   gchar *challenge2, *auth_query;
529   GChecksum *md5;
530
531   g_return_val_if_fail (username, NULL);
532   g_return_val_if_fail (password, NULL);
533   g_return_val_if_fail (salt, NULL);
534
535   md5 = g_checksum_new (G_CHECKSUM_MD5);
536   g_checksum_update (md5, (guchar *) username, -1);
537   g_checksum_update (md5, (guchar *) salt, -1);
538   g_checksum_update (md5, (guchar *) password, -1);
539
540   g_checksum_get_digest (md5, hash, &hashlen);
541   g_warn_if_fail (hashlen == sizeof hash);
542
543   {
544     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
545     g_checksum_reset (md5);
546     g_checksum_update (md5, (guchar *) hashstr, -1);
547     g_free (hashstr);
548   }
549
550   if (opaque)
551     g_checksum_update (md5, (guchar *) opaque, -1);
552   else if (challenge)
553     g_checksum_update (md5, (guchar *) challenge, -1);
554
555   challenge2 = g_strdup_printf ("%08x", g_random_int ());
556   g_checksum_update (md5, (guchar *) challenge2, -1);
557
558   g_checksum_get_digest (md5, hash, &hashlen);
559   g_warn_if_fail (hashlen == sizeof hash);
560
561   {
562     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
563
564     if (opaque) {
565       auth_query =
566           g_strdup_printf
567           ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
568           username, challenge2, hashstr, opaque);
569     } else {
570       auth_query =
571           g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
572           "adobe", username, challenge2, hashstr);
573     }
574     g_free (hashstr);
575   }
576
577   g_checksum_free (md5);
578   g_free (challenge2);
579
580   return auth_query;
581 }
582
583 static void
584 send_connect (GTask * task)
585 {
586   ConnectTaskData *data = g_task_get_task_data (task);
587   GstAmfNode *node;
588   const gchar *app, *flash_ver;
589   gchar *uri, *appstr = NULL, *uristr = NULL;
590   gboolean publish;
591
592   node = gst_amf_node_new_object ();
593   app = data->location.application;
594   flash_ver = data->location.flash_ver;
595   publish = data->location.publish;
596   uri = gst_rtmp_location_get_string (&data->location, FALSE);
597
598   if (!app) {
599     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
600         "Application is not set");
601     g_object_unref (task);
602     goto out;
603   }
604
605   if (data->auth_query) {
606     const gchar *query = data->auth_query;
607     appstr = g_strdup_printf ("%s?%s", app, query);
608     uristr = g_strdup_printf ("%s?%s", uri, query);
609   } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
610     const gchar *user = data->location.username;
611     const gchar *authmod = "adobe";
612
613     if (!user) {
614       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
615           "no username for adobe authentication");
616       g_object_unref (task);
617       goto out;
618     }
619
620     if (!data->location.password) {
621       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
622           "no password for adobe authentication");
623       g_object_unref (task);
624       goto out;
625     }
626
627     appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
628     uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
629   } else {
630     appstr = g_strdup (app);
631     uristr = g_strdup (uri);
632   }
633
634   /* Arguments for the connect command.
635    * Most of these are described in rtmp_specification_1.0.pdf page 30 */
636
637   /* "The server application name the client is connected to." */
638   gst_amf_node_append_field_take_string (node, "app", appstr, -1);
639
640   if (publish) {
641     /* Undocumented. Sent by both libavformat and librtmp. */
642     gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
643   }
644
645   if (flash_ver) {
646     /* "Flash Player version. It is the same string as returned by the
647      * ApplicationScript getversion () function." */
648     gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
649   }
650
651   /* "URL of the source SWF file making the connection."
652    * XXX: libavformat sends "swfUrl" here, if provided. */
653
654   /* "URL of the Server. It has the following format.
655    * protocol://servername:port/appName/appInstance" */
656   gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
657
658   if (!publish) {
659     /* "True if proxy is being used." */
660     gst_amf_node_append_field_boolean (node, "fpad", FALSE);
661
662     /* Undocumented. Sent by libavformat. */
663     gst_amf_node_append_field_number (node, "capabilities",
664         15 /* libavformat's magic number */ );
665
666     /* "Indicates what audio codecs the client supports." */
667     gst_amf_node_append_field_number (node, "audioCodecs",
668         GST_RTMP_AUDIOCODECS);
669
670     /* "Indicates what video codecs are supported." */
671     gst_amf_node_append_field_number (node, "videoCodecs",
672         GST_RTMP_VIDEOCODECS);
673
674     /* "Indicates what special video functions are supported." */
675     gst_amf_node_append_field_number (node, "videoFunction",
676         GST_RTMP_VIDEOFUNCTION);
677
678     /* "URL of the web page from where the SWF file was loaded."
679      * XXX: libavformat sends "pageUrl" here, if provided. */
680   }
681
682   gst_rtmp_connection_send_command (data->connection, send_connect_done,
683       task, 0, "connect", node, NULL);
684
685 out:
686   gst_amf_node_free (node);
687   g_free (uri);
688 }
689
690 static void
691 send_connect_done (const gchar * command_name, GPtrArray * args,
692     gpointer user_data)
693 {
694   GTask *task = G_TASK (user_data);
695   ConnectTaskData *data = g_task_get_task_data (task);
696   const GstAmfNode *node, *optional_args;
697   const gchar *code;
698
699   if (g_task_return_error_if_cancelled (task)) {
700     g_object_unref (task);
701     return;
702   }
703
704   if (!args) {
705     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
706         "'connect' cmd failed: %s", command_name);
707     g_object_unref (task);
708     return;
709   }
710
711   if (args->len < 2) {
712     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
713         "'connect' cmd failed; not enough return arguments");
714     g_object_unref (task);
715     return;
716   }
717
718   optional_args = g_ptr_array_index (args, 1);
719
720   node = gst_amf_node_get_field (optional_args, "code");
721   code = node ? gst_amf_node_peek_string (node, NULL) : NULL;
722   if (!code) {
723     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
724         "'connect' cmd failed; no status code");
725     g_object_unref (task);
726     return;
727   }
728
729   GST_INFO ("connect result: %s", code);
730
731   if (g_str_equal (code, "NetConnection.Connect.Success")) {
732     node = gst_amf_node_get_field (optional_args, "secureToken");
733     send_secure_token_response (task, data->connection,
734         node ? gst_amf_node_peek_string (node, NULL) : NULL);
735     return;
736   }
737
738   if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
739     GstRtmpAuthmod authmod = data->location.authmod;
740     GMatchInfo *match_info;
741     const gchar *desc;
742     GstUri *query;
743
744     node = gst_amf_node_get_field (optional_args, "description");
745     desc = node ? gst_amf_node_peek_string (node, NULL) : NULL;
746     if (!desc) {
747       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
748           "'connect' cmd returned '%s'; no description", code);
749       g_object_unref (task);
750       return;
751     }
752
753     GST_DEBUG ("connect result desc: %s", desc);
754
755     if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
756       if (strstr (desc, "authmod=adobe")) {
757         GST_INFO ("Reconnecting with authmod=adobe");
758         data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
759         socket_connect (task);
760         return;
761       }
762
763       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
764           "'connect' cmd returned unhandled authmod: %s", desc);
765       g_object_unref (task);
766       return;
767     }
768
769     if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
770       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
771           "'connect' cmd returned '%s': %s", code, desc);
772       g_object_unref (task);
773       return;
774     }
775
776     {
777       gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
778       gchar *query_str = g_match_info_fetch_named (match_info, "query");
779       gboolean matches;
780
781       GST_INFO ("regex parsed auth: authmod=%s, query=%s",
782           GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
783       g_match_info_free (match_info);
784
785       switch (authmod) {
786         case GST_RTMP_AUTHMOD_ADOBE:
787           matches = g_str_equal (authmod_str, "adobe");
788           break;
789
790         default:
791           matches = FALSE;
792           break;
793       }
794
795       if (!matches) {
796         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
797             "server uses wrong authentication mode '%s'; expected %s",
798             GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
799         g_object_unref (task);
800         g_free (authmod_str);
801         g_free (query_str);
802         return;
803       }
804       g_free (authmod_str);
805
806       query = gst_uri_from_string (query_str);
807       if (!query) {
808         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
809             "failed to parse authentication query '%s'",
810             GST_STR_NULL (query_str));
811         g_object_unref (task);
812         g_free (query_str);
813         return;
814       }
815       g_free (query_str);
816     }
817
818     {
819       const gchar *reason = gst_uri_get_query_value (query, "reason");
820
821       if (!reason) {
822         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
823             "authentication failed; no reason: %s", desc);
824         g_object_unref (task);
825         gst_uri_unref (query);
826         return;
827       }
828
829       if (g_str_equal (reason, "authfailed")) {
830         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
831             "authentication failed; wrong credentials?: %s", desc);
832         g_object_unref (task);
833         gst_uri_unref (query);
834         return;
835       }
836
837       if (!g_str_equal (reason, "needauth")) {
838         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
839             "authentication failed; reason '%s': %s", reason, desc);
840         g_object_unref (task);
841         gst_uri_unref (query);
842         return;
843       }
844     }
845
846     {
847       const gchar *salt, *opaque, *challenge;
848
849       salt = gst_uri_get_query_value (query, "salt");
850       if (!salt) {
851         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
852             "salt missing from auth request: %s", desc);
853         g_object_unref (task);
854         gst_uri_unref (query);
855         return;
856       }
857
858       opaque = gst_uri_get_query_value (query, "opaque");
859       challenge = gst_uri_get_query_value (query, "challenge");
860
861       g_warn_if_fail (!data->auth_query);
862       data->auth_query = do_adobe_auth (data->location.username,
863           data->location.password, salt, opaque, challenge);
864     }
865
866     gst_uri_unref (query);
867
868     if (!data->auth_query) {
869       /* do_adobe_auth should not fail; send_connect tests if username
870        * and password are provided */
871       g_warn_if_reached ();
872       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
873           "internal error: failed to generate adobe auth query");
874       g_object_unref (task);
875       return;
876     }
877
878     socket_connect (task);
879     return;
880   }
881
882   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
883       "'connect' cmd returned '%s'", code);
884   g_object_unref (task);
885 }
886
887 /* prep key: pack 1st 16 chars into 4 LittleEndian ints */
888 static void
889 rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
890 {
891   gchar copy[17];
892
893   g_return_if_fail (key);
894   g_return_if_fail (out);
895
896   /* ensure we can read 16 bytes */
897   strncpy (copy, key, 16);
898   /* placate GCC 8 -Wstringop-truncation */
899   copy[16] = 0;
900
901   out[0] = GST_READ_UINT32_LE (copy);
902   out[1] = GST_READ_UINT32_LE (copy + 4);
903   out[2] = GST_READ_UINT32_LE (copy + 8);
904   out[3] = GST_READ_UINT32_LE (copy + 12);
905 }
906
907 /* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
908 static GArray *
909 rtmp_tea_decode_prep_text (const gchar * text)
910 {
911   GArray *arr;
912   gsize len, i;
913
914   g_return_val_if_fail (text, NULL);
915
916   len = strlen (text);
917   arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
918
919   for (i = 0; i < len; i += 8) {
920     gchar copy[9];
921     guchar chars[4];
922     gsize j;
923     guint32 val;
924
925     /* ensure we can read 8 bytes */
926     strncpy (copy, text + i, 8);
927     /* placate GCC 8 -Wstringop-truncation */
928     copy[8] = 0;
929
930     for (j = 0; j < 4; j++) {
931       gint hi, lo;
932
933       hi = g_ascii_xdigit_value (copy[2 * j]);
934       lo = g_ascii_xdigit_value (copy[2 * j + 1]);
935
936       chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
937     }
938
939     val = GST_READ_UINT32_LE (chars);
940     g_array_append_val (arr, val);
941   }
942
943   return arr;
944 }
945
946 /* return text from uint32s to chars */
947 static gchar *
948 rtmp_tea_decode_return_text (GArray * arr)
949 {
950 #if G_BYTE_ORDER != G_LITTLE_ENDIAN
951   gsize i;
952
953   g_return_val_if_fail (arr, NULL);
954
955   for (i = 0; i < arr->len; i++) {
956     guint32 *val = &g_array_index (arr, guint32, i);
957     *val = GUINT32_TO_LE (*val);
958   }
959 #endif
960
961   /* array is alredy zero-terminated */
962   return g_array_free (arr, FALSE);
963 }
964
965 /* http://www.movable-type.co.uk/scripts/tea-block.html */
966 static void
967 rtmp_tea_decode_btea (GArray * text, guint32 key[4])
968 {
969   guint32 *v, n, *k;
970   guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
971   guint32 p, q;
972
973   g_return_if_fail (text);
974   g_return_if_fail (text->len > 0);
975   g_return_if_fail (key);
976
977   v = (guint32 *) text->data;
978   n = text->len;
979   k = key;
980   z = v[n - 1];
981   y = v[0];
982   q = 6 + 52 / n;
983   sum = q * DELTA;
984
985 #define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
986
987   while (sum != 0) {
988     e = sum >> 2 & 3;
989     for (p = n - 1; p > 0; p--)
990       z = v[p - 1], y = v[p] -= MX;
991     z = v[n - 1];
992     y = v[0] -= MX;
993     sum -= DELTA;
994   }
995
996 #undef MX
997 }
998
999 /* taken from librtmp */
1000 static gchar *
1001 rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
1002 {
1003   guint32 key[4];
1004   GArray *text;
1005
1006   rtmp_tea_decode_prep_key (bin_key, key);
1007   text = rtmp_tea_decode_prep_text (hex_text);
1008   rtmp_tea_decode_btea (text, key);
1009   return rtmp_tea_decode_return_text (text);
1010 }
1011
1012 static void
1013 send_secure_token_response (GTask * task, GstRtmpConnection * connection,
1014     const gchar * challenge)
1015 {
1016   ConnectTaskData *data = g_task_get_task_data (task);
1017   if (challenge) {
1018     GstAmfNode *node1;
1019     GstAmfNode *node2;
1020     gchar *response;
1021
1022     if (!data->location.secure_token || !data->location.secure_token[0]) {
1023       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
1024           "server requires secureToken but no token provided");
1025       g_object_unref (task);
1026       return;
1027     }
1028
1029     response = rtmp_tea_decode (data->location.secure_token, challenge);
1030
1031     GST_DEBUG ("response: %s", response);
1032
1033     node1 = gst_amf_node_new_null ();
1034     node2 = gst_amf_node_new_take_string (response, -1);
1035     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1036         "secureTokenResponse", node1, node2, NULL);
1037     gst_amf_node_free (node1);
1038     gst_amf_node_free (node2);
1039   }
1040
1041   g_signal_handler_disconnect (connection, data->error_handler_id);
1042   data->error_handler_id = 0;
1043
1044   g_task_return_pointer (task, g_object_ref (connection),
1045       gst_rtmp_connection_close_and_unref);
1046   g_object_unref (task);
1047 }
1048
1049 GstRtmpConnection *
1050 gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
1051 {
1052   GTask *task = G_TASK (result);
1053   return g_task_propagate_pointer (task, error);
1054 }
1055
1056 static void send_create_stream (GTask * task);
1057 static void send_publish_or_play (GTask * task);
1058
1059 typedef struct
1060 {
1061   GstRtmpConnection *connection;
1062   gulong error_handler_id;
1063   gchar *stream;
1064   gboolean publish;
1065   guint32 id;
1066 } StreamTaskData;
1067
1068 static StreamTaskData *
1069 stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
1070     gboolean publish)
1071 {
1072   StreamTaskData *data = g_slice_new0 (StreamTaskData);
1073   data->connection = g_object_ref (connection);
1074   data->stream = g_strdup (stream);
1075   data->publish = publish;
1076   return data;
1077 }
1078
1079 static void
1080 stream_task_data_free (gpointer ptr)
1081 {
1082   StreamTaskData *data = ptr;
1083   g_clear_pointer (&data->stream, g_free);
1084   if (data->error_handler_id) {
1085     g_signal_handler_disconnect (data->connection, data->error_handler_id);
1086   }
1087   g_clear_object (&data->connection);
1088   g_slice_free (StreamTaskData, data);
1089 }
1090
1091 static void
1092 start_stream (GstRtmpConnection * connection, const gchar * stream,
1093     gboolean publish, GCancellable * cancellable,
1094     GAsyncReadyCallback callback, gpointer user_data)
1095 {
1096   GTask *task;
1097   StreamTaskData *data;
1098
1099   init_debug ();
1100
1101   task = g_task_new (connection, cancellable, callback, user_data);
1102
1103   if (!stream) {
1104     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
1105         "Stream is not set");
1106     g_object_unref (task);
1107     return;
1108   }
1109
1110   data = stream_task_data_new (connection, stream, publish);
1111   g_task_set_task_data (task, data, stream_task_data_free);
1112
1113   data->error_handler_id = g_signal_connect (connection,
1114       "error", G_CALLBACK (connection_error), task);
1115
1116   send_create_stream (task);
1117 }
1118
1119 void
1120 gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
1121     const gchar * stream, GCancellable * cancellable,
1122     GAsyncReadyCallback callback, gpointer user_data)
1123 {
1124   start_stream (connection, stream, TRUE, cancellable, callback, user_data);
1125 }
1126
1127 void
1128 gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
1129     const gchar * stream, GCancellable * cancellable,
1130     GAsyncReadyCallback callback, gpointer user_data)
1131 {
1132   start_stream (connection, stream, FALSE, cancellable, callback, user_data);
1133 }
1134
1135 static void
1136 send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
1137     guint32 ms)
1138 {
1139   GstRtmpUserControl uc = {
1140     .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
1141     .param = stream,
1142     .param2 = ms,
1143   };
1144
1145   gst_rtmp_connection_queue_message (connection,
1146       gst_rtmp_message_new_user_control (&uc));
1147 }
1148
1149 static void
1150 send_create_stream (GTask * task)
1151 {
1152   GstRtmpConnection *connection = g_task_get_source_object (task);
1153   StreamTaskData *data = g_task_get_task_data (task);
1154   GstAmfNode *command_object, *stream_name;
1155
1156   command_object = gst_amf_node_new_null ();
1157   stream_name = gst_amf_node_new_string (data->stream, -1);
1158
1159   if (data->publish) {
1160     /* Not part of RTMP documentation */
1161     GST_DEBUG ("Releasing stream '%s'", data->stream);
1162     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1163         "releaseStream", command_object, stream_name, NULL);
1164     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1165         "FCPublish", command_object, stream_name, NULL);
1166   } else {
1167     /* Matches librtmp */
1168     gst_rtmp_connection_request_window_size (connection,
1169         GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
1170     send_set_buffer_length (connection, 0, 300);
1171   }
1172
1173   GST_INFO ("Creating stream '%s'", data->stream);
1174   gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
1175       "createStream", command_object, NULL);
1176
1177   gst_amf_node_free (stream_name);
1178   gst_amf_node_free (command_object);
1179 }
1180
1181 static void
1182 create_stream_done (const gchar * command_name, GPtrArray * args,
1183     gpointer user_data)
1184 {
1185   GTask *task = G_TASK (user_data);
1186   StreamTaskData *data = g_task_get_task_data (task);
1187   GstAmfNode *result;
1188
1189   if (g_task_return_error_if_cancelled (task)) {
1190     g_object_unref (task);
1191     return;
1192   }
1193
1194   if (!args) {
1195     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1196         "'createStream' cmd failed: %s", command_name);
1197     g_object_unref (task);
1198     return;
1199   }
1200
1201   if (args->len < 2) {
1202     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1203         "'createStream' cmd failed; not enough return arguments");
1204     g_object_unref (task);
1205     return;
1206   }
1207
1208   result = g_ptr_array_index (args, 1);
1209   if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
1210     GString *error_dump = g_string_new ("");
1211
1212     gst_amf_node_dump (result, -1, error_dump);
1213
1214     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1215         "'createStream' cmd failed: %s: %s", command_name, error_dump->str);
1216     g_object_unref (task);
1217
1218     g_string_free (error_dump, TRUE);
1219     return;
1220   }
1221
1222   data->id = gst_amf_node_get_number (result);
1223   GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
1224
1225   if (data->id == 0) {
1226     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
1227         "'createStream' cmd returned ID 0");
1228     g_object_unref (task);
1229     return;
1230   }
1231
1232   send_publish_or_play (task);
1233 }
1234
1235 static void
1236 send_publish_or_play (GTask * task)
1237 {
1238   GstRtmpConnection *connection = g_task_get_source_object (task);
1239   StreamTaskData *data = g_task_get_task_data (task);
1240   const gchar *command = data->publish ? "publish" : "play";
1241   GstAmfNode *command_object, *stream_name, *argument;
1242
1243   command_object = gst_amf_node_new_null ();
1244   stream_name = gst_amf_node_new_string (data->stream, -1);
1245
1246   if (data->publish) {
1247     /* publishing type (live, record, append) */
1248     argument = gst_amf_node_new_string ("live", -1);
1249   } else {
1250     /* "Start" argument: -2 = live or recording, -1 = only live
1251        0 or positive = only recording, seek to X seconds */
1252     argument = gst_amf_node_new_number (-2);
1253   }
1254
1255   GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
1256       command, data->stream, data->id);
1257   gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
1258       task, data->id, "onStatus");
1259   gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
1260       command, command_object, stream_name, argument, NULL);
1261
1262   if (!data->publish) {
1263     /* Matches librtmp */
1264     send_set_buffer_length (connection, data->id, 30000);
1265   }
1266
1267   gst_amf_node_free (command_object);
1268   gst_amf_node_free (stream_name);
1269   gst_amf_node_free (argument);
1270 }
1271
1272 static void
1273 on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
1274     gpointer user_data)
1275 {
1276   GTask *task = G_TASK (user_data);
1277   GstRtmpConnection *connection = g_task_get_source_object (task);
1278   StreamTaskData *data = g_task_get_task_data (task);
1279   const gchar *command = data->publish ? "publish" : "play", *code = NULL;
1280   GString *info_dump;
1281
1282   if (g_task_return_error_if_cancelled (task)) {
1283     g_object_unref (task);
1284     return;
1285   }
1286
1287   if (!args) {
1288     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1289         "'%s' cmd failed: %s", command, command_name);
1290     g_object_unref (task);
1291     return;
1292   }
1293
1294   if (args->len < 2) {
1295     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1296         "'%s' cmd failed; not enough return arguments", command);
1297     g_object_unref (task);
1298     return;
1299   }
1300
1301   {
1302     const GstAmfNode *info_object, *code_object;
1303     info_object = g_ptr_array_index (args, 1);
1304     code_object = gst_amf_node_get_field (info_object, "code");
1305
1306     if (code_object) {
1307       code = gst_amf_node_peek_string (code_object, NULL);
1308     }
1309
1310     info_dump = g_string_new ("");
1311     gst_amf_node_dump (info_object, -1, info_dump);
1312   }
1313
1314   if (data->publish) {
1315     if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
1316       GST_INFO ("publish success: %s", info_dump->str);
1317       g_task_return_boolean (task, TRUE);
1318       goto out;
1319     }
1320
1321     if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
1322       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
1323           "publish denied; stream already exists: %s", info_dump->str);
1324       goto out;
1325     }
1326
1327     if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
1328       g_task_return_new_error (task, G_IO_ERROR,
1329           G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
1330       goto out;
1331     }
1332   } else {
1333     if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
1334         g_strcmp0 (code, "NetStream.Play.PublishNotify") == 0 ||
1335         g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
1336       GST_INFO ("play success: %s", info_dump->str);
1337       g_task_return_boolean (task, TRUE);
1338       goto out;
1339     }
1340
1341     if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
1342       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
1343           "play denied; stream not found: %s", info_dump->str);
1344       goto out;
1345     }
1346   }
1347
1348   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1349       "'%s' cmd failed: %s: %s", command, command_name, info_dump->str);
1350
1351 out:
1352   g_string_free (info_dump, TRUE);
1353
1354   g_signal_handler_disconnect (connection, data->error_handler_id);
1355   data->error_handler_id = 0;
1356
1357   g_object_unref (task);
1358 }
1359
1360 static gboolean
1361 start_stream_finish (GstRtmpConnection * connection,
1362     GAsyncResult * result, guint32 * stream_id, GError ** error)
1363 {
1364   GTask *task;
1365   StreamTaskData *data;
1366
1367   g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
1368
1369   task = G_TASK (result);
1370
1371   if (!g_task_propagate_boolean (G_TASK (result), error)) {
1372     return FALSE;
1373   }
1374
1375   data = g_task_get_task_data (task);
1376
1377   if (stream_id) {
1378     *stream_id = data->id;
1379   }
1380
1381   return TRUE;
1382 }
1383
1384 gboolean
1385 gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
1386     GAsyncResult * result, guint32 * stream_id, GError ** error)
1387 {
1388   return start_stream_finish (connection, result, stream_id, error);
1389 }
1390
1391 gboolean
1392 gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
1393     GAsyncResult * result, guint32 * stream_id, GError ** error)
1394 {
1395   return start_stream_finish (connection, result, stream_id, error);
1396 }
1397
1398 void
1399 gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
1400     const gchar * stream, const GstRtmpStopCommands stop_commands)
1401 {
1402   send_stop (connection, stream, stop_commands);
1403 }
1404
1405 static void
1406 send_stop (GstRtmpConnection * connection, const gchar * stream,
1407     const GstRtmpStopCommands stop_commands)
1408 {
1409   GstAmfNode *command_object, *stream_name;
1410
1411   command_object = gst_amf_node_new_null ();
1412   stream_name = gst_amf_node_new_string (stream, -1);
1413
1414   if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
1415     GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
1416     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1417         "FCUnpublish", command_object, stream_name, NULL);
1418   }
1419   if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
1420     GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
1421     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1422         "closeStream", command_object, stream_name, NULL);
1423   }
1424   if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
1425     GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
1426     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1427         "deleteStream", command_object, stream_name, NULL);
1428   }
1429
1430   gst_amf_node_free (stream_name);
1431   gst_amf_node_free (command_object);
1432 }