gst: don't use volatile to mean atomic
[platform/upstream/gstreamer.git] / gst / rtmp2 / rtmp / rtmpconnection.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <string.h>
28 #include <math.h>
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33 #include "amf.h"
34
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
37
38 #define READ_SIZE 8192
39
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
41
42 struct _GstRtmpConnection
43 {
44   GObject parent_instance;
45
46   /* should be properties */
47   gboolean input_paused;
48   gboolean error;
49
50   /* private */
51   GThread *thread;
52   GSocketConnection *connection;
53   GCancellable *cancellable;
54   GSocketClient *socket_client;
55   GAsyncQueue *output_queue;
56   GMainContext *main_context;
57
58   GSource *input_source;
59   GByteArray *input_bytes;
60   guint input_needed_bytes;
61   GstRtmpChunkStreams *input_streams, *output_streams;
62   GList *transactions;
63   GList *expected_commands;
64   guint transaction_count;
65
66   GstRtmpConnectionMessageFunc input_handler;
67   gpointer input_handler_user_data;
68   GDestroyNotify input_handler_user_data_destroy;
69
70   GstRtmpConnectionFunc output_handler;
71   gpointer output_handler_user_data;
72   GDestroyNotify output_handler_user_data_destroy;
73
74   gboolean writing;
75
76   /* Protects the values below during concurrent access.
77    * - Taken by the loop thread when writing, but not reading.
78    * - Taken by other threads when reading (calling get_stats).
79    */
80   GMutex stats_lock;
81
82   /* RTMP configuration */
83   guint32 in_chunk_size;
84   guint32 out_chunk_size, out_chunk_size_pending;
85   guint32 in_window_ack_size;
86   guint32 out_window_ack_size, out_window_ack_size_pending;
87
88   guint64 in_bytes_total;
89   guint64 out_bytes_total;
90   guint64 in_bytes_acked;
91   guint64 out_bytes_acked;
92 };
93
94
95 typedef struct
96 {
97   GObjectClass parent_class;
98 } GstRtmpConnectionClass;
99
100 /* prototypes */
101
102 static void gst_rtmp_connection_dispose (GObject * object);
103 static void gst_rtmp_connection_finalize (GObject * object);
104 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
105 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
106     gpointer user_data);
107 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
108 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
109     GAsyncResult * result, gpointer user_data);
110 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
111     guint needed_bytes);
112 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
113 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
114 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
115     connection, GstBuffer * buffer);
116 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
117     connection, GstBuffer * buffer);
118 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
119     GstBuffer * buffer);
120 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
121     GstBuffer * buffer);
122 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
123     GstBuffer * buffer);
124 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
125     guint32 in_chunk_size);
126 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
127     guint32 bytes);
128 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
129     self, guint32 in_chunk_size);
130
131 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
132 static void
133 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
134     guint32 event_data);
135
136 static gboolean
137 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
138     GstBuffer * buffer);
139 static void
140 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
141
142 typedef struct
143 {
144   gdouble transaction_id;
145   GstRtmpCommandCallback func;
146   gpointer user_data;
147 } Transaction;
148
149 static Transaction *
150 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
151     gpointer user_data)
152 {
153   Transaction *data = g_slice_new (Transaction);
154   data->transaction_id = transaction_id;
155   data->func = func;
156   data->user_data = user_data;
157   return data;
158 }
159
160 static void
161 transaction_free (gpointer ptr)
162 {
163   Transaction *data = ptr;
164   g_slice_free (Transaction, data);
165 }
166
167 typedef struct
168 {
169   guint32 stream_id;
170   gchar *command_name;
171   GstRtmpCommandCallback func;
172   gpointer user_data;
173 } ExpectedCommand;
174
175 static ExpectedCommand *
176 expected_command_new (guint32 stream_id, const gchar * command_name,
177     GstRtmpCommandCallback func, gpointer user_data)
178 {
179   ExpectedCommand *data = g_slice_new (ExpectedCommand);
180   data->stream_id = stream_id;
181   data->command_name = g_strdup (command_name);
182   data->func = func;
183   data->user_data = user_data;
184   return data;
185 }
186
187 static void
188 expected_command_free (gpointer ptr)
189 {
190   ExpectedCommand *data = ptr;
191   g_free (data->command_name);
192   g_slice_free (ExpectedCommand, data);
193 }
194
195 enum
196 {
197   SIGNAL_ERROR,
198   SIGNAL_STREAM_CONTROL,
199
200   N_SIGNALS
201 };
202
203 static guint signals[N_SIGNALS] = { 0, };
204
205 /* singletons */
206
207 static GstMemory *set_data_frame_value;
208
209 static void
210 init_set_data_frame_value (void)
211 {
212   GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
213   GBytes *bytes = gst_amf_node_serialize (node);
214   gsize size;
215   const gchar *data = g_bytes_get_data (bytes, &size);
216
217   set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
218       (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
219   GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
220       GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
221
222   gst_amf_node_free (node);
223 }
224
225 /* class initialization */
226
227 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
228     G_TYPE_OBJECT,
229     GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
230         "rtmpconnection", 0, "debug category for GstRtmpConnection class");
231     init_set_data_frame_value ());
232
233 static void
234 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
235 {
236   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
237
238   gobject_class->dispose = gst_rtmp_connection_dispose;
239   gobject_class->finalize = gst_rtmp_connection_finalize;
240
241   signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
242       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
243
244   signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
245       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
246       G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
247
248   GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
249 }
250
251 static void
252 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
253 {
254   rtmpconnection->output_queue =
255       g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
256   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
257   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
258
259   rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
260   rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
261
262   rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
263   rtmpconnection->input_needed_bytes = 1;
264
265   g_mutex_init (&rtmpconnection->stats_lock);
266 }
267
268 void
269 gst_rtmp_connection_dispose (GObject * object)
270 {
271   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
272   GST_DEBUG_OBJECT (rtmpconnection, "dispose");
273
274   /* clean up as possible.  may be called multiple times */
275
276   gst_rtmp_connection_close (rtmpconnection);
277   g_cancellable_cancel (rtmpconnection->cancellable);
278   gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
279   gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
280
281   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
282 }
283
284 void
285 gst_rtmp_connection_finalize (GObject * object)
286 {
287   GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
288   GST_DEBUG_OBJECT (rtmpconnection, "finalize");
289
290   /* clean up object here */
291
292   g_mutex_clear (&rtmpconnection->stats_lock);
293   g_clear_object (&rtmpconnection->cancellable);
294   g_clear_object (&rtmpconnection->connection);
295   g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
296   g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
297   g_clear_pointer (&rtmpconnection->output_streams,
298       gst_rtmp_chunk_streams_free);
299   g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
300   g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
301   g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
302
303   G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
304 }
305
306 GSocket *
307 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
308 {
309   return g_socket_connection_get_socket (sc->connection);
310 }
311
312 static void
313 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
314     GSocketConnection * connection)
315 {
316   GInputStream *is;
317
318   sc->thread = g_thread_ref (g_thread_self ());
319   sc->main_context = g_main_context_ref_thread_default ();
320   sc->connection = g_object_ref (connection);
321
322   /* refs the socket because it's creating an input stream, which holds a ref */
323   is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
324   /* refs the socket because it's creating a socket source */
325   g_warn_if_fail (!sc->input_source);
326   sc->input_source =
327       g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
328       sc->cancellable);
329   g_source_set_callback (sc->input_source,
330       (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
331       g_object_unref);
332   g_source_attach (sc->input_source, sc->main_context);
333 }
334
335 GstRtmpConnection *
336 gst_rtmp_connection_new (GSocketConnection * connection,
337     GCancellable * cancellable)
338 {
339   GstRtmpConnection *sc;
340
341   sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
342   if (cancellable)
343     sc->cancellable = g_object_ref (cancellable);
344   else
345     sc->cancellable = g_cancellable_new ();
346
347   gst_rtmp_connection_set_socket_connection (sc, connection);
348
349   return sc;
350 }
351
352 static void
353 cancel_all_commands (GstRtmpConnection * self)
354 {
355   GList *l;
356
357   for (l = self->transactions; l; l = g_list_next (l)) {
358     Transaction *cc = l->data;
359     GST_LOG_OBJECT (self, "calling transaction callback %s",
360         GST_DEBUG_FUNCPTR_NAME (cc->func));
361     cc->func ("<cancelled>", NULL, cc->user_data);
362   }
363   g_list_free_full (self->transactions, transaction_free);
364   self->transactions = NULL;
365
366   for (l = self->expected_commands; l; l = g_list_next (l)) {
367     ExpectedCommand *cc = l->data;
368     GST_LOG_OBJECT (self, "calling expected command callback %s",
369         GST_DEBUG_FUNCPTR_NAME (cc->func));
370     cc->func ("<cancelled>", NULL, cc->user_data);
371   }
372   g_list_free_full (self->expected_commands, expected_command_free);
373   self->expected_commands = NULL;
374 }
375
376 void
377 gst_rtmp_connection_close (GstRtmpConnection * self)
378 {
379   if (self->thread != g_thread_self ()) {
380     GST_ERROR_OBJECT (self, "Called from wrong thread");
381   }
382
383   g_cancellable_cancel (self->cancellable);
384   cancel_all_commands (self);
385
386   if (self->input_source) {
387     g_source_destroy (self->input_source);
388     g_clear_pointer (&self->input_source, g_source_unref);
389   }
390
391   if (self->connection) {
392     g_io_stream_close_async (G_IO_STREAM (self->connection),
393         G_PRIORITY_DEFAULT, NULL, NULL, NULL);
394   }
395 }
396
397 void
398 gst_rtmp_connection_close_and_unref (gpointer ptr)
399 {
400   GstRtmpConnection *connection;
401
402   g_return_if_fail (ptr);
403
404   connection = GST_RTMP_CONNECTION (ptr);
405   gst_rtmp_connection_close (connection);
406   g_object_unref (connection);
407 }
408
409 void
410 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
411     GstRtmpConnectionMessageFunc callback, gpointer user_data,
412     GDestroyNotify user_data_destroy)
413 {
414   if (sc->input_handler_user_data_destroy) {
415     sc->input_handler_user_data_destroy (sc->input_handler_user_data);
416   }
417
418   sc->input_handler = callback;
419   sc->input_handler_user_data = user_data;
420   sc->input_handler_user_data_destroy = user_data_destroy;
421 }
422
423 void
424 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
425     GstRtmpConnectionFunc callback, gpointer user_data,
426     GDestroyNotify user_data_destroy)
427 {
428   if (sc->output_handler_user_data_destroy) {
429     sc->output_handler_user_data_destroy (sc->output_handler_user_data);
430   }
431
432   sc->output_handler = callback;
433   sc->output_handler_user_data = user_data;
434   sc->output_handler_user_data_destroy = user_data_destroy;
435 }
436
437 static gboolean
438 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
439 {
440   GstRtmpConnection *sc = user_data;
441   gssize ret;
442   guint oldsize;
443   GError *error = NULL;
444   guint64 bytes_since_ack;
445
446   GST_TRACE_OBJECT (sc, "input ready");
447
448   oldsize = sc->input_bytes->len;
449   g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
450   ret =
451       g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
452       sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
453   g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
454
455   if (ret < 0) {
456     gint code = error->code;
457
458     if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
459             code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
460       /* should retry */
461       GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
462           code, error->message);
463       g_error_free (error);
464       return G_SOURCE_CONTINUE;
465     }
466
467     GST_ERROR_OBJECT (sc, "read error: %s %d %s",
468         g_quark_to_string (error->domain), code, error->message);
469     g_error_free (error);
470   } else if (ret == 0) {
471     GST_INFO_OBJECT (sc, "read EOF");
472   }
473
474   if (ret <= 0) {
475     gst_rtmp_connection_emit_error (sc);
476     return G_SOURCE_REMOVE;
477   }
478
479   GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
480
481   g_mutex_lock (&sc->stats_lock);
482   sc->in_bytes_total += ret;
483   g_mutex_unlock (&sc->stats_lock);
484
485   bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
486   if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
487     gst_rtmp_connection_send_ack (sc);
488   }
489
490   gst_rtmp_connection_try_read (sc);
491   return G_SOURCE_CONTINUE;
492 }
493
494 static void
495 gst_rtmp_connection_start_write (GstRtmpConnection * self)
496 {
497   GOutputStream *os;
498   GstBuffer *message, *chunks;
499   GstRtmpMeta *meta;
500   GstRtmpChunkStream *cstream;
501
502   if (self->writing) {
503     return;
504   }
505
506   message = g_async_queue_try_pop (self->output_queue);
507   if (!message) {
508     return;
509   }
510
511   meta = gst_buffer_get_rtmp_meta (message);
512   if (!meta) {
513     GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
514     goto out;
515   }
516
517   if (gst_rtmp_message_is_protocol_control (message)) {
518     if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
519       GST_ERROR_OBJECT (self,
520           "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
521       goto out;
522     }
523   }
524
525   cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
526   if (!cstream) {
527     GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
528         message);
529     goto out;
530   }
531
532   chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
533       self->out_chunk_size);
534   if (!chunks) {
535     GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
536     goto out;
537   }
538
539   self->writing = TRUE;
540   if (self->output_handler) {
541     self->output_handler (self, self->output_handler_user_data);
542   }
543
544   os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
545   gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
546       self->cancellable, gst_rtmp_connection_write_buffer_done,
547       g_object_ref (self));
548
549   gst_buffer_unref (chunks);
550
551 out:
552   gst_buffer_unref (message);
553 }
554
555 static void
556 gst_rtmp_connection_emit_error (GstRtmpConnection * self)
557 {
558   if (self->error) {
559     return;
560   }
561
562   GST_INFO_OBJECT (self, "connection error");
563   self->error = TRUE;
564
565   cancel_all_commands (self);
566
567   g_signal_emit (self, signals[SIGNAL_ERROR], 0);
568 }
569
570 static void
571 gst_rtmp_connection_write_buffer_done (GObject * obj,
572     GAsyncResult * result, gpointer user_data)
573 {
574   GOutputStream *os = G_OUTPUT_STREAM (obj);
575   GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
576   gsize bytes_written = 0;
577   GError *error = NULL;
578   gboolean res;
579
580   self->writing = FALSE;
581
582   res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
583       &bytes_written, &error);
584
585   g_mutex_lock (&self->stats_lock);
586   self->out_bytes_total += bytes_written;
587   g_mutex_unlock (&self->stats_lock);
588
589   if (!res) {
590     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
591       GST_INFO_OBJECT (self,
592           "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
593     } else {
594       GST_ERROR_OBJECT (self,
595           "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
596           error->message, bytes_written);
597     }
598     gst_rtmp_connection_emit_error (self);
599     g_error_free (error);
600     g_object_unref (self);
601     return;
602   }
603
604   GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
605       bytes_written);
606
607   gst_rtmp_connection_apply_protocol_control (self);
608   gst_rtmp_connection_start_write (self);
609   g_object_unref (self);
610 }
611
612 static void
613 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
614     guint needed_bytes)
615 {
616   g_return_if_fail (needed_bytes > 0);
617   connection->input_needed_bytes = needed_bytes;
618   gst_rtmp_connection_try_read (connection);
619 }
620
621 static void
622 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
623 {
624   guint need = connection->input_needed_bytes,
625       len = connection->input_bytes->len;
626
627   if (len < need) {
628     GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
629     return;
630   }
631
632   GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
633   gst_rtmp_connection_do_read (connection);
634 }
635
636 static void
637 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
638     GBytes ** outbytes)
639 {
640   g_return_if_fail (size <= sc->input_bytes->len);
641
642   if (outbytes) {
643     *outbytes = g_bytes_new (sc->input_bytes->data, size);
644   }
645
646   g_byte_array_remove_range (sc->input_bytes, 0, size);
647 }
648
649 static void
650 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
651 {
652   GByteArray *input_bytes = sc->input_bytes;
653   gsize needed_bytes = 1;
654
655   while (1) {
656     GstRtmpChunkStream *cstream;
657     guint32 chunk_stream_id, header_size, next_size;
658     guint8 *data;
659
660     chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
661         input_bytes->len);
662
663     if (!chunk_stream_id) {
664       needed_bytes = input_bytes->len + 1;
665       break;
666     }
667
668     cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
669     header_size = gst_rtmp_chunk_stream_parse_header (cstream,
670         input_bytes->data, input_bytes->len);
671
672     if (input_bytes->len < header_size) {
673       needed_bytes = header_size;
674       break;
675     }
676
677     next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
678         sc->in_chunk_size, &data);
679
680     if (input_bytes->len < header_size + next_size) {
681       needed_bytes = header_size + next_size;
682       break;
683     }
684
685     memcpy (data, input_bytes->data + header_size, next_size);
686     gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
687
688     next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
689         sc->in_chunk_size);
690
691     if (next_size == 0) {
692       GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
693       gst_rtmp_connection_handle_message (sc, buffer);
694       gst_buffer_unref (buffer);
695     }
696   }
697
698   gst_rtmp_connection_start_read (sc, needed_bytes);
699 }
700
701 static void
702 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
703 {
704   if (gst_rtmp_message_is_protocol_control (buffer)) {
705     gst_rtmp_connection_handle_protocol_control (sc, buffer);
706     return;
707   }
708
709   if (gst_rtmp_message_is_user_control (buffer)) {
710     gst_rtmp_connection_handle_user_control (sc, buffer);
711     return;
712   }
713
714   switch (gst_rtmp_message_get_type (buffer)) {
715     case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
716       gst_rtmp_connection_handle_cm (sc, buffer);
717       return;
718
719     case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
720       gst_rtmp_connection_handle_aggregate (sc, buffer);
721       break;
722
723     default:
724       if (sc->input_handler) {
725         sc->input_handler (sc, buffer, sc->input_handler_user_data);
726       }
727       return;
728   }
729 }
730
731 static void
732 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
733     GstBuffer * buffer)
734 {
735   GstRtmpMeta *meta;
736   GstMapInfo map;
737   gsize pos = 0;
738   guint32 first_ts = 0;
739
740   meta = gst_buffer_get_rtmp_meta (buffer);
741   g_return_if_fail (meta);
742
743   gst_buffer_map (buffer, &map, GST_MAP_READ);
744   GST_TRACE_OBJECT (connection, "got aggregate message");
745
746   /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
747    * The payload is part of a FLV file.
748    *
749    * WARNING: This spec defines the payload to use an "RTMP message format"
750    * which misidentifies the format of the timestamps and omits the size of the
751    * backpointers. */
752
753   while (pos < map.size) {
754     gsize remaining = map.size - pos;
755     GstBuffer *submessage;
756     GstRtmpMeta *submeta;
757     GstRtmpFlvTagHeader header;
758
759     if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
760       GST_ERROR_OBJECT (connection,
761           "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
762           GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
763       break;
764     }
765
766     if (remaining < header.total_size) {
767       GST_ERROR_OBJECT (connection,
768           "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
769           ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
770       break;
771     }
772
773     submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
774         GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
775         pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
776
777     GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
778     GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
779     GST_BUFFER_OFFSET_END (submessage) =
780         GST_BUFFER_OFFSET (submessage) + header.total_size;
781
782     submeta = gst_buffer_get_rtmp_meta (submessage);
783     g_assert (submeta);
784
785     submeta->type = header.type;
786     submeta->size = header.payload_size;
787
788     if (pos == 0) {
789       first_ts = header.timestamp;
790     } else {
791       guint32 ts_offset = header.timestamp - first_ts;
792
793       submeta->ts_delta += ts_offset;
794       GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
795       GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
796     }
797
798     gst_rtmp_buffer_dump (submessage, "<<< submessage");
799     gst_rtmp_connection_handle_message (connection, submessage);
800     gst_buffer_unref (submessage);
801
802     pos += header.total_size;
803   }
804
805   gst_buffer_unmap (buffer, &map);
806 }
807
808 static void
809 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
810     GstBuffer * buffer)
811 {
812   GstRtmpProtocolControl pc;
813
814   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
815     GST_ERROR_OBJECT (connection, "can't parse protocol control message");
816     return;
817   }
818
819   GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
820       gst_rtmp_message_type_get_nick (pc.type));
821
822   switch (pc.type) {
823     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
824       GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
825           pc.param);
826       gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
827       break;
828
829     case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
830       GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
831           G_GUINT32_FORMAT, pc.param);
832       break;
833
834     case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
835       GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
836           pc.param);
837       gst_rtmp_connection_handle_ack (connection, pc.param);
838       break;
839
840     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
841       GST_INFO_OBJECT (connection,
842           "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
843       gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
844       break;
845
846     case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
847       GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
848           ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
849       /* FIXME this is not correct, but close enough */
850       gst_rtmp_connection_request_window_size (connection, pc.param);
851       break;
852
853     default:
854       GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
855           pc.type, gst_rtmp_message_type_get_nick (pc.type));
856       break;
857   }
858 }
859
860 static void
861 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
862     GstBuffer * buffer)
863 {
864   GstRtmpUserControl uc;
865
866   if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
867     GST_ERROR_OBJECT (connection, "can't parse user control message");
868     return;
869   }
870
871   GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
872       gst_rtmp_user_control_type_get_nick (uc.type));
873
874   switch (uc.type) {
875     case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
876     case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
877     case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
878     case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
879       GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
880           gst_rtmp_user_control_type_get_nick (uc.type));
881       g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
882           uc.type, uc.param);
883       break;
884
885     case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
886       GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
887           G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
888       break;
889
890     case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
891       GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
892           uc.param);
893       gst_rtmp_connection_send_ping_response (connection, uc.param);
894       break;
895
896     case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
897       GST_DEBUG_OBJECT (connection,
898           "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
899       break;
900
901     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
902       GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
903           uc.param);
904       break;
905
906     case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
907       GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
908           uc.param);
909       break;
910
911     default:
912       GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
913           uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
914       break;
915   }
916 }
917
918 static void
919 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
920     guint32 chunk_size)
921 {
922   if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
923     GST_ERROR_OBJECT (self,
924         "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
925         chunk_size);
926     return;
927   }
928
929   if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
930     GST_ERROR_OBJECT (self,
931         "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
932         chunk_size);
933     return;
934   }
935
936   if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
937     GST_WARNING_OBJECT (self,
938         "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
939   }
940
941   g_mutex_lock (&self->stats_lock);
942   self->in_chunk_size = chunk_size;
943   g_mutex_unlock (&self->stats_lock);
944 }
945
946 static void
947 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
948 {
949   guint64 last_ack, new_ack;
950   guint32 last_ack_low, last_ack_high;
951
952   last_ack = self->out_bytes_acked;
953   last_ack_low = last_ack & G_MAXUINT32;
954   last_ack_high = (last_ack >> 32) & G_MAXUINT32;
955
956   if (bytes < last_ack_low) {
957     GST_WARNING_OBJECT (self,
958         "Acknowledgement bytes regression, assuming rollover: %"
959         G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
960     last_ack_high += 1;
961   }
962
963   new_ack = (((guint64) last_ack_high) << 32) | bytes;
964
965   GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
966       new_ack - last_ack);
967
968   g_mutex_lock (&self->stats_lock);
969   self->out_bytes_acked = new_ack;
970   g_mutex_unlock (&self->stats_lock);
971 }
972
973 static void
974 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
975     guint32 window_ack_size)
976 {
977   if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
978     GST_WARNING_OBJECT (self,
979         "peer requested small window ack size %" G_GUINT32_FORMAT,
980         window_ack_size);
981   }
982
983   g_mutex_lock (&self->stats_lock);
984   self->in_window_ack_size = window_ack_size;
985   g_mutex_unlock (&self->stats_lock);
986 }
987
988 static gboolean
989 is_command_response (const gchar * command_name)
990 {
991   return g_strcmp0 (command_name, "_result") == 0 ||
992       g_strcmp0 (command_name, "_error") == 0;
993 }
994
995 static void
996 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
997 {
998   GstRtmpMeta *meta;
999   gchar *command_name;
1000   gdouble transaction_id;
1001   GPtrArray *args;
1002
1003   meta = gst_buffer_get_rtmp_meta (buffer);
1004   g_return_if_fail (meta);
1005
1006   {
1007     GstMapInfo map;
1008     gst_buffer_map (buffer, &map, GST_MAP_READ);
1009     args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1010         &command_name);
1011     gst_buffer_unmap (buffer, &map);
1012   }
1013
1014   if (!args) {
1015     return;
1016   }
1017
1018   if (!isfinite (transaction_id) || transaction_id < 0 ||
1019       transaction_id > G_MAXUINT) {
1020     GST_WARNING_OBJECT (sc,
1021         "Server sent command \"%s\" with extreme transaction ID %.0f",
1022         GST_STR_NULL (command_name), transaction_id);
1023   } else if (transaction_id > sc->transaction_count) {
1024     GST_WARNING_OBJECT (sc,
1025         "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1026         GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1027     sc->transaction_count = transaction_id;
1028   }
1029
1030   GST_DEBUG_OBJECT (sc,
1031       "got control message \"%s\" transaction %.0f size %"
1032       G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1033       meta->size);
1034
1035   if (is_command_response (command_name)) {
1036     if (transaction_id != 0) {
1037       GList *l;
1038
1039       for (l = sc->transactions; l; l = g_list_next (l)) {
1040         Transaction *t = l->data;
1041
1042         if (t->transaction_id != transaction_id) {
1043           continue;
1044         }
1045
1046         GST_LOG_OBJECT (sc, "calling transaction callback %s",
1047             GST_DEBUG_FUNCPTR_NAME (t->func));
1048         sc->transactions = g_list_remove_link (sc->transactions, l);
1049         t->func (command_name, args, t->user_data);
1050         g_list_free_full (l, transaction_free);
1051         break;
1052       }
1053     } else {
1054       GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1055           GST_STR_NULL (command_name));
1056     }
1057   } else {
1058     GList *l;
1059
1060     if (transaction_id != 0) {
1061       GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1062           GST_STR_NULL (command_name));
1063     }
1064
1065     for (l = sc->expected_commands; l; l = g_list_next (l)) {
1066       ExpectedCommand *ec = l->data;
1067
1068       if (ec->stream_id != meta->mstream) {
1069         continue;
1070       }
1071
1072       if (g_strcmp0 (ec->command_name, command_name)) {
1073         continue;
1074       }
1075
1076       GST_LOG_OBJECT (sc, "calling expected command callback %s",
1077           GST_DEBUG_FUNCPTR_NAME (ec->func));
1078       sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1079       ec->func (command_name, args, ec->user_data);
1080       g_list_free_full (l, expected_command_free);
1081       break;
1082     }
1083   }
1084
1085   g_free (command_name);
1086   g_ptr_array_unref (args);
1087 }
1088
1089 static gboolean
1090 start_write (gpointer user_data)
1091 {
1092   GstRtmpConnection *sc = user_data;
1093   gst_rtmp_connection_start_write (sc);
1094   return G_SOURCE_REMOVE;
1095 }
1096
1097 void
1098 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1099 {
1100   g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1101   g_return_if_fail (GST_IS_BUFFER (buffer));
1102
1103   g_async_queue_push (self->output_queue, buffer);
1104   g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1105       start_write, g_object_ref (self), g_object_unref);
1106 }
1107
1108 guint
1109 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1110 {
1111   return g_async_queue_length (connection->output_queue);
1112 }
1113
1114 guint
1115 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1116     GstRtmpCommandCallback response_command, gpointer user_data,
1117     guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1118     ...)
1119 {
1120   GstBuffer *buffer;
1121   gdouble transaction_id = 0;
1122   va_list ap;
1123   GBytes *payload;
1124   guint8 *data;
1125   gsize size;
1126
1127   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1128
1129   if (connection->thread != g_thread_self ()) {
1130     GST_ERROR_OBJECT (connection, "Called from wrong thread");
1131   }
1132
1133   GST_DEBUG_OBJECT (connection,
1134       "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1135       command_name, stream_id);
1136
1137   if (response_command) {
1138     Transaction *t;
1139
1140     transaction_id = ++connection->transaction_count;
1141
1142     GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1143         GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1144
1145     t = transaction_new (transaction_id, response_command, user_data);
1146
1147     connection->transactions = g_list_append (connection->transactions, t);
1148   }
1149
1150   va_start (ap, argument);
1151   payload = gst_amf_serialize_command_valist (transaction_id,
1152       command_name, argument, ap);
1153   va_end (ap);
1154
1155   data = g_bytes_unref_to_data (payload, &size);
1156   buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1157       3, stream_id, data, size);
1158
1159   gst_rtmp_connection_queue_message (connection, buffer);
1160   return transaction_id;
1161 }
1162
1163 void
1164 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1165     GstRtmpCommandCallback response_command, gpointer user_data,
1166     guint32 stream_id, const gchar * command_name)
1167 {
1168   ExpectedCommand *ec;
1169
1170   g_return_if_fail (response_command);
1171   g_return_if_fail (command_name);
1172   g_return_if_fail (!is_command_response (command_name));
1173
1174   GST_LOG_OBJECT (connection,
1175       "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1176       GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1177
1178   ec = expected_command_new (stream_id, command_name, response_command,
1179       user_data);
1180
1181   connection->expected_commands =
1182       g_list_append (connection->expected_commands, ec);
1183 }
1184
1185 static void
1186 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1187 {
1188   guint64 in_bytes_total = connection->in_bytes_total;
1189   GstRtmpProtocolControl pc = {
1190     .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1191     .param = (guint32) in_bytes_total,
1192   };
1193
1194   gst_rtmp_connection_queue_message (connection,
1195       gst_rtmp_message_new_protocol_control (&pc));
1196
1197   g_mutex_lock (&connection->stats_lock);
1198   connection->in_bytes_acked = in_bytes_total;
1199   g_mutex_unlock (&connection->stats_lock);
1200 }
1201
1202 static void
1203 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1204     guint32 event_data)
1205 {
1206   GstRtmpUserControl uc = {
1207     .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1208     .param = event_data,
1209   };
1210
1211   gst_rtmp_connection_queue_message (connection,
1212       gst_rtmp_message_new_user_control (&uc));
1213 }
1214
1215 void
1216 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1217     guint32 chunk_size)
1218 {
1219   GstRtmpProtocolControl pc = {
1220     .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1221     .param = chunk_size,
1222   };
1223
1224   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1225
1226   gst_rtmp_connection_queue_message (connection,
1227       gst_rtmp_message_new_protocol_control (&pc));
1228 }
1229
1230 void
1231 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1232     guint32 window_ack_size)
1233 {
1234   GstRtmpProtocolControl pc = {
1235     .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1236     .param = window_ack_size,
1237   };
1238
1239   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1240
1241   gst_rtmp_connection_queue_message (connection,
1242       gst_rtmp_message_new_protocol_control (&pc));
1243 }
1244
1245 void
1246 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1247     GstBuffer * buffer)
1248 {
1249   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1250   g_return_if_fail (GST_IS_BUFFER (buffer));
1251
1252   gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1253   gst_rtmp_connection_queue_message (connection, buffer);
1254 }
1255
1256 static gboolean
1257 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1258     GstBuffer * buffer)
1259 {
1260   GstRtmpProtocolControl pc;
1261
1262   if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1263     GST_ERROR_OBJECT (self, "can't parse protocol control message");
1264     return FALSE;
1265   }
1266
1267   switch (pc.type) {
1268     case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1269       guint32 chunk_size = pc.param;
1270
1271       GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1272           chunk_size);
1273
1274       if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1275         GST_ERROR_OBJECT (self,
1276             "requested chunk size %" G_GUINT32_FORMAT " is too small",
1277             chunk_size);
1278         return FALSE;
1279       }
1280
1281       if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1282         GST_ERROR_OBJECT (self,
1283             "requested chunk size %" G_GUINT32_FORMAT " is too large",
1284             chunk_size);
1285         return FALSE;
1286       }
1287
1288       if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1289         GST_WARNING_OBJECT (self,
1290             "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1291       }
1292
1293       self->out_chunk_size_pending = pc.param;
1294       break;
1295     }
1296
1297     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1298       guint32 window_ack_size = pc.param;
1299
1300       GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1301           window_ack_size);
1302
1303       if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1304         GST_WARNING_OBJECT (self,
1305             "requesting small window ack size %" G_GUINT32_FORMAT,
1306             window_ack_size);
1307       }
1308
1309       self->out_window_ack_size_pending = window_ack_size;
1310       break;
1311     }
1312
1313     default:
1314       break;
1315   }
1316
1317   return TRUE;
1318 }
1319
1320 static void
1321 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1322 {
1323   guint32 chunk_size, window_ack_size;
1324
1325   chunk_size = self->out_chunk_size_pending;
1326   if (chunk_size) {
1327     self->out_chunk_size_pending = 0;
1328
1329     g_mutex_lock (&self->stats_lock);
1330     self->out_chunk_size = chunk_size;
1331     g_mutex_unlock (&self->stats_lock);
1332
1333     GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1334   }
1335
1336   window_ack_size = self->out_window_ack_size_pending;
1337   if (window_ack_size) {
1338     self->out_window_ack_size_pending = 0;
1339
1340     g_mutex_lock (&self->stats_lock);
1341     self->out_window_ack_size = window_ack_size;
1342     g_mutex_unlock (&self->stats_lock);
1343
1344     GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1345         window_ack_size);
1346   }
1347 }
1348
1349 static GstStructure *
1350 get_stats (GstRtmpConnection * self)
1351 {
1352   return gst_structure_new ("GstRtmpConnectionStats",
1353       "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1354       "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1355       "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1356       "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1357       "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1358       "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1359       "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1360       "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1361 }
1362
1363 GstStructure *
1364 gst_rtmp_connection_get_null_stats (void)
1365 {
1366   return get_stats (NULL);
1367 }
1368
1369 GstStructure *
1370 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1371 {
1372   GstStructure *s;
1373
1374   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1375
1376   g_mutex_lock (&self->stats_lock);
1377   s = get_stats (self);
1378   g_mutex_unlock (&self->stats_lock);
1379
1380   return s;
1381 }