[GSW-43] Flow Control for sending huge amount of data over DLT. partialy fixed
authorMohammed AL Dardoun <mohammed.aldardoun@partner.bmw.de>
Tue, 9 Aug 2011 15:04:17 +0000 (17:04 +0200)
committerMohammed AL Dardoun <mohammed.aldardoun@partner.bmw.de>
Tue, 9 Aug 2011 15:04:17 +0000 (17:04 +0200)
include/dlt/dlt_common.h
src/daemon/dlt_daemon_common.c
src/daemon/dlt_daemon_common.h
src/lib/dlt_user.c
src/lib/dlt_user_cfg.h
src/shared/dlt_common.c
src/tests/dlt-test-internal.c

index a13787b..44c1f33 100755 (executable)
@@ -555,6 +555,10 @@ typedef struct
     uint32_t    pos_write;  /**< current writing position in bytes*/
     uint32_t    pos_read;   /**< current reading position in bytes*/
     uint32_t    count;      /**< nr. of entries */
+    uint32_t   minimum_size;   /**< minimum value for the buffer  */
+    uint32_t   increasing_size;        /**< increasing value for the buffer    */
+    uint32_t   maximum_size;   /**< maximum value for the buffer*/
+    
 } DltRingBuffer;
 
 #ifdef __cplusplus
@@ -938,7 +942,7 @@ extern "C"
      * @param size Maximum size of buffer in bytes
      * @return negative value if there was an error
      */
-    int dlt_ringbuffer_init(DltRingBuffer *dltbuf, uint32_t size);
+    int dlt_ringbuffer_init(DltRingBuffer *dltbuf, uint32_t size, uint32_t increase_size, uint32_t max_size);
 
     /**
      * Release and free memory used by ringbuffer
@@ -969,6 +973,21 @@ extern "C"
      */
     int dlt_ringbuffer_put3(DltRingBuffer *dltbuf, void *data1, uint32_t size1, void *data2, uint32_t size2, void *data3, uint32_t size3);
 
+        /**
+     * This method writes the size of message
+     * @param dltbuf Pointer to ringbuffer structure
+     * @param data_size Pointer to size of data to be written
+     * @param unit_size Size of uint32_t   
+     */
+       void dlt_ringbuffer_putMessageSize(DltRingBuffer *dltbuf,uint32_t * data_size, uint32_t unit_size);
+       
+        /**
+     * This method reads the size of a message 
+     * @param dltbuf Pointer to ringbuffer structure
+     * @param unit_size Size of uint32_t 
+     * @return size of the message  
+     */
+       int dlt_ringbuffer_getMessageSize(DltRingBuffer *dltbuf, uint32_t unit_size);
     /**
      * Read one entry from ringbuffer
      * @param dltbuf Pointer to ringbuffer structure
index 4fbfd31..fb33fc3 100755 (executable)
@@ -166,7 +166,8 @@ int dlt_daemon_init(DltDaemon *daemon,int verbose)
     dlt_set_id(daemon->ecuid,"");
 
     /* initialize ring buffer for client connection */
-    if (dlt_ringbuffer_init(&(daemon->client_ringbuffer), DLT_DAEMON_RINGBUFFER_SIZE)==-1)
+    
+    if (dlt_ringbuffer_init(&(daemon->client_ringbuffer), DLT_DAEMON_RINGBUFFER_SIZE,DLT_DAEMON_RINGBUFFER_INCREASE_SIZE,DLT_DAEMON_RINGBUFFER_MAXIMUM_SIZE)==-1)
     {
        return -1;
     }
