gs: add source and sink for Google Cloud Storage
authorJulien <jisorce@oblong.com>
Tue, 23 Jun 2020 19:41:27 +0000 (12:41 -0700)
committerJulien Isorce <julien.isorce@gmail.com>
Thu, 18 Mar 2021 22:32:48 +0000 (22:32 +0000)
Useful when having a service that runs a GStreamer pipeline
or application in Google Cloud to avoid storing the inputs
and outputs in the running container or service. For example
when analyzing a video from a Google Cloud Storage bucket
and extracting images or converting the video and then uploading
the results into another Google Cloud Storage bucket.

- gssrc allows to read from a file located in Google Cloud
Storage and it supports seeking.
- gssink allows to write to a file located in Google Cloud
Storage. There are 2 modes, one similar to multifilesink and
the other similar to filesink.

Example:
  gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink
  gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4
  gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer
  gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none

When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But
when running in Google Cloud Run or Google Cloud Engine, just set the
"service-account-email" property on each element.

Closes #1264

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1369>

13 files changed:
docs/plugins/gst_plugins_cache.json
ext/gs/.clang-format [new file with mode: 0644]
ext/gs/README.md [new file with mode: 0644]
ext/gs/gstgs.cpp [new file with mode: 0644]
ext/gs/gstgscommon.cpp [new file with mode: 0644]
ext/gs/gstgscommon.h [new file with mode: 0644]
ext/gs/gstgssink.cpp [new file with mode: 0644]
ext/gs/gstgssink.h [new file with mode: 0644]
ext/gs/gstgssrc.cpp [new file with mode: 0644]
ext/gs/gstgssrc.h [new file with mode: 0644]
ext/gs/meson.build [new file with mode: 0644]
ext/meson.build
meson_options.txt

index 6eae764..02da740 100644 (file)
         "tracers": {},
         "url": "Unknown package origin"
     },
