1 #include "log_buffer.h"
2 #include "log_compressed_storage.h"
3 #include "logger_internal.h"
4 #include "subreader_dlogutil.h"
5 #include "reader_memory.h"
9 * @brief Append to buffer
10 * @details Appends an entry to the buffer
11 * @param[in] s The logger server
12 * @param[in] entry The entry to append
13 * @param[in] b The buffer whither to append
/*
 * Appends `entry` to the log storage owned by buffer `b` by calling
 * log_storage_add_new_entry(b->log_storage_ptr, entry).
 * NOTE(review): the return statements are not visible here; the check below
 * treats a false result from log_storage_add_new_entry() as the failure case.
 */
15 int buffer_append(const dlogutil_entry_s *entry, struct log_buffer *b)
17 if (!log_storage_add_new_entry(b->log_storage_ptr, entry))
23 * @brief Service pipe log data
24 * @details Handle log messages incoming through a pipe
25 * @param[in] server The logger server
26 * @param[in] wr The writer who sent the logs
27 * @param[in] event The event associated with the data
28 * @return 0 on success, else -errno
/*
 * Reads log data from a writer pipe into wr->buffer and consumes every
 * complete pipe_logger_entry found at the front of that buffer. Each entry is
 * parsed into a dlogutil entry, given a receive timestamp, fixed up, passed
 * to qos_add_log() and appended to the buffer the writer is bound to.
 */
30 static int service_writer_pipe(struct logger *server, struct writer *wr, struct epoll_event *event)
/* EPOLLIN bit not set: nothing to read, the result is derived from EPOLLHUP. */
32 if (~event->events & EPOLLIN) {
33 return event->events & EPOLLHUP
/* New bytes land right after the wr->readed bytes that are already buffered,
 * and the size argument keeps the read inside wr->buffer. */
39 int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed);
40 if (r == -1 && errno == EAGAIN)
/* End of stream or a read error, combined with a hangup on the descriptor. */
42 else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
/* ple aliases the start of the buffer. Iterate while the length field is
 * fully buffered and all ple->len bytes of the entry have been received. */
