base:subparse: fix invalid mem access issue
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / rtmp2 / rtmp / rtmpconnection.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <string.h>
28 #include <math.h>
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33 #include "amf.h"
34
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
37
38 #define READ_SIZE 8192
39
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
41
42 struct _GstRtmpConnection
43 {
44   GObject parent_instance;
45
46   /* should be properties */
47   gboolean input_paused;
48   gboolean error;
49
50   /* private */
51   GThread *thread;
52   GSocketConnection *connection;
53   GCancellable *cancellable;
54   GSocketClient *socket_client;
55   GAsyncQueue *output_queue;
56   GMainContext *main_context;
57
58   GCancellable *outer_cancellable;
59   gulong cancel_handler_id;
60
61   GSource *input_source;
62   GByteArray *input_bytes;
63   guint input_needed_bytes;
64   GstRtmpChunkStreams *input_streams, *output_streams;
65   GList *transactions;
66   GList *expected_commands;
67   guint transaction_count;
68
69   GstRtmpConnectionMessageFunc input_handler;
70   gpointer input_handler_user_data;
71   GDestroyNotify input_handler_user_data_destroy;
72
73   GstRtmpConnectionFunc output_handler;
74   gpointer output_handler_user_data;
75   GDestroyNotify output_handler_user_data_destroy;
76
77   gboolean writing;
78
79   /* Protects the values below during concurrent access.
80    * - Taken by the loop thread when writing, but not reading.
81    * - Taken by other threads when reading (calling get_stats).
82    */
83   GMutex stats_lock;
84
85   /* RTMP configuration */
86   guint32 in_chunk_size;
87   guint32 out_chunk_size, out_chunk_size_pending;
88   guint32 in_window_ack_size;
89   guint32 out_window_ack_size, out_window_ack_size_pending;
90
91   guint64 in_bytes_total;
92   guint64 out_bytes_total;
93   guint64 in_bytes_acked;
94   guint64 out_bytes_acked;
95 };
96
97
98 typedef struct
99 {
100   GObjectClass parent_class;
101 } GstRtmpConnectionClass;
102
103 /* prototypes */
104
105 static void gst_rtmp_connection_dispose (GObject * object);
106 static void gst_rtmp_connection_finalize (GObject * object);
107 static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
108     GCancellable * cancellable);
109 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self,
110     GError * error);
111 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
112     gpointer user_data);
113 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
114 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
115     GAsyncResult * result, gpointer user_data);
116 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
117     guint needed_bytes);
118 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
119 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
120 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
121     connection, GstBuffer * buffer);
122 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
123     connection, GstBuffer * buffer);
124 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
125     GstBuffer * buffer);
126 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
127     GstBuffer * buffer);
128 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
129     GstBuffer * buffer);
130 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
131     guint32 in_chunk_size);
132 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
133     guint32 bytes);
134 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
135     self, guint32 in_chunk_size);
136
137 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
138 static void
139 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
140     guint32 event_data);
141
142 static gboolean
143 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
144     GstBuffer * buffer);
145 static void
146 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
147
148 typedef struct
149 {
150   gdouble transaction_id;
151   GstRtmpCommandCallback func;
152   gpointer user_data;
153 } Transaction;
154
155 static Transaction *
156 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
157     gpointer user_data)
158 {
159   Transaction *data = g_slice_new (Transaction);
160   data->transaction_id = transaction_id;
161   data->func = func;
162   data->user_data = user_data;
163   return data;
164 }
165
166 static void
167 transaction_free (gpointer ptr)
168 {
169   Transaction *data = ptr;
170   g_slice_free (Transaction, data);
171 }
172
173 typedef struct
174 {
175   guint32 stream_id;
176   gchar *command_name;
177   GstRtmpCommandCallback func;
178   gpointer user_data;
179 } ExpectedCommand;
180
181 static ExpectedCommand *
182 expected_command_new (guint32 stream_id, const gchar * command_name,
183     GstRtmpCommandCallback func, gpointer user_data)
184 {
185   ExpectedCommand *data = g_slice_new (ExpectedCommand);
186   data->stream_id = stream_id;
187   data->command_name = g_strdup (command_name);
188   data->func = func;
189   data->user_data = user_data;
190   return data;
191 }
192
193 static void
194 expected_command_free (gpointer ptr)
195 {
196   ExpectedCommand *data = ptr;
197   g_free (data->command_name);
198   g_slice_free (ExpectedCommand, data);
199 }
200
201 enum
202 {
203   SIGNAL_ERROR,
204   SIGNAL_STREAM_CONTROL,
205
206   N_SIGNALS
207 };
208
209 static guint signals[N_SIGNALS] = { 0, };
210
211 /* singletons */
212
213 static GstMemory *set_data_frame_value;
214
215 static void
216 init_set_data_frame_value (void)
217 {
218   GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
219   GBytes *bytes = gst_amf_node_serialize (node);
220   gsize size;
221   const gchar *data = g_bytes_get_data (bytes, &size);
222
223   set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
224       (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
225   GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
226       GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
227
228   gst_amf_node_free (node);
229 }
230
231 /* class initialization */
232
233 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
234     G_TYPE_OBJECT,
235     GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
236         "rtmpconnection", 0, "debug category for GstRtmpConnection class");
237     init_set_data_frame_value ());
238
239 static void
240 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
241 {
242   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
243
244   gobject_class->dispose = gst_rtmp_connection_dispose;
245   gobject_class->finalize = gst_rtmp_connection_finalize;
246
247   signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
248       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_ERROR);
249
250   signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
251       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
252       G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
253
254   GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
255 }
256
257 static void
258 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
259 {
260   rtmpconnection->cancellable = g_cancellable_new ();
261   rtmpconnection->output_queue =
262       g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
263   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
264   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
265
266   rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
267   rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
268
269   rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
270   rtmpconnection->input_needed_bytes = 1;
271
272   g_mutex_init (&rtmpconnection->stats_lock);
273 }
274
275 void
276 gst_rtmp_connection_dispose (GObject * object)
277 {
278   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
279   GST_DEBUG_OBJECT (rtmpconnection, "dispose");
280
281   /* clean up as possible.  may be called multiple times */
282
283   gst_rtmp_connection_close (rtmpconnection);
284   g_cancellable_cancel (rtmpconnection->cancellable);
285   gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
286   gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
287   gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
288
289   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
290 }
291
292 void
293 gst_rtmp_connection_finalize (GObject * object)
294 {
295   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
296   GST_DEBUG_OBJECT (rtmpconnection, "finalize");
297
298   /* clean up object here */
299
300   g_mutex_clear (&rtmpconnection->stats_lock);
301   g_clear_object (&rtmpconnection->cancellable);
302   g_clear_object (&rtmpconnection->connection);
303   g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
304   g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
305   g_clear_pointer (&rtmpconnection->output_streams,
306       gst_rtmp_chunk_streams_free);
307   g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
308   g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
309   g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
310
311   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
312 }
313
314 GSocket *
315 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
316 {
317   return g_socket_connection_get_socket (sc->connection);
318 }
319
320 static void
321 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
322     GSocketConnection * connection)
323 {
324   GInputStream *is;
325
326   sc->thread = g_thread_ref (g_thread_self ());
327   sc->main_context = g_main_context_ref_thread_default ();
328   sc->connection = g_object_ref (connection);
329
330   /* refs the socket because it's creating an input stream, which holds a ref */
331   is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
332   /* refs the socket because it's creating a socket source */
333   g_warn_if_fail (!sc->input_source);
334   sc->input_source =
335       g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
336       sc->cancellable);
337   g_source_set_callback (sc->input_source,
338       (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
339       g_object_unref);
340   g_source_attach (sc->input_source, sc->main_context);
341 }
342
343 static void
344 gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
345     GCancellable * cancellable)
346 {
347   g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
348   g_clear_object (&self->outer_cancellable);
349   self->cancel_handler_id = 0;
350
351   if (cancellable == NULL)
352     return;
353
354   self->outer_cancellable = g_object_ref (cancellable);
355   self->cancel_handler_id =
356       g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
357       g_object_ref (self->cancellable), g_object_unref);
358 }
359
360
361 GstRtmpConnection *
362 gst_rtmp_connection_new (GSocketConnection * connection,
363     GCancellable * cancellable)
364 {
365   GstRtmpConnection *sc;
366
367   sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
368
369   gst_rtmp_connection_set_socket_connection (sc, connection);
370   gst_rtmp_connection_set_cancellable (sc, cancellable);
371
372   return sc;
373 }
374
375 static void
376 cancel_all_commands (GstRtmpConnection * self, const gchar * reason)
377 {
378   GList *l;
379
380   for (l = self->transactions; l; l = g_list_next (l)) {
381     Transaction *cc = l->data;
382     GST_LOG_OBJECT (self, "calling transaction callback %s",
383         GST_DEBUG_FUNCPTR_NAME (cc->func));
384     cc->func (reason, NULL, cc->user_data);
385   }
386   g_list_free_full (self->transactions, transaction_free);
387   self->transactions = NULL;
388
389   for (l = self->expected_commands; l; l = g_list_next (l)) {
390     ExpectedCommand *cc = l->data;
391     GST_LOG_OBJECT (self, "calling expected command callback %s",
392         GST_DEBUG_FUNCPTR_NAME (cc->func));
393     cc->func (reason, NULL, cc->user_data);
394   }
395   g_list_free_full (self->expected_commands, expected_command_free);
396   self->expected_commands = NULL;
397 }
398
399 void
400 gst_rtmp_connection_close (GstRtmpConnection * self)
401 {
402   if (self->thread != g_thread_self ()) {
403     GST_ERROR_OBJECT (self, "Called from wrong thread");
404   }
405
406   g_cancellable_cancel (self->cancellable);
407   cancel_all_commands (self, "connection closed locally");
408
409   if (self->input_source) {
410     g_source_destroy (self->input_source);
411     g_clear_pointer (&self->input_source, g_source_unref);
412   }
413
414   if (self->connection) {
415     g_io_stream_close_async (G_IO_STREAM (self->connection),
416         G_PRIORITY_DEFAULT, NULL, NULL, NULL);
417   }
418 }
419
420 void
421 gst_rtmp_connection_close_and_unref (gpointer ptr)
422 {
423   GstRtmpConnection *connection;
424
425   g_return_if_fail (ptr);
426
427   connection = GST_RTMP_CONNECTION (ptr);
428   gst_rtmp_connection_close (connection);
429   g_object_unref (connection);
430 }
431
432 void
433 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
434     GstRtmpConnectionMessageFunc callback, gpointer user_data,
435     GDestroyNotify user_data_destroy)
436 {
437   if (sc->input_handler_user_data_destroy) {
438     sc->input_handler_user_data_destroy (sc->input_handler_user_data);
439   }
440
441   sc->input_handler = callback;
442   sc->input_handler_user_data = user_data;
443   sc->input_handler_user_data_destroy = user_data_destroy;
444 }
445
446 void
447 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
448     GstRtmpConnectionFunc callback, gpointer user_data,
449     GDestroyNotify user_data_destroy)
450 {
451   if (sc->output_handler_user_data_destroy) {
452     sc->output_handler_user_data_destroy (sc->output_handler_user_data);
453   }
454
455   sc->output_handler = callback;
456   sc->output_handler_user_data = user_data;
457   sc->output_handler_user_data_destroy = user_data_destroy;
458 }
459
460 static gboolean
461 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
462 {
463   GstRtmpConnection *sc = user_data;
464   gssize ret;
465   guint oldsize;
466   GError *error = NULL;
467   guint64 bytes_since_ack;
468
469   GST_TRACE_OBJECT (sc, "input ready");
470
471   oldsize = sc->input_bytes->len;
472   g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
473   ret =
474       g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
475       sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
476   g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
477
478   if (ret == 0) {
479     error = g_error_new (G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED,
480         "connection closed remotely");
481     ret = -1;
482   }
483
484   if (ret < 0) {
485     gint code = error->code;
486
487     if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
488             code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
489       /* should retry */
490       GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
491           code, error->message);
492       g_error_free (error);
493       return G_SOURCE_CONTINUE;
494     }
495
496     GST_ERROR_OBJECT (sc, "read error: %s %d %s",
497         g_quark_to_string (error->domain), code, error->message);
498
499     gst_rtmp_connection_emit_error (sc, error);
500     return G_SOURCE_REMOVE;
501   }
502
503   GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
504
505   g_mutex_lock (&sc->stats_lock);
506   sc->in_bytes_total += ret;
507   g_mutex_unlock (&sc->stats_lock);
508
509   bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
510   if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
511     gst_rtmp_connection_send_ack (sc);
512   }
513
514   gst_rtmp_connection_try_read (sc);
515   return G_SOURCE_CONTINUE;
516 }
517
518 static void
519 gst_rtmp_connection_start_write (GstRtmpConnection * self)
520 {
521   GOutputStream *os;
522   GstBuffer *message, *chunks;
523   GstRtmpMeta *meta;
524   GstRtmpChunkStream *cstream;
525
526   if (self->writing) {
527     return;
528   }
529
530   message = g_async_queue_try_pop (self->output_queue);
531   if (!message) {
532     return;
533   }
534
535   meta = gst_buffer_get_rtmp_meta (message);
536   if (!meta) {
537     GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
538     goto out;
539   }
540
541   if (gst_rtmp_message_is_protocol_control (message)) {
542     if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
543       GST_ERROR_OBJECT (self,
544           "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
545       goto out;
546     }
547   }
548
549   cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
550   if (!cstream) {
551     GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
552         message);
553     goto out;
554   }
555
556   chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
557       self->out_chunk_size);
558   if (!chunks) {
559     GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
560     goto out;
561   }
562
563   self->writing = TRUE;
564   if (self->output_handler) {
565     self->output_handler (self, self->output_handler_user_data);
566   }
567
568   os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
569   gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
570       self->cancellable, gst_rtmp_connection_write_buffer_done,
571       g_object_ref (self));
572
573   gst_buffer_unref (chunks);
574
575 out:
576   gst_buffer_unref (message);
577 }
578
579 static void
580 gst_rtmp_connection_emit_error (GstRtmpConnection * self, GError * error)
581 {
582   if (!self->error) {
583     self->error = TRUE;
584     cancel_all_commands (self, error->message);
585     g_signal_emit (self, signals[SIGNAL_ERROR], 0, error);
586   }
587
588   g_error_free (error);
589 }
590
591 static void
592 gst_rtmp_connection_write_buffer_done (GObject * obj,
593     GAsyncResult * result, gpointer user_data)
594 {
595   GOutputStream *os = G_OUTPUT_STREAM (obj);
596   GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
597   gsize bytes_written = 0;
598   GError *error = NULL;
599   gboolean res;
600
601   self->writing = FALSE;
602
603   res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
604       &bytes_written, &error);
605
606   g_mutex_lock (&self->stats_lock);
607   self->out_bytes_total += bytes_written;
608   g_mutex_unlock (&self->stats_lock);
609
610   if (!res) {
611     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
612       GST_INFO_OBJECT (self,
613           "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
614     } else {
615       GST_ERROR_OBJECT (self,
616           "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
617           error->message, bytes_written);
618     }
619     gst_rtmp_connection_emit_error (self, error);
620     g_object_unref (self);
621     return;
622   }
623
624   GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
625       bytes_written);
626
627   gst_rtmp_connection_apply_protocol_control (self);
628   gst_rtmp_connection_start_write (self);
629   g_object_unref (self);
630 }
631
632 static void
633 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
634     guint needed_bytes)
635 {
636   g_return_if_fail (needed_bytes > 0);
637   connection->input_needed_bytes = needed_bytes;
638   gst_rtmp_connection_try_read (connection);
639 }
640
641 static void
642 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
643 {
644   guint need = connection->input_needed_bytes,
645       len = connection->input_bytes->len;
646
647   if (len < need) {
648     GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
649     return;
650   }
651
652   GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
653   gst_rtmp_connection_do_read (connection);
654 }
655
656 static void
657 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
658     GBytes ** outbytes)
659 {
660   g_return_if_fail (size <= sc->input_bytes->len);
661
662   if (outbytes) {
663     *outbytes = g_bytes_new (sc->input_bytes->data, size);
664   }
665
666   g_byte_array_remove_range (sc->input_bytes, 0, size);
667 }
668
669 static void
670 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
671 {
672   GByteArray *input_bytes = sc->input_bytes;
673   gsize needed_bytes = 1;
674
675   while (1) {
676     GstRtmpChunkStream *cstream;
677     guint32 chunk_stream_id, header_size, next_size;
678     guint8 *data;
679
680     chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
681         input_bytes->len);
682
683     if (!chunk_stream_id) {
684       needed_bytes = input_bytes->len + 1;
685       break;
686     }
687
688     cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
689     header_size = gst_rtmp_chunk_stream_parse_header (cstream,
690         input_bytes->data, input_bytes->len);
691
692     if (input_bytes->len < header_size) {
693       needed_bytes = header_size;
694       break;
695     }
696
697     next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
698         sc->in_chunk_size, &data);
699
700     if (input_bytes->len < header_size + next_size) {
701       needed_bytes = header_size + next_size;
702       break;
703     }
704
705     memcpy (data, input_bytes->data + header_size, next_size);
706     gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
707
708     next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
709         sc->in_chunk_size);
710
711     if (next_size == 0) {
712       GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
713       gst_rtmp_connection_handle_message (sc, buffer);
714       gst_buffer_unref (buffer);
715     }
716   }
717
718   gst_rtmp_connection_start_read (sc, needed_bytes);
719 }
720
721 static void
722 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
723 {
724   if (gst_rtmp_message_is_protocol_control (buffer)) {
725     gst_rtmp_connection_handle_protocol_control (sc, buffer);
726     return;
727   }
728
729   if (gst_rtmp_message_is_user_control (buffer)) {
730     gst_rtmp_connection_handle_user_control (sc, buffer);
731     return;
732   }
733
734   switch (gst_rtmp_message_get_type (buffer)) {
735     case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
736       gst_rtmp_connection_handle_cm (sc, buffer);
737       return;
738
739     case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
740       gst_rtmp_connection_handle_aggregate (sc, buffer);
741       break;
742
743     default:
744       if (sc->input_handler) {
745         sc->input_handler (sc, buffer, sc->input_handler_user_data);
746       }
747       return;
748   }
749 }
750
751 static void
752 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
753     GstBuffer * buffer)
754 {
755   GstRtmpMeta *meta;
756   GstMapInfo map;
757   gsize pos = 0;
758   guint32 first_ts = 0;
759
760   meta = gst_buffer_get_rtmp_meta (buffer);
761   g_return_if_fail (meta);
762
763   gst_buffer_map (buffer, &map, GST_MAP_READ);
764   GST_TRACE_OBJECT (connection, "got aggregate message");
765
766   /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
767    * The payload is part of a FLV file.
768    *
769    * WARNING: This spec defines the payload to use an "RTMP message format"
770    * which misidentifies the format of the timestamps and omits the size of the
771    * backpointers. */
772
773   while (pos < map.size) {
774     gsize remaining = map.size - pos;
775     GstBuffer *submessage;
776     GstRtmpMeta *submeta;
777     GstRtmpFlvTagHeader header;
778
779     if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
780       GST_ERROR_OBJECT (connection,
781           "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
782           GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
783       break;
784     }
785
786     if (remaining < header.total_size) {
787       GST_ERROR_OBJECT (connection,
788           "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
789           ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
790       break;
791     }
792
793     submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
794         GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
795         pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
796
797     GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
798     GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
799     GST_BUFFER_OFFSET_END (submessage) =
800         GST_BUFFER_OFFSET (submessage) + header.total_size;
801
802     submeta = gst_buffer_get_rtmp_meta (submessage);
803     g_assert (submeta);
804
805     submeta->type = header.type;
806     submeta->size = header.payload_size;
807
808     if (pos == 0) {
809       first_ts = header.timestamp;
810     } else {
811       guint32 ts_offset = header.timestamp - first_ts;
812
813       submeta->ts_delta += ts_offset;
814       GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
815       GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
816     }
817
818     gst_rtmp_buffer_dump (submessage, "<<< submessage");
819     gst_rtmp_connection_handle_message (connection, submessage);
820     gst_buffer_unref (submessage);
821
822     pos += header.total_size;
823   }
824
825   gst_buffer_unmap (buffer, &map);
826 }
827
828 static void
829 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
830     GstBuffer * buffer)
831 {
832   GstRtmpProtocolControl pc;
833
834   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
835     GST_ERROR_OBJECT (connection, "can't parse protocol control message");
836     return;
837   }
838
839   GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
840       gst_rtmp_message_type_get_nick (pc.type));
841
842   switch (pc.type) {
843     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
844       GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
845           pc.param);
846       gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
847       break;
848
849     case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
850       GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
851           G_GUINT32_FORMAT, pc.param);
852       break;
853
854     case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
855       GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
856           pc.param);
857       gst_rtmp_connection_handle_ack (connection, pc.param);
858       break;
859
860     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
861       GST_INFO_OBJECT (connection,
862           "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
863       gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
864       break;
865
866     case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
867       GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
868           ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
869       /* FIXME this is not correct, but close enough */
870       gst_rtmp_connection_request_window_size (connection, pc.param);
871       break;
872
873     default:
874       GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
875           pc.type, gst_rtmp_message_type_get_nick (pc.type));
876       break;
877   }
878 }
879
880 static void
881 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
882     GstBuffer * buffer)
883 {
884   GstRtmpUserControl uc;
885
886   if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
887     GST_ERROR_OBJECT (connection, "can't parse user control message");
888     return;
889   }
890
891   GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
892       gst_rtmp_user_control_type_get_nick (uc.type));
893
894   switch (uc.type) {
895     case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
896     case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
897     case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
898     case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
899       GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
900           gst_rtmp_user_control_type_get_nick (uc.type));
901       g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
902           uc.type, uc.param);
903       break;
904
905     case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
906       GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
907           G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
908       break;
909
910     case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
911       GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
912           uc.param);
913       gst_rtmp_connection_send_ping_response (connection, uc.param);
914       break;
915
916     case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
917       GST_DEBUG_OBJECT (connection,
918           "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
919       break;
920
921     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
922       GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
923           uc.param);
924       break;
925
926     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
927       GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
928           uc.param);
929       break;
930
931     default:
932       GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
933           uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
934       break;
935   }
936 }
937
938 static void
939 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
940     guint32 chunk_size)
941 {
942   if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
943     GST_ERROR_OBJECT (self,
944         "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
945         chunk_size);
946     return;
947   }
948
949   if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
950     GST_ERROR_OBJECT (self,
951         "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
952         chunk_size);
953     return;
954   }
955
956   if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
957     GST_WARNING_OBJECT (self,
958         "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
959   }
960
961   g_mutex_lock (&self->stats_lock);
962   self->in_chunk_size = chunk_size;
963   g_mutex_unlock (&self->stats_lock);
964 }
965
966 static void
967 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
968 {
969   guint64 last_ack, new_ack;
970   guint32 last_ack_low, last_ack_high;
971
972   last_ack = self->out_bytes_acked;
973   last_ack_low = last_ack & G_MAXUINT32;
974   last_ack_high = (last_ack >> 32) & G_MAXUINT32;
975
976   if (bytes < last_ack_low) {
977     GST_WARNING_OBJECT (self,
978         "Acknowledgement bytes regression, assuming rollover: %"
979         G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
980     last_ack_high += 1;
981   }
982
983   new_ack = (((guint64) last_ack_high) << 32) | bytes;
984
985   GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
986       new_ack - last_ack);
987
988   g_mutex_lock (&self->stats_lock);
989   self->out_bytes_acked = new_ack;
990   g_mutex_unlock (&self->stats_lock);
991 }
992
993 static void
994 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
995     guint32 window_ack_size)
996 {
997   if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
998     GST_WARNING_OBJECT (self,
999         "peer requested small window ack size %" G_GUINT32_FORMAT,
1000         window_ack_size);
1001   }
1002
1003   g_mutex_lock (&self->stats_lock);
1004   self->in_window_ack_size = window_ack_size;
1005   g_mutex_unlock (&self->stats_lock);
1006 }
1007
1008 static gboolean
1009 is_command_response (const gchar * command_name)
1010 {
1011   return g_strcmp0 (command_name, "_result") == 0 ||
1012       g_strcmp0 (command_name, "_error") == 0;
1013 }
1014
1015 static void
1016 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1017 {
1018   GstRtmpMeta *meta;
1019   gchar *command_name;
1020   gdouble transaction_id;
1021   GPtrArray *args;
1022
1023   meta = gst_buffer_get_rtmp_meta (buffer);
1024   g_return_if_fail (meta);
1025
1026   {
1027     GstMapInfo map;
1028     gst_buffer_map (buffer, &map, GST_MAP_READ);
1029     args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1030         &command_name);
1031     gst_buffer_unmap (buffer, &map);
1032   }
1033
1034   if (!args) {
1035     return;
1036   }
1037
1038   if (!isfinite (transaction_id) || transaction_id < 0 ||
1039       transaction_id > G_MAXUINT) {
1040     GST_WARNING_OBJECT (sc,
1041         "Server sent command \"%s\" with extreme transaction ID %.0f",
1042         GST_STR_NULL (command_name), transaction_id);
1043   } else if (transaction_id > sc->transaction_count) {
1044     GST_WARNING_OBJECT (sc,
1045         "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1046         GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1047     sc->transaction_count = transaction_id;
1048   }
1049
1050   GST_DEBUG_OBJECT (sc,
1051       "got control message \"%s\" transaction %.0f size %"
1052       G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1053       meta->size);
1054
1055   if (is_command_response (command_name)) {
1056     if (transaction_id != 0) {
1057       GList *l;
1058
1059       for (l = sc->transactions; l; l = g_list_next (l)) {
1060         Transaction *t = l->data;
1061
1062         if (t->transaction_id != transaction_id) {
1063           continue;
1064         }
1065
1066         GST_LOG_OBJECT (sc, "calling transaction callback %s",
1067             GST_DEBUG_FUNCPTR_NAME (t->func));
1068         sc->transactions = g_list_remove_link (sc->transactions, l);
1069         t->func (command_name, args, t->user_data);
1070         g_list_free_full (l, transaction_free);
1071         break;
1072       }
1073     } else {
1074       GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1075           GST_STR_NULL (command_name));
1076     }
1077   } else {
1078     GList *l;
1079
1080     if (transaction_id != 0) {
1081       GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1082           GST_STR_NULL (command_name));
1083     }
1084
1085     for (l = sc->expected_commands; l; l = g_list_next (l)) {
1086       ExpectedCommand *ec = l->data;
1087
1088       if (ec->stream_id != meta->mstream) {
1089         continue;
1090       }
1091
1092       if (g_strcmp0 (ec->command_name, command_name)) {
1093         continue;
1094       }
1095
1096       GST_LOG_OBJECT (sc, "calling expected command callback %s",
1097           GST_DEBUG_FUNCPTR_NAME (ec->func));
1098       sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1099       ec->func (command_name, args, ec->user_data);
1100       g_list_free_full (l, expected_command_free);
1101       break;
1102     }
1103   }
1104
1105   g_free (command_name);
1106   g_ptr_array_unref (args);
1107 }
1108
1109 static gboolean
1110 start_write (gpointer user_data)
1111 {
1112   GstRtmpConnection *sc = user_data;
1113   gst_rtmp_connection_start_write (sc);
1114   return G_SOURCE_REMOVE;
1115 }
1116
1117 void
1118 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1119 {
1120   g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1121   g_return_if_fail (GST_IS_BUFFER (buffer));
1122
1123   g_async_queue_push (self->output_queue, buffer);
1124   g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1125       start_write, g_object_ref (self), g_object_unref);
1126 }
1127
1128 guint
1129 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1130 {
1131   return g_async_queue_length (connection->output_queue);
1132 }
1133
1134 guint
1135 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1136     GstRtmpCommandCallback response_command, gpointer user_data,
1137     guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1138     ...)
1139 {
1140   GstBuffer *buffer;
1141   gdouble transaction_id = 0;
1142   va_list ap;
1143   GBytes *payload;
1144   guint8 *data;
1145   gsize size;
1146
1147   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1148
1149   if (connection->thread != g_thread_self ()) {
1150     GST_ERROR_OBJECT (connection, "Called from wrong thread");
1151   }
1152
1153   GST_DEBUG_OBJECT (connection,
1154       "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1155       command_name, stream_id);
1156
1157   if (response_command) {
1158     Transaction *t;
1159
1160     transaction_id = ++connection->transaction_count;
1161
1162     GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1163         GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1164
1165     t = transaction_new (transaction_id, response_command, user_data);
1166
1167     connection->transactions = g_list_append (connection->transactions, t);
1168   }
1169
1170   va_start (ap, argument);
1171   payload = gst_amf_serialize_command_valist (transaction_id,
1172       command_name, argument, ap);
1173   va_end (ap);
1174
1175   data = g_bytes_unref_to_data (payload, &size);
1176   buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1177       3, stream_id, data, size);
1178
1179   gst_rtmp_connection_queue_message (connection, buffer);
1180   return transaction_id;
1181 }
1182
1183 void
1184 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1185     GstRtmpCommandCallback response_command, gpointer user_data,
1186     guint32 stream_id, const gchar * command_name)
1187 {
1188   ExpectedCommand *ec;
1189
1190   g_return_if_fail (response_command);
1191   g_return_if_fail (command_name);
1192   g_return_if_fail (!is_command_response (command_name));
1193
1194   GST_LOG_OBJECT (connection,
1195       "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1196       GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1197
1198   ec = expected_command_new (stream_id, command_name, response_command,
1199       user_data);
1200
1201   connection->expected_commands =
1202       g_list_append (connection->expected_commands, ec);
1203 }
1204
1205 static void
1206 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1207 {
1208   guint64 in_bytes_total = connection->in_bytes_total;
1209   GstRtmpProtocolControl pc = {
1210     .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1211     .param = (guint32) in_bytes_total,
1212   };
1213
1214   gst_rtmp_connection_queue_message (connection,
1215       gst_rtmp_message_new_protocol_control (&pc));
1216
1217   g_mutex_lock (&connection->stats_lock);
1218   connection->in_bytes_acked = in_bytes_total;
1219   g_mutex_unlock (&connection->stats_lock);
1220 }
1221
1222 static void
1223 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1224     guint32 event_data)
1225 {
1226   GstRtmpUserControl uc = {
1227     .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1228     .param = event_data,
1229   };
1230
1231   gst_rtmp_connection_queue_message (connection,
1232       gst_rtmp_message_new_user_control (&uc));
1233 }
1234
1235 void
1236 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1237     guint32 chunk_size)
1238 {
1239   GstRtmpProtocolControl pc = {
1240     .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1241     .param = chunk_size,
1242   };
1243
1244   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1245
1246   gst_rtmp_connection_queue_message (connection,
1247       gst_rtmp_message_new_protocol_control (&pc));
1248 }
1249
1250 void
1251 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1252     guint32 window_ack_size)
1253 {
1254   GstRtmpProtocolControl pc = {
1255     .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1256     .param = window_ack_size,
1257   };
1258
1259   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1260
1261   gst_rtmp_connection_queue_message (connection,
1262       gst_rtmp_message_new_protocol_control (&pc));
1263 }
1264
1265 void
1266 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1267     GstBuffer * buffer)
1268 {
1269   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1270   g_return_if_fail (GST_IS_BUFFER (buffer));
1271
1272   gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1273   gst_rtmp_connection_queue_message (connection, buffer);
1274 }
1275
1276 static gboolean
1277 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1278     GstBuffer * buffer)
1279 {
1280   GstRtmpProtocolControl pc;
1281
1282   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1283     GST_ERROR_OBJECT (self, "can't parse protocol control message");
1284     return FALSE;
1285   }
1286
1287   switch (pc.type) {
1288     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1289       guint32 chunk_size = pc.param;
1290
1291       GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1292           chunk_size);
1293
1294       if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1295         GST_ERROR_OBJECT (self,
1296             "requested chunk size %" G_GUINT32_FORMAT " is too small",
1297             chunk_size);
1298         return FALSE;
1299       }
1300
1301       if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1302         GST_ERROR_OBJECT (self,
1303             "requested chunk size %" G_GUINT32_FORMAT " is too large",
1304             chunk_size);
1305         return FALSE;
1306       }
1307
1308       if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1309         GST_WARNING_OBJECT (self,
1310             "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1311       }
1312
1313       self->out_chunk_size_pending = pc.param;
1314       break;
1315     }
1316
1317     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1318       guint32 window_ack_size = pc.param;
1319
1320       GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1321           window_ack_size);
1322
1323       if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1324         GST_WARNING_OBJECT (self,
1325             "requesting small window ack size %" G_GUINT32_FORMAT,
1326             window_ack_size);
1327       }
1328
1329       self->out_window_ack_size_pending = window_ack_size;
1330       break;
1331     }
1332
1333     default:
1334       break;
1335   }
1336
1337   return TRUE;
1338 }
1339
1340 static void
1341 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1342 {
1343   guint32 chunk_size, window_ack_size;
1344
1345   chunk_size = self->out_chunk_size_pending;
1346   if (chunk_size) {
1347     self->out_chunk_size_pending = 0;
1348
1349     g_mutex_lock (&self->stats_lock);
1350     self->out_chunk_size = chunk_size;
1351     g_mutex_unlock (&self->stats_lock);
1352
1353     GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1354   }
1355
1356   window_ack_size = self->out_window_ack_size_pending;
1357   if (window_ack_size) {
1358     self->out_window_ack_size_pending = 0;
1359
1360     g_mutex_lock (&self->stats_lock);
1361     self->out_window_ack_size = window_ack_size;
1362     g_mutex_unlock (&self->stats_lock);
1363
1364     GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1365         window_ack_size);
1366   }
1367 }
1368
1369 static GstStructure *
1370 get_stats (GstRtmpConnection * self)
1371 {
1372   return gst_structure_new ("GstRtmpConnectionStats",
1373       "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1374       "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1375       "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1376       "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1377       "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1378       "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1379       "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1380       "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1381 }
1382
1383 GstStructure *
1384 gst_rtmp_connection_get_null_stats (void)
1385 {
1386   return get_stats (NULL);
1387 }
1388
1389 GstStructure *
1390 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1391 {
1392   GstStructure *s;
1393
1394   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1395
1396   g_mutex_lock (&self->stats_lock);
1397   s = get_stats (self);
1398   g_mutex_unlock (&self->stats_lock);
1399
1400   return s;
1401 }