DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1, /* 1 secs */
};
+static guint8 sink_client_id = 0;
static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "tcp://localhost";
static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
static const gchar TAG_ERR_MQTTSINK[] = "ERROR: MQTTSink";
-const gchar *DEFAULT_MQTT_CLIENT_ID;
-const gchar *DEFAULT_MQTT_PUB_TOPIC;
+static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$";
+static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
+static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
+static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
/** Function prototype declarations */
static void
/** init mqttsink properties */
self->num_buffers = DEFAULT_NUM_BUFFERS;
- self->mqtt_client_id = (gchar *) DEFAULT_MQTT_CLIENT_ID;
+ self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
- self->mqtt_topic = (gchar *) DEFAULT_MQTT_PUB_TOPIC;
+ self->mqtt_topic = g_strdup (DEFAULT_MQTT_PUB_TOPIC);
self->mqtt_pub_wait_timeout = DEFAULT_MQTT_PUB_WAIT_TIMEOUT;
self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
- DEFAULT_MQTT_CLIENT_ID = g_strdup_printf ("%s/%u/%u", g_get_host_name (),
- getpid (), g_random_int_range (0, 0xFF));
- DEFAULT_MQTT_PUB_TOPIC = g_strdup_printf ("%s/topic", DEFAULT_MQTT_CLIENT_ID);
-
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
"MQTT sink");
g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
g_param_spec_string ("client-id", "Client ID",
- "The client identifier passed to the server (broker)",
- DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ "The client identifier passed to the server (broker).", NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
g_param_spec_string ("host", "Host", "Host (broker) to connect to",
g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
g_param_spec_string ("pub-topic", "Topic to Publish",
- "The topic's name to publish", DEFAULT_MQTT_PUB_TOPIC,
+ "The topic's name to publish", NULL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
self->mqtt_host_port);
int ret;
+ if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
+ self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
+ g_get_host_name (), getpid (), sink_client_id++);
+ }
+
+ if (!g_strcmp0 (DEFAULT_MQTT_PUB_TOPIC, self->mqtt_topic)) {
+ self->mqtt_topic = g_strdup_printf (DEFAULT_MQTT_PUB_TOPIC_FORMAT,
+ self->mqtt_client_id);
+ }
+
/**
* @todo Support other persistence mechanisms
* MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
static void
gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id)
{
+ g_free (self->mqtt_client_id);
self->mqtt_client_id = g_strdup (id);
- g_free ((void *) DEFAULT_MQTT_CLIENT_ID);
}
/**
static void
gst_mqtt_sink_set_pub_topic (GstMqttSink * self, const gchar * topic)
{
+ g_free (self->mqtt_topic);
self->mqtt_topic = g_strdup (topic);
- g_free ((void *) DEFAULT_MQTT_PUB_TOPIC);
}
/**
DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000, /* 1 seconds */
};
+static guint8 src_client_id = 0;
static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "tcp://localhost";
static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
-const gchar *DEFAULT_MQTT_CLIENT_ID;
-const gchar *DEFAULT_MQTT_SUB_TOPIC;
+static const gchar DEFAULT_MQTT_CLIENT_ID[] =
+ "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
+static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
/** Function prototype declarations */
static void
self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
/** init mqttsrc properties */
- self->mqtt_client_id = (gchar *) DEFAULT_MQTT_CLIENT_ID;
+ self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
- self->mqtt_topic = (gchar *) DEFAULT_MQTT_SUB_TOPIC;
+ self->mqtt_topic = NULL;
self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
- DEFAULT_MQTT_CLIENT_ID = g_strdup_printf ("%s/%u/%u", g_get_host_name (),
- getpid (), g_random_int_range (0, 0xFF));
- DEFAULT_MQTT_SUB_TOPIC = g_strdup_printf ("%s/topic", DEFAULT_MQTT_CLIENT_ID);
-
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
"MQTT src");
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
- g_param_spec_string ("sub-topic", "Topic to Subscribe",
- "The topic's name to subscribe", DEFAULT_MQTT_SUB_TOPIC,
+ g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
+ "The topic's name to subscribe", NULL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
self->mqtt_host_port);
int ret;
+ if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
+ self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
+ g_get_host_name (), getpid (), src_client_id++);
+ }
+
/**
* @todo Support other persistence mechanisms
* MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
static void
gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
{
+ g_free (self->mqtt_client_id);
self->mqtt_client_id = g_strdup (id);
- g_free ((void *) DEFAULT_MQTT_CLIENT_ID);
}
/**
static void
gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
{
+ g_free (self->mqtt_topic);
self->mqtt_topic = g_strdup (topic);
}