Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics: basic start unit job reporting #265

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data/org.containers.hirte.Manager.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
<method name="CreateMonitor">
<arg name="monitor" type="o" direction="out" />
</method>
<method name="EnableMetrics" />
<method name="DisableMetrics" />

<signal name="JobNew">
<arg name="id" type="u" />
Expand Down
19 changes: 19 additions & 0 deletions data/org.containers.hirte.Metrics.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<!-- SPDX-License-Identifier: GPL-2.0-or-later -->
<node>
<interface name="org.containers.hirte.Metrics">
<signal name="StartUnitJobMetrics">
<arg name="node_name" type="s" />
<arg name="job_id" type="s" />
<arg name="unit" type="s" />
<arg name="job_measured_time_micros" type="t" />
<arg name="unit_start_prop_time_micros" type="t" />
</signal>
<signal name="AgentJobMetrics">
<arg name="node_name" type="s" />
<arg name="unit" type="s" />
<arg name="method" type="s" />
<arg name="systemd_job_time_micros" type="t" />
</signal>
</interface>
</node>
11 changes: 11 additions & 0 deletions data/org.containers.hirte.internal.Agent.Metrics.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<!-- SPDX-License-Identifier: GPL-2.0-or-later -->
<node>
<interface name="org.containers.hirte.internal.Agent.Metrics">
<signal name="AgentJobMetrics">
<arg name="unit" type="s" />
<arg name="method" type="s" />
<arg name="systemd_job_time_micros" type="t" />
</signal>
</interface>
</node>
2 changes: 2 additions & 0 deletions data/org.containers.hirte.internal.Agent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
<method name="Unsubscribe">
<arg name="unit" type="s" direction="in" />
</method>
<method name="EnableMetrics" />
<method name="DisableMetrics" />
<method name="StartDep">
<arg name="unit" type="s" direction="in" />
</method>
Expand Down
181 changes: 140 additions & 41 deletions src/agent/agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "libhirte/common/event-util.h"
#include "libhirte/common/opt.h"
#include "libhirte/common/parse-util.h"
#include "libhirte/common/time-util.h"
#include "libhirte/log/log.h"
#include "libhirte/service/shutdown.h"

Expand Down Expand Up @@ -45,6 +46,52 @@ static bool
void *userdata,
free_func_t free_userdata);

/* Keep track of an outstanding systemd job and connect it back to
the originating hirte job id so we can proxy changes to it.
The unit and method are kept as well so that job metrics can be
reported once the job is done. */
typedef struct {
/* Manual reference count: starts at 1 in agent_job_new(), adjusted by
   agent_job_op_ref()/agent_job_op_unref(); the op is freed at 0. */
int ref_count;

/* Agent this job belongs to; a reference is taken with agent_ref()
   on creation and dropped with agent_unref() on destruction. */
Agent *agent;
/* Id of the hirte job this systemd job was started for. */
uint32_t hirte_job_id;
/* Timestamp from get_time_micros(), recorded when the systemd request
   is started; 0 until then. */
uint64_t job_start_micros;
/* Heap copies (strdup) of the unit name and lifecycle method; owned by
   the op and released in agent_job_op_unref(). */
char *unit;
char *method;
} AgentJobOp;


/* Take one more reference on op and hand the same pointer back so the
   call can be used inline where the reference is stored. */
static AgentJobOp *agent_job_op_ref(AgentJobOp *op) {
        op->ref_count += 1;
        return op;
}

/* Drop one reference on op. When the last reference goes away the agent
   reference is released and the op, including its unit and method
   strings, is freed. */
static void agent_job_op_unref(AgentJobOp *op) {
        op->ref_count -= 1;
        if (op->ref_count == 0) {
                agent_unref(op->agent);
                free(op->unit);
                free(op->method);
                free(op);
        }
}

DEFINE_CLEANUP_FUNC(AgentJobOp, agent_job_op_unref)
#define _cleanup_agent_job_op_ _cleanup_(agent_job_op_unrefp)

