1 /* ASF RTP Payloader plugin for GStreamer
2 * Copyright (C) 2009 Thiago Santos <thiagoss@embedded.ufcg.edu.br>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * - this element doesn't follow (max/min) time properties,
22 * is it possible to do it with a container format?
29 #include <gst/rtp/gstrtpbuffer.h>
32 #include "gstrtpasfpay.h"
/* Debug category used by the debug, log and error macros in this file;
 * it is initialised in gst_rtp_asf_pay_class_init(). */
34 GST_DEBUG_CATEGORY_STATIC (rtpasfpay_debug);
35 #define GST_CAT_DEFAULT (rtpasfpay_debug)
/* Pad template named "sink": its caps are "video/x-ms-asf" with
 * parsed = true. */
37 static GstStaticPadTemplate gst_rtp_asf_pay_sink_template =
38 GST_STATIC_PAD_TEMPLATE ("sink",
41 GST_STATIC_CAPS ("video/x-ms-asf, " "parsed = (boolean) true")
/* Pad template named "src": application/x-rtp caps whose media is one of
 * audio, video or application, with clock-rate 1000 and encoding-name
 * X-ASF-PF. */
44 static GstStaticPadTemplate gst_rtp_asf_pay_src_template =
45 GST_STATIC_PAD_TEMPLATE ("src",
48 GST_STATIC_CAPS ("application/x-rtp, "
49 "media = (string) {\"audio\", \"video\", \"application\"}, "
50 "clock-rate = (int) 1000, " "encoding-name = (string) \"X-ASF-PF\"")
/* Forward declarations of the two payloader virtual methods that
 * gst_rtp_asf_pay_class_init() installs (handle_buffer and set_caps). */
54 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer);
56 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps);
/* Defines the GstRtpAsfPay type as a subclass of GST_TYPE_RTP_BASE_PAYLOAD
 * and the "rtpasfpay" element registration with rank GST_RANK_NONE.
 * parent_class is aliased to the name used when finalize chains up. */
58 #define gst_rtp_asf_pay_parent_class parent_class
59 G_DEFINE_TYPE (GstRtpAsfPay, gst_rtp_asf_pay, GST_TYPE_RTP_BASE_PAYLOAD);
60 GST_ELEMENT_REGISTER_DEFINE (rtpasfpay, "rtpasfpay",
61 GST_RANK_NONE, GST_TYPE_RTP_ASF_PAY);
/* Instance initialiser: resets the payloader to its idle state, i.e. no
 * config string, no stored headers, no pending output buffer, zero packets
 * counted, first_ts of 0 and state ASF_NOT_STARTED. */
64 gst_rtp_asf_pay_init (GstRtpAsfPay * rtpasfpay)
66 rtpasfpay->first_ts = 0;
67 rtpasfpay->config = NULL;
68 rtpasfpay->packets_count = 0;
69 rtpasfpay->state = ASF_NOT_STARTED;
70 rtpasfpay->headers = NULL;
71 rtpasfpay->current = NULL;
/* GObject finalize handler: frees the config string, drops the reference
 * on the stored headers buffer when there is one, then chains up to the
 * parent class.
 *
 * NOTE(review): rtpasfpay->current (the pending output buffer created in
 * gst_rtp_asf_pay_handle_packet) is not released here; confirm whether a
 * partially filled buffer can still be alive at this point. */
75 gst_rtp_asf_pay_finalize (GObject * object)
77 GstRtpAsfPay *rtpasfpay;
78 rtpasfpay = GST_RTP_ASF_PAY (object);
79 g_free (rtpasfpay->config);
80 if (rtpasfpay->headers)
81 gst_buffer_unref (rtpasfpay->headers);
82 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Class initialiser: installs the finalize handler and the payloader
 * vmethods (handle_buffer and set_caps), adds the static sink and src pad
 * templates, sets the element metadata and initialises the "rtpasfpay"
 * debug category. */
