2 * This file is part of the Nice GLib ICE library.
4 * (C) 2010, 2013 Collabora Ltd.
5 * Contact: Youness Alaoui
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
17 * The Original Code is the Nice GLib ICE library.
19 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20 * Corporation. All Rights Reserved.
23 * Youness Alaoui, Collabora Ltd.
24 * Philip Withnall, Collabora Ltd.
26 * Alternatively, the contents of this file may be used under the terms of
27 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
28 * case the provisions of LGPL are applicable instead of those above. If you
29 * wish to allow use of your version of this file only under the terms of the
30 * LGPL and not to allow others to use your version of this file under the
31 * MPL, indicate your decision by deleting the provisions above and replace
32 * them with the notice and other provisions required by the LGPL. If you do
33 * not delete the provisions above, a recipient may use your version of this
34 * file under either the MPL or the LGPL.
38 * SECTION:nice_input_stream
39 * @short_description: #GInputStream implementation for libnice
40 * @see_also: #NiceAgent
41 * @include: inputstream.h
44 * #NiceInputStream is a #GInputStream wrapper for a single reliable stream and
45 * component of a #NiceAgent. Given an existing reliable #NiceAgent, plus the
46 * IDs of an existing stream and component in the agent, it will provide a
47 * streaming input interface for reading from the given component.
49 * A single #NiceInputStream can only be used with a single agent, stream and
50 * component triple, and will be closed as soon as that stream is removed from
51 * the agent (e.g. if nice_agent_remove_stream() is called from another thread).
52 * If g_input_stream_close() is called on a #NiceInputStream, the input stream
53 * and underlying #NiceAgent stream will be closed, but the underlying stream
54 * will not be removed. Use nice_agent_remove_stream() to do that.
65 #include "inputstream.h"
66 #include "agent-priv.h"
/* Forward declarations: streams_removed_cb is the handler connected to the
 * agent "streams-removed" signal, and nice_input_stream_init_pollable is the
 * interface initialiser named in the type definition. Both are defined
 * further down in this file. */
68 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
70 static void nice_input_stream_init_pollable (
71 GPollableInputStreamInterface *iface);
/* Define the NiceInputStream GType with G_TYPE_INPUT_STREAM as its parent
 * type, and declare that it implements G_TYPE_POLLABLE_INPUT_STREAM, with
 * nice_input_stream_init_pollable as the interface initialiser. */
73 G_DEFINE_TYPE_WITH_CODE (NiceInputStream,
74 nice_input_stream, G_TYPE_INPUT_STREAM,
75 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
76 nice_input_stream_init_pollable));
/* Per-instance private data for NiceInputStream.
 * NOTE(review): only the agent_ref member is visible in this view; the
 * stream_id and component_id members read elsewhere in this file are
 * presumably declared here as well - confirm against the full file. */
85 struct _NiceInputStreamPrivate
/* Weak reference to the wrapped agent: initialised to NULL in
 * nice_input_stream_init, set by the property setter and cleared in
 * nice_input_stream_dispose. */
87 GWeakRef/*<NiceAgent>*/ agent_ref;
/* GObject virtual methods; assigned to the GObjectClass in
 * nice_input_stream_class_init. */
92 static void nice_input_stream_dispose (GObject *object);
93 static void nice_input_stream_get_property (GObject *object, guint prop_id,
94 GValue *value, GParamSpec *pspec);
95 static void nice_input_stream_set_property (GObject *object, guint prop_id,
96 const GValue *value, GParamSpec *pspec);
/* GInputStream virtual methods; assigned to read_fn and close_fn in
 * nice_input_stream_class_init. */
97 static gssize nice_input_stream_read (GInputStream *stream, void *buffer,
98 gsize count, GCancellable *cancellable, GError **error);
99 static gboolean nice_input_stream_close (GInputStream *stream,
100 GCancellable *cancellable, GError **error);
/* GPollableInputStream interface methods; assigned to the interface vtable in
 * nice_input_stream_init_pollable. */
101 static gboolean nice_input_stream_is_readable (GPollableInputStream *stream);
102 static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream,
103 void *buffer, gsize count, GError **error);
104 static GSource *nice_input_stream_create_source (GPollableInputStream *stream,
105 GCancellable *cancellable);
/* Class initialiser for NiceInputStream: registers the size of the private
 * instance data, installs the GObject and GInputStream virtual methods, and
 * installs the PROP_AGENT, PROP_STREAM_ID and PROP_COMPONENT_ID properties.
 * All three properties are flagged construct-only and read-write. */
108 nice_input_stream_class_init (NiceInputStreamClass *klass)
110 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
111 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
/* Reserve room for a NiceInputStreamPrivate alongside every instance; it is
 * retrieved again in nice_input_stream_init. */
