rtpvp8depay: Parse width/height/profile from keyframes
[platform/upstream/gst-plugins-good.git] / ext / soup / gstsouphttpclientsink.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  * 
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44 #include "gstsouputils.h"
45
46 #include <gst/glib-compat-private.h>
47
/* File-local debug category; GST_CAT_DEFAULT routes the GST_DEBUG/GST_LOG/
 * GST_WARNING macros used in this file to it.  The category is initialised
 * in gst_soup_http_client_sink_class_init(). */
GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
#define GST_CAT_DEFAULT souphttpclientsink_dbg
50
51 /* prototypes */
52
53
/* GObject virtual methods */
static void gst_soup_http_client_sink_set_property (GObject * object,
    guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_get_property (GObject * object,
    guint property_id, GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_dispose (GObject * object);
static void gst_soup_http_client_sink_finalize (GObject * object);

/* GstBaseSink virtual methods */
static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
    GstCaps * caps);
static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
    GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
    GstEvent * event);
static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
    GstBuffer * buffer);
static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
    GstBuffer * buffer);

/* Internal helpers and libsoup callbacks */
static void free_buffer_list (GList * list);
static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
    souphttpsink);
static void authenticate (SoupSession * session, SoupMessage * msg,
    SoupAuth * auth, gboolean retrying, gpointer user_data);
static void callback (SoupSession * session, SoupMessage * msg,
    gpointer user_data);
static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
    souphttpsink, const gchar * uri);
84
/* Property identifiers used by set_property()/get_property() and installed
 * in class_init().  PROP_0 is a placeholder and is never installed. */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_USER_AGENT,
  PROP_AUTOMATIC_REDIRECT,
  PROP_PROXY,
  PROP_USER_ID,
  PROP_USER_PW,
  PROP_PROXY_ID,
  PROP_PROXY_PW,
  PROP_COOKIES,
  PROP_SESSION,
  PROP_SOUP_LOG_LEVEL
};
100
/* Default values for the "user-agent" and "http-log-level" properties. */
#define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
#define DEFAULT_SOUP_LOG_LEVEL       SOUP_LOGGER_LOG_NONE

/* pad templates */

/* Always-present sink pad that accepts any caps. */
static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);


/* class initialization */

/* Registers the type as a GstBaseSink subclass; "parent_class" is aliased to
 * the symbol that G_DEFINE_TYPE generates. */
#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
    GST_TYPE_BASE_SINK);
