static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data);
static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
-static void gst_rtmp_connection_write_bytes_done (GObject * obj,
+static void gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data);
static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
guint needed_bytes);
{
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue =
- g_async_queue_new_full ((GDestroyNotify) g_bytes_unref);
+ g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
gst_rtmp_connection_start_write (GstRtmpConnection * self)
{
GOutputStream *os;
- GBytes *bytes;
+ GstBuffer *chunks;
if (self->writing) {
return;
}
- bytes = g_async_queue_try_pop (self->output_queue);
- if (!bytes) {
+ chunks = g_async_queue_try_pop (self->output_queue);
+ if (!chunks) {
return;
}
}
os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
- gst_rtmp_output_stream_write_all_bytes_async (os, bytes,
- G_PRIORITY_DEFAULT, self->cancellable,
- gst_rtmp_connection_write_bytes_done, g_object_ref (self));
- g_bytes_unref (bytes);
+ gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
+ self->cancellable, gst_rtmp_connection_write_buffer_done,
+ g_object_ref (self));
+
+ gst_buffer_unref (chunks);
}
static void
}
static void
-gst_rtmp_connection_write_bytes_done (GObject * obj,
+gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
self->writing = FALSE;
- res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
+ res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL,
+ &error);
+
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_INFO_OBJECT (self, "write cancelled");
return G_SOURCE_REMOVE;
}
-static void
-byte_array_take_buffer (GByteArray * byte_array, GstBuffer * buffer)
-{
- GstMapInfo map;
- gboolean ret;
- ret = gst_buffer_map (buffer, &map, GST_MAP_READ);
- g_assert (ret);
- g_assert (byte_array->len + map.size <= (guint64) G_MAXUINT);
- g_byte_array_append (byte_array, map.data, map.size);
- gst_buffer_unmap (buffer, &map);
- gst_buffer_unref (buffer);
-}
-
void
gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
{
GstRtmpMeta *meta;
GstRtmpChunkStream *cstream;
- GstBuffer *out_buffer;
- GByteArray *out_ba;
+ GstBuffer *chunks;
g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
g_return_if_fail (GST_IS_BUFFER (buffer));
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
g_return_if_fail (cstream);
- out_buffer = gst_rtmp_chunk_stream_serialize_start (cstream, buffer,
+ chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
self->out_chunk_size);
- g_return_if_fail (out_buffer);
-
- out_ba = g_byte_array_new ();
-
- while (out_buffer) {
- byte_array_take_buffer (out_ba, out_buffer);
-
- out_buffer = gst_rtmp_chunk_stream_serialize_next (cstream,
- self->out_chunk_size);
- }
+ g_return_if_fail (chunks);
- g_async_queue_push (self->output_queue, g_byte_array_free_to_bytes (out_ba));
+ g_async_queue_push (self->output_queue, chunks);
g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
start_write, g_object_ref (self), g_object_unref);
}