videocodectestsink: Add YUV422 support
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / asfmux / gstrtpasfpay.c
1 /* ASF RTP Payloader plugin for GStreamer
2  * Copyright (C) 2009 Thiago Santos <thiagoss@embedded.ufcg.edu.br>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME
21  * - this element doesn't follow (max/min) time properties,
22  *   is it possible to do it with a container format?
23  */
24
25 #ifdef HAVE_CONFIG_H
26 #  include "config.h"
27 #endif
28
29 #include <gst/rtp/gstrtpbuffer.h>
30 #include <string.h>
31
32 #include "gstrtpasfpay.h"
33
34 GST_DEBUG_CATEGORY_STATIC (rtpasfpay_debug);
35 #define GST_CAT_DEFAULT (rtpasfpay_debug)
36
37 static GstStaticPadTemplate gst_rtp_asf_pay_sink_template =
38 GST_STATIC_PAD_TEMPLATE ("sink",
39     GST_PAD_SINK,
40     GST_PAD_ALWAYS,
41     GST_STATIC_CAPS ("video/x-ms-asf, " "parsed = (boolean) true")
42     );
43
44 static GstStaticPadTemplate gst_rtp_asf_pay_src_template =
45 GST_STATIC_PAD_TEMPLATE ("src",
46     GST_PAD_SRC,
47     GST_PAD_ALWAYS,
48     GST_STATIC_CAPS ("application/x-rtp, "
49         "media = (string) {\"audio\", \"video\", \"application\"}, "
50         "clock-rate = (int) 1000, " "encoding-name = (string) \"X-ASF-PF\"")
51     );
52
53 static GstFlowReturn
54 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer);
55 static gboolean
56 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps);
57
58 #define gst_rtp_asf_pay_parent_class parent_class
59 G_DEFINE_TYPE (GstRtpAsfPay, gst_rtp_asf_pay, GST_TYPE_RTP_BASE_PAYLOAD);
60 GST_ELEMENT_REGISTER_DEFINE (rtpasfpay, "rtpasfpay",
61     GST_RANK_NONE, GST_TYPE_RTP_ASF_PAY);
62
63 static void
64 gst_rtp_asf_pay_init (GstRtpAsfPay * rtpasfpay)
65 {
66   rtpasfpay->first_ts = 0;
67   rtpasfpay->config = NULL;
68   rtpasfpay->packets_count = 0;
69   rtpasfpay->state = ASF_NOT_STARTED;
70   rtpasfpay->headers = NULL;
71   rtpasfpay->current = NULL;
72 }
73
74 static void
75 gst_rtp_asf_pay_finalize (GObject * object)
76 {
77   GstRtpAsfPay *rtpasfpay;
78   rtpasfpay = GST_RTP_ASF_PAY (object);
79   g_free (rtpasfpay->config);
80   if (rtpasfpay->headers)
81     gst_buffer_unref (rtpasfpay->headers);
82   G_OBJECT_CLASS (parent_class)->finalize (object);
83 }
84
85 static void
86 gst_rtp_asf_pay_class_init (GstRtpAsfPayClass * klass)
87 {
88   GObjectClass *gobject_class;
89   GstElementClass *gstelement_class;
90   GstRTPBasePayloadClass *gstbasertppayload_class;
91
92   gobject_class = (GObjectClass *) klass;
93   gstelement_class = (GstElementClass *) klass;
94   gstbasertppayload_class = (GstRTPBasePayloadClass *) klass;
95
96   gobject_class->finalize = gst_rtp_asf_pay_finalize;
97
98   gstbasertppayload_class->handle_buffer = gst_rtp_asf_pay_handle_buffer;
99   gstbasertppayload_class->set_caps = gst_rtp_asf_pay_set_caps;
100
101   gst_element_class_add_static_pad_template (gstelement_class,
102       &gst_rtp_asf_pay_sink_template);
103   gst_element_class_add_static_pad_template (gstelement_class,
104       &gst_rtp_asf_pay_src_template);
105   gst_element_class_set_static_metadata (gstelement_class, "RTP ASF payloader",
106       "Codec/Payloader/Network",
107       "Payload-encodes ASF into RTP packets (MS_RTSP)",
108       "Thiago Santos <thiagoss@embedded.ufcg.edu.br>");
109
110   GST_DEBUG_CATEGORY_INIT (rtpasfpay_debug, "rtpasfpay", 0,
111       "ASF RTP Payloader");
112 }
113
114 static gboolean
115 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps)
116 {
117   /* FIXME change application for the actual content */
118   gst_rtp_base_payload_set_options (rtppay, "application", TRUE, "X-ASF-PF",
119       1000);
120   return TRUE;
121 }
122
123 static GstFlowReturn
124 gst_rtp_asf_pay_handle_packet (GstRtpAsfPay * rtpasfpay, GstBuffer * buffer)
125 {
126   GstRTPBasePayload *rtppay;
127   GstAsfPacketInfo *packetinfo;
128   guint8 flags;
129   guint8 *data;
130   guint32 packet_util_size;
131   guint32 packet_offset;
132   guint32 size_left;
133   GstFlowReturn ret = GST_FLOW_OK;
134
135   rtppay = GST_RTP_BASE_PAYLOAD (rtpasfpay);
136   packetinfo = &rtpasfpay->packetinfo;
137
138   if (!gst_asf_parse_packet (buffer, packetinfo, TRUE,
139           rtpasfpay->asfinfo.packet_size)) {
140     GST_ERROR_OBJECT (rtpasfpay, "Error while parsing asf packet");
141     gst_buffer_unref (buffer);
142     return GST_FLOW_ERROR;
143   }
144
145   if (packetinfo->packet_size == 0)
146     packetinfo->packet_size = rtpasfpay->asfinfo.packet_size;
147
148   GST_LOG_OBJECT (rtpasfpay, "Packet size: %" G_GUINT32_FORMAT
149       ", padding: %" G_GUINT32_FORMAT, packetinfo->packet_size,
150       packetinfo->padding);
151
152   /* update padding field to 0 */
153   if (packetinfo->padding > 0) {
154     GstAsfPacketInfo info;
155     /* find padding field offset */
156     guint offset = packetinfo->err_cor_len + 2 +
157         gst_asf_get_var_size_field_len (packetinfo->packet_field_type) +
158         gst_asf_get_var_size_field_len (packetinfo->seq_field_type);
159     buffer = gst_buffer_make_writable (buffer);
160     switch (packetinfo->padd_field_type) {
161       case ASF_FIELD_TYPE_DWORD:
162         gst_buffer_memset (buffer, offset, 0, 4);
163         break;
164       case ASF_FIELD_TYPE_WORD:
165         gst_buffer_memset (buffer, offset, 0, 2);
166         break;
167       case ASF_FIELD_TYPE_BYTE:
168         gst_buffer_memset (buffer, offset, 0, 1);
169         break;
170       case ASF_FIELD_TYPE_NONE:
171       default:
172         break;
173     }
174     gst_asf_parse_packet (buffer, &info, FALSE, 0);
175   }
176
177   if (packetinfo->padding != 0)
178     packet_util_size = rtpasfpay->asfinfo.packet_size - packetinfo->padding;
179   else
180     packet_util_size = packetinfo->packet_size;
181   packet_offset = 0;
182   while (packet_util_size > 0) {
183     /* Even if we don't fill completely an output buffer we
184      * push it when we add an fragment. Because it seems that
185      * it is not possible to determine where a asf packet
186      * fragment ends inside a rtp packet payload.
187      * This flag tells us to push the packet.
188      */
189     gboolean force_push = FALSE;
190     GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
191
192     /* we have no output buffer pending, create one */
193     if (rtpasfpay->current == NULL) {
194       GST_LOG_OBJECT (rtpasfpay, "Creating new output buffer");
195       rtpasfpay->current =
196           gst_rtp_buffer_new_allocate_len (GST_RTP_BASE_PAYLOAD_MTU (rtpasfpay),
197           0, 0);
198       rtpasfpay->cur_off = 0;
199       rtpasfpay->has_ts = FALSE;
200       rtpasfpay->marker = FALSE;
201     }
202     gst_rtp_buffer_map (rtpasfpay->current, GST_MAP_READWRITE, &rtp);
203     data = gst_rtp_buffer_get_payload (&rtp);
204     data += rtpasfpay->cur_off;
205     size_left = gst_rtp_buffer_get_payload_len (&rtp) - rtpasfpay->cur_off;
206
207     GST_DEBUG_OBJECT (rtpasfpay, "Input buffer bytes consumed: %"
208         G_GUINT32_FORMAT "/%" G_GSIZE_FORMAT, packet_offset,
209         gst_buffer_get_size (buffer));
210
211     GST_DEBUG_OBJECT (rtpasfpay, "Output rtpbuffer status");
212     GST_DEBUG_OBJECT (rtpasfpay, "Current offset: %" G_GUINT32_FORMAT,
213         rtpasfpay->cur_off);
214     GST_DEBUG_OBJECT (rtpasfpay, "Size left: %" G_GUINT32_FORMAT, size_left);
215     GST_DEBUG_OBJECT (rtpasfpay, "Has ts: %s",
216         rtpasfpay->has_ts ? "yes" : "no");
217     if (rtpasfpay->has_ts) {
218       GST_DEBUG_OBJECT (rtpasfpay, "Ts: %" G_GUINT32_FORMAT, rtpasfpay->ts);
219     }
220
221     flags = 0;
222     if (packetinfo->has_keyframe) {
223       flags = flags | 0x80;
224     }
225     flags = flags | 0x20;       /* Relative timestamp is present */
226
227     if (!rtpasfpay->has_ts) {
228       /* this is the first asf packet, its send time is the 
229        * rtp packet timestamp */
230       rtpasfpay->has_ts = TRUE;
231       rtpasfpay->ts = packetinfo->send_time;
232     }
233
234     if (size_left >= packet_util_size + 8) {
235       /* enough space for the rest of the packet */
236       if (packet_offset == 0) {
237         flags = flags | 0x40;
238         GST_WRITE_UINT24_BE (data + 1, packet_util_size);
239       } else {
240         GST_WRITE_UINT24_BE (data + 1, packet_offset);
241         force_push = TRUE;
242       }
243       data[0] = flags;
244       GST_WRITE_UINT32_BE (data + 4,
245           (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
246       gst_buffer_extract (buffer, packet_offset, data + 8, packet_util_size);
247
248       /* updating status variables */
249       rtpasfpay->cur_off += 8 + packet_util_size;
250       size_left -= packet_util_size + 8;
251       packet_offset += packet_util_size;
252       packet_util_size = 0;
253       rtpasfpay->marker = TRUE;
254     } else {
255       /* fragment packet */
256       data[0] = flags;
257       GST_WRITE_UINT24_BE (data + 1, packet_offset);
258       GST_WRITE_UINT32_BE (data + 4,
259           (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
260       gst_buffer_extract (buffer, packet_offset, data + 8, size_left - 8);
261
262       /* updating status variables */
263       rtpasfpay->cur_off += size_left;
264       packet_offset += size_left - 8;
265       packet_util_size -= size_left - 8;
266       size_left = 0;
267       force_push = TRUE;
268     }
269
270     /* there is not enough room for any more buffers */
271     if (force_push || size_left <= 8) {
272
273       gst_rtp_buffer_set_ssrc (&rtp, rtppay->current_ssrc);
274       gst_rtp_buffer_set_marker (&rtp, rtpasfpay->marker);
275       gst_rtp_buffer_set_payload_type (&rtp, GST_RTP_BASE_PAYLOAD_PT (rtppay));
276       gst_rtp_buffer_set_seq (&rtp, rtppay->seqnum + 1);
277       gst_rtp_buffer_set_timestamp (&rtp, packetinfo->send_time);
278       gst_rtp_buffer_unmap (&rtp);
279
280       /* trim remaining bytes not used */
281       if (size_left != 0) {
282         gst_buffer_set_size (rtpasfpay->current,
283             gst_buffer_get_size (rtpasfpay->current) - size_left);
284       }
285
286       GST_BUFFER_TIMESTAMP (rtpasfpay->current) = GST_BUFFER_TIMESTAMP (buffer);
287
288       rtppay->seqnum++;
289       rtppay->timestamp = packetinfo->send_time;
290
291       GST_DEBUG_OBJECT (rtpasfpay, "Pushing rtp buffer");
292       ret = gst_rtp_base_payload_push (rtppay, rtpasfpay->current);
293       rtpasfpay->current = NULL;
294       if (ret != GST_FLOW_OK) {
295         gst_buffer_unref (buffer);
296         return ret;
297       }
298     }
299   }
300   gst_buffer_unref (buffer);
301   return ret;
302 }
303
304 static GstFlowReturn
305 gst_rtp_asf_pay_parse_headers (GstRtpAsfPay * rtpasfpay)
306 {
307   gchar *maxps;
308   GstMapInfo map;
309
310   g_return_val_if_fail (rtpasfpay->headers, GST_FLOW_ERROR);
311
312   if (!gst_asf_parse_headers (rtpasfpay->headers, &rtpasfpay->asfinfo))
313     goto error;
314
315   GST_DEBUG_OBJECT (rtpasfpay, "Packets number: %" G_GUINT64_FORMAT,
316       rtpasfpay->asfinfo.packets_count);
317   GST_DEBUG_OBJECT (rtpasfpay, "Packets size: %" G_GUINT32_FORMAT,
318       rtpasfpay->asfinfo.packet_size);
319   GST_DEBUG_OBJECT (rtpasfpay, "Broadcast mode: %s",
320       rtpasfpay->asfinfo.broadcast ? "true" : "false");
321
322   /* get the config for caps */
323   g_free (rtpasfpay->config);
324   gst_buffer_map (rtpasfpay->headers, &map, GST_MAP_READ);
325   rtpasfpay->config = g_base64_encode (map.data, map.size);
326   gst_buffer_unmap (rtpasfpay->headers, &map);
327   GST_DEBUG_OBJECT (rtpasfpay, "Serialized headers to base64 string %s",
328       rtpasfpay->config);
329
330   g_assert (rtpasfpay->config != NULL);
331   GST_DEBUG_OBJECT (rtpasfpay, "Setting optional caps values: maxps=%"
332       G_GUINT32_FORMAT " and config=%s", rtpasfpay->asfinfo.packet_size,
333       rtpasfpay->config);
334   maxps =
335       g_strdup_printf ("%" G_GUINT32_FORMAT, rtpasfpay->asfinfo.packet_size);
336   gst_rtp_base_payload_set_outcaps (GST_RTP_BASE_PAYLOAD (rtpasfpay), "maxps",
337       G_TYPE_STRING, maxps, "config", G_TYPE_STRING, rtpasfpay->config, NULL);
338   g_free (maxps);
339
340   return GST_FLOW_OK;
341
342 error:
343   {
344     GST_ELEMENT_ERROR (rtpasfpay, STREAM, DECODE, (NULL),
345         ("Error parsing headers"));
346     return GST_FLOW_ERROR;
347   }
348 }
349
350 static GstFlowReturn
351 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer)
352 {
353   GstRtpAsfPay *rtpasfpay = GST_RTP_ASF_PAY_CAST (rtppay);
354
355   if (G_UNLIKELY (rtpasfpay->state == ASF_END)) {
356     GST_LOG_OBJECT (rtpasfpay,
357         "Dropping buffer as we already pushed all packets");
358     gst_buffer_unref (buffer);
359     return GST_FLOW_EOS;        /* we already finished our job */
360   }
361
362   /* receive headers 
363    * we only accept if they are in a single buffer */
364   if (G_UNLIKELY (rtpasfpay->state == ASF_NOT_STARTED)) {
365     guint64 header_size;
366
367     if (gst_buffer_get_size (buffer) < 24) {    /* guid+object size size */
368       GST_ERROR_OBJECT (rtpasfpay,
369           "Buffer too small, smaller than a Guid and object size");
370       gst_buffer_unref (buffer);
371       return GST_FLOW_ERROR;
372     }
373
374     header_size = gst_asf_match_and_peek_obj_size_buf (buffer,
375         &(guids[ASF_HEADER_OBJECT_INDEX]));
376     if (header_size > 0) {
377       GST_DEBUG_OBJECT (rtpasfpay, "ASF header guid received, size %"
378           G_GUINT64_FORMAT, header_size);
379
380       if (gst_buffer_get_size (buffer) < header_size) {
381         GST_ERROR_OBJECT (rtpasfpay, "Headers should be contained in a single"
382             " buffer");
383         gst_buffer_unref (buffer);
384         return GST_FLOW_ERROR;
385       } else {
386         rtpasfpay->state = ASF_DATA_OBJECT;
387
388         /* clear previous headers, if any */
389         if (rtpasfpay->headers) {
390           gst_buffer_unref (rtpasfpay->headers);
391         }
392
393         GST_DEBUG_OBJECT (rtpasfpay, "Storing headers");
394         if (gst_buffer_get_size (buffer) == header_size) {
395           rtpasfpay->headers = buffer;
396           return GST_FLOW_OK;
397         } else {
398           /* headers are a subbuffer of thie buffer */
399           GstBuffer *aux = gst_buffer_copy_region (buffer,
400               GST_BUFFER_COPY_ALL, header_size,
401               gst_buffer_get_size (buffer) - header_size);
402           rtpasfpay->headers = gst_buffer_copy_region (buffer,
403               GST_BUFFER_COPY_ALL, 0, header_size);
404           gst_buffer_replace (&buffer, aux);
405         }
406       }
407     } else {
408       GST_ERROR_OBJECT (rtpasfpay, "Missing ASF header start");
409       gst_buffer_unref (buffer);
410       return GST_FLOW_ERROR;
411     }
412   }
413
414   if (G_UNLIKELY (rtpasfpay->state == ASF_DATA_OBJECT)) {
415     GstMapInfo map;
416
417     if (gst_buffer_get_size (buffer) != ASF_DATA_OBJECT_SIZE) {
418       GST_ERROR_OBJECT (rtpasfpay, "Received buffer of different size of "
419           "the data object header");
420       gst_buffer_unref (buffer);
421       return GST_FLOW_ERROR;
422     }
423
424     gst_buffer_map (buffer, &map, GST_MAP_READ);
425     if (gst_asf_match_guid (map.data, &(guids[ASF_DATA_OBJECT_INDEX]))) {
426       gst_buffer_unmap (buffer, &map);
427       GST_DEBUG_OBJECT (rtpasfpay, "Received data object header");
428       rtpasfpay->headers = gst_buffer_append (rtpasfpay->headers, buffer);
429       rtpasfpay->state = ASF_PACKETS;
430
431       return gst_rtp_asf_pay_parse_headers (rtpasfpay);
432     } else {
433       gst_buffer_unmap (buffer, &map);
434       GST_ERROR_OBJECT (rtpasfpay, "Unexpected object received (was expecting "
435           "data object)");
436       gst_buffer_unref (buffer);
437       return GST_FLOW_ERROR;
438     }
439   }
440
441   if (G_LIKELY (rtpasfpay->state == ASF_PACKETS)) {
442     /* in broadcast mode we can't trust the packets count information
443      * from the headers
444      * We assume that if this is on broadcast mode it is a live stream
445      * and we are going to keep receiving packets indefinitely
446      */
447     if (rtpasfpay->asfinfo.broadcast ||
448         rtpasfpay->packets_count < rtpasfpay->asfinfo.packets_count) {
449       GST_DEBUG_OBJECT (rtpasfpay, "Received packet %"
450           G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
451           rtpasfpay->packets_count, rtpasfpay->asfinfo.packets_count);
452       rtpasfpay->packets_count++;
453       return gst_rtp_asf_pay_handle_packet (rtpasfpay, buffer);
454     } else {
455       GST_INFO_OBJECT (rtpasfpay, "Packets ended");
456       rtpasfpay->state = ASF_END;
457       gst_buffer_unref (buffer);
458       return GST_FLOW_EOS;
459     }
460   }
461
462   gst_buffer_unref (buffer);
463   return GST_FLOW_OK;
464 }