From 71c9cdeff4b2994616dc6c54f5f5cf3233c8e166 Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Wed, 29 Jun 2016 15:56:47 -0400 Subject: [PATCH] webrtcdsp: Rewrite echo data synchronization The previous code would run out of sync if there was packet loss or clock skew. When that happened, the echo cancellation feature would completely stop working. As this is crucial for audio calls, this patch re-implements synchronization completely. Instead of letting it drift until the next discont, we now synchronize against the record data at every iteration. This way we simply never let the stream drift for longer than a 10ms period. We also shorten the delay by using the latency up to the probe (basically excluding the sink latency). This is a decent delay to avoid starving in the probe queue. https://bugzilla.gnome.org/show_bug.cgi?id=768009 --- ext/webrtcdsp/gstwebrtcdsp.cpp | 172 +++++++++++------------------------ ext/webrtcdsp/gstwebrtcechoprobe.cpp | 88 +++++++++++++++++- ext/webrtcdsp/gstwebrtcechoprobe.h | 6 +- 3 files changed, 143 insertions(+), 123 deletions(-) diff --git a/ext/webrtcdsp/gstwebrtcdsp.cpp b/ext/webrtcdsp/gstwebrtcdsp.cpp index f8f086d..fa974e0 100644 --- a/ext/webrtcdsp/gstwebrtcdsp.cpp +++ b/ext/webrtcdsp/gstwebrtcdsp.cpp @@ -173,10 +173,8 @@ struct _GstWebrtcDsp guint period_size; /* Protected by the stream lock */ - GstClockTime timestamp; GstAdapter *adapter; webrtc::AudioProcessing * apm; - gint delay_ms; /* Protected by the object lock */ gchar *probe_name; @@ -247,138 +245,97 @@ webrtc_error_to_string (gint err) return str; } -/* with probe object lock */ -static gboolean -gst_webrtc_dsp_sync_reverse_stream (GstWebrtcDsp * self, - GstWebrtcEchoProbe * probe) +static GstBuffer * +gst_webrtc_dsp_take_buffer (GstWebrtcDsp * self) { - GstClockTime probe_timestamp; - GstClockTimeDiff diff; + GstBuffer *buffer; + GstClockTime timestamp; guint64 distance; - /* We need to wait for a time reference */ - if (!GST_CLOCK_TIME_IS_VALID (self->timestamp)) - 
return FALSE; - - probe_timestamp = gst_adapter_prev_pts (probe->adapter, &distance); - - if (!GST_CLOCK_TIME_IS_VALID (probe_timestamp)) { - GST_WARNING_OBJECT (self, - "Echo Probe is handling buffer without timestamp."); - return FALSE; - } - - if (gst_adapter_pts_at_discont (probe->adapter) == probe_timestamp) { - if (distance == 0) - probe->synchronized = FALSE; - } - - if (probe->synchronized) - return TRUE; - - if (gst_adapter_available (probe->adapter) < probe->period_size - || probe->latency == -1) { - GST_TRACE_OBJECT (self, "Echo Probe not ready yet"); - return FALSE; - } - - if (self->info.rate != probe->info.rate) { - GST_WARNING_OBJECT (self, - "Echo Probe has rate %i while the DSP is running at rate %i, use a " - "caps filter to ensure those are the same.", - probe->info.rate, self->info.rate); - return FALSE; - } + timestamp = gst_adapter_prev_pts (self->adapter, &distance); + timestamp += gst_util_uint64_scale_int (distance / self->info.bpf, + GST_SECOND, self->info.rate); - probe_timestamp += gst_util_uint64_scale_int (distance / probe->info.bpf, - GST_SECOND, probe->info.rate); + buffer = gst_adapter_take_buffer (self->adapter, self->period_size); - diff = GST_CLOCK_DIFF (probe_timestamp, self->timestamp); - self->delay_ms = (probe->latency - diff) / GST_MSECOND; + GST_BUFFER_PTS (buffer) = timestamp; + GST_BUFFER_DURATION (buffer) = 10 * GST_MSECOND; - GST_DEBUG_OBJECT (probe, "Echo Probe is now synchronized"); - probe->synchronized = TRUE; + if (gst_adapter_pts_at_discont (self->adapter) == timestamp && distance == 0) { + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } else + GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); - return TRUE; + return buffer; } -static void -gst_webrtc_dsp_analyze_reverse_stream (GstWebrtcDsp * self) +static GstFlowReturn +gst_webrtc_dsp_analyze_reverse_stream (GstWebrtcDsp * self, + GstClockTime rec_time) { GstWebrtcEchoProbe *probe = NULL; webrtc::AudioProcessing * apm; webrtc::AudioFrame frame; 
- gint err; + GstFlowReturn ret = GST_FLOW_OK; + gint err, delay; GST_OBJECT_LOCK (self); if (self->echo_cancel) probe = GST_WEBRTC_ECHO_PROBE (g_object_ref (self->probe)); GST_OBJECT_UNLOCK (self); + /* If echo cancellation is disabled */ if (!probe) - return; + return GST_FLOW_OK; apm = self->apm; - GST_WEBRTC_ECHO_PROBE_LOCK (probe); - - if (gst_adapter_available (probe->adapter) < probe->period_size) { - GST_LOG_OBJECT (self, "No echo data yet..."); - goto beach; - } + delay = gst_webrtc_echo_probe_read (probe, rec_time, (gpointer) &frame); - if (!gst_webrtc_dsp_sync_reverse_stream (self, probe)) - goto beach; + apm->set_stream_delay_ms (delay); - frame.num_channels_ = probe->info.channels; - frame.sample_rate_hz_ = probe->info.rate; - frame.samples_per_channel_ = probe->period_size / probe->info.bpf; + if (delay < 0) + goto done; - gst_adapter_copy (probe->adapter, (guint8 *) frame.data_, 0, - probe->period_size); - gst_adapter_flush (probe->adapter, self->period_size); + if (frame.sample_rate_hz_ != self->info.rate) { + GST_ELEMENT_ERROR (self, STREAM, FORMAT, + ("Echo Probe has rate %i , while the DSP is running at rate %i," + " use a caps filter to ensure those are the same.", + frame.sample_rate_hz_, self->info.rate), (NULL)); + ret = GST_FLOW_ERROR; + goto done; + } if ((err = apm->AnalyzeReverseStream (&frame)) < 0) GST_WARNING_OBJECT (self, "Reverse stream analyses failed: %s.", webrtc_error_to_string (err)); -beach: - GST_WEBRTC_ECHO_PROBE_UNLOCK (probe); +done: gst_object_unref (probe); + + return ret; } -static GstBuffer * -gst_webrtc_dsp_process_stream (GstWebrtcDsp * self) +static GstFlowReturn +gst_webrtc_dsp_process_stream (GstWebrtcDsp * self, + GstBuffer * buffer) { - GstBuffer *buffer; GstMapInfo info; webrtc::AudioProcessing * apm = self->apm; webrtc::AudioFrame frame; - GstClockTime timestamp; - guint64 distance; gint err; frame.num_channels_ = self->info.channels; frame.sample_rate_hz_ = self->info.rate; frame.samples_per_channel_ = 
self->period_size / self->info.bpf; - timestamp = gst_adapter_prev_pts (self->adapter, &distance); - - if (GST_CLOCK_TIME_IS_VALID (timestamp)) - timestamp += gst_util_uint64_scale_int (distance / self->info.bpf, - GST_SECOND, self->info.rate); - - buffer = gst_adapter_take_buffer (self->adapter, self->period_size); - if (!gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE)) { gst_buffer_unref (buffer); - return NULL; + return GST_FLOW_ERROR; } memcpy (frame.data_, info.data, self->period_size); - apm->set_stream_delay_ms (self->delay_ms); - if ((err = apm->ProcessStream (&frame)) < 0) { GST_WARNING_OBJECT (self, "Failed to filter the audio: %s.", webrtc_error_to_string (err)); @@ -388,18 +345,7 @@ gst_webrtc_dsp_process_stream (GstWebrtcDsp * self) gst_buffer_unmap (buffer, &info); - GST_BUFFER_PTS (buffer) = timestamp; - GST_BUFFER_DURATION (buffer) = 10 * GST_MSECOND; - - if (gst_adapter_pts_at_discont (self->adapter) == timestamp) - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - else - GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); - - if (GST_CLOCK_TIME_IS_VALID (timestamp)) - self->timestamp = timestamp + GST_BUFFER_DURATION (buffer); - - return buffer; + return GST_FLOW_OK; } static GstFlowReturn @@ -412,18 +358,9 @@ gst_webrtc_dsp_submit_input_buffer (GstBaseTransform * btrans, GST_BUFFER_PTS (buffer) = gst_segment_to_running_time (&btrans->segment, GST_FORMAT_TIME, GST_BUFFER_PTS (buffer)); - if (!GST_CLOCK_TIME_IS_VALID (self->timestamp)) - self->timestamp = GST_BUFFER_PTS (buffer); - if (is_discont) { - GST_OBJECT_LOCK (self); - if (self->echo_cancel && self->probe) { - GST_WEBRTC_ECHO_PROBE_LOCK (self->probe); - self->probe->synchronized = FALSE; - GST_WEBRTC_ECHO_PROBE_UNLOCK (self->probe); - } - GST_OBJECT_UNLOCK (self); - + GST_DEBUG_OBJECT (self, + "Received discont, clearing adapter."); gst_adapter_clear (self->adapter); } @@ -436,15 +373,20 @@ static GstFlowReturn gst_webrtc_dsp_generate_output (GstBaseTransform * 
btrans, GstBuffer ** outbuf) { GstWebrtcDsp *self = GST_WEBRTC_DSP (btrans); + GstFlowReturn ret; - if (gst_adapter_available (self->adapter) >= self->period_size) { - gst_webrtc_dsp_analyze_reverse_stream (self); - *outbuf = gst_webrtc_dsp_process_stream (self); - } else { + if (gst_adapter_available (self->adapter) < self->period_size) { *outbuf = NULL; + return GST_FLOW_OK; } - return GST_FLOW_OK; + *outbuf = gst_webrtc_dsp_take_buffer (self); + ret = gst_webrtc_dsp_analyze_reverse_stream (self, GST_BUFFER_PTS (*outbuf)); + + if (ret == GST_FLOW_OK) + ret = gst_webrtc_dsp_process_stream (self, *outbuf); + + return ret; } static gboolean @@ -495,8 +437,6 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info) GST_OBJECT_LOCK (self); gst_adapter_clear (self->adapter); - self->timestamp = GST_CLOCK_TIME_NONE; - self->delay_ms = 0; self->info = *info; apm = self->apm; @@ -515,8 +455,6 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info) probe_info = self->probe->info; } - self->probe->synchronized = FALSE; - GST_WEBRTC_ECHO_PROBE_UNLOCK (self->probe); } @@ -540,7 +478,7 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info) apm->high_pass_filter ()->Enable (true); } - if (self->echo_cancel && self->probe) { + if (self->echo_cancel) { GST_DEBUG_OBJECT (self, "Enabling Echo Cancellation"); apm->echo_cancellation ()->enable_drift_compensation (false); apm->echo_cancellation () diff --git a/ext/webrtcdsp/gstwebrtcechoprobe.cpp b/ext/webrtcdsp/gstwebrtcechoprobe.cpp index 8dbfc26..ebc35c1 100644 --- a/ext/webrtcdsp/gstwebrtcechoprobe.cpp +++ b/ext/webrtcdsp/gstwebrtcechoprobe.cpp @@ -80,7 +80,6 @@ gst_webrtc_echo_probe_setup (GstAudioFilter * filter, const GstAudioInfo * info) GST_WEBRTC_ECHO_PROBE_LOCK (self); self->info = *info; - self->synchronized = FALSE; /* WebRTC library works with 10ms buffers, compute once this size */ self->period_size = info->bpf * info->rate / 100; @@ -119,19 +118,31 @@ 
gst_webrtc_echo_probe_src_event (GstBaseTransform * btrans, GstEvent * event) GstBaseTransformClass *klass; GstWebrtcEchoProbe *self = GST_WEBRTC_ECHO_PROBE (btrans); GstClockTime latency; + GstClockTime upstream_latency = 0; + GstQuery *query; klass = GST_BASE_TRANSFORM_CLASS (gst_webrtc_echo_probe_parent_class); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_LATENCY: gst_event_parse_latency (event, &latency); + query = gst_query_new_latency (); + + if (gst_pad_query (btrans->srcpad, query)) { + gst_query_parse_latency (query, NULL, &upstream_latency, NULL); + + if (!GST_CLOCK_TIME_IS_VALID (upstream_latency)) + upstream_latency = 0; + } GST_WEBRTC_ECHO_PROBE_LOCK (self); self->latency = latency; + self->delay = upstream_latency / GST_MSECOND; GST_WEBRTC_ECHO_PROBE_UNLOCK (self); - GST_DEBUG_OBJECT (self, "We have a latency of %" GST_TIME_FORMAT, - GST_TIME_ARGS (latency)); + GST_DEBUG_OBJECT (self, "We have a latency of %" GST_TIME_FORMAT + " and delay of %ims", GST_TIME_ARGS (latency), + (gint) (upstream_latency / GST_MSECOND)); break; default: break; @@ -184,7 +195,7 @@ gst_webrtc_echo_probe_init (GstWebrtcEchoProbe * self) gst_audio_info_init (&self->info); g_mutex_init (&self->lock); - self->latency = -1; + self->latency = GST_CLOCK_TIME_NONE; G_LOCK (gst_aec_probes); gst_aec_probes = g_list_prepend (gst_aec_probes, self); @@ -254,3 +265,72 @@ gst_webrtc_release_echo_probe (GstWebrtcEchoProbe * probe) GST_WEBRTC_ECHO_PROBE_UNLOCK (probe); gst_object_unref (probe); } + +gint +gst_webrtc_echo_probe_read (GstWebrtcEchoProbe * self, GstClockTime rec_time, + gpointer _frame) +{ + webrtc::AudioFrame * frame = (webrtc::AudioFrame *) _frame; + GstClockTimeDiff diff; + gsize avail, skip, offset, size; + gint delay = -1; + + GST_WEBRTC_ECHO_PROBE_LOCK (self); + + if (!GST_CLOCK_TIME_IS_VALID (self->latency)) + goto done; + + if (gst_adapter_available (self->adapter) == 0) { + diff = G_MAXINT64; + } else { + GstClockTime play_time; + guint64 distance; + + play_time = 
gst_adapter_prev_pts (self->adapter, &distance); + + if (GST_CLOCK_TIME_IS_VALID (play_time)) { + play_time += gst_util_uint64_scale_int (distance / self->info.bpf, + GST_SECOND, self->info.rate); + play_time += self->latency; + + diff = GST_CLOCK_DIFF (rec_time, play_time) / GST_MSECOND; + } else { + /* We have no timestamp, assume perfect delay */ + diff = self->delay; + } + } + + avail = gst_adapter_available (self->adapter); + + if (diff > self->delay) { + skip = (diff - self->delay) * self->info.rate / 1000 * self->info.bpf; + skip = MIN (self->period_size, skip); + offset = 0; + } else { + skip = 0; + offset = (self->delay - diff) * self->info.rate / 1000 * self->info.bpf; + offset = MIN (avail, offset); + } + + size = MIN (avail - offset, self->period_size - skip); + + if (size < self->period_size) + memset (frame->data_, 0, self->period_size); + + if (size) { + gst_adapter_copy (self->adapter, (guint8 *) frame->data_ + skip, + offset, size); + gst_adapter_flush (self->adapter, offset + size); + } + + frame->num_channels_ = self->info.channels; + frame->sample_rate_hz_ = self->info.rate; + frame->samples_per_channel_ = self->period_size / self->info.bpf; + + delay = self->delay; + +done: + GST_WEBRTC_ECHO_PROBE_UNLOCK (self); + + return delay; +} diff --git a/ext/webrtcdsp/gstwebrtcechoprobe.h b/ext/webrtcdsp/gstwebrtcechoprobe.h index a18547d..7c8a240 100644 --- a/ext/webrtcdsp/gstwebrtcechoprobe.h +++ b/ext/webrtcdsp/gstwebrtcechoprobe.h @@ -62,8 +62,8 @@ struct _GstWebrtcEchoProbe /* Protected by the lock */ GstAudioInfo info; guint period_size; - gint latency; - gboolean synchronized; + GstClockTime latency; + gint delay; GstSegment segment; GstAdapter *adapter; @@ -81,6 +81,8 @@ GType gst_webrtc_echo_probe_get_type (void); GstWebrtcEchoProbe *gst_webrtc_acquire_echo_probe (const gchar * name); void gst_webrtc_release_echo_probe (GstWebrtcEchoProbe * probe); +gint gst_webrtc_echo_probe_read (GstWebrtcEchoProbe * self, + GstClockTime rec_time, gpointer 
frame); G_END_DECLS #endif /* __GST_WEBRTC_ECHO_PROBE_H__ */ -- 2.7.4