113 g_type_class_add_private (klass, sizeof (NiceInputStreamPrivate));
/* GObject overrides: property accessors and dispose. */
115 gobject_class->set_property = nice_input_stream_set_property;
116 gobject_class->get_property = nice_input_stream_get_property;
117 gobject_class->dispose = nice_input_stream_dispose;
/* GInputStream overrides: blocking read and close. */
119 stream_class->read_fn = nice_input_stream_read;
120 stream_class->close_fn = nice_input_stream_close;
123 * NiceInputStream:agent:
125 * The #NiceAgent to wrap with an input stream. This must be an existing
128 * A reference is not held on the #NiceAgent. If the agent is destroyed before
129 * the #NiceInputStream, %G_IO_ERROR_CLOSED will be returned for all
130 * subsequent operations on the stream.
134 g_object_class_install_property (gobject_class, PROP_AGENT,
135 g_param_spec_object ("agent",
137 "The underlying NiceAgent",
139 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
142 * NiceInputStream:stream-id:
144 * ID of the stream to use in the #NiceInputStream:agent.
148 g_object_class_install_property (gobject_class, PROP_STREAM_ID,
152 "The ID of the agent’s stream to wrap.",
155 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158 * NiceInputStream:component-id:
160 * ID of the component to use in the #NiceInputStream:agent.
164 g_object_class_install_property (gobject_class, PROP_COMPONENT_ID,
167 "Agent’s component ID",
168 "The ID of the agent’s component to wrap.",
171 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* GObject dispose handler. In order: closes the stream if it is still open,
 * disconnects streams_removed_cb from the agent reached through the weak
 * reference, clears the weak reference, then chains up to the parent class.
 * NOTE(review): the declaration of the local agent variable and any NULL
 * check around the disconnect are not visible in this view - confirm. */
175 nice_input_stream_dispose (GObject *object)
177 NiceInputStream *self = NICE_INPUT_STREAM (object);
180 /* Ensure the stream is closed first, otherwise the agent can’t be found in
181 * the close handler called by the parent implementation. */
182 if (!g_input_stream_is_closed (G_INPUT_STREAM (object)))
183 g_input_stream_close (G_INPUT_STREAM (object), NULL, NULL);
/* g_weak_ref_get hands back a strong reference, which is dropped again by the
 * g_object_unref below once the signal handler has been disconnected. */
185 agent = g_weak_ref_get (&self->priv->agent_ref);
187 g_signal_handlers_disconnect_by_func (agent, streams_removed_cb, self);
188 g_object_unref (agent);
/* Release the weak reference storage initialised in nice_input_stream_init. */
191 g_weak_ref_clear (&self->priv->agent_ref);
193 G_OBJECT_CLASS (nice_input_stream_parent_class)->dispose (object);
/* GObject property getter. Copies the agent, the stream ID or the component
 * ID into the supplied GValue, and warns about any other property ID.
 * NOTE(review): the switch statement and most case labels are not visible in
 * this view. */
197 nice_input_stream_get_property (GObject *object, guint prop_id,
198 GValue *value, GParamSpec *pspec)
200 NiceInputStream *self = NICE_INPUT_STREAM (object);
/* g_weak_ref_get returns a new strong reference (or NULL if the agent has
 * gone); g_value_take_object passes ownership of it to the GValue, so no
 * separate unref is needed here. */
204 g_value_take_object (value, g_weak_ref_get (&self->priv->agent_ref));
207 g_value_set_uint (value, self->priv->stream_id);
209 case PROP_COMPONENT_ID:
210 g_value_set_uint (value, self->priv->component_id);
/* Unrecognised property ID: emit the standard GObject warning. */
213 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property setter. Stores the agent as a weak reference and connects
 * streams_removed_cb to its "streams-removed" signal, or stores the stream ID
 * or component ID in the private data; warns about any other property ID.
 * NOTE(review): the switch statement, most case labels and the NULL check
 * mentioned by the comment inside the agent case are not visible in this
 * view. */
218 nice_input_stream_set_property (GObject *object, guint prop_id,
219 const GValue *value, GParamSpec *pspec)
221 NiceInputStream *self = NICE_INPUT_STREAM (object);
225 /* Construct only. */
/* g_value_dup_object yields a strong reference; it is held only while the
 * weak reference is set and the signal is connected, and is released by the
 * g_object_unref below. */
226 NiceAgent *agent = g_value_dup_object (value);
227 g_weak_ref_set (&self->priv->agent_ref, agent);
229 /* agent may be NULL if the stream is being constructed by
230 * nice_io_stream_get_input_stream() after the NiceIOStream’s agent has
231 * already been finalised. */
/* The handler receives this stream as user data; it is disconnected again in
 * nice_input_stream_dispose. */
233 g_signal_connect (agent, "streams-removed",
234 (GCallback) streams_removed_cb, self);
235 g_object_unref (agent);
241 /* Construct only. */
242 self->priv->stream_id = g_value_get_uint (value);
244 case PROP_COMPONENT_ID:
245 /* Construct only. */
246 self->priv->component_id = g_value_get_uint (value);
/* Unrecognised property ID: emit the standard GObject warning. */
249 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Instance initialiser: looks up the private data block registered in
 * nice_input_stream_class_init and initialises the agent weak reference to
 * NULL, so it is valid before any property is set. */
254 nice_input_stream_init (NiceInputStream *stream)
256 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, NICE_TYPE_INPUT_STREAM,
257 NiceInputStreamPrivate);
259 g_weak_ref_init (&stream->priv->agent_ref, NULL);
/* GPollableInputStream interface initialiser: points the is_readable,
 * read_nonblocking and create_source slots of the interface vtable at the
 * implementations in this file. */
263 nice_input_stream_init_pollable (GPollableInputStreamInterface *iface)
265 iface->is_readable = nice_input_stream_is_readable;
266 iface->read_nonblocking = nice_input_stream_read_nonblocking;
267 iface->create_source = nice_input_stream_create_source;
271 * nice_input_stream_new:
272 * @agent: A #NiceAgent
273 * @stream_id: The ID of the agent’s stream to wrap
274 * @component_id: The ID of the agent’s component to wrap
276 * Create a new #NiceInputStream wrapping the given stream/component from
277 * @agent, which must be a reliable #NiceAgent.
279 * The constructed #NiceInputStream will not hold a reference to @agent. If
280 * @agent is destroyed before the input stream, %G_IO_ERROR_CLOSED will be
281 * returned for all subsequent operations on the stream.
283 * Returns: The new #NiceInputStream object
288 nice_input_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
/* Precondition checks: agent must be a NiceAgent instance and both IDs must
 * be at least 1. When a check fails, g_return_val_if_fail makes the function
 * return NULL. */
290 g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
291 g_return_val_if_fail (stream_id >= 1, NULL);
292 g_return_val_if_fail (component_id >= 1, NULL);
/* Build the object, passing the arguments through as construct properties.
 * NOTE(review): the "agent" property argument and the terminating NULL of
 * the g_object_new call are not visible in this view. */
294 return g_object_new (NICE_TYPE_INPUT_STREAM,
296 "stream-id", stream_id,
297 "component-id", component_id,
/* Blocking GInputStream read implementation (installed as read_fn). Checks
 * whether the stream is closed, takes a temporary strong reference to the
 * agent through the weak reference, sets a G_IO_ERROR_CLOSED error when the
 * agent is gone, and otherwise delegates to nice_agent_recv with the supplied
 * buffer, count, cancellable and error.
 * NOTE(review): the declaration of len, the NULL check on agent and all
 * return statements are not visible in this view. */
302 nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
303 GCancellable *cancellable, GError **error)
305 NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
306 NiceAgent *agent; /* owned */
309 /* Closed streams are not readable. */
310 if (g_input_stream_is_closed (stream)) {
314 /* Has the agent disappeared? */
315 agent = g_weak_ref_get (&priv->agent_ref);
317 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
318 "Stream is closed due to the NiceAgent being finalised.");
/* Receive from the wrapped stream and component of the agent. */
322 len = nice_agent_recv (agent, priv->stream_id, priv->component_id,
323 buffer, count, cancellable, error);
/* Drop the strong reference obtained from g_weak_ref_get above. */
325 g_object_unref (agent);
/* GInputStream close implementation (installed as close_fn). Takes a
 * temporary strong reference to the agent and, if the wrapped component can
 * be found, the agent is reliable and the pseudo-TCP socket of the component
 * is not already closed, shuts down the read side of that socket.
 * NOTE(review): the error parameter, the NULL check on agent, the agent_lock
 * call matching the agent_unlock below and the return statements are not
 * visible in this view. */
331 nice_input_stream_close (GInputStream *stream, GCancellable *cancellable,
334 NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
335 NiceComponent *component = NULL;
336 NiceStream *_stream = NULL;
337 NiceAgent *agent; /* owned */
339 /* Has the agent disappeared? */
340 agent = g_weak_ref_get (&priv->agent_ref);
346 /* Shut down the read side of the pseudo-TCP stream, if it still exists. */
347 if (agent_find_component (agent, priv->stream_id, priv->component_id,
348 &_stream, &component) && agent->reliable &&
349 !pseudo_tcp_socket_is_closed (component->tcp)) {
350 pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_RD);
353 agent_unlock (agent);
/* Drop the strong reference obtained from g_weak_ref_get above. */
355 g_object_unref (agent);
/* GPollableInputStream is_readable implementation. After checking that the
 * stream is open and the agent still exists, it looks up the wrapped
 * component, then tests two sources of pending input: bytes available in the
 * pseudo-TCP socket (reliable agents only) and input readiness on the sockets
 * listed in the socket_sources of the component.
 * NOTE(review): the declaration of i, the assignments to retval, the early
 * exits, the agent_lock call matching the agent_unlock below and the return
 * statement are not visible in this view. */
361 nice_input_stream_is_readable (GPollableInputStream *stream)
363 NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
364 NiceComponent *component = NULL;
365 NiceStream *_stream = NULL;
366 gboolean retval = FALSE;
368 NiceAgent *agent; /* owned */
370 /* Closed streams are not readable. */
371 if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
374 /* Has the agent disappeared? */
375 agent = g_weak_ref_get (&priv->agent_ref);
/* Resolve the stream and component IDs to the internal objects of the agent;
 * a failed lookup is reported with g_warning. */
381 if (!agent_find_component (agent, priv->stream_id, priv->component_id,
382 &_stream, &component)) {
383 g_warning ("Could not find component %u in stream %u", priv->component_id,
388 /* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP
390 if (agent->reliable &&
391 pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
396 /* Check whether any of the component’s FDs are pollable. */
397 for (i = component->socket_sources; i != NULL; i = i->next) {
398 SocketSource *socket_source = i->data;
399 NiceSocket *nicesock = socket_source->socket;
/* g_socket_condition_check does not block; a non-zero result means the
 * G_IO_IN condition is currently set on this socket. */
401 if (g_socket_condition_check (nicesock->fileno, G_IO_IN) != 0) {
408 agent_unlock (agent);
/* Drop the strong reference obtained from g_weak_ref_get above. */
410 g_object_unref (agent);
/* GPollableInputStream read_nonblocking implementation. Mirrors
 * nice_input_stream_read, but delegates to nice_agent_recv_nonblocking and
 * passes NULL as the cancellable, since this interface method does not
 * receive one.
 * NOTE(review): the declaration of len, the NULL check on agent and all
 * return statements are not visible in this view. */
416 nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer,
417 gsize count, GError **error)
419 NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
420 NiceAgent *agent; /* owned */
423 /* Closed streams are not readable. */
424 if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) {
428 /* Has the agent disappeared? */
429 agent = g_weak_ref_get (&priv->agent_ref);
431 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
432 "Stream is closed due to the NiceAgent being finalised.");
/* The untyped buffer is cast to guint8 * for the agent API. */
436 len = nice_agent_recv_nonblocking (agent, priv->stream_id,
437 priv->component_id, (guint8 *) buffer, count, NULL, error);
/* Drop the strong reference obtained from g_weak_ref_get above. */
439 g_object_unref (agent);
/* GPollableInputStream create_source implementation. Two ways of building the
 * returned GSource are visible: a component input source created by
 * nice_component_input_source_new for the wrapped stream and component, and
 * a plain pollable source from g_pollable_source_new, with a cancellable
 * child source attached.
 * NOTE(review): the control flow choosing between the two is not visible in
 * this view; presumably the plain source is used when the stream is closed
 * or the agent has gone - confirm against the full file. */
445 nice_input_stream_create_source (GPollableInputStream *stream,
446 GCancellable *cancellable)
448 NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
449 GSource *component_source = NULL;
450 NiceAgent *agent; /* owned */
452 /* Closed streams cannot have sources. */
453 if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
456 /* Has the agent disappeared? */
457 agent = g_weak_ref_get (&priv->agent_ref);
461 component_source = nice_component_input_source_new (agent, priv->stream_id,
462 priv->component_id, stream, cancellable);
/* Drop the strong reference obtained from g_weak_ref_get above. */
464 g_object_unref (agent);
466 return component_source;
/* Second construction path: a pollable source for this stream object. */
470 component_source = g_pollable_source_new (G_OBJECT (stream));
/* Attach the cancellable as a child source with a dummy callback; the local
 * reference is released afterwards because g_source_add_child_source keeps
 * its own reference to the child. */
473 GSource *cancellable_source = g_cancellable_source_new (cancellable);
475 g_source_set_dummy_callback (cancellable_source);
476 g_source_add_child_source (component_source, cancellable_source);
477 g_source_unref (cancellable_source);
480 return component_source;
/* Handler for the agent "streams-removed" signal, connected in
 * nice_input_stream_set_property with the input stream as user_data. Walks
 * the zero-terminated stream_ids array and closes this input stream when its
 * own stream ID is listed.
 * NOTE(review): the declaration of i and anything following the close call
 * are not visible in this view. */
484 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
486 NiceInputStream *self = NICE_INPUT_STREAM (user_data);
489 for (i = 0; stream_ids[i] != 0; i++) {
490 if (stream_ids[i] == self->priv->stream_id) {
491 /* The socket has been closed. */
492 g_input_stream_close (G_INPUT_STREAM (self), NULL, NULL);