613f985faba166948fbd6284afa9acc263c7a24b
[platform/upstream/gstreamer.git] / gst / tcp / gstsocketsrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  * Copyright (C) <2014> William Manley <will@williammanley.net>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-socketsrc
26  * @title: socketsrc
27  *
28  * Receive data from a socket.
29  *
30  * As compared to other elements:
31  *
32  * socketsrc can be considered a source counterpart to the #multisocketsink
33  * sink.
34  *
35  * socketsrc can also be considered a generalization of #tcpclientsrc and
36  * #tcpserversrc: it contains all the logic required to communicate over the
37  * socket but none of the logic for creating the sockets/establishing the
38  * connection in the first place, allowing the user to accomplish this
39  * externally in whatever manner they wish making it applicable to other types
40  * of sockets besides TCP.
41  *
42  * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43  * objects rather than sockets via integer file-descriptors.
44  *
45  * @see_also: #multisocketsink
46  */
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gsttcpelements.h"
55 #include "gstsocketsrc.h"
56 #include "gsttcp.h"
57
58 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
59 #define GST_CAT_DEFAULT socketsrc_debug
60
61 #define MAX_READ_SIZE                   4 * 1024
62
63
64 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
65     GST_PAD_SRC,
66     GST_PAD_ALWAYS,
67     GST_STATIC_CAPS_ANY);
68
69
70 #define DEFAULT_SEND_MESSAGES FALSE
71
72 enum
73 {
74   PROP_0,
75   PROP_SOCKET,
76   PROP_CAPS,
77   PROP_SEND_MESSAGES
78 };
79
80 enum
81 {
82   CONNECTION_CLOSED_BY_PEER,
83   LAST_SIGNAL
84 };
85
86 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
87
88 #define gst_socket_src_parent_class parent_class
89 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
90 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (socketsrc, "socketsrc",
91     GST_RANK_NONE, GST_TYPE_SOCKET_SRC, tcp_element_init (plugin));
92
93 static void gst_socket_src_finalize (GObject * gobject);
94
95 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
96 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
97 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
98     GstBuffer * outbuf);
99 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
100 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
101
102 static void gst_socket_src_set_property (GObject * object, guint prop_id,
103     const GValue * value, GParamSpec * pspec);
104 static void gst_socket_src_get_property (GObject * object, guint prop_id,
105     GValue * value, GParamSpec * pspec);
106
107 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
108
109 static void
110 gst_socket_src_class_init (GstSocketSrcClass * klass)
111 {
112   GObjectClass *gobject_class;
113   GstElementClass *gstelement_class;
114   GstBaseSrcClass *gstbasesrc_class;
115   GstPushSrcClass *gstpush_src_class;
116
117   gobject_class = (GObjectClass *) klass;
118   gstelement_class = (GstElementClass *) klass;
119   gstbasesrc_class = (GstBaseSrcClass *) klass;
120   gstpush_src_class = (GstPushSrcClass *) klass;
121
122   gobject_class->set_property = gst_socket_src_set_property;
123   gobject_class->get_property = gst_socket_src_get_property;
124   gobject_class->finalize = gst_socket_src_finalize;
125
126   g_object_class_install_property (gobject_class, PROP_SOCKET,
127       g_param_spec_object ("socket", "Socket",
128           "The socket to receive packets from", G_TYPE_SOCKET,
129           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
130
131   g_object_class_install_property (gobject_class, PROP_CAPS,
132       g_param_spec_boxed ("caps", "Caps",
133           "The caps of the source pad", GST_TYPE_CAPS,
134           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135
136   /**
137    * GstSocketSrc:send-messages:
138    *
139    * Control if the source will handle GstNetworkMessage events.
140    * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
141    *
142    *   "buffer", GST_TYPE_BUFFER    : the buffer with data to send
143    *
144    * The buffer in the event will be sent on the socket. This allows
145    * for simple bidirectional communication.
146    *
147    * Since: 1.8.0
148    **/
149   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
150       g_param_spec_boolean ("send-messages", "Send Messages",
151           "If GstNetworkMessage events should be handled",
152           DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153
154   gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
155       g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
156       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
157           connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
158
159   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
160
161   gst_element_class_set_static_metadata (gstelement_class,
162       "socket source", "Source/Network",
163       "Receive data from a socket",
164       "Thomas Vander Stichele <thomas at apestaart dot org>, "
165       "William Manley <will@williammanley.net>");
166
167   gstbasesrc_class->event = gst_socketsrc_event;
168   gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
169   gstbasesrc_class->unlock = gst_socket_src_unlock;
170   gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
171
172   gstpush_src_class->fill = gst_socket_src_fill;
173
174   GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
175 }
176
177 static void
178 gst_socket_src_init (GstSocketSrc * this)
179 {
180   this->socket = NULL;
181   this->cancellable = g_cancellable_new ();
182   this->send_messages = DEFAULT_SEND_MESSAGES;
183 }
184
185 static void
186 gst_socket_src_finalize (GObject * gobject)
187 {
188   GstSocketSrc *this = GST_SOCKET_SRC (gobject);
189
190   if (this->caps)
191     gst_caps_unref (this->caps);
192   g_clear_object (&this->cancellable);
193   g_clear_object (&this->socket);
194
195   G_OBJECT_CLASS (parent_class)->finalize (gobject);
196 }
197
198 static gboolean
199 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
200 {
201   GstSocketSrc *src;
202   gboolean res = FALSE;
203
204   src = GST_SOCKET_SRC (bsrc);
205
206   switch (GST_EVENT_TYPE (event)) {
207     case GST_EVENT_CUSTOM_UPSTREAM:
208       if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
209         const GstStructure *str = gst_event_get_structure (event);
210         GSocket *socket;
211
212         GST_OBJECT_LOCK (src);
213         if ((socket = src->socket))
214           g_object_ref (socket);
215         GST_OBJECT_UNLOCK (src);
216
217         if (socket) {
218           GstBuffer *buf;
219           GstMapInfo map;
220           GError *err = NULL;
221           gssize ret;
222
223           gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
224
225           if (buf) {
226             gst_buffer_map (buf, &map, GST_MAP_READ);
227             GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
228             ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
229                 map.size, FALSE, src->cancellable, &err);
230             gst_buffer_unmap (buf, &map);
231
232             if (ret == -1) {
233               GST_WARNING ("could not send message: %s", err->message);
234               g_clear_error (&err);
235               res = FALSE;
236             } else
237               res = TRUE;
238             gst_buffer_unref (buf);
239           }
240           g_object_unref (socket);
241         }
242       }
243       break;
244     default:
245       res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
246       break;
247   }
248   return res;
249 }
250
251 static GstCaps *
252 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
253 {
254   GstSocketSrc *socketsrc;
255   GstCaps *caps, *result;
256
257   socketsrc = GST_SOCKET_SRC (src);
258
259   GST_OBJECT_LOCK (src);
260   if ((caps = socketsrc->caps))
261     gst_caps_ref (caps);
262   GST_OBJECT_UNLOCK (src);
263
264   if (caps) {
265     if (filter) {
266       result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
267       gst_caps_unref (caps);
268     } else {
269       result = caps;
270     }
271   } else {
272     result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
273   }
274   return result;
275 }
276
277 static GstFlowReturn
278 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
279 {
280   GstSocketSrc *src;
281   GstFlowReturn ret = GST_FLOW_OK;
282   gssize rret;
283   GError *err = NULL;
284   GstMapInfo map;
285   GSocket *socket = NULL;
286   GSocketControlMessage **messages = NULL;
287   gint num_messages = 0;
288   gint i;
289   GInputVector ivec;
290   gint flags = 0;
291
292   src = GST_SOCKET_SRC (psrc);
293
294   GST_OBJECT_LOCK (src);
295
296   if (src->socket)
297     socket = g_object_ref (src->socket);
298
299   GST_OBJECT_UNLOCK (src);
300
301   if (socket == NULL)
302     goto no_socket;
303
304   GST_LOG_OBJECT (src, "asked for a buffer");
305
306 retry:
307   gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
308   ivec.buffer = map.data;
309   ivec.size = map.size;
310   rret =
311       g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
312       &num_messages, &flags, src->cancellable, &err);
313   gst_buffer_unmap (outbuf, &map);
314
315   for (i = 0; i < num_messages; i++) {
316     gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
317     g_object_unref (messages[i]);
318     messages[i] = NULL;
319   }
320   g_free (messages);
321
322   if (rret == 0) {
323     GSocket *tmp = NULL;
324     GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
325         g_socket_get_fd (socket));
326
327     /* We've hit EOS but we'll send this signal to allow someone to change
328      * our socket before we send EOS downstream. */
329     g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
330
331     GST_OBJECT_LOCK (src);
332
333     if (src->socket)
334       tmp = g_object_ref (src->socket);
335
336     GST_OBJECT_UNLOCK (src);
337
338     /* Do this dance with tmp to avoid unreffing with the lock held */
339     if (tmp != NULL && tmp != socket) {
340       SWAP (socket, tmp);
341       g_clear_object (&tmp);
342
343       GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
344           socket, g_socket_get_fd (socket));
345
346       /* retry with our new socket: */
347       goto retry;
348     } else {
349       g_clear_object (&tmp);
350       GST_INFO_OBJECT (src, "Forwarding EOS downstream");
351       ret = GST_FLOW_EOS;
352     }
353   } else if (rret < 0) {
354     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
355       ret = GST_FLOW_FLUSHING;
356       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
357     } else {
358       ret = GST_FLOW_ERROR;
359       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
360           ("Failed to read from socket: %s", err->message));
361     }
362   } else {
363     ret = GST_FLOW_OK;
364     gst_buffer_resize (outbuf, 0, rret);
365
366     GST_LOG_OBJECT (src,
367         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
368         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
369         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
370         gst_buffer_get_size (outbuf),
371         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
372         GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
373         GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
374   }
375   g_clear_error (&err);
376   g_clear_object (&socket);
377
378   return ret;
379
380 no_socket:
381   {
382     GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
383         ("Cannot receive: No socket set on socketsrc"));
384     return GST_FLOW_ERROR;
385   }
386 }
387
388 static void
389 gst_socket_src_set_property (GObject * object, guint prop_id,
390     const GValue * value, GParamSpec * pspec)
391 {
392   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
393
394   switch (prop_id) {
395     case PROP_SOCKET:{
396       GSocket *socket = G_SOCKET (g_value_dup_object (value));
397       GST_OBJECT_LOCK (socketsrc);
398       SWAP (socket, socketsrc->socket);
399       GST_OBJECT_UNLOCK (socketsrc);
400       g_clear_object (&socket);
401       break;
402     }
403     case PROP_CAPS:
404     {
405       const GstCaps *new_caps_val = gst_value_get_caps (value);
406       GstCaps *new_caps;
407       GstCaps *old_caps;
408
409       if (new_caps_val == NULL) {
410         new_caps = gst_caps_new_any ();
411       } else {
412         new_caps = gst_caps_copy (new_caps_val);
413       }
414
415       GST_OBJECT_LOCK (socketsrc);
416       old_caps = socketsrc->caps;
417       socketsrc->caps = new_caps;
418       GST_OBJECT_UNLOCK (socketsrc);
419
420       if (old_caps)
421         gst_caps_unref (old_caps);
422
423       gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
424       break;
425     }
426     case PROP_SEND_MESSAGES:
427       socketsrc->send_messages = g_value_get_boolean (value);
428       break;
429     default:
430       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
431       break;
432   }
433 }
434
435 static void
436 gst_socket_src_get_property (GObject * object, guint prop_id,
437     GValue * value, GParamSpec * pspec)
438 {
439   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
440
441   switch (prop_id) {
442     case PROP_SOCKET:
443       g_value_set_object (value, socketsrc->socket);
444       break;
445     case PROP_CAPS:
446       GST_OBJECT_LOCK (socketsrc);
447       gst_value_set_caps (value, socketsrc->caps);
448       GST_OBJECT_UNLOCK (socketsrc);
449       break;
450     case PROP_SEND_MESSAGES:
451       g_value_set_boolean (value, socketsrc->send_messages);
452       break;
453     default:
454       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
455       break;
456   }
457 }
458
459 static gboolean
460 gst_socket_src_unlock (GstBaseSrc * bsrc)
461 {
462   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
463
464   GST_DEBUG_OBJECT (src, "set to flushing");
465   g_cancellable_cancel (src->cancellable);
466
467   return TRUE;
468 }
469
470 static gboolean
471 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
472 {
473   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
474
475   GST_DEBUG_OBJECT (src, "unset flushing");
476   g_cancellable_reset (src->cancellable);
477
478   return TRUE;
479 }