86 gst_rtp_asf_pay_class_init (GstRtpAsfPayClass * klass)
88 GObjectClass *gobject_class;
89 GstElementClass *gstelement_class;
90 GstRTPBasePayloadClass *gstbasertppayload_class;
/* the same class structure, viewed as each of its ancestor class types */
92 gobject_class = (GObjectClass *) klass;
93 gstelement_class = (GstElementClass *) klass;
94 gstbasertppayload_class = (GstRTPBasePayloadClass *) klass;
96 gobject_class->finalize = gst_rtp_asf_pay_finalize;
98 gstbasertppayload_class->handle_buffer = gst_rtp_asf_pay_handle_buffer;
99 gstbasertppayload_class->set_caps = gst_rtp_asf_pay_set_caps;
101 gst_element_class_add_static_pad_template (gstelement_class,
102 &gst_rtp_asf_pay_sink_template);
103 gst_element_class_add_static_pad_template (gstelement_class,
104 &gst_rtp_asf_pay_src_template);
105 gst_element_class_set_static_metadata (gstelement_class, "RTP ASF payloader",
106 "Codec/Payloader/Network",
107 "Payload-encodes ASF into RTP packets (MS_RTSP)",
108 "Thiago Santos <thiagoss@embedded.ufcg.edu.br>");
110 GST_DEBUG_CATEGORY_INIT (rtpasfpay_debug, "rtpasfpay", 0,
111 "ASF RTP Payloader");
/* set_caps vmethod: configures the payloader options with media
 * "application" and encoding name "X-ASF-PF". The caps argument is not
 * used by the call shown here. */
