Add locks to log storages 78/292778/4
authorMichal Bloch <m.bloch@samsung.com>
Fri, 12 May 2023 11:17:22 +0000 (13:17 +0200)
committerMichal Bloch <m.bloch@samsung.com>
Fri, 15 Sep 2023 12:09:15 +0000 (14:09 +0200)
Change-Id: Ic4ed88e4ee90b750b9bda57b3297b70350237f51

Makefile.am
src/logger/log_compressed_storage.c
src/logger/log_storage.c

index 7a6ffa3..83563b1 100644 (file)
@@ -573,7 +573,7 @@ src_tests_test_libdlog_container_warning_LDFLAGS = $(AM_LDFLAGS)  -Wl,--wrap=acc
 
 src_tests_test_logger_log_storage_SOURCES = src/tests/test_logger_log_storage.c src/logger/log_storage.c src/shared/ptrs_list.c src/shared/queued_entry_timestamp.c
 src_tests_test_logger_log_storage_CFLAGS = $(check_CFLAGS)
-src_tests_test_logger_log_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc
+src_tests_test_logger_log_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc -pthread
 
 src_tests_test_logger_compressed_storage_SOURCES = src/tests/test_logger_compressed_storage.c \
        src/logger/compression_fastlz.c \
@@ -584,7 +584,7 @@ src_tests_test_logger_compressed_storage_SOURCES = src/tests/test_logger_compres
        external/miniz/miniz.c \
        external/fastlz/fastlz.c
 src_tests_test_logger_compressed_storage_CFLAGS = $(check_CFLAGS)
-src_tests_test_logger_compressed_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc
+src_tests_test_logger_compressed_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc -pthread
 
 src_tests_test_ptrs_list_pos_SOURCES = src/tests/test_ptrs_list_pos.c src/shared/ptrs_list.c
 src_tests_test_ptrs_list_pos_CFLAGS = $(check_CFLAGS)
index f19ee1a..1373a32 100644 (file)
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <stdbool.h>
 #include <assert.h>
