2 * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
22 * SECTION:element-gssink
24 * @see_also: #GstGsSrc
26 * Write incoming data to a series of sequentially-named remote files on a
27 * Google Cloud Storage bucket.
29 * The object-name property should contain a string with a \%d placeholder
30 * that will be substituted with the index for each filename.
32 * If the #GstGsSink:post-messages property is %TRUE, it sends an application
33 * message named `GstGsSink` after writing each buffer.
35 * The message's structure contains these fields:
37 * * #gchararray `filename`: the filename where the buffer was written.
38 * * #gchararray `date`: the date of the current buffer, NULL if no start date
40 * * #gint `index`: index of the buffer.
41 * * #GstClockTime `timestamp`: the timestamp of the buffer.
42 * * #GstClockTime `stream-time`: the stream time of the buffer.
43 * * #GstClockTime `running-time`: the running_time of the buffer.
44 * * #GstClockTime `duration`: the duration of the buffer.
45 * * #guint64 `offset`: the offset of the buffer that triggered the message.
46 * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
49 * ## Example launch line
51 * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
52 * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
53 * next-file=buffer post-messages=true
55 * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
56 * names are frame00000.png, frame00001.png, ..., frame00014.png
58 * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
59 * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
60 * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
61 * next-file=buffer post-messages=true
63 * ### Upload 6 png images into gs://mybucket/mypath/myframes/ where the file
64 * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
65 * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
66 * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
68 * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
69 * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
71 * ### Upload any stream as a single file into Google Cloud Storage. Similar to
72 * filesink in this case. The file is then accessible from:
73 * gs://mybucket/mypath/myvideos/video.mp4
83 #include "gstgscommon.h"
84 #include "gstgssink.h"
// Static template for the element's "sink" pad (the remaining template
// arguments are not visible in this excerpt).
88 static GstStaticPadTemplate sinktemplate =
89 GST_STATIC_PAD_TEMPLATE("sink",
// Debug category used by the GST_*_OBJECT logging macros in this file.
94 GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
95 #define GST_CAT_DEFAULT gst_gs_sink_debug
// Default values for the index, next-file, object-name and post-messages
// properties.
97 #define DEFAULT_INDEX 0
98 #define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
99 #define DEFAULT_OBJECT_NAME "%s_%05d"
100 #define DEFAULT_POST_MESSAGES FALSE
// Short alias for the Google Cloud Storage client namespace.
102 namespace gcs = google::cloud::storage;
// Property identifiers for the service-account settings. The enclosing
// enumeration and its other members are not visible in this excerpt.
111 PROP_SERVICE_ACCOUNT_EMAIL,
113 PROP_SERVICE_ACCOUNT_CREDENTIALS,
// Instance fields of the sink (the struct header and a few fields are not
// visible in this excerpt).
// Storage client; created in start() and released in stop()/finalize().
122 std::unique_ptr<google::cloud::storage::Client> gcs_client;
// Upload stream kept open across buffers when next-file is "none".
123 std::unique_ptr<GSWriteStream> gcs_stream;
// Owned copies of the service-account-* property strings.
124 gchar* service_account_email;
125 gchar* service_account_credentials;
// start-date property: the raw string and the GDateTime parsed from it.
128 gchar* start_date_str;
129 GDateTime* start_date;
// post-messages property.
131 gboolean post_messages;
// next-file property.
132 GstGsSinkNext next_file;
// Content type sent with uploads: "" after start(), then the caps structure
// name once caps are set. Not owned by the sink.
133 const gchar* content_type;
// Number of '%' characters found in object_name.
134 size_t nb_percent_format;
// TRUE when "%s" is the first '%' placeholder of object_name.
135 gboolean percent_s_is_first;
// metadata property: fields are converted to strings and stored with the
// object.
136 GstStructure* metadata;
// Forward declarations of the GObject and GstBaseSink method implementations
// installed in gst_gs_sink_class_init() (some continuation lines are not
// visible in this excerpt).
139 static void gst_gs_sink_finalize(GObject* object);
141 static void gst_gs_sink_set_property(GObject* object,
145 static void gst_gs_sink_get_property(GObject* object,
150 static gboolean gst_gs_sink_start(GstBaseSink* bsink);
151 static gboolean gst_gs_sink_stop(GstBaseSink* sink);
152 static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
153 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
154 GstBufferList* buffer_list);
155 static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
156 static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
158 #define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
159 static GType gst_gs_sink_next_get_type(void) {
160 static GType gs_sink_next_type = 0;
161 static const GEnumValue next_types[] = {
162 {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
163 {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
166 if (!gs_sink_next_type) {
167 gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
170 return gs_sink_next_type;
// GObject type boilerplate: GstGsSink derives from GstBaseSink, and the
// element is registered under the name "gssink" with rank NONE.
173 #define gst_gs_sink_parent_class parent_class
174 G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
175 GST_ELEMENT_REGISTER_DEFINE(gssink, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)
177 class GSWriteStream {
179 GSWriteStream(google::cloud::storage::Client& client,
180 const char* bucket_name,
181 const char* object_name,
182 gcs::ObjectMetadata metadata)
183 : gcs_stream_(client.WriteObject(bucket_name,
185 gcs::WithObjectMetadata(metadata))) {}
186 ~GSWriteStream() { gcs_stream_.Close(); }
188 gcs::ObjectWriteStream& stream() { return gcs_stream_; }
191 gcs::ObjectWriteStream gcs_stream_;
// Class initializer: installs the element's properties, hooks up the GObject
// and GstBaseSink method implementations, and registers the debug category,
// the sink pad template and the element metadata.
// (Several lines of this function, including the g_param_spec_*() calls for
// most properties, are not visible in this excerpt.)
194 static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
195 GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
196 GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
197 GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
// Property accessors.
199 gobject_class->set_property = gst_gs_sink_set_property;
200 gobject_class->get_property = gst_gs_sink_get_property;
203 * GstGsSink:bucket-name:
205 * Name of the Google Cloud Storage bucket.
// bucket-name: read/write, no default.
209 g_object_class_install_property(
210 gobject_class, PROP_BUCKET_NAME,
212 "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
213 NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
216 * GstGsSink:object-name:
218 * Full path name of the remote file.
// object-name: read/write.
222 g_object_class_install_property(
223 gobject_class, PROP_OBJECT_NAME,
225 "object-name", "Object Name", "Full path name of the remote file",
227 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
232 * Index to use with location property to create file names.
// index: read/write integer in [0, G_MAXINT], default DEFAULT_INDEX.
236 g_object_class_install_property(
237 gobject_class, PROP_INDEX,
240 "Index to use with location property to create file names. The "
241 "index is incremented by one for each buffer written.",
242 0, G_MAXINT, DEFAULT_INDEX,
243 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
246 * GstGsSink:post-messages:
248 * Post a message on the GstBus for each file.
// post-messages: read/write boolean, default DEFAULT_POST_MESSAGES.
252 g_object_class_install_property(
253 gobject_class, PROP_POST_MESSAGES,
254 g_param_spec_boolean(
255 "post-messages", "Post Messages",
256 "Post a message for each file with information of the buffer",
257 DEFAULT_POST_MESSAGES,
258 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
260 * GstGsSink:next-file:
262 * A #GstGsSinkNext that specifies when to start a new file.
// next-file: read/write GstGsSinkNext enum, default DEFAULT_NEXT_FILE.
266 g_object_class_install_property(
267 gobject_class, PROP_NEXT_FILE,
269 "next-file", "Next File", "When to start a new file",
270 GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
271 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
274 * GstGsSink:service-account-email:
276 * Service Account Email to use for credentials.
// service-account-email: read/write, flagged GST_PARAM_MUTABLE_READY.
280 g_object_class_install_property(
281 gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
283 "service-account-email", "Service Account Email",
284 "Service Account Email to use for credentials", NULL,
285 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
286 GST_PARAM_MUTABLE_READY)));
289 * GstGsSink:service-account-credentials:
291 * Service Account Credentials as a JSON string to use for credentials.
// service-account-credentials: read/write, flagged GST_PARAM_MUTABLE_READY.
295 g_object_class_install_property(
296 gobject_class, PROP_SERVICE_ACCOUNT_CREDENTIALS,
298 "service-account-credentials", "Service Account Credentials",
299 "Service Account Credentials as a JSON string to use for credentials",
301 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
302 GST_PARAM_MUTABLE_READY)));
305 * GstGsSink:start-date:
307 * Start date in iso8601 format.
// start-date: read/write, no default, flagged GST_PARAM_MUTABLE_READY.
311 g_object_class_install_property(
312 gobject_class, PROP_START_DATE,
314 "start-date", "Start Date", "Start date in iso8601 format", NULL,
315 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
316 GST_PARAM_MUTABLE_READY)));
319 * GstGsSink:metadata:
321 * A map of metadata to store with the object; field values need to be
322 * convertible to strings.
// metadata: read/write, flagged GST_PARAM_MUTABLE_READY.
326 g_object_class_install_property(
327 gobject_class, PROP_METADATA,
329 "metadata", "Metadata",
330 "A map of metadata to store with the object; field values need to be "
331 "convertible to strings.",
333 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
334 GST_PARAM_MUTABLE_READY)));
// Instance teardown.
336 gobject_class->finalize = gst_gs_sink_finalize;
// GstBaseSink virtual methods implemented by this element.
338 gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
339 gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
340 gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
341 gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
342 gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
343 gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
// Debug category "gssink".
345 GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
// Pad template and element description.
347 gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
348 gst_element_class_set_static_metadata(
349 gstelement_class, "Google Cloud Storage Sink", "Sink/File",
350 "Write buffers to a sequentially named set of files on Google Cloud "
352 "Julien Isorce <jisorce@oblong.com>");
355 static void gst_gs_sink_init(GstGsSink* sink) {
356 sink->gcs_client = nullptr;
357 sink->gcs_stream = nullptr;
358 sink->index = DEFAULT_INDEX;
359 sink->post_messages = DEFAULT_POST_MESSAGES;
360 sink->service_account_email = NULL;
361 sink->service_account_credentials = NULL;
362 sink->bucket_name = NULL;
363 sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
364 sink->start_date_str = NULL;
365 sink->start_date = NULL;
366 sink->next_file = DEFAULT_NEXT_FILE;
367 sink->content_type = NULL;
368 sink->nb_percent_format = 0;
369 sink->percent_s_is_first = FALSE;
371 gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
374 static void gst_gs_sink_finalize(GObject* object) {
375 GstGsSink* sink = GST_GS_SINK(object);
377 sink->gcs_client = nullptr;
378 sink->gcs_stream = nullptr;
379 g_free(sink->service_account_email);
380 sink->service_account_email = NULL;
381 g_free(sink->service_account_credentials);
382 sink->service_account_credentials = NULL;
383 g_free(sink->bucket_name);
384 sink->bucket_name = NULL;
385 g_free(sink->object_name);
386 sink->object_name = NULL;
387 g_free(sink->start_date_str);
388 sink->start_date_str = NULL;
389 if (sink->start_date) {
390 g_date_time_unref(sink->start_date);
391 sink->start_date = NULL;
393 sink->content_type = NULL;
394 g_clear_pointer(&sink->metadata, gst_structure_free);
396 G_OBJECT_CLASS(parent_class)->finalize(object);
399 static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
400 const gchar* object_name) {
401 g_free(sink->object_name);
402 sink->object_name = NULL;
403 sink->nb_percent_format = 0;
404 sink->percent_s_is_first = FALSE;
407 GST_ERROR_OBJECT(sink, "Object name is null");
411 const std::string name(object_name);
412 sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
413 if (sink->nb_percent_format > 2) {
414 GST_ERROR_OBJECT(sink, "Object name has too many formats");
418 const size_t delimiter_percent_s = name.find("%s");
419 if (delimiter_percent_s == std::string::npos) {
420 if (sink->nb_percent_format == 2) {
421 GST_ERROR_OBJECT(sink, "Object name must have just one number format");
424 sink->object_name = g_strdup(object_name);
428 const size_t delimiter_percent = name.find_first_of('%');
429 if (delimiter_percent_s == delimiter_percent) {
430 sink->percent_s_is_first = TRUE;
432 if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
433 GST_ERROR_OBJECT(sink, "Object name expect max one string format");
438 sink->object_name = g_strdup(object_name);
443 static void gst_gs_sink_set_property(GObject* object,
447 GstGsSink* sink = GST_GS_SINK(object);
450 case PROP_BUCKET_NAME:
451 g_free(sink->bucket_name);
452 sink->bucket_name = g_strdup(g_value_get_string(value));
454 case PROP_OBJECT_NAME:
455 gst_gs_sink_set_object_name(sink, g_value_get_string(value));
458 sink->index = g_value_get_int(value);
460 case PROP_POST_MESSAGES:
461 sink->post_messages = g_value_get_boolean(value);
464 sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
466 case PROP_SERVICE_ACCOUNT_EMAIL:
467 g_free(sink->service_account_email);
468 sink->service_account_email = g_strdup(g_value_get_string(value));
470 case PROP_SERVICE_ACCOUNT_CREDENTIALS:
471 g_free(sink->service_account_credentials);
472 sink->service_account_credentials = g_strdup(g_value_get_string(value));
474 case PROP_START_DATE:
475 g_free(sink->start_date_str);
476 if (sink->start_date)
477 g_date_time_unref(sink->start_date);
478 sink->start_date_str = g_strdup(g_value_get_string(value));
480 g_date_time_new_from_iso8601(sink->start_date_str, NULL);
481 if (!sink->start_date) {
482 GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
483 sink->start_date_str);
484 g_free(sink->start_date_str);
485 sink->start_date_str = NULL;
489 g_clear_pointer(&sink->metadata, gst_structure_free);
490 sink->metadata = (GstStructure*)g_value_dup_boxed(value);
493 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
498 static void gst_gs_sink_get_property(GObject* object,
502 GstGsSink* sink = GST_GS_SINK(object);
505 case PROP_BUCKET_NAME:
506 g_value_set_string(value, sink->bucket_name);
508 case PROP_OBJECT_NAME:
509 g_value_set_string(value, sink->object_name);
512 g_value_set_int(value, sink->index);
514 case PROP_POST_MESSAGES:
515 g_value_set_boolean(value, sink->post_messages);
518 g_value_set_enum(value, sink->next_file);
520 case PROP_SERVICE_ACCOUNT_EMAIL:
521 g_value_set_string(value, sink->service_account_email);
523 case PROP_SERVICE_ACCOUNT_CREDENTIALS:
524 g_value_set_string(value, sink->service_account_credentials);
526 case PROP_START_DATE:
527 g_value_set_string(value, sink->start_date_str);
530 g_value_set_boxed(value, sink->metadata);
533 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
538 static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
539 GstGsSink* sink = GST_GS_SINK(bsink);
542 if (!sink->bucket_name) {
543 GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
548 if (!sink->object_name) {
549 GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
554 sink->content_type = "";
556 sink->gcs_client = gst_gs_create_client(
557 sink->service_account_email, sink->service_account_credentials, &err);
559 GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
560 ("Could not create client (%s)", err->message),
566 GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
567 sink->bucket_name, sink->object_name);
572 static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
573 GstGsSink* sink = GST_GS_SINK(bsink);
575 sink->gcs_client = nullptr;
576 sink->gcs_stream = nullptr;
577 sink->content_type = NULL;
582 static void gst_gs_sink_post_message_full(GstGsSink* sink,
583 GstClockTime timestamp,
584 GstClockTime duration,
586 GstClockTime offset_end,
587 GstClockTime running_time,
588 GstClockTime stream_time,
589 const char* filename,
593 if (!sink->post_messages)
596 s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
597 "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
598 sink->index, "timestamp", G_TYPE_UINT64, timestamp,
599 "stream-time", G_TYPE_UINT64, stream_time,
600 "running-time", G_TYPE_UINT64, running_time, "duration",
601 G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
602 offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
604 gst_element_post_message(GST_ELEMENT_CAST(sink),
605 gst_message_new_element(GST_OBJECT_CAST(sink), s));
// Posts the per-file element message when only a timestamp is available (no
// buffer): running time and stream time are derived from `timestamp` through
// the base sink's current segment, and NULL is passed as the date.
608 static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
609 GstClockTime timestamp,
610 GstClockTime duration,
611 const char* filename) {
612 GstClockTime running_time, stream_time;
613 guint64 offset, offset_end;
// Nothing to post when the post-messages property is disabled.
617 if (!sink->post_messages)
// Use the segment currently configured on the base sink, in its own format.
620 segment = &GST_BASE_SINK(sink)->segment;
621 format = segment->format;
// NOTE(review): offset and offset_end are passed on below, but the lines that
// assign them are not visible in this excerpt; confirm they are initialized.
626 running_time = gst_segment_to_running_time(segment, format, timestamp);
627 stream_time = gst_segment_to_stream_time(segment, format, timestamp);
629 gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
630 running_time, stream_time, filename, NULL);
633 static void gst_gs_sink_post_message(GstGsSink* sink,
635 const char* filename,
637 GstClockTime duration, timestamp;
638 GstClockTime running_time, stream_time;
639 guint64 offset, offset_end;
643 if (!sink->post_messages)
646 segment = &GST_BASE_SINK(sink)->segment;
647 format = segment->format;
649 timestamp = GST_BUFFER_PTS(buffer);
650 duration = GST_BUFFER_DURATION(buffer);
651 offset = GST_BUFFER_OFFSET(buffer);
652 offset_end = GST_BUFFER_OFFSET_END(buffer);
654 running_time = gst_segment_to_running_time(segment, format, timestamp);
655 stream_time = gst_segment_to_stream_time(segment, format, timestamp);
657 gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
658 running_time, stream_time, filename, date);
// Context handed to add_metadata_foreach() through its user_data pointer
// (one field of this struct is not visible in this excerpt).
661 struct AddMetadataIter {
// Object metadata that receives the converted key/value pairs.
663 gcs::ObjectMetadata* metadata;
666 static gboolean add_metadata_foreach(GQuark field_id,
668 gpointer user_data) {
669 struct AddMetadataIter* it = (struct AddMetadataIter*)user_data;
670 GValue svalue = G_VALUE_INIT;
672 g_value_init(&svalue, G_TYPE_STRING);
674 if (g_value_transform(value, &svalue)) {
675 const gchar* key = g_quark_to_string(field_id);
676 const gchar* value = g_value_get_string(&svalue);
678 GST_LOG_OBJECT(it->sink, "metadata '%s' -> '%s'", key, value);
679 it->metadata->upsert_metadata(key, value);
681 GST_WARNING_OBJECT(it->sink, "Failed to convert metadata '%s' to string",
682 g_quark_to_string(field_id));
685 g_value_unset(&svalue);
689 static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
691 GstMapInfo map = {0};
692 gchar* object_name = NULL;
693 gchar* buffer_date = NULL;
695 if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
696 return GST_FLOW_ERROR;
698 gcs::ObjectMetadata metadata =
699 gcs::ObjectMetadata().set_content_type(sink->content_type);
701 if (sink->metadata) {
702 struct AddMetadataIter it = {sink, &metadata};
704 gst_structure_foreach(sink->metadata, add_metadata_foreach, &it);
707 switch (sink->next_file) {
708 case GST_GS_SINK_NEXT_BUFFER: {
709 // Get buffer date if needed.
710 if (sink->start_date) {
711 if (sink->nb_percent_format != 2) {
712 GST_ERROR_OBJECT(sink, "Object name expects date and index");
713 gst_buffer_unmap(buffer, &map);
714 return GST_FLOW_ERROR;
717 if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
718 GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
719 gst_buffer_unmap(buffer, &map);
720 return GST_FLOW_ERROR;
723 if (sink->percent_s_is_first) {
725 g_strdup_printf(sink->object_name, buffer_date, sink->index);
728 g_strdup_printf(sink->object_name, sink->index, buffer_date);
731 if (sink->nb_percent_format != 1) {
732 GST_ERROR_OBJECT(sink, "Object name expects only an index");
733 gst_buffer_unmap(buffer, &map);
734 return GST_FLOW_ERROR;
737 object_name = g_strdup_printf(sink->object_name, sink->index);
740 GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
742 gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
743 sink->bucket_name, object_name, gcs::WithObjectMetadata(metadata));
745 gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
746 if (gcs_stream.fail()) {
747 GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
751 google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
752 sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
753 if (!object_metadata) {
755 sink, "Could not get object metadata for object %s (%s)",
756 object_name, object_metadata.status().message().c_str());
757 gst_buffer_unmap(buffer, &map);
760 return GST_FLOW_ERROR;
763 GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
764 object_name, object_metadata->size());
766 gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
772 case GST_GS_SINK_NEXT_NONE: {
773 if (!sink->gcs_stream) {
774 GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
775 sink->gcs_stream = std::make_unique<GSWriteStream>(
776 *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
779 if (!sink->gcs_stream->stream().IsOpen()) {
781 sink, RESOURCE, OPEN_READ,
782 ("Could not create write stream (%s)",
783 sink->gcs_stream->stream().last_status().message().c_str()),
785 gst_buffer_unmap(buffer, &map);
790 GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
792 gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
793 stream.write(reinterpret_cast<const char*>(map.data), map.size);
795 GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
800 g_assert_not_reached();
803 gst_buffer_unmap(buffer, &map);
807 static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
808 GstGsSink* sink = GST_GS_SINK(bsink);
809 GstFlowReturn flow = GST_FLOW_OK;
811 flow = gst_gs_sink_write_buffer(sink, buffer);
// gst_buffer_list_foreach() callback used by gst_gs_sink_render_list():
// appends the memory blocks of *buf to the destination buffer passed in
// `data`.
815 static gboolean buffer_list_copy_data(GstBuffer** buf,
818 GstBuffer* dest = GST_BUFFER_CAST(data);
// Copy the buffer metadata of *buf into dest.
// NOTE(review): any condition guarding this call is not visible in this
// excerpt; confirm against the full file.
822 gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
// Append every memory block of *buf to dest.
824 num = gst_buffer_n_memory(*buf);
825 for (i = 0; i < num; ++i) {
828 mem = gst_buffer_get_memory(*buf, i);
829 gst_buffer_append_memory(dest, mem);
835 /* Our assumption for now is that the buffers in a buffer list should always
836 * end up in the same file. If someone wants different behaviour, they'll just
837 * have to add a property for that. */
838 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
839 GstBufferList* list) {
843 size = gst_buffer_list_calculate_size(list);
844 GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
846 /* copy all buffers in the list into one single buffer, so we can use
847 * the normal render function (FIXME: optimise to avoid the memcpy) */
848 buf = gst_buffer_new();
849 gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
850 g_assert(gst_buffer_get_size(buf) == size);
852 gst_gs_sink_render(sink, buf);
853 gst_buffer_unref(buf);
858 static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
859 GstGsSink* sink = GST_GS_SINK(bsink);
860 GstStructure* s = gst_caps_get_structure(caps, 0);
862 sink->content_type = gst_structure_get_name(s);
864 GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
// GstBaseSink::event implementation.
// NOTE(review): the case label of the switch is not visible in this excerpt
// (presumably end-of-stream); confirm against the full file.
869 static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
870 GstGsSink* sink = GST_GS_SINK(bsink);
872 switch (GST_EVENT_TYPE(event)) {
// When a single-object upload stream is open, drop it (the GSWriteStream
// destructor closes the stream) and post the per-file message for
// object_name, using the current segment position as timestamp and -1 as
// duration.
874 if (sink->gcs_stream) {
875 sink->gcs_stream = nullptr;
876 gst_gs_sink_post_message_from_time(
877 sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
// Always chain up to the parent class event handler.
884 return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);