disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
disconn_opts.context = self;
- g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = SINK_RENDER_STOPPED;
+ g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
+ gint64 end_time = g_get_monotonic_time () +
+ DEFAULT_MQTT_DISCONNECT_TIMEOUT / 10;
+ mqtt_sink_state_t cur_state;
+
MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
- g_cond_wait (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex);
- if ((self->mqtt_sink_state == MQTT_DISCONNECTED) ||
- (self->mqtt_sink_state == MQTT_DISCONNECT_FAILED) ||
- (self->mqtt_sink_state == SINK_RENDER_EOS) ||
- (self->mqtt_sink_state == SINK_RENDER_ERROR))
+ g_mutex_lock (&self->mqtt_sink_mutex);
+ g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
+ end_time);
+ g_mutex_unlock (&self->mqtt_sink_mutex);
+ cur_state = g_atomic_int_get (&self->mqtt_sink_state);
+
+ if ((cur_state == MQTT_DISCONNECTED) ||
+ (cur_state == MQTT_DISCONNECT_FAILED) ||
+ (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
break;
}
- g_mutex_unlock (&self->mqtt_sink_mutex);
MQTTAsync_destroy (&self->mqtt_client_handle);
return TRUE;
static gboolean is_static_sized_buf = FALSE;
GstMqttSink *self = GST_MQTT_SINK (basesink);
GstFlowReturn ret = GST_FLOW_ERROR;
+ mqtt_sink_state_t cur_state;
GstMemory *in_buf_mem;
GstMapInfo in_buf_map;
gint mqtt_rc;
guint8 *msg_pub;
- g_mutex_lock (&self->mqtt_sink_mutex);
- while (self->mqtt_sink_state != MQTT_CONNECTED) {
+ while ((cur_state =
+ g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
gint64 end_time = g_get_monotonic_time ();
+ mqtt_sink_state_t _state;
end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
+ g_mutex_lock (&self->mqtt_sink_mutex);
g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
end_time);
- switch (self->mqtt_sink_state) {
+ g_mutex_unlock (&self->mqtt_sink_mutex);
+
+ _state = g_atomic_int_get (&self->mqtt_sink_state);
+ switch (_state) {
case MQTT_CONNECT_FAILURE:
case MQTT_DISCONNECTED:
case MQTT_CONNECTION_LOST:
default:
continue;
}
- goto ret_unlock;
+ goto ret_with;
}
- g_mutex_unlock (&self->mqtt_sink_mutex);
if (self->num_buffers == 0) {
ret = GST_FLOW_EOS;
ret_with:
return ret;
-
-ret_unlock:
- g_mutex_unlock (&self->mqtt_sink_mutex);
-
- return ret;
}
/**
switch (type) {
case GST_EVENT_EOS:
+ g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = SINK_RENDER_EOS;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
break;
{
GstMqttSink *self = (GstMqttSink *) context;
+ g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = MQTT_CONNECTED;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
}
{
GstMqttSink *self = (GstMqttSink *) context;
+ g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = MQTT_CONNECT_FAILURE;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
}
{
GstMqttSink *self = (GstMqttSink *) context;
+ g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = MQTT_DISCONNECTED;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
}
{
GstMqttSink *self = (GstMqttSink *) context;
+ g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = MQTT_DISCONNECT_FAILED;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
-
}
/**
{
GstMqttSink *self = (GstMqttSink *) context;
+ g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
g_mutex_lock (&self->mqtt_sink_mutex);
- self->mqtt_sink_state = MQTT_CONNECTION_LOST;
g_cond_broadcast (&self->mqtt_sink_gcond);
g_mutex_unlock (&self->mqtt_sink_mutex);
}
cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
{
GstMqttSink *self = (GstMqttSink *) context;
+ mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
- g_mutex_lock (&self->mqtt_sink_mutex);
- if (self->mqtt_sink_state == SINK_RENDER_STOPPED) {
- self->mqtt_sink_state = SINK_RENDER_EOS;
+ if (state == SINK_RENDER_STOPPED) {
+ g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
+
+ g_mutex_lock (&self->mqtt_sink_mutex);
g_cond_broadcast (&self->mqtt_sink_gcond);
+ g_mutex_unlock (&self->mqtt_sink_mutex);
}
- g_mutex_unlock (&self->mqtt_sink_mutex);
}
/**
cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
{
GstMqttSink *self = (GstMqttSink *) context;
+ mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
- g_mutex_lock (&self->mqtt_sink_mutex);
- if (self->mqtt_sink_state == SINK_RENDER_STOPPED) {
- self->mqtt_sink_state = SINK_RENDER_ERROR;
+ if (state == SINK_RENDER_STOPPED) {
+ g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
+
+ g_mutex_lock (&self->mqtt_sink_mutex);
g_cond_broadcast (&self->mqtt_sink_gcond);
+ g_mutex_unlock (&self->mqtt_sink_mutex);
}
- g_mutex_unlock (&self->mqtt_sink_mutex);
+
}