move a recurring normal event to LOG, where it should be
[platform/upstream/gstreamer.git] / gst / tcp / gsttcp.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * gsttcp.c: TCP functions
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <netdb.h>
32 #include <string.h>             /* memset, in FD_ZERO macro */
33 #include <unistd.h>
34 #include <sys/ioctl.h>
35
36 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
37 #include <sys/filio.h>
38 #endif
39
40 #include <glib.h>
41 #include <gst/gst.h>
42 #include <gst/gst-i18n-plugin.h>
43 #include <gst/dataprotocol/dataprotocol.h>
44
45 GST_DEBUG_CATEGORY_EXTERN (tcp_debug);
46 #define GST_CAT_DEFAULT tcp_debug
47
48 #ifndef MSG_NOSIGNAL
49 #define MSG_NOSIGNAL 0
50 #endif
51
52 /* resolve host to IP address, throwing errors if it fails */
53 /* host can already be an IP address */
54 /* returns a newly allocated gchar * with the dotted ip address,
55    or NULL, in which case it already fired an error. */
56 gchar *
57 gst_tcp_host_to_ip (GstElement * element, const gchar * host)
58 {
59   struct hostent *hostinfo;
60   char **addrs;
61   gchar *ip;
62   struct in_addr addr;
63
64   GST_DEBUG_OBJECT (element, "resolving host %s", host);
65
66   /* first check if it already is an IP address */
67   if (inet_aton (host, &addr)) {
68     ip = g_strdup (host);
69     goto beach;
70   }
71   /* FIXME: could do a localhost check here */
72
73   /* perform a name lookup */
74   if (!(hostinfo = gethostbyname (host)))
75     goto resolve_error;
76
77   if (hostinfo->h_addrtype != AF_INET)
78     goto not_ip;
79
80   addrs = hostinfo->h_addr_list;
81
82   /* There could be more than one IP address, but we just return the first */
83   ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
84
85 beach:
86   GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
87   return ip;
88
89 resolve_error:
90   {
91     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
92         ("Could not find IP address for host \"%s\".", host));
93     return NULL;
94   }
95 not_ip:
96   {
97     GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
98         ("host \"%s\" is not an IP host", host));
99     return NULL;
100   }
101 }
102
103 /* write buffer to given socket incrementally.
104  * Returns number of bytes written.
105  */
106 gint
107 gst_tcp_socket_write (int socket, const void *buf, size_t count)
108 {
109   size_t bytes_written = 0;
110
111   while (bytes_written < count) {
112     ssize_t wrote = send (socket, buf + bytes_written,
113         count - bytes_written, MSG_NOSIGNAL);
114
115     if (wrote <= 0) {
116       return bytes_written;
117     }
118     bytes_written += wrote;
119   }
120
121   if (bytes_written < 0)
122     GST_WARNING ("error while writing");
123   else
124     GST_LOG ("wrote %d bytes succesfully", bytes_written);
125   return bytes_written;
126 }
127
128 /* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK
129  * indicates success, anything else is failure.
130  */
131 static GstFlowReturn
132 gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
133     int cancel_fd)
134 {
135   fd_set testfds;
136   int maxfdp1;
137   ssize_t n;
138   size_t bytes_read;
139   int num_to_read;
140
141   bytes_read = 0;
142
143   while (bytes_read < count) {
144     /* do a blocking select on the socket */
145     FD_ZERO (&testfds);
146     FD_SET (socket, &testfds);
147     if (cancel_fd >= 0)
148       FD_SET (cancel_fd, &testfds);
149     maxfdp1 = MAX (socket, cancel_fd) + 1;
150
151     /* no action (0) is an error too in our case */
152     if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
153       goto select_error;
154
155     if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
156       goto cancelled;
157
158     /* ask how much is available for reading on the socket */
159     if (ioctl (socket, FIONREAD, &num_to_read) < 0)
160       goto ioctl_error;
161
162     if (num_to_read == 0)
163       goto got_eos;
164
165     /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
166
167     num_to_read = MIN (num_to_read, count - bytes_read);
168
169     n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
170
171     if (n < 0)
172       goto read_error;
173
174     if (n < num_to_read)
175       goto short_read;
176
177     bytes_read += num_to_read;
178   }
179
180   return GST_FLOW_OK;
181
182   /* ERRORS */
183 select_error:
184   {
185     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
186         ("select failed: %s", g_strerror (errno)));
187     return GST_FLOW_ERROR;
188   }
189 cancelled:
190   {
191     GST_DEBUG_OBJECT (this, "Select was cancelled");
192     return GST_FLOW_WRONG_STATE;
193   }
194 ioctl_error:
195   {
196     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
197         ("ioctl failed: %s", g_strerror (errno)));
198     return GST_FLOW_ERROR;
199   }
200 got_eos:
201   {
202     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
203     return GST_FLOW_UNEXPECTED;
204   }
205 read_error:
206   {
207     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
208         ("read failed: %s", g_strerror (errno)));
209     return GST_FLOW_ERROR;
210   }
211 short_read:
212   {
213     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
214         ("short read: wanted %d bytes, got %d", num_to_read, n));
215     return GST_FLOW_ERROR;
216   }
217 }
218
219 /* close the socket and reset the fd.  Used to clean up after errors. */
220 void
221 gst_tcp_socket_close (int *socket)
222 {
223   close (*socket);
224   *socket = -1;
225 }
226
227 /* read a buffer from the given socket
228  * returns:
229  * - a GstBuffer in which data should be read
230  * - NULL, indicating a connection close or an error, to be handled with
231  *         EOS
232  */
233 GstFlowReturn
234 gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
235     GstBuffer ** buf)
236 {
237   fd_set testfds;
238   int ret;
239   int maxfdp1;
240   ssize_t bytes_read;
241   int readsize;
242
243   *buf = NULL;
244
245   /* do a blocking select on the socket */
246   FD_ZERO (&testfds);
247   FD_SET (socket, &testfds);
248   if (cancel_fd >= 0)
249     FD_SET (cancel_fd, &testfds);
250   maxfdp1 = MAX (socket, cancel_fd) + 1;
251
252   /* no action (0) is an error too in our case */
253   if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
254     goto select_error;
255
256   if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
257     goto cancelled;
258
259   /* ask how much is available for reading on the socket */
260   if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
261     goto ioctl_error;
262
263   if (readsize == 0)
264     goto got_eos;
265
266   /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
267
268   *buf = gst_buffer_new_and_alloc (readsize);
269
270   bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
271
272   if (bytes_read < 0)
273     goto read_error;
274
275   if (bytes_read < readsize)
276     /* but mom, you promised to give me readsize bytes! */
277     goto short_read;
278
279   GST_LOG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (*buf));
280   return GST_FLOW_OK;
281
282   /* ERRORS */
283 select_error:
284   {
285     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
286         ("select failed: %s", g_strerror (errno)));
287     return GST_FLOW_ERROR;
288   }
289 cancelled:
290   {
291     GST_DEBUG_OBJECT (this, "Select was cancelled");
292     return GST_FLOW_WRONG_STATE;
293   }
294 ioctl_error:
295   {
296     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
297         ("ioctl failed: %s", g_strerror (errno)));
298     return GST_FLOW_ERROR;
299   }
300 got_eos:
301   {
302     GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
303     return GST_FLOW_WRONG_STATE;
304   }
305 read_error:
306   {
307     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
308         ("read failed: %s", g_strerror (errno)));
309     gst_buffer_unref (*buf);
310     *buf = NULL;
311     return GST_FLOW_ERROR;
312   }
313 short_read:
314   {
315     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
316         ("short read: wanted %d bytes, got %d", readsize, bytes_read));
317     gst_buffer_unref (*buf);
318     *buf = NULL;
319     return GST_FLOW_ERROR;
320   }
321 }
322
323 /* read a buffer from the given socket
324  * returns:
325  * - a GstBuffer in which data should be read
326  * - NULL, indicating a connection close or an error, to be handled with
327  *         EOS
328  */
329 GstFlowReturn
330 gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
331     GstBuffer ** buf)
332 {
333   GstFlowReturn ret;
334   guint8 *header = NULL;
335
336   GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
337       GST_DP_HEADER_LENGTH);
338
339   *buf = NULL;
340   header = g_malloc (GST_DP_HEADER_LENGTH);
341
342   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
343       cancel_fd);
344
345   if (ret != GST_FLOW_OK)
346     goto header_read_error;
347
348   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
349     goto validate_error;
350
351   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
352     goto is_not_buffer;
353
354   GST_LOG_OBJECT (this, "validated buffer packet header");
355
356   *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
357
358   g_free (header);
359
360   ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
361       GST_BUFFER_SIZE (*buf), cancel_fd);
362
363   if (ret != GST_FLOW_OK)
364     goto data_read_error;
365
366   return GST_FLOW_OK;
367
368   /* ERRORS */
369 header_read_error:
370   {
371     g_free (header);
372     return ret;
373   }
374 validate_error:
375   {
376     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
377         ("GDP buffer packet header does not validate"));
378     g_free (header);
379     return GST_FLOW_ERROR;
380   }
381 is_not_buffer:
382   {
383     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
384         ("GDP packet contains something that is not a buffer (type %d)",
385             gst_dp_header_payload_type (header)));
386     g_free (header);
387     return GST_FLOW_ERROR;
388   }
389 data_read_error:
390   {
391     gst_buffer_unref (*buf);
392     *buf = NULL;
393     return ret;
394   }
395 }
396
397 GstFlowReturn
398 gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
399     GstCaps ** caps)
400 {
401   GstFlowReturn ret;
402   guint8 *header = NULL;
403   guint8 *payload = NULL;
404   size_t payload_length;
405
406   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
407       GST_DP_HEADER_LENGTH);
408
409   *caps = NULL;
410   header = g_malloc (GST_DP_HEADER_LENGTH);
411
412   ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
413       cancel_fd);
414
415   if (ret != GST_FLOW_OK)
416     goto header_read_error;
417
418   if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
419     goto header_validate_error;
420
421   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
422     goto is_not_caps;
423
424   GST_LOG_OBJECT (this, "validated caps packet header");
425
426   payload_length = gst_dp_header_payload_length (header);
427   payload = g_malloc (payload_length);
428
429   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload",
430       payload_length);
431
432   ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
433
434   if (ret != GST_FLOW_OK)
435     goto payload_read_error;
436
437   if (!gst_dp_validate_payload (payload_length, header, payload))
438     goto payload_validate_error;
439
440   *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
441
442   GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
443
444   g_free (header);
445   g_free (payload);
446
447   return GST_FLOW_OK;
448
449   /* ERRORS */
450 header_read_error:
451   {
452     g_free (header);
453     return ret;
454   }
455 header_validate_error:
456   {
457     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
458         ("GDP caps packet header does not validate"));
459     g_free (header);
460     return GST_FLOW_ERROR;
461   }
462 is_not_caps:
463   {
464     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
465         ("GDP packet contains something that is not a caps (type %d)",
466             gst_dp_header_payload_type (header)));
467     g_free (header);
468     return GST_FLOW_ERROR;
469   }
470 payload_read_error:
471   {
472     g_free (header);
473     g_free (payload);
474     return ret;
475   }
476 payload_validate_error:
477   {
478     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
479         ("GDP caps packet payload does not validate"));
480     g_free (header);
481     g_free (payload);
482     return GST_FLOW_ERROR;
483   }
484 }
485
486 /* write a GDP header to the socket.  Return false if fails. */
487 gboolean
488 gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
489     gboolean fatal, const gchar * host, int port)
490 {
491   guint length;
492   guint8 *header;
493   size_t wrote;
494
495   if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
496     goto create_error;
497
498   GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
499   wrote = gst_tcp_socket_write (socket, header, length);
500   g_free (header);
501
502   if (wrote != length)
503     goto write_error;
504
505   return TRUE;
506
507   /* ERRORS */
508 create_error:
509   {
510     if (fatal)
511       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
512           ("Could not create GDP header from buffer"));
513     return FALSE;
514   }
515 write_error:
516   {
517     if (fatal)
518       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
519           (_("Error while sending data to \"%s:%d\"."), host, port),
520           ("Only %d of %d bytes written: %s",
521               wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
522     return FALSE;
523   }
524 }
525
526 /* write GDP header and payload to the given socket for the given caps.
527  * Return false if fails. */
528 gboolean
529 gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
530     gboolean fatal, const char *host, int port)
531 {
532   guint length;
533   guint8 *header;
534   guint8 *payload;
535   size_t wrote;
536
537   if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
538     goto create_error;
539
540   GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
541   wrote = gst_tcp_socket_write (socket, header, length);
542   if (wrote != length)
543     goto write_header_error;
544
545   length = gst_dp_header_payload_length (header);
546   g_free (header);
547
548   GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
549   wrote = gst_tcp_socket_write (socket, payload, length);
550   g_free (payload);
551
552   if (wrote != length)
553     goto write_payload_error;
554
555   return TRUE;
556
557   /* ERRORS */
558 create_error:
559   {
560     if (fatal)
561       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
562           ("Could not create GDP packet from caps"));
563     return FALSE;
564   }
565 write_header_error:
566   {
567     g_free (header);
568     g_free (payload);
569     if (fatal)
570       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
571           (_("Error while sending gdp header data to \"%s:%d\"."), host, port),
572           ("Only %d of %d bytes written: %s",
573               wrote, length, g_strerror (errno)));
574     return FALSE;
575   }
576 write_payload_error:
577   {
578     if (fatal)
579       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
580           (_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
581           ("Only %d of %d bytes written: %s",
582               wrote, length, g_strerror (errno)));
583     return FALSE;
584   }
585 }