gio: more implementations of GFileDescriptorBased
[platform/upstream/glib.git] / gio / gsocketinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright © 2008 Christian Kellner, Samuel Cormier-Iijima
4  *           © 2009 codethink
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Authors: Christian Kellner <gicmo@gnome.org>
22  *          Samuel Cormier-Iijima <sciyoshi@gmail.com>
23  *          Ryan Lortie <desrt@desrt.ca>
24  */
25
26 #include "config.h"
27 #include "gsocketinputstream.h"
28 #include "glibintl.h"
29
30 #include "gsimpleasyncresult.h"
31 #include "gcancellable.h"
32 #include "gpollableinputstream.h"
33 #include "gioerror.h"
34 #include "gfiledescriptorbased.h"
35
36 static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
37 #ifdef G_OS_UNIX
38 static void g_socket_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface);
39 #endif
40
41 #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
42 G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
43                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
44 #ifdef G_OS_UNIX
45
46                          G_IMPLEMENT_INTERFACE (G_TYPE_FILE_DESCRIPTOR_BASED, g_socket_input_stream_file_descriptor_based_iface_init)
47 #endif
48                          )
49
/* Property identifiers used by get_property/set_property. */
enum
{
  PROP_0,      /* id 0; no property is installed under it in this file */
  PROP_SOCKET  /* the "socket" property */
};
55
56 struct _GSocketInputStreamPrivate
57 {
58   GSocket *socket;
59
60   /* pending operation metadata */
61   GSimpleAsyncResult *result;
62   GCancellable *cancellable;
63   gpointer buffer;
64   gsize count;
65 };
66
67 static void
68 g_socket_input_stream_get_property (GObject    *object,
69                                     guint       prop_id,
70                                     GValue     *value,
71                                     GParamSpec *pspec)
72 {
73   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
74
75   switch (prop_id)
76     {
77       case PROP_SOCKET:
78         g_value_set_object (value, stream->priv->socket);
79         break;
80
81       default:
82         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
83     }
84 }
85
86 static void
87 g_socket_input_stream_set_property (GObject      *object,
88                                     guint         prop_id,
89                                     const GValue *value,
90                                     GParamSpec   *pspec)
91 {
92   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
93
94   switch (prop_id)
95     {
96       case PROP_SOCKET:
97         stream->priv->socket = g_value_dup_object (value);
98         break;
99
100       default:
101         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
102     }
103 }
104
105 static void
106 g_socket_input_stream_finalize (GObject *object)
107 {
108   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
109
110   if (stream->priv->socket)
111     g_object_unref (stream->priv->socket);
112
113   if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
114     (*G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize) (object);
115 }
116
117 static gssize
118 g_socket_input_stream_read (GInputStream  *stream,
119                             void          *buffer,
120                             gsize          count,
121                             GCancellable  *cancellable,
122                             GError       **error)
123 {
124   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
125
126   return g_socket_receive_with_blocking (input_stream->priv->socket,
127                                          buffer, count, TRUE,
128                                          cancellable, error);
129 }
130
131 static gboolean
132 g_socket_input_stream_read_ready (GSocket *socket,
133                                   GIOCondition condition,
134                                   GSocketInputStream *stream)
135 {
136   GSimpleAsyncResult *simple;
137   GError *error = NULL;
138   gssize result;
139
140   result = g_socket_receive_with_blocking (stream->priv->socket,
141                                            stream->priv->buffer,
142                                            stream->priv->count,
143                                            FALSE,
144                                            stream->priv->cancellable,
145                                            &error);
146
147   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
148     return TRUE;
149
150   simple = stream->priv->result;
151   stream->priv->result = NULL;
152
153   if (result >= 0)
154     g_simple_async_result_set_op_res_gssize (simple, result);
155
156   if (error)
157     g_simple_async_result_take_error (simple, error);
158
159   if (stream->priv->cancellable)
160     g_object_unref (stream->priv->cancellable);
161
162   g_simple_async_result_complete (simple);
163   g_object_unref (simple);
164
165   return FALSE;
166 }
167
168 static void
169 g_socket_input_stream_read_async (GInputStream        *stream,
170                                   void                *buffer,
171                                   gsize                count,
172                                   gint                 io_priority,
173                                   GCancellable        *cancellable,
174                                   GAsyncReadyCallback  callback,
175                                   gpointer             user_data)
176 {
177   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
178   GSource *source;
179
180   g_assert (input_stream->priv->result == NULL);
181
182   input_stream->priv->result =
183     g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
184                                g_socket_input_stream_read_async);
185   if (cancellable)
186     g_object_ref (cancellable);
187   input_stream->priv->cancellable = cancellable;
188   input_stream->priv->buffer = buffer;
189   input_stream->priv->count = count;
190
191   source = g_socket_create_source (input_stream->priv->socket,
192                                    G_IO_IN | G_IO_HUP | G_IO_ERR,
193                                    cancellable);
194   g_source_set_callback (source,
195                          (GSourceFunc) g_socket_input_stream_read_ready,
196                          g_object_ref (input_stream), g_object_unref);
197   g_source_attach (source, g_main_context_get_thread_default ());
198   g_source_unref (source);
199 }
200
201 static gssize
202 g_socket_input_stream_read_finish (GInputStream  *stream,
203                                    GAsyncResult  *result,
204                                    GError       **error)
205 {
206   GSimpleAsyncResult *simple;
207   gssize count;
208
209   g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1);
210
211   simple = G_SIMPLE_ASYNC_RESULT (result);
212
213   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
214
215   count = g_simple_async_result_get_op_res_gssize (simple);
216
217   return count;
218 }
219
220 static gboolean
221 g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
222 {
223   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
224
225   return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
226 }
227
228 static GSource *
229 g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
230                                               GCancellable         *cancellable)
231 {
232   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
233   GSource *socket_source, *pollable_source;
234
235   pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
236   socket_source = g_socket_create_source (input_stream->priv->socket,
237                                           G_IO_IN, cancellable);
238   g_source_set_dummy_callback (socket_source);
239   g_source_add_child_source (pollable_source, socket_source);
240   g_source_unref (socket_source);
241
242   return pollable_source;
243 }
244
245 static gssize
246 g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
247                                                  void                  *buffer,
248                                                  gsize                  size,
249                                                  GError               **error)
250 {
251   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
252
253   return g_socket_receive_with_blocking (input_stream->priv->socket,
254                                          buffer, size, FALSE,
255                                          NULL, error);
256 }
257
#ifdef G_OS_UNIX
/* GFileDescriptorBased::get_fd implementation (UNIX builds only): returns
 * the file descriptor reported by the wrapped socket. */
