various: fix pad template leaks
[platform/upstream/gstreamer.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44
45 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
46 #define GST_CAT_DEFAULT souphttpclientsink_dbg
47
48 /* prototypes */
49
50
51 static void gst_soup_http_client_sink_set_property (GObject * object,
52     guint property_id, const GValue * value, GParamSpec * pspec);
53 static void gst_soup_http_client_sink_get_property (GObject * object,
54     guint property_id, GValue * value, GParamSpec * pspec);
55 static void gst_soup_http_client_sink_dispose (GObject * object);
56 static void gst_soup_http_client_sink_finalize (GObject * object);
57
58 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
59     GstCaps * caps);
60 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
61     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
62 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
63 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
64 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
66     GstEvent * event);
67 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
68     GstBuffer * buffer);
69 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
70     GstBuffer * buffer);
71
72 static void free_buffer_list (GList * list);
73 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
74     souphttpsink);
75 static void authenticate (SoupSession * session, SoupMessage * msg,
76     SoupAuth * auth, gboolean retrying, gpointer user_data);
77 static void callback (SoupSession * session, SoupMessage * msg,
78     gpointer user_data);
79 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
80     souphttpsink, const gchar * uri);
81
82 enum
83 {
84   PROP_0,
85   PROP_LOCATION,
86   PROP_USER_AGENT,
87   PROP_AUTOMATIC_REDIRECT,
88   PROP_PROXY,
89   PROP_USER_ID,
90   PROP_USER_PW,
91   PROP_PROXY_ID,
92   PROP_PROXY_PW,
93   PROP_COOKIES,
94   PROP_SESSION
95 };
96
97 #define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
98
99 /* pad templates */
100
101 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
102 GST_STATIC_PAD_TEMPLATE ("sink",
103     GST_PAD_SINK,
104     GST_PAD_ALWAYS,
105     GST_STATIC_CAPS_ANY);
106
107
108 /* class initialization */
109
110 #define DEBUG_INIT(bla) \
111   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0, \
112       "souphttpclientsink element");
113
114 GST_BOILERPLATE_FULL (GstSoupHttpClientSink, gst_soup_http_client_sink,
115     GstBaseSink, GST_TYPE_BASE_SINK, DEBUG_INIT);
116
117 static void
118 gst_soup_http_client_sink_base_init (gpointer g_class)
119 {
120   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
121
122   gst_element_class_add_static_pad_template (element_class,
123       &gst_soup_http_client_sink_sink_template);
124
125   gst_element_class_set_details_simple (element_class, "HTTP client sink",
126       "Generic", "Sends streams to HTTP server via PUT",
127       "David Schleef <ds@entropywave.com>");
128 }
129
130 static void
131 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
132 {
133   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
134   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
135
136   gobject_class->set_property = gst_soup_http_client_sink_set_property;
137   gobject_class->get_property = gst_soup_http_client_sink_get_property;
138   gobject_class->dispose = gst_soup_http_client_sink_dispose;
139   gobject_class->finalize = gst_soup_http_client_sink_finalize;
140   base_sink_class->set_caps =
141       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
142   if (0)
143     base_sink_class->get_times =
144         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
145   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
146   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
147   base_sink_class->unlock =
148       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
149   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
150   if (0)
151     base_sink_class->preroll =
152         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
153   base_sink_class->render =
154       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
155
156   g_object_class_install_property (gobject_class,
157       PROP_LOCATION,
158       g_param_spec_string ("location", "Location",
159           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160   g_object_class_install_property (gobject_class,
161       PROP_USER_AGENT,
162       g_param_spec_string ("user-agent", "User-Agent",
163           "Value of the User-Agent HTTP request header field",
164           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
165   g_object_class_install_property (gobject_class,
166       PROP_AUTOMATIC_REDIRECT,
167       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
168           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
169           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170   g_object_class_install_property (gobject_class,
171       PROP_PROXY,
172       g_param_spec_string ("proxy", "Proxy",
173           "HTTP proxy server URI", "",
174           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175   g_object_class_install_property (gobject_class,
176       PROP_USER_ID,
177       g_param_spec_string ("user-id", "user-id",
178           "user id for authentication", "",
179           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180   g_object_class_install_property (gobject_class, PROP_USER_PW,
181       g_param_spec_string ("user-pw", "user-pw",
182           "user password for authentication", "",
183           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
185       g_param_spec_string ("proxy-id", "proxy-id",
186           "user id for proxy authentication", "",
187           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
189       g_param_spec_string ("proxy-pw", "proxy-pw",
190           "user password for proxy authentication", "",
191           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192   g_object_class_install_property (gobject_class, PROP_SESSION,
193       g_param_spec_object ("session", "session",
194           "SoupSession object to use for communication",
195           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_COOKIES,
197       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
198           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
199
200 }
201
202 static void
203 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink,
204     GstSoupHttpClientSinkClass * souphttpsink_class)
205 {
206   const char *proxy;
207
208   souphttpsink->mutex = g_mutex_new ();
209   souphttpsink->cond = g_cond_new ();
210
211   souphttpsink->location = NULL;
212   souphttpsink->automatic_redirect = TRUE;
213   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
214   souphttpsink->user_id = NULL;
215   souphttpsink->user_pw = NULL;
216   souphttpsink->proxy_id = NULL;
217   souphttpsink->proxy_pw = NULL;
218   souphttpsink->prop_session = NULL;
219   souphttpsink->timeout = 1;
220   proxy = g_getenv ("http_proxy");
221   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
222     GST_WARNING_OBJECT (souphttpsink,
223         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
224         proxy);
225   }
226
227   gst_soup_http_client_sink_reset (souphttpsink);
228 }
229
230 static void
231 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
232 {
233   g_free (souphttpsink->reason_phrase);
234   souphttpsink->reason_phrase = NULL;
235   souphttpsink->status_code = 0;
236   souphttpsink->offset = 0;
237
238 }
239
240 static gboolean
241 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
242     const gchar * uri)
243 {
244   if (souphttpsink->proxy) {
245     soup_uri_free (souphttpsink->proxy);
246     souphttpsink->proxy = NULL;
247   }
248   if (g_str_has_prefix (uri, "http://")) {
249     souphttpsink->proxy = soup_uri_new (uri);
250   } else {
251     gchar *new_uri = g_strconcat ("http://", uri, NULL);
252
253     souphttpsink->proxy = soup_uri_new (new_uri);
254     g_free (new_uri);
255   }
256
257   return TRUE;
258 }
259
260 void
261 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
262     const GValue * value, GParamSpec * pspec)
263 {
264   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
265
266   g_mutex_lock (souphttpsink->mutex);
267   switch (property_id) {
268     case PROP_SESSION:
269       if (souphttpsink->prop_session) {
270         g_object_unref (souphttpsink->prop_session);
271       }
272       souphttpsink->prop_session = g_value_dup_object (value);
273       break;
274     case PROP_LOCATION:
275       g_free (souphttpsink->location);
276       souphttpsink->location = g_value_dup_string (value);
277       souphttpsink->offset = 0;
278       break;
279     case PROP_USER_AGENT:
280       g_free (souphttpsink->user_agent);
281       souphttpsink->user_agent = g_value_dup_string (value);
282       break;
283     case PROP_AUTOMATIC_REDIRECT:
284       souphttpsink->automatic_redirect = g_value_get_boolean (value);
285       break;
286     case PROP_USER_ID:
287       g_free (souphttpsink->user_id);
288       souphttpsink->user_id = g_value_dup_string (value);
289       break;
290     case PROP_USER_PW:
291       g_free (souphttpsink->user_pw);
292       souphttpsink->user_pw = g_value_dup_string (value);
293       break;
294     case PROP_PROXY_ID:
295       g_free (souphttpsink->proxy_id);
296       souphttpsink->proxy_id = g_value_dup_string (value);
297       break;
298     case PROP_PROXY_PW:
299       g_free (souphttpsink->proxy_pw);
300       souphttpsink->proxy_pw = g_value_dup_string (value);
301       break;
302     case PROP_PROXY:
303     {
304       const gchar *proxy;
305
306       proxy = g_value_get_string (value);
307
308       if (proxy == NULL) {
309         GST_WARNING ("proxy property cannot be NULL");
310         goto done;
311       }
312       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
313         GST_WARNING ("badly formatted proxy URI");
314         goto done;
315       }
316       break;
317     }
318     case PROP_COOKIES:
319       g_strfreev (souphttpsink->cookies);
320       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
321       break;
322     default:
323       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
324       break;
325   }
326 done:
327   g_mutex_unlock (souphttpsink->mutex);
328 }
329
330 void
331 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
332     GValue * value, GParamSpec * pspec)
333 {
334   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
335
336   switch (property_id) {
337     case PROP_SESSION:
338       g_value_set_object (value, souphttpsink->prop_session);
339       break;
340     case PROP_LOCATION:
341       g_value_set_string (value, souphttpsink->location);
342       break;
343     case PROP_AUTOMATIC_REDIRECT:
344       g_value_set_boolean (value, souphttpsink->automatic_redirect);
345       break;
346     case PROP_USER_AGENT:
347       g_value_set_string (value, souphttpsink->user_agent);
348       break;
349     case PROP_USER_ID:
350       g_value_set_string (value, souphttpsink->user_id);
351       break;
352     case PROP_USER_PW:
353       g_value_set_string (value, souphttpsink->user_pw);
354       break;
355     case PROP_PROXY_ID:
356       g_value_set_string (value, souphttpsink->proxy_id);
357       break;
358     case PROP_PROXY_PW:
359       g_value_set_string (value, souphttpsink->proxy_pw);
360       break;
361     case PROP_PROXY:
362       if (souphttpsink->proxy == NULL)
363         g_value_set_static_string (value, "");
364       else {
365         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
366
367         g_value_set_string (value, proxy);
368         g_free (proxy);
369       }
370       break;
371     case PROP_COOKIES:
372       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
373       break;
374     default:
375       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
376       break;
377   }
378 }
379
380 void
381 gst_soup_http_client_sink_dispose (GObject * object)
382 {
383   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
384
385   /* clean up as possible.  may be called multiple times */
386   if (souphttpsink->prop_session)
387     g_object_unref (souphttpsink->prop_session);
388   souphttpsink->prop_session = NULL;
389
390   G_OBJECT_CLASS (parent_class)->dispose (object);
391 }
392
393 void
394 gst_soup_http_client_sink_finalize (GObject * object)
395 {
396   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
397
398   /* clean up object here */
399
400   g_free (souphttpsink->user_agent);
401   g_free (souphttpsink->user_id);
402   g_free (souphttpsink->user_pw);
403   g_free (souphttpsink->proxy_id);
404   g_free (souphttpsink->proxy_pw);
405   if (souphttpsink->proxy)
406     soup_uri_free (souphttpsink->proxy);
407   g_free (souphttpsink->location);
408
409   g_cond_free (souphttpsink->cond);
410   g_mutex_free (souphttpsink->mutex);
411
412   G_OBJECT_CLASS (parent_class)->finalize (object);
413 }
414
415
416
417 static gboolean
418 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
419 {
420   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
421   GstStructure *structure;
422   const GValue *value_array;
423   int i, n;
424
425   structure = gst_caps_get_structure (caps, 0);
426   value_array = gst_structure_get_value (structure, "streamheader");
427   if (value_array) {
428     free_buffer_list (souphttpsink->streamheader_buffers);
429     souphttpsink->streamheader_buffers = NULL;
430
431     n = gst_value_array_get_size (value_array);
432     for (i = 0; i < n; i++) {
433       const GValue *value;
434       GstBuffer *buffer;
435       value = gst_value_array_get_value (value_array, i);
436       buffer = GST_BUFFER (gst_value_get_buffer (value));
437       souphttpsink->streamheader_buffers =
438           g_list_append (souphttpsink->streamheader_buffers,
439           gst_buffer_ref (buffer));
440     }
441   }
442
443   return TRUE;
444 }
445
446 static void
447 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
448     GstClockTime * start, GstClockTime * end)
449 {
450
451 }
452
453 static gpointer
454 thread_func (gpointer ptr)
455 {
456   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
457
458   GST_DEBUG ("thread start");
459
460   souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
461   g_main_loop_run (souphttpsink->loop);
462
463   GST_DEBUG ("thread quit");
464
465   return NULL;
466 }
467
468 static gboolean
469 gst_soup_http_client_sink_start (GstBaseSink * sink)
470 {
471   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
472
473   if (souphttpsink->prop_session) {
474     souphttpsink->session = souphttpsink->prop_session;
475   } else {
476     GError *error = NULL;
477
478     souphttpsink->context = g_main_context_new ();
479
480     souphttpsink->thread = g_thread_create (thread_func, souphttpsink,
481         TRUE, &error);
482
483     souphttpsink->session =
484         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
485         souphttpsink->context, SOUP_SESSION_USER_AGENT,
486         souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
487         NULL);
488
489     //soup_session_add_feature (souphttpsink->session,
490     //    SOUP_SESSION_FEATURE (soup_logger_new (SOUP_LOGGER_LOG_BODY, 100)));
491
492     g_signal_connect (souphttpsink->session, "authenticate",
493         G_CALLBACK (authenticate), souphttpsink);
494   }
495
496   return TRUE;
497 }
498
499 static gboolean
500 gst_soup_http_client_sink_stop (GstBaseSink * sink)
501 {
502   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
503
504   GST_DEBUG ("stop");
505
506   if (souphttpsink->prop_session == NULL) {
507     soup_session_abort (souphttpsink->session);
508     g_object_unref (souphttpsink->session);
509   }
510
511   if (souphttpsink->loop) {
512     g_main_loop_quit (souphttpsink->loop);
513     g_thread_join (souphttpsink->thread);
514     g_main_loop_unref (souphttpsink->loop);
515     souphttpsink->loop = NULL;
516   }
517   if (souphttpsink->context) {
518     g_main_context_unref (souphttpsink->context);
519     souphttpsink->context = NULL;
520   }
521
522   gst_soup_http_client_sink_reset (souphttpsink);
523
524   return TRUE;
525 }
526
527 static gboolean
528 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
529 {
530   GST_DEBUG ("unlock");
531
532   return TRUE;
533 }
534
535 static gboolean
536 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
537 {
538   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
539
540   GST_DEBUG_OBJECT (souphttpsink, "event");
541
542   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
543     GST_DEBUG_OBJECT (souphttpsink, "got eos");
544     g_mutex_lock (souphttpsink->mutex);
545     while (souphttpsink->message) {
546       GST_DEBUG_OBJECT (souphttpsink, "waiting");
547       g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
548     }
549     g_mutex_unlock (souphttpsink->mutex);
550     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
551   }
552
553   return TRUE;
554 }
555
556 static GstFlowReturn
557 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
558 {
559   GST_DEBUG ("preroll");
560
561   return GST_FLOW_OK;
562 }
563
564 static void
565 free_buffer_list (GList * list)
566 {
567   GList *g;
568   for (g = list; g; g = g_list_next (g)) {
569     GstBuffer *buffer = g->data;
570     gst_buffer_unref (buffer);
571   }
572   g_list_free (list);
573 }
574
575 static void
576 send_message_locked (GstSoupHttpClientSink * souphttpsink)
577 {
578   GList *g;
579   guint64 n;
580
581   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
582     return;
583   }
584
585   /* If the URI went away, drop all these buffers */
586   if (souphttpsink->location == NULL) {
587     free_buffer_list (souphttpsink->queued_buffers);
588     souphttpsink->queued_buffers = NULL;
589     return;
590   }
591
592   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
593
594   n = 0;
595   if (souphttpsink->offset == 0) {
596     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
597       GstBuffer *buffer = g->data;
598       soup_message_body_append (souphttpsink->message->request_body,
599           SOUP_MEMORY_STATIC, GST_BUFFER_DATA (buffer),
600           GST_BUFFER_SIZE (buffer));
601       n += GST_BUFFER_SIZE (buffer);
602     }
603   }
604
605   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
606     GstBuffer *buffer = g->data;
607     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
608       soup_message_body_append (souphttpsink->message->request_body,
609           SOUP_MEMORY_STATIC, GST_BUFFER_DATA (buffer),
610           GST_BUFFER_SIZE (buffer));
611       n += GST_BUFFER_SIZE (buffer);
612     }
613   }
614
615   if (souphttpsink->offset != 0) {
616     char *s;
617     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
618         souphttpsink->offset, souphttpsink->offset + n - 1);
619     soup_message_headers_append (souphttpsink->message->request_headers,
620         "Content-Range", s);
621     g_free (s);
622   }
623
624   if (n == 0) {
625     free_buffer_list (souphttpsink->queued_buffers);
626     souphttpsink->queued_buffers = NULL;
627     g_object_unref (souphttpsink->message);
628     souphttpsink->message = NULL;
629     return;
630   }
631
632   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
633   souphttpsink->queued_buffers = NULL;
634
635   GST_DEBUG_OBJECT (souphttpsink,
636       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
637       souphttpsink->offset, n);
638   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
639       callback, souphttpsink);
640
641   souphttpsink->offset += n;
642 }
643
644 static gboolean
645 send_message (GstSoupHttpClientSink * souphttpsink)
646 {
647   g_mutex_lock (souphttpsink->mutex);
648   send_message_locked (souphttpsink);
649   g_mutex_unlock (souphttpsink->mutex);
650
651   return FALSE;
652 }
653
654 static void
655 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
656 {
657   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
658
659   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
660       msg->status_code, msg->reason_phrase);
661
662   g_mutex_lock (souphttpsink->mutex);
663   g_cond_signal (souphttpsink->cond);
664   souphttpsink->message = NULL;
665
666   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
667     souphttpsink->status_code = msg->status_code;
668     souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
669     g_mutex_unlock (souphttpsink->mutex);
670     return;
671   }
672
673   free_buffer_list (souphttpsink->sent_buffers);
674   souphttpsink->sent_buffers = NULL;
675
676   send_message_locked (souphttpsink);
677   g_mutex_unlock (souphttpsink->mutex);
678 }
679
680 static GstFlowReturn
681 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
682 {
683   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
684   GSource *source;
685   gboolean wake;
686
687   if (souphttpsink->status_code != 0) {
688     /* FIXME we should allow a moderate amount of retries. */
689     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
690         ("Could not write to HTTP URI"),
691         ("error: %d %s", souphttpsink->status_code,
692             souphttpsink->reason_phrase));
693     return GST_FLOW_ERROR;
694   }
695
696   g_mutex_lock (souphttpsink->mutex);
697   if (souphttpsink->location != NULL) {
698     wake = (souphttpsink->queued_buffers == NULL);
699     souphttpsink->queued_buffers =
700         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
701
702     if (wake) {
703       source = g_idle_source_new ();
704       g_source_set_callback (source, (GSourceFunc) (send_message),
705           souphttpsink, NULL);
706       g_source_attach (source, souphttpsink->context);
707       g_source_unref (source);
708     }
709   }
710   g_mutex_unlock (souphttpsink->mutex);
711
712   return GST_FLOW_OK;
713 }
714
715 static void
716 authenticate (SoupSession * session, SoupMessage * msg,
717     SoupAuth * auth, gboolean retrying, gpointer user_data)
718 {
719   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
720
721   if (!retrying) {
722     if (souphttpsink->user_id && souphttpsink->user_pw) {
723       soup_auth_authenticate (auth,
724           souphttpsink->user_id, souphttpsink->user_pw);
725     }
726   }
727 }