1 #include "log_buffer.h"
2 #include "log_compressed_storage.h"
3 #include "logger_internal.h"
4 #include "subreader_dlogutil.h"
5 #include "reader_memory.h"
9 * @brief Append to buffer
10 * @details Appends an entry to the buffer
11 * @param[in] s The logger server
12 * @param[in] entry The entry to append
13 * @param[in] b The buffer whither to append
15 int buffer_append(const dlogutil_entry_s *entry, struct log_buffer *b)
17 if (!log_storage_add_new_entry(b->log_storage_ptr, entry))
23 * @brief Service pipe log data
24 * @details Handle log messages incoming through a pipe
25 * @param[in] server The logger server
26 * @param[in] wr The writer who sent the logs
27 * @param[in] event The event associated with the data
28 * @return 0 on success, else -errno
30 static int service_writer_pipe(struct logger *server, struct writer *wr, struct epoll_event *event)
32 if (~event->events & EPOLLIN) {
33 return event->events & EPOLLHUP
39 int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed);
40 if (r == -1 && errno == EAGAIN)
42 else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
49 struct pipe_logger_entry *const ple = (struct pipe_logger_entry *const)wr->buffer;
50 while ((wr->readed >= sizeof(ple->len)) && (ple->len <= wr->readed)) {
51 const int payload_size = ple->len - sizeof *ple;
52 if (payload_size <= 0 || payload_size > LOG_MAX_PAYLOAD_SIZE)
55 struct dlogutil_entry_with_msg lem;
56 parse_pipe_message(ple, &lem.header, ple->len);
62 add_recv_timestamp(&lem.header, now);
64 fixup_pipe_msg(&lem, payload_size);
66 qos_add_log(server->qos, &lem.header);
67 r = buffer_append(&lem.header, wr->buf_ptr);
68 wr->readed -= ple->len;
69 memmove(wr->buffer, wr->buffer + ple->len, sizeof wr->buffer - ple->len);
78 static int service_writer_stdout(struct logger *server, struct writer *wr, struct epoll_event *event)
82 assert(wr->stdout_data);
87 char *terminal = NULL;
89 if (event->events & EPOLLIN) {
90 // The - 1 is important, since we want to add a zero at the end.
91 int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed - 1);
93 if (r == -1 && errno == EAGAIN)
95 else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
99 else if (!wr->buf_ptr->accept_stdout)
102 terminal = wr->buffer + wr->readed + r;
104 eol = strchrnul(wr->buffer + wr->readed, '\n');
107 } else if (event->events & EPOLLHUP)
110 // If we will return something that's not zero, we need to flush the entire buffer.
111 // We do this by pretending there is an extra \0 at the end of the input.
112 // Worth noting that if we got here, there can't be a \0 or \n in the input already
113 // (because if there were, the previous iteration would take care of it).
115 if (wr->readed > 0) {
116 eol = wr->buffer + wr->readed;
120 assert(wr->readed <= sizeof(wr->buffer) - 1);
121 terminal = wr->buffer + wr->readed;
131 // We need to handle the case in which the buffer is full (and again, - 1 is important).
132 while (eol < terminal || wr->readed == sizeof(wr->buffer) - 1) {
135 struct dlogutil_entry_with_msg lem = {
137 .len = sizeof(lem.header),
138 .priority = wr->stdout_data->prio,
139 .pid = wr->stdout_data->pid,
140 .tid = wr->stdout_data->pid,
145 int r = get_now(&now);
148 add_recv_timestamp(&lem.header, now);
150 /* HACK: We copy recv to sent, because we assume that those timestamps are available
151 * and we do not want to special case them. TODO: Is there a better solution? */
152 lem.header.sec_sent_mono = lem.header.sec_recv_mono;
153 lem.header.sec_sent_real = lem.header.sec_recv_real;
154 lem.header.nsec_sent_mono = lem.header.nsec_recv_mono;
155 lem.header.nsec_sent_real = lem.header.nsec_recv_real;
157 lem.header.tag_len = strlen(strncpy((char *)&lem + lem.header.len, wr->stdout_data->tag, sizeof(lem) - lem.header.len - 1));
158 lem.header.len += lem.header.tag_len + 1;
159 assert(lem.header.len <= sizeof(lem));
160 assert(((char *)&lem)[lem.header.len - 1] == '\0');
162 if (sizeof(lem) - lem.header.len - 1 >= 1) {
163 cut = strlen(strncpy((char *)&lem + lem.header.len, wr->buffer, sizeof(lem) - lem.header.len - 1));
164 lem.header.len += cut + 1;
168 assert(lem.header.len <= sizeof(lem));
169 assert(((char *)&lem)[lem.header.len - 1] == '\0');
172 qos_add_log(server->qos, &lem.header);
174 r = buffer_append(&lem.header, wr->buf_ptr);
178 if (wr->buffer[cut] == '\0')
180 if (cut == sizeof(wr->buffer)) {
181 // This will happen if the buffer was full.
183 wr->buffer[0] = '\0';
188 assert(terminal == wr->buffer + wr->readed);
189 memmove(wr->buffer, wr->buffer + cut, sizeof(wr->buffer) - cut);
190 assert(*terminal == '\0');
192 eol = strchrnul(wr->buffer, '\n');
198 static bool is_control_msg_valid_for_util_req(const struct dlog_control_msg *msg)
200 return msg->length > sizeof *msg
201 && msg->length <= sizeof *msg + MAX_LOGGER_REQUEST_LEN
202 && msg->data[msg->length - sizeof *msg] == '\0'
206 static int parse_req_for_dlogutil_params(struct dlogutil_line_params *params, const struct dlog_control_msg *msg)
208 if (!initialize_dlogutil_line_params(params, (struct buf_params) { }))
211 /* Note that in this case, the format doesn't matter
212 * since we're going to send binary data anyway.
213 * Therefore, we just pass OFF as default. */
214 const int r = get_dlogutil_line_params(msg->data, FORMAT_OFF, params);
218 /* Do not trust writer-based readers (only config-based).
219 * The control socket's privilege checks are fairly lenient
220 * so this prevents people from asking us to overwrite
221 * some potentially important files at logger privilege.
223 * At some point it would be good to be able to skip the
224 * middleman and become able to write to a file directly
225 * though. The daemon should become able to receive an
226 * opened file descriptor from a writer. */
227 if (params->file_path)
233 static int sent_req_reply(int fd, signed char request)
235 const int ret = send_dlog_reply(fd, request, DLOG_REQ_RESULT_ERR, NULL, 0);
237 printf("ERROR: both the request handling and send_dlog_reply() failed\n");
242 static int distribute_pipes(struct logger *server, struct reader_common *reader, struct dlogutil_line_params *params, int writer_fd)
244 int pipe_fd[2] = { -1, -1 };
245 if (create_fifo_fds(server, pipe_fd, 0, params->is_dumping))
248 if (reader_add_subreader_dlogutil(reader, params->filter, pipe_fd[1]) != 0)
251 /* FIXME: ideally the FD entity would belong to the sub,
252 * it only works because there is only one sub at a time.
253 * Changing this requires some design thought around how
254 * to handle multiple subs with varying sink throughput. */
255 set_write_fd_entity(&reader->fd_entity_sink, pipe_fd[1]);
257 const int rs = send_pipe(writer_fd, pipe_fd[0]);
263 const int ra = add_reader_to_server(reader, server);
270 int req_init_reader_pipe(struct logger *server, struct writer *wr, void *reader, struct dlogutil_line_params *params)
272 return reader_pipe_init((struct reader_pipe **)reader, params->enabled_buffers, server, params->monitor, params->is_dumping);
275 int req_init_reader_memory(struct logger *server, struct writer *wr, void *reader, struct dlogutil_line_params *params)
277 return reader_memory_init_with_writer((struct reader_memory **)reader, wr, server, params->compression, params->monitor, params->is_dumping);
281 * @brief Service util request
282 * @details Handle a request from util
283 * @param[in] server The logger server
284 * @param[in] wr The writer who sent the request
285 * @param[in] msg The message containing the request
286 * @return 0 on success, else -errno
288 int service_writer_handle_req_generic_util(struct logger *server, struct writer *wr, struct dlog_control_msg *msg, bool wanted_compression,
289 int (*make_reader_func)(struct logger *, struct writer *, void *, struct dlogutil_line_params *), signed char request)
294 assert(msg->request == request);
296 if (!is_control_msg_valid_for_util_req(msg))
299 __attribute__((cleanup(free_dlogutil_line_params))) struct dlogutil_line_params params;
300 int r = parse_req_for_dlogutil_params(¶ms, msg);
304 /* Enabling compression is only available from the text
305 * config. libdlogutil clients have a separate request
306 * type to obtain data. Eventually it would be good to
307 * merge them together, but that requires some thought
309 if (!!params.compression != wanted_compression)
312 __attribute__((cleanup(reader_free_ptr))) struct reader_common *reader = NULL;
313 r = make_reader_func(server, wr, &reader, ¶ms);
317 if (!distribute_pipes(server, reader, ¶ms, wr->fd_entity.fd))
324 /* NB: reply success means that stuff is still stable enough that the client
325 * can probably get a second chance, so return a success from the whole func
326 * so as not to drop the client */
327 return sent_req_reply(wr->fd_entity.fd, request);
330 static int service_writer_handle_req_util(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
332 return service_writer_handle_req_generic_util(server, wr, msg,
333 false, req_init_reader_pipe, DLOG_REQ_HANDLE_LOGUTIL);
336 static int service_writer_handle_req_compressed_memory_util(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
338 return service_writer_handle_req_generic_util(server, wr, msg,
339 true, req_init_reader_memory, DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL);
342 static int service_writer_handle_req_get_usage(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
347 assert(msg->request == DLOG_REQ_GET_USAGE);
349 uint32_t buf_size = log_storage_get_usage(wr->buf_ptr->log_storage_ptr);
351 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_USAGE, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
354 static int service_writer_handle_req_get_capacity(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
359 assert(msg->request == DLOG_REQ_GET_CAPACITY);
361 uint32_t buf_size = log_storage_get_capacity(wr->buf_ptr->log_storage_ptr);
363 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_CAPACITY, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
367 * @brief Service clear request
368 * @details Handle a clear-buffer request
369 * @param[in] server The logger server
370 * @param[in] wr The writer who sent the request
371 * @param[in] msg The message containing the request
372 * @return 0 on success, else -errno
374 static int service_writer_handle_req_clear(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
379 assert(msg->request == DLOG_REQ_CLEAR);
381 if (msg->length != (sizeof(struct dlog_control_msg)))
384 if (!wr || !wr->buf_ptr)
387 log_storage_clear(wr->buf_ptr->log_storage_ptr);
392 static int service_writer_handle_req_global_enable_disable_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
397 assert(msg->request == DLOG_REQ_GLOBAL_ENABLE_DISABLE_STDOUT);
402 if (msg->length != sizeof(struct dlog_control_msg) + sizeof(char))
405 wr->buf_ptr->accept_stdout = msg->data[0] != '\0';
409 static int service_writer_handle_req_get_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
414 assert(msg->request == DLOG_REQ_GET_STDOUT);
419 if (msg->length != sizeof(struct dlog_control_msg))
422 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_STDOUT, DLOG_REQ_RESULT_OK, &(char){ wr->buf_ptr->accept_stdout }, sizeof(char));
425 bool cmp_names(void *element, void *userdata)
427 return strcmp(log_compressed_storage_get_name((log_compressed_storage *) element), (char *) userdata) == 0;
430 static int service_writer_handle_req_compression_resize(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
436 assert(msg->request == DLOG_REQ_CHANGE_COMPRESSION_SIZE);
438 if (msg->length <= sizeof(struct dlog_control_msg) ||
439 msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
442 if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
445 __attribute__((cleanup(free_ptr))) char *storage_name = NULL;
446 __attribute__((cleanup(free_ptr))) char *data = calloc(1, msg->length);
449 memcpy(data, msg->data, msg->length);
451 unsigned int capacity = *(unsigned int *)data;
455 size_t storage_name_len = msg->length - sizeof(unsigned int);
456 storage_name = calloc(1, storage_name_len);
457 if (storage_name == NULL)
459 memcpy(storage_name, (data + sizeof(unsigned int)), storage_name_len);
461 log_compressed_storage *storage = list_find_if(server->compressed_memories, storage_name, cmp_names);
465 log_compressed_storage_resize(storage, capacity);
470 * @brief Service control request
471 * @details Handle a clear-buffer or util request in respect to msg request type
472 * @param[in] server The logger server
473 * @param[in] wr The writer who sent the request
474 * @param[in] msg The message containing the request
475 * @return 0 on success, else -errno
477 static int service_writer_handle_req_ctrl(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
484 switch (msg->request) {
485 case DLOG_REQ_CLEAR: ret = service_writer_handle_req_clear (server, wr, msg); break;
486 case DLOG_REQ_HANDLE_LOGUTIL: ret = service_writer_handle_req_util (server, wr, msg); break;
487 case DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL: ret = service_writer_handle_req_compressed_memory_util (server, wr, msg); break;
488 case DLOG_REQ_GET_CAPACITY: ret = service_writer_handle_req_get_capacity (server, wr, msg); break;
489 case DLOG_REQ_GET_USAGE: ret = service_writer_handle_req_get_usage (server, wr, msg); break;
490 case DLOG_REQ_GLOBAL_ENABLE_DISABLE_STDOUT: ret = service_writer_handle_req_global_enable_disable_stdout(server, wr, msg); break;
491 case DLOG_REQ_GET_STDOUT: ret = service_writer_handle_req_get_stdout (server, wr, msg); break;
492 case DLOG_REQ_CHANGE_COMPRESSION_SIZE: ret = service_writer_handle_req_compression_resize (server, wr, msg); break;
494 default: ret = -EINVAL;
497 if (wr->readed > msg->length) {
498 wr->readed -= msg->length;
499 memmove(wr->buffer, wr->buffer + msg->length, wr->readed);
507 * @brief Service a pipe acquisition request
508 * @details Handle a pipe request
509 * @param[in] server The logger server
510 * @param[in] wr The writer who sent the request
511 * @param[in] msg The message containing the request
512 * @return 0 on success, else -errno
514 static int service_writer_handle_req_pipe(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
520 assert(msg->request == DLOG_REQ_PIPE);
522 if (msg->length != sizeof(struct dlog_control_msg))
525 /* We get named fifo fds here instead of regular pipe(2) to ensure Smack
526 * labels are set correctly - see comments in get_nonblocking_fifo().
528 int pipe_fd[2] = { -1, -1 };
529 int ret = get_nonblocking_fifo(pipe_fd, O_CLOEXEC);
531 check_if_fd_limit_reached(server, errno);
535 if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
536 /* Ignore failures. This call is just a performance optimisation
537 * and doesn't affect functionality; we can't do anything about
538 * an error anyway. */
544 struct fd_entity pipe_entity = wr->fd_entity;
545 set_read_fd_entity(&pipe_entity, pipe_fd[0]);
546 r = add_fd_entity(&server->epoll_common, &pipe_entity);
550 r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
555 writer_close_fd(server, wr);
556 wr->service_writer = service_writer_pipe;
557 wr->fd_entity = pipe_entity;
562 remove_fd_entity(&server->epoll_common, &pipe_entity);
571 static int service_writer_handle_req_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
577 assert(msg->request == DLOG_REQ_STDOUT);
579 if (msg->length <= sizeof(struct dlog_control_msg) + sizeof(struct dlog_control_msg_stdout))
581 struct dlog_control_msg_stdout *pre_data = (void *)msg->data;
582 if (pre_data->prio <= DLOG_DEFAULT || pre_data->prio >= DLOG_PRIO_MAX) // TODO: ugly
585 int data_len = msg->length - sizeof(struct dlog_control_msg);
586 struct dlog_control_msg_stdout *data = malloc(data_len + 1);
589 memcpy(data, pre_data, data_len);
590 // NB: There might have been a zero earlier. We don't really care, as nothing wrong will happen.
591 ((char *)data)[data_len] = '\0';
593 // Open connection to logger, do not set O_CLOEXEC as stdout might get inherited
594 int pipe_fd[2] = { -1, -1 };
595 r = get_nonblocking_fifo(pipe_fd, 0);
597 check_if_fd_limit_reached(server, errno);
601 if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
602 /* Ignore failures. This call is just a performance optimisation
603 * and doesn't affect functionality; we can't do anything about
604 * an error anyway. */
610 struct fd_entity pipe_entity = wr->fd_entity;
611 set_read_fd_entity(&pipe_entity, pipe_fd[0]);
612 r = add_fd_entity(&server->epoll_common, &pipe_entity);
616 r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
621 writer_close_fd(server, wr);
622 wr->service_writer = service_writer_stdout;
623 wr->fd_entity = pipe_entity;
625 wr->stdout_data = data;
629 remove_fd_entity(&server->epoll_common, &pipe_entity);
641 static int service_writer_handle_req_write(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
647 switch (msg->request) {
648 case DLOG_REQ_PIPE: return service_writer_handle_req_pipe(server, wr, msg);
649 case DLOG_REQ_STDOUT:
650 return service_writer_handle_req_stdout(server, wr, msg);
652 default: return -EINVAL;
655 /* NB: unlike for the control socket, resetting `wr->readed`
656 * is the responsibility of the specific request handlers,
657 * since they may obsolete the need by modifying the connection,
658 * in particular, reset it via morph to a pipe */
662 * @brief Create buffer
663 * @details Allocate a buffer structure
664 * @param[out] lb The newly-allocated buffer
665 * @param[in] buf_id The buffer ID
666 * @param[in] data Buffer config data
667 * @return 0 on success, -errno on failure
669 int buffer_create(struct log_buffer **log_buffer, log_id_t buf_id, struct buffer_config_data *data, struct logger *server)
672 struct log_buffer *lb = (struct log_buffer *) calloc(1, sizeof(*lb));
678 lb->log_storage_ptr = log_storage_create(data->size, server->sort_by);
679 if (!lb->log_storage_ptr) {
685 r = socket_initialize(&lb->sock_ctl, lb, service_writer_handle_req_ctrl, &data->ctl_socket);
687 log_storage_free(lb->log_storage_ptr);
692 r = socket_initialize(&lb->sock_conn, lb, service_writer_handle_req_write, &data->conn_socket);
694 socket_close(&lb->sock_ctl);
695 log_storage_free(lb->log_storage_ptr);
700 lb->accept_stdout = true;
708 * @details Deallocate a buffer
709 * @param[in] buffer The buffer to deallocate
711 void buffer_free(struct log_buffer *buffer)
715 socket_close(&buffer->sock_ctl);
716 socket_close(&buffer->sock_conn);
718 log_storage_free(buffer->log_storage_ptr);