From: Julien Date: Tue, 23 Jun 2020 19:41:27 +0000 (-0700) Subject: gs: add source and sink for Google Cloud Storage X-Git-Tag: 1.19.3~507^2~694 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=e9f5d94c9304aaae4ffc27ffeae1f7c49c4d3e57;p=platform%2Fupstream%2Fgstreamer.git gs: add source and sink for Google Cloud Storage Useful when having a service that runs a GStreamer pipeline or application in Google Cloud to avoid storing the inputs and outputs in the running container or service. For example when analyzing a video from a Google Cloud Storage bucket and extracting images or converting the video and then uploading the results into another Google Cloud Storage bucket. - gssrc allows to read from a file located in Google Cloud Storage and it supports seeking. - gssink allows to write to a file located in Google Cloud Storage. There are 2 modes, one similar to multifilesink and the other similar to filesink. Example: gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4 gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But when running in Google Cloud Run or Google Cloud Engine, just set the "service-account-email" property on each element. 
Closes #1264 Part-of: --- diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 6eae764..02da740 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -25898,6 +25898,195 @@ "tracers": {}, "url": "Unknown package origin" }, + "gs": { + "description": "Read and write from and to a Google Cloud Storage", + "elements": { + "gssink": { + "author": "Julien Isorce ", + "description": "Write buffers to a sequentially named set of files on Google Cloud Storage", + "hierarchy": [ + "GstGsSink", + "GstBaseSink", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Sink/File", + "long-name": "Google Cloud Storage Sink", + "pad-templates": { + "sink": { + "caps": "ANY", + "direction": "sink", + "presence": "always" + } + }, + "properties": { + "bucket-name": { + "blurb": "Google Cloud Storage Bucket Name", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "index": { + "blurb": "Index to use with location property to create file names. 
The index is incremented by one for each buffer written.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "2147483647", + "min": "0", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + }, + "next-file": { + "blurb": "When to start a new file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "buffer (0)", + "mutable": "null", + "readable": true, + "type": "GstGsSinkNext", + "writable": true + }, + "object-name": { + "blurb": "Full path name of the remote file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "%%s_%%05d", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "post-messages": { + "blurb": "Post a message for each file with information of the buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, + "service-account-email": { + "blurb": "Service Account Email to use for credentials", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "start-date": { + "blurb": "Start date in iso8601 format", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + }, + "gssrc": { + "author": "Julien Isorce ", + "description": "Read from arbitrary point from a file in a Google Cloud Storage", + "hierarchy": [ + "GstGsSrc", + "GstBaseSrc", + 
"GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstURIHandler" + ], + "klass": "Source/File", + "long-name": "Google Cloud Storage Source", + "pad-templates": { + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "location": { + "blurb": "Location of the file to read", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "service-account-email": { + "blurb": "Service Account Email to use for credentials", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstgs", + "license": "LGPL", + "other-types": { + "GstGsSinkNext": { + "kind": "enum", + "values": [ + { + "desc": "New file for each buffer", + "name": "buffer", + "value": "0" + }, + { + "desc": "Only one file, no next file", + "name": "none", + "value": "1" + } + ] + } + }, + "package": "GStreamer Bad Plug-ins", + "source": "gst-plugins-bad", + "tracers": {}, + "url": "Unknown package origin" + }, "gsm": { "description": "GSM encoder/decoder", "elements": { diff --git a/ext/gs/.clang-format b/ext/gs/.clang-format new file mode 100644 index 0000000..a27438b --- /dev/null +++ b/ext/gs/.clang-format @@ -0,0 +1,39 @@ +# Defines the Chromium style for automatic reformatting. +# http://clang.llvm.org/docs/ClangFormatStyleOptions.html +BasedOnStyle: Chromium +# This defaults to 'Auto'. Explicitly set it for a while, so that +# 'vector<vector<int> >' in existing files gets formatted to +# 'vector<vector<int>>'. ('Auto' means that clang-format will only use +# 'int>>' if the file already contains at least one such instance.) 
+Standard: Cpp11 + +# Make sure code like: +# IPC_BEGIN_MESSAGE_MAP() +# IPC_MESSAGE_HANDLER(WidgetHostViewHost_Update, OnUpdate) +# IPC_END_MESSAGE_MAP() +# gets correctly indented. +MacroBlockBegin: "^\ +BEGIN_MSG_MAP|\ +BEGIN_MSG_MAP_EX|\ +BEGIN_SAFE_MSG_MAP_EX|\ +CR_BEGIN_MSG_MAP_EX|\ +IPC_BEGIN_MESSAGE_MAP|\ +IPC_BEGIN_MESSAGE_MAP_WITH_PARAM|\ +IPC_PROTOBUF_MESSAGE_TRAITS_BEGIN|\ +IPC_STRUCT_BEGIN|\ +IPC_STRUCT_BEGIN_WITH_PARENT|\ +IPC_STRUCT_TRAITS_BEGIN|\ +POLPARAMS_BEGIN|\ +PPAPI_BEGIN_MESSAGE_MAP$" +MacroBlockEnd: "^\ +CR_END_MSG_MAP|\ +END_MSG_MAP|\ +IPC_END_MESSAGE_MAP|\ +IPC_PROTOBUF_MESSAGE_TRAITS_END|\ +IPC_STRUCT_END|\ +IPC_STRUCT_TRAITS_END|\ +POLPARAMS_END|\ +PPAPI_END_MESSAGE_MAP$" + +# TODO: Remove this once clang-format r357700 is rolled in. +JavaImportGroups: ['android', 'androidx', 'com', 'dalvik', 'junit', 'org', 'com.google.android.apps.chrome', 'org.chromium', 'java', 'javax'] diff --git a/ext/gs/README.md b/ext/gs/README.md new file mode 100644 index 0000000..48fb88c --- /dev/null +++ b/ext/gs/README.md @@ -0,0 +1,71 @@ +# Install the Google Cloud Storage dependencies. + +``` +sudo apt-get install \ + cmake \ + libcurl3-gnutls-dev \ + libgrpc++-dev \ + libprotobuf-dev \ + protobuf-compiler-grpc +``` + +# Build the Google Cloud Storage library + +``` +git clone https://github.com/google/crc32c.git +cd crc32c && git checkout 1.1.1 +mkdir build && cd build +cmake .. \ + -GNinja \ + -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \ + -DCMAKE_INSTALL_LIBDIR:PATH=lib \ + -DBUILD_SHARED_LIBS=YES \ + -DCRC32C_USE_GLOG=NO \ + -DCRC32C_BUILD_TESTS=NO \ + -DCRC32C_BUILD_BENCHMARKS=NO +ninja && ninja install + +git clone https://github.com/abseil/abseil-cpp.git +cd abseil-cpp && git checkout master +mkdir build && cd build +cmake .. 
\ + -GNinja \ + -DBUILD_TESTING=NO \ + -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \ + -DCMAKE_INSTALL_LIBDIR:PATH=lib \ + -DBUILD_SHARED_LIBS=YES +ninja && ninja install + +git clone https://github.com/googleapis/google-cloud-cpp.git +cd google-cloud-cpp && git checkout v1.25.0 +mkdir build && cd build +cmake .. \ + -GNinja \ + -DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \ + -DCMAKE_INSTALL_LIBDIR:PATH=lib \ + -DBUILD_SHARED_LIBS=YES \ + -DBUILD_TESTING=NO \ + -DGOOGLE_CLOUD_CPP_ENABLE=storage +ninja && ninja install +``` + +# Running the gs elements locally + +When running from the command line or in a container running locally, simply +set the credentials by exporting GOOGLE_APPLICATION_CREDENTIALS. If you are +not familiar with this environment variable, check the documentation +https://cloud.google.com/docs/authentication/getting-started +Note that you can restrict a service account to the role Storage Admin or +Storage Object Creator instead of the Project Owner role from the above +documentation. + +# Running the gs elements in Google Cloud Run + +Add the Storage Object Viewer role to the service account assigned to the +Cloud Run service where gssrc runs. For gssink add the role Storage Object +Creator. Then just set the service-account-email property on the element. + +# Running the gs elements in Google Cloud Kubernetes + +You need to set GOOGLE_APPLICATION_CREDENTIALS in the container and ship the +json file to which the environment variable points. diff --git a/ext/gs/gstgs.cpp b/ext/gs/gstgs.cpp new file mode 100644 index 0000000..f5c9ddf --- /dev/null +++ b/ext/gs/gstgs.cpp @@ -0,0 +1,56 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgs.cpp: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * plugin-gs: + * + * The gs plugin contains elements to interact with Google Cloud Storage. + * In particular with the gs:// protocol or by specifying the storage bucket. + * + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstgssink.h" +#include "gstgssrc.h" + +GST_DEBUG_CATEGORY (gst_gs_src_debug); + +static gboolean +plugin_init (GstPlugin * plugin) +{ + if (!gst_element_register (plugin, "gssrc", GST_RANK_NONE, GST_TYPE_GS_SRC)) + return FALSE; + + if (!gst_element_register (plugin, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + gs, + "Read and write from and to a Google Cloud Storage", + plugin_init, + PACKAGE_VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/ext/gs/gstgscommon.cpp b/ext/gs/gstgscommon.cpp new file mode 100644 index 0000000..b0f5760 --- /dev/null +++ b/ext/gs/gstgscommon.cpp @@ -0,0 +1,134 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgscommon.cpp: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "gstgscommon.h" + +#include "google/cloud/storage/oauth2/compute_engine_credentials.h" + +namespace gcs = google::cloud::storage; + +namespace { + +#if !GLIB_CHECK_VERSION(2, 62, 0) + static inline gchar * + g_date_time_format_iso8601 (GDateTime * datetime) + { + GString * + outstr = NULL; + gchar * + main_date = NULL; + gint64 offset; + + // Main date and time. + main_date = g_date_time_format (datetime, "%Y-%m-%dT%H:%M:%S"); + outstr = g_string_new (main_date); + g_free (main_date); + + // Timezone. Format it as `%:::z` unless the offset is zero, in which case + // we can simply use `Z`. + offset = g_date_time_get_utc_offset (datetime); + + if (offset == 0) { + g_string_append_c (outstr, 'Z'); + } else { + gchar * + time_zone = g_date_time_format (datetime, "%:::z"); + g_string_append (outstr, time_zone); + g_free (time_zone); + } + + return g_string_free (outstr, FALSE); + } +#endif + +} // namespace + +std::unique_ptr < + google::cloud::storage::Client > +gst_gs_create_client (const gchar * service_account_email, GError ** error) +{ + if (service_account_email) { + // Meant to be used from a container running in the Cloud. 
+ + google::cloud::StatusOr < std::shared_ptr < + gcs::oauth2::Credentials >> creds (std::make_shared < + gcs::oauth2::ComputeEngineCredentials <>> (service_account_email)); + if (!creds) { + g_set_error (error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_NOT_AUTHORIZED, + "Could not retrieve credentials for the given service account %s (%s)", + service_account_email, creds.status ().message ().c_str ()); + return nullptr; + } + + gcs::ClientOptions client_options (std::move (creds.value ())); + return std::make_unique < gcs::Client > (client_options, + gcs::StrictIdempotencyPolicy ()); + } + // Default account. This is meant to retrieve the credentials automatically + // using different methods. + google::cloud::StatusOr < gcs::ClientOptions > client_options = + gcs::ClientOptions::CreateDefaultClientOptions (); + + if (!client_options) { + g_set_error (error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_NOT_AUTHORIZED, + "Could not create default client options (%s)", + client_options.status ().message ().c_str ()); + return nullptr; + } + return std::make_unique < gcs::Client > (client_options.value (), + gcs::StrictIdempotencyPolicy ()); +} + +gboolean +gst_gs_get_buffer_date (GstBuffer * buffer, GDateTime * start_date, + gchar ** buffer_date_str_ptr) +{ + gchar * + buffer_date_str = NULL; + GstClockTime buffer_timestamp = GST_CLOCK_TIME_NONE; + GTimeSpan buffer_timespan = 0; + + if (!buffer || !start_date) + return FALSE; + + buffer_timestamp = GST_BUFFER_PTS (buffer); + + // GTimeSpan is in microseconds. 
+ buffer_timespan = GST_TIME_AS_USECONDS (buffer_timestamp); + + GDateTime * + buffer_date = g_date_time_add (start_date, buffer_timespan); + if (!buffer_date) + return FALSE; + + buffer_date_str = g_date_time_format_iso8601 (buffer_date); + g_date_time_unref (buffer_date); + + if (!buffer_date_str) + return FALSE; + + if (buffer_date_str_ptr) + *buffer_date_str_ptr = buffer_date_str; + + return TRUE; +} diff --git a/ext/gs/gstgscommon.h b/ext/gs/gstgscommon.h new file mode 100644 index 0000000..8dd59f2 --- /dev/null +++ b/ext/gs/gstgscommon.h @@ -0,0 +1,39 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgscommon.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. 
+ */ + +#ifndef __GST_GS_COMMON_H__ +#define __GST_GS_COMMON_H__ + +#include <gst/gst.h> + +#include <memory> + +#include <google/cloud/storage/client.h> + +std::unique_ptr<google::cloud::storage::Client> gst_gs_create_client( + const gchar* service_account_email, + GError** error); + +gboolean gst_gs_get_buffer_date(GstBuffer* buffer, + GDateTime* start_date, + gchar** buffer_date_str_ptr); + +#endif // __GST_GS_COMMON_H__ diff --git a/ext/gs/gstgssink.cpp b/ext/gs/gstgssink.cpp new file mode 100644 index 0000000..3fdb7b0 --- /dev/null +++ b/ext/gs/gstgssink.cpp @@ -0,0 +1,793 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgssink.cpp: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION:element-gssink + * @title: gssink + * @see_also: #GstGsSrc + * + * Write incoming data to a series of sequentially-named remote files on a + * Google Cloud Storage bucket. + * + * The object-name property should contain a string with a \%d placeholder + * that will be substituted with the index for each filename. + * + * If the #GstGsSink:post-messages property is %TRUE, it sends an application + * message named `GstGsSink` after writing each buffer. + * + * The message's structure contains these fields: + * + * * #gchararray `filename`: the filename where the buffer was written. 
+ * * #gchararray `date`: the date of the current buffer, NULL if no start date + * is provided. + * * #gint `index`: index of the buffer. + * * #GstClockTime `timestamp`: the timestamp of the buffer. + * * #GstClockTime `stream-time`: the stream time of the buffer. + * * #GstClockTime `running-time`: the running_time of the buffer. + * * #GstClockTime `duration`: the duration of the buffer. + * * #guint64 `offset`: the offset of the buffer that triggered the message. + * * #guint64 `offset-end`: the offset-end of the buffer that triggered the + * message. + * + * ## Example launch line + * ``` + * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink + * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket" + * next-file=buffer post-messages=true + * ``` + * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file + * names are frame00000.png, frame00001.png, ..., frame00014.png + * ``` + * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 ! + * pngenc ! gssink start-date="2020-04-16T08:55:03Z" + * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket" + * next-file=buffer post-messages=true + * ``` + * ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file + * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png, + * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png, + * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png. + * ``` + * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink + * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none + * ``` + * ### Upload any stream as a single file into Google Cloud Storage. Similar as + * filesink in this case. 
The file is then accessible from: + * gs://mybucket/mypath/myvideos/video.mp4 + * + * See also: #GstGsSrc + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstgscommon.h" +#include "gstgssink.h" + +#include <algorithm> + +static GstStaticPadTemplate sinktemplate = + GST_STATIC_PAD_TEMPLATE("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug); +#define GST_CAT_DEFAULT gst_gs_sink_debug + +#define DEFAULT_INDEX 0 +#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER +#define DEFAULT_OBJECT_NAME "%s_%05d" +#define DEFAULT_POST_MESSAGES FALSE + +namespace gcs = google::cloud::storage; + +enum { + PROP_0, + PROP_BUCKET_NAME, + PROP_OBJECT_NAME, + PROP_INDEX, + PROP_POST_MESSAGES, + PROP_NEXT_FILE, + PROP_SERVICE_ACCOUNT_EMAIL, + PROP_START_DATE, +}; + +class GSWriteStream; + +struct _GstGsSink { + GstBaseSink parent; + + std::unique_ptr<google::cloud::storage::Client> gcs_client; + std::unique_ptr<GSWriteStream> gcs_stream; + gchar* service_account_email; + gchar* bucket_name; + gchar* object_name; + gchar* start_date_str; + GDateTime* start_date; + gint index; + gboolean post_messages; + GstGsSinkNext next_file; + const gchar* content_type; + size_t nb_percent_format; + gboolean percent_s_is_first; +}; + +static void gst_gs_sink_finalize(GObject* object); + +static void gst_gs_sink_set_property(GObject* object, + guint prop_id, + const GValue* value, + GParamSpec* pspec); +static void gst_gs_sink_get_property(GObject* object, + guint prop_id, + GValue* value, + GParamSpec* pspec); + +static gboolean gst_gs_sink_start(GstBaseSink* bsink); +static gboolean gst_gs_sink_stop(GstBaseSink* sink); +static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer); +static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink, + GstBufferList* buffer_list); +static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps); +static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event); + +#define 
GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type()) +static GType gst_gs_sink_next_get_type(void) { + static GType gs_sink_next_type = 0; + static const GEnumValue next_types[] = { + {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"}, + {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"}, + {0, NULL, NULL}}; + + if (!gs_sink_next_type) { + gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types); + } + + return gs_sink_next_type; +} + +#define gst_gs_sink_parent_class parent_class +G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK); + +class GSWriteStream { + public: + GSWriteStream(google::cloud::storage::Client& client, + const char* bucket_name, + const char* object_name, + const char* content_type) + : gcs_stream_(client.WriteObject( + bucket_name, + object_name, + gcs::WithObjectMetadata( + gcs::ObjectMetadata().set_content_type(content_type)))) {} + ~GSWriteStream() { gcs_stream_.Close(); } + + gcs::ObjectWriteStream& stream() { return gcs_stream_; } + + private: + gcs::ObjectWriteStream gcs_stream_; +}; + +static void gst_gs_sink_class_init(GstGsSinkClass* klass) { + GObjectClass* gobject_class = G_OBJECT_CLASS(klass); + GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass); + GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass); + + gobject_class->set_property = gst_gs_sink_set_property; + gobject_class->get_property = gst_gs_sink_get_property; + + /** + * GstGsSink:bucket-name: + * + * Name of the Google Cloud Storage bucket. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_BUCKET_NAME, + g_param_spec_string( + "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name", + NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + + /** + * GstGsSink:object-name: + * + * Full path name of the remote file. 
+ * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_OBJECT_NAME, + g_param_spec_string( + "object-name", "Object Name", "Full path name of the remote file", + DEFAULT_OBJECT_NAME, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + + /** + * GstGsSink:index: + * + * Index to use with location property to create file names. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_INDEX, + g_param_spec_int( + "index", "Index", + "Index to use with location property to create file names. The " + "index is incremented by one for each buffer written.", + 0, G_MAXINT, DEFAULT_INDEX, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + + /** + * GstGsSink:post-messages: + * + * Post a message on the GstBus for each file. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_POST_MESSAGES, + g_param_spec_boolean( + "post-messages", "Post Messages", + "Post a message for each file with information of the buffer", + DEFAULT_POST_MESSAGES, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + /** + * GstGsSink:next-file: + * + * A #GstGsSinkNext that specifies when to start a new file. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_NEXT_FILE, + g_param_spec_enum( + "next-file", "Next File", "When to start a new file", + GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + + /** + * GstGsSink:service-account-email: + * + * Service Account Email to use for credentials. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_SERVICE_ACCOUNT_EMAIL, + g_param_spec_string( + "service-account-email", "Service Account Email", + "Service Account Email to use for credentials", NULL, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY))); + + /** + * GstGsSink:start-date: + * + * Start date in iso8601 format. 
+ * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_START_DATE, + g_param_spec_string( + "start-date", "Start Date", "Start date in iso8601 format", NULL, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY))); + + gobject_class->finalize = gst_gs_sink_finalize; + + gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop); + gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render); + gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list); + gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps); + gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event); + + GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element"); + + gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate); + gst_element_class_set_static_metadata( + gstelement_class, "Google Cloud Storage Sink", "Sink/File", + "Write buffers to a sequentially named set of files on Google Cloud " + "Storage", + "Julien Isorce "); +} + +static void gst_gs_sink_init(GstGsSink* sink) { + sink->gcs_client = nullptr; + sink->gcs_stream = nullptr; + sink->index = DEFAULT_INDEX; + sink->post_messages = DEFAULT_POST_MESSAGES; + sink->service_account_email = NULL; + sink->bucket_name = NULL; + sink->object_name = g_strdup(DEFAULT_OBJECT_NAME); + sink->start_date_str = NULL; + sink->start_date = NULL; + sink->next_file = DEFAULT_NEXT_FILE; + sink->content_type = NULL; + sink->nb_percent_format = 0; + sink->percent_s_is_first = FALSE; + + gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE); +} + +static void gst_gs_sink_finalize(GObject* object) { + GstGsSink* sink = GST_GS_SINK(object); + + sink->gcs_client = nullptr; + sink->gcs_stream = nullptr; + g_free(sink->service_account_email); + sink->service_account_email = NULL; + g_free(sink->bucket_name); + sink->bucket_name = NULL; + 
g_free(sink->object_name); + sink->object_name = NULL; + g_free(sink->start_date_str); + sink->start_date_str = NULL; + if (sink->start_date) { + g_date_time_unref(sink->start_date); + sink->start_date = NULL; + } + sink->content_type = NULL; + + G_OBJECT_CLASS(parent_class)->finalize(object); +} + +static gboolean gst_gs_sink_set_object_name(GstGsSink* sink, + const gchar* object_name) { + g_free(sink->object_name); + sink->object_name = NULL; + sink->nb_percent_format = 0; + sink->percent_s_is_first = FALSE; + + if (!object_name) { + GST_ERROR_OBJECT(sink, "Object name is null"); + return FALSE; + } + + const std::string name(object_name); + sink->nb_percent_format = std::count(name.begin(), name.end(), '%'); + if (sink->nb_percent_format > 2) { + GST_ERROR_OBJECT(sink, "Object name has too many formats"); + return FALSE; + } + + const size_t delimiter_percent_s = name.find("%s"); + if (delimiter_percent_s == std::string::npos) { + if (sink->nb_percent_format == 2) { + GST_ERROR_OBJECT(sink, "Object name must have just one number format"); + return FALSE; + } + sink->object_name = g_strdup(object_name); + return TRUE; + } + + const size_t delimiter_percent = name.find_first_of('%'); + if (delimiter_percent_s == delimiter_percent) { + sink->percent_s_is_first = TRUE; + + if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) { + GST_ERROR_OBJECT(sink, "Object name expect max one string format"); + return FALSE; + } + } + + sink->object_name = g_strdup(object_name); + + return TRUE; +} + +static void gst_gs_sink_set_property(GObject* object, + guint prop_id, + const GValue* value, + GParamSpec* pspec) { + GstGsSink* sink = GST_GS_SINK(object); + + switch (prop_id) { + case PROP_BUCKET_NAME: + g_free(sink->bucket_name); + sink->bucket_name = g_strdup(g_value_get_string(value)); + break; + case PROP_OBJECT_NAME: + gst_gs_sink_set_object_name(sink, g_value_get_string(value)); + break; + case PROP_INDEX: + sink->index = g_value_get_int(value); + break; + 
case PROP_POST_MESSAGES: + sink->post_messages = g_value_get_boolean(value); + break; + case PROP_NEXT_FILE: + sink->next_file = (GstGsSinkNext)g_value_get_enum(value); + break; + case PROP_SERVICE_ACCOUNT_EMAIL: + g_free(sink->service_account_email); + sink->service_account_email = g_strdup(g_value_get_string(value)); + break; + case PROP_START_DATE: + g_free(sink->start_date_str); + if (sink->start_date) + g_date_time_unref(sink->start_date); + sink->start_date_str = g_strdup(g_value_get_string(value)); + sink->start_date = + g_date_time_new_from_iso8601(sink->start_date_str, NULL); + if (!sink->start_date) { + GST_ERROR_OBJECT(sink, "Failed to parse start date %s", + sink->start_date_str); + g_free(sink->start_date_str); + sink->start_date_str = NULL; + } + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void gst_gs_sink_get_property(GObject* object, + guint prop_id, + GValue* value, + GParamSpec* pspec) { + GstGsSink* sink = GST_GS_SINK(object); + + switch (prop_id) { + case PROP_BUCKET_NAME: + g_value_set_string(value, sink->bucket_name); + break; + case PROP_OBJECT_NAME: + g_value_set_string(value, sink->object_name); + break; + case PROP_INDEX: + g_value_set_int(value, sink->index); + break; + case PROP_POST_MESSAGES: + g_value_set_boolean(value, sink->post_messages); + break; + case PROP_NEXT_FILE: + g_value_set_enum(value, sink->next_file); + break; + case PROP_SERVICE_ACCOUNT_EMAIL: + g_value_set_string(value, sink->service_account_email); + break; + case PROP_START_DATE: + g_value_set_string(value, sink->start_date_str); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static gboolean gst_gs_sink_start(GstBaseSink* bsink) { + GstGsSink* sink = GST_GS_SINK(bsink); + GError* err = NULL; + + if (!sink->bucket_name) { + GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"), + GST_ERROR_SYSTEM); + return FALSE; + } + + if 
(!sink->object_name) { + GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"), + GST_ERROR_SYSTEM); + return FALSE; + } + + sink->content_type = ""; + + sink->gcs_client = gst_gs_create_client(sink->service_account_email, &err); + if (err) { + GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ, + ("Could not create client (%s)", err->message), + GST_ERROR_SYSTEM); + g_clear_error(&err); + return FALSE; + } + + GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)", + sink->bucket_name, sink->object_name); + + return TRUE; +} + +static gboolean gst_gs_sink_stop(GstBaseSink* bsink) { + GstGsSink* sink = GST_GS_SINK(bsink); + + sink->gcs_client = nullptr; + sink->gcs_stream = nullptr; + sink->content_type = NULL; + + return TRUE; +} + +static void gst_gs_sink_post_message_full(GstGsSink* sink, + GstClockTime timestamp, + GstClockTime duration, + GstClockTime offset, + GstClockTime offset_end, + GstClockTime running_time, + GstClockTime stream_time, + const char* filename, + const gchar* date) { + GstStructure* s; + + if (!sink->post_messages) + return; + + s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename, + "date", G_TYPE_STRING, date, "index", G_TYPE_INT, + sink->index, "timestamp", G_TYPE_UINT64, timestamp, + "stream-time", G_TYPE_UINT64, stream_time, + "running-time", G_TYPE_UINT64, running_time, "duration", + G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64, + offset, "offset-end", G_TYPE_UINT64, offset_end, NULL); + + gst_element_post_message(GST_ELEMENT_CAST(sink), + gst_message_new_element(GST_OBJECT_CAST(sink), s)); +} + +static void gst_gs_sink_post_message_from_time(GstGsSink* sink, + GstClockTime timestamp, + GstClockTime duration, + const char* filename) { + GstClockTime running_time, stream_time; + guint64 offset, offset_end; + GstSegment* segment; + GstFormat format; + + if (!sink->post_messages) + return; + + segment = &GST_BASE_SINK(sink)->segment; + format = segment->format; + + offset = -1; + offset_end 
= -1; + + running_time = gst_segment_to_running_time(segment, format, timestamp); + stream_time = gst_segment_to_stream_time(segment, format, timestamp); + + gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end, + running_time, stream_time, filename, NULL); +} + +static void gst_gs_sink_post_message(GstGsSink* sink, + GstBuffer* buffer, + const char* filename, + const char* date) { + GstClockTime duration, timestamp; + GstClockTime running_time, stream_time; + guint64 offset, offset_end; + GstSegment* segment; + GstFormat format; + + if (!sink->post_messages) + return; + + segment = &GST_BASE_SINK(sink)->segment; + format = segment->format; + + timestamp = GST_BUFFER_PTS(buffer); + duration = GST_BUFFER_DURATION(buffer); + offset = GST_BUFFER_OFFSET(buffer); + offset_end = GST_BUFFER_OFFSET_END(buffer); + + running_time = gst_segment_to_running_time(segment, format, timestamp); + stream_time = gst_segment_to_stream_time(segment, format, timestamp); + + gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end, + running_time, stream_time, filename, date); +} + +static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink, + GstBuffer* buffer) { + GstMapInfo map = {0}; + gchar* object_name = NULL; + gchar* buffer_date = NULL; + + if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) + return GST_FLOW_ERROR; + + switch (sink->next_file) { + case GST_GS_SINK_NEXT_BUFFER: { + // Get buffer date if needed. 
+ if (sink->start_date) { + if (sink->nb_percent_format != 2) { + GST_ERROR_OBJECT(sink, "Object name expects date and index"); + gst_buffer_unmap(buffer, &map); + return GST_FLOW_ERROR; + } + + if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) { + GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name); + gst_buffer_unmap(buffer, &map); + return GST_FLOW_ERROR; + } + + if (sink->percent_s_is_first) { + object_name = + g_strdup_printf(sink->object_name, buffer_date, sink->index); + } else { + object_name = + g_strdup_printf(sink->object_name, sink->index, buffer_date); + } + } else { + if (sink->nb_percent_format != 1) { + GST_ERROR_OBJECT(sink, "Object name expects only an index"); + gst_buffer_unmap(buffer, &map); + return GST_FLOW_ERROR; + } + + object_name = g_strdup_printf(sink->object_name, sink->index); + } + + GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size); + + gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject( + sink->bucket_name, object_name, + gcs::WithObjectMetadata( + gcs::ObjectMetadata().set_content_type(sink->content_type))); + gcs_stream.write(reinterpret_cast(map.data), map.size); + if (gcs_stream.fail()) { + GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name); + } + gcs_stream.Close(); + + google::cloud::StatusOr object_metadata = + sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name); + if (!object_metadata) { + GST_ERROR_OBJECT( + sink, "Could not get object metadata for object %s (%s)", + object_name, object_metadata.status().message().c_str()); + gst_buffer_unmap(buffer, &map); + g_free(object_name); + g_free(buffer_date); + return GST_FLOW_ERROR; + } + + GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n", + object_name, object_metadata->size()); + + gst_gs_sink_post_message(sink, buffer, object_name, buffer_date); + g_free(object_name); + g_free(buffer_date); + ++sink->index; + break; + } + case GST_GS_SINK_NEXT_NONE: { + if 
(!sink->gcs_stream) { + GST_INFO_OBJECT(sink, "Opening %s", sink->object_name); + sink->gcs_stream = std::make_unique( + *sink->gcs_client.get(), sink->bucket_name, sink->object_name, + sink->content_type); + + if (!sink->gcs_stream->stream().IsOpen()) { + GST_ELEMENT_ERROR( + sink, RESOURCE, OPEN_READ, + ("Could not create write stream (%s)", + sink->gcs_stream->stream().last_status().message().c_str()), + GST_ERROR_SYSTEM); + gst_buffer_unmap(buffer, &map); + return GST_FLOW_OK; + } + } + + GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size); + + gcs::ObjectWriteStream& stream = sink->gcs_stream->stream(); + stream.write(reinterpret_cast(map.data), map.size); + if (stream.fail()) { + GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name); + } + break; + } + default: + g_assert_not_reached(); + } + + gst_buffer_unmap(buffer, &map); + return GST_FLOW_OK; +} + +static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) { + GstGsSink* sink = GST_GS_SINK(bsink); + GstFlowReturn flow = GST_FLOW_OK; + + flow = gst_gs_sink_write_buffer(sink, buffer); + return flow; +} + +static gboolean buffer_list_copy_data(GstBuffer** buf, + guint idx, + gpointer data) { + GstBuffer* dest = GST_BUFFER_CAST(data); + guint num, i; + + if (idx == 0) + gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1); + + num = gst_buffer_n_memory(*buf); + for (i = 0; i < num; ++i) { + GstMemory* mem; + + mem = gst_buffer_get_memory(*buf, i); + gst_buffer_append_memory(dest, mem); + } + + return TRUE; +} + +/* Our assumption for now is that the buffers in a buffer list should always + * end up in the same file. If someone wants different behaviour, they'll just + * have to add a property for that. 
*/ +static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink, + GstBufferList* list) { + GstBuffer* buf; + guint size; + + size = gst_buffer_list_calculate_size(list); + GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size); + + /* copy all buffers in the list into one single buffer, so we can use + * the normal render function (FIXME: optimise to avoid the memcpy) */ + buf = gst_buffer_new(); + gst_buffer_list_foreach(list, buffer_list_copy_data, buf); + g_assert(gst_buffer_get_size(buf) == size); + + gst_gs_sink_render(sink, buf); + gst_buffer_unref(buf); + + return GST_FLOW_OK; +} + +static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) { + GstGsSink* sink = GST_GS_SINK(bsink); + GstStructure* s = gst_caps_get_structure(caps, 0); + + sink->content_type = gst_structure_get_name(s); + + GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type); + + return TRUE; +} + +static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) { + GstGsSink* sink = GST_GS_SINK(bsink); + + switch (GST_EVENT_TYPE(event)) { + case GST_EVENT_EOS: + if (sink->gcs_stream) { + sink->gcs_stream = nullptr; + gst_gs_sink_post_message_from_time( + sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name); + } + break; + default: + break; + } + + return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event); +} diff --git a/ext/gs/gstgssink.h b/ext/gs/gstgssink.h new file mode 100644 index 0000000..e93ac4b --- /dev/null +++ b/ext/gs/gstgssink.h @@ -0,0 +1,47 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgssink.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_GS_SINK_H__ +#define __GST_GS_SINK_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_GS_SINK (gst_gs_sink_get_type()) +G_DECLARE_FINAL_TYPE(GstGsSink, gst_gs_sink, GST, GS_SINK, GstBaseSink) + +/** + * GstGsSinkNext: + * @GST_GS_SINK_NEXT_BUFFER: New file for each buffer. + * @GST_GS_SINK_NEXT_NONE: Only one file like filesink. + * + * File splitting modes. + * Since: 1.20 + */ +typedef enum { + GST_GS_SINK_NEXT_BUFFER, + GST_GS_SINK_NEXT_NONE, +} GstGsSinkNext; + +G_END_DECLS +#endif // __GST_GS_SINK_H__ diff --git a/ext/gs/gstgssrc.cpp b/ext/gs/gstgssrc.cpp new file mode 100644 index 0000000..c37640b --- /dev/null +++ b/ext/gs/gstgssrc.cpp @@ -0,0 +1,578 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgssrc.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. 
+ * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-gssrc + * @title: gssrc + * @see_also: #GstGsSrc + * + * Read data from a file in a Google Cloud Storage. + * + * ## Example launch line + * ``` + * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! + * glimagesink + * ``` + * ### Play a video from a Google Cloud Storage. + * ``` + * gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! navseek + * seek-offset=10 ! glimagesink + * ``` + * ### Play a video from a Google Cloud Storage and seek using the keyboard + * from the terminal. + * + * See also: #GstGsSink + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstgscommon.h" +#include "gstgssrc.h" + +static GstStaticPadTemplate srctemplate = + GST_STATIC_PAD_TEMPLATE("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC(gst_gs_src_debug); +#define GST_CAT_DEFAULT gst_gs_src_debug + +enum { LAST_SIGNAL }; + +#define DEFAULT_BLOCKSIZE 4 * 1024 + +enum { PROP_0, PROP_LOCATION, PROP_SERVICE_ACCOUNT_EMAIL }; + +class GSReadStream; + +struct _GstGsSrc { + GstBaseSrc parent; + + std::unique_ptr gcs_client; + std::unique_ptr gcs_stream; + gchar* uri; + gchar* service_account_email; + std::string bucket_name; + std::string object_name; + guint64 read_position; + guint64 object_size; +}; + +static void gst_gs_src_finalize(GObject* object); + +static void gst_gs_src_set_property(GObject* object, + guint prop_id, + const GValue* value, + GParamSpec* pspec); +static void gst_gs_src_get_property(GObject* object, + guint prop_id, + GValue* value, + GParamSpec* pspec); + +static gboolean gst_gs_src_start(GstBaseSrc* basesrc); +static gboolean gst_gs_src_stop(GstBaseSrc* basesrc); + +static gboolean 
gst_gs_src_is_seekable(GstBaseSrc* src); +static gboolean gst_gs_src_get_size(GstBaseSrc* src, guint64* size); +static GstFlowReturn gst_gs_src_fill(GstBaseSrc* src, + guint64 offset, + guint length, + GstBuffer* buf); +static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query); + +static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data); + +#define _do_init \ + G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, gst_gs_src_uri_handler_init); \ + GST_DEBUG_CATEGORY_INIT(gst_gs_src_debug, "gssrc", 0, "gssrc element"); +#define gst_gs_src_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE(GstGsSrc, gst_gs_src, GST_TYPE_BASE_SRC, _do_init); + +namespace gcs = google::cloud::storage; + +class GSReadStream { + public: + GSReadStream(GstGsSrc* src, + const std::int64_t start = 0, + const std::int64_t end = -1) + : gcs_stream_(src->gcs_client->ReadObject(src->bucket_name, + src->object_name, + gcs::ReadFromOffset(start))) {} + ~GSReadStream() { gcs_stream_.Close(); } + + gcs::ObjectReadStream& stream() { return gcs_stream_; } + + private: + gcs::ObjectReadStream gcs_stream_; +}; + +static void gst_gs_src_class_init(GstGsSrcClass* klass) { + GObjectClass* gobject_class = G_OBJECT_CLASS(klass); + GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass); + GstBaseSrcClass* gstbasesrc_class = GST_BASE_SRC_CLASS(klass); + + gobject_class->set_property = gst_gs_src_set_property; + gobject_class->get_property = gst_gs_src_get_property; + + /** + * GstGsSink:location: + * + * Name of the Google Cloud Storage bucket. + * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_LOCATION, + g_param_spec_string( + "location", "File Location", "Location of the file to read", NULL, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY))); + + /** + * GstGsSrc:service-account-email: + * + * Service Account Email to use for credentials. 
+ * + * Since: 1.20 + */ + g_object_class_install_property( + gobject_class, PROP_SERVICE_ACCOUNT_EMAIL, + g_param_spec_string( + "service-account-email", "Service Account Email", + "Service Account Email to use for credentials", NULL, + (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY))); + + gobject_class->finalize = gst_gs_src_finalize; + + gst_element_class_set_static_metadata( + gstelement_class, "Google Cloud Storage Source", "Source/File", + "Read from arbitrary point from a file in a Google Cloud Storage", + "Julien Isorce "); + gst_element_class_add_static_pad_template(gstelement_class, &srctemplate); + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_gs_src_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_gs_src_stop); + gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR(gst_gs_src_is_seekable); + gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR(gst_gs_src_get_size); + gstbasesrc_class->fill = GST_DEBUG_FUNCPTR(gst_gs_src_fill); + gstbasesrc_class->query = GST_DEBUG_FUNCPTR(gst_gs_src_query); +} + +static void gst_gs_src_init(GstGsSrc* src) { + src->gcs_stream = nullptr; + src->uri = NULL; + src->service_account_email = NULL; + src->read_position = 0; + src->object_size = 0; + + gst_base_src_set_blocksize(GST_BASE_SRC(src), DEFAULT_BLOCKSIZE); + gst_base_src_set_dynamic_size(GST_BASE_SRC(src), FALSE); + gst_base_src_set_live(GST_BASE_SRC(src), FALSE); +} + +static void gst_gs_src_finalize(GObject* object) { + GstGsSrc* src = GST_GS_SRC(object); + + g_free(src->uri); + src->uri = NULL; + g_free(src->service_account_email); + src->service_account_email = NULL; + src->read_position = 0; + src->object_size = 0; + + G_OBJECT_CLASS(parent_class)->finalize(object); +} + +static gboolean gst_gs_src_set_location(GstGsSrc* src, + const gchar* location, + GError** err) { + GstState state = GST_STATE_NULL; + std::string filepath = location; + size_t delimiter = std::string::npos; + + // The element must be stopped in order to 
do this. + GST_OBJECT_LOCK(src); + state = GST_STATE(src); + if (state != GST_STATE_READY && state != GST_STATE_NULL) + goto wrong_state; + GST_OBJECT_UNLOCK(src); + + g_free(src->uri); + src->uri = NULL; + + if (location) { + if (g_str_has_prefix(location, "gs://")) { + src->uri = g_strdup(location); + filepath = filepath.substr(5); + } else { + src->uri = g_strdup_printf("gs://%s", location); + filepath = location; + } + + delimiter = filepath.find_first_of('/'); + if (delimiter == std::string::npos) + goto wrong_location; + + std::string bucket_name = filepath.substr(0, delimiter); + src->bucket_name = bucket_name; + src->object_name = filepath.substr(delimiter + 1); + + GST_INFO_OBJECT(src, "uri is %s", src->uri); + GST_INFO_OBJECT(src, "bucket name is %s", src->bucket_name.c_str()); + GST_INFO_OBJECT(src, "object name is %s", src->object_name.c_str()); + } + g_object_notify(G_OBJECT(src), "location"); + + return TRUE; + + // ERROR. +wrong_state : { + g_warning( + "Changing the `location' property on gssrc when a file is open" + "is not supported."); + if (err) + g_set_error( + err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE, + "Changing the `location' property on gssrc when a file is open is " + "not supported."); + GST_OBJECT_UNLOCK(src); + return FALSE; +} +wrong_location : { + if (err) + g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Failed to find a bucket name"); + GST_OBJECT_UNLOCK(src); + return FALSE; +} +} + +static gboolean gst_gs_src_set_service_account_email( + GstGsSrc* src, + const gchar* service_account_email) { + if (GST_STATE(src) == GST_STATE_PLAYING || + GST_STATE(src) == GST_STATE_PAUSED) { + GST_WARNING_OBJECT(src, + "Setting a new service account email not supported in " + "PLAYING or PAUSED state"); + return FALSE; + } + + GST_OBJECT_LOCK(src); + g_free(src->service_account_email); + src->service_account_email = NULL; + + if (service_account_email) + src->service_account_email = g_strdup(service_account_email); + + 
GST_OBJECT_UNLOCK(src);
+
+  return TRUE;
+}
+
+static void gst_gs_src_set_property(GObject* object,
+                                    guint prop_id,
+                                    const GValue* value,
+                                    GParamSpec* pspec) {
+  GstGsSrc* src = GST_GS_SRC(object);
+
+  g_return_if_fail(GST_IS_GS_SRC(object));
+
+  switch (prop_id) {
+    case PROP_LOCATION:
+      gst_gs_src_set_location(src, g_value_get_string(value), NULL);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      gst_gs_src_set_service_account_email(src, g_value_get_string(value));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+static void gst_gs_src_get_property(GObject* object,
+                                    guint prop_id,
+                                    GValue* value,
+                                    GParamSpec* pspec) {
+  GstGsSrc* src = GST_GS_SRC(object);
+
+  g_return_if_fail(GST_IS_GS_SRC(object));
+
+  switch (prop_id) {
+    case PROP_LOCATION:
+      GST_OBJECT_LOCK(src);
+      g_value_set_string(value, src->uri);
+      GST_OBJECT_UNLOCK(src);
+      break;
+    case PROP_SERVICE_ACCOUNT_EMAIL:
+      GST_OBJECT_LOCK(src);
+      g_value_set_string(value, src->service_account_email);
+      GST_OBJECT_UNLOCK(src);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+      break;
+  }
+}
+
+/* Reads up to @length bytes from the current read stream into @data.
+ *
+ * When a read ends with a non-OK status the stream is reopened at
+ * src->read_position and the read is repeated into the same destination.
+ *
+ * @offset is kept for the existing caller but is not used: it is the start
+ * of the whole fill request, whereas @data already points past the bytes
+ * delivered so far, which is the position src->read_position tracks.
+ *
+ * Returns the number of bytes placed in @data by this call, 0 if the stream
+ * was already at EOF. */
+static gint gst_gs_read_stream(GstGsSrc* src,
+                               guint8* data,
+                               const guint64 offset,
+                               const guint length) {
+  gint gcount = 0;
+  gchar* sdata = reinterpret_cast<gchar*>(data);
+
+  (void)offset;
+
+  for (;;) {
+    // Look the stream up on every pass: a restart below destroys the previous
+    // GSReadStream, so a reference taken before the loop would dangle.
+    gcs::ObjectReadStream& stream = src->gcs_stream->stream();
+
+    // Nothing was read by this call; gcount() would still report the size of
+    // the previous read, so leave the result at 0.
+    if (stream.eof())
+      break;
+
+    stream.read(sdata, length);
+    if (stream.status().ok()) {
+      gcount = stream.gcount();
+      break;
+    }
+
+    GST_ERROR_OBJECT(src, "Restart after (%s)",
+                     stream.status().message().c_str());
+    src->gcs_stream =
+        std::make_unique<GSReadStream>(src, src->read_position);
+  }
+
+  GST_INFO_OBJECT(src, "Client read %d bytes", gcount);
+
+  return gcount;
+}
+
+static GstFlowReturn gst_gs_src_fill(GstBaseSrc* basesrc,
+                                     guint64 offset,
+                                     guint length,
+                                     GstBuffer* buf) {
+  GstGsSrc* src = GST_GS_SRC(basesrc);
+  guint to_read = 0;
+  guint bytes_read = 0;
+  gint ret = 0;
+  GstMapInfo info = {};
+  guint8* data = NULL;
+
+  if
(G_UNLIKELY(offset != (guint64)-1 && src->read_position != offset)) { + src->gcs_stream = std::make_unique(src, offset); + src->read_position = offset; + } + + if (!gst_buffer_map(buf, &info, GST_MAP_WRITE)) + goto buffer_write_fail; + + data = info.data; + + bytes_read = 0; + to_read = length; + while (to_read > 0) { + GST_INFO_OBJECT(src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x", + to_read, offset + bytes_read); + + ret = gst_gs_read_stream(src, data + bytes_read, offset, to_read); + if (G_UNLIKELY(ret < 0)) + goto could_not_read; + + if (G_UNLIKELY(ret == 0)) { + // Push any remaining data. + if (bytes_read > 0) + break; + goto eos; + } + + to_read -= ret; + bytes_read += ret; + + src->read_position += ret; + } + + GST_INFO_OBJECT( + src, "Read %" G_GUINT32_FORMAT " bytes of %" G_GUINT32_FORMAT " length", + bytes_read, length); + + gst_buffer_unmap(buf, &info); + if (bytes_read != length) + gst_buffer_resize(buf, 0, bytes_read); + + GST_BUFFER_OFFSET(buf) = offset; + GST_BUFFER_OFFSET_END(buf) = offset + bytes_read; + + return GST_FLOW_OK; + + // ERROR. 
+could_not_read : { + GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + gst_buffer_unmap(buf, &info); + gst_buffer_resize(buf, 0, 0); + return GST_FLOW_ERROR; +} +eos : { + GST_INFO_OBJECT(src, "EOS"); + gst_buffer_unmap(buf, &info); + gst_buffer_resize(buf, 0, 0); + return GST_FLOW_EOS; +} +buffer_write_fail : { + GST_ELEMENT_ERROR(src, RESOURCE, WRITE, (NULL), ("Can't write to buffer")); + return GST_FLOW_ERROR; +} +} + +static gboolean gst_gs_src_is_seekable(GstBaseSrc* basesrc) { + return TRUE; +} + +static gboolean gst_gs_src_get_size(GstBaseSrc* basesrc, guint64* size) { + GstGsSrc* src = GST_GS_SRC(basesrc); + + *size = src->object_size; + + return TRUE; +} + +static gboolean gst_gs_src_start(GstBaseSrc* basesrc) { + GstGsSrc* src = GST_GS_SRC(basesrc); + GError* err = NULL; + guint blocksize = 0; + + src->read_position = 0; + src->object_size = 0; + + if (src->uri == NULL || src->uri[0] == '\0') { + GST_ELEMENT_ERROR(src, RESOURCE, NOT_FOUND, + ("No uri specified for reading."), (NULL)); + return FALSE; + } + + GST_INFO_OBJECT(src, "Opening file %s", src->uri); + + src->gcs_client = gst_gs_create_client(src->service_account_email, &err); + if (err) { + GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, + ("Could not create client (%s)", err->message), + GST_ERROR_SYSTEM); + g_clear_error(&err); + return FALSE; + } + + GST_INFO_OBJECT(src, "Parsed bucket name (%s) and object name (%s)", + src->bucket_name.c_str(), src->object_name.c_str()); + + google::cloud::StatusOr object_metadata = + src->gcs_client->GetObjectMetadata(src->bucket_name, src->object_name); + if (!object_metadata) { + GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, + ("Could not get object metadata (%s)", + object_metadata.status().message().c_str()), + GST_ERROR_SYSTEM); + return FALSE; + } + + src->object_size = object_metadata->size(); + + GST_INFO_OBJECT(src, "Object size %" G_GUINT64_FORMAT "\n", src->object_size); + + src->gcs_stream = std::make_unique(src); + + blocksize = 
gcs::ClientOptions(nullptr).download_buffer_size(); + + GST_INFO_OBJECT(src, "Set blocksize to %" G_GUINT32_FORMAT, blocksize); + + gst_base_src_set_blocksize(GST_BASE_SRC(src), blocksize); + + return TRUE; +} + +static gboolean gst_gs_src_stop(GstBaseSrc* basesrc) { + GstGsSrc* src = GST_GS_SRC(basesrc); + + src->gcs_stream = nullptr; + src->read_position = 0; + src->object_size = 0; + + return TRUE; +} + +static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query) { + gboolean ret; + + switch (GST_QUERY_TYPE(query)) { + case GST_QUERY_SCHEDULING: { + // A pushsrc can by default never operate in pull mode override + // if you want something different. + gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, 0); + gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); + + ret = TRUE; + break; + } + default: + ret = GST_BASE_SRC_CLASS(parent_class)->query(src, query); + break; + } + return ret; +} + +static GstURIType gst_gs_src_uri_get_type(GType type) { + return GST_URI_SRC; +} + +static const gchar* const* gst_gs_src_uri_get_protocols(GType type) { + static const gchar* protocols[] = {"gs", NULL}; + + return protocols; +} + +static gchar* gst_gs_src_uri_get_uri(GstURIHandler* handler) { + GstGsSrc* src = GST_GS_SRC(handler); + + return g_strdup(src->uri); +} + +static gboolean gst_gs_src_uri_set_uri(GstURIHandler* handler, + const gchar* uri, + GError** err) { + GstGsSrc* src = GST_GS_SRC(handler); + + if (strcmp(uri, "gs://") == 0) { + // Special case for "gs://" as this is used by some applications + // to test with gst_element_make_from_uri if there's an element + // that supports the URI protocol. 
+ gst_gs_src_set_location(src, NULL, NULL); + return TRUE; + } + + return gst_gs_src_set_location(src, uri, err); +} + +static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data) { + GstURIHandlerInterface* iface = (GstURIHandlerInterface*)g_iface; + + iface->get_type = gst_gs_src_uri_get_type; + iface->get_protocols = gst_gs_src_uri_get_protocols; + iface->get_uri = gst_gs_src_uri_get_uri; + iface->set_uri = gst_gs_src_uri_set_uri; +} diff --git a/ext/gs/gstgssrc.h b/ext/gs/gstgssrc.h new file mode 100644 index 0000000..717742f --- /dev/null +++ b/ext/gs/gstgssrc.h @@ -0,0 +1,34 @@ +/* GStreamer + * Copyright (C) 2020 Julien Isorce + * + * gstgssrc.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. 
+ */ + +#ifndef __GST_GS_SRC_H__ +#define __GST_GS_SRC_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_GS_SRC (gst_gs_src_get_type()) +G_DECLARE_FINAL_TYPE(GstGsSrc, gst_gs_src, GST, GS_SRC, GstBaseSrc) + +G_END_DECLS +#endif // __GST_GS_SRC_H__ diff --git a/ext/gs/meson.build b/ext/gs/meson.build new file mode 100644 index 0000000..50e7ff8 --- /dev/null +++ b/ext/gs/meson.build @@ -0,0 +1,28 @@ +gs_sources = [ + 'gstgscommon.cpp', + 'gstgssink.cpp', + 'gstgssrc.cpp', + 'gstgs.cpp', +] + +gs_dep = dependency('storage_client', version : '>= 1.25.0', required : get_option('gs')) + +if gs_dep.found() + gstgs = library('gstgs', + gs_sources, + c_args : gst_plugins_bad_args, + cpp_args : gst_plugins_bad_args, + include_directories : [configinc, libsinc], + dependencies : [gstbase_dep, gs_dep], + install : true, + install_dir : plugins_install_dir, + ) + pkgconfig.generate(gstgs, install_dir : plugins_pkgconfig_install_dir) + plugins += [gstgs] +endif + +clang_format_p = find_program('clang-format', required: false) +if clang_format_p.found() + run_command(clang_format_p, '--style=file', '-i', 'gstgssink.cpp', 'gstgssrc.cpp') +endif + diff --git a/ext/meson.build b/ext/meson.build index 63844ca..98a0053 100644 --- a/ext/meson.build +++ b/ext/meson.build @@ -18,6 +18,7 @@ subdir('fdkaac') subdir('flite') subdir('fluidsynth') subdir('gme') +subdir('gs') subdir('gsm') subdir('hls') subdir('iqa') diff --git a/meson_options.txt b/meson_options.txt index 0326851..4b8eaa3 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -107,6 +107,7 @@ option('flite', type : 'feature', value : 'auto', description : 'Flite speech sy option('fluidsynth', type : 'feature', value : 'auto', description : 'Fluidsynth MIDI decoder plugin') option('gl', type : 'feature', value : 'auto', description : 'GStreamer OpenGL integration support (used by various plugins)') option('gme', type : 'feature', value : 'auto', description : 'libgme gaming console music file decoder plugin') 
+option('gs', type : 'feature', value : 'auto', description : 'Google Cloud Storage source and sink plugin') option('gsm', type : 'feature', value : 'auto', description : 'GSM encoder/decoder plugin') option('ipcpipeline', type : 'feature', value : 'auto', description : 'Inter-process communication plugin') option('iqa', type : 'feature', value : 'auto', description : 'Image quality assessment plugin')