#define GST_CLOCK_TIME_NONE ((uint64_t) -1)
-typedef struct _EdgeInfo {
+typedef struct _EdgeInfo
+{
char *mqtt_host_address;
char *mqtt_host_port;
char *mqtt_client_id;
EdgeInfo *info = (EdgeInfo *) context;
if ((info->mqtt_state == MQTT_SENDING) ||
- (info->mqtt_state == MQTT_DISCONNECTED))
- {
+ (info->mqtt_state == MQTT_DISCONNECTED)) {
debug_print ("state: %d", info->mqtt_state);
- return ;
+ return;
}
info->mqtt_state = MQTT_DISCONNECT_FAILED;
* @return the number of microseconds since January 1, 1970 UTC.
*/
static int64_t
-get_time_stamp () {
+get_time_stamp ()
+{
const int64_t GST_SEC_TO_US_MULTIPLIER = 1000000;
struct timeval tv;
gettimeofday (&tv, NULL);
- return tv.tv_sec * (int64_t)GST_SEC_TO_US_MULTIPLIER + tv.tv_usec;
+ return tv.tv_sec * (int64_t) GST_SEC_TO_US_MULTIPLIER + tv.tv_usec;
}
/**
* @param[in,out] info The pointer of EdgeInfo structure.
*/
static void
-init_edge_info (EdgeInfo *info)
+init_edge_info (EdgeInfo * info)
{
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
- MQTTAsync_disconnectOptions disconn_opts = MQTTAsync_disconnectOptions_initializer;
+ MQTTAsync_disconnectOptions disconn_opts =
+ MQTTAsync_disconnectOptions_initializer;
/* initialize GstMQTTMessageHdr */
info->mqtt_msg_hdr.num_mems = 0;
info->mqtt_msg_hdr.duration = GST_CLOCK_TIME_NONE;
info->mqtt_msg_hdr.dts = GST_CLOCK_TIME_NONE;
info->mqtt_msg_hdr.pts = GST_CLOCK_TIME_NONE;
- memset (info->mqtt_msg_hdr.gst_caps_str, 0x00, sizeof(info->mqtt_msg_hdr.gst_caps_str));
+ memset (info->mqtt_msg_hdr.gst_caps_str, 0x00,
+ sizeof (info->mqtt_msg_hdr.gst_caps_str));
/* initialize EdgeInfo */
info->mqtt_host_address = NULL;
* @param[in] info The pointer of EdgeInfo structure.
*/
static void
-finalize_edge_info (EdgeInfo *info)
+finalize_edge_info (EdgeInfo * info)
{
if (info->mqtt_host_address) {
free (info->mqtt_host_address);
int
edge_close_connection (edge_h handle)
{
- EdgeInfo *info = (EdgeInfo *)handle;
+ EdgeInfo *info = (EdgeInfo *) handle;
int ret;
if (!handle) {
/* support QoS */
if (info->mqtt_state == MQTT_SENDING) {
- debug_print ("Info: MQTTAsync_waitForCompletion(): %d", info->mqtt_last_sent_token);
- MQTTAsync_waitForCompletion (info->mqtt_client, info->mqtt_last_sent_token, DEFAULT_MQTT_DISCONNECT_TIMEOUT);
+ debug_print ("Info: MQTTAsync_waitForCompletion(): %d",
+ info->mqtt_last_sent_token);
+ MQTTAsync_waitForCompletion (info->mqtt_client, info->mqtt_last_sent_token,
+ DEFAULT_MQTT_DISCONNECT_TIMEOUT);
}
/* In case of MQTT_REQUEST_STOP state, publishing new message is denied. */
info->mqtt_state = MQTT_REQUEST_STOP;
* @brief Open the MQTT connection with specific options
*/
int
-edge_open_connection (edge_h *handle,
+edge_open_connection (edge_h * handle,
char *host_address, char *host_port, char *topic_name,
int64_t base_time_stamp, uint64_t duration, char *gst_caps_string,
edge_state_change_cb callback, void *user_data)
int ret = 0;
if (!handle) {
- debug_print("Error: Invalid Param: handle is NULL");
+ debug_print ("Error: Invalid Param: handle is NULL");
return -1;
}
info = (EdgeInfo *) malloc (sizeof (EdgeInfo));
if (!info) {
- debug_print("Error: failed to malloc()");
+ debug_print ("Error: failed to malloc()");
return -1;
}
init_edge_info (info);
/* Set parameter */
- info->mqtt_host_address = host_address ? strdup (host_address) : strdup (DEFAULT_MQTT_HOST_ADDRESS);
- info->mqtt_host_port = host_port ? strdup (host_port) : strdup (DEFAULT_MQTT_HOST_PORT);
+ info->mqtt_host_address =
+ host_address ? strdup (host_address) : strdup (DEFAULT_MQTT_HOST_ADDRESS);
+ info->mqtt_host_port =
+ host_port ? strdup (host_port) : strdup (DEFAULT_MQTT_HOST_PORT);
- snprintf (client_id, BUFFER_SIZE, "edge_sensor_%u_%u", getpid(), sink_client_id++);
- info->mqtt_client_id = strdup(client_id);
+ snprintf (client_id, BUFFER_SIZE, "edge_sensor_%u_%u", getpid (),
+ sink_client_id++);
+ info->mqtt_client_id = strdup (client_id);
if (topic_name) {
info->mqtt_topic = strdup (topic_name);
} else {
- snprintf (topic, BUFFER_SIZE, DEFAULT_MQTT_PUB_TOPIC_FORMAT, info->mqtt_client_id);
+ snprintf (topic, BUFFER_SIZE, DEFAULT_MQTT_PUB_TOPIC_FORMAT,
+ info->mqtt_client_id);
info->mqtt_topic = strdup (topic);
}
/* Set base time stamp if user is provided. If not, current time stamp is used */
if (base_time_stamp == 0) {
- info->mqtt_msg_hdr.base_time_epoch = get_time_stamp() * GST_US_TO_NS_MULTIPLIER;
+ info->mqtt_msg_hdr.base_time_epoch =
+ get_time_stamp () * GST_US_TO_NS_MULTIPLIER;
} else {
info->mqtt_msg_hdr.base_time_epoch = base_time_stamp;
}
}
/* Create MQTT Client object */
- snprintf (server_url, BUFFER_SIZE, "%s:%s", info->mqtt_host_address, info->mqtt_host_port);
- ret = MQTTAsync_create (&info->mqtt_client, server_url,
- info->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
+ snprintf (server_url, BUFFER_SIZE, "%s:%s", info->mqtt_host_address,
+ info->mqtt_host_port);
+ ret =
+ MQTTAsync_create (&info->mqtt_client, server_url, info->mqtt_client_id,
+ MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (ret != MQTTASYNC_SUCCESS) {
debug_print ("Error: failed MQTTAsync_create(): %d", ret);
goto error_handling;
int
edge_publish_single_msg (edge_h handle, void *buffer, uint64_t payload_size)
{
- EdgeInfo *info = (EdgeInfo *)handle;
+ EdgeInfo *info = (EdgeInfo *) handle;
int ret = 0;
uint8_t *msg_pub;
/* check the input parameters */
if (!handle) {
- debug_print("Error: Invalid Param: handle is NULL");
+ debug_print ("Error: Invalid Param: handle is NULL");
return -1;
}
if (!buffer) {
- debug_print("Error: Invalid Param: buffer is NULL");
+ debug_print ("Error: Invalid Param: buffer is NULL");
return -1;
}
if (payload_size == 0) {
- debug_print("Error: Invalid Param: payload_size should be bigger than 0.");
+ debug_print ("Error: Invalid Param: payload_size should be bigger than 0.");
return -1;
}
info->mqtt_msg_buf = NULL;
}
if (!info->mqtt_msg_buf) {
- info->mqtt_msg_buf = malloc(info->mqtt_msg_buf_size);
+ info->mqtt_msg_buf = malloc (info->mqtt_msg_buf_size);
if (!info->mqtt_msg_buf) {
debug_print ("Error: failed malloc()");
return -1;
info->mqtt_msg_hdr.size_mems[0] = payload_size;
/* set timestamp */
- info->mqtt_msg_hdr.sent_time_epoch = get_time_stamp() * GST_US_TO_NS_MULTIPLIER;
- info->mqtt_msg_hdr.pts = info->mqtt_msg_hdr.base_time_epoch - info->mqtt_msg_hdr.sent_time_epoch;
+ info->mqtt_msg_hdr.sent_time_epoch =
+ get_time_stamp () * GST_US_TO_NS_MULTIPLIER;
+ info->mqtt_msg_hdr.pts =
+ info->mqtt_msg_hdr.base_time_epoch - info->mqtt_msg_hdr.sent_time_epoch;
info->mqtt_msg_hdr.dts = GST_CLOCK_TIME_NONE;
info->mqtt_msg_hdr.duration = GST_CLOCK_TIME_NONE;
/* copy header and payload */
msg_pub = info->mqtt_msg_buf;
- memcpy (msg_pub, &info->mqtt_msg_hdr, sizeof(info->mqtt_msg_hdr));
- memcpy (&msg_pub[sizeof(info->mqtt_msg_hdr)], buffer, payload_size);
+ memcpy (msg_pub, &info->mqtt_msg_hdr, sizeof (info->mqtt_msg_hdr));
+ memcpy (&msg_pub[sizeof (info->mqtt_msg_hdr)], buffer, payload_size);
/* send message to the MQTT server */
ret = MQTTAsync_send (info->mqtt_client, info->mqtt_topic,
- info->mqtt_msg_buf_size, info->mqtt_msg_buf,
- DEFAULT_MQTT_QOS, 1, &info->mqtt_respn_opts);
+ info->mqtt_msg_buf_size, info->mqtt_msg_buf,
+ DEFAULT_MQTT_QOS, 1, &info->mqtt_respn_opts);
if (ret != MQTTASYNC_SUCCESS) {
debug_print ("Error: failed MQTTAsync_send()");
return -1;