#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
+#include <pthread.h>
#include <queued_entry_timestamp.h>
/* Note: this is effectively a simplified version of `log_storage`.
char *compression_workspace;
size_t compression_workspace_size;
struct compression_algo *algo; // compression algorithm implementation
+ pthread_rwlock_t lock;
};
struct log_compressed_storage_reader {
return NULL;
storage->name = strdup(name);
- if (!storage->name) {
- free(storage);
- return NULL;
- }
+ if (!storage->name)
+ goto fail_dup;
+
+ int const r = pthread_rwlock_init(&storage->lock, NULL);
+ if (r < 0)
+ goto fail_lock;
storage->compression_workspace_size = algo->get_workspace_size(COMPRESSION_CHUNK_SIZE);
storage->compression_workspace = malloc(storage->compression_workspace_size);
- if (!storage->compression_workspace) {
- free(storage->name);
- free(storage);
- return NULL;
- }
+ if (!storage->compression_workspace)
+ goto fail_workspace;
storage->algo = algo;
storage->capacity = capacity;
storage->waiting_readers = NULL;
return storage;
+
+fail_workspace:
+ (void) pthread_rwlock_destroy(&storage->lock);
+fail_lock:
+ free(storage->name);
+fail_dup:
+ free(storage);
+ return NULL;
}
static void log_compressed_storage_reader_set_storage(void *reader_, void *storage)
assert(storage);
log_compressed_storage_clear(storage);
+
// disconnect all the remaining readers
list_foreach(storage->waiting_readers, NULL, log_compressed_storage_reader_set_storage);
+
+ (void) pthread_rwlock_destroy(&storage->lock);
+
free(storage->compression_workspace);
free(storage->name);
free(storage);
if (NULL == reader)
return NULL;
+ (void) pthread_rwlock_wrlock(&storage->lock);
+
reader->storage = storage;
reader->callback = callback;
reader->user_data = user_data;
else if (NULL != reader->storage)
list_add(&storage->waiting_readers, reader);
+ pthread_rwlock_unlock(&storage->lock);
+
return reader;
}
assert(storage->algo->is_legal(buf_in, size_in));
assert(size_in <= COMPRESSION_CHUNK_SIZE);
+ (void) pthread_rwlock_wrlock(&storage->lock);
+
size_t size_out = storage->algo->comp(buf_in, size_in, storage->compression_workspace, storage->compression_workspace_size);
if (size_out == 0)
- return false;
+ goto out_false;
size_t size_actual = size_out < size_in ? size_out : size_in;
log_compressed_storage_entry *log_compressed_storage_entry = malloc(sizeof(*log_compressed_storage_entry) + size_actual);
if (NULL == log_compressed_storage_entry)
- return false;
+ goto out_false;
log_compressed_storage_entry->readers = NULL;
log_compressed_storage_entry->entry.size_in = size_in;
while (storage->counter_end - storage->counter_begin > storage->capacity)
log_compressed_storage_remove_the_earliest_log(storage);
+ pthread_rwlock_unlock(&storage->lock);
return true;
+
+out_false:
+ pthread_rwlock_unlock(&storage->lock);
+ return false;
}
void log_compressed_storage_clear(log_compressed_storage *storage)
{
assert(storage);
+ (void) pthread_rwlock_wrlock(&storage->lock);
while (NULL != storage->entries)
log_compressed_storage_remove_the_earliest_log(storage);
+ pthread_rwlock_unlock(&storage->lock);
}
void log_compressed_storage_release_reader(log_compressed_storage_reader *reader)
{
assert(reader);
+ log_compressed_storage *const storage = reader->storage;
+ if (!storage) {
+ assert(!reader->current);
+ goto exit;
+ }
+
+ (void) pthread_rwlock_wrlock(&storage->lock);
+
if (reader->current)
list_remove(&reader->current->readers, reader);
- else if (reader->storage)
- list_remove(&reader->storage->waiting_readers, reader);
+ else
+ list_remove(&storage->waiting_readers, reader);
+
+ pthread_rwlock_unlock(&storage->lock);
+
+exit:
free(reader);
}
if (NULL == reader->storage)
return false;
- return log_compressed_storage_reader_entry_is_not_last(reader, reader->current) ||
- (NULL == reader->current && NULL != reader->storage->entries);
+ if (NULL == reader->current && NULL != reader->storage->entries)
+ return true;
+
+ (void) pthread_rwlock_rdlock(&reader->storage->lock);
+ bool const ret = log_compressed_storage_reader_entry_is_not_last(reader, reader->current);
+ pthread_rwlock_unlock(&reader->storage->lock);
+ return ret;
}
size_t log_compressed_storage_reader_get_new_entry(log_compressed_storage_reader *reader, char *uncompressed)
const struct compression_entry *ce = NULL;
// storage is NULL if a dumping/one-shot reader has finished reading
- if (NULL == reader->storage)
+ log_compressed_storage *const storage = reader->storage;
+ if (NULL == storage)
return 0;
const struct compression_algo *const algo = reader->storage->algo; // save in case a dumping reader gets disconnected (nulling reader->storage)
+ (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers
if (NULL == reader->current) {
if (NULL != reader->storage->entries) {
// there are some logs in the buffer, let's get them
list_add(&reader->current->readers, reader);
}
- if (NULL == ce)
+ if (NULL == ce) {
+ pthread_rwlock_unlock(&storage->lock);
return 0;
+ }
reader->counter += ce->size_out ?: ce->size_in;
size_t decompressed_size = (size_t) algo->decomp(ce->compressed_data, ce->size_out, uncompressed, ce->size_in);
assert(decompressed_size == ce->size_in);
}
+ pthread_rwlock_unlock(&storage->lock);
return ce->size_in;
}
{
assert(capacity != 0);
- /* in case of shrinking the storage size drop excess logs */
+ (void) pthread_rwlock_wrlock(&storage->lock);
while (storage->counter_end - storage->counter_begin > capacity)
log_compressed_storage_remove_the_earliest_log(storage);
storage->capacity = capacity;
+ pthread_rwlock_unlock(&storage->lock);
}
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
+#include <pthread.h>
#include <queued_entry_timestamp.h>
/* This is implementation of log storage.
uint64_t counter_end; // "pointer" to the end of the stored log stream
struct log_storage_entry *entries; // the log entries are stored here
list_head waiting_readers; // these are the readers that did not read anything from the current entries
+ pthread_rwlock_t lock;
};
struct log_storage_reader {
if (NULL == storage)
return NULL;
+ int const r = pthread_rwlock_init(&storage->lock, NULL);
+ if (r < 0) {
+ free(storage);
+ return NULL;
+ }
+
storage->capacity = capacity;
storage->sort_by = sort_by;
storage->counter_begin = 0;
{
assert(storage);
log_storage_clear(storage);
+
// disconnect all the remaining readers
list_foreach(storage->waiting_readers, NULL, log_storage_reader_set_storage);
+
+ (void) pthread_rwlock_destroy(&storage->lock);
+
free(storage);
}
if (NULL == reader)
return NULL;
+ (void) pthread_rwlock_wrlock(&storage->lock);
reader->storage = storage;
reader->callback = callback;
reader->user_data = user_data;
else if (NULL != reader->storage)
list_add(&storage->waiting_readers, reader);
+ pthread_rwlock_unlock(&storage->lock);
return reader;
}
log_storage_entry->readers = NULL;
memcpy(&log_storage_entry->entry, le, le->len);
+ (void) pthread_rwlock_wrlock(&storage->lock);
if (NULL == storage->entries) {
// simple insert to empty storage
storage->entries = log_storage_entry;
// if we have reached the size limit for the buffer - free some space
while (storage->counter_end - storage->counter_begin > storage->capacity)
log_storage_remove_the_earliest_log(storage);
+
+ pthread_rwlock_unlock(&storage->lock);
return true;
}
void log_storage_clear(log_storage *storage)
{
assert(storage);
+ (void) pthread_rwlock_wrlock(&storage->lock);
+
while (NULL != storage->entries)
log_storage_remove_the_earliest_log(storage);
+
+ pthread_rwlock_unlock(&storage->lock);
}
dlogutil_sorting_order_e log_storage_get_sorting_order(const log_storage *storage)
{
assert(reader);
+ log_storage *const storage = reader->storage;
+ if (!storage) {
+ assert(!reader->current);
+ goto exit;
+ }
+
+ (void) pthread_rwlock_wrlock(&storage->lock);
+
if (reader->current)
list_remove(&reader->current->readers, reader);
- else if (reader->storage)
- list_remove(&reader->storage->waiting_readers, reader);
+ else
+ list_remove(&storage->waiting_readers, reader);
+
+ pthread_rwlock_unlock(&storage->lock);
+
+exit:
free(reader);
}
assert(reader);
// storage is NULL if a dumping/one-shot reader has finished reading
- if (NULL == reader->storage)
+ log_storage *const storage = reader->storage;
+ if (NULL == storage)
return false;
- return log_storage_reader_entry_is_not_last(reader, reader->current) ||
- (NULL == reader->current && NULL != reader->storage->entries);
+ if (NULL == reader->current && NULL != storage->entries)
+ return true;
+
+ (void) pthread_rwlock_rdlock(&storage->lock);
+ bool ret = log_storage_reader_entry_is_not_last(reader, reader->current);
+ pthread_rwlock_unlock(&storage->lock);
+ return ret;
}
bool log_storage_reader_get_new_entry(log_storage_reader *reader, dlogutil_entry_s *entry)
const dlogutil_entry_s *le = NULL;
// storage is NULL if a dumping/one-shot reader has finished reading
- if (NULL == reader->storage)
- return false;
+ log_storage *const storage = reader->storage;
+ if (!storage)
+ return NULL;
+ (void) pthread_rwlock_wrlock(&storage->lock); // wr because of waiting_readers
if (NULL == reader->current) {
if (NULL != reader->storage->entries) {
// there are some logs in the buffer, let's get them
list_add(&reader->current->readers, reader);
}
- if (NULL != le) {
- reader->counter += le->len;
- if (entry)
- memcpy(entry, le, le->len);
- return true;
- }
+ if (!le)
+ goto exit;
+
+ reader->counter += le->len;
+ if (entry)
+ memcpy(entry, le, le->len);
- return false;
+exit:
+ pthread_rwlock_unlock(&storage->lock);
+ return !!le;
}
uint64_t log_storage_reader_get_ready_bytes(const log_storage_reader *reader)