Extract pipe reader functionality into subs 52/273652/7
authorMichal Bloch <m.bloch@samsung.com>
Wed, 13 Apr 2022 16:30:00 +0000 (18:30 +0200)
committerMateusz Majewski <m.majewski2@samsung.com>
Thu, 28 Apr 2022 10:40:00 +0000 (12:40 +0200)
Change-Id: I83d194543b400750c664b47ae6af8ccfac3920e7
Signed-off-by: Michal Bloch <m.bloch@samsung.com>
Makefile.am
src/logger/log_buffer.c
src/logger/logger.c
src/logger/reader_pipe.c
src/logger/reader_pipe.h
src/logger/subreader_dlogutil.c [new file with mode: 0644]
src/logger/subreader_dlogutil.h [new file with mode: 0644]

index ab92e94..d4b9773 100644 (file)
@@ -137,6 +137,7 @@ dlog_logger_SOURCES = \
        src/logger/reader_common.c \
        src/logger/reader_logger.c \
        src/logger/reader_pipe.c \
+       src/logger/subreader_dlogutil.c \
        src/logger/subreader_file.c \
        src/logger/subreader_metrics.c \
        src/logger/socket.c \
index 63140ee..e0fd735 100644 (file)
@@ -1,5 +1,6 @@
 #include "log_buffer.h"
 #include "logger_internal.h"
