From af25e3cc93ec5c6d5fce8e0b9631a76423134a02 Mon Sep 17 00:00:00 2001 From: Graham Leggett Date: Sun, 11 Oct 2015 22:07:54 +0000 Subject: [PATCH] souphttpclientsink: Add the retry and retry-delay properties These allow a failed request to be retried after the given number of seconds instead of failing the pipeline. Take account of the Retry-After header if present. Add retries parameter that controls the number of times an HTTP request will be retried before failing. https://bugzilla.gnome.org/show_bug.cgi?id=756318 --- ext/soup/gstsouphttpclientsink.c | 85 +++++++++++++++++++++++++++++--- ext/soup/gstsouphttpclientsink.h | 4 ++ 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/ext/soup/gstsouphttpclientsink.c b/ext/soup/gstsouphttpclientsink.c index fe5bf726b8..07efdc2c5a 100644 --- a/ext/soup/gstsouphttpclientsink.c +++ b/ext/soup/gstsouphttpclientsink.c @@ -43,8 +43,6 @@ #include "gstsouphttpclientsink.h" #include "gstsouputils.h" -#include - GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg); #define GST_CAT_DEFAULT souphttpclientsink_dbg @@ -94,7 +92,9 @@ enum PROP_PROXY_PW, PROP_COOKIES, PROP_SESSION, - PROP_SOUP_LOG_LEVEL + PROP_SOUP_LOG_LEVEL, + PROP_RETRY_DELAY, + PROP_RETRIES }; #define DEFAULT_USER_AGENT "GStreamer souphttpclientsink " @@ -170,6 +170,14 @@ gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass) g_object_class_install_property (gobject_class, PROP_COOKIES, g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies", G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RETRY_DELAY, + g_param_spec_int ("retry-delay", "Retry Delay", + "Delay in seconds between retries after a failure", 1, G_MAXINT, 5, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RETRIES, + g_param_spec_int ("retries", "Retries", + "Maximum number of retries, zero to disable, -1 to retry forever", + -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstSoupHttpClientSink::http-log-level: * @@ -230,6 +238,8 @@ gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink) souphttpsink->prop_session = NULL; souphttpsink->timeout = 1; souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL; + souphttpsink->retry_delay = 5; + souphttpsink->retries = 0; proxy = g_getenv ("http_proxy"); if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) { GST_WARNING_OBJECT (souphttpsink, @@ -250,6 +260,7 @@ gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink) souphttpsink->reason_phrase = NULL; souphttpsink->status_code = 0; souphttpsink->offset = 0; + souphttpsink->failures = 0; g_list_free_full (souphttpsink->streamheader_buffers, (GDestroyNotify) gst_buffer_unref); @@ -350,6 +361,12 @@ gst_soup_http_client_sink_set_property (GObject * object, guint property_id, case PROP_SOUP_LOG_LEVEL: souphttpsink->log_level = g_value_get_enum (value); break; + case PROP_RETRY_DELAY: + souphttpsink->retry_delay = g_value_get_int (value); + break; + case PROP_RETRIES: + souphttpsink->retries = g_value_get_int (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -405,6 +422,12 @@ gst_soup_http_client_sink_get_property (GObject * object, guint property_id, case PROP_SOUP_LOG_LEVEL: g_value_set_enum (value, souphttpsink->log_level); break; + case PROP_RETRY_DELAY: + g_value_set_int (value, souphttpsink->retry_delay); + break; + case PROP_RETRIES: + g_value_set_int (value, souphttpsink->retries); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -585,8 +608,19 @@ gst_soup_http_client_sink_stop (GstBaseSink * sink) g_object_unref (souphttpsink->session); } + g_mutex_lock (&souphttpsink->mutex); + if (souphttpsink->timer) { + g_source_destroy (souphttpsink->timer); + g_source_unref (souphttpsink->timer); + souphttpsink->timer = NULL; + } + g_mutex_unlock (&souphttpsink->mutex); + if (souphttpsink->loop) { g_main_loop_quit (souphttpsink->loop); + g_mutex_lock (&souphttpsink->mutex); + g_cond_signal (&souphttpsink->cond); + g_mutex_unlock (&souphttpsink->mutex); g_thread_join (souphttpsink->thread); g_main_loop_unref (souphttpsink->loop); souphttpsink->loop = NULL; @@ -751,6 +785,11 @@ send_message (GstSoupHttpClientSink * souphttpsink) { g_mutex_lock (&souphttpsink->mutex); send_message_locked (souphttpsink); + if (souphttpsink->timer) { + g_source_destroy (souphttpsink->timer); + g_source_unref (souphttpsink->timer); + souphttpsink->timer = NULL; + } g_mutex_unlock (&souphttpsink->mutex); return FALSE; @@ -769,8 +808,40 @@ callback (SoupSession * session, SoupMessage * msg, gpointer user_data) souphttpsink->message = NULL; if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) { - souphttpsink->status_code = msg->status_code; - souphttpsink->reason_phrase = g_strdup (msg->reason_phrase); + souphttpsink->failures++; + if (souphttpsink->retries && + (souphttpsink->retries < 0 || + souphttpsink->retries >= souphttpsink->failures)) { + guint64 retry_delay; + const char *retry_after = + soup_message_headers_get_one (msg->response_headers, + "Retry-After"); + if (retry_after) { + gchar *end = NULL; + retry_delay = g_ascii_strtoull (retry_after, &end, 10); + if (end || errno) { + retry_delay = souphttpsink->retry_delay; + } else { + retry_delay = MAX (retry_delay, souphttpsink->retry_delay); + } + GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: " + "status: %d %s (retrying PUT after %" G_GINT64_FORMAT + " seconds with Retry-After: %s)", msg->status_code, + msg->reason_phrase, retry_delay, retry_after); + } else { + retry_delay = souphttpsink->retry_delay; + GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: " + "status: %d %s (retrying PUT after %" G_GINT64_FORMAT + " seconds)", msg->status_code, msg->reason_phrase, retry_delay); + } + souphttpsink->timer = g_timeout_source_new_seconds (retry_delay); + g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message), + souphttpsink, NULL); + g_source_attach (souphttpsink->timer, souphttpsink->context); + } else { + souphttpsink->status_code = msg->status_code; + souphttpsink->reason_phrase = g_strdup (msg->reason_phrase); + } g_mutex_unlock (&souphttpsink->mutex); return; } @@ -778,6 +849,7 @@ callback (SoupSession * session, SoupMessage * msg, gpointer user_data) g_list_free_full (souphttpsink->sent_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->sent_buffers = NULL; + souphttpsink->failures = 0; send_message_locked (souphttpsink); g_mutex_unlock (&souphttpsink->mutex); @@ -791,10 +863,9 @@ gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer) gboolean wake; if (souphttpsink->status_code != 0) { - /* FIXME we should allow a moderate amount of retries. */ GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE, ("Could not write to HTTP URI"), - ("error: %d %s", souphttpsink->status_code, + ("status: %d %s", souphttpsink->status_code, souphttpsink->reason_phrase)); return GST_FLOW_ERROR; } diff --git a/ext/soup/gstsouphttpclientsink.h b/ext/soup/gstsouphttpclientsink.h index a33027523f..29c4500169 100644 --- a/ext/soup/gstsouphttpclientsink.h +++ b/ext/soup/gstsouphttpclientsink.h @@ -43,6 +43,7 @@ struct _GstSoupHttpClientSink GMainContext *context; GMainLoop *loop; GThread *thread; + GSource *timer; SoupMessage *message; SoupSession *session; GList *queued_buffers; @@ -54,6 +55,7 @@ struct _GstSoupHttpClientSink guint64 offset; int timeout; + gint failures; /* properties */ SoupSession *prop_session; @@ -67,6 +69,8 @@ struct _GstSoupHttpClientSink gboolean automatic_redirect; gchar **cookies; SoupLoggerLogLevel log_level; + gint retry_delay; + gint retries; }; struct _GstSoupHttpClientSinkClass -- 2.34.1