94be750e0182cd5286065c7d1dcdc6d0c3e4d7e6
[platform/upstream/libnice.git] / agent / outputstream.c
1 /*
2  * This file is part of the Nice GLib ICE library.
3  *
4  * (C) 2010, 2013 Collabora Ltd.
5  *  Contact: Youness Alaoui
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is the Nice GLib ICE library.
18  *
19  * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20  * Corporation. All Rights Reserved.
21  *
22  * Contributors:
23  *   Youness Alaoui, Collabora Ltd.
24  *   Philip Withnall, Collabora Ltd.
25  *
26  * Alternatively, the contents of this file may be used under the terms of the
27  * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
28  * case the provisions of LGPL are applicable instead of those above. If you
29  * wish to allow use of your version of this file only under the terms of the
30  * LGPL and not to allow others to use your version of this file under the
31  * MPL, indicate your decision by deleting the provisions above and replace
32  * them with the notice and other provisions required by the LGPL. If you do
33  * not delete the provisions above, a recipient may use your version of this
34  * file under either the MPL or the LGPL.
35  */
36
37 /***
38  * SECTION:nice_output_stream
39  * @short_description: #GOutputStream implementation for libnice
40  * @see_also: #NiceAgent
41  * @include: outputstream.h
42  * @stability: Stable
43  *
44  * #NiceOutputStream is a #GOutputStream wrapper for a single reliable stream
45  * and component of a #NiceAgent. Given an existing reliable #NiceAgent, plus
46  * the IDs of an existing stream and component in the agent, it will provide a
47  * streaming output interface for writing to the given component.
48  *
49  * A single #NiceOutputStream can only be used with a single agent, stream and
50  * component triple, and will be closed as soon as that stream is removed from
51  * the agent (e.g. if nice_agent_remove_stream() is called from another thread).
52  * If g_output_stream_close() is called on a #NiceOutputStream, the output
53  * stream and underlying #NiceAgent stream will be closed, but the underlying
54  * stream will not be removed. Use nice_agent_remove_stream() to do that.
55  *
56  * The output stream can only be used once the
57  * #NiceAgent::reliable-transport-writable signal has been received for the
58  * stream/component pair. Any calls to g_output_stream_write() before then will
59  * return %G_IO_ERROR_BROKEN_PIPE.
60  *
61  * Since: 0.1.5
62  */
63
64 #ifdef HAVE_CONFIG_H
65 # include "config.h"
66 #endif
67
68 #include <errno.h>
69
70 #include "outputstream.h"
71 #include "agent-priv.h"
72
73 static void nice_output_stream_init_pollable (
74     GPollableOutputStreamInterface *iface);
75 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
76     gpointer user_data);
77
78 G_DEFINE_TYPE_WITH_CODE (NiceOutputStream,
79                          nice_output_stream, G_TYPE_OUTPUT_STREAM,
80                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
81                                                 nice_output_stream_init_pollable));
82
83 enum
84 {
85   PROP_AGENT = 1,
86   PROP_STREAM_ID,
87   PROP_COMPONENT_ID,
88 };
89
90 struct _NiceOutputStreamPrivate
91 {
92   GWeakRef/*<NiceAgent>*/ agent_ref;
93   guint stream_id;
94   guint component_id;
95
96   GCancellable *closed_cancellable;
97 };
98
99 static void nice_output_stream_dispose (GObject *object);
100 static void nice_output_stream_get_property (GObject *object, guint prop_id,
101     GValue *value, GParamSpec *pspec);
102 static void nice_output_stream_set_property (GObject *object, guint prop_id,
103     const GValue *value, GParamSpec *pspec);
104
105 static gssize nice_output_stream_write (GOutputStream *stream,
106     const void *buffer, gsize count, GCancellable *cancellable, GError **error);
107 static gboolean nice_output_stream_close (GOutputStream *stream,
108     GCancellable *cancellable, GError **error);
109
110 static gboolean nice_output_stream_is_writable (GPollableOutputStream *stream);
111 static gssize nice_output_stream_write_nonblocking (
112     GPollableOutputStream *stream, const void *buffer, gsize count,
113     GError **error);
114 static GSource *nice_output_stream_create_source (GPollableOutputStream *stream,
115     GCancellable *cancellable);
116
117 /* Output Stream */
118 static void
119 nice_output_stream_class_init (NiceOutputStreamClass *klass)
120 {
121   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
122   GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
123
124   g_type_class_add_private (klass, sizeof (NiceOutputStreamPrivate));
125
126   stream_class->write_fn = nice_output_stream_write;
127   stream_class->close_fn = nice_output_stream_close;
128
129   gobject_class->set_property = nice_output_stream_set_property;
130   gobject_class->get_property = nice_output_stream_get_property;
131   gobject_class->dispose = nice_output_stream_dispose;
132
133   /***
134    * NiceOutputStream:agent:
135    *
136    * The #NiceAgent to wrap with an output stream. This must be an existing
137    * reliable agent.
138    *
139    * A reference is not held on the #NiceAgent. If the agent is destroyed before
140    * the #NiceOutputStream, %G_IO_ERROR_CLOSED will be returned for all
141    * subsequent operations on the stream.
142    *
143    * Since: 0.1.5
144    */
145   g_object_class_install_property (gobject_class, PROP_AGENT,
146       g_param_spec_object ("agent",
147           "NiceAgent",
148           "The underlying NiceAgent",
149           NICE_TYPE_AGENT,
150           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151
152   /***
153    * NiceOutputStream:stream-id:
154    *
155    * ID of the stream to use in the #NiceOutputStream:agent.
156    *
157    * Since: 0.1.5
158    */
159   g_object_class_install_property (gobject_class, PROP_STREAM_ID,
160       g_param_spec_uint (
161           "stream-id",
162           "Agent’s stream ID",
163           "The ID of the agent’s stream to wrap.",
164           0, G_MAXUINT,
165           0,
166           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167
168   /***
169    * NiceOutputStream:component-id:
170    *
171    * ID of the component to use in the #NiceOutputStream:agent.
172    *
173    * Since: 0.1.5
174    */
175   g_object_class_install_property (gobject_class, PROP_COMPONENT_ID,
176       g_param_spec_uint (
177           "component-id",
178           "Agent’s component ID",
179           "The ID of the agent’s component to wrap.",
180           0, G_MAXUINT,
181           0,
182           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183 }
184
185 static void
186 nice_output_stream_dispose (GObject *object)
187 {
188   NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
189   NiceAgent *agent;
190
191   /* Ensure the stream is closed first, otherwise the agent can’t be found in
192    * the close handler called by the parent implementation. */
193   if (!g_output_stream_is_closed (G_OUTPUT_STREAM (object)))
194     g_output_stream_close (G_OUTPUT_STREAM (object), NULL, NULL);
195
196   agent = g_weak_ref_get (&self->priv->agent_ref);
197   if (agent != NULL) {
198     g_signal_handlers_disconnect_by_func (agent, streams_removed_cb, self);
199     g_object_unref (agent);
200   }
201
202   g_weak_ref_clear (&self->priv->agent_ref);
203
204   g_clear_object (&self->priv->closed_cancellable);
205
206   G_OBJECT_CLASS (nice_output_stream_parent_class)->dispose (object);
207 }
208
209 static void
210 nice_output_stream_get_property (GObject *object, guint prop_id,
211     GValue *value, GParamSpec *pspec)
212 {
213   NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
214
215   switch (prop_id) {
216     case PROP_AGENT:
217       g_value_take_object (value, g_weak_ref_get (&self->priv->agent_ref));
218       break;
219     case PROP_STREAM_ID:
220       g_value_set_uint (value, self->priv->stream_id);
221       break;
222     case PROP_COMPONENT_ID:
223       g_value_set_uint (value, self->priv->component_id);
224       break;
225      default:
226       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
227     }
228 }
229
230 static void
231 nice_output_stream_set_property (GObject *object, guint prop_id,
232     const GValue *value, GParamSpec *pspec)
233 {
234   NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
235
236   switch (prop_id) {
237     case PROP_AGENT: {
238       /* Construct only. */
239       NiceAgent *agent = g_value_dup_object (value);
240       g_weak_ref_set (&self->priv->agent_ref, agent);
241
242       /* agent may be NULL if the stream is being constructed by
243        * nice_io_stream_get_output_stream() after the NiceIOStream’s agent has
244        * already been finalised. */
245       if (agent != NULL) {
246         g_signal_connect (agent, "streams-removed",
247             (GCallback) streams_removed_cb, self);
248         g_object_unref (agent);
249       }
250
251       break;
252     }
253     case PROP_STREAM_ID:
254       /* Construct only. */
255       self->priv->stream_id = g_value_get_uint (value);
256       break;
257     case PROP_COMPONENT_ID:
258       /* Construct only. */
259       self->priv->component_id = g_value_get_uint (value);
260       break;
261      default:
262       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
263     }
264 }
265
266 static void
267 nice_output_stream_init (NiceOutputStream *stream)
268 {
269   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, NICE_TYPE_OUTPUT_STREAM,
270       NiceOutputStreamPrivate);
271
272   g_weak_ref_init (&stream->priv->agent_ref, NULL);
273   stream->priv->closed_cancellable = g_cancellable_new ();
274 }
275
276 static void
277 nice_output_stream_init_pollable (GPollableOutputStreamInterface *iface)
278 {
279   iface->is_writable = nice_output_stream_is_writable;
280   iface->write_nonblocking = nice_output_stream_write_nonblocking;
281   iface->create_source = nice_output_stream_create_source;
282 }
283
284 /***
285  * nice_output_stream_new:
286  * @agent: A #NiceAgent
287  * @stream_id: The ID of the agent’s stream to wrap
288  * @component_id: The ID of the agent’s component to wrap
289  *
290  * Create a new #NiceOutputStream wrapping the given stream/component from
291  * @agent, which must be a reliable #NiceAgent.
292  *
293  * The constructed #NiceOutputStream will not hold a reference to @agent. If
294  * @agent is destroyed before the output stream, %G_IO_ERROR_CLOSED will be
295  * returned for all subsequent operations on the stream.
296  *
297  * Returns: The new #NiceOutputStream object
298  *
299  * Since: 0.1.5
300  */
301 NiceOutputStream *
302 nice_output_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
303 {
304   g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
305   g_return_val_if_fail (stream_id >= 1, NULL);
306   g_return_val_if_fail (component_id >= 1, NULL);
307
308   return g_object_new (NICE_TYPE_OUTPUT_STREAM,
309       "agent", agent,
310       "stream-id", stream_id,
311       "component-id", component_id,
312       NULL);
313 }
314
315 typedef struct {
316   volatile gint ref_count;
317
318   GCond cond;
319   GMutex mutex;
320
321   gboolean writable;
322   gboolean cancelled;
323 } WriteData;
324
325 static WriteData *
326 write_data_ref (WriteData *write_data)
327 {
328   g_atomic_int_inc (&write_data->ref_count);
329   return write_data;
330 }
331
332 static void
333 write_data_unref (WriteData *write_data)
334 {
335   if (g_atomic_int_dec_and_test (&write_data->ref_count)) {
336     g_cond_clear (&write_data->cond);
337     g_mutex_clear (&write_data->mutex);
338     g_slice_free (WriteData, write_data);
339   }
340 }
341
342 static void
343 write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
344 {
345   WriteData *write_data = user_data;
346
347   g_mutex_lock (&write_data->mutex);
348   g_cond_broadcast (&write_data->cond);
349   write_data->cancelled = TRUE;
350   g_mutex_unlock (&write_data->mutex);
351 }
352
353 static void
354 reliable_transport_writeable_cb (NiceAgent *agent, guint stream_id,
355     guint component_id, gpointer user_data)
356 {
357   WriteData *write_data = user_data;
358
359   g_mutex_lock (&write_data->mutex);
360   write_data->writable = TRUE;
361   g_cond_broadcast (&write_data->cond);
362   g_mutex_unlock (&write_data->mutex);
363 }
364
365 static gssize
366 nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count,
367     GCancellable *cancellable, GError **error)
368 {
369   NiceOutputStream *self = NICE_OUTPUT_STREAM (stream);
370   const gchar* buf = buffer;
371   gssize len = 0;
372   gint n_sent;
373   NiceAgent *agent = NULL;  /* owned */
374   gulong cancel_id = 0, closed_cancel_id, writeable_id;
375   WriteData *write_data;
376
377   /* Closed streams are not writeable. */
378   if (g_output_stream_is_closed (stream)) {
379     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
380         "Stream is closed.");
381     return -1;
382   }
383
384   /* Has the agent disappeared? */
385   agent = g_weak_ref_get (&self->priv->agent_ref);
386   if (agent == NULL) {
387     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
388         "Stream is closed due to the NiceAgent being finalised.");
389     return -1;
390   }
391
392   if (count == 0) {
393     g_object_unref (agent);
394     return 0;
395   }
396
397   /* FIXME: nice_agent_send() is non-blocking, which is a bit unexpected
398    * since nice_agent_recv() is blocking. Currently this uses a fairly dodgy
399    * GCond solution; would be much better for nice_agent_send() to block
400    * properly in the main loop. */
401   write_data = g_slice_new0 (WriteData);
402   write_data->ref_count = 1;
403   g_mutex_init (&write_data->mutex);
404   g_cond_init (&write_data->cond);
405
406   if (cancellable != NULL) {
407     cancel_id = g_cancellable_connect (cancellable,
408         (GCallback) write_cancelled_cb, write_data_ref (write_data),
409         (GDestroyNotify) write_data_unref);
410   }
411
412   closed_cancel_id = g_cancellable_connect (self->priv->closed_cancellable,
413       (GCallback) write_cancelled_cb, write_data_ref (write_data),
414       (GDestroyNotify) write_data_unref);
415
416   g_mutex_lock (&write_data->mutex);
417
418   writeable_id = g_signal_connect_data (G_OBJECT (agent),
419       "reliable-transport-writable",
420       (GCallback) reliable_transport_writeable_cb, write_data_ref (write_data),
421       (GClosureNotify) G_CALLBACK (write_data_unref), 0);
422
423
424   do {
425     /* Have to unlock while calling into the agent because
426      * it will take the agent lock which will cause a deadlock if one of
427      * the callbacks is called.
428      */
429     if (g_cancellable_is_cancelled (cancellable) ||
430         g_cancellable_is_cancelled (self->priv->closed_cancellable))
431       break;
432
433     write_data->writable = FALSE;
434     g_mutex_unlock (&write_data->mutex);
435
436     n_sent = nice_agent_send (agent, self->priv->stream_id,
437         self->priv->component_id, count - len, buf + len);
438
439     g_mutex_lock (&write_data->mutex);
440
441     if (n_sent <= 0) {
442       if (!write_data->writable && !write_data->cancelled)
443         g_cond_wait (&write_data->cond, &write_data->mutex);
444     } else if (n_sent > 0) {
445       /* Success. */
446       len += n_sent;
447     }
448   } while ((gsize) len < count);
449
450   g_signal_handler_disconnect (G_OBJECT (agent), writeable_id);
451   g_mutex_unlock (&write_data->mutex);
452
453   if (cancel_id)
454     g_cancellable_disconnect (cancellable, cancel_id);
455   g_cancellable_disconnect (self->priv->closed_cancellable, closed_cancel_id);
456
457   if (len == 0) {
458     len = -1;
459     if (!g_cancellable_set_error_if_cancelled (cancellable, error)) {
460       if (g_cancellable_is_cancelled (self->priv->closed_cancellable))
461         g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
462             "Stream has been removed from agent");
463     }
464   }
465
466   write_data_unref (write_data);
467
468   g_object_unref (agent);
469   g_assert_cmpint (len, !=, 0);
470
471   return len;
472 }
473
474 static gboolean
475 nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable,
476     GError **error)
477 {
478   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
479   NiceComponent *component = NULL;
480   NiceStream *_stream = NULL;
481   NiceAgent *agent;  /* owned */
482
483   /* Has the agent disappeared? */
484   agent = g_weak_ref_get (&priv->agent_ref);
485   if (agent == NULL)
486     return TRUE;
487
488   agent_lock (agent);
489
490   /* Shut down the write side of the pseudo-TCP stream. */
491   if (agent_find_component (agent, priv->stream_id, priv->component_id,
492           &_stream, &component) && agent->reliable &&
493       !pseudo_tcp_socket_is_closed (component->tcp)) {
494     pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_WR);
495   }
496
497   agent_unlock (agent);
498
499   g_object_unref (agent);
500
501   return TRUE;
502 }
503
504 static gboolean
505 nice_output_stream_is_writable (GPollableOutputStream *stream)
506 {
507   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
508   NiceComponent *component = NULL;
509   NiceStream *_stream = NULL;
510   gboolean retval = FALSE;
511   NiceAgent *agent;  /* owned */
512
513   /* Closed streams are not writeable. */
514   if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
515     return FALSE;
516
517   /* Has the agent disappeared? */
518   agent = g_weak_ref_get (&priv->agent_ref);
519   if (agent == NULL)
520     return FALSE;
521
522   agent_lock (agent);
523
524   if (!agent_find_component (agent, priv->stream_id, priv->component_id,
525           &_stream, &component)) {
526     g_warning ("Could not find component %u in stream %u", priv->component_id,
527         priv->stream_id);
528     goto done;
529   }
530   if (component->selected_pair.local != NULL) {
531     NiceSocket *sockptr = component->selected_pair.local->sockptr;
532
533     /* If it’s a reliable agent, see if there’s any space in the pseudo-TCP
534      * output buffer. */
535     if (!nice_socket_is_reliable (sockptr)) {
536       retval = pseudo_tcp_socket_can_send (component->tcp);
537     } else {
538       retval = (g_socket_condition_check (sockptr->fileno, G_IO_OUT) != 0);
539     }
540   }
541
542 done:
543   agent_unlock (agent);
544
545   g_object_unref (agent);
546
547   return retval;
548 }
549
550 static gssize
551 nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
552     const void *buffer, gsize count, GError **error)
553 {
554   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
555   NiceAgent *agent;  /* owned */
556   gint n_sent;
557
558   /* Closed streams are not writeable. */
559   if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
560     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
561         "Stream is closed.");
562     return -1;
563   }
564
565   /* Has the agent disappeared? */
566   agent = g_weak_ref_get (&priv->agent_ref);
567   if (agent == NULL) {
568     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
569         "Stream is closed due to the NiceAgent being finalised.");
570     return -1;
571   }
572
573   if (count == 0) {
574     n_sent = 0;
575     goto done;
576   }
577
578   n_sent = nice_agent_send (agent, priv->stream_id, priv->component_id,
579       count, buffer);
580
581   if (n_sent == -1)
582     g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
583         g_strerror (EAGAIN));
584
585  done:
586
587   g_object_unref (agent);
588
589   return n_sent;
590 }
591
592 static GSource *
593 nice_output_stream_create_source (GPollableOutputStream *stream,
594     GCancellable *cancellable)
595 {
596   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
597   GSource *component_source = NULL;
598   NiceComponent *component = NULL;
599   NiceStream *_stream = NULL;
600   NiceAgent *agent;  /* owned */
601
602   component_source = g_pollable_source_new (G_OBJECT (stream));
603
604   if (cancellable) {
605     GSource *cancellable_source = g_cancellable_source_new (cancellable);
606
607     g_source_set_dummy_callback (cancellable_source);
608     g_source_add_child_source (component_source, cancellable_source);
609     g_source_unref (cancellable_source);
610   }
611
612   /* Closed streams cannot have sources. */
613   if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
614     return component_source;
615
616   /* Has the agent disappeared? */
617   agent = g_weak_ref_get (&priv->agent_ref);
618   if (agent == NULL)
619     return component_source;
620
621   agent_lock (agent);
622
623   /* Grab the socket for this component. */
624   if (!agent_find_component (agent, priv->stream_id, priv->component_id,
625           &_stream, &component)) {
626     g_warning ("Could not find component %u in stream %u", priv->component_id,
627         priv->stream_id);
628     goto done;
629   }
630
631    if (component->tcp_writable_cancellable) {
632     GSource *cancellable_source =
633         g_cancellable_source_new (component->tcp_writable_cancellable);
634
635     g_source_set_dummy_callback (cancellable_source);
636     g_source_add_child_source (component_source, cancellable_source);
637     g_source_unref (cancellable_source);
638   }
639
640 done:
641   agent_unlock (agent);
642
643   g_object_unref (agent);
644
645   return component_source;
646 }
647
648 static void
649 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
650 {
651   NiceOutputStream *self = NICE_OUTPUT_STREAM (user_data);
652   guint i;
653
654   for (i = 0; stream_ids[i] != 0; i++) {
655     if (stream_ids[i] == self->priv->stream_id) {
656       /* The socket has been closed. */
657       g_cancellable_cancel (self->priv->closed_cancellable);
658
659       g_output_stream_close (G_OUTPUT_STREAM (self), NULL, NULL);
660       break;
661     }
662   }
663 }