enum
{
PROP_0,
- PROP_MTU
+ PROP_MTU,
+ PROP_MEASUREMENT_INTERVAL,
+ PROP_MAX_INTERVAL_FRAME
};
#define DEFAULT_MTU 1500
+#define DEFAULT_MEASUREMENT_INTERVAL 250000
+#define DEFAULT_MAX_INTERVAL_FRAMES 1
#define AVTP_CVF_H264_HEADER_SIZE (sizeof(struct avtp_stream_pdu) + sizeof(guint32))
#define FU_A_TYPE 28
"Maximum Transit Unit (MTU) of underlying network in bytes", 0,
G_MAXUINT, DEFAULT_MTU, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MEASUREMENT_INTERVAL,
+ g_param_spec_uint64 ("measurement-interval", "Measurement Interval",
+ "Measurement interval of stream in nanoseconds", 0,
+ G_MAXUINT64, DEFAULT_MEASUREMENT_INTERVAL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_INTERVAL_FRAME,
+ g_param_spec_uint ("max-interval-frames", "Maximum Interval Frames",
+ "Maximum number of network frames to be sent on each Measurement Interval",
+ 1, G_MAXUINT, DEFAULT_MAX_INTERVAL_FRAMES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
GST_DEBUG_CATEGORY_INIT (avtpcvfpay_debug, "avtpcvfpay",
0, "debug category for avtpcvfpay element");
}
avtpcvfpay->mtu = DEFAULT_MTU;
avtpcvfpay->header = NULL;
avtpcvfpay->nal_length_size = 0;
+ avtpcvfpay->measurement_interval = DEFAULT_MEASUREMENT_INTERVAL;
+ avtpcvfpay->max_interval_frames = DEFAULT_MAX_INTERVAL_FRAMES;
+ avtpcvfpay->last_interval_ct = 0;
}
static void
GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id);
- if (prop_id == PROP_MTU) {
- avtpcvfpay->mtu = g_value_get_uint (value);
- } else {
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ switch (prop_id) {
+ case PROP_MTU:
+ avtpcvfpay->mtu = g_value_get_uint (value);
+ break;
+ case PROP_MEASUREMENT_INTERVAL:
+ avtpcvfpay->measurement_interval = g_value_get_uint64 (value);
+ break;
+ case PROP_MAX_INTERVAL_FRAME:
+ avtpcvfpay->max_interval_frames = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
}
}
GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id);
- if (prop_id == PROP_MTU) {
- g_value_set_uint (value, avtpcvfpay->mtu);
- } else {
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ switch (prop_id) {
+ case PROP_MTU:
+ g_value_set_uint (value, avtpcvfpay->mtu);
+ break;
+ case PROP_MEASUREMENT_INTERVAL:
+ g_value_set_uint64 (value, avtpcvfpay->measurement_interval);
+ break;
+ case PROP_MAX_INTERVAL_FRAME:
+ g_value_set_uint (value, avtpcvfpay->max_interval_frames);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
}
}
return fragment;
}
+static void
+gst_avtp_cvf_pay_spread_ts (GstAvtpCvfPay * avtpcvfpay,
+ GPtrArray * avtp_packets)
+{
+ /* A bit of the idea of what this function do:
+ *
+ * After fragmenting the NAL unit, we have a series of AVTPDUs (AVTP Data Units)
+ * that should be transmitted. They are going to be transmitted according to GstBuffer
+ * DTS (or PTS in case there's no DTS), but all of them have the same DTS, as they
+ * came from the same original NAL unit.
+ *
+ * However, TSN streams should send their data according to a "measurement interval",
+ * which is an arbitrary interval defined for the stream. For instance, a class A
+ * stream has measurement interval of 125us. Also, there's a MaxIntervalFrames
+ * parameter, that defines how many network frames can be sent on a given measurement
+ * interval. We also spread MaxIntervalFrames per measurement interval.
+ *
+ * To that end, this function will spread the DTS so that fragments follow measurement
+ * interval and MaxIntervalFrames, adjusting them to end before the actual DTS of the
+ * original NAL unit.
+ *
+ * Roughly, this function does:
+ *
+ * DTSn = DTSbase - (measurement_interval/MaxIntervalFrames) * (total - n - 1)
+ *
+ * Where:
+ * DTSn = DTS of nth fragment
+ * DTSbase = DTS of original NAL unit
+ * total = # of fragments
+ *
+ * Another issue that this function takes care of is avoiding DTSs that overlap between
+ * two different set of fragments. Assuming DTSlast is the DTS of the last fragment
+ * generated on previous call to this function, we don't want any DTSn for the current
+ * call to be smaller than DTSlast + (measurement_interval / MaxIntervalFrames). If
+ * that's the case, we adjust DTSbase to preserve this difference (so we don't schedule
+ * packets transmission times that violate stream spec). This will cause the last
+ * fragment DTS to be bigger than DTSbase - we emit a warning, as this may be a sign
+ * of a bad pipeline setup or inappropriate stream spec.
+ *
+ * Finally, we also avoid underflows - which would occur when DTSbase is zero or small
+ * enough. In this case, we'll again make last fragment DTS > DTSbase, so we log it.
+ *
+ */
+
+ GstAvtpBasePayload *avtpbasepayload = GST_AVTP_BASE_PAYLOAD (avtpcvfpay);
+
+ gint i, ret;
+ guint len;
+ guint64 tx_interval, total_interval;
+ GstClockTime base_time, base_dts, rt;
+ GstBuffer *packet;
+
+ base_time = gst_element_get_base_time (GST_ELEMENT (avtpcvfpay));
+ base_dts = GST_BUFFER_DTS (g_ptr_array_index (avtp_packets, 0));
+
+ tx_interval =
+ avtpcvfpay->measurement_interval / avtpcvfpay->max_interval_frames;
+ len = avtp_packets->len;
+ total_interval = tx_interval * (len - 1);
+
+ /* We don't want packets transmission time to overlap, so let's ensure
+ * packets are scheduled after last interval used */
+ if (avtpcvfpay->last_interval_ct != 0) {
+ GstClockTime dts_ct, dts_rt;
+
+ ret =
+ gst_segment_to_running_time_full (&avtpbasepayload->segment,
+ GST_FORMAT_TIME, base_dts, &dts_rt);
+ if (ret == -1)
+ dts_rt = -dts_rt;
+
+ dts_ct = base_time + dts_rt;
+
+ if (dts_ct < avtpcvfpay->last_interval_ct + total_interval + tx_interval) {
+ base_dts +=
+ avtpcvfpay->last_interval_ct + total_interval + tx_interval - dts_ct;
+
+ GST_WARNING_OBJECT (avtpcvfpay,
+ "Not enough measurements intervals between frames to transmit fragments"
+ ". Check stream transmission spec.");
+ }
+ }
+
+ /* Not enough room to spread tx before DTS (or we would underflow),
+ * add offset */
+ if (total_interval > base_dts) {
+ base_dts += total_interval - base_dts;
+
+ GST_INFO_OBJECT (avtpcvfpay,
+ "Not enough measurements intervals to transmit fragments before base "
+ "DTS. Check pipeline settings. Are we live?");
+ }
+
+ for (i = 0; i < len; i++) {
+ packet = g_ptr_array_index (avtp_packets, i);
+ GST_BUFFER_DTS (packet) = base_dts - tx_interval * (len - i - 1);
+ }
+
+ /* Remember last interval used, in clock time */
+ ret =
+ gst_segment_to_running_time_full (&avtpbasepayload->segment,
+ GST_FORMAT_TIME, GST_BUFFER_DTS (g_ptr_array_index (avtp_packets,
+ avtp_packets->len - 1)), &rt);
+ if (ret == -1)
+ rt = -rt;
+ avtpcvfpay->last_interval_ct = base_time + rt;
+}
+
static gboolean
gst_avtp_cvf_pay_prepare_avtp_packets (GstAvtpCvfPay * avtpcvfpay,
GPtrArray * nals, GPtrArray * avtp_packets)
GST_LOG_OBJECT (avtpcvfpay, "Prepared %u AVTP packets", avtp_packets->len);
+ /* Ensure DTS/PTS respect stream transmit spec, so PDUs are transmitted
+ * according to measurement interval. */
+ if (avtp_packets->len > 0)
+ gst_avtp_cvf_pay_spread_ts (avtpcvfpay, avtp_packets);
+
return TRUE;
}
return result;
}
+GST_START_TEST (test_payloader_spread_ts)
+{
+ GstHarness *h;
+ GstBuffer *in, *out;
+ gint i, total_fragments, max_interval_frames;
+ guint64 first_tx_time, final_dts, measurement_interval = 250000;
+
+ /* Create the harness for the avtpcvfpay */
+ h = gst_harness_new_parse
+ ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=2000000 tu=125000 processing-deadline=0 mtu=128 measurement-interval=250000 max-interval-frames=3");
+ gst_harness_set_src_caps (h, generate_caps (4));
+
+ /* A 980 bytes NAL with mtu=128 should generate 10 fragments */
+ in = gst_harness_create_buffer (h, 980 + 4);
+ add_nal (in, 980, 7, 0);
+ GST_BUFFER_DTS (in) = final_dts = 1000000;
+ GST_BUFFER_PTS (in) = 2000000;
+
+ /* We now push the buffer, and check if we got ten from the avtpcvfpay */
+ gst_harness_push (h, in);
+ fail_unless_equals_int (gst_harness_buffers_received (h), 10);
+
+ /* Using max-interval-frames=3, we'll need 4 measurement intervals to send
+ * all fragments, with last one just about current DTS, and others
+ * progressively before that.
+ *
+ * So we should have something like:
+ *
+ * | 1st | 2nd | 3rd | 4th | Intervals
+ * | 1 2 3 | 4 5 6 | 7 8 9 | 10 | AVTPDUs in each interval (sharing same DTS/PTS)
+ *
+ * And PTS/DTS should increment by a
+ * measurement-interval / max-interval-frames for each AVTPDU.
+ */
+ i = 0;
+ total_fragments = 10;
+ max_interval_frames = 3;
+ first_tx_time =
+ final_dts -
+ (measurement_interval / max_interval_frames) * (total_fragments - 1);
+ for (i = 0; i < 10; i++) {
+ out = gst_harness_pull (h);
+ fail_unless_equals_uint64 (GST_BUFFER_DTS (out), first_tx_time);
+
+ first_tx_time += measurement_interval / max_interval_frames;
+ }
+
+
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_payloader_downstream_eos)
{
GstHarness *h;
{
GstHarness *h;
GstElement *element;
- guint mtu, mtt, tu;
- guint64 streamid, processing_deadline;
+ guint mtu, mtt, tu, max_interval_frames;
+ guint64 streamid, processing_deadline, measurement_interval;
/* Create the harness for the avtpcvfpay */
h = gst_harness_new_parse
- ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000");
+ ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000 measurement-interval=125000 max-interval-frames=3");
/* Check if all properties were properly set up */
element = gst_harness_find_element (h, "avtpcvfpay");
NULL);
fail_unless_equals_uint64 (processing_deadline, 5000);
+ g_object_get (G_OBJECT (element), "measurement-interval",
+ &measurement_interval, NULL);
+ fail_unless_equals_uint64 (measurement_interval, 125000);
+
+ g_object_get (G_OBJECT (element), "max-interval-frames",
+ &max_interval_frames, NULL);
+ fail_unless_equals_uint64 (max_interval_frames, 3);
+
gst_object_unref (element);
gst_harness_teardown (h);
}
tcase_add_test (tc_chain, test_payloader_no_codec_data);
tcase_add_test (tc_chain, test_payloader_zero_sized_nal);
tcase_add_test (tc_chain, test_payloader_downstream_eos);
+ tcase_add_test (tc_chain, test_payloader_spread_ts);
return s;
}