[GDLT-137]: Automated resending: Improve init. Use everywhere.
authorLassi Marttala <Lassi.LM.Marttala@partner.bmw.de>
Thu, 4 Oct 2012 07:58:29 +0000 (09:58 +0200)
committerAlexander Wenzel <Alexander.AW.Wenzel@bmw.de>
Mon, 26 Nov 2012 11:39:33 +0000 (12:39 +0100)
This is a combination of 2 commits. Rest of the commit messages below.

[GDLT-137]: Add resending to all user library places which use buffer.

[GDLT-137]: Delay mq opening. Make it thread safe.

Signed-off-by: Alexander Wenzel <Alexander.AW.Wenzel@bmw.de>
src/lib/dlt_user.c
src/tests/dlt-test-client.c

index 0edea8c..0681bd6 100644 (file)
@@ -101,8 +101,13 @@ static pthread_attr_t dlt_receiverthread_attr;
 /* Segmented Network Trace */
 #define DLT_MAX_TRACE_SEGMENT_SIZE 1024
 #define DLT_MESSAGE_QUEUE_NAME "/dlt_message_queue"
-#define DLT_DELAYED_RESEND_INDICATOR_PATTERN 0xFFFFFFFF
+#define DLT_DELAYED_RESEND_INDICATOR_PATTERN 0xFFFF
 
