2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 * Copyright (C) <2014> William Manley <will@williammanley.net>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
25 * SECTION:element-socketsrc
28 * Receive data from a socket.
30 * As compared to other elements:
32 * socketsrc can be considered a source counterpart to the #multisocketsink
35 * socketsrc can also be considered a generalization of #tcpclientsrc and
36 * #tcpserversrc: it contains all the logic required to communicate over the
37 * socket but none of the logic for creating the sockets/establishing the
38 * connection in the first place, allowing the user to accomplish this
39 * externally in whatever manner they wish making it applicable to other types
40 * of sockets besides TCP.
42 * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43 * objects rather than sockets via integer file-descriptors.
45 * @see_also: #multisocketsink
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gsttcpelements.h"
55 #include "gstsocketsrc.h"
58 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
59 #define GST_CAT_DEFAULT socketsrc_debug
61 #define MAX_READ_SIZE 4 * 1024
64 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
70 #define DEFAULT_SEND_MESSAGES FALSE
82 CONNECTION_CLOSED_BY_PEER,
86 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
88 #define gst_socket_src_parent_class parent_class
89 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
90 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (socketsrc, "socketsrc",
91 GST_RANK_NONE, GST_TYPE_SOCKET_SRC, tcp_element_init (plugin));
93 static void gst_socket_src_finalize (GObject * gobject);
95 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
96 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
97 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
99 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
100 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
102 static void gst_socket_src_set_property (GObject * object, guint prop_id,
103 const GValue * value, GParamSpec * pspec);
104 static void gst_socket_src_get_property (GObject * object, guint prop_id,
105 GValue * value, GParamSpec * pspec);
107 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
110 gst_socket_src_class_init (GstSocketSrcClass * klass)
112 GObjectClass *gobject_class;
113 GstElementClass *gstelement_class;
114 GstBaseSrcClass *gstbasesrc_class;
115 GstPushSrcClass *gstpush_src_class;
117 gobject_class = (GObjectClass *) klass;
118 gstelement_class = (GstElementClass *) klass;
119 gstbasesrc_class = (GstBaseSrcClass *) klass;
120 gstpush_src_class = (GstPushSrcClass *) klass;
122 gobject_class->set_property = gst_socket_src_set_property;
123 gobject_class->get_property = gst_socket_src_get_property;
124 gobject_class->finalize = gst_socket_src_finalize;
126 g_object_class_install_property (gobject_class, PROP_SOCKET,
127 g_param_spec_object ("socket", "Socket",
128 "The socket to receive packets from", G_TYPE_SOCKET,
129 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
131 g_object_class_install_property (gobject_class, PROP_CAPS,
132 g_param_spec_boxed ("caps", "Caps",
133 "The caps of the source pad", GST_TYPE_CAPS,
134 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137 * GstSocketSrc:send-messages:
139 * Control if the source will handle GstNetworkMessage events.
140 * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
142 * "buffer", GST_TYPE_BUFFER : the buffer with data to send
144 * The buffer in the event will be sent on the socket. This allows
145 * for simple bidirectional communication.
149 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
150 g_param_spec_boolean ("send-messages", "Send Messages",
151 "If GstNetworkMessage events should be handled",
152 DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154 gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
155 g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
156 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
157 connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
159 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
161 gst_element_class_set_static_metadata (gstelement_class,
162 "socket source", "Source/Network",
163 "Receive data from a socket",
164 "Thomas Vander Stichele <thomas at apestaart dot org>, "
165 "William Manley <will@williammanley.net>");
167 gstbasesrc_class->event = gst_socketsrc_event;
168 gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
169 gstbasesrc_class->unlock = gst_socket_src_unlock;
170 gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
172 gstpush_src_class->fill = gst_socket_src_fill;
174 GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
178 gst_socket_src_init (GstSocketSrc * this)
181 this->cancellable = g_cancellable_new ();
182 this->send_messages = DEFAULT_SEND_MESSAGES;
186 gst_socket_src_finalize (GObject * gobject)
188 GstSocketSrc *this = GST_SOCKET_SRC (gobject);
191 gst_caps_unref (this->caps);
192 g_clear_object (&this->cancellable);
193 g_clear_object (&this->socket);
195 G_OBJECT_CLASS (parent_class)->finalize (gobject);
199 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
202 gboolean res = FALSE;
204 src = GST_SOCKET_SRC (bsrc);
206 switch (GST_EVENT_TYPE (event)) {
207 case GST_EVENT_CUSTOM_UPSTREAM:
208 if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
209 const GstStructure *str = gst_event_get_structure (event);
212 GST_OBJECT_LOCK (src);
213 if ((socket = src->socket))
214 g_object_ref (socket);
215 GST_OBJECT_UNLOCK (src);
223 gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
226 gst_buffer_map (buf, &map, GST_MAP_READ);
227 GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
228 ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
229 map.size, FALSE, src->cancellable, &err);
230 gst_buffer_unmap (buf, &map);
233 GST_WARNING ("could not send message: %s", err->message);
234 g_clear_error (&err);
238 gst_buffer_unref (buf);
240 g_object_unref (socket);
245 res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
252 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
254 GstSocketSrc *socketsrc;
255 GstCaps *caps, *result;
257 socketsrc = GST_SOCKET_SRC (src);
259 GST_OBJECT_LOCK (src);
260 if ((caps = socketsrc->caps))
262 GST_OBJECT_UNLOCK (src);
266 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
267 gst_caps_unref (caps);
272 result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
278 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
281 GstFlowReturn ret = GST_FLOW_OK;
285 GSocket *socket = NULL;
286 GSocketControlMessage **messages = NULL;
287 gint num_messages = 0;
292 src = GST_SOCKET_SRC (psrc);
294 GST_OBJECT_LOCK (src);
297 socket = g_object_ref (src->socket);
299 GST_OBJECT_UNLOCK (src);
304 GST_LOG_OBJECT (src, "asked for a buffer");
307 gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
308 ivec.buffer = map.data;
309 ivec.size = map.size;
311 g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
312 &num_messages, &flags, src->cancellable, &err);
313 gst_buffer_unmap (outbuf, &map);
315 for (i = 0; i < num_messages; i++) {
316 gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
317 g_object_unref (messages[i]);
324 GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
325 g_socket_get_fd (socket));
327 /* We've hit EOS but we'll send this signal to allow someone to change
328 * our socket before we send EOS downstream. */
329 g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
331 GST_OBJECT_LOCK (src);
334 tmp = g_object_ref (src->socket);
336 GST_OBJECT_UNLOCK (src);
338 /* Do this dance with tmp to avoid unreffing with the lock held */
339 if (tmp != NULL && tmp != socket) {
341 g_clear_object (&tmp);
343 GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
344 socket, g_socket_get_fd (socket));
346 /* retry with our new socket: */
349 g_clear_object (&tmp);
350 GST_INFO_OBJECT (src, "Forwarding EOS downstream");
353 } else if (rret < 0) {
354 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
355 ret = GST_FLOW_FLUSHING;
356 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
358 ret = GST_FLOW_ERROR;
359 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
360 ("Failed to read from socket: %s", err->message));
364 gst_buffer_resize (outbuf, 0, rret);
367 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
368 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
369 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
370 gst_buffer_get_size (outbuf),
371 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
372 GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
373 GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
375 g_clear_error (&err);
376 g_clear_object (&socket);
382 GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
383 ("Cannot receive: No socket set on socketsrc"));
384 return GST_FLOW_ERROR;
389 gst_socket_src_set_property (GObject * object, guint prop_id,
390 const GValue * value, GParamSpec * pspec)
392 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
396 GSocket *socket = G_SOCKET (g_value_dup_object (value));
397 GST_OBJECT_LOCK (socketsrc);
398 SWAP (socket, socketsrc->socket);
399 GST_OBJECT_UNLOCK (socketsrc);
400 g_clear_object (&socket);
405 const GstCaps *new_caps_val = gst_value_get_caps (value);
409 if (new_caps_val == NULL) {
410 new_caps = gst_caps_new_any ();
412 new_caps = gst_caps_copy (new_caps_val);
415 GST_OBJECT_LOCK (socketsrc);
416 old_caps = socketsrc->caps;
417 socketsrc->caps = new_caps;
418 GST_OBJECT_UNLOCK (socketsrc);
421 gst_caps_unref (old_caps);
423 gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
426 case PROP_SEND_MESSAGES:
427 socketsrc->send_messages = g_value_get_boolean (value);
430 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
436 gst_socket_src_get_property (GObject * object, guint prop_id,
437 GValue * value, GParamSpec * pspec)
439 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
443 g_value_set_object (value, socketsrc->socket);
446 GST_OBJECT_LOCK (socketsrc);
447 gst_value_set_caps (value, socketsrc->caps);
448 GST_OBJECT_UNLOCK (socketsrc);
450 case PROP_SEND_MESSAGES:
451 g_value_set_boolean (value, socketsrc->send_messages);
454 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
460 gst_socket_src_unlock (GstBaseSrc * bsrc)
462 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
464 GST_DEBUG_OBJECT (src, "set to flushing");
465 g_cancellable_cancel (src->cancellable);
471 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
473 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
475 GST_DEBUG_OBJECT (src, "unset flushing");
476 g_cancellable_reset (src->cancellable);