49 struct pipe_logger_entry *const ple = (struct pipe_logger_entry *const)wr->buffer;
50 while ((wr->readed >= sizeof(ple->len)) && (ple->len <= wr->readed)) {
/* The payload is whatever follows the fixed-size entry header; an empty or
 * oversized payload triggers the check below. */
51 const int payload_size = ple->len - sizeof *ple;
52 if (payload_size <= 0 || payload_size > LOG_MAX_PAYLOAD_SIZE)
55 struct dlogutil_entry_with_msg lem;
56 parse_pipe_message(ple, &lem.header, ple->len);
62 add_recv_timestamp(&lem.header, now);
64 fixup_pipe_msg(&lem, payload_size);
66 qos_add_log(server->qos, &lem.header);
67 r = buffer_append(&lem.header, wr->buf_ptr);
/* Drop the consumed entry: lower the byte count and shift the rest of the
 * buffer to the front so that ple points at the next entry. */
68 wr->readed -= ple->len;
69 memmove(wr->buffer, wr->buffer + ple->len, sizeof wr->buffer - ple->len);
/*
 * Handles text arriving on the pipe of a writer that was switched to stdout
 * mode. Bytes are accumulated in wr->buffer and cut into pieces at newline
 * characters (or at the end of the data when flushing, or when the buffer is
 * full). Every piece becomes one log entry carrying the priority, pid and tag
 * stored in wr->stdout_data; entries are passed to qos_add_log() and appended
 * to wr->buf_ptr.
 */
78 static int service_writer_stdout(struct logger *server, struct writer *wr, struct epoll_event *event)
/* stdout_data supplies the priority, pid and tag used for every entry below. */
82 assert(wr->stdout_data);
87 char *terminal = NULL;
89 if (event->events & EPOLLIN) {
90 // The - 1 is important, since we want to add a zero at the end.
91 int r = read(wr->fd_entity.fd, wr->buffer + wr->readed, sizeof wr->buffer - wr->readed - 1);
93 if (r == -1 && errno == EAGAIN)
95 else if ((r == 0 || r == -1) && event->events & EPOLLHUP)
/* The target buffer currently has accept_stdout switched off. */
99 else if (!wr->buf_ptr->accept_stdout)
/* terminal marks the end of valid data: one past the last byte just read. */
102 terminal = wr->buffer + wr->readed + r;
/* strchrnul() yields the first newline or, when there is none, the position
 * of the terminating NUL. */
104 eol = strchrnul(wr->buffer + wr->readed, '\n');
107 } else if (event->events & EPOLLHUP)
110 // If we will return something that's not zero, we need to flush the entire buffer.
111 // We do this by pretending there is an extra \0 at the end of the input.
112 // Worth noting that if we got here, there can't be a \0 or \n in the input already
113 // (because if there were, the previous iteration would take care of it).
115 if (wr->readed > 0) {
116 eol = wr->buffer + wr->readed;
120 assert(wr->readed <= sizeof(wr->buffer) - 1);
121 terminal = wr->buffer + wr->readed;
131 // We need to handle the case in which the buffer is full (and again, - 1 is important).
132 while (eol < terminal || wr->readed == sizeof(wr->buffer) - 1) {
/* The entry starts out as a bare header; len grows as tag and text are added.
 * Note that tid is filled with the pid as well. */
135 struct dlogutil_entry_with_msg lem = {
137 .len = sizeof(lem.header),
138 .priority = wr->stdout_data->prio,
139 .pid = wr->stdout_data->pid,
140 .tid = wr->stdout_data->pid,
145 int r = get_now(&now);
148 add_recv_timestamp(&lem.header, now);
150 /* HACK: We copy recv to sent, because we assume that those timestamps are available
151 * and we do not want to special case them. TODO: Is there a better solution? */
152 lem.header.sec_sent_mono = lem.header.sec_recv_mono;
153 lem.header.sec_sent_real = lem.header.sec_recv_real;
154 lem.header.nsec_sent_mono = lem.header.nsec_recv_mono;
155 lem.header.nsec_sent_real = lem.header.nsec_recv_real;
/* The tag is copied to offset header.len inside lem (right behind the header)
 * and len is extended by the tag length plus its NUL terminator. */
157 lem.header.tag_len = strlen(strncpy((char *)&lem + lem.header.len, wr->stdout_data->tag, sizeof(lem) - lem.header.len - 1));
158 lem.header.len += lem.header.tag_len + 1;
159 assert(lem.header.len <= sizeof(lem));
160 assert(((char *)&lem)[lem.header.len - 1] == '\0');
/* Only copy text when there is room for at least one character plus a NUL.
 * cut is the length of the text taken from the front of wr->buffer. */
162 if (sizeof(lem) - lem.header.len - 1 >= 1) {
163 cut = strlen(strncpy((char *)&lem + lem.header.len, wr->buffer, sizeof(lem) - lem.header.len - 1));
164 lem.header.len += cut + 1;
168 assert(lem.header.len <= sizeof(lem));
169 assert(((char *)&lem)[lem.header.len - 1] == '\0');
172 qos_add_log(server->qos, &lem.header);
174 r = buffer_append(&lem.header, wr->buf_ptr);
178 if (wr->buffer[cut] == '\0')
180 if (cut == sizeof(wr->buffer)) {
181 // This will happen if the buffer was full.
183 wr->buffer[0] = '\0';
/* Shift the bytes that were not consumed to the front of the buffer and look
 * for the next newline from there. */
188 assert(terminal == wr->buffer + wr->readed);
189 memmove(wr->buffer, wr->buffer + cut, sizeof(wr->buffer) - cut);
190 assert(*terminal == '\0');
192 eol = strchrnul(wr->buffer, '\n');
199 * @brief Service util request
200 * @details Handle a standard (not compressed memory) request from util
201 * @param[in] server The logger server
202 * @param[in] wr The writer who sent the request
203 * @param[in] msg The message containing the request
204 * @return 0 on success, else -errno
/*
 * Handles a DLOG_REQ_HANDLE_LOGUTIL request. The request payload in msg->data
 * is parsed as a dlogutil parameter line; on success a pipe reader is created,
 * a fifo pair is opened, a dlogutil subreader writing to pipe_fd[1] is
 * attached, pipe_fd[0] is sent to the client over the writer socket and the
 * reader is registered with the server. The tail of the function sends a
 * DLOG_REQ_RESULT_ERR reply.
 * NOTE(review): the branches that select between these paths are not visible
 * here - confirm against the full file.
 */
206 static int service_writer_handle_req_util(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
212 assert(msg->request == DLOG_REQ_HANDLE_LOGUTIL);
/* The length must exceed the bare header and stay within the header plus
 * MAX_LOGGER_REQUEST_LEN. */
214 if (msg->length <= sizeof(struct dlog_control_msg) ||
215 msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
/* The byte of msg->data at offset (msg->length - header size) must be zero. */
218 if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
/* reader and params carry the cleanup attribute, so reader_free_ptr and
 * free_dlogutil_line_params run on them when they go out of scope. */
221 __attribute__((cleanup(reader_free_ptr))) struct reader_pipe *reader = NULL;
224 __attribute__((cleanup(free_dlogutil_line_params))) struct dlogutil_line_params params;
225 if (!initialize_dlogutil_line_params(&params, (struct buf_params) { })) {
226 /* TODO: cleanup discards this value, so there isn't much
227 * point setting it. Ideally it would be attached to the
228 * reply but that's a protocol change so not worth it atm */
233 /* Note that in this case, the format doesn't matter.
234 * Therefore, we just pass OFF as default. */
235 retval = get_dlogutil_line_params(msg->data, FORMAT_OFF, &params);
237 // retval = -ENOMEM; // see above
241 if (params.compression) {
242 /* Memory compression is only available from the text
243 * config. libdlogutil clients have a separate request
244 * type to obtain data. Eventually it would be good to
245 * merge them together, but that requires some thought
250 if (params.file_path) {
251 /* Do not trust writer-based readers (only config-based).
252 * The control socket's privilege checks are fairly lenient
253 * so this prevents people from asking us to overwrite
254 * some potentially important files at logger privilege.
256 * At some point it would be good to be able to skip the
257 * middleman and become able to write to a file directly
258 * though. The daemon should become able to receive an
259 * opened file descriptor from a writer. */
260 // retval = -EPERM; // see above
264 retval = reader_pipe_init(&reader, params.enabled_buffers, server, params.monitor, params.is_dumping);
/* pipe_fd[1] becomes the sink of the reader, pipe_fd[0] goes to the client. */
268 int pipe_fd[2] = { -1, -1 };
269 retval = create_fifo_fds(server, pipe_fd, 0, params.is_dumping);
273 retval = reader_add_subreader_dlogutil(&reader->common, params.filter, pipe_fd[1]);
277 set_write_fd_entity(&reader->common.fd_entity_sink, pipe_fd[1]);
278 retval = send_pipe(wr->fd_entity.fd, pipe_fd[0]);
284 retval = add_reader_to_server(&reader->common, server);
292 /* NB: reply success means that stuff is still stable enough that the client
293 * can probably get a second chance, so return a success from the whole func
294 * so as not to drop the client */
295 retval = send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_HANDLE_LOGUTIL, DLOG_REQ_RESULT_ERR, NULL, 0);
297 printf("ERROR: both create_reader_from_dlogutil_line() and send_dlog_reply() failed\n");
/*
 * Handles a DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL request. The request payload
 * in msg->data is parsed as a dlogutil parameter line; a memory reader is
 * created for params.compression, a fifo pair is opened, a dlogutil subreader
 * writing to pipe_fd[1] is attached, pipe_fd[0] is sent to the client over
 * the writer socket and the reader is registered with the server. The tail of
 * the function sends a DLOG_REQ_RESULT_ERR reply.
 * NOTE(review): the branches that select between these paths are not visible
 * here - confirm against the full file.
 */
302 static int service_writer_handle_req_compressed_memory_util(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
308 assert(msg->request == DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL);
/* The length must exceed the bare header and stay within the header plus
 * MAX_LOGGER_REQUEST_LEN. */
310 if (msg->length <= sizeof(struct dlog_control_msg) ||
311 msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
/* The byte of msg->data at offset (msg->length - header size) must be zero. */
314 if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
/* reader and params carry the cleanup attribute, so reader_free_ptr and
 * free_dlogutil_line_params run on them when they go out of scope. */
317 __attribute__((cleanup(reader_free_ptr))) struct reader_memory *reader = NULL;
320 __attribute__((cleanup(free_dlogutil_line_params))) struct dlogutil_line_params params;
321 if (!initialize_dlogutil_line_params(&params, (struct buf_params) { })) {
322 /* TODO: cleanup discards this value, so there isn't much
323 * point setting it. Ideally it would be attached to the
324 * reply but that's a protocol change so not worth it atm */
329 /* Note that in this case, the format doesn't matter.
330 * Therefore, we just pass OFF as default. */
331 retval = get_dlogutil_line_params(msg->data, FORMAT_OFF, &params);
333 // retval = -ENOMEM; // see above
337 if (params.file_path) {
338 /* Do not trust writer-based readers (only config-based).
339 * The control socket's privilege checks are fairly lenient
340 * so this prevents people from asking us to overwrite
341 * some potentially important files at logger privilege.
343 * At some point it would be good to be able to skip the
344 * middleman and become able to write to a file directly
345 * though. The daemon should become able to receive an
346 * opened file descriptor from a writer. */
347 // retval = -EPERM; // see above
/* Unlike the plain util request, this one is checked for a missing
 * params.compression value. */
351 if (!params.compression)
354 retval = reader_memory_init_with_writer(&reader, wr, server, params.compression, params.monitor, params.is_dumping);
/* pipe_fd[1] becomes the sink of the reader, pipe_fd[0] goes to the client. */
358 int pipe_fd[2] = { -1, -1 };
359 retval = create_fifo_fds(server, pipe_fd, 0, reader->is_dumping);
363 retval = reader_add_subreader_dlogutil(&reader->common, params.filter, pipe_fd[1]);
367 /* FIXME: ideally the FD entity would belong to the sub,
368 * it only works because there is only one sub at a time.
369 * Changing this requires some design thought around how
370 * to handle multiple subs with varying sink throughput. */
371 set_write_fd_entity(&reader->common.fd_entity_sink, pipe_fd[1]);
373 retval = send_pipe(wr->fd_entity.fd, pipe_fd[0]);
379 retval = add_reader_to_server(&reader->common, server);
387 /* NB: reply success means that stuff is still stable enough that the client
388 * can probably get a second chance, so return a success from the whole func
389 * so as not to drop the client */
390 retval = send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL, DLOG_REQ_RESULT_ERR, NULL, 0);
392 printf("ERROR: both create_reader_from_dlogutil_line() and send_dlog_reply() failed\n");
/*
 * Handles DLOG_REQ_GET_USAGE: queries log_storage_get_usage() for the storage
 * of the buffer the writer is bound to and sends the value back to the client
 * as a uint32_t payload of a DLOG_REQ_RESULT_OK reply. The return value is
 * whatever send_dlog_reply() returns.
 */
397 static int service_writer_handle_req_get_usage(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
402 assert(msg->request == DLOG_REQ_GET_USAGE);
404 uint32_t buf_size = log_storage_get_usage(wr->buf_ptr->log_storage_ptr);
406 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_USAGE, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
/*
 * Handles DLOG_REQ_GET_CAPACITY: queries log_storage_get_capacity() for the
 * storage of the buffer the writer is bound to and sends the value back to
 * the client as a uint32_t payload of a DLOG_REQ_RESULT_OK reply. The return
 * value is whatever send_dlog_reply() returns.
 */
409 static int service_writer_handle_req_get_capacity(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
414 assert(msg->request == DLOG_REQ_GET_CAPACITY);
416 uint32_t buf_size = log_storage_get_capacity(wr->buf_ptr->log_storage_ptr);
418 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_CAPACITY, DLOG_REQ_RESULT_OK, &buf_size, sizeof buf_size);
422 * @brief Service clear request
423 * @details Handle a clear-buffer request
424 * @param[in] server The logger server
425 * @param[in] wr The writer who sent the request
426 * @param[in] msg The message containing the request
427 * @return 0 on success, else -errno
/*
 * Handles DLOG_REQ_CLEAR by clearing the log storage of the buffer the writer
 * is bound to.
 */
429 static int service_writer_handle_req_clear(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
434 assert(msg->request == DLOG_REQ_CLEAR);
/* A clear request carries no payload: the length must equal the bare header. */
436 if (msg->length != (sizeof(struct dlog_control_msg)))
/* Guard against a missing writer or a writer without a buffer. */
439 if (!wr || !wr->buf_ptr)
442 log_storage_clear(wr->buf_ptr->log_storage_ptr);
/*
 * Handles DLOG_REQ_GLOBAL_ENABLE_DISABLE_STDOUT: updates the accept_stdout
 * flag of the buffer the writer is bound to from the one-byte payload.
 */
447 static int service_writer_handle_req_global_enable_disable_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
452 assert(msg->request == DLOG_REQ_GLOBAL_ENABLE_DISABLE_STDOUT);
/* The payload must be exactly one char. */
457 if (msg->length != sizeof(struct dlog_control_msg) + sizeof(char))
/* Any non-NUL byte enables stdout acceptance, a NUL byte disables it. */
460 wr->buf_ptr->accept_stdout = msg->data[0] != '\0';
/*
 * Handles DLOG_REQ_GET_STDOUT: replies with the current accept_stdout flag of
 * the buffer the writer is bound to. The return value is whatever
 * send_dlog_reply() returns.
 */
464 static int service_writer_handle_req_get_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
469 assert(msg->request == DLOG_REQ_GET_STDOUT);
/* The request carries no payload: the length must equal the bare header. */
474 if (msg->length != sizeof(struct dlog_control_msg))
/* The flag is sent as a single char built with a compound literal. */
477 return send_dlog_reply(wr->fd_entity.fd, DLOG_REQ_GET_STDOUT, DLOG_REQ_RESULT_OK, &(char){ wr->buf_ptr->accept_stdout }, sizeof(char));
/*
 * List predicate: returns true when the name of the compressed storage passed
 * as `element` is equal (strcmp) to the C string passed as `userdata`.
 * It is handed to list_find_if() by the compression resize handler.
 */
480 bool cmp_names(void *element, void *userdata)
482 return strcmp(log_compressed_storage_get_name((log_compressed_storage *) element), (char *) userdata) == 0;
/*
 * Handles DLOG_REQ_CHANGE_COMPRESSION_SIZE. The payload starts with an
 * unsigned int capacity followed by the name of a compressed storage; the
 * storage with that name is looked up in server->compressed_memories and
 * resized to the requested capacity.
 */
485 static int service_writer_handle_req_compression_resize(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
491 assert(msg->request == DLOG_REQ_CHANGE_COMPRESSION_SIZE);
/* The length must exceed the bare header and stay within the header plus
 * MAX_LOGGER_REQUEST_LEN. */
493 if (msg->length <= sizeof(struct dlog_control_msg) ||
494 msg->length > sizeof(struct dlog_control_msg) + MAX_LOGGER_REQUEST_LEN)
/* The byte of msg->data at offset (msg->length - header size) must be zero. */
497 if (msg->data[msg->length - sizeof(struct dlog_control_msg)] != 0)
/* Both heap blocks carry the cleanup attribute, so free_ptr runs on them when
 * they go out of scope. */
500 __attribute__((cleanup(free_ptr))) char *storage_name = NULL;
501 __attribute__((cleanup(free_ptr))) char *data = calloc(1, msg->length);
/* NOTE(review): this copies msg->length bytes starting at msg->data, while
 * the checks above treat the payload as (msg->length - header size) bytes
 * long - confirm that reading the extra header-sized tail is intended. */
504 memcpy(data, msg->data, msg->length);
/* The capacity occupies the first sizeof(unsigned int) bytes of the copy. */
506 unsigned int capacity = *(unsigned int *)data;
/* Everything behind the capacity is copied out as the storage name. */
510 size_t storage_name_len = msg->length - sizeof(unsigned int);
511 storage_name = calloc(1, storage_name_len);
512 if (storage_name == NULL)
514 memcpy(storage_name, (data + sizeof(unsigned int)), storage_name_len);
/* cmp_names matches list elements against storage_name. */
516 log_compressed_storage *storage = list_find_if(server->compressed_memories, storage_name, cmp_names);
520 log_compressed_storage_resize(storage, capacity);
525 * @brief Service control request
526 * @details Dispatch a control request to the handler that matches the msg request type
527 * @param[in] server The logger server
528 * @param[in] wr The writer who sent the request
529 * @param[in] msg The message containing the request
530 * @return 0 on success, else -errno
/*
 * Dispatches a request received on a control socket to the handler that
 * matches msg->request and stores the handler result in ret; unknown request
 * types yield -EINVAL. buffer_create() installs this function as the handler
 * of the control socket.
 */
532 static int service_writer_handle_req_ctrl(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
539 switch (msg->request) {
540 case DLOG_REQ_CLEAR: ret = service_writer_handle_req_clear (server, wr, msg); break;
541 case DLOG_REQ_HANDLE_LOGUTIL: ret = service_writer_handle_req_util (server, wr, msg); break;
542 case DLOG_REQ_HANDLE_COMPRESSED_LOGUTIL: ret = service_writer_handle_req_compressed_memory_util (server, wr, msg); break;
543 case DLOG_REQ_GET_CAPACITY: ret = service_writer_handle_req_get_capacity (server, wr, msg); break;
544 case DLOG_REQ_GET_USAGE: ret = service_writer_handle_req_get_usage (server, wr, msg); break;
545 case DLOG_REQ_GLOBAL_ENABLE_DISABLE_STDOUT: ret = service_writer_handle_req_global_enable_disable_stdout(server, wr, msg); break;
546 case DLOG_REQ_GET_STDOUT: ret = service_writer_handle_req_get_stdout (server, wr, msg); break;
547 case DLOG_REQ_CHANGE_COMPRESSION_SIZE: ret = service_writer_handle_req_compression_resize (server, wr, msg); break;
549 default: ret = -EINVAL;
/* More bytes are buffered than this message used: drop the handled message
 * and move the remaining bytes to the front of wr->buffer.
 * NOTE(review): the handling of the opposite case is not visible here. */
552 if (wr->readed > msg->length) {
553 wr->readed -= msg->length;
554 memmove(wr->buffer, wr->buffer + msg->length, wr->readed);
562 * @brief Service a pipe acquisition request
563 * @details Handle a pipe request
564 * @param[in] server The logger server
565 * @param[in] wr The writer who sent the request
566 * @param[in] msg The message containing the request
567 * @return 0 on success, else -errno
/*
 * Handles DLOG_REQ_PIPE: opens a nonblocking fifo pair, registers its read
 * end (pipe_fd[0]) with the server epoll set, sends the write end
 * (pipe_fd[1]) to the client and then switches the writer over to the pipe,
 * so that later data is handled by service_writer_pipe().
 */
569 static int service_writer_handle_req_pipe(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
575 assert(msg->request == DLOG_REQ_PIPE);
/* The request carries no payload: the length must equal the bare header. */
577 if (msg->length != sizeof(struct dlog_control_msg))
580 /* We get named fifo fds here instead of regular pipe(2) to ensure Smack
581 * labels are set correctly - see comments in get_nonblocking_fifo().
583 int pipe_fd[2] = { -1, -1 };
584 int ret = get_nonblocking_fifo(pipe_fd, O_CLOEXEC);
586 check_if_fd_limit_reached(server, errno);
590 if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
591 /* Ignore failures. This call is just a performance optimisation
592 * and doesn't affect functionality; we can't do anything about
593 * an error anyway. */
/* Start from a copy of the current fd entity of the writer and point it at
 * the read end of the fifo before adding it to epoll. */
599 struct fd_entity pipe_entity = wr->fd_entity;
600 set_read_fd_entity(&pipe_entity, pipe_fd[0]);
601 r = add_fd_entity(&server->epoll_common, &pipe_entity);
/* The write end travels to the client over the existing writer descriptor. */
605 r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
/* Morph the writer: close its current descriptor and make the pipe entity and
 * the pipe service routine its new identity. */
610 writer_close_fd(server, wr);
611 wr->service_writer = service_writer_pipe;
612 wr->fd_entity = pipe_entity;
/* Undoes the epoll registration made above (presumably on a failure path -
 * the surrounding control flow is not visible here). */
617 remove_fd_entity(&server->epoll_common, &pipe_entity);
/*
 * Handles DLOG_REQ_STDOUT: keeps a NUL-terminated heap copy of the request
 * payload (a dlog_control_msg_stdout followed by further bytes), opens a
 * nonblocking fifo pair, registers its read end with the server epoll set,
 * sends the write end to the client and then switches the writer over to
 * service_writer_stdout() with the payload copy attached as wr->stdout_data.
 */
626 static int service_writer_handle_req_stdout(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
632 assert(msg->request == DLOG_REQ_STDOUT);
/* The message is checked for carrying more than the header plus a bare
 * dlog_control_msg_stdout. */
634 if (msg->length <= sizeof(struct dlog_control_msg) + sizeof(struct dlog_control_msg_stdout))
636 struct dlog_control_msg_stdout *pre_data = (void *)msg->data;
/* Priorities at or below DLOG_DEFAULT or at or above DLOG_PRIO_MAX trigger
 * this check. */
637 if (pre_data->prio <= DLOG_DEFAULT || pre_data->prio >= DLOG_PRIO_MAX) // TODO: ugly
/* One extra byte is allocated so the copy can be NUL-terminated. */
640 int data_len = msg->length - sizeof(struct dlog_control_msg);
641 struct dlog_control_msg_stdout *data = malloc(data_len + 1);
644 memcpy(data, pre_data, data_len);
645 // NB: There might have been a zero earlier. We don't really care, as nothing wrong will happen.
646 ((char *)data)[data_len] = '\0';
648 // Open connection to logger, do not set O_CLOEXEC as stdout might get inherited
649 int pipe_fd[2] = { -1, -1 };
650 r = get_nonblocking_fifo(pipe_fd, 0);
652 check_if_fd_limit_reached(server, errno);
656 if (fcntl(pipe_fd[1], F_SETPIPE_SZ, PIPE_REQUESTED_SIZE) < 0) {
657 /* Ignore failures. This call is just a performance optimisation
658 * and doesn't affect functionality; we can't do anything about
659 * an error anyway. */
/* Start from a copy of the current fd entity of the writer and point it at
 * the read end of the fifo before adding it to epoll. */
665 struct fd_entity pipe_entity = wr->fd_entity;
666 set_read_fd_entity(&pipe_entity, pipe_fd[0]);
667 r = add_fd_entity(&server->epoll_common, &pipe_entity);
/* The write end travels to the client over the existing writer descriptor. */
671 r = send_pipe(wr->fd_entity.fd, pipe_fd[1]);
/* Morph the writer: close its current descriptor, switch to the stdout
 * service routine and the pipe entity, and hand over the payload copy. */
676 writer_close_fd(server, wr);
677 wr->service_writer = service_writer_stdout;
678 wr->fd_entity = pipe_entity;
680 wr->stdout_data = data;
/* Undoes the epoll registration made above (presumably on a failure path -
 * the surrounding control flow is not visible here). */
684 remove_fd_entity(&server->epoll_common, &pipe_entity);
/*
 * Dispatches a request received on a write (connection) socket: DLOG_REQ_PIPE
 * and DLOG_REQ_STDOUT go to their handlers, any other request type returns
 * -EINVAL. buffer_create() installs this function as the handler of the
 * connection socket.
 */
696 static int service_writer_handle_req_write(struct logger *server, struct writer *wr, struct dlog_control_msg *msg)
702 switch (msg->request) {
703 case DLOG_REQ_PIPE: return service_writer_handle_req_pipe(server, wr, msg);
704 case DLOG_REQ_STDOUT:
705 return service_writer_handle_req_stdout(server, wr, msg);
707 default: return -EINVAL;
710 /* NB: unlike for the control socket, resetting `wr->readed`
711 * is the responsibility of the specific request handlers,
712 * since they may obsolete the need by modifying the connection,
713 * in particular, reset it via morph to a pipe */
717 * @brief Create buffer
718 * @details Allocate a buffer structure
719 * @param[out] log_buffer The newly-allocated buffer
720 * @param[in] buf_id The buffer ID
721 * @param[in] data Buffer config data
722 * @return 0 on success, -errno on failure
/*
 * Builds a log buffer: allocates a zero-filled struct log_buffer, creates its
 * log storage from data->size and server->sort_by, and initializes the
 * control and connection sockets with their request handlers. Stdout
 * acceptance is switched on by default.
 * NOTE(review): the failure checks and return statements are not visible
 * here; the free and close calls below appear to belong to the error paths.
 */
724 int buffer_create(struct log_buffer **log_buffer, log_id_t buf_id, struct buffer_config_data *data, struct logger *server)
727 struct log_buffer *lb = (struct log_buffer *) calloc(1, sizeof(*lb));
733 lb->log_storage_ptr = log_storage_create(data->size, server->sort_by);
734 if (!lb->log_storage_ptr) {
/* Control socket: requests are served by service_writer_handle_req_ctrl. */
740 r = socket_initialize(&lb->sock_ctl, lb, service_writer_handle_req_ctrl, &data->ctl_socket);
742 log_storage_free(lb->log_storage_ptr);
/* Connection socket: requests are served by service_writer_handle_req_write. */
747 r = socket_initialize(&lb->sock_conn, lb, service_writer_handle_req_write, &data->conn_socket);
749 socket_close(&lb->sock_ctl);
750 log_storage_free(lb->log_storage_ptr);
755 lb->accept_stdout = true;
763 * @details Deallocate a buffer
764 * @param[in] buffer The buffer to deallocate
/*
 * Releases the resources held by `buffer`: closes the control and connection
 * sockets and frees the log storage.
 */
766 void buffer_free(struct log_buffer *buffer)
770 socket_close(&buffer->sock_ctl);
771 socket_close(&buffer->sock_conn);
773 log_storage_free(buffer->log_storage_ptr);