pthread_t listener_thread;
/* thread and queue to send data */
+ bool sending;
nns_edge_queue_h send_queue;
pthread_t send_thread;
char *val;
int ret;
- while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) {
+ eh->sending = true;
+ while (eh->sending &&
+ nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) {
+ if (!eh->sending) {
+ nns_edge_data_destroy (data_h);
+ break;
+ }
+
/* Send data to destination */
switch (eh->connect_type) {
case NNS_EDGE_CONNECT_TYPE_TCP:
}
nns_edge_data_destroy (data_h);
}
+ eh->sending = false;
+
return NULL;
}
eh->broker_h = NULL;
eh->connections = NULL;
eh->listening = false;
+ eh->sending = false;
eh->listener_fd = -1;
nns_edge_metadata_create (&eh->metadata);
nns_edge_queue_create (&eh->send_queue);
eh->send_queue = NULL;
if (eh->send_thread) {
+ eh->sending = false;
pthread_join (eh->send_thread, NULL);
eh->send_thread = 0;
}