gst_rtmp_connection_start_write (GstRtmpConnection * self)
{
GOutputStream *os;
- GstBuffer *chunks;
+ GstBuffer *message, *chunks;
+ GstRtmpMeta *meta;
+ GstRtmpChunkStream *cstream;
if (self->writing) {
return;
}
- chunks = g_async_queue_try_pop (self->output_queue);
- if (!chunks) {
+ message = g_async_queue_try_pop (self->output_queue);
+ if (!message) {
return;
}
+ meta = gst_buffer_get_rtmp_meta (message);
+ if (!meta) {
+ GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
+ goto out;
+ }
+
+ cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
+ if (!cstream) {
+ GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
+ message);
+ goto out;
+ }
+
+ chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
+ self->out_chunk_size);
+ if (!chunks) {
+ GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
+ goto out;
+ }
+
self->writing = TRUE;
if (self->output_handler) {
self->output_handler (self, self->output_handler_user_data);
g_object_ref (self));
gst_buffer_unref (chunks);
+
+out:
+ gst_buffer_unref (message);
}
static void
void
gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
{
- GstRtmpMeta *meta;
- GstRtmpChunkStream *cstream;
- GstBuffer *chunks;
-
g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
g_return_if_fail (GST_IS_BUFFER (buffer));
- meta = gst_buffer_get_rtmp_meta (buffer);
- g_return_if_fail (meta);
-
- cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
- g_return_if_fail (cstream);
-
- chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
- self->out_chunk_size);
- g_return_if_fail (chunks);
-
- g_async_queue_push (self->output_queue, chunks);
+ g_async_queue_push (self->output_queue, buffer);
g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
start_write, g_object_ref (self), g_object_unref);
}