tests: rtp-payloading: Test for handling of custom events in rtpgst
[platform/upstream/gstreamer.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44 #include "gstsouputils.h"
45
/* Debug category picked up by every GST_* logging macro in this file. */
GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
#define GST_CAT_DEFAULT souphttpclientsink_dbg
48
49 /* prototypes */
50
51
52 static void gst_soup_http_client_sink_set_property (GObject * object,
53     guint property_id, const GValue * value, GParamSpec * pspec);
54 static void gst_soup_http_client_sink_get_property (GObject * object,
55     guint property_id, GValue * value, GParamSpec * pspec);
56 static void gst_soup_http_client_sink_dispose (GObject * object);
57 static void gst_soup_http_client_sink_finalize (GObject * object);
58
59 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
60     GstCaps * caps);
61 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
62     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
63 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
64 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
67     GstEvent * event);
68 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
69     GstBuffer * buffer);
70 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
71     GstBuffer * buffer);
72
73 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
74     souphttpsink);
75 static void authenticate (SoupSession * session, SoupMessage * msg,
76     SoupAuth * auth, gboolean retrying, gpointer user_data);
77 static void callback (SoupSession * session, SoupMessage * msg,
78     gpointer user_data);
79 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
80     souphttpsink, const gchar * uri);
81
/* Property identifiers handed to g_object_class_install_property(). */
enum
{
  PROP_0,                       /* reserved by GObject */
  PROP_LOCATION,                /* "location" */
  PROP_USER_AGENT,              /* "user-agent" */
  PROP_AUTOMATIC_REDIRECT,      /* "automatic-redirect" */
  PROP_PROXY,                   /* "proxy" */
  PROP_USER_ID,                 /* "user-id" */
  PROP_USER_PW,                 /* "user-pw" */
  PROP_PROXY_ID,                /* "proxy-id" */
  PROP_PROXY_PW,                /* "proxy-pw" */
  PROP_COOKIES,                 /* "cookies" */
  PROP_SESSION,                 /* "session" */
  PROP_SOUP_LOG_LEVEL,          /* "http-log-level" */
  PROP_RETRY_DELAY,             /* "retry-delay" */
  PROP_RETRIES                  /* "retries" */
};
99
/* Property defaults.  The trailing blank in the user agent is intentional
 * and part of the value. */
#define DEFAULT_USER_AGENT "GStreamer souphttpclientsink "
#define DEFAULT_SOUP_LOG_LEVEL SOUP_LOGGER_LOG_NONE
102
103 /* pad templates */
104
105 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("sink",
107     GST_PAD_SINK,
108     GST_PAD_ALWAYS,
109     GST_STATIC_CAPS_ANY);
110
111
112 /* class initialization */
113
/* Let the rest of the file refer to the parent class as plain parent_class. */
#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
    GST_TYPE_BASE_SINK);
