From 815aa3ffc5e8252f00c32876ad3d9792726ef1fd Mon Sep 17 00:00:00 2001 From: Michal Bloch Date: Fri, 12 May 2023 13:17:22 +0200 Subject: [PATCH] Add locks to log storages Change-Id: Ic4ed88e4ee90b750b9bda57b3297b70350237f51 --- Makefile.am | 4 +- src/logger/log_compressed_storage.c | 85 +++++++++++++++++++++++++++++-------- src/logger/log_storage.c | 71 +++++++++++++++++++++++++------ 3 files changed, 126 insertions(+), 34 deletions(-) diff --git a/Makefile.am b/Makefile.am index 7a6ffa3..83563b1 100644 --- a/Makefile.am +++ b/Makefile.am @@ -573,7 +573,7 @@ src_tests_test_libdlog_container_warning_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=acc src_tests_test_logger_log_storage_SOURCES = src/tests/test_logger_log_storage.c src/logger/log_storage.c src/shared/ptrs_list.c src/shared/queued_entry_timestamp.c src_tests_test_logger_log_storage_CFLAGS = $(check_CFLAGS) -src_tests_test_logger_log_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc +src_tests_test_logger_log_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc -pthread src_tests_test_logger_compressed_storage_SOURCES = src/tests/test_logger_compressed_storage.c \ src/logger/compression_fastlz.c \ @@ -584,7 +584,7 @@ src_tests_test_logger_compressed_storage_SOURCES = src/tests/test_logger_compres external/miniz/miniz.c \ external/fastlz/fastlz.c src_tests_test_logger_compressed_storage_CFLAGS = $(check_CFLAGS) -src_tests_test_logger_compressed_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc +src_tests_test_logger_compressed_storage_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=malloc -pthread src_tests_test_ptrs_list_pos_SOURCES = src/tests/test_ptrs_list_pos.c src/shared/ptrs_list.c src_tests_test_ptrs_list_pos_CFLAGS = $(check_CFLAGS) diff --git a/src/logger/log_compressed_storage.c b/src/logger/log_compressed_storage.c index f19ee1a..1373a32 100644 --- a/src/logger/log_compressed_storage.c +++ b/src/logger/log_compressed_storage.c @@ -21,6 +21,7 @@ #include #include #include +#include #include /* Note: this is effectively a simplified version of `log_storage`. @@ -48,6 +49,7 @@ struct log_compressed_storage { char *compression_workspace; size_t compression_workspace_size; struct compression_algo *algo; // compression algorithm implementation + pthread_rwlock_t lock; }; struct log_compressed_storage_reader { @@ -108,18 +110,17 @@ log_compressed_storage *log_compressed_storage_create(unsigned capacity, unsigne return NULL; storage->name = strdup(name); - if (!storage->name) { - free(storage); - return NULL; - } + if (!storage->name) + goto fail_dup; + + int const r = pthread_rwlock_init(&storage->lock, NULL); + if (r < 0) + goto fail_lock; storage->compression_workspace_size = algo->get_workspace_size(COMPRESSION_CHUNK_SIZE); storage->compression_workspace = malloc(storage->compression_workspace_size); - if (!storage->compression_workspace) { - free(storage->name); - free(storage); - return NULL; - } + if (!storage->compression_workspace) + goto fail_workspace; storage->algo = algo; storage->capacity = capacity; @@ -131,6 +132,14 @@ log_compressed_storage *log_compressed_storage_create(unsigned capacity, unsigne storage->waiting_readers = NULL; return storage; + +fail_workspace: + (void) pthread_rwlock_destroy(&storage->lock); +fail_lock: + free(storage->name); +fail_dup: + free(storage); + return NULL; } static void log_compressed_storage_reader_set_storage(void *reader_, void *storage) @@ -145,8 +154,12 @@ void log_compressed_storage_free(log_compressed_storage *storage) assert(storage); log_compressed_storage_clear(storage); + // disconnect all the remaining readers list_foreach(storage->waiting_readers, NULL, log_compressed_storage_reader_set_storage); + + (void) pthread_rwlock_destroy(&storage->lock); + free(storage->compression_workspace); free(storage->name); free(storage); @@ -180,6 +193,8 @@ log_compressed_storage_reader *log_compressed_storage_new_reader(log_compressed_ if (NULL == reader) return NULL; + (void) pthread_rwlock_wrlock(&storage->lock); + reader->storage = storage; reader->callback = callback; reader->user_data = user_data; @@ -201,6 +216,8 @@ log_compressed_storage_reader *log_compressed_storage_new_reader(log_compressed_ else if (NULL != reader->storage) list_add(&storage->waiting_readers, reader); + pthread_rwlock_unlock(&storage->lock); + return reader; } @@ -284,15 +301,17 @@ bool log_compressed_storage_add_new_entry(log_compressed_storage *storage, char assert(storage->algo->is_legal(buf_in, size_in)); assert(size_in <= COMPRESSION_CHUNK_SIZE); + (void) pthread_rwlock_wrlock(&storage->lock); + size_t size_out = storage->algo->comp(buf_in, size_in, storage->compression_workspace, storage->compression_workspace_size); if (size_out == 0) - return false; + goto out_false; size_t size_actual = size_out < size_in ? size_out : size_in; log_compressed_storage_entry *log_compressed_storage_entry = malloc(sizeof(*log_compressed_storage_entry) + size_actual); if (NULL == log_compressed_storage_entry) - return false; + goto out_false; log_compressed_storage_entry->readers = NULL; log_compressed_storage_entry->entry.size_in = size_in; @@ -324,24 +343,43 @@ bool log_compressed_storage_add_new_entry(log_compressed_storage *storage, char while (storage->counter_end - storage->counter_begin > storage->capacity) log_compressed_storage_remove_the_earliest_log(storage); + pthread_rwlock_unlock(&storage->lock); return true; + +out_false: + pthread_rwlock_unlock(&storage->lock); + return false; } void log_compressed_storage_clear(log_compressed_storage *storage) { assert(storage); + (void) pthread_rwlock_wrlock(&storage->lock); while (NULL != storage->entries) log_compressed_storage_remove_the_earliest_log(storage); + pthread_rwlock_unlock(&storage->lock); } void log_compressed_storage_release_reader(log_compressed_storage_reader *reader) { assert(reader); + log_compressed_storage *const storage = reader->storage; + if (!storage) { + assert(!reader->current); + goto exit; + } + + (void) pthread_rwlock_wrlock(&storage->lock); + if (reader->current) list_remove(&reader->current->readers, reader); - else if (reader->storage) - list_remove(&reader->storage->waiting_readers, reader); + else + list_remove(&storage->waiting_readers, reader); + + pthread_rwlock_unlock(&storage->lock); + +exit: free(reader); } @@ -362,8 +400,13 @@ bool log_compressed_storage_reader_is_new_entry_available(const log_compressed_s if (NULL == reader->storage) return false; - return log_compressed_storage_reader_entry_is_not_last(reader, reader->current) || - (NULL == reader->current && NULL != reader->storage->entries); + if (NULL == reader->current && NULL != reader->storage->entries) + return true; + + (void) pthread_rwlock_rdlock(&reader->storage->lock); + bool const ret = log_compressed_storage_reader_entry_is_not_last(reader, reader->current); + pthread_rwlock_unlock(&reader->storage->lock); + return ret; } size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader *reader, char *uncompressed) @@ -373,10 +416,12 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader const struct compression_entry *ce = NULL; // storage is NULL if a dumping/one-shot reader has finished reading - if (NULL == reader->storage) + log_compressed_storage *const storage = reader->storage; + if (NULL == storage) return 0; const struct compression_algo *const algo = reader->storage->algo; // save in case a dumping reader gets disconnected (nulling reader->storage) + (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers if (NULL == reader->current) { if (NULL != reader->storage->entries) { // there are some logs in the buffer, let's get them @@ -395,8 +440,10 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader list_add(&reader->current->readers, reader); } - if (NULL == ce) + if (NULL == ce) { + pthread_rwlock_unlock(&storage->lock); return 0; + } reader->counter += ce->size_out ?: ce->size_in; @@ -407,6 +454,7 @@ size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader size_t decompressed_size = (size_t) algo->decomp(ce->compressed_data, ce->size_out, uncompressed, ce->size_in); assert(decompressed_size == ce->size_in); } + pthread_rwlock_unlock(&storage->lock); return ce->size_in; } @@ -448,8 +496,9 @@ void log_compressed_storage_resize(log_compressed_storage *storage, unsigned int { assert(capacity != 0); - /* in case of shrinking the storage size drop excess logs */ + (void) pthread_rwlock_wrlock(&storage->lock); while (storage->counter_end - storage->counter_begin > capacity) log_compressed_storage_remove_the_earliest_log(storage); storage->capacity = capacity; + pthread_rwlock_unlock(&storage->lock); } diff --git a/src/logger/log_storage.c b/src/logger/log_storage.c index b26cea5..abce906 100644 --- a/src/logger/log_storage.c +++ b/src/logger/log_storage.c @@ -19,6 +19,7 @@ #include #include #include +#include #include /* This is implementation of log storage. @@ -124,6 +125,7 @@ struct log_storage { uint64_t counter_end; // "pointer" to the end of the stored log stream struct log_storage_entry *entries; // the log entries are stored here list_head waiting_readers; // these are the readers that did not read anything from the current entries + pthread_rwlock_t lock; }; struct log_storage_reader { @@ -151,6 +153,12 @@ log_storage *log_storage_create(unsigned capacity, dlogutil_sorting_order_e sort if (NULL == storage) return NULL; + int const r = pthread_rwlock_init(&storage->lock, NULL); + if (r < 0) { + free(storage); + return NULL; + } + storage->capacity = capacity; storage->sort_by = sort_by; storage->counter_begin = 0; @@ -172,8 +180,12 @@ void log_storage_free(log_storage *storage) { assert(storage); log_storage_clear(storage); + // disconnect all the remaining readers list_foreach(storage->waiting_readers, NULL, log_storage_reader_set_storage); + + (void) pthread_rwlock_destroy(&storage->lock); + free(storage); } @@ -199,6 +211,7 @@ log_storage_reader *log_storage_new_reader(log_storage *storage, if (NULL == reader) return NULL; + (void) pthread_rwlock_wrlock(&storage->lock); reader->storage = storage; reader->callback = callback; reader->user_data = user_data; @@ -220,6 +233,7 @@ log_storage_reader *log_storage_new_reader(log_storage *storage, else if (NULL != reader->storage) list_add(&storage->waiting_readers, reader); + pthread_rwlock_unlock(&storage->lock); return reader; } @@ -318,6 +332,7 @@ bool log_storage_add_new_entry(log_storage *storage, const dlogutil_entry_s *le) log_storage_entry->readers = NULL; memcpy(&log_storage_entry->entry, le, le->len); + (void) pthread_rwlock_wrlock(&storage->lock); if (NULL == storage->entries) { // simple insert to empty storage storage->entries = log_storage_entry; @@ -357,14 +372,20 @@ bool log_storage_add_new_entry(log_storage *storage, const dlogutil_entry_s *le) // if we have reached the size limit for the buffer - free some space while (storage->counter_end - storage->counter_begin > storage->capacity) log_storage_remove_the_earliest_log(storage); + + pthread_rwlock_unlock(&storage->lock); return true; } void log_storage_clear(log_storage *storage) { assert(storage); + (void) pthread_rwlock_wrlock(&storage->lock); + while (NULL != storage->entries) log_storage_remove_the_earliest_log(storage); + + pthread_rwlock_unlock(&storage->lock); } dlogutil_sorting_order_e log_storage_get_sorting_order(const log_storage *storage) @@ -377,10 +398,22 @@ void log_storage_release_reader(log_storage_reader *reader) { assert(reader); + log_storage *const storage = reader->storage; + if (!storage) { + assert(!reader->current); + goto exit; + } + + (void) pthread_rwlock_wrlock(&storage->lock); + if (reader->current) list_remove(&reader->current->readers, reader); - else if (reader->storage) - list_remove(&reader->storage->waiting_readers, reader); + else + list_remove(&storage->waiting_readers, reader); + + pthread_rwlock_unlock(&storage->lock); + +exit: free(reader); } @@ -398,11 +431,17 @@ bool log_storage_reader_is_new_entry_available(const log_storage_reader *reader) assert(reader); // storage is NULL if a dumping/one-shot reader has finished reading - if (NULL == reader->storage) + log_storage *const storage = reader->storage; + if (NULL == storage) return false; - return log_storage_reader_entry_is_not_last(reader, reader->current) || - (NULL == reader->current && NULL != reader->storage->entries); + if (NULL == reader->current && NULL != storage->entries) + return true; + + (void) pthread_rwlock_rdlock(&storage->lock); + bool ret = log_storage_reader_entry_is_not_last(reader, reader->current); + pthread_rwlock_unlock(&storage->lock); + return ret; } bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry_s *entry) @@ -412,9 +451,11 @@ bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry const dlogutil_entry_s *le = NULL; // storage is NULL if a dumping/one-shot reader has finished reading - if (NULL == reader->storage) - return false; + log_storage *const storage = reader->storage; + if (!storage) + return NULL; + (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers if (NULL == reader->current) { if (NULL != reader->storage->entries) { // there are some logs in the buffer, let's get them @@ -433,14 +474,16 @@ bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry list_add(&reader->current->readers, reader); } - if (NULL != le) { - reader->counter += le->len; - if (entry) - memcpy(entry, le, le->len); - return true; - } + if (!le) + goto exit; + + reader->counter += le->len; + if (entry) + memcpy(entry, le, le->len); - return false; +exit: + pthread_rwlock_unlock(&storage->lock); + return !!le; } uint64_t log_storage_reader_get_ready_bytes(const log_storage_reader *reader) -- 2.7.4