diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index d0b1ddef22..d33c3647e3 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -86,7 +86,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } - stream->net_seq_idx = (uint64_t) -1ULL; + stream->relayd_id = (uint64_t) -1ULL; stream->sent_to_relayd = 0; } @@ -170,7 +170,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) /* Check and cleanup relayd if needed. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd != NULL) { consumer_stream_relayd_close(stream, relayd); } @@ -352,31 +352,24 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* * Write index of a specific stream either on the relayd or local disk. + * RCU read lock must be held by the caller. * * Return 0 on success or else a negative value. */ int consumer_stream_write_index(struct lttng_consumer_stream *stream, - struct ctf_packet_index *element) + struct ctf_packet_index *element, + struct consumer_relayd_sock_pair *relayd, + bool deferred) { int ret; assert(stream); assert(element); - rcu_read_lock(); - if (stream->net_seq_idx != (uint64_t) -1ULL) { - struct consumer_relayd_sock_pair *relayd; - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, element, - stream->relayd_stream_id, stream->next_net_seq_num - 1); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } else { - ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", - stream->key, stream->net_seq_idx); - ret = -1; - } + if (relayd) { + ret = relayd_send_index(relayd, element, + stream->relayd_stream_id, stream->next_net_seq_num - 1, + deferred); } else { if (lttng_index_file_write(stream->index_file, element)) { ret = -1; @@ -389,7 +382,6 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, } error: - rcu_read_unlock(); return ret; } diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index c5fb09732d..cf00990e6b 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -72,7 +72,9 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream); * Write index of a specific stream either on the relayd or local disk. */ int consumer_stream_write_index(struct lttng_consumer_stream *stream, - struct ctf_packet_index *index); + struct ctf_packet_index *index, + struct consumer_relayd_sock_pair *relayd, + bool deferred); int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_t session_id); diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index 2d07abc644..292f869601 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -129,7 +129,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, - uint64_t stream_id) + uint64_t stream_id, struct consumer_relayd_sock_pair *relayd, bool deferred) { int ret; struct ctf_packet_index index; @@ -137,7 +137,7 @@ static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, memset(&index, 0, sizeof(index)); index.stream_id = htobe64(stream_id); index.timestamp_end = htobe64(ts); - ret = consumer_stream_write_index(stream, &index); + ret = consumer_stream_write_index(stream, &index, relayd, deferred); if (ret < 0) { goto error; } @@ -146,7 +146,9 @@ static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, return ret; } -int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd, + bool deferred) { uint64_t ts, stream_id; int ret; @@ -174,7 +176,7 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts, stream_id); + ret = send_empty_index(stream, ts, stream_id, relayd, deferred); if (ret < 0) { goto end; } @@ -184,7 +186,8 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +static int check_kernel_stream(struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd) { int ret; @@ -223,13 +226,15 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream) } break; } - ret = consumer_flush_kernel_index(stream); + ret = consumer_flush_kernel_index(stream, relayd, true); pthread_mutex_unlock(&stream->lock); end: return ret; } -int consumer_flush_ust_index(struct lttng_consumer_stream *stream) +int consumer_flush_ust_index(struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd, + bool deferred) { uint64_t ts, stream_id; int ret; @@ -258,7 +263,8 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream) goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts, stream_id); + ret = send_empty_index(stream, ts, stream_id, + relayd, deferred); if (ret < 0) { goto end; } @@ -268,7 +274,8 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream) return ret; } -static int check_ust_stream(struct lttng_consumer_stream *stream) +static int check_ust_stream(struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd) { int ret; @@ -309,7 +316,7 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) } break; } - ret = consumer_flush_ust_index(stream); + ret = consumer_flush_ust_index(stream, relayd, true); pthread_mutex_unlock(&stream->lock); end: return ret; @@ -327,6 +334,7 @@ static void live_timer(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream; struct lttng_ht *ht; struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd = NULL; if (channel->switch_timer_error) { goto error; @@ -335,6 +343,16 @@ static void live_timer(struct lttng_consumer_local_data *ctx, DBG("Live timer for channel %" PRIu64, channel->key); + if (channel->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(channel->relayd_id); + if (!relayd) { + ERR("Channel %s relayd ID %" PRIu64 " unknown. Can't process live timer.", + channel->name, channel->relayd_id); + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + switch (ctx->type) { case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: @@ -342,7 +360,7 @@ static void live_timer(struct lttng_consumer_local_data *ctx, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { - ret = check_ust_stream(stream); + ret = check_ust_stream(stream, relayd); if (ret < 0) { goto error; } @@ -353,7 +371,7 @@ static void live_timer(struct lttng_consumer_local_data *ctx, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { - ret = check_kernel_stream(stream); + ret = check_kernel_stream(stream, relayd); if (ret < 0) { goto error; } @@ -365,6 +383,9 @@ static void live_timer(struct lttng_consumer_local_data *ctx, } error: + if (relayd) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } return; } @@ -481,7 +502,6 @@ int consumer_channel_timer_stop(timer_t *timer_id, int signal) goto end; } - consumer_timer_signal_thread_qs(signal); *timer_id = 0; end: return ret; diff --git a/src/common/consumer/consumer-timer.h b/src/common/consumer/consumer-timer.h index 1b5dd414c9..928eba7137 100644 --- a/src/common/consumer/consumer-timer.h +++ b/src/common/consumer/consumer-timer.h @@ -57,8 +57,8 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); -int consumer_flush_kernel_index(struct lttng_consumer_stream *stream); -int consumer_flush_ust_index(struct lttng_consumer_stream *stream); +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd, bool deferred); +int consumer_flush_ust_index(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd, bool deferred); int consumer_timer_thread_get_channel_monitor_pipe(void); int consumer_timer_thread_set_channel_monitor_pipe(int fd); diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index afc346333c..735bc14678 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -422,25 +422,24 @@ static void cleanup_relayd_ht(void) } /* - * Update the end point status of all streams having the given network sequence - * index (relayd index). + * Update the end point status of all streams having the given relayd id. * * It's atomically set without having the stream mutex locked which is fine * because we handle the write/read race with a pipe wakeup for each thread. */ -static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, +static void update_endpoint_status_by_netidx(uint64_t relayd_id, enum consumer_endpoint_status status) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); + DBG("Consumer set delete flag on stream by idx %" PRIu64, relayd_id); rcu_read_lock(); /* Let's begin with metadata */ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } @@ -448,7 +447,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, /* Follow up by the data streams */ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to data stream %d", stream->wait_fd); } @@ -474,7 +473,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, DBG("Cleaning up relayd sockets"); /* Save the net sequence index before destroying the object */ - netidx = relayd->net_seq_idx; + netidx = relayd->id; /* * Delete the relayd from the relayd hash table, close the sockets and free @@ -584,7 +583,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->state = state; stream->uid = uid; stream->gid = gid; - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; @@ -624,7 +623,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64, stream->name, stream->key, channel_key, - stream->net_seq_idx, stream->session_id); + stream->relayd_id, stream->session_id); rcu_read_unlock(); return stream; @@ -714,7 +713,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) assert(relayd); lttng_ht_lookup(consumer_data.relayd_ht, - &relayd->net_seq_idx, &iter); + &relayd->id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { goto end; @@ -729,12 +728,12 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) * Allocate and return a consumer relayd socket. */ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( - uint64_t net_seq_idx) + uint64_t relayd_id) { struct consumer_relayd_sock_pair *obj = NULL; /* net sequence index of -1 is a failure */ - if (net_seq_idx == (uint64_t) -1ULL) { + if (relayd_id == (uint64_t) -1ULL) { goto error; } @@ -744,12 +743,13 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( goto error; } - obj->net_seq_idx = net_seq_idx; + obj->id = relayd_id; obj->refcount = 0; obj->destroy_flag = 0; obj->control_sock.sock.fd = -1; obj->data_sock.sock.fd = -1; - lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx); + lttng_ht_node_init_u64(&obj->node, obj->id); + lttng_dynamic_buffer_init(&obj->deferred_indexes.buffer); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); error: @@ -797,12 +797,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd; assert(stream); - assert(stream->net_seq_idx != -1ULL); + assert(stream->relayd_id != -1ULL); assert(path); /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -819,13 +819,13 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->sent_to_relayd = 1; } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", - stream->key, stream->net_seq_idx); + stream->key, stream->relayd_id); ret = -1; goto end; } DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx); + stream->name, stream->key, stream->relayd_id); end: rcu_read_unlock(); @@ -837,16 +837,16 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, * * Returns 0 on success, < 0 on error */ -int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +int consumer_send_relayd_streams_sent(uint64_t relayd_id) { int ret = 0; struct consumer_relayd_sock_pair *relayd; - assert(net_seq_idx != -1ULL); + assert(relayd_id != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -857,13 +857,13 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) } } else { ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", - net_seq_idx); + relayd_id); ret = -1; goto end; } ret = 0; - DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + DBG("All streams sent relayd id %" PRIu64, relayd_id); end: rcu_read_unlock(); @@ -879,7 +879,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd) { consumer_stream_relayd_close(stream, relayd); } @@ -1541,8 +1541,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { ret = -EPIPE; goto end; @@ -1772,8 +1772,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { written = -ret; goto end; @@ -3496,7 +3496,7 @@ int lttng_consumer_init(void) * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3509,14 +3509,14 @@ int lttng_consumer_init(void) assert(ctx); assert(relayd_sock); - DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); + DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", relayd_id); /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd == NULL) { assert(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); + relayd = consumer_allocate_relayd_sock_pair(relayd_id); if (relayd == NULL) { ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; @@ -3635,7 +3635,7 @@ int lttng_consumer_init(void) DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); + relayd->id, fd); /* We successfully added the socket. Send status back. */ ret = consumer_send_status_msg(sock, ret_code); @@ -3711,7 +3711,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd = NULL; - /* Iterate over all relayd since they are indexed by net_seq_idx. */ + /* Iterate over all relayd since they are indexed by relayd_id. */ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { /* @@ -4190,7 +4190,7 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, struct consumer_relayd_sock_pair *relayd; DBG("Rotate relay stream"); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (!relayd) { ERR("Failed to find relayd"); ret = -1; @@ -4225,7 +4225,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, DBG("Consumer rotate stream %" PRIu64, stream->key); - if (stream->net_seq_idx != (uint64_t) -1ULL) { + if (stream->relayd_id != (uint64_t) -1ULL) { ret = rotate_relay_stream(ctx, stream); } else { ret = rotate_local_stream(ctx, stream); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 190de54ed7..5ca8593f1d 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -33,6 +33,7 @@ #include #include #include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -327,8 +328,11 @@ struct lttng_consumer_stream { /* UID/GID of the user owning the session to which stream belongs */ uid_t uid; gid_t gid; - /* Network sequence number. Indicating on which relayd socket it goes. */ - uint64_t net_seq_idx; + /* + * Relayd id, indicating on which relayd socket it goes. Set to -1ULL if + * not the stream is not associated to a relay daemon. + */ + uint64_t relayd_id; /* * Indicate if this stream was successfully sent to a relayd. This is set * after the refcount of the relayd is incremented and is checked when the @@ -476,7 +480,7 @@ struct lttng_consumer_stream { */ struct consumer_relayd_sock_pair { /* Network sequence number. */ - uint64_t net_seq_idx; + uint64_t id; /* Number of stream associated with this relayd */ int refcount; @@ -512,6 +516,11 @@ struct consumer_relayd_sock_pair { /* Session id on both sides for the sockets. */ uint64_t relayd_session_id; uint64_t sessiond_session_id; + + struct { + int count; + struct lttng_dynamic_buffer buffer; + } deferred_indexes; }; /* @@ -761,7 +770,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path); -int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); +int consumer_send_relayd_streams_sent(uint64_t relayd_id); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, @@ -804,7 +813,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); -void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, +void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, uint64_t relayd_session_id); diff --git a/src/common/index/Makefile.am b/src/common/index/Makefile.am index ee73ea2ca3..ee9b6f8917 100644 --- a/src/common/index/Makefile.am +++ b/src/common/index/Makefile.am @@ -1,3 +1,3 @@ noinst_LTLIBRARIES = libindex.la -libindex_la_SOURCES = index.c index.h ctf-index.h +libindex_la_SOURCES = index.c index.h ctf-index.h index-group.h diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 66ae16bc8f..3672b8cc49 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -167,7 +167,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, * Assign the received relayd ID so we can use it for streaming. The streams * are not visible to anyone so this is OK to change it. */ - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(stream, path); @@ -310,7 +310,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } else { close_relayd_stream(stream); - stream->net_seq_idx = (uint64_t) -1ULL; + stream->relayd_id = (uint64_t) -1ULL; } pthread_mutex_unlock(&stream->lock); } @@ -399,7 +399,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, if (use_relayd) { close_relayd_stream(metadata_stream); - metadata_stream->net_seq_idx = (uint64_t) -1ULL; + metadata_stream->relayd_id = (uint64_t) -1ULL; } else { if (metadata_stream->out_fd >= 0) { ret = close(metadata_stream->out_fd); @@ -717,13 +717,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel->monitor) { DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, - new_stream->net_seq_idx); + new_stream->relayd_id); cds_list_add(&new_stream->send_node, &channel->streams.head); break; } /* Send stream to relayd if the stream has an ID. */ - if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + if (new_stream->relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(new_stream, new_stream->chan->pathname); if (ret < 0) { @@ -738,7 +738,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ if (channel->streams_sent_to_relayd) { ret = consumer_send_relayd_streams_sent( - new_stream->net_seq_idx); + new_stream->relayd_id); if (ret < 0) { goto end_nosignal; } @@ -1645,8 +1645,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * network streaming or the full padding (len) size when we are _not_ * streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || + (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer. This is a DBG statement since this is possible to @@ -1680,6 +1680,23 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto rotate; } + /* TODO: make sure the stream lock is held, this would guarantee that + * the channel is present. + * + * Is rcu_read_lock taken ? + */ + struct consumer_relayd_sock_pair *relayd = NULL; + if (stream->chan->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->chan->relayd_id); + if (!relayd) { + ERR("Channel %s relayd ID %" PRIu64 " unknown. Can't write index to it.", + stream->chan->name, stream->chan->relayd_id); + ret = -1; + goto error; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } if (stream->chan->live_timer_interval && !stream->metadata_flag) { /* * In live, block until all the metadata is sent. @@ -1696,16 +1713,21 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, if (stream->missed_metadata_flush) { stream->missed_metadata_flush = false; pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_kernel_index(stream); + (void) consumer_flush_kernel_index(stream, relayd, false); } else { pthread_mutex_unlock(&stream->metadata_timer_lock); } if (err < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto error; } } - err = consumer_stream_write_index(stream, &index); + err = consumer_stream_write_index(stream, &index, relayd, false); + if (relayd) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + if (err < 0) { goto error; } @@ -1743,7 +1765,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * Don't create anything if this is set for streaming or should not be * monitored. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { + if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid, NULL); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index b88a536b95..82bd102d75 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -28,35 +28,15 @@ #include #include #include +#include #include "relayd.h" -/* - * Send command. Fill up the header and append the data. - */ -static int send_command(struct lttcomm_relayd_sock *rsock, - enum lttcomm_relayd_command cmd, const void *data, size_t size, - int flags) +static int craft_command(struct lttng_dynamic_buffer *buffer, + enum lttcomm_relayd_command cmd, const void *data, size_t size) { int ret; struct lttcomm_relayd_hdr header; - char *buf; - uint64_t buf_size = sizeof(header); - - if (rsock->sock.fd < 0) { - return -ECONNRESET; - } - - if (data) { - buf_size += size; - } - - buf = zmalloc(buf_size); - if (buf == NULL) { - PERROR("zmalloc relayd send command buf"); - ret = -1; - goto alloc_error; - } memset(&header, 0, sizeof(header)); header.cmd = htobe32(cmd); @@ -66,23 +46,67 @@ static int send_command(struct lttcomm_relayd_sock *rsock, header.cmd_version = 0; header.circuit_id = 0; - /* Prepare buffer to send. */ - memcpy(buf, &header, sizeof(header)); + ret = lttng_dynamic_buffer_append(buffer, &header, sizeof(header)); + if (ret) { + ret = -1; + goto end; + } if (data) { - memcpy(buf + sizeof(header), data, size); + ret = lttng_dynamic_buffer_append(buffer, &data, size); + if (ret) { + ret = -1; + goto end; + } } +end: + return ret; +} + +static int send_buffer(struct lttcomm_relayd_sock *sock, + struct lttng_dynamic_buffer *buffer, int flags) +{ + int ret; - DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size); - ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags); + ret = sock->sock.ops->sendmsg(&sock->sock, buffer->data, buffer->size, flags); if (ret < 0) { - PERROR("Failed to send command %d of size %" PRIu64, - (int) cmd, buf_size); + PERROR("Failed to send buffer of size %" PRIu64, buffer->size); ret = -errno; + } + return ret; +} + +/* + * Send command. Fill up the header and append the data. + */ +static int send_command(struct lttcomm_relayd_sock *rsock, + enum lttcomm_relayd_command cmd, const void *data, size_t size, + int flags) +{ + int ret; + struct lttng_dynamic_buffer buffer; + + if (rsock->sock.fd < 0) { + return -ECONNRESET; + } + + lttng_dynamic_buffer_init(&buffer); + + ret = craft_command(&buffer, cmd, data, size); + if (ret) { + ERR("Error crafting command %d", (int) cmd); + ret = -ENOMEM; + goto error; + } + + DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buffer.size); + ret = send_buffer(rsock, &buffer, flags); + if (ret < 0) { + ERR("Failed to send command %d of size %" PRIu64, + (int) cmd, buffer.size); goto error; } error: - free(buf); -alloc_error: + lttng_dynamic_buffer_reset(&buffer); return ret; } @@ -962,20 +986,17 @@ int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id, } /* - * Send index to the relayd. + * Queue index to send to the relayd. */ -int relayd_send_index(struct lttcomm_relayd_sock *rsock, +int relayd_send_index(struct consumer_relayd_sock_pair *relayd, struct ctf_packet_index *index, uint64_t relay_stream_id, - uint64_t net_seq_num) + uint64_t net_seq_num, bool deferred) { int ret; struct lttcomm_relayd_index msg; struct lttcomm_relayd_generic_reply reply; - /* Code flow error. Safety net. */ - assert(rsock); - - if (rsock->minor < 4) { + if (relayd->control_sock.minor < 4) { DBG("Not sending indexes before protocol 2.4"); ret = 0; goto error; @@ -995,38 +1016,67 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, msg.events_discarded = index->events_discarded; msg.stream_id = index->stream_id; - if (rsock->minor >= 8) { + if (relayd->control_sock.minor >= 8) { msg.stream_instance_id = index->stream_instance_id; msg.packet_seq_num = index->packet_seq_num; } /* Send command */ - ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, - lttcomm_relayd_index_len(lttng_to_index_major(rsock->major, - rsock->minor), - lttng_to_index_minor(rsock->major, rsock->minor)), + if (!deferred) { + ret = send_command(&relayd->control_sock, RELAYD_SEND_INDEX, &msg, + lttcomm_relayd_index_len(lttng_to_index_major(relayd->control_sock.major, + relayd->control_sock.minor), + lttng_to_index_minor(relayd->control_sock.major, + relayd->control_sock.minor)), 0); - if (ret < 0) { - goto error; - } + if (ret < 0) { + goto error; + } - /* Receive response */ - ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); - if (ret < 0) { - goto error; - } - reply.ret_code = be32toh(reply.ret_code); + /* Receive response */ + ret = recv_reply(&relayd->control_sock, (void *) &reply, + sizeof(reply)); + if (ret < 0) { + goto error; + } - /* Return session id or negative ret code. */ - if (reply.ret_code != LTTNG_OK) { - ret = -1; - ERR("Relayd send index replied error %d", reply.ret_code); + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd send index replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + } } else { - /* Success */ - ret = 0; - } + struct lttng_dynamic_buffer buffer; + lttng_dynamic_buffer_init(&buffer); + + ret = craft_command(&buffer, RELAYD_SEND_INDEX, &msg, + lttcomm_relayd_index_len(lttng_to_index_major(relayd->control_sock.major, + relayd->control_sock.minor), + lttng_to_index_minor(relayd->control_sock.major, + relayd->control_sock.minor)) + ); + if (ret) { + ret = -1; + goto deferred_error; + } + ret = lttng_dynamic_buffer_append_buffer(&relayd->deferred_indexes.buffer, &buffer); + if (ret) { + ret = -1; + goto deferred_error; + } + + /* Increment the number of indexes deferred */ + relayd->deferred_indexes.count++; +deferred_error: + lttng_dynamic_buffer_reset(&buffer); + } error: return ret; } diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 9353fc2d78..5b5da524bf 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -19,9 +19,11 @@ #define _RELAYD_H #include +#include #include #include +#include int relayd_connect(struct lttcomm_relayd_sock *sock); int relayd_close(struct lttcomm_relayd_sock *sock); @@ -47,9 +49,9 @@ int relayd_quiescent_control(struct lttcomm_relayd_sock *sock, int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id); int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id, unsigned int *is_data_inflight); -int relayd_send_index(struct lttcomm_relayd_sock *rsock, +int relayd_send_index(struct consumer_relayd_sock_pair *relayd, struct ctf_packet_index *index, uint64_t relay_stream_id, - uint64_t net_seq_num); + uint64_t net_seq_num, bool deferred); int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index cb5743fad6..144115eefd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -552,7 +552,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; - uint64_t net_seq_idx = -1ULL; + uint64_t relayd_id = -1ULL; assert(channel); assert(ctx); @@ -579,8 +579,8 @@ static int send_channel_to_sessiond_and_relayd(int sock, } ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } - if (net_seq_idx == -1ULL) { - net_seq_idx = stream->net_seq_idx; + if (relayd_id == -1ULL) { + relayd_id = stream->relayd_id; } } } @@ -935,7 +935,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) } /* Send metadata stream to relayd if needed. */ - if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { + if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname); if (ret < 0) { @@ -943,7 +943,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) goto error; } ret = consumer_send_relayd_streams_sent( - metadata->metadata_stream->net_seq_idx); + metadata->metadata_stream->relayd_id); if (ret < 0) { ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; @@ -1041,7 +1041,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, assert(metadata_stream); if (relayd_id != (uint64_t) -1ULL) { - metadata_stream->net_seq_idx = relayd_id; + metadata_stream->relayd_id = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); if (ret < 0) { goto error_stream; @@ -1119,7 +1119,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, /* Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); @@ -2798,8 +2798,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * The mmap operation should write subbuf_size amount of data when network * streaming or the full padding (len) size when we are _not_ streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || + (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer. This is a DBG statement since any unexpected kill or @@ -2831,7 +2831,19 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, if (!write_index) { goto rotate; } + + struct consumer_relayd_sock_pair *relayd = NULL; + if (stream->chan->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->chan->relayd_id); + if (!relayd) { + ERR("Channel %s relayd ID %" PRIu64 " unknown. Can't write index to it.", + stream->chan->name, stream->chan->relayd_id); + ret = -1; + goto error; + } + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } if (stream->chan->live_timer_interval && !stream->metadata_flag) { /* * In live, block until all the metadata is sent. @@ -2848,18 +2860,24 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, if (stream->missed_metadata_flush) { stream->missed_metadata_flush = false; pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_ust_index(stream); + (void) consumer_flush_ust_index(stream, relayd, false); } else { pthread_mutex_unlock(&stream->metadata_timer_lock); } if (err < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto error; } } assert(!stream->metadata_flag); - err = consumer_stream_write_index(stream, &index); + + + err = consumer_stream_write_index(stream, &index, relayd, false); + if (relayd) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } if (err < 0) { goto error; } @@ -2898,7 +2916,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { + if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid, NULL); diff --git a/tests/regression/tools/live/Makefile.am b/tests/regression/tools/live/Makefile.am index 46186d3835..16780c63c6 100644 --- a/tests/regression/tools/live/Makefile.am +++ b/tests/regression/tools/live/Makefile.am @@ -21,7 +21,7 @@ EXTRA_DIST += test_ust test_ust_tracefile_count test_lttng_ust endif live_test_SOURCES = live_test.c -live_test_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBRELAYD) $(LIBSESSIOND_COMM) \ +live_test_LDADD = $(LIBTAP) $(LIBRELAYD) $(LIBCOMMON) $(LIBSESSIOND_COMM) \ $(LIBHASHTABLE) $(LIBHEALTH) $(DL_LIBS) -lrt live_test_LDADD += $(LIVE) \ $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la