static void gst_socket_src_finalize (GObject * gobject);
-static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc,
- GstBuffer ** outbuf);
+static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
+ GstBuffer * outbuf);
static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
gstbasesrc_class->unlock = gst_socket_src_unlock;
gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
- gstpush_src_class->create = gst_socket_src_create;
+ gstpush_src_class->fill = gst_socket_src_fill;
GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
}
}
static GstFlowReturn
-gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
+gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
{
GstSocketSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
gssize rret;
GError *err = NULL;
GstMapInfo map;
- gssize avail, read;
GSocket *socket;
src = GST_SOCKET_SRC (psrc);
GST_LOG_OBJECT (src, "asked for a buffer");
- /* read the buffer header */
- avail = g_socket_get_available_bytes (socket);
- if (avail < 0) {
- goto get_available_error;
- } else if (avail == 0) {
- GIOCondition condition;
-
- if (!g_socket_condition_wait (socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
- goto select_error;
-
- condition =
- g_socket_condition_check (socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
-
- if ((condition & G_IO_ERR)) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Socket in error state"));
- *outbuf = NULL;
- ret = GST_FLOW_ERROR;
- goto done;
- } else if ((condition & G_IO_HUP)) {
- GST_DEBUG_OBJECT (src, "Connection closed");
- *outbuf = NULL;
- ret = GST_FLOW_EOS;
- goto done;
- }
- avail = g_socket_get_available_bytes (socket);
- if (avail < 0)
- goto get_available_error;
- }
-
- if (avail > 0) {
- read = MIN (avail, MAX_READ_SIZE);
- *outbuf = gst_buffer_new_and_alloc (read);
- gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
- rret =
- g_socket_receive (socket, (gchar *) map.data, read,
- src->cancellable, &err);
- } else {
- /* Connection closed */
- *outbuf = NULL;
- read = 0;
- rret = 0;
- }
+ gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
+ rret = g_socket_receive_with_blocking (socket, (gchar *) map.data,
+ map.size, TRUE, src->cancellable, &err);
+ gst_buffer_unmap (outbuf, &map);
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
- if (*outbuf) {
- gst_buffer_unmap (*outbuf, &map);
- gst_buffer_unref (*outbuf);
- }
- *outbuf = NULL;
} else if (rret < 0) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
ret = GST_FLOW_FLUSHING;
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
- gst_buffer_unmap (*outbuf, &map);
- gst_buffer_unref (*outbuf);
- *outbuf = NULL;
} else {
ret = GST_FLOW_OK;
- gst_buffer_unmap (*outbuf, &map);
- gst_buffer_resize (*outbuf, 0, rret);
+ gst_buffer_resize (outbuf, 0, rret);
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
- gst_buffer_get_size (*outbuf),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
- GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+ gst_buffer_get_size (outbuf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
+ GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
}
g_clear_error (&err);
-done:
g_object_unref (socket);
return ret;
-select_error:
- {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- GST_DEBUG_OBJECT (src, "Cancelled");
- ret = GST_FLOW_FLUSHING;
- } else {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Select failed: %s", err->message));
- ret = GST_FLOW_ERROR;
- }
- g_clear_error (&err);
- g_object_unref (socket);
- return ret;
- }
-get_available_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Failed to get available bytes from socket"));
- g_object_unref (socket);
- return GST_FLOW_ERROR;
- }
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),