src/logger/reader_common.c \
src/logger/reader_logger.c \
src/logger/reader_pipe.c \
+ src/logger/subreader_dlogutil.c \
src/logger/subreader_file.c \
src/logger/subreader_metrics.c \
src/logger/socket.c \
#include "log_buffer.h"
#include "logger_internal.h"
+#include "subreader_dlogutil.h"
#include <ptrs_list.h>
/**
goto cleanup;
}
- retval = reader_pipe_init_with_writer(&reader, wr, server, params.filter, params.monitor, params.is_dumping);
+ retval = reader_pipe_init_with_writer(&reader, wr, server, params.monitor, params.is_dumping);
if (retval != 0)
goto cleanup;
if (retval < 0)
goto cleanup;
+ retval = reader_add_subreader_dlogutil(&reader->common, params.filter, pipe_fd[1]);
+ if (retval != 0)
+ goto cleanup;
+
set_write_fd_entity(&reader->common.fd_entity_sink, pipe_fd[1]);
retval = send_pipe(wr->fd_entity.fd, pipe_fd[0]);
if (pipe_fd[0] > 0)
if (params->file.path == NULL)
return -EINVAL;
- retval = reader_pipe_init(&reader, params->buf_id, server, params->filter, params->monitor, params->is_dumping);
+ retval = reader_pipe_init(&reader, params->buf_id, server, params->monitor, params->is_dumping);
if (retval != 0)
return retval;
if (reader->log_storage_reader_ptr)
log_storage_release_reader(reader->log_storage_reader_ptr);
- if (reader->filter)
- log_filter_free(reader->filter);
}
static int print_out_logs(struct reader_common *_reader, struct now_t _time);
}
}
-static struct reader_pipe *reader_pipe_alloc(struct log_filter *filter, struct timespec ts,
- bool monitor, bool is_dumping)
+static struct reader_pipe *reader_pipe_alloc(struct timespec ts, bool monitor, bool is_dumping)
{
struct reader_pipe *ret = calloc(1, sizeof(*ret));
if (!ret)
ret->common.free_reader = reader_pipe_free;
ret->common.service_reader = print_out_logs;
- ret->filter = log_filter_from_filter(filter);
ret->monitor = monitor;
ret->is_dumping = is_dumping;
ret->buf_ptr = NULL;
ret->log_storage_reader_ptr = NULL;
ret->last_read_time = ts;
- ret->partial_log_size = 0;
return ret;
}
int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server,
- struct log_filter *filter, bool monitor, bool is_dumping)
+ bool monitor, bool is_dumping)
{
assert(reader);
assert(buf_id > LOG_ID_INVALID);
assert(buf_id < LOG_ID_MAX);
assert(server);
- assert(filter);
struct timespec ts_mono;
if (clock_gettime(CLOCK_MONOTONIC, &ts_mono))
return -errno;
- __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping);
+ __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping);
if (!ret)
return -ENOMEM;
}
int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
- struct log_filter *filter, bool monitor, bool is_dumping)
+ bool monitor, bool is_dumping)
{
assert(reader);
assert(writer);
assert(writer->buf_ptr);
assert(server);
- assert(filter);
struct timespec ts_mono;
if (clock_gettime(CLOCK_MONOTONIC, &ts_mono))
return -errno;
- __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(filter, ts_mono, monitor, is_dumping);
+ __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(ts_mono, monitor, is_dumping);
if (!ret)
return -ENOMEM;
assert(reader->buf_ptr);
assert(dlogutil_entry);
- if (!log_should_print_line(reader->filter, dlogutil_entry))
- return 0;
-
- if (reader->common.subs != NULL)
- return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log);
-
- const char *tag = dlogutil_entry->msg + 1;
- if (!strlen(tag))
- return 0;
-
- int r = write(reader->common.fd_entity_sink.fd, dlogutil_entry, dlogutil_entry->len);
- if (r < 0) {
- if (errno != EAGAIN)
- return 1;
-
- /* The pipe is just clogged, this is not an actual error.
- * We own the entry so it needs to be saved for later. */
- r = 0;
- }
-
- if (r < dlogutil_entry->len) {
- reader->partial_log_size = dlogutil_entry->len - r;
- memcpy(reader->partial_log, ((char *)dlogutil_entry) + r, reader->partial_log_size);
- return -1;
- }
-
- return 0;
+ return list_foreach_ret(reader->common.subs, (dlogutil_entry_s *) dlogutil_entry, subreader_apply_log);
}
/**
assert(reader->buf_ptr);
- if (reader->partial_log_size) {
- int r = write(reader->common.fd_entity_sink.fd, reader->partial_log, reader->partial_log_size);
- if (r <= 0)
- return r != 0 && errno != EAGAIN;
-
- if (r < reader->partial_log_size) {
- reader->partial_log_size -= r;
- memmove(reader->partial_log, reader->partial_log + r, reader->partial_log_size);
- return 0;
- }
-
- reader->partial_log_size = 0;
- }
+ int r = reader_flush(_reader, (struct timespec){0, 0}, 0);
+ if (r > 0)
+ return 1;
+ if (r < 0)
+ return 0;
while (log_storage_reader_is_new_entry_available(reader->log_storage_reader_ptr)) {
const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(reader->log_storage_reader_ptr);
struct log_buffer *buf_ptr;
log_storage_reader *log_storage_reader_ptr;
struct timespec last_read_time;
- int partial_log_size;
- char partial_log[sizeof(struct dlogutil_entry_with_msg)];
bool is_dumping;
bool monitor;
- struct log_filter *filter;
};
int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server,
- struct log_filter *filter, bool monitor, bool is_dumping);
+ bool monitor, bool is_dumping);
int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
- struct log_filter *filter, bool monitor, bool is_dumping);
+ bool monitor, bool is_dumping);
int reader_should_buffer(struct reader_pipe *reader, const struct buf_params *buf_params, struct timespec now);
int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry_s *dlogutil_entry);
--- /dev/null
+#include "subreader_dlogutil.h"
+#include "logger_internal.h"
+
+static void subreader_dlogutil_free(void *userdata)
+{
+ struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata;
+ assert(srdu);
+
+ /* No close(pipe_fd)! It is already contained in
+ * reader_common's sink FD entity (pipes can get
+ * clogged so they must be exposed to epoll) and
+ * gets closed that way.
+ *
+ * This is a bit suboptimal (since a reader can
+ * only have one subreader that requires to use
+ * the FD entity) but in practice it's a 1 to 1
+ * relationship anyway so far, and would create
+ * problems (what if one subreader can continue
+ * and another does not?) so I'm happy with it. */
+}
+
+static int subreader_dlogutil_flush(void *userdata, struct timespec ts, int flush_time)
+{
+ struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) userdata;
+ assert(srdu);
+
+ if (!srdu->partial_log_size)
+ return 0;
+
+ int r = write(srdu->pipe_fd, srdu->partial_log, srdu->partial_log_size);
+ if (r <= 0)
+ return (r != 0 && errno != EAGAIN) ? 1 : -1;
+
+ if (r < srdu->partial_log_size) {
+ srdu->partial_log_size -= r;
+ memmove(srdu->partial_log, srdu->partial_log + r, srdu->partial_log_size);
+ return -1;
+ }
+
+ srdu->partial_log_size = 0;
+ return 0;
+}
+
+static int subreader_dlogutil_apply_log(const struct subreader_common *sub, const struct dlogutil_entry *due)
+{
+ assert(sub);
+ assert(due);
+
+ struct subreader_dlogutil *const srdu = (struct subreader_dlogutil *) sub->sub_userdata;
+ assert(srdu);
+
+ const char *tag = due->msg + 1;
+ if (!strlen(tag))
+ return 0;
+
+ int r = subreader_dlogutil_flush(srdu, (struct timespec) {0, 0}, 0);
+ if (r) {
+ /* Shouldn't really happen, since we only retry after
+ * we get an event that the pipe got unclogged. Maybe
+ * if the pipe only gets unclogged by, say, 1 byte a
+ * few times in a row and we try to append a bunch of
+ * full logs to the buffer, the buffer can get clogged
+ * as well? Drop the log in that case; realistically
+ * that shouldn't happen though. */
+
+ if (srdu->partial_log_size + due->len >= sizeof srdu->partial_log)
+ return r;
+
+ memcpy(srdu->partial_log + srdu->partial_log_size, (char *) due, due->len);
+ srdu->partial_log_size += due->len;
+
+ return r;
+ }
+
+ r = write(srdu->pipe_fd, due, due->len);
+ if (r < 0) {
+ if (errno != EAGAIN)
+ return 1;
+
+ /* The pipe is just clogged, this is not an actual error.
+ * We own the entry so it needs to be saved for later. */
+ r = 0;
+ }
+
+ if (r < due->len) {
+ srdu->partial_log_size = due->len - r;
+ memcpy(srdu->partial_log, ((char *)due) + r, srdu->partial_log_size);
+ return -1;
+ }
+
+ return 0;
+}
+
+int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd)
+{
+ assert(reader);
+ assert(pipe_fd >= 0);
+
+ struct subreader_common *const sub = malloc(sizeof *sub);
+ struct subreader_dlogutil *const srdu = malloc(sizeof *srdu);
+ if (!sub || !srdu) {
+ free(sub);
+ free(srdu);
+ return -ENOMEM;
+ }
+
+ srdu->pipe_fd = pipe_fd;
+ srdu->partial_log_size = 0;
+
+ sub->sub_userdata = srdu;
+ sub->sub_destroy = subreader_dlogutil_free;
+ sub->sub_apply_log = subreader_dlogutil_apply_log;
+ sub->sub_flush = subreader_dlogutil_flush;
+ sub->filter = log_filter_move(filter);
+
+ list_add(&reader->subs, sub);
+
+ return 0;
+}
--- /dev/null
+#pragma once
+
+#include "reader_common.h"
+#include "log_storage.h"
+#include "queued_entry_timestamp.h"
+
+struct subreader_dlogutil {
+ int pipe_fd;
+ int partial_log_size;
+ char partial_log[sizeof(struct dlogutil_entry_with_msg)];
+};
+
+int reader_add_subreader_dlogutil(struct reader_common *reader, struct log_filter *filter, int pipe_fd);
+