From ba78cdb6f6e562798c848d3ce9f04ef4d5414006 Mon Sep 17 00:00:00 2001 From: Mark Kemel Date: Mon, 3 Apr 2023 13:56:19 +0300 Subject: [PATCH] metrics: start unit job metrics This change allows the user to enable or disable metrics. When metrics are enabled, on every StartUnit job the hirte manager fetches the unit's net start time from its properties, subtracts it from the measured job time, and reports all the data via a signal. Signed-off-by: Mark Kemel --- data/org.containers.hirte.Manager.xml | 2 + data/org.containers.hirte.Metrics.xml | 19 ++ ...ontainers.hirte.internal.Agent.Metrics.xml | 11 ++ data/org.containers.hirte.internal.Agent.xml | 2 + src/agent/agent.c | 181 ++++++++++++++---- src/agent/agent.h | 6 + src/client/client.c | 146 ++++++++++++++ src/libhirte/common/parse-util.c | 2 +- src/libhirte/common/protocol.h | 4 + src/libhirte/common/time-util.c | 22 +++ src/libhirte/common/time-util.h | 12 ++ src/libhirte/meson.build | 2 + src/manager/job.c | 3 + src/manager/job.h | 3 + src/manager/manager.c | 57 +++++- src/manager/manager.h | 3 + src/manager/meson.build | 2 + src/manager/metrics.c | 138 +++++++++++++ src/manager/metrics.h | 10 + src/manager/node.c | 95 ++++++++- src/manager/node.h | 5 + 21 files changed, 679 insertions(+), 46 deletions(-) create mode 100644 data/org.containers.hirte.Metrics.xml create mode 100644 data/org.containers.hirte.internal.Agent.Metrics.xml create mode 100644 src/libhirte/common/time-util.c create mode 100644 src/libhirte/common/time-util.h create mode 100644 src/manager/metrics.c create mode 100644 src/manager/metrics.h diff --git a/data/org.containers.hirte.Manager.xml b/data/org.containers.hirte.Manager.xml index 5cfddfe4ee..8506797f7f 100644 --- a/data/org.containers.hirte.Manager.xml +++ b/data/org.containers.hirte.Manager.xml @@ -15,6 +15,8 @@ + + diff --git a/data/org.containers.hirte.Metrics.xml b/data/org.containers.hirte.Metrics.xml new file mode 100644 index 0000000000..69a8f98ec0 --- /dev/null +++ 
b/data/org.containers.hirte.Metrics.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + + + diff --git a/data/org.containers.hirte.internal.Agent.Metrics.xml b/data/org.containers.hirte.internal.Agent.Metrics.xml new file mode 100644 index 0000000000..fd0c4ea491 --- /dev/null +++ b/data/org.containers.hirte.internal.Agent.Metrics.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/data/org.containers.hirte.internal.Agent.xml b/data/org.containers.hirte.internal.Agent.xml index fe90618cde..ffbcc0c06a 100644 --- a/data/org.containers.hirte.internal.Agent.xml +++ b/data/org.containers.hirte.internal.Agent.xml @@ -52,6 +52,8 @@ + + diff --git a/src/agent/agent.c b/src/agent/agent.c index 39818e834f..1d42a1f421 100644 --- a/src/agent/agent.c +++ b/src/agent/agent.c @@ -11,6 +11,7 @@ #include "libhirte/common/event-util.h" #include "libhirte/common/opt.h" #include "libhirte/common/parse-util.h" +#include "libhirte/common/time-util.h" #include "libhirte/log/log.h" #include "libhirte/service/shutdown.h" @@ -45,6 +46,52 @@ static bool void *userdata, free_func_t free_userdata); +/* Keep track of outstanding systemd job and connect it back to + the originating hirte job id so we can proxy changes to it. 
*/ +typedef struct { + int ref_count; + + Agent *agent; + uint32_t hirte_job_id; + uint64_t job_start_micros; + char *unit; + char *method; +} AgentJobOp; + + +static AgentJobOp *agent_job_op_ref(AgentJobOp *op) { + op->ref_count++; + return op; +} + +static void agent_job_op_unref(AgentJobOp *op) { + op->ref_count--; + if (op->ref_count != 0) { + return; + } + + agent_unref(op->agent); + free(op->unit); + free(op->method); + free(op); +} + +DEFINE_CLEANUP_FUNC(AgentJobOp, agent_job_op_unref) +#define _cleanup_agent_job_op_ _cleanup_(agent_job_op_unrefp) + +static AgentJobOp *agent_job_new(Agent *agent, uint32_t hirte_job_id, const char *unit, const char *method) { + AgentJobOp *op = malloc0(sizeof(AgentJobOp)); + if (op) { + op->ref_count = 1; + op->agent = agent_ref(agent); + op->hirte_job_id = hirte_job_id; + op->job_start_micros = 0; + op->unit = strdup(unit); + op->method = strdup(method); + } + return op; +} + bool agent_is_connected(Agent *agent) { return agent != NULL && agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED; } @@ -192,6 +239,10 @@ static void systemd_request_set_userdata(SystemdRequest *req, void *userdata, fr static bool systemd_request_start(SystemdRequest *req, sd_bus_message_handler_t callback) { Agent *agent = req->agent; + AgentJobOp *op = req->userdata; + if (op != NULL) { + op->job_start_micros = get_time_micros(); + } int r = sd_bus_call_async( agent->systemd_dbus, &req->slot, req->message, callback, req, HIRTE_DEFAULT_DBUS_TIMEOUT); @@ -337,6 +388,7 @@ Agent *agent_new(void) { agent->connection_retry_count = 0; agent->wildcard_subscription_active = false; agent->name = get_hostname(); + agent->metrics_enabled = false; return steal_pointer(&agent); } @@ -413,6 +465,10 @@ void agent_unref(Agent *agent) { sd_event_unrefp(&agent->event); } + if (agent->metrics_slot != NULL) { + sd_bus_slot_unrefp(&agent->metrics_slot); + } + if (agent->peer_dbus != NULL) { sd_bus_unrefp(&agent->peer_dbus); } @@ -824,50 +880,19 @@ static int 
agent_method_set_unit_properties(sd_bus_message *m, void *userdata, U return 1; } - -/* Keep track of outstanding systemd job and connect it back to - the originating hirte job id so we can proxy changes to it. */ -typedef struct { - int ref_count; - - Agent *agent; - uint32_t hirte_job_id; -} AgentJobOp; - - -static AgentJobOp *agent_job_op_ref(AgentJobOp *op) { - op->ref_count++; - return op; -} - -static void agent_job_op_unref(AgentJobOp *op) { - op->ref_count--; - if (op->ref_count != 0) { - return; - } - - agent_unref(op->agent); - free(op); -} - -DEFINE_CLEANUP_FUNC(AgentJobOp, agent_job_op_unref) -#define _cleanup_agent_job_op_ _cleanup_(agent_job_op_unrefp) - -static AgentJobOp *agent_job_new(Agent *agent, uint32_t hirte_job_id) { - AgentJobOp *op = malloc0(sizeof(AgentJobOp)); - if (op) { - op->ref_count = 1; - op->agent = agent_ref(agent); - op->hirte_job_id = hirte_job_id; - } - return op; -} - - static void agent_job_done(UNUSED sd_bus_message *m, const char *result, void *userdata) { AgentJobOp *op = userdata; Agent *agent = op->agent; + if (agent->metrics_enabled) { + agent_send_job_metrics( + agent, + op->unit, + op->method, + // NOLINTNEXTLINE(bugprone-narrowing-conversions, cppcoreguidelines-narrowing-conversions) + finalize_time_interval_micros(op->job_start_micros)); + } + hirte_log_infof("Sending JobDone %u, result: %s", op->hirte_job_id, result); int r = sd_bus_emit_signal( @@ -929,7 +954,7 @@ static int agent_run_unit_lifecycle_method(sd_bus_message *m, Agent *agent, cons return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Internal error"); } - _cleanup_agent_job_op_ AgentJobOp *op = agent_job_new(agent, job_id); + _cleanup_agent_job_op_ AgentJobOp *op = agent_job_new(agent, job_id, name, method); if (op == NULL) { return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Internal error"); } @@ -1213,6 +1238,60 @@ static int agent_method_stop_dep(sd_bus_message *m, void *userdata, UNUSED sd_bu } 
+/*************************************************************************** + ********** org.containers.hirte.internal.Agent.EnableMetrics ************** + **************************************************************************/ + +static const sd_bus_vtable agent_metrics_vtable[] = { + SD_BUS_VTABLE_START(0), + SD_BUS_SIGNAL_WITH_NAMES( + "AgentJobMetrics", + "sst", + SD_BUS_PARAM(unit) SD_BUS_PARAM(method) SD_BUS_PARAM(systemd_job_time_micros), + 0), + SD_BUS_VTABLE_END +}; + +static int agent_method_enable_metrics(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Agent *agent = userdata; + int r = 0; + + if (agent->metrics_enabled) { + return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INVALID_ARGS, "Metrics already enabled"); + } + agent->metrics_enabled = true; + r = sd_bus_add_object_vtable( + agent->peer_dbus, + &agent->metrics_slot, + INTERNAL_AGENT_METRICS_OBJECT_PATH, + INTERNAL_AGENT_METRICS_INTERFACE, + agent_metrics_vtable, + NULL); + if (r < 0) { + hirte_log_errorf("Failed to add metrics vtable: %s", strerror(-r)); + return r; + } + hirte_log_debug("Metrics enabled"); + return sd_bus_reply_method_return(m, ""); +} + +/**************************************************************************** + ********** org.containers.hirte.internal.Agent.DisableMetrics ************** + ***************************************************************************/ + +static int agent_method_disable_metrics(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Agent *agent = userdata; + + if (!agent->metrics_enabled) { + return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INVALID_ARGS, "Metrics already disabled"); + } + agent->metrics_enabled = false; + sd_bus_slot_unrefp(&agent->metrics_slot); + hirte_log_debug("Metrics disabled"); + return sd_bus_reply_method_return(m, ""); +} + + static const sd_bus_vtable internal_agent_vtable[] = { SD_BUS_VTABLE_START(0), SD_BUS_METHOD("ListUnits", "", UNIT_INFO_STRUCT_ARRAY_TYPESTRING, 
agent_method_list_units, 0), @@ -1225,6 +1304,8 @@ static const sd_bus_vtable internal_agent_vtable[] = { SD_BUS_METHOD("ReloadUnit", "ssu", "", agent_method_reload_unit, 0), SD_BUS_METHOD("Subscribe", "s", "", agent_method_subscribe, 0), SD_BUS_METHOD("Unsubscribe", "s", "", agent_method_unsubscribe, 0), + SD_BUS_METHOD("EnableMetrics", "", "", agent_method_enable_metrics, 0), + SD_BUS_METHOD("DisableMetrics", "", "", agent_method_disable_metrics, 0), SD_BUS_SIGNAL_WITH_NAMES("JobDone", "us", SD_BUS_PARAM(id) SD_BUS_PARAM(result), 0), SD_BUS_SIGNAL_WITH_NAMES("JobStateChanged", "us", SD_BUS_PARAM(id) SD_BUS_PARAM(state), 0), SD_BUS_SIGNAL_WITH_NAMES( @@ -2033,5 +2114,23 @@ static int agent_stop_local_proxy_service(Agent *agent, ProxyService *proxy) { } + return 0; +} + +int agent_send_job_metrics(Agent *agent, char *unit, char *method, uint64_t systemd_job_time) { + hirte_log_debugf("Sending agent %s job metrics on unit %s: %ldus", unit, method, systemd_job_time); + int r = sd_bus_emit_signal( + agent->peer_dbus, + INTERNAL_AGENT_METRICS_OBJECT_PATH, + INTERNAL_AGENT_METRICS_INTERFACE, + "AgentJobMetrics", + "sst", + unit, + method, + systemd_job_time); + if (r < 0) { + hirte_log_errorf("Failed to emit metric signal: %s", strerror(-r)); + } + return 0; } diff --git a/src/agent/agent.h b/src/agent/agent.h index cd3029e119..ddd8f275f5 100644 --- a/src/agent/agent.h +++ b/src/agent/agent.h @@ -60,12 +60,16 @@ struct Agent { char *orch_addr; char *api_bus_service_name; + bool metrics_enabled; + sd_event *event; sd_bus *api_bus; sd_bus *systemd_dbus; sd_bus *peer_dbus; + sd_bus_slot *metrics_slot; + LIST_HEAD(SystemdRequest, outstanding_requests); LIST_HEAD(JobTracker, tracked_jobs); LIST_HEAD(ProxyService, proxy_services); @@ -105,5 +109,7 @@ bool agent_is_connected(Agent *agent); void agent_remove_proxy(Agent *agent, ProxyService *proxy, bool emit); +int agent_send_job_metrics(Agent *agent, char *unit, char *method, uint64_t systemd_job_time); + 
DEFINE_CLEANUP_FUNC(Agent, agent_unref) #define _cleanup_agent_ _cleanup_(agent_unrefp) diff --git a/src/client/client.c b/src/client/client.c index 835299385e..016093124d 100644 --- a/src/client/client.c +++ b/src/client/client.c @@ -2,6 +2,7 @@ #include #include "libhirte/bus/utils.h" +#include "libhirte/common/time-util.h" #include "libhirte/service/shutdown.h" #include "client.h" @@ -289,6 +290,137 @@ int method_lifecycle_action_on(Client *client, char *node_name, char *unit, char return r; } +int method_metrics_toggle(Client *client, char *method) { + _cleanup_sd_bus_error_ sd_bus_error error = SD_BUS_ERROR_NULL; + _cleanup_sd_bus_message_ sd_bus_message *message = NULL; + int r = 0; + + r = sd_bus_call_method( + client->api_bus, + HIRTE_INTERFACE_BASE_NAME, + HIRTE_OBJECT_PATH, + MANAGER_INTERFACE, + method, + &error, + &message, + ""); + if (r < 0) { + fprintf(stderr, "Failed to issue method call: %s\n", error.message); + return r; + } + + printf("Done\n"); + return r; +} + +static int match_start_unit_job_metrics_signal( + sd_bus_message *m, UNUSED void *userdata, UNUSED sd_bus_error *error) { + const char *job_path = NULL; + const uint64_t job_measured_time = 0; + const uint64_t unit_net_start_time = 0; + const char *node_name = NULL; + const char *unit = NULL; + int r = 0; + + r = sd_bus_message_read( + m, "ssstt", &node_name, &job_path, &unit, &job_measured_time, &unit_net_start_time); + if (r < 0) { + fprintf(stderr, "Can't parse job result: %s", strerror(-r)); + return r; + } + + printf("[%s] Job %s to start unit %s:\n\t" + "Hirte job gross measured time: %.1lfms\n\t" + "Unit net start time (from properties): %.1lfms\n", + node_name, + job_path, + unit, + micros_to_millis(job_measured_time), + micros_to_millis(unit_net_start_time)); + + return 0; +} + +static int match_agent_job_metrics_signal(sd_bus_message *m, UNUSED void *userdata, UNUSED sd_bus_error *error) { + const char *node_name = NULL; + char *unit = NULL; + char *method = NULL; + uint64_t 
systemd_job_time = 0; + int r = 0; + r = sd_bus_message_read(m, "ssst", &node_name, &unit, &method, &systemd_job_time); + if (r < 0) { + fprintf(stderr, "Can't parse metric: %s", strerror(-r)); + return r; + } + + printf("[%s] Agent systemd %s job on %s net measured time: %.1lfms\n", + node_name, + method, + unit, + micros_to_millis(systemd_job_time)); + return 0; +} + +int method_metrics_listen(Client *client) { + int r = 0; + + _cleanup_sd_event_ sd_event *event = NULL; + r = sd_event_default(&event); + if (r < 0) { + fprintf(stderr, "Failed to create event loop: %s", strerror(-r)); + return r; + } + + r = sd_bus_attach_event(client->api_bus, event, SD_EVENT_PRIORITY_NORMAL); + if (r < 0) { + fprintf(stderr, "Failed to attach bus to event: %s", strerror(-r)); + return r; + } + + r = sd_bus_match_signal( + client->api_bus, + NULL, + HIRTE_INTERFACE_BASE_NAME, + METRICS_OBJECT_PATH, + METRICS_INTERFACE, + "StartUnitJobMetrics", + match_start_unit_job_metrics_signal, + client); + if (r < 0) { + fprintf(stderr, "Failed to add StartUnitJobMetrics api bus match: %s", strerror(-r)); + return r; + } + + r = sd_bus_match_signal( + client->api_bus, + NULL, + HIRTE_INTERFACE_BASE_NAME, + METRICS_OBJECT_PATH, + METRICS_INTERFACE, + "AgentJobMetrics", + match_agent_job_metrics_signal, + client); + if (r < 0) { + fprintf(stderr, "Failed to add AgentJobMetrics api bus match: %s", strerror(-r)); + return r; + } + + r = event_loop_add_shutdown_signals(event); + if (r < 0) { + fprintf(stderr, "Failed to add signals to agent event loop: %s", strerror(-r)); + return r; + } + + printf("Waiting for metrics signals...\n"); + r = sd_event_loop(event); + if (r < 0) { + fprintf(stderr, "Starting event loop failed: %s", strerror(-r)); + return r; + } + + return r; +} + int client_call_manager(Client *client) { int r = 0; @@ -335,6 +467,16 @@ int client_call_manager(Client *client) { return -EINVAL; } r = method_monitor_units_on_nodes(client->api_bus, client->opargv[0], client->opargv[1]); 
+ } else if (streq(client->op, "metrics")) { + if (client->opargc != 1) { + return -EINVAL; + } else if (streq(client->opargv[0], "enable")) { + method_metrics_toggle(client, "EnableMetrics"); + } else if (streq(client->opargv[0], "disable")) { + method_metrics_toggle(client, "DisableMetrics"); + } else if (streq(client->opargv[0], "listen")) { + method_metrics_listen(client); + } } else { return -EINVAL; } @@ -358,5 +500,9 @@ int print_client_usage(char *argv) { printf(" usage: reload nodename unitname\n"); printf(" - restart: restarts a specific systemd service (or timer, or slice) on a specific node\n"); printf(" usage: restart nodename unitname\n"); + printf(" - metrics [enable|disable]: enables/disables metrics reporting\n"); + printf(" usage: metrics [enable|disable]\n"); + printf(" - metrics listen: listen and print incoming metrics reports\n"); + printf(" usage: metrics listen\n"); return 0; } diff --git a/src/libhirte/common/parse-util.c b/src/libhirte/common/parse-util.c index 53b5b5f5ab..b500825779 100644 --- a/src/libhirte/common/parse-util.c +++ b/src/libhirte/common/parse-util.c @@ -39,4 +39,4 @@ bool parse_port(const char *in, uint16_t *ret) { *ret = (uint16_t) l; return true; -} +} \ No newline at end of file diff --git a/src/libhirte/common/protocol.h b/src/libhirte/common/protocol.h index 4246c73f97..ab55ee5e83 100644 --- a/src/libhirte/common/protocol.h +++ b/src/libhirte/common/protocol.h @@ -22,20 +22,24 @@ #define NODE_INTERFACE HIRTE_INTERFACE_BASE_NAME ".Node" #define JOB_INTERFACE HIRTE_INTERFACE_BASE_NAME ".Job" #define MONITOR_INTERFACE HIRTE_INTERFACE_BASE_NAME ".Monitor" +#define METRICS_INTERFACE HIRTE_INTERFACE_BASE_NAME ".Metrics" #define NODE_OBJECT_PATH_PREFIX HIRTE_OBJECT_PATH "/node" #define JOB_OBJECT_PATH_PREFIX HIRTE_OBJECT_PATH "/job" #define MONITOR_OBJECT_PATH_PREFIX HIRTE_OBJECT_PATH "/monitor" +#define METRICS_OBJECT_PATH HIRTE_OBJECT_PATH "/metrics" /* Internal objects */ #define INTERNAL_MANAGER_OBJECT_PATH 
HIRTE_OBJECT_PATH "/internal" #define INTERNAL_AGENT_OBJECT_PATH HIRTE_OBJECT_PATH "/internal/agent" #define INTERNAL_PROXY_OBJECT_PATH_PREFIX HIRTE_OBJECT_PATH "/internal/proxy" +#define INTERNAL_AGENT_METRICS_OBJECT_PATH INTERNAL_AGENT_OBJECT_PATH "/metrics" /* Internal interfaces */ #define INTERNAL_MANAGER_INTERFACE HIRTE_INTERFACE_BASE_NAME ".internal.Manager" #define INTERNAL_AGENT_INTERFACE HIRTE_INTERFACE_BASE_NAME ".internal.Agent" #define INTERNAL_PROXY_INTERFACE HIRTE_INTERFACE_BASE_NAME ".internal.Proxy" +#define INTERNAL_AGENT_METRICS_INTERFACE INTERNAL_AGENT_INTERFACE ".Metrics" #define HIRTE_BUS_ERROR_OFFLINE "org.containers.hirte.Offline" #define HIRTE_BUS_ERROR_NO_SUCH_SUBSCRIPTION "org.containers.hirte.NoSuchSubscription" diff --git a/src/libhirte/common/time-util.c b/src/libhirte/common/time-util.c new file mode 100644 index 0000000000..c3d342fc4a --- /dev/null +++ b/src/libhirte/common/time-util.c @@ -0,0 +1,22 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +#include + +#include "time-util.h" + +uint64_t get_time_micros() { + struct timespec now; + if (clock_gettime(CLOCK_REALTIME, &now) < 0) { + return 0; + } + uint64_t now_micros = now.tv_sec * sec_to_microsec_multiplier + + (uint64_t) ((double) now.tv_nsec * nanosec_to_microsec_multiplier); + return now_micros; +} + +uint64_t finalize_time_interval_micros(int64_t start_time_micros) { + return get_time_micros() - start_time_micros; +} + +double micros_to_millis(uint64_t time_micros) { + return (double) time_micros * microsec_to_millisec_multiplier; +} diff --git a/src/libhirte/common/time-util.h b/src/libhirte/common/time-util.h new file mode 100644 index 0000000000..fe9956b4cd --- /dev/null +++ b/src/libhirte/common/time-util.h @@ -0,0 +1,12 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +#pragma once + +#include + +static const uint64_t sec_to_microsec_multiplier = 1000000; +static const double microsec_to_millisec_multiplier = 1e-3; +static const double 
nanosec_to_microsec_multiplier = 1e-3; + +uint64_t get_time_micros(); +uint64_t finalize_time_interval_micros(int64_t start_time_micros); +double micros_to_millis(uint64_t time_micros); diff --git a/src/libhirte/meson.build b/src/libhirte/meson.build index 356f26dad4..5033705760 100644 --- a/src/libhirte/meson.build +++ b/src/libhirte/meson.build @@ -15,6 +15,8 @@ libhirte_src = [ 'common/opt.h', 'common/parse-util.c', 'common/parse-util.h', + 'common/time-util.c', + 'common/time-util.h', 'hashmap/hashmap.c', 'hashmap/hashmap.h', 'ini/ini.c', diff --git a/src/manager/job.c b/src/manager/job.c index 3b1235317e..cee4c4ee58 100644 --- a/src/manager/job.c +++ b/src/manager/job.c @@ -65,6 +65,9 @@ Job *job_new(Node *node, const char *unit, const char *type) { return NULL; } + job->job_start_micros = 0; + job->job_end_micros = 0; + return steal_pointer(&job); } diff --git a/src/manager/job.h b/src/manager/job.h index f0099e613a..b2c98541dd 100644 --- a/src/manager/job.h +++ b/src/manager/job.h @@ -19,6 +19,9 @@ struct Job { sd_bus_slot *export_slot; + uint64_t job_start_micros; + uint64_t job_end_micros; + LIST_FIELDS(Job, jobs); }; diff --git a/src/manager/manager.c b/src/manager/manager.c index 4f576ce2a8..0097bca618 100644 --- a/src/manager/manager.c +++ b/src/manager/manager.c @@ -7,12 +7,14 @@ #include "libhirte/common/cfg.h" #include "libhirte/common/common.h" #include "libhirte/common/parse-util.h" +#include "libhirte/common/time-util.h" #include "libhirte/log/log.h" #include "libhirte/service/shutdown.h" #include "libhirte/socket.h" #include "job.h" #include "manager.h" +#include "metrics.h" #include "monitor.h" #include "node.h" @@ -39,6 +41,7 @@ Manager *manager_new(void) { manager->port = HIRTE_DEFAULT_PORT; manager->api_bus_service_name = steal_pointer(&service_name); manager->event = steal_pointer(&event); + manager->metrics_enabled = false; LIST_HEAD_INIT(manager->nodes); LIST_HEAD_INIT(manager->anonymous_nodes); LIST_HEAD_INIT(manager->jobs); @@ -69,6 
+72,7 @@ void manager_unref(Manager *manager) { sd_bus_slot_unrefp(&manager->name_owner_changed_slot); sd_bus_slot_unrefp(&manager->filter_slot); sd_bus_slot_unrefp(&manager->manager_slot); + sd_bus_slot_unrefp(&manager->metrics_slot); sd_bus_unrefp(&manager->api_bus); Job *job = NULL; @@ -202,7 +206,6 @@ bool manager_add_job(Manager *manager, Job *job) { } void manager_remove_job(Manager *manager, Job *job, const char *result) { - int r = sd_bus_emit_signal( manager->api_bus, HIRTE_MANAGER_OBJECT_PATH, @@ -220,6 +223,9 @@ void manager_remove_job(Manager *manager, Job *job, const char *result) { } LIST_REMOVE(jobs, manager->jobs, job); + if (manager->metrics_enabled && streq(job->type, "start")) { + metrics_produce_job_report(job); + } job_unref(job); } @@ -239,6 +245,9 @@ void manager_finish_job(Manager *manager, uint32_t job_id, const char *result) { Job *job = NULL; LIST_FOREACH(jobs, job, manager->jobs) { if (job->id == job_id) { + if (manager->metrics_enabled) { + job->job_end_micros = get_time_micros(); + } manager_remove_job(manager, job, result); break; } @@ -736,6 +745,50 @@ static int manager_method_create_monitor(sd_bus_message *m, void *userdata, UNUS return 1; } +/************************************************************************* + ************** org.containers.hirte.Manager.EnableMetrics *************** + ************************************************************************/ +static int manager_method_metrics_enable(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Manager *manager = userdata; + Node *node = NULL; + int r = 0; + if (manager->metrics_enabled) { + return sd_bus_reply_method_errorf( + m, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Metrics already enabled"); + } + r = metrics_export(manager); + if (r < 0) { + return sd_bus_reply_method_errorf( + m, SD_BUS_ERROR_FAILED, "Failed to register metrics service: %s", strerror(-r)); + } + manager->metrics_enabled = true; + LIST_FOREACH(nodes, node, manager->nodes) { + 
node_enable_metrics(node); + } + hirte_log_debug("Metrics enabled"); + return sd_bus_reply_method_return(m, ""); +} + +/************************************************************************* + ************** org.containers.hirte.Manager.DisableMetrics ************** + ************************************************************************/ +static int manager_method_metrics_disable(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Manager *manager = userdata; + Node *node = NULL; + if (!manager->metrics_enabled) { + return sd_bus_reply_method_errorf( + m, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Metrics already disabled"); + } + sd_bus_slot_unrefp(&manager->metrics_slot); + manager->metrics_slot = NULL; + manager->metrics_enabled = false; + LIST_FOREACH(nodes, node, manager->nodes) { + node_disable_metrics(node); + } + hirte_log_debug("Metrics disabled"); + return sd_bus_reply_method_return(m, ""); +} + static const sd_bus_vtable manager_vtable[] = { SD_BUS_VTABLE_START(0), SD_BUS_METHOD("Ping", "s", "s", manager_method_ping, 0), @@ -743,6 +796,8 @@ static const sd_bus_vtable manager_vtable[] = { SD_BUS_METHOD("ListNodes", "", "a(sos)", manager_method_list_nodes, 0), SD_BUS_METHOD("GetNode", "s", "o", manager_method_get_node, 0), SD_BUS_METHOD("CreateMonitor", "", "o", manager_method_create_monitor, 0), + SD_BUS_METHOD("EnableMetrics", "", "", manager_method_metrics_enable, 0), + SD_BUS_METHOD("DisableMetrics", "", "", manager_method_metrics_disable, 0), SD_BUS_SIGNAL_WITH_NAMES("JobNew", "uo", SD_BUS_PARAM(id) SD_BUS_PARAM(job), 0), SD_BUS_SIGNAL_WITH_NAMES( "JobRemoved", diff --git a/src/manager/manager.h b/src/manager/manager.h index 64df5eed69..5a39cfe1db 100644 --- a/src/manager/manager.h +++ b/src/manager/manager.h @@ -22,6 +22,9 @@ struct Manager { sd_bus_slot *manager_slot; sd_bus_slot *filter_slot; sd_bus_slot *name_owner_changed_slot; + sd_bus_slot *metrics_slot; + + bool metrics_enabled; int n_nodes; LIST_HEAD(Node, nodes); diff --git 
a/src/manager/meson.build b/src/manager/meson.build index 30a1130a98..86d36cf733 100644 --- a/src/manager/meson.build +++ b/src/manager/meson.build @@ -9,6 +9,8 @@ orch_src = [ 'node.c', 'job.h', 'job.c', + 'metrics.c', + 'metrics.h', 'monitor.h', 'monitor.c', 'proxy_monitor.c', diff --git a/src/manager/metrics.c b/src/manager/metrics.c new file mode 100644 index 0000000000..a51c3a82e0 --- /dev/null +++ b/src/manager/metrics.c @@ -0,0 +1,138 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ + +#include "libhirte/common/common.h" +#include "libhirte/common/parse-util.h" +#include "libhirte/log/log.h" + +#include "job.h" +#include "manager.h" +#include "metrics.h" +#include "node.h" + +static const sd_bus_vtable metrics_api_vtable[] = { + SD_BUS_VTABLE_START(0), + + SD_BUS_SIGNAL_WITH_NAMES( + "StartUnitJobMetrics", + "ssstt", + SD_BUS_PARAM(node_name) SD_BUS_PARAM(job_id) SD_BUS_PARAM(unit) SD_BUS_PARAM( + job_measured_time_micros) SD_BUS_PARAM(unit_start_prop_time_micros), + 0), + SD_BUS_SIGNAL_WITH_NAMES( + "AgentJobMetrics", + "ssst", + SD_BUS_PARAM(node_name) SD_BUS_PARAM(unit) SD_BUS_PARAM(method) + SD_BUS_PARAM(systemd_job_time_micros), + 0), + SD_BUS_VTABLE_END +}; + +int metrics_export(Manager *manager) { + int r = sd_bus_add_object_vtable( + manager->api_bus, + &manager->metrics_slot, + METRICS_OBJECT_PATH, + METRICS_INTERFACE, + metrics_api_vtable, + NULL); + if (r < 0) { + hirte_log_errorf("Failed to add API metrics vtable: %s", strerror(-r)); + return r; + } + + return 0; +} + +static int node_metrics_match_agent_job(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) { + Node *node = userdata; + char *unit = NULL; + char *method = NULL; + uint64_t systemd_job_time = 0; + int r = sd_bus_message_read(m, "sst", &unit, &method, &systemd_job_time); + if (r < 0) { + hirte_log_errorf("Invalid generic int64 metric signal: %s", strerror(-r)); + return r; + } + hirte_log_debugf( + "Reporting agent %s job metrics on unit %s: %ldus for node %s", + 
unit, + method, + systemd_job_time, + node->name); + r = sd_bus_emit_signal( + node->manager->api_bus, + METRICS_OBJECT_PATH, + METRICS_INTERFACE, + "AgentJobMetrics", + "ssst", + node->name, + unit, + method, + systemd_job_time); + if (r < 0) { + hirte_log_errorf("Failed to emit StartUnitJobMetrics signal: %s", strerror(-r)); + return r; + } + + return 0; +} + +bool metrics_node_signal_matching_register(Node *node) { + int r = sd_bus_match_signal( + node->agent_bus, + &node->metrics_matching_slot, + NULL, + INTERNAL_AGENT_METRICS_OBJECT_PATH, + INTERNAL_AGENT_METRICS_INTERFACE, + "AgentJobMetrics", + node_metrics_match_agent_job, + node); + if (r < 0) { + hirte_log_errorf("Failed to add metrics signal matching: %s", strerror(-r)); + return false; + } + + return true; +} + +void metrics_produce_job_report(Job *job) { + int r = 0; + uint64_t inactive_exit_timestamp = 0, active_enter_timestamp = 0, unit_net_start_time_micros = 0; + uint64_t job_measured_time_micros = 0; + r = node_method_get_unit_uint64_property_sync( + job->node, job->unit, "InactiveExitTimestampMonotonic", &inactive_exit_timestamp); + if (r < 0) { + hirte_log_errorf("Failed to get unit property InactiveExitTimestampMonotonic: %s", strerror(-r)); + return; + } + + r = node_method_get_unit_uint64_property_sync( + job->node, job->unit, "ActiveEnterTimestampMonotonic", &active_enter_timestamp); + if (r < 0) { + hirte_log_errorf("Failed to get unit property ActiveEnterTimestampMonotonic: %s", strerror(-r)); + return; + } + + unit_net_start_time_micros = (active_enter_timestamp - inactive_exit_timestamp); + job_measured_time_micros = job->job_end_micros - job->job_start_micros; + + hirte_log_debugf( + "Reporting job metrics: Job measured time: %ldus, Unit start time from properties: %ldus", + job_measured_time_micros, + unit_net_start_time_micros); + + r = sd_bus_emit_signal( + job->node->manager->api_bus, + METRICS_OBJECT_PATH, + METRICS_INTERFACE, + "StartUnitJobMetrics", + "ssstt", + job->node->name, 
+ job->object_path, + job->unit, + job_measured_time_micros, + unit_net_start_time_micros); + if (r < 0) { + hirte_log_errorf("Failed to emit StartUnitJobMetrics signal: %s", strerror(-r)); + } +} diff --git a/src/manager/metrics.h b/src/manager/metrics.h new file mode 100644 index 0000000000..1e0aa132aa --- /dev/null +++ b/src/manager/metrics.h @@ -0,0 +1,10 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +#pragma once + +#include + +#include "types.h" + +int metrics_export(Manager *manager); +bool metrics_node_signal_matching_register(Node *node); +void metrics_produce_job_report(Job *job); diff --git a/src/manager/node.c b/src/manager/node.c index 35e63ef4f5..735326f3fb 100644 --- a/src/manager/node.c +++ b/src/manager/node.c @@ -1,9 +1,12 @@ /* SPDX-License-Identifier: GPL-2.0-or-later */ #include "libhirte/bus/utils.h" +#include "libhirte/common/parse-util.h" +#include "libhirte/common/time-util.h" #include "libhirte/log/log.h" #include "job.h" #include "manager.h" +#include "metrics.h" #include "monitor.h" #include "node.h" #include "proxy_monitor.h" @@ -786,6 +789,9 @@ void node_unset_agent_bus(Node *node) { sd_bus_slot_unrefp(&node->internal_manager_slot); node->internal_manager_slot = NULL; + sd_bus_slot_unrefp(&node->metrics_matching_slot); + node->metrics_matching_slot = NULL; + sd_bus_unrefp(&node->agent_bus); node->agent_bus = NULL; @@ -839,6 +845,10 @@ static int node_method_register(sd_bus_message *m, void *userdata, UNUSED sd_bus m, SD_BUS_ERROR_FAILED, "Internal error: Couldn't set agent bus"); } + if (manager->metrics_enabled) { + node_enable_metrics(named_node); + } + node_unset_agent_bus(node); hirte_log_infof("Registered managed node from fd %d as '%s'", sd_bus_get_fd(agent_bus), name); @@ -1373,6 +1383,7 @@ static int node_run_unit_lifecycle_method( sd_bus_message *m, Node *node, const char *job_type, const char *method) { const char *unit = NULL; const char *mode = NULL; + uint64_t start_time = get_time_micros(); int r = 
sd_bus_message_read(m, "ss", &unit, &mode); if (r < 0) { @@ -1384,6 +1395,10 @@ static int node_run_unit_lifecycle_method( return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Out of memory"); } + if (node->manager->metrics_enabled) { + setup->job->job_start_micros = start_time; + } + _cleanup_agent_request_ AgentRequest *req = NULL; node_create_request( &req, @@ -1455,9 +1470,11 @@ static int send_agent_simple_message(Node *node, const char *method, const char return r; } - r = sd_bus_message_append(m, "s", arg); - if (r < 0) { - return r; + if (arg != NULL) { + r = sd_bus_message_append(m, "s", arg); + if (r < 0) { + return r; + } } return sd_bus_send(node->agent_bus, m, NULL); @@ -1699,3 +1716,75 @@ int node_remove_proxy_dependency(Node *node, const char *unit_name) { return 0; } + +int node_method_get_unit_uint64_property_sync(Node *node, char *unit, char *property, uint64_t *value) { + int r = 0; + _cleanup_sd_bus_message_ sd_bus_message *message = NULL; + sd_bus_error error = SD_BUS_ERROR_NULL; + r = sd_bus_call_method( + node->agent_bus, + HIRTE_AGENT_DBUS_NAME, + INTERNAL_AGENT_OBJECT_PATH, + INTERNAL_AGENT_INTERFACE, + "GetUnitProperty", + &error, + &message, + "sss", + unit, + "org.freedesktop.systemd1.Unit", + property); + if (r < 0) { + hirte_log_errorf("Failed to issue GetUnitProperty call: %s", error.message); + sd_bus_error_free(&error); + return r; + } + + r = sd_bus_message_enter_container(message, SD_BUS_TYPE_VARIANT, "t"); + if (r < 0) { + hirte_log_errorf("Failed to parse response message: %s", strerror(-r)); + return r; + } + + r = sd_bus_message_read_basic(message, SD_BUS_TYPE_UINT64, value); + if (r < 0) { + hirte_log_errorf("Failed to parse response message: %s", strerror(-r)); + return r; + } + + r = sd_bus_message_exit_container(message); + if (r < 0) { + hirte_log_errorf("Failed to parse response message: %s", strerror(-r)); + return r; + } + + return 0; +} + +void node_enable_metrics(Node *node) { + if (!node_has_agent(node)) { + 
return; + } + + int r = send_agent_simple_message(node, "EnableMetrics", NULL); + if (r < 0) { + hirte_log_error("Failed to enable metrics on agent"); + } + + if (!metrics_node_signal_matching_register(node)) { + hirte_log_error("Failed to enable metrics on agent"); + } +} + +void node_disable_metrics(Node *node) { + if (!node_has_agent(node)) { + return; + } + + int r = send_agent_simple_message(node, "DisableMetrics", NULL); + if (r < 0) { + hirte_log_error("Failed to disable metrics on agent"); + } + + sd_bus_slot_unrefp(&node->metrics_matching_slot); + node->metrics_matching_slot = NULL; +} diff --git a/src/manager/node.h b/src/manager/node.h index 720f3fa664..9ec63e7c85 100644 --- a/src/manager/node.h +++ b/src/manager/node.h @@ -40,6 +40,7 @@ struct Node { sd_bus *agent_bus; sd_bus_slot *internal_manager_slot; sd_bus_slot *disconnect_slot; + sd_bus_slot *metrics_matching_slot; LIST_FIELDS(Node, nodes); @@ -84,6 +85,10 @@ int node_create_request( void node_remove_proxy_monitor(Node *node, ProxyMonitor *proxy_monitor); +int node_method_get_unit_uint64_property_sync(Node *node, char *unit, char *property, uint64_t *value); +void node_enable_metrics(Node *node); +void node_disable_metrics(Node *node); + DEFINE_CLEANUP_FUNC(Node, node_unref) #define _cleanup_node_ _cleanup_(node_unrefp) DEFINE_CLEANUP_FUNC(AgentRequest, agent_request_unref)