Merge remote-tracking branch 'gvdb/master'
[platform/upstream/glib.git] / gio / gsocketinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright © 2008 Christian Kellner, Samuel Cormier-Iijima
4  *           © 2009 codethink
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Authors: Christian Kellner <gicmo@gnome.org>
22  *          Samuel Cormier-Iijima <sciyoshi@gmail.com>
23  *          Ryan Lortie <desrt@desrt.ca>
24  */
25
26 #include "config.h"
27 #include "gsocketinputstream.h"
28 #include "glibintl.h"
29
30 #include "gsimpleasyncresult.h"
31 #include "gcancellable.h"
32 #include "gpollableinputstream.h"
33 #include "gioerror.h"
34
35
36 static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
37
38 #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
39 G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
40                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
41                          )
42
43 enum
44 {
45   PROP_0,
46   PROP_SOCKET
47 };
48
49 struct _GSocketInputStreamPrivate
50 {
51   GSocket *socket;
52
53   /* pending operation metadata */
54   GSimpleAsyncResult *result;
55   GCancellable *cancellable;
56   gpointer buffer;
57   gsize count;
58 };
59
60 static void
61 g_socket_input_stream_get_property (GObject    *object,
62                                     guint       prop_id,
63                                     GValue     *value,
64                                     GParamSpec *pspec)
65 {
66   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
67
68   switch (prop_id)
69     {
70       case PROP_SOCKET:
71         g_value_set_object (value, stream->priv->socket);
72         break;
73
74       default:
75         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
76     }
77 }
78
79 static void
80 g_socket_input_stream_set_property (GObject      *object,
81                                     guint         prop_id,
82                                     const GValue *value,
83                                     GParamSpec   *pspec)
84 {
85   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
86
87   switch (prop_id)
88     {
89       case PROP_SOCKET:
90         stream->priv->socket = g_value_dup_object (value);
91         break;
92
93       default:
94         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
95     }
96 }
97
98 static void
99 g_socket_input_stream_finalize (GObject *object)
100 {
101   GSocketInputStream *stream = G_SOCKET_INPUT_STREAM (object);
102
103   if (stream->priv->socket)
104     g_object_unref (stream->priv->socket);
105
106   if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
107     (*G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize) (object);
108 }
109
110 static gssize
111 g_socket_input_stream_read (GInputStream  *stream,
112                             void          *buffer,
113                             gsize          count,
114                             GCancellable  *cancellable,
115                             GError       **error)
116 {
117   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
118
119   return g_socket_receive_with_blocking (input_stream->priv->socket,
120                                          buffer, count, TRUE,
121                                          cancellable, error);
122 }
123
124 static gboolean
125 g_socket_input_stream_read_ready (GSocket *socket,
126                                   GIOCondition condition,
127                                   GSocketInputStream *stream)
128 {
129   GSimpleAsyncResult *simple;
130   GError *error = NULL;
131   gssize result;
132
133   result = g_socket_receive_with_blocking (stream->priv->socket,
134                                            stream->priv->buffer,
135                                            stream->priv->count,
136                                            FALSE,
137                                            stream->priv->cancellable,
138                                            &error);
139
140   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
141     return TRUE;
142
143   simple = stream->priv->result;
144   stream->priv->result = NULL;
145
146   if (result >= 0)
147     g_simple_async_result_set_op_res_gssize (simple, result);
148
149   if (error)
150     g_simple_async_result_take_error (simple, error);
151
152   if (stream->priv->cancellable)
153     g_object_unref (stream->priv->cancellable);
154
155   g_simple_async_result_complete (simple);
156   g_object_unref (simple);
157
158   return FALSE;
159 }
160
161 static void
162 g_socket_input_stream_read_async (GInputStream        *stream,
163                                   void                *buffer,
164                                   gsize                count,
165                                   gint                 io_priority,
166                                   GCancellable        *cancellable,
167                                   GAsyncReadyCallback  callback,
168                                   gpointer             user_data)
169 {
170   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
171   GSource *source;
172
173   g_assert (input_stream->priv->result == NULL);
174
175   input_stream->priv->result =
176     g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
177                                g_socket_input_stream_read_async);
178   if (cancellable)
179     g_object_ref (cancellable);
180   input_stream->priv->cancellable = cancellable;
181   input_stream->priv->buffer = buffer;
182   input_stream->priv->count = count;
183
184   source = g_socket_create_source (input_stream->priv->socket,
185                                    G_IO_IN | G_IO_HUP | G_IO_ERR,
186                                    cancellable);
187   g_source_set_callback (source,
188                          (GSourceFunc) g_socket_input_stream_read_ready,
189                          g_object_ref (input_stream), g_object_unref);
190   g_source_attach (source, g_main_context_get_thread_default ());
191   g_source_unref (source);
192 }
193
194 static gssize
195 g_socket_input_stream_read_finish (GInputStream  *stream,
196                                    GAsyncResult  *result,
197                                    GError       **error)
198 {
199   GSimpleAsyncResult *simple;
200   gssize count;
201
202   g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1);
203
204   simple = G_SIMPLE_ASYNC_RESULT (result);
205
206   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
207
208   count = g_simple_async_result_get_op_res_gssize (simple);
209
210   return count;
211 }
212
213 static gboolean
214 g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
215 {
216   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
217
218   return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
219 }
220
221 static GSource *
222 g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
223                                               GCancellable         *cancellable)
224 {
225   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
226   GSource *socket_source, *pollable_source;
227
228   pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
229   socket_source = g_socket_create_source (input_stream->priv->socket,
230                                           G_IO_IN, cancellable);
231   g_source_set_dummy_callback (socket_source);
232   g_source_add_child_source (pollable_source, socket_source);
233   g_source_unref (socket_source);
234
235   return pollable_source;
236 }
237
238 static gssize
239 g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
240                                                  void                  *buffer,
241                                                  gsize                  size,
242                                                  GError               **error)
243 {
244   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
245
246   return g_socket_receive_with_blocking (input_stream->priv->socket,
247                                          buffer, size, FALSE,
248                                          NULL, error);
249 }
250
251 static void
252 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
253 {
254   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
255   GInputStreamClass *ginputstream_class = G_INPUT_STREAM_CLASS (klass);
256
257   g_type_class_add_private (klass, sizeof (GSocketInputStreamPrivate));
258
259   gobject_class->finalize = g_socket_input_stream_finalize;
260   gobject_class->get_property = g_socket_input_stream_get_property;
261   gobject_class->set_property = g_socket_input_stream_set_property;
262
263   ginputstream_class->read_fn = g_socket_input_stream_read;
264   ginputstream_class->read_async = g_socket_input_stream_read_async;
265   ginputstream_class->read_finish = g_socket_input_stream_read_finish;
266
267   g_object_class_install_property (gobject_class, PROP_SOCKET,
268                                    g_param_spec_object ("socket",
269                                                         P_("socket"),
270                                                         P_("The socket that this stream wraps"),
271                                                         G_TYPE_SOCKET, G_PARAM_CONSTRUCT_ONLY |
272                                                         G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 }
274
275 static void
276 g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
277 {
278   iface->is_readable = g_socket_input_stream_pollable_is_readable;
279   iface->create_source = g_socket_input_stream_pollable_create_source;
280   iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
281 }
282
283 static void
284 g_socket_input_stream_init (GSocketInputStream *stream)
285 {
286   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
287 }
288
289 GSocketInputStream *
290 _g_socket_input_stream_new (GSocket *socket)
291 {
292   return G_SOCKET_INPUT_STREAM (g_object_new (G_TYPE_SOCKET_INPUT_STREAM, "socket", socket, NULL));
293 }