76c86c316e1d91c20719b46003091a158b66635f
[platform/upstream/gstreamer.git] / gst / tcp / gsttcp.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * gsttcp.c: TCP functions
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <netdb.h>
32 #include <string.h>             /* memset, in FD_ZERO macro */
33 #include <unistd.h>
34 #include <sys/ioctl.h>
35
36 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
37 #include <sys/filio.h>
38 #endif
39
40 #include <glib.h>
41 #include <gst/gst.h>
42 #include <gst/gst-i18n-plugin.h>
43 #include <gst/dataprotocol/dataprotocol.h>
44
45 GST_DEBUG_CATEGORY_EXTERN (tcp_debug);
46 #define GST_CAT_DEFAULT tcp_debug
47
48 #ifndef MSG_NOSIGNAL
49 #define MSG_NOSIGNAL 0
50 #endif
51
52 /* resolve host to IP address, throwing errors if it fails */
53 /* host can already be an IP address */
54 /* returns a newly allocated gchar * with the dotted ip address,
55    or NULL, in which case it already fired an error. */
56 gchar *
57 gst_tcp_host_to_ip (GstElement * element, const gchar * host)
58 {
59   struct hostent *hostinfo;
60   char **addrs;
61   gchar *ip;
62   struct in_addr addr;
63
64   GST_DEBUG_OBJECT (element, "resolving host %s", host);
65
66   /* first check if it already is an IP address */
67   if (inet_aton (host, &addr)) {
68     ip = g_strdup (host);
69     goto beach;
70   }
71   /* FIXME: could do a localhost check here */
72
73   /* perform a name lookup */
74   if (!(hostinfo = gethostbyname (host)))
75     goto resolve_error;
76
77   if (hostinfo->h_addrtype != AF_INET)
78     goto not_ip;
79
80   addrs = hostinfo->h_addr_list;
81
82   /* There could be more than one IP address, but we just return the first */
83   ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
84
85 beach:
86   GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
87   return ip;
88
89 resolve_error:
90   {
91     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
92         ("Could not find IP address for host \"%s\".", host));
93     return NULL;
94   }
95 not_ip:
96   {
97     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
98         ("host \"%s\" is not an IP host", host));
99     return NULL;
100   }
101 }
102
103 /* write buffer to given socket incrementally.
104  * Returns number of bytes written.
105  */
106 gint
107 gst_tcp_socket_write (int socket, const void *buf, size_t count)
108 {
109   size_t bytes_written = 0;
110
111   while (bytes_written < count) {
112     ssize_t wrote = send (socket, buf + bytes_written,
113         count - bytes_written, MSG_NOSIGNAL);
114
115     if (wrote <= 0) {
116       return bytes_written;
117     }
118     bytes_written += wrote;
119   }
120
121   if (bytes_written < 0)
122     GST_WARNING ("error while writing");
123   else
124     GST_LOG ("wrote %d bytes succesfully", bytes_written);
125   return bytes_written;
126 }
127
128 /* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK
129  * indicates success, anything else is failure.
130  */
131 static GstFlowReturn
132 gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
133     int cancel_fd)
134 {
135   fd_set testfds;
136   int maxfdp1;
137   ssize_t n;
138   size_t bytes_read;
139   int num_to_read;
140
141   bytes_read = 0;
142
143   while (bytes_read < count) {
144     /* do a blocking select on the socket */
145     FD_ZERO (&testfds);
146     FD_SET (socket, &testfds);
147     if (cancel_fd >= 0)
148       FD_SET (cancel_fd, &testfds);
149     maxfdp1 = MAX (socket, cancel_fd) + 1;
150
151     /* no action (0) is an error too in our case */
152     if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
153       goto select_error;
154
155     if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
156       goto cancelled;
157
158     /* ask how much is available for reading on the socket */
159     if (ioctl (socket, FIONREAD, &num_to_read) < 0)
160       goto ioctl_error;
161
162     if (num_to_read == 0)
163       goto got_eos;
164
165     /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
166
167     num_to_read = MIN (num_to_read, count - bytes_read);
168
169     n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
170
171     if (n < 0)
172       goto read_error;
173
174     if (n < num_to_read)
175       goto short_read;
176
177     bytes_read += num_to_read;
178   }
179
180   return GST_FLOW_OK;
181
182   /* ERRORS */
183 select_error:
184   {
185     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
186         ("select failed: %s", g_strerror (errno)));
187     return GST_FLOW_ERROR;
188   }
189 cancelled:
190   {
191     GST_DEBUG_OBJECT (this, "Select was cancelled");
192     return GST_FLOW_WRONG_STATE;
193   }
194 ioctl_error:
195   {
196     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
197         ("ioctl failed: %s", g_strerror (errno)));
198     return GST_FLOW_ERROR;
199   }
200 got_eos:
201   {
202     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
203     return GST_FLOW_UNEXPECTED;
204   }
205 read_error:
206   {
207     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
208         ("read failed: %s", g_strerror (errno)));
209     return GST_FLOW_ERROR;
210   }
211 short_read:
212   {
213     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
214         ("short read: wanted %d bytes, got %d", num_to_read, n));
215     return GST_FLOW_ERROR;
216   }
217 }
218
219 /* close the socket and reset the fd.  Used to clean up after errors. */
220 void
221 gst_tcp_socket_close (int *socket)
222 {
223   close (*socket);
224   *socket = -1;
225 }
226
227 /* read a buffer from the given socket
228  * returns:
229  * - a GstBuffer in which data should be read
230  * - NULL, indicating a connection close or an error, to be handled with
231  *         EOS
232  */
233 GstFlowReturn
234 gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
235     GstBuffer ** buf)
236 {
237   fd_set testfds;
238   int ret;
239   int maxfdp1;
240   ssize_t bytes_read;
241   int readsize;
242
243   *buf = NULL;
244
245   /* do a blocking select on the socket */
246   FD_ZERO (&testfds);
247   FD_SET (socket, &testfds);
248   if (cancel_fd >= 0)
249     FD_SET (cancel_fd, &testfds);
250   maxfdp1 = MAX (socket, cancel_fd) + 1;
251
252   /* no action (0) is an error too in our case */
253   if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
254     goto select_error;
255
256   if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
257     goto cancelled;
258
259   /* ask how much is available for reading on the socket */
260   if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
261     goto ioctl_error;
262
263   if (readsize == 0)
264     goto got_eos;
265
266   /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
267
268   *buf = gst_buffer_new_and_alloc (readsize);
269
270   bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
271
272   if (bytes_read < 0)
273     goto read_error;
274
275   if (bytes_read < readsize)
276     /* but mom, you promised to give me readsize bytes! */
277     goto short_read;
278
279   GST_DEBUG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (buf));
280   return GST_FLOW_OK;
281
282   /* ERRORS */
283 select_error:
284   {
285     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
286         ("select failed: %s", g_strerror (errno)));
287     return GST_FLOW_ERROR;
288   }
289 cancelled:
290   {
291     GST_DEBUG_OBJECT (this, "Select was cancelled");
292     return GST_FLOW_WRONG_STATE;
293   }
294 ioctl_error:
295   {
296     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
297         ("ioctl failed: %s", g_strerror (errno)));
298     return GST_FLOW_ERROR;
299   }
300 got_eos:
301   {
302     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
303     return GST_FLOW_WRONG_STATE;
304   }
305 read_error:
306   {
307     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
308         ("read failed: %s", g_strerror (errno)));
309     gst_buffer_unref (*buf);
310     *buf = NULL;
311     return GST_FLOW_ERROR;
312   }
313 short_read:
314   {
315     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
316         ("short read: wanted %d bytes, got %d", readsize, bytes_read));
317     gst_buffer_unref (*buf);
318     *buf = NULL;
319     return GST_FLOW_ERROR;
320   }
321 }
322
323 /* read a buffer from the given socket
324  * returns:
325  * - a GstBuffer in which data should be read
326  * - NULL, indicating a connection close or an error, to be handled with
327  *         EOS
328  */
329 GstFlowReturn
330 gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
331     GstBuffer ** buf)
332 {
333   GstFlowReturn ret;
334   guint8 *header = NULL;
335
336   GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
337       GST_DP_HEADER_LENGTH);
338
339   *buf = NULL;
340   header = g_malloc (GST_DP_HEADER_LENGTH);
341
342   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
343       cancel_fd);
344
345   if (ret != GST_FLOW_OK)
346     goto header_read_error;
347
348   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
349     goto validate_error;
350
351   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
352     goto is_not_buffer;
353
354   GST_LOG_OBJECT (this, "validated buffer packet header");
355
356   *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
357
358   g_free (header);
359
360   ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
361       GST_BUFFER_SIZE (*buf), cancel_fd);
362
363   if (ret != GST_FLOW_OK)
364     goto data_read_error;
365
366   return GST_FLOW_OK;
367
368   /* ERRORS */
369 header_read_error:
370   {
371     g_free (header);
372     return ret;
373   }
374 validate_error:
375   {
376     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
377         ("GDP buffer packet header does not validate"));
378     g_free (header);
379     return GST_FLOW_ERROR;
380   }
381 is_not_buffer:
382   {
383     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
384         ("GDP packet contains something that is not a buffer"));
385     g_free (header);
386     return GST_FLOW_ERROR;
387   }
388 data_read_error:
389   {
390     gst_buffer_unref (*buf);
391     *buf = NULL;
392     return ret;
393   }
394 }
395
396 GstFlowReturn
397 gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
398     GstCaps ** caps)
399 {
400   GstFlowReturn ret;
401   guint8 *header = NULL;
402   guint8 *payload = NULL;
403   size_t payload_length;
404
405   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
406       GST_DP_HEADER_LENGTH);
407
408   *caps = NULL;
409   header = g_malloc (GST_DP_HEADER_LENGTH);
410
411   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
412       cancel_fd);
413
414   if (ret != GST_FLOW_OK)
415     goto header_read_error;
416
417   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
418     goto header_validate_error;
419
420   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
421     goto is_not_caps;
422
423   GST_LOG_OBJECT (this, "validated caps packet header");
424
425   payload_length = gst_dp_header_payload_length (header);
426   payload = g_malloc (payload_length);
427
428   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload",
429       payload_length);
430
431   ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
432
433   if (ret != GST_FLOW_OK)
434     goto payload_read_error;
435
436   if (!gst_dp_validate_payload (payload_length, header, payload))
437     goto payload_validate_error;
438
439   *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
440
441   GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
442
443   g_free (header);
444   g_free (payload);
445
446   return GST_FLOW_OK;
447
448   /* ERRORS */
449 header_read_error:
450   {
451     g_free (header);
452     return ret;
453   }
454 header_validate_error:
455   {
456     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
457         ("GDP caps packet header does not validate"));
458     g_free (header);
459     return GST_FLOW_ERROR;
460   }
461 is_not_caps:
462   {
463     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
464         ("GDP packet contains something that is not a caps"));
465     g_free (header);
466     return GST_FLOW_ERROR;
467   }
468 payload_read_error:
469   {
470     g_free (header);
471     g_free (payload);
472     return ret;
473   }
474 payload_validate_error:
475   {
476     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
477         ("GDP caps packet payload does not validate"));
478     g_free (header);
479     g_free (payload);
480     return GST_FLOW_ERROR;
481   }
482 }
483
484 /* write a GDP header to the socket.  Return false if fails. */
485 gboolean
486 gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
487     gboolean fatal, const gchar * host, int port)
488 {
489   guint length;
490   guint8 *header;
491   size_t wrote;
492
493   if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
494     goto create_error;
495
496   GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
497   wrote = gst_tcp_socket_write (socket, header, length);
498   g_free (header);
499
500   if (wrote != length)
501     goto write_error;
502
503   return TRUE;
504
505   /* ERRORS */
506 create_error:
507   {
508     if (fatal)
509       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
510           ("Could not create GDP header from buffer"));
511     return FALSE;
512   }
513 write_error:
514   {
515     if (fatal)
516       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
517           (_("Error while sending data to \"%s:%d\"."), host, port),
518           ("Only %d of %d bytes written: %s",
519               wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
520     return FALSE;
521   }
522 }
523
524 /* write GDP header and payload to the given socket for the given caps.
525  * Return false if fails. */
526 gboolean
527 gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
528     gboolean fatal, const char *host, int port)
529 {
530   guint length;
531   guint8 *header;
532   guint8 *payload;
533   size_t wrote;
534
535   if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
536     goto create_error;
537
538   GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
539   wrote = gst_tcp_socket_write (socket, header, length);
540   if (wrote != length)
541     goto write_header_error;
542
543   length = gst_dp_header_payload_length (header);
544   g_free (header);
545
546   GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
547   wrote = gst_tcp_socket_write (socket, payload, length);
548   g_free (payload);
549
550   if (wrote != length)
551     goto write_payload_error;
552
553   return TRUE;
554
555   /* ERRORS */
556 create_error:
557   {
558     if (fatal)
559       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
560           ("Could not create GDP packet from caps"));
561     return FALSE;
562   }
563 write_header_error:
564   {
565     g_free (header);
566     g_free (payload);
567     if (fatal)
568       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
569           (_("Error while sending gdp header data to \"%s:%d\"."), host, port),
570           ("Only %d of %d bytes written: %s",
571               wrote, length, g_strerror (errno)));
572     return FALSE;
573   }
574 write_payload_error:
575   {
576     if (fatal)
577       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
578           (_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
579           ("Only %d of %d bytes written: %s",
580               wrote, length, g_strerror (errno)));
581     return FALSE;
582   }
583 }