diff --git a/src/common/consumer/consumer-metadata-cache.c b/src/common/consumer/consumer-metadata-cache.c index 987bf7c857..f712fa7f65 100644 --- a/src/common/consumer/consumer-metadata-cache.c +++ b/src/common/consumer/consumer-metadata-cache.c @@ -23,6 +23,11 @@ #include "consumer-metadata-cache.h" +enum metadata_cache_update_version_status { + METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED, + METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED, +}; + extern struct lttng_consumer_global_data consumer_data; /* @@ -74,60 +79,23 @@ void metadata_cache_reset(struct consumer_metadata_cache *cache) * Check if the metadata cache version changed. * If it did, reset the metadata cache. * The metadata cache lock MUST be held. - * - * Returns 0 on success, a negative value on error. */ -static -int metadata_cache_check_version(struct consumer_metadata_cache *cache, - uint64_t version) +static enum metadata_cache_update_version_status metadata_cache_update_version( + struct consumer_metadata_cache *cache, uint64_t version) { - int ret = 0; + enum metadata_cache_update_version_status status; if (cache->version == version) { + status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED; goto end; } DBG("Metadata cache version update to %" PRIu64, version); - metadata_cache_reset(cache); cache->version = version; + status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED; end: - return ret; -} - -/* - * Write a character on the metadata poll pipe to wake the metadata thread. - * Returns 0 on success, -1 on error. - */ -int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) -{ - int ret = 0; - const char dummy = 'c'; - - if (channel->monitor && channel->metadata_stream) { - ssize_t write_ret; - - write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], - &dummy, 1); - if (write_ret < 1) { - if (errno == EWOULDBLOCK) { - /* - * This is fine, the metadata poll thread - * is having a hard time keeping-up, but - * it will eventually wake-up and consume - * the available data. - */ - ret = 0; - } else { - PERROR("Wake-up UST metadata pipe"); - ret = -1; - goto end; - } - } - } - -end: - return ret; + return status; } /* @@ -136,23 +104,31 @@ int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) * contiguous metadata in cache to the ring buffer. The metadata cache * lock MUST be acquired to write in the cache. * - * Return 0 on success, a negative value on error. + * See `enum consumer_metadata_cache_write_status` for the meaning of the + * various returned status codes. */ -int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, +enum consumer_metadata_cache_write_status +consumer_metadata_cache_write(struct lttng_consumer_channel *channel, unsigned int offset, unsigned int len, uint64_t version, const char *data) { int ret = 0; struct consumer_metadata_cache *cache; + enum consumer_metadata_cache_write_status status; + bool cache_is_invalidated = false; + uint64_t original_max_offset; assert(channel); assert(channel->metadata_cache); cache = channel->metadata_cache; + ASSERT_LOCKED(cache->lock); + original_max_offset = cache->max_offset; - ret = metadata_cache_check_version(cache, version); - if (ret < 0) { - goto end; + if (metadata_cache_update_version(cache, version) == + METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) { + metadata_cache_reset(cache); + cache_is_invalidated = true; } DBG("Writing %u bytes from offset %u in metadata cache", len, offset); @@ -162,18 +138,25 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, len - cache->cache_alloc_size + offset); if (ret < 0) { ERR("Extending metadata cache"); + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR; goto end; } } memcpy(cache->data + offset, data, len); - if (offset + len > cache->max_offset) { - cache->max_offset = offset + len; - ret = consumer_metadata_wakeup_pipe(channel); + cache->max_offset = max(cache->max_offset, offset + len); + + if (cache_is_invalidated) { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED; + } else if (cache->max_offset > original_max_offset) { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT; + } else { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE; + assert(cache->max_offset == original_max_offset); } end: - return ret; + return status; } /* diff --git a/src/common/consumer/consumer-metadata-cache.h b/src/common/consumer/consumer-metadata-cache.h index ecb7e15495..4c029b90e9 100644 --- a/src/common/consumer/consumer-metadata-cache.h +++ b/src/common/consumer/consumer-metadata-cache.h @@ -11,6 +11,27 @@ #include +enum consumer_metadata_cache_write_status { + CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR = -1, + /* + * New metadata content was appended to the cache successfully. + * Previously available content remains valid. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT = 0, + /* + * The new content pushed to the cache invalidated the content that + * was already present. The contents of the cache should be re-read. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED, + /* + * A metadata cache write can simply overwrite an already existing + * section of the cache (and it should be a write-through with identical + * data). From the caller's standpoint, there is no change to the state + * of the cache. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE, +}; + struct consumer_metadata_cache { char *data; uint64_t cache_alloc_size; @@ -35,13 +56,13 @@ struct consumer_metadata_cache { pthread_mutex_t lock; }; -int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, +enum consumer_metadata_cache_write_status +consumer_metadata_cache_write(struct lttng_consumer_channel *channel, unsigned int offset, unsigned int len, uint64_t version, const char *data); int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel); void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel); int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, uint64_t offset, int timer); -int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); #endif /* CONSUMER_METADATA_CACHE_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 23853e5c29..cecdc3f915 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -877,6 +877,43 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, return outfd; } +/* + * Write a character on the metadata poll pipe to wake the metadata thread. + * Returns 0 on success, -1 on error. + */ +int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) +{ + int ret = 0; + + DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", + channel->name); + if (channel->monitor && channel->metadata_stream) { + const char dummy = 'c'; + const ssize_t write_ret = lttng_write( + channel->metadata_stream->ust_metadata_poll_pipe[1], + &dummy, 1); + + if (write_ret < 1) { + if (errno == EWOULDBLOCK) { + /* + * This is fine, the metadata poll thread + * is having a hard time keeping-up, but + * it will eventually wake-up and consume + * the available data. + */ + ret = 0; + } else { + PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread"); + ret = -1; + goto end; + } + } + } + +end: + return ret; +} + /* * Trigger a dump of the metadata content. Following/during the succesful * completion of this call, the metadata poll thread will start receiving diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index b45f88b756..7b381b2c75 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -1052,5 +1052,6 @@ enum lttcomm_return_code lttng_consumer_init_command( int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); enum lttcomm_return_code lttng_consumer_open_channel_packets( struct lttng_consumer_channel *channel); +int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index be00191749..5f1f93b0e1 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1283,6 +1283,17 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, return ret; } +static +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + + DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); + stream->ust_metadata_pushed = 0; +} + /* * Receive the metadata updates from the sessiond. Supports receiving * overlapping metadata, but is needs to always belong to a contiguous @@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; + enum consumer_metadata_cache_write_status cache_write_status; DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); @@ -1320,10 +1332,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, health_code_update(); pthread_mutex_lock(&channel->metadata_cache->lock); - ret = consumer_metadata_cache_write(channel, offset, len, version, - metadata_str); + cache_write_status = consumer_metadata_cache_write( + channel, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); - if (ret < 0) { + switch (cache_write_status) { + case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: + /* + * The write entirely overlapped with existing contents of the + * same metadata version (same content); there is nothing to do. + */ + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED: + /* + * The metadata cache was invalidated (previously pushed + * content has been overwritten). Reset the stream's consumed + * metadata position to ensure the metadata poll thread consumes + * the whole cache. + */ + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + /* Fall-through. */ + case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: + /* + * In both cases, the metadata poll thread has new data to + * consume. + */ + ret = consumer_metadata_wakeup_pipe(channel); + if (ret) { + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto end_free; + } + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR: /* Unable to handle metadata. Notify session daemon. */ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; /* @@ -1332,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ goto end_free; + default: + abort(); } if (!wait) { @@ -2464,15 +2508,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) -{ - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); - stream->ust_metadata_pushed = 0; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -3051,6 +3086,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); assert(stream->ustream); + ASSERT_LOCKED(stream->lock); DBG("UST consumer checking data pending");