Split various log_buffer related stuff to separate file 29/242129/2
authorMateusz Majewski <m.majewski2@samsung.com>
Mon, 24 Aug 2020 09:48:23 +0000 (11:48 +0200)
committerMichal Bloch <m.bloch@partner.samsung.com>
Mon, 24 Aug 2020 10:33:56 +0000 (10:33 +0000)
Change-Id: If20af3642f882f1cd3357cb5a726d1d98db4232a

Makefile.am
src/logger/log_buffer.c [new file with mode: 0644]
src/logger/log_buffer.h [new file with mode: 0644]
src/logger/logger.c
src/logger/logger_internal.h

index 89aba76..eaba195 100644 (file)
@@ -127,6 +127,7 @@ dlog_logger_SOURCES = \
        src/logger/logger.c \
        src/logger/dlogutil_line.c \
        src/logger/fd_entity.c \
+       src/logger/log_buffer.c \
        src/logger/log_storage.c \
        src/logger/qos.c \
        src/logger/qos_distributions.c \
diff --git a/src/logger/log_buffer.c b/src/logger/log_buffer.c
new file mode 100644 (file)
index 0000000..637473b
--- /dev/null
@@ -0,0 +1,346 @@
+#include "log_buffer.h"
+#include "logger_internal.h"
+
+/**
+ * @brief Append to buffer
+ * @details Appends an entry to the buffer
+ * @param[in] s The logger server
+ * @param[in] entry The entry to append
+ * @param[in] b The buffer whither to append
+ */
+int buffer_append(const dlogutil_entry_s *entry, struct log_buffer *b)
+{
+       if (!log_storage_add_new_entry(b->log_storage_ptr, entry))
+               return -ENOMEM;
+       return 0;
+}
+
+/**
+ * @brief Service pipe log data
+ * @details Handle log messages incoming through a pipe
+ * @param[in] server The logger server
+ * @param[in] wr The writer who sent the logs
+ * @param[in] event The event associated with the data
+ * @return 0 on success, else -errno
+ */
+static int service_writer_pipe(struct logger *server, struct writer *wr, struct epoll_event *event)
+{
+       if (event->events & EPOLLIN) {
+               int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed);
+
+               if (r == -1 && errno == EAGAIN)
+                       return 0;
+               else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
+                       return -EINVAL;
+               else if (r == 0)
+                       return -EBADF;
+
+               wr->readed += r;
+
+               struct pipe_logger_entry *const ple = (struct pipe_logger_entry *const)wr->buffer;
+               while ((wr->readed >= sizeof(ple->len)) && (ple->len <= wr->readed)) {
+                       const int payload_size = ple->len - sizeof *ple;
+                       if (payload_size < 0 || payload_size > LOG_MAX_PAYLOAD_SIZE)
+                               return -EINVAL;
+
+                       struct dlogutil_entry_with_msg lem;
+                       parse_pipe_message(ple, &lem.header, ple->len);
+                       add_recv_timestamp(&lem.header, server->time);
+                       fixup_pipe_msg(&lem, payload_size);
+                       if (qos_is_enabled(&server->qos))
+                               qos_add_log(&server->qos, &lem.header);
+                       r = buffer_append(&lem.header, wr->buf_ptr);
+                       wr->readed -= ple->len;
+                       memmove(wr->buffer, wr->buffer + ple->len, sizeof wr->buffer - ple->len);
+
+                       if (r)
+                               return r;
+               }
+       } else if (event->events & EPOLLHUP)
+               return -EBADF;
+
+       return 0;
+}
+
+/**
+ * @brief Service util request
+ * @details Handle a request from util
+ * @param[in] server The logger server
+ * @param[in] wr The writer who sent the request
+ * @param[in] msg The message containing the request
+ * @return 0 on success, else -errno
+ */
+static int service_writer_handle_req_util(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
+{
+       assert(server);
+       assert(wr);
+       assert(msg);
+
+       // check request type, that should be always DLOG_REQ_HANDLE_LOGUTIL
+       // as dispatched by service_writer_handle_req_ctrl handler
+       // don't assert for compatibility with service_writer_handle_req_pipe
+       // and possible mistakes in the future that would be hard to track
+       if (msg->request != DLOG_REQ_HANDLE_LOGUTIL)
+               return -EINVAL;
+
+       if (msg->length <= sizeof(struct dlog_control_msg) ||
+           msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
+               return -EINVAL;
+
+       if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
+               return -EINVAL;
+
+       __attribute__((cleanup(reader_pipe_cleanup))) struct reader_pipe *reader = NULL;
+
+       int retval;
+       __attribute__((cleanup(free_dlogutil_line_params))) struct dlogutil_line_params params;
+       if (!initialize_dlogutil_line_params(&params)) {
+               retval = -ENOMEM;
+               goto cleanup;
+       }
+
+       retval = get_dlogutil_line_params(msg->data, &params);
+       if (retval < 0) {
+               retval = -ENOMEM;
+               goto cleanup;
+       }
+
+       if (params.file_path) {
+               /* Do not trust writer-based readers (only config-based).
+                * The control socket's privilege checks are fairly lenient
+                * so this prevents people from asking us to overwrite
+                * some potentially important files at logger privilege.
+                *
+                * At some point it would be good to be able to skip the
+                * middleman and become able to write to a file directly
+                * though. The daemon should become able to receive an
+                * opened file descriptor from a writer. */
+               retval = -EPERM;
+               goto cleanup;
+       }
+
+       retval = reader_pipe_init_with_writer(&reader, wr, server, params.filter, &params.file, params.monitor, params.is_dumping);
+       if (retval != 0)
+               goto cleanup;
+
+       int write_fd = -1, read_fd = -1;
+       retval = create_fifo_fds(server, wr->fd_entity.fd, &write_fd, &read_fd, reader->is_dumping);
+       if (retval < 0)
+               goto cleanup;
+
+       set_write_fd_entity(&reader->common.fd_entity_sink, write_fd);
+       retval = send_pipe(wr->fd_entity.fd, read_fd);
+       if (read_fd > 0)
+               close(read_fd);
+       if (retval)
+               goto cleanup;
+
+       retval = add_reader_pipe(server, reader);
+       if (retval < 0)
+               goto cleanup;
+
+       reader = NULL;
+       return 0;
+
+cleanup:
+       retval = send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_HANDLE_LOGUTIL, DLOG_REQ_RESULT_ERR, NULL, 0);
+       if (retval < 0)
+               printf("ERROR: both create_reader_from_dlogutil_line() and send_dlog_reply() failed\n");
+
+       return retval;
+}
+
+static int service_writer_handle_req_get_usage(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
+{
+       assert(server);
+       assert(wr);
+       assert(msg);
+       assert(msg->request == DLOG_REQ_GET_USAGE);
+
+       uint32_t buf_size = log_storage_get_usage(wr->buf_ptr->log_storage_ptr);
+
+       return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_USAGE, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
+}
+
+static int service_writer_handle_req_get_capacity(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
+{
+       assert(server);
+       assert(wr);
+       assert(msg);
+       assert(msg->request == DLOG_REQ_GET_CAPACITY);
+
+       uint32_t buf_size = log_storage_get_capacity(wr->buf_ptr->log_storage_ptr);
+
+       return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_CAPACITY, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
+}
+
+/**
+ * @brief Service clear request
+ * @details Handle a clear-buffer request
+ * @param[in] server The logger server
+ * @param[in] wr The writer who sent the request
+ * @param[in] msg The message containing the request
+ * @return 0 on success, else -errno
+ */
+static int service_writer_handle_req_clear(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
+{
+       (void) server;
+       assert(msg);
+
+       // check request type, that should be always DLOG_REQ_CLEAR
+       // as dispatched by service_writer_handle_req_ctrl handler
+       // don't assert for compatibility with service_writer_handle_req_pipe
+       // and possible mistakes in the future that would be hard to track
+       if (msg->request != DLOG_REQ_CLEAR)
+               return -EINVAL;
+
+       if (msg->length != (sizeof(struct dlog_control_msg)))
+               return -EINVAL;
+
+       if (!wr || !wr->buf_ptr)
+               return -EINVAL;
+
+       log_storage_clear(wr->buf_ptr->log_storage_ptr);
+
+       return 0;
+}
+
+/**
+ * @brief Service control request
+ * @details Handle a clear-buffer or util request in respect to msg request type
+ * @param[in] server The logger server
+ * @param[in] wr The writer who sent the request
+ * @param[in] msg The message containing the request
+ * @return 0 on success, else -errno
+ */
+static int service_writer_handle_req_ctrl(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
+{
+       assert(server);
+       assert(wr);
+       assert(msg);
+
+       int ret;
+       switch (msg->request) {
+               case DLOG_REQ_CLEAR:          ret = service_writer_handle_req_clear       (server, wr, msg); break;
+               case DLOG_REQ_HANDLE_LOGUTIL: ret = service_writer_handle_req_util        (server, wr, msg); break;
+               case DLOG_REQ_GET_CAPACITY:   ret = service_writer_handle_req_get_capacity(server, wr, msg); break;
+               case DLOG_REQ_GET_USAGE:      ret = service_writer_handle_req_get_usage   (server, wr, msg); break;
+
+               default: ret = -EINVAL;
+       }
+
+       if (wr->readed > msg->length) {
+               wr->readed -= msg->length;
+               memmove(wr->buffer, wr->buffer + msg->length, wr->readed);
+       } else
+               wr->readed = 0;
+
+       return ret;
+}
+
+/**
+ * @brief Service a pipe acquisition request
+ * @details Handle a pipe request
+ * @param[in] server The logger server
+ * @param[in] wr The writer who sent the request
+ * @param[in] msg The message containing the request
+ * @return 0 on success, else -errno
+ */
+static int service_writer_handle_req_pipe(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
+{
+       int r;
+
+       assert(msg);
+
+       // check request type given by user
+       // don't assert that as the message is not parsed before
+       if (msg->request != DLOG_REQ_PIPE)
+               return -EINVAL;
+
+       if (msg->length != sizeof(struct dlog_control_msg))
+               return -EINVAL;
+
+       int pipe_fd[2];
+       if (pipe2(pipe_fd, O_CLOEXEC | O_NONBLOCK) < 0) { // O_NONBLOCK just for pipe_fd[0]; writer removes it for pipe_fd[1] on its own
+               check_if_fd_limit_reached(server, errno);
+               return -errno;
+       }
+
+       if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
+               /* Ignore failures. This call is just a performance optimisation
+                * and doesn't affect functionality; we can't do anything about
+                * an error anyway. */
+       }
+
+       assert(server);
+       assert(wr);
+
+       struct fd_entity pipe_entity = wr->fd_entity;
+       set_read_fd_entity(&pipe_entity, pipe_fd[0]);
+       r = add_fd_entity(&server->epoll_common, &pipe_entity);
+       if (r < 0)
+               goto err_close;
+
+       r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
+       if (r < 0)
+               goto err_remove;
+       close(pipe_fd[1]);
+
+       writer_close_fd(server, wr);
+       wr->service_writer = service_writer_pipe;
+       wr->fd_entity = pipe_entity;
+       wr->readed = 0;
+       return 0;
+
+err_remove:
+       remove_fd_entity(&server->epoll_common, &pipe_entity);
+
+err_close:
+       close(pipe_fd[0]);
+       close(pipe_fd[1]);
+
+       return r;
+}
+
+/**
+ * @brief Create buffer
+ * @details Allocate a buffer structure
+ * @param[out] lb The newly-allocated buffer
+ * @param[in] buf_id The buffer ID
+ * @param[in] data Buffer config data
+ * @return 0 on success, -errno on failure
+ */
+int buffer_create(struct log_buffer **log_buffer, log_id_t buf_id, struct buffer_config_data *data)
+{
+       assert(data);
+       struct log_buffer *lb = (struct log_buffer *) calloc(1, sizeof(*lb));
+
+       if (!lb)
+               return -ENOMEM;
+
+       lb->id = buf_id;
+       lb->log_storage_ptr = log_storage_create(data->size, data->sort_by);
+       if (!lb->log_storage_ptr) {
+               free(lb);
+               return -ENOMEM;
+       }
+
+       int r;
+       r = socket_initialize(&lb->sock_ctl, lb, service_writer_handle_req_ctrl, &data->ctl_socket);
+       if (r < 0) {
+               log_storage_free(lb->log_storage_ptr);
+               free(lb);
+               return r;
+       }
+
+       r = socket_initialize(&lb->sock_wr, lb, service_writer_handle_req_pipe, &data->write_socket);
+       if (r < 0) {
+               socket_close(&lb->sock_ctl);
+               log_storage_free(lb->log_storage_ptr);
+               free(lb);
+               return r;
+       }
+
+       *log_buffer = lb;
+       return 0;
+}
diff --git a/src/logger/log_buffer.h b/src/logger/log_buffer.h
new file mode 100644 (file)
index 0000000..2ab93fc
--- /dev/null
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <ptrs_list.h>
+#include "socket.h"
+#include "log_storage.h"
+
+struct log_buffer {
+       struct sock_data   sock_wr;
+       struct sock_data   sock_ctl;
+       list_head          readers_pipe;
+       log_id_t           id;
+       dlogutil_sorting_order_e sort_by;
+       log_storage       *log_storage_ptr;
+};
+
+struct buffer_config_data {
+       int size;
+       struct socket_config_data write_socket;
+       struct socket_config_data ctl_socket;
+       dlogutil_sorting_order_e sort_by;
+};
+
+int buffer_create(struct log_buffer **log_buffer, log_id_t buf_id, struct buffer_config_data *data);
+int buffer_append(const dlogutil_entry_s *entry, struct log_buffer *b);
index bd3d93a..2895d92 100644 (file)
  */
 
 // function prototypes
-static int service_writer_pipe(struct logger* server, struct writer* wr, struct epoll_event* event);
 int service_writer_kmsg(struct logger* server, struct writer* wr, struct epoll_event* event);
 int service_writer_syslog(struct logger* server, struct writer* wr, struct epoll_event* event);
-static int service_writer_handle_req_ctrl(struct logger *server, struct writer *wr, struct dlog_control_msg *msg);
-static int service_writer_handle_req_pipe(struct logger *server, struct writer *wr, struct dlog_control_msg *msg);
 void dispatch_event_writer(struct logger *server, struct epoll_event *event, void *userdata);
 static void logger_free(struct logger* l);
 static int initialize_epoll_size(struct epoll_event **events, unsigned *size);
@@ -253,49 +250,6 @@ int create_syslog_writer(struct writer ** writer, struct log_buffer *log_buffer)
        return ret;
 }
 
-/**
- * @brief Create buffer
- * @details Allocate a buffer structure
- * @param[out] lb The newly-allocated buffer
- * @param[in] buf_id The buffer ID
- * @param[in] data Buffer config data
- * @return 0 on success, -errno on failure
- */
-static int buffer_create(struct log_buffer **log_buffer, log_id_t buf_id, struct buffer_config_data *data)
-{
-       assert(data);
-       struct log_buffer *lb = (struct log_buffer *) calloc(1, sizeof(*lb));
-
-       if (!lb)
-               return -ENOMEM;
-
-       lb->id = buf_id;
-       lb->log_storage_ptr = log_storage_create(data->size, data->sort_by);
-       if (!lb->log_storage_ptr) {
-               free(lb);
-               return -ENOMEM;
-       }
-
-       int r;
-       r = socket_initialize(&lb->sock_ctl, lb, service_writer_handle_req_ctrl, &data->ctl_socket);
-       if (r < 0) {
-               log_storage_free(lb->log_storage_ptr);
-               free(lb);
-               return r;
-       }
-
-       r = socket_initialize(&lb->sock_wr, lb, service_writer_handle_req_pipe, &data->write_socket);
-       if (r < 0) {
-               socket_close(&lb->sock_ctl);
-               log_storage_free(lb->log_storage_ptr);
-               free(lb);
-               return r;
-       }
-
-       *log_buffer = lb;
-       return 0;
-}
-
 static bool cond_reader_pipe_free(void *ptr, void *user_data)
 {
        struct reader_pipe *reader = (struct reader_pipe *)ptr;
@@ -366,20 +320,6 @@ void qos_periodic_check(struct logger *server)
 }
 
 /**
- * @brief Append to buffer
- * @details Appends an entry to the buffer
- * @param[in] s The logger server
- * @param[in] entry The entry to append
- * @param[in] b The buffer whither to append
- */
-int buffer_append(const dlogutil_entry_s *entry, struct log_buffer *b)
-{
-       if (!log_storage_add_new_entry(b->log_storage_ptr, entry))
-               return -ENOMEM;
-       return 0;
-}
-
-/**
  * @brief FD limit handler
  * @details Checks whether the FD limit was reached and leaves logs about it if so
  * @param[in] server The logger server
@@ -476,7 +416,7 @@ int add_buffer_reader(struct log_buffer *buffer, struct reader_pipe *reader)
        return list_add(&buffer->readers_pipe, reader) ? 0 : -ENOMEM;
 }
 
-static int add_reader_pipe(struct logger *server, struct reader_pipe *reader)
+int add_reader_pipe(struct logger *server, struct reader_pipe *reader)
 {
        assert(reader);
        assert(server);
@@ -552,7 +492,7 @@ failure:
        return ret;
 }
 
-static int create_fifo_fds(struct logger *server, int fifo_id, int *write_fd, int *read_fd, bool dump)
+int create_fifo_fds(struct logger *server, int fifo_id, int *write_fd, int *read_fd, bool dump)
 {
        assert(write_fd);
        assert(read_fd);
@@ -664,246 +604,6 @@ static int create_reader_pipe_from_dlogutil_line(struct dlogutil_line_params *pa
 }
 
 /**
- * @brief Service util request
- * @details Handle a request from util
- * @param[in] server The logger server
- * @param[in] wr The writer who sent the request
- * @param[in] msg The message containing the request
- * @return 0 on success, else -errno
- */
-static int service_writer_handle_req_util(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
-{
-       assert(server);
-       assert(wr);
-       assert(msg);
-
-       // check request type, that should be always DLOG_REQ_HANDLE_LOGUTIL
-       // as dispatched by service_writer_handle_req_ctrl handler
-       // don't assert for compatibility with service_writer_handle_req_pipe
-       // and possible mistakes in the future that would be hard to track
-       if (msg->request != DLOG_REQ_HANDLE_LOGUTIL)
-               return -EINVAL;
-
-       if (msg->length <= sizeof(struct dlog_control_msg) ||
-           msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
-               return -EINVAL;
-
-       if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
-               return -EINVAL;
-
-       __attribute__((cleanup(reader_pipe_cleanup))) struct reader_pipe *reader = NULL;
-
-       int retval;
-       __attribute__((cleanup(free_dlogutil_line_params))) struct dlogutil_line_params params;
-       if (!initialize_dlogutil_line_params(&params)) {
-               retval = -ENOMEM;
-               goto cleanup;
-       }
-
-       retval = get_dlogutil_line_params(msg->data, &params);
-       if (retval < 0) {
-               retval = -ENOMEM;
-               goto cleanup;
-       }
-
-       if (params.file_path) {
-               /* Do not trust writer-based readers (only config-based).
-                * The control socket's privilege checks are fairly lenient
-                * so this prevents people from asking us to overwrite
-                * some potentially important files at logger privilege.
-                *
-                * At some point it would be good to be able to skip the
-                * middleman and become able to write to a file directly
-                * though. The daemon should become able to receive an
-                * opened file descriptor from a writer. */
-               retval = -EPERM;
-               goto cleanup;
-       }
-
-       retval = reader_pipe_init_with_writer(&reader, wr, server, params.filter, &params.file, params.monitor, params.is_dumping);
-       if (retval != 0)
-               goto cleanup;
-
-       int write_fd = -1, read_fd = -1;
-       retval = create_fifo_fds(server, wr->fd_entity.fd, &write_fd, &read_fd, reader->is_dumping);
-       if (retval < 0)
-               goto cleanup;
-
-       set_write_fd_entity(&reader->common.fd_entity_sink, write_fd);
-       retval = send_pipe(wr->fd_entity.fd, read_fd);
-       if (read_fd > 0)
-               close(read_fd);
-       if (retval)
-               goto cleanup;
-
-       retval = add_reader_pipe(server, reader);
-       if (retval < 0)
-               goto cleanup;
-
-       reader = NULL;
-       return 0;
-
-cleanup:
-       retval = send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_HANDLE_LOGUTIL, DLOG_REQ_RESULT_ERR, NULL, 0);
-       if (retval < 0)
-               printf("ERROR: both create_reader_from_dlogutil_line() and send_dlog_reply() failed\n");
-
-       return retval;
-}
-
-static int service_writer_handle_req_get_usage(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
-{
-       assert(server);
-       assert(wr);
-       assert(msg);
-       assert(msg->request == DLOG_REQ_GET_USAGE);
-
-       uint32_t buf_size = log_storage_get_usage(wr->buf_ptr->log_storage_ptr);
-
-       return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_USAGE, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
-}
-
-static int service_writer_handle_req_get_capacity(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
-{
-       assert(server);
-       assert(wr);
-       assert(msg);
-       assert(msg->request == DLOG_REQ_GET_CAPACITY);
-
-       uint32_t buf_size = log_storage_get_capacity(wr->buf_ptr->log_storage_ptr);
-
-       return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_CAPACITY, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
-}
-
-/**
- * @brief Service clear request
- * @details Handle a clear-buffer request
- * @param[in] server The logger server
- * @param[in] wr The writer who sent the request
- * @param[in] msg The message containing the request
- * @return 0 on success, else -errno
- */
-static int service_writer_handle_req_clear(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
-{
-       (void) server;
-       assert(msg);
-
-       // check request type, that should be always DLOG_REQ_CLEAR
-       // as dispatched by service_writer_handle_req_ctrl handler
-       // don't assert for compatibility with service_writer_handle_req_pipe
-       // and possible mistakes in the future that would be hard to track
-       if (msg->request != DLOG_REQ_CLEAR)
-               return -EINVAL;
-
-       if (msg->length != (sizeof(struct dlog_control_msg)))
-               return -EINVAL;
-
-       if (!wr || !wr->buf_ptr)
-               return -EINVAL;
-
-       log_storage_clear(wr->buf_ptr->log_storage_ptr);
-
-       return 0;
-}
-
-/**
- * @brief Service control request
- * @details Handle a clear-buffer or util request in respect to msg request type
- * @param[in] server The logger server
- * @param[in] wr The writer who sent the request
- * @param[in] msg The message containing the request
- * @return 0 on success, else -errno
- */
-static int service_writer_handle_req_ctrl(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
-{
-       assert(server);
-       assert(wr);
-       assert(msg);
-
-       int ret;
-       switch (msg->request) {
-               case DLOG_REQ_CLEAR:          ret = service_writer_handle_req_clear       (server, wr, msg); break;
-               case DLOG_REQ_HANDLE_LOGUTIL: ret = service_writer_handle_req_util        (server, wr, msg); break;
-               case DLOG_REQ_GET_CAPACITY:   ret = service_writer_handle_req_get_capacity(server, wr, msg); break;
-               case DLOG_REQ_GET_USAGE:      ret = service_writer_handle_req_get_usage   (server, wr, msg); break;
-
-               default: ret = -EINVAL;
-       }
-
-       if (wr->readed > msg->length) {
-               wr->readed -= msg->length;
-               memmove(wr->buffer, wr->buffer + msg->length, wr->readed);
-       } else
-               wr->readed = 0;
-
-       return ret;
-}
-
-/**
- * @brief Service a pipe acquisition request
- * @details Handle a pipe request
- * @param[in] server The logger server
- * @param[in] wr The writer who sent the request
- * @param[in] msg The message containing the request
- * @return 0 on success, else -errno
- */
-static int service_writer_handle_req_pipe(struct logger* server, struct writer* wr, struct dlog_control_msg* msg)
-{
-       int r;
-
-       assert(msg);
-
-       // check request type given by user
-       // don't assert that as the message is not parsed before
-       if (msg->request != DLOG_REQ_PIPE)
-               return -EINVAL;
-
-       if (msg->length != sizeof(struct dlog_control_msg))
-               return -EINVAL;
-
-       int pipe_fd[2];
-       if (pipe2(pipe_fd, O_CLOEXEC | O_NONBLOCK) < 0) { // O_NONBLOCK just for pipe_fd[0]; writer removes it for pipe_fd[1] on its own
-               check_if_fd_limit_reached(server, errno);
-               return -errno;
-       }
-
-       if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
-               /* Ignore failures. This call is just a performance optimisation
-                * and doesn't affect functionality; we can't do anything about
-                * an error anyway. */
-       }
-
-       assert(server);
-       assert(wr);
-
-       struct fd_entity pipe_entity = wr->fd_entity;
-       set_read_fd_entity(&pipe_entity, pipe_fd[0]);
-       r = add_fd_entity(&server->epoll_common, &pipe_entity);
-       if (r < 0)
-               goto err_close;
-
-       r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
-       if (r < 0)
-               goto err_remove;
-       close(pipe_fd[1]);
-
-       writer_close_fd(server, wr);
-       wr->service_writer = service_writer_pipe;
-       wr->fd_entity = pipe_entity;
-       wr->readed = 0;
-       return 0;
-
-err_remove:
-       remove_fd_entity(&server->epoll_common, &pipe_entity);
-
-err_close:
-       close(pipe_fd[0]);
-       close(pipe_fd[1]);
-
-       return r;
-}
-
-/**
  * @brief Service a socket request
  * @details Handle a socket request
  * @param[in] server The logger server
@@ -948,53 +648,6 @@ dont_process_yet_and_read_more_data:
 }
 
 /**
- * @brief Service pipe log data
- * @details Handle log messages incoming through a pipe
- * @param[in] server The logger server
- * @param[in] wr The writer who sent the logs
- * @param[in] event The event associated with the data
- * @return 0 on success, else -errno
- */
-static int service_writer_pipe(struct logger *server, struct writer *wr, struct epoll_event *event)
-{
-       if (event->events & EPOLLIN) {
-               int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed);
-
-               if (r == -1 && errno == EAGAIN)
-                       return 0;
-               else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
-                       return -EINVAL;
-               else if (r == 0)
-                       return -EBADF;
-
-               wr->readed += r;
-
-               struct pipe_logger_entry *const ple = (struct pipe_logger_entry *const)wr->buffer;
-               while ((wr->readed >= sizeof(ple->len)) && (ple->len <= wr->readed)) {
-                       const int payload_size = ple->len - sizeof *ple;
-                       if (payload_size < 0 || payload_size > LOG_MAX_PAYLOAD_SIZE)
-                               return -EINVAL;
-
-                       struct dlogutil_entry_with_msg lem;
-                       parse_pipe_message(ple, &lem.header, ple->len);
-                       add_recv_timestamp(&lem.header, server->time);
-                       fixup_pipe_msg(&lem, payload_size);
-                       if (qos_is_enabled(&server->qos))
-                               qos_add_log(&server->qos, &lem.header);
-                       r = buffer_append(&lem.header, wr->buf_ptr);
-                       wr->readed -= ple->len;
-                       memmove(wr->buffer, wr->buffer + ple->len, sizeof wr->buffer - ple->len);
-
-                       if (r)
-                               return r;
-               }
-       } else if (event->events & EPOLLHUP)
-               return -EBADF;
-
-       return 0;
-}
-
-/**
  * @brief Service /dev/kmsg
  * @details Read from the /dev/kmsg device
  * @param[in] server The logger server
index 8df3d22..482a802 100644 (file)
@@ -36,6 +36,7 @@
 #include "reader_pipe.h"
 #include "socket.h"
 #include "dlogutil_line.h"
+#include "log_buffer.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -148,15 +149,6 @@ struct writer {
 };
 #undef LARGEST_STRUCT
 
-struct log_buffer {
-       struct sock_data   sock_wr;
-       struct sock_data   sock_ctl;
-       list_head          readers_pipe;
-       log_id_t           id;
-       dlogutil_sorting_order_e sort_by;
-       log_storage       *log_storage_ptr;
-};
-
 struct logger {
        struct epoll_metadata epoll_common;
        struct epoll_metadata epoll_socket;
@@ -172,13 +164,6 @@ struct logger {
        struct qos_module     qos;
 };
 
-struct buffer_config_data {
-       int size;
-       struct socket_config_data write_socket;
-       struct socket_config_data ctl_socket;
-       dlogutil_sorting_order_e sort_by;
-};
-
 struct logger_config_data {
        struct buf_params buf_params;
        list_head logfile_configs;
@@ -203,6 +188,9 @@ int writer_create(struct writer **writer, int fd, struct log_buffer *log_buffer,
                                                 service_writer_t service_writer, service_socket_t service_socket);
 int service_writer_socket(struct logger* server, struct writer* wr, struct epoll_event* event);
 void logger_add_writer(struct logger* l, struct writer* wr);
+int create_fifo_fds(struct logger *server, int fifo_id, int *write_fd, int *read_fd, bool dump);
+int add_reader_pipe(struct logger *server, struct reader_pipe *reader);
+void writer_close_fd(struct logger* server, struct writer* wr);
 
 #ifdef __cplusplus
 }