1 #include "reader_pipe.h"
3 #include "logger_internal.h"
/* Tears down a pipe reader: deinitialises its common reader part, frees its
 * log file, releases the attached log storage reader (if any) and frees its
 * filter.
 * NOTE(review): parts of the body are not visible in this excerpt, so whether
 * `reader` itself is freed here cannot be confirmed from these lines. */
5 void reader_pipe_free(struct reader_pipe *reader)
10 reader_deinit_common(&reader->common);
11 logfile_free(&reader->file);
/* The storage reader pointer can be NULL (reader_pipe_alloc leaves it so),
 * hence it is only released when set. */
12 if (reader->log_storage_reader_ptr)
13 log_storage_release_reader(reader->log_storage_reader_ptr);
15 log_filter_free(reader->filter);
/* Cleanup handler that receives a pointer to a reader pointer and passes the
 * pointed-to reader on to reader_pipe_free. It is the function named in the
 * __attribute__((cleanup(...))) declarations in reader_pipe_init and
 * reader_pipe_init_with_writer.
 * NOTE(review): any guard preceding the call is not visible in this excerpt. */
20 void reader_pipe_cleanup(struct reader_pipe *const *ptr)
23 reader_pipe_free(*ptr);
/* Event callback installed on both the sink and the source fd entity of a
 * pipe reader (see the init_fd_entity calls in the init functions).
 * `userdata` is the struct reader_pipe the entities belong to. */
26 static void dispatch_event_reader_pipe(struct logger *server, struct epoll_event *event, void *userdata)
28 struct reader_pipe *const rp = (struct reader_pipe *) userdata;
/* Hang-up or error on the descriptor: detach the reader's fd entities from
 * the server and take the reader off its buffer's list of pipe readers. */
