eafac1b6deedfbb9cfa44541ef3aaa749e66294e
[platform/upstream/libnice.git] / agent / inputstream.c
1 /*
2  * This file is part of the Nice GLib ICE library.
3  *
4  * (C) 2010, 2013 Collabora Ltd.
5  *  Contact: Youness Alaoui
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is the Nice GLib ICE library.
18  *
19  * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20  * Corporation. All Rights Reserved.
21  *
22  * Contributors:
23  *   Youness Alaoui, Collabora Ltd.
24  *   Philip Withnall, Collabora Ltd.
25  *
26  * Alternatively, the contents of this file may be used under the terms of the
27  * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
28  * case the provisions of LGPL are applicable instead of those above. If you
29  * wish to allow use of your version of this file only under the terms of the
30  * LGPL and not to allow others to use your version of this file under the
31  * MPL, indicate your decision by deleting the provisions above and replace
32  * them with the notice and other provisions required by the LGPL. If you do
33  * not delete the provisions above, a recipient may use your version of this
34  * file under either the MPL or the LGPL.
35  */
36
37 /***
38  * SECTION:nice_input_stream
39  * @short_description: #GInputStream implementation for libnice
40  * @see_also: #NiceAgent
41  * @include: inputstream.h
42  * @stability: Stable
43  *
44  * #NiceInputStream is a #GInputStream wrapper for a single reliable stream and
45  * component of a #NiceAgent. Given an existing reliable #NiceAgent, plus the
46  * IDs of an existing stream and component in the agent, it will provide a
47  * streaming input interface for reading from the given component.
48  *
49  * A single #NiceInputStream can only be used with a single agent, stream and
50  * component triple, and will be closed as soon as that stream is removed from
51  * the agent (e.g. if nice_agent_remove_stream() is called from another thread).
52  * If g_input_stream_close() is called on a #NiceInputStream, the input stream
53  * and underlying #NiceAgent stream will be closed, but the underlying stream
54  * will not be removed. Use nice_agent_remove_stream() to do that.
55  *
56  * Since: 0.1.5
57  */
58
59 #ifdef HAVE_CONFIG_H
60 # include "config.h"
61 #endif
62
63 #include <errno.h>
64
65 #include "inputstream.h"
66 #include "agent-priv.h"
67
68 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
69     gpointer user_data);
70 static void nice_input_stream_init_pollable (
71     GPollableInputStreamInterface *iface);
72
73 G_DEFINE_TYPE_WITH_CODE (NiceInputStream,
74                          nice_input_stream, G_TYPE_INPUT_STREAM,
75                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
76                              nice_input_stream_init_pollable));
77
78 enum
79 {
80   PROP_AGENT = 1,
81   PROP_STREAM_ID,
82   PROP_COMPONENT_ID,
83 };
84
85 struct _NiceInputStreamPrivate
86 {
87   GWeakRef/*<NiceAgent>*/ agent_ref;
88   guint stream_id;
89   guint component_id;
90 };
91
92 static void nice_input_stream_dispose (GObject *object);
93 static void nice_input_stream_get_property (GObject *object, guint prop_id,
94     GValue *value, GParamSpec *pspec);
95 static void nice_input_stream_set_property (GObject *object, guint prop_id,
96     const GValue *value, GParamSpec *pspec);
97 static gssize nice_input_stream_read (GInputStream *stream, void *buffer,
98     gsize count, GCancellable *cancellable, GError **error);
99 static gboolean nice_input_stream_close (GInputStream *stream,
100     GCancellable *cancellable, GError **error);
101 static gboolean nice_input_stream_is_readable (GPollableInputStream *stream);
102 static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream,
103     void *buffer, gsize count, GError **error);
104 static GSource *nice_input_stream_create_source (GPollableInputStream *stream,
105     GCancellable *cancellable);
106
107 static void
108 nice_input_stream_class_init (NiceInputStreamClass *klass)
109 {
110   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
111   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
112
113   g_type_class_add_private (klass, sizeof (NiceInputStreamPrivate));
114
115   gobject_class->set_property = nice_input_stream_set_property;
116   gobject_class->get_property = nice_input_stream_get_property;
117   gobject_class->dispose = nice_input_stream_dispose;
118
119   stream_class->read_fn = nice_input_stream_read;
120   stream_class->close_fn = nice_input_stream_close;
121
122   /***
123    * NiceInputStream:agent:
124    *
125    * The #NiceAgent to wrap with an input stream. This must be an existing
126    * reliable agent.
127    *
128    * A reference is not held on the #NiceAgent. If the agent is destroyed before
129    * the #NiceInputStream, %G_IO_ERROR_CLOSED will be returned for all
130    * subsequent operations on the stream.
131    *
132    * Since: 0.1.5
133    */
134   g_object_class_install_property (gobject_class, PROP_AGENT,
135       g_param_spec_object ("agent",
136           "NiceAgent",
137           "The underlying NiceAgent",
138           NICE_TYPE_AGENT,
139           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140
141   /***
142    * NiceInputStream:stream-id:
143    *
144    * ID of the stream to use in the #NiceInputStream:agent.
145    *
146    * Since: 0.1.5
147    */
148   g_object_class_install_property (gobject_class, PROP_STREAM_ID,
149       g_param_spec_uint (
150           "stream-id",
151           "Agent’s stream ID",
152           "The ID of the agent’s stream to wrap.",
153           0, G_MAXUINT,
154           0,
155           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
156
157   /***
158    * NiceInputStream:component-id:
159    *
160    * ID of the component to use in the #NiceInputStream:agent.
161    *
162    * Since: 0.1.5
163    */
164   g_object_class_install_property (gobject_class, PROP_COMPONENT_ID,
165       g_param_spec_uint (
166           "component-id",
167           "Agent’s component ID",
168           "The ID of the agent’s component to wrap.",
169           0, G_MAXUINT,
170           0,
171           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 }
173
174 static void
175 nice_input_stream_dispose (GObject *object)
176 {
177   NiceInputStream *self = NICE_INPUT_STREAM (object);
178   NiceAgent *agent;
179
180   /* Ensure the stream is closed first, otherwise the agent can’t be found in
181    * the close handler called by the parent implementation. */
182   if (!g_input_stream_is_closed (G_INPUT_STREAM (object)))
183     g_input_stream_close (G_INPUT_STREAM (object), NULL, NULL);
184
185   agent = g_weak_ref_get (&self->priv->agent_ref);
186   if (agent != NULL) {
187     g_signal_handlers_disconnect_by_func (agent, streams_removed_cb, self);
188     g_object_unref (agent);
189   }
190
191   g_weak_ref_clear (&self->priv->agent_ref);
192
193   G_OBJECT_CLASS (nice_input_stream_parent_class)->dispose (object);
194 }
195
196 static void
197 nice_input_stream_get_property (GObject *object, guint prop_id,
198     GValue *value, GParamSpec *pspec)
199 {
200   NiceInputStream *self = NICE_INPUT_STREAM (object);
201
202   switch (prop_id) {
203     case PROP_AGENT:
204       g_value_take_object (value, g_weak_ref_get (&self->priv->agent_ref));
205       break;
206     case PROP_STREAM_ID:
207       g_value_set_uint (value, self->priv->stream_id);
208       break;
209     case PROP_COMPONENT_ID:
210       g_value_set_uint (value, self->priv->component_id);
211       break;
212      default:
213       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
214     }
215 }
216
217 static void
218 nice_input_stream_set_property (GObject *object, guint prop_id,
219     const GValue *value, GParamSpec *pspec)
220 {
221   NiceInputStream *self = NICE_INPUT_STREAM (object);
222
223   switch (prop_id) {
224     case PROP_AGENT: {
225       /* Construct only. */
226       NiceAgent *agent = g_value_dup_object (value);
227       g_weak_ref_set (&self->priv->agent_ref, agent);
228
229       /* agent may be NULL if the stream is being constructed by
230        * nice_io_stream_get_input_stream() after the NiceIOStream’s agent has
231        * already been finalised. */
232       if (agent != NULL) {
233         g_signal_connect (agent, "streams-removed",
234             (GCallback) streams_removed_cb, self);
235         g_object_unref (agent);
236       }
237
238       break;
239     }
240     case PROP_STREAM_ID:
241       /* Construct only. */
242       self->priv->stream_id = g_value_get_uint (value);
243       break;
244     case PROP_COMPONENT_ID:
245       /* Construct only. */
246       self->priv->component_id = g_value_get_uint (value);
247       break;
248      default:
249       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
250     }
251 }
252
253 static void
254 nice_input_stream_init (NiceInputStream *stream)
255 {
256   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, NICE_TYPE_INPUT_STREAM,
257       NiceInputStreamPrivate);
258
259   g_weak_ref_init (&stream->priv->agent_ref, NULL);
260 }
261
262 static void
263 nice_input_stream_init_pollable (GPollableInputStreamInterface *iface)
264 {
265   iface->is_readable = nice_input_stream_is_readable;
266   iface->read_nonblocking = nice_input_stream_read_nonblocking;
267   iface->create_source = nice_input_stream_create_source;
268 }
269
270 /***
271  * nice_input_stream_new:
272  * @agent: A #NiceAgent
273  * @stream_id: The ID of the agent’s stream to wrap
274  * @component_id: The ID of the agent’s component to wrap
275  *
276  * Create a new #NiceInputStream wrapping the given stream/component from
277  * @agent, which must be a reliable #NiceAgent.
278  *
279  * The constructed #NiceInputStream will not hold a reference to @agent. If
280  * @agent is destroyed before the input stream, %G_IO_ERROR_CLOSED will be
281  * returned for all subsequent operations on the stream.
282  *
283  * Returns: The new #NiceInputStream object
284  *
285  * Since: 0.1.5
286  */
287 NiceInputStream *
288 nice_input_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
289 {
290   g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
291   g_return_val_if_fail (stream_id >= 1, NULL);
292   g_return_val_if_fail (component_id >= 1, NULL);
293
294   return g_object_new (NICE_TYPE_INPUT_STREAM,
295       "agent", agent,
296       "stream-id", stream_id,
297       "component-id", component_id,
298       NULL);
299 }
300
301 static gssize
302 nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
303     GCancellable *cancellable, GError **error)
304 {
305   NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
306   NiceAgent *agent;  /* owned */
307   gssize len;
308
309   /* Closed streams are not readable. */
310   if (g_input_stream_is_closed (stream)) {
311     return 0;
312   }
313
314   /* Has the agent disappeared? */
315   agent = g_weak_ref_get (&priv->agent_ref);
316   if (agent == NULL) {
317     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
318         "Stream is closed due to the NiceAgent being finalised.");
319     return -1;
320   }
321
322   len = nice_agent_recv (agent, priv->stream_id, priv->component_id,
323                          buffer, count, cancellable, error);
324
325   g_object_unref (agent);
326
327   return len;
328 }
329
330 static gboolean
331 nice_input_stream_close (GInputStream *stream, GCancellable *cancellable,
332     GError **error)
333 {
334   NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
335   NiceComponent *component = NULL;
336   NiceStream *_stream = NULL;
337   NiceAgent *agent;  /* owned */
338
339   /* Has the agent disappeared? */
340   agent = g_weak_ref_get (&priv->agent_ref);
341   if (agent == NULL)
342     return TRUE;
343
344   agent_lock (agent);
345
346   /* Shut down the read side of the pseudo-TCP stream, if it still exists. */
347   if (agent_find_component (agent, priv->stream_id, priv->component_id,
348           &_stream, &component) && agent->reliable &&
349       !pseudo_tcp_socket_is_closed (component->tcp)) {
350     pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_RD);
351   }
352
353   agent_unlock (agent);
354
355   g_object_unref (agent);
356
357   return TRUE;
358 }
359
360 static gboolean
361 nice_input_stream_is_readable (GPollableInputStream *stream)
362 {
363   NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
364   NiceComponent *component = NULL;
365   NiceStream *_stream = NULL;
366   gboolean retval = FALSE;
367   GSList *i;
368   NiceAgent *agent;  /* owned */
369
370   /* Closed streams are not readable. */
371   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
372     return FALSE;
373
374   /* Has the agent disappeared? */
375   agent = g_weak_ref_get (&priv->agent_ref);
376   if (agent == NULL)
377     return FALSE;
378
379   agent_lock (agent);
380
381   if (!agent_find_component (agent, priv->stream_id, priv->component_id,
382           &_stream, &component)) {
383     g_warning ("Could not find component %u in stream %u", priv->component_id,
384         priv->stream_id);
385     goto done;
386   }
387
388   /* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP
389    * buffer. */
390   if (agent->reliable &&
391       pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
392     retval = TRUE;
393     goto done;
394   }
395
396   /* Check whether any of the component’s FDs are pollable. */
397   for (i = component->socket_sources; i != NULL; i = i->next) {
398     SocketSource *socket_source = i->data;
399     NiceSocket *nicesock = socket_source->socket;
400
401     if (g_socket_condition_check (nicesock->fileno, G_IO_IN) != 0) {
402       retval = TRUE;
403       break;
404     }
405   }
406
407 done:
408   agent_unlock (agent);
409
410   g_object_unref (agent);
411
412   return retval;
413 }
414
415 static gssize
416 nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer,
417     gsize count, GError **error)
418 {
419   NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
420   NiceAgent *agent;  /* owned */
421   gssize len;
422
423   /* Closed streams are not readable. */
424   if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) {
425     return 0;
426   }
427
428   /* Has the agent disappeared? */
429   agent = g_weak_ref_get (&priv->agent_ref);
430   if (agent == NULL) {
431     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
432         "Stream is closed due to the NiceAgent being finalised.");
433     return -1;
434   }
435
436   len = nice_agent_recv_nonblocking (agent, priv->stream_id,
437       priv->component_id, (guint8 *) buffer, count, NULL, error);
438
439   g_object_unref (agent);
440
441   return len;
442 }
443
444 static GSource *
445 nice_input_stream_create_source (GPollableInputStream *stream,
446     GCancellable *cancellable)
447 {
448   NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
449   GSource *component_source = NULL;
450   NiceAgent *agent;  /* owned */
451
452   /* Closed streams cannot have sources. */
453   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
454     goto dummy_source;
455
456   /* Has the agent disappeared? */
457   agent = g_weak_ref_get (&priv->agent_ref);
458   if (agent == NULL)
459     goto dummy_source;
460
461   component_source = nice_component_input_source_new (agent, priv->stream_id,
462       priv->component_id, stream, cancellable);
463
464   g_object_unref (agent);
465
466   return component_source;
467
468  dummy_source:
469
470   component_source = g_pollable_source_new (G_OBJECT (stream));
471
472   if (cancellable) {
473     GSource *cancellable_source = g_cancellable_source_new (cancellable);
474
475     g_source_set_dummy_callback (cancellable_source);
476     g_source_add_child_source (component_source, cancellable_source);
477     g_source_unref (cancellable_source);
478   }
479
480   return component_source;
481 }
482
483 static void
484 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
485 {
486   NiceInputStream *self = NICE_INPUT_STREAM (user_data);
487   guint i;
488
489   for (i = 0; stream_ids[i] != 0; i++) {
490     if (stream_ids[i] == self->priv->stream_id) {
491       /* The socket has been closed. */
492       g_input_stream_close (G_INPUT_STREAM (self), NULL, NULL);
493       break;
494     }
495   }
496 }