Tizen 2.0 Release
[framework/multimedia/gst-plugins-good0.10.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44
45 #include <gst/glib-compat-private.h>
46
47 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
48 #define GST_CAT_DEFAULT souphttpclientsink_dbg
49
50 /* prototypes */
51
52
53 static void gst_soup_http_client_sink_set_property (GObject * object,
54     guint property_id, const GValue * value, GParamSpec * pspec);
55 static void gst_soup_http_client_sink_get_property (GObject * object,
56     guint property_id, GValue * value, GParamSpec * pspec);
57 static void gst_soup_http_client_sink_dispose (GObject * object);
58 static void gst_soup_http_client_sink_finalize (GObject * object);
59
60 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
61     GstCaps * caps);
62 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
63     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
64 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
67 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
68     GstEvent * event);
69 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
70     GstBuffer * buffer);
71 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
72     GstBuffer * buffer);
73
74 static void free_buffer_list (GList * list);
75 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
76     souphttpsink);
77 static void authenticate (SoupSession * session, SoupMessage * msg,
78     SoupAuth * auth, gboolean retrying, gpointer user_data);
79 static void callback (SoupSession * session, SoupMessage * msg,
80     gpointer user_data);
81 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
82     souphttpsink, const gchar * uri);
83
/* GObject property identifiers used by set_property()/get_property().
 * PROP_0 is a placeholder: no property is installed under it. */
