PROP_MQTT_SUB_TIMEOUT,
PROP_MQTT_OPT_CLEANSESSION,
PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
+ PROP_MQTT_QOS,
PROP_LAST
};
DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60, /* 1 minute */
DEFAULT_MQTT_SUB_TIMEOUT = 10000000, /* 10 seconds */
DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000, /* 1 seconds */
+ DEFAULT_MQTT_QOS = 2, /* Once and one only */
};
static guint8 src_client_id = 0;
static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
const gint num);
+static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
+static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
static void cb_mqtt_on_connection_lost (void *context, char *cause);
static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
self->mqtt_respn_opts.onSuccess = NULL;
self->mqtt_respn_opts.onFailure = NULL;
self->mqtt_respn_opts.context = self;
+ self->mqtt_qos = DEFAULT_MQTT_QOS;
/** init private member variables */
self->err = NULL;
1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
+ g_param_spec_int ("mqtt-qos", "mqtt QoS level",
+ "The QoS level of MQTT.\n"
+ "\t\t\t 0: At most once\n"
+ "\t\t\t 1: At least once\n"
+ "\t\t\t 2: Exactly once\n"
+ "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
+ 0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
break;
+ case PROP_MQTT_QOS:
+ gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
break;
+ case PROP_MQTT_QOS:
+ g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
/**
+ * @brief Getter for the 'mqtt-qos' property
+ */
+static gint
+gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
+{
+ return self->mqtt_qos;
+}
+
+/**
+ * @brief Setter for the 'mqtt-qos' property
+ */
+static void
+gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
+{
+ self->mqtt_qos = qos;
+}
+
+/**
* @brief A callback to handle the connection lost to the broker
*/
static void
opts.onFailure = cb_mqtt_on_subscribe_failure;
opts.subscribeOptions.retainHandling = 1;
- /** @todo Support QoS option */
mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
- self->mqtt_topic, 0, &opts);
+ self->mqtt_topic, self->mqtt_qos, &opts);
if (mqttasync_ret != MQTTASYNC_SUCCESS)
return FALSE;
return TRUE;