"clock-rate = (int) 90000, " "encoding-name = (string) \"JPEG2000\"")
);
+typedef enum
+{
+ J2K_MARKER = 0xFF,
+ J2K_MARKER_SOC = 0x4F,
+ J2K_MARKER_SOT = 0x90,
+ J2K_MARKER_SOP = 0x91,
+ J2K_MARKER_SOD = 0x93,
+ J2K_MARKER_EOC = 0xD9
+} RtpJ2KMarker;
+
GST_BOILERPLATE (GstRtpJ2KDepay, gst_rtp_j2k_depay, GstBaseRTPDepayload,
GST_TYPE_BASE_RTP_DEPAYLOAD);
gst_rtp_j2k_depay_init (GstRtpJ2KDepay * rtpj2kdepay,
GstRtpJ2KDepayClass * klass)
{
- rtpj2kdepay->adapter = gst_adapter_new ();
+ rtpj2kdepay->pu_adapter = gst_adapter_new ();
+ rtpj2kdepay->t_adapter = gst_adapter_new ();
+ rtpj2kdepay->f_adapter = gst_adapter_new ();
+}
+
+static void
+store_mheader (GstRtpJ2KDepay * rtpj2kdepay, guint idx, GstBuffer * buf)
+{
+ GstBuffer *old;
+
+ GST_DEBUG_OBJECT (rtpj2kdepay, "storing main header %p at index %u", buf,
+ idx);
+ if ((old = rtpj2kdepay->MH[idx]))
+ gst_buffer_unref (old);
+ rtpj2kdepay->MH[idx] = buf;
+}
+
+static void
+clear_mheaders (GstRtpJ2KDepay * rtpj2kdepay)
+{
+ guint i;
+
+ for (i = 0; i < 8; i++)
+ store_mheader (rtpj2kdepay, i, NULL);
+}
+
+static void
+gst_rtp_j2k_depay_reset (GstRtpJ2KDepay * rtpj2kdepay)
+{
+ clear_mheaders (rtpj2kdepay);
+ gst_adapter_clear (rtpj2kdepay->pu_adapter);
+ gst_adapter_clear (rtpj2kdepay->t_adapter);
+ gst_adapter_clear (rtpj2kdepay->f_adapter);
+ rtpj2kdepay->next_frag = 0;
}
static void
rtpj2kdepay = GST_RTP_J2K_DEPAY (object);
- g_object_unref (rtpj2kdepay->adapter);
- rtpj2kdepay->adapter = NULL;
+ clear_mheaders (rtpj2kdepay);
+
+ g_object_unref (rtpj2kdepay->pu_adapter);
+ g_object_unref (rtpj2kdepay->t_adapter);
+ g_object_unref (rtpj2kdepay->f_adapter);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
return res;
}
-static GstBuffer *
-gst_rtp_j2k_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
+static void
+gst_rtp_j2k_depay_clear_pu (GstRtpJ2KDepay * rtpj2kdepay)
+{
+ gst_adapter_clear (rtpj2kdepay->pu_adapter);
+ rtpj2kdepay->have_sync = FALSE;
+}
+
+static GstFlowReturn
+gst_rtp_j2k_depay_flush_pu (GstBaseRTPDepayload * depayload)
{
GstRtpJ2KDepay *rtpj2kdepay;
- GstBuffer *outbuf;
- guint8 *payload;
- guint frag_offset;
+ GstBuffer *mheader;
+ guint avail, MHF, mh_id;
rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload);
- /* flush everything on discont for now */
- if (GST_BUFFER_IS_DISCONT (buf)) {
- GST_DEBUG_OBJECT (rtpj2kdepay, "DISCONT, flushing data");
- gst_adapter_clear (rtpj2kdepay->adapter);
- rtpj2kdepay->need_header = TRUE;
+ /* take all available buffers */
+ avail = gst_adapter_available (rtpj2kdepay->pu_adapter);
+ if (avail == 0)
+ goto done;
+
+ MHF = rtpj2kdepay->pu_MHF;
+ mh_id = rtpj2kdepay->last_mh_id;
+
+ GST_DEBUG_OBJECT (rtpj2kdepay, "flushing PU of size %u", avail);
+
+ if (MHF == 0) {
+ GList *packets, *walk;
+
+ packets = gst_adapter_take_list (rtpj2kdepay->pu_adapter, avail);
+ /* append packets */
+ for (walk = packets; walk; walk = g_list_next (walk)) {
+ GstBuffer *buf = GST_BUFFER_CAST (walk->data);
+ GST_DEBUG_OBJECT (rtpj2kdepay, "append pu packet of size %u",
+ GST_BUFFER_SIZE (buf));
+ gst_adapter_push (rtpj2kdepay->t_adapter, buf);
+ }
+ g_list_free (packets);
+ } else {
+ /* we have a header */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "keeping header %u", mh_id);
+ /* we managed to see the start and end of the header, take all from
+ * adapter and keep in header */
+ mheader = gst_adapter_take_buffer (rtpj2kdepay->pu_adapter, avail);
+
+ store_mheader (rtpj2kdepay, mh_id, mheader);
}
- if (gst_rtp_buffer_get_payload_len (buf) < 8)
- goto empty_packet;
+done:
+ rtpj2kdepay->have_sync = FALSE;
- payload = gst_rtp_buffer_get_payload (buf);
+ return GST_FLOW_OK;
+}
- /*
- * 0 1 2 3
- * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- * |tp |MHF|mh_id|T| priority | tile number |
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- * |reserved | fragment offset |
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
- frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7];
+static GstFlowReturn
+gst_rtp_j2k_depay_flush_tile (GstBaseRTPDepayload * depayload)
+{
+ GstRtpJ2KDepay *rtpj2kdepay;
+ guint avail, mh_id;
+ GList *packets, *walk;
+ guint8 end[2];
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload);
- GST_DEBUG_OBJECT (rtpj2kdepay, "frag %u", frag_offset);
+ /* flush pending PU */
+ gst_rtp_j2k_depay_flush_pu (depayload);
- if (rtpj2kdepay->need_header) {
- if (frag_offset != 0)
+ /* take all available buffers */
+ avail = gst_adapter_available (rtpj2kdepay->t_adapter);
+ if (avail == 0)
+ goto done;
+
+ mh_id = rtpj2kdepay->last_mh_id;
+
+ GST_DEBUG_OBJECT (rtpj2kdepay, "flushing tile of size %u", avail);
+
+ if (gst_adapter_available (rtpj2kdepay->f_adapter) == 0) {
+ GstBuffer *mheader;
+
+ /* we need a header now */
+ if ((mheader = rtpj2kdepay->MH[mh_id]) == NULL)
goto waiting_header;
- rtpj2kdepay->need_header = FALSE;
+ /* push header in the adapter */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "pushing header %u", mh_id);
+ gst_adapter_push (rtpj2kdepay->f_adapter, gst_buffer_ref (mheader));
}
- /* take JPEG 2000 data, push in the adapter */
- outbuf = gst_rtp_buffer_get_payload_subbuffer (buf, 8, -1);
- gst_adapter_push (rtpj2kdepay->adapter, outbuf);
- outbuf = NULL;
+ /* check for last bytes */
+ gst_adapter_copy (rtpj2kdepay->t_adapter, end, avail - 2, 2);
+
+ /* now append the tile packets to the frame */
+ packets = gst_adapter_take_list (rtpj2kdepay->t_adapter, avail);
+ for (walk = packets; walk; walk = g_list_next (walk)) {
+ GstBuffer *buf = GST_BUFFER_CAST (walk->data);
+
+ if (walk == packets) {
+ guint8 *data;
+ guint size;
+
+ /* first buffer should contain the SOT */
+ data = GST_BUFFER_DATA (buf);
+ size = GST_BUFFER_SIZE (buf);
+
+ if (size < 12)
+ goto invalid_tile;
+
+ if (data[0] == 0xff && data[1] == J2K_MARKER_SOT) {
+ guint Psot, nPsot;
+
+ if (end[0] == 0xff && end[1] == J2K_MARKER_EOC)
+ nPsot = avail - 2;
+ else
+ nPsot = avail;
+
+ Psot = GST_READ_UINT32_BE (&data[6]);
+ if (Psot != nPsot && Psot != 0) {
+ /* Psot must match the size of the tile */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "set Psot from %u to %u", Psot, nPsot);
+ buf = gst_buffer_make_writable (buf);
+ data = GST_BUFFER_DATA (buf);
+ GST_WRITE_UINT32_BE (&data[6], nPsot);
+ }
+ }
+ }
- if (gst_rtp_buffer_get_marker (buf)) {
- guint avail;
- guint8 end[2];
- guint8 *data;
+ GST_DEBUG_OBJECT (rtpj2kdepay, "append pu packet of size %u",
+ GST_BUFFER_SIZE (buf));
+ gst_adapter_push (rtpj2kdepay->f_adapter, buf);
+ }
+ g_list_free (packets);
- /* last buffer take all data out of the adapter */
- avail = gst_adapter_available (rtpj2kdepay->adapter);
- GST_DEBUG_OBJECT (rtpj2kdepay, "marker set, last buffer");
+done:
+ rtpj2kdepay->last_tile = -1;
+ return ret;
+
+ /* errors */
+waiting_header:
+ {
+ GST_DEBUG_OBJECT (rtpj2kdepay, "waiting for header %u", mh_id);
+ gst_adapter_clear (rtpj2kdepay->t_adapter);
+ rtpj2kdepay->last_tile = -1;
+ return ret;
+ }
+invalid_tile:
+ {
+ GST_ELEMENT_WARNING (rtpj2kdepay, STREAM, DECODE, ("Invalid tile"), (NULL));
+ gst_adapter_clear (rtpj2kdepay->t_adapter);
+ rtpj2kdepay->last_tile = -1;
+ return ret;
+ }
+}
+
+static GstFlowReturn
+gst_rtp_j2k_depay_flush_frame (GstBaseRTPDepayload * depayload)
+{
+ GstRtpJ2KDepay *rtpj2kdepay;
+ guint8 end[2];
+ guint8 *data;
+ guint avail;
+ GstBuffer *outbuf;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload);
+
+ /* flush pending tile */
+ gst_rtp_j2k_depay_flush_tile (depayload);
+
+ /* last buffer take all data out of the adapter */
+ avail = gst_adapter_available (rtpj2kdepay->f_adapter);
+ if (avail == 0)
+ goto done;
+
+ if (avail > 2) {
/* take the last bytes of the JPEG 2000 data to see if there is an EOC
* marker */
- gst_adapter_copy (rtpj2kdepay->adapter, end, avail - 2, 2);
+ gst_adapter_copy (rtpj2kdepay->f_adapter, end, avail - 2, 2);
if (end[0] != 0xff && end[1] != 0xd9) {
GST_DEBUG_OBJECT (rtpj2kdepay, "no EOC marker, adding one");
data[0] = 0xff;
data[1] = 0xd9;
- gst_adapter_push (rtpj2kdepay->adapter, outbuf);
+ gst_adapter_push (rtpj2kdepay->f_adapter, outbuf);
avail += 2;
}
- outbuf = gst_adapter_take_buffer (rtpj2kdepay->adapter, avail);
- GST_DEBUG_OBJECT (rtpj2kdepay, "returning %u bytes", avail);
+ GST_DEBUG_OBJECT (rtpj2kdepay, "pushing %u bytes", avail);
+ outbuf = gst_adapter_take_buffer (rtpj2kdepay->f_adapter, avail);
+ ret = gst_base_rtp_depayload_push (depayload, outbuf);
+ } else {
+ GST_WARNING_OBJECT (rtpj2kdepay, "empty packet");
+ gst_adapter_clear (rtpj2kdepay->f_adapter);
}
- return outbuf;
+ /* we accept any mh_id now */
+ rtpj2kdepay->last_mh_id = -1;
+
+ /* reset state */
+ rtpj2kdepay->next_frag = 0;
+ rtpj2kdepay->have_sync = FALSE;
+
+done:
+ /* we can't keep headers with mh_id of 0 */
+ store_mheader (rtpj2kdepay, 0, NULL);
+
+ return ret;
+}
+
+static GstBuffer *
+gst_rtp_j2k_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
+{
+ GstRtpJ2KDepay *rtpj2kdepay;
+ guint8 *payload;
+ guint MHF, mh_id, frag_offset, tile, payload_len, j2klen;
+ gint gap;
+ guint32 rtptime;
+
+ rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload);
+
+ payload = gst_rtp_buffer_get_payload (buf);
+ payload_len = gst_rtp_buffer_get_payload_len (buf);
+
+ /* we need at least a header */
+ if (payload_len < 8)
+ goto empty_packet;
+
+ rtptime = gst_rtp_buffer_get_timestamp (buf);
+
+ /* new timestamp marks new frame */
+ if (rtpj2kdepay->last_rtptime != rtptime) {
+ rtpj2kdepay->last_rtptime = rtptime;
+ /* flush pending frame */
+ gst_rtp_j2k_depay_flush_frame (depayload);
+ }
+
+ /*
+ * 0 1 2 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |tp |MHF|mh_id|T| priority | tile number |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |reserved | fragment offset |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+ MHF = (payload[0] & 0x30) >> 4;
+ mh_id = (payload[0] & 0xe) >> 1;
+
+ if (rtpj2kdepay->last_mh_id == -1)
+ rtpj2kdepay->last_mh_id = mh_id;
+ else if (rtpj2kdepay->last_mh_id != mh_id)
+ goto wrong_mh_id;
+
+ tile = (payload[2] << 8) | payload[3];
+ frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7];
+ j2klen = payload_len - 8;
+
+ GST_DEBUG_OBJECT (rtpj2kdepay, "MHF %u, tile %u, frag %u, expected %u", MHF,
+ tile, frag_offset, rtpj2kdepay->next_frag);
+
+ /* calculate the gap between expected frag */
+ gap = frag_offset - rtpj2kdepay->next_frag;
+ /* calculate next frag */
+ rtpj2kdepay->next_frag = frag_offset + j2klen;
+
+ if (gap != 0) {
+ GST_DEBUG_OBJECT (rtpj2kdepay, "discont of %d, clear PU", gap);
+ /* discont, clear pu adapter and resync */
+ gst_rtp_j2k_depay_clear_pu (rtpj2kdepay);
+ }
+
+ /* check for sync code */
+ if (j2klen > 2 && payload[8] == 0xff) {
+ guint marker = payload[9];
+
+ /* packets must start with SOC, SOT or SOP */
+ switch (marker) {
+ case J2K_MARKER_SOC:
+ GST_DEBUG_OBJECT (rtpj2kdepay, "found SOC packet");
+ /* flush the previous frame, should have happened when the timestamp
+ * changed above. */
+ gst_rtp_j2k_depay_flush_frame (depayload);
+ rtpj2kdepay->have_sync = TRUE;
+ break;
+ case J2K_MARKER_SOT:
+ /* flush the previous tile */
+ gst_rtp_j2k_depay_flush_tile (depayload);
+ GST_DEBUG_OBJECT (rtpj2kdepay, "found SOT packet");
+ rtpj2kdepay->have_sync = TRUE;
+ /* we sync on the tile now */
+ rtpj2kdepay->last_tile = tile;
+ break;
+ case J2K_MARKER_SOP:
+ GST_DEBUG_OBJECT (rtpj2kdepay, "found SOP packet");
+ /* flush the previous PU */
+ gst_rtp_j2k_depay_flush_pu (depayload);
+ if (rtpj2kdepay->last_tile != tile) {
+ /* wrong tile, we lose sync and we need a new SOT or SOC to regain
+ * sync. First flush out the previous tile if we have one. */
+ if (rtpj2kdepay->last_tile != -1)
+ gst_rtp_j2k_depay_flush_tile (depayload);
+ /* now we have no more valid tile and no sync */
+ rtpj2kdepay->last_tile = -1;
+ rtpj2kdepay->have_sync = FALSE;
+ } else {
+ rtpj2kdepay->have_sync = TRUE;
+ }
+ break;
+ default:
+ GST_DEBUG_OBJECT (rtpj2kdepay, "no sync packet 0x%02d", marker);
+ break;
+ }
+ }
+
+ if (rtpj2kdepay->have_sync) {
+ GstBuffer *pu_frag;
+
+ if (gst_adapter_available (rtpj2kdepay->pu_adapter) == 0) {
+ /* first part of pu, record state */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "first PU");
+ rtpj2kdepay->pu_MHF = MHF;
+ }
+ /* and push in pu adapter */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "push pu of size %u in adapter", j2klen);
+ pu_frag = gst_rtp_buffer_get_payload_subbuffer (buf, 8, -1);
+ gst_adapter_push (rtpj2kdepay->pu_adapter, pu_frag);
+
+ if (MHF & 2) {
+ /* last part of main header received, we can flush it */
+ GST_DEBUG_OBJECT (rtpj2kdepay, "header end, flush pu");
+ gst_rtp_j2k_depay_flush_pu (depayload);
+ }
+ } else {
+ GST_DEBUG_OBJECT (rtpj2kdepay, "discard packet, no sync");
+ }
+
+ /* marker bit finishes the frame */
+ if (gst_rtp_buffer_get_marker (buf)) {
+ GST_DEBUG_OBJECT (rtpj2kdepay, "marker set, last buffer");
+ /* then flush frame */
+ gst_rtp_j2k_depay_flush_frame (depayload);
+ }
+ return NULL;
/* ERRORS */
empty_packet:
("Empty Payload."), (NULL));
return NULL;
}
-waiting_header:
+wrong_mh_id:
{
- GST_DEBUG_OBJECT (rtpj2kdepay, "we are waiting for a header");
+ GST_ELEMENT_WARNING (rtpj2kdepay, STREAM, DECODE,
+ ("Invalid mh_id %u, expected %u", mh_id, rtpj2kdepay->last_mh_id),
+ (NULL));
+ gst_rtp_j2k_depay_clear_pu (rtpj2kdepay);
return NULL;
}
}
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- gst_adapter_clear (rtpj2kdepay->adapter);
- rtpj2kdepay->need_header = TRUE;
+ gst_rtp_j2k_depay_reset (rtpj2kdepay);
break;
default:
break;
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_adapter_clear (rtpj2kdepay->adapter);
+ gst_rtp_j2k_depay_reset (rtpj2kdepay);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;