+#include "subreader_dlogutil.h"
 #include <ptrs_list.h>
 
 /**
@@ -244,7 +245,7 @@ static int service_writer_handle_req_util(struct logger *server, struct writer *
                goto cleanup;
        }
 
-       retval = reader_pipe_init_with_writer(&reader, wr, server, params.filter, params.monitor, params.is_dumping);
+       retval = reader_pipe_init_with_writer(&reader, wr, server, params.monitor, params.is_dumping);
        if (retval != 0)
                goto cleanup;
 
@@ -253,6 +254,10 @@ static int service_writer_handle_req_util(struct logger *server, struct writer *
        if (retval < 0)
                goto cleanup;
 
+       retval = reader_add_subreader_dlogutil(&reader->common, params.filter, pipe_fd[1]);
+       if (retval != 0)
+               goto cleanup;
+
        set_write_fd_entity(&reader->common.fd_entity_sink, pipe_fd[1]);
        retval = send_pipe(wr->fd_entity.fd, pipe_fd[0]);
        if (pipe_fd[0] > 0)
index 8b676bc..fc17d53 100644 (file)
@@ -325,7 +325,7 @@ static int create_reader_pipe_from_dlogutil_line(struct dlogutil_line_params *pa
        if (params->file.path == NULL)
                return -EINVAL;
 
-       retval = reader_pipe_init(&reader, params->buf_id, server, params->filter, params->monitor, params->is_dumping);
+       retval = reader_pipe_init(&reader, params->buf_id, server, params->monitor, params->is_dumping);
        if (retval != 0)
                return retval;
 
index ca862bb..1e1f1ae 100644 (file)
@@ -8,8 +8,6 @@ static void reader_pipe_free(struct reader_common *_reader)
 
        if (reader->log_storage_reader_ptr)
                log_storage_release_reader(reader->log_storage_reader_ptr);
-       if (reader->filter)
-               log_filter_free(reader->filter);
 }
 
 static int print_out_logs(struct reader_common *_reader, struct now_t _time);
@@ -45,8 +43,7 @@ static void dispatch_event_reader_pipe(struct logger *server, struct epoll_event
        }
 }
 
-static struct reader_pipe *reader_pipe_alloc(struct log_filter *filter, struct timespec ts,
-       bool monitor, bool is_dumping)
+static struct reader_pipe *reader_pipe_alloc(struct timespec ts, bool monitor, bool is_dumping)
 {
        struct reader_pipe *ret = calloc(1, sizeof(*ret));
        if (!ret)
@@ -57,31 +54,28 @@ static struct reader_pipe *reader_pipe_alloc(struct log_filter *filter, struct t
 
        ret->common.free_reader = reader_pipe_free;
        ret->common.service_reader = print_out_logs;
-       ret->filter = log_filter_from_filter(filter);
        ret->monitor = monitor;
        ret->is_dumping = is_dumping;
        ret->buf_ptr = NULL;
        ret->log_storage_reader_ptr = NULL;
        ret->last_read_time = ts;
-       ret->partial_log_size = 0;
 
        return ret;
 }
 
 int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server,
-       struct log_filter *filter, bool monitor, bool is_dumping)
+       bool monitor, bool is_dumping)
 {
        assert(reader);
        assert(buf_id > LOG_ID_INVALID);
        assert(buf_id < LOG_ID_MAX);
        assert(server);
-       assert(filter);
 
        struct timespec ts_mono;
        if (clock_gettime(CLOCK_MONOTONIC, &ts_mono))
                return -errno;
 
-       __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping);
+       __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping);
        if (!ret)
                return -ENOMEM;
 
@@ -98,19 +92,18 @@ int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger
 }
 
 int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
-       struct log_filter *filter, bool monitor, bool is_dumping)
+       bool monitor, bool is_dumping)
 {
        assert(reader);
        assert(writer);
        assert(writer->buf_ptr);
        assert(server);
-       assert(filter);
 
        struct timespec ts_mono;
        if (clock_gettime(CLOCK_MONOTONIC, &ts_mono))
                return -errno;
 
-       __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping);
+       __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping);
        if (!ret)
                return -ENOMEM;
 
@@ -130,33 +123,7 @@ int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry
        assert(reader->buf_ptr);
        assert(dlogutil_entry);
 
-       if (!log_should_print_line(reader->filter, dlogutil_entry))
-               return 0;
-
-       if (reader->common.subs != NULL)
-               return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log);
-
-       const char *tag = dlogutil_entry->msg + 1;
-       if (!strlen(tag))
-               return 0;
-
-       int r = write(reader->common.fd_entity_sink.fd, dlogutil_entry, dlogutil_entry->len);
-       if (r < 0) {
-               if (errno != EAGAIN)
-                       return 1;
-
-               /* The pipe is just clogged, this is not an actual error.
-                * We own the entry so it needs to be saved for later. */
-               r = 0;
-       }
-
-       if (r < dlogutil_entry->len) {
-               reader->partial_log_size = dlogutil_entry->len - r;
-               memcpy(reader->partial_log, ((char *)dlogutil_entry) + r, reader->partial_log_size);
-               return -1;
-       }
-
-       return 0;
+       return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log);
 }
 
 /**
@@ -173,19 +140,11 @@ static int print_out_logs(struct reader_common *_reader, struct now_t _time)
 
        assert(reader->buf_ptr);
 
-       if (reader->partial_log_size) {
-               int r = write(reader->common.fd_entity_sink.fd, reader->partial_log, reader->partial_log_size);
-               if (r <= 0)
-                       return r != 0 && errno != EAGAIN;
-
-               if (r < reader->partial_log_size) {
-                       reader->partial_log_size -= r;
-                       memmove(reader->partial_log, reader->partial_log + r, reader->partial_log_size);
-                       return 0;
-               }
-
-               reader->partial_log_size = 0;
-       }
+       int r = reader_flush(_reader, (struct timespec){0, 0}, 0);
+       if (r > 0)
+               return 1;
+       if (r < 0)
+               return 0;
 
        while (log_storage_reader_is_new_entry_available(reader->log_storage_reader_ptr)) {
                const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(reader->log_storage_reader_ptr);
index 5ad4dbd..cba2f9e 100644 (file)
@@ -14,16 +14,13 @@ struct reader_pipe {
        struct log_buffer *buf_ptr;
        log_storage_reader *log_storage_reader_ptr;
        struct timespec last_read_time;
-       int partial_log_size;
-       char partial_log[sizeof(struct dlogutil_entry_with_msg)];
        bool is_dumping;
        bool monitor;
-       struct log_filter *filter;
 };
 
 int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server,
-       struct log_filter *filter, bool monitor, bool is_dumping);
+       bool monitor, bool is_dumping);
 int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
-       struct log_filter *filter, bool monitor, bool is_dumping);
+       bool monitor, bool is_dumping);
 int reader_should_buffer(struct reader_pipe *reader, const struct buf_params *buf_params, struct timespec now);
 int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry_s *dlogutil_entry);
diff --git a/src/logger/subreader_dlogutil.c b/src/logger/subreader_dlogutil.c
new file mode 100644 (file)
index 0000000..ed5531c
--- /dev/null
@@ -0,0 +1,119 @@
+#include "subreader_dlogutil.h"
+#include "logger_internal.h"
+
+static void subreader_dlogutil_free(void *userdata)
+{
+       struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata;
+       assert(srdu);
+
+       /* No close(pipe_fd)! It is already contained in
+        * reader_common's sink FD entity (pipes can get
+        * clogged so they must be exposed to epoll) and
+        * gets closed that way.
+        *
+        * This is a bit suboptimal (since a reader can
+        * only have one subreader that requires to use
+        * the FD entity) but in practice it's a 1 to 1
+        * relationship anyway so far, and would create
+        * problems (what if one subreader can continue
+        * and another does not?) so I'm happy with it. */
+}
+
+static int subreader_dlogutil_flush(void *userdata, struct timespec ts, int flush_time)
+{
+       struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata;
+       assert(srdu);
+
+       if (!srdu->partial_log_size)
+               return 0;
+
+       int r = write(srdu->pipe_fd, srdu->partial_log, srdu->partial_log_size);
+       if (r <= 0)
+               return (r != 0 && errno != EAGAIN) ? 1 : -1;
+
+       if (r < srdu->partial_log_size) {
+               srdu->partial_log_size -= r;
+               memmove(srdu->partial_log, srdu->partial_log + r, srdu->partial_log_size);
+               return -1;
+       }
+
+       srdu->partial_log_size = 0;
+       return 0;
+}
+
+static int subreader_dlogutil_apply_log(const struct subreader_common *sub, const struct dlogutil_entry *due)
+{
+       assert(sub);
+       assert(due);
+
+       struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) sub->sub_userdata;
+       assert(srdu);
+
+       const char *tag = due->msg + 1;
+       if (!strlen(tag))
+               return 0;
+
+       int r = subreader_dlogutil_flush(srdu, (struct timespec) {0, 0}, 0);
+       if (r) {
+               /* Shouldn't really happen, since we only retry after
+                * we get an event that the pipe got unclogged. Maybe
+                * if the pipe only gets unclogged by, say, 1 byte a
+                * few times in a row and we try to append a bunch of
+                * full logs to the buffer, the buffer can get clogged
+                * as well? Drop the log in that case; realistically
+                * that shouldn't happen though. */
+
+               if (srdu->partial_log_size + due->len >= sizeof srdu->partial_log)
+                       return r;
+
+               memcpy(srdu->partial_log + srdu->partial_log_size, (char *) due, due->len);
+               srdu->partial_log_size += due->len;
+
+               return r;
+       }
+
+       r = write(srdu->pipe_fd, due, due->len);
+       if (r < 0) {
+               if (errno != EAGAIN)
+                       return 1;
+
+               /* The pipe is just clogged, this is not an actual error.
+                * We own the entry so it needs to be saved for later. */
+               r = 0;
+       }
+
+       if (r < due->len) {
+               srdu->partial_log_size = due->len - r;
+               memcpy(srdu->partial_log, ((char *)due) + r, srdu->partial_log_size);
+               return -1;
+       }
+
+       return 0;
+}
+
+int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd)
+{
+       assert(reader);
+       assert(pipe_fd >= 0);
+
+       struct subreader_common *const sub = malloc(sizeof *sub);
+       struct subreader_dlogutil *const srdu = malloc(sizeof *srdu);
+       if (!sub || !srdu) {
+               free(sub);
+               free(srdu);
+               return -ENOMEM;
+       }
+
+       srdu->pipe_fd = pipe_fd;
+       srdu->partial_log_size = 0;
+
+       sub->sub_userdata = srdu;
+       sub->sub_destroy = subreader_dlogutil_free;
+       sub->sub_apply_log = subreader_dlogutil_apply_log;
+       sub->sub_flush = subreader_dlogutil_flush;
+       sub->filter = log_filter_move(filter);
+
+       list_add(&reader->subs, sub);
+
+       return 0;
+}
diff --git a/src/logger/subreader_dlogutil.h b/src/logger/subreader_dlogutil.h
new file mode 100644 (file)
index 0000000..91fa654
--- /dev/null
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "reader_common.h"
+#include "log_storage.h"
+#include "queued_entry_timestamp.h"
+
+struct subreader_dlogutil {
+       int pipe_fd;
+       int partial_log_size;
+       char partial_log[sizeof(struct dlogutil_entry_with_msg)];
+};
+
+int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd);
+