2 * Copyright (C) 2011 David Schleef <ds@entropywave.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17 * Boston, MA 02110-1335, USA.
20 * SECTION:element-gstsouphttpclientsink
22 * The souphttpclientsink element sends pipeline data to an HTTP server
23 * using HTTP PUT commands.
26 * <title>Example launch line</title>
28 * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29 * souphttpclientsink location=http://server/filename.ogv
32 * This example encodes 10 seconds of video and sends it to the HTTP
33 * server "server" using HTTP PUT commands.
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44 #include "gstsouputils.h"
46 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
47 #define GST_CAT_DEFAULT souphttpclientsink_dbg
52 static void gst_soup_http_client_sink_set_property (GObject * object,
53 guint property_id, const GValue * value, GParamSpec * pspec);
54 static void gst_soup_http_client_sink_get_property (GObject * object,
55 guint property_id, GValue * value, GParamSpec * pspec);
56 static void gst_soup_http_client_sink_dispose (GObject * object);
57 static void gst_soup_http_client_sink_finalize (GObject * object);
59 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
61 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
62 GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
63 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
64 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
68 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
70 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
73 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
75 static void authenticate (SoupSession * session, SoupMessage * msg,
76 SoupAuth * auth, gboolean retrying, gpointer user_data);
77 static void callback (SoupSession * session, SoupMessage * msg,
79 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
80 souphttpsink, const gchar * uri);
87 PROP_AUTOMATIC_REDIRECT,
100 #define DEFAULT_USER_AGENT "GStreamer souphttpclientsink "
101 #define DEFAULT_SOUP_LOG_LEVEL SOUP_LOGGER_LOG_NONE
105 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("sink",
109 GST_STATIC_CAPS_ANY);
112 /* class initialization */
114 #define gst_soup_http_client_sink_parent_class parent_class
115 G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
119 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
121 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
122 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123 GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
125 gobject_class->set_property = gst_soup_http_client_sink_set_property;
126 gobject_class->get_property = gst_soup_http_client_sink_get_property;
127 gobject_class->dispose = gst_soup_http_client_sink_dispose;
128 gobject_class->finalize = gst_soup_http_client_sink_finalize;
130 g_object_class_install_property (gobject_class,
132 g_param_spec_string ("location", "Location",
133 "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134 g_object_class_install_property (gobject_class,
136 g_param_spec_string ("user-agent", "User-Agent",
137 "Value of the User-Agent HTTP request header field",
138 DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
139 g_object_class_install_property (gobject_class,
140 PROP_AUTOMATIC_REDIRECT,
141 g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
142 "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
143 TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144 g_object_class_install_property (gobject_class,
146 g_param_spec_string ("proxy", "Proxy",
147 "HTTP proxy server URI", "",
148 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149 g_object_class_install_property (gobject_class,
151 g_param_spec_string ("user-id", "user-id",
152 "user id for authentication", "",
153 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154 g_object_class_install_property (gobject_class, PROP_USER_PW,
155 g_param_spec_string ("user-pw", "user-pw",
156 "user password for authentication", "",
157 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158 g_object_class_install_property (gobject_class, PROP_PROXY_ID,
159 g_param_spec_string ("proxy-id", "proxy-id",
160 "user id for proxy authentication", "",
161 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162 g_object_class_install_property (gobject_class, PROP_PROXY_PW,
163 g_param_spec_string ("proxy-pw", "proxy-pw",
164 "user password for proxy authentication", "",
165 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166 g_object_class_install_property (gobject_class, PROP_SESSION,
167 g_param_spec_object ("session", "session",
168 "SoupSession object to use for communication",
169 SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170 g_object_class_install_property (gobject_class, PROP_COOKIES,
171 g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
172 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173 g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
174 g_param_spec_int ("retry-delay", "Retry Delay",
175 "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
176 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177 g_object_class_install_property (gobject_class, PROP_RETRIES,
178 g_param_spec_int ("retries", "Retries",
179 "Maximum number of retries, zero to disable, -1 to retry forever",
180 -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182 * GstSoupHttpClientSink::http-log-level:
184 * If set and > 0, captures and dumps HTTP session data as
185 * log messages if log level >= GST_LEVEL_TRACE
189 g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
190 g_param_spec_enum ("http-log-level", "HTTP log level",
191 "Set log level for soup's HTTP session log",
192 SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
193 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
195 gst_element_class_add_pad_template (gstelement_class,
196 gst_static_pad_template_get (&gst_soup_http_client_sink_sink_template));
198 gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
199 "Generic", "Sends streams to HTTP server via PUT",
200 "David Schleef <ds@entropywave.com>");
202 base_sink_class->set_caps =
203 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
205 base_sink_class->get_times =
206 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
207 base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
208 base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
209 base_sink_class->unlock =
210 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
211 base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
213 base_sink_class->preroll =
214 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
215 base_sink_class->render =
216 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
218 GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
219 "souphttpclientsink element");
224 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
228 g_mutex_init (&souphttpsink->mutex);
229 g_cond_init (&souphttpsink->cond);
231 souphttpsink->location = NULL;
232 souphttpsink->automatic_redirect = TRUE;
233 souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
234 souphttpsink->user_id = NULL;
235 souphttpsink->user_pw = NULL;
236 souphttpsink->proxy_id = NULL;
237 souphttpsink->proxy_pw = NULL;
238 souphttpsink->prop_session = NULL;
239 souphttpsink->timeout = 1;
240 souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
241 souphttpsink->retry_delay = 5;
242 souphttpsink->retries = 0;
243 proxy = g_getenv ("http_proxy");
244 if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
245 GST_WARNING_OBJECT (souphttpsink,
246 "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
250 gst_soup_http_client_sink_reset (souphttpsink);
254 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
256 g_list_free_full (souphttpsink->queued_buffers,
257 (GDestroyNotify) gst_buffer_unref);
258 souphttpsink->queued_buffers = NULL;
259 g_free (souphttpsink->reason_phrase);
260 souphttpsink->reason_phrase = NULL;
261 souphttpsink->status_code = 0;
262 souphttpsink->offset = 0;
263 souphttpsink->failures = 0;
265 g_list_free_full (souphttpsink->streamheader_buffers,
266 (GDestroyNotify) gst_buffer_unref);
267 g_list_free_full (souphttpsink->sent_buffers,
268 (GDestroyNotify) gst_buffer_unref);
272 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
275 if (souphttpsink->proxy) {
276 soup_uri_free (souphttpsink->proxy);
277 souphttpsink->proxy = NULL;
279 if (g_str_has_prefix (uri, "http://")) {
280 souphttpsink->proxy = soup_uri_new (uri);
282 gchar *new_uri = g_strconcat ("http://", uri, NULL);
284 souphttpsink->proxy = soup_uri_new (new_uri);
292 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
293 const GValue * value, GParamSpec * pspec)
295 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
297 g_mutex_lock (&souphttpsink->mutex);
298 switch (property_id) {
300 if (souphttpsink->prop_session) {
301 g_object_unref (souphttpsink->prop_session);
303 souphttpsink->prop_session = g_value_dup_object (value);
306 g_free (souphttpsink->location);
307 souphttpsink->location = g_value_dup_string (value);
308 souphttpsink->offset = 0;
309 if ((souphttpsink->location == NULL)
310 || !gst_uri_is_valid (souphttpsink->location)) {
311 GST_WARNING_OBJECT (souphttpsink,
312 "The location (\"%s\") set, is not a valid uri.",
313 souphttpsink->location);
314 g_free (souphttpsink->location);
315 souphttpsink->location = NULL;
318 case PROP_USER_AGENT:
319 g_free (souphttpsink->user_agent);
320 souphttpsink->user_agent = g_value_dup_string (value);
322 case PROP_AUTOMATIC_REDIRECT:
323 souphttpsink->automatic_redirect = g_value_get_boolean (value);
326 g_free (souphttpsink->user_id);
327 souphttpsink->user_id = g_value_dup_string (value);
330 g_free (souphttpsink->user_pw);
331 souphttpsink->user_pw = g_value_dup_string (value);
334 g_free (souphttpsink->proxy_id);
335 souphttpsink->proxy_id = g_value_dup_string (value);
338 g_free (souphttpsink->proxy_pw);
339 souphttpsink->proxy_pw = g_value_dup_string (value);
345 proxy = g_value_get_string (value);
348 GST_WARNING ("proxy property cannot be NULL");
351 if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
352 GST_WARNING ("badly formatted proxy URI");
358 g_strfreev (souphttpsink->cookies);
359 souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
361 case PROP_SOUP_LOG_LEVEL:
362 souphttpsink->log_level = g_value_get_enum (value);
364 case PROP_RETRY_DELAY:
365 souphttpsink->retry_delay = g_value_get_int (value);
368 souphttpsink->retries = g_value_get_int (value);
371 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
375 g_mutex_unlock (&souphttpsink->mutex);
379 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
380 GValue * value, GParamSpec * pspec)
382 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
384 switch (property_id) {
386 g_value_set_object (value, souphttpsink->prop_session);
389 g_value_set_string (value, souphttpsink->location);
391 case PROP_AUTOMATIC_REDIRECT:
392 g_value_set_boolean (value, souphttpsink->automatic_redirect);
394 case PROP_USER_AGENT:
395 g_value_set_string (value, souphttpsink->user_agent);
398 g_value_set_string (value, souphttpsink->user_id);
401 g_value_set_string (value, souphttpsink->user_pw);
404 g_value_set_string (value, souphttpsink->proxy_id);
407 g_value_set_string (value, souphttpsink->proxy_pw);
410 if (souphttpsink->proxy == NULL)
411 g_value_set_static_string (value, "");
413 char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
415 g_value_set_string (value, proxy);
420 g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
422 case PROP_SOUP_LOG_LEVEL:
423 g_value_set_enum (value, souphttpsink->log_level);
425 case PROP_RETRY_DELAY:
426 g_value_set_int (value, souphttpsink->retry_delay);
429 g_value_set_int (value, souphttpsink->retries);
432 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
438 gst_soup_http_client_sink_dispose (GObject * object)
440 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
442 /* clean up as possible. may be called multiple times */
443 if (souphttpsink->prop_session)
444 g_object_unref (souphttpsink->prop_session);
445 souphttpsink->prop_session = NULL;
447 G_OBJECT_CLASS (parent_class)->dispose (object);
451 gst_soup_http_client_sink_finalize (GObject * object)
453 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
455 /* clean up object here */
457 g_free (souphttpsink->user_agent);
458 g_free (souphttpsink->user_id);
459 g_free (souphttpsink->user_pw);
460 g_free (souphttpsink->proxy_id);
461 g_free (souphttpsink->proxy_pw);
462 if (souphttpsink->proxy)
463 soup_uri_free (souphttpsink->proxy);
464 g_free (souphttpsink->location);
465 g_strfreev (souphttpsink->cookies);
467 g_cond_clear (&souphttpsink->cond);
468 g_mutex_clear (&souphttpsink->mutex);
470 G_OBJECT_CLASS (parent_class)->finalize (object);
476 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
478 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
479 GstStructure *structure;
480 const GValue *value_array;
483 GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
484 structure = gst_caps_get_structure (caps, 0);
485 value_array = gst_structure_get_value (structure, "streamheader");
487 g_list_free_full (souphttpsink->streamheader_buffers,
488 (GDestroyNotify) gst_buffer_unref);
489 souphttpsink->streamheader_buffers = NULL;
491 n = gst_value_array_get_size (value_array);
492 for (i = 0; i < n; i++) {
495 value = gst_value_array_get_value (value_array, i);
496 buffer = GST_BUFFER (gst_value_get_buffer (value));
497 souphttpsink->streamheader_buffers =
498 g_list_append (souphttpsink->streamheader_buffers,
499 gst_buffer_ref (buffer));
507 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
508 GstClockTime * start, GstClockTime * end)
514 thread_ready_idle_cb (gpointer data)
516 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
518 GST_LOG_OBJECT (souphttpsink, "thread ready");
520 g_mutex_lock (&souphttpsink->mutex);
521 g_cond_signal (&souphttpsink->cond);
522 g_mutex_unlock (&souphttpsink->mutex);
524 return FALSE; /* only run once */
528 thread_func (gpointer ptr)
530 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
532 GST_DEBUG ("thread start");
534 g_main_loop_run (souphttpsink->loop);
536 GST_DEBUG ("thread quit");
542 gst_soup_http_client_sink_start (GstBaseSink * sink)
544 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
546 if (souphttpsink->prop_session) {
547 souphttpsink->session = souphttpsink->prop_session;
550 GError *error = NULL;
552 souphttpsink->context = g_main_context_new ();
554 /* set up idle source to signal when the main loop is running and
555 * it's safe for ::stop() to call g_main_loop_quit() */
556 source = g_idle_source_new ();
557 g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
558 g_source_attach (source, souphttpsink->context);
559 g_source_unref (source);
561 souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
563 g_mutex_lock (&souphttpsink->mutex);
565 /* FIXME: error handling */
566 souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
567 thread_func, souphttpsink, &error);
569 GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
570 g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
571 g_mutex_unlock (&souphttpsink->mutex);
572 GST_LOG_OBJECT (souphttpsink, "main loop thread running");
574 if (souphttpsink->proxy == NULL) {
575 souphttpsink->session =
576 soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
577 souphttpsink->context, SOUP_SESSION_USER_AGENT,
578 souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
581 souphttpsink->session =
582 soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
583 souphttpsink->context, SOUP_SESSION_USER_AGENT,
584 souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
585 SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
588 g_signal_connect (souphttpsink->session, "authenticate",
589 G_CALLBACK (authenticate), souphttpsink);
593 gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
594 GST_ELEMENT (souphttpsink));
600 gst_soup_http_client_sink_stop (GstBaseSink * sink)
602 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
606 if (souphttpsink->prop_session == NULL) {
607 soup_session_abort (souphttpsink->session);
608 g_object_unref (souphttpsink->session);
611 g_mutex_lock (&souphttpsink->mutex);
612 if (souphttpsink->timer) {
613 g_source_destroy (souphttpsink->timer);
614 g_source_unref (souphttpsink->timer);
615 souphttpsink->timer = NULL;
617 g_mutex_unlock (&souphttpsink->mutex);
619 if (souphttpsink->loop) {
620 g_main_loop_quit (souphttpsink->loop);
621 g_mutex_lock (&souphttpsink->mutex);
622 g_cond_signal (&souphttpsink->cond);
623 g_mutex_unlock (&souphttpsink->mutex);
624 g_thread_join (souphttpsink->thread);
625 g_main_loop_unref (souphttpsink->loop);
626 souphttpsink->loop = NULL;
628 if (souphttpsink->context) {
629 g_main_context_unref (souphttpsink->context);
630 souphttpsink->context = NULL;
633 gst_soup_http_client_sink_reset (souphttpsink);
639 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
641 GST_DEBUG ("unlock");
647 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
649 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
651 GST_DEBUG_OBJECT (souphttpsink, "event");
653 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
654 GST_DEBUG_OBJECT (souphttpsink, "got eos");
655 g_mutex_lock (&souphttpsink->mutex);
656 while (souphttpsink->message) {
657 GST_DEBUG_OBJECT (souphttpsink, "waiting");
658 g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
660 g_mutex_unlock (&souphttpsink->mutex);
661 GST_DEBUG_OBJECT (souphttpsink, "finished eos");
664 return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
668 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
670 GST_DEBUG ("preroll");
676 send_message_locked (GstSoupHttpClientSink * souphttpsink)
681 if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
685 /* If the URI went away, drop all these buffers */
686 if (souphttpsink->location == NULL) {
687 GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
688 g_list_free_full (souphttpsink->queued_buffers,
689 (GDestroyNotify) gst_buffer_unref);
690 souphttpsink->queued_buffers = NULL;
694 souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
695 if (souphttpsink->message == NULL) {
696 GST_WARNING_OBJECT (souphttpsink,
697 "URI could not be parsed while creating message.");
698 g_list_free_full (souphttpsink->queued_buffers,
699 (GDestroyNotify) gst_buffer_unref);
700 souphttpsink->queued_buffers = NULL;
704 soup_message_set_flags (souphttpsink->message,
705 (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
707 if (souphttpsink->cookies) {
710 for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
711 soup_message_headers_append (souphttpsink->message->request_headers,
717 if (souphttpsink->offset == 0) {
718 for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
719 GstBuffer *buffer = g->data;
722 GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
723 gst_buffer_map (buffer, &map, GST_MAP_READ);
724 /* Stream headers are updated whenever ::set_caps is called, so there's
725 * no guarantees about their lifetime and we ask libsoup to copy them
726 * into the message body with SOUP_MEMORY_COPY. */
727 soup_message_body_append (souphttpsink->message->request_body,
728 SOUP_MEMORY_COPY, map.data, map.size);
730 gst_buffer_unmap (buffer, &map);
734 for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
735 GstBuffer *buffer = g->data;
736 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
739 gst_buffer_map (buffer, &map, GST_MAP_READ);
740 /* Queued buffers are only freed in the next iteration of the mainloop
741 * after the message body has been written out, so we don't need libsoup
742 * to copy those while appending to the body. However, if the buffer is
743 * used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
744 soup_message_body_append (souphttpsink->message->request_body,
745 SOUP_MEMORY_TEMPORARY, map.data, map.size);
747 gst_buffer_unmap (buffer, &map);
751 if (souphttpsink->offset != 0) {
753 s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
754 souphttpsink->offset, souphttpsink->offset + n - 1);
755 soup_message_headers_append (souphttpsink->message->request_headers,
761 GST_DEBUG_OBJECT (souphttpsink,
762 "total size of buffers queued is 0, freeing everything");
763 g_list_free_full (souphttpsink->queued_buffers,
764 (GDestroyNotify) gst_buffer_unref);
765 souphttpsink->queued_buffers = NULL;
766 g_object_unref (souphttpsink->message);
767 souphttpsink->message = NULL;
771 souphttpsink->sent_buffers = souphttpsink->queued_buffers;
772 souphttpsink->queued_buffers = NULL;
774 GST_DEBUG_OBJECT (souphttpsink,
775 "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
776 souphttpsink->offset, n);
777 soup_session_queue_message (souphttpsink->session, souphttpsink->message,
778 callback, souphttpsink);
780 souphttpsink->offset += n;
784 send_message (GstSoupHttpClientSink * souphttpsink)
786 g_mutex_lock (&souphttpsink->mutex);
787 send_message_locked (souphttpsink);
788 if (souphttpsink->timer) {
789 g_source_destroy (souphttpsink->timer);
790 g_source_unref (souphttpsink->timer);
791 souphttpsink->timer = NULL;
793 g_mutex_unlock (&souphttpsink->mutex);
799 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
801 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
803 GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
804 msg->status_code, msg->reason_phrase);
806 g_mutex_lock (&souphttpsink->mutex);
807 g_cond_signal (&souphttpsink->cond);
808 souphttpsink->message = NULL;
810 if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
811 souphttpsink->failures++;
812 if (souphttpsink->retries &&
813 (souphttpsink->retries < 0 ||
814 souphttpsink->retries >= souphttpsink->failures)) {
816 const char *retry_after =
817 soup_message_headers_get_one (msg->response_headers,
821 retry_delay = g_ascii_strtoull (retry_after, &end, 10);
823 retry_delay = souphttpsink->retry_delay;
825 retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
827 GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
828 "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
829 " seconds with Retry-After: %s)", msg->status_code,
830 msg->reason_phrase, retry_delay, retry_after);
832 retry_delay = souphttpsink->retry_delay;
833 GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
834 "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
835 " seconds)", msg->status_code, msg->reason_phrase, retry_delay);
837 souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
838 g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
840 g_source_attach (souphttpsink->timer, souphttpsink->context);
842 souphttpsink->status_code = msg->status_code;
843 souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
845 g_mutex_unlock (&souphttpsink->mutex);
849 g_list_free_full (souphttpsink->sent_buffers,
850 (GDestroyNotify) gst_buffer_unref);
851 souphttpsink->sent_buffers = NULL;
852 souphttpsink->failures = 0;
854 send_message_locked (souphttpsink);
855 g_mutex_unlock (&souphttpsink->mutex);
859 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
861 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
865 if (souphttpsink->status_code != 0) {
866 GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
867 ("Could not write to HTTP URI"),
868 ("status: %d %s", souphttpsink->status_code,
869 souphttpsink->reason_phrase));
870 return GST_FLOW_ERROR;
873 g_mutex_lock (&souphttpsink->mutex);
874 if (souphttpsink->location != NULL) {
875 wake = (souphttpsink->queued_buffers == NULL);
876 souphttpsink->queued_buffers =
877 g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
880 GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
881 source = g_idle_source_new ();
882 g_source_set_callback (source, (GSourceFunc) (send_message),
884 g_source_attach (source, souphttpsink->context);
885 g_source_unref (source);
888 g_mutex_unlock (&souphttpsink->mutex);
894 authenticate (SoupSession * session, SoupMessage * msg,
895 SoupAuth * auth, gboolean retrying, gpointer user_data)
897 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
900 /* First time authentication only, if we fail and are called again with retry true fall through */
901 if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
902 if (souphttpsink->user_id && souphttpsink->user_pw)
903 soup_auth_authenticate (auth, souphttpsink->user_id,
904 souphttpsink->user_pw);
905 } else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
906 if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
907 soup_auth_authenticate (auth, souphttpsink->proxy_id,
908 souphttpsink->proxy_pw);