index fa0fe53..d861a9e 100755 (executable)
@@ -97,6 +97,10 @@ extern "C" {
 \r
 #define DLT_DAEMON_RINGBUFFER_SIZE 100000 /**< Ring buffer size for storing log messages while no client is connected */\r
 \r
+#define DLT_DAEMON_RINGBUFFER_INCREASE_SIZE  DLT_DAEMON_RINGBUFFER_SIZE\r
+\r
+#define DLT_DAEMON_RINGBUFFER_MAXIMUM_SIZE 100*DLT_DAEMON_RINGBUFFER_INCREASE_SIZE    \r
+\r
 #define DLT_DAEMON_STORE_TO_BUFFER -2   /**< Constant value to identify the command "store to buffer" */\r
 \r
 /* Use a semaphore or mutex from your OS to prevent concurrent access to the DLT buffer. */\r
index 26aa3e0..e1225bb 100755 (executable)
@@ -284,7 +284,7 @@ int dlt_init_common(void)
     dlt_user.dlt_ll_ts_max_num_entries = 0;
     dlt_user.dlt_ll_ts_num_entries = 0;
 
-    if (dlt_ringbuffer_init(&(dlt_user.rbuf), DLT_USER_RINGBUFFER_SIZE)==-1)
+    if (dlt_ringbuffer_init(&(dlt_user.rbuf), DLT_USER_RINGBUFFER_SIZE,DLT_USER_RINGBUFFER_INCREASE_SIZE,DLT_USER_RINGBUFFER_MAXIMUM_SIZE)==-1)
     {
                dlt_user_initialised = 0;
         return -1;
@@ -2149,6 +2149,10 @@ int dlt_user_log_send_log(DltContextData *log, int mtype)
                        {
                                dlt_log(LOG_ERR,"Storing message to history buffer failed! Message discarded.\n");
                        }
+                       else
+                       {
+                               ret = DLT_RETURN_OK;
+                       }
 
             DLT_SEM_FREE();
         }
@@ -2158,6 +2162,7 @@ int dlt_user_log_send_log(DltContextData *log, int mtype)
         case DLT_RETURN_PIPE_FULL:
         {
             /* data could not be written */
+            //printf("dlt_user overflow\n");
             dlt_user.overflow = 1;
             return -1;
         }
index 4376587..e6476f0 100755 (executable)
 #define DLT_USER_RCVBUF_MAX_SIZE 10024 \r
 \r
 /* Size of ring buffer */      \r
-#define DLT_USER_RINGBUFFER_SIZE 10024       \r
+#define DLT_USER_RINGBUFFER_SIZE 10024\r
+\r
+#define DLT_USER_RINGBUFFER_INCREASE_SIZE  DLT_USER_RINGBUFFER_SIZE\r
+\r
+#define DLT_USER_RINGBUFFER_MAXIMUM_SIZE 100*DLT_USER_RINGBUFFER_INCREASE_SIZE       \r
 \r
 /* Temporary buffer length */\r
 #define DLT_USER_BUFFER_LENGTH               255\r
index 0533c0e..22834dd 100755 (executable)
@@ -2232,7 +2232,7 @@ int dlt_check_storageheader(DltStorageHeader *storageheader)
              (storageheader->pattern[3] == 1));
 }
 
-int dlt_ringbuffer_init(DltRingBuffer *dltbuf, uint32_t size)
+int dlt_ringbuffer_init(DltRingBuffer *dltbuf, uint32_t size, uint32_t increasesize, uint32_t maxsize)
 {
 
     if (dltbuf==0)
@@ -2245,13 +2245,17 @@ int dlt_ringbuffer_init(DltRingBuffer *dltbuf, uint32_t size)
         return -1;
     }
 
-    dltbuf->buffer=(char*)malloc(size);
+       dltbuf->minimum_size = size;
+    dltbuf->buffer=(char*)malloc(dltbuf->minimum_size);
+    
     if (dltbuf->buffer==0)
     {
         return -1;
     }
 
-    dltbuf->size=size;
+    dltbuf->size= dltbuf->minimum_size;
+    dltbuf->increasing_size = increasesize;
+    dltbuf->maximum_size = maxsize;
 
     dltbuf->pos_write=0;
     dltbuf->pos_read=0;
