Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch commands #6

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions src/common/consumer/consumer-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
uatomic_read(&relayd->destroy_flag)) {
consumer_destroy_relayd(relayd);
}
stream->net_seq_idx = (uint64_t) -1ULL;
stream->relayd_id = (uint64_t) -1ULL;
stream->sent_to_relayd = 0;
}

Expand Down Expand Up @@ -170,7 +170,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)

/* Check and cleanup relayd if needed. */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
relayd = consumer_find_relayd(stream->relayd_id);
if (relayd != NULL) {
consumer_stream_relayd_close(stream, relayd);
}
Expand Down Expand Up @@ -352,31 +352,24 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,

/*
* Write index of a specific stream either on the relayd or local disk.
* RCU read lock must be held by the caller.
*
* Return 0 on success or else a negative value.
*/
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
struct ctf_packet_index *element)
struct ctf_packet_index *element,
struct consumer_relayd_sock_pair *relayd,
bool deferred)
{
int ret;

assert(stream);
assert(element);

rcu_read_lock();
if (stream->net_seq_idx != (uint64_t) -1ULL) {
struct consumer_relayd_sock_pair *relayd;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
stream->key, stream->net_seq_idx);
ret = -1;
}
if (relayd) {
ret = relayd_send_index(relayd, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1,
deferred);
} else {
if (lttng_index_file_write(stream->index_file, element)) {
ret = -1;
Expand All @@ -389,7 +382,6 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
}

error:
rcu_read_unlock();
return ret;
}

Expand Down
4 changes: 3 additions & 1 deletion src/common/consumer/consumer-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream);
* Write index of a specific stream either on the relayd or local disk.
*/
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
struct ctf_packet_index *index);
struct ctf_packet_index *index,
struct consumer_relayd_sock_pair *relayd,
bool deferred);

int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
uint64_t session_id);
Expand Down
46 changes: 33 additions & 13 deletions src/common/consumer/consumer-timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
}

static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
uint64_t stream_id)
uint64_t stream_id, struct consumer_relayd_sock_pair *relayd, bool deferred)
{
int ret;
struct ctf_packet_index index;

memset(&index, 0, sizeof(index));
index.stream_id = htobe64(stream_id);
index.timestamp_end = htobe64(ts);
ret = consumer_stream_write_index(stream, &index);
ret = consumer_stream_write_index(stream, &index, relayd, deferred);
if (ret < 0) {
goto error;
}
Expand All @@ -146,7 +146,9 @@ static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
return ret;
}

int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
int consumer_flush_kernel_index(struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd,
bool deferred)
{
uint64_t ts, stream_id;
int ret;
Expand Down Expand Up @@ -174,7 +176,7 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
ret = send_empty_index(stream, ts, stream_id, relayd, deferred);
if (ret < 0) {
goto end;
}
Expand All @@ -184,7 +186,8 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
return ret;
}

static int check_kernel_stream(struct lttng_consumer_stream *stream)
static int check_kernel_stream(struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd)
{
int ret;

Expand Down Expand Up @@ -223,13 +226,15 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream)
}
break;
}
ret = consumer_flush_kernel_index(stream);
ret = consumer_flush_kernel_index(stream, relayd, true);
pthread_mutex_unlock(&stream->lock);
end:
return ret;
}

int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
int consumer_flush_ust_index(struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd,
bool deferred)
{
uint64_t ts, stream_id;
int ret;
Expand Down Expand Up @@ -258,7 +263,8 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
ret = send_empty_index(stream, ts, stream_id,
relayd, deferred);
if (ret < 0) {
goto end;
}
Expand All @@ -268,7 +274,8 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
return ret;
}

static int check_ust_stream(struct lttng_consumer_stream *stream)
static int check_ust_stream(struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd)
{
int ret;

Expand Down Expand Up @@ -309,7 +316,7 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
}
break;
}
ret = consumer_flush_ust_index(stream);
ret = consumer_flush_ust_index(stream, relayd, true);
pthread_mutex_unlock(&stream->lock);
end:
return ret;
Expand All @@ -327,6 +334,7 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream;
struct lttng_ht *ht;
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd = NULL;

if (channel->switch_timer_error) {
goto error;
Expand All @@ -335,14 +343,24 @@ static void live_timer(struct lttng_consumer_local_data *ctx,

DBG("Live timer for channel %" PRIu64, channel->key);

if (channel->relayd_id != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(channel->relayd_id);
if (!relayd) {
ERR("Channel %s relayd ID %" PRIu64 " unknown. Can't process live timer.",
channel->name, channel->relayd_id);
}

pthread_mutex_lock(&relayd->ctrl_sock_mutex);
}

switch (ctx->type) {
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
ret = check_ust_stream(stream);
ret = check_ust_stream(stream, relayd);
if (ret < 0) {
goto error;
}
Expand All @@ -353,7 +371,7 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
ret = check_kernel_stream(stream);
ret = check_kernel_stream(stream, relayd);
if (ret < 0) {
goto error;
}
Expand All @@ -365,6 +383,9 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
}

error:
if (relayd) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
return;
}

Expand Down Expand Up @@ -481,7 +502,6 @@ int consumer_channel_timer_stop(timer_t *timer_id, int signal)
goto end;
}

consumer_timer_signal_thread_qs(signal);
*timer_id = 0;
end:
return ret;
Expand Down
4 changes: 2 additions & 2 deletions src/common/consumer/consumer-timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel);
void *consumer_timer_thread(void *data);
int consumer_signal_init(void);

int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
int consumer_flush_kernel_index(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd, bool deferred);
int consumer_flush_ust_index(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd, bool deferred);

int consumer_timer_thread_get_channel_monitor_pipe(void);
int consumer_timer_thread_set_channel_monitor_pipe(int fd);
Expand Down
Loading