plugins: port some plugins to the new memory API
[platform/upstream/gstreamer.git] / gst / tcp / gsttcp.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * gsttcp.c: TCP functions
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <netdb.h>
32 #include <unistd.h>
33 #include <sys/ioctl.h>
34
35 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
36 #include <sys/filio.h>
37 #endif
38
39 #include "gsttcp.h"
40 #include <gst/gst-i18n-plugin.h>
41
42 GST_DEBUG_CATEGORY_EXTERN (tcp_debug);
43 #define GST_CAT_DEFAULT tcp_debug
44
45 #ifndef MSG_NOSIGNAL
46 #define MSG_NOSIGNAL 0
47 #endif
48
49 /* resolve host to IP address, throwing errors if it fails */
50 /* host can already be an IP address */
51 /* returns a newly allocated gchar * with the dotted ip address,
52    or NULL, in which case it already fired an error. */
53 gchar *
54 gst_tcp_host_to_ip (GstElement * element, const gchar * host)
55 {
56   struct hostent *hostinfo;
57   char **addrs;
58   gchar *ip;
59   struct in_addr addr;
60
61   GST_DEBUG_OBJECT (element, "resolving host %s", host);
62
63   /* first check if it already is an IP address */
64   if (inet_aton (host, &addr)) {
65     ip = g_strdup (host);
66     goto beach;
67   }
68   /* FIXME: could do a localhost check here */
69
70   /* perform a name lookup */
71   if (!(hostinfo = gethostbyname (host)))
72     goto resolve_error;
73
74   if (hostinfo->h_addrtype != AF_INET)
75     goto not_ip;
76
77   addrs = hostinfo->h_addr_list;
78
79   /* There could be more than one IP address, but we just return the first */
80   ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
81
82 beach:
83   GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
84   return ip;
85
86 resolve_error:
87   {
88     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
89         ("Could not find IP address for host \"%s\".", host));
90     return NULL;
91   }
92 not_ip:
93   {
94     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
95         ("host \"%s\" is not an IP host", host));
96     return NULL;
97   }
98 }
99
100 /* write buffer to given socket incrementally.
101  * Returns number of bytes written.
102  */
103 gint
104 gst_tcp_socket_write (int socket, const void *buf, size_t count)
105 {
106   size_t bytes_written = 0;
107
108   while (bytes_written < count) {
109     ssize_t wrote = send (socket, (const char *) buf + bytes_written,
110         count - bytes_written, MSG_NOSIGNAL);
111
112     if (wrote <= 0) {
113       GST_WARNING ("error while writing");
114       return bytes_written;
115     }
116     bytes_written += wrote;
117   }
118
119   GST_LOG ("wrote %" G_GSIZE_FORMAT " bytes succesfully", bytes_written);
120   return bytes_written;
121 }
122
123 /* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK
124  * indicates success, anything else is failure.
125  */
126 static GstFlowReturn
127 gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
128     GstPoll * fdset)
129 {
130   ssize_t n;
131   size_t bytes_read;
132   int num_to_read;
133   int ret;
134
135   bytes_read = 0;
136
137   while (bytes_read < count) {
138     /* do a blocking select on the socket */
139     /* no action (0) is an error too in our case */
140     if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
141       if (ret == -1 && errno == EBUSY)
142         goto cancelled;
143       else
144         goto select_error;
145     }
146
147     /* ask how much is available for reading on the socket */
148     if (ioctl (socket, FIONREAD, &num_to_read) < 0)
149       goto ioctl_error;
150
151     if (num_to_read == 0)
152       goto got_eos;
153
154     /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
155
156     num_to_read = MIN (num_to_read, count - bytes_read);
157
158     n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
159
160     if (n < 0)
161       goto read_error;
162
163     if (n < num_to_read)
164       goto short_read;
165
166     bytes_read += num_to_read;
167   }
168
169   return GST_FLOW_OK;
170
171   /* ERRORS */
172 select_error:
173   {
174     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
175         ("select failed: %s", g_strerror (errno)));
176     return GST_FLOW_ERROR;
177   }
178 cancelled:
179   {
180     GST_DEBUG_OBJECT (this, "Select was cancelled");
181     return GST_FLOW_WRONG_STATE;
182   }
183 ioctl_error:
184   {
185     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
186         ("ioctl failed: %s", g_strerror (errno)));
187     return GST_FLOW_ERROR;
188   }
189 got_eos:
190   {
191     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
192     return GST_FLOW_UNEXPECTED;
193   }
194 read_error:
195   {
196     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
197         ("read failed: %s", g_strerror (errno)));
198     return GST_FLOW_ERROR;
199   }
200 short_read:
201   {
202     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
203         ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, num_to_read, n));
204     return GST_FLOW_ERROR;
205   }
206 }
207
208 /* close the socket and reset the fd.  Used to clean up after errors. */
209 void
210 gst_tcp_socket_close (GstPollFD * socket)
211 {
212   if (socket->fd >= 0) {
213     close (socket->fd);
214     socket->fd = -1;
215   }
216 }
217
218 /* read a buffer from the given socket
219  * returns:
220  * - a GstBuffer in which data should be read
221  * - NULL, indicating a connection close or an error, to be handled with
222  *         EOS
223  */
224 GstFlowReturn
225 gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
226     GstBuffer ** buf)
227 {
228   int ret;
229   ssize_t bytes_read;
230   int readsize;
231   guint8 *data;
232
233   *buf = NULL;
234
235   /* do a blocking select on the socket */
236   /* no action (0) is an error too in our case */
237   if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
238     if (ret == -1 && errno == EBUSY)
239       goto cancelled;
240     else
241       goto select_error;
242   }
243
244   /* ask how much is available for reading on the socket */
245   if (ioctl (socket, FIONREAD, &readsize) < 0)
246     goto ioctl_error;
247
248   if (readsize == 0)
249     goto got_eos;
250
251   /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
252
253   *buf = gst_buffer_new_and_alloc (readsize);
254
255   data = gst_buffer_map (*buf, NULL, NULL, GST_MAP_WRITE);
256   bytes_read = read (socket, data, readsize);
257   gst_buffer_unmap (*buf, data, bytes_read);
258
259   if (bytes_read < 0)
260     goto read_error;
261
262   if (bytes_read < readsize)
263     /* but mom, you promised to give me readsize bytes! */
264     goto short_read;
265
266   GST_LOG_OBJECT (this, "returning buffer of size %d", bytes_read);
267   return GST_FLOW_OK;
268
269   /* ERRORS */
270 select_error:
271   {
272     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
273         ("select failed: %s", g_strerror (errno)));
274     return GST_FLOW_ERROR;
275   }
276 cancelled:
277   {
278     GST_DEBUG_OBJECT (this, "Select was cancelled");
279     return GST_FLOW_WRONG_STATE;
280   }
281 ioctl_error:
282   {
283     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
284         ("ioctl failed: %s", g_strerror (errno)));
285     return GST_FLOW_ERROR;
286   }
287 got_eos:
288   {
289     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
290     return GST_FLOW_UNEXPECTED;
291   }
292 read_error:
293   {
294     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
295         ("read failed: %s", g_strerror (errno)));
296     gst_buffer_unref (*buf);
297     *buf = NULL;
298     return GST_FLOW_ERROR;
299   }
300 short_read:
301   {
302     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
303         ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, readsize,
304             bytes_read));
305     gst_buffer_unref (*buf);
306     *buf = NULL;
307     return GST_FLOW_ERROR;
308   }
309 }
310
311 /* read a buffer from the given socket
312  * returns:
313  * - a GstBuffer in which data should be read
314  * - NULL, indicating a connection close or an error, to be handled with
315  *         EOS
316  */
317 GstFlowReturn
318 gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
319     GstBuffer ** buf)
320 {
321   GstFlowReturn ret;
322   guint8 *header = NULL;
323   guint8 *data;
324   gsize size;
325
326   GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
327       GST_DP_HEADER_LENGTH);
328
329   *buf = NULL;
330   header = g_malloc (GST_DP_HEADER_LENGTH);
331
332   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
333
334   if (ret != GST_FLOW_OK)
335     goto header_read_error;
336
337   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
338     goto validate_error;
339
340   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
341     goto is_not_buffer;
342
343   GST_LOG_OBJECT (this, "validated buffer packet header");
344
345   *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
346
347   g_free (header);
348
349   data = gst_buffer_map (*buf, &size, NULL, GST_MAP_WRITE);
350   ret = gst_tcp_socket_read (this, socket, data, size, fdset);
351   gst_buffer_unmap (*buf, data, size);
352
353   if (ret != GST_FLOW_OK)
354     goto data_read_error;
355
356   return GST_FLOW_OK;
357
358   /* ERRORS */
359 header_read_error:
360   {
361     g_free (header);
362     return ret;
363   }
364 validate_error:
365   {
366     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
367         ("GDP buffer packet header does not validate"));
368     g_free (header);
369     return GST_FLOW_ERROR;
370   }
371 is_not_buffer:
372   {
373     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
374         ("GDP packet contains something that is not a buffer (type %d)",
375             gst_dp_header_payload_type (header)));
376     g_free (header);
377     return GST_FLOW_ERROR;
378   }
379 data_read_error:
380   {
381     gst_buffer_unref (*buf);
382     *buf = NULL;
383     return ret;
384   }
385 }
386
387 GstFlowReturn
388 gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset,
389     GstCaps ** caps)
390 {
391   GstFlowReturn ret;
392   guint8 *header = NULL;
393   guint8 *payload = NULL;
394   size_t payload_length;
395
396   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
397       GST_DP_HEADER_LENGTH);
398
399   *caps = NULL;
400   header = g_malloc (GST_DP_HEADER_LENGTH);
401
402   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
403
404   if (ret != GST_FLOW_OK)
405     goto header_read_error;
406
407   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
408     goto header_validate_error;
409
410   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
411     goto is_not_caps;
412
413   GST_LOG_OBJECT (this, "validated caps packet header");
414
415   payload_length = gst_dp_header_payload_length (header);
416   payload = g_malloc (payload_length);
417
418   GST_LOG_OBJECT (this,
419       "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload",
420       payload_length);
421
422   ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset);
423
424   if (ret != GST_FLOW_OK)
425     goto payload_read_error;
426
427   if (!gst_dp_validate_payload (GST_DP_HEADER_LENGTH, header, payload))
428     goto payload_validate_error;
429
430   *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
431
432   GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
433
434   g_free (header);
435   g_free (payload);
436
437   return GST_FLOW_OK;
438
439   /* ERRORS */
440 header_read_error:
441   {
442     g_free (header);
443     return ret;
444   }
445 header_validate_error:
446   {
447     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
448         ("GDP caps packet header does not validate"));
449     g_free (header);
450     return GST_FLOW_ERROR;
451   }
452 is_not_caps:
453   {
454     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
455         ("GDP packet contains something that is not a caps (type %d)",
456             gst_dp_header_payload_type (header)));
457     g_free (header);
458     return GST_FLOW_ERROR;
459   }
460 payload_read_error:
461   {
462     g_free (header);
463     g_free (payload);
464     return ret;
465   }
466 payload_validate_error:
467   {
468     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
469         ("GDP caps packet payload does not validate"));
470     g_free (header);
471     g_free (payload);
472     return GST_FLOW_ERROR;
473   }
474 }