+    "gs": {
+        "description": "Read and write from and to a Google Cloud Storage",
+        "elements": {
+            "gssink": {
+                "author": "Julien Isorce <jisorce@oblong.com>",
+                "description": "Write buffers to a sequentially named set of files on Google Cloud Storage",
+                "hierarchy": [
+                    "GstGsSink",
+                    "GstBaseSink",
+                    "GstElement",
+                    "GstObject",
+                    "GInitiallyUnowned",
+                    "GObject"
+                ],
+                "klass": "Sink/File",
+                "long-name": "Google Cloud Storage Sink",
+                "pad-templates": {
+                    "sink": {
+                        "caps": "ANY",
+                        "direction": "sink",
+                        "presence": "always"
+                    }
+                },
+                "properties": {
+                    "bucket-name": {
+                        "blurb": "Google Cloud Storage Bucket Name",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "NULL",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    },
+                    "index": {
+                        "blurb": "Index to use with location property to create file names.  The index is incremented by one for each buffer written.",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "2147483647",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gint",
+                        "writable": true
+                    },
+                    "next-file": {
+                        "blurb": "When to start a new file",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "buffer (0)",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "GstGsSinkNext",
+                        "writable": true
+                    },
+                    "object-name": {
+                        "blurb": "Full path name of the remote file",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "%%s_%%05d",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    },
+                    "post-messages": {
+                        "blurb": "Post a message for each file with information of the buffer",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "false",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gboolean",
+                        "writable": true
+                    },
+                    "service-account-email": {
+                        "blurb": "Service Account Email to use for credentials",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "NULL",
+                        "mutable": "ready",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    },
+                    "start-date": {
+                        "blurb": "Start date in iso8601 format",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "NULL",
+                        "mutable": "ready",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    }
+                },
+                "rank": "none"
+            },
+            "gssrc": {
+                "author": "Julien Isorce <jisorce@oblong.com>",
+                "description": "Read from arbitrary point from a file in a Google Cloud Storage",
+                "hierarchy": [
+                    "GstGsSrc",
+                    "GstBaseSrc",
+                    "GstElement",
+                    "GstObject",
+                    "GInitiallyUnowned",
+                    "GObject"
+                ],
+                "interfaces": [
+                    "GstURIHandler"
+                ],
+                "klass": "Source/File",
+                "long-name": "Google Cloud Storage Source",
+                "pad-templates": {
+                    "src": {
+                        "caps": "ANY",
+                        "direction": "src",
+                        "presence": "always"
+                    }
+                },
+                "properties": {
+                    "location": {
+                        "blurb": "Location of the file to read",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "NULL",
+                        "mutable": "ready",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    },
+                    "service-account-email": {
+                        "blurb": "Service Account Email to use for credentials",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "NULL",
+                        "mutable": "ready",
+                        "readable": true,
+                        "type": "gchararray",
+                        "writable": true
+                    }
+                },
+                "rank": "none"
+            }
+        },
+        "filename": "gstgs",
+        "license": "LGPL",
+        "other-types": {
+            "GstGsSinkNext": {
+                "kind": "enum",
+                "values": [
+                    {
+                        "desc": "New file for each buffer",
+                        "name": "buffer",
+                        "value": "0"
+                    },
+                    {
+                        "desc": "New file for each buffer",
+                        "name": "Only one file, no next file",
+                        "value": "1"
+                    }
+                ]
+            }
+        },
+        "package": "GStreamer Bad Plug-ins",
+        "source": "gst-plugins-bad",
+        "tracers": {},
+        "url": "Unknown package origin"
+    },
     "gsm": {
         "description": "GSM encoder/decoder",
         "elements": {
diff --git a/ext/gs/.clang-format b/ext/gs/.clang-format
new file mode 100644 (file)
index 0000000..a27438b
--- /dev/null
@@ -0,0 +1,39 @@
+# Defines the Chromium style for automatic reformatting.
+# http://clang.llvm.org/docs/ClangFormatStyleOptions.html
+BasedOnStyle: Chromium
+# This defaults to 'Auto'. Explicitly set it for a while, so that
+# 'vector<vector<int> >' in existing files gets formatted to
+# 'vector<vector<int>>'. ('Auto' means that clang-format will only use
+# 'int>>' if the file already contains at least one such instance.)
+Standard: Cpp11
+
+# Make sure code like:
+# IPC_BEGIN_MESSAGE_MAP()
+#   IPC_MESSAGE_HANDLER(WidgetHostViewHost_Update, OnUpdate)
+# IPC_END_MESSAGE_MAP()
+# gets correctly indented.
+MacroBlockBegin: "^\
+BEGIN_MSG_MAP|\
+BEGIN_MSG_MAP_EX|\
+BEGIN_SAFE_MSG_MAP_EX|\
+CR_BEGIN_MSG_MAP_EX|\
+IPC_BEGIN_MESSAGE_MAP|\
+IPC_BEGIN_MESSAGE_MAP_WITH_PARAM|\
+IPC_PROTOBUF_MESSAGE_TRAITS_BEGIN|\
+IPC_STRUCT_BEGIN|\
+IPC_STRUCT_BEGIN_WITH_PARENT|\
+IPC_STRUCT_TRAITS_BEGIN|\
+POLPARAMS_BEGIN|\
+PPAPI_BEGIN_MESSAGE_MAP$"
+MacroBlockEnd: "^\
+CR_END_MSG_MAP|\
+END_MSG_MAP|\
+IPC_END_MESSAGE_MAP|\
+IPC_PROTOBUF_MESSAGE_TRAITS_END|\
+IPC_STRUCT_END|\
+IPC_STRUCT_TRAITS_END|\
+POLPARAMS_END|\
+PPAPI_END_MESSAGE_MAP$"
+
+# TODO: Remove this once clang-format r357700 is rolled in.
+JavaImportGroups: ['android', 'androidx', 'com', 'dalvik', 'junit', 'org', 'com.google.android.apps.chrome', 'org.chromium', 'java', 'javax']
diff --git a/ext/gs/README.md b/ext/gs/README.md
new file mode 100644 (file)
index 0000000..48fb88c
--- /dev/null
@@ -0,0 +1,71 @@
+# Install the Google Cloud Storage dependencies.
+
+```
+sudo apt-get install \
+    cmake \
+    libcurl3-gnutls-dev \
+    libgrpc++-dev \
+    libprotobuf-dev \
+    protobuf-compiler-grpc
+```
+
+# Build the Google Cloud Storage library
+
+```
+git clone https://github.com/google/crc32c.git
+cd crc32c && git checkout -b 1.1.1
+mkdir build && cd build
+cmake .. \
+    -GNinja \
+    -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+    -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+    -DBUILD_SHARED_LIBS=YES \
+    -DCRC32C_USE_GLOG=NO \
+    -DCRC32C_BUILD_TESTS=NO \
+    -DCRC32C_BUILD_BENCHMARKS=NO
+ninja && ninja install
+
+git clone https://github.com/abseil/abseil-cpp.git
+git checkout master
+mkdir build && cd build
+cmake .. \
+    -GNinja \
+    -DBUILD_TESTING=NO \
+    -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+    -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+    -DBUILD_SHARED_LIBS=YES
+ninja && ninja install
+
+git clone https://github.com/googleapis/google-cloud-cpp.git
+git checkout -b v1.25.0
+mkdir build && cd build
+cmake .. \
+    -GNinja \
+    -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
+    -DCMAKE_INSTALL_LIBDIR:PATH=lib \
+    -DBUILD_SHARED_LIBS=YES \
+    -DBUILD_TESTING=NO \
+    -DGOOGLE_CLOUD_CPP_ENABLE=storage
+ninja && ninja install
+```
+
+# Running the gs elements locally
+
+When running from the command line or in a container running locally, simply
+set the credentials by exporting GOOGLE_APPLICATION_CREDENTIALS. If you are
+not familiar with this environment variable, check the documentation
+https://cloud.google.com/docs/authentication/getting-started
+Note that you can restrict a service account to the role Storage Admin or
+Storage Object Creator instead of the Project Owner role from the above
+documentation.
+
+# Running the gs elements in Google Cloud Run
+
+Add the Storage Object Viewer role to the service account assigned to the
+Cloud Run service where gssrc runs. For gssink add the role Storage Object
+Creator. Then just set the service-account-email property on the element.
+
+# Running the gs elements in Google Cloud Kubernetes
+
+You need to set GOOGLE_APPLICATION_CREDENTIALS in the container and ship the
+json file to which the environment variable points to.
diff --git a/ext/gs/gstgs.cpp b/ext/gs/gstgs.cpp
new file mode 100644 (file)
index 0000000..f5c9ddf
--- /dev/null
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * plugin-gs:
+ *
+ * The gs plugin contains elements to interact with with Google Cloud Storage.
+ * In particular with the gs:// protocol or by specifying the storage bucket.
+ *
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgssink.h"
+#include "gstgssrc.h"
+
+GST_DEBUG_CATEGORY (gst_gs_src_debug);
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+  if (!gst_element_register (plugin, "gssrc", GST_RANK_NONE, GST_TYPE_GS_SRC))
+    return FALSE;
+
+  if (!gst_element_register (plugin, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK))
+    return FALSE;
+
+  return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+    GST_VERSION_MINOR,
+    gs,
+    "Read and write from and to a Google Cloud Storage",
+    plugin_init,
+    PACKAGE_VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/ext/gs/gstgscommon.cpp b/ext/gs/gstgscommon.cpp
new file mode 100644 (file)
index 0000000..b0f5760
--- /dev/null
@@ -0,0 +1,134 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgscommon.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "gstgscommon.h"
+
+#include "google/cloud/storage/oauth2/compute_engine_credentials.h"
+
+namespace gcs = google::cloud::storage;
+
+namespace {
+
+#if !GLIB_CHECK_VERSION(2, 62, 0)
+  static inline gchar *
+  g_date_time_format_iso8601 (GDateTime * datetime)
+  {
+    GString *
+        outstr = NULL;
+    gchar *
+        main_date = NULL;
+    gint64 offset;
+
+    // Main date and time.
+    main_date = g_date_time_format (datetime, "%Y-%m-%dT%H:%M:%S");
+    outstr = g_string_new (main_date);
+    g_free (main_date);
+
+    // Timezone. Format it as `%:::z` unless the offset is zero, in which case
+    // we can simply use `Z`.
+    offset = g_date_time_get_utc_offset (datetime);
+
+    if (offset == 0) {
+      g_string_append_c (outstr, 'Z');
+    } else {
+      gchar *
+          time_zone = g_date_time_format (datetime, "%:::z");
+      g_string_append (outstr, time_zone);
+      g_free (time_zone);
+    }
+
+    return g_string_free (outstr, FALSE);
+  }
+#endif
+
+}  // namespace
+
+std::unique_ptr <
+    google::cloud::storage::Client >
+gst_gs_create_client (const gchar * service_account_email, GError ** error)
+{
+  if (service_account_email) {
+    // Meant to be used from a container running in the Cloud.
+
+    google::cloud::StatusOr < std::shared_ptr <
+        gcs::oauth2::Credentials >> creds (std::make_shared <
+        gcs::oauth2::ComputeEngineCredentials <>> (service_account_email));
+    if (!creds) {
+      g_set_error (error, GST_RESOURCE_ERROR,
+          GST_RESOURCE_ERROR_NOT_AUTHORIZED,
+          "Could not retrieve credentials for the given service account %s (%s)",
+          service_account_email, creds.status ().message ().c_str ());
+      return nullptr;
+    }
+
+    gcs::ClientOptions client_options (std::move (creds.value ()));
+    return std::make_unique < gcs::Client > (client_options,
+        gcs::StrictIdempotencyPolicy ());
+  }
+  // Default account. This is meant to retrieve the credentials automatically
+  // using diffrent methods.
+  google::cloud::StatusOr < gcs::ClientOptions > client_options =
+      gcs::ClientOptions::CreateDefaultClientOptions ();
+
+  if (!client_options) {
+    g_set_error (error, GST_RESOURCE_ERROR,
+        GST_RESOURCE_ERROR_NOT_AUTHORIZED,
+        "Could not create default client options (%s)",
+        client_options.status ().message ().c_str ());
+    return nullptr;
+  }
+  return std::make_unique < gcs::Client > (client_options.value (),
+      gcs::StrictIdempotencyPolicy ());
+}
+
+gboolean
+gst_gs_get_buffer_date (GstBuffer * buffer, GDateTime * start_date,
+    gchar ** buffer_date_str_ptr)
+{
+  gchar *
+      buffer_date_str = NULL;
+  GstClockTime buffer_timestamp = GST_CLOCK_TIME_NONE;
+  GTimeSpan buffer_timespan = 0;
+
+  if (!buffer || !start_date)
+    return FALSE;
+
+  buffer_timestamp = GST_BUFFER_PTS (buffer);
+
+  // GTimeSpan is in micro seconds.
+  buffer_timespan = GST_TIME_AS_USECONDS (buffer_timestamp);
+
+  GDateTime *
+      buffer_date = g_date_time_add (start_date, buffer_timespan);
+  if (!buffer_date)
+    return FALSE;
+
+  buffer_date_str = g_date_time_format_iso8601 (buffer_date);
+  g_date_time_unref (buffer_date);
+
+  if (!buffer_date_str)
+    return FALSE;
+
+  if (buffer_date_str_ptr)
+    *buffer_date_str_ptr = buffer_date_str;
+
+  return TRUE;
+}
diff --git a/ext/gs/gstgscommon.h b/ext/gs/gstgscommon.h
new file mode 100644 (file)
index 0000000..8dd59f2
--- /dev/null
@@ -0,0 +1,39 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgscommon.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_COMMON_H__
+#define __GST_GS_COMMON_H__
+
+#include <memory>
+
+#include <gst/gst.h>
+
+#include <google/cloud/storage/client.h>
+
+std::unique_ptr<google::cloud::storage::Client> gst_gs_create_client(
+    const gchar* service_account_email,
+    GError** error);
+
+gboolean gst_gs_get_buffer_date(GstBuffer* buffer,
+                                GDateTime* start_date,
+                                gchar** buffer_date_str_ptr);
+
+#endif // __GST_GS_COMMON_H__
diff --git a/ext/gs/gstgssink.cpp b/ext/gs/gstgssink.cpp
new file mode 100644 (file)
index 0000000..3fdb7b0
--- /dev/null
@@ -0,0 +1,793 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssink.cpp:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:element-gssink
+ * @title: gssink
+ * @see_also: #GstGsSrc
+ *
+ * Write incoming data to a series of sequentially-named remote files on a
+ * Google Cloud Storage bucket.
+ *
+ * The object-name property should contain a string with a \%d placeholder
+ * that will be substituted with the index for each filename.
+ *
+ * If the #GstGsSink:post-messages property is %TRUE, it sends an application
+ * message named `GstGsSink` after writing each buffer.
+ *
+ * The message's structure contains these fields:
+ *
+ * * #gchararray `filename`: the filename where the buffer was written.
+ * * #gchararray `date`: the date of the current buffer, NULL if no start date
+ * is provided.
+ * * #gint `index`: index of the buffer.
+ * * #GstClockTime `timestamp`: the timestamp of the buffer.
+ * * #GstClockTime `stream-time`: the stream time of the buffer.
+ * * #GstClockTime `running-time`: the running_time of the buffer.
+ * * #GstClockTime `duration`: the duration of the buffer.
+ * * #guint64 `offset`: the offset of the buffer that triggered the message.
+ * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
+ * message.
+ *
+ * ## Example launch line
+ * ```
+ * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
+ * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
+ * next-file=buffer post-messages=true
+ * ```
+ * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
+ * names are frame00000.png, frame00001.png, ..., frame00014.png
+ * ```
+ * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
+ * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
+ * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
+ * next-file=buffer post-messages=true
+ * ```
+ * ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file
+ * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
+ * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
+ * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
+ * ```
+ * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
+ * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
+ * ```
+ * ### Upload any stream as a single file into Google Cloud Storage. Similar as
+ * filesink in this case. The file is then accessible from:
+ * gs://mybucket/mypath/myvideos/video.mp4
+ *
+ * See also: #GstGsSrc
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgscommon.h"
+#include "gstgssink.h"
+
+#include <algorithm>
+
+static GstStaticPadTemplate sinktemplate =
+    GST_STATIC_PAD_TEMPLATE("sink",
+                            GST_PAD_SINK,
+                            GST_PAD_ALWAYS,
+                            GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
+#define GST_CAT_DEFAULT gst_gs_sink_debug
+
+#define DEFAULT_INDEX 0
+#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
+#define DEFAULT_OBJECT_NAME "%s_%05d"
+#define DEFAULT_POST_MESSAGES FALSE
+
+namespace gcs = google::cloud::storage;
+
+enum {
+  PROP_0,
+  PROP_BUCKET_NAME,
+  PROP_OBJECT_NAME,
+  PROP_INDEX,
+  PROP_POST_MESSAGES,
+  PROP_NEXT_FILE,
+  PROP_SERVICE_ACCOUNT_EMAIL,
+  PROP_START_DATE,
+};
+
+class GSWriteStream;
+
+struct _GstGsSink {
+  GstBaseSink parent;
+
+  std::unique_ptr<google::cloud::storage::Client> gcs_client;
+  std::unique_ptr<GSWriteStream> gcs_stream;
+  gchar* service_account_email;
+  gchar* bucket_name;
+  gchar* object_name;
+  gchar* start_date_str;
+  GDateTime* start_date;
+  gint index;
+  gboolean post_messages;
+  GstGsSinkNext next_file;
+  const gchar* content_type;
+  size_t nb_percent_format;
+  gboolean percent_s_is_first;
+};
+
+static void gst_gs_sink_finalize(GObject* object);
+
+static void gst_gs_sink_set_property(GObject* object,
+                                     guint prop_id,
+                                     const GValue* value,
+                                     GParamSpec* pspec);
+static void gst_gs_sink_get_property(GObject* object,
+                                     guint prop_id,
+                                     GValue* value,
+                                     GParamSpec* pspec);
+
+static gboolean gst_gs_sink_start(GstBaseSink* bsink);
+static gboolean gst_gs_sink_stop(GstBaseSink* sink);
+static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
+static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
+                                             GstBufferList* buffer_list);
+static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
+static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
+
+#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
+static GType gst_gs_sink_next_get_type(void) {
+  static GType gs_sink_next_type = 0;
+  static const GEnumValue next_types[] = {
+      {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
+      {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
+      {0, NULL, NULL}};
+
+  if (!gs_sink_next_type) {
+    gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
+  }
+
+  return gs_sink_next_type;
+}
+
+#define gst_gs_sink_parent_class parent_class
+G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
+
+class GSWriteStream {
+ public:
+  GSWriteStream(google::cloud::storage::Client& client,
+                const char* bucket_name,
+                const char* object_name,
+                const char* content_type)
+      : gcs_stream_(client.WriteObject(
+            bucket_name,
+            object_name,
+            gcs::WithObjectMetadata(
+                gcs::ObjectMetadata().set_content_type(content_type)))) {}
+  ~GSWriteStream() { gcs_stream_.Close(); }
+
+  gcs::ObjectWriteStream& stream() { return gcs_stream_; }
+
+ private:
+  gcs::ObjectWriteStream gcs_stream_;
+};
+
+static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
+  GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
+  GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
+  GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
+
+  gobject_class->set_property = gst_gs_sink_set_property;
+  gobject_class->get_property = gst_gs_sink_get_property;
+
+  /**
+   * GstGsSink:bucket-name:
+   *
+   * Name of the Google Cloud Storage bucket.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_BUCKET_NAME,
+      g_param_spec_string(
+          "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
+          NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  /**
+   * GstGsSink:object-name:
+   *
+   * Full path name of the remote file.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_OBJECT_NAME,
+      g_param_spec_string(
+          "object-name", "Object Name", "Full path name of the remote file",
+          DEFAULT_OBJECT_NAME,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  /**
+   * GstGsSink:index:
+   *
+   * Index to use with location property to create file names.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_INDEX,
+      g_param_spec_int(
+          "index", "Index",
+          "Index to use with location property to create file names.  The "
+          "index is incremented by one for each buffer written.",
+          0, G_MAXINT, DEFAULT_INDEX,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  /**
+   * GstGsSink:post-messages:
+   *
+   * Post a message on the GstBus for each file.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_POST_MESSAGES,
+      g_param_spec_boolean(
+          "post-messages", "Post Messages",
+          "Post a message for each file with information of the buffer",
+          DEFAULT_POST_MESSAGES,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+  /**
+   * GstGsSink:next-file:
+   *
+   * A #GstGsSinkNext that specifies when to start a new file.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_NEXT_FILE,
+      g_param_spec_enum(
+          "next-file", "Next File", "When to start a new file",
+          GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  /**
+   * GstGsSink:service-account-email:
+   *
+   * Service Account Email to use for credentials.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
+      g_param_spec_string(
+          "service-account-email", "Service Account Email",
+          "Service Account Email to use for credentials", NULL,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+                        GST_PARAM_MUTABLE_READY)));
+
+  /**
+   * GstGsSink:start-date:
+   *
+   * Start date in iso8601 format.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_START_DATE,
+      g_param_spec_string(
+          "start-date", "Start Date", "Start date in iso8601 format", NULL,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+                        GST_PARAM_MUTABLE_READY)));
+
+  gobject_class->finalize = gst_gs_sink_finalize;
+
+  gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
+  gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
+  gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
+  gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
+  gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
+  gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
+
+  GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
+
+  gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
+  gst_element_class_set_static_metadata(
+      gstelement_class, "Google Cloud Storage Sink", "Sink/File",
+      "Write buffers to a sequentially named set of files on Google Cloud "
+      "Storage",
+      "Julien Isorce <jisorce@oblong.com>");
+}
+
+static void gst_gs_sink_init(GstGsSink* sink) {
+  sink->gcs_client = nullptr;
+  sink->gcs_stream = nullptr;
+  sink->index = DEFAULT_INDEX;
+  sink->post_messages = DEFAULT_POST_MESSAGES;
+  sink->service_account_email = NULL;
+  sink->bucket_name = NULL;
+  sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
+  sink->start_date_str = NULL;
+  sink->start_date = NULL;
+  sink->next_file = DEFAULT_NEXT_FILE;
+  sink->content_type = NULL;
+  sink->nb_percent_format = 0;
+  sink->percent_s_is_first = FALSE;
+
+  gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
+}
+
+static void gst_gs_sink_finalize(GObject* object) {
+  GstGsSink* sink = GST_GS_SINK(object);
+
+  sink->gcs_client = nullptr;
+  sink->gcs_stream = nullptr;
+  g_free(sink->service_account_email);
+  sink->service_account_email = NULL;
+  g_free(sink->bucket_name);
+  sink->bucket_name = NULL;
+  g_free(sink->object_name);
+  sink->object_name = NULL;
+  g_free(sink->start_date_str);
+  sink->start_date_str = NULL;
+  if (sink->start_date) {
+    g_date_time_unref(sink->start_date);
+    sink->start_date = NULL;
+  }
+  sink->content_type = NULL;
+
+  G_OBJECT_CLASS(parent_class)->finalize(object);
+}
+
+static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
+                                            const gchar* object_name) {
+  g_free(sink->object_name);
+  sink->object_name = NULL;
+  sink->nb_percent_format = 0;
+  sink->percent_s_is_first = FALSE;
+
+  if (!object_name) {
+    GST_ERROR_OBJECT(sink, "Object name is null");
+    return FALSE;
+  }
+
+  const std::string name(object_name);
+  sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
+  if (sink->nb_percent_format > 2) {
+    GST_ERROR_OBJECT(sink, "Object name has too many formats");
+    return FALSE;
+  }
+
+  const size_t delimiter_percent_s = name.find("%s");
+  if (delimiter_percent_s == std::string::npos) {
+    if (sink->nb_percent_format == 2) {
+      GST_ERROR_OBJECT(sink, "Object name must have just one number format");
+      return FALSE;
+    }
+    sink->object_name = g_strdup(object_name);
+    return TRUE;
+  }
+
+  const size_t delimiter_percent = name.find_first_of('%');
+  if (delimiter_percent_s == delimiter_percent) {
+    sink->percent_s_is_first = TRUE;
+
+    if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
+      GST_ERROR_OBJECT(sink, "Object name expect max one string format");
+      return FALSE;
+    }
+  }
+
+  sink->object_name = g_strdup(object_name);
+
+  return TRUE;
+}
+
+static void gst_gs_sink_set_property(GObject* object,
+                                     guint prop_id,
+                                     const GValue* value,
+                                     GParamSpec* pspec) {
+  GstGsSink* sink = GST_GS_SINK(object);
+
+  switch (prop_id) {
+    case PROP_BUCKET_NAME:
+      g_free(sink->bucket_name);
+      sink->bucket_name = g_strdup(g_value_get_string(value));
+      break;
+    case PROP_OBJECT_NAME:
+      gst_gs_sink_set_object_name(sink, g_value_get_string(value));
+      break;
+    case PROP_INDEX:
+      sink->index = g_value_get_int(value);
+      break;
+    case PROP_POST_MESSAGES:
+      sink->post_messages = g_value_get_boolean(value);
+      break;
+    case PROP_NEXT_FILE:
+      sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      g_free(sink->service_account_email);
+      sink->service_account_email = g_strdup(g_value_get_string(value));
+      break;
+    case PROP_START_DATE:
+      g_free(sink->start_date_str);
+      if (sink->start_date)
+        g_date_time_unref(sink->start_date);
+      sink->start_date_str = g_strdup(g_value_get_string(value));
+      sink->start_date =
+          g_date_time_new_from_iso8601(sink->start_date_str, NULL);
+      if (!sink->start_date) {
+        GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
+                         sink->start_date_str);
+        g_free(sink->start_date_str);
+        sink->start_date_str = NULL;
+      }
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+static void gst_gs_sink_get_property(GObject* object,
+                                     guint prop_id,
+                                     GValue* value,
+                                     GParamSpec* pspec) {
+  GstGsSink* sink = GST_GS_SINK(object);
+
+  switch (prop_id) {
+    case PROP_BUCKET_NAME:
+      g_value_set_string(value, sink->bucket_name);
+      break;
+    case PROP_OBJECT_NAME:
+      g_value_set_string(value, sink->object_name);
+      break;
+    case PROP_INDEX:
+      g_value_set_int(value, sink->index);
+      break;
+    case PROP_POST_MESSAGES:
+      g_value_set_boolean(value, sink->post_messages);
+      break;
+    case PROP_NEXT_FILE:
+      g_value_set_enum(value, sink->next_file);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      g_value_set_string(value, sink->service_account_email);
+      break;
+    case PROP_START_DATE:
+      g_value_set_string(value, sink->start_date_str);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
+  GstGsSink* sink = GST_GS_SINK(bsink);
+  GError* err = NULL;
+
+  if (!sink->bucket_name) {
+    GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
+                      GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+
+  if (!sink->object_name) {
+    GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
+                      GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+
+  sink->content_type = "";
+
+  sink->gcs_client = gst_gs_create_client(sink->service_account_email, &err);
+  if (err) {
+    GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
+                      ("Could not create client (%s)", err->message),
+                      GST_ERROR_SYSTEM);
+    g_clear_error(&err);
+    return FALSE;
+  }
+
+  GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
+                  sink->bucket_name, sink->object_name);
+
+  return TRUE;
+}
+
+static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
+  GstGsSink* sink = GST_GS_SINK(bsink);
+
+  sink->gcs_client = nullptr;
+  sink->gcs_stream = nullptr;
+  sink->content_type = NULL;
+
+  return TRUE;
+}
+
+static void gst_gs_sink_post_message_full(GstGsSink* sink,
+                                          GstClockTime timestamp,
+                                          GstClockTime duration,
+                                          GstClockTime offset,
+                                          GstClockTime offset_end,
+                                          GstClockTime running_time,
+                                          GstClockTime stream_time,
+                                          const char* filename,
+                                          const gchar* date) {
+  GstStructure* s;
+
+  if (!sink->post_messages)
+    return;
+
+  s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
+                        "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
+                        sink->index, "timestamp", G_TYPE_UINT64, timestamp,
+                        "stream-time", G_TYPE_UINT64, stream_time,
+                        "running-time", G_TYPE_UINT64, running_time, "duration",
+                        G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
+                        offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
+
+  gst_element_post_message(GST_ELEMENT_CAST(sink),
+                           gst_message_new_element(GST_OBJECT_CAST(sink), s));
+}
+
+static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
+                                               GstClockTime timestamp,
+                                               GstClockTime duration,
+                                               const char* filename) {
+  GstClockTime running_time, stream_time;
+  guint64 offset, offset_end;
+  GstSegment* segment;
+  GstFormat format;
+
+  if (!sink->post_messages)
+    return;
+
+  segment = &GST_BASE_SINK(sink)->segment;
+  format = segment->format;
+
+  offset = -1;
+  offset_end = -1;
+
+  running_time = gst_segment_to_running_time(segment, format, timestamp);
+  stream_time = gst_segment_to_stream_time(segment, format, timestamp);
+
+  gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
+                                running_time, stream_time, filename, NULL);
+}
+
+static void gst_gs_sink_post_message(GstGsSink* sink,
+                                     GstBuffer* buffer,
+                                     const char* filename,
+                                     const char* date) {
+  GstClockTime duration, timestamp;
+  GstClockTime running_time, stream_time;
+  guint64 offset, offset_end;
+  GstSegment* segment;
+  GstFormat format;
+
+  if (!sink->post_messages)
+    return;
+
+  segment = &GST_BASE_SINK(sink)->segment;
+  format = segment->format;
+
+  timestamp = GST_BUFFER_PTS(buffer);
+  duration = GST_BUFFER_DURATION(buffer);
+  offset = GST_BUFFER_OFFSET(buffer);
+  offset_end = GST_BUFFER_OFFSET_END(buffer);
+
+  running_time = gst_segment_to_running_time(segment, format, timestamp);
+  stream_time = gst_segment_to_stream_time(segment, format, timestamp);
+
+  gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
+                                running_time, stream_time, filename, date);
+}
+
+static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
+                                              GstBuffer* buffer) {
+  GstMapInfo map = {0};
+  gchar* object_name = NULL;
+  gchar* buffer_date = NULL;
+
+  if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
+    return GST_FLOW_ERROR;
+
+  switch (sink->next_file) {
+    case GST_GS_SINK_NEXT_BUFFER: {
+      // Get buffer date if needed.
+      if (sink->start_date) {
+        if (sink->nb_percent_format != 2) {
+          GST_ERROR_OBJECT(sink, "Object name expects date and index");
+          gst_buffer_unmap(buffer, &map);
+          return GST_FLOW_ERROR;
+        }
+
+        if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
+          GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
+          gst_buffer_unmap(buffer, &map);
+          return GST_FLOW_ERROR;
+        }
+
+        if (sink->percent_s_is_first) {
+          object_name =
+              g_strdup_printf(sink->object_name, buffer_date, sink->index);
+        } else {
+          object_name =
+              g_strdup_printf(sink->object_name, sink->index, buffer_date);
+        }
+      } else {
+        if (sink->nb_percent_format != 1) {
+          GST_ERROR_OBJECT(sink, "Object name expects only an index");
+          gst_buffer_unmap(buffer, &map);
+          return GST_FLOW_ERROR;
+        }
+
+        object_name = g_strdup_printf(sink->object_name, sink->index);
+      }
+
+      GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
+
+      gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
+          sink->bucket_name, object_name,
+          gcs::WithObjectMetadata(
+              gcs::ObjectMetadata().set_content_type(sink->content_type)));
+      gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
+      if (gcs_stream.fail()) {
+        GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
+      }
+      gcs_stream.Close();
+
+      google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
+          sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
+      if (!object_metadata) {
+        GST_ERROR_OBJECT(
+            sink, "Could not get object metadata for object %s (%s)",
+            object_name, object_metadata.status().message().c_str());
+        gst_buffer_unmap(buffer, &map);
+        g_free(object_name);
+        g_free(buffer_date);
+        return GST_FLOW_ERROR;
+      }
+
+      GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
+                      object_name, object_metadata->size());
+
+      gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
+      g_free(object_name);
+      g_free(buffer_date);
+      ++sink->index;
+      break;
+    }
+    case GST_GS_SINK_NEXT_NONE: {
+      if (!sink->gcs_stream) {
+        GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
+        sink->gcs_stream = std::make_unique<GSWriteStream>(
+            *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
+            sink->content_type);
+
+        if (!sink->gcs_stream->stream().IsOpen()) {
+          GST_ELEMENT_ERROR(
+              sink, RESOURCE, OPEN_READ,
+              ("Could not create write stream (%s)",
+               sink->gcs_stream->stream().last_status().message().c_str()),
+              GST_ERROR_SYSTEM);
+          gst_buffer_unmap(buffer, &map);
+          return GST_FLOW_OK;
+        }
+      }
+
+      GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
+
+      gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
+      stream.write(reinterpret_cast<const char*>(map.data), map.size);
+      if (stream.fail()) {
+        GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
+      }
+      break;
+    }
+    default:
+      g_assert_not_reached();
+  }
+
+  gst_buffer_unmap(buffer, &map);
+  return GST_FLOW_OK;
+}
+
+static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
+  GstGsSink* sink = GST_GS_SINK(bsink);
+  GstFlowReturn flow = GST_FLOW_OK;
+
+  flow = gst_gs_sink_write_buffer(sink, buffer);
+  return flow;
+}
+
+static gboolean buffer_list_copy_data(GstBuffer** buf,
+                                      guint idx,
+                                      gpointer data) {
+  GstBuffer* dest = GST_BUFFER_CAST(data);
+  guint num, i;
+
+  if (idx == 0)
+    gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
+
+  num = gst_buffer_n_memory(*buf);
+  for (i = 0; i < num; ++i) {
+    GstMemory* mem;
+
+    mem = gst_buffer_get_memory(*buf, i);
+    gst_buffer_append_memory(dest, mem);
+  }
+
+  return TRUE;
+}
+
+/* Our assumption for now is that the buffers in a buffer list should always
+ * end up in the same file. If someone wants different behaviour, they'll just
+ * have to add a property for that. */
+static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
+                                             GstBufferList* list) {
+  GstBuffer* buf;
+  guint size;
+
+  size = gst_buffer_list_calculate_size(list);
+  GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
+
+  /* copy all buffers in the list into one single buffer, so we can use
+   * the normal render function (FIXME: optimise to avoid the memcpy) */
+  buf = gst_buffer_new();
+  gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
+  g_assert(gst_buffer_get_size(buf) == size);
+
+  gst_gs_sink_render(sink, buf);
+  gst_buffer_unref(buf);
+
+  return GST_FLOW_OK;
+}
+
+static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
+  GstGsSink* sink = GST_GS_SINK(bsink);
+  GstStructure* s = gst_caps_get_structure(caps, 0);
+
+  sink->content_type = gst_structure_get_name(s);
+
+  GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
+
+  return TRUE;
+}
+
+static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
+  GstGsSink* sink = GST_GS_SINK(bsink);
+
+  switch (GST_EVENT_TYPE(event)) {
+    case GST_EVENT_EOS:
+      if (sink->gcs_stream) {
+        sink->gcs_stream = nullptr;
+        gst_gs_sink_post_message_from_time(
+            sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
+      }
+      break;
+    default:
+      break;
+  }
+
+  return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
+}
diff --git a/ext/gs/gstgssink.h b/ext/gs/gstgssink.h
new file mode 100644 (file)
index 0000000..e93ac4b
--- /dev/null
@@ -0,0 +1,47 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssink.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_SINK_H__
+#define __GST_GS_SINK_H__
+
+#include <gst/base/base.h>
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_GS_SINK (gst_gs_sink_get_type())
+G_DECLARE_FINAL_TYPE(GstGsSink, gst_gs_sink, GST, GS_SINK, GstBaseSink)
+
+/**
+ * GstGsSinkNext:
+ * @GST_GS_SINK_NEXT_BUFFER: New file for each buffer.
+ * @GST_GS_SINK_NEXT_NONE: Only one file like filesink.
+ *
+ * File splitting modes.
+ * Since: 1.20
+ */
+typedef enum {
+  GST_GS_SINK_NEXT_BUFFER,
+  GST_GS_SINK_NEXT_NONE,
+} GstGsSinkNext;
+
+G_END_DECLS
+#endif // __GST_GS_SINK_H__
diff --git a/ext/gs/gstgssrc.cpp b/ext/gs/gstgssrc.cpp
new file mode 100644 (file)
index 0000000..c37640b
--- /dev/null
@@ -0,0 +1,578 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-gssrc
+ * @title: gssrc
+ * @see_also: #GstGsSrc
+ *
+ * Read data from a file in a Google Cloud Storage.
+ *
+ * ## Example launch line
+ * ```
+ * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin !
+ * glimagesink
+ * ```
+ * ### Play a video from a Google Cloud Storage.
+ * ```
+ * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! navseek
+ * seek-offset=10 ! glimagesink
+ * ```
+ * ### Play a video from a Google Cloud Storage and seek using the keyboard
+ * from the terminal.
+ *
+ * See also: #GstGsSink
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstgscommon.h"
+#include "gstgssrc.h"
+
+static GstStaticPadTemplate srctemplate =
+    GST_STATIC_PAD_TEMPLATE("src",
+                            GST_PAD_SRC,
+                            GST_PAD_ALWAYS,
+                            GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC(gst_gs_src_debug);
+#define GST_CAT_DEFAULT gst_gs_src_debug
+
+enum { LAST_SIGNAL };
+
+#define DEFAULT_BLOCKSIZE 4 * 1024
+
+enum { PROP_0, PROP_LOCATION, PROP_SERVICE_ACCOUNT_EMAIL };
+
+class GSReadStream;
+
+struct _GstGsSrc {
+  GstBaseSrc parent;
+
+  std::unique_ptr<google::cloud::storage::Client> gcs_client;
+  std::unique_ptr<GSReadStream> gcs_stream;
+  gchar* uri;
+  gchar* service_account_email;
+  std::string bucket_name;
+  std::string object_name;
+  guint64 read_position;
+  guint64 object_size;
+};
+
+static void gst_gs_src_finalize(GObject* object);
+
+static void gst_gs_src_set_property(GObject* object,
+                                    guint prop_id,
+                                    const GValue* value,
+                                    GParamSpec* pspec);
+static void gst_gs_src_get_property(GObject* object,
+                                    guint prop_id,
+                                    GValue* value,
+                                    GParamSpec* pspec);
+
+static gboolean gst_gs_src_start(GstBaseSrc* basesrc);
+static gboolean gst_gs_src_stop(GstBaseSrc* basesrc);
+
+static gboolean gst_gs_src_is_seekable(GstBaseSrc* src);
+static gboolean gst_gs_src_get_size(GstBaseSrc* src, guint64* size);
+static GstFlowReturn gst_gs_src_fill(GstBaseSrc* src,
+                                     guint64 offset,
+                                     guint length,
+                                     GstBuffer* buf);
+static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query);
+
+static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data);
+
+#define _do_init                                                            \
+  G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, gst_gs_src_uri_handler_init); \
+  GST_DEBUG_CATEGORY_INIT(gst_gs_src_debug, "gssrc", 0, "gssrc element");
+#define gst_gs_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE(GstGsSrc, gst_gs_src, GST_TYPE_BASE_SRC, _do_init);
+
+namespace gcs = google::cloud::storage;
+
+class GSReadStream {
+ public:
+  GSReadStream(GstGsSrc* src,
+               const std::int64_t start = 0,
+               const std::int64_t end = -1)
+      : gcs_stream_(src->gcs_client->ReadObject(src->bucket_name,
+                                                src->object_name,
+                                                gcs::ReadFromOffset(start))) {}
+  ~GSReadStream() { gcs_stream_.Close(); }
+
+  gcs::ObjectReadStream& stream() { return gcs_stream_; }
+
+ private:
+  gcs::ObjectReadStream gcs_stream_;
+};
+
+static void gst_gs_src_class_init(GstGsSrcClass* klass) {
+  GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
+  GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
+  GstBaseSrcClass* gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
+
+  gobject_class->set_property = gst_gs_src_set_property;
+  gobject_class->get_property = gst_gs_src_get_property;
+
+  /**
+   * GstGsSink:location:
+   *
+   * Name of the Google Cloud Storage bucket.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_LOCATION,
+      g_param_spec_string(
+          "location", "File Location", "Location of the file to read", NULL,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+                        GST_PARAM_MUTABLE_READY)));
+
+  /**
+   * GstGsSrc:service-account-email:
+   *
+   * Service Account Email to use for credentials.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property(
+      gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
+      g_param_spec_string(
+          "service-account-email", "Service Account Email",
+          "Service Account Email to use for credentials", NULL,
+          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+                        GST_PARAM_MUTABLE_READY)));
+
+  gobject_class->finalize = gst_gs_src_finalize;
+
+  gst_element_class_set_static_metadata(
+      gstelement_class, "Google Cloud Storage Source", "Source/File",
+      "Read from arbitrary point from a file in a Google Cloud Storage",
+      "Julien Isorce <jisorce@oblong.com>");
+  gst_element_class_add_static_pad_template(gstelement_class, &srctemplate);
+
+  gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_gs_src_start);
+  gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_gs_src_stop);
+  gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR(gst_gs_src_is_seekable);
+  gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR(gst_gs_src_get_size);
+  gstbasesrc_class->fill = GST_DEBUG_FUNCPTR(gst_gs_src_fill);
+  gstbasesrc_class->query = GST_DEBUG_FUNCPTR(gst_gs_src_query);
+}
+
+static void gst_gs_src_init(GstGsSrc* src) {
+  src->gcs_stream = nullptr;
+  src->uri = NULL;
+  src->service_account_email = NULL;
+  src->read_position = 0;
+  src->object_size = 0;
+
+  gst_base_src_set_blocksize(GST_BASE_SRC(src), DEFAULT_BLOCKSIZE);
+  gst_base_src_set_dynamic_size(GST_BASE_SRC(src), FALSE);
+  gst_base_src_set_live(GST_BASE_SRC(src), FALSE);
+}
+
+static void gst_gs_src_finalize(GObject* object) {
+  GstGsSrc* src = GST_GS_SRC(object);
+
+  g_free(src->uri);
+  src->uri = NULL;
+  g_free(src->service_account_email);
+  src->service_account_email = NULL;
+  src->read_position = 0;
+  src->object_size = 0;
+
+  G_OBJECT_CLASS(parent_class)->finalize(object);
+}
+
+static gboolean gst_gs_src_set_location(GstGsSrc* src,
+                                        const gchar* location,
+                                        GError** err) {
+  GstState state = GST_STATE_NULL;
+  std::string filepath = location;
+  size_t delimiter = std::string::npos;
+
+  // The element must be stopped in order to do this.
+  GST_OBJECT_LOCK(src);
+  state = GST_STATE(src);
+  if (state != GST_STATE_READY && state != GST_STATE_NULL)
+    goto wrong_state;
+  GST_OBJECT_UNLOCK(src);
+
+  g_free(src->uri);
+  src->uri = NULL;
+
+  if (location) {
+    if (g_str_has_prefix(location, "gs://")) {
+      src->uri = g_strdup(location);
+      filepath = filepath.substr(5);
+    } else {
+      src->uri = g_strdup_printf("gs://%s", location);
+      filepath = location;
+    }
+
+    delimiter = filepath.find_first_of('/');
+    if (delimiter == std::string::npos)
+      goto wrong_location;
+
+    std::string bucket_name = filepath.substr(0, delimiter);
+    src->bucket_name = bucket_name;
+    src->object_name = filepath.substr(delimiter + 1);
+
+    GST_INFO_OBJECT(src, "uri is %s", src->uri);
+    GST_INFO_OBJECT(src, "bucket name is %s", src->bucket_name.c_str());
+    GST_INFO_OBJECT(src, "object name is %s", src->object_name.c_str());
+  }
+  g_object_notify(G_OBJECT(src), "location");
+
+  return TRUE;
+
+  // ERROR.
+wrong_state : {
+  g_warning(
+      "Changing the `location' property on gssrc when a file is open"
+      "is not supported.");
+  if (err)
+    g_set_error(
+        err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
+        "Changing the `location' property on gssrc when a file is open is "
+        "not supported.");
+  GST_OBJECT_UNLOCK(src);
+  return FALSE;
+}
+wrong_location : {
+  if (err)
+    g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+                "Failed to find a bucket name");
+  GST_OBJECT_UNLOCK(src);
+  return FALSE;
+}
+}
+
+static gboolean gst_gs_src_set_service_account_email(
+    GstGsSrc* src,
+    const gchar* service_account_email) {
+  if (GST_STATE(src) == GST_STATE_PLAYING ||
+      GST_STATE(src) == GST_STATE_PAUSED) {
+    GST_WARNING_OBJECT(src,
+                       "Setting a new service account email not supported in "
+                       "PLAYING or PAUSED state");
+    return FALSE;
+  }
+
+  GST_OBJECT_LOCK(src);
+  g_free(src->service_account_email);
+  src->service_account_email = NULL;
+
+  if (service_account_email)
+    src->service_account_email = g_strdup(service_account_email);
+
+  GST_OBJECT_UNLOCK(src);
+
+  return TRUE;
+}
+
+static void gst_gs_src_set_property(GObject* object,
+                                    guint prop_id,
+                                    const GValue* value,
+                                    GParamSpec* pspec) {
+  GstGsSrc* src = GST_GS_SRC(object);
+
+  g_return_if_fail(GST_IS_GS_SRC(object));
+
+  switch (prop_id) {
+    case PROP_LOCATION:
+      gst_gs_src_set_location(src, g_value_get_string(value), NULL);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      gst_gs_src_set_service_account_email(src, g_value_get_string(value));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+static void gst_gs_src_get_property(GObject* object,
+                                    guint prop_id,
+                                    GValue* value,
+                                    GParamSpec* pspec) {
+  GstGsSrc* src = GST_GS_SRC(object);
+
+  g_return_if_fail(GST_IS_GS_SRC(object));
+
+  switch (prop_id) {
+    case PROP_LOCATION:
+      GST_OBJECT_LOCK(src);
+      g_value_set_string(value, src->uri);
+      GST_OBJECT_UNLOCK(src);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      GST_OBJECT_LOCK(src);
+      g_value_set_string(value, src->service_account_email);
+      GST_OBJECT_UNLOCK(src);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+static gint gst_gs_read_stream(GstGsSrc* src,
+                               guint8* data,
+                               const guint64 offset,
+                               const guint length) {
+  gint gcount = 0;
+  gchar* sdata = reinterpret_cast<gchar*>(data);
+
+  gcs::ObjectReadStream& stream = src->gcs_stream->stream();
+
+  while (!stream.eof()) {
+    stream.read(sdata, length);
+    if (stream.status().ok())
+      break;
+
+    GST_ERROR_OBJECT(src, "Restart after (%s)",
+                     stream.status().message().c_str());
+    src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
+  }
+
+  gcount = stream.gcount();
+
+  GST_INFO_OBJECT(src, "Client read %d bytes", gcount);
+
+  return gcount;
+}
+
+static GstFlowReturn gst_gs_src_fill(GstBaseSrc* basesrc,
+                                     guint64 offset,
+                                     guint length,
+                                     GstBuffer* buf) {
+  GstGsSrc* src = GST_GS_SRC(basesrc);
+  guint to_read = 0;
+  guint bytes_read = 0;
+  gint ret = 0;
+  GstMapInfo info = {};
+  guint8* data = NULL;
+
+  if (G_UNLIKELY(offset != (guint64)-1 && src->read_position != offset)) {
+    src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
+    src->read_position = offset;
+  }
+
+  if (!gst_buffer_map(buf, &info, GST_MAP_WRITE))
+    goto buffer_write_fail;
+
+  data = info.data;
+
+  bytes_read = 0;
+  to_read = length;
+  while (to_read > 0) {
+    GST_INFO_OBJECT(src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x",
+                    to_read, offset + bytes_read);
+
+    ret = gst_gs_read_stream(src, data + bytes_read, offset, to_read);
+    if (G_UNLIKELY(ret < 0))
+      goto could_not_read;
+
+    if (G_UNLIKELY(ret == 0)) {
+      // Push any remaining data.
+      if (bytes_read > 0)
+        break;
+      goto eos;
+    }
+
+    to_read -= ret;
+    bytes_read += ret;
+
+    src->read_position += ret;
+  }
+
+  GST_INFO_OBJECT(
+      src, "Read %" G_GUINT32_FORMAT " bytes of %" G_GUINT32_FORMAT " length",
+      bytes_read, length);
+
+  gst_buffer_unmap(buf, &info);
+  if (bytes_read != length)
+    gst_buffer_resize(buf, 0, bytes_read);
+
+  GST_BUFFER_OFFSET(buf) = offset;
+  GST_BUFFER_OFFSET_END(buf) = offset + bytes_read;
+
+  return GST_FLOW_OK;
+
+  // ERROR.
+could_not_read : {
+  GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+  gst_buffer_unmap(buf, &info);
+  gst_buffer_resize(buf, 0, 0);
+  return GST_FLOW_ERROR;
+}
+eos : {
+  GST_INFO_OBJECT(src, "EOS");
+  gst_buffer_unmap(buf, &info);
+  gst_buffer_resize(buf, 0, 0);
+  return GST_FLOW_EOS;
+}
+buffer_write_fail : {
+  GST_ELEMENT_ERROR(src, RESOURCE, WRITE, (NULL), ("Can't write to buffer"));
+  return GST_FLOW_ERROR;
+}
+}
+
+static gboolean gst_gs_src_is_seekable(GstBaseSrc* basesrc) {
+  return TRUE;
+}
+
+static gboolean gst_gs_src_get_size(GstBaseSrc* basesrc, guint64* size) {
+  GstGsSrc* src = GST_GS_SRC(basesrc);
+
+  *size = src->object_size;
+
+  return TRUE;
+}
+
+static gboolean gst_gs_src_start(GstBaseSrc* basesrc) {
+  GstGsSrc* src = GST_GS_SRC(basesrc);
+  GError* err = NULL;
+  guint blocksize = 0;
+
+  src->read_position = 0;
+  src->object_size = 0;
+
+  if (src->uri == NULL || src->uri[0] == '\0') {
+    GST_ELEMENT_ERROR(src, RESOURCE, NOT_FOUND,
+                      ("No uri specified for reading."), (NULL));
+    return FALSE;
+  }
+
+  GST_INFO_OBJECT(src, "Opening file %s", src->uri);
+
+  src->gcs_client = gst_gs_create_client(src->service_account_email, &err);
+  if (err) {
+    GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
+                      ("Could not create client (%s)", err->message),
+                      GST_ERROR_SYSTEM);
+    g_clear_error(&err);
+    return FALSE;
+  }
+
+  GST_INFO_OBJECT(src, "Parsed bucket name (%s) and object name (%s)",
+                  src->bucket_name.c_str(), src->object_name.c_str());
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
+      src->gcs_client->GetObjectMetadata(src->bucket_name, src->object_name);
+  if (!object_metadata) {
+    GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
+                      ("Could not get object metadata (%s)",
+                       object_metadata.status().message().c_str()),
+                      GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+
+  src->object_size = object_metadata->size();
+
+  GST_INFO_OBJECT(src, "Object size %" G_GUINT64_FORMAT "\n", src->object_size);
+
+  src->gcs_stream = std::make_unique<GSReadStream>(src);
+
+  blocksize = gcs::ClientOptions(nullptr).download_buffer_size();
+
+  GST_INFO_OBJECT(src, "Set blocksize to %" G_GUINT32_FORMAT, blocksize);
+
+  gst_base_src_set_blocksize(GST_BASE_SRC(src), blocksize);
+
+  return TRUE;
+}
+
+static gboolean gst_gs_src_stop(GstBaseSrc* basesrc) {
+  GstGsSrc* src = GST_GS_SRC(basesrc);
+
+  src->gcs_stream = nullptr;
+  src->read_position = 0;
+  src->object_size = 0;
+
+  return TRUE;
+}
+
+static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query) {
+  gboolean ret;
+
+  switch (GST_QUERY_TYPE(query)) {
+    case GST_QUERY_SCHEDULING: {
+      // A pushsrc can by default never operate in pull mode override
+      // if you want something different.
+      gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, 0);
+      gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
+
+      ret = TRUE;
+      break;
+    }
+    default:
+      ret = GST_BASE_SRC_CLASS(parent_class)->query(src, query);
+      break;
+  }
+  return ret;
+}
+
+static GstURIType gst_gs_src_uri_get_type(GType type) {
+  return GST_URI_SRC;
+}
+
+static const gchar* const* gst_gs_src_uri_get_protocols(GType type) {
+  static const gchar* protocols[] = {"gs", NULL};
+
+  return protocols;
+}
+
+static gchar* gst_gs_src_uri_get_uri(GstURIHandler* handler) {
+  GstGsSrc* src = GST_GS_SRC(handler);
+
+  return g_strdup(src->uri);
+}
+
+static gboolean gst_gs_src_uri_set_uri(GstURIHandler* handler,
+                                       const gchar* uri,
+                                       GError** err) {
+  GstGsSrc* src = GST_GS_SRC(handler);
+
+  if (strcmp(uri, "gs://") == 0) {
+    // Special case for "gs://" as this is used by some applications
+    // to test with gst_element_make_from_uri if there's an element
+    // that supports the URI protocol.
+    gst_gs_src_set_location(src, NULL, NULL);
+    return TRUE;
+  }
+
+  return gst_gs_src_set_location(src, uri, err);
+}
+
+static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data) {
+  GstURIHandlerInterface* iface = (GstURIHandlerInterface*)g_iface;
+
+  iface->get_type = gst_gs_src_uri_get_type;
+  iface->get_protocols = gst_gs_src_uri_get_protocols;
+  iface->get_uri = gst_gs_src_uri_get_uri;
+  iface->set_uri = gst_gs_src_uri_set_uri;
+}
diff --git a/ext/gs/gstgssrc.h b/ext/gs/gstgssrc.h
new file mode 100644 (file)
index 0000000..717742f
--- /dev/null
@@ -0,0 +1,34 @@
+/* GStreamer
+ * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
+ *
+ * gstgssrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_GS_SRC_H__
+#define __GST_GS_SRC_H__
+
+#include <gst/base/gstbasesrc.h>
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_GS_SRC (gst_gs_src_get_type())
+G_DECLARE_FINAL_TYPE(GstGsSrc, gst_gs_src, GST, GS_SRC, GstBaseSrc)
+
+G_END_DECLS
+#endif // __GST_GS_SRC_H__
diff --git a/ext/gs/meson.build b/ext/gs/meson.build
new file mode 100644 (file)
index 0000000..50e7ff8
--- /dev/null
@@ -0,0 +1,28 @@
+gs_sources = [
+  'gstgscommon.cpp',
+  'gstgssink.cpp',
+  'gstgssrc.cpp',
+  'gstgs.cpp',
+]
+
+gs_dep = dependency('storage_client', version : '>= 1.25.0', required : get_option('gs'))
+
+if gs_dep.found()
+  gstgs = library('gstgs',
+    gs_sources,
+    c_args : gst_plugins_bad_args,
+    cpp_args : gst_plugins_bad_args,
+    include_directories : [configinc, libsinc],
+    dependencies : [gstbase_dep, gs_dep],
+    install : true,
+    install_dir : plugins_install_dir,
+  )
+  pkgconfig.generate(gstgs, install_dir : plugins_pkgconfig_install_dir)
+  plugins += [gstgs]
+endif
+
+clang_format_p = find_program('clang-format', required: false)
+if clang_format_p.found()
+  run_command(clang_format_p, '--style=file', '-i', 'gstgssink.cpp', 'gstgssrc.cpp')
+endif
+
index 63844ca..98a0053 100644 (file)
@@ -18,6 +18,7 @@ subdir('fdkaac')
 subdir('flite')
 subdir('fluidsynth')
 subdir('gme')
+subdir('gs')
 subdir('gsm')
 subdir('hls')
 subdir('iqa')
index 0326851..4b8eaa3 100644 (file)
@@ -107,6 +107,7 @@ option('flite', type : 'feature', value : 'auto', description : 'Flite speech sy
 option('fluidsynth', type : 'feature', value : 'auto', description : 'Fluidsynth MIDI decoder plugin')
 option('gl', type : 'feature', value : 'auto', description : 'GStreamer OpenGL integration support (used by various plugins)')
 option('gme', type : 'feature', value : 'auto', description : 'libgme gaming console music file decoder plugin')
+option('gs', type : 'feature', value : 'auto', description : 'Google Cloud Storage source and sink plugin')
 option('gsm', type : 'feature', value : 'auto', description : 'GSM encoder/decoder plugin')
 option('ipcpipeline', type : 'feature', value : 'auto', description : 'Inter-process communication plugin')
 option('iqa', type : 'feature', value : 'auto', description : 'Image quality assessment plugin')