return bytes_written;
}
-/* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK
- * indicates success, anything else is failure.
- */
-static GstFlowReturn
-gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
- GstPoll * fdset)
-{
- ssize_t n;
- size_t bytes_read;
- int num_to_read;
- int ret;
-
- bytes_read = 0;
-
- while (bytes_read < count) {
- /* do a blocking select on the socket */
- /* no action (0) is an error too in our case */
- if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
- if (ret == -1 && errno == EBUSY)
- goto cancelled;
- else
- goto select_error;
- }
-
- /* ask how much is available for reading on the socket */
- if (ioctl (socket, FIONREAD, &num_to_read) < 0)
- goto ioctl_error;
-
- if (num_to_read == 0)
- goto got_eos;
-
- /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
-
- num_to_read = MIN (num_to_read, count - bytes_read);
-
- n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
-
- if (n < 0)
- goto read_error;
-
- if (n < num_to_read)
- goto short_read;
-
- bytes_read += num_to_read;
- }
-
- return GST_FLOW_OK;
-
- /* ERRORS */
-select_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-cancelled:
- {
- GST_DEBUG_OBJECT (this, "Select was cancelled");
- return GST_FLOW_WRONG_STATE;
- }
-ioctl_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("ioctl failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-got_eos:
- {
- GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
- return GST_FLOW_EOS;
- }
-read_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("read failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-short_read:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, num_to_read, n));
- return GST_FLOW_ERROR;
- }
-}
-
/* close the socket and reset the fd. Used to clean up after errors. */
void
gst_tcp_socket_close (GstPollFD * socket)
return GST_FLOW_ERROR;
}
}
-
-/* read a buffer from the given socket
- * returns:
- * - a GstBuffer in which data should be read
- * - NULL, indicating a connection close or an error, to be handled with
- * EOS
- */
-GstFlowReturn
-gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
- GstBuffer ** buf)
-{
- GstFlowReturn ret;
- guint8 *header = NULL;
- guint8 *data;
- gsize size;
-
- GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
- GST_DP_HEADER_LENGTH);
-
- *buf = NULL;
- header = g_malloc (GST_DP_HEADER_LENGTH);
-
- ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
-
- if (ret != GST_FLOW_OK)
- goto header_read_error;
-
- if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
- goto validate_error;
-
- if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
- goto is_not_buffer;
-
- GST_LOG_OBJECT (this, "validated buffer packet header");
-
- *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
-
- g_free (header);
-
- data = gst_buffer_map (*buf, &size, NULL, GST_MAP_WRITE);
- ret = gst_tcp_socket_read (this, socket, data, size, fdset);
- gst_buffer_unmap (*buf, data, size);
-
- if (ret != GST_FLOW_OK)
- goto data_read_error;
-
- return GST_FLOW_OK;
-
- /* ERRORS */
-header_read_error:
- {
- g_free (header);
- return ret;
- }
-validate_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP buffer packet header does not validate"));
- g_free (header);
- return GST_FLOW_ERROR;
- }
-is_not_buffer:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP packet contains something that is not a buffer (type %d)",
- gst_dp_header_payload_type (header)));
- g_free (header);
- return GST_FLOW_ERROR;
- }
-data_read_error:
- {
- gst_buffer_unref (*buf);
- *buf = NULL;
- return ret;
- }
-}
-
-GstFlowReturn
-gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset,
- GstCaps ** caps)
-{
- GstFlowReturn ret;
- guint8 *header = NULL;
- guint8 *payload = NULL;
- size_t payload_length;
-
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
- GST_DP_HEADER_LENGTH);
-
- *caps = NULL;
- header = g_malloc (GST_DP_HEADER_LENGTH);
-
- ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
-
- if (ret != GST_FLOW_OK)
- goto header_read_error;
-
- if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
- goto header_validate_error;
-
- if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
- goto is_not_caps;
-
- GST_LOG_OBJECT (this, "validated caps packet header");
-
- payload_length = gst_dp_header_payload_length (header);
- payload = g_malloc (payload_length);
-
- GST_LOG_OBJECT (this,
- "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload",
- payload_length);
-
- ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset);
-
- if (ret != GST_FLOW_OK)
- goto payload_read_error;
-
- if (!gst_dp_validate_payload (GST_DP_HEADER_LENGTH, header, payload))
- goto payload_validate_error;
-
- *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
-
- GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
-
- g_free (header);
- g_free (payload);
-
- return GST_FLOW_OK;
-
- /* ERRORS */
-header_read_error:
- {
- g_free (header);
- return ret;
- }
-header_validate_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP caps packet header does not validate"));
- g_free (header);
- return GST_FLOW_ERROR;
- }
-is_not_caps:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP packet contains something that is not a caps (type %d)",
- gst_dp_header_payload_type (header)));
- g_free (header);
- return GST_FLOW_ERROR;
- }
-payload_read_error:
- {
- g_free (header);
- g_free (payload);
- return ret;
- }
-payload_validate_error:
- {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP caps packet payload does not validate"));
- g_free (header);
- g_free (payload);
- return GST_FLOW_ERROR;
- }
-}