+#include <pthread.h>
 #include <queued_entry_timestamp.h>
 
 /* Note: this is effectively a simplified version of `log_storage`.
@@ -48,6 +49,7 @@ struct log_compressed_storage {
        char *compression_workspace;
        size_t compression_workspace_size;
        struct compression_algo *algo; // compression algorithm implementation
+       pthread_rwlock_t lock;
 };
 
 struct log_compressed_storage_reader {
@@ -108,18 +110,17 @@ log_compressed_storage *log_compressed_storage_create(unsigned capacity, unsigne
                return NULL;
 
        storage->name = strdup(name);
-       if (!storage->name) {
-               free(storage);
-               return NULL;
-       }
+       if (!storage->name)
+               goto fail_dup;
+
+       int const r = pthread_rwlock_init(&storage->lock, NULL);
+       if (r < 0)
+               goto fail_lock;
 
        storage->compression_workspace_size = algo->get_workspace_size(COMPRESSION_CHUNK_SIZE);
        storage->compression_workspace = malloc(storage->compression_workspace_size);
-       if (!storage->compression_workspace) {
-               free(storage->name);
-               free(storage);
-               return NULL;
-       }
+       if (!storage->compression_workspace)
+               goto fail_workspace;
 
        storage->algo = algo;
        storage->capacity = capacity;
@@ -131,6 +132,14 @@ log_compressed_storage *log_compressed_storage_create(unsigned capacity, unsigne
        storage->waiting_readers = NULL;
 
        return storage;
+
+fail_workspace:
+       (void) pthread_rwlock_destroy(&storage->lock);
+fail_lock:
+       free(storage->name);
+fail_dup:
+       free(storage);
+       return NULL;
 }
 
 static void log_compressed_storage_reader_set_storage(void *reader_, void *storage)
@@ -145,8 +154,12 @@ void log_compressed_storage_free(log_compressed_storage *storage)
        assert(storage);
 
        log_compressed_storage_clear(storage);
+
        // disconnect all the remaining readers
        list_foreach(storage->waiting_readers, NULL, log_compressed_storage_reader_set_storage);
+
+       (void) pthread_rwlock_destroy(&storage->lock);
+
        free(storage->compression_workspace);
        free(storage->name);
        free(storage);
@@ -180,6 +193,8 @@ log_compressed_storage_reader *log_compressed_storage_new_reader(log_compressed_
        if (NULL == reader)
                return NULL;
 
+       (void) pthread_rwlock_wrlock(&storage->lock);
+
        reader->storage = storage;
        reader->callback = callback;
        reader->user_data = user_data;
@@ -201,6 +216,8 @@ log_compressed_storage_reader *log_compressed_storage_new_reader(log_compressed_
        else if (NULL != reader->storage)
                list_add(&storage->waiting_readers, reader);
 
+       pthread_rwlock_unlock(&storage->lock);
+
        return reader;
 }
 
@@ -284,15 +301,17 @@ bool log_compressed_storage_add_new_entry(log_compressed_storage *storage, char
        assert(storage->algo->is_legal(buf_in, size_in));
        assert(size_in <= COMPRESSION_CHUNK_SIZE);
 
+       (void) pthread_rwlock_wrlock(&storage->lock);
+
        size_t size_out = storage->algo->comp(buf_in, size_in, storage->compression_workspace, storage->compression_workspace_size);
        if (size_out == 0)
-               return false;
+               goto out_false;
 
        size_t size_actual = size_out < size_in ? size_out : size_in;
 
        log_compressed_storage_entry *log_compressed_storage_entry = malloc(sizeof(*log_compressed_storage_entry) + size_actual);
        if (NULL == log_compressed_storage_entry)
-               return false;
+               goto out_false;
 
        log_compressed_storage_entry->readers = NULL;
        log_compressed_storage_entry->entry.size_in = size_in;
@@ -324,24 +343,43 @@ bool log_compressed_storage_add_new_entry(log_compressed_storage *storage, char
        while (storage->counter_end - storage->counter_begin > storage->capacity)
                log_compressed_storage_remove_the_earliest_log(storage);
 
+       pthread_rwlock_unlock(&storage->lock);
        return true;
+
+out_false:
+       pthread_rwlock_unlock(&storage->lock);
+       return false;
 }
 
 void log_compressed_storage_clear(log_compressed_storage *storage)
 {
        assert(storage);
+       (void) pthread_rwlock_wrlock(&storage->lock);
        while (NULL != storage->entries)
                log_compressed_storage_remove_the_earliest_log(storage);
+       pthread_rwlock_unlock(&storage->lock);
 }
 
 void log_compressed_storage_release_reader(log_compressed_storage_reader *reader)
 {
        assert(reader);
 
+       log_compressed_storage *const storage = reader->storage;
+       if (!storage) {
+               assert(!reader->current);
+               goto exit;
+       }
+
+       (void) pthread_rwlock_wrlock(&storage->lock);
+
        if (reader->current)
                list_remove(&reader->current->readers, reader);
-       else if (reader->storage)
-               list_remove(&reader->storage->waiting_readers, reader);
+       else
+               list_remove(&storage->waiting_readers, reader);
+
+       pthread_rwlock_unlock(&storage->lock);
+
+exit:
        free(reader);
 }
 
@@ -362,8 +400,13 @@ bool log_compressed_storage_reader_is_new_entry_available(const log_compressed_s
        if (NULL == reader->storage)
                return false;
 
-       return log_compressed_storage_reader_entry_is_not_last(reader, reader->current) ||
-               (NULL == reader->current && NULL != reader->storage->entries);
+       if (NULL == reader->current && NULL != reader->storage->entries)
+               return true;
+
+       (void) pthread_rwlock_rdlock(&reader->storage->lock);
+       bool const ret = log_compressed_storage_reader_entry_is_not_last(reader, reader->current);
+       pthread_rwlock_unlock(&reader->storage->lock);
+       return ret;
 }
 
 size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader *reader, char *uncompressed)
@@ -373,10 +416,12 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader
        const struct compression_entry *ce = NULL;
 
        // storage is NULL if a dumping/one-shot reader has finished reading
-       if (NULL == reader->storage)
+       log_compressed_storage *const storage = reader->storage;
+       if (NULL == storage)
                return 0;
 
        const struct compression_algo *const algo = reader->storage->algo; // save in case a dumping reader gets disconnected (nulling reader->storage)
+       (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers
        if (NULL == reader->current) {
                if (NULL != reader->storage->entries) {
                        // there are some logs in the buffer, let's get them
@@ -395,8 +440,10 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader
                        list_add(&reader->current->readers, reader);
        }
 
-       if (NULL == ce)
+       if (NULL == ce) {
+               pthread_rwlock_unlock(&storage->lock);
                return 0;
+       }
 
        reader->counter += ce->size_out ?: ce->size_in;
 
@@ -407,6 +454,7 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader
                size_t decompressed_size = (size_t) algo->decomp(ce->compressed_data, ce->size_out, uncompressed, ce->size_in);
                assert(decompressed_size == ce->size_in);
        }
+       pthread_rwlock_unlock(&storage->lock);
 
        return ce->size_in;
 }
@@ -448,8 +496,9 @@ void log_compressed_storage_resize(log_compressed_storage *storage, unsigned int
 {
        assert(capacity != 0);
 
-       /* in case of shrinking the storage size drop excess logs */
+       (void) pthread_rwlock_wrlock(&storage->lock);
        while (storage->counter_end - storage->counter_begin > capacity)
                log_compressed_storage_remove_the_earliest_log(storage);
        storage->capacity = capacity;