/*
 * Allocate a new job operation tracking a systemd job started on behalf
 * of the hirte job `hirte_job_id`.
 *
 * The op starts with a reference count of 1, holds its own reference on
 * `agent`, and owns private copies of `unit` and `method`. The start
 * timestamp is left at 0 until the request is actually sent.
 *
 * Returns NULL if any allocation fails; nothing is leaked in that case.
 */
static AgentJobOp *agent_job_new(Agent *agent, uint32_t hirte_job_id, const char *unit, const char *method) {
        AgentJobOp *op = malloc0(sizeof(AgentJobOp));
        if (op == NULL) {
                return NULL;
        }

        op->ref_count = 1;
        op->agent = agent_ref(agent);
        op->hirte_job_id = hirte_job_id;
        op->job_start_micros = 0;
        op->unit = strdup(unit);
        op->method = strdup(method);

        /* A half-initialized op would later be emitted as a metrics signal
           with NULL strings, so treat a failed copy as allocation failure.
           Dropping the only reference releases the agent ref and frees
           whichever string did get copied. */
        if (op->unit == NULL || op->method == NULL) {
                agent_job_op_unref(op);
                return NULL;
        }

        return op;
}

/* Report whether the agent exists and its connection state is
   AGENT_CONNECTION_STATE_CONNECTED. A NULL agent counts as not connected. */
