#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
#define DEFAULT_MAX_DROPOUT_TIME 60000
#define DEFAULT_MAX_MISORDER_TIME 2000
+#define DEFAULT_DISABLE_RTCP FALSE
enum
{
PROP_STATS,
PROP_PROBATION,
PROP_MAX_DROPOUT_TIME,
- PROP_MAX_MISORDER_TIME
+ PROP_MAX_MISORDER_TIME,
+ PROP_DISABLE_RTCP
};
/* GObject vmethods */
0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * RTPSession::disable-rtcp:
+ *
+ * Allow disabling the sending of RTCP packets for this source.
+ */
+ g_object_class_install_property (gobject_class, PROP_DISABLE_RTCP,
+ g_param_spec_boolean ("disable-rtcp", "Disable RTCP",
+ "Disable sending RTCP packets for this source",
+ DEFAULT_DISABLE_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
}
src->bye_reason = NULL;
src->sent_bye = FALSE;
g_hash_table_remove_all (src->reported_in_sr_of);
+ g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (src->retained_feedback);
+ src->last_rtptime = -1;
src->stats.cycles = -1;
src->stats.jitter = 0;
src->stats.sent_pli_count = 0;
src->stats.sent_fir_count = 0;
+ src->stats.sent_nack_count = 0;
+ src->stats.recv_nack_count = 0;
}
static void
rtp_source_init (RTPSource * src)
{
- /* sources are initialy on probation until we receive enough valid RTP
+ /* sources are initially on probation until we receive enough valid RTP
* packets or a valid RTCP packet */
src->validated = FALSE;
src->internal = FALSE;
src->clock_rate = -1;
src->packets = g_queue_new ();
src->seqnum_offset = -1;
- src->last_rtptime = -1;
src->retained_feedback = g_queue_new ();
- src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
+ src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16));
+ src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime));
src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
+ src->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
rtp_source_reset (src);
+
+ src->pt_set = FALSE;
}
void
g_queue_free (src->retained_feedback);
g_array_free (src->nacks, TRUE);
+ g_array_free (src->nack_deadlines, TRUE);
if (src->rtp_from)
g_object_unref (src->rtp_from);
"sent-pli-count", G_TYPE_UINT, src->stats.sent_pli_count,
"recv-pli-count", G_TYPE_UINT, src->stats.recv_pli_count,
"sent-fir-count", G_TYPE_UINT, src->stats.sent_fir_count,
- "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, NULL);
+ "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count,
+ "sent-nack-count", G_TYPE_UINT, src->stats.sent_nack_count,
+ "recv-nack-count", G_TYPE_UINT, src->stats.recv_nack_count, NULL);
/* get the last SR. */
have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime,
case PROP_MAX_MISORDER_TIME:
src->max_misorder_time = g_value_get_uint (value);
break;
+ case PROP_DISABLE_RTCP:
+ src->disable_rtcp = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_MAX_MISORDER_TIME:
g_value_set_uint (value, src->max_misorder_time);
break;
+ case PROP_DISABLE_RTCP:
+ g_value_set_boolean (value, src->disable_rtcp);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
goto done;
}
} else {
- /* unexpected seqnum in probation */
+ /* unexpected seqnum in probation
+ *
+ * There is no need to clean the queue at this point because the
+ * invalid packets in the queue are not going to be pushed as we are
+ * still in probation, and some cleanup will be performed at future
+ * probation attempts anyway if there are too many old packets in the
+ * queue.
+ */
goto probation_seqnum;
}
} else if (delta >= 0 && delta < max_dropout) {
g_queue_clear (src->packets);
/* duplicate or reordered packet, will be filtered by jitterbuffer. */
- GST_WARNING ("duplicate or reordered packet (seqnr %u, expected %u)",
+ GST_INFO ("duplicate or reordered packet (seqnr %u, expected %u)",
seqnr, expected);
}
}
src->stats.octets_received += pinfo->payload_len;
src->stats.bytes_received += pinfo->bytes;
- src->stats.packets_received++;
+ src->stats.packets_received += pinfo->packets;
/* for the bitrate estimation */
src->bytes_received += pinfo->payload_len;
}
probation_seqnum:
{
- GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
+ GST_WARNING ("probation: seqnr %d != expected %d "
+ "(SSRC %u curr_probation %i probation %i)", seqnr, expected, src->ssrc,
+ src->curr_probation, src->probation);
src->curr_probation = src->probation;
src->stats.max_seq = seqnr;
return FALSE;
* @src: an #RTPSource
* @pinfo: an #RTPPacketInfo
*
- * Let @src handle the incomming RTP packet described in @pinfo.
+ * Let @src handle the incoming RTP packet described in @pinfo.
*
* Returns: a #GstFlowReturn.
*/
/**
* rtp_source_send_rtp:
* @src: an #RTPSource
- * @data: an RTP buffer or a list of RTP buffers
- * @is_list: if @data is a buffer or list
- * @running_time: the running time of @data
+ * @pinfo: an #RTPPacketInfo
*
- * Send @data (an RTP buffer or list of buffers) originating from @src.
- * This will make @src a sender. This function takes ownership of @data and
+ * Send data (an RTP buffer or buffer list from @pinfo) originating from @src.
+ * This will make @src a sender. This function takes ownership of the data and
* modifies the SSRC in the RTP packet to that of @src when needed.
*
* Returns: a #GstFlowReturn.
if (!update_receiver_stats (src, pinfo, FALSE))
return GST_FLOW_OK;
+ if (src->pt_set && src->pt != pinfo->pt) {
+ GST_WARNING ("Changing pt from %u to %u for SSRC %u", src->pt, pinfo->pt,
+ src->ssrc);
+ }
+
+ src->pt = pinfo->pt;
+ src->pt_set = TRUE;
+
/* update stats for the SR */
src->stats.packets_sent += pinfo->packets;
src->stats.octets_sent += pinfo->payload_len;
GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %"
G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp);
+ if (src->clock_rate == -1 && src->pt_set) {
+ GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
+ src->ssrc);
+ get_clock_rate (src, src->pt);
+ }
+
if (src->clock_rate != -1) {
/* get the diff between the clock running_time and the buffer running_time.
* This is the elapsed time, as measured against the pipeline clock, between
*/
void
rtp_source_timeout (RTPSource * src, GstClockTime current_time,
- GstClockTime feedback_retention_window)
+ GstClockTime running_time, GstClockTime feedback_retention_window)
{
GstRTCPPacket *pkt;
+ GstClockTime max_pts_window;
+ guint pruned = 0;
src->conflicting_addresses =
timeout_conflicting_addresses (src->conflicting_addresses, current_time);
+ if (feedback_retention_window == GST_CLOCK_TIME_NONE ||
+ running_time < feedback_retention_window) {
+ return;
+ }
+
+ max_pts_window = running_time - feedback_retention_window;
+
/* Time out AVPF packets that are older than the desired length */
- while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
- GST_BUFFER_PTS (pkt) < feedback_retention_window)
- gst_buffer_unref (g_queue_pop_tail (src->retained_feedback));
+ while ((pkt = g_queue_peek_head (src->retained_feedback)) &&
+ GST_BUFFER_PTS (pkt) < max_pts_window) {
+ gst_buffer_unref (g_queue_pop_head (src->retained_feedback));
+ pruned++;
+ }
+
+ GST_LOG_OBJECT (src,
+ "%u RTCP packets pruned with PTS less than %" GST_TIME_FORMAT
+ ", queue len: %u", pruned, GST_TIME_ARGS (max_pts_window),
+ g_queue_get_length (src->retained_feedback));
}
static gint
const GstBuffer *bufa = a;
const GstBuffer *bufb = b;
- return GST_BUFFER_PTS (bufa) - GST_BUFFER_PTS (bufb);
+ g_return_val_if_fail (GST_BUFFER_PTS (bufa) != GST_CLOCK_TIME_NONE, -1);
+ g_return_val_if_fail (GST_BUFFER_PTS (bufb) != GST_CLOCK_TIME_NONE, 1);
+
+ if (GST_BUFFER_PTS (bufa) < GST_BUFFER_PTS (bufb)) {
+ return -1;
+ } else if (GST_BUFFER_PTS (bufa) > GST_BUFFER_PTS (bufb)) {
+ return 1;
+ }
+
+ return 0;
}
void
{
GstBuffer *buffer;
+ g_return_if_fail (running_time != GST_CLOCK_TIME_NONE);
+
buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY,
packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4);
GST_BUFFER_PTS (buffer) = running_time;
g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL);
+
+ GST_LOG_OBJECT (src, "RTCP packet retained with PTS: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (running_time));
}
gboolean
* rtp_source_register_nack:
* @src: The #RTPSource
* @seqnum: a seqnum
+ * @deadline: the deadline before which RTX is still possible
*
* Register that @seqnum has not been received from @src.
*/
void
-rtp_source_register_nack (RTPSource * src, guint16 seqnum)
+rtp_source_register_nack (RTPSource * src, guint16 seqnum,
+ GstClockTime deadline)
{
- guint i, len;
- guint32 dword = seqnum << 16;
- gint diff = 16;
+ gint i;
+ guint len;
+ gint diff = -1;
+ guint16 tseq;
len = src->nacks->len;
- for (i = 0; i < len; i++) {
- guint32 tdword;
- guint16 tseq;
+ for (i = len - 1; i >= 0; i--) {
+ tseq = g_array_index (src->nacks, guint16, i);
+ diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- tdword = g_array_index (src->nacks, guint32, i);
- tseq = tdword >> 16;
+ GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len);
- diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- if (diff < 16)
+ if (diff >= 0)
break;
}
- /* we already have this seqnum */
- if (diff == 0)
- return;
- /* it comes before the recorded seqnum, FIXME, we could merge it
- * if not to far away */
- if (diff < 0) {
- GST_DEBUG ("insert NACK #%u at %u", seqnum, i);
- g_array_insert_val (src->nacks, i, dword);
- } else if (diff < 16) {
- /* we can merge it */
- dword = g_array_index (src->nacks, guint32, i);
- dword |= 1 << (diff - 1);
- GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i,
- dword >> 16, dword);
- g_array_index (src->nacks, guint32, i) = dword;
+
+ if (diff == 0) {
+ GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_index (src->nack_deadlines, GstClockTime, i) = deadline;
+ } else if (i == len - 1) {
+ GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_append_val (src->nacks, seqnum);
+ g_array_append_val (src->nack_deadlines, deadline);
} else {
- GST_DEBUG ("append NACK #%u", seqnum);
- g_array_append_val (src->nacks, dword);
+ GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_insert_val (src->nacks, i + 1, seqnum);
+ g_array_insert_val (src->nack_deadlines, i + 1, deadline);
}
+
src->send_nack = TRUE;
}
*
* Returns: an array of @n_nacks seqnum values.
*/
-guint32 *
+guint16 *
rtp_source_get_nacks (RTPSource * src, guint * n_nacks)
{
if (n_nacks)
*n_nacks = src->nacks->len;
- return (guint32 *) src->nacks->data;
+ return (guint16 *) src->nacks->data;
}
+/**
+ * rtp_source_get_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: result number of nacks
+ *
+ * Get the registered NACKS deadlines.
+ *
+ * Returns: an array of @n_nacks deadline values.
+ */
+GstClockTime *
+rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks)
+{
+ if (n_nacks)
+ *n_nacks = src->nack_deadlines->len;
+
+ return (GstClockTime *) src->nack_deadlines->data;
+}
+
+/**
+ * rtp_source_clear_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: number of nacks
+ *
+ * Remove @n_nacks oldest NACKS form array.
+ */
void
-rtp_source_clear_nacks (RTPSource * src)
+rtp_source_clear_nacks (RTPSource * src, guint n_nacks)
{
- g_array_set_size (src->nacks, 0);
- src->send_nack = FALSE;
+ g_return_if_fail (n_nacks <= src->nacks->len);
+
+ if (src->nacks->len == n_nacks) {
+ g_array_set_size (src->nacks, 0);
+ g_array_set_size (src->nack_deadlines, 0);
+ src->send_nack = FALSE;
+ } else {
+ g_array_remove_range (src->nacks, 0, n_nacks);
+ g_array_remove_range (src->nack_deadlines, 0, n_nacks);
+ }
}