Fix leaking sessions
Sessions were being accessed from an async context, causing them to be re-created after they had been destroyed (and with the default config, too).

Move get_app() into the flushing logic to cleanly separate session access from the async context.

Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
bwoebi committed Sep 19, 2024
1 parent 89f48d4 commit 411fdaf
1 changed file: sidecar/src/service/sidecar_server.rs (78 additions, 82 deletions)
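
Before the diff itself, a minimal sketch of the failure mode the message describes, using hypothetical types (Session, Server, get_session, try_session) rather than the sidecar's own: an insert-on-miss lookup quietly resurrects a destroyed session with a default config, while a read-only lookup, like the lock_sessions().get(..) traversal this commit switches to, observes that the session is gone.

use std::collections::HashMap;

#[derive(Debug, Default)]
struct Session {
    config: Option<String>, // stands in for the real session config
}

struct Server {
    sessions: HashMap<String, Session>,
}

impl Server {
    // Insert-on-miss lookup: called from an async task after the session
    // was removed, this silently re-creates it with Default -- the leak.
    fn get_session(&mut self, id: &str) -> &mut Session {
        self.sessions.entry(id.to_owned()).or_default()
    }

    // Read-only lookup: a destroyed session stays destroyed.
    fn try_session(&self, id: &str) -> Option<&Session> {
        self.sessions.get(id)
    }
}

fn main() {
    let mut server = Server { sessions: HashMap::new() };
    server.get_session("s1").config = Some("custom".into());
    server.sessions.remove("s1"); // session destroyed

    // Buggy path: "s1" comes back from the dead, with a default config.
    assert!(server.get_session("s1").config.is_none());
    server.sessions.remove("s1");

    // Fixed path: the absence is observable.
    assert!(server.try_session("s1").is_none());
}
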
@@ -28,7 +28,6 @@ use manual_future::{ManualFuture, ManualFutureCompleter};
use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
@@ -249,71 +248,6 @@ impl SidecarServer {
.expect("Unable to acquire lock on sessions")
}

async fn get_app(
&self,
instance_id: &InstanceId,
runtime_meta: &RuntimeMetadata,
service_name: &str,
env_name: &str,
initial_actions: Vec<TelemetryActions>,
) -> Option<AppInstance> {
let rt_info = self.get_runtime(instance_id);

let manual_app_future = rt_info.get_app(service_name, env_name);

if manual_app_future.completer.is_none() {
return manual_app_future.app_future.await;
}

let mut builder = TelemetryWorkerBuilder::new_fetch_host(
service_name.to_owned(),
runtime_meta.language_name.to_owned(),
runtime_meta.language_version.to_owned(),
runtime_meta.tracer_version.to_owned(),
);
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
builder.application.env = Some(env_name.to_owned());
let session_info = self.get_session(&instance_id.session_id);
let mut config = session_info
.session_config
.lock()
.expect("Unable to acquire lock on session_config")
.clone()
.unwrap_or_else(ddtelemetry::config::Config::from_env);
config.restartable = true;

let instance_option = match builder.spawn_with_config(config.clone()).await {
Ok((handle, worker_join)) => {
info!("spawning telemetry worker {config:?}");

let instance = AppInstance {
telemetry: handle,
telemetry_worker_shutdown: worker_join.map(Result::ok).boxed().shared(),
telemetry_metrics: Default::default(),
};

instance.telemetry.send_msgs(initial_actions).await.ok();

instance
.telemetry
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await
.ok();
Some(instance)
}
Err(e) => {
error!("could not spawn telemetry worker {:?}", e);
None
}
};
manual_app_future
.completer
.expect("Completed expected Some ManualFuture for application instance, but found none")
.complete(instance_option)
.await;
manual_app_future.app_future.await
}

fn send_trace_v04(&self, headers: &SerializedTracerHeaderTags, data: &[u8], target: &Endpoint) {
let headers: TracerHeaderTags = match headers.try_into() {
Ok(headers) => headers,
@@ -588,20 +522,77 @@ impl SidecarInterface for SidecarServer {
}
};
if let Some(AppOrQueue::Queue(mut enqueued_data)) = app_or_queue {
let rt_info = self.get_runtime(&instance_id);
let manual_app_future = rt_info.get_app(&service_name, &env_name);

let instance_future = if manual_app_future.completer.is_some() {
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
service_name.to_owned(),
runtime_meta.language_name.to_owned(),
runtime_meta.language_version.to_owned(),
runtime_meta.tracer_version.to_owned(),
);
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
builder.application.env = Some(env_name.to_owned());
let session_info = self.get_session(&instance_id.session_id);
let mut config = session_info
.session_config
.lock()
.expect("Unable to acquire lock on session_config")
.clone()
.unwrap_or_else(ddtelemetry::config::Config::from_env);
config.restartable = true;
Some(
builder
.spawn_with_config(config.clone())
.map(move |result| {
if result.is_ok() {
info!("spawning telemetry worker {config:?}");
}
result
}),
)
} else {
None
};

tokio::spawn(async move {
let mut actions: Vec<TelemetryActions> = vec![];
enqueued_data.extract_telemetry_actions(&mut actions).await;

if let Some(mut app) = self
.get_app(
&instance_id,
&runtime_meta,
&service_name,
&env_name,
actions,
)
.await
{
if let Some(instance_future) = instance_future {
let instance_option = match instance_future.await {
Ok((handle, worker_join)) => {
let instance = AppInstance {
telemetry: handle,
telemetry_worker_shutdown: worker_join
.map(Result::ok)
.boxed()
.shared(),
telemetry_metrics: Default::default(),
};

let mut actions: Vec<TelemetryActions> = vec![];
enqueued_data.extract_telemetry_actions(&mut actions).await;
instance.telemetry.send_msgs(actions).await.ok();

instance
.telemetry
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await
.ok();
Some(instance)
}
Err(e) => {
error!("could not spawn telemetry worker {:?}", e);
None
}
};
manual_app_future
.completer
.expect("Completed expected Some ManualFuture for application instance, but found none")
.complete(instance_option)
.await;
}

if let Some(mut app) = manual_app_future.app_future.await {
// Register metrics
for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
app.register_metric(metric);
@@ -618,9 +609,14 @@
if actions.iter().any(|action| {
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
}) {
self.get_runtime(&instance_id)
.lock_applications()
.remove(&queue_id);
// Avoid self.get_runtime(); it could create a new one.
if let Some(session) = self.lock_sessions().get(&instance_id.session_id) {
if let Some(runtime) =
session.lock_runtimes().get(&instance_id.runtime_id)
{
runtime.lock_applications().remove(&queue_id);
}
}
}

app.telemetry.send_msgs(actions).await.ok();
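For context on the manual_app_future plumbing above: rt_info.get_app() hands the ManualFuture's completer to at most one caller, which must spawn the telemetry worker and complete the future; every other caller only awaits it. A minimal sketch of that first-caller-completes pattern, assuming the manual_future and tokio crates this file already uses (the string value is a placeholder):

use manual_future::ManualFuture;

#[tokio::main]
async fn main() {
    // new() yields the future plus a one-shot completer.
    let (app_future, completer) = ManualFuture::<Option<&'static str>>::new();

    // Only the caller holding the completer does the expensive spawn...
    tokio::spawn(async move {
        // ...e.g. spawning a telemetry worker; here we complete directly.
        completer.complete(Some("worker-handle")).await;
    });

    // ...while everyone, including that caller, awaits the same future.
    assert_eq!(app_future.await, Some("worker-handle"));
}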
