d8f46df9d30a482168204eedb7ea0c579e62f86d
[platform/upstream/gstreamer.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44
45 #include <gst/glib-compat-private.h>
46
47 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
48 #define GST_CAT_DEFAULT souphttpclientsink_dbg
49
50 /* prototypes */
51
52
53 static void gst_soup_http_client_sink_set_property (GObject * object,
54     guint property_id, const GValue * value, GParamSpec * pspec);
55 static void gst_soup_http_client_sink_get_property (GObject * object,
56     guint property_id, GValue * value, GParamSpec * pspec);
57 static void gst_soup_http_client_sink_dispose (GObject * object);
58 static void gst_soup_http_client_sink_finalize (GObject * object);
59
60 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
61     GstCaps * caps);
62 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
63     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
64 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
67 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
68     GstEvent * event);
69 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
70     GstBuffer * buffer);
71 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
72     GstBuffer * buffer);
73
74 static void free_buffer_list (GList * list);
75 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
76     souphttpsink);
77 static void authenticate (SoupSession * session, SoupMessage * msg,
78     SoupAuth * auth, gboolean retrying, gpointer user_data);
79 static void callback (SoupSession * session, SoupMessage * msg,
80     gpointer user_data);
81 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
82     souphttpsink, const gchar * uri);
83
84 enum
85 {
86   PROP_0,
87   PROP_LOCATION,
88   PROP_USER_AGENT,
89   PROP_AUTOMATIC_REDIRECT,
90   PROP_PROXY,
91   PROP_USER_ID,
92   PROP_USER_PW,
93   PROP_PROXY_ID,
94   PROP_PROXY_PW,
95   PROP_COOKIES,
96   PROP_SESSION
97 };
98
99 #define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
100
101 /* pad templates */
102
103 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("sink",
105     GST_PAD_SINK,
106     GST_PAD_ALWAYS,
107     GST_STATIC_CAPS_ANY);
108
109
110 /* class initialization */
111
112 #define gst_soup_http_client_sink_parent_class parent_class
113 G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
114     GST_TYPE_BASE_SINK);
115
116 static void
117 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
118 {
119   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
120   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
121   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
122
123   gobject_class->set_property = gst_soup_http_client_sink_set_property;
124   gobject_class->get_property = gst_soup_http_client_sink_get_property;
125   gobject_class->dispose = gst_soup_http_client_sink_dispose;
126   gobject_class->finalize = gst_soup_http_client_sink_finalize;
127
128   g_object_class_install_property (gobject_class,
129       PROP_LOCATION,
130       g_param_spec_string ("location", "Location",
131           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132   g_object_class_install_property (gobject_class,
133       PROP_USER_AGENT,
134       g_param_spec_string ("user-agent", "User-Agent",
135           "Value of the User-Agent HTTP request header field",
136           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137   g_object_class_install_property (gobject_class,
138       PROP_AUTOMATIC_REDIRECT,
139       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
140           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
141           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
142   g_object_class_install_property (gobject_class,
143       PROP_PROXY,
144       g_param_spec_string ("proxy", "Proxy",
145           "HTTP proxy server URI", "",
146           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
147   g_object_class_install_property (gobject_class,
148       PROP_USER_ID,
149       g_param_spec_string ("user-id", "user-id",
150           "user id for authentication", "",
151           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
152   g_object_class_install_property (gobject_class, PROP_USER_PW,
153       g_param_spec_string ("user-pw", "user-pw",
154           "user password for authentication", "",
155           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
156   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
157       g_param_spec_string ("proxy-id", "proxy-id",
158           "user id for proxy authentication", "",
159           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
161       g_param_spec_string ("proxy-pw", "proxy-pw",
162           "user password for proxy authentication", "",
163           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
164   g_object_class_install_property (gobject_class, PROP_SESSION,
165       g_param_spec_object ("session", "session",
166           "SoupSession object to use for communication",
167           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168   g_object_class_install_property (gobject_class, PROP_COOKIES,
169       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
170           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171
172   gst_element_class_add_pad_template (gstelement_class,
173       gst_static_pad_template_get (&gst_soup_http_client_sink_sink_template));
174
175   gst_element_class_set_details_simple (gstelement_class, "HTTP client sink",
176       "Generic", "Sends streams to HTTP server via PUT",
177       "David Schleef <ds@entropywave.com>");
178
179   base_sink_class->set_caps =
180       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
181   if (0)
182     base_sink_class->get_times =
183         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
184   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
185   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
186   base_sink_class->unlock =
187       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
188   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
189   if (0)
190     base_sink_class->preroll =
191         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
192   base_sink_class->render =
193       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
194
195   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
196       "souphttpclientsink element");
197
198 }
199
200 static void
201 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
202 {
203   const char *proxy;
204
205   souphttpsink->mutex = g_mutex_new ();
206   souphttpsink->cond = g_cond_new ();
207
208   souphttpsink->location = NULL;
209   souphttpsink->automatic_redirect = TRUE;
210   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
211   souphttpsink->user_id = NULL;
212   souphttpsink->user_pw = NULL;
213   souphttpsink->proxy_id = NULL;
214   souphttpsink->proxy_pw = NULL;
215   souphttpsink->prop_session = NULL;
216   souphttpsink->timeout = 1;
217   proxy = g_getenv ("http_proxy");
218   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
219     GST_WARNING_OBJECT (souphttpsink,
220         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
221         proxy);
222   }
223
224   gst_soup_http_client_sink_reset (souphttpsink);
225 }
226
227 static void
228 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
229 {
230   g_free (souphttpsink->reason_phrase);
231   souphttpsink->reason_phrase = NULL;
232   souphttpsink->status_code = 0;
233   souphttpsink->offset = 0;
234
235 }
236
237 static gboolean
238 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
239     const gchar * uri)
240 {
241   if (souphttpsink->proxy) {
242     soup_uri_free (souphttpsink->proxy);
243     souphttpsink->proxy = NULL;
244   }
245   if (g_str_has_prefix (uri, "http://")) {
246     souphttpsink->proxy = soup_uri_new (uri);
247   } else {
248     gchar *new_uri = g_strconcat ("http://", uri, NULL);
249
250     souphttpsink->proxy = soup_uri_new (new_uri);
251     g_free (new_uri);
252   }
253
254   return TRUE;
255 }
256
257 void
258 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
259     const GValue * value, GParamSpec * pspec)
260 {
261   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
262
263   g_mutex_lock (souphttpsink->mutex);
264   switch (property_id) {
265     case PROP_SESSION:
266       if (souphttpsink->prop_session) {
267         g_object_unref (souphttpsink->prop_session);
268       }
269       souphttpsink->prop_session = g_value_dup_object (value);
270       break;
271     case PROP_LOCATION:
272       g_free (souphttpsink->location);
273       souphttpsink->location = g_value_dup_string (value);
274       souphttpsink->offset = 0;
275       break;
276     case PROP_USER_AGENT:
277       g_free (souphttpsink->user_agent);
278       souphttpsink->user_agent = g_value_dup_string (value);
279       break;
280     case PROP_AUTOMATIC_REDIRECT:
281       souphttpsink->automatic_redirect = g_value_get_boolean (value);
282       break;
283     case PROP_USER_ID:
284       g_free (souphttpsink->user_id);
285       souphttpsink->user_id = g_value_dup_string (value);
286       break;
287     case PROP_USER_PW:
288       g_free (souphttpsink->user_pw);
289       souphttpsink->user_pw = g_value_dup_string (value);
290       break;
291     case PROP_PROXY_ID:
292       g_free (souphttpsink->proxy_id);
293       souphttpsink->proxy_id = g_value_dup_string (value);
294       break;
295     case PROP_PROXY_PW:
296       g_free (souphttpsink->proxy_pw);
297       souphttpsink->proxy_pw = g_value_dup_string (value);
298       break;
299     case PROP_PROXY:
300     {
301       const gchar *proxy;
302
303       proxy = g_value_get_string (value);
304
305       if (proxy == NULL) {
306         GST_WARNING ("proxy property cannot be NULL");
307         goto done;
308       }
309       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
310         GST_WARNING ("badly formatted proxy URI");
311         goto done;
312       }
313       break;
314     }
315     case PROP_COOKIES:
316       g_strfreev (souphttpsink->cookies);
317       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
318       break;
319     default:
320       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
321       break;
322   }
323 done:
324   g_mutex_unlock (souphttpsink->mutex);
325 }
326
327 void
328 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
329     GValue * value, GParamSpec * pspec)
330 {
331   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
332
333   switch (property_id) {
334     case PROP_SESSION:
335       g_value_set_object (value, souphttpsink->prop_session);
336       break;
337     case PROP_LOCATION:
338       g_value_set_string (value, souphttpsink->location);
339       break;
340     case PROP_AUTOMATIC_REDIRECT:
341       g_value_set_boolean (value, souphttpsink->automatic_redirect);
342       break;
343     case PROP_USER_AGENT:
344       g_value_set_string (value, souphttpsink->user_agent);
345       break;
346     case PROP_USER_ID:
347       g_value_set_string (value, souphttpsink->user_id);
348       break;
349     case PROP_USER_PW:
350       g_value_set_string (value, souphttpsink->user_pw);
351       break;
352     case PROP_PROXY_ID:
353       g_value_set_string (value, souphttpsink->proxy_id);
354       break;
355     case PROP_PROXY_PW:
356       g_value_set_string (value, souphttpsink->proxy_pw);
357       break;
358     case PROP_PROXY:
359       if (souphttpsink->proxy == NULL)
360         g_value_set_static_string (value, "");
361       else {
362         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
363
364         g_value_set_string (value, proxy);
365         g_free (proxy);
366       }
367       break;
368     case PROP_COOKIES:
369       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
370       break;
371     default:
372       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
373       break;
374   }
375 }
376
377 void
378 gst_soup_http_client_sink_dispose (GObject * object)
379 {
380   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
381
382   /* clean up as possible.  may be called multiple times */
383   if (souphttpsink->prop_session)
384     g_object_unref (souphttpsink->prop_session);
385   souphttpsink->prop_session = NULL;
386
387   G_OBJECT_CLASS (parent_class)->dispose (object);
388 }
389
390 void
391 gst_soup_http_client_sink_finalize (GObject * object)
392 {
393   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
394
395   /* clean up object here */
396
397   g_free (souphttpsink->user_agent);
398   g_free (souphttpsink->user_id);
399   g_free (souphttpsink->user_pw);
400   g_free (souphttpsink->proxy_id);
401   g_free (souphttpsink->proxy_pw);
402   if (souphttpsink->proxy)
403     soup_uri_free (souphttpsink->proxy);
404   g_free (souphttpsink->location);
405
406   g_cond_free (souphttpsink->cond);
407   g_mutex_free (souphttpsink->mutex);
408
409   G_OBJECT_CLASS (parent_class)->finalize (object);
410 }
411
412
413
414 static gboolean
415 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
416 {
417   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
418   GstStructure *structure;
419   const GValue *value_array;
420   int i, n;
421
422   structure = gst_caps_get_structure (caps, 0);
423   value_array = gst_structure_get_value (structure, "streamheader");
424   if (value_array) {
425     free_buffer_list (souphttpsink->streamheader_buffers);
426     souphttpsink->streamheader_buffers = NULL;
427
428     n = gst_value_array_get_size (value_array);
429     for (i = 0; i < n; i++) {
430       const GValue *value;
431       GstBuffer *buffer;
432       value = gst_value_array_get_value (value_array, i);
433       buffer = GST_BUFFER (gst_value_get_buffer (value));
434       souphttpsink->streamheader_buffers =
435           g_list_append (souphttpsink->streamheader_buffers,
436           gst_buffer_ref (buffer));
437     }
438   }
439
440   return TRUE;
441 }
442
443 static void
444 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
445     GstClockTime * start, GstClockTime * end)
446 {
447
448 }
449
450 static gboolean
451 thread_ready_idle_cb (gpointer data)
452 {
453   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
454
455   GST_LOG_OBJECT (souphttpsink, "thread ready");
456
457   g_mutex_lock (souphttpsink->mutex);
458   g_cond_signal (souphttpsink->cond);
459   g_mutex_unlock (souphttpsink->mutex);
460
461   return FALSE;                 /* only run once */
462 }
463
464 static gpointer
465 thread_func (gpointer ptr)
466 {
467   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
468
469   GST_DEBUG ("thread start");
470
471   g_main_loop_run (souphttpsink->loop);
472
473   GST_DEBUG ("thread quit");
474
475   return NULL;
476 }
477
478 static gboolean
479 gst_soup_http_client_sink_start (GstBaseSink * sink)
480 {
481   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
482
483   if (souphttpsink->prop_session) {
484     souphttpsink->session = souphttpsink->prop_session;
485   } else {
486     GSource *source;
487     GError *error = NULL;
488
489     souphttpsink->context = g_main_context_new ();
490
491     /* set up idle source to signal when the main loop is running and
492      * it's safe for ::stop() to call g_main_loop_quit() */
493     source = g_idle_source_new ();
494     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
495     g_source_attach (source, souphttpsink->context);
496     g_source_unref (source);
497
498     souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
499
500     g_mutex_lock (souphttpsink->mutex);
501
502     /* FIXME: error handling */
503 #if !GLIB_CHECK_VERSION (2, 31, 0)
504     souphttpsink->thread = g_thread_create (thread_func, souphttpsink,
505         TRUE, &error);
506 #else
507     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
508         thread_func, souphttpsink, &error);
509 #endif
510
511     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
512     g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
513     g_mutex_unlock (souphttpsink->mutex);
514     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
515
516     souphttpsink->session =
517         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
518         souphttpsink->context, SOUP_SESSION_USER_AGENT,
519         souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
520         NULL);
521
522     //soup_session_add_feature (souphttpsink->session,
523     //    SOUP_SESSION_FEATURE (soup_logger_new (SOUP_LOGGER_LOG_BODY, 100)));
524
525     g_signal_connect (souphttpsink->session, "authenticate",
526         G_CALLBACK (authenticate), souphttpsink);
527   }
528
529   return TRUE;
530 }
531
532 static gboolean
533 gst_soup_http_client_sink_stop (GstBaseSink * sink)
534 {
535   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
536
537   GST_DEBUG ("stop");
538
539   if (souphttpsink->prop_session == NULL) {
540     soup_session_abort (souphttpsink->session);
541     g_object_unref (souphttpsink->session);
542   }
543
544   if (souphttpsink->loop) {
545     g_main_loop_quit (souphttpsink->loop);
546     g_thread_join (souphttpsink->thread);
547     g_main_loop_unref (souphttpsink->loop);
548     souphttpsink->loop = NULL;
549   }
550   if (souphttpsink->context) {
551     g_main_context_unref (souphttpsink->context);
552     souphttpsink->context = NULL;
553   }
554
555   gst_soup_http_client_sink_reset (souphttpsink);
556
557   return TRUE;
558 }
559
560 static gboolean
561 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
562 {
563   GST_DEBUG ("unlock");
564
565   return TRUE;
566 }
567
568 static gboolean
569 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
570 {
571   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
572
573   GST_DEBUG_OBJECT (souphttpsink, "event");
574
575   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
576     GST_DEBUG_OBJECT (souphttpsink, "got eos");
577     g_mutex_lock (souphttpsink->mutex);
578     while (souphttpsink->message) {
579       GST_DEBUG_OBJECT (souphttpsink, "waiting");
580       g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
581     }
582     g_mutex_unlock (souphttpsink->mutex);
583     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
584   }
585
586   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
587 }
588
589 static GstFlowReturn
590 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
591 {
592   GST_DEBUG ("preroll");
593
594   return GST_FLOW_OK;
595 }
596
597 static void
598 free_buffer_list (GList * list)
599 {
600   GList *g;
601   for (g = list; g; g = g_list_next (g)) {
602     GstBuffer *buffer = g->data;
603     gst_buffer_unref (buffer);
604   }
605   g_list_free (list);
606 }
607
608 static void
609 send_message_locked (GstSoupHttpClientSink * souphttpsink)
610 {
611   GList *g;
612   guint64 n;
613
614   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
615     return;
616   }
617
618   /* If the URI went away, drop all these buffers */
619   if (souphttpsink->location == NULL) {
620     free_buffer_list (souphttpsink->queued_buffers);
621     souphttpsink->queued_buffers = NULL;
622     return;
623   }
624
625   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
626
627   n = 0;
628   if (souphttpsink->offset == 0) {
629     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
630       GstBuffer *buffer = g->data;
631       gpointer data;
632       gsize size;
633
634       /* FIXME, lifetime of the buffer? */
635       data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
636       soup_message_body_append (souphttpsink->message->request_body,
637           SOUP_MEMORY_STATIC, data, size);
638       n += size;
639       gst_buffer_unmap (buffer, data, size);
640     }
641   }
642
643   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
644     GstBuffer *buffer = g->data;
645     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
646       gpointer data;
647       gsize size;
648
649       /* FIXME, lifetime of the buffer? */
650       data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
651       soup_message_body_append (souphttpsink->message->request_body,
652           SOUP_MEMORY_STATIC, data, size);
653       n += size;
654       gst_buffer_unmap (buffer, data, size);
655     }
656   }
657
658   if (souphttpsink->offset != 0) {
659     char *s;
660     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
661         souphttpsink->offset, souphttpsink->offset + n - 1);
662     soup_message_headers_append (souphttpsink->message->request_headers,
663         "Content-Range", s);
664     g_free (s);
665   }
666
667   if (n == 0) {
668     free_buffer_list (souphttpsink->queued_buffers);
669     souphttpsink->queued_buffers = NULL;
670     g_object_unref (souphttpsink->message);
671     souphttpsink->message = NULL;
672     return;
673   }
674
675   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
676   souphttpsink->queued_buffers = NULL;
677
678   GST_DEBUG_OBJECT (souphttpsink,
679       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
680       souphttpsink->offset, n);
681   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
682       callback, souphttpsink);
683
684   souphttpsink->offset += n;
685 }
686
687 static gboolean
688 send_message (GstSoupHttpClientSink * souphttpsink)
689 {
690   g_mutex_lock (souphttpsink->mutex);
691   send_message_locked (souphttpsink);
692   g_mutex_unlock (souphttpsink->mutex);
693
694   return FALSE;
695 }
696
697 static void
698 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
699 {
700   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
701
702   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
703       msg->status_code, msg->reason_phrase);
704
705   g_mutex_lock (souphttpsink->mutex);
706   g_cond_signal (souphttpsink->cond);
707   souphttpsink->message = NULL;
708
709   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
710     souphttpsink->status_code = msg->status_code;
711     souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
712     g_mutex_unlock (souphttpsink->mutex);
713     return;
714   }
715
716   free_buffer_list (souphttpsink->sent_buffers);
717   souphttpsink->sent_buffers = NULL;
718
719   send_message_locked (souphttpsink);
720   g_mutex_unlock (souphttpsink->mutex);
721 }
722
723 static GstFlowReturn
724 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
725 {
726   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
727   GSource *source;
728   gboolean wake;
729
730   if (souphttpsink->status_code != 0) {
731     /* FIXME we should allow a moderate amount of retries. */
732     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
733         ("Could not write to HTTP URI"),
734         ("error: %d %s", souphttpsink->status_code,
735             souphttpsink->reason_phrase));
736     return GST_FLOW_ERROR;
737   }
738
739   g_mutex_lock (souphttpsink->mutex);
740   if (souphttpsink->location != NULL) {
741     wake = (souphttpsink->queued_buffers == NULL);
742     souphttpsink->queued_buffers =
743         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
744
745     if (wake) {
746       source = g_idle_source_new ();
747       g_source_set_callback (source, (GSourceFunc) (send_message),
748           souphttpsink, NULL);
749       g_source_attach (source, souphttpsink->context);
750       g_source_unref (source);
751     }
752   }
753   g_mutex_unlock (souphttpsink->mutex);
754
755   return GST_FLOW_OK;
756 }
757
758 static void
759 authenticate (SoupSession * session, SoupMessage * msg,
760     SoupAuth * auth, gboolean retrying, gpointer user_data)
761 {
762   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
763
764   if (!retrying) {
765     if (souphttpsink->user_id && souphttpsink->user_pw) {
766       soup_auth_authenticate (auth,
767           souphttpsink->user_id, souphttpsink->user_pw);
768     }
769   }
770 }