+       pthread_rwlock_unlock(&storage->lock);
 }
index b26cea5..abce906 100644 (file)
@@ -19,6 +19,7 @@
 #include <stdlib.h>
 #include <stdbool.h>
 #include <assert.h>
+#include <pthread.h>
 #include <queued_entry_timestamp.h>
 
 /* This is implementation of log storage.
@@ -124,6 +125,7 @@ struct log_storage {
        uint64_t counter_end;   // "pointer" to the end of the stored log stream
        struct log_storage_entry *entries; // the log entries are stored here
        list_head waiting_readers; // these are the readers that did not read anything from the current entries
+       pthread_rwlock_t lock;
 };
 
 struct log_storage_reader {
@@ -151,6 +153,12 @@ log_storage *log_storage_create(unsigned capacity, dlogutil_sorting_order_e sort
        if (NULL == storage)
                return NULL;
 
+       int const r = pthread_rwlock_init(&storage->lock, NULL);
+       if (r < 0) {
+               free(storage);
+               return NULL;
+       }
+
        storage->capacity = capacity;
        storage->sort_by = sort_by;
        storage->counter_begin = 0;
@@ -172,8 +180,12 @@ void log_storage_free(log_storage *storage)
 {
        assert(storage);
        log_storage_clear(storage);
+
        // disconnect all the remaining readers
        list_foreach(storage->waiting_readers, NULL, log_storage_reader_set_storage);
+
+       (void) pthread_rwlock_destroy(&storage->lock);
+
        free(storage);
 }
 
@@ -199,6 +211,7 @@ log_storage_reader *log_storage_new_reader(log_storage *storage,
        if (NULL == reader)
                return NULL;
 
+       (void) pthread_rwlock_wrlock(&storage->lock);
        reader->storage = storage;
        reader->callback = callback;
        reader->user_data = user_data;
@@ -220,6 +233,7 @@ log_storage_reader *log_storage_new_reader(log_storage *storage,
        else if (NULL != reader->storage)
                list_add(&storage->waiting_readers, reader);
 
+       pthread_rwlock_unlock(&storage->lock);
        return reader;
 }
 
@@ -318,6 +332,7 @@ bool log_storage_add_new_entry(log_storage *storage, const dlogutil_entry_s *le)
        log_storage_entry->readers = NULL;
 
        memcpy(&log_storage_entry->entry, le, le->len);
+       (void) pthread_rwlock_wrlock(&storage->lock);
        if (NULL == storage->entries) {
                // simple insert to empty storage
                storage->entries = log_storage_entry;
@@ -357,14 +372,20 @@ bool log_storage_add_new_entry(log_storage *storage, const dlogutil_entry_s *le)
        // if we have reached the size limit for the buffer - free some space
        while (storage->counter_end - storage->counter_begin > storage->capacity)
                log_storage_remove_the_earliest_log(storage);
+
+       pthread_rwlock_unlock(&storage->lock);
        return true;
 }
 
 void log_storage_clear(log_storage *storage)
 {
        assert(storage);
+       (void) pthread_rwlock_wrlock(&storage->lock);
+
        while (NULL != storage->entries)
                log_storage_remove_the_earliest_log(storage);
+
+       pthread_rwlock_unlock(&storage->lock);
 }
 
 dlogutil_sorting_order_e log_storage_get_sorting_order(const log_storage *storage)
@@ -377,10 +398,22 @@ void log_storage_release_reader(log_storage_reader *reader)
 {
        assert(reader);
 
+       log_storage *const storage = reader->storage;
+       if (!storage) {
+               assert(!reader->current);
+               goto exit;
+       }
+
+       (void) pthread_rwlock_wrlock(&storage->lock);
+
        if (reader->current)
                list_remove(&reader->current->readers, reader);
-       else if (reader->storage)
-               list_remove(&reader->storage->waiting_readers, reader);
+       else
+               list_remove(&storage->waiting_readers, reader);
+
+       pthread_rwlock_unlock(&storage->lock);
+
+exit:
        free(reader);
 }
 
@@ -398,11 +431,17 @@ bool log_storage_reader_is_new_entry_available(const log_storage_reader *reader)
        assert(reader);
 
        // storage is NULL if a dumping/one-shot reader has finished reading
-       if (NULL == reader->storage)
+       log_storage *const storage = reader->storage;
+       if (NULL == storage)
                return false;
 
-       return log_storage_reader_entry_is_not_last(reader, reader->current) ||
-               (NULL == reader->current && NULL != reader->storage->entries);
+       if (NULL == reader->current && NULL != storage->entries)
+               return true;
+
+       (void) pthread_rwlock_rdlock(&storage->lock);
+       bool ret = log_storage_reader_entry_is_not_last(reader, reader->current);
+       pthread_rwlock_unlock(&storage->lock);
+       return ret;
 }
 
 bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry_s *entry)
@@ -412,9 +451,11 @@ bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry
        const dlogutil_entry_s *le = NULL;
 
        // storage is NULL if a dumping/one-shot reader has finished reading
-       if (NULL == reader->storage)
-               return false;
+       log_storage *const storage = reader->storage;
+       if (!storage)
+               return NULL;
 
+       (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers
        if (NULL == reader->current) {
                if (NULL != reader->storage->entries) {
                        // there are some logs in the buffer, let's get them
@@ -433,14 +474,16 @@ bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry
                        list_add(&reader->current->readers, reader);
        }
 
-       if (NULL != le) {
-               reader->counter += le->len;
-               if (entry)
-                       memcpy(entry, le, le->len);
-               return true;
-       }
+       if (!le)
+               goto exit;
+
+       reader->counter += le->len;
+       if (entry)
+               memcpy(entry, le, le->len);
 
-       return false;
+exit:
+       pthread_rwlock_unlock(&storage->lock);
+       return !!le;
 }
 
 uint64_t log_storage_reader_get_ready_bytes(const log_storage_reader *reader)