From 6082ed07a044974e829e03f060cb7dfce2b82d3f Mon Sep 17 00:00:00 2001 From: Mateusz Majewski Date: Thu, 20 Aug 2020 14:50:48 +0200 Subject: [PATCH] Move some QoS related stuff to separate file Change-Id: Ic5ee82c0602d95297a4ab89b5b8a04d27a4454a0 --- Makefile.am | 3 +- include/logcommon.h | 2 + src/logger/logger.c | 112 -------------------------- src/logger/qos.c | 177 +++++++++++++++++------------------------ src/logger/qos.h | 7 ++ src/logger/qos_distributions.c | 135 +++++++++++++++++++++++++++++++ src/shared/logcommon.c | 15 ++++ 7 files changed, 234 insertions(+), 217 deletions(-) create mode 100644 src/logger/qos_distributions.c diff --git a/Makefile.am b/Makefile.am index 603988c..6f94254 100644 --- a/Makefile.am +++ b/Makefile.am @@ -128,6 +128,7 @@ dlog_logger_SOURCES = \ src/logger/fd_entity.c \ src/logger/log_storage.c \ src/logger/qos.c \ + src/logger/qos_distributions.c \ src/logger/reader_pipe.c \ src/shared/backend_androidlogger.c \ src/shared/ptrs_list.c \ @@ -490,7 +491,7 @@ src_tests_logprint_CFLAGS = $(check_CFLAGS) src_tests_logprint_LDFLAGS = $(AM_LDFLAGS) -Wl,--wrap=write,--wrap=malloc,--wrap=calloc,--wrap=localtime_r,--wrap=strdup,--wrap=strndup,--wrap=list_add src_tests_qos_distributions_SOURCES = src/tests/qos_distributions.c \ - src/logger/qos.c + src/logger/qos_distributions.c src_tests_qos_distributions_CFLAGS = $(check_CFLAGS) src_tests_qos_distributions_LDFLAGS = $(AM_LDFLAGS) diff --git a/include/logcommon.h b/include/logcommon.h index 9c14d04..f50814d 100644 --- a/include/logcommon.h +++ b/include/logcommon.h @@ -172,6 +172,8 @@ char *create_glued_string(char const *const *strings, int n); dlogutil_sorting_order_e get_order_from_string(const char *str); +int full_write(int fd, const char *buf, int size); + #ifdef __cplusplus } #endif diff --git a/src/logger/logger.c b/src/logger/logger.c index 863cbab..2338e28 100644 --- a/src/logger/logger.c +++ b/src/logger/logger.c @@ -461,112 +461,6 @@ static void buffer_free(struct log_buffer *buffer, struct logger *logger) free(buffer); } -bool qos_is_enabled(const struct qos_module *qos) -{ - assert(qos); - return qos->threshold > 0 && qos->max_throughput > 0 && qos->file_path && strlen(qos->file_path) > 0; -} - -int full_write(int fd, const char *buf, int size) -{ - while (size > 0) { - errno = 0; - int ret = write(fd, buf, size); - if (ret == -1 || ret == 0) - return errno != 0 ? -errno : -EIO; - - buf += ret; - size -= ret; - } - - return 0; -} - -void qos_create_limits_file(struct qos_module *qos, bool is_limiting) -{ - __attribute__((cleanup(close_fd))) int fd = creat(qos->file_path, 0644); - if (fd < 0) - return; - - if (!is_limiting) // leave an empty file to reset limits. - return; - - int recv_count; - __attribute__((cleanup(free_ptr))) struct metrics_info *const recv_infos = metrics_get_info(qos->log_metrics, &recv_count); - if (!recv_infos) - return; - - qsort(recv_infos, recv_count, sizeof(*recv_infos), metrics_sort_by_pid_first); - __attribute__((cleanup(free_ptr))) struct metrics_pid_aggr_info *const aggr_infos = calloc(recv_count, sizeof(*aggr_infos)); - if (!aggr_infos) - return; - - int aggr_count = 0; - pid_t last = -1; - for (int i = 0; i < recv_count; ++i) { - if (recv_infos[i].pid != last) { - aggr_infos[aggr_count].pid = recv_infos[i].pid; - aggr_infos[aggr_count].count = 0; - for (int j = 0; j < DLOG_PRIO_MAX; ++j) - aggr_infos[aggr_count].count += recv_infos[i].count[j]; - aggr_count += 1; - } else { - for (int j = 0; j < DLOG_PRIO_MAX; ++j) - aggr_infos[aggr_count - 1].count += recv_infos[i].count[j]; - } - } - - qos_distribution_func(qos, aggr_infos, aggr_count); - - const int len = aggr_count * 32; - __attribute__((cleanup(free_ptr))) char *buf = malloc(len); - int pos = 0; - for (int i = 0; i < aggr_count; ++i) - if (aggr_infos[i].count >= 0) { - int w = snprintf(buf + pos, pos - len, "pidlimit|%d=%d\n", aggr_infos[i].pid, aggr_infos[i].count); - pos += w; - assert(pos < len); - } - - full_write(fd, buf, pos); // Ignore write errors -} - -void qos_set_next_update_time(struct qos_module *qos, struct timespec now) -{ - qos->cancel_limits_at = now; - qos->cancel_limits_at.tv_sec += qos->limit_duration; -} - -void qos_apply_limits(struct qos_module *qos) -{ - qos->currently_limiting = true; - qos_create_limits_file(qos, true); -} - -void qos_relax_limits(struct qos_module *qos) -{ - qos->currently_limiting = false; - qos_create_limits_file(qos, false); -} - -void qos_add_log(struct qos_module *qos, const struct dlogutil_entry *due) -{ - assert(qos); - assert(due); - - assert(qos->threshold > 0); - assert(qos->file_path); - - metrics_add_log(qos->log_metrics, due); - - const int diff = metrics_get_total(qos->log_metrics) - qos->threshold; - if (diff < 0) - return; - - if (diff == 0 || (qos->threshold_reapply && diff % qos->threshold_reapply == 0)) - qos_apply_limits(qos); // the inQoSition declares Exterminatus -} - void qos_periodic_check(struct logger *server) { struct qos_module *const qos = &server->qos; @@ -2013,12 +1907,6 @@ static bool cond_writer_free(void *ptr, void *user_data) return true; } -static void qos_free(struct qos_module *qos) -{ - metrics_destroy(qos->log_metrics); - free(qos->file_path); -} - /** * @brief Free logger * @details Deallocate the logger and its auxiliary structures diff --git a/src/logger/qos.c b/src/logger/qos.c index c2c0bbb..4046b37 100644 --- a/src/logger/qos.c +++ b/src/logger/qos.c @@ -14,137 +14,106 @@ #include "qos.h" +#include #include +#include void (*qos_distribution_func)(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count); -static inline int divide_rounding_upwards(int dividend, int divisor) +bool qos_is_enabled(const struct qos_module *qos) { - return (dividend + (divisor - 1)) / (divisor ?: 1); + assert(qos); + return qos->threshold > 0 && qos->max_throughput > 0 && qos->file_path && strlen(qos->file_path) > 0; } -void qos_distribution_proportional_raw(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +void qos_create_limits_file(struct qos_module *qos, bool is_limiting) { - const double proportion = (double) qos->max_throughput / (metrics_get_total(qos->log_metrics) ?: 1); + __attribute__((cleanup(close_fd))) int fd = creat(qos->file_path, 0644); + if (fd < 0) + return; - for (int i = 0; i < count; ++i) - infos[i].count = (int) (infos[i].count * proportion + 0.5); -} + if (!is_limiting) // leave an empty file to reset limits. + return; -void qos_distribution_equal(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) -{ - /* round upwards so that it's never 0, to give all clients a chance - * to log at least something (so a developer can tell it's alive) */ - const int equal_limit_for_everybody = divide_rounding_upwards(qos->max_throughput, count); + int recv_count; + __attribute__((cleanup(free_ptr))) struct metrics_info *const recv_infos = metrics_get_info(qos->log_metrics, &recv_count); + if (!recv_infos) + return; - for (int i = 0; i < count; ++i) - infos[i].count = equal_limit_for_everybody; -} + qsort(recv_infos, recv_count, sizeof(*recv_infos), metrics_sort_by_pid_first); + __attribute__((cleanup(free_ptr))) struct metrics_pid_aggr_info *const aggr_infos = calloc(recv_count, sizeof(*aggr_infos)); + if (!aggr_infos) + return; + int aggr_count = 0; + pid_t last = -1; + for (int i = 0; i < recv_count; ++i) { + if (recv_infos[i].pid != last) { + aggr_infos[aggr_count].pid = recv_infos[i].pid; + aggr_infos[aggr_count].count = 0; + for (int j = 0; j < DLOG_PRIO_MAX; ++j) + aggr_infos[aggr_count].count += recv_infos[i].count[j]; + aggr_count += 1; + } else { + for (int j = 0; j < DLOG_PRIO_MAX; ++j) + aggr_infos[aggr_count - 1].count += recv_infos[i].count[j]; + } + } -void qos_distribution_equal_dual(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) -{ - /* Anything below the "equal" threshold is unlimited. - * The ones above this limit get leftovers distributed - * among them, equally as well. */ + qos_distribution_func(qos, aggr_infos, aggr_count); - const int equal_threshold = qos->max_throughput / count; // rounded down to prevent negative values after subtraction - int remaining_throughput = qos->max_throughput; - int count_above_threshold = count; + const int len = aggr_count * 32; + __attribute__((cleanup(free_ptr))) char *buf = malloc(len); + int pos = 0; + for (int i = 0; i < aggr_count; ++i) + if (aggr_infos[i].count >= 0) { + int w = snprintf(buf + pos, pos - len, "pidlimit|%d=%d\n", aggr_infos[i].pid, aggr_infos[i].count); + pos += w; + assert(pos < len); + } - for (int i = 0; i < count; ++i) { - if (infos[i].count >= equal_threshold) - continue; + full_write(fd, buf, pos); // Ignore write errors +} - remaining_throughput -= infos[i].count; - -- count_above_threshold; - } +void qos_set_next_update_time(struct qos_module *qos, struct timespec now) +{ + qos->cancel_limits_at = now; + qos->cancel_limits_at.tv_sec += qos->limit_duration; +} - const int higher_threshold = divide_rounding_upwards(remaining_throughput, count_above_threshold); - for (int i = 0; i < count; ++i) - if (infos[i].count >= equal_threshold) - infos[i].count = higher_threshold; - else - infos[i].count = -1; +void qos_apply_limits(struct qos_module *qos) +{ + qos->currently_limiting = true; + qos_create_limits_file(qos, true); } -void qos_distribution_equal_multi(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +void qos_relax_limits(struct qos_module *qos) { - /* Similar to 'dual', except anything not reaching the higher - * threshold also gets an unlimited pass (which increases the - * threshold again, potentially propagating it even more) */ - - int threshold = qos->max_throughput / count; - int remaining_throughput = qos->max_throughput; - int count_above_threshold = count; - - int prev_threshold = 0; - int below_current_threshold; - do { - below_current_threshold = 0; - for (int i = 0; i < count; ++i) { - if (infos[i].count >= threshold) - continue; - if (infos[i].count < prev_threshold) - continue; - - remaining_throughput -= infos[i].count; - -- count_above_threshold; - ++ below_current_threshold; - } - prev_threshold = threshold; - threshold = divide_rounding_upwards(remaining_throughput, count_above_threshold); - } while (below_current_threshold > 0); - - for (int i = 0; i < count; ++i) - if (infos[i].count >= threshold) - infos[i].count = threshold; - else - infos[i].count = -1; + qos->currently_limiting = false; + qos_create_limits_file(qos, false); } -void qos_distribution_proportional_talmud(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +void qos_add_log(struct qos_module *qos, const struct dlogutil_entry *due) { - /* As proscribed by the talmudic contested garment rule. - * Probably makes little sense given how our QoS works but - * it's interesting and should work really well if we ever - * switch to the "preset limits" design that the feature - * was originally supposed to follow. */ + assert(qos); + assert(due); - if (qos->max_throughput > metrics_get_total(qos->log_metrics)) - return; + assert(qos->threshold > 0); + assert(qos->file_path); - inline int sort_by_count(const void *vlhs, const void *vrhs) { - const struct metrics_pid_aggr_info *lhs = vlhs, *rhs = vrhs; - return lhs->count - rhs->count; - } - qsort(infos, count, sizeof(*infos), sort_by_count); - - int remaining_count = count; - int sum_level = 0; - bool const upper_half = qos->max_throughput > metrics_get_total(qos->log_metrics) / 2; - int total = upper_half - ? metrics_get_total(qos->log_metrics) - qos->max_throughput - : qos->max_throughput - ; - - for (int i = 0; i < count; ++i) { - int const level_diff = (infos[i].count / 2) - sum_level; - int const total_diff = level_diff * remaining_count; - if (total_diff >= total) - break; - - total -= total_diff; - sum_level += level_diff; - infos[i].count = upper_half ? infos[i].count - sum_level : sum_level; - -- remaining_count; - } + metrics_add_log(qos->log_metrics, due); - if (remaining_count == 0) + const int diff = metrics_get_total(qos->log_metrics) - qos->threshold; + if (diff < 0) return; - const int final_level = sum_level + (total / remaining_count); - for (int i = count - remaining_count; i < count; ++i) - infos[i].count = upper_half ? infos[i].count - final_level : final_level; + if (diff == 0 || (qos->threshold_reapply && diff % qos->threshold_reapply == 0)) + qos_apply_limits(qos); // the inQoSition declares Exterminatus +} + +void qos_free(struct qos_module *qos) +{ + metrics_destroy(qos->log_metrics); + free(qos->file_path); } diff --git a/src/logger/qos.h b/src/logger/qos.h index 2d5848e..9aee812 100644 --- a/src/logger/qos.h +++ b/src/logger/qos.h @@ -44,3 +44,10 @@ void qos_distribution_equal_multi(struct qos_module *qos, struct metrics_pid_agg void qos_distribution_proportional_raw(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count); void qos_distribution_proportional_talmud(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count); +bool qos_is_enabled(const struct qos_module *qos); +void qos_create_limits_file(struct qos_module *qos, bool is_limiting); +void qos_set_next_update_time(struct qos_module *qos, struct timespec now); +void qos_apply_limits(struct qos_module *qos); +void qos_relax_limits(struct qos_module *qos); +void qos_add_log(struct qos_module *qos, const struct dlogutil_entry *due); +void qos_free(struct qos_module *qos); diff --git a/src/logger/qos_distributions.c b/src/logger/qos_distributions.c new file mode 100644 index 0000000..73d9568 --- /dev/null +++ b/src/logger/qos_distributions.c @@ -0,0 +1,135 @@ +#include "qos.h" + +#include +#include + +static inline int divide_rounding_upwards(int dividend, int divisor) +{ + return (dividend + (divisor - 1)) / (divisor ?: 1); +} + +void qos_distribution_proportional_raw(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +{ + const double proportion = (double) qos->max_throughput / (metrics_get_total(qos->log_metrics) ?: 1); + + for (int i = 0; i < count; ++i) + infos[i].count = (int) (infos[i].count * proportion + 0.5); +} + +void qos_distribution_equal(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +{ + /* round upwards so that it's never 0, to give all clients a chance + * to log at least something (so a developer can tell it's alive) */ + const int equal_limit_for_everybody = divide_rounding_upwards(qos->max_throughput, count); + + for (int i = 0; i < count; ++i) + infos[i].count = equal_limit_for_everybody; +} + + +void qos_distribution_equal_dual(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +{ + /* Anything below the "equal" threshold is unlimited. + * The ones above this limit get leftovers distributed + * among them, equally as well. */ + + const int equal_threshold = qos->max_throughput / count; // rounded down to prevent negative values after subtraction + int remaining_throughput = qos->max_throughput; + int count_above_threshold = count; + + for (int i = 0; i < count; ++i) { + if (infos[i].count >= equal_threshold) + continue; + + remaining_throughput -= infos[i].count; + -- count_above_threshold; + } + + const int higher_threshold = divide_rounding_upwards(remaining_throughput, count_above_threshold); + for (int i = 0; i < count; ++i) + if (infos[i].count >= equal_threshold) + infos[i].count = higher_threshold; + else + infos[i].count = -1; +} + +void qos_distribution_equal_multi(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +{ + /* Similar to 'dual', except anything not reaching the higher + * threshold also gets an unlimited pass (which increases the + * threshold again, potentially propagating it even more) */ + + int threshold = qos->max_throughput / count; + int remaining_throughput = qos->max_throughput; + int count_above_threshold = count; + + int prev_threshold = 0; + int below_current_threshold; + do { + below_current_threshold = 0; + for (int i = 0; i < count; ++i) { + if (infos[i].count >= threshold) + continue; + if (infos[i].count < prev_threshold) + continue; + + remaining_throughput -= infos[i].count; + -- count_above_threshold; + ++ below_current_threshold; + } + prev_threshold = threshold; + threshold = divide_rounding_upwards(remaining_throughput, count_above_threshold); + } while (below_current_threshold > 0); + + for (int i = 0; i < count; ++i) + if (infos[i].count >= threshold) + infos[i].count = threshold; + else + infos[i].count = -1; +} + +void qos_distribution_proportional_talmud(struct qos_module *qos, struct metrics_pid_aggr_info *infos, int count) +{ + /* As proscribed by the talmudic contested garment rule. + * Probably makes little sense given how our QoS works but + * it's interesting and should work really well if we ever + * switch to the "preset limits" design that the feature + * was originally supposed to follow. */ + + if (qos->max_throughput > metrics_get_total(qos->log_metrics)) + return; + + inline int sort_by_count(const void *vlhs, const void *vrhs) { + const struct metrics_pid_aggr_info *lhs = vlhs, *rhs = vrhs; + return lhs->count - rhs->count; + } + qsort(infos, count, sizeof(*infos), sort_by_count); + + int remaining_count = count; + int sum_level = 0; + bool const upper_half = qos->max_throughput > metrics_get_total(qos->log_metrics) / 2; + int total = upper_half + ? metrics_get_total(qos->log_metrics) - qos->max_throughput + : qos->max_throughput + ; + + for (int i = 0; i < count; ++i) { + int const level_diff = (infos[i].count / 2) - sum_level; + int const total_diff = level_diff * remaining_count; + if (total_diff >= total) + break; + + total -= total_diff; + sum_level += level_diff; + infos[i].count = upper_half ? infos[i].count - sum_level : sum_level; + -- remaining_count; + } + + if (remaining_count == 0) + return; + + const int final_level = sum_level + (total / remaining_count); + for (int i = count - remaining_count; i < count; ++i) + infos[i].count = upper_half ? infos[i].count - final_level : final_level; +} + diff --git a/src/shared/logcommon.c b/src/shared/logcommon.c index 3d8ce4e..b5f75b6 100644 --- a/src/shared/logcommon.c +++ b/src/shared/logcommon.c @@ -515,6 +515,21 @@ dlogutil_sorting_order_e get_order_from_string(const char *str) return DLOGUTIL_SORT_DEFAULT; } +int full_write(int fd, const char *buf, int size) +{ + while (size > 0) { + errno = 0; + int ret = write(fd, buf, size); + if (ret == -1 || ret == 0) + return errno != 0 ? -errno : -EIO; + + buf += ret; + size -= ret; + } + + return 0; +} + /** * @} -- 2.7.4