+/* Mutex to wait on while message queue is not initialized */
+pthread_mutex_t mq_mutex;
+pthread_cond_t  mq_init_condition;
+
+/* Structure to pass data to segmented thread */
 typedef struct {
        DltContext                      *handle;
        uint16_t                        id;
@@ -129,6 +134,7 @@ static int dlt_user_log_check_user_message(void);
 static void dlt_user_log_reattach_to_daemon(void);
 static int dlt_user_log_send_overflow(void);
 static void dlt_user_trace_network_segmented_thread(void *unused);
+static int dlt_user_queue_resend(void);
 
 int dlt_user_check_library_version(const char *user_major_version,const char *user_minor_version){
 
@@ -240,35 +246,15 @@ int dlt_init(void)
                dlt_log(LOG_WARNING, "Can't destroy thread attributes!\n");
        }
 
-    /* Generate per process name for queue */
-    char queue_name[NAME_MAX];
-    sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
-
-    /* Maximum queue size is 10, limit to size of pointers */
-    struct mq_attr mqatr;
-    mqatr.mq_flags             = 0;
-    mqatr.mq_maxmsg            = 10;
-    mqatr.mq_msgsize   = sizeof(s_segmented_data *);
-    mqatr.mq_curmsgs   = 0;
-
-    /* Separate handles for reading and writing */
-    dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY,
-               S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
-    if(dlt_user.dlt_segmented_queue_read_handle < 0)
-    {
-       dlt_log(LOG_CRIT, "Can't create message queue read handle!\n");
-       dlt_log(LOG_CRIT, strerror(errno));
-       return -1;
-    }
+    /* These will be lazy initialized only when needed */
+    dlt_user.dlt_segmented_queue_read_handle = -1;
+    dlt_user.dlt_segmented_queue_write_handle = -1;
 
-    dlt_user.dlt_segmented_queue_write_handle = mq_open(queue_name, O_WRONLY);
-    if(dlt_user.dlt_segmented_queue_write_handle < 0)
-    {
-       dlt_log(LOG_CRIT, "Can't open message queue write handle!\n");
-       dlt_log(LOG_CRIT, strerror(errno));
-       return -1;
-    }
+    /* Wait mutext for segmented thread */
+    pthread_mutex_init(&mq_mutex, NULL);
+    pthread_cond_init(&mq_init_condition, NULL);
 
+    /* Start the segmented thread */
        if(pthread_create(&(dlt_user.dlt_segmented_nwt_handle), NULL,
           (void *)dlt_user_trace_network_segmented_thread, NULL))
        {
@@ -304,6 +290,73 @@ int dlt_init_file(const char *name)
     return 0;
 }
 
+int dlt_init_message_queue(void)
+{
+       pthread_mutex_lock(&mq_mutex);
+       if(dlt_user.dlt_segmented_queue_read_handle >= 0 &&
+          dlt_user.dlt_segmented_queue_write_handle >= 0)
+       {
+               // Already intialized
+               pthread_mutex_unlock(&mq_mutex);
+               return 0;
+       }
+
+    /* Generate per process name for queue */
+    char queue_name[NAME_MAX];
+    sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
+
+    /* Maximum queue size is 10, limit to size of pointers */
+    struct mq_attr mqatr;
+    mqatr.mq_flags             = 0;
+    mqatr.mq_maxmsg            = 10;
+    mqatr.mq_msgsize   = sizeof(s_segmented_data *);
+    mqatr.mq_curmsgs   = 0;
+
+    /**
+     * Create the message queue. It must be newly created
+     * if old one was left by a crashing process.
+     * */
+    dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY | O_EXCL,
+               S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
+    if(dlt_user.dlt_segmented_queue_read_handle < 0)
+    {
+       if(errno == EEXIST)
+       {
+               dlt_log(LOG_WARNING, "Old message queue exists, trying to delete.\n");
+               if(mq_unlink(queue_name) < 0)
+               {
+                       dlt_log(LOG_CRIT, "Could not delete existing message queue!\n");
+                       dlt_log(LOG_CRIT, strerror(errno));
+               }
+               else // Retry
+               {
+                       dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY | O_EXCL,
+                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
+               }
+       }
+       if(dlt_user.dlt_segmented_queue_read_handle < 0)
+       {
+               dlt_log(LOG_CRIT, "Can't create message queue read handle!\n");
+               dlt_log(LOG_CRIT, strerror(errno));
+               pthread_mutex_unlock(&mq_mutex);
+               return -1;
+       }
+    }
+
+    dlt_user.dlt_segmented_queue_write_handle = mq_open(queue_name, O_WRONLY);
+    if(dlt_user.dlt_segmented_queue_write_handle < 0)
+    {
+       dlt_log(LOG_CRIT, "Can't open message queue write handle!\n");
+       dlt_log(LOG_CRIT, strerror(errno));
+       pthread_mutex_unlock(&mq_mutex);
+       return -1;
+    }
+
+    pthread_cond_signal(&mq_init_condition);
+    pthread_mutex_unlock(&mq_mutex);
+    return 0;
+}
+
 int dlt_init_common(void)
 {
     char *env_local_print;
@@ -491,18 +544,13 @@ int dlt_free(void)
     char queue_name[NAME_MAX];
     sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
 
-    if(mq_close(dlt_user.dlt_segmented_queue_write_handle) < 0)
-    {
-       dlt_log(LOG_ERR, "Failed to unlink message queue write handle!\n");
-       dlt_log(LOG_ERR, strerror(errno));
-    }
-
-    if(mq_close(dlt_user.dlt_segmented_queue_read_handle) < 0 ||
-       mq_unlink(queue_name))
-    {
-       dlt_log(LOG_ERR, "Failed to unlink message queue read handle!\n");
-       dlt_log(LOG_ERR, strerror(errno));
-    }
+    /**
+     * Ignore errors from these, to not to spam user if dlt_free
+     * is accidentally called multiple times.
+     */
+    mq_close(dlt_user.dlt_segmented_queue_write_handle);
+    mq_close(dlt_user.dlt_segmented_queue_read_handle);
+    mq_unlink(queue_name);
 
     dlt_user_initialised = 0;
 
@@ -1049,6 +1097,11 @@ int dlt_forward_msg(void *msgdata,size_t size)
                        }
 
             DLT_SEM_FREE();
+
+            if(dlt_user_queue_resend() < 0)
+            {
+               dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+            }
         }
 
         switch (ret)