31 if (event->events & (EPOLLHUP | EPOLLERR)) {
32 remove_reader_fd_entities(server, &rp->common);
34 list_remove(&rp->buf_ptr->readers_pipe, rp);
/* Push out the logs pending for this reader.
 * NOTE(review): how `r` decides whether the removal below runs is not
 * visible in this excerpt. */
39 int r = print_out_logs(rp, server->time);
41 /* TODO: There is no reason not to free the reader in full. However, when I do so, some tests start to
42 * fail without any reasonable reason. You are welcome to *try* to figure out why does this happen. */
43 remove_reader_fd_entities(server, &rp->common);
/* Allocates a zero-filled struct reader_pipe and populates it from the
 * arguments: the filter is taken via log_filter_move, the log file via
 * logfile_move (presumably both transfer ownership from the caller - confirm
 * against those helpers), `ts` becomes the initial last_read_time, and the
 * monitor/is_dumping flags are copied as given.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this excerpt. */
47 static struct reader_pipe *reader_pipe_alloc(dlogutil_filter_options_s *filter, struct log_file *file, struct timespec ts,
48 bool monitor, bool is_dumping)
50 struct reader_pipe *ret = calloc(1, sizeof(*ret));
54 ret->filter = log_filter_move(filter);
55 ret->monitor = monitor;
56 ret->is_dumping = is_dumping;
57 logfile_move(&ret->file, file);
/* Set explicitly even though calloc already zero-filled the structure. */
59 ret->log_storage_reader_ptr = NULL;
60 ret->last_read_time = ts;
61 ret->partial_log_size = 0;
/* Creates a pipe reader bound to the server buffer selected by `buf_id`.
 * `buf_id` must lie strictly between LOG_ID_INVALID and LOG_ID_MAX; this is
 * only enforced with assert().
 * The reader is built by reader_pipe_alloc using the server's monotonic time
 * as its initial last-read timestamp.
 * NOTE(review): error checks, the hand-over of the result through `reader`
 * and the return value are not visible in this excerpt. */
66 int reader_pipe_init(struct reader_pipe **reader, log_id_t buf_id, struct logger *server,
67 dlogutil_filter_options_s *filter, struct log_file *file, bool monitor, bool is_dumping)
70 assert(buf_id > LOG_ID_INVALID);
71 assert(buf_id < LOG_ID_MAX);
/* `ret` carries the cleanup attribute, so reader_pipe_cleanup is invoked on
 * it when it goes out of scope. */
75 __attribute__((cleanup(reader_pipe_cleanup))) struct reader_pipe *ret = reader_pipe_alloc(filter, file, server->time.mono, monitor, is_dumping);
79 ret->buf_ptr = server->buffers[buf_id];
/* Both fd entities dispatch to the same callback with the reader as userdata. */
83 init_fd_entity(&ret->common.fd_entity_sink, dispatch_event_reader_pipe, ret);
84 init_fd_entity(&ret->common.fd_entity_source, dispatch_event_reader_pipe, ret);
/* Variant of reader_pipe_init that binds the new reader to the buffer the
 * given writer already uses (writer->buf_ptr) instead of looking one up by
 * id. The writer must have a buffer; this is only enforced with assert().
 * NOTE(review): error checks, the hand-over of the result through `reader`
 * and the return value are not visible in this excerpt. */
91 int reader_pipe_init_with_writer(struct reader_pipe **reader, struct writer *writer, struct logger *server,
92 dlogutil_filter_options_s *filter, struct log_file *file, bool monitor, bool is_dumping)
96 assert(writer->buf_ptr);
/* `ret` carries the cleanup attribute, so reader_pipe_cleanup is invoked on
 * it when it goes out of scope. */
100 __attribute__((cleanup(reader_pipe_cleanup))) struct reader_pipe *ret = reader_pipe_alloc(filter, file, server->time.mono, monitor, is_dumping);
104 ret->buf_ptr = writer->buf_ptr;
/* Both fd entities dispatch to the same callback with the reader as userdata. */
106 init_fd_entity(&ret->common.fd_entity_sink, dispatch_event_reader_pipe, ret);
107 init_fd_entity(&ret->common.fd_entity_source, dispatch_event_reader_pipe, ret);
/* Returns the value reported by log_storage_reader_get_ready_bytes for this
 * reader's log storage reader.
 * NOTE(review): the visible assert checks buf_ptr, while the call below uses
 * log_storage_reader_ptr; no check of the latter is visible in this excerpt. */
115 uint64_t reader_buffered_space(const struct reader_pipe *reader)
118 assert(reader->buf_ptr);
119 return log_storage_reader_get_ready_bytes(reader->log_storage_reader_ptr);
/* Tells whether the reader can buffer its output: yields 1 when the reader
 * has a buffer attached and its log file has a path set, 0 otherwise. */
122 int reader_is_bufferable(const struct reader_pipe *reader)
125 return reader->buf_ptr && reader->file.path != NULL;
/* Computes the time elapsed from the reader's last_read_time to *ts, in
 * milliseconds. The seconds difference is scaled by 1000 and the nanoseconds
 * difference (which may be negative) is divided by 1000000 with integer
 * division and added as a signed correction.
 * NOTE(review): the result is narrowed to int, so very large gaps could
 * overflow - confirm callers keep the interval small. */
128 int reader_ms_since(const struct reader_pipe *reader, struct timespec *ts)
130 return (ts->tv_sec - reader->last_read_time.tv_sec) * 1000 + (ts->tv_nsec - reader->last_read_time.tv_nsec) / 1000000;
/* Decides whether output for this reader should be held back for now.
 * NOTE(review): the values returned on each path are not visible in this
 * excerpt; only the conditions are documented below. */
133 int reader_should_buffer(struct reader_pipe *reader, const struct buf_params *buf_params, struct timespec now)
/* Readers without a buffer or without a file path take a separate path. */
138 if (!reader_is_bufferable(reader))
/* Both limits still unreached: fewer ready bytes than buf_params->bytes and
 * less elapsed time than buf_params->time * 1000 ms (so `time` is presumably
 * expressed in seconds - confirm). */
141 if (reader_buffered_space(reader) < (uint64_t)buf_params->bytes && reader_ms_since(reader, &now) < (buf_params->time * 1000))
/* Reaching this line restarts the time window from `now`. */
144 reader->last_read_time = now;
/* Emits one log entry for the reader. The reader must have a buffer and the
 * entry must be non-NULL (both enforced with assert() only).
 * NOTE(review): several lines, including every return statement and the
 * error check after write(), are not visible in this excerpt; the return
 * codes are interpreted by the switch in print_out_logs. */
148 int reader_print_out_single_log(struct reader_pipe *reader, const dlogutil_entry_s *dlogutil_entry)
151 assert(reader->buf_ptr);
152 assert(dlogutil_entry);
/* Entries rejected by the reader's filter are not printed. */
154 if (!log_should_print_line(reader->filter, dlogutil_entry))
/* File-backed reader: hand the entry to the rotating log file writer, using
 * the buffer's sort_by setting. */
157 if (reader->file.path) {
158 logfile_write_with_rotation(dlogutil_entry, &reader->file, reader->buf_ptr->sort_by);
/* The tag starts one byte into the message area (presumably after a leading
 * priority byte - confirm against the entry layout). */
162 const char *tag = dlogutil_entry->msg + 1;
/* Write the raw entry (dlogutil_entry->len bytes) to the file descriptor when
 * a path is set, otherwise to the sink descriptor. */
166 int r = write(reader->file.path ? reader->file.fd : reader->common.fd_entity_sink.fd, dlogutil_entry, dlogutil_entry->len);
171 /* The pipe is just clogged, this is not an actual error.
172 * We own the entry so it needs to be saved for later. */
/* Account for the bytes that actually went out. */
176 reader->file.size += r;
/* Short write: keep the unwritten tail in partial_log so that print_out_logs
 * can retry it later. */
177 if (r < dlogutil_entry->len) {
178 reader->partial_log_size = dlogutil_entry->len - r;
179 memcpy(reader->partial_log, ((char *)dlogutil_entry) + r, reader->partial_log_size);
/* Complete write: rotate the file if logfile_rotate_needed reports so. */
181 } else if (logfile_rotate_needed(&reader->file) > 0) {
182 logfile_do_rotate(&reader->file);
189 * @brief Print out logs
190 * @details Make sure the reader is up to date on printed logs
191 * @param[in] reader The reader to read the data
192 * @param[in] _time Unused timestamps
193 * @return 0 if data remains for the next iteration, 1 if the buffer is to be removed, else -1
195 int print_out_logs(struct reader_pipe *reader, struct now_t _time)
/* The reader must have a buffer attached (enforced with assert() only). */
199 assert(reader->buf_ptr);
/* First retry the tail of a previously short-written entry, if one is stored. */
201 if (reader->partial_log_size) {
202 int r = write(reader->common.fd_entity_sink.fd, reader->partial_log, reader->partial_log_size);
/* Yields 1 when r is nonzero and errno is not EAGAIN, otherwise 0.
 * NOTE(review): the condition guarding this return is not visible in this
 * excerpt. */
204 return r != 0 && errno != EAGAIN;
/* Still a short write: drop the bytes that went out and shift the rest to
 * the front of partial_log (memmove, because the regions overlap). */
206 if (r < reader->partial_log_size) {
207 reader->partial_log_size -= r;
208 memmove(reader->partial_log, reader->partial_log + r, reader->partial_log_size);
/* Whole tail written: nothing partial remains. */
212 reader->partial_log_size = 0;
/* Drain every new entry the storage reader currently has available. */
215 while (log_storage_reader_is_new_entry_available(reader->log_storage_reader_ptr)) {
216 const dlogutil_entry_s* ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(reader->log_storage_reader_ptr);
/* NOTE(review): the statements under each case label are not visible in this
 * excerpt. */
220 switch (reader_print_out_single_log(reader, ple)) {
221 case 0: /* nothing more to do, let's do next loop */
224 case 1: /* error after which we need to end the reader */
227 default: /* writing error, bounce out */
/* All available entries handled: a dumping reader reports 1, any other
 * reader reports -1. */
232 return reader->is_dumping ? 1 : -1;