enum
{
  PROP_0 = 0,
  PROP_LOCATION,                /* "location" */
  PROP_USER_AGENT,              /* "user-agent" */
  PROP_AUTOMATIC_REDIRECT,      /* "automatic-redirect" */
  PROP_PROXY,                   /* "proxy" */
  PROP_USER_ID,                 /* "user-id" */
  PROP_USER_PW,                 /* "user-pw" */
  PROP_PROXY_ID,                /* "proxy-id" */
  PROP_PROXY_PW,                /* "proxy-pw" */
  PROP_COOKIES,                 /* "cookies" */
  PROP_SESSION                  /* "session" */
};
98
99 #define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
100
101 /* pad templates */
102
103 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("sink",
105     GST_PAD_SINK,
106     GST_PAD_ALWAYS,
107     GST_STATIC_CAPS_ANY);
108
109
110 /* class initialization */
111
112 #define DEBUG_INIT(bla) \
113   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0, \
114       "souphttpclientsink element");
115
116 GST_BOILERPLATE_FULL (GstSoupHttpClientSink, gst_soup_http_client_sink,
117     GstBaseSink, GST_TYPE_BASE_SINK, DEBUG_INIT);
118
119 static void
120 gst_soup_http_client_sink_base_init (gpointer g_class)
121 {
122   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
123
124   gst_element_class_add_static_pad_template (element_class,
125       &gst_soup_http_client_sink_sink_template);
126
127   gst_element_class_set_details_simple (element_class, "HTTP client sink",
128       "Generic", "Sends streams to HTTP server via PUT",
129       "David Schleef <ds@entropywave.com>");
130 }
131
132 static void
133 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
134 {
135   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
136   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
137
138   gobject_class->set_property = gst_soup_http_client_sink_set_property;
139   gobject_class->get_property = gst_soup_http_client_sink_get_property;
140   gobject_class->dispose = gst_soup_http_client_sink_dispose;
141   gobject_class->finalize = gst_soup_http_client_sink_finalize;
142   base_sink_class->set_caps =
143       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
144   if (0)
145     base_sink_class->get_times =
146         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
147   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
148   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
149   base_sink_class->unlock =
150       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
151   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
152   if (0)
153     base_sink_class->preroll =
154         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
155   base_sink_class->render =
156       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
157
158   g_object_class_install_property (gobject_class,
159       PROP_LOCATION,
160       g_param_spec_string ("location", "Location",
161           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162   g_object_class_install_property (gobject_class,
163       PROP_USER_AGENT,
164       g_param_spec_string ("user-agent", "User-Agent",
165           "Value of the User-Agent HTTP request header field",
166           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167   g_object_class_install_property (gobject_class,
168       PROP_AUTOMATIC_REDIRECT,
169       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
170           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
171           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172   g_object_class_install_property (gobject_class,
173       PROP_PROXY,
174       g_param_spec_string ("proxy", "Proxy",
175           "HTTP proxy server URI", "",
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class,
178       PROP_USER_ID,
179       g_param_spec_string ("user-id", "user-id",
180           "user id for authentication", "",
181           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_USER_PW,
183       g_param_spec_string ("user-pw", "user-pw",
184           "user password for authentication", "",
185           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
187       g_param_spec_string ("proxy-id", "proxy-id",
188           "user id for proxy authentication", "",
189           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
190   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
191       g_param_spec_string ("proxy-pw", "proxy-pw",
192           "user password for proxy authentication", "",
193           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
194   g_object_class_install_property (gobject_class, PROP_SESSION,
195       g_param_spec_object ("session", "session",
196           "SoupSession object to use for communication",
197           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
198   g_object_class_install_property (gobject_class, PROP_COOKIES,
199       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
200           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201
202 }
203
204 static void
205 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink,
206     GstSoupHttpClientSinkClass * souphttpsink_class)
207 {
208   const char *proxy;
209
210   souphttpsink->mutex = g_mutex_new ();
211   souphttpsink->cond = g_cond_new ();
212
213   souphttpsink->location = NULL;
214   souphttpsink->automatic_redirect = TRUE;
215   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
216   souphttpsink->user_id = NULL;
217   souphttpsink->user_pw = NULL;
218   souphttpsink->proxy_id = NULL;
219   souphttpsink->proxy_pw = NULL;
220   souphttpsink->prop_session = NULL;
221   souphttpsink->timeout = 1;
222   proxy = g_getenv ("http_proxy");
223   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
224     GST_WARNING_OBJECT (souphttpsink,
225         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
226         proxy);
227   }
228
229   gst_soup_http_client_sink_reset (souphttpsink);
230 }
231
232 static void
233 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
234 {
235   g_free (souphttpsink->reason_phrase);
236   souphttpsink->reason_phrase = NULL;
237   souphttpsink->status_code = 0;
238   souphttpsink->offset = 0;
239
240 }
241
242 static gboolean
243 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
244     const gchar * uri)
245 {
246   if (souphttpsink->proxy) {
247     soup_uri_free (souphttpsink->proxy);
248     souphttpsink->proxy = NULL;
249   }
250   if (g_str_has_prefix (uri, "http://")) {
251     souphttpsink->proxy = soup_uri_new (uri);
252   } else {
253     gchar *new_uri = g_strconcat ("http://", uri, NULL);
254
255     souphttpsink->proxy = soup_uri_new (new_uri);
256     g_free (new_uri);
257   }
258
259   return TRUE;
260 }
261
262 void
263 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
264     const GValue * value, GParamSpec * pspec)
265 {
266   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
267
268   g_mutex_lock (souphttpsink->mutex);
269   switch (property_id) {
270     case PROP_SESSION:
271       if (souphttpsink->prop_session) {
272         g_object_unref (souphttpsink->prop_session);
273       }
274       souphttpsink->prop_session = g_value_dup_object (value);
275       break;
276     case PROP_LOCATION:
277       g_free (souphttpsink->location);
278       souphttpsink->location = g_value_dup_string (value);
279       souphttpsink->offset = 0;
280       break;
281     case PROP_USER_AGENT:
282       g_free (souphttpsink->user_agent);
283       souphttpsink->user_agent = g_value_dup_string (value);
284       break;
285     case PROP_AUTOMATIC_REDIRECT:
286       souphttpsink->automatic_redirect = g_value_get_boolean (value);
287       break;
288     case PROP_USER_ID:
289       g_free (souphttpsink->user_id);
290       souphttpsink->user_id = g_value_dup_string (value);
291       break;
292     case PROP_USER_PW:
293       g_free (souphttpsink->user_pw);
294       souphttpsink->user_pw = g_value_dup_string (value);
295       break;
296     case PROP_PROXY_ID:
297       g_free (souphttpsink->proxy_id);
298       souphttpsink->proxy_id = g_value_dup_string (value);
299       break;
300     case PROP_PROXY_PW:
301       g_free (souphttpsink->proxy_pw);
302       souphttpsink->proxy_pw = g_value_dup_string (value);
303       break;
304     case PROP_PROXY:
305     {
306       const gchar *proxy;
307
308       proxy = g_value_get_string (value);
309
310       if (proxy == NULL) {
311         GST_WARNING ("proxy property cannot be NULL");
312         goto done;
313       }
314       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
315         GST_WARNING ("badly formatted proxy URI");
316         goto done;
317       }
318       break;
319     }
320     case PROP_COOKIES:
321       g_strfreev (souphttpsink->cookies);
322       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
323       break;
324     default:
325       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
326       break;
327   }
328 done:
329   g_mutex_unlock (souphttpsink->mutex);
330 }
331
332 void
333 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
334     GValue * value, GParamSpec * pspec)
335 {
336   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
337
338   switch (property_id) {
339     case PROP_SESSION:
340       g_value_set_object (value, souphttpsink->prop_session);
341       break;
342     case PROP_LOCATION:
343       g_value_set_string (value, souphttpsink->location);
344       break;
345     case PROP_AUTOMATIC_REDIRECT:
346       g_value_set_boolean (value, souphttpsink->automatic_redirect);
347       break;
348     case PROP_USER_AGENT:
349       g_value_set_string (value, souphttpsink->user_agent);
350       break;
351     case PROP_USER_ID:
352       g_value_set_string (value, souphttpsink->user_id);
353       break;
354     case PROP_USER_PW:
355       g_value_set_string (value, souphttpsink->user_pw);
356       break;
357     case PROP_PROXY_ID:
358       g_value_set_string (value, souphttpsink->proxy_id);
359       break;
360     case PROP_PROXY_PW:
361       g_value_set_string (value, souphttpsink->proxy_pw);
362       break;
363     case PROP_PROXY:
364       if (souphttpsink->proxy == NULL)
365         g_value_set_static_string (value, "");
366       else {
367         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
368
369         g_value_set_string (value, proxy);
370         g_free (proxy);
371       }
372       break;
373     case PROP_COOKIES:
374       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
375       break;
376     default:
377       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
378       break;
379   }
380 }
381
382 void
383 gst_soup_http_client_sink_dispose (GObject * object)
384 {
385   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
386
387   /* clean up as possible.  may be called multiple times */
388   if (souphttpsink->prop_session)
389     g_object_unref (souphttpsink->prop_session);
390   souphttpsink->prop_session = NULL;
391
392   G_OBJECT_CLASS (parent_class)->dispose (object);
393 }
394
395 void
396 gst_soup_http_client_sink_finalize (GObject * object)
397 {
398   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
399
400   /* clean up object here */
401
402   g_free (souphttpsink->user_agent);
403   g_free (souphttpsink->user_id);
404   g_free (souphttpsink->user_pw);
405   g_free (souphttpsink->proxy_id);
406   g_free (souphttpsink->proxy_pw);
407   if (souphttpsink->proxy)
408     soup_uri_free (souphttpsink->proxy);
409   g_free (souphttpsink->location);
410
411   g_cond_free (souphttpsink->cond);
412   g_mutex_free (souphttpsink->mutex);
413
414   G_OBJECT_CLASS (parent_class)->finalize (object);
415 }
416
417
418
419 static gboolean
420 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
421 {
422   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
423   GstStructure *structure;
424   const GValue *value_array;
425   int i, n;
426
427   structure = gst_caps_get_structure (caps, 0);
428   value_array = gst_structure_get_value (structure, "streamheader");
429   if (value_array) {
430     free_buffer_list (souphttpsink->streamheader_buffers);
431     souphttpsink->streamheader_buffers = NULL;
432
433     n = gst_value_array_get_size (value_array);
434     for (i = 0; i < n; i++) {
435       const GValue *value;
436       GstBuffer *buffer;
437       value = gst_value_array_get_value (value_array, i);
438       buffer = GST_BUFFER (gst_value_get_buffer (value));
439       souphttpsink->streamheader_buffers =
440           g_list_append (souphttpsink->streamheader_buffers,
441           gst_buffer_ref (buffer));
442     }
443   }
444
445   return TRUE;
446 }
447
448 static void
449 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
450     GstClockTime * start, GstClockTime * end)
451 {
452
453 }
454
455 static gboolean
456 thread_ready_idle_cb (gpointer data)
457 {
458   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
459
460   GST_LOG_OBJECT (souphttpsink, "thread ready");
461
462   g_mutex_lock (souphttpsink->mutex);
463   g_cond_signal (souphttpsink->cond);
464   g_mutex_unlock (souphttpsink->mutex);
465
466   return FALSE;                 /* only run once */
467 }
468
469 static gpointer
470 thread_func (gpointer ptr)
471 {
472   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
473
474   GST_DEBUG ("thread start");
475
476   g_main_loop_run (souphttpsink->loop);
477
478   GST_DEBUG ("thread quit");
479
480   return NULL;
481 }
482
483 static gboolean
484 gst_soup_http_client_sink_start (GstBaseSink * sink)
485 {
486   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
487
488   if (souphttpsink->prop_session) {
489     souphttpsink->session = souphttpsink->prop_session;
490   } else {
491     GSource *source;
492     GError *error = NULL;
493
494     souphttpsink->context = g_main_context_new ();
495
496     /* set up idle source to signal when the main loop is running and
497      * it's safe for ::stop() to call g_main_loop_quit() */
498     source = g_idle_source_new ();
499     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
500     g_source_attach (source, souphttpsink->context);
501     g_source_unref (source);
502
503     souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
504
505     g_mutex_lock (souphttpsink->mutex);
506
507     /* FIXME: error handling */
508 #if !GLIB_CHECK_VERSION (2, 31, 0)
509     souphttpsink->thread = g_thread_create (thread_func, souphttpsink,
510         TRUE, &error);
511 #else
512     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
513         thread_func, souphttpsink, &error);
514 #endif
515
516     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
517     g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
518     g_mutex_unlock (souphttpsink->mutex);
519     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
520
521     souphttpsink->session =
522         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
523         souphttpsink->context, SOUP_SESSION_USER_AGENT,
524         souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
525         NULL);
526
527     //soup_session_add_feature (souphttpsink->session,
528     //    SOUP_SESSION_FEATURE (soup_logger_new (SOUP_LOGGER_LOG_BODY, 100)));
529
530     g_signal_connect (souphttpsink->session, "authenticate",
531         G_CALLBACK (authenticate), souphttpsink);
532   }
533
534   return TRUE;
535 }
536
537 static gboolean
538 gst_soup_http_client_sink_stop (GstBaseSink * sink)
539 {
540   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
541
542   GST_DEBUG ("stop");
543
544   if (souphttpsink->prop_session == NULL) {
545     soup_session_abort (souphttpsink->session);
546     g_object_unref (souphttpsink->session);
547   }
548
549   if (souphttpsink->loop) {
550     g_main_loop_quit (souphttpsink->loop);
551     g_thread_join (souphttpsink->thread);
552     g_main_loop_unref (souphttpsink->loop);
553     souphttpsink->loop = NULL;
554   }
555   if (souphttpsink->context) {
556     g_main_context_unref (souphttpsink->context);
557     souphttpsink->context = NULL;
558   }
559
560   gst_soup_http_client_sink_reset (souphttpsink);
561
562   return TRUE;
563 }
564
565 static gboolean
566 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
567 {
568   GST_DEBUG ("unlock");
569
570   return TRUE;
571 }
572
573 static gboolean
574 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
575 {
576   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
577
578   GST_DEBUG_OBJECT (souphttpsink, "event");
579
580   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
581     GST_DEBUG_OBJECT (souphttpsink, "got eos");
582     g_mutex_lock (souphttpsink->mutex);
583     while (souphttpsink->message) {
584       GST_DEBUG_OBJECT (souphttpsink, "waiting");
585       g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
586     }
587     g_mutex_unlock (souphttpsink->mutex);
588     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
589   }
590
591   return TRUE;
592 }
593
594 static GstFlowReturn
595 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
596 {
597   GST_DEBUG ("preroll");
598
599   return GST_FLOW_OK;
600 }
601
602 static void
603 free_buffer_list (GList * list)
604 {
605   GList *g;
606   for (g = list; g; g = g_list_next (g)) {
607     GstBuffer *buffer = g->data;
608     gst_buffer_unref (buffer);
609   }
610   g_list_free (list);
611 }
612
613 static void
614 send_message_locked (GstSoupHttpClientSink * souphttpsink)
615 {
616   GList *g;
617   guint64 n;
618
619   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
620     return;
621   }
622
623   /* If the URI went away, drop all these buffers */
624   if (souphttpsink->location == NULL) {
625     free_buffer_list (souphttpsink->queued_buffers);
626     souphttpsink->queued_buffers = NULL;
627     return;
628   }
629
630   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
631
632   n = 0;
633   if (souphttpsink->offset == 0) {
634     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
635       GstBuffer *buffer = g->data;
636       soup_message_body_append (souphttpsink->message->request_body,
637           SOUP_MEMORY_STATIC, GST_BUFFER_DATA (buffer),
638           GST_BUFFER_SIZE (buffer));
639       n += GST_BUFFER_SIZE (buffer);
640     }
641   }
642
643   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
644     GstBuffer *buffer = g->data;
645     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
646       soup_message_body_append (souphttpsink->message->request_body,
647           SOUP_MEMORY_STATIC, GST_BUFFER_DATA (buffer),
648           GST_BUFFER_SIZE (buffer));
649       n += GST_BUFFER_SIZE (buffer);
650     }
651   }
652
653   if (souphttpsink->offset != 0) {
654     char *s;
655     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
656         souphttpsink->offset, souphttpsink->offset + n - 1);
657     soup_message_headers_append (souphttpsink->message->request_headers,
658         "Content-Range", s);
659     g_free (s);
660   }
661
662   if (n == 0) {
663     free_buffer_list (souphttpsink->queued_buffers);
664     souphttpsink->queued_buffers = NULL;
665     g_object_unref (souphttpsink->message);
666     souphttpsink->message = NULL;
667     return;
668   }
669
670   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
671   souphttpsink->queued_buffers = NULL;
672
673   GST_DEBUG_OBJECT (souphttpsink,
674       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
675       souphttpsink->offset, n);
676   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
677       callback, souphttpsink);
678
679   souphttpsink->offset += n;
680 }
681
682 static gboolean
683 send_message (GstSoupHttpClientSink * souphttpsink)
684 {
685   g_mutex_lock (souphttpsink->mutex);
686   send_message_locked (souphttpsink);
687   g_mutex_unlock (souphttpsink->mutex);
688
689   return FALSE;
690 }
691
692 static void
693 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
694 {
695   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
696
697   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
698       msg->status_code, msg->reason_phrase);
699
700   g_mutex_lock (souphttpsink->mutex);
701   g_cond_signal (souphttpsink->cond);
702   souphttpsink->message = NULL;
703
704   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
705     souphttpsink->status_code = msg->status_code;
706     souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
707     g_mutex_unlock (souphttpsink->mutex);
708     return;
709   }
710
711   free_buffer_list (souphttpsink->sent_buffers);
712   souphttpsink->sent_buffers = NULL;
713
714   send_message_locked (souphttpsink);
715   g_mutex_unlock (souphttpsink->mutex);
716 }
717
718 static GstFlowReturn
719 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
720 {
721   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
722   GSource *source;
723   gboolean wake;
724
725   if (souphttpsink->status_code != 0) {
726     /* FIXME we should allow a moderate amount of retries. */
727     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
728         ("Could not write to HTTP URI"),
729         ("error: %d %s", souphttpsink->status_code,
730             souphttpsink->reason_phrase));
731     return GST_FLOW_ERROR;
732   }
733
734   g_mutex_lock (souphttpsink->mutex);
735   if (souphttpsink->location != NULL) {
736     wake = (souphttpsink->queued_buffers == NULL);
737     souphttpsink->queued_buffers =
738         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
739
740     if (wake) {
741       source = g_idle_source_new ();
742       g_source_set_callback (source, (GSourceFunc) (send_message),
743           souphttpsink, NULL);
744       g_source_attach (source, souphttpsink->context);
745       g_source_unref (source);
746     }
747   }
748   g_mutex_unlock (souphttpsink->mutex);
749
750   return GST_FLOW_OK;
751 }
752
753 static void
754 authenticate (SoupSession * session, SoupMessage * msg,
755     SoupAuth * auth, gboolean retrying, gpointer user_data)
756 {
757   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
758
759   if (!retrying) {
760     if (souphttpsink->user_id && souphttpsink->user_pw) {
761       soup_auth_authenticate (auth,
762           souphttpsink->user_id, souphttpsink->user_pw);
763     }
764   }
765 }