Fix: sessiond: sessiond and agent deadlock on destroy
Observed issue
--------------

While running the out-of-tree java agent tests [1], the session daemon
and agent often end up in a deadlock.

Attaching gdb to the session daemon, we can see that two threads are
blocked in an intriguing state.

Thread 13 (Thread 0x7f89027fc700 (LWP 9636)):
 #0  0x00007f891e81a4cf in __lll_lock_wait () from /usr/lib/libpthread.so.0
 #1  0x00007f891e812e03 in pthread_mutex_lock () from /usr/lib/libpthread.so.0
 #2  0x000055637f1fbd92 in session_lock_list () at session.c:156
 #3  0x000055637f25dc47 in update_agent_app (app=0x7f88ec003480) at agent-thread.c:56
 #4  0x000055637f25ec0a in thread_agent_management (data=0x556380cd2400) at agent-thread.c:426
 #5  0x000055637f22fb3a in launch_thread (data=0x556380cd24a0) at thread.c:65
 #6  0x00007f891e81046f in start_thread () from /usr/lib/libpthread.so.0
 #7  0x00007f891e7203d3 in clone () from /usr/lib/libc.so.6

Thread 8 (Thread 0x7f8919309700 (LWP 9631)):
 #0  0x00007f891e81b44d in recvmsg () from /usr/lib/libpthread.so.0
 #1  0x000055637f267847 in lttcomm_recvmsg_inet_sock (sock=0x7f88ec0033c0, buf=0x7f89192f5d5c, len=4, flags=0) at inet.c:367
 #2  0x000055637f2146c6 in recv_reply (sock=0x7f88ec0033c0, buf=0x7f89192f5d5c, size=4) at agent.c:275
 #3  0x000055637f215202 in app_context_op (app=0x7f88ec003400, ctx=0x7f8908020900, cmd=AGENT_CMD_APP_CTX_DISABLE) at agent.c:552
 #4  0x000055637f215c2d in disable_context (ctx=0x7f8908020900, domain=LTTNG_DOMAIN_JUL) at agent.c:841
 #5  0x000055637f217480 in agent_destroy (agt=0x7f890801dc20) at agent.c:1326
 #6  0x000055637f243448 in trace_ust_destroy_session (session=0x7f8908004010) at trace-ust.c:1408
 #7  0x000055637f1fd775 in session_release (ref=0x7f8908001e70) at session.c:873
 #8  0x000055637f1fb9ac in urcu_ref_put (ref=0x7f8908001e70, release=0x55637f1fd62a <session_release>) at /usr/include/urcu/ref.h:68
 #9  0x000055637f1fdad2 in session_put (session=0x7f8908000d10) at session.c:942
 #10 0x000055637f2369e6 in process_client_msg (cmd_ctx=0x7f890800e6e0, sock=0x7f8919308560, sock_error=0x7f8919308564) at client.c:2102
 #11 0x000055637f2375ab in thread_manage_clients (data=0x556380cd1840) at client.c:2347
 #12 0x000055637f22fb3a in launch_thread (data=0x556380cd18b0) at thread.c:65
 #13 0x00007f891e81046f in start_thread () from /usr/lib/libpthread.so.0
 #14 0x00007f891e7203d3 in clone () from /usr/lib/libc.so.6

T8 is holding the session list lock while the cmd_destroy_session
command is being processed. More specifically, it is attempting
to destroy an "agent_context" by communicating with an "agent"
application.

Meanwhile, T13 is still registering that same "agent" application.

Cause
-----

The deadlock itself is pretty simple to understand.

The "agent thread" (T13) has the responsibility of accepting new agent
application connections. When such a connection occurs, the thread
creates a new `agent_app` instance and sends the current sessions'
configuration (i.e. their event rules and contexts) to the agent
application. When that "update" is complete, a "registration done"
message is sent to the new agent application.

From the stacktrace above, we can see that T13 is attempting to update
the agent application with its initial configuration, but it is
blocked on the acquisition of the session list lock. The application's
agent is also blocked since it is waiting for the "registration done"
message before allowing tracing to proceed (not shown here, but seen
in the test logs).

Meanwhile, T8 is holding the session list lock while destroying a
session. This is expected as all client commands are executed with
this lock held. It is, amongst other reasons, used to serialize
changes to the sessions' configuration and configuration updates sent
to the tracers (i.e. because new apps appear or to keep existing
tracers in sync with the users' session configuration).
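The shape of this deadlock, a lock held across a blocking socket
operation, starving the very thread that would unblock the peer, can be
modeled outside of lttng with a few lines of pthreads code. This is a
sketch with illustrative names: the blocking recvmsg() is replaced by a
sleep, and the blocked registration thread uses trylock so the
demonstration terminates instead of hanging.

```c
#include <errno.h>
#include <pthread.h>
#include <time.h>

static pthread_mutex_t session_list_lock = PTHREAD_MUTEX_INITIALIZER;
static int registration_blocked;

static void sleep_ms(int ms)
{
	struct timespec ts = { ms / 1000, (ms % 1000) * 1000000L };

	nanosleep(&ts, NULL);
}

/* Plays T8: holds the lock across a "blocking recvmsg()". */
static void *client_thread(void *arg)
{
	(void) arg;
	pthread_mutex_lock(&session_list_lock);
	sleep_ms(300);	/* stands in for recvmsg() on the agent socket */
	pthread_mutex_unlock(&session_list_lock);
	return NULL;
}

/* Plays T13: needs the lock to complete the agent's registration. */
static void *agent_thread(void *arg)
{
	(void) arg;
	sleep_ms(100);	/* let "T8" take the lock first */
	if (pthread_mutex_trylock(&session_list_lock) == EBUSY) {
		registration_blocked = 1; /* the real thread blocks here */
	} else {
		pthread_mutex_unlock(&session_list_lock);
	}
	return NULL;
}

int run_model(void)
{
	pthread_t t8, t13;

	pthread_create(&t8, NULL, client_thread, NULL);
	pthread_create(&t13, NULL, agent_thread, NULL);
	pthread_join(t8, NULL);
	pthread_join(t13, NULL);
	return registration_blocked;
}
```

In the real daemon the cycle closes because the agent application will
not answer T8's command until it receives "registration done", which T13
cannot send while it waits on the lock T8 holds.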

The question becomes: why is T8 tearing down an application that is
not yet registered?

First, inspecting `agent_app` immediately shows that this structure
has no built-in synchronization mechanism. Therefore, the fact that
two threads are accessing it at the same time raises a big red flag.

Speculating on the intentions of the original design, my intuition is
that the "agent_management" thread's role is limited to instantiating
an `agent_app` and synchronizing it with the various sessions'
configuration. Once that synchronization is performed, the agent
application should be published and never accessed again by the "agent
thread".

Configuration updates (i.e. new event rules, contexts) are then sent
synchronously as they are requested by a client in the context of the
client thread. Those updates are performed while holding the session
list lock.

Hence, only one thread should manipulate the agent application at any
given time, making an explicit `agent_app` lock unnecessary.
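This is the classic publish-after-initialization pattern: an object is
fully configured before being inserted into the shared container, so
only the container needs a lock. A minimal sketch, with hypothetical
names standing in for `agent_app` and `agent_apps_ht_by_sock`:

```c
#include <assert.h>
#include <pthread.h>

struct app {
	int sock_fd;
	int configured;	/* must be set before publication */
};

static pthread_mutex_t registry_lock = PTHREAD_MUTEX_INITIALIZER;
static struct app *registry;	/* stands in for the apps hash table */

/*
 * Publish only once the object is complete; readers can then use it
 * without taking any per-object lock, since no writer touches a
 * published object.
 */
static void publish_app(struct app *a)
{
	assert(a->configured);
	pthread_mutex_lock(&registry_lock);
	registry = a;
	pthread_mutex_unlock(&registry_lock);
}

static struct app *lookup_app(void)
{
	struct app *a;

	pthread_mutex_lock(&registry_lock);
	a = registry;
	pthread_mutex_unlock(&registry_lock);
	return a;
}
```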

Overall, this would echo what is done when a 'user space tracer'
application registers to the session daemon (see dispatch.c:368).

Evidently this isn't what is happening here.

The agent thread creates the `agent_app`, publishes it, and then
performs an "agent app update" (sending the configuration) while
holding the session list lock. This means that there is a window where
an agent application is visible to the other threads, yet has not been
properly registered.

Solution
--------

The acquisition of the session list lock is moved outside of
update_agent_app() to allow the "agent thread" to hold the session
list lock during the "configuration update" phase of the agent
application registration.

Essentially, the sequence of operations changes from:

- Agent tcp connection established
- call handle_registration()
  - agent version check
  - allocation of agent_app instance
  - new agent_app is published through the global agent_apps_ht_by_sock
    hashtable
    ***
    it is now reachable by all other threads without any form of
    exclusivity synchronization.
    ***
- update_agent_app
  - acquire session list lock
  - iterate over sessions
    - send configuration
  - release session list lock
- send registration done

to:

- Agent tcp connection established
- call accept_agent_connection()
  - agent version check
- allocation of agent_app instance
- acquire session list lock
- update_agent_app
  - iterate over sessions
    - send configuration
- send registration done
- new agent_app is published through the global agent_apps_ht_by_sock
  hashtable
- release session list lock
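The key invariant of the new sequence, publication happening after the
configuration update and before the session list lock is released, can
be spelled out with stubbed steps. This is only a sketch: the names in
the comments mirror the patch, the rest is illustrative scaffolding.

```c
#include <assert.h>

/* Step markers for the reworked registration flow. */
enum step {
	STEP_ACCEPT,	/* accept_agent_connection() */
	STEP_CREATE,	/* agent_create_app() */
	STEP_LOCK,	/* session_lock_list() */
	STEP_UPDATE,	/* update_agent_app(): send rules and contexts */
	STEP_REG_DONE,	/* agent_send_registration_done() */
	STEP_PUBLISH,	/* agent_add_app(): app becomes visible */
	STEP_UNLOCK,	/* session_unlock_list() */
	STEP_COUNT
};

static enum step trace[STEP_COUNT];
static int trace_len;

static void record(enum step s)
{
	trace[trace_len++] = s;
}

/* Happy path of the new registration sequence. */
static void register_agent_app(void)
{
	record(STEP_ACCEPT);
	record(STEP_CREATE);
	record(STEP_LOCK);
	record(STEP_UPDATE);
	record(STEP_REG_DONE);
	record(STEP_PUBLISH);
	record(STEP_UNLOCK);
}

static int index_of(enum step s)
{
	int i;

	for (i = 0; i < trace_len; i++) {
		if (trace[i] == s) {
			return i;
		}
	}
	return -1;
}
```

Because publication is the last step before unlocking, no other thread
can observe a partially configured `agent_app`.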

Links
-----

[1] https://github.com/lttng/lttng-ust-java-tests

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: Ia34c5ad81ed3936acbca756b425423e0cb8dbddf
jgalar committed Apr 22, 2020
1 parent 317ea24 commit dd6737b
Showing 4 changed files with 164 additions and 88 deletions.
219 changes: 144 additions & 75 deletions src/bin/lttng-sessiond/agent-thread.c
@@ -28,6 +28,15 @@ struct thread_notifiers {
sem_t ready;
};

struct agent_app_id {
pid_t pid;
enum lttng_domain_type domain;
};

struct agent_protocol_version {
unsigned int major, minor;
};

static int agent_tracing_enabled = -1;

/*
@@ -42,38 +51,37 @@ static const char *default_reg_uri =
 * Update agent application using the given socket. This is done just after
 * registration was successful.
 *
- * This is a quite heavy call in terms of locking since the session list lock
- * AND session lock are acquired.
+ * This will acquire the various sessions' lock; none must be held by the
+ * caller.
+ * The caller must hold the session list lock.
 */
-static void update_agent_app(struct agent_app *app)
+static void update_agent_app(const struct agent_app *app)
 {
 	struct ltt_session *session, *stmp;
 	struct ltt_session_list *list;
 
 	list = session_get_list();
 	assert(list);
 
-	session_lock_list();
 	cds_list_for_each_entry_safe(session, stmp, &list->head, list) {
 		if (!session_get(session)) {
 			continue;
 		}
 
 		session_lock(session);
 		if (session->ust_session) {
-			struct agent *agt;
+			const struct agent *agt;
 
 			rcu_read_lock();
 			agt = trace_ust_find_agent(session->ust_session, app->domain);
 			if (agt) {
-				agent_update(agt, app->sock->fd);
+				agent_update(agt, app);
 			}
 			rcu_read_unlock();
 		}
 		session_unlock(session);
 		session_put(session);
 	}
-	session_unlock_list();
 }

/*
@@ -188,23 +196,56 @@ static void destroy_tcp_socket(struct lttcomm_sock *sock)
lttcomm_destroy_sock(sock);
}

static const char *domain_type_str(enum lttng_domain_type domain_type)
{
switch (domain_type) {
case LTTNG_DOMAIN_NONE:
return "none";
case LTTNG_DOMAIN_KERNEL:
return "kernel";
case LTTNG_DOMAIN_UST:
return "ust";
case LTTNG_DOMAIN_JUL:
return "jul";
case LTTNG_DOMAIN_LOG4J:
return "log4j";
case LTTNG_DOMAIN_PYTHON:
return "python";
default:
return "unknown";
}
}

static bool is_agent_protocol_version_supported(
const struct agent_protocol_version *version)
{
const bool is_supported = version->major == AGENT_MAJOR_VERSION &&
version->minor == AGENT_MINOR_VERSION;

if (!is_supported) {
WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i",
version->major, version->minor,
AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION);
}

return is_supported;
}

/*
* Handle a new agent registration using the reg socket. After that, a new
* agent application is added to the global hash table and attach to an UST app
* object. If r_app is not NULL, the created app is set to the pointer.
* Handle a new agent connection on the registration socket.
*
* Return the new FD created upon accept() on success or else a negative errno
* value.
* Returns 0 on success, or else a negative errno value.
* On success, the resulting socket is returned through `agent_app_socket`
* and the application's reported id is updated through `agent_app_id`.
*/
static int handle_registration(struct lttcomm_sock *reg_sock,
struct agent_app **r_app)
static int accept_agent_connection(
struct lttcomm_sock *reg_sock,
struct agent_app_id *agent_app_id,
struct lttcomm_sock **agent_app_socket)
{
int ret;
pid_t pid;
uint32_t major_version, minor_version;
struct agent_protocol_version agent_version;
ssize_t size;
enum lttng_domain_type domain;
struct agent_app *app;
struct agent_register_msg msg;
struct lttcomm_sock *new_sock;

@@ -213,60 +254,52 @@ static int handle_registration(struct lttcomm_sock *reg_sock,
new_sock = reg_sock->ops->accept(reg_sock);
if (!new_sock) {
ret = -ENOTCONN;
goto error;
goto end;
}

size = new_sock->ops->recvmsg(new_sock, &msg, sizeof(msg), 0);
if (size < sizeof(msg)) {
if (size < 0) {
PERROR("Failed to register new agent application");
} else if (size != 0) {
ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd",
sizeof(msg), size);
} else {
DBG("Failed to register new agent application: connection closed");
}
ret = -EINVAL;
goto error_socket;
}
domain = be32toh(msg.domain);
pid = be32toh(msg.pid);
major_version = be32toh(msg.major_version);
minor_version = be32toh(msg.minor_version);

/* Test communication protocol version of the registring agent. */
if (major_version != AGENT_MAJOR_VERSION) {
ret = -EINVAL;
goto error_socket;
}
if (minor_version != AGENT_MINOR_VERSION) {
ret = -EINVAL;
goto error_socket;
goto error_close_socket;
}

DBG2("[agent-thread] New registration for pid %d domain %d on socket %d",
pid, domain, new_sock->fd);
agent_version = (struct agent_protocol_version) {
be32toh(msg.major_version),
be32toh(msg.minor_version),
};

app = agent_create_app(pid, domain, new_sock);
if (!app) {
ret = -ENOMEM;
goto error_socket;
/* Test communication protocol version of the registering agent. */
if (!is_agent_protocol_version_supported(&agent_version)) {
ret = -EINVAL;
goto error_close_socket;
}

/*
* Add before assigning the socket value to the UST app so it can be found
* concurrently.
*/
agent_add_app(app);

/*
* We don't need to attach the agent app to the app. If we ever do so, we
* should consider both registration order of agent before app and app
* before agent.
*/
*agent_app_id = (struct agent_app_id) {
.domain = (enum lttng_domain_type) be32toh(msg.domain),
.pid = (pid_t) be32toh(msg.pid),
};

if (r_app) {
*r_app = app;
}
DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d",
(long) agent_app_id->pid,
domain_type_str(agent_app_id->domain), new_sock->fd);

return new_sock->fd;
*agent_app_socket = new_sock;
new_sock = NULL;
ret = 0;
goto end;

error_socket:
error_close_socket:
new_sock->ops->close(new_sock);
lttcomm_destroy_sock(new_sock);
error:
end:
return ret;
}

@@ -362,7 +395,7 @@ static void *thread_agent_management(void *data)
uatomic_set(&agent_tracing_enabled, 1);
mark_thread_as_ready(notifiers);

/* Add TCP socket to poll set. */
/* Add TCP socket to the poll set. */
ret = lttng_poll_add(&events, reg_sock->fd,
LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
if (ret < 0) {
@@ -399,43 +432,79 @@
goto exit;
}

/* Activity on the registration socket. */
if (revents & LPOLLIN) {
int new_fd;
struct agent_app *app = NULL;
struct agent_app_id new_app_id;
struct agent_app *new_app = NULL;
struct lttcomm_sock *new_app_socket;
int new_app_socket_fd;

assert(pollfd == reg_sock->fd);
new_fd = handle_registration(reg_sock, &app);
if (new_fd < 0) {

ret = accept_agent_connection(
reg_sock, &new_app_id, &new_app_socket);
if (ret < 0) {
/* Errors are already logged. */
continue;
}
/* Should not have a NULL app on success. */
assert(app);

/*
* Since this is a command socket (write then read),
* only add poll error event to only detect shutdown.
* new_app_socket's ownership has been
* transferred to the new agent app.
*/
ret = lttng_poll_add(&events, new_fd,
new_app = agent_create_app(new_app_id.pid,
new_app_id.domain,
new_app_socket);
if (!new_app) {
new_app_socket->ops->close(
new_app_socket);
continue;
}
new_app_socket_fd = new_app_socket->fd;
new_app_socket = NULL;

/*
* Since this is a command socket (write then
* read), only add poll error event to only
* detect shutdown.
*/
ret = lttng_poll_add(&events, new_app_socket_fd,
LPOLLERR | LPOLLHUP | LPOLLRDHUP);
if (ret < 0) {
agent_destroy_app_by_sock(new_fd);
agent_destroy_app(new_app);
continue;
}

/* Update newly registered app. */
update_agent_app(app);
/*
* Prevent sessions from being modified while
* the agent application's configuration is
* updated.
*/
session_lock_list();

/*
* Update the newly registered applications's
* configuration.
*/
update_agent_app(new_app);

/* On failure, the poll will detect it and clean it up. */
ret = agent_send_registration_done(app);
ret = agent_send_registration_done(new_app);
if (ret < 0) {
/* Removing from the poll set */
ret = lttng_poll_del(&events, new_fd);
agent_destroy_app(new_app);
/* Removing from the poll set. */
ret = lttng_poll_del(&events,
new_app_socket_fd);
if (ret < 0) {
session_unlock_list();
goto error;
}
agent_destroy_app_by_sock(new_fd);
continue;
}

/* Publish the new agent app. */
agent_add_app(new_app);

session_unlock_list();
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
/* Removing from the poll set */
ret = lttng_poll_del(&events, pollfd);
18 changes: 8 additions & 10 deletions src/bin/lttng-sessiond/agent.c
@@ -380,7 +380,7 @@ static ssize_t list_events(struct agent_app *app, struct lttng_event **events)
 *
 * Return LTTNG_OK on success or else a LTTNG_ERR* code.
 */
-static int enable_event(struct agent_app *app, struct agent_event *event)
+static int enable_event(const struct agent_app *app, struct agent_event *event)
 {
 	int ret;
 	char *bytes_to_send;
@@ -495,8 +495,8 @@ int send_pstring(struct lttcomm_sock *sock, const char *str, uint32_t len)
 *
 * Return LTTNG_OK on success or else a LTTNG_ERR* code.
 */
-static int app_context_op(struct agent_app *app,
-		struct agent_app_ctx *ctx, enum lttcomm_agent_command cmd)
+static int app_context_op(const struct agent_app *app,
+		const struct agent_app_ctx *ctx, enum lttcomm_agent_command cmd)
 {
 	int ret;
 	uint32_t reply_ret_code;
@@ -946,7 +946,7 @@ struct agent_app *agent_create_app(pid_t pid, enum lttng_domain_type domain,

 	app = zmalloc(sizeof(*app));
 	if (!app) {
-		PERROR("zmalloc agent create");
+		PERROR("Failed to allocate agent application instance");
 		goto error;
 	}
 
@@ -1402,26 +1402,24 @@ void agent_app_ht_clean(void)
 * Note that this function is most likely to be used with a tracing session
 * thus the caller should make sure to hold the appropriate lock(s).
 */
-void agent_update(struct agent *agt, int sock)
+void agent_update(const struct agent *agt, const struct agent_app *app)
 {
 	int ret;
-	struct agent_app *app;
 	struct agent_event *event;
 	struct lttng_ht_iter iter;
 	struct agent_app_ctx *ctx;
 
 	assert(agt);
-	assert(sock >= 0);
+	assert(app);
 
-	DBG("Agent updating app socket %d", sock);
+	DBG("Agent updating app: pid = %ld", (long) app->pid);
 
 	rcu_read_lock();
-	app = agent_find_app_by_sock(sock);
 	/*
 	 * We are in the registration path thus if the application is gone,
 	 * there is a serious code flow error.
 	 */
-	assert(app);
 
 	cds_lfht_for_each_entry(agt->events->ht, &iter.iter, event, node.node) {
 		/* Skip event if disabled. */
 		if (!event->enabled) {
2 changes: 1 addition & 1 deletion src/bin/lttng-sessiond/agent.h
@@ -163,7 +163,7 @@ int agent_enable_event(struct agent_event *event,
 		enum lttng_domain_type domain);
 int agent_disable_event(struct agent_event *event,
 		enum lttng_domain_type domain);
-void agent_update(struct agent *agt, int sock);
+void agent_update(const struct agent *agt, const struct agent_app *app);
 int agent_list_events(struct lttng_event **events,
 		enum lttng_domain_type domain);
 
