continue;
}
- r = reader_logger_run(reader);
+ r = reader_run(&reader->common);
if (r < 0) {
reader_free(&reader->common);
g_backend.logger_readers[id] = NULL;
struct reader_logger *const reader = g_backend.logger_readers[id];
if (!reader)
continue;
- pthread_join(reader->thread, NULL);
reader_free(&reader->common);
}
static void reader_deinit_common(struct reader_common *reader)
{
+ if (reader->thread)
+ pthread_join(reader->thread, NULL);
+
list_clear_custom(&reader->subs, NULL, subreader_free);
if (reader->server) {
void reader_common_init(struct reader_common *reader, struct logger *server)
{
reader->server = server;
+ reader->thread = 0;
init_fd_entity(&reader->fd_entity_sink , NULL, NULL);
init_fd_entity(&reader->fd_entity_source, NULL, NULL);
}
reader_free(reader);
}
+int reader_run(struct reader_common *const reader)
+{
+ pthread_t thread;
+ int r = pthread_create(&thread, NULL, reader->thread_func, reader);
+ if (r < 0)
+ return r;
+
+ reader->thread = thread;
+ return 0;
+}
+
int reader_flush(struct reader_common *reader, struct timespec now_mono, int flush)
{
return list_foreach_ret(reader->subs, &(struct subreader_flush_args) {
struct fd_entity fd_entity_sink;
struct fd_entity fd_entity_source;
struct logger *server;
+
+ pthread_t thread; // note, pthread id, not the same as OS tid!
+ void *(*thread_func)(void *userdata);
+
list_head subs;
int (*service_reader) (struct reader_common *_reader, struct now_t time);
void (*free_reader) (struct reader_common *reader);
void reader_common_init(struct reader_common *reader, struct logger *server);
int reader_apply_log_to_subs(struct reader_common *reader, const struct dlogutil_entry *de);
void dispatch_event_reader(struct logger *server, struct epoll_event *event, void *userdata);
+int reader_run(struct reader_common *const reader);
void subreader_free(void *sub, void *userdata);
int subreader_apply_log(void *sub, void *userdata);
#include <assert.h>
#include <poll.h>
+static void *reader_logger_thread(void *userdata);
+
static void free_reader_logger(struct reader_common *_reader)
{
struct reader_logger *const reader = (struct reader_logger *) _reader;
if (reader->device_fd != -1)
close(reader->device_fd);
-
- /* We don't do `pthread_cancel(reader->thread);`, the thread should
- * end on its own (there's some external cleanup involved anyway).
- * At the moment the thread assumes the reader is permanent and only
- * closes when the whole program does, which is approximately true. */
}
static void handle_single_log(struct reader_logger *reader, struct now_t time, int read_count)
reader_common_init(&ret->common, server);
ret->device_fd = -1;
- ret->thread = 0;
ret->buf_id = LOG_ID_INVALID;
+ ret->common.thread_func = reader_logger_thread;
ret->common.service_reader = service_reader_logger;
ret->common.free_reader = free_reader_logger;
(void) reader_flush(&reader->common, now.mono, reader->common.server->buf_params.time);
}
-static void *reader_thread(void *userdata)
+static void *reader_logger_thread(void *userdata)
{
struct reader_logger *reader = (struct reader_logger *) userdata;
return NULL;
}
-int reader_logger_run(struct reader_logger *reader)
-{
- pthread_t thread;
- int r = pthread_create(&thread, NULL, reader_thread, reader);
- if (r < 0)
- return r;
-
- reader->thread = thread;
- return 0;
-}
-
struct reader_logger {
struct reader_common common;
- pthread_t thread; // note, pthread id, not the same as OS tid!
int device_fd;
log_id_t buf_id;
int skip_count;
};
int reader_logger_init(struct reader_logger **reader, log_id_t buf_id, struct logger *server, bool skip);
-int reader_logger_run(struct reader_logger *reader);
+