static int
g_socket_input_stream_get_fd (GFileDescriptorBased *fd_based)
{
  GSocket *sock = G_SOCKET_INPUT_STREAM (fd_based)->priv->socket;

  return g_socket_get_fd (sock);
}
#endif
267
268 static void
269 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
270 {
271   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
272   GInputStreamClass *ginputstream_class = G_INPUT_STREAM_CLASS (klass);
273
274   g_type_class_add_private (klass, sizeof (GSocketInputStreamPrivate));
275
276   gobject_class->finalize = g_socket_input_stream_finalize;
277   gobject_class->get_property = g_socket_input_stream_get_property;
278   gobject_class->set_property = g_socket_input_stream_set_property;
279
280   ginputstream_class->read_fn = g_socket_input_stream_read;
281   ginputstream_class->read_async = g_socket_input_stream_read_async;
282   ginputstream_class->read_finish = g_socket_input_stream_read_finish;
283
284   g_object_class_install_property (gobject_class, PROP_SOCKET,
285                                    g_param_spec_object ("socket",
286                                                         P_("socket"),
287                                                         P_("The socket that this stream wraps"),
288                                                         G_TYPE_SOCKET, G_PARAM_CONSTRUCT_ONLY |
289                                                         G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290 }
291
#ifdef G_OS_UNIX
/* GFileDescriptorBased interface initializer (UNIX builds only): wires up
 * the get_fd implementation defined in this file. */
static void
g_socket_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface)
{
  iface->get_fd = g_socket_input_stream_get_fd;
}
#endif
299
300 static void
301 g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
302 {
303   iface->is_readable = g_socket_input_stream_pollable_is_readable;
304   iface->create_source = g_socket_input_stream_pollable_create_source;
305   iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
306 }
307
308 static void
309 g_socket_input_stream_init (GSocketInputStream *stream)
310 {
311   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
312 }
313
314 GSocketInputStream *
315 _g_socket_input_stream_new (GSocket *socket)
316 {
317   return G_SOCKET_INPUT_STREAM (g_object_new (G_TYPE_SOCKET_INPUT_STREAM, "socket", socket, NULL));
318 }