update for basesink event handler changes
[platform/upstream/gstreamer.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44
45 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
46 #define GST_CAT_DEFAULT souphttpclientsink_dbg
47
48 /* prototypes */
49
50
51 static void gst_soup_http_client_sink_set_property (GObject * object,
52     guint property_id, const GValue * value, GParamSpec * pspec);
53 static void gst_soup_http_client_sink_get_property (GObject * object,
54     guint property_id, GValue * value, GParamSpec * pspec);
55 static void gst_soup_http_client_sink_dispose (GObject * object);
56 static void gst_soup_http_client_sink_finalize (GObject * object);
57
58 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
59     GstCaps * caps);
60 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
61     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
62 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
63 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
64 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
66     GstEvent * event);
67 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
68     GstBuffer * buffer);
69 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
70     GstBuffer * buffer);
71
72 static void free_buffer_list (GList * list);
73 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
74     souphttpsink);
75 static void authenticate (SoupSession * session, SoupMessage * msg,
76     SoupAuth * auth, gboolean retrying, gpointer user_data);
77 static void callback (SoupSession * session, SoupMessage * msg,
78     gpointer user_data);
79 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
80     souphttpsink, const gchar * uri);
81
82 enum
83 {
84   PROP_0,
85   PROP_LOCATION,
86   PROP_USER_AGENT,
87   PROP_AUTOMATIC_REDIRECT,
88   PROP_PROXY,
89   PROP_USER_ID,
90   PROP_USER_PW,
91   PROP_PROXY_ID,
92   PROP_PROXY_PW,
93   PROP_COOKIES,
94   PROP_SESSION
95 };
96
97 #define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
98
99 /* pad templates */
100
101 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
102 GST_STATIC_PAD_TEMPLATE ("sink",
103     GST_PAD_SINK,
104     GST_PAD_ALWAYS,
105     GST_STATIC_CAPS_ANY);
106
107
108 /* class initialization */
109
110 #define gst_soup_http_client_sink_parent_class parent_class
111 G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
112     GST_TYPE_BASE_SINK);
113
114 static void
115 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
116 {
117   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
118   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
119   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
120
121   gobject_class->set_property = gst_soup_http_client_sink_set_property;
122   gobject_class->get_property = gst_soup_http_client_sink_get_property;
123   gobject_class->dispose = gst_soup_http_client_sink_dispose;
124   gobject_class->finalize = gst_soup_http_client_sink_finalize;
125
126   g_object_class_install_property (gobject_class,
127       PROP_LOCATION,
128       g_param_spec_string ("location", "Location",
129           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
130   g_object_class_install_property (gobject_class,
131       PROP_USER_AGENT,
132       g_param_spec_string ("user-agent", "User-Agent",
133           "Value of the User-Agent HTTP request header field",
134           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135   g_object_class_install_property (gobject_class,
136       PROP_AUTOMATIC_REDIRECT,
137       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
138           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
139           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140   g_object_class_install_property (gobject_class,
141       PROP_PROXY,
142       g_param_spec_string ("proxy", "Proxy",
143           "HTTP proxy server URI", "",
144           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
145   g_object_class_install_property (gobject_class,
146       PROP_USER_ID,
147       g_param_spec_string ("user-id", "user-id",
148           "user id for authentication", "",
149           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150   g_object_class_install_property (gobject_class, PROP_USER_PW,
151       g_param_spec_string ("user-pw", "user-pw",
152           "user password for authentication", "",
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
155       g_param_spec_string ("proxy-id", "proxy-id",
156           "user id for proxy authentication", "",
157           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
159       g_param_spec_string ("proxy-pw", "proxy-pw",
160           "user password for proxy authentication", "",
161           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162   g_object_class_install_property (gobject_class, PROP_SESSION,
163       g_param_spec_object ("session", "session",
164           "SoupSession object to use for communication",
165           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166   g_object_class_install_property (gobject_class, PROP_COOKIES,
167       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
168           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
169
170   gst_element_class_add_pad_template (gstelement_class,
171       gst_static_pad_template_get (&gst_soup_http_client_sink_sink_template));
172
173   gst_element_class_set_details_simple (gstelement_class, "HTTP client sink",
174       "Generic", "Sends streams to HTTP server via PUT",
175       "David Schleef <ds@entropywave.com>");
176
177   base_sink_class->set_caps =
178       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
179   if (0)
180     base_sink_class->get_times =
181         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
182   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
183   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
184   base_sink_class->unlock =
185       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
186   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
187   if (0)
188     base_sink_class->preroll =
189         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
190   base_sink_class->render =
191       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
192
193   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
194       "souphttpclientsink element");
195
196 }
197
198 static void
199 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
200 {
201   const char *proxy;
202
203   souphttpsink->mutex = g_mutex_new ();
204   souphttpsink->cond = g_cond_new ();
205
206   souphttpsink->location = NULL;
207   souphttpsink->automatic_redirect = TRUE;
208   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
209   souphttpsink->user_id = NULL;
210   souphttpsink->user_pw = NULL;
211   souphttpsink->proxy_id = NULL;
212   souphttpsink->proxy_pw = NULL;
213   souphttpsink->prop_session = NULL;
214   souphttpsink->timeout = 1;
215   proxy = g_getenv ("http_proxy");
216   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
217     GST_WARNING_OBJECT (souphttpsink,
218         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
219         proxy);
220   }
221
222   gst_soup_http_client_sink_reset (souphttpsink);
223 }
224
225 static void
226 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
227 {
228   g_free (souphttpsink->reason_phrase);
229   souphttpsink->reason_phrase = NULL;
230   souphttpsink->status_code = 0;
231   souphttpsink->offset = 0;
232
233 }
234
235 static gboolean
236 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
237     const gchar * uri)
238 {
239   if (souphttpsink->proxy) {
240     soup_uri_free (souphttpsink->proxy);
241     souphttpsink->proxy = NULL;
242   }
243   if (g_str_has_prefix (uri, "http://")) {
244     souphttpsink->proxy = soup_uri_new (uri);
245   } else {
246     gchar *new_uri = g_strconcat ("http://", uri, NULL);
247
248     souphttpsink->proxy = soup_uri_new (new_uri);
249     g_free (new_uri);
250   }
251
252   return TRUE;
253 }
254
255 void
256 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
257     const GValue * value, GParamSpec * pspec)
258 {
259   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
260
261   g_mutex_lock (souphttpsink->mutex);
262   switch (property_id) {
263     case PROP_SESSION:
264       if (souphttpsink->prop_session) {
265         g_object_unref (souphttpsink->prop_session);
266       }
267       souphttpsink->prop_session = g_value_dup_object (value);
268       break;
269     case PROP_LOCATION:
270       g_free (souphttpsink->location);
271       souphttpsink->location = g_value_dup_string (value);
272       souphttpsink->offset = 0;
273       break;
274     case PROP_USER_AGENT:
275       g_free (souphttpsink->user_agent);
276       souphttpsink->user_agent = g_value_dup_string (value);
277       break;
278     case PROP_AUTOMATIC_REDIRECT:
279       souphttpsink->automatic_redirect = g_value_get_boolean (value);
280       break;
281     case PROP_USER_ID:
282       g_free (souphttpsink->user_id);
283       souphttpsink->user_id = g_value_dup_string (value);
284       break;
285     case PROP_USER_PW:
286       g_free (souphttpsink->user_pw);
287       souphttpsink->user_pw = g_value_dup_string (value);
288       break;
289     case PROP_PROXY_ID:
290       g_free (souphttpsink->proxy_id);
291       souphttpsink->proxy_id = g_value_dup_string (value);
292       break;
293     case PROP_PROXY_PW:
294       g_free (souphttpsink->proxy_pw);
295       souphttpsink->proxy_pw = g_value_dup_string (value);
296       break;
297     case PROP_PROXY:
298     {
299       const gchar *proxy;
300
301       proxy = g_value_get_string (value);
302
303       if (proxy == NULL) {
304         GST_WARNING ("proxy property cannot be NULL");
305         goto done;
306       }
307       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
308         GST_WARNING ("badly formatted proxy URI");
309         goto done;
310       }
311       break;
312     }
313     case PROP_COOKIES:
314       g_strfreev (souphttpsink->cookies);
315       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
316       break;
317     default:
318       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
319       break;
320   }
321 done:
322   g_mutex_unlock (souphttpsink->mutex);
323 }
324
325 void
326 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
327     GValue * value, GParamSpec * pspec)
328 {
329   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
330
331   switch (property_id) {
332     case PROP_SESSION:
333       g_value_set_object (value, souphttpsink->prop_session);
334       break;
335     case PROP_LOCATION:
336       g_value_set_string (value, souphttpsink->location);
337       break;
338     case PROP_AUTOMATIC_REDIRECT:
339       g_value_set_boolean (value, souphttpsink->automatic_redirect);
340       break;
341     case PROP_USER_AGENT:
342       g_value_set_string (value, souphttpsink->user_agent);
343       break;
344     case PROP_USER_ID:
345       g_value_set_string (value, souphttpsink->user_id);
346       break;
347     case PROP_USER_PW:
348       g_value_set_string (value, souphttpsink->user_pw);
349       break;
350     case PROP_PROXY_ID:
351       g_value_set_string (value, souphttpsink->proxy_id);
352       break;
353     case PROP_PROXY_PW:
354       g_value_set_string (value, souphttpsink->proxy_pw);
355       break;
356     case PROP_PROXY:
357       if (souphttpsink->proxy == NULL)
358         g_value_set_static_string (value, "");
359       else {
360         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
361
362         g_value_set_string (value, proxy);
363         g_free (proxy);
364       }
365       break;
366     case PROP_COOKIES:
367       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
368       break;
369     default:
370       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
371       break;
372   }
373 }
374
375 void
376 gst_soup_http_client_sink_dispose (GObject * object)
377 {
378   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
379
380   /* clean up as possible.  may be called multiple times */
381   if (souphttpsink->prop_session)
382     g_object_unref (souphttpsink->prop_session);
383   souphttpsink->prop_session = NULL;
384
385   G_OBJECT_CLASS (parent_class)->dispose (object);
386 }
387
388 void
389 gst_soup_http_client_sink_finalize (GObject * object)
390 {
391   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
392
393   /* clean up object here */
394
395   g_free (souphttpsink->user_agent);
396   g_free (souphttpsink->user_id);
397   g_free (souphttpsink->user_pw);
398   g_free (souphttpsink->proxy_id);
399   g_free (souphttpsink->proxy_pw);
400   if (souphttpsink->proxy)
401     soup_uri_free (souphttpsink->proxy);
402   g_free (souphttpsink->location);
403
404   g_cond_free (souphttpsink->cond);
405   g_mutex_free (souphttpsink->mutex);
406
407   G_OBJECT_CLASS (parent_class)->finalize (object);
408 }
409
410
411
412 static gboolean
413 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
414 {
415   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
416   GstStructure *structure;
417   const GValue *value_array;
418   int i, n;
419
420   structure = gst_caps_get_structure (caps, 0);
421   value_array = gst_structure_get_value (structure, "streamheader");
422   if (value_array) {
423     free_buffer_list (souphttpsink->streamheader_buffers);
424     souphttpsink->streamheader_buffers = NULL;
425
426     n = gst_value_array_get_size (value_array);
427     for (i = 0; i < n; i++) {
428       const GValue *value;
429       GstBuffer *buffer;
430       value = gst_value_array_get_value (value_array, i);
431       buffer = GST_BUFFER (gst_value_get_buffer (value));
432       souphttpsink->streamheader_buffers =
433           g_list_append (souphttpsink->streamheader_buffers,
434           gst_buffer_ref (buffer));
435     }
436   }
437
438   return TRUE;
439 }
440
441 static void
442 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
443     GstClockTime * start, GstClockTime * end)
444 {
445
446 }
447
448 static gpointer
449 thread_func (gpointer ptr)
450 {
451   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
452
453   GST_DEBUG ("thread start");
454
455   souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
456   g_main_loop_run (souphttpsink->loop);
457
458   GST_DEBUG ("thread quit");
459
460   return NULL;
461 }
462
463 static gboolean
464 gst_soup_http_client_sink_start (GstBaseSink * sink)
465 {
466   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
467
468   if (souphttpsink->prop_session) {
469     souphttpsink->session = souphttpsink->prop_session;
470   } else {
471     GError *error = NULL;
472
473     souphttpsink->context = g_main_context_new ();
474
475     souphttpsink->thread = g_thread_create (thread_func, souphttpsink,
476         TRUE, &error);
477
478     souphttpsink->session =
479         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
480         souphttpsink->context, SOUP_SESSION_USER_AGENT,
481         souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
482         NULL);
483
484     //soup_session_add_feature (souphttpsink->session,
485     //    SOUP_SESSION_FEATURE (soup_logger_new (SOUP_LOGGER_LOG_BODY, 100)));
486
487     g_signal_connect (souphttpsink->session, "authenticate",
488         G_CALLBACK (authenticate), souphttpsink);
489   }
490
491   return TRUE;
492 }
493
494 static gboolean
495 gst_soup_http_client_sink_stop (GstBaseSink * sink)
496 {
497   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
498
499   GST_DEBUG ("stop");
500
501   if (souphttpsink->prop_session == NULL) {
502     soup_session_abort (souphttpsink->session);
503     g_object_unref (souphttpsink->session);
504   }
505
506   if (souphttpsink->loop) {
507     g_main_loop_quit (souphttpsink->loop);
508     g_thread_join (souphttpsink->thread);
509     g_main_loop_unref (souphttpsink->loop);
510     souphttpsink->loop = NULL;
511   }
512   if (souphttpsink->context) {
513     g_main_context_unref (souphttpsink->context);
514     souphttpsink->context = NULL;
515   }
516
517   gst_soup_http_client_sink_reset (souphttpsink);
518
519   return TRUE;
520 }
521
522 static gboolean
523 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
524 {
525   GST_DEBUG ("unlock");
526
527   return TRUE;
528 }
529
530 static gboolean
531 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
532 {
533   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
534
535   GST_DEBUG_OBJECT (souphttpsink, "event");
536
537   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
538     GST_DEBUG_OBJECT (souphttpsink, "got eos");
539     g_mutex_lock (souphttpsink->mutex);
540     while (souphttpsink->message) {
541       GST_DEBUG_OBJECT (souphttpsink, "waiting");
542       g_cond_wait (souphttpsink->cond, souphttpsink->mutex);
543     }
544     g_mutex_unlock (souphttpsink->mutex);
545     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
546   }
547
548   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
549 }
550
551 static GstFlowReturn
552 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
553 {
554   GST_DEBUG ("preroll");
555
556   return GST_FLOW_OK;
557 }
558
559 static void
560 free_buffer_list (GList * list)
561 {
562   GList *g;
563   for (g = list; g; g = g_list_next (g)) {
564     GstBuffer *buffer = g->data;
565     gst_buffer_unref (buffer);
566   }
567   g_list_free (list);
568 }
569
570 static void
571 send_message_locked (GstSoupHttpClientSink * souphttpsink)
572 {
573   GList *g;
574   guint64 n;
575
576   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
577     return;
578   }
579
580   /* If the URI went away, drop all these buffers */
581   if (souphttpsink->location == NULL) {
582     free_buffer_list (souphttpsink->queued_buffers);
583     souphttpsink->queued_buffers = NULL;
584     return;
585   }
586
587   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
588
589   n = 0;
590   if (souphttpsink->offset == 0) {
591     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
592       GstBuffer *buffer = g->data;
593       gpointer data;
594       gsize size;
595
596       /* FIXME, lifetime of the buffer? */
597       data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
598       soup_message_body_append (souphttpsink->message->request_body,
599           SOUP_MEMORY_STATIC, data, size);
600       n += size;
601       gst_buffer_unmap (buffer, data, size);
602     }
603   }
604
605   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
606     GstBuffer *buffer = g->data;
607     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
608       gpointer data;
609       gsize size;
610
611       /* FIXME, lifetime of the buffer? */
612       data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
613       soup_message_body_append (souphttpsink->message->request_body,
614           SOUP_MEMORY_STATIC, data, size);
615       n += size;
616       gst_buffer_unmap (buffer, data, size);
617     }
618   }
619
620   if (souphttpsink->offset != 0) {
621     char *s;
622     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
623         souphttpsink->offset, souphttpsink->offset + n - 1);
624     soup_message_headers_append (souphttpsink->message->request_headers,
625         "Content-Range", s);
626     g_free (s);
627   }
628
629   if (n == 0) {
630     free_buffer_list (souphttpsink->queued_buffers);
631     souphttpsink->queued_buffers = NULL;
632     g_object_unref (souphttpsink->message);
633     souphttpsink->message = NULL;
634     return;
635   }
636
637   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
638   souphttpsink->queued_buffers = NULL;
639
640   GST_DEBUG_OBJECT (souphttpsink,
641       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
642       souphttpsink->offset, n);
643   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
644       callback, souphttpsink);
645
646   souphttpsink->offset += n;
647 }
648
649 static gboolean
650 send_message (GstSoupHttpClientSink * souphttpsink)
651 {
652   g_mutex_lock (souphttpsink->mutex);
653   send_message_locked (souphttpsink);
654   g_mutex_unlock (souphttpsink->mutex);
655
656   return FALSE;
657 }
658
659 static void
660 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
661 {
662   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
663
664   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
665       msg->status_code, msg->reason_phrase);
666
667   g_mutex_lock (souphttpsink->mutex);
668   g_cond_signal (souphttpsink->cond);
669   souphttpsink->message = NULL;
670
671   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
672     souphttpsink->status_code = msg->status_code;
673     souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
674     g_mutex_unlock (souphttpsink->mutex);
675     return;
676   }
677
678   free_buffer_list (souphttpsink->sent_buffers);
679   souphttpsink->sent_buffers = NULL;
680
681   send_message_locked (souphttpsink);
682   g_mutex_unlock (souphttpsink->mutex);
683 }
684
685 static GstFlowReturn
686 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
687 {
688   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
689   GSource *source;
690   gboolean wake;
691
692   if (souphttpsink->status_code != 0) {
693     /* FIXME we should allow a moderate amount of retries. */
694     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
695         ("Could not write to HTTP URI"),
696         ("error: %d %s", souphttpsink->status_code,
697             souphttpsink->reason_phrase));
698     return GST_FLOW_ERROR;
699   }
700
701   g_mutex_lock (souphttpsink->mutex);
702   if (souphttpsink->location != NULL) {
703     wake = (souphttpsink->queued_buffers == NULL);
704     souphttpsink->queued_buffers =
705         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
706
707     if (wake) {
708       source = g_idle_source_new ();
709       g_source_set_callback (source, (GSourceFunc) (send_message),
710           souphttpsink, NULL);
711       g_source_attach (source, souphttpsink->context);
712       g_source_unref (source);
713     }
714   }
715   g_mutex_unlock (souphttpsink->mutex);
716
717   return GST_FLOW_OK;
718 }
719
720 static void
721 authenticate (SoupSession * session, SoupMessage * msg,
722     SoupAuth * auth, gboolean retrying, gpointer user_data)
723 {
724   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
725
726   if (!retrying) {
727     if (souphttpsink->user_id && souphttpsink->user_pw) {
728       soup_auth_authenticate (auth,
729           souphttpsink->user_id, souphttpsink->user_pw);
730     }
731   }
732 }