@@ -2061,8 +2114,17 @@ void dlt_user_trace_network_segmented_thread(void *unused)
 
        while(1)
        {
+               // Wait untill message queue is initialized
+               pthread_mutex_lock(&mq_mutex);
+               if(dlt_user.dlt_segmented_queue_read_handle < 0)
+               {
+                       pthread_cond_wait(&mq_init_condition, &mq_mutex);
+               }
+               pthread_mutex_unlock(&mq_mutex);
+
                ssize_t read = mq_receive(dlt_user.dlt_segmented_queue_read_handle, (char *)&data,
                                        sizeof(s_segmented_data * ), NULL);
+
                if(read != sizeof(s_segmented_data *))
                {
                        dlt_log(LOG_ERR, "NWTSegmented: Error while reading queue.\n");
@@ -2071,9 +2133,15 @@ void dlt_user_trace_network_segmented_thread(void *unused)
                }
 
                /* Indicator just to try to flush the buffer */
-               if(data->payload == (void *)DLT_DELAYED_RESEND_INDICATOR_PATTERN)
+               if(data->payload_len == DLT_DELAYED_RESEND_INDICATOR_PATTERN)
                {
-                       dlt_user_log_resend_buffer();
+                       // Sleep 100ms, to allow other process to read FIFO
+                       usleep(100*1000);
+                       if(dlt_user_log_resend_buffer() < 0)
+                       {
+                               // Requeue if still not empty
+                               dlt_user_queue_resend();
+                       }
                        free(data);
                        continue;
                }
@@ -2172,6 +2240,13 @@ int dlt_user_trace_network_segmented(DltContext *handle, DltNetworkTraceType nw_
                return -1;
        }
 
+       /* Open queue if it is not open */
+       if(dlt_init_message_queue() < 0)
+       {
+               dlt_log(LOG_ERR, "NWTSegmented: Could not open queue.\n");
+               return -1;
+       }
+
        /* Add to queue */
        if(mq_send(dlt_user.dlt_segmented_queue_write_handle,
                        (char *)&thread_data, sizeof(s_segmented_data *), 1) < 0)
@@ -2584,6 +2659,33 @@ int dlt_user_log_init(DltContext *handle, DltContextData *log)
     return 0;
 }
 
+int dlt_user_queue_resend(void)
+{
+    /**
+     * Ask segmented thread to try emptying the buffer soon.
+     * This will be freed in dlt_user_trace_network_segmented_thread
+     * */
+    s_segmented_data *resend_data = malloc(sizeof(s_segmented_data));
+    resend_data->payload_len = DLT_DELAYED_RESEND_INDICATOR_PATTERN;
+
+    /* Open queue if it is not open */
+       if(dlt_init_message_queue() < 0)
+       {
+               dlt_log(LOG_ERR, "NWTSegmented: Could not open queue.\n");
+               return -1;
+       }
+
+    if(mq_send(dlt_user.dlt_segmented_queue_write_handle, (char *)&resend_data, sizeof(s_segmented_data *), 1) < 0)
+    {
+       dlt_log(LOG_ERR,"Could not request resending.\n");
+       dlt_log(LOG_ERR, strerror(errno));
+       free(resend_data);
+       DLT_SEM_FREE();
+       return -1;
+    }
+    return 0;
+}
+
 DltReturnValue dlt_user_log_send_log(DltContextData *log, int mtype)
 {
     DltMessage msg;
@@ -2809,17 +2911,9 @@ DltReturnValue dlt_user_log_send_log(DltContextData *log, int mtype)
 
             DLT_SEM_FREE();
 
-            /**
-             * Ask segmented thread to try emptying the buffer soon.
-             * This will be freed in dlt_user_trace_network_segmented_thread
-             * */
-            s_segmented_data *resend_data = malloc(sizeof(s_segmented_data));
-            resend_data->payload = (void *)DLT_DELAYED_RESEND_INDICATOR_PATTERN;
-            if(mq_send(dlt_user.dlt_segmented_queue_write_handle, (char *)&resend_data, sizeof(s_segmented_data *), 1) < 0)
+            if(dlt_user_queue_resend() < 0)
             {
-               dlt_log(LOG_ERR,"Could not request resending.\n");
-               dlt_log(LOG_ERR, strerror(errno));
-               free(resend_data);
+               dlt_log(LOG_WARNING, "Failed to queue resending.\n");
             }
         }
 
@@ -2929,6 +3023,11 @@ int dlt_user_log_send_register_application(void)
              }
 
         DLT_SEM_FREE();
+
+        if(dlt_user_queue_resend() < 0)
+        {
+               dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+        }
     }
 
     return 0;
@@ -3035,6 +3134,11 @@ int dlt_user_log_send_register_context(DltContextData *log)
              }
 
         DLT_SEM_FREE();
+
+        if(dlt_user_queue_resend() < 0)
+        {
+               dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+        }
     }
 
     return 0;
index 6f1e972..f99da3e 100755 (executable)
@@ -2398,7 +2398,6 @@ int dlt_testclient_message_callback(DltMessage *message, void *data)
                     /* If the payload is correct, the counter is increased by 1 */
                     if (message->extendedheader->noar==4)
                     {
-                       //TODO: CHECK ACTUAL CONTENT
                         type_info=0;
                         type_info_tmp=0;
                         length=0,length_tmp=0; /* the macro can set this variable to -1 */