gio/ docs/reference/gio Merged gio-standalone into glib.
[platform/upstream/glib.git] / gio / gsocketinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <poll.h>
32
33 #include <glib.h>
34 #include <glib/gstdio.h>
35 #include "gioerror.h"
36 #include "gsimpleasyncresult.h"
37 #include "gsocketinputstream.h"
38 #include "gcancellable.h"
39 #include "gasynchelper.h"
40
41 #include "glibintl.h"
42
43 G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
44
45 struct _GSocketInputStreamPrivate {
46   int fd;
47   gboolean close_fd_at_close;
48 };
49
50 static gssize   g_socket_input_stream_read         (GInputStream         *stream,
51                                                     void                 *buffer,
52                                                     gsize                 count,
53                                                     GCancellable         *cancellable,
54                                                     GError              **error);
55 static gboolean g_socket_input_stream_close        (GInputStream         *stream,
56                                                     GCancellable         *cancellable,
57                                                     GError              **error);
58 static void     g_socket_input_stream_read_async   (GInputStream         *stream,
59                                                     void                 *buffer,
60                                                     gsize                 count,
61                                                     int                   io_priority,
62                                                     GCancellable         *cancellable,
63                                                     GAsyncReadyCallback   callback,
64                                                     gpointer              data);
65 static gssize   g_socket_input_stream_read_finish  (GInputStream         *stream,
66                                                     GAsyncResult         *result,
67                                                     GError              **error);
68 static void     g_socket_input_stream_skip_async   (GInputStream         *stream,
69                                                     gsize                 count,
70                                                     int                   io_priority,
71                                                     GCancellable         *cancellable,
72                                                     GAsyncReadyCallback   callback,
73                                                     gpointer              data);
74 static gssize   g_socket_input_stream_skip_finish  (GInputStream         *stream,
75                                                     GAsyncResult         *result,
76                                                     GError              **error);
77 static void     g_socket_input_stream_close_async  (GInputStream         *stream,
78                                                     int                   io_priority,
79                                                     GCancellable         *cancellable,
80                                                     GAsyncReadyCallback   callback,
81                                                     gpointer              data);
82 static gboolean g_socket_input_stream_close_finish (GInputStream         *stream,
83                                                     GAsyncResult         *result,
84                                                     GError              **error);
85
86 static void
87 g_socket_input_stream_finalize (GObject *object)
88 {
89   GSocketInputStream *stream;
90   
91   stream = G_SOCKET_INPUT_STREAM (object);
92
93   if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
94     (*G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize) (object);
95 }
96
97 static void
98 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
99 {
100   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
101   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
102   
103   g_type_class_add_private (klass, sizeof (GSocketInputStreamPrivate));
104   
105   gobject_class->finalize = g_socket_input_stream_finalize;
106
107   stream_class->read = g_socket_input_stream_read;
108   stream_class->close = g_socket_input_stream_close;
109   stream_class->read_async = g_socket_input_stream_read_async;
110   stream_class->read_finish = g_socket_input_stream_read_finish;
111   if (0)
112     {
113       /* TODO: Implement instead of using fallbacks */
114       stream_class->skip_async = g_socket_input_stream_skip_async;
115       stream_class->skip_finish = g_socket_input_stream_skip_finish;
116     }
117   stream_class->close_async = g_socket_input_stream_close_async;
118   stream_class->close_finish = g_socket_input_stream_close_finish;
119 }
120
121 static void
122 g_socket_input_stream_init (GSocketInputStream *socket)
123 {
124   socket->priv = G_TYPE_INSTANCE_GET_PRIVATE (socket,
125                                               G_TYPE_SOCKET_INPUT_STREAM,
126                                               GSocketInputStreamPrivate);
127 }
128
129 /**
130  * g_socket_input_stream_new:
131  * @fd: file descriptor.
132  * @close_fd_at_close: boolean value
133  * 
134  * 
135  * Returns: new #GInputStream. If @close_fd_at_close is %TRUE, 
136  * @fd will be closed when the #GInputStream is closed.
137  **/
138 GInputStream *
139 g_socket_input_stream_new (int fd,
140                            gboolean close_fd_at_close)
141 {
142   GSocketInputStream *stream;
143
144   g_return_val_if_fail (fd != -1, NULL);
145
146   stream = g_object_new (G_TYPE_SOCKET_INPUT_STREAM, NULL);
147
148   stream->priv->fd = fd;
149   stream->priv->close_fd_at_close = close_fd_at_close;
150   
151   return G_INPUT_STREAM (stream);
152 }
153
154 static gssize
155 g_socket_input_stream_read (GInputStream *stream,
156                             void         *buffer,
157                             gsize         count,
158                             GCancellable *cancellable,
159                             GError      **error)
160 {
161   GSocketInputStream *socket_stream;
162   gssize res;
163   struct pollfd poll_fds[2];
164   int poll_ret;
165   int cancel_fd;
166
167   socket_stream = G_SOCKET_INPUT_STREAM (stream);
168
169   cancel_fd = g_cancellable_get_fd (cancellable);
170   if (cancel_fd != -1)
171     {
172       do
173         {
174           poll_fds[0].events = POLLIN;
175           poll_fds[0].fd = socket_stream->priv->fd;
176           poll_fds[1].events = POLLIN;
177           poll_fds[1].fd = cancel_fd;
178           poll_ret = poll (poll_fds, 2, -1);
179         }
180       while (poll_ret == -1 && errno == EINTR);
181       
182       if (poll_ret == -1)
183         {
184           g_set_error (error, G_IO_ERROR,
185                        g_io_error_from_errno (errno),
186                        _("Error reading from socket: %s"),
187                        g_strerror (errno));
188           return -1;
189         }
190     }
191
192   while (1)
193     {
194       if (g_cancellable_set_error_if_cancelled (cancellable, error))
195         break;
196       res = read (socket_stream->priv->fd, buffer, count);
197       if (res == -1)
198         {
199           if (errno == EINTR)
200             continue;
201           
202           g_set_error (error, G_IO_ERROR,
203                        g_io_error_from_errno (errno),
204                        _("Error reading from socket: %s"),
205                        g_strerror (errno));
206         }
207       
208       break;
209     }
210
211   return res;
212 }
213
214 static gboolean
215 g_socket_input_stream_close (GInputStream *stream,
216                              GCancellable *cancellable,
217                              GError      **error)
218 {
219   GSocketInputStream *socket_stream;
220   int res;
221
222   socket_stream = G_SOCKET_INPUT_STREAM (stream);
223
224   if (!socket_stream->priv->close_fd_at_close)
225     return TRUE;
226   
227   while (1)
228     {
229       /* This might block during the close. Doesn't seem to be a way to avoid it though. */
230       res = close (socket_stream->priv->fd);
231       if (res == -1)
232         {
233           g_set_error (error, G_IO_ERROR,
234                        g_io_error_from_errno (errno),
235                        _("Error closing socket: %s"),
236                        g_strerror (errno));
237         }
238       break;
239     }
240   
241   return res != -1;
242 }
243
244 typedef struct {
245   gsize count;
246   void *buffer;
247   GAsyncReadyCallback callback;
248   gpointer user_data;
249   GCancellable *cancellable;
250   GSocketInputStream *stream;
251 } ReadAsyncData;
252
253 static gboolean
254 read_async_cb (ReadAsyncData *data,
255                GIOCondition condition,
256                int fd)
257 {
258   GSimpleAsyncResult *simple;
259   GError *error = NULL;
260   gssize count_read;
261
262   /* We know that we can read from fd once without blocking */
263   while (1)
264     {
265       if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
266         {
267           count_read = -1;
268           break;
269         }
270       count_read = read (data->stream->priv->fd, data->buffer, data->count);
271       if (count_read == -1)
272         {
273           if (errno == EINTR)
274             continue;
275           
276           g_set_error (&error, G_IO_ERROR,
277                        g_io_error_from_errno (errno),
278                        _("Error reading from socket: %s"),
279                        g_strerror (errno));
280         }
281       break;
282     }
283
284   simple = g_simple_async_result_new (G_OBJECT (data->stream),
285                                       data->callback,
286                                       data->user_data,
287                                       g_socket_input_stream_read_async);
288
289   g_simple_async_result_set_op_res_gssize (simple, count_read);
290
291   if (count_read == -1)
292     {
293       g_simple_async_result_set_from_error (simple, error);
294       g_error_free (error);
295     }
296
297   /* Complete immediately, not in idle, since we're already in a mainloop callout */
298   g_simple_async_result_complete (simple);
299   g_object_unref (simple);
300
301   return FALSE;
302 }
303
304 static void
305 g_socket_input_stream_read_async (GInputStream        *stream,
306                                   void                *buffer,
307                                   gsize                count,
308                                   int                  io_priority,
309                                   GCancellable        *cancellable,
310                                   GAsyncReadyCallback  callback,
311                                   gpointer             user_data)
312 {
313   GSource *source;
314   GSocketInputStream *socket_stream;
315   ReadAsyncData *data;
316
317   socket_stream = G_SOCKET_INPUT_STREAM (stream);
318
319   data = g_new0 (ReadAsyncData, 1);
320   data->count = count;
321   data->buffer = buffer;
322   data->callback = callback;
323   data->user_data = user_data;
324   data->cancellable = cancellable;
325   data->stream = socket_stream;
326
327   source = _g_fd_source_new (socket_stream->priv->fd,
328                              POLLIN,
329                              cancellable);
330   
331   g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
332   g_source_attach (source, NULL);
333  
334   g_source_unref (source);
335 }
336
337 static gssize
338 g_socket_input_stream_read_finish (GInputStream              *stream,
339                                    GAsyncResult              *result,
340                                    GError                   **error)
341 {
342   GSimpleAsyncResult *simple;
343   gssize nread;
344
345   simple = G_SIMPLE_ASYNC_RESULT (result);
346   g_assert (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
347   
348   nread = g_simple_async_result_get_op_res_gssize (simple);
349   return nread;
350 }
351
352 static void
353 g_socket_input_stream_skip_async (GInputStream        *stream,
354                                   gsize                count,
355                                   int                  io_priority,
356                                   GCancellable        *cancellable,
357                                   GAsyncReadyCallback  callback,
358                                   gpointer             data)
359 {
360   g_assert_not_reached ();
361   /* TODO: Not implemented */
362 }
363
364 static gssize
365 g_socket_input_stream_skip_finish  (GInputStream              *stream,
366                                     GAsyncResult              *result,
367                                     GError                   **error)
368 {
369   g_assert_not_reached ();
370   /* TODO: Not implemented */
371 }
372
373
374 typedef struct {
375   GInputStream *stream;
376   GAsyncReadyCallback callback;
377   gpointer user_data;
378 } CloseAsyncData;
379
380 static void
381 close_async_data_free (gpointer _data)
382 {
383   CloseAsyncData *data = _data;
384
385   g_free (data);
386 }
387
388 static gboolean
389 close_async_cb (CloseAsyncData *data)
390 {
391   GSocketInputStream *socket_stream;
392   GSimpleAsyncResult *simple;
393   GError *error = NULL;
394   gboolean result;
395   int res;
396
397   socket_stream = G_SOCKET_INPUT_STREAM (data->stream);
398
399   if (!socket_stream->priv->close_fd_at_close)
400     {
401       result = TRUE;
402       goto out;
403     }
404   
405   while (1)
406     {
407       res = close (socket_stream->priv->fd);
408       if (res == -1)
409         {
410           g_set_error (&error, G_IO_ERROR,
411                        g_io_error_from_errno (errno),
412                        _("Error closing socket: %s"),
413                        g_strerror (errno));
414         }
415       break;
416     }
417   
418   result = res != -1;
419
420  out:
421   simple = g_simple_async_result_new (G_OBJECT (data->stream),
422                                       data->callback,
423                                       data->user_data,
424                                       g_socket_input_stream_close_async);
425
426   if (!result)
427     {
428       g_simple_async_result_set_from_error (simple, error);
429       g_error_free (error);
430     }
431
432   /* Complete immediately, not in idle, since we're already in a mainloop callout */
433   g_simple_async_result_complete (simple);
434   g_object_unref (simple);
435   
436   return FALSE;
437 }
438
439 static void
440 g_socket_input_stream_close_async (GInputStream       *stream,
441                                    int                 io_priority,
442                                    GCancellable       *cancellable,
443                                    GAsyncReadyCallback callback,
444                                    gpointer            user_data)
445 {
446   GSource *idle;
447   CloseAsyncData *data;
448
449   data = g_new0 (CloseAsyncData, 1);
450
451   data->stream = stream;
452   data->callback = callback;
453   data->user_data = user_data;
454   
455   idle = g_idle_source_new ();
456   g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
457   g_source_attach (idle, NULL);
458   g_source_unref (idle);
459 }
460
461 static gboolean
462 g_socket_input_stream_close_finish (GInputStream              *stream,
463                                     GAsyncResult              *result,
464                                     GError                   **error)
465 {
466   /* Failures handled in generic close_finish code */
467   return TRUE;
468 }
469