This patch adds skeleton codes of sink and src elements for MQTT support.
Signed-off-by: Wook Song <wook16.song@samsung.com>
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttcommon.h
+ * @date 08 Mar 2021
+ * @brief Common macros and utility functions for GStreamer MQTT plugins
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#ifndef __GST_MQTT_COMMON_H__
+#define __GST_MQTT_COMMON_H__
+
+#ifndef GST_MQTT_PACKAGE
+#define GST_MQTT_PACKAGE "GStreamer MQTT Plugins"
+#endif /* GST_MQTT_PACKAGE */
+
+#define GST_MQTT_ELEM_NAME_SINK "mqttsink"
+#define GST_MQTT_ELEM_NAME_SRC "mqttsrc"
+
+#endif /* !__GST_MQTT_COMMON_H__ */
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttsink.c
+ * @date 09 Mar 2021
+ * @brief Register sub-plugins included in libgstmqtt
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#include <gst/gst.h>
+
+#include "mqttcommon.h"
+#include "mqttsink.h"
+#include "mqttsrc.h"
+
+/**
+ * @brief The entry point of the GStreamer MQTT plugin
+ */
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ if (!gst_element_register (plugin, GST_MQTT_ELEM_NAME_SINK, GST_RANK_NONE,
+ GST_TYPE_MQTT_SINK)) {
+ return FALSE;
+ }
+
+ if (!gst_element_register (plugin, GST_MQTT_ELEM_NAME_SRC, GST_RANK_NONE,
+ GST_TYPE_MQTT_SRC)) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+#ifndef PACKAGE
+#define PACKAGE GST_MQTT_PACKAGE
+#endif
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, mqtt,
+ "A collection of GStreamer plugins to support MQTT",
+ plugin_init, VERSION, "LGPL", PACKAGE,
+ "https://github.com/nnstreamer/nnstreamer")
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttsink.c
+ * @date 08 Mar 2021
+ * @brief Publish incoming data streams as a MQTT topic
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gst/base/gstbasesink.h>
+
+#include "mqttsink.h"
+
+#define gst_mqtt_sink_parent_class parent_class
+G_DEFINE_TYPE (GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK);
+
+GST_DEBUG_CATEGORY_STATIC (gst_mqtt_sink_debug);
+#define GST_CAT_DEFAULT gst_mqtt_sink_debug
+
+enum
+{
+ PROP_0,
+
+ PROP_LAST
+};
+
+/** Function prototype declarations */
+static void
+gst_mqtt_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void
+gst_mqtt_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_mqtt_sink_class_finalize (GObject * object);
+
+static GstStateChangeReturn
+gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition);
+
+static gboolean gst_mqtt_sink_start (GstBaseSink * basesink);
+static gboolean gst_mqtt_sink_stop (GstBaseSink * basesink);
+static gboolean gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query);
+static GstFlowReturn
+gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer);
+static GstFlowReturn
+gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list);
+static gboolean gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event);
+
+/**
+ * @brief Initialize GstMqttSink object
+ */
+static void
+gst_mqtt_sink_init (GstMqttSink * self)
+{
+ /** @todo */
+}
+
+/**
+ * @brief Initialize GstMqttSinkClass object
+ */
+static void
+gst_mqtt_sink_class_init (GstMqttSinkClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
+
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
+ "MQTT sink");
+
+ gobject_class->set_property = gst_mqtt_sink_set_property;
+ gobject_class->get_property = gst_mqtt_sink_get_property;
+ gobject_class->finalize = gst_mqtt_sink_class_finalize;
+
+ gstelement_class->change_state = gst_mqtt_sink_change_state;
+
+ gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_sink_start);
+ gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_sink_stop);
+ gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_sink_query);
+ gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_mqtt_sink_render);
+ gstbasesink_class->render_list =
+ GST_DEBUG_FUNCPTR (gst_mqtt_sink_render_list);
+ gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_sink_event);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "MQTT sink", "Sink/MQTT",
+ "Publish incoming data streams as a MQTT topic",
+ "Wook Song <wook16.song@samsung.com>");
+}
+
+/**
+ * @brief The setter for the mqttsink's properties
+ */
+static void
+gst_mqtt_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+/**
+ * @brief The getter for the mqttsink's properties
+ */
+static void
+gst_mqtt_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+/**
+ * @brief Finalize GstMqttSinkClass object
+ */
+static void
+gst_mqtt_sink_class_finalize (GObject * object)
+{
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/**
+ * @brief Handle mqttsink's state change
+ */
+static GstStateChangeReturn
+gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ return ret;
+}
+
+/**
+ * @brief Start mqttsink, called when state changed null to ready
+ */
+static gboolean
+gst_mqtt_sink_start (GstBaseSink * basesink)
+{
+ return TRUE;
+}
+
+/**
+ * @brief Stop mqttsink, called when state changed ready to null
+ */
+static gboolean
+gst_mqtt_sink_stop (GstBaseSink * basesink)
+{
+ return TRUE;
+}
+
+/**
+ * @brief Perform queries on the element
+ */
+static gboolean
+gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query)
+{
+ return TRUE;
+}
+
+/**
+ * @brief The callback to process each buffer receiving on the sink pad
+ */
+static GstFlowReturn
+gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer)
+{
+ return GST_FLOW_OK;
+}
+
+/**
+ * @brief The callback to process GstBufferList (instead of a single buffer)
+ * on the sink pad
+ */
+static GstFlowReturn
+gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list)
+{
+ return GST_FLOW_OK;
+}
+
+/**
+ * @brief Handle events arriving on the sink pad
+ */
+static gboolean
+gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
+{
+ return TRUE;
+}
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttsink.h
+ * @date 08 Mar 2021
+ * @brief Publish incoming data streams as a MQTT topic
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#ifndef __GST_MQTT_SINK_H__
+#define __GST_MQTT_SINK_H__
+#include <gst/base/gstbasesink.h>
+#include <gst/gst.h>
+
+#include "mqttcommon.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_MQTT_SINK \
+ (gst_mqtt_sink_get_type())
+#define GST_MQTT_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_MQTT_SINK, GstMqttSink))
+#define GST_IS_MQTT_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_MQTT_SINK))
+#define GST_MQTT_SINK_CAST(obj) \
+ ((GstMqttSink *) obj)
+#define GST_MQTT_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_MQTT_SINK, GstMqttSinkClass))
+#define GST_IS_MQTT_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_MQTT_SINK))
+
+typedef struct _GstMqttSink GstMqttSink;
+typedef struct _GstMqttSinkClass GstMqttSinkClass;
+
+/**
+ * @brief GstMqttSink data structure.
+ *
+ * GstMqttSink inherits GstBaseSink.
+ */
+struct _GstMqttSink {
+ GstBaseSink parent;
+};
+
+/**
+ * @brief GstMqttSinkClass data structure.
+ *
+ * GstMqttSinkClass inherits GstBaseSinkClass.
+ */
+struct _GstMqttSinkClass {
+ GstBaseSinkClass parent_class;
+};
+
+GType gst_mqtt_sink_get_type (void);
+
+G_END_DECLS
+#endif /* !__GST_MQTT_SINK_H__ */
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttsrc.c
+ * @date 08 Mar 2021
+ * @brief Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gst/base/gstbasesrc.h>
+
+#include "mqttsrc.h"
+
+#define gst_mqtt_src_parent_class parent_class
+G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);
+
+GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
+#define GST_CAT_DEFAULT gst_mqtt_src_debug
+
+enum
+{
+ PROP_0,
+
+ PROP_LAST
+};
+
+
+/** Function prototype declarations */
+static void
+gst_mqtt_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void
+gst_mqtt_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_mqtt_src_class_finalize (GObject * object);
+
+static GstStateChangeReturn
+gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);
+
+static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
+static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
+static gboolean gst_mqtt_src_event (GstBaseSrc * basesrc, GstEvent * event);
+static gboolean gst_mqtt_src_set_caps (GstBaseSrc * basesrc, GstCaps * caps);
+static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
+static GstCaps *gst_mqtt_src_fixate (GstBaseSrc * basesrc, GstCaps * caps);
+static void
+gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
+ GstClockTime * start, GstClockTime * end);
+static gboolean gst_mqtt_src_get_size (GstBaseSrc * basesrc, guint64 * size);
+static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
+static GstFlowReturn
+gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer ** buf);
+static GstFlowReturn
+gst_mqtt_src_alloc (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer ** buf);
+static GstFlowReturn
+gst_mqtt_src_fill (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer * buf);
+
+/** Function defintions */
+static void
+gst_mqtt_src_init (GstMqttSrc * self)
+{
+ /** @todo */
+}
+
+static void
+gst_mqtt_src_class_init (GstMqttSrcClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
+
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
+ "MQTT src");
+
+ gobject_class->set_property = gst_mqtt_src_set_property;
+ gobject_class->get_property = gst_mqtt_src_get_property;
+ gobject_class->finalize = gst_mqtt_src_class_finalize;
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
+ gst_element_class_set_static_metadata (gstelement_class,
+ "MQTT Source",
+ "Source/MQTT",
+ "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
+ "Wook Song <wook16.song@samsung.com>");
+
+ gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
+ gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
+ gstbasesrc_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_src_event);
+ gstbasesrc_class->set_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_set_caps);
+ gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
+ gstbasesrc_class->fixate = GST_DEBUG_FUNCPTR (gst_mqtt_src_fixate);
+ gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
+ gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_size);
+ gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
+ gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
+ gstbasesrc_class->alloc = GST_DEBUG_FUNCPTR (gst_mqtt_src_alloc);
+ gstbasesrc_class->fill = GST_DEBUG_FUNCPTR (gst_mqtt_src_fill);
+}
+
+static void
+gst_mqtt_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_mqtt_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_mqtt_src_class_finalize (GObject * object)
+{
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static GstStateChangeReturn
+gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ return ret;
+}
+
+static gboolean
+gst_mqtt_src_start (GstBaseSrc * basesrc)
+{
+ return TRUE;
+}
+
+static gboolean
+gst_mqtt_src_stop (GstBaseSrc * basesrc)
+{
+ return TRUE;
+}
+
+static gboolean
+gst_mqtt_src_event (GstBaseSrc * basesrc, GstEvent * event)
+{
+ return TRUE;
+}
+
+static gboolean
+gst_mqtt_src_set_caps (GstBaseSrc * basesrc, GstCaps * caps)
+{
+ return TRUE;
+}
+
+static GstCaps *
+gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
+{
+ GstCaps *caps = gst_caps_new_empty ();
+
+ return caps;
+}
+
+static GstCaps *
+gst_mqtt_src_fixate (GstBaseSrc * basesrc, GstCaps * caps)
+{
+ caps = gst_caps_make_writable (caps);
+ caps = GST_BASE_SRC_CLASS (parent_class)->fixate (basesrc, caps);
+
+ return caps;
+}
+
+static void
+gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
+ GstClockTime * start, GstClockTime * end)
+{
+ return;
+}
+
+static gboolean
+gst_mqtt_src_get_size (GstBaseSrc * basesrc, guint64 * size)
+{
+ return TRUE;
+}
+
+static gboolean
+gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
+{
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer ** buf)
+{
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+gst_mqtt_src_alloc (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer ** buf)
+{
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+gst_mqtt_src_fill (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer * buf)
+{
+ return GST_FLOW_OK;
+}
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
+ */
+/**
+ * @file mqttsrc.h
+ * @date 08 Mar 2021
+ * @brief Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Wook Song <wook16.song@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#ifndef __GST_MQTT_SRC_H__
+#define __GST_MQTT_SRC_H__
+#include <gst/base/gstbasesrc.h>
+#include <gst/gst.h>
+
+#include "mqttcommon.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_MQTT_SRC \
+ (gst_mqtt_src_get_type())
+#define GST_MQTT_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_MQTT_SRC, GstMqttSrc))
+#define GST_IS_MQTT_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_MQTT_SRC))
+#define GST_MQTT_SRC_CAST(obj) \
+ ((GstMqttSrc *) obj)
+#define GST_MQTT_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_MQTT_SRC, GstMqttSrcClass))
+#define GST_IS_MQTT_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_MQTT_SRC))
+
+typedef struct _GstMqttSrc GstMqttSrc;
+typedef struct _GstMqttSrcClass GstMqttSrcClass;
+
+struct _GstMqttSrc {
+ GstBaseSrc parent;
+};
+
+struct _GstMqttSrcClass {
+ GstBaseSrcClass parent_class;
+};
+
+GType gst_mqtt_src_get_type (void);
+
+G_END_DECLS
+#endif /* !__GST_MQTT_SRC_H__ */