4c3d740b109d276c7608395f10eb4dfc7418085e
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / gs / gstgssink.cpp
1 /* GStreamer
2  * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
3  *
4  * gstgssink.cpp:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:element-gssink
23  * @title: gssink
24  * @see_also: #GstGsSrc
25  *
26  * Write incoming data to a series of sequentially-named remote files on a
27  * Google Cloud Storage bucket.
28  *
29  * The object-name property should contain a string with a \%d placeholder
30  * that will be substituted with the index for each filename.
31  *
32  * If the #GstGsSink:post-messages property is %TRUE, it sends an application
33  * message named `GstGsSink` after writing each buffer.
34  *
35  * The message's structure contains these fields:
36  *
37  * * #gchararray `filename`: the filename where the buffer was written.
38  * * #gchararray `date`: the date of the current buffer, NULL if no start date
39  * is provided.
40  * * #gint `index`: index of the buffer.
41  * * #GstClockTime `timestamp`: the timestamp of the buffer.
42  * * #GstClockTime `stream-time`: the stream time of the buffer.
43  * * #GstClockTime `running-time`: the running_time of the buffer.
44  * * #GstClockTime `duration`: the duration of the buffer.
45  * * #guint64 `offset`: the offset of the buffer that triggered the message.
46  * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
47  * message.
48  *
49  * ## Example launch line
50  * ```
51  * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
52  * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
53  * next-file=buffer post-messages=true
54  * ```
55  * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
56  * names are frame00000.png, frame00001.png, ..., frame00014.png
57  * ```
58  * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
59  * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
60  * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
61  * next-file=buffer post-messages=true
62  * ```
63  * ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file
64  * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
65  * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
66  * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
67  * ```
68  * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
69  * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
70  * ```
71  * ### Upload any stream as a single file into Google Cloud Storage. Similar as
72  * filesink in this case. The file is then accessible from:
73  * gs://mybucket/mypath/myvideos/video.mp4
74  *
75  * See also: #GstGsSrc
76  * Since: 1.20
77  */
78
79 #ifdef HAVE_CONFIG_H
80 #include "config.h"
81 #endif
82
83 #include "gstgscommon.h"
84 #include "gstgssink.h"
85
86 #include <algorithm>
87
88 static GstStaticPadTemplate sinktemplate =
89     GST_STATIC_PAD_TEMPLATE("sink",
90                             GST_PAD_SINK,
91                             GST_PAD_ALWAYS,
92                             GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
95 #define GST_CAT_DEFAULT gst_gs_sink_debug
96
97 #define DEFAULT_INDEX 0
98 #define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
99 #define DEFAULT_OBJECT_NAME "%s_%05d"
100 #define DEFAULT_POST_MESSAGES FALSE
101
102 namespace gcs = google::cloud::storage;
103
104 enum {
105   PROP_0,
106   PROP_BUCKET_NAME,
107   PROP_OBJECT_NAME,
108   PROP_INDEX,
109   PROP_POST_MESSAGES,
110   PROP_NEXT_FILE,
111   PROP_SERVICE_ACCOUNT_EMAIL,
112   PROP_START_DATE,
113   PROP_SERVICE_ACCOUNT_CREDENTIALS,
114   PROP_METADATA,
115 };
116
117 class GSWriteStream;
118
119 struct _GstGsSink {
120   GstBaseSink parent;
121
122   std::unique_ptr<google::cloud::storage::Client> gcs_client;
123   std::unique_ptr<GSWriteStream> gcs_stream;
124   gchar* service_account_email;
125   gchar* service_account_credentials;
126   gchar* bucket_name;
127   gchar* object_name;
128   gchar* start_date_str;
129   GDateTime* start_date;
130   gint index;
131   gboolean post_messages;
132   GstGsSinkNext next_file;
133   const gchar* content_type;
134   size_t nb_percent_format;
135   gboolean percent_s_is_first;
136   GstStructure* metadata;
137 };
138
139 static void gst_gs_sink_finalize(GObject* object);
140
141 static void gst_gs_sink_set_property(GObject* object,
142                                      guint prop_id,
143                                      const GValue* value,
144                                      GParamSpec* pspec);
145 static void gst_gs_sink_get_property(GObject* object,
146                                      guint prop_id,
147                                      GValue* value,
148                                      GParamSpec* pspec);
149
150 static gboolean gst_gs_sink_start(GstBaseSink* bsink);
151 static gboolean gst_gs_sink_stop(GstBaseSink* sink);
152 static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
153 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
154                                              GstBufferList* buffer_list);
155 static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
156 static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
157
158 #define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
159 static GType gst_gs_sink_next_get_type(void) {
160   static GType gs_sink_next_type = 0;
161   static const GEnumValue next_types[] = {
162       {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
163       {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
164       {0, NULL, NULL}};
165
166   if (!gs_sink_next_type) {
167     gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
168   }
169
170   return gs_sink_next_type;
171 }
172
173 #define gst_gs_sink_parent_class parent_class
174 G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
175 GST_ELEMENT_REGISTER_DEFINE(gssink, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)
176
177 class GSWriteStream {
178  public:
179   GSWriteStream(google::cloud::storage::Client& client,
180                 const char* bucket_name,
181                 const char* object_name,
182                 gcs::ObjectMetadata metadata)
183       : gcs_stream_(client.WriteObject(bucket_name,
184                                        object_name,
185                                        gcs::WithObjectMetadata(metadata))) {}
186   ~GSWriteStream() { gcs_stream_.Close(); }
187
188   gcs::ObjectWriteStream& stream() { return gcs_stream_; }
189
190  private:
191   gcs::ObjectWriteStream gcs_stream_;
192 };
193
194 static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
195   GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
196   GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
197   GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
198
199   gobject_class->set_property = gst_gs_sink_set_property;
200   gobject_class->get_property = gst_gs_sink_get_property;
201
202   /**
203    * GstGsSink:bucket-name:
204    *
205    * Name of the Google Cloud Storage bucket.
206    *
207    * Since: 1.20
208    */
209   g_object_class_install_property(
210       gobject_class, PROP_BUCKET_NAME,
211       g_param_spec_string(
212           "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
213           NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
214
215   /**
216    * GstGsSink:object-name:
217    *
218    * Full path name of the remote file.
219    *
220    * Since: 1.20
221    */
222   g_object_class_install_property(
223       gobject_class, PROP_OBJECT_NAME,
224       g_param_spec_string(
225           "object-name", "Object Name", "Full path name of the remote file",
226           DEFAULT_OBJECT_NAME,
227           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
228
229   /**
230    * GstGsSink:index:
231    *
232    * Index to use with location property to create file names.
233    *
234    * Since: 1.20
235    */
236   g_object_class_install_property(
237       gobject_class, PROP_INDEX,
238       g_param_spec_int(
239           "index", "Index",
240           "Index to use with location property to create file names.  The "
241           "index is incremented by one for each buffer written.",
242           0, G_MAXINT, DEFAULT_INDEX,
243           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
244
245   /**
246    * GstGsSink:post-messages:
247    *
248    * Post a message on the GstBus for each file.
249    *
250    * Since: 1.20
251    */
252   g_object_class_install_property(
253       gobject_class, PROP_POST_MESSAGES,
254       g_param_spec_boolean(
255           "post-messages", "Post Messages",
256           "Post a message for each file with information of the buffer",
257           DEFAULT_POST_MESSAGES,
258           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
259   /**
260    * GstGsSink:next-file:
261    *
262    * A #GstGsSinkNext that specifies when to start a new file.
263    *
264    * Since: 1.20
265    */
266   g_object_class_install_property(
267       gobject_class, PROP_NEXT_FILE,
268       g_param_spec_enum(
269           "next-file", "Next File", "When to start a new file",
270           GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
271           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
272
273   /**
274    * GstGsSink:service-account-email:
275    *
276    * Service Account Email to use for credentials.
277    *
278    * Since: 1.20
279    */
280   g_object_class_install_property(
281       gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
282       g_param_spec_string(
283           "service-account-email", "Service Account Email",
284           "Service Account Email to use for credentials", NULL,
285           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
286                         GST_PARAM_MUTABLE_READY)));
287
288   /**
289    * GstGsSink:service-account-credentials:
290    *
291    * Service Account Credentials as a JSON string to use for credentials.
292    *
293    * Since: 1.20
294    */
295   g_object_class_install_property(
296       gobject_class, PROP_SERVICE_ACCOUNT_CREDENTIALS,
297       g_param_spec_string(
298           "service-account-credentials", "Service Account Credentials",
299           "Service Account Credentials as a JSON string to use for credentials",
300           NULL,
301           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
302                         GST_PARAM_MUTABLE_READY)));
303
304   /**
305    * GstGsSink:start-date:
306    *
307    * Start date in iso8601 format.
308    *
309    * Since: 1.20
310    */
311   g_object_class_install_property(
312       gobject_class, PROP_START_DATE,
313       g_param_spec_string(
314           "start-date", "Start Date", "Start date in iso8601 format", NULL,
315           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
316                         GST_PARAM_MUTABLE_READY)));
317
318   /**
319    * GstGsSink:metadata:
320    *
321    * A map of metadata to store with the object; field values need to be
322    * convertible to strings.
323    *
324    * Since: 1.20
325    */
326   g_object_class_install_property(
327       gobject_class, PROP_METADATA,
328       g_param_spec_boxed(
329           "metadata", "Metadata",
330           "A map of metadata to store with the object; field values need to be "
331           "convertible to strings.",
332           GST_TYPE_STRUCTURE,
333           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
334                         GST_PARAM_MUTABLE_READY)));
335
336   gobject_class->finalize = gst_gs_sink_finalize;
337
338   gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
339   gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
340   gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
341   gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
342   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
343   gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
344
345   GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
346
347   gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
348   gst_element_class_set_static_metadata(
349       gstelement_class, "Google Cloud Storage Sink", "Sink/File",
350       "Write buffers to a sequentially named set of files on Google Cloud "
351       "Storage",
352       "Julien Isorce <jisorce@oblong.com>");
353 }
354
355 static void gst_gs_sink_init(GstGsSink* sink) {
356   sink->gcs_client = nullptr;
357   sink->gcs_stream = nullptr;
358   sink->index = DEFAULT_INDEX;
359   sink->post_messages = DEFAULT_POST_MESSAGES;
360   sink->service_account_email = NULL;
361   sink->service_account_credentials = NULL;
362   sink->bucket_name = NULL;
363   sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
364   sink->start_date_str = NULL;
365   sink->start_date = NULL;
366   sink->next_file = DEFAULT_NEXT_FILE;
367   sink->content_type = NULL;
368   sink->nb_percent_format = 0;
369   sink->percent_s_is_first = FALSE;
370
371   gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
372 }
373
374 static void gst_gs_sink_finalize(GObject* object) {
375   GstGsSink* sink = GST_GS_SINK(object);
376
377   sink->gcs_client = nullptr;
378   sink->gcs_stream = nullptr;
379   g_free(sink->service_account_email);
380   sink->service_account_email = NULL;
381   g_free(sink->service_account_credentials);
382   sink->service_account_credentials = NULL;
383   g_free(sink->bucket_name);
384   sink->bucket_name = NULL;
385   g_free(sink->object_name);
386   sink->object_name = NULL;
387   g_free(sink->start_date_str);
388   sink->start_date_str = NULL;
389   if (sink->start_date) {
390     g_date_time_unref(sink->start_date);
391     sink->start_date = NULL;
392   }
393   sink->content_type = NULL;
394   g_clear_pointer(&sink->metadata, gst_structure_free);
395
396   G_OBJECT_CLASS(parent_class)->finalize(object);
397 }
398
399 static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
400                                             const gchar* object_name) {
401   g_free(sink->object_name);
402   sink->object_name = NULL;
403   sink->nb_percent_format = 0;
404   sink->percent_s_is_first = FALSE;
405
406   if (!object_name) {
407     GST_ERROR_OBJECT(sink, "Object name is null");
408     return FALSE;
409   }
410
411   const std::string name(object_name);
412   sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
413   if (sink->nb_percent_format > 2) {
414     GST_ERROR_OBJECT(sink, "Object name has too many formats");
415     return FALSE;
416   }
417
418   const size_t delimiter_percent_s = name.find("%s");
419   if (delimiter_percent_s == std::string::npos) {
420     if (sink->nb_percent_format == 2) {
421       GST_ERROR_OBJECT(sink, "Object name must have just one number format");
422       return FALSE;
423     }
424     sink->object_name = g_strdup(object_name);
425     return TRUE;
426   }
427
428   const size_t delimiter_percent = name.find_first_of('%');
429   if (delimiter_percent_s == delimiter_percent) {
430     sink->percent_s_is_first = TRUE;
431
432     if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
433       GST_ERROR_OBJECT(sink, "Object name expect max one string format");
434       return FALSE;
435     }
436   }
437
438   sink->object_name = g_strdup(object_name);
439
440   return TRUE;
441 }
442
443 static void gst_gs_sink_set_property(GObject* object,
444                                      guint prop_id,
445                                      const GValue* value,
446                                      GParamSpec* pspec) {
447   GstGsSink* sink = GST_GS_SINK(object);
448
449   switch (prop_id) {
450     case PROP_BUCKET_NAME:
451       g_free(sink->bucket_name);
452       sink->bucket_name = g_strdup(g_value_get_string(value));
453       break;
454     case PROP_OBJECT_NAME:
455       gst_gs_sink_set_object_name(sink, g_value_get_string(value));
456       break;
457     case PROP_INDEX:
458       sink->index = g_value_get_int(value);
459       break;
460     case PROP_POST_MESSAGES:
461       sink->post_messages = g_value_get_boolean(value);
462       break;
463     case PROP_NEXT_FILE:
464       sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
465       break;
466     case PROP_SERVICE_ACCOUNT_EMAIL:
467       g_free(sink->service_account_email);
468       sink->service_account_email = g_strdup(g_value_get_string(value));
469       break;
470     case PROP_SERVICE_ACCOUNT_CREDENTIALS:
471       g_free(sink->service_account_credentials);
472       sink->service_account_credentials = g_strdup(g_value_get_string(value));
473       break;
474     case PROP_START_DATE:
475       g_free(sink->start_date_str);
476       if (sink->start_date)
477         g_date_time_unref(sink->start_date);
478       sink->start_date_str = g_strdup(g_value_get_string(value));
479       sink->start_date =
480           g_date_time_new_from_iso8601(sink->start_date_str, NULL);
481       if (!sink->start_date) {
482         GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
483                          sink->start_date_str);
484         g_free(sink->start_date_str);
485         sink->start_date_str = NULL;
486       }
487       break;
488     case PROP_METADATA:
489       g_clear_pointer(&sink->metadata, gst_structure_free);
490       sink->metadata = (GstStructure*)g_value_dup_boxed(value);
491       break;
492     default:
493       G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
494       break;
495   }
496 }
497
498 static void gst_gs_sink_get_property(GObject* object,
499                                      guint prop_id,
500                                      GValue* value,
501                                      GParamSpec* pspec) {
502   GstGsSink* sink = GST_GS_SINK(object);
503
504   switch (prop_id) {
505     case PROP_BUCKET_NAME:
506       g_value_set_string(value, sink->bucket_name);
507       break;
508     case PROP_OBJECT_NAME:
509       g_value_set_string(value, sink->object_name);
510       break;
511     case PROP_INDEX:
512       g_value_set_int(value, sink->index);
513       break;
514     case PROP_POST_MESSAGES:
515       g_value_set_boolean(value, sink->post_messages);
516       break;
517     case PROP_NEXT_FILE:
518       g_value_set_enum(value, sink->next_file);
519       break;
520     case PROP_SERVICE_ACCOUNT_EMAIL:
521       g_value_set_string(value, sink->service_account_email);
522       break;
523     case PROP_SERVICE_ACCOUNT_CREDENTIALS:
524       g_value_set_string(value, sink->service_account_credentials);
525       break;
526     case PROP_START_DATE:
527       g_value_set_string(value, sink->start_date_str);
528       break;
529     case PROP_METADATA:
530       g_value_set_boxed(value, sink->metadata);
531       break;
532     default:
533       G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
534       break;
535   }
536 }
537
538 static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
539   GstGsSink* sink = GST_GS_SINK(bsink);
540   GError* err = NULL;
541
542   if (!sink->bucket_name) {
543     GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
544                       GST_ERROR_SYSTEM);
545     return FALSE;
546   }
547
548   if (!sink->object_name) {
549     GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
550                       GST_ERROR_SYSTEM);
551     return FALSE;
552   }
553
554   sink->content_type = "";
555
556   sink->gcs_client = gst_gs_create_client(
557       sink->service_account_email, sink->service_account_credentials, &err);
558   if (err) {
559     GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
560                       ("Could not create client (%s)", err->message),
561                       GST_ERROR_SYSTEM);
562     g_clear_error(&err);
563     return FALSE;
564   }
565
566   GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
567                   sink->bucket_name, sink->object_name);
568
569   return TRUE;
570 }
571
572 static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
573   GstGsSink* sink = GST_GS_SINK(bsink);
574
575   sink->gcs_client = nullptr;
576   sink->gcs_stream = nullptr;
577   sink->content_type = NULL;
578
579   return TRUE;
580 }
581
582 static void gst_gs_sink_post_message_full(GstGsSink* sink,
583                                           GstClockTime timestamp,
584                                           GstClockTime duration,
585                                           GstClockTime offset,
586                                           GstClockTime offset_end,
587                                           GstClockTime running_time,
588                                           GstClockTime stream_time,
589                                           const char* filename,
590                                           const gchar* date) {
591   GstStructure* s;
592
593   if (!sink->post_messages)
594     return;
595
596   s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
597                         "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
598                         sink->index, "timestamp", G_TYPE_UINT64, timestamp,
599                         "stream-time", G_TYPE_UINT64, stream_time,
600                         "running-time", G_TYPE_UINT64, running_time, "duration",
601                         G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
602                         offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
603
604   gst_element_post_message(GST_ELEMENT_CAST(sink),
605                            gst_message_new_element(GST_OBJECT_CAST(sink), s));
606 }
607
608 static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
609                                                GstClockTime timestamp,
610                                                GstClockTime duration,
611                                                const char* filename) {
612   GstClockTime running_time, stream_time;
613   guint64 offset, offset_end;
614   GstSegment* segment;
615   GstFormat format;
616
617   if (!sink->post_messages)
618     return;
619
620   segment = &GST_BASE_SINK(sink)->segment;
621   format = segment->format;
622
623   offset = -1;
624   offset_end = -1;
625
626   running_time = gst_segment_to_running_time(segment, format, timestamp);
627   stream_time = gst_segment_to_stream_time(segment, format, timestamp);
628
629   gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
630                                 running_time, stream_time, filename, NULL);
631 }
632
633 static void gst_gs_sink_post_message(GstGsSink* sink,
634                                      GstBuffer* buffer,
635                                      const char* filename,
636                                      const char* date) {
637   GstClockTime duration, timestamp;
638   GstClockTime running_time, stream_time;
639   guint64 offset, offset_end;
640   GstSegment* segment;
641   GstFormat format;
642
643   if (!sink->post_messages)
644     return;
645
646   segment = &GST_BASE_SINK(sink)->segment;
647   format = segment->format;
648
649   timestamp = GST_BUFFER_PTS(buffer);
650   duration = GST_BUFFER_DURATION(buffer);
651   offset = GST_BUFFER_OFFSET(buffer);
652   offset_end = GST_BUFFER_OFFSET_END(buffer);
653
654   running_time = gst_segment_to_running_time(segment, format, timestamp);
655   stream_time = gst_segment_to_stream_time(segment, format, timestamp);
656
657   gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
658                                 running_time, stream_time, filename, date);
659 }
660
661 struct AddMetadataIter {
662   GstGsSink* sink;
663   gcs::ObjectMetadata* metadata;
664 };
665
666 static gboolean add_metadata_foreach(GQuark field_id,
667                                      const GValue* value,
668                                      gpointer user_data) {
669   struct AddMetadataIter* it = (struct AddMetadataIter*)user_data;
670   GValue svalue = G_VALUE_INIT;
671
672   g_value_init(&svalue, G_TYPE_STRING);
673
674   if (g_value_transform(value, &svalue)) {
675     const gchar* key = g_quark_to_string(field_id);
676     const gchar* value = g_value_get_string(&svalue);
677
678     GST_LOG_OBJECT(it->sink, "metadata '%s' -> '%s'", key, value);
679     it->metadata->upsert_metadata(key, value);
680   } else {
681     GST_WARNING_OBJECT(it->sink, "Failed to convert metadata '%s' to string",
682                        g_quark_to_string(field_id));
683   }
684
685   g_value_unset(&svalue);
686   return TRUE;
687 }
688
689 static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
690                                               GstBuffer* buffer) {
691   GstMapInfo map = {0};
692   gchar* object_name = NULL;
693   gchar* buffer_date = NULL;
694
695   if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
696     return GST_FLOW_ERROR;
697
698   gcs::ObjectMetadata metadata =
699       gcs::ObjectMetadata().set_content_type(sink->content_type);
700
701   if (sink->metadata) {
702     struct AddMetadataIter it = {sink, &metadata};
703
704     gst_structure_foreach(sink->metadata, add_metadata_foreach, &it);
705   }
706
707   switch (sink->next_file) {
708     case GST_GS_SINK_NEXT_BUFFER: {
709       // Get buffer date if needed.
710       if (sink->start_date) {
711         if (sink->nb_percent_format != 2) {
712           GST_ERROR_OBJECT(sink, "Object name expects date and index");
713           gst_buffer_unmap(buffer, &map);
714           return GST_FLOW_ERROR;
715         }
716
717         if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
718           GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
719           gst_buffer_unmap(buffer, &map);
720           return GST_FLOW_ERROR;
721         }
722
723         if (sink->percent_s_is_first) {
724           object_name =
725               g_strdup_printf(sink->object_name, buffer_date, sink->index);
726         } else {
727           object_name =
728               g_strdup_printf(sink->object_name, sink->index, buffer_date);
729         }
730       } else {
731         if (sink->nb_percent_format != 1) {
732           GST_ERROR_OBJECT(sink, "Object name expects only an index");
733           gst_buffer_unmap(buffer, &map);
734           return GST_FLOW_ERROR;
735         }
736
737         object_name = g_strdup_printf(sink->object_name, sink->index);
738       }
739
740       GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
741
742       gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
743           sink->bucket_name, object_name, gcs::WithObjectMetadata(metadata));
744
745       gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
746       if (gcs_stream.fail()) {
747         GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
748       }
749       gcs_stream.Close();
750
751       google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
752           sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
753       if (!object_metadata) {
754         GST_ERROR_OBJECT(
755             sink, "Could not get object metadata for object %s (%s)",
756             object_name, object_metadata.status().message().c_str());
757         gst_buffer_unmap(buffer, &map);
758         g_free(object_name);
759         g_free(buffer_date);
760         return GST_FLOW_ERROR;
761       }
762
763       GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
764                       object_name, object_metadata->size());
765
766       gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
767       g_free(object_name);
768       g_free(buffer_date);
769       ++sink->index;
770       break;
771     }
772     case GST_GS_SINK_NEXT_NONE: {
773       if (!sink->gcs_stream) {
774         GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
775         sink->gcs_stream = std::make_unique<GSWriteStream>(
776             *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
777             metadata);
778
779         if (!sink->gcs_stream->stream().IsOpen()) {
780           GST_ELEMENT_ERROR(
781               sink, RESOURCE, OPEN_READ,
782               ("Could not create write stream (%s)",
783                sink->gcs_stream->stream().last_status().message().c_str()),
784               GST_ERROR_SYSTEM);
785           gst_buffer_unmap(buffer, &map);
786           return GST_FLOW_OK;
787         }
788       }
789
790       GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
791
792       gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
793       stream.write(reinterpret_cast<const char*>(map.data), map.size);
794       if (stream.fail()) {
795         GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
796       }
797       break;
798     }
799     default:
800       g_assert_not_reached();
801   }
802
803   gst_buffer_unmap(buffer, &map);
804   return GST_FLOW_OK;
805 }
806
807 static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
808   GstGsSink* sink = GST_GS_SINK(bsink);
809   GstFlowReturn flow = GST_FLOW_OK;
810
811   flow = gst_gs_sink_write_buffer(sink, buffer);
812   return flow;
813 }
814
815 static gboolean buffer_list_copy_data(GstBuffer** buf,
816                                       guint idx,
817                                       gpointer data) {
818   GstBuffer* dest = GST_BUFFER_CAST(data);
819   guint num, i;
820
821   if (idx == 0)
822     gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
823
824   num = gst_buffer_n_memory(*buf);
825   for (i = 0; i < num; ++i) {
826     GstMemory* mem;
827
828     mem = gst_buffer_get_memory(*buf, i);
829     gst_buffer_append_memory(dest, mem);
830   }
831
832   return TRUE;
833 }
834
835 /* Our assumption for now is that the buffers in a buffer list should always
836  * end up in the same file. If someone wants different behaviour, they'll just
837  * have to add a property for that. */
838 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
839                                              GstBufferList* list) {
840   GstBuffer* buf;
841   guint size;
842
843   size = gst_buffer_list_calculate_size(list);
844   GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
845
846   /* copy all buffers in the list into one single buffer, so we can use
847    * the normal render function (FIXME: optimise to avoid the memcpy) */
848   buf = gst_buffer_new();
849   gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
850   g_assert(gst_buffer_get_size(buf) == size);
851
852   gst_gs_sink_render(sink, buf);
853   gst_buffer_unref(buf);
854
855   return GST_FLOW_OK;
856 }
857
858 static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
859   GstGsSink* sink = GST_GS_SINK(bsink);
860   GstStructure* s = gst_caps_get_structure(caps, 0);
861
862   sink->content_type = gst_structure_get_name(s);
863
864   GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
865
866   return TRUE;
867 }
868
869 static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
870   GstGsSink* sink = GST_GS_SINK(bsink);
871
872   switch (GST_EVENT_TYPE(event)) {
873     case GST_EVENT_EOS:
874       if (sink->gcs_stream) {
875         sink->gcs_stream = nullptr;
876         gst_gs_sink_post_message_from_time(
877             sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
878       }
879       break;
880     default:
881       break;
882   }
883
884   return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
885 }