rtmp2: Reject oversized messages
[platform/upstream/gstreamer.git] / gst / rtmp2 / rtmp / rtmpchunkstream.c
1 /* GStreamer RTMP Library
2  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
3  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
18  * Boston, MA 02110-1335, USA.
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include "rtmpchunkstream.h"
26 #include "rtmputils.h"
27
28 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category);
29 #define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category
30
31 static void
32 init_debug (void)
33 {
34   static volatile gsize done = 0;
35   if (g_once_init_enter (&done)) {
36     GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category,
37         "rtmpchunkstream", 0, "debug category for rtmp chunk streams");
38     g_once_init_leave (&done, 1);
39   }
40 }
41
42 enum
43 {
44   CHUNK_BYTE_TWOBYTE = 0,
45   CHUNK_BYTE_THREEBYTE = 1,
46   CHUNK_BYTE_MASK = 0x3f,
47   CHUNK_STREAM_MIN_TWOBYTE = 0x40,
48   CHUNK_STREAM_MIN_THREEBYTE = 0x140,
49   CHUNK_STREAM_MAX_THREEBYTE = 0x1003f,
50 };
51
52 typedef enum
53 {
54   CHUNK_TYPE_0 = 0,
55   CHUNK_TYPE_1 = 1,
56   CHUNK_TYPE_2 = 2,
57   CHUNK_TYPE_3 = 3,
58 } ChunkType;
59
60 static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 };
61
62 struct _GstRtmpChunkStream
63 {
64   GstBuffer *buffer;
65   GstRtmpMeta *meta;
66   GstMapInfo map;               /* Only used for parsing */
67   guint32 id;
68   guint32 offset;
69   guint64 bytes;
70 };
71
72 struct _GstRtmpChunkStreams
73 {
74   GArray *array;
75 };
76
77 static inline gboolean
78 chunk_stream_is_open (GstRtmpChunkStream * cstream)
79 {
80   return cstream->map.data != NULL;
81 }
82
83 static void
84 chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer)
85 {
86   GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
87   g_assert (meta);
88   g_assert (cstream->buffer == NULL);
89   cstream->buffer = buffer;
90   cstream->meta = meta;
91 }
92
93 static void
94 chunk_stream_clear (GstRtmpChunkStream * cstream)
95 {
96   if (chunk_stream_is_open (cstream)) {
97     gst_buffer_unmap (cstream->buffer, &cstream->map);
98     cstream->map.data = NULL;
99   }
100
101   gst_buffer_replace (&cstream->buffer, NULL);
102   cstream->meta = NULL;
103   cstream->offset = 0;
104 }
105
106 static guint32
107 chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size)
108 {
109   guint32 size, offset;
110
111   size = cstream->meta->size;
112   offset = cstream->offset;
113
114   g_return_val_if_fail (offset <= size, 0);
115   return MIN (size - offset, chunk_size);
116 }
117
118 static inline gboolean
119 needs_ext_ts (GstRtmpMeta * meta)
120 {
121   return meta->ts_delta >= 0xffffff;
122 }
123
124
125 static guint32
126 dts_to_abs_ts (GstBuffer * buffer)
127 {
128   GstClockTime dts = GST_BUFFER_DTS (buffer);
129   guint32 ret = 0;
130
131   if (GST_CLOCK_TIME_IS_VALID (dts)) {
132     ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
133   }
134
135   GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %"
136       G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret);
137   return ret;
138 }
139
140 static gboolean
141 dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer,
142     guint32 * out_ts)
143 {
144   GstClockTime dts = GST_BUFFER_DTS (buffer),
145       old_dts = GST_BUFFER_DTS (old_buffer);
146   guint32 abs_ts, old_abs_ts, delta_32 = 0;
147
148   if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) {
149     GST_LOG ("Timestamps not valid; using delta TS 0");
150     goto out;
151   }
152
153   if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) {
154     GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %"
155         GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts));
156     return FALSE;
157   }
158
159   abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
160   old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND);
161
162   /* underflow wraps around */
163   delta_32 = abs_ts - old_abs_ts;
164
165   GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT
166       " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %"
167       G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts,
168       GST_TIME_ARGS (dts), abs_ts, delta_32);
169
170 out:
171   *out_ts = delta_32;
172   return TRUE;
173 }
174
175 static ChunkType
176 select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer)
177 {
178   GstBuffer *old_buffer = cstream->buffer;
179   GstRtmpMeta *meta, *old_meta;
180
181   g_return_val_if_fail (buffer, -1);
182
183   meta = gst_buffer_get_rtmp_meta (buffer);
184
185   g_return_val_if_fail (meta, -1);
186   g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1);
187
188   meta->size = gst_buffer_get_size (buffer);
189   meta->cstream = cstream->id;
190
191   g_return_val_if_fail (meta->size <= GST_RTMP_MAXIMUM_MESSAGE_SIZE, -1);
192
193   if (!old_buffer) {
194     GST_TRACE ("Picking header 0: no previous header");
195     meta->ts_delta = dts_to_abs_ts (buffer);
196     return CHUNK_TYPE_0;
197   }
198
199   old_meta = gst_buffer_get_rtmp_meta (old_buffer);
200   g_return_val_if_fail (old_meta, -1);
201
202   if (old_meta->mstream != meta->mstream) {
203     GST_TRACE ("Picking header 0: stream mismatch; "
204         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
205         old_meta->mstream, meta->mstream);
206     meta->ts_delta = dts_to_abs_ts (buffer);
207     return CHUNK_TYPE_0;
208   }
209
210   if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) {
211     GST_TRACE ("Picking header 0: timestamp delta overflow");
212     meta->ts_delta = dts_to_abs_ts (buffer);
213     return CHUNK_TYPE_0;
214   }
215
216   /* now at least type 1 */
217
218   if (old_meta->type != meta->type) {
219     GST_TRACE ("Picking header 1: type mismatch; want %d got %d",
220         old_meta->type, meta->type);
221     return CHUNK_TYPE_1;
222   }
223
224   if (old_meta->size != meta->size) {
225     GST_TRACE ("Picking header 1: size mismatch; "
226         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
227         old_meta->size, meta->size);
228     return CHUNK_TYPE_1;
229   }
230
231   /* now at least type 2 */
232
233   if (old_meta->ts_delta != meta->ts_delta) {
234     GST_TRACE ("Picking header 2: timestamp delta mismatch; "
235         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
236         old_meta->ts_delta, meta->ts_delta);
237     return CHUNK_TYPE_2;
238   }
239
240   /* now at least type 3 */
241
242   GST_TRACE ("Picking header 3");
243   return CHUNK_TYPE_3;
244 }
245
246 static GstBuffer *
247 serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size,
248     ChunkType type)
249 {
250   GstRtmpMeta *meta = cstream->meta;
251   guint8 small_stream_id;
252   gsize header_size = chunk_header_sizes[type], offset;
253   gboolean ext_ts;
254   GstBuffer *ret;
255   GstMapInfo map;
256
257   GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT,
258       type, cstream->offset);
259
260   if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) {
261     small_stream_id = cstream->id;
262     header_size += 1;
263   } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) {
264     small_stream_id = CHUNK_BYTE_TWOBYTE;
265     header_size += 2;
266   } else {
267     small_stream_id = CHUNK_BYTE_THREEBYTE;
268     header_size += 3;
269   }
270
271   ext_ts = needs_ext_ts (meta);
272   if (ext_ts) {
273     header_size += 4;
274   }
275
276   GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size);
277
278   ret = gst_buffer_new_allocate (NULL, header_size, NULL);
279   if (!ret) {
280     GST_ERROR ("Failed to allocate chunk buffer");
281     return NULL;
282   }
283
284   if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) {
285     GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret);
286     gst_buffer_unref (ret);
287     return NULL;
288   }
289
290   /* Chunk Basic Header */
291   GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id);
292   offset = 1;
293
294   switch (small_stream_id) {
295     case CHUNK_BYTE_TWOBYTE:
296       GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
297       offset += 1;
298       break;
299
300     case CHUNK_BYTE_THREEBYTE:
301       GST_WRITE_UINT16_LE (map.data + 1,
302           cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
303       offset += 2;
304       break;
305   }
306
307   switch (type) {
308     case CHUNK_TYPE_0:
309       /* SRSLY:  "Message stream ID is stored in little-endian format." */
310       GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream);
311       /* no break */
312     case CHUNK_TYPE_1:
313       GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size);
314       GST_WRITE_UINT8 (map.data + offset + 6, meta->type);
315       /* no break */
316     case CHUNK_TYPE_2:
317       GST_WRITE_UINT24_BE (map.data + offset,
318           ext_ts ? 0xffffff : meta->ts_delta);
319       /* no break */
320     case CHUNK_TYPE_3:
321       offset += chunk_header_sizes[type];
322
323       if (ext_ts) {
324         GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta);
325         offset += 4;
326       }
327   }
328
329   g_assert (offset == header_size);
330   GST_MEMDUMP (">>> chunk header", map.data, offset);
331
332   gst_buffer_unmap (ret, &map);
333
334   GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ?
335       GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes;
336   GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret);
337
338   if (meta->size > 0) {
339     guint32 payload_size = chunk_stream_next_size (cstream, chunk_size);
340
341     GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload",
342         payload_size);
343
344     gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY,
345         cstream->offset, payload_size);
346
347     GST_BUFFER_OFFSET_END (ret) += payload_size;
348     cstream->offset += payload_size;
349     cstream->bytes += payload_size;
350   } else {
351     GST_TRACE ("Chunk has no payload");
352   }
353
354   gst_rtmp_buffer_dump (ret, ">>> chunk");
355
356   return ret;
357 }
358
359 void
360 gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream)
361 {
362   g_return_if_fail (cstream);
363   GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id);
364   chunk_stream_clear (cstream);
365 }
366
367 guint32
368 gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size)
369 {
370   guint32 ret;
371
372   if (size < 1) {
373     GST_TRACE ("Not enough bytes to read ID");
374     return 0;
375   }
376
377   ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK;
378
379   switch (ret) {
380     case CHUNK_BYTE_TWOBYTE:
381       if (size < 2) {
382         GST_TRACE ("Not enough bytes to read two-byte ID");
383         return 0;
384       }
385
386       ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
387       break;
388
389     case CHUNK_BYTE_THREEBYTE:
390       if (size < 3) {
391         GST_TRACE ("Not enough bytes to read three-byte ID");
392         return 0;
393       }
394
395       ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
396       break;
397   }
398
399   GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret);
400   return ret;
401 }
402
403 guint32
404 gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
405     const guint8 * data, gsize size)
406 {
407   GstBuffer *buffer;
408   GstRtmpMeta *meta;
409   const guint8 *message_header;
410   guint32 header_size;
411   ChunkType type;
412   gboolean has_abs_timestamp = FALSE;
413
414   g_return_val_if_fail (cstream, 0);
415   g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data,
416           size), 0);
417
418   type = GST_READ_UINT8 (data) >> 6;
419   GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d",
420       cstream->id, type);
421
422   switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) {
423     case CHUNK_BYTE_TWOBYTE:
424       header_size = 2;
425       break;
426     case CHUNK_BYTE_THREEBYTE:
427       header_size = 3;
428       break;
429     default:
430       header_size = 1;
431       break;
432   }
433
434   message_header = data + header_size;
435   header_size += chunk_header_sizes[type];
436
437   if (cstream->buffer) {
438     buffer = cstream->buffer;
439     meta = cstream->meta;
440     g_assert (meta->cstream == cstream->id);
441   } else {
442     buffer = gst_buffer_new ();
443     GST_BUFFER_DTS (buffer) = 0;
444     GST_BUFFER_OFFSET (buffer) = cstream->bytes;
445     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
446
447     meta = gst_buffer_add_rtmp_meta (buffer);
448     meta->cstream = cstream->id;
449
450     chunk_stream_take_buffer (cstream, buffer);
451     GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer);
452   }
453
454   if (size < header_size) {
455     GST_TRACE ("not enough bytes to read header");
456     return header_size;
457   }
458
459   switch (type) {
460     case CHUNK_TYPE_0:
461       has_abs_timestamp = TRUE;
462       /* SRSLY:  "Message stream ID is stored in little-endian format." */
463       meta->mstream = GST_READ_UINT32_LE (message_header + 7);
464       /* no break */
465     case CHUNK_TYPE_1:
466       meta->type = GST_READ_UINT8 (message_header + 6);
467       meta->size = GST_READ_UINT24_BE (message_header + 3);
468       /* no break */
469     case CHUNK_TYPE_2:
470       meta->ts_delta = GST_READ_UINT24_BE (message_header);
471       /* no break */
472     case CHUNK_TYPE_3:
473       if (needs_ext_ts (meta)) {
474         guint32 timestamp;
475
476         if (size < header_size + 4) {
477           GST_TRACE ("not enough bytes to read extended timestamp");
478           return header_size + 4;
479         }
480
481         GST_TRACE ("Reading extended timestamp");
482         timestamp = GST_READ_UINT32_BE (data + header_size);
483
484         if (type == 3 && meta->ts_delta != timestamp) {
485           GST_WARNING ("Type 3 extended timestamp does not match expected"
486               " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT
487               "); assuming it's not present", meta->ts_delta, timestamp);
488         } else {
489           meta->ts_delta = timestamp;
490           header_size += 4;
491         }
492       }
493   }
494
495   GST_MEMDUMP ("<<< chunk header", data, header_size);
496
497   if (!chunk_stream_is_open (cstream)) {
498     GstClockTime dts = GST_BUFFER_DTS (buffer);
499     guint32 delta_32, abs_32;
500     gint64 delta_64;
501
502     if (has_abs_timestamp) {
503       abs_32 = meta->ts_delta;
504       delta_32 = abs_32 - dts / GST_MSECOND;
505     } else {
506       delta_32 = meta->ts_delta;
507       abs_32 = delta_32 + dts / GST_MSECOND;
508     }
509
510     GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %"
511         G_GUINT32_FORMAT ")", delta_32, abs_32);
512
513     /* emulate signed overflow */
514     delta_64 = delta_32;
515     if (delta_64 > G_MAXINT32) {
516       delta_64 -= G_MAXUINT32;
517       delta_64 -= 1;
518     }
519
520     delta_64 *= GST_MSECOND;
521
522     if (G_LIKELY (delta_64 >= 0)) {
523       /* Normal advancement */
524     } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) {
525       /* In-bounds regression */
526       GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT,
527           GST_STIME_ARGS (delta_64));
528     } else {
529       /* Out-of-bounds regression */
530       GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting",
531           GST_STIME_ARGS (delta_64));
532       delta_64 = delta_32 * GST_MSECOND;
533     }
534
535     GST_BUFFER_DTS (buffer) += delta_64;
536
537     GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %"
538         GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts),
539         GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
540   } else {
541     GST_TRACE ("Message payload already started; not touching timestamp");
542   }
543
544   return header_size;
545 }
546
547 guint32
548 gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
549     guint32 chunk_size, guint8 ** data)
550 {
551   GstMemory *mem;
552
553   g_return_val_if_fail (cstream, 0);
554   g_return_val_if_fail (cstream->buffer, 0);
555
556   if (!chunk_stream_is_open (cstream)) {
557     guint32 size = cstream->meta->size;
558
559     GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size);
560
561     mem = gst_allocator_alloc (NULL, size, 0);
562     if (!mem) {
563       GST_ERROR ("Failed to allocate buffer for payload size %"
564           G_GUINT32_FORMAT, size);
565       return 0;
566     }
567
568     gst_buffer_append_memory (cstream->buffer, mem);
569     gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE);
570   }
571
572   g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0);
573
574   if (data) {
575     *data = cstream->map.data + cstream->offset;
576   }
577
578   return chunk_stream_next_size (cstream, chunk_size);
579 }
580
581 guint32
582 gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
583     guint32 chunk_size)
584 {
585   guint32 size;
586
587   g_return_val_if_fail (cstream, FALSE);
588   g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE);
589
590   size = chunk_stream_next_size (cstream, chunk_size);
591   cstream->offset += size;
592   cstream->bytes += size;
593
594   return chunk_stream_next_size (cstream, chunk_size);
595 }
596
597 GstBuffer *
598 gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream)
599 {
600   GstBuffer *buffer, *empty;
601
602   g_return_val_if_fail (cstream, NULL);
603   g_return_val_if_fail (cstream->buffer, NULL);
604
605   buffer = gst_buffer_ref (cstream->buffer);
606   GST_BUFFER_OFFSET_END (buffer) = cstream->bytes;
607
608   gst_rtmp_buffer_dump (buffer, "<<< message");
609
610   chunk_stream_clear (cstream);
611
612   empty = gst_buffer_new ();
613
614   if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) {
615     GST_ERROR ("copy_into failed");
616     return NULL;
617   }
618
619   GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer);
620   GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer);
621
622   chunk_stream_take_buffer (cstream, empty);
623
624   return buffer;
625 }
626
627 GstBuffer *
628 gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
629     GstBuffer * buffer, guint32 chunk_size)
630 {
631   ChunkType type;
632
633   g_return_val_if_fail (cstream, NULL);
634   g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);
635
636   type = select_chunk_type (cstream, buffer);
637   g_return_val_if_fail (type >= 0, NULL);
638
639   GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT
640       " into stream %" G_GUINT32_FORMAT, buffer, cstream->id);
641
642   gst_rtmp_buffer_dump (buffer, ">>> message");
643
644   chunk_stream_clear (cstream);
645   chunk_stream_take_buffer (cstream, buffer);
646
647   return serialize_next (cstream, chunk_size, type);
648 }
649
650 GstBuffer *
651 gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
652     guint32 chunk_size)
653 {
654   g_return_val_if_fail (cstream, NULL);
655   g_return_val_if_fail (cstream->buffer, NULL);
656
657   if (chunk_stream_next_size (cstream, chunk_size) == 0) {
658     GST_TRACE ("Message serialization finished");
659     return NULL;
660   }
661
662   GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT
663       " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id);
664
665   return serialize_next (cstream, chunk_size, CHUNK_TYPE_3);
666 }
667
668 GstRtmpChunkStreams *
669 gst_rtmp_chunk_streams_new (void)
670 {
671   GstRtmpChunkStreams *cstreams;
672
673   init_debug ();
674
675   cstreams = g_slice_new (GstRtmpChunkStreams);
676   cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream));
677   g_array_set_clear_func (cstreams->array,
678       (GDestroyNotify) gst_rtmp_chunk_stream_clear);
679   return cstreams;
680 }
681
682 void
683 gst_rtmp_chunk_streams_free (gpointer ptr)
684 {
685   GstRtmpChunkStreams *cstreams = ptr;
686   g_clear_pointer (&cstreams->array, g_array_unref);
687   g_slice_free (GstRtmpChunkStreams, cstreams);
688 }
689
690 GstRtmpChunkStream *
691 gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id)
692 {
693   GArray *array;
694   GstRtmpChunkStream *entry;
695   guint i;
696
697   g_return_val_if_fail (cstreams, NULL);
698   g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL);
699   g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL);
700
701   array = cstreams->array;
702
703   for (i = 0; i < array->len; i++) {
704     entry = &g_array_index (array, GstRtmpChunkStream, i);
705     if (entry->id == id) {
706       GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id);
707       return entry;
708     }
709   }
710
711   GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id);
712
713   g_array_set_size (array, i + 1);
714   entry = &g_array_index (array, GstRtmpChunkStream, i);
715   entry->id = id;
716   return entry;
717 }