From 12838af35330d6324cd5d4316e9883e1712a0a43 Mon Sep 17 00:00:00 2001 From: Ederson de Souza Date: Fri, 3 Apr 2020 10:40:43 -0700 Subject: [PATCH] avtpcvfpay: Ensure NAL fragments are transmitted following stream specs TSN streams are expected to send packets to the network in a well defined "pace", which is arbitrarily defined for each stream. This pace is defined by the "measurement interval" property of a stream. When the AVTP CVF payloader element - avtpcvfpay - fragments a video frame that is too big to be sent to the network, it currently defines that all fragments should be transmitted at the same time (via DTS property of GstBuffers generated, as sink will use those to time the transmission of the AVTPDU). This doesn't comply with stream definition, which also has a limit on how many packets can be sent on a given measurement interval. This patch solves that by spreading in time the DTS of the GstBuffers containing the AVTPDUs. Two new properties, "measurement-interval" and "max-interval-frames", added to avptcvfpay element so that it knows stream measurement interval and how many AVTPDUs it can send on any of them. More details on the method used to proper spread DTS/PTS according to measurement interval can be found in a code commentary inside this patch. Tests also added for the new property and behaviour. Part-of: --- ext/avtp/gstavtpcvfpay.c | 168 ++++++++++++++++++++++++++++++++++++-- ext/avtp/gstavtpcvfpay.h | 3 + tests/check/elements/avtpcvfpay.c | 68 ++++++++++++++- 3 files changed, 227 insertions(+), 12 deletions(-) diff --git a/ext/avtp/gstavtpcvfpay.c b/ext/avtp/gstavtpcvfpay.c index f60911b..4137bb3 100644 --- a/ext/avtp/gstavtpcvfpay.c +++ b/ext/avtp/gstavtpcvfpay.c @@ -62,10 +62,14 @@ static GstStateChangeReturn gst_avtp_cvf_change_state (GstElement * enum { PROP_0, - PROP_MTU + PROP_MTU, + PROP_MEASUREMENT_INTERVAL, + PROP_MAX_INTERVAL_FRAME }; #define DEFAULT_MTU 1500 +#define DEFAULT_MEASUREMENT_INTERVAL 250000 +#define DEFAULT_MAX_INTERVAL_FRAMES 1 #define AVTP_CVF_H264_HEADER_SIZE (sizeof(struct avtp_stream_pdu) + sizeof(guint32)) #define FU_A_TYPE 28 @@ -125,6 +129,18 @@ gst_avtp_cvf_pay_class_init (GstAvtpCvfPayClass * klass) "Maximum Transit Unit (MTU) of underlying network in bytes", 0, G_MAXUINT, DEFAULT_MTU, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MEASUREMENT_INTERVAL, + g_param_spec_uint64 ("measurement-interval", "Measurement Interval", + "Measurement interval of stream in nanoseconds", 0, + G_MAXUINT64, DEFAULT_MEASUREMENT_INTERVAL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_INTERVAL_FRAME, + g_param_spec_uint ("max-interval-frames", "Maximum Interval Frames", + "Maximum number of network frames to be sent on each Measurement Interval", + 1, G_MAXUINT, DEFAULT_MAX_INTERVAL_FRAMES, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (avtpcvfpay_debug, "avtpcvfpay", 0, "debug category for avtpcvfpay element"); } @@ -135,6 +151,9 @@ gst_avtp_cvf_pay_init (GstAvtpCvfPay * avtpcvfpay) avtpcvfpay->mtu = DEFAULT_MTU; avtpcvfpay->header = NULL; avtpcvfpay->nal_length_size = 0; + avtpcvfpay->measurement_interval = DEFAULT_MEASUREMENT_INTERVAL; + avtpcvfpay->max_interval_frames = DEFAULT_MAX_INTERVAL_FRAMES; + avtpcvfpay->last_interval_ct = 0; } static void @@ -145,10 +164,19 @@ gst_avtp_cvf_set_property (GObject * object, guint prop_id, GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id); - if (prop_id == PROP_MTU) { - avtpcvfpay->mtu = g_value_get_uint (value); - } else { - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + switch (prop_id) { + case PROP_MTU: + avtpcvfpay->mtu = g_value_get_uint (value); + break; + case PROP_MEASUREMENT_INTERVAL: + avtpcvfpay->measurement_interval = g_value_get_uint64 (value); + break; + case PROP_MAX_INTERVAL_FRAME: + avtpcvfpay->max_interval_frames = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; } } @@ -160,10 +188,19 @@ gst_avtp_cvf_get_property (GObject * object, guint prop_id, GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id); - if (prop_id == PROP_MTU) { - g_value_set_uint (value, avtpcvfpay->mtu); - } else { - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + switch (prop_id) { + case PROP_MTU: + g_value_set_uint (value, avtpcvfpay->mtu); + break; + case PROP_MEASUREMENT_INTERVAL: + g_value_set_uint64 (value, avtpcvfpay->measurement_interval); + break; + case PROP_MAX_INTERVAL_FRAME: + g_value_set_uint (value, avtpcvfpay->max_interval_frames); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; } } @@ -371,6 +408,114 @@ gst_avtpcvpay_fragment_nal (GstAvtpCvfPay * avtpcvfpay, GstBuffer * nal, return fragment; } +static void +gst_avtp_cvf_pay_spread_ts (GstAvtpCvfPay * avtpcvfpay, + GPtrArray * avtp_packets) +{ + /* A bit of the idea of what this function do: + * + * After fragmenting the NAL unit, we have a series of AVTPDUs (AVTP Data Units) + * that should be transmitted. They are going to be transmitted according to GstBuffer + * DTS (or PTS in case there's no DTS), but all of them have the same DTS, as they + * came from the same original NAL unit. + * + * However, TSN streams should send their data according to a "measurement interval", + * which is an arbitrary interval defined for the stream. For instance, a class A + * stream has measurement interval of 125us. Also, there's a MaxIntervalFrames + * parameter, that defines how many network frames can be sent on a given measurement + * interval. We also spread MaxIntervalFrames per measurement interval. + * + * To that end, this function will spread the DTS so that fragments follow measurement + * interval and MaxIntervalFrames, adjusting them to end before the actual DTS of the + * original NAL unit. + * + * Roughly, this function does: + * + * DTSn = DTSbase - (measurement_interval/MaxIntervalFrames) * (total - n - 1) + * + * Where: + * DTSn = DTS of nth fragment + * DTSbase = DTS of original NAL unit + * total = # of fragments + * + * Another issue that this function takes care of is avoiding DTSs that overlap between + * two different set of fragments. Assuming DTSlast is the DTS of the last fragment + * generated on previous call to this function, we don't want any DTSn for the current + * call to be smaller than DTSlast + (measurement_interval / MaxIntervalFrames). If + * that's the case, we adjust DTSbase to preserve this difference (so we don't schedule + * packets transmission times that violate stream spec). This will cause the last + * fragment DTS to be bigger than DTSbase - we emit a warning, as this may be a sign + * of a bad pipeline setup or inappropriate stream spec. + * + * Finally, we also avoid underflows - which would occur when DTSbase is zero or small + * enough. In this case, we'll again make last fragment DTS > DTSbase, so we log it. + * + */ + + GstAvtpBasePayload *avtpbasepayload = GST_AVTP_BASE_PAYLOAD (avtpcvfpay); + + gint i, ret; + guint len; + guint64 tx_interval, total_interval; + GstClockTime base_time, base_dts, rt; + GstBuffer *packet; + + base_time = gst_element_get_base_time (GST_ELEMENT (avtpcvfpay)); + base_dts = GST_BUFFER_DTS (g_ptr_array_index (avtp_packets, 0)); + + tx_interval = + avtpcvfpay->measurement_interval / avtpcvfpay->max_interval_frames; + len = avtp_packets->len; + total_interval = tx_interval * (len - 1); + + /* We don't want packets transmission time to overlap, so let's ensure + * packets are scheduled after last interval used */ + if (avtpcvfpay->last_interval_ct != 0) { + GstClockTime dts_ct, dts_rt; + + ret = + gst_segment_to_running_time_full (&avtpbasepayload->segment, + GST_FORMAT_TIME, base_dts, &dts_rt); + if (ret == -1) + dts_rt = -dts_rt; + + dts_ct = base_time + dts_rt; + + if (dts_ct < avtpcvfpay->last_interval_ct + total_interval + tx_interval) { + base_dts += + avtpcvfpay->last_interval_ct + total_interval + tx_interval - dts_ct; + + GST_WARNING_OBJECT (avtpcvfpay, + "Not enough measurements intervals between frames to transmit fragments" + ". Check stream transmission spec."); + } + } + + /* Not enough room to spread tx before DTS (or we would underflow), + * add offset */ + if (total_interval > base_dts) { + base_dts += total_interval - base_dts; + + GST_INFO_OBJECT (avtpcvfpay, + "Not enough measurements intervals to transmit fragments before base " + "DTS. Check pipeline settings. Are we live?"); + } + + for (i = 0; i < len; i++) { + packet = g_ptr_array_index (avtp_packets, i); + GST_BUFFER_DTS (packet) = base_dts - tx_interval * (len - i - 1); + } + + /* Remember last interval used, in clock time */ + ret = + gst_segment_to_running_time_full (&avtpbasepayload->segment, + GST_FORMAT_TIME, GST_BUFFER_DTS (g_ptr_array_index (avtp_packets, + avtp_packets->len - 1)), &rt); + if (ret == -1) + rt = -rt; + avtpcvfpay->last_interval_ct = base_time + rt; +} + static gboolean gst_avtp_cvf_pay_prepare_avtp_packets (GstAvtpCvfPay * avtpcvfpay, GPtrArray * nals, GPtrArray * avtp_packets) @@ -480,6 +625,11 @@ gst_avtp_cvf_pay_prepare_avtp_packets (GstAvtpCvfPay * avtpcvfpay, GST_LOG_OBJECT (avtpcvfpay, "Prepared %u AVTP packets", avtp_packets->len); + /* Ensure DTS/PTS respect stream transmit spec, so PDUs are transmitted + * according to measurement interval. */ + if (avtp_packets->len > 0) + gst_avtp_cvf_pay_spread_ts (avtpcvfpay, avtp_packets); + return TRUE; } diff --git a/ext/avtp/gstavtpcvfpay.h b/ext/avtp/gstavtpcvfpay.h index cd9d987..ecabe34 100644 --- a/ext/avtp/gstavtpcvfpay.h +++ b/ext/avtp/gstavtpcvfpay.h @@ -47,6 +47,9 @@ struct _GstAvtpCvfPay GstBuffer *header; guint mtu; + guint64 measurement_interval; + guint max_interval_frames; + guint64 last_interval_ct; /* H.264 specific information */ guint8 nal_length_size; diff --git a/tests/check/elements/avtpcvfpay.c b/tests/check/elements/avtpcvfpay.c index 6273809..a74c18f 100644 --- a/tests/check/elements/avtpcvfpay.c +++ b/tests/check/elements/avtpcvfpay.c @@ -139,6 +139,59 @@ compare_h264_avtpdu (struct avtp_stream_pdu *pdu, GstBuffer * buffer) return result; } +GST_START_TEST (test_payloader_spread_ts) +{ + GstHarness *h; + GstBuffer *in, *out; + gint i, total_fragments, max_interval_frames; + guint64 first_tx_time, final_dts, measurement_interval = 250000; + + /* Create the harness for the avtpcvfpay */ + h = gst_harness_new_parse + ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=2000000 tu=125000 processing-deadline=0 mtu=128 measurement-interval=250000 max-interval-frames=3"); + gst_harness_set_src_caps (h, generate_caps (4)); + + /* A 980 bytes NAL with mtu=128 should generate 10 fragments */ + in = gst_harness_create_buffer (h, 980 + 4); + add_nal (in, 980, 7, 0); + GST_BUFFER_DTS (in) = final_dts = 1000000; + GST_BUFFER_PTS (in) = 2000000; + + /* We now push the buffer, and check if we got ten from the avtpcvfpay */ + gst_harness_push (h, in); + fail_unless_equals_int (gst_harness_buffers_received (h), 10); + + /* Using max-interval-frames=3, we'll need 4 measurement intervals to send + * all fragments, with last one just about current DTS, and others + * progressively before that. + * + * So we should have something like: + * + * | 1st | 2nd | 3rd | 4th | Intervals + * | 1 2 3 | 4 5 6 | 7 8 9 | 10 | AVTPDUs in each interval (sharing same DTS/PTS) + * + * And PTS/DTS should increment by a + * measurement-interval / max-interval-frames for each AVTPDU. + */ + i = 0; + total_fragments = 10; + max_interval_frames = 3; + first_tx_time = + final_dts - + (measurement_interval / max_interval_frames) * (total_fragments - 1); + for (i = 0; i < 10; i++) { + out = gst_harness_pull (h); + fail_unless_equals_uint64 (GST_BUFFER_DTS (out), first_tx_time); + + first_tx_time += measurement_interval / max_interval_frames; + } + + + gst_harness_teardown (h); +} + +GST_END_TEST; + GST_START_TEST (test_payloader_downstream_eos) { GstHarness *h; @@ -332,12 +385,12 @@ GST_START_TEST (test_payloader_properties) { GstHarness *h; GstElement *element; - guint mtu, mtt, tu; - guint64 streamid, processing_deadline; + guint mtu, mtt, tu, max_interval_frames; + guint64 streamid, processing_deadline, measurement_interval; /* Create the harness for the avtpcvfpay */ h = gst_harness_new_parse - ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000"); + ("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000 measurement-interval=125000 max-interval-frames=3"); /* Check if all properties were properly set up */ element = gst_harness_find_element (h, "avtpcvfpay"); @@ -357,6 +410,14 @@ GST_START_TEST (test_payloader_properties) NULL); fail_unless_equals_uint64 (processing_deadline, 5000); + g_object_get (G_OBJECT (element), "measurement-interval", + &measurement_interval, NULL); + fail_unless_equals_uint64 (measurement_interval, 125000); + + g_object_get (G_OBJECT (element), "max-interval-frames", + &max_interval_frames, NULL); + fail_unless_equals_uint64 (max_interval_frames, 3); + gst_object_unref (element); gst_harness_teardown (h); } @@ -704,6 +765,7 @@ avtpcvfpay_suite (void) tcase_add_test (tc_chain, test_payloader_no_codec_data); tcase_add_test (tc_chain, test_payloader_zero_sized_nal); tcase_add_test (tc_chain, test_payloader_downstream_eos); + tcase_add_test (tc_chain, test_payloader_spread_ts); return s; } -- 2.7.4