gssink: add 'content-type' property
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / gs / gstgssink.cpp
1 /* GStreamer
2  * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
3  *
4  * gstgssink.cpp:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:element-gssink
23  * @title: gssink
24  * @see_also: #GstGsSrc
25  *
26  * Write incoming data to a series of sequentially-named remote files on a
27  * Google Cloud Storage bucket.
28  *
29  * The object-name property should contain a string with a \%d placeholder
30  * that will be substituted with the index for each filename.
31  *
32  * If the #GstGsSink:post-messages property is %TRUE, it sends an application
33  * message named `GstGsSink` after writing each buffer.
34  *
35  * The message's structure contains these fields:
36  *
37  * * #gchararray `filename`: the filename where the buffer was written.
38  * * #gchararray `date`: the date of the current buffer, NULL if no start date
39  * is provided.
40  * * #gint `index`: index of the buffer.
41  * * #GstClockTime `timestamp`: the timestamp of the buffer.
42  * * #GstClockTime `stream-time`: the stream time of the buffer.
43  * * #GstClockTime `running-time`: the running_time of the buffer.
44  * * #GstClockTime `duration`: the duration of the buffer.
45  * * #guint64 `offset`: the offset of the buffer that triggered the message.
46  * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
47  * message.
48  *
49  * ## Example launch line
50  * ```
51  * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
52  * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
53  * next-file=buffer post-messages=true
54  * ```
55  * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
56  * names are frame00000.png, frame00001.png, ..., frame00014.png
57  * ```
58  * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
59  * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
60  * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
61  * next-file=buffer post-messages=true
62  * ```
63  * ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file
64  * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
65  * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
66  * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
67  * ```
68  * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
69  * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
70  * ```
71  * ### Upload any stream as a single file into Google Cloud Storage. Similar as
72  * filesink in this case. The file is then accessible from:
73  * gs://mybucket/mypath/myvideos/video.mp4
74  *
75  * See also: #GstGsSrc
76  * Since: 1.20
77  */
78
79 #ifdef HAVE_CONFIG_H
80 #include "config.h"
81 #endif
82
83 #include "gstgscommon.h"
84 #include "gstgssink.h"
85
86 #include <algorithm>
87
88 static GstStaticPadTemplate sinktemplate =
89     GST_STATIC_PAD_TEMPLATE("sink",
90                             GST_PAD_SINK,
91                             GST_PAD_ALWAYS,
92                             GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
95 #define GST_CAT_DEFAULT gst_gs_sink_debug
96
97 #define DEFAULT_INDEX 0
98 #define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
99 #define DEFAULT_OBJECT_NAME "%s_%05d"
100 #define DEFAULT_POST_MESSAGES FALSE
101
102 namespace gcs = google::cloud::storage;
103
104 enum {
105   PROP_0,
106   PROP_BUCKET_NAME,
107   PROP_OBJECT_NAME,
108   PROP_INDEX,
109   PROP_POST_MESSAGES,
110   PROP_NEXT_FILE,
111   PROP_SERVICE_ACCOUNT_EMAIL,
112   PROP_START_DATE,
113   PROP_SERVICE_ACCOUNT_CREDENTIALS,
114   PROP_METADATA,
115   PROP_CONTENT_TYPE,
116 };
117
118 class GSWriteStream;
119
120 struct _GstGsSink {
121   GstBaseSink parent;
122
123   std::unique_ptr<google::cloud::storage::Client> gcs_client;
124   std::unique_ptr<GSWriteStream> gcs_stream;
125   gchar* service_account_email;
126   gchar* service_account_credentials;
127   gchar* bucket_name;
128   gchar* object_name;
129   gchar* start_date_str;
130   GDateTime* start_date;
131   gint index;
132   gboolean post_messages;
133   GstGsSinkNext next_file;
134   const gchar* content_type;
135   gchar* content_type_prop;
136   size_t nb_percent_format;
137   gboolean percent_s_is_first;
138   GstStructure* metadata;
139 };
140
141 static void gst_gs_sink_finalize(GObject* object);
142
143 static void gst_gs_sink_set_property(GObject* object,
144                                      guint prop_id,
145                                      const GValue* value,
146                                      GParamSpec* pspec);
147 static void gst_gs_sink_get_property(GObject* object,
148                                      guint prop_id,
149                                      GValue* value,
150                                      GParamSpec* pspec);
151
152 static gboolean gst_gs_sink_start(GstBaseSink* bsink);
153 static gboolean gst_gs_sink_stop(GstBaseSink* sink);
154 static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
155 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
156                                              GstBufferList* buffer_list);
157 static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
158 static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
159
160 #define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
161 static GType gst_gs_sink_next_get_type(void) {
162   static GType gs_sink_next_type = 0;
163   static const GEnumValue next_types[] = {
164       {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
165       {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
166       {0, NULL, NULL}};
167
168   if (!gs_sink_next_type) {
169     gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
170   }
171
172   return gs_sink_next_type;
173 }
174
175 #define gst_gs_sink_parent_class parent_class
176 G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
177 GST_ELEMENT_REGISTER_DEFINE(gssink, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)
178
179 class GSWriteStream {
180  public:
181   GSWriteStream(google::cloud::storage::Client& client,
182                 const char* bucket_name,
183                 const char* object_name,
184                 gcs::ObjectMetadata metadata)
185       : gcs_stream_(client.WriteObject(bucket_name,
186                                        object_name,
187                                        gcs::WithObjectMetadata(metadata))) {}
188   ~GSWriteStream() { gcs_stream_.Close(); }
189
190   gcs::ObjectWriteStream& stream() { return gcs_stream_; }
191
192  private:
193   gcs::ObjectWriteStream gcs_stream_;
194 };
195
196 static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
197   GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
198   GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
199   GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
200
201   gobject_class->set_property = gst_gs_sink_set_property;
202   gobject_class->get_property = gst_gs_sink_get_property;
203
204   /**
205    * GstGsSink:bucket-name:
206    *
207    * Name of the Google Cloud Storage bucket.
208    *
209    * Since: 1.20
210    */
211   g_object_class_install_property(
212       gobject_class, PROP_BUCKET_NAME,
213       g_param_spec_string(
214           "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
215           NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
216
217   /**
218    * GstGsSink:object-name:
219    *
220    * Full path name of the remote file.
221    *
222    * Since: 1.20
223    */
224   g_object_class_install_property(
225       gobject_class, PROP_OBJECT_NAME,
226       g_param_spec_string(
227           "object-name", "Object Name", "Full path name of the remote file",
228           DEFAULT_OBJECT_NAME,
229           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
230
231   /**
232    * GstGsSink:index:
233    *
234    * Index to use with location property to create file names.
235    *
236    * Since: 1.20
237    */
238   g_object_class_install_property(
239       gobject_class, PROP_INDEX,
240       g_param_spec_int(
241           "index", "Index",
242           "Index to use with location property to create file names.  The "
243           "index is incremented by one for each buffer written.",
244           0, G_MAXINT, DEFAULT_INDEX,
245           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
246
247   /**
248    * GstGsSink:post-messages:
249    *
250    * Post a message on the GstBus for each file.
251    *
252    * Since: 1.20
253    */
254   g_object_class_install_property(
255       gobject_class, PROP_POST_MESSAGES,
256       g_param_spec_boolean(
257           "post-messages", "Post Messages",
258           "Post a message for each file with information of the buffer",
259           DEFAULT_POST_MESSAGES,
260           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
261   /**
262    * GstGsSink:next-file:
263    *
264    * A #GstGsSinkNext that specifies when to start a new file.
265    *
266    * Since: 1.20
267    */
268   g_object_class_install_property(
269       gobject_class, PROP_NEXT_FILE,
270       g_param_spec_enum(
271           "next-file", "Next File", "When to start a new file",
272           GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
273           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
274
275   /**
276    * GstGsSink:service-account-email:
277    *
278    * Service Account Email to use for credentials.
279    *
280    * Since: 1.20
281    */
282   g_object_class_install_property(
283       gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
284       g_param_spec_string(
285           "service-account-email", "Service Account Email",
286           "Service Account Email to use for credentials", NULL,
287           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
288                         GST_PARAM_MUTABLE_READY)));
289
290   /**
291    * GstGsSink:service-account-credentials:
292    *
293    * Service Account Credentials as a JSON string to use for credentials.
294    *
295    * Since: 1.20
296    */
297   g_object_class_install_property(
298       gobject_class, PROP_SERVICE_ACCOUNT_CREDENTIALS,
299       g_param_spec_string(
300           "service-account-credentials", "Service Account Credentials",
301           "Service Account Credentials as a JSON string to use for credentials",
302           NULL,
303           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
304                         GST_PARAM_MUTABLE_READY)));
305
306   /**
307    * GstGsSink:start-date:
308    *
309    * Start date in iso8601 format.
310    *
311    * Since: 1.20
312    */
313   g_object_class_install_property(
314       gobject_class, PROP_START_DATE,
315       g_param_spec_string(
316           "start-date", "Start Date", "Start date in iso8601 format", NULL,
317           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
318                         GST_PARAM_MUTABLE_READY)));
319
320   /**
321    * GstGsSink:metadata:
322    *
323    * A map of metadata to store with the object; field values need to be
324    * convertible to strings.
325    *
326    * Since: 1.20
327    */
328   g_object_class_install_property(
329       gobject_class, PROP_METADATA,
330       g_param_spec_boxed(
331           "metadata", "Metadata",
332           "A map of metadata to store with the object; field values need to be "
333           "convertible to strings.",
334           GST_TYPE_STRUCTURE,
335           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
336                         GST_PARAM_MUTABLE_READY)));
337
338   /**
339    * GstGsSink:content-type:
340    *
341    * The Content-Type of the object.
342    * If not set we use the name of the input caps.
343    *
344    * Since: 1.22
345    */
346   g_object_class_install_property(
347       gobject_class, PROP_CONTENT_TYPE,
348       g_param_spec_string(
349           "content-type", "Content-Type",
350           "The Content-Type of the object",
351           NULL,
352           (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
353                         GST_PARAM_MUTABLE_READY)));
354
355   gobject_class->finalize = gst_gs_sink_finalize;
356
357   gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
358   gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
359   gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
360   gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
361   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
362   gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
363
364   GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
365
366   gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
367   gst_element_class_set_static_metadata(
368       gstelement_class, "Google Cloud Storage Sink", "Sink/File",
369       "Write buffers to a sequentially named set of files on Google Cloud "
370       "Storage",
371       "Julien Isorce <jisorce@oblong.com>");
372 }
373
374 static void gst_gs_sink_init(GstGsSink* sink) {
375   sink->gcs_client = nullptr;
376   sink->gcs_stream = nullptr;
377   sink->index = DEFAULT_INDEX;
378   sink->post_messages = DEFAULT_POST_MESSAGES;
379   sink->service_account_email = NULL;
380   sink->service_account_credentials = NULL;
381   sink->bucket_name = NULL;
382   sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
383   sink->start_date_str = NULL;
384   sink->start_date = NULL;
385   sink->next_file = DEFAULT_NEXT_FILE;
386   sink->content_type = NULL;
387   sink->content_type_prop = NULL;
388   sink->nb_percent_format = 0;
389   sink->percent_s_is_first = FALSE;
390
391   gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
392 }
393
394 static void gst_gs_sink_finalize(GObject* object) {
395   GstGsSink* sink = GST_GS_SINK(object);
396
397   sink->gcs_client = nullptr;
398   sink->gcs_stream = nullptr;
399   g_free(sink->service_account_email);
400   sink->service_account_email = NULL;
401   g_free(sink->service_account_credentials);
402   sink->service_account_credentials = NULL;
403   g_free(sink->bucket_name);
404   sink->bucket_name = NULL;
405   g_free(sink->object_name);
406   sink->object_name = NULL;
407   g_free(sink->start_date_str);
408   sink->start_date_str = NULL;
409   if (sink->start_date) {
410     g_date_time_unref(sink->start_date);
411     sink->start_date = NULL;
412   }
413   sink->content_type = NULL;
414   g_clear_pointer(&sink->content_type_prop, g_free);
415   g_clear_pointer(&sink->metadata, gst_structure_free);
416
417   G_OBJECT_CLASS(parent_class)->finalize(object);
418 }
419
420 static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
421                                             const gchar* object_name) {
422   g_free(sink->object_name);
423   sink->object_name = NULL;
424   sink->nb_percent_format = 0;
425   sink->percent_s_is_first = FALSE;
426
427   if (!object_name) {
428     GST_ERROR_OBJECT(sink, "Object name is null");
429     return FALSE;
430   }
431
432   const std::string name(object_name);
433   sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
434   if (sink->nb_percent_format > 2) {
435     GST_ERROR_OBJECT(sink, "Object name has too many formats");
436     return FALSE;
437   }
438
439   const size_t delimiter_percent_s = name.find("%s");
440   if (delimiter_percent_s == std::string::npos) {
441     if (sink->nb_percent_format == 2) {
442       GST_ERROR_OBJECT(sink, "Object name must have just one number format");
443       return FALSE;
444     }
445     sink->object_name = g_strdup(object_name);
446     return TRUE;
447   }
448
449   const size_t delimiter_percent = name.find_first_of('%');
450   if (delimiter_percent_s == delimiter_percent) {
451     sink->percent_s_is_first = TRUE;
452
453     if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
454       GST_ERROR_OBJECT(sink, "Object name expect max one string format");
455       return FALSE;
456     }
457   }
458
459   sink->object_name = g_strdup(object_name);
460
461   return TRUE;
462 }
463
464 static void gst_gs_sink_set_property(GObject* object,
465                                      guint prop_id,
466                                      const GValue* value,
467                                      GParamSpec* pspec) {
468   GstGsSink* sink = GST_GS_SINK(object);
469
470   switch (prop_id) {
471     case PROP_BUCKET_NAME:
472       g_free(sink->bucket_name);
473       sink->bucket_name = g_strdup(g_value_get_string(value));
474       break;
475     case PROP_OBJECT_NAME:
476       gst_gs_sink_set_object_name(sink, g_value_get_string(value));
477       break;
478     case PROP_INDEX:
479       sink->index = g_value_get_int(value);
480       break;
481     case PROP_POST_MESSAGES:
482       sink->post_messages = g_value_get_boolean(value);
483       break;
484     case PROP_NEXT_FILE:
485       sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
486       break;
487     case PROP_SERVICE_ACCOUNT_EMAIL:
488       g_free(sink->service_account_email);
489       sink->service_account_email = g_strdup(g_value_get_string(value));
490       break;
491     case PROP_SERVICE_ACCOUNT_CREDENTIALS:
492       g_free(sink->service_account_credentials);
493       sink->service_account_credentials = g_strdup(g_value_get_string(value));
494       break;
495     case PROP_START_DATE:
496       g_free(sink->start_date_str);
497       if (sink->start_date)
498         g_date_time_unref(sink->start_date);
499       sink->start_date_str = g_strdup(g_value_get_string(value));
500       sink->start_date =
501           g_date_time_new_from_iso8601(sink->start_date_str, NULL);
502       if (!sink->start_date) {
503         GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
504                          sink->start_date_str);
505         g_free(sink->start_date_str);
506         sink->start_date_str = NULL;
507       }
508       break;
509     case PROP_METADATA:
510       g_clear_pointer(&sink->metadata, gst_structure_free);
511       sink->metadata = (GstStructure*)g_value_dup_boxed(value);
512       break;
513     case PROP_CONTENT_TYPE:
514       g_clear_pointer(&sink->content_type_prop, g_free);
515       sink->content_type_prop = g_value_dup_string(value);
516       break;
517     default:
518       G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
519       break;
520   }
521 }
522
523 static void gst_gs_sink_get_property(GObject* object,
524                                      guint prop_id,
525                                      GValue* value,
526                                      GParamSpec* pspec) {
527   GstGsSink* sink = GST_GS_SINK(object);
528
529   switch (prop_id) {
530     case PROP_BUCKET_NAME:
531       g_value_set_string(value, sink->bucket_name);
532       break;
533     case PROP_OBJECT_NAME:
534       g_value_set_string(value, sink->object_name);
535       break;
536     case PROP_INDEX:
537       g_value_set_int(value, sink->index);
538       break;
539     case PROP_POST_MESSAGES:
540       g_value_set_boolean(value, sink->post_messages);
541       break;
542     case PROP_NEXT_FILE:
543       g_value_set_enum(value, sink->next_file);
544       break;
545     case PROP_SERVICE_ACCOUNT_EMAIL:
546       g_value_set_string(value, sink->service_account_email);
547       break;
548     case PROP_SERVICE_ACCOUNT_CREDENTIALS:
549       g_value_set_string(value, sink->service_account_credentials);
550       break;
551     case PROP_START_DATE:
552       g_value_set_string(value, sink->start_date_str);
553       break;
554     case PROP_METADATA:
555       g_value_set_boxed(value, sink->metadata);
556       break;
557     case PROP_CONTENT_TYPE:
558       g_value_set_string(value, sink->content_type_prop);
559       break;
560     default:
561       G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
562       break;
563   }
564 }
565
566 static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
567   GstGsSink* sink = GST_GS_SINK(bsink);
568   GError* err = NULL;
569
570   if (!sink->bucket_name) {
571     GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
572                       GST_ERROR_SYSTEM);
573     return FALSE;
574   }
575
576   if (!sink->object_name) {
577     GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
578                       GST_ERROR_SYSTEM);
579     return FALSE;
580   }
581
582   sink->content_type = "";
583
584   sink->gcs_client = gst_gs_create_client(
585       sink->service_account_email, sink->service_account_credentials, &err);
586   if (err) {
587     GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
588                       ("Could not create client (%s)", err->message),
589                       GST_ERROR_SYSTEM);
590     g_clear_error(&err);
591     return FALSE;
592   }
593
594   GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
595                   sink->bucket_name, sink->object_name);
596
597   return TRUE;
598 }
599
600 static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
601   GstGsSink* sink = GST_GS_SINK(bsink);
602
603   sink->gcs_client = nullptr;
604   sink->gcs_stream = nullptr;
605   sink->content_type = NULL;
606
607   return TRUE;
608 }
609
610 static void gst_gs_sink_post_message_full(GstGsSink* sink,
611                                           GstClockTime timestamp,
612                                           GstClockTime duration,
613                                           GstClockTime offset,
614                                           GstClockTime offset_end,
615                                           GstClockTime running_time,
616                                           GstClockTime stream_time,
617                                           const char* filename,
618                                           const gchar* date) {
619   GstStructure* s;
620
621   if (!sink->post_messages)
622     return;
623
624   s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
625                         "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
626                         sink->index, "timestamp", G_TYPE_UINT64, timestamp,
627                         "stream-time", G_TYPE_UINT64, stream_time,
628                         "running-time", G_TYPE_UINT64, running_time, "duration",
629                         G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
630                         offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
631
632   gst_element_post_message(GST_ELEMENT_CAST(sink),
633                            gst_message_new_element(GST_OBJECT_CAST(sink), s));
634 }
635
636 static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
637                                                GstClockTime timestamp,
638                                                GstClockTime duration,
639                                                const char* filename) {
640   GstClockTime running_time, stream_time;
641   guint64 offset, offset_end;
642   GstSegment* segment;
643   GstFormat format;
644
645   if (!sink->post_messages)
646     return;
647
648   segment = &GST_BASE_SINK(sink)->segment;
649   format = segment->format;
650
651   offset = -1;
652   offset_end = -1;
653
654   running_time = gst_segment_to_running_time(segment, format, timestamp);
655   stream_time = gst_segment_to_stream_time(segment, format, timestamp);
656
657   gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
658                                 running_time, stream_time, filename, NULL);
659 }
660
661 static void gst_gs_sink_post_message(GstGsSink* sink,
662                                      GstBuffer* buffer,
663                                      const char* filename,
664                                      const char* date) {
665   GstClockTime duration, timestamp;
666   GstClockTime running_time, stream_time;
667   guint64 offset, offset_end;
668   GstSegment* segment;
669   GstFormat format;
670
671   if (!sink->post_messages)
672     return;
673
674   segment = &GST_BASE_SINK(sink)->segment;
675   format = segment->format;
676
677   timestamp = GST_BUFFER_PTS(buffer);
678   duration = GST_BUFFER_DURATION(buffer);
679   offset = GST_BUFFER_OFFSET(buffer);
680   offset_end = GST_BUFFER_OFFSET_END(buffer);
681
682   running_time = gst_segment_to_running_time(segment, format, timestamp);
683   stream_time = gst_segment_to_stream_time(segment, format, timestamp);
684
685   gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
686                                 running_time, stream_time, filename, date);
687 }
688
689 struct AddMetadataIter {
690   GstGsSink* sink;
691   gcs::ObjectMetadata* metadata;
692 };
693
694 static gboolean add_metadata_foreach(GQuark field_id,
695                                      const GValue* value,
696                                      gpointer user_data) {
697   struct AddMetadataIter* it = (struct AddMetadataIter*)user_data;
698   GValue svalue = G_VALUE_INIT;
699
700   g_value_init(&svalue, G_TYPE_STRING);
701
702   if (g_value_transform(value, &svalue)) {
703     const gchar* key = g_quark_to_string(field_id);
704     const gchar* value = g_value_get_string(&svalue);
705
706     GST_LOG_OBJECT(it->sink, "metadata '%s' -> '%s'", key, value);
707     it->metadata->upsert_metadata(key, value);
708   } else {
709     GST_WARNING_OBJECT(it->sink, "Failed to convert metadata '%s' to string",
710                        g_quark_to_string(field_id));
711   }
712
713   g_value_unset(&svalue);
714   return TRUE;
715 }
716
717 static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
718                                               GstBuffer* buffer) {
719   GstMapInfo map = {0};
720   gchar* object_name = NULL;
721   gchar* buffer_date = NULL;
722   const gchar* content_type;
723
724   if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
725     return GST_FLOW_ERROR;
726
727   if (sink->content_type_prop)
728     content_type = sink->content_type_prop;
729   else
730     content_type = sink->content_type;
731
732   gcs::ObjectMetadata metadata =
733       gcs::ObjectMetadata().set_content_type(content_type);
734
735   if (sink->metadata) {
736     struct AddMetadataIter it = {sink, &metadata};
737
738     gst_structure_foreach(sink->metadata, add_metadata_foreach, &it);
739   }
740
741   switch (sink->next_file) {
742     case GST_GS_SINK_NEXT_BUFFER: {
743       // Get buffer date if needed.
744       if (sink->start_date) {
745         if (sink->nb_percent_format != 2) {
746           GST_ERROR_OBJECT(sink, "Object name expects date and index");
747           gst_buffer_unmap(buffer, &map);
748           return GST_FLOW_ERROR;
749         }
750
751         if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
752           GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
753           gst_buffer_unmap(buffer, &map);
754           return GST_FLOW_ERROR;
755         }
756
757         if (sink->percent_s_is_first) {
758           object_name =
759               g_strdup_printf(sink->object_name, buffer_date, sink->index);
760         } else {
761           object_name =
762               g_strdup_printf(sink->object_name, sink->index, buffer_date);
763         }
764       } else {
765         if (sink->nb_percent_format != 1) {
766           GST_ERROR_OBJECT(sink, "Object name expects only an index");
767           gst_buffer_unmap(buffer, &map);
768           return GST_FLOW_ERROR;
769         }
770
771         object_name = g_strdup_printf(sink->object_name, sink->index);
772       }
773
774       GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
775
776       gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
777           sink->bucket_name, object_name, gcs::WithObjectMetadata(metadata));
778
779       gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
780       if (gcs_stream.fail()) {
781         GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
782       }
783       gcs_stream.Close();
784
785       google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
786           sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
787       if (!object_metadata) {
788         GST_ERROR_OBJECT(
789             sink, "Could not get object metadata for object %s (%s)",
790             object_name, object_metadata.status().message().c_str());
791         gst_buffer_unmap(buffer, &map);
792         g_free(object_name);
793         g_free(buffer_date);
794         return GST_FLOW_ERROR;
795       }
796
797       GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
798                       object_name, object_metadata->size());
799
800       gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
801       g_free(object_name);
802       g_free(buffer_date);
803       ++sink->index;
804       break;
805     }
806     case GST_GS_SINK_NEXT_NONE: {
807       if (!sink->gcs_stream) {
808         GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
809         sink->gcs_stream = std::make_unique<GSWriteStream>(
810             *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
811             metadata);
812
813         if (!sink->gcs_stream->stream().IsOpen()) {
814           GST_ELEMENT_ERROR(
815               sink, RESOURCE, OPEN_READ,
816               ("Could not create write stream (%s)",
817                sink->gcs_stream->stream().last_status().message().c_str()),
818               GST_ERROR_SYSTEM);
819           gst_buffer_unmap(buffer, &map);
820           return GST_FLOW_OK;
821         }
822       }
823
824       GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
825
826       gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
827       stream.write(reinterpret_cast<const char*>(map.data), map.size);
828       if (stream.fail()) {
829         GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
830       }
831       break;
832     }
833     default:
834       g_assert_not_reached();
835   }
836
837   gst_buffer_unmap(buffer, &map);
838   return GST_FLOW_OK;
839 }
840
841 static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
842   GstGsSink* sink = GST_GS_SINK(bsink);
843   GstFlowReturn flow = GST_FLOW_OK;
844
845   flow = gst_gs_sink_write_buffer(sink, buffer);
846   return flow;
847 }
848
849 static gboolean buffer_list_copy_data(GstBuffer** buf,
850                                       guint idx,
851                                       gpointer data) {
852   GstBuffer* dest = GST_BUFFER_CAST(data);
853   guint num, i;
854
855   if (idx == 0)
856     gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
857
858   num = gst_buffer_n_memory(*buf);
859   for (i = 0; i < num; ++i) {
860     GstMemory* mem;
861
862     mem = gst_buffer_get_memory(*buf, i);
863     gst_buffer_append_memory(dest, mem);
864   }
865
866   return TRUE;
867 }
868
869 /* Our assumption for now is that the buffers in a buffer list should always
870  * end up in the same file. If someone wants different behaviour, they'll just
871  * have to add a property for that. */
872 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
873                                              GstBufferList* list) {
874   GstBuffer* buf;
875   guint size;
876
877   size = gst_buffer_list_calculate_size(list);
878   GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
879
880   /* copy all buffers in the list into one single buffer, so we can use
881    * the normal render function (FIXME: optimise to avoid the memcpy) */
882   buf = gst_buffer_new();
883   gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
884   g_assert(gst_buffer_get_size(buf) == size);
885
886   gst_gs_sink_render(sink, buf);
887   gst_buffer_unref(buf);
888
889   return GST_FLOW_OK;
890 }
891
892 static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
893   GstGsSink* sink = GST_GS_SINK(bsink);
894   GstStructure* s = gst_caps_get_structure(caps, 0);
895
896   sink->content_type = gst_structure_get_name(s);
897
898   /* TODO: we could automatically convert some caps such as 'video/quicktime,variant=iso' to 'video/mp4' */
899
900   GST_INFO_OBJECT(sink, "Content-Type: caps: %s property: %s", sink->content_type, sink->content_type_prop);
901
902   return TRUE;
903 }
904
905 static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
906   GstGsSink* sink = GST_GS_SINK(bsink);
907
908   switch (GST_EVENT_TYPE(event)) {
909     case GST_EVENT_EOS:
910       if (sink->gcs_stream) {
911         sink->gcs_stream = nullptr;
912         gst_gs_sink_post_message_from_time(
913             sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
914       }
915       break;
916     default:
917       break;
918   }
919
920   return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
921 }