gst: don't use volatile to mean atomic
[platform/upstream/gstreamer.git] / gst / rtmp2 / rtmp / rtmpclient.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <gio/gio.h>
28 #include <string.h>
29 #include "rtmpclient.h"
30 #include "rtmphandshake.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33
34 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
35 #define GST_CAT_DEFAULT gst_rtmp_client_debug_category
36
37 static void send_connect_done (const gchar * command_name, GPtrArray * args,
38     gpointer user_data);
39 static void create_stream_done (const gchar * command_name, GPtrArray * args,
40     gpointer user_data);
41 static void on_publish_or_play_status (const gchar * command_name,
42     GPtrArray * args, gpointer user_data);
43
44 static void
45 init_debug (void)
46 {
47   static gsize done = 0;
48   if (g_once_init_enter (&done)) {
49     GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
50         "rtmpclient", 0, "debug category for the rtmp client");
51     GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
52     GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
53     GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
54     g_once_init_leave (&done, 1);
55   }
56 }
57
58 static const gchar *scheme_strings[] = {
59   "rtmp",
60   "rtmps",
61   NULL
62 };
63
64 #define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
65
66 GType
67 gst_rtmp_scheme_get_type (void)
68 {
69   static gsize scheme_type = 0;
70   static const GEnumValue scheme[] = {
71     {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
72     {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
73     {0, NULL, NULL},
74   };
75
76   if (g_once_init_enter (&scheme_type)) {
77     GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
78     g_once_init_leave (&scheme_type, tmp);
79   }
80
81   return (GType) scheme_type;
82 }
83
84 GstRtmpScheme
85 gst_rtmp_scheme_from_string (const gchar * string)
86 {
87   if (string) {
88     gint value;
89
90     for (value = 0; value < NUM_SCHEMES; value++) {
91       if (strcmp (scheme_strings[value], string) == 0) {
92         return value;
93       }
94     }
95   }
96
97   return -1;
98 }
99
100 GstRtmpScheme
101 gst_rtmp_scheme_from_uri (const GstUri * uri)
102 {
103   const gchar *scheme = gst_uri_get_scheme (uri);
104   if (!scheme) {
105     return GST_RTMP_SCHEME_RTMP;
106   }
107
108   return gst_rtmp_scheme_from_string (scheme);
109 }
110
111 const gchar *
112 gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
113 {
114   if (scheme >= 0 && scheme < NUM_SCHEMES) {
115     return scheme_strings[scheme];
116   }
117
118   return "invalid";
119 }
120
121 const gchar *const *
122 gst_rtmp_scheme_get_strings (void)
123 {
124   return scheme_strings;
125 }
126
127 guint
128 gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
129 {
130   switch (scheme) {
131     case GST_RTMP_SCHEME_RTMP:
132       return 1935;
133
134     case GST_RTMP_SCHEME_RTMPS:
135       return 443;
136
137     default:
138       g_return_val_if_reached (0);
139   }
140 }
141
142 GType
143 gst_rtmp_authmod_get_type (void)
144 {
145   static gsize authmod_type = 0;
146   static const GEnumValue authmod[] = {
147     {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
148     {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
149     {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
150     {0, NULL, NULL},
151   };
152
153   if (g_once_init_enter (&authmod_type)) {
154     GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
155     g_once_init_leave (&authmod_type, tmp);
156   }
157
158   return (GType) authmod_type;
159 }
160
161 static const gchar *
162 gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
163 {
164   GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
165   GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
166   return ev ? ev->value_nick : "(unknown)";
167 }
168
169 GType
170 gst_rtmp_stop_commands_get_type (void)
171 {
172   static gsize stop_commands_type = 0;
173   static const GFlagsValue stop_commands[] = {
174     {GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
175     {GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
176     {GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
177     {GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
178     {0, NULL, NULL},
179   };
180
181   if (g_once_init_enter (&stop_commands_type)) {
182     GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
183     g_once_init_leave (&stop_commands_type, tmp);
184   }
185
186   return (GType) stop_commands_type;
187 }
188
189 void
190 gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
191 {
192   g_return_if_fail (dest);
193   g_return_if_fail (src);
194
195   dest->scheme = src->scheme;
196   dest->host = g_strdup (src->host);
197   dest->port = src->port;
198   dest->application = g_strdup (src->application);
199   dest->stream = g_strdup (src->stream);
200   dest->username = g_strdup (src->username);
201   dest->password = g_strdup (src->password);
202   dest->secure_token = g_strdup (src->secure_token);
203   dest->authmod = src->authmod;
204   dest->timeout = src->timeout;
205   dest->tls_flags = src->tls_flags;
206   dest->flash_ver = g_strdup (src->flash_ver);
207   dest->publish = src->publish;
208 }
209
210 void
211 gst_rtmp_location_clear (GstRtmpLocation * location)
212 {
213   g_return_if_fail (location);
214
215   g_clear_pointer (&location->host, g_free);
216   location->port = 0;
217   g_clear_pointer (&location->application, g_free);
218   g_clear_pointer (&location->stream, g_free);
219   g_clear_pointer (&location->username, g_free);
220   g_clear_pointer (&location->password, g_free);
221   g_clear_pointer (&location->secure_token, g_free);
222   g_clear_pointer (&location->flash_ver, g_free);
223   location->publish = FALSE;
224 }
225
226 gchar *
227 gst_rtmp_location_get_string (const GstRtmpLocation * location,
228     gboolean with_stream)
229 {
230   GstUri *uri;
231   gchar *base, *string;
232   const gchar *scheme_string;
233   guint default_port;
234
235   g_return_val_if_fail (location, NULL);
236
237   scheme_string = gst_rtmp_scheme_to_string (location->scheme);
238   default_port = gst_rtmp_scheme_get_default_port (location->scheme);
239
240   uri = gst_uri_new (scheme_string, NULL, location->host,
241       location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
242       NULL, NULL);
243   base = gst_uri_to_string (uri);
244
245   string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
246       location->stream, NULL);
247
248   g_free (base);
249   gst_uri_unref (uri);
250
251   return string;
252 }
253
254 /* Flag values for the audioCodecs property,
255  * rtmp_specification_1.0.pdf page 32 */
256 enum
257 {
258   SUPPORT_SND_NONE = 0x001,     /* Raw sound, no compression */
259   SUPPORT_SND_ADPCM = 0x002,    /* ADPCM compression */
260   SUPPORT_SND_MP3 = 0x004,      /* mp3 compression */
261   SUPPORT_SND_INTEL = 0x008,    /* Not used */
262   SUPPORT_SND_UNUSED = 0x010,   /* Not used */
263   SUPPORT_SND_NELLY8 = 0x020,   /* NellyMoser at 8-kHz compression */
264   SUPPORT_SND_NELLY = 0x040,    /* NellyMoser compression
265                                  * (5, 11, 22, and 44 kHz) */
266   SUPPORT_SND_G711A = 0x080,    /* G711A sound compression
267                                  * (Flash Media Server only) */
268   SUPPORT_SND_G711U = 0x100,    /* G711U sound compression
269                                  * (Flash Media Server only) */
270   SUPPORT_SND_NELLY16 = 0x200,  /* NellyMoser at 16-kHz compression */
271   SUPPORT_SND_AAC = 0x400,      /* Advanced audio coding (AAC) codec */
272   SUPPORT_SND_SPEEX = 0x800,    /* Speex Audio */
273   SUPPORT_SND_ALL = 0xFFF,      /* All RTMP-supported audio codecs */
274 };
275
276 /* audioCodecs value sent by libavformat. All "used" codecs. */
277 #define GST_RTMP_AUDIOCODECS \
278   (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
279 G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
280
281 /* Flag values for the videoCodecs property,
282  * rtmp_specification_1.0.pdf page 32 */
283 enum
284 {
285   SUPPORT_VID_UNUSED = 0x01,    /* Obsolete value */
286   SUPPORT_VID_JPEG = 0x02,      /* Obsolete value */
287   SUPPORT_VID_SORENSON = 0x04,  /* Sorenson Flash video */
288   SUPPORT_VID_HOMEBREW = 0x08,  /* V1 screen sharing */
289   SUPPORT_VID_VP6 = 0x10,       /* On2 video (Flash 8+) */
290   SUPPORT_VID_VP6ALPHA = 0x20,  /* On2 video with alpha channel */
291   SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
292   SUPPORT_VID_H264 = 0x80,      /* H264 video */
293   SUPPORT_VID_ALL = 0xFF,       /* All RTMP-supported video codecs */
294 };
295
296 /* videoCodecs value sent by libavformat. All non-obsolete codecs. */
297 #define GST_RTMP_VIDEOCODECS \
298   (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
299 G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252);  /* libavformat's magic number */
300
301 /* Flag values for the videoFunction property,
302  * rtmp_specification_1.0.pdf page 32 */
303 enum
304 {
305   /* Indicates that the client can perform frame-accurate seeks. */
306   SUPPORT_VID_CLIENT_SEEK = 1,
307 };
308
309 /* videoFunction value sent by libavformat */
310 #define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
311 G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1);  /* libavformat's magic number */
312
313 static void socket_connect (GTask * task);
314 static void socket_connect_done (GObject * source, GAsyncResult * result,
315     gpointer user_data);
316 static void handshake_done (GObject * source, GAsyncResult * result,
317     gpointer user_data);
318 static void send_connect (GTask * task);
319 static void send_stop (GstRtmpConnection * connection, const gchar * stream,
320     const GstRtmpStopCommands stop_commands);
321 static void send_secure_token_response (GTask * task,
322     GstRtmpConnection * connection, const gchar * challenge);
323 static void connection_error (GstRtmpConnection * connection,
324     gpointer user_data);
325
326 #define DEFAULT_TIMEOUT 5
327
328 typedef struct
329 {
330   GstRtmpLocation location;
331   gchar *auth_query;
332   GstRtmpConnection *connection;
333   gulong error_handler_id;
334 } ConnectTaskData;
335
336 static ConnectTaskData *
337 connect_task_data_new (const GstRtmpLocation * location)
338 {
339   ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
340   gst_rtmp_location_copy (&data->location, location);
341   return data;
342 }
343
344 static void
345 connect_task_data_free (gpointer ptr)
346 {
347   ConnectTaskData *data = ptr;
348   gst_rtmp_location_clear (&data->location);
349   g_clear_pointer (&data->auth_query, g_free);
350   if (data->error_handler_id) {
351     g_signal_handler_disconnect (data->connection, data->error_handler_id);
352   }
353   g_clear_object (&data->connection);
354   g_slice_free (ConnectTaskData, data);
355 }
356
357 static GRegex *auth_regex = NULL;
358
359 void
360 gst_rtmp_client_connect_async (const GstRtmpLocation * location,
361     GCancellable * cancellable, GAsyncReadyCallback callback,
362     gpointer user_data)
363 {
364   GTask *task;
365
366   init_debug ();
367
368   if (g_once_init_enter (&auth_regex)) {
369     GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
370         "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
371         "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
372     g_once_init_leave (&auth_regex, re);
373   }
374
375   task = g_task_new (NULL, cancellable, callback, user_data);
376
377   g_task_set_task_data (task, connect_task_data_new (location),
378       connect_task_data_free);
379
380   socket_connect (task);
381 }
382
383 static void
384 socket_connect (GTask * task)
385 {
386   ConnectTaskData *data = g_task_get_task_data (task);
387   GSocketConnectable *addr;
388   GSocketClient *socket_client;
389
390   if (data->location.timeout < 0) {
391     data->location.timeout = DEFAULT_TIMEOUT;
392   }
393
394   if (data->error_handler_id) {
395     g_signal_handler_disconnect (data->connection, data->error_handler_id);
396     data->error_handler_id = 0;
397   }
398
399   if (data->connection) {
400     gst_rtmp_connection_close (data->connection);
401     g_clear_object (&data->connection);
402   }
403
404   if (!data->location.host) {
405     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
406         "Host is not set");
407     g_object_unref (task);
408     return;
409   }
410
411   if (!data->location.port) {
412     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
413         "Port is not set");
414     g_object_unref (task);
415     return;
416   }
417
418   socket_client = g_socket_client_new ();
419   g_socket_client_set_timeout (socket_client, data->location.timeout);
420
421   switch (data->location.scheme) {
422     case GST_RTMP_SCHEME_RTMP:
423       break;
424
425     case GST_RTMP_SCHEME_RTMPS:
426       GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
427           data->location.tls_flags);
428       g_socket_client_set_tls (socket_client, TRUE);
429       g_socket_client_set_tls_validation_flags (socket_client,
430           data->location.tls_flags);
431       break;
432
433     default:
434       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
435           "Invalid scheme ID %d", data->location.scheme);
436       g_object_unref (socket_client);
437       g_object_unref (task);
438       return;
439   }
440
441   addr = g_network_address_new (data->location.host, data->location.port);
442
443   GST_DEBUG ("Starting socket connection");
444
445   g_socket_client_connect_async (socket_client, addr,
446       g_task_get_cancellable (task), socket_connect_done, task);
447   g_object_unref (addr);
448   g_object_unref (socket_client);
449 }
450
451 static void
452 socket_connect_done (GObject * source, GAsyncResult * result,
453     gpointer user_data)
454 {
455   GSocketClient *socket_client = G_SOCKET_CLIENT (source);
456   GSocketConnection *socket_connection;
457   GTask *task = user_data;
458   GError *error = NULL;
459
460   socket_connection =
461       g_socket_client_connect_finish (socket_client, result, &error);
462
463   if (g_task_return_error_if_cancelled (task)) {
464     GST_DEBUG ("Socket connection was cancelled");
465     g_object_unref (task);
466     return;
467   }
468
469   if (socket_connection == NULL) {
470     GST_ERROR ("Socket connection error");
471     g_task_return_error (task, error);
472     g_object_unref (task);
473     return;
474   }
475
476   GST_DEBUG ("Socket connection established");
477
478   gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
479       g_task_get_cancellable (task), handshake_done, task);
480   g_object_unref (socket_connection);
481 }
482
483
484 static void
485 handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
486 {
487   GIOStream *stream = G_IO_STREAM (source);
488   GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
489   GTask *task = user_data;
490   ConnectTaskData *data = g_task_get_task_data (task);
491   GError *error = NULL;
492   gboolean res;
493
494   res = gst_rtmp_client_handshake_finish (stream, result, &error);
495   if (!res) {
496     g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
497     g_task_return_error (task, error);
498     g_object_unref (task);
499     return;
500   }
501
502   data->connection = gst_rtmp_connection_new (socket_connection,
503       g_task_get_cancellable (task));
504   data->error_handler_id = g_signal_connect (data->connection,
505       "error", G_CALLBACK (connection_error), task);
506
507   send_connect (task);
508 }
509
510 static void
511 connection_error (GstRtmpConnection * connection, gpointer user_data)
512 {
513   GTask *task = user_data;
514   if (!g_task_had_error (task))
515     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
516         "error during connection attempt");
517 }
518
519 static gchar *
520 do_adobe_auth (const gchar * username, const gchar * password,
521     const gchar * salt, const gchar * opaque, const gchar * challenge)
522 {
523   guint8 hash[16];              /* MD5 digest */
524   gsize hashlen = sizeof hash;
525   gchar *challenge2, *auth_query;
526   GChecksum *md5;
527
528   g_return_val_if_fail (username, NULL);
529   g_return_val_if_fail (password, NULL);
530   g_return_val_if_fail (salt, NULL);
531
532   md5 = g_checksum_new (G_CHECKSUM_MD5);
533   g_checksum_update (md5, (guchar *) username, -1);
534   g_checksum_update (md5, (guchar *) salt, -1);
535   g_checksum_update (md5, (guchar *) password, -1);
536
537   g_checksum_get_digest (md5, hash, &hashlen);
538   g_warn_if_fail (hashlen == sizeof hash);
539
540   {
541     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
542     g_checksum_reset (md5);
543     g_checksum_update (md5, (guchar *) hashstr, -1);
544     g_free (hashstr);
545   }
546
547   if (opaque)
548     g_checksum_update (md5, (guchar *) opaque, -1);
549   else if (challenge)
550     g_checksum_update (md5, (guchar *) challenge, -1);
551
552   challenge2 = g_strdup_printf ("%08x", g_random_int ());
553   g_checksum_update (md5, (guchar *) challenge2, -1);
554
555   g_checksum_get_digest (md5, hash, &hashlen);
556   g_warn_if_fail (hashlen == sizeof hash);
557
558   {
559     gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
560
561     if (opaque) {
562       auth_query =
563           g_strdup_printf
564           ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
565           username, challenge2, hashstr, opaque);
566     } else {
567       auth_query =
568           g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
569           "adobe", username, challenge2, hashstr);
570     }
571     g_free (hashstr);
572   }
573
574   g_checksum_free (md5);
575   g_free (challenge2);
576
577   return auth_query;
578 }
579
580 static void
581 send_connect (GTask * task)
582 {
583   ConnectTaskData *data = g_task_get_task_data (task);
584   GstAmfNode *node;
585   const gchar *app, *flash_ver;
586   gchar *uri, *appstr = NULL, *uristr = NULL;
587   gboolean publish;
588
589   node = gst_amf_node_new_object ();
590   app = data->location.application;
591   flash_ver = data->location.flash_ver;
592   publish = data->location.publish;
593   uri = gst_rtmp_location_get_string (&data->location, FALSE);
594
595   if (!app) {
596     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
597         "Application is not set");
598     g_object_unref (task);
599     goto out;
600   }
601
602   if (!flash_ver) {
603     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
604         "Flash version is not set");
605     g_object_unref (task);
606     goto out;
607   }
608
609   if (data->auth_query) {
610     const gchar *query = data->auth_query;
611     appstr = g_strdup_printf ("%s?%s", app, query);
612     uristr = g_strdup_printf ("%s?%s", uri, query);
613   } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
614     const gchar *user = data->location.username;
615     const gchar *authmod = "adobe";
616
617     if (!user) {
618       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
619           "no username for adobe authentication");
620       g_object_unref (task);
621       goto out;
622     }
623
624     if (!data->location.password) {
625       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
626           "no password for adobe authentication");
627       g_object_unref (task);
628       goto out;
629     }
630
631     appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
632     uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
633   } else {
634     appstr = g_strdup (app);
635     uristr = g_strdup (uri);
636   }
637
638   /* Arguments for the connect command.
639    * Most of these are described in rtmp_specification_1.0.pdf page 30 */
640
641   /* "The server application name the client is connected to." */
642   gst_amf_node_append_field_take_string (node, "app", appstr, -1);
643
644   if (publish) {
645     /* Undocumented. Sent by both libavformat and librtmp. */
646     gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
647   }
648
649   /* "Flash Player version. It is the same string as returned by the
650    * ApplicationScript getversion () function." */
651   gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
652
653   /* "URL of the source SWF file making the connection."
654    * XXX: libavformat sends "swfUrl" here, if provided. */
655
656   /* "URL of the Server. It has the following format.
657    * protocol://servername:port/appName/appInstance" */
658   gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
659
660   if (!publish) {
661     /* "True if proxy is being used." */
662     gst_amf_node_append_field_boolean (node, "fpad", FALSE);
663
664     /* Undocumented. Sent by libavformat. */
665     gst_amf_node_append_field_number (node, "capabilities",
666         15 /* libavformat's magic number */ );
667
668     /* "Indicates what audio codecs the client supports." */
669     gst_amf_node_append_field_number (node, "audioCodecs",
670         GST_RTMP_AUDIOCODECS);
671
672     /* "Indicates what video codecs are supported." */
673     gst_amf_node_append_field_number (node, "videoCodecs",
674         GST_RTMP_VIDEOCODECS);
675
676     /* "Indicates what special video functions are supported." */
677     gst_amf_node_append_field_number (node, "videoFunction",
678         GST_RTMP_VIDEOFUNCTION);
679
680     /* "URL of the web page from where the SWF file was loaded."
681      * XXX: libavformat sends "pageUrl" here, if provided. */
682   }
683
684   gst_rtmp_connection_send_command (data->connection, send_connect_done,
685       task, 0, "connect", node, NULL);
686
687 out:
688   gst_amf_node_free (node);
689   g_free (uri);
690 }
691
692 static void
693 send_connect_done (const gchar * command_name, GPtrArray * args,
694     gpointer user_data)
695 {
696   GTask *task = G_TASK (user_data);
697   ConnectTaskData *data = g_task_get_task_data (task);
698   const GstAmfNode *node, *optional_args;
699   const gchar *code;
700
701   if (g_task_return_error_if_cancelled (task)) {
702     g_object_unref (task);
703     return;
704   }
705
706   if (!args) {
707     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
708         "connect failed: %s", command_name);
709     g_object_unref (task);
710     return;
711   }
712
713   if (args->len < 2) {
714     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
715         "connect failed; not enough return arguments");
716     g_object_unref (task);
717     return;
718   }
719
720   optional_args = g_ptr_array_index (args, 1);
721
722   node = gst_amf_node_get_field (optional_args, "code");
723   if (!node) {
724     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
725         "result code missing from connect cmd result");
726     g_object_unref (task);
727     return;
728   }
729
730   code = gst_amf_node_peek_string (node, NULL);
731   GST_INFO ("connect result: %s", GST_STR_NULL (code));
732
733   if (g_str_equal (code, "NetConnection.Connect.Success")) {
734     node = gst_amf_node_get_field (optional_args, "secureToken");
735     send_secure_token_response (task, data->connection,
736         node ? gst_amf_node_peek_string (node, NULL) : NULL);
737     return;
738   }
739
740   if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
741     GstRtmpAuthmod authmod = data->location.authmod;
742     GMatchInfo *match_info;
743     const gchar *desc;
744     GstUri *query;
745
746     node = gst_amf_node_get_field (optional_args, "description");
747     if (!node) {
748       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
749           "Connect rejected; no description");
750       g_object_unref (task);
751       return;
752     }
753
754     desc = gst_amf_node_peek_string (node, NULL);
755     GST_DEBUG ("connect result desc: %s", GST_STR_NULL (desc));
756
757     if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
758       if (strstr (desc, "authmod=adobe")) {
759         GST_INFO ("Reconnecting with authmod=adobe");
760         data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
761         socket_connect (task);
762         return;
763       }
764
765       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
766           "unhandled authentication mode: %s", desc);
767       g_object_unref (task);
768       return;
769     }
770
771     if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
772       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
773           "failed to parse auth rejection: %s", desc);
774       g_object_unref (task);
775       return;
776     }
777
778     {
779       gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
780       gchar *query_str = g_match_info_fetch_named (match_info, "query");
781       gboolean matches;
782
783       GST_INFO ("regex parsed auth: authmod=%s, query=%s",
784           GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
785       g_match_info_free (match_info);
786
787       switch (authmod) {
788         case GST_RTMP_AUTHMOD_ADOBE:
789           matches = g_str_equal (authmod_str, "adobe");
790           break;
791
792         default:
793           matches = FALSE;
794           break;
795       }
796
797       if (!matches) {
798         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
799             "server uses wrong authentication mode '%s'; expected %s",
800             GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
801         g_object_unref (task);
802         g_free (authmod_str);
803         g_free (query_str);
804         return;
805       }
806       g_free (authmod_str);
807
808       query = gst_uri_from_string (query_str);
809       if (!query) {
810         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
811             "failed to parse authentication query '%s'",
812             GST_STR_NULL (query_str));
813         g_object_unref (task);
814         g_free (query_str);
815         return;
816       }
817       g_free (query_str);
818     }
819
820     {
821       const gchar *reason = gst_uri_get_query_value (query, "reason");
822
823       if (g_str_equal (reason, "authfailed")) {
824         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
825             "authentication failed! wrong credentials?");
826         g_object_unref (task);
827         gst_uri_unref (query);
828         return;
829       }
830
831       if (!g_str_equal (reason, "needauth")) {
832         g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
833             "unhandled rejection reason '%s'", reason ? reason : "");
834         g_object_unref (task);
835         gst_uri_unref (query);
836         return;
837       }
838     }
839
840     g_warn_if_fail (!data->auth_query);
841     data->auth_query = do_adobe_auth (data->location.username,
842         data->location.password, gst_uri_get_query_value (query, "salt"),
843         gst_uri_get_query_value (query, "opaque"),
844         gst_uri_get_query_value (query, "challenge"));
845
846     gst_uri_unref (query);
847
848     if (!data->auth_query) {
849       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
850           "couldn't generate adobe style authentication query");
851       g_object_unref (task);
852       return;
853     }
854
855     socket_connect (task);
856     return;
857   }
858
859   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
860       "unhandled connect result code: %s", GST_STR_NULL (code));
861   g_object_unref (task);
862 }
863
864 /* prep key: pack 1st 16 chars into 4 LittleEndian ints */
865 static void
866 rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
867 {
868   gchar copy[17];
869
870   g_return_if_fail (key);
871   g_return_if_fail (out);
872
873   /* ensure we can read 16 bytes */
874   strncpy (copy, key, 16);
875   /* placate GCC 8 -Wstringop-truncation */
876   copy[16] = 0;
877
878   out[0] = GST_READ_UINT32_LE (copy);
879   out[1] = GST_READ_UINT32_LE (copy + 4);
880   out[2] = GST_READ_UINT32_LE (copy + 8);
881   out[3] = GST_READ_UINT32_LE (copy + 12);
882 }
883
884 /* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
885 static GArray *
886 rtmp_tea_decode_prep_text (const gchar * text)
887 {
888   GArray *arr;
889   gsize len, i;
890
891   g_return_val_if_fail (text, NULL);
892
893   len = strlen (text);
894   arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
895
896   for (i = 0; i < len; i += 8) {
897     gchar copy[9];
898     guchar chars[4];
899     gsize j;
900     guint32 val;
901
902     /* ensure we can read 8 bytes */
903     strncpy (copy, text + i, 8);
904     /* placate GCC 8 -Wstringop-truncation */
905     copy[8] = 0;
906
907     for (j = 0; j < 4; j++) {
908       gint hi, lo;
909
910       hi = g_ascii_xdigit_value (copy[2 * j]);
911       lo = g_ascii_xdigit_value (copy[2 * j + 1]);
912
913       chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
914     }
915
916     val = GST_READ_UINT32_LE (chars);
917     g_array_append_val (arr, val);
918   }
919
920   return arr;
921 }
922
923 /* return text from uint32s to chars */
924 static gchar *
925 rtmp_tea_decode_return_text (GArray * arr)
926 {
927 #if G_BYTE_ORDER != G_LITTLE_ENDIAN
928   gsize i;
929
930   g_return_val_if_fail (arr, NULL);
931
932   for (i = 0; i < arr->len; i++) {
933     guint32 *val = &g_array_index (arr, guint32, i);
934     *val = GUINT32_TO_LE (*val);
935   }
936 #endif
937
938   /* array is alredy zero-terminated */
939   return g_array_free (arr, FALSE);
940 }
941
942 /* http://www.movable-type.co.uk/scripts/tea-block.html */
943 static void
944 rtmp_tea_decode_btea (GArray * text, guint32 key[4])
945 {
946   guint32 *v, n, *k;
947   guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
948   guint32 p, q;
949
950   g_return_if_fail (text);
951   g_return_if_fail (text->len > 0);
952   g_return_if_fail (key);
953
954   v = (guint32 *) text->data;
955   n = text->len;
956   k = key;
957   z = v[n - 1];
958   y = v[0];
959   q = 6 + 52 / n;
960   sum = q * DELTA;
961
962 #define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
963
964   while (sum != 0) {
965     e = sum >> 2 & 3;
966     for (p = n - 1; p > 0; p--)
967       z = v[p - 1], y = v[p] -= MX;
968     z = v[n - 1];
969     y = v[0] -= MX;
970     sum -= DELTA;
971   }
972
973 #undef MX
974 }
975
976 /* taken from librtmp */
977 static gchar *
978 rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
979 {
980   guint32 key[4];
981   GArray *text;
982
983   rtmp_tea_decode_prep_key (bin_key, key);
984   text = rtmp_tea_decode_prep_text (hex_text);
985   rtmp_tea_decode_btea (text, key);
986   return rtmp_tea_decode_return_text (text);
987 }
988
989 static void
990 send_secure_token_response (GTask * task, GstRtmpConnection * connection,
991     const gchar * challenge)
992 {
993   ConnectTaskData *data = g_task_get_task_data (task);
994   if (challenge) {
995     GstAmfNode *node1;
996     GstAmfNode *node2;
997     gchar *response;
998
999     if (!data->location.secure_token || !data->location.secure_token[0]) {
1000       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
1001           "server requires secure token authentication");
1002       g_object_unref (task);
1003       return;
1004     }
1005
1006     response = rtmp_tea_decode (data->location.secure_token, challenge);
1007
1008     GST_DEBUG ("response: %s", response);
1009
1010     node1 = gst_amf_node_new_null ();
1011     node2 = gst_amf_node_new_take_string (response, -1);
1012     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1013         "secureTokenResponse", node1, node2, NULL);
1014     gst_amf_node_free (node1);
1015     gst_amf_node_free (node2);
1016   }
1017
1018   g_signal_handler_disconnect (connection, data->error_handler_id);
1019   data->error_handler_id = 0;
1020
1021   g_task_return_pointer (task, g_object_ref (connection),
1022       gst_rtmp_connection_close_and_unref);
1023   g_object_unref (task);
1024 }
1025
1026 GstRtmpConnection *
1027 gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
1028 {
1029   GTask *task = G_TASK (result);
1030   return g_task_propagate_pointer (task, error);
1031 }
1032
1033 static void send_create_stream (GTask * task);
1034 static void send_publish_or_play (GTask * task);
1035
1036 typedef struct
1037 {
1038   GstRtmpConnection *connection;
1039   gulong error_handler_id;
1040   gchar *stream;
1041   gboolean publish;
1042   guint32 id;
1043 } StreamTaskData;
1044
1045 static StreamTaskData *
1046 stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
1047     gboolean publish)
1048 {
1049   StreamTaskData *data = g_slice_new0 (StreamTaskData);
1050   data->connection = g_object_ref (connection);
1051   data->stream = g_strdup (stream);
1052   data->publish = publish;
1053   return data;
1054 }
1055
1056 static void
1057 stream_task_data_free (gpointer ptr)
1058 {
1059   StreamTaskData *data = ptr;
1060   g_clear_pointer (&data->stream, g_free);
1061   if (data->error_handler_id) {
1062     g_signal_handler_disconnect (data->connection, data->error_handler_id);
1063   }
1064   g_clear_object (&data->connection);
1065   g_slice_free (StreamTaskData, data);
1066 }
1067
1068 static void
1069 start_stream (GstRtmpConnection * connection, const gchar * stream,
1070     gboolean publish, GCancellable * cancellable,
1071     GAsyncReadyCallback callback, gpointer user_data)
1072 {
1073   GTask *task;
1074   StreamTaskData *data;
1075
1076   init_debug ();
1077
1078   task = g_task_new (connection, cancellable, callback, user_data);
1079
1080   if (!stream) {
1081     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
1082         "Stream is not set");
1083     g_object_unref (task);
1084     return;
1085   }
1086
1087   data = stream_task_data_new (connection, stream, publish);
1088   g_task_set_task_data (task, data, stream_task_data_free);
1089
1090   data->error_handler_id = g_signal_connect (connection,
1091       "error", G_CALLBACK (connection_error), task);
1092
1093   send_create_stream (task);
1094 }
1095
1096 void
1097 gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
1098     const gchar * stream, GCancellable * cancellable,
1099     GAsyncReadyCallback callback, gpointer user_data)
1100 {
1101   start_stream (connection, stream, TRUE, cancellable, callback, user_data);
1102 }
1103
1104 void
1105 gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
1106     const gchar * stream, GCancellable * cancellable,
1107     GAsyncReadyCallback callback, gpointer user_data)
1108 {
1109   start_stream (connection, stream, FALSE, cancellable, callback, user_data);
1110 }
1111
1112 static void
1113 send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
1114     guint32 ms)
1115 {
1116   GstRtmpUserControl uc = {
1117     .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
1118     .param = stream,
1119     .param2 = ms,
1120   };
1121
1122   gst_rtmp_connection_queue_message (connection,
1123       gst_rtmp_message_new_user_control (&uc));
1124 }
1125
1126 static void
1127 send_create_stream (GTask * task)
1128 {
1129   GstRtmpConnection *connection = g_task_get_source_object (task);
1130   StreamTaskData *data = g_task_get_task_data (task);
1131   GstAmfNode *command_object, *stream_name;
1132
1133   command_object = gst_amf_node_new_null ();
1134   stream_name = gst_amf_node_new_string (data->stream, -1);
1135
1136   if (data->publish) {
1137     /* Not part of RTMP documentation */
1138     GST_DEBUG ("Releasing stream '%s'", data->stream);
1139     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1140         "releaseStream", command_object, stream_name, NULL);
1141     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1142         "FCPublish", command_object, stream_name, NULL);
1143   } else {
1144     /* Matches librtmp */
1145     gst_rtmp_connection_request_window_size (connection,
1146         GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
1147     send_set_buffer_length (connection, 0, 300);
1148   }
1149
1150   GST_INFO ("Creating stream '%s'", data->stream);
1151   gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
1152       "createStream", command_object, NULL);
1153
1154   gst_amf_node_free (stream_name);
1155   gst_amf_node_free (command_object);
1156 }
1157
1158 static void
1159 create_stream_done (const gchar * command_name, GPtrArray * args,
1160     gpointer user_data)
1161 {
1162   GTask *task = G_TASK (user_data);
1163   StreamTaskData *data = g_task_get_task_data (task);
1164   GstAmfNode *result;
1165
1166   if (g_task_return_error_if_cancelled (task)) {
1167     g_object_unref (task);
1168     return;
1169   }
1170
1171   if (!args) {
1172     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1173         "createStream failed: %s", command_name);
1174     g_object_unref (task);
1175     return;
1176   }
1177
1178   if (args->len < 2) {
1179     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1180         "createStream failed; not enough return arguments");
1181     g_object_unref (task);
1182     return;
1183   }
1184
1185   result = g_ptr_array_index (args, 1);
1186   if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
1187     GString *error_dump = g_string_new ("");
1188
1189     gst_amf_node_dump (result, -1, error_dump);
1190
1191     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1192         "createStream failed: %s", error_dump->str);
1193     g_object_unref (task);
1194
1195     g_string_free (error_dump, TRUE);
1196     return;
1197   }
1198
1199   data->id = gst_amf_node_get_number (result);
1200   GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
1201
1202   if (data->id == 0) {
1203     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
1204         "createStream returned ID 0");
1205     g_object_unref (task);
1206     return;
1207   }
1208
1209   send_publish_or_play (task);
1210 }
1211
1212 static void
1213 send_publish_or_play (GTask * task)
1214 {
1215   GstRtmpConnection *connection = g_task_get_source_object (task);
1216   StreamTaskData *data = g_task_get_task_data (task);
1217   const gchar *command = data->publish ? "publish" : "play";
1218   GstAmfNode *command_object, *stream_name, *argument;
1219
1220   command_object = gst_amf_node_new_null ();
1221   stream_name = gst_amf_node_new_string (data->stream, -1);
1222
1223   if (data->publish) {
1224     /* publishing type (live, record, append) */
1225     argument = gst_amf_node_new_string ("live", -1);
1226   } else {
1227     /* "Start" argument: -2 = live or recording, -1 = only live
1228        0 or positive = only recording, seek to X seconds */
1229     argument = gst_amf_node_new_number (-2);
1230   }
1231
1232   GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
1233       command, data->stream, data->id);
1234   gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
1235       task, data->id, "onStatus");
1236   gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
1237       command, command_object, stream_name, argument, NULL);
1238
1239   if (!data->publish) {
1240     /* Matches librtmp */
1241     send_set_buffer_length (connection, data->id, 30000);
1242   }
1243
1244   gst_amf_node_free (command_object);
1245   gst_amf_node_free (stream_name);
1246   gst_amf_node_free (argument);
1247 }
1248
1249 static void
1250 on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
1251     gpointer user_data)
1252 {
1253   GTask *task = G_TASK (user_data);
1254   GstRtmpConnection *connection = g_task_get_source_object (task);
1255   StreamTaskData *data = g_task_get_task_data (task);
1256   const gchar *command = data->publish ? "publish" : "play", *code = NULL;
1257   GString *info_dump;
1258
1259   if (g_task_return_error_if_cancelled (task)) {
1260     g_object_unref (task);
1261     return;
1262   }
1263
1264   if (!args) {
1265     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1266         "%s failed: %s", command, command_name);
1267     g_object_unref (task);
1268     return;
1269   }
1270
1271   if (args->len < 2) {
1272     g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1273         "%s failed; not enough return arguments", command);
1274     g_object_unref (task);
1275     return;
1276   }
1277
1278   {
1279     const GstAmfNode *info_object, *code_object;
1280     info_object = g_ptr_array_index (args, 1);
1281     code_object = gst_amf_node_get_field (info_object, "code");
1282
1283     if (code_object) {
1284       code = gst_amf_node_peek_string (code_object, NULL);
1285     }
1286
1287     info_dump = g_string_new ("");
1288     gst_amf_node_dump (info_object, -1, info_dump);
1289   }
1290
1291   if (data->publish) {
1292     if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
1293       GST_INFO ("publish success: %s", info_dump->str);
1294       g_task_return_boolean (task, TRUE);
1295       goto out;
1296     }
1297
1298     if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
1299       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
1300           "publish denied: stream already exists: %s", info_dump->str);
1301       goto out;
1302     }
1303
1304     if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
1305       g_task_return_new_error (task, G_IO_ERROR,
1306           G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
1307       goto out;
1308     }
1309   } else {
1310     if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
1311         g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
1312       GST_INFO ("play success: %s", info_dump->str);
1313       g_task_return_boolean (task, TRUE);
1314       goto out;
1315     }
1316
1317     if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
1318       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
1319           "play denied: stream not found: %s", info_dump->str);
1320       goto out;
1321     }
1322   }
1323
1324   g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1325       "unhandled %s result: %s", command, info_dump->str);
1326
1327 out:
1328   g_string_free (info_dump, TRUE);
1329
1330   g_signal_handler_disconnect (connection, data->error_handler_id);
1331   data->error_handler_id = 0;
1332
1333   g_object_unref (task);
1334 }
1335
1336 static gboolean
1337 start_stream_finish (GstRtmpConnection * connection,
1338     GAsyncResult * result, guint32 * stream_id, GError ** error)
1339 {
1340   GTask *task;
1341   StreamTaskData *data;
1342
1343   g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
1344
1345   task = G_TASK (result);
1346
1347   if (!g_task_propagate_boolean (G_TASK (result), error)) {
1348     return FALSE;
1349   }
1350
1351   data = g_task_get_task_data (task);
1352
1353   if (stream_id) {
1354     *stream_id = data->id;
1355   }
1356
1357   return TRUE;
1358 }
1359
1360 gboolean
1361 gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
1362     GAsyncResult * result, guint32 * stream_id, GError ** error)
1363 {
1364   return start_stream_finish (connection, result, stream_id, error);
1365 }
1366
1367 gboolean
1368 gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
1369     GAsyncResult * result, guint32 * stream_id, GError ** error)
1370 {
1371   return start_stream_finish (connection, result, stream_id, error);
1372 }
1373
1374 void
1375 gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
1376     const gchar * stream, const GstRtmpStopCommands stop_commands)
1377 {
1378   send_stop (connection, stream, stop_commands);
1379 }
1380
1381 static void
1382 send_stop (GstRtmpConnection * connection, const gchar * stream,
1383     const GstRtmpStopCommands stop_commands)
1384 {
1385   GstAmfNode *command_object, *stream_name;
1386
1387   command_object = gst_amf_node_new_null ();
1388   stream_name = gst_amf_node_new_string (stream, -1);
1389
1390   if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
1391     GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
1392     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1393         "FCUnpublish", command_object, stream_name, NULL);
1394   }
1395   if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
1396     GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
1397     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1398         "closeStream", command_object, stream_name, NULL);
1399   }
1400   if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
1401     GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
1402     gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1403         "deleteStream", command_object, stream_name, NULL);
1404   }
1405
1406   gst_amf_node_free (stream_name);
1407   gst_amf_node_free (command_object);
1408 }