Skip to content

Commit

Permalink
Fix: ust-consumer: metadata thread not woken-up after version change
Browse files Browse the repository at this point in the history
Issue observed
==============

The metadata regeneration test fails, very rarely, in the "streaming"
case on the CI. The interesting part of the test boils down to:
  1) start session
  2) launch an app tracing one event
  3) stop session
  4) delete metadata file
  5) start session
  6) regenerate metadata
  7) stop session
  8) destroy session
  9) read trace: babeltrace fails on an invalid metadata file.

The problem is hard to capture, but modifying the test allows us to see
that there appears to be a short window between steps 7 and 8 where the
metadata file is empty or doesn't exist.

Cause
=====

When metadata is regenerated, its version is bumped and the metadata
cache is "reset". In some cases, such as in this test, the new metadata
will have exactly the same size as it had prior as nothing happened to
change that (e.g. no new apps/probes were registered).

When this occurs, the metadata thread is not woken-up by
consumer_metadata_cache_write() as it sees that max_offset of the
metadata cache didn't change; the data was replaced but it has the same
size.

The metadata consumption thread also checks for version bumps and
resets the amount of consumed metadata. Hence, if the "cache write"
operation woke up the metadata consumption thread, the stream's
"ust metadata pushed" state would be reset and the new contents would
be consumed.

Solution
========

The metadata stream's "ust metadata pushed" position is directly reset
to zero when a metadata version change is detected by the metadata
cache. The metadata poll thread is also woken up to resume the
consumption of the newly-available data.

It is unclear why the change to the consumption position was only done
on the metadata consumption thread's code path and not directly by the
session daemon command handling.

Note that a session rotation will also result in a reset of the pushed
position and a wake-up of the metadata poll thread from the command
handling thread. I am speculating that this couldn't be done due to the
design of the locking at the time of the original
implementation (I haven't checked).

In implementing this change, the metadata reception code path is
untangled a bit to separate the logic that affects the metadata stream
from the logic that manages the metadata cache. I suspect the original
error stems from a mix-up/confusion between both concerns.

When a metadata version change happens, the metadata cache resets its
'max_offset' (in other words, it's current size) and notifies the
caller. The caller then resets the "ust pushed metadata" position to
zero and wakes-up the metadata thread to consume the new contents of the
metadata cache.

Known drawbacks
===============

None.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I142ef957140d497ac7fc4294ca65a55c12518598
  • Loading branch information
jgalar committed Feb 9, 2021
1 parent 934ba8f commit b1316da
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 66 deletions.
87 changes: 35 additions & 52 deletions src/common/consumer/consumer-metadata-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@

#include "consumer-metadata-cache.h"

enum metadata_cache_update_version_status {
METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED,
METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED,
};

extern struct lttng_consumer_global_data consumer_data;

/*
Expand Down Expand Up @@ -74,60 +79,23 @@ void metadata_cache_reset(struct consumer_metadata_cache *cache)
* Check if the metadata cache version changed.
* If it did, reset the metadata cache.
* The metadata cache lock MUST be held.
*
* Returns 0 on success, a negative value on error.
*/
static
int metadata_cache_check_version(struct consumer_metadata_cache *cache,
uint64_t version)
static enum metadata_cache_update_version_status metadata_cache_update_version(
struct consumer_metadata_cache *cache, uint64_t version)
{
int ret = 0;
enum metadata_cache_update_version_status status;

if (cache->version == version) {
status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED;
goto end;
}

DBG("Metadata cache version update to %" PRIu64, version);
metadata_cache_reset(cache);
cache->version = version;
status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED;

end:
return ret;
}

/*
* Write a character on the metadata poll pipe to wake the metadata thread.
* Returns 0 on success, -1 on error.
*/
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
int ret = 0;
const char dummy = 'c';

if (channel->monitor && channel->metadata_stream) {
ssize_t write_ret;

write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
&dummy, 1);
if (write_ret < 1) {
if (errno == EWOULDBLOCK) {
/*
* This is fine, the metadata poll thread
* is having a hard time keeping-up, but
* it will eventually wake-up and consume
* the available data.
*/
ret = 0;
} else {
PERROR("Wake-up UST metadata pipe");
ret = -1;
goto end;
}
}
}

end:
return ret;
return status;
}

/*
Expand All @@ -136,23 +104,31 @@ int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
* contiguous metadata in cache to the ring buffer. The metadata cache
* lock MUST be acquired to write in the cache.
*
* Return 0 on success, a negative value on error.
* See `enum consumer_metadata_cache_write_status` for the meaning of the
* various returned status codes.
*/
int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
enum consumer_metadata_cache_write_status
consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
unsigned int offset, unsigned int len, uint64_t version,
const char *data)
{
int ret = 0;
struct consumer_metadata_cache *cache;
enum consumer_metadata_cache_write_status status;
bool cache_is_invalidated = false;
uint64_t original_max_offset;

assert(channel);
assert(channel->metadata_cache);

cache = channel->metadata_cache;
ASSERT_LOCKED(cache->lock);
original_max_offset = cache->max_offset;

ret = metadata_cache_check_version(cache, version);
if (ret < 0) {
goto end;
if (metadata_cache_update_version(cache, version) ==
METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) {
metadata_cache_reset(cache);
cache_is_invalidated = true;
}

DBG("Writing %u bytes from offset %u in metadata cache", len, offset);
Expand All @@ -162,18 +138,25 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
len - cache->cache_alloc_size + offset);
if (ret < 0) {
ERR("Extending metadata cache");
status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR;
goto end;
}
}