118
119 static void
120 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
121 {
122   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
123   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
124   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
125
126   gobject_class->set_property = gst_soup_http_client_sink_set_property;
127   gobject_class->get_property = gst_soup_http_client_sink_get_property;
128   gobject_class->dispose = gst_soup_http_client_sink_dispose;
129   gobject_class->finalize = gst_soup_http_client_sink_finalize;
130
131   g_object_class_install_property (gobject_class,
132       PROP_LOCATION,
133       g_param_spec_string ("location", "Location",
134           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135   g_object_class_install_property (gobject_class,
136       PROP_USER_AGENT,
137       g_param_spec_string ("user-agent", "User-Agent",
138           "Value of the User-Agent HTTP request header field",
139           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140   g_object_class_install_property (gobject_class,
141       PROP_AUTOMATIC_REDIRECT,
142       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
143           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
144           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
145   g_object_class_install_property (gobject_class,
146       PROP_PROXY,
147       g_param_spec_string ("proxy", "Proxy",
148           "HTTP proxy server URI", "",
149           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150   g_object_class_install_property (gobject_class,
151       PROP_USER_ID,
152       g_param_spec_string ("user-id", "user-id",
153           "user id for authentication", "",
154           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155   g_object_class_install_property (gobject_class, PROP_USER_PW,
156       g_param_spec_string ("user-pw", "user-pw",
157           "user password for authentication", "",
158           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
159   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
160       g_param_spec_string ("proxy-id", "proxy-id",
161           "user id for proxy authentication", "",
162           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
164       g_param_spec_string ("proxy-pw", "proxy-pw",
165           "user password for proxy authentication", "",
166           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167   g_object_class_install_property (gobject_class, PROP_SESSION,
168       g_param_spec_object ("session", "session",
169           "SoupSession object to use for communication",
170           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171   g_object_class_install_property (gobject_class, PROP_COOKIES,
172       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
173           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
174  /**
175    * GstSoupHttpClientSink::http-log-level:
176    *
177    * If set and > 0, captures and dumps HTTP session data as
178    * log messages if log level >= GST_LEVEL_TRACE
179    *
180    * Since: 1.4
181    */
182   g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
183       g_param_spec_enum ("http-log-level", "HTTP log level",
184           "Set log level for soup's HTTP session log",
185           SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
186           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
187
188   gst_element_class_add_pad_template (gstelement_class,
189       gst_static_pad_template_get (&gst_soup_http_client_sink_sink_template));
190
191   gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
192       "Generic", "Sends streams to HTTP server via PUT",
193       "David Schleef <ds@entropywave.com>");
194
195   base_sink_class->set_caps =
196       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
197   if (0)
198     base_sink_class->get_times =
199         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
200   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
201   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
202   base_sink_class->unlock =
203       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
204   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
205   if (0)
206     base_sink_class->preroll =
207         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
208   base_sink_class->render =
209       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
210
211   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
212       "souphttpclientsink element");
213
214 }
215
216 static void
217 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
218 {
219   const char *proxy;
220
221   g_mutex_init (&souphttpsink->mutex);
222   g_cond_init (&souphttpsink->cond);
223
224   souphttpsink->location = NULL;
225   souphttpsink->automatic_redirect = TRUE;
226   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
227   souphttpsink->user_id = NULL;
228   souphttpsink->user_pw = NULL;
229   souphttpsink->proxy_id = NULL;
230   souphttpsink->proxy_pw = NULL;
231   souphttpsink->prop_session = NULL;
232   souphttpsink->timeout = 1;
233   souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
234   proxy = g_getenv ("http_proxy");
235   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
236     GST_WARNING_OBJECT (souphttpsink,
237         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
238         proxy);
239   }
240
241   gst_soup_http_client_sink_reset (souphttpsink);
242 }
243
244 static void
245 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
246 {
247   g_list_free_full (souphttpsink->queued_buffers,
248       (GDestroyNotify) gst_buffer_unref);
249   souphttpsink->queued_buffers = NULL;
250   g_free (souphttpsink->reason_phrase);
251   souphttpsink->reason_phrase = NULL;
252   souphttpsink->status_code = 0;
253   souphttpsink->offset = 0;
254
255 }
256
257 static gboolean
258 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
259     const gchar * uri)
260 {
261   if (souphttpsink->proxy) {
262     soup_uri_free (souphttpsink->proxy);
263     souphttpsink->proxy = NULL;
264   }
265   if (g_str_has_prefix (uri, "http://")) {
266     souphttpsink->proxy = soup_uri_new (uri);
267   } else {
268     gchar *new_uri = g_strconcat ("http://", uri, NULL);
269
270     souphttpsink->proxy = soup_uri_new (new_uri);
271     g_free (new_uri);
272   }
273
274   return TRUE;
275 }
276
277 void
278 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
279     const GValue * value, GParamSpec * pspec)
280 {
281   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
282
283   g_mutex_lock (&souphttpsink->mutex);
284   switch (property_id) {
285     case PROP_SESSION:
286       if (souphttpsink->prop_session) {
287         g_object_unref (souphttpsink->prop_session);
288       }
289       souphttpsink->prop_session = g_value_dup_object (value);
290       break;
291     case PROP_LOCATION:
292       g_free (souphttpsink->location);
293       souphttpsink->location = g_value_dup_string (value);
294       souphttpsink->offset = 0;
295       break;
296     case PROP_USER_AGENT:
297       g_free (souphttpsink->user_agent);
298       souphttpsink->user_agent = g_value_dup_string (value);
299       break;
300     case PROP_AUTOMATIC_REDIRECT:
301       souphttpsink->automatic_redirect = g_value_get_boolean (value);
302       break;
303     case PROP_USER_ID:
304       g_free (souphttpsink->user_id);
305       souphttpsink->user_id = g_value_dup_string (value);
306       break;
307     case PROP_USER_PW:
308       g_free (souphttpsink->user_pw);
309       souphttpsink->user_pw = g_value_dup_string (value);
310       break;
311     case PROP_PROXY_ID:
312       g_free (souphttpsink->proxy_id);
313       souphttpsink->proxy_id = g_value_dup_string (value);
314       break;
315     case PROP_PROXY_PW:
316       g_free (souphttpsink->proxy_pw);
317       souphttpsink->proxy_pw = g_value_dup_string (value);
318       break;
319     case PROP_PROXY:
320     {
321       const gchar *proxy;
322
323       proxy = g_value_get_string (value);
324
325       if (proxy == NULL) {
326         GST_WARNING ("proxy property cannot be NULL");
327         goto done;
328       }
329       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
330         GST_WARNING ("badly formatted proxy URI");
331         goto done;
332       }
333       break;
334     }
335     case PROP_COOKIES:
336       g_strfreev (souphttpsink->cookies);
337       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
338       break;
339     case PROP_SOUP_LOG_LEVEL:
340       souphttpsink->log_level = g_value_get_enum (value);
341       break;
342     default:
343       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
344       break;
345   }
346 done:
347   g_mutex_unlock (&souphttpsink->mutex);
348 }
349
350 void
351 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
352     GValue * value, GParamSpec * pspec)
353 {
354   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
355
356   switch (property_id) {
357     case PROP_SESSION:
358       g_value_set_object (value, souphttpsink->prop_session);
359       break;
360     case PROP_LOCATION:
361       g_value_set_string (value, souphttpsink->location);
362       break;
363     case PROP_AUTOMATIC_REDIRECT:
364       g_value_set_boolean (value, souphttpsink->automatic_redirect);
365       break;
366     case PROP_USER_AGENT:
367       g_value_set_string (value, souphttpsink->user_agent);
368       break;
369     case PROP_USER_ID:
370       g_value_set_string (value, souphttpsink->user_id);
371       break;
372     case PROP_USER_PW:
373       g_value_set_string (value, souphttpsink->user_pw);
374       break;
375     case PROP_PROXY_ID:
376       g_value_set_string (value, souphttpsink->proxy_id);
377       break;
378     case PROP_PROXY_PW:
379       g_value_set_string (value, souphttpsink->proxy_pw);
380       break;
381     case PROP_PROXY:
382       if (souphttpsink->proxy == NULL)
383         g_value_set_static_string (value, "");
384       else {
385         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
386
387         g_value_set_string (value, proxy);
388         g_free (proxy);
389       }
390       break;
391     case PROP_COOKIES:
392       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
393       break;
394     case PROP_SOUP_LOG_LEVEL:
395       g_value_set_enum (value, souphttpsink->log_level);
396       break;
397     default:
398       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
399       break;
400   }
401 }
402
403 void
404 gst_soup_http_client_sink_dispose (GObject * object)
405 {
406   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
407
408   /* clean up as possible.  may be called multiple times */
409   if (souphttpsink->prop_session)
410     g_object_unref (souphttpsink->prop_session);
411   souphttpsink->prop_session = NULL;
412
413   G_OBJECT_CLASS (parent_class)->dispose (object);
414 }
415
416 void
417 gst_soup_http_client_sink_finalize (GObject * object)
418 {
419   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
420
421   /* clean up object here */
422
423   g_free (souphttpsink->user_agent);
424   g_free (souphttpsink->user_id);
425   g_free (souphttpsink->user_pw);
426   g_free (souphttpsink->proxy_id);
427   g_free (souphttpsink->proxy_pw);
428   if (souphttpsink->proxy)
429     soup_uri_free (souphttpsink->proxy);
430   g_free (souphttpsink->location);
431   g_strfreev (souphttpsink->cookies);
432
433   g_cond_clear (&souphttpsink->cond);
434   g_mutex_clear (&souphttpsink->mutex);
435
436   G_OBJECT_CLASS (parent_class)->finalize (object);
437 }
438
439
440
441 static gboolean
442 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
443 {
444   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
445   GstStructure *structure;
446   const GValue *value_array;
447   int i, n;
448
449   GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
450   structure = gst_caps_get_structure (caps, 0);
451   value_array = gst_structure_get_value (structure, "streamheader");
452   if (value_array) {
453     free_buffer_list (souphttpsink->streamheader_buffers);
454     souphttpsink->streamheader_buffers = NULL;
455
456     n = gst_value_array_get_size (value_array);
457     for (i = 0; i < n; i++) {
458       const GValue *value;
459       GstBuffer *buffer;
460       value = gst_value_array_get_value (value_array, i);
461       buffer = GST_BUFFER (gst_value_get_buffer (value));
462       souphttpsink->streamheader_buffers =
463           g_list_append (souphttpsink->streamheader_buffers,
464           gst_buffer_ref (buffer));
465     }
466   }
467
468   return TRUE;
469 }
470
/* GstBaseSink::get_times stub.  Intentionally empty, and not installed:
 * class_init only references it from a dead "if (0)" branch. */
static void
gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
    GstClockTime * start, GstClockTime * end)
{

}
477
478 static gboolean
479 thread_ready_idle_cb (gpointer data)
480 {
481   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
482
483   GST_LOG_OBJECT (souphttpsink, "thread ready");
484
485   g_mutex_lock (&souphttpsink->mutex);
486   g_cond_signal (&souphttpsink->cond);
487   g_mutex_unlock (&souphttpsink->mutex);
488
489   return FALSE;                 /* only run once */
490 }
491
492 static gpointer
493 thread_func (gpointer ptr)
494 {
495   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
496
497   GST_DEBUG ("thread start");
498
499   g_main_loop_run (souphttpsink->loop);
500
501   GST_DEBUG ("thread quit");
502
503   return NULL;
504 }
505
506 static gboolean
507 gst_soup_http_client_sink_start (GstBaseSink * sink)
508 {
509   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
510
511   if (souphttpsink->prop_session) {
512     souphttpsink->session = souphttpsink->prop_session;
513   } else {
514     GSource *source;
515     GError *error = NULL;
516
517     souphttpsink->context = g_main_context_new ();
518
519     /* set up idle source to signal when the main loop is running and
520      * it's safe for ::stop() to call g_main_loop_quit() */
521     source = g_idle_source_new ();
522     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
523     g_source_attach (source, souphttpsink->context);
524     g_source_unref (source);
525
526     souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
527
528     g_mutex_lock (&souphttpsink->mutex);
529
530     /* FIXME: error handling */
531     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
532         thread_func, souphttpsink, &error);
533
534     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
535     g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
536     g_mutex_unlock (&souphttpsink->mutex);
537     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
538
539     if (souphttpsink->proxy == NULL) {
540       souphttpsink->session =
541           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
542           souphttpsink->context, SOUP_SESSION_USER_AGENT,
543           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
544           NULL);
545     } else {
546       souphttpsink->session =
547           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
548           souphttpsink->context, SOUP_SESSION_USER_AGENT,
549           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
550           SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
551     }
552
553     g_signal_connect (souphttpsink->session, "authenticate",
554         G_CALLBACK (authenticate), souphttpsink);
555   }
556
557   /* Set up logging */
558   gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
559       GST_ELEMENT (souphttpsink));
560
561   return TRUE;
562 }
563
564 static gboolean
565 gst_soup_http_client_sink_stop (GstBaseSink * sink)
566 {
567   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
568
569   GST_DEBUG ("stop");
570
571   if (souphttpsink->prop_session == NULL) {
572     soup_session_abort (souphttpsink->session);
573     g_object_unref (souphttpsink->session);
574   }
575
576   if (souphttpsink->loop) {
577     g_main_loop_quit (souphttpsink->loop);
578     g_thread_join (souphttpsink->thread);
579     g_main_loop_unref (souphttpsink->loop);
580     souphttpsink->loop = NULL;
581   }
582   if (souphttpsink->context) {
583     g_main_context_unref (souphttpsink->context);
584     souphttpsink->context = NULL;
585   }
586
587   gst_soup_http_client_sink_reset (souphttpsink);
588
589   return TRUE;
590 }
591
/* GstBaseSink::unlock: only logs; nothing is interrupted here.
 * Returns: always TRUE. */
static gboolean
gst_soup_http_client_sink_unlock (GstBaseSink * sink)
{
  GST_DEBUG ("unlock");

  return TRUE;
}
599
600 static gboolean
601 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
602 {
603   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
604
605   GST_DEBUG_OBJECT (souphttpsink, "event");
606
607   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
608     GST_DEBUG_OBJECT (souphttpsink, "got eos");
609     g_mutex_lock (&souphttpsink->mutex);
610     while (souphttpsink->message) {
611       GST_DEBUG_OBJECT (souphttpsink, "waiting");
612       g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
613     }
614     g_mutex_unlock (&souphttpsink->mutex);
615     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
616   }
617
618   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
619 }
620
/* GstBaseSink::preroll stub: only logs.  Not installed; class_init only
 * references it from a dead "if (0)" branch.
 * Returns: always GST_FLOW_OK. */
static GstFlowReturn
gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
{
  GST_DEBUG ("preroll");

  return GST_FLOW_OK;
}
628
629 static void
630 free_buffer_list (GList * list)
631 {
632   GList *g;
633   for (g = list; g; g = g_list_next (g)) {
634     GstBuffer *buffer = g->data;
635     gst_buffer_unref (buffer);
636   }
637   g_list_free (list);
638 }
639
640 static void
641 send_message_locked (GstSoupHttpClientSink * souphttpsink)
642 {
643   GList *g;
644   guint64 n;
645
646   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
647     return;
648   }
649
650   /* If the URI went away, drop all these buffers */
651   if (souphttpsink->location == NULL) {
652     GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
653     free_buffer_list (souphttpsink->queued_buffers);
654     souphttpsink->queued_buffers = NULL;
655     return;
656   }
657
658   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
659   soup_message_set_flags (souphttpsink->message,
660       (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
661
662   if (souphttpsink->cookies) {
663     gchar **cookie;
664
665     for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
666       soup_message_headers_append (souphttpsink->message->request_headers,
667           "Cookie", *cookie);
668     }
669   }
670
671   n = 0;
672   if (souphttpsink->offset == 0) {
673     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
674       GstBuffer *buffer = g->data;
675       GstMapInfo map;
676
677       GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
678       gst_buffer_map (buffer, &map, GST_MAP_READ);
679       /* Stream headers are updated whenever ::set_caps is called, so there's
680        * no guarantees about their lifetime and we ask libsoup to copy them 
681        * into the message body with SOUP_MEMORY_COPY. */
682       soup_message_body_append (souphttpsink->message->request_body,
683           SOUP_MEMORY_COPY, map.data, map.size);
684       n += map.size;
685       gst_buffer_unmap (buffer, &map);
686     }
687   }
688
689   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
690     GstBuffer *buffer = g->data;
691     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
692       GstMapInfo map;
693
694       gst_buffer_map (buffer, &map, GST_MAP_READ);
695       /* Queued buffers are only freed in the next iteration of the mainloop
696        * after the message body has been written out, so we don't need libsoup
697        * to copy those while appending to the body. However, if the buffer is
698        * used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
699       soup_message_body_append (souphttpsink->message->request_body,
700           SOUP_MEMORY_TEMPORARY, map.data, map.size);
701       n += map.size;
702       gst_buffer_unmap (buffer, &map);
703     }
704   }
705
706   if (souphttpsink->offset != 0) {
707     char *s;
708     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
709         souphttpsink->offset, souphttpsink->offset + n - 1);
710     soup_message_headers_append (souphttpsink->message->request_headers,
711         "Content-Range", s);
712     g_free (s);
713   }
714
715   if (n == 0) {
716     GST_DEBUG_OBJECT (souphttpsink,
717         "total size of buffers queued is 0, freeing everything");
718     free_buffer_list (souphttpsink->queued_buffers);
719     souphttpsink->queued_buffers = NULL;
720     g_object_unref (souphttpsink->message);
721     souphttpsink->message = NULL;
722     return;
723   }
724
725   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
726   souphttpsink->queued_buffers = NULL;
727
728   GST_DEBUG_OBJECT (souphttpsink,
729       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
730       souphttpsink->offset, n);
731   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
732       callback, souphttpsink);
733
734   souphttpsink->offset += n;
735 }
736
737 static gboolean
738 send_message (GstSoupHttpClientSink * souphttpsink)
739 {
740   g_mutex_lock (&souphttpsink->mutex);
741   send_message_locked (souphttpsink);
742   g_mutex_unlock (&souphttpsink->mutex);
743
744   return FALSE;
745 }
746
747 static void
748 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
749 {
750   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
751
752   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
753       msg->status_code, msg->reason_phrase);
754
755   g_mutex_lock (&souphttpsink->mutex);
756   g_cond_signal (&souphttpsink->cond);
757   souphttpsink->message = NULL;
758
759   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
760     souphttpsink->status_code = msg->status_code;
761     souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
762     g_mutex_unlock (&souphttpsink->mutex);
763     return;
764   }
765
766   free_buffer_list (souphttpsink->sent_buffers);
767   souphttpsink->sent_buffers = NULL;
768
769   send_message_locked (souphttpsink);
770   g_mutex_unlock (&souphttpsink->mutex);
771 }
772
773 static GstFlowReturn
774 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
775 {
776   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
777   GSource *source;
778   gboolean wake;
779
780   if (souphttpsink->status_code != 0) {
781     /* FIXME we should allow a moderate amount of retries. */
782     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
783         ("Could not write to HTTP URI"),
784         ("error: %d %s", souphttpsink->status_code,
785             souphttpsink->reason_phrase));
786     return GST_FLOW_ERROR;
787   }
788
789   g_mutex_lock (&souphttpsink->mutex);
790   if (souphttpsink->location != NULL) {
791     wake = (souphttpsink->queued_buffers == NULL);
792     souphttpsink->queued_buffers =
793         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
794
795     if (wake) {
796       GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
797       source = g_idle_source_new ();
798       g_source_set_callback (source, (GSourceFunc) (send_message),
799           souphttpsink, NULL);
800       g_source_attach (source, souphttpsink->context);
801       g_source_unref (source);
802     }
803   }
804   g_mutex_unlock (&souphttpsink->mutex);
805
806   return GST_FLOW_OK;
807 }
808
809 static void
810 authenticate (SoupSession * session, SoupMessage * msg,
811     SoupAuth * auth, gboolean retrying, gpointer user_data)
812 {
813   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
814
815   if (!retrying) {
816     /* First time authentication only, if we fail and are called again with retry true fall through */
817     if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
818       if (souphttpsink->user_id && souphttpsink->user_pw)
819         soup_auth_authenticate (auth, souphttpsink->user_id,
820             souphttpsink->user_pw);
821     } else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
822       if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
823         soup_auth_authenticate (auth, souphttpsink->proxy_id,
824             souphttpsink->proxy_pw);
825     }
826   }
827 }