1 #include "reader_pipe.h"
3 #include <buffer_traits.h>
4 #include "logger_internal.h"
6 static void reader_pipe_free(struct reader_common *_reader)
8 struct reader_pipe *reader = (struct reader_pipe *) _reader;
10 if (reader->common.server)
11 list_remove(&reader->common.server->readers, reader);
13 for (log_id_t i = 0; i < NELEMS(reader->log_storage_reader_ptrs); ++i) {
14 log_storage_reader *const lsr = reader->log_storage_reader_ptrs[i];
17 log_storage_release_reader(lsr);
21 static int print_out_logs(struct reader_common *_reader, struct now_t _time);
23 static void reader_pipe_notify_losing_log(const dlogutil_entry_s *le, void *reader_)
25 struct reader_pipe *reader = (struct reader_pipe *) reader_;
28 reader_pipe_print_out_single_log(reader, (dlogutil_entry_s *)le);
31 static struct reader_pipe *reader_pipe_alloc(struct logger *server, bool monitor, bool is_dumping)
33 struct reader_pipe *ret = calloc(1, sizeof(*ret));
37 reader_common_init(&ret->common, server);
39 ret->common.free_reader = reader_pipe_free;
40 ret->common.service_reader = print_out_logs;
42 for (log_id_t i = 0; i < NELEMS(ret->log_storage_reader_ptrs); ++i)
43 ret->log_storage_reader_ptrs[i] = NULL;
48 int reader_pipe_init(struct reader_pipe **reader, int enabled_buffers, struct logger *server,
49 bool monitor, bool is_dumping)
54 __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(server, monitor, is_dumping);
58 for (int i = 0; i < LOG_ID_MAX; ++i) {
59 if (!(enabled_buffers & (1 << i)))
62 const struct log_buffer *const buffer = server->buffers[i];
66 log_storage_reader *const lsr = log_storage_new_reader(buffer->log_storage_ptr,
67 is_dumping, monitor, reader_pipe_notify_losing_log, ret);
70 ret->log_storage_reader_ptrs[i] = lsr;
73 init_fd_entity(&ret->common.fd_entity_sink, dispatch_event_reader, ret);
74 init_fd_entity(&ret->common.fd_entity_source, dispatch_event_reader, ret);
81 int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
82 bool monitor, bool is_dumping)
86 assert(writer->buf_ptr);
89 __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *ret = reader_pipe_alloc(server, monitor, is_dumping);
93 log_storage_reader *const lsr = log_storage_new_reader(writer->buf_ptr->log_storage_ptr,
94 is_dumping, monitor, reader_pipe_notify_losing_log, ret);
97 ret->log_storage_reader_ptrs[0] = lsr;
99 init_fd_entity(&ret->common.fd_entity_sink, dispatch_event_reader, ret);
100 init_fd_entity(&ret->common.fd_entity_source, dispatch_event_reader, ret);
107 int reader_pipe_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry_s *dlogutil_entry)
110 assert(dlogutil_entry);
112 return reader_apply_log_to_subs(&reader->common, dlogutil_entry);
116 * @brief Print out logs
117 * @details Make sure the reader is up to date on printed logs
118 * @param[in] reader The reader to read the data
119 * @param[in] _time Unused timestamps
120 * @return 0 if data remains for the next iteration, 1 if the buffer is to be removed, else -1
122 static int print_out_logs(struct reader_common *_reader, struct now_t _time)
124 struct reader_pipe *const reader = (struct reader_pipe *) _reader;
127 int r = reader_flush(_reader, (struct timespec){0, 0}, 0);
134 log_storage_reader *best_lsr = NULL;
135 const dlogutil_entry_s *best_entry = NULL;
136 bool any_reader_exists = false;
137 for (log_id_t i = 0; i < NELEMS(reader->log_storage_reader_ptrs); ++i) {
138 log_storage_reader *const lsr = reader->log_storage_reader_ptrs[i];
142 /* We check this here (in the loop, as opposed to after fetching
143 * a log just for that storage reader) because storage readers can
144 * start finished if there are no logs (so would never reach the
145 * fetch phase). Ideally this would be detected at creation instead
146 * of here, but I am under a tight deadline and have no mana for that. */
147 if (log_storage_reader_is_finished(lsr)) {
148 log_storage_release_reader(lsr);
149 reader->log_storage_reader_ptrs[i] = NULL;
152 any_reader_exists = true;
154 if (!log_storage_reader_is_new_entry_available(lsr))
157 const dlogutil_entry_s *const entry = log_storage_reader_peek_entry(lsr);
160 if (!best_entry || log_entry_is_earlier(reader->common.server->sort_by, entry, best_entry)) {
167 if (!any_reader_exists)
172 const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(best_lsr);
175 assert(ple == best_entry);
177 switch (reader_pipe_print_out_single_log(reader, ple)) {
178 case 0: /* nothing more to do, let's do next loop */
181 case 1: /* error after which we need to end the reader */
184 default: /* writing error, bounce out */