Bumps documentation to 93% symbol coverage, touching most
[platform/upstream/glib.git] / gio / gsocketinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <poll.h>
32
33 #include <glib.h>
34 #include <glib/gstdio.h>
35 #include "gioerror.h"
36 #include "gsimpleasyncresult.h"
37 #include "gsocketinputstream.h"
38 #include "gcancellable.h"
39 #include "gasynchelper.h"
40 #include "glibintl.h"
41
42 /**
43  * SECTION:gsocketinputstream
44  * @short_description: Socket Input Stream
45  * @see_also: #GInputStream.
46  *
47  * #GSocketInputStream implements #GInputStream for reading from a socket, 
48  * including asynchronous operations.
49  **/
50
51 G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
52
53 struct _GSocketInputStreamPrivate {
54   int fd;
55   gboolean close_fd_at_close;
56 };
57
58 static gssize   g_socket_input_stream_read         (GInputStream         *stream,
59                                                     void                 *buffer,
60                                                     gsize                 count,
61                                                     GCancellable         *cancellable,
62                                                     GError              **error);
63 static gboolean g_socket_input_stream_close        (GInputStream         *stream,
64                                                     GCancellable         *cancellable,
65                                                     GError              **error);
66 static void     g_socket_input_stream_read_async   (GInputStream         *stream,
67                                                     void                 *buffer,
68                                                     gsize                 count,
69                                                     int                   io_priority,
70                                                     GCancellable         *cancellable,
71                                                     GAsyncReadyCallback   callback,
72                                                     gpointer              data);
73 static gssize   g_socket_input_stream_read_finish  (GInputStream         *stream,
74                                                     GAsyncResult         *result,
75                                                     GError              **error);
76 static void     g_socket_input_stream_skip_async   (GInputStream         *stream,
77                                                     gsize                 count,
78                                                     int                   io_priority,
79                                                     GCancellable         *cancellable,
80                                                     GAsyncReadyCallback   callback,
81                                                     gpointer              data);
82 static gssize   g_socket_input_stream_skip_finish  (GInputStream         *stream,
83                                                     GAsyncResult         *result,
84                                                     GError              **error);
85 static void     g_socket_input_stream_close_async  (GInputStream         *stream,
86                                                     int                   io_priority,
87                                                     GCancellable         *cancellable,
88                                                     GAsyncReadyCallback   callback,
89                                                     gpointer              data);
90 static gboolean g_socket_input_stream_close_finish (GInputStream         *stream,
91                                                     GAsyncResult         *result,
92                                                     GError              **error);
93
94 static void
95 g_socket_input_stream_finalize (GObject *object)
96 {
97   GSocketInputStream *stream;
98   
99   stream = G_SOCKET_INPUT_STREAM (object);
100
101   if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
102     (*G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize) (object);
103 }
104
105 static void
106 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
107 {
108   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
110   
111   g_type_class_add_private (klass, sizeof (GSocketInputStreamPrivate));
112   
113   gobject_class->finalize = g_socket_input_stream_finalize;
114
115   stream_class->read = g_socket_input_stream_read;
116   stream_class->close = g_socket_input_stream_close;
117   stream_class->read_async = g_socket_input_stream_read_async;
118   stream_class->read_finish = g_socket_input_stream_read_finish;
119   if (0)
120     {
121       /* TODO: Implement instead of using fallbacks */
122       stream_class->skip_async = g_socket_input_stream_skip_async;
123       stream_class->skip_finish = g_socket_input_stream_skip_finish;
124     }
125   stream_class->close_async = g_socket_input_stream_close_async;
126   stream_class->close_finish = g_socket_input_stream_close_finish;
127 }
128
129 static void
130 g_socket_input_stream_init (GSocketInputStream *socket)
131 {
132   socket->priv = G_TYPE_INSTANCE_GET_PRIVATE (socket,
133                                               G_TYPE_SOCKET_INPUT_STREAM,
134                                               GSocketInputStreamPrivate);
135 }
136
137 /**
138  * g_socket_input_stream_new:
139  * @fd: socket file descriptor.
140  * @close_fd_at_close: a #gboolean.
141  * 
142  * Creates a new #GSocketInputStream for the given @fd. If @close_fd_at_close
143  * is %TRUE, the socket will be closed when the stream is closed.
144  * 
145  * Returns: a #GSocketInputStream. 
146  **/
147 GInputStream *
148 g_socket_input_stream_new (int fd,
149                            gboolean close_fd_at_close)
150 {
151   GSocketInputStream *stream;
152
153   g_return_val_if_fail (fd != -1, NULL);
154
155   stream = g_object_new (G_TYPE_SOCKET_INPUT_STREAM, NULL);
156
157   stream->priv->fd = fd;
158   stream->priv->close_fd_at_close = close_fd_at_close;
159   
160   return G_INPUT_STREAM (stream);
161 }
162
163 static gssize
164 g_socket_input_stream_read (GInputStream *stream,
165                             void         *buffer,
166                             gsize         count,
167                             GCancellable *cancellable,
168                             GError      **error)
169 {
170   GSocketInputStream *socket_stream;
171   gssize res;
172   struct pollfd poll_fds[2];
173   int poll_ret;
174   int cancel_fd;
175
176   socket_stream = G_SOCKET_INPUT_STREAM (stream);
177
178   cancel_fd = g_cancellable_get_fd (cancellable);
179   if (cancel_fd != -1)
180     {
181       do
182         {
183           poll_fds[0].events = POLLIN;
184           poll_fds[0].fd = socket_stream->priv->fd;
185           poll_fds[1].events = POLLIN;
186           poll_fds[1].fd = cancel_fd;
187           poll_ret = poll (poll_fds, 2, -1);
188         }
189       while (poll_ret == -1 && errno == EINTR);
190       
191       if (poll_ret == -1)
192         {
193           g_set_error (error, G_IO_ERROR,
194                        g_io_error_from_errno (errno),
195                        _("Error reading from socket: %s"),
196                        g_strerror (errno));
197           return -1;
198         }
199     }
200
201   while (1)
202     {
203       if (g_cancellable_set_error_if_cancelled (cancellable, error))
204         break;
205       res = read (socket_stream->priv->fd, buffer, count);
206       if (res == -1)
207         {
208           if (errno == EINTR)
209             continue;
210           
211           g_set_error (error, G_IO_ERROR,
212                        g_io_error_from_errno (errno),
213                        _("Error reading from socket: %s"),
214                        g_strerror (errno));
215         }
216       
217       break;
218     }
219
220   return res;
221 }
222
223 static gboolean
224 g_socket_input_stream_close (GInputStream *stream,
225                              GCancellable *cancellable,
226                              GError      **error)
227 {
228   GSocketInputStream *socket_stream;
229   int res;
230
231   socket_stream = G_SOCKET_INPUT_STREAM (stream);
232
233   if (!socket_stream->priv->close_fd_at_close)
234     return TRUE;
235   
236   while (1)
237     {
238       /* This might block during the close. Doesn't seem to be a way to avoid it though. */
239       res = close (socket_stream->priv->fd);
240       if (res == -1)
241         {
242           g_set_error (error, G_IO_ERROR,
243                        g_io_error_from_errno (errno),
244                        _("Error closing socket: %s"),
245                        g_strerror (errno));
246         }
247       break;
248     }
249   
250   return res != -1;
251 }
252
253 typedef struct {
254   gsize count;
255   void *buffer;
256   GAsyncReadyCallback callback;
257   gpointer user_data;
258   GCancellable *cancellable;
259   GSocketInputStream *stream;
260 } ReadAsyncData;
261
262 static gboolean
263 read_async_cb (ReadAsyncData *data,
264                GIOCondition condition,
265                int fd)
266 {
267   GSimpleAsyncResult *simple;
268   GError *error = NULL;
269   gssize count_read;
270
271   /* We know that we can read from fd once without blocking */
272   while (1)
273     {
274       if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
275         {
276           count_read = -1;
277           break;
278         }
279       count_read = read (data->stream->priv->fd, data->buffer, data->count);
280       if (count_read == -1)
281         {
282           if (errno == EINTR)
283             continue;
284           
285           g_set_error (&error, G_IO_ERROR,
286                        g_io_error_from_errno (errno),
287                        _("Error reading from socket: %s"),
288                        g_strerror (errno));
289         }
290       break;
291     }
292
293   simple = g_simple_async_result_new (G_OBJECT (data->stream),
294                                       data->callback,
295                                       data->user_data,
296                                       g_socket_input_stream_read_async);
297
298   g_simple_async_result_set_op_res_gssize (simple, count_read);
299
300   if (count_read == -1)
301     {
302       g_simple_async_result_set_from_error (simple, error);
303       g_error_free (error);
304     }
305
306   /* Complete immediately, not in idle, since we're already in a mainloop callout */
307   g_simple_async_result_complete (simple);
308   g_object_unref (simple);
309
310   return FALSE;
311 }
312
313 static void
314 g_socket_input_stream_read_async (GInputStream        *stream,
315                                   void                *buffer,
316                                   gsize                count,
317                                   int                  io_priority,
318                                   GCancellable        *cancellable,
319                                   GAsyncReadyCallback  callback,
320                                   gpointer             user_data)
321 {
322   GSource *source;
323   GSocketInputStream *socket_stream;
324   ReadAsyncData *data;
325
326   socket_stream = G_SOCKET_INPUT_STREAM (stream);
327
328   data = g_new0 (ReadAsyncData, 1);
329   data->count = count;
330   data->buffer = buffer;
331   data->callback = callback;
332   data->user_data = user_data;
333   data->cancellable = cancellable;
334   data->stream = socket_stream;
335
336   source = _g_fd_source_new (socket_stream->priv->fd,
337                              POLLIN,
338                              cancellable);
339   
340   g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
341   g_source_attach (source, NULL);
342  
343   g_source_unref (source);
344 }
345
346 static gssize
347 g_socket_input_stream_read_finish (GInputStream              *stream,
348                                    GAsyncResult              *result,
349                                    GError                   **error)
350 {
351   GSimpleAsyncResult *simple;
352   gssize nread;
353
354   simple = G_SIMPLE_ASYNC_RESULT (result);
355   g_assert (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
356   
357   nread = g_simple_async_result_get_op_res_gssize (simple);
358   return nread;
359 }
360
361 static void
362 g_socket_input_stream_skip_async (GInputStream        *stream,
363                                   gsize                count,
364                                   int                  io_priority,
365                                   GCancellable        *cancellable,
366                                   GAsyncReadyCallback  callback,
367                                   gpointer             data)
368 {
369   g_assert_not_reached ();
370   /* TODO: Not implemented */
371 }
372
373 static gssize
374 g_socket_input_stream_skip_finish  (GInputStream              *stream,
375                                     GAsyncResult              *result,
376                                     GError                   **error)
377 {
378   g_assert_not_reached ();
379   /* TODO: Not implemented */
380 }
381
382
383 typedef struct {
384   GInputStream *stream;
385   GAsyncReadyCallback callback;
386   gpointer user_data;
387 } CloseAsyncData;
388
389 static void
390 close_async_data_free (gpointer _data)
391 {
392   CloseAsyncData *data = _data;
393
394   g_free (data);
395 }
396
397 static gboolean
398 close_async_cb (CloseAsyncData *data)
399 {
400   GSocketInputStream *socket_stream;
401   GSimpleAsyncResult *simple;
402   GError *error = NULL;
403   gboolean result;
404   int res;
405
406   socket_stream = G_SOCKET_INPUT_STREAM (data->stream);
407
408   if (!socket_stream->priv->close_fd_at_close)
409     {
410       result = TRUE;
411       goto out;
412     }
413   
414   while (1)
415     {
416       res = close (socket_stream->priv->fd);
417       if (res == -1)
418         {
419           g_set_error (&error, G_IO_ERROR,
420                        g_io_error_from_errno (errno),
421                        _("Error closing socket: %s"),
422                        g_strerror (errno));
423         }
424       break;
425     }
426   
427   result = res != -1;
428
429  out:
430   simple = g_simple_async_result_new (G_OBJECT (data->stream),
431                                       data->callback,
432                                       data->user_data,
433                                       g_socket_input_stream_close_async);
434
435   if (!result)
436     {
437       g_simple_async_result_set_from_error (simple, error);
438       g_error_free (error);
439     }
440
441   /* Complete immediately, not in idle, since we're already in a mainloop callout */
442   g_simple_async_result_complete (simple);
443   g_object_unref (simple);
444   
445   return FALSE;
446 }
447
448 static void
449 g_socket_input_stream_close_async (GInputStream       *stream,
450                                    int                 io_priority,
451                                    GCancellable       *cancellable,
452                                    GAsyncReadyCallback callback,
453                                    gpointer            user_data)
454 {
455   GSource *idle;
456   CloseAsyncData *data;
457
458   data = g_new0 (CloseAsyncData, 1);
459
460   data->stream = stream;
461   data->callback = callback;
462   data->user_data = user_data;
463   
464   idle = g_idle_source_new ();
465   g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
466   g_source_attach (idle, NULL);
467   g_source_unref (idle);
468 }
469
470 static gboolean
471 g_socket_input_stream_close_finish (GInputStream              *stream,
472                                     GAsyncResult              *result,
473                                     GError                   **error)
474 {
475   /* Failures handled in generic close_finish code */
476   return TRUE;
477 }
478