tcp: cleanup files
[platform/upstream/gstreamer.git] / gst / tcp / gstsocketsrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  * Copyright (C) <2014> William Manley <will@williammanley.net>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-socketsrc
26  * @title: socketsrc
27  *
28  * Receive data from a socket.
29  *
30  * As compared to other elements:
31  *
32  * socketsrc can be considered a source counterpart to the #multisocketsink
33  * sink.
34  *
35  * socketsrc can also be considered a generalization of #tcpclientsrc and
36  * #tcpserversrc: it contains all the logic required to communicate over the
37  * socket but none of the logic for creating the sockets/establishing the
38  * connection in the first place, allowing the user to accomplish this
39  * externally in whatever manner they wish making it applicable to other types
40  * of sockets besides TCP.
41  *
42  * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43  * objects rather than sockets via integer file-descriptors.
44  *
45  * @see_also: #multisocketsink
46  */
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gsttcpelements.h"
55 #include "gstsocketsrc.h"
56
57 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
58 #define GST_CAT_DEFAULT socketsrc_debug
59
60 #define MAX_READ_SIZE                   4 * 1024
61
62
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
64     GST_PAD_SRC,
65     GST_PAD_ALWAYS,
66     GST_STATIC_CAPS_ANY);
67
68
69 #define DEFAULT_SEND_MESSAGES FALSE
70
71 enum
72 {
73   PROP_0,
74   PROP_SOCKET,
75   PROP_CAPS,
76   PROP_SEND_MESSAGES
77 };
78
79 enum
80 {
81   CONNECTION_CLOSED_BY_PEER,
82   LAST_SIGNAL
83 };
84
85 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
86
87 #define gst_socket_src_parent_class parent_class
88 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
89 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (socketsrc, "socketsrc",
90     GST_RANK_NONE, GST_TYPE_SOCKET_SRC, tcp_element_init (plugin));
91
92 static void gst_socket_src_finalize (GObject * gobject);
93
94 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
95 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
96 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
97     GstBuffer * outbuf);
98 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
99 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
100
101 static void gst_socket_src_set_property (GObject * object, guint prop_id,
102     const GValue * value, GParamSpec * pspec);
103 static void gst_socket_src_get_property (GObject * object, guint prop_id,
104     GValue * value, GParamSpec * pspec);
105
106 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
107
108 static void
109 gst_socket_src_class_init (GstSocketSrcClass * klass)
110 {
111   GObjectClass *gobject_class;
112   GstElementClass *gstelement_class;
113   GstBaseSrcClass *gstbasesrc_class;
114   GstPushSrcClass *gstpush_src_class;
115
116   gobject_class = (GObjectClass *) klass;
117   gstelement_class = (GstElementClass *) klass;
118   gstbasesrc_class = (GstBaseSrcClass *) klass;
119   gstpush_src_class = (GstPushSrcClass *) klass;
120
121   gobject_class->set_property = gst_socket_src_set_property;
122   gobject_class->get_property = gst_socket_src_get_property;
123   gobject_class->finalize = gst_socket_src_finalize;
124
125   g_object_class_install_property (gobject_class, PROP_SOCKET,
126       g_param_spec_object ("socket", "Socket",
127           "The socket to receive packets from", G_TYPE_SOCKET,
128           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
129
130   g_object_class_install_property (gobject_class, PROP_CAPS,
131       g_param_spec_boxed ("caps", "Caps",
132           "The caps of the source pad", GST_TYPE_CAPS,
133           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134
135   /**
136    * GstSocketSrc:send-messages:
137    *
138    * Control if the source will handle GstNetworkMessage events.
139    * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
140    *
141    *   "buffer", GST_TYPE_BUFFER    : the buffer with data to send
142    *
143    * The buffer in the event will be sent on the socket. This allows
144    * for simple bidirectional communication.
145    *
146    * Since: 1.8.0
147    **/
148   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
149       g_param_spec_boolean ("send-messages", "Send Messages",
150           "If GstNetworkMessage events should be handled",
151           DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
152
153   gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
154       g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
155       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
156           connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
157
158   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
159
160   gst_element_class_set_static_metadata (gstelement_class,
161       "socket source", "Source/Network",
162       "Receive data from a socket",
163       "Thomas Vander Stichele <thomas at apestaart dot org>, "
164       "William Manley <will@williammanley.net>");
165
166   gstbasesrc_class->event = gst_socketsrc_event;
167   gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
168   gstbasesrc_class->unlock = gst_socket_src_unlock;
169   gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
170
171   gstpush_src_class->fill = gst_socket_src_fill;
172
173   GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
174 }
175
176 static void
177 gst_socket_src_init (GstSocketSrc * this)
178 {
179   this->socket = NULL;
180   this->cancellable = g_cancellable_new ();
181   this->send_messages = DEFAULT_SEND_MESSAGES;
182 }
183
184 static void
185 gst_socket_src_finalize (GObject * gobject)
186 {
187   GstSocketSrc *this = GST_SOCKET_SRC (gobject);
188
189   if (this->caps)
190     gst_caps_unref (this->caps);
191   g_clear_object (&this->cancellable);
192   g_clear_object (&this->socket);
193
194   G_OBJECT_CLASS (parent_class)->finalize (gobject);
195 }
196
197 static gboolean
198 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
199 {
200   GstSocketSrc *src;
201   gboolean res = FALSE;
202
203   src = GST_SOCKET_SRC (bsrc);
204
205   switch (GST_EVENT_TYPE (event)) {
206     case GST_EVENT_CUSTOM_UPSTREAM:
207       if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
208         const GstStructure *str = gst_event_get_structure (event);
209         GSocket *socket;
210
211         GST_OBJECT_LOCK (src);
212         if ((socket = src->socket))
213           g_object_ref (socket);
214         GST_OBJECT_UNLOCK (src);
215
216         if (socket) {
217           GstBuffer *buf;
218           GstMapInfo map;
219           GError *err = NULL;
220           gssize ret;
221
222           gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
223
224           if (buf) {
225             gst_buffer_map (buf, &map, GST_MAP_READ);
226             GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
227             ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
228                 map.size, FALSE, src->cancellable, &err);
229             gst_buffer_unmap (buf, &map);
230
231             if (ret == -1) {
232               GST_WARNING ("could not send message: %s", err->message);
233               g_clear_error (&err);
234               res = FALSE;
235             } else
236               res = TRUE;
237             gst_buffer_unref (buf);
238           }
239           g_object_unref (socket);
240         }
241       }
242       break;
243     default:
244       res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
245       break;
246   }
247   return res;
248 }
249
250 static GstCaps *
251 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
252 {
253   GstSocketSrc *socketsrc;
254   GstCaps *caps, *result;
255
256   socketsrc = GST_SOCKET_SRC (src);
257
258   GST_OBJECT_LOCK (src);
259   if ((caps = socketsrc->caps))
260     gst_caps_ref (caps);
261   GST_OBJECT_UNLOCK (src);
262
263   if (caps) {
264     if (filter) {
265       result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
266       gst_caps_unref (caps);
267     } else {
268       result = caps;
269     }
270   } else {
271     result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
272   }
273   return result;
274 }
275
276 static GstFlowReturn
277 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
278 {
279   GstSocketSrc *src;
280   GstFlowReturn ret = GST_FLOW_OK;
281   gssize rret;
282   GError *err = NULL;
283   GstMapInfo map;
284   GSocket *socket = NULL;
285   GSocketControlMessage **messages = NULL;
286   gint num_messages = 0;
287   gint i;
288   GInputVector ivec;
289   gint flags = 0;
290
291   src = GST_SOCKET_SRC (psrc);
292
293   GST_OBJECT_LOCK (src);
294
295   if (src->socket)
296     socket = g_object_ref (src->socket);
297
298   GST_OBJECT_UNLOCK (src);
299
300   if (socket == NULL)
301     goto no_socket;
302
303   GST_LOG_OBJECT (src, "asked for a buffer");
304
305 retry:
306   gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
307   ivec.buffer = map.data;
308   ivec.size = map.size;
309   rret =
310       g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
311       &num_messages, &flags, src->cancellable, &err);
312   gst_buffer_unmap (outbuf, &map);
313
314   for (i = 0; i < num_messages; i++) {
315     gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
316     g_object_unref (messages[i]);
317     messages[i] = NULL;
318   }
319   g_free (messages);
320
321   if (rret == 0) {
322     GSocket *tmp = NULL;
323     GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
324         g_socket_get_fd (socket));
325
326     /* We've hit EOS but we'll send this signal to allow someone to change
327      * our socket before we send EOS downstream. */
328     g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
329
330     GST_OBJECT_LOCK (src);
331
332     if (src->socket)
333       tmp = g_object_ref (src->socket);
334
335     GST_OBJECT_UNLOCK (src);
336
337     /* Do this dance with tmp to avoid unreffing with the lock held */
338     if (tmp != NULL && tmp != socket) {
339       SWAP (socket, tmp);
340       g_clear_object (&tmp);
341
342       GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
343           socket, g_socket_get_fd (socket));
344
345       /* retry with our new socket: */
346       goto retry;
347     } else {
348       g_clear_object (&tmp);
349       GST_INFO_OBJECT (src, "Forwarding EOS downstream");
350       ret = GST_FLOW_EOS;
351     }
352   } else if (rret < 0) {
353     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
354       ret = GST_FLOW_FLUSHING;
355       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
356     } else {
357       ret = GST_FLOW_ERROR;
358       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
359           ("Failed to read from socket: %s", err->message));
360     }
361   } else {
362     ret = GST_FLOW_OK;
363     gst_buffer_resize (outbuf, 0, rret);
364
365     GST_LOG_OBJECT (src,
366         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
367         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
368         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
369         gst_buffer_get_size (outbuf),
370         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
371         GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
372         GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
373   }
374   g_clear_error (&err);
375   g_clear_object (&socket);
376
377   return ret;
378
379 no_socket:
380   {
381     GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
382         ("Cannot receive: No socket set on socketsrc"));
383     return GST_FLOW_ERROR;
384   }
385 }
386
387 static void
388 gst_socket_src_set_property (GObject * object, guint prop_id,
389     const GValue * value, GParamSpec * pspec)
390 {
391   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
392
393   switch (prop_id) {
394     case PROP_SOCKET:{
395       GSocket *socket = G_SOCKET (g_value_dup_object (value));
396       GST_OBJECT_LOCK (socketsrc);
397       SWAP (socket, socketsrc->socket);
398       GST_OBJECT_UNLOCK (socketsrc);
399       g_clear_object (&socket);
400       break;
401     }
402     case PROP_CAPS:
403     {
404       const GstCaps *new_caps_val = gst_value_get_caps (value);
405       GstCaps *new_caps;
406       GstCaps *old_caps;
407
408       if (new_caps_val == NULL) {
409         new_caps = gst_caps_new_any ();
410       } else {
411         new_caps = gst_caps_copy (new_caps_val);
412       }
413
414       GST_OBJECT_LOCK (socketsrc);
415       old_caps = socketsrc->caps;
416       socketsrc->caps = new_caps;
417       GST_OBJECT_UNLOCK (socketsrc);
418
419       if (old_caps)
420         gst_caps_unref (old_caps);
421
422       gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
423       break;
424     }
425     case PROP_SEND_MESSAGES:
426       socketsrc->send_messages = g_value_get_boolean (value);
427       break;
428     default:
429       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
430       break;
431   }
432 }
433
434 static void
435 gst_socket_src_get_property (GObject * object, guint prop_id,
436     GValue * value, GParamSpec * pspec)
437 {
438   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
439
440   switch (prop_id) {
441     case PROP_SOCKET:
442       g_value_set_object (value, socketsrc->socket);
443       break;
444     case PROP_CAPS:
445       GST_OBJECT_LOCK (socketsrc);
446       gst_value_set_caps (value, socketsrc->caps);
447       GST_OBJECT_UNLOCK (socketsrc);
448       break;
449     case PROP_SEND_MESSAGES:
450       g_value_set_boolean (value, socketsrc->send_messages);
451       break;
452     default:
453       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
454       break;
455   }
456 }
457
458 static gboolean
459 gst_socket_src_unlock (GstBaseSrc * bsrc)
460 {
461   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
462
463   GST_DEBUG_OBJECT (src, "set to flushing");
464   g_cancellable_cancel (src->cancellable);
465
466   return TRUE;
467 }
468
469 static gboolean
470 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
471 {
472   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
473
474   GST_DEBUG_OBJECT (src, "unset flushing");
475   g_cancellable_reset (src->cancellable);
476
477   return TRUE;
478 }