int tail;
int not_empty;
int buffered_len;
- int elapsed;
char buffer[0];
};
struct log_buffer** buffers;
int max_buffered_bytes;
int max_buffered_time;
- int should_timeout;
log_format* default_format;
};
{
int buf_id = reader->buf_id;
struct log_buffer* buffer = server->buffers[buf_id];
- int r = 0;
-
- r = print_out_logs(reader, buffer);
- if (!r)
- server->should_timeout |= (1 << LOG_ID_MAX);
- return r;
+ return print_out_logs(reader, buffer);
}
static int parse_command_line(const char* cmdl, struct logger* server, struct writer* wr)
goto cleanup;
}
reader->current = server->buffers[reader->buf_id]->head;
- server->should_timeout |= (1 << reader->buf_id);
- server->buffers[reader->buf_id]->elapsed = server->max_buffered_time;
LIST_ADD(server->readers[reader->buf_id], reader);
cleanup:
return EINVAL;
wr->buf_ptr->head = wr->buf_ptr->tail = wr->buf_ptr->not_empty = 0;
- wr->buf_ptr->elapsed = wr->buf_ptr->buffered_len = wr->buf_ptr->lines = 0;
+ wr->buf_ptr->buffered_len = wr->buf_ptr->lines = 0;
LIST_FOREACH(server->readers[wr->buf_ptr->id], reader) {
reader->current = 0;
return EINVAL;
buffer_append(entry, server->buffers[wr->buf_ptr->id], server->readers[wr->buf_ptr->id]);
wr->readed -= entry->len;
- server->should_timeout |= (1<<wr->buf_ptr->id);
memmove(wr->buffer, wr->buffer + entry->len, LOG_MAX_SIZE - entry->len);
}
} else if (event->events & EPOLLHUP)
return 0;
}
-static void service_all_readers(struct logger* server, int time_elapsed)
+static void service_all_readers(struct logger* server, int force_push)
{
int i = 0;
int r = 0;
struct log_buffer** buffers = server->buffers;
struct reader* reader = NULL;
- server->should_timeout &= ~(1 << LOG_ID_MAX);
for (i = 0; i < LOG_ID_MAX; i++) {
- buffers[i]->elapsed += time_elapsed;
- if (buffers[i]->buffered_len >= server->max_buffered_bytes ||
- buffers[i]->elapsed >= server->max_buffered_time) {
+ if (force_push || buffers[i]->buffered_len >= server->max_buffered_bytes) {
LIST_FOREACH(server->readers[i], reader) {
r = service_reader(server, reader);
if (r > 0) {
}
buffers[i]->buffered_len = 0;
- buffers[i]->elapsed = 0;
- server->should_timeout &= ~(1<<i);
}
}
}
return 0;
}
-static int logger_get_timeout(struct logger* server)
-{
- int timeout = -1;
- int i = 0;
-
- if (!server->should_timeout)
- return timeout;
-
- if (server->should_timeout & (1 << LOG_ID_MAX))
- timeout = server->max_buffered_time;
-
-
- for (i = 0; i < LOG_ID_MAX; i++) {
- int diff = server->max_buffered_time - server->buffers[i]->elapsed;
- if (diff >= 0 && (diff < timeout || timeout == -1))
- timeout = diff;
- }
-
- return timeout;
-}
-
static int do_logger(struct logger* server)
{
int nfds, i;
- int time_left = 0;
- struct timeval tv1;
- struct timeval tv2;
const int max_events = 1024;
struct epoll_event events[1024];
for (;;) {
- gettimeofday(&tv1, NULL);
- time_left = logger_get_timeout(server);
- nfds = epoll_wait(server->epollfd, events, max_events, time_left);
-
- if (nfds < 0 && errno == EINTR) {
- gettimeofday(&tv2, NULL);
- time_left = (tv2.tv_sec - tv1.tv_sec)*1000 + (tv2.tv_usec - tv1.tv_usec)/1000;
- } else if (nfds < 0)
+ do {
+ nfds = epoll_wait(server->epollfd, events, max_events, server->max_buffered_time);
+ } while (nfds < 0 && errno == EINTR);
+
+ if (nfds < 0)
goto err;
for (i = 0; i < nfds; i++) {
struct fd_entity* entity = (struct fd_entity*) events[i].data.ptr;
dispatch_event(server, entity, &events[i]);
}
- service_all_readers(server, time_left);
+
+ service_all_readers(server, nfds == 0);
}
return 0;