117
118 static void
119 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
120 {
121   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
122   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
124
125   gobject_class->set_property = gst_soup_http_client_sink_set_property;
126   gobject_class->get_property = gst_soup_http_client_sink_get_property;
127   gobject_class->dispose = gst_soup_http_client_sink_dispose;
128   gobject_class->finalize = gst_soup_http_client_sink_finalize;
129
130   g_object_class_install_property (gobject_class,
131       PROP_LOCATION,
132       g_param_spec_string ("location", "Location",
133           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134   g_object_class_install_property (gobject_class,
135       PROP_USER_AGENT,
136       g_param_spec_string ("user-agent", "User-Agent",
137           "Value of the User-Agent HTTP request header field",
138           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
139   g_object_class_install_property (gobject_class,
140       PROP_AUTOMATIC_REDIRECT,
141       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
142           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
143           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144   g_object_class_install_property (gobject_class,
145       PROP_PROXY,
146       g_param_spec_string ("proxy", "Proxy",
147           "HTTP proxy server URI", "",
148           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149   g_object_class_install_property (gobject_class,
150       PROP_USER_ID,
151       g_param_spec_string ("user-id", "user-id",
152           "user id for authentication", "",
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154   g_object_class_install_property (gobject_class, PROP_USER_PW,
155       g_param_spec_string ("user-pw", "user-pw",
156           "user password for authentication", "",
157           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
159       g_param_spec_string ("proxy-id", "proxy-id",
160           "user id for proxy authentication", "",
161           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
163       g_param_spec_string ("proxy-pw", "proxy-pw",
164           "user password for proxy authentication", "",
165           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166   g_object_class_install_property (gobject_class, PROP_SESSION,
167       g_param_spec_object ("session", "session",
168           "SoupSession object to use for communication",
169           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170   g_object_class_install_property (gobject_class, PROP_COOKIES,
171       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
172           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173   g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
174       g_param_spec_int ("retry-delay", "Retry Delay",
175           "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_RETRIES,
178       g_param_spec_int ("retries", "Retries",
179           "Maximum number of retries, zero to disable, -1 to retry forever",
180           -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181  /**
182    * GstSoupHttpClientSink::http-log-level:
183    *
184    * If set and > 0, captures and dumps HTTP session data as
185    * log messages if log level >= GST_LEVEL_TRACE
186    *
187    * Since: 1.4
188    */
189   g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
190       g_param_spec_enum ("http-log-level", "HTTP log level",
191           "Set log level for soup's HTTP session log",
192           SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
193           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
194
195   gst_element_class_add_pad_template (gstelement_class,
196       gst_static_pad_template_get (&gst_soup_http_client_sink_sink_template));
197
198   gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
199       "Generic", "Sends streams to HTTP server via PUT",
200       "David Schleef <ds@entropywave.com>");
201
202   base_sink_class->set_caps =
203       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
204   if (0)
205     base_sink_class->get_times =
206         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
207   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
208   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
209   base_sink_class->unlock =
210       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
211   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
212   if (0)
213     base_sink_class->preroll =
214         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
215   base_sink_class->render =
216       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
217
218   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
219       "souphttpclientsink element");
220
221 }
222
223 static void
224 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
225 {
226   const char *proxy;
227
228   g_mutex_init (&souphttpsink->mutex);
229   g_cond_init (&souphttpsink->cond);
230
231   souphttpsink->location = NULL;
232   souphttpsink->automatic_redirect = TRUE;
233   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
234   souphttpsink->user_id = NULL;
235   souphttpsink->user_pw = NULL;
236   souphttpsink->proxy_id = NULL;
237   souphttpsink->proxy_pw = NULL;
238   souphttpsink->prop_session = NULL;
239   souphttpsink->timeout = 1;
240   souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
241   souphttpsink->retry_delay = 5;
242   souphttpsink->retries = 0;
243   proxy = g_getenv ("http_proxy");
244   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
245     GST_WARNING_OBJECT (souphttpsink,
246         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
247         proxy);
248   }
249
250   gst_soup_http_client_sink_reset (souphttpsink);
251 }
252
253 static void
254 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
255 {
256   g_list_free_full (souphttpsink->queued_buffers,
257       (GDestroyNotify) gst_buffer_unref);
258   souphttpsink->queued_buffers = NULL;
259   g_free (souphttpsink->reason_phrase);
260   souphttpsink->reason_phrase = NULL;
261   souphttpsink->status_code = 0;
262   souphttpsink->offset = 0;
263   souphttpsink->failures = 0;
264
265   g_list_free_full (souphttpsink->streamheader_buffers,
266       (GDestroyNotify) gst_buffer_unref);
267   g_list_free_full (souphttpsink->sent_buffers,
268       (GDestroyNotify) gst_buffer_unref);
269 }
270
271 static gboolean
272 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
273     const gchar * uri)
274 {
275   if (souphttpsink->proxy) {
276     soup_uri_free (souphttpsink->proxy);
277     souphttpsink->proxy = NULL;
278   }
279   if (g_str_has_prefix (uri, "http://")) {
280     souphttpsink->proxy = soup_uri_new (uri);
281   } else {
282     gchar *new_uri = g_strconcat ("http://", uri, NULL);
283
284     souphttpsink->proxy = soup_uri_new (new_uri);
285     g_free (new_uri);
286   }
287
288   return TRUE;
289 }
290
291 void
292 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
293     const GValue * value, GParamSpec * pspec)
294 {
295   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
296
297   g_mutex_lock (&souphttpsink->mutex);
298   switch (property_id) {
299     case PROP_SESSION:
300       if (souphttpsink->prop_session) {
301         g_object_unref (souphttpsink->prop_session);
302       }
303       souphttpsink->prop_session = g_value_dup_object (value);
304       break;
305     case PROP_LOCATION:
306       g_free (souphttpsink->location);
307       souphttpsink->location = g_value_dup_string (value);
308       souphttpsink->offset = 0;
309       if ((souphttpsink->location == NULL)
310           || !gst_uri_is_valid (souphttpsink->location)) {
311         GST_WARNING_OBJECT (souphttpsink,
312             "The location (\"%s\") set, is not a valid uri.",
313             souphttpsink->location);
314         g_free (souphttpsink->location);
315         souphttpsink->location = NULL;
316       }
317       break;
318     case PROP_USER_AGENT:
319       g_free (souphttpsink->user_agent);
320       souphttpsink->user_agent = g_value_dup_string (value);
321       break;
322     case PROP_AUTOMATIC_REDIRECT:
323       souphttpsink->automatic_redirect = g_value_get_boolean (value);
324       break;
325     case PROP_USER_ID:
326       g_free (souphttpsink->user_id);
327       souphttpsink->user_id = g_value_dup_string (value);
328       break;
329     case PROP_USER_PW:
330       g_free (souphttpsink->user_pw);
331       souphttpsink->user_pw = g_value_dup_string (value);
332       break;
333     case PROP_PROXY_ID:
334       g_free (souphttpsink->proxy_id);
335       souphttpsink->proxy_id = g_value_dup_string (value);
336       break;
337     case PROP_PROXY_PW:
338       g_free (souphttpsink->proxy_pw);
339       souphttpsink->proxy_pw = g_value_dup_string (value);
340       break;
341     case PROP_PROXY:
342     {
343       const gchar *proxy;
344
345       proxy = g_value_get_string (value);
346
347       if (proxy == NULL) {
348         GST_WARNING ("proxy property cannot be NULL");
349         goto done;
350       }
351       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
352         GST_WARNING ("badly formatted proxy URI");
353         goto done;
354       }
355       break;
356     }
357     case PROP_COOKIES:
358       g_strfreev (souphttpsink->cookies);
359       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
360       break;
361     case PROP_SOUP_LOG_LEVEL:
362       souphttpsink->log_level = g_value_get_enum (value);
363       break;
364     case PROP_RETRY_DELAY:
365       souphttpsink->retry_delay = g_value_get_int (value);
366       break;
367     case PROP_RETRIES:
368       souphttpsink->retries = g_value_get_int (value);
369       break;
370     default:
371       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
372       break;
373   }
374 done:
375   g_mutex_unlock (&souphttpsink->mutex);
376 }
377
378 void
379 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
380     GValue * value, GParamSpec * pspec)
381 {
382   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
383
384   switch (property_id) {
385     case PROP_SESSION:
386       g_value_set_object (value, souphttpsink->prop_session);
387       break;
388     case PROP_LOCATION:
389       g_value_set_string (value, souphttpsink->location);
390       break;
391     case PROP_AUTOMATIC_REDIRECT:
392       g_value_set_boolean (value, souphttpsink->automatic_redirect);
393       break;
394     case PROP_USER_AGENT:
395       g_value_set_string (value, souphttpsink->user_agent);
396       break;
397     case PROP_USER_ID:
398       g_value_set_string (value, souphttpsink->user_id);
399       break;
400     case PROP_USER_PW:
401       g_value_set_string (value, souphttpsink->user_pw);
402       break;
403     case PROP_PROXY_ID:
404       g_value_set_string (value, souphttpsink->proxy_id);
405       break;
406     case PROP_PROXY_PW:
407       g_value_set_string (value, souphttpsink->proxy_pw);
408       break;
409     case PROP_PROXY:
410       if (souphttpsink->proxy == NULL)
411         g_value_set_static_string (value, "");
412       else {
413         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
414
415         g_value_set_string (value, proxy);
416         g_free (proxy);
417       }
418       break;
419     case PROP_COOKIES:
420       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
421       break;
422     case PROP_SOUP_LOG_LEVEL:
423       g_value_set_enum (value, souphttpsink->log_level);
424       break;
425     case PROP_RETRY_DELAY:
426       g_value_set_int (value, souphttpsink->retry_delay);
427       break;
428     case PROP_RETRIES:
429       g_value_set_int (value, souphttpsink->retries);
430       break;
431     default:
432       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
433       break;
434   }
435 }
436
437 void
438 gst_soup_http_client_sink_dispose (GObject * object)
439 {
440   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
441
442   /* clean up as possible.  may be called multiple times */
443   if (souphttpsink->prop_session)
444     g_object_unref (souphttpsink->prop_session);
445   souphttpsink->prop_session = NULL;
446
447   G_OBJECT_CLASS (parent_class)->dispose (object);
448 }
449
450 void
451 gst_soup_http_client_sink_finalize (GObject * object)
452 {
453   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
454
455   /* clean up object here */
456
457   g_free (souphttpsink->user_agent);
458   g_free (souphttpsink->user_id);
459   g_free (souphttpsink->user_pw);
460   g_free (souphttpsink->proxy_id);
461   g_free (souphttpsink->proxy_pw);
462   if (souphttpsink->proxy)
463     soup_uri_free (souphttpsink->proxy);
464   g_free (souphttpsink->location);
465   g_strfreev (souphttpsink->cookies);
466
467   g_cond_clear (&souphttpsink->cond);
468   g_mutex_clear (&souphttpsink->mutex);
469
470   G_OBJECT_CLASS (parent_class)->finalize (object);
471 }
472
473
474
475 static gboolean
476 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
477 {
478   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
479   GstStructure *structure;
480   const GValue *value_array;
481   int i, n;
482
483   GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
484   structure = gst_caps_get_structure (caps, 0);
485   value_array = gst_structure_get_value (structure, "streamheader");
486   if (value_array) {
487     g_list_free_full (souphttpsink->streamheader_buffers,
488         (GDestroyNotify) gst_buffer_unref);
489     souphttpsink->streamheader_buffers = NULL;
490
491     n = gst_value_array_get_size (value_array);
492     for (i = 0; i < n; i++) {
493       const GValue *value;
494       GstBuffer *buffer;
495       value = gst_value_array_get_value (value_array, i);
496       buffer = GST_BUFFER (gst_value_get_buffer (value));
497       souphttpsink->streamheader_buffers =
498           g_list_append (souphttpsink->streamheader_buffers,
499           gst_buffer_ref (buffer));
500     }
501   }
502
503   return TRUE;
504 }
505
506 static void
507 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
508     GstClockTime * start, GstClockTime * end)
509 {
510
511 }
512
513 static gboolean
514 thread_ready_idle_cb (gpointer data)
515 {
516   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
517
518   GST_LOG_OBJECT (souphttpsink, "thread ready");
519
520   g_mutex_lock (&souphttpsink->mutex);
521   g_cond_signal (&souphttpsink->cond);
522   g_mutex_unlock (&souphttpsink->mutex);
523
524   return FALSE;                 /* only run once */
525 }
526
527 static gpointer
528 thread_func (gpointer ptr)
529 {
530   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
531
532   GST_DEBUG ("thread start");
533
534   g_main_loop_run (souphttpsink->loop);
535
536   GST_DEBUG ("thread quit");
537
538   return NULL;
539 }
540
541 static gboolean
542 gst_soup_http_client_sink_start (GstBaseSink * sink)
543 {
544   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
545
546   if (souphttpsink->prop_session) {
547     souphttpsink->session = souphttpsink->prop_session;
548   } else {
549     GSource *source;
550     GError *error = NULL;
551
552     souphttpsink->context = g_main_context_new ();
553
554     /* set up idle source to signal when the main loop is running and
555      * it's safe for ::stop() to call g_main_loop_quit() */
556     source = g_idle_source_new ();
557     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
558     g_source_attach (source, souphttpsink->context);
559     g_source_unref (source);
560
561     souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
562
563     g_mutex_lock (&souphttpsink->mutex);
564
565     /* FIXME: error handling */
566     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
567         thread_func, souphttpsink, &error);
568
569     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
570     g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
571     g_mutex_unlock (&souphttpsink->mutex);
572     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
573
574     if (souphttpsink->proxy == NULL) {
575       souphttpsink->session =
576           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
577           souphttpsink->context, SOUP_SESSION_USER_AGENT,
578           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
579           NULL);
580     } else {
581       souphttpsink->session =
582           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
583           souphttpsink->context, SOUP_SESSION_USER_AGENT,
584           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
585           SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
586     }
587
588     g_signal_connect (souphttpsink->session, "authenticate",
589         G_CALLBACK (authenticate), souphttpsink);
590   }
591
592   /* Set up logging */
593   gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
594       GST_ELEMENT (souphttpsink));
595
596   return TRUE;
597 }
598
599 static gboolean
600 gst_soup_http_client_sink_stop (GstBaseSink * sink)
601 {
602   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
603
604   GST_DEBUG ("stop");
605
606   if (souphttpsink->prop_session == NULL) {
607     soup_session_abort (souphttpsink->session);
608     g_object_unref (souphttpsink->session);
609   }
610
611   g_mutex_lock (&souphttpsink->mutex);
612   if (souphttpsink->timer) {
613     g_source_destroy (souphttpsink->timer);
614     g_source_unref (souphttpsink->timer);
615     souphttpsink->timer = NULL;
616   }
617   g_mutex_unlock (&souphttpsink->mutex);
618
619   if (souphttpsink->loop) {
620     g_main_loop_quit (souphttpsink->loop);
621     g_mutex_lock (&souphttpsink->mutex);
622     g_cond_signal (&souphttpsink->cond);
623     g_mutex_unlock (&souphttpsink->mutex);
624     g_thread_join (souphttpsink->thread);
625     g_main_loop_unref (souphttpsink->loop);
626     souphttpsink->loop = NULL;
627   }
628   if (souphttpsink->context) {
629     g_main_context_unref (souphttpsink->context);
630     souphttpsink->context = NULL;
631   }
632
633   gst_soup_http_client_sink_reset (souphttpsink);
634
635   return TRUE;
636 }
637
638 static gboolean
639 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
640 {
641   GST_DEBUG ("unlock");
642
643   return TRUE;
644 }
645
646 static gboolean
647 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
648 {
649   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
650
651   GST_DEBUG_OBJECT (souphttpsink, "event");
652
653   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
654     GST_DEBUG_OBJECT (souphttpsink, "got eos");
655     g_mutex_lock (&souphttpsink->mutex);
656     while (souphttpsink->message) {
657       GST_DEBUG_OBJECT (souphttpsink, "waiting");
658       g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
659     }
660     g_mutex_unlock (&souphttpsink->mutex);
661     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
662   }
663
664   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
665 }
666
667 static GstFlowReturn
668 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
669 {
670   GST_DEBUG ("preroll");
671
672   return GST_FLOW_OK;
673 }
674
675 static void
676 send_message_locked (GstSoupHttpClientSink * souphttpsink)
677 {
678   GList *g;
679   guint64 n;
680
681   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
682     return;
683   }
684
685   /* If the URI went away, drop all these buffers */
686   if (souphttpsink->location == NULL) {
687     GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
688     g_list_free_full (souphttpsink->queued_buffers,
689         (GDestroyNotify) gst_buffer_unref);
690     souphttpsink->queued_buffers = NULL;
691     return;
692   }
693
694   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
695   if (souphttpsink->message == NULL) {
696     GST_WARNING_OBJECT (souphttpsink,
697         "URI could not be parsed while creating message.");
698     g_list_free_full (souphttpsink->queued_buffers,
699         (GDestroyNotify) gst_buffer_unref);
700     souphttpsink->queued_buffers = NULL;
701     return;
702   }
703
704   soup_message_set_flags (souphttpsink->message,
705       (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
706
707   if (souphttpsink->cookies) {
708     gchar **cookie;
709
710     for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
711       soup_message_headers_append (souphttpsink->message->request_headers,
712           "Cookie", *cookie);
713     }
714   }
715
716   n = 0;
717   if (souphttpsink->offset == 0) {
718     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
719       GstBuffer *buffer = g->data;
720       GstMapInfo map;
721
722       GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
723       gst_buffer_map (buffer, &map, GST_MAP_READ);
724       /* Stream headers are updated whenever ::set_caps is called, so there's
725        * no guarantees about their lifetime and we ask libsoup to copy them 
726        * into the message body with SOUP_MEMORY_COPY. */
727       soup_message_body_append (souphttpsink->message->request_body,
728           SOUP_MEMORY_COPY, map.data, map.size);
729       n += map.size;
730       gst_buffer_unmap (buffer, &map);
731     }
732   }
733
734   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
735     GstBuffer *buffer = g->data;
736     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
737       GstMapInfo map;
738
739       gst_buffer_map (buffer, &map, GST_MAP_READ);
740       /* Queued buffers are only freed in the next iteration of the mainloop
741        * after the message body has been written out, so we don't need libsoup
742        * to copy those while appending to the body. However, if the buffer is
743        * used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
744       soup_message_body_append (souphttpsink->message->request_body,
745           SOUP_MEMORY_TEMPORARY, map.data, map.size);
746       n += map.size;
747       gst_buffer_unmap (buffer, &map);
748     }
749   }
750
751   if (souphttpsink->offset != 0) {
752     char *s;
753     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
754         souphttpsink->offset, souphttpsink->offset + n - 1);
755     soup_message_headers_append (souphttpsink->message->request_headers,
756         "Content-Range", s);
757     g_free (s);
758   }
759
760   if (n == 0) {
761     GST_DEBUG_OBJECT (souphttpsink,
762         "total size of buffers queued is 0, freeing everything");
763     g_list_free_full (souphttpsink->queued_buffers,
764         (GDestroyNotify) gst_buffer_unref);
765     souphttpsink->queued_buffers = NULL;
766     g_object_unref (souphttpsink->message);
767     souphttpsink->message = NULL;
768     return;
769   }
770
771   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
772   souphttpsink->queued_buffers = NULL;
773
774   GST_DEBUG_OBJECT (souphttpsink,
775       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
776       souphttpsink->offset, n);
777   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
778       callback, souphttpsink);
779
780   souphttpsink->offset += n;
781 }
782
783 static gboolean
784 send_message (GstSoupHttpClientSink * souphttpsink)
785 {
786   g_mutex_lock (&souphttpsink->mutex);
787   send_message_locked (souphttpsink);
788   if (souphttpsink->timer) {
789     g_source_destroy (souphttpsink->timer);
790     g_source_unref (souphttpsink->timer);
791     souphttpsink->timer = NULL;
792   }
793   g_mutex_unlock (&souphttpsink->mutex);
794
795   return FALSE;
796 }
797
798 static void
799 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
800 {
801   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
802
803   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
804       msg->status_code, msg->reason_phrase);
805
806   g_mutex_lock (&souphttpsink->mutex);
807   g_cond_signal (&souphttpsink->cond);
808   souphttpsink->message = NULL;
809
810   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
811     souphttpsink->failures++;
812     if (souphttpsink->retries &&
813         (souphttpsink->retries < 0 ||
814             souphttpsink->retries >= souphttpsink->failures)) {
815       guint64 retry_delay;
816       const char *retry_after =
817           soup_message_headers_get_one (msg->response_headers,
818           "Retry-After");
819       if (retry_after) {
820         gchar *end = NULL;
821         retry_delay = g_ascii_strtoull (retry_after, &end, 10);
822         if (end || errno) {
823           retry_delay = souphttpsink->retry_delay;
824         } else {
825           retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
826         }
827         GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
828             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
829             " seconds with Retry-After: %s)", msg->status_code,
830             msg->reason_phrase, retry_delay, retry_after);
831       } else {
832         retry_delay = souphttpsink->retry_delay;
833         GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
834             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
835             " seconds)", msg->status_code, msg->reason_phrase, retry_delay);
836       }
837       souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
838       g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
839           souphttpsink, NULL);
840       g_source_attach (souphttpsink->timer, souphttpsink->context);
841     } else {
842       souphttpsink->status_code = msg->status_code;
843       souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
844     }
845     g_mutex_unlock (&souphttpsink->mutex);
846     return;
847   }
848
849   g_list_free_full (souphttpsink->sent_buffers,
850       (GDestroyNotify) gst_buffer_unref);
851   souphttpsink->sent_buffers = NULL;
852   souphttpsink->failures = 0;
853
854   send_message_locked (souphttpsink);
855   g_mutex_unlock (&souphttpsink->mutex);
856 }
857
858 static GstFlowReturn
859 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
860 {
861   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
862   GSource *source;
863   gboolean wake;
864
865   if (souphttpsink->status_code != 0) {
866     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
867         ("Could not write to HTTP URI"),
868         ("status: %d %s", souphttpsink->status_code,
869             souphttpsink->reason_phrase));
870     return GST_FLOW_ERROR;
871   }
872
873   g_mutex_lock (&souphttpsink->mutex);
874   if (souphttpsink->location != NULL) {
875     wake = (souphttpsink->queued_buffers == NULL);
876     souphttpsink->queued_buffers =
877         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
878
879     if (wake) {
880       GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
881       source = g_idle_source_new ();
882       g_source_set_callback (source, (GSourceFunc) (send_message),
883           souphttpsink, NULL);
884       g_source_attach (source, souphttpsink->context);
885       g_source_unref (source);
886     }
887   }
888   g_mutex_unlock (&souphttpsink->mutex);
889
890   return GST_FLOW_OK;
891 }
892
893 static void
894 authenticate (SoupSession * session, SoupMessage * msg,
895     SoupAuth * auth, gboolean retrying, gpointer user_data)
896 {
897   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
898
899   if (!retrying) {
900     /* First time authentication only, if we fail and are called again with retry true fall through */
901     if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
902       if (souphttpsink->user_id && souphttpsink->user_pw)
903         soup_auth_authenticate (auth, souphttpsink->user_id,
904             souphttpsink->user_pw);
905     } else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
906       if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
907         soup_auth_authenticate (auth, souphttpsink->proxy_id,
908             souphttpsink->proxy_pw);
909     }
910   }
911 }