bool agent_is_connected(Agent *agent) {
        if (agent == NULL) {
                return false;
        }
        return agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED;
}
Expand Down Expand Up @@ -192,6 +239,10 @@ static void systemd_request_set_userdata(SystemdRequest *req, void *userdata, fr

static bool systemd_request_start(SystemdRequest *req, sd_bus_message_handler_t callback) {
Agent *agent = req->agent;
AgentJobOp *op = req->userdata;
if (op != NULL) {
op->job_start_micros = get_time_micros();
}

int r = sd_bus_call_async(
agent->systemd_dbus, &req->slot, req->message, callback, req, HIRTE_DEFAULT_DBUS_TIMEOUT);
Expand Down Expand Up @@ -337,6 +388,7 @@ Agent *agent_new(void) {
agent->connection_retry_count = 0;
agent->wildcard_subscription_active = false;
agent->name = get_hostname();
agent->metrics_enabled = false;

return steal_pointer(&agent);
}
Expand Down Expand Up @@ -413,6 +465,10 @@ void agent_unref(Agent *agent) {
sd_event_unrefp(&agent->event);
}

if (agent->metrics_slot != NULL) {
sd_bus_slot_unrefp(&agent->metrics_slot);
}

if (agent->peer_dbus != NULL) {
sd_bus_unrefp(&agent->peer_dbus);
}
Expand Down Expand Up @@ -824,50 +880,19 @@ static int agent_method_set_unit_properties(sd_bus_message *m, void *userdata, U
return 1;
}


/* Keep track of an outstanding systemd job and connect it back to
the originating hirte job id so we can proxy changes to it. */
typedef struct {
/* Manual reference count: starts at 1 in agent_job_new(), adjusted by
   agent_job_op_ref()/agent_job_op_unref(); the op is freed at 0. */
int ref_count;

/* Agent this job belongs to; a reference is taken with agent_ref()
   on creation and dropped with agent_unref() on destruction. */
Agent *agent;
/* Id of the hirte job this systemd job was started for. */
uint32_t hirte_job_id;
} AgentJobOp;


/* Bump the reference count of op and return op itself. */
static AgentJobOp *agent_job_op_ref(AgentJobOp *op) {
        ++op->ref_count;
        return op;
}

/* Release one reference on op; the final release drops the agent
   reference and frees the op. */
static void agent_job_op_unref(AgentJobOp *op) {
        --op->ref_count;
        if (op->ref_count == 0) {
                agent_unref(op->agent);
                free(op);
        }
}

DEFINE_CLEANUP_FUNC(AgentJobOp, agent_job_op_unref)
#define _cleanup_agent_job_op_ _cleanup_(agent_job_op_unrefp)

/* Allocate a job operation for the hirte job `hirte_job_id`. The result
   starts with one reference and holds its own reference on `agent`.
   Returns NULL when the allocation fails. */
static AgentJobOp *agent_job_new(Agent *agent, uint32_t hirte_job_id) {
        AgentJobOp *job_op = malloc0(sizeof(AgentJobOp));
        if (job_op == NULL) {
                return NULL;
        }

        job_op->ref_count = 1;
        job_op->agent = agent_ref(agent);
        job_op->hirte_job_id = hirte_job_id;
        return job_op;
}


static void agent_job_done(UNUSED sd_bus_message *m, const char *result, void *userdata) {
AgentJobOp *op = userdata;
Agent *agent = op->agent;

if (agent->metrics_enabled) {
agent_send_job_metrics(
agent,
op->unit,
op->method,
// NOLINTNEXTLINE(bugprone-narrowing-conversions, cppcoreguidelines-narrowing-conversions)
finalize_time_interval_micros(op->job_start_micros));
}

hirte_log_infof("Sending JobDone %u, result: %s", op->hirte_job_id, result);

int r = sd_bus_emit_signal(
Expand Down Expand Up @@ -929,7 +954,7 @@ static int agent_run_unit_lifecycle_method(sd_bus_message *m, Agent *agent, cons
return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Internal error");
}

_cleanup_agent_job_op_ AgentJobOp *op = agent_job_new(agent, job_id);
_cleanup_agent_job_op_ AgentJobOp *op = agent_job_new(agent, job_id, name, method);
if (op == NULL) {
return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Internal error");
}
Expand Down Expand Up @@ -1213,6 +1238,60 @@ static int agent_method_stop_dep(sd_bus_message *m, void *userdata, UNUSED sd_bu
}


/***************************************************************************
********** org.containers.hirte.internal.Agent.EnableMetrics **************
**************************************************************************/

/* D-Bus vtable for the agent metrics object. It declares a single signal,
   AgentJobMetrics, with signature "sst": the unit name, the lifecycle
   method, and the measured systemd job time in microseconds. It is
   registered on the peer bus by agent_method_enable_metrics(). */
static const sd_bus_vtable agent_metrics_vtable[] = {
SD_BUS_VTABLE_START(0),
SD_BUS_SIGNAL_WITH_NAMES(
"AgentJobMetrics",
"sst",
SD_BUS_PARAM(unit) SD_BUS_PARAM(method) SD_BUS_PARAM(systemd_job_time_micros),
0),
SD_BUS_VTABLE_END
};

/*
 * Handler for org.containers.hirte.internal.Agent.EnableMetrics.
 *
 * Registers the metrics vtable on the peer bus and marks metrics as
 * enabled. `userdata` is the Agent.
 *
 * Replies with an INVALID_ARGS error if metrics are already enabled.
 * Returns a negative errno-style value if the vtable cannot be
 * registered; in that case metrics stay disabled so the call can be
 * retried.
 */
static int agent_method_enable_metrics(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
        Agent *agent = userdata;

        if (agent->metrics_enabled) {
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INVALID_ARGS, "Metrics already enabled");
        }

        int r = sd_bus_add_object_vtable(
                        agent->peer_dbus,
                        &agent->metrics_slot,
                        INTERNAL_AGENT_METRICS_OBJECT_PATH,
                        INTERNAL_AGENT_METRICS_INTERFACE,
                        agent_metrics_vtable,
                        NULL);
        if (r < 0) {
                hirte_log_errorf("Failed to add metrics vtable: %s", strerror(-r));
                return r;
        }

        /* Only flip the flag once the object is really exported; otherwise a
           failed registration would leave the agent emitting metrics signals
           for an object that does not exist and reject any retry. */
        agent->metrics_enabled = true;
        hirte_log_debug("Metrics enabled");
        return sd_bus_reply_method_return(m, "");
}

/****************************************************************************
********** org.containers.hirte.internal.Agent.DisableMetrics **************
***************************************************************************/

/*
 * Handler for org.containers.hirte.internal.Agent.DisableMetrics.
 *
 * Marks metrics as disabled and removes the metrics object from the
 * peer bus by releasing its slot. `userdata` is the Agent.
 *
 * Replies with an INVALID_ARGS error if metrics are already disabled.
 */
static int agent_method_disable_metrics(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
        Agent *agent = userdata;

        if (!agent->metrics_enabled) {
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INVALID_ARGS, "Metrics already disabled");
        }

        agent->metrics_enabled = false;
        /* sd_bus_slot_unref() always returns NULL. Storing that result clears
           the field, so the NULL check in agent_unref() does not unref the
           same slot a second time. */
        agent->metrics_slot = sd_bus_slot_unref(agent->metrics_slot);
        hirte_log_debug("Metrics disabled");
        return sd_bus_reply_method_return(m, "");
}


static const sd_bus_vtable internal_agent_vtable[] = {
SD_BUS_VTABLE_START(0),
SD_BUS_METHOD("ListUnits", "", UNIT_INFO_STRUCT_ARRAY_TYPESTRING, agent_method_list_units, 0),
Expand All @@ -1225,6 +1304,8 @@ static const sd_bus_vtable internal_agent_vtable[] = {
SD_BUS_METHOD("ReloadUnit", "ssu", "", agent_method_reload_unit, 0),
SD_BUS_METHOD("Subscribe", "s", "", agent_method_subscribe, 0),
SD_BUS_METHOD("Unsubscribe", "s", "", agent_method_unsubscribe, 0),
SD_BUS_METHOD("EnableMetrics", "", "", agent_method_enable_metrics, 0),
SD_BUS_METHOD("DisableMetrics", "", "", agent_method_disable_metrics, 0),
SD_BUS_SIGNAL_WITH_NAMES("JobDone", "us", SD_BUS_PARAM(id) SD_BUS_PARAM(result), 0),
SD_BUS_SIGNAL_WITH_NAMES("JobStateChanged", "us", SD_BUS_PARAM(id) SD_BUS_PARAM(state), 0),
SD_BUS_SIGNAL_WITH_NAMES(
Expand Down Expand Up @@ -2033,5 +2114,23 @@ static int agent_stop_local_proxy_service(Agent *agent, ProxyService *proxy) {
}


return 0;
}

/*
 * Emit the AgentJobMetrics signal on the agent's peer bus.
 *
 * agent            - agent whose peer_dbus connection is used
 * unit             - name of the unit the job operated on
 * method           - lifecycle method that was run on the unit
 * systemd_job_time - measured duration of the systemd job, in microseconds
 *
 * Emission is best effort: a failure is logged but not propagated, so
 * this always returns 0.
 */
int agent_send_job_metrics(Agent *agent, char *unit, char *method, uint64_t systemd_job_time) {
        /* Argument order must follow the message text (method first, then
           unit); the cast keeps the 64-bit value matched to its conversion
           specifier on every platform. */
        hirte_log_debugf(
                        "Sending agent %s job metrics on unit %s: %lluus",
                        method,
                        unit,
                        (unsigned long long) systemd_job_time);

        int r = sd_bus_emit_signal(
                        agent->peer_dbus,
                        INTERNAL_AGENT_METRICS_OBJECT_PATH,
                        INTERNAL_AGENT_METRICS_INTERFACE,
                        "AgentJobMetrics",
                        "sst",
                        unit,
                        method,
                        systemd_job_time);
        if (r < 0) {
                hirte_log_errorf("Failed to emit metric signal: %s", strerror(-r));
        }

        return 0;
}
6 changes: 6 additions & 0 deletions src/agent/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ struct Agent {
char *orch_addr;
char *api_bus_service_name;

bool metrics_enabled;

sd_event *event;

sd_bus *api_bus;
sd_bus *systemd_dbus;
sd_bus *peer_dbus;

sd_bus_slot *metrics_slot;

LIST_HEAD(SystemdRequest, outstanding_requests);
LIST_HEAD(JobTracker, tracked_jobs);
LIST_HEAD(ProxyService, proxy_services);
Expand Down Expand Up @@ -105,5 +109,7 @@ bool agent_is_connected(Agent *agent);

void agent_remove_proxy(Agent *agent, ProxyService *proxy, bool emit);

int agent_send_job_metrics(Agent *agent, char *unit, char *method, uint64_t systemd_job_time);

DEFINE_CLEANUP_FUNC(Agent, agent_unref)
#define _cleanup_agent_ _cleanup_(agent_unrefp)
Loading