assert(reader);
assert(buffer);
- reader->log_storage_reader_ptr = log_storage_new_reader(buffer->log_storage_ptr,
+ reader->log_storage_reader_ptrs[0] = log_storage_new_reader(buffer->log_storage_ptr,
reader->is_dumping, reader->monitor, reader_pipe_notify_losing_log, reader);
- if (!reader->log_storage_reader_ptr)
+ if (!reader->log_storage_reader_ptrs[0])
return -ENOMEM;
return list_add(&reader->common.server->readers, reader) ? 0 : -ENOMEM;
if (reader->common.server)
list_remove(&reader->common.server->readers, reader);
- if (reader->log_storage_reader_ptr)
- log_storage_release_reader(reader->log_storage_reader_ptr);
+
+ for (log_id_t i = 0; i < NELEMS(reader->log_storage_reader_ptrs); ++i) {
+ log_storage_reader *const lsr = reader->log_storage_reader_ptrs[i];
+ if (!lsr)
+ continue;
+ log_storage_release_reader(lsr);
+ }
}
static int print_out_logs(struct reader_common *_reader, struct now_t _time);
ret->monitor = monitor;
ret->is_dumping = is_dumping;
ret->buf_ptr = NULL;
- ret->log_storage_reader_ptr = NULL;
+
+ for (log_id_t i = 0; i < NELEMS(ret->log_storage_reader_ptrs); ++i)
+ ret->log_storage_reader_ptrs[i] = NULL;
return ret;
}
if (r < 0)
return 0;
- while (log_storage_reader_is_new_entry_available(reader->log_storage_reader_ptr)) {
- const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(reader->log_storage_reader_ptr);
+ while (true) {
+ log_storage_reader *best_lsr = NULL;
+ const dlogutil_entry_s *best_entry = NULL;
+ for (log_id_t i = 0; i < NELEMS(reader->log_storage_reader_ptrs); ++i) {
+ log_storage_reader *const lsr = reader->log_storage_reader_ptrs[i];
+ if (!lsr)
+ continue;
+ if (!log_storage_reader_is_new_entry_available(lsr))
+ continue;
+
+ const dlogutil_entry_s *const entry = log_storage_reader_peek_entry(lsr);
+ assert(entry);
+
+ if (!best_entry || log_entry_is_earlier(reader->buf_ptr->sort_by, entry, best_entry)) {
+ best_lsr = lsr;
+ best_entry = entry;
+ continue;
+ }
+ }
+
+ if (!best_lsr)
+ break;
+
+ const dlogutil_entry_s *ple = (const dlogutil_entry_s *)log_storage_reader_get_new_entry(best_lsr);
assert(ple);
+ assert(ple == best_entry);
switch (reader_pipe_print_out_single_log(reader, ple)) {
case 0: /* nothing more to do, let's do next loop */