memcpy(cache->data + offset, data, len);
if (offset + len > cache->max_offset) {
cache->max_offset = offset + len;
ret = consumer_metadata_wakeup_pipe(channel);
cache->max_offset = max(cache->max_offset, offset + len);

if (cache_is_invalidated) {
status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED;
} else if (cache->max_offset > original_max_offset) {
status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT;
} else {
status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE;
assert(cache->max_offset == original_max_offset);
}

end:
return ret;
return status;
}

/*
Expand Down
25 changes: 23 additions & 2 deletions src/common/consumer/consumer-metadata-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@

#include <common/consumer/consumer.h>

enum consumer_metadata_cache_write_status {
CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR = -1,
/*
* New metadata content was appended to the cache successfully.
* Previously available content remains valid.
*/
CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT = 0,
/*
* The new content pushed to the cache invalidated the content that
* was already present. The contents of the cache should be re-read.
*/
CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED,
/*
* A metadata cache write can simply overwrite an already existing
* section of the cache (and it should be a write-through with identical
* data). From the caller's standpoint, there is no change to the state
* of the cache.
*/
CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE,
};

struct consumer_metadata_cache {
char *data;
uint64_t cache_alloc_size;
Expand All @@ -35,13 +56,13 @@ struct consumer_metadata_cache {
pthread_mutex_t lock;
};

int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
enum consumer_metadata_cache_write_status
consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
unsigned int offset, unsigned int len, uint64_t version,
const char *data);
int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
uint64_t offset, int timer);
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);

#endif /* CONSUMER_METADATA_CACHE_H */
37 changes: 37 additions & 0 deletions src/common/consumer/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,43 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
return outfd;
}

/*
* Write a character on the metadata poll pipe to wake the metadata thread.
* Returns 0 on success, -1 on error.
*/
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
int ret = 0;

DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
channel->name);
if (channel->monitor && channel->metadata_stream) {
const char dummy = 'c';
const ssize_t write_ret = lttng_write(
channel->metadata_stream->ust_metadata_poll_pipe[1],
&dummy, 1);

if (write_ret < 1) {
if (errno == EWOULDBLOCK) {
/*
* This is fine, the metadata poll thread
* is having a hard time keeping-up, but
* it will eventually wake-up and consume
* the available data.
*/
ret = 0;
} else {
PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
ret = -1;
goto end;
}
}
}

end:
return ret;
}

/*
* Trigger a dump of the metadata content. Following/during the succesful
* completion of this call, the metadata poll thread will start receiving
Expand Down
1 change: 1 addition & 0 deletions src/common/consumer/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -1052,5 +1052,6 @@ enum lttcomm_return_code lttng_consumer_init_command(
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
enum lttcomm_return_code lttng_consumer_open_channel_packets(
struct lttng_consumer_channel *channel);
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);

#endif /* LIB_CONSUMER_H */
60 changes: 48 additions & 12 deletions src/common/ust-consumer/ust-consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,17 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
return ret;
}

static
void metadata_stream_reset_cache_consumed_position(
struct lttng_consumer_stream *stream)
{
ASSERT_LOCKED(stream->lock);

DBG("Reset metadata cache of session %" PRIu64,
stream->chan->session_id);
stream->ust_metadata_pushed = 0;
}

/*
* Receive the metadata updates from the sessiond. Supports receiving
* overlapping metadata, but is needs to always belong to a contiguous
Expand All @@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
enum consumer_metadata_cache_write_status cache_write_status;

DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);

Expand All @@ -1320,10 +1332,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
health_code_update();

pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, version,
metadata_str);
cache_write_status = consumer_metadata_cache_write(
channel, offset, len, version, metadata_str);
pthread_mutex_unlock(&channel->metadata_cache->lock);
if (ret < 0) {
switch (cache_write_status) {
case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
/*
* The write entirely overlapped with existing contents of the
* same metadata version (same content); there is nothing to do.
*/
break;
case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
/*
* The metadata cache was invalidated (previously pushed
* content has been overwritten). Reset the stream's consumed
* metadata position to ensure the metadata poll thread consumes
* the whole cache.
*/
pthread_mutex_lock(&channel->metadata_stream->lock);
metadata_stream_reset_cache_consumed_position(
channel->metadata_stream);
pthread_mutex_unlock(&channel->metadata_stream->lock);
/* Fall-through. */
case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
/*
* In both cases, the metadata poll thread has new data to
* consume.
*/
ret = consumer_metadata_wakeup_pipe(channel);
if (ret) {
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
goto end_free;
}
break;
case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
/* Unable to handle metadata. Notify session daemon. */
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
/*
Expand All @@ -1332,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
* waiting for the metadata cache to be flushed.
*/
goto end_free;
default:
abort();
}

if (!wait) {
Expand Down Expand Up @@ -2464,15 +2508,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
return ustctl_stream_close_wakeup_fd(stream->ustream);
}

static
void metadata_stream_reset_cache_consumed_position(
struct lttng_consumer_stream *stream)
{
DBG("Reset metadata cache of session %" PRIu64,
stream->chan->session_id);
stream->ust_metadata_pushed = 0;
}

/*
* Write up to one packet from the metadata cache to the channel.
*
Expand Down Expand Up @@ -3051,6 +3086,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)

assert(stream);
assert(stream->ustream);
ASSERT_LOCKED(stream->lock);

DBG("UST consumer checking data pending");

Expand Down

0 comments on commit b1316da

Please sign in to comment.