2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 * Copyright (C) <2014> William Manley <will@williammanley.net>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
25 * SECTION:element-socketsrc
28 * Receive data from a socket.
30 * As compared to other elements:
32 * socketsrc can be considered a source counterpart to the #multisocketsink
35 * socketsrc can also be considered a generalization of #tcpclientsrc and
36 * #tcpserversrc: it contains all the logic required to communicate over the
37 * socket but none of the logic for creating the sockets/establishing the
38 * connection in the first place, allowing the user to accomplish this
39 * externally in whatever manner they wish making it applicable to other types
40 * of sockets besides TCP.
42 * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43 * objects rather than sockets via integer file-descriptors.
45 * @see_also: #multisocketsink
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gsttcpelements.h"
55 #include "gstsocketsrc.h"
57 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
58 #define GST_CAT_DEFAULT socketsrc_debug
60 #define MAX_READ_SIZE 4 * 1024
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
69 #define DEFAULT_SEND_MESSAGES FALSE
81 CONNECTION_CLOSED_BY_PEER,
85 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
87 #define gst_socket_src_parent_class parent_class
88 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
89 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (socketsrc, "socketsrc",
90 GST_RANK_NONE, GST_TYPE_SOCKET_SRC, tcp_element_init (plugin));
92 static void gst_socket_src_finalize (GObject * gobject);
94 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
95 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
96 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
98 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
99 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
101 static void gst_socket_src_set_property (GObject * object, guint prop_id,
102 const GValue * value, GParamSpec * pspec);
103 static void gst_socket_src_get_property (GObject * object, guint prop_id,
104 GValue * value, GParamSpec * pspec);
106 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
109 gst_socket_src_class_init (GstSocketSrcClass * klass)
111 GObjectClass *gobject_class;
112 GstElementClass *gstelement_class;
113 GstBaseSrcClass *gstbasesrc_class;
114 GstPushSrcClass *gstpush_src_class;
116 gobject_class = (GObjectClass *) klass;
117 gstelement_class = (GstElementClass *) klass;
118 gstbasesrc_class = (GstBaseSrcClass *) klass;
119 gstpush_src_class = (GstPushSrcClass *) klass;
121 gobject_class->set_property = gst_socket_src_set_property;
122 gobject_class->get_property = gst_socket_src_get_property;
123 gobject_class->finalize = gst_socket_src_finalize;
125 g_object_class_install_property (gobject_class, PROP_SOCKET,
126 g_param_spec_object ("socket", "Socket",
127 "The socket to receive packets from", G_TYPE_SOCKET,
128 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
130 g_object_class_install_property (gobject_class, PROP_CAPS,
131 g_param_spec_boxed ("caps", "Caps",
132 "The caps of the source pad", GST_TYPE_CAPS,
133 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
136 * GstSocketSrc:send-messages:
138 * Control if the source will handle GstNetworkMessage events.
139 * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
141 * "buffer", GST_TYPE_BUFFER : the buffer with data to send
143 * The buffer in the event will be sent on the socket. This allows
144 * for simple bidirectional communication.
148 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
149 g_param_spec_boolean ("send-messages", "Send Messages",
150 "If GstNetworkMessage events should be handled",
151 DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153 gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
154 g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
155 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
156 connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
158 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
160 gst_element_class_set_static_metadata (gstelement_class,
161 "socket source", "Source/Network",
162 "Receive data from a socket",
163 "Thomas Vander Stichele <thomas at apestaart dot org>, "
164 "William Manley <will@williammanley.net>");
166 gstbasesrc_class->event = gst_socketsrc_event;
167 gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
168 gstbasesrc_class->unlock = gst_socket_src_unlock;
169 gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
171 gstpush_src_class->fill = gst_socket_src_fill;
173 GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
177 gst_socket_src_init (GstSocketSrc * this)
180 this->cancellable = g_cancellable_new ();
181 this->send_messages = DEFAULT_SEND_MESSAGES;
185 gst_socket_src_finalize (GObject * gobject)
187 GstSocketSrc *this = GST_SOCKET_SRC (gobject);
190 gst_caps_unref (this->caps);
191 g_clear_object (&this->cancellable);
192 g_clear_object (&this->socket);
194 G_OBJECT_CLASS (parent_class)->finalize (gobject);
198 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
201 gboolean res = FALSE;
203 src = GST_SOCKET_SRC (bsrc);
205 switch (GST_EVENT_TYPE (event)) {
206 case GST_EVENT_CUSTOM_UPSTREAM:
207 if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
208 const GstStructure *str = gst_event_get_structure (event);
211 GST_OBJECT_LOCK (src);
212 if ((socket = src->socket))
213 g_object_ref (socket);
214 GST_OBJECT_UNLOCK (src);
222 gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
225 gst_buffer_map (buf, &map, GST_MAP_READ);
226 GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
227 ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
228 map.size, FALSE, src->cancellable, &err);
229 gst_buffer_unmap (buf, &map);
232 GST_WARNING ("could not send message: %s", err->message);
233 g_clear_error (&err);
237 gst_buffer_unref (buf);
239 g_object_unref (socket);
244 res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
251 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
253 GstSocketSrc *socketsrc;
254 GstCaps *caps, *result;
256 socketsrc = GST_SOCKET_SRC (src);
258 GST_OBJECT_LOCK (src);
259 if ((caps = socketsrc->caps))
261 GST_OBJECT_UNLOCK (src);
265 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
266 gst_caps_unref (caps);
271 result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
277 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
280 GstFlowReturn ret = GST_FLOW_OK;
284 GSocket *socket = NULL;
285 GSocketControlMessage **messages = NULL;
286 gint num_messages = 0;
291 src = GST_SOCKET_SRC (psrc);
293 GST_OBJECT_LOCK (src);
296 socket = g_object_ref (src->socket);
298 GST_OBJECT_UNLOCK (src);
303 GST_LOG_OBJECT (src, "asked for a buffer");
306 gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
307 ivec.buffer = map.data;
308 ivec.size = map.size;
310 g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
311 &num_messages, &flags, src->cancellable, &err);
312 gst_buffer_unmap (outbuf, &map);
314 for (i = 0; i < num_messages; i++) {
315 gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
316 g_object_unref (messages[i]);
323 GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
324 g_socket_get_fd (socket));
326 /* We've hit EOS but we'll send this signal to allow someone to change
327 * our socket before we send EOS downstream. */
328 g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
330 GST_OBJECT_LOCK (src);
333 tmp = g_object_ref (src->socket);
335 GST_OBJECT_UNLOCK (src);
337 /* Do this dance with tmp to avoid unreffing with the lock held */
338 if (tmp != NULL && tmp != socket) {
340 g_clear_object (&tmp);
342 GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
343 socket, g_socket_get_fd (socket));
345 /* retry with our new socket: */
348 g_clear_object (&tmp);
349 GST_INFO_OBJECT (src, "Forwarding EOS downstream");
352 } else if (rret < 0) {
353 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
354 ret = GST_FLOW_FLUSHING;
355 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
357 ret = GST_FLOW_ERROR;
358 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
359 ("Failed to read from socket: %s", err->message));
363 gst_buffer_resize (outbuf, 0, rret);
366 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
367 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
368 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
369 gst_buffer_get_size (outbuf),
370 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
371 GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
372 GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
374 g_clear_error (&err);
375 g_clear_object (&socket);
381 GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
382 ("Cannot receive: No socket set on socketsrc"));
383 return GST_FLOW_ERROR;
388 gst_socket_src_set_property (GObject * object, guint prop_id,
389 const GValue * value, GParamSpec * pspec)
391 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
395 GSocket *socket = G_SOCKET (g_value_dup_object (value));
396 GST_OBJECT_LOCK (socketsrc);
397 SWAP (socket, socketsrc->socket);
398 GST_OBJECT_UNLOCK (socketsrc);
399 g_clear_object (&socket);
404 const GstCaps *new_caps_val = gst_value_get_caps (value);
408 if (new_caps_val == NULL) {
409 new_caps = gst_caps_new_any ();
411 new_caps = gst_caps_copy (new_caps_val);
414 GST_OBJECT_LOCK (socketsrc);
415 old_caps = socketsrc->caps;
416 socketsrc->caps = new_caps;
417 GST_OBJECT_UNLOCK (socketsrc);
420 gst_caps_unref (old_caps);
422 gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
425 case PROP_SEND_MESSAGES:
426 socketsrc->send_messages = g_value_get_boolean (value);
429 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
435 gst_socket_src_get_property (GObject * object, guint prop_id,
436 GValue * value, GParamSpec * pspec)
438 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
442 g_value_set_object (value, socketsrc->socket);
445 GST_OBJECT_LOCK (socketsrc);
446 gst_value_set_caps (value, socketsrc->caps);
447 GST_OBJECT_UNLOCK (socketsrc);
449 case PROP_SEND_MESSAGES:
450 g_value_set_boolean (value, socketsrc->send_messages);
453 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
459 gst_socket_src_unlock (GstBaseSrc * bsrc)
461 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
463 GST_DEBUG_OBJECT (src, "set to flushing");
464 g_cancellable_cancel (src->cancellable);
470 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
472 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
474 GST_DEBUG_OBJECT (src, "unset flushing");
475 g_cancellable_reset (src->cancellable);