From dbac5e45fb05985c519eca31178531ac7b77dabc Mon Sep 17 00:00:00 2001 From: Michal Bloch Date: Wed, 13 Apr 2022 18:30:00 +0200 Subject: [PATCH] Extract pipe reader functionality into subs Change-Id: I83d194543b400750c664b47ae6af8ccfac3920e7 Signed-off-by: Michal Bloch --- Makefile.am | 1 + src/logger/log_buffer.c | 7 ++- src/logger/logger.c | 2 +- src/logger/reader_pipe.c | 63 ++++----------------- src/logger/reader_pipe.h | 7 +-- src/logger/subreader_dlogutil.c | 119 ++++++++++++++++++++++++++++++++++++++++ src/logger/subreader_dlogutil.h | 14 +++++ 7 files changed, 154 insertions(+), 59 deletions(-) create mode 100644 src/logger/subreader_dlogutil.c create mode 100644 src/logger/subreader_dlogutil.h diff --git a/Makefile.am b/Makefile.am index ab92e94..d4b9773 100644 --- a/Makefile.am +++ b/Makefile.am @@ -137,6 +137,7 @@ dlog_logger_SOURCES = \ src/logger/reader_common.c \ src/logger/reader_logger.c \ src/logger/reader_pipe.c \ + src/logger/subreader_dlogutil.c \ src/logger/subreader_file.c \ src/logger/subreader_metrics.c \ src/logger/socket.c \ diff --git a/src/logger/log_buffer.c b/src/logger/log_buffer.c index 63140ee..e0fd735 100644 --- a/src/logger/log_buffer.c +++ b/src/logger/log_buffer.c @@ -1,5 +1,6 @@ #include "log_buffer.h" #include "logger_internal.h" +#include "subreader_dlogutil.h" #include /** @@ -244,7 +245,7 @@ static int service_writer_handle_req_util(struct logger *server, struct writer * goto cleanup; } - retval = reader_pipe_init_with_writer(&reader, wr, server, params.filter, params.monitor, params.is_dumping); + retval = reader_pipe_init_with_writer(&reader, wr, server, params.monitor, params.is_dumping); if (retval != 0) goto cleanup; @@ -253,6 +254,10 @@ static int service_writer_handle_req_util(struct logger *server, struct writer * if (retval < 0) goto cleanup; + retval = reader_add_subreader_dlogutil(&reader->common, params.filter, pipe_fd[1]); + if (retval != 0) + goto cleanup; + set_write_fd_entity(&reader->common.fd_entity_sink, pipe_fd[1]); retval = send_pipe(wr->fd_entity.fd, pipe_fd[0]); if (pipe_fd[0] > 0) diff --git a/src/logger/logger.c b/src/logger/logger.c index 8b676bc..fc17d53 100644 --- a/src/logger/logger.c +++ b/src/logger/logger.c @@ -325,7 +325,7 @@ static int create_reader_pipe_from_dlogutil_line(struct dlogutil_line_params *pa if (params->file.path == NULL) return -EINVAL; - retval = reader_pipe_init(&reader, params->buf_id, server, params->filter, params->monitor, params->is_dumping); + retval = reader_pipe_init(&reader, params->buf_id, server, params->monitor, params->is_dumping); if (retval != 0) return retval; diff --git a/src/logger/reader_pipe.c b/src/logger/reader_pipe.c index ca862bb..1e1f1ae 100644 --- a/src/logger/reader_pipe.c +++ b/src/logger/reader_pipe.c @@ -8,8 +8,6 @@ static void reader_pipe_free(struct reader_common *_reader) if (reader->log_storage_reader_ptr) log_storage_release_reader(reader->log_storage_reader_ptr); - if (reader->filter) - log_filter_free(reader->filter); } static int print_out_logs(struct reader_common *_reader, struct now_t _time); @@ -45,8 +43,7 @@ static void dispatch_event_reader_pipe(struct logger *server, struct epoll_event } } -static struct reader_pipe *reader_pipe_alloc(struct log_filter *filter, struct timespec ts, - bool monitor, bool is_dumping) +static struct reader_pipe *reader_pipe_alloc(struct timespec ts, bool monitor, bool is_dumping) { struct reader_pipe *ret = calloc(1, sizeof(*ret)); if (!ret) @@ -57,31 +54,28 @@ static struct reader_pipe *reader_pipe_alloc(struct log_filter *filter, struct t ret->common.free_reader = reader_pipe_free; ret->common.service_reader = print_out_logs; - ret->filter = log_filter_from_filter(filter); ret->monitor = monitor; ret->is_dumping = is_dumping; ret->buf_ptr = NULL; ret->log_storage_reader_ptr = NULL; ret->last_read_time = ts; - ret->partial_log_size = 0; return ret; } int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server, - struct log_filter *filter, bool monitor, bool is_dumping) + bool monitor, bool is_dumping) { assert(reader); assert(buf_id > LOG_ID_INVALID); assert(buf_id < LOG_ID_MAX); assert(server); - assert(filter); struct timespec ts_mono; if (clock_gettime(CLOCK_MONOTONIC, &ts_mono)) return -errno; - __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping); + __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping); if (!ret) return -ENOMEM; @@ -98,19 +92,18 @@ int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger } int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server, - struct log_filter *filter, bool monitor, bool is_dumping) + bool monitor, bool is_dumping) { assert(reader); assert(writer); assert(writer->buf_ptr); assert(server); - assert(filter); struct timespec ts_mono; if (clock_gettime(CLOCK_MONOTONIC, &ts_mono)) return -errno; - __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping); + __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping); if (!ret) return -ENOMEM; @@ -130,33 +123,7 @@ int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry assert(reader->buf_ptr); assert(dlogutil_entry); - if (!log_should_print_line(reader->filter, dlogutil_entry)) - return 0; - - if (reader->common.subs != NULL) - return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log); - - const char *tag = dlogutil_entry->msg + 1; - if (!strlen(tag)) - return 0; - - int r = write(reader->common.fd_entity_sink.fd, dlogutil_entry, dlogutil_entry->len); - if (r < 0) { - if (errno != EAGAIN) - return 1; - - /* The pipe is just clogged, this is not an actual error. - * We own the entry so it needs to be saved for later. */ - r = 0; - } - - if (r < dlogutil_entry->len) { - reader->partial_log_size = dlogutil_entry->len - r; - memcpy(reader->partial_log, ((char *)dlogutil_entry) + r, reader->partial_log_size); - return -1; - } - - return 0; + return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log); } /** @@ -173,19 +140,11 @@ static int print_out_logs(struct reader_common *_reader, struct now_t _time) assert(reader->buf_ptr); - if (reader->partial_log_size) { - int r = write(reader->common.fd_entity_sink.fd, reader->partial_log, reader->partial_log_size); - if (r <= 0) - return r != 0 && errno != EAGAIN; - - if (r < reader->partial_log_size) { - reader->partial_log_size -= r; - memmove(reader->partial_log, reader->partial_log + r, reader->partial_log_size); - return 0; - } - - reader->partial_log_size = 0; - } + int r = reader_flush(_reader, (struct timespec){0, 0}, 0); + if (r > 0) + return 1; + if (r < 0) + return 0; while (log_storage_reader_is_new_entry_available(reader->log_storage_reader_ptr)) { const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(reader->log_storage_reader_ptr); diff --git a/src/logger/reader_pipe.h b/src/logger/reader_pipe.h index 5ad4dbd..cba2f9e 100644 --- a/src/logger/reader_pipe.h +++ b/src/logger/reader_pipe.h @@ -14,16 +14,13 @@ struct reader_pipe { struct log_buffer *buf_ptr; log_storage_reader *log_storage_reader_ptr; struct timespec last_read_time; - int partial_log_size; - char partial_log[sizeof(struct dlogutil_entry_with_msg)]; bool is_dumping; bool monitor; - struct log_filter *filter; }; int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server, - struct log_filter *filter, bool monitor, bool is_dumping); + bool monitor, bool is_dumping); int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server, - struct log_filter *filter, bool monitor, bool is_dumping); + bool monitor, bool is_dumping); int reader_should_buffer(struct reader_pipe *reader, const struct buf_params *buf_params, struct timespec now); int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry_s *dlogutil_entry); diff --git a/src/logger/subreader_dlogutil.c b/src/logger/subreader_dlogutil.c new file mode 100644 index 0000000..ed5531c --- /dev/null +++ b/src/logger/subreader_dlogutil.c @@ -0,0 +1,119 @@ +#include "subreader_dlogutil.h" +#include "logger_internal.h" + +static void subreader_dlogutil_free(void *userdata) +{ + struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata; + assert(srdu); + + /* No close(pipe_fd)! It is already contained in + * reader_common's sink FD entity (pipes can get + * clogged so they must be exposed to epoll) and + * gets closed that way. + * + * This is a bit suboptimal (since a reader can + * only have one subreader that requires to use + * the FD entity) but in practice it's a 1 to 1 + * relationship anyway so far, and would create + * problems (what if one subreader can continue + * and another does not?) so I'm happy with it. */ +} + +static int subreader_dlogutil_flush(void *userdata, struct timespec ts, int flush_time) +{ + struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata; + assert(srdu); + + if (!srdu->partial_log_size) + return 0; + + int r = write(srdu->pipe_fd, srdu->partial_log, srdu->partial_log_size); + if (r <= 0) + return (r != 0 && errno != EAGAIN) ? 1 : -1; + + if (r < srdu->partial_log_size) { + srdu->partial_log_size -= r; + memmove(srdu->partial_log, srdu->partial_log + r, srdu->partial_log_size); + return -1; + } + + srdu->partial_log_size = 0; + return 0; +} + +static int subreader_dlogutil_apply_log(const struct subreader_common *sub, const struct dlogutil_entry *due) +{ + assert(sub); + assert(due); + + struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) sub->sub_userdata; + assert(srdu); + + const char *tag = due->msg + 1; + if (!strlen(tag)) + return 0; + + int r = subreader_dlogutil_flush(srdu, (struct timespec) {0, 0}, 0); + if (r) { + /* Shouldn't really happen, since we only retry after + * we get an event that the pipe got unclogged. Maybe + * if the pipe only gets unclogged by, say, 1 byte a + * few times in a row and we try to append a bunch of + * full logs to the buffer, the buffer can get clogged + * as well? Drop the log in that case; realistically + * that shouldn't happen though. */ + + if (srdu->partial_log_size + due->len >= sizeof srdu->partial_log) + return r; + + memcpy(srdu->partial_log + srdu->partial_log_size, (char *) due, due->len); + srdu->partial_log_size += due->len; + + return r; + } + + r = write(srdu->pipe_fd, due, due->len); + if (r < 0) { + if (errno != EAGAIN) + return 1; + + /* The pipe is just clogged, this is not an actual error. + * We own the entry so it needs to be saved for later. */ + r = 0; + } + + if (r < due->len) { + srdu->partial_log_size = due->len - r; + memcpy(srdu->partial_log, ((char *)due) + r, srdu->partial_log_size); + return -1; + } + + return 0; +} + +int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd) +{ + assert(reader); + assert(pipe_fd >= 0); + + struct subreader_common *const sub = malloc(sizeof *sub); + struct subreader_dlogutil *const srdu = malloc(sizeof *srdu); + if (!sub || !srdu) { + free(sub); + free(srdu); + return -ENOMEM; + } + + srdu->pipe_fd = pipe_fd; + srdu->partial_log_size = 0; + + sub->sub_userdata = srdu; + sub->sub_destroy = subreader_dlogutil_free; + sub->sub_apply_log = subreader_dlogutil_apply_log; + sub->sub_flush = subreader_dlogutil_flush; + sub->filter = log_filter_move(filter); + + list_add(&reader->subs, sub); + + return 0; +} diff --git a/src/logger/subreader_dlogutil.h b/src/logger/subreader_dlogutil.h new file mode 100644 index 0000000..91fa654 --- /dev/null +++ b/src/logger/subreader_dlogutil.h @@ -0,0 +1,14 @@ +#pragma once + +#include "reader_common.h" +#include "log_storage.h" +#include "queued_entry_timestamp.h" + +struct subreader_dlogutil { + int pipe_fd; + int partial_log_size; + char partial_log[sizeof(struct dlogutil_entry_with_msg)]; +}; + +int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd); + -- 2.7.4