gst: don't use volatile to mean atomic
[platform/upstream/gstreamer.git] / gst / rtmp2 / rtmp / rtmputils.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include "rtmputils.h"
27 #include <string.h>
28
29 static void read_all_bytes_done (GObject * source, GAsyncResult * result,
30     gpointer user_data);
31 static void write_all_bytes_done (GObject * source, GAsyncResult * result,
32     gpointer user_data);
33 static void write_all_buffer_done (GObject * source, GAsyncResult * result,
34     gpointer user_data);
35
36 void
37 gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
38 {
39   const guint8 *data;
40   gsize size;
41   guint offset;
42
43   g_return_if_fail (bytearray);
44
45   offset = bytearray->len;
46   data = g_bytes_get_data (bytes, &size);
47
48   g_return_if_fail (data);
49
50   g_byte_array_set_size (bytearray, offset + size);
51   memcpy (bytearray->data + offset, data, size);
52 }
53
54 void
55 gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, gsize count,
56     int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback,
57     gpointer user_data)
58 {
59   GTask *task;
60   GByteArray *ba;
61
62   g_return_if_fail (G_IS_INPUT_STREAM (stream));
63
64   task = g_task_new (stream, cancellable, callback, user_data);
65
66   ba = g_byte_array_sized_new (count);
67   g_byte_array_set_size (ba, count);
68   g_task_set_task_data (task, ba, (GDestroyNotify) g_byte_array_unref);
69
70   g_input_stream_read_all_async (stream, ba->data, count, io_priority,
71       cancellable, read_all_bytes_done, task);
72 }
73
74 static void
75 read_all_bytes_done (GObject * source, GAsyncResult * result,
76     gpointer user_data)
77 {
78   GInputStream *is = G_INPUT_STREAM (source);
79   GTask *task = user_data;
80   GByteArray *ba = g_task_get_task_data (task);
81   GError *error = NULL;
82   gboolean res;
83   gsize bytes_read;
84   GBytes *bytes;
85
86   res = g_input_stream_read_all_finish (is, result, &bytes_read, &error);
87   if (!res) {
88     g_task_return_error (task, error);
89     g_object_unref (task);
90     return;
91   }
92
93   g_byte_array_set_size (ba, bytes_read);
94   bytes = g_byte_array_free_to_bytes (g_byte_array_ref (ba));
95
96   g_task_return_pointer (task, bytes, (GDestroyNotify) g_bytes_unref);
97   g_object_unref (task);
98 }
99
100 GBytes *
101 gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream,
102     GAsyncResult * result, GError ** error)
103 {
104   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
105   return g_task_propagate_pointer (G_TASK (result), error);
106 }
107
108 void
109 gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
110     GBytes * bytes, int io_priority, GCancellable * cancellable,
111     GAsyncReadyCallback callback, gpointer user_data)
112 {
113   GTask *task;
114   const void *data;
115   gsize size;
116
117   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
118   g_return_if_fail (bytes);
119
120   data = g_bytes_get_data (bytes, &size);
121   g_return_if_fail (data);
122
123   task = g_task_new (stream, cancellable, callback, user_data);
124   g_task_set_task_data (task, g_bytes_ref (bytes),
125       (GDestroyNotify) g_bytes_unref);
126
127   g_output_stream_write_all_async (stream, data, size, io_priority,
128       cancellable, write_all_bytes_done, task);
129 }
130
131 static void
132 write_all_bytes_done (GObject * source, GAsyncResult * result,
133     gpointer user_data)
134 {
135   GOutputStream *os = G_OUTPUT_STREAM (source);
136   GTask *task = user_data;
137   GError *error = NULL;
138   gboolean res;
139
140   res = g_output_stream_write_all_finish (os, result, NULL, &error);
141   if (!res) {
142     g_task_return_error (task, error);
143     g_object_unref (task);
144     return;
145   }
146
147   g_task_return_boolean (task, TRUE);
148   g_object_unref (task);
149 }
150
151 gboolean
152 gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
153     GAsyncResult * result, GError ** error)
154 {
155   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
156   return g_task_propagate_boolean (G_TASK (result), error);
157 }
158
159 typedef struct
160 {
161   GstBuffer *buffer;
162   GstMapInfo map;
163   gboolean mapped;
164   gsize bytes_written;
165 } WriteAllBufferData;
166
167 static WriteAllBufferData *
168 write_all_buffer_data_new (GstBuffer * buffer)
169 {
170   WriteAllBufferData *data = g_slice_new0 (WriteAllBufferData);
171   data->buffer = gst_buffer_ref (buffer);
172   return data;
173 }
174
175 static void
176 write_all_buffer_data_free (gpointer ptr)
177 {
178   WriteAllBufferData *data = ptr;
179   if (data->mapped) {
180     gst_buffer_unmap (data->buffer, &data->map);
181   }
182   g_clear_pointer (&data->buffer, gst_buffer_unref);
183   g_slice_free (WriteAllBufferData, data);
184 }
185
186 void
187 gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
188     GstBuffer * buffer, int io_priority, GCancellable * cancellable,
189     GAsyncReadyCallback callback, gpointer user_data)
190 {
191   GTask *task;
192   WriteAllBufferData *data;
193
194   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
195   g_return_if_fail (GST_IS_BUFFER (buffer));
196
197   task = g_task_new (stream, cancellable, callback, user_data);
198
199   data = write_all_buffer_data_new (buffer);
200   g_task_set_task_data (task, data, write_all_buffer_data_free);
201
202   if (!gst_buffer_map (buffer, &data->map, GST_MAP_READ)) {
203     g_task_return_new_error (task, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
204         "Failed to map buffer for reading");
205     g_object_unref (task);
206     return;
207   }
208
209   data->mapped = TRUE;
210
211   g_output_stream_write_all_async (stream, data->map.data, data->map.size,
212       io_priority, cancellable, write_all_buffer_done, task);
213 }
214
215 static void
216 write_all_buffer_done (GObject * source, GAsyncResult * result,
217     gpointer user_data)
218 {
219   GOutputStream *os = G_OUTPUT_STREAM (source);
220   GTask *task = user_data;
221   WriteAllBufferData *data = g_task_get_task_data (task);
222   GError *error = NULL;
223   gboolean res;
224
225   res = g_output_stream_write_all_finish (os, result, &data->bytes_written,
226       &error);
227
228   gst_buffer_unmap (data->buffer, &data->map);
229   data->mapped = FALSE;
230
231   if (!res) {
232     g_task_return_error (task, error);
233     g_object_unref (task);
234     return;
235   }
236
237   g_task_return_boolean (task, TRUE);
238   g_object_unref (task);
239 }
240
241
242 gboolean
243 gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
244     GAsyncResult * result, gsize * bytes_written, GError ** error)
245 {
246   WriteAllBufferData *data;
247   GTask *task;
248
249   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
250   task = G_TASK (result);
251
252   data = g_task_get_task_data (task);
253   if (bytes_written) {
254     *bytes_written = data->bytes_written;
255   }
256
257   return g_task_propagate_boolean (task, error);
258 }
259
260 static const gchar ascii_table[128] = {
261   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
262   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
263   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
264   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
265   ' ', '!', 0x0, '#', '$', '%', '&', '\'',
266   '(', ')', '*', '+', ',', '-', '.', '/',
267   '0', '1', '2', '3', '4', '5', '6', '7',
268   '8', '9', ':', ';', '<', '=', '>', '?',
269   '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
270   'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O',
271   'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
272   'X', 'Y', 'Z', '[', 0x0, ']', '^', '_',
273   '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
274   'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
275   'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
276   'x', 'y', 'z', '{', '|', '}', '~', 0x0,
277 };
278
279 static const gchar ascii_escapes[128] = {
280   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 'a',
281   'b', 't', 'n', 'v', 'f', 'r', 0x0, 0x0,
282   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
283   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
284   0x0, 0x0, '"', 0x0, 0x0, 0x0, 0x0, 0x0,
285   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
286   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
287   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
288   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
289   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
290   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
291   0x0, 0x0, 0x0, 0x0, '\\', 0x0, 0x0, 0x0,
292   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
293   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
294   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
295   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
296 };
297
298 void
299 gst_rtmp_string_print_escaped (GString * string, const gchar * data,
300     gssize size)
301 {
302   gssize i;
303
304   g_return_if_fail (string);
305
306   if (!data) {
307     g_string_append (string, "(NULL)");
308     return;
309   }
310
311   g_string_append_c (string, '"');
312
313   for (i = 0; size < 0 ? data[i] != 0 : i < size; i++) {
314     guchar c = data[i];
315
316     if (G_LIKELY (c < G_N_ELEMENTS (ascii_table))) {
317       if (ascii_table[c]) {
318         g_string_append_c (string, c);
319         continue;
320       }
321
322       if (ascii_escapes[c]) {
323         g_string_append_c (string, '\\');
324         g_string_append_c (string, ascii_escapes[c]);
325         continue;
326       }
327     } else {
328       gunichar uc = g_utf8_get_char_validated (data + i,
329           size < 0 ? -1 : size - i);
330       if (uc != (gunichar) (-2) && uc != (gunichar) (-1)) {
331         if (g_unichar_isprint (uc)) {
332           g_string_append_unichar (string, uc);
333         } else if (uc <= G_MAXUINT16) {
334           g_string_append_printf (string, "\\u%04X", uc);
335         } else {
336           g_string_append_printf (string, "\\U%08X", uc);
337         }
338
339         i += g_utf8_skip[c] - 1;
340         continue;
341       }
342     }
343
344     g_string_append_printf (string, "\\x%02X", c);
345   }
346
347   g_string_append_c (string, '"');
348
349 }
350
351 gboolean
352 gst_rtmp_flv_tag_parse_header (GstRtmpFlvTagHeader * header,
353     const guint8 * data, gsize size)
354 {
355   g_return_val_if_fail (header, FALSE);
356   g_return_val_if_fail (data, FALSE);
357
358   /* Parse FLVTAG header as described in
359    * video_file_format_spec_v10.pdf page 5 (page 9 of the PDF) */
360
361   if (size < GST_RTMP_FLV_TAG_HEADER_SIZE) {
362     return FALSE;
363   }
364
365   /* TagType UI8 */
366   header->type = GST_READ_UINT8 (data);
367
368   /* DataSize UI24 */
369   header->payload_size = GST_READ_UINT24_BE (data + 1);
370
371   /* 4 bytes for the PreviousTagSize UI32 following every tag */
372   header->total_size = GST_RTMP_FLV_TAG_HEADER_SIZE + header->payload_size + 4;
373
374   /* Timestamp UI24 + TimestampExtended UI8 */
375   header->timestamp = GST_READ_UINT24_BE (data + 4);
376   header->timestamp |= (guint32) GST_READ_UINT8 (data + 7) << 24;
377
378   /* Skip StreamID UI24. It's "always 0" for FLV files and for aggregated RTMP
379    * messages we're supposed to use the Stream ID from the AGGREGATE. */
380
381   return TRUE;
382 }