@@ -2288,7 +2292,7 @@ int dlt_ringbuffer_free(DltRingBuffer *dltbuf)
 
 int dlt_ringbuffer_put(DltRingBuffer *dltbuf, void *data, uint32_t size)
 {
-    uint32_t sui, part1, part2;
+       uint32_t sui, part1, part2;      
 
     if (dltbuf==0)
     {
@@ -2307,29 +2311,21 @@ int dlt_ringbuffer_put(DltRingBuffer *dltbuf, void *data, uint32_t size)
 
     sui = sizeof(uint32_t);
 
+       dlt_ringbuffer_checkandfreespace(dltbuf, (size+sui));
+    
     if ((size+sui)>dltbuf->size)
     {
         return -1;
     }
 
-    dlt_ringbuffer_checkandfreespace(dltbuf, (size+sui));
-
     if (dltbuf->pos_write >= dltbuf->size)
     {
         dltbuf->pos_write = 0;
     }
 
     /* Not enough space for one uint available before end of linear buffer */
-    /* Start at begin of linear buffer */
-    if ((dltbuf->size - dltbuf->pos_write) < sui)
-    {
-        dltbuf->pos_write = 0;
-    }
-
-    /* Write length of following data to buffer */
-    memcpy(&(dltbuf->buffer[dltbuf->pos_write]), &size, sui);
-    dltbuf->pos_write+=sui;
-
+       dlt_ringbuffer_putMessageSize(dltbuf, &size,sui); 
+   
     if (dltbuf->pos_write >= dltbuf->size)
     {
         dltbuf->pos_write = 0;
@@ -2359,9 +2355,59 @@ int dlt_ringbuffer_put(DltRingBuffer *dltbuf, void *data, uint32_t size)
     return 0;
 }
 
+// This function is for writting messages size into buffer
+void dlt_ringbuffer_putMessageSize(DltRingBuffer *dltbuf,uint32_t * data_size, uint32_t unit_size)
+{
+       unsigned char firstByte;
+       unsigned char secondByte;
+       unsigned char thirdByte;
+       unsigned char fourthByte;       
+       
+       if ((dltbuf->size - dltbuf->pos_write) < unit_size)
+    {
+               firstByte = (*data_size)>>0;
+               secondByte = (*data_size)>>8;
+               thirdByte = (*data_size)>>16;
+               fourthByte = (*data_size)>>24;
+               
+               switch(dltbuf->size - dltbuf->pos_write)
+               {
+                       case 1:
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write]), &firstByte, 1);
+                               dltbuf->pos_write = 0;
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&secondByte,1);
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&thirdByte,1);
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&fourthByte,1);
+                               break;                  
+                       case 2:
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]), &firstByte, 1);                  
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write]),&secondByte,1);
+                               dltbuf->pos_write = 0;
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&thirdByte,1);
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&fourthByte,1);                   
+                               break;                  
+                       case 3:
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]), &firstByte, 1);                  
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&secondByte,1);                   
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write]),&thirdByte,1);
+                               dltbuf->pos_write = 0;
+                               memcpy(&(dltbuf->buffer[dltbuf->pos_write++]),&fourthByte,1);                   
+                               break;
+               }       
+      
+    }
+    else
+       {
+               /* Write length of following data to buffer */
+               memcpy(&(dltbuf->buffer[dltbuf->pos_write]), data_size, unit_size);
+               dltbuf->pos_write +=unit_size;          
+       }       
+}
+
 
 int dlt_ringbuffer_put3(DltRingBuffer *dltbuf, void *data1, uint32_t size1, void *data2, uint32_t size2, void *data3, uint32_t size3)
 {
+   
     uint32_t sui, part1, part2;
     uint32_t total_size;
 
@@ -2379,28 +2425,21 @@ int dlt_ringbuffer_put3(DltRingBuffer *dltbuf, void *data1, uint32_t size1, void
 
     total_size = size1+size2+size3;
 
+       dlt_ringbuffer_checkandfreespace(dltbuf, (total_size+sui));
+    
     if ((total_size+sui)>dltbuf->size)
     {
         return -1;
-    }
-
-    dlt_ringbuffer_checkandfreespace(dltbuf, (total_size+sui));
+    }   
 
     if (dltbuf->pos_write >= dltbuf->size)
     {
         dltbuf->pos_write = 0;
     }
 
+    //Write size of data
     /* Not enough space for one uint available before end of linear buffer */
-    /* Start at begin of linear buffer */
-    if ((dltbuf->size - dltbuf->pos_write) < sui)
-    {
-        dltbuf->pos_write = 0;
-    }
-
-    /* Write length of following data to buffer */
-    memcpy(&(dltbuf->buffer[dltbuf->pos_write]), &total_size, sui);
-    dltbuf->pos_write+=sui;
+    dlt_ringbuffer_putMessageSize(dltbuf, &total_size,sui); 
 
     if (dltbuf->pos_write >= dltbuf->size)
     {
@@ -2511,15 +2550,8 @@ int dlt_ringbuffer_get(DltRingBuffer *dltbuf, void *data, size_t *size)
         dltbuf->pos_read = 0;
     }
 
-    if ((dltbuf->size - dltbuf->pos_read) < sui)
-    {
-        dltbuf->pos_read = 0;
-    }
-
-    /* printf("Reading at offset: %d\n", dltbuf->pos_read); */
-
-    memcpy(&tmpsize,&(dltbuf->buffer[dltbuf->pos_read]), sui);
-    dltbuf->pos_read += sui;
+  // get size of data 
+       tmpsize = dlt_ringbuffer_getMessageSize(dltbuf, sui);               
 
     if (dltbuf->pos_read >= dltbuf->size)
     {
@@ -2559,6 +2591,66 @@ int dlt_ringbuffer_get(DltRingBuffer *dltbuf, void *data, size_t *size)
     return 0;
 }
 
+int dlt_ringbuffer_getMessageSize(DltRingBuffer *dltbuf, uint32_t unit_size)
+{
+       uint32_t temp;
+       uint32_t retVal;
+       unsigned char firstByte = 0;
+       unsigned char secondByte = 0;
+       unsigned char thirdByte = 0;
+       unsigned char fourthByte = 0;   
+       
+       if ((dltbuf->size - dltbuf->pos_read) < unit_size)
+    {
+               switch(dltbuf->size - dltbuf->pos_read)
+               {
+                       case 1:
+                               memcpy(&firstByte,&(dltbuf->buffer[dltbuf->pos_read]), 1);
+                               dltbuf->pos_read = 0;
+                               memcpy(&secondByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);
+                               memcpy(&thirdByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);
+                               memcpy(&fourthByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);                   
+                               break;
+                       case 2:
+                               memcpy(&firstByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);                            
+                               memcpy(&secondByte,&(dltbuf->buffer[dltbuf->pos_read]), 1);
+                               dltbuf->pos_read = 0;
+                               memcpy(&thirdByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);
+                               memcpy(&fourthByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);                   
+                               break;
+                       case 3:
+                               memcpy(&firstByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);                            
+                               memcpy(&secondByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);
+                               memcpy(&thirdByte,&(dltbuf->buffer[dltbuf->pos_read]), 1);
+                               dltbuf->pos_read = 0;
+                               memcpy(&fourthByte,&(dltbuf->buffer[dltbuf->pos_read++]), 1);                   
+                               break;
+               }               
+               
+               retVal = firstByte;
+               
+               temp = secondByte;
+               temp = temp<<8;
+               retVal |= temp;
+               
+               temp = thirdByte;
+               temp = temp<<16;
+               retVal |= temp;
+               
+               temp = fourthByte;
+               temp = temp<<24;
+               retVal |= temp;
+       }
+    else
+    {
+               /* printf("Reading at offset: %d\n", dltbuf->pos_read); */
+               memcpy(&retVal,&(dltbuf->buffer[dltbuf->pos_read]), unit_size);
+               dltbuf->pos_read += unit_size;
+       }
+       
+       return retVal;  
+}
+
 int dlt_ringbuffer_get_skip(DltRingBuffer *dltbuf)
 {
     uint32_t tmpsize=0;
@@ -2587,15 +2679,9 @@ int dlt_ringbuffer_get_skip(DltRingBuffer *dltbuf)
     {
         dltbuf->pos_read = 0;
     }
-
-    if ((dltbuf->size - dltbuf->pos_read) < sui)
-    {
-        dltbuf->pos_read = 0;
-    }
-
-    memcpy(&tmpsize,&(dltbuf->buffer[dltbuf->pos_read]), sui);
-    dltbuf->pos_read += sui;
-
+    
+       tmpsize = dlt_ringbuffer_getMessageSize(dltbuf, sui);
+       
     if (dltbuf->pos_read >= dltbuf->size)
     {
         dltbuf->pos_read = 0;
@@ -2623,6 +2709,7 @@ int dlt_ringbuffer_get_skip(DltRingBuffer *dltbuf)
     return 0;
 }
 
+
 int dlt_ringbuffer_freespacewrite(DltRingBuffer *dltbuf, uint32_t *freespace)
 {
     if ((dltbuf==0) || (freespace==0))
@@ -2661,7 +2748,9 @@ int dlt_ringbuffer_freespacewrite(DltRingBuffer *dltbuf, uint32_t *freespace)
 int dlt_ringbuffer_checkandfreespace(DltRingBuffer *dltbuf, uint32_t reqspace)
 {
     uint32_t space_left;
-
+    uint32_t newSize;
+    uint32_t sizeOfNonReadData; 
+    
     if (dltbuf==0)
     {
         return -1;
@@ -2669,30 +2758,75 @@ int dlt_ringbuffer_checkandfreespace(DltRingBuffer *dltbuf, uint32_t reqspace)
 
     if (dlt_ringbuffer_freespacewrite(dltbuf,&space_left) == -1)
     {
-        return -1;
+       return -1;
     }
 
     /* printf("Now reading at: %d, space_left = %d, req = %d, r=%d, w=%d, count=%d \n",
               dltbuf->pos_read,space_left, reqspace, dltbuf->pos_read, dltbuf->pos_write, dltbuf->count); */
 
-    while (space_left<reqspace)
-    {
-        /* Overwrite, correct read position */
-
-        /* Read and skip one element */
-        dlt_ringbuffer_get_skip(dltbuf);
-
-        /* Space until pos_read */
-        if (dlt_ringbuffer_freespacewrite(dltbuf,&space_left) == -1)
-           {
-                   return -1;
-           }
-
-        /* printf("Overwrite: Now reading at: %d, space_left = %d, req = %d, r=%d, w=%d, count=%d \n",
-                  dltbuf->pos_read,space_left, reqspace, dltbuf->pos_read, dltbuf->pos_write, dltbuf->count); */
-    }
-
-    return 0;
+       if(space_left<reqspace)
+       {
+               if(dltbuf->size < dltbuf->maximum_size)
+               {
+                       char * extendedBuffer;
+                       
+                       newSize = dltbuf->increasing_size + dltbuf->size;
+                       if(newSize > dltbuf->maximum_size)
+                       {
+                               newSize = dltbuf->maximum_size;
+                       }
+                       
+                       extendedBuffer = (char *) malloc(newSize);
+                       
+               
+                       if(dltbuf->pos_write > dltbuf->pos_read)
+                       {
+                               //printf("Buffer read < write \n");
+                               sizeOfNonReadData = dltbuf->pos_write - dltbuf->pos_read;                       
+                               memcpy(extendedBuffer,&(dltbuf->buffer[dltbuf->pos_read]),sizeOfNonReadData);
+                               free(dltbuf->buffer);
+                               dltbuf->buffer = extendedBuffer;
+                               dltbuf->pos_read = dltbuf->buffer[0];
+                               dltbuf->pos_write = sizeOfNonReadData;
+                               dltbuf->size = newSize;
+                       }
+                       else if(dltbuf->pos_write <= dltbuf->pos_read && dltbuf->count >0)
+                       {
+                               
+                               //printf("Buffer read = write \n");
+                               sizeOfNonReadData = (dltbuf->size - dltbuf->pos_read) + (dltbuf->pos_write - dltbuf->buffer[0]);                
+                               memcpy(extendedBuffer,&(dltbuf->buffer[dltbuf->pos_read]),dltbuf->size - dltbuf->pos_read);
+                               memcpy(&(extendedBuffer[dltbuf->size - dltbuf->pos_read]),&(dltbuf->buffer[0]),dltbuf->pos_write - dltbuf->buffer[0]);          
+                               free(dltbuf->buffer);
+                               dltbuf->buffer = extendedBuffer;
+                               dltbuf->pos_read = dltbuf->buffer[0];
+                               dltbuf->pos_write = sizeOfNonReadData;
+                               dltbuf->size = newSize;
+                       }                                               
+                       //printf("Buffer size is increased to %d!\n",newSize);
+               }
+               else
+               {
+                       //printf("Maximum buffer size reached!\n");
+                       while (space_left<reqspace)
+                       {
+                               /* Overwrite, correct read position */
+                               /* Read and skip one element */
+                               dlt_ringbuffer_get_skip(dltbuf);
+                               
+                               /* Space until pos_read */
+                               if (dlt_ringbuffer_freespacewrite(dltbuf,&space_left) == -1)
+                               {
+                                       return -1;
+                               }
+                               
+                               /* printf("Overwrite: Now reading at: %d, space_left = %d, req = %d, r=%d, w=%d, count=%d \n",
+                                                 dltbuf->pos_read,space_left, reqspace, dltbuf->pos_read, dltbuf->pos_write, dltbuf->count); */
+                       } 
+               }                                       
+       }
+       
+       return 0; 
 }
 
 #if !defined (__WIN32__)
index 00209f1..e457904 100755 (executable)
@@ -206,7 +206,7 @@ void internal1(void)
     for (size=8;size<=30;size++)
     {
 
-        dlt_ringbuffer_init(&mybuf, size);
+        dlt_ringbuffer_init(&mybuf, size,size,10*size);
 
         memset(result,0,1024);