2 * This file is part of the Nice GLib ICE library.
4 * (C) 2010, 2013 Collabora Ltd.
5 * Contact: Youness Alaoui
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
17 * The Original Code is the Nice GLib ICE library.
19 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20 * Corporation. All Rights Reserved.
23 * Youness Alaoui, Collabora Ltd.
24 * Philip Withnall, Collabora Ltd.
26 * Alternatively, the contents of this file may be used under the terms of the
27 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
28 * case the provisions of LGPL are applicable instead of those above. If you
29 * wish to allow use of your version of this file only under the terms of the
30 * LGPL and not to allow others to use your version of this file under the
31 * MPL, indicate your decision by deleting the provisions above and replace
32 * them with the notice and other provisions required by the LGPL. If you do
33 * not delete the provisions above, a recipient may use your version of this
34 * file under either the MPL or the LGPL.
38 * SECTION:nice_output_stream
39 * @short_description: #GOutputStream implementation for libnice
40 * @see_also: #NiceAgent
41 * @include: outputstream.h
44 * #NiceOutputStream is a #GOutputStream wrapper for a single reliable stream
45 * and component of a #NiceAgent. Given an existing reliable #NiceAgent, plus
46 * the IDs of an existing stream and component in the agent, it will provide a
47 * streaming output interface for writing to the given component.
49 * A single #NiceOutputStream can only be used with a single agent, stream and
50 * component triple, and will be closed as soon as that stream is removed from
51 * the agent (e.g. if nice_agent_remove_stream() is called from another thread).
52 * If g_output_stream_close() is called on a #NiceOutputStream, the output
53 * stream and underlying #NiceAgent stream will be closed, but the underlying
54 * stream will not be removed. Use nice_agent_remove_stream() to do that.
56 * The output stream can only be used once the
57 * #NiceAgent::reliable-transport-writable signal has been received for the
58 * stream/component pair. Any calls to g_output_stream_write() before then will
59 * return %G_IO_ERROR_BROKEN_PIPE.
70 #include "outputstream.h"
71 #include "agent-priv.h"
73 static void nice_output_stream_init_pollable (
74 GPollableOutputStreamInterface *iface);
75 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
78 G_DEFINE_TYPE_WITH_CODE (NiceOutputStream,
79 nice_output_stream, G_TYPE_OUTPUT_STREAM,
80 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
81 nice_output_stream_init_pollable));
90 struct _NiceOutputStreamPrivate
92 GWeakRef/*<NiceAgent>*/ agent_ref;
96 GCancellable *closed_cancellable;
99 static void nice_output_stream_dispose (GObject *object);
100 static void nice_output_stream_get_property (GObject *object, guint prop_id,
101 GValue *value, GParamSpec *pspec);
102 static void nice_output_stream_set_property (GObject *object, guint prop_id,
103 const GValue *value, GParamSpec *pspec);
105 static gssize nice_output_stream_write (GOutputStream *stream,
106 const void *buffer, gsize count, GCancellable *cancellable, GError **error);
107 static gboolean nice_output_stream_close (GOutputStream *stream,
108 GCancellable *cancellable, GError **error);
110 static gboolean nice_output_stream_is_writable (GPollableOutputStream *stream);
111 static gssize nice_output_stream_write_nonblocking (
112 GPollableOutputStream *stream, const void *buffer, gsize count,
114 static GSource *nice_output_stream_create_source (GPollableOutputStream *stream,
115 GCancellable *cancellable);
119 nice_output_stream_class_init (NiceOutputStreamClass *klass)
121 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
122 GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
124 g_type_class_add_private (klass, sizeof (NiceOutputStreamPrivate));
126 stream_class->write_fn = nice_output_stream_write;
127 stream_class->close_fn = nice_output_stream_close;
129 gobject_class->set_property = nice_output_stream_set_property;
130 gobject_class->get_property = nice_output_stream_get_property;
131 gobject_class->dispose = nice_output_stream_dispose;
134 * NiceOutputStream:agent:
136 * The #NiceAgent to wrap with an output stream. This must be an existing
139 * A reference is not held on the #NiceAgent. If the agent is destroyed before
140 * the #NiceOutputStream, %G_IO_ERROR_CLOSED will be returned for all
141 * subsequent operations on the stream.
145 g_object_class_install_property (gobject_class, PROP_AGENT,
146 g_param_spec_object ("agent",
148 "The underlying NiceAgent",
150 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153 * NiceOutputStream:stream-id:
155 * ID of the stream to use in the #NiceOutputStream:agent.
159 g_object_class_install_property (gobject_class, PROP_STREAM_ID,
163 "The ID of the agent’s stream to wrap.",
166 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
169 * NiceOutputStream:component-id:
171 * ID of the component to use in the #NiceOutputStream:agent.
175 g_object_class_install_property (gobject_class, PROP_COMPONENT_ID,
178 "Agent’s component ID",
179 "The ID of the agent’s component to wrap.",
182 G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186 nice_output_stream_dispose (GObject *object)
188 NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
191 /* Ensure the stream is closed first, otherwise the agent can’t be found in
192 * the close handler called by the parent implementation. */
193 if (!g_output_stream_is_closed (G_OUTPUT_STREAM (object)))
194 g_output_stream_close (G_OUTPUT_STREAM (object), NULL, NULL);
196 agent = g_weak_ref_get (&self->priv->agent_ref);
198 g_signal_handlers_disconnect_by_func (agent, streams_removed_cb, self);
199 g_object_unref (agent);
202 g_weak_ref_clear (&self->priv->agent_ref);
204 g_clear_object (&self->priv->closed_cancellable);
206 G_OBJECT_CLASS (nice_output_stream_parent_class)->dispose (object);
210 nice_output_stream_get_property (GObject *object, guint prop_id,
211 GValue *value, GParamSpec *pspec)
213 NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
217 g_value_take_object (value, g_weak_ref_get (&self->priv->agent_ref));
220 g_value_set_uint (value, self->priv->stream_id);
222 case PROP_COMPONENT_ID:
223 g_value_set_uint (value, self->priv->component_id);
226 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
231 nice_output_stream_set_property (GObject *object, guint prop_id,
232 const GValue *value, GParamSpec *pspec)
234 NiceOutputStream *self = NICE_OUTPUT_STREAM (object);
238 /* Construct only. */
239 NiceAgent *agent = g_value_dup_object (value);
240 g_weak_ref_set (&self->priv->agent_ref, agent);
242 /* agent may be NULL if the stream is being constructed by
243 * nice_io_stream_get_output_stream() after the NiceIOStream’s agent has
244 * already been finalised. */
246 g_signal_connect (agent, "streams-removed",
247 (GCallback) streams_removed_cb, self);
248 g_object_unref (agent);
254 /* Construct only. */
255 self->priv->stream_id = g_value_get_uint (value);
257 case PROP_COMPONENT_ID:
258 /* Construct only. */
259 self->priv->component_id = g_value_get_uint (value);
262 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
267 nice_output_stream_init (NiceOutputStream *stream)
269 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, NICE_TYPE_OUTPUT_STREAM,
270 NiceOutputStreamPrivate);
272 g_weak_ref_init (&stream->priv->agent_ref, NULL);
273 stream->priv->closed_cancellable = g_cancellable_new ();
277 nice_output_stream_init_pollable (GPollableOutputStreamInterface *iface)
279 iface->is_writable = nice_output_stream_is_writable;
280 iface->write_nonblocking = nice_output_stream_write_nonblocking;
281 iface->create_source = nice_output_stream_create_source;
285 * nice_output_stream_new:
286 * @agent: A #NiceAgent
287 * @stream_id: The ID of the agent’s stream to wrap
288 * @component_id: The ID of the agent’s component to wrap
290 * Create a new #NiceOutputStream wrapping the given stream/component from
291 * @agent, which must be a reliable #NiceAgent.
293 * The constructed #NiceOutputStream will not hold a reference to @agent. If
294 * @agent is destroyed before the output stream, %G_IO_ERROR_CLOSED will be
295 * returned for all subsequent operations on the stream.
297 * Returns: The new #NiceOutputStream object
302 nice_output_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
304 g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
305 g_return_val_if_fail (stream_id >= 1, NULL);
306 g_return_val_if_fail (component_id >= 1, NULL);
308 return g_object_new (NICE_TYPE_OUTPUT_STREAM,
310 "stream-id", stream_id,
311 "component-id", component_id,
316 volatile gint ref_count;
326 write_data_ref (WriteData *write_data)
328 g_atomic_int_inc (&write_data->ref_count);
333 write_data_unref (WriteData *write_data)
335 if (g_atomic_int_dec_and_test (&write_data->ref_count)) {
336 g_cond_clear (&write_data->cond);
337 g_mutex_clear (&write_data->mutex);
338 g_slice_free (WriteData, write_data);
343 write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
345 WriteData *write_data = user_data;
347 g_mutex_lock (&write_data->mutex);
348 g_cond_broadcast (&write_data->cond);
349 write_data->cancelled = TRUE;
350 g_mutex_unlock (&write_data->mutex);
354 reliable_transport_writeable_cb (NiceAgent *agent, guint stream_id,
355 guint component_id, gpointer user_data)
357 WriteData *write_data = user_data;
359 g_mutex_lock (&write_data->mutex);
360 write_data->writable = TRUE;
361 g_cond_broadcast (&write_data->cond);
362 g_mutex_unlock (&write_data->mutex);
366 nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count,
367 GCancellable *cancellable, GError **error)
369 NiceOutputStream *self = NICE_OUTPUT_STREAM (stream);
370 const gchar* buf = buffer;
373 NiceAgent *agent = NULL; /* owned */
374 gulong cancel_id = 0, closed_cancel_id, writeable_id;
375 WriteData *write_data;
377 /* Closed streams are not writeable. */
378 if (g_output_stream_is_closed (stream)) {
379 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
380 "Stream is closed.");
384 /* Has the agent disappeared? */
385 agent = g_weak_ref_get (&self->priv->agent_ref);
387 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
388 "Stream is closed due to the NiceAgent being finalised.");
393 g_object_unref (agent);
397 /* FIXME: nice_agent_send() is non-blocking, which is a bit unexpected
398 * since nice_agent_recv() is blocking. Currently this uses a fairly dodgy
399 * GCond solution; would be much better for nice_agent_send() to block
400 * properly in the main loop. */
401 write_data = g_slice_new0 (WriteData);
402 write_data->ref_count = 1;
403 g_mutex_init (&write_data->mutex);
404 g_cond_init (&write_data->cond);
406 if (cancellable != NULL) {
407 cancel_id = g_cancellable_connect (cancellable,
408 (GCallback) write_cancelled_cb, write_data_ref (write_data),
409 (GDestroyNotify) write_data_unref);
412 closed_cancel_id = g_cancellable_connect (self->priv->closed_cancellable,
413 (GCallback) write_cancelled_cb, write_data_ref (write_data),
414 (GDestroyNotify) write_data_unref);
416 g_mutex_lock (&write_data->mutex);
418 writeable_id = g_signal_connect_data (G_OBJECT (agent),
419 "reliable-transport-writable",
420 (GCallback) reliable_transport_writeable_cb, write_data_ref (write_data),
421 (GClosureNotify) G_CALLBACK (write_data_unref), 0);
425 /* Have to unlock while calling into the agent because
426 * it will take the agent lock which will cause a deadlock if one of
427 * the callbacks is called.
429 if (g_cancellable_is_cancelled (cancellable) ||
430 g_cancellable_is_cancelled (self->priv->closed_cancellable))
433 write_data->writable = FALSE;
434 g_mutex_unlock (&write_data->mutex);
436 n_sent = nice_agent_send (agent, self->priv->stream_id,
437 self->priv->component_id, count - len, buf + len);
439 g_mutex_lock (&write_data->mutex);
442 if (!write_data->writable && !write_data->cancelled)
443 g_cond_wait (&write_data->cond, &write_data->mutex);
444 } else if (n_sent > 0) {
448 } while ((gsize) len < count);
450 g_signal_handler_disconnect (G_OBJECT (agent), writeable_id);
451 g_mutex_unlock (&write_data->mutex);
454 g_cancellable_disconnect (cancellable, cancel_id);
455 g_cancellable_disconnect (self->priv->closed_cancellable, closed_cancel_id);
459 if (!g_cancellable_set_error_if_cancelled (cancellable, error)) {
460 if (g_cancellable_is_cancelled (self->priv->closed_cancellable))
461 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
462 "Stream has been removed from agent");
466 write_data_unref (write_data);
468 g_object_unref (agent);
469 g_assert_cmpint (len, !=, 0);
475 nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable,
478 NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
479 NiceComponent *component = NULL;
480 NiceStream *_stream = NULL;
481 NiceAgent *agent; /* owned */
483 /* Has the agent disappeared? */
484 agent = g_weak_ref_get (&priv->agent_ref);
490 /* Shut down the write side of the pseudo-TCP stream. */
491 if (agent_find_component (agent, priv->stream_id, priv->component_id,
492 &_stream, &component) && agent->reliable &&
493 !pseudo_tcp_socket_is_closed (component->tcp)) {
494 pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_WR);
497 agent_unlock (agent);
499 g_object_unref (agent);
505 nice_output_stream_is_writable (GPollableOutputStream *stream)
507 NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
508 NiceComponent *component = NULL;
509 NiceStream *_stream = NULL;
510 gboolean retval = FALSE;
511 NiceAgent *agent; /* owned */
513 /* Closed streams are not writeable. */
514 if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
517 /* Has the agent disappeared? */
518 agent = g_weak_ref_get (&priv->agent_ref);
524 if (!agent_find_component (agent, priv->stream_id, priv->component_id,
525 &_stream, &component)) {
526 g_warning ("Could not find component %u in stream %u", priv->component_id,
530 if (component->selected_pair.local != NULL) {
531 NiceSocket *sockptr = component->selected_pair.local->sockptr;
533 /* If it’s a reliable agent, see if there’s any space in the pseudo-TCP
535 if (!nice_socket_is_reliable (sockptr)) {
536 retval = pseudo_tcp_socket_can_send (component->tcp);
538 retval = (g_socket_condition_check (sockptr->fileno, G_IO_OUT) != 0);
543 agent_unlock (agent);
545 g_object_unref (agent);
551 nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
552 const void *buffer, gsize count, GError **error)
554 NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
555 NiceAgent *agent; /* owned */
558 /* Closed streams are not writeable. */
559 if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
560 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
561 "Stream is closed.");
565 /* Has the agent disappeared? */
566 agent = g_weak_ref_get (&priv->agent_ref);
568 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
569 "Stream is closed due to the NiceAgent being finalised.");
578 n_sent = nice_agent_send (agent, priv->stream_id, priv->component_id,
582 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
583 g_strerror (EAGAIN));
587 g_object_unref (agent);
593 nice_output_stream_create_source (GPollableOutputStream *stream,
594 GCancellable *cancellable)
596 NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
597 GSource *component_source = NULL;
598 NiceComponent *component = NULL;
599 NiceStream *_stream = NULL;
600 NiceAgent *agent; /* owned */
602 component_source = g_pollable_source_new (G_OBJECT (stream));
605 GSource *cancellable_source = g_cancellable_source_new (cancellable);
607 g_source_set_dummy_callback (cancellable_source);
608 g_source_add_child_source (component_source, cancellable_source);
609 g_source_unref (cancellable_source);
612 /* Closed streams cannot have sources. */
613 if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
614 return component_source;
616 /* Has the agent disappeared? */
617 agent = g_weak_ref_get (&priv->agent_ref);
619 return component_source;
623 /* Grab the socket for this component. */
624 if (!agent_find_component (agent, priv->stream_id, priv->component_id,
625 &_stream, &component)) {
626 g_warning ("Could not find component %u in stream %u", priv->component_id,
631 if (component->tcp_writable_cancellable) {
632 GSource *cancellable_source =
633 g_cancellable_source_new (component->tcp_writable_cancellable);
635 g_source_set_dummy_callback (cancellable_source);
636 g_source_add_child_source (component_source, cancellable_source);
637 g_source_unref (cancellable_source);
641 agent_unlock (agent);
643 g_object_unref (agent);
645 return component_source;
649 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
651 NiceOutputStream *self = NICE_OUTPUT_STREAM (user_data);
654 for (i = 0; stream_ids[i] != 0; i++) {
655 if (stream_ids[i] == self->priv->stream_id) {
656 /* The socket has been closed. */
657 g_cancellable_cancel (self->priv->closed_cancellable);
659 g_output_stream_close (G_OUTPUT_STREAM (self), NULL, NULL);