115 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps)
117 /* FIXME change application for the actual content */
118 gst_rtp_base_payload_set_options (rtppay, "application", TRUE, "X-ASF-PF",
/* Payloads one ASF data packet into RTP packets.
 *
 * The packet is parsed into rtpasfpay->packetinfo and, when it has padding,
 * its padding length field is zeroed in a writable copy of the buffer. The
 * useful bytes (the packet without its padding) are then copied into
 * rtpasfpay->current in chunks, each preceded by 8 header bytes: a 24-bit
 * big-endian field at offset 1 and a 32-bit big-endian timestamp, relative
 * to rtpasfpay->ts, at offset 4; the chunk data starts at offset 8.
 *
 * The 24-bit field carries the useful packet size when the whole packet
 * fits and packet_offset is 0 (flag 0x40 is added in that case); otherwise
 * it carries the offset of the chunk inside the packet. Flag 0x80 is added
 * when the packet has a keyframe and 0x20 is always added.
 *
 * An output buffer is pushed when force_push is set or when 8 bytes or
 * fewer are left in it; otherwise it stays pending in rtpasfpay->current
 * for the next call.
 *
 * rtpasfpay: the payloader instance
 * buffer: the ASF packet; it is unreffed on the exit paths shown here
 *
 * Returns GST_FLOW_ERROR when the packet cannot be parsed. ret starts as
 * GST_FLOW_OK and is updated from gst_rtp_base_payload_push().
 */
124 gst_rtp_asf_pay_handle_packet (GstRtpAsfPay * rtpasfpay, GstBuffer * buffer)
126 GstRTPBasePayload *rtppay;
127 GstAsfPacketInfo *packetinfo;
130 guint32 packet_util_size;
131 guint32 packet_offset;
133 GstFlowReturn ret = GST_FLOW_OK;
135 rtppay = GST_RTP_BASE_PAYLOAD (rtpasfpay);
136 packetinfo = &rtpasfpay->packetinfo;
138 if (!gst_asf_parse_packet (buffer, packetinfo, TRUE,
139 rtpasfpay->asfinfo.packet_size)) {
140 GST_ERROR_OBJECT (rtpasfpay, "Error while parsing asf packet");
141 gst_buffer_unref (buffer);
142 return GST_FLOW_ERROR;
/* a parsed size of 0 falls back to the packet size taken from the headers */
145 if (packetinfo->packet_size == 0)
146 packetinfo->packet_size = rtpasfpay->asfinfo.packet_size;
148 GST_LOG_OBJECT (rtpasfpay, "Packet size: %" G_GUINT32_FORMAT
149 ", padding: %" G_GUINT32_FORMAT, packetinfo->packet_size,
150 packetinfo->padding);
152 /* update padding field to 0 */
153 if (packetinfo->padding > 0) {
154 GstAsfPacketInfo info;
155 /* find padding field offset */
156 guint offset = packetinfo->err_cor_len + 2 +
157 gst_asf_get_var_size_field_len (packetinfo->packet_field_type) +
158 gst_asf_get_var_size_field_len (packetinfo->seq_field_type);
159 buffer = gst_buffer_make_writable (buffer);
160 switch (packetinfo->padd_field_type) {
161 case ASF_FIELD_TYPE_DWORD:
162 gst_buffer_memset (buffer, offset, 0, 4);
164 case ASF_FIELD_TYPE_WORD:
165 gst_buffer_memset (buffer, offset, 0, 2);
167 case ASF_FIELD_TYPE_BYTE:
168 gst_buffer_memset (buffer, offset, 0, 1);
170 case ASF_FIELD_TYPE_NONE:
/* NOTE(review): the result of this second parse and the local info are not
 * used by the code shown here; confirm it is only meant as a sanity check */
174 gst_asf_parse_packet (buffer, &info, FALSE, 0);
/* number of bytes to payload: the packet without its padding */
177 if (packetinfo->padding != 0)
178 packet_util_size = rtpasfpay->asfinfo.packet_size - packetinfo->padding;
180 packet_util_size = packetinfo->packet_size;
182 while (packet_util_size > 0) {
183 /* Even if we don't fill completely an output buffer we
184 * push it when we add a fragment. Because it seems that
185 * it is not possible to determine where an asf packet
186 * fragment ends inside a rtp packet payload.
187 * This flag tells us to push the packet.
189 gboolean force_push = FALSE;
190 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
192 /* we have no output buffer pending, create one */
193 if (rtpasfpay->current == NULL) {
194 GST_LOG_OBJECT (rtpasfpay, "Creating new output buffer");
196 gst_rtp_buffer_new_allocate_len (GST_RTP_BASE_PAYLOAD_MTU (rtpasfpay),
198 rtpasfpay->cur_off = 0;
199 rtpasfpay->has_ts = FALSE;
200 rtpasfpay->marker = FALSE;
/* NOTE(review): the buffer is mapped on every iteration, but
 * gst_rtp_buffer_unmap() is only reached in the push branch below; when the
 * buffer stays pending the mapping seems to be left open. Confirm. */
202 gst_rtp_buffer_map (rtpasfpay->current, GST_MAP_READWRITE, &rtp);
203 data = gst_rtp_buffer_get_payload (&rtp);
204 data += rtpasfpay->cur_off;
205 size_left = gst_rtp_buffer_get_payload_len (&rtp) - rtpasfpay->cur_off;
207 GST_DEBUG_OBJECT (rtpasfpay, "Input buffer bytes consumed: %"
208 G_GUINT32_FORMAT "/%" G_GSIZE_FORMAT, packet_offset,
209 gst_buffer_get_size (buffer));
211 GST_DEBUG_OBJECT (rtpasfpay, "Output rtpbuffer status");
212 GST_DEBUG_OBJECT (rtpasfpay, "Current offset: %" G_GUINT32_FORMAT,
214 GST_DEBUG_OBJECT (rtpasfpay, "Size left: %" G_GUINT32_FORMAT, size_left);
215 GST_DEBUG_OBJECT (rtpasfpay, "Has ts: %s",
216 rtpasfpay->has_ts ? "yes" : "no");
217 if (rtpasfpay->has_ts) {
218 GST_DEBUG_OBJECT (rtpasfpay, "Ts: %" G_GUINT32_FORMAT, rtpasfpay->ts);
222 if (packetinfo->has_keyframe) {
223 flags = flags | 0x80;
225 flags = flags | 0x20; /* Relative timestamp is present */
227 if (!rtpasfpay->has_ts) {
228 /* this is the first asf packet, its send time is the
229 * rtp packet timestamp */
230 rtpasfpay->has_ts = TRUE;
231 rtpasfpay->ts = packetinfo->send_time;
/* every chunk needs 8 header bytes in front of its data */
234 if (size_left >= packet_util_size + 8) {
235 /* enough space for the rest of the packet */
236 if (packet_offset == 0) {
237 flags = flags | 0x40;
238 GST_WRITE_UINT24_BE (data + 1, packet_util_size);
240 GST_WRITE_UINT24_BE (data + 1, packet_offset);
244 GST_WRITE_UINT32_BE (data + 4,
245 (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
246 gst_buffer_extract (buffer, packet_offset, data + 8, packet_util_size);
248 /* updating status variables */
249 rtpasfpay->cur_off += 8 + packet_util_size;
250 size_left -= packet_util_size + 8;
251 packet_offset += packet_util_size;
252 packet_util_size = 0;
253 rtpasfpay->marker = TRUE;
255 /* fragment packet */
257 GST_WRITE_UINT24_BE (data + 1, packet_offset);
258 GST_WRITE_UINT32_BE (data + 4,
259 (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
260 gst_buffer_extract (buffer, packet_offset, data + 8, size_left - 8);
262 /* updating status variables */
263 rtpasfpay->cur_off += size_left;
264 packet_offset += size_left - 8;
265 packet_util_size -= size_left - 8;
270 /* there is not enough room for any more buffers */
271 if (force_push || size_left <= 8) {
/* fill in the RTP header fields, then unmap before resizing and pushing */
273 gst_rtp_buffer_set_ssrc (&rtp, rtppay->current_ssrc);
274 gst_rtp_buffer_set_marker (&rtp, rtpasfpay->marker);
275 gst_rtp_buffer_set_payload_type (&rtp, GST_RTP_BASE_PAYLOAD_PT (rtppay));
276 gst_rtp_buffer_set_seq (&rtp, rtppay->seqnum + 1);
277 gst_rtp_buffer_set_timestamp (&rtp, packetinfo->send_time);
278 gst_rtp_buffer_unmap (&rtp);
280 /* trim remaining bytes not used */
281 if (size_left != 0) {
282 gst_buffer_set_size (rtpasfpay->current,
283 gst_buffer_get_size (rtpasfpay->current) - size_left);
286 GST_BUFFER_TIMESTAMP (rtpasfpay->current) = GST_BUFFER_TIMESTAMP (buffer);
289 rtppay->timestamp = packetinfo->send_time;
291 GST_DEBUG_OBJECT (rtpasfpay, "Pushing rtp buffer");
292 ret = gst_rtp_base_payload_push (rtppay, rtpasfpay->current);
293 rtpasfpay->current = NULL;
294 if (ret != GST_FLOW_OK) {
295 gst_buffer_unref (buffer);
300 gst_buffer_unref (buffer);
/* Parses the stored headers and publishes the stream configuration on the
 * output caps.
 *
 * rtpasfpay->headers is parsed into rtpasfpay->asfinfo. The whole headers
 * buffer is then base64-encoded into rtpasfpay->config (any previous value
 * is freed first) and set on the output caps as "config", together with the
 * ASF packet size formatted as the string "maxps".
 *
 * rtpasfpay: the payloader instance; rtpasfpay->headers must be set,
 *   otherwise the precondition check returns GST_FLOW_ERROR.
 *
 * The error path at the end posts a STREAM/DECODE element error and returns
 * GST_FLOW_ERROR; presumably it is reached when gst_asf_parse_headers()
 * fails.
 */
305 gst_rtp_asf_pay_parse_headers (GstRtpAsfPay * rtpasfpay)
310 g_return_val_if_fail (rtpasfpay->headers, GST_FLOW_ERROR);
312 if (!gst_asf_parse_headers (rtpasfpay->headers, &rtpasfpay->asfinfo))
315 GST_DEBUG_OBJECT (rtpasfpay, "Packets number: %" G_GUINT64_FORMAT,
316 rtpasfpay->asfinfo.packets_count);
317 GST_DEBUG_OBJECT (rtpasfpay, "Packets size: %" G_GUINT32_FORMAT,
318 rtpasfpay->asfinfo.packet_size);
319 GST_DEBUG_OBJECT (rtpasfpay, "Broadcast mode: %s",
320 rtpasfpay->asfinfo.broadcast ? "true" : "false");
322 /* get the config for caps */
323 g_free (rtpasfpay->config);
/* NOTE(review): the return value of gst_buffer_map() is not checked before
 * map.data and map.size are used; confirm mapping cannot fail here */
324 gst_buffer_map (rtpasfpay->headers, &map, GST_MAP_READ);
325 rtpasfpay->config = g_base64_encode (map.data, map.size);
326 gst_buffer_unmap (rtpasfpay->headers, &map);
327 GST_DEBUG_OBJECT (rtpasfpay, "Serialized headers to base64 string %s",
330 g_assert (rtpasfpay->config != NULL);
331 GST_DEBUG_OBJECT (rtpasfpay, "Setting optional caps values: maxps=%"
332 G_GUINT32_FORMAT " and config=%s", rtpasfpay->asfinfo.packet_size,
335 g_strdup_printf ("%" G_GUINT32_FORMAT, rtpasfpay->asfinfo.packet_size);
336 gst_rtp_base_payload_set_outcaps (GST_RTP_BASE_PAYLOAD (rtpasfpay), "maxps",
337 G_TYPE_STRING, maxps, "config", G_TYPE_STRING, rtpasfpay->config, NULL);
344 GST_ELEMENT_ERROR (rtpasfpay, STREAM, DECODE, (NULL),
345 ("Error parsing headers"));
346 return GST_FLOW_ERROR;
/* handle_buffer vmethod: feeds one input buffer through the payloader
 * state machine kept in rtpasfpay->state.
 *
 * - ASF_END: the buffer is dropped and GST_FLOW_EOS is returned.
 * - ASF_NOT_STARTED: the buffer must hold at least 24 bytes, start with the
 *   ASF header object GUID and contain the whole header object. The header
 *   bytes are stored in rtpasfpay->headers and the state becomes
 *   ASF_DATA_OBJECT. When the buffer is larger than the header object, the
 *   bytes after it are split off and kept as the current buffer.
 * - ASF_DATA_OBJECT: the buffer must be exactly ASF_DATA_OBJECT_SIZE bytes
 *   and start with the data object GUID. It is appended to the stored
 *   headers, the state becomes ASF_PACKETS and the result of
 *   gst_rtp_asf_pay_parse_headers() is returned.
 * - ASF_PACKETS: while the stream is in broadcast mode or fewer packets
 *   than asfinfo.packets_count were received, the buffer is counted and
 *   handed to gst_rtp_asf_pay_handle_packet(). Otherwise the state becomes
 *   ASF_END and the buffer is dropped.
 *
 * rtppay: the payloader instance
 * buffer: the input buffer; it is unreffed, stored, or passed on by every
 *   path shown here
 *
 * Malformed input in the first two states logs an error and returns
 * GST_FLOW_ERROR.
 */
351 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer)
353 GstRtpAsfPay *rtpasfpay = GST_RTP_ASF_PAY_CAST (rtppay);
355 if (G_UNLIKELY (rtpasfpay->state == ASF_END)) {
356 GST_LOG_OBJECT (rtpasfpay,
357 "Dropping buffer as we already pushed all packets");
358 gst_buffer_unref (buffer);
359 return GST_FLOW_EOS; /* we already finished our job */
363 * we only accept if they are in a single buffer */
364 if (G_UNLIKELY (rtpasfpay->state == ASF_NOT_STARTED)) {
367 if (gst_buffer_get_size (buffer) < 24) { /* guid+object size size */
368 GST_ERROR_OBJECT (rtpasfpay,
369 "Buffer too small, smaller than a Guid and object size");
370 gst_buffer_unref (buffer);
371 return GST_FLOW_ERROR;
374 header_size = gst_asf_match_and_peek_obj_size_buf (buffer,
375 &(guids[ASF_HEADER_OBJECT_INDEX]));
376 if (header_size > 0) {
377 GST_DEBUG_OBJECT (rtpasfpay, "ASF header guid received, size %"
378 G_GUINT64_FORMAT, header_size);
380 if (gst_buffer_get_size (buffer) < header_size) {
381 GST_ERROR_OBJECT (rtpasfpay, "Headers should be contained in a single"
383 gst_buffer_unref (buffer);
384 return GST_FLOW_ERROR;
386 rtpasfpay->state = ASF_DATA_OBJECT;
388 /* clear previous headers, if any */
389 if (rtpasfpay->headers) {
390 gst_buffer_unref (rtpasfpay->headers);
393 GST_DEBUG_OBJECT (rtpasfpay, "Storing headers");
394 if (gst_buffer_get_size (buffer) == header_size) {
395 rtpasfpay->headers = buffer;
398 /* headers are a subbuffer of this buffer */
399 GstBuffer *aux = gst_buffer_copy_region (buffer,
400 GST_BUFFER_COPY_ALL, header_size,
401 gst_buffer_get_size (buffer) - header_size);
402 rtpasfpay->headers = gst_buffer_copy_region (buffer,
403 GST_BUFFER_COPY_ALL, 0, header_size);
/* NOTE(review): gst_buffer_replace() takes its own reference on aux, so the
 * reference returned by gst_buffer_copy_region() above looks like it is
 * never dropped. Confirm, and unref aux after the replace if so. */
404 gst_buffer_replace (&buffer, aux);
408 GST_ERROR_OBJECT (rtpasfpay, "Missing ASF header start");
409 gst_buffer_unref (buffer);
410 return GST_FLOW_ERROR;
414 if (G_UNLIKELY (rtpasfpay->state == ASF_DATA_OBJECT)) {
417 if (gst_buffer_get_size (buffer) != ASF_DATA_OBJECT_SIZE) {
418 GST_ERROR_OBJECT (rtpasfpay, "Received buffer of different size of "
419 "the data object header");
420 gst_buffer_unref (buffer);
421 return GST_FLOW_ERROR;
424 gst_buffer_map (buffer, &map, GST_MAP_READ);
425 if (gst_asf_match_guid (map.data, &(guids[ASF_DATA_OBJECT_INDEX]))) {
426 gst_buffer_unmap (buffer, &map);
427 GST_DEBUG_OBJECT (rtpasfpay, "Received data object header");
428 rtpasfpay->headers = gst_buffer_append (rtpasfpay->headers, buffer);
429 rtpasfpay->state = ASF_PACKETS;
431 return gst_rtp_asf_pay_parse_headers (rtpasfpay);
433 gst_buffer_unmap (buffer, &map);
434 GST_ERROR_OBJECT (rtpasfpay, "Unexpected object received (was expecting "
436 gst_buffer_unref (buffer);
437 return GST_FLOW_ERROR;
441 if (G_LIKELY (rtpasfpay->state == ASF_PACKETS)) {
442 /* in broadcast mode we can't trust the packets count information
444 * We assume that if this is on broadcast mode it is a live stream
445 * and we are going to keep receiving packets indefinitely
447 if (rtpasfpay->asfinfo.broadcast ||
448 rtpasfpay->packets_count < rtpasfpay->asfinfo.packets_count) {
449 GST_DEBUG_OBJECT (rtpasfpay, "Received packet %"
450 G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
451 rtpasfpay->packets_count, rtpasfpay->asfinfo.packets_count);
452 rtpasfpay->packets_count++;
453 return gst_rtp_asf_pay_handle_packet (rtpasfpay, buffer);
455 GST_INFO_OBJECT (rtpasfpay, "Packets ended");
456 rtpasfpay->state = ASF_END;
457 gst_buffer_unref (buffer);
462 gst_buffer_unref (buffer);