Merging gst-plugins-bad
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / rtmp2 / rtmp / rtmpconnection.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <string.h>
28 #include <math.h>
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33 #include "amf.h"
34
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
37
38 #define READ_SIZE 8192
39
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
41
42 struct _GstRtmpConnection
43 {
44   GObject parent_instance;
45
46   /* should be properties */
47   gboolean input_paused;
48   gboolean error;
49
50   /* private */
51   GThread *thread;
52   GSocketConnection *connection;
53   GCancellable *cancellable;
54   GSocketClient *socket_client;
55   GAsyncQueue *output_queue;
56   GMainContext *main_context;
57
58   GCancellable *outer_cancellable;
59   gulong cancel_handler_id;
60
61   GSource *input_source;
62   GByteArray *input_bytes;
63   guint input_needed_bytes;
64   GstRtmpChunkStreams *input_streams, *output_streams;
65   GList *transactions;
66   GList *expected_commands;
67   guint transaction_count;
68
69   GstRtmpConnectionMessageFunc input_handler;
70   gpointer input_handler_user_data;
71   GDestroyNotify input_handler_user_data_destroy;
72
73   GstRtmpConnectionFunc output_handler;
74   gpointer output_handler_user_data;
75   GDestroyNotify output_handler_user_data_destroy;
76
77   gboolean writing;
78
79   /* Protects the values below during concurrent access.
80    * - Taken by the loop thread when writing, but not reading.
81    * - Taken by other threads when reading (calling get_stats).
82    */
83   GMutex stats_lock;
84
85   /* RTMP configuration */
86   guint32 in_chunk_size;
87   guint32 out_chunk_size, out_chunk_size_pending;
88   guint32 in_window_ack_size;
89   guint32 out_window_ack_size, out_window_ack_size_pending;
90
91   guint64 in_bytes_total;
92   guint64 out_bytes_total;
93   guint64 in_bytes_acked;
94   guint64 out_bytes_acked;
95 };
96
97
98 typedef struct
99 {
100   GObjectClass parent_class;
101 } GstRtmpConnectionClass;
102
103 /* prototypes */
104
105 static void gst_rtmp_connection_dispose (GObject * object);
106 static void gst_rtmp_connection_finalize (GObject * object);
107 static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
108     GCancellable * cancellable);
109 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
110 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
111     gpointer user_data);
112 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
113 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
114     GAsyncResult * result, gpointer user_data);
115 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
116     guint needed_bytes);
117 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
118 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
119 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
120     connection, GstBuffer * buffer);
121 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
122     connection, GstBuffer * buffer);
123 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
124     GstBuffer * buffer);
125 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
126     GstBuffer * buffer);
127 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
128     GstBuffer * buffer);
129 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
130     guint32 in_chunk_size);
131 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
132     guint32 bytes);
133 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
134     self, guint32 in_chunk_size);
135
136 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
137 static void
138 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
139     guint32 event_data);
140
141 static gboolean
142 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
143     GstBuffer * buffer);
144 static void
145 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
146
147 typedef struct
148 {
149   gdouble transaction_id;
150   GstRtmpCommandCallback func;
151   gpointer user_data;
152 } Transaction;
153
154 static Transaction *
155 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
156     gpointer user_data)
157 {
158   Transaction *data = g_slice_new (Transaction);
159   data->transaction_id = transaction_id;
160   data->func = func;
161   data->user_data = user_data;
162   return data;
163 }
164
165 static void
166 transaction_free (gpointer ptr)
167 {
168   Transaction *data = ptr;
169   g_slice_free (Transaction, data);
170 }
171
172 typedef struct
173 {
174   guint32 stream_id;
175   gchar *command_name;
176   GstRtmpCommandCallback func;
177   gpointer user_data;
178 } ExpectedCommand;
179
180 static ExpectedCommand *
181 expected_command_new (guint32 stream_id, const gchar * command_name,
182     GstRtmpCommandCallback func, gpointer user_data)
183 {
184   ExpectedCommand *data = g_slice_new (ExpectedCommand);
185   data->stream_id = stream_id;
186   data->command_name = g_strdup (command_name);
187   data->func = func;
188   data->user_data = user_data;
189   return data;
190 }
191
192 static void
193 expected_command_free (gpointer ptr)
194 {
195   ExpectedCommand *data = ptr;
196   g_free (data->command_name);
197   g_slice_free (ExpectedCommand, data);
198 }
199
200 enum
201 {
202   SIGNAL_ERROR,
203   SIGNAL_STREAM_CONTROL,
204
205   N_SIGNALS
206 };
207
208 static guint signals[N_SIGNALS] = { 0, };
209
210 /* singletons */
211
212 static GstMemory *set_data_frame_value;
213
214 static void
215 init_set_data_frame_value (void)
216 {
217   GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
218   GBytes *bytes = gst_amf_node_serialize (node);
219   gsize size;
220   const gchar *data = g_bytes_get_data (bytes, &size);
221
222   set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
223       (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
224   GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
225       GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
226
227   gst_amf_node_free (node);
228 }
229
230 /* class initialization */
231
232 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
233     G_TYPE_OBJECT,
234     GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
235         "rtmpconnection", 0, "debug category for GstRtmpConnection class");
236     init_set_data_frame_value ());
237
238 static void
239 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
240 {
241   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
242
243   gobject_class->dispose = gst_rtmp_connection_dispose;
244   gobject_class->finalize = gst_rtmp_connection_finalize;
245
246   signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
247       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
248
249   signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
250       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
251       G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
252
253   GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
254 }
255
256 static void
257 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
258 {
259   rtmpconnection->cancellable = g_cancellable_new ();
260   rtmpconnection->output_queue =
261       g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
262   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
263   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
264
265   rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
266   rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
267
268   rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
269   rtmpconnection->input_needed_bytes = 1;
270
271   g_mutex_init (&rtmpconnection->stats_lock);
272 }
273
274 void
275 gst_rtmp_connection_dispose (GObject * object)
276 {
277   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
278   GST_DEBUG_OBJECT (rtmpconnection, "dispose");
279
280   /* clean up as possible.  may be called multiple times */
281
282   gst_rtmp_connection_close (rtmpconnection);
283   g_cancellable_cancel (rtmpconnection->cancellable);
284   gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
285   gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
286   gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
287
288   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
289 }
290
291 void
292 gst_rtmp_connection_finalize (GObject * object)
293 {
294   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
295   GST_DEBUG_OBJECT (rtmpconnection, "finalize");
296
297   /* clean up object here */
298
299   g_mutex_clear (&rtmpconnection->stats_lock);
300   g_clear_object (&rtmpconnection->cancellable);
301   g_clear_object (&rtmpconnection->connection);
302   g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
303   g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
304   g_clear_pointer (&rtmpconnection->output_streams,
305       gst_rtmp_chunk_streams_free);
306   g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
307   g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
308   g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
309
310   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
311 }
312
313 GSocket *
314 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
315 {
316   return g_socket_connection_get_socket (sc->connection);
317 }
318
319 static void
320 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
321     GSocketConnection * connection)
322 {
323   GInputStream *is;
324
325   sc->thread = g_thread_ref (g_thread_self ());
326   sc->main_context = g_main_context_ref_thread_default ();
327   sc->connection = g_object_ref (connection);
328
329   /* refs the socket because it's creating an input stream, which holds a ref */
330   is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
331   /* refs the socket because it's creating a socket source */
332   g_warn_if_fail (!sc->input_source);
333   sc->input_source =
334       g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
335       sc->cancellable);
336   g_source_set_callback (sc->input_source,
337       (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
338       g_object_unref);
339   g_source_attach (sc->input_source, sc->main_context);
340 }
341
342 static void
343 gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
344     GCancellable * cancellable)
345 {
346   g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
347   g_clear_object (&self->outer_cancellable);
348   self->cancel_handler_id = 0;
349
350   if (cancellable == NULL)
351     return;
352
353   self->outer_cancellable = g_object_ref (cancellable);
354   self->cancel_handler_id =
355       g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
356       g_object_ref (self->cancellable), g_object_unref);
357 }
358
359
360 GstRtmpConnection *
361 gst_rtmp_connection_new (GSocketConnection * connection,
362     GCancellable * cancellable)
363 {
364   GstRtmpConnection *sc;
365
366   sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
367
368   gst_rtmp_connection_set_socket_connection (sc, connection);
369   gst_rtmp_connection_set_cancellable (sc, cancellable);
370
371   return sc;
372 }
373
374 static void
375 cancel_all_commands (GstRtmpConnection * self)
376 {
377   GList *l;
378
379   for (l = self->transactions; l; l = g_list_next (l)) {
380     Transaction *cc = l->data;
381     GST_LOG_OBJECT (self, "calling transaction callback %s",
382         GST_DEBUG_FUNCPTR_NAME (cc->func));
383     cc->func ("<cancelled>", NULL, cc->user_data);
384   }
385   g_list_free_full (self->transactions, transaction_free);
386   self->transactions = NULL;
387
388   for (l = self->expected_commands; l; l = g_list_next (l)) {
389     ExpectedCommand *cc = l->data;
390     GST_LOG_OBJECT (self, "calling expected command callback %s",
391         GST_DEBUG_FUNCPTR_NAME (cc->func));
392     cc->func ("<cancelled>", NULL, cc->user_data);
393   }
394   g_list_free_full (self->expected_commands, expected_command_free);
395   self->expected_commands = NULL;
396 }
397
398 void
399 gst_rtmp_connection_close (GstRtmpConnection * self)
400 {
401   if (self->thread != g_thread_self ()) {
402     GST_ERROR_OBJECT (self, "Called from wrong thread");
403   }
404
405   g_cancellable_cancel (self->cancellable);
406   cancel_all_commands (self);
407
408   if (self->input_source) {
409     g_source_destroy (self->input_source);
410     g_clear_pointer (&self->input_source, g_source_unref);
411   }
412
413   if (self->connection) {
414     g_io_stream_close_async (G_IO_STREAM (self->connection),
415         G_PRIORITY_DEFAULT, NULL, NULL, NULL);
416   }
417 }
418
419 void
420 gst_rtmp_connection_close_and_unref (gpointer ptr)
421 {
422   GstRtmpConnection *connection;
423
424   g_return_if_fail (ptr);
425
426   connection = GST_RTMP_CONNECTION (ptr);
427   gst_rtmp_connection_close (connection);
428   g_object_unref (connection);
429 }
430
431 void
432 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
433     GstRtmpConnectionMessageFunc callback, gpointer user_data,
434     GDestroyNotify user_data_destroy)
435 {
436   if (sc->input_handler_user_data_destroy) {
437     sc->input_handler_user_data_destroy (sc->input_handler_user_data);
438   }
439
440   sc->input_handler = callback;
441   sc->input_handler_user_data = user_data;
442   sc->input_handler_user_data_destroy = user_data_destroy;
443 }
444
445 void
446 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
447     GstRtmpConnectionFunc callback, gpointer user_data,
448     GDestroyNotify user_data_destroy)
449 {
450   if (sc->output_handler_user_data_destroy) {
451     sc->output_handler_user_data_destroy (sc->output_handler_user_data);
452   }
453
454   sc->output_handler = callback;
455   sc->output_handler_user_data = user_data;
456   sc->output_handler_user_data_destroy = user_data_destroy;
457 }
458
459 static gboolean
460 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
461 {
462   GstRtmpConnection *sc = user_data;
463   gssize ret;
464   guint oldsize;
465   GError *error = NULL;
466   guint64 bytes_since_ack;
467
468   GST_TRACE_OBJECT (sc, "input ready");
469
470   oldsize = sc->input_bytes->len;
471   g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
472   ret =
473       g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
474       sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
475   g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
476
477   if (ret < 0) {
478     gint code = error->code;
479
480     if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
481             code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
482       /* should retry */
483       GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
484           code, error->message);
485       g_error_free (error);
486       return G_SOURCE_CONTINUE;
487     }
488
489     GST_ERROR_OBJECT (sc, "read error: %s %d %s",
490         g_quark_to_string (error->domain), code, error->message);
491     g_error_free (error);
492   } else if (ret == 0) {
493     GST_INFO_OBJECT (sc, "read EOF");
494   }
495
496   if (ret <= 0) {
497     gst_rtmp_connection_emit_error (sc);
498     return G_SOURCE_REMOVE;
499   }
500
501   GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
502
503   g_mutex_lock (&sc->stats_lock);
504   sc->in_bytes_total += ret;
505   g_mutex_unlock (&sc->stats_lock);
506
507   bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
508   if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
509     gst_rtmp_connection_send_ack (sc);
510   }
511
512   gst_rtmp_connection_try_read (sc);
513   return G_SOURCE_CONTINUE;
514 }
515
516 static void
517 gst_rtmp_connection_start_write (GstRtmpConnection * self)
518 {
519   GOutputStream *os;
520   GstBuffer *message, *chunks;
521   GstRtmpMeta *meta;
522   GstRtmpChunkStream *cstream;
523
524   if (self->writing) {
525     return;
526   }
527
528   message = g_async_queue_try_pop (self->output_queue);
529   if (!message) {
530     return;
531   }
532
533   meta = gst_buffer_get_rtmp_meta (message);
534   if (!meta) {
535     GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
536     goto out;
537   }
538
539   if (gst_rtmp_message_is_protocol_control (message)) {
540     if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
541       GST_ERROR_OBJECT (self,
542           "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
543       goto out;
544     }
545   }
546
547   cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
548   if (!cstream) {
549     GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
550         message);
551     goto out;
552   }
553
554   chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
555       self->out_chunk_size);
556   if (!chunks) {
557     GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
558     goto out;
559   }
560
561   self->writing = TRUE;
562   if (self->output_handler) {
563     self->output_handler (self, self->output_handler_user_data);
564   }
565
566   os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
567   gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
568       self->cancellable, gst_rtmp_connection_write_buffer_done,
569       g_object_ref (self));
570
571   gst_buffer_unref (chunks);
572
573 out:
574   gst_buffer_unref (message);
575 }
576
577 static void
578 gst_rtmp_connection_emit_error (GstRtmpConnection * self)
579 {
580   if (self->error) {
581     return;
582   }
583
584   GST_INFO_OBJECT (self, "connection error");
585   self->error = TRUE;
586
587   cancel_all_commands (self);
588
589   g_signal_emit (self, signals[SIGNAL_ERROR], 0);
590 }
591
592 static void
593 gst_rtmp_connection_write_buffer_done (GObject * obj,
594     GAsyncResult * result, gpointer user_data)
595 {
596   GOutputStream *os = G_OUTPUT_STREAM (obj);
597   GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
598   gsize bytes_written = 0;
599   GError *error = NULL;
600   gboolean res;
601
602   self->writing = FALSE;
603
604   res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
605       &bytes_written, &error);
606
607   g_mutex_lock (&self->stats_lock);
608   self->out_bytes_total += bytes_written;
609   g_mutex_unlock (&self->stats_lock);
610
611   if (!res) {
612     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
613       GST_INFO_OBJECT (self,
614           "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
615     } else {
616       GST_ERROR_OBJECT (self,
617           "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
618           error->message, bytes_written);
619     }
620     gst_rtmp_connection_emit_error (self);
621     g_error_free (error);
622     g_object_unref (self);
623     return;
624   }
625
626   GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
627       bytes_written);
628
629   gst_rtmp_connection_apply_protocol_control (self);
630   gst_rtmp_connection_start_write (self);
631   g_object_unref (self);
632 }
633
634 static void
635 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
636     guint needed_bytes)
637 {
638   g_return_if_fail (needed_bytes > 0);
639   connection->input_needed_bytes = needed_bytes;
640   gst_rtmp_connection_try_read (connection);
641 }
642
643 static void
644 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
645 {
646   guint need = connection->input_needed_bytes,
647       len = connection->input_bytes->len;
648
649   if (len < need) {
650     GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
651     return;
652   }
653
654   GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
655   gst_rtmp_connection_do_read (connection);
656 }
657
658 static void
659 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
660     GBytes ** outbytes)
661 {
662   g_return_if_fail (size <= sc->input_bytes->len);
663
664   if (outbytes) {
665     *outbytes = g_bytes_new (sc->input_bytes->data, size);
666   }
667
668   g_byte_array_remove_range (sc->input_bytes, 0, size);
669 }
670
671 static void
672 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
673 {
674   GByteArray *input_bytes = sc->input_bytes;
675   gsize needed_bytes = 1;
676
677   while (1) {
678     GstRtmpChunkStream *cstream;
679     guint32 chunk_stream_id, header_size, next_size;
680     guint8 *data;
681
682     chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
683         input_bytes->len);
684
685     if (!chunk_stream_id) {
686       needed_bytes = input_bytes->len + 1;
687       break;
688     }
689
690     cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
691     header_size = gst_rtmp_chunk_stream_parse_header (cstream,
692         input_bytes->data, input_bytes->len);
693
694     if (input_bytes->len < header_size) {
695       needed_bytes = header_size;
696       break;
697     }
698
699     next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
700         sc->in_chunk_size, &data);
701
702     if (input_bytes->len < header_size + next_size) {
703       needed_bytes = header_size + next_size;
704       break;
705     }
706
707     memcpy (data, input_bytes->data + header_size, next_size);
708     gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
709
710     next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
711         sc->in_chunk_size);
712
713     if (next_size == 0) {
714       GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
715       gst_rtmp_connection_handle_message (sc, buffer);
716       gst_buffer_unref (buffer);
717     }
718   }
719
720   gst_rtmp_connection_start_read (sc, needed_bytes);
721 }
722
723 static void
724 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
725 {
726   if (gst_rtmp_message_is_protocol_control (buffer)) {
727     gst_rtmp_connection_handle_protocol_control (sc, buffer);
728     return;
729   }
730
731   if (gst_rtmp_message_is_user_control (buffer)) {
732     gst_rtmp_connection_handle_user_control (sc, buffer);
733     return;
734   }
735
736   switch (gst_rtmp_message_get_type (buffer)) {
737     case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
738       gst_rtmp_connection_handle_cm (sc, buffer);
739       return;
740
741     case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
742       gst_rtmp_connection_handle_aggregate (sc, buffer);
743       break;
744
745     default:
746       if (sc->input_handler) {
747         sc->input_handler (sc, buffer, sc->input_handler_user_data);
748       }
749       return;
750   }
751 }
752
753 static void
754 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
755     GstBuffer * buffer)
756 {
757   GstRtmpMeta *meta;
758   GstMapInfo map;
759   gsize pos = 0;
760   guint32 first_ts = 0;
761
762   meta = gst_buffer_get_rtmp_meta (buffer);
763   g_return_if_fail (meta);
764
765   gst_buffer_map (buffer, &map, GST_MAP_READ);
766   GST_TRACE_OBJECT (connection, "got aggregate message");
767
768   /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
769    * The payload is part of a FLV file.
770    *
771    * WARNING: This spec defines the payload to use an "RTMP message format"
772    * which misidentifies the format of the timestamps and omits the size of the
773    * backpointers. */
774
775   while (pos < map.size) {
776     gsize remaining = map.size - pos;
777     GstBuffer *submessage;
778     GstRtmpMeta *submeta;
779     GstRtmpFlvTagHeader header;
780
781     if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
782       GST_ERROR_OBJECT (connection,
783           "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
784           GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
785       break;
786     }
787
788     if (remaining < header.total_size) {
789       GST_ERROR_OBJECT (connection,
790           "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
791           ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
792       break;
793     }
794
795     submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
796         GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
797         pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
798
799     GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
800     GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
801     GST_BUFFER_OFFSET_END (submessage) =
802         GST_BUFFER_OFFSET (submessage) + header.total_size;
803
804     submeta = gst_buffer_get_rtmp_meta (submessage);
805     g_assert (submeta);
806
807     submeta->type = header.type;
808     submeta->size = header.payload_size;
809
810     if (pos == 0) {
811       first_ts = header.timestamp;
812     } else {
813       guint32 ts_offset = header.timestamp - first_ts;
814
815       submeta->ts_delta += ts_offset;
816       GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
817       GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
818     }
819
820     gst_rtmp_buffer_dump (submessage, "<<< submessage");
821     gst_rtmp_connection_handle_message (connection, submessage);
822     gst_buffer_unref (submessage);
823
824     pos += header.total_size;
825   }
826
827   gst_buffer_unmap (buffer, &map);
828 }
829
830 static void
831 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
832     GstBuffer * buffer)
833 {
834   GstRtmpProtocolControl pc;
835
836   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
837     GST_ERROR_OBJECT (connection, "can't parse protocol control message");
838     return;
839   }
840
841   GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
842       gst_rtmp_message_type_get_nick (pc.type));
843
844   switch (pc.type) {
845     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
846       GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
847           pc.param);
848       gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
849       break;
850
851     case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
852       GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
853           G_GUINT32_FORMAT, pc.param);
854       break;
855
856     case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
857       GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
858           pc.param);
859       gst_rtmp_connection_handle_ack (connection, pc.param);
860       break;
861
862     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
863       GST_INFO_OBJECT (connection,
864           "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
865       gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
866       break;
867
868     case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
869       GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
870           ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
871       /* FIXME this is not correct, but close enough */
872       gst_rtmp_connection_request_window_size (connection, pc.param);
873       break;
874
875     default:
876       GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
877           pc.type, gst_rtmp_message_type_get_nick (pc.type));
878       break;
879   }
880 }
881
882 static void
883 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
884     GstBuffer * buffer)
885 {
886   GstRtmpUserControl uc;
887
888   if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
889     GST_ERROR_OBJECT (connection, "can't parse user control message");
890     return;
891   }
892
893   GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
894       gst_rtmp_user_control_type_get_nick (uc.type));
895
896   switch (uc.type) {
897     case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
898     case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
899     case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
900     case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
901       GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
902           gst_rtmp_user_control_type_get_nick (uc.type));
903       g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
904           uc.type, uc.param);
905       break;
906
907     case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
908       GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
909           G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
910       break;
911
912     case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
913       GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
914           uc.param);
915       gst_rtmp_connection_send_ping_response (connection, uc.param);
916       break;
917
918     case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
919       GST_DEBUG_OBJECT (connection,
920           "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
921       break;
922
923     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
924       GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
925           uc.param);
926       break;
927
928     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
929       GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
930           uc.param);
931       break;
932
933     default:
934       GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
935           uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
936       break;
937   }
938 }
939
940 static void
941 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
942     guint32 chunk_size)
943 {
944   if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
945     GST_ERROR_OBJECT (self,
946         "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
947         chunk_size);
948     return;
949   }
950
951   if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
952     GST_ERROR_OBJECT (self,
953         "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
954         chunk_size);
955     return;
956   }
957
958   if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
959     GST_WARNING_OBJECT (self,
960         "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
961   }
962
963   g_mutex_lock (&self->stats_lock);
964   self->in_chunk_size = chunk_size;
965   g_mutex_unlock (&self->stats_lock);
966 }
967
968 static void
969 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
970 {
971   guint64 last_ack, new_ack;
972   guint32 last_ack_low, last_ack_high;
973
974   last_ack = self->out_bytes_acked;
975   last_ack_low = last_ack & G_MAXUINT32;
976   last_ack_high = (last_ack >> 32) & G_MAXUINT32;
977
978   if (bytes < last_ack_low) {
979     GST_WARNING_OBJECT (self,
980         "Acknowledgement bytes regression, assuming rollover: %"
981         G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
982     last_ack_high += 1;
983   }
984
985   new_ack = (((guint64) last_ack_high) << 32) | bytes;
986
987   GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
988       new_ack - last_ack);
989
990   g_mutex_lock (&self->stats_lock);
991   self->out_bytes_acked = new_ack;
992   g_mutex_unlock (&self->stats_lock);
993 }
994
995 static void
996 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
997     guint32 window_ack_size)
998 {
999   if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1000     GST_WARNING_OBJECT (self,
1001         "peer requested small window ack size %" G_GUINT32_FORMAT,
1002         window_ack_size);
1003   }
1004
1005   g_mutex_lock (&self->stats_lock);
1006   self->in_window_ack_size = window_ack_size;
1007   g_mutex_unlock (&self->stats_lock);
1008 }
1009
1010 static gboolean
1011 is_command_response (const gchar * command_name)
1012 {
1013   return g_strcmp0 (command_name, "_result") == 0 ||
1014       g_strcmp0 (command_name, "_error") == 0;
1015 }
1016
1017 static void
1018 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1019 {
1020   GstRtmpMeta *meta;
1021   gchar *command_name;
1022   gdouble transaction_id;
1023   GPtrArray *args;
1024
1025   meta = gst_buffer_get_rtmp_meta (buffer);
1026   g_return_if_fail (meta);
1027
1028   {
1029     GstMapInfo map;
1030     gst_buffer_map (buffer, &map, GST_MAP_READ);
1031     args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1032         &command_name);
1033     gst_buffer_unmap (buffer, &map);
1034   }
1035
1036   if (!args) {
1037     return;
1038   }
1039
1040   if (!isfinite (transaction_id) || transaction_id < 0 ||
1041       transaction_id > G_MAXUINT) {
1042     GST_WARNING_OBJECT (sc,
1043         "Server sent command \"%s\" with extreme transaction ID %.0f",
1044         GST_STR_NULL (command_name), transaction_id);
1045   } else if (transaction_id > sc->transaction_count) {
1046     GST_WARNING_OBJECT (sc,
1047         "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1048         GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1049     sc->transaction_count = transaction_id;
1050   }
1051
1052   GST_DEBUG_OBJECT (sc,
1053       "got control message \"%s\" transaction %.0f size %"
1054       G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1055       meta->size);
1056
1057   if (is_command_response (command_name)) {
1058     if (transaction_id != 0) {
1059       GList *l;
1060
1061       for (l = sc->transactions; l; l = g_list_next (l)) {
1062         Transaction *t = l->data;
1063
1064         if (t->transaction_id != transaction_id) {
1065           continue;
1066         }
1067
1068         GST_LOG_OBJECT (sc, "calling transaction callback %s",
1069             GST_DEBUG_FUNCPTR_NAME (t->func));
1070         sc->transactions = g_list_remove_link (sc->transactions, l);
1071         t->func (command_name, args, t->user_data);
1072         g_list_free_full (l, transaction_free);
1073         break;
1074       }
1075     } else {
1076       GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1077           GST_STR_NULL (command_name));
1078     }
1079   } else {
1080     GList *l;
1081
1082     if (transaction_id != 0) {
1083       GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1084           GST_STR_NULL (command_name));
1085     }
1086
1087     for (l = sc->expected_commands; l; l = g_list_next (l)) {
1088       ExpectedCommand *ec = l->data;
1089
1090       if (ec->stream_id != meta->mstream) {
1091         continue;
1092       }
1093
1094       if (g_strcmp0 (ec->command_name, command_name)) {
1095         continue;
1096       }
1097
1098       GST_LOG_OBJECT (sc, "calling expected command callback %s",
1099           GST_DEBUG_FUNCPTR_NAME (ec->func));
1100       sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1101       ec->func (command_name, args, ec->user_data);
1102       g_list_free_full (l, expected_command_free);
1103       break;
1104     }
1105   }
1106
1107   g_free (command_name);
1108   g_ptr_array_unref (args);
1109 }
1110
1111 static gboolean
1112 start_write (gpointer user_data)
1113 {
1114   GstRtmpConnection *sc = user_data;
1115   gst_rtmp_connection_start_write (sc);
1116   return G_SOURCE_REMOVE;
1117 }
1118
1119 void
1120 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1121 {
1122   g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1123   g_return_if_fail (GST_IS_BUFFER (buffer));
1124
1125   g_async_queue_push (self->output_queue, buffer);
1126   g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1127       start_write, g_object_ref (self), g_object_unref);
1128 }
1129
1130 guint
1131 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1132 {
1133   return g_async_queue_length (connection->output_queue);
1134 }
1135
1136 guint
1137 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1138     GstRtmpCommandCallback response_command, gpointer user_data,
1139     guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1140     ...)
1141 {
1142   GstBuffer *buffer;
1143   gdouble transaction_id = 0;
1144   va_list ap;
1145   GBytes *payload;
1146   guint8 *data;
1147   gsize size;
1148
1149   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1150
1151   if (connection->thread != g_thread_self ()) {
1152     GST_ERROR_OBJECT (connection, "Called from wrong thread");
1153   }
1154
1155   GST_DEBUG_OBJECT (connection,
1156       "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1157       command_name, stream_id);
1158
1159   if (response_command) {
1160     Transaction *t;
1161
1162     transaction_id = ++connection->transaction_count;
1163
1164     GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1165         GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1166
1167     t = transaction_new (transaction_id, response_command, user_data);
1168
1169     connection->transactions = g_list_append (connection->transactions, t);
1170   }
1171
1172   va_start (ap, argument);
1173   payload = gst_amf_serialize_command_valist (transaction_id,
1174       command_name, argument, ap);
1175   va_end (ap);
1176
1177   data = g_bytes_unref_to_data (payload, &size);
1178   buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1179       3, stream_id, data, size);
1180
1181   gst_rtmp_connection_queue_message (connection, buffer);
1182   return transaction_id;
1183 }
1184
1185 void
1186 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1187     GstRtmpCommandCallback response_command, gpointer user_data,
1188     guint32 stream_id, const gchar * command_name)
1189 {
1190   ExpectedCommand *ec;
1191
1192   g_return_if_fail (response_command);
1193   g_return_if_fail (command_name);
1194   g_return_if_fail (!is_command_response (command_name));
1195
1196   GST_LOG_OBJECT (connection,
1197       "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1198       GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1199
1200   ec = expected_command_new (stream_id, command_name, response_command,
1201       user_data);
1202
1203   connection->expected_commands =
1204       g_list_append (connection->expected_commands, ec);
1205 }
1206
1207 static void
1208 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1209 {
1210   guint64 in_bytes_total = connection->in_bytes_total;
1211   GstRtmpProtocolControl pc = {
1212     .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1213     .param = (guint32) in_bytes_total,
1214   };
1215
1216   gst_rtmp_connection_queue_message (connection,
1217       gst_rtmp_message_new_protocol_control (&pc));
1218
1219   g_mutex_lock (&connection->stats_lock);
1220   connection->in_bytes_acked = in_bytes_total;
1221   g_mutex_unlock (&connection->stats_lock);
1222 }
1223
1224 static void
1225 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1226     guint32 event_data)
1227 {
1228   GstRtmpUserControl uc = {
1229     .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1230     .param = event_data,
1231   };
1232
1233   gst_rtmp_connection_queue_message (connection,
1234       gst_rtmp_message_new_user_control (&uc));
1235 }
1236
1237 void
1238 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1239     guint32 chunk_size)
1240 {
1241   GstRtmpProtocolControl pc = {
1242     .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1243     .param = chunk_size,
1244   };
1245
1246   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1247
1248   gst_rtmp_connection_queue_message (connection,
1249       gst_rtmp_message_new_protocol_control (&pc));
1250 }
1251
1252 void
1253 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1254     guint32 window_ack_size)
1255 {
1256   GstRtmpProtocolControl pc = {
1257     .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1258     .param = window_ack_size,
1259   };
1260
1261   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1262
1263   gst_rtmp_connection_queue_message (connection,
1264       gst_rtmp_message_new_protocol_control (&pc));
1265 }
1266
1267 void
1268 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1269     GstBuffer * buffer)
1270 {
1271   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1272   g_return_if_fail (GST_IS_BUFFER (buffer));
1273
1274   gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1275   gst_rtmp_connection_queue_message (connection, buffer);
1276 }
1277
1278 static gboolean
1279 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1280     GstBuffer * buffer)
1281 {
1282   GstRtmpProtocolControl pc;
1283
1284   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1285     GST_ERROR_OBJECT (self, "can't parse protocol control message");
1286     return FALSE;
1287   }
1288
1289   switch (pc.type) {
1290     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1291       guint32 chunk_size = pc.param;
1292
1293       GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1294           chunk_size);
1295
1296       if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1297         GST_ERROR_OBJECT (self,
1298             "requested chunk size %" G_GUINT32_FORMAT " is too small",
1299             chunk_size);
1300         return FALSE;
1301       }
1302
1303       if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1304         GST_ERROR_OBJECT (self,
1305             "requested chunk size %" G_GUINT32_FORMAT " is too large",
1306             chunk_size);
1307         return FALSE;
1308       }
1309
1310       if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1311         GST_WARNING_OBJECT (self,
1312             "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1313       }
1314
1315       self->out_chunk_size_pending = pc.param;
1316       break;
1317     }
1318
1319     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1320       guint32 window_ack_size = pc.param;
1321
1322       GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1323           window_ack_size);
1324
1325       if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1326         GST_WARNING_OBJECT (self,
1327             "requesting small window ack size %" G_GUINT32_FORMAT,
1328             window_ack_size);
1329       }
1330
1331       self->out_window_ack_size_pending = window_ack_size;
1332       break;
1333     }
1334
1335     default:
1336       break;
1337   }
1338
1339   return TRUE;
1340 }
1341
1342 static void
1343 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1344 {
1345   guint32 chunk_size, window_ack_size;
1346
1347   chunk_size = self->out_chunk_size_pending;
1348   if (chunk_size) {
1349     self->out_chunk_size_pending = 0;
1350
1351     g_mutex_lock (&self->stats_lock);
1352     self->out_chunk_size = chunk_size;
1353     g_mutex_unlock (&self->stats_lock);
1354
1355     GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1356   }
1357
1358   window_ack_size = self->out_window_ack_size_pending;
1359   if (window_ack_size) {
1360     self->out_window_ack_size_pending = 0;
1361
1362     g_mutex_lock (&self->stats_lock);
1363     self->out_window_ack_size = window_ack_size;
1364     g_mutex_unlock (&self->stats_lock);
1365
1366     GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1367         window_ack_size);
1368   }
1369 }
1370
1371 static GstStructure *
1372 get_stats (GstRtmpConnection * self)
1373 {
1374   return gst_structure_new ("GstRtmpConnectionStats",
1375       "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1376       "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1377       "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1378       "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1379       "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1380       "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1381       "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1382       "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1383 }
1384
1385 GstStructure *
1386 gst_rtmp_connection_get_null_stats (void)
1387 {
1388   return get_stats (NULL);
1389 }
1390
1391 GstStructure *
1392 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1393 {
1394   GstStructure *s;
1395
1396   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1397
1398   g_mutex_lock (&self->stats_lock);
1399   s = get_stats (self);
1400   g_mutex_unlock (&self->stats_lock);
1401
1402   return s;
1403 }