fix(stdlib): properly detect tokio runtime in dns_lookup #882

Merged on Jun 17, 2024 (8 commits)
Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.lock

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -99,6 +99,7 @@ stdlib = [
   "dep:snap",
   "dep:strip-ansi-escapes",
   "dep:syslog_loose",
+  "dep:tokio",
   "dep:uaparser",
   "dep:url",
   "dep:utf8-width",
@@ -203,6 +204,7 @@ domain = { version = "0.10.0", optional = true, features = ["resolv-sync", "serd
 hostname = { version = "0.4", optional = true }
 grok = { version = "2", optional = true }
 onig = { version = "6", default-features = false, optional = true }
+tokio = { version = "1.37", optional = true, features = ["io-util", "macros", "net", "time", "sync", "rt", "rt-multi-thread" ] }
 uuid = { version = "1", features = ["v4", "v7"], optional = true }

 # Dependencies used for WASM
75 changes: 72 additions & 3 deletions src/stdlib/dns_lookup.rs
@@ -3,7 +3,10 @@ use crate::compiler::prelude::*;
 #[cfg(not(target_arch = "wasm32"))]
 mod non_wasm {
     use std::collections::BTreeMap;
+    use std::io::Error;
     use std::net::ToSocketAddrs;
+    use std::sync::{mpsc, Arc, Mutex};
+    use std::thread;
     use std::time::Duration;

     use domain::base::iana::Class;
@@ -12,10 +15,69 @@ mod non_wasm {
     use domain::resolv::stub::conf::{ResolvConf, ResolvOptions, ServerConf, Transport};
     use domain::resolv::stub::Answer;
     use domain::resolv::StubResolver;
+    use once_cell::sync::Lazy;
+    use tokio::runtime::Handle;

     use crate::compiler::prelude::*;
     use crate::value::Value;

+    static WORKER: Lazy<Worker> = Lazy::new(|| Worker::new());
+
+    type Job<T> = Box<dyn FnOnce() -> T + Send + 'static>;
+    struct JobHandle<T> {
+        job: Job<T>,
+        result: Arc<mpsc::Sender<T>>,
+    }
+
+    struct Worker {
+        thread: Option<thread::JoinHandle<()>>,
+        queue: Option<mpsc::Sender<JobHandle<Result<Answer, Error>>>>,
+    }
+
+    impl Worker {
Member

Did you borrow this implementation from somewhere? It's slightly more complicated than I might have expected.

Contributor Author

It is roughly based on https://doc.rust-lang.org/book/ch20-02-multithreaded.html
I tried to simplify it as much as I could, but I thought I had to have some kind of channel to take in the function and to send back the result.

Member

Ah, gotcha, yeah, I see. That example creates a single channel and reuses it, whereas this implementation creates a new channel on every call to execute. Is there a reason we need one channel per execute call?

Contributor Author

Oh, right. My bad, one should be enough.

Member

Right, a single mpsc::sync_channel(CHANNEL_CAPACITY) should be enough.
Both the sender and the receiver should be lazily instantiated instances.

Contributor Author

It is all contained in the Worker instance now, so both of them are lazily instantiated (since Worker is lazily instantiated). There are 2 bounded channels though, one for jobs and one for results. I hope that is alright.

I have made the capacity 0 for testing (meaning a send always blocks until the other side receives), but I am not sure it makes sense to make it any bigger, considering there is only 1 thread handling jobs. I might be missing something.
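
For readers following the thread, here is a minimal sketch of the shape described above: one worker thread, two bounded channels (jobs and results), capacity 0. It is an illustration under stated assumptions, not the code that was merged. `SketchWorker`, `CHANNEL_CAPACITY`, and the `u32` result type are hypothetical stand-ins for `Worker` and `Result<Answer, Error>`.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;
use std::thread;

// Hypothetical capacity; 0 makes each send wait until the other side receives.
const CHANNEL_CAPACITY: usize = 0;

type Job = Box<dyn FnOnce() -> u32 + Send + 'static>;

struct SketchWorker {
    // Both ends sit behind one Mutex so a caller's send and recv stay paired.
    channels: Mutex<(SyncSender<Job>, Receiver<u32>)>,
}

impl SketchWorker {
    fn new() -> Self {
        let (job_tx, job_rx) = sync_channel::<Job>(CHANNEL_CAPACITY);
        let (result_tx, result_rx) = sync_channel::<u32>(CHANNEL_CAPACITY);
        // Detached worker thread: it exits once the job sender is dropped.
        thread::spawn(move || {
            while let Ok(job) = job_rx.recv() {
                if result_tx.send(job()).is_err() {
                    break; // the result receiver is gone, nothing left to do
                }
            }
        });
        Self {
            channels: Mutex::new((job_tx, result_rx)),
        }
    }

    fn execute<F>(&self, f: F) -> u32
    where
        F: FnOnce() -> u32 + Send + 'static,
    {
        // Holding the lock across send + recv means this caller reads its own result.
        let guard = self.channels.lock().expect("worker channels lock poisoned");
        guard.0.send(Box::new(f)).expect("worker thread has stopped");
        guard.1.recv().expect("worker thread dropped the result sender")
    }
}
```

Because the lock is held for the whole round trip, callers are serialized anyway, which is one way to see why a capacity above 0 buys little with a single worker thread.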

+        fn new() -> Self {
+            let (sender, receiver) = mpsc::channel::<JobHandle<Result<Answer, Error>>>();
+            let receiver = Arc::new(Mutex::new(receiver));
+            Self {
+                thread: Some(thread::spawn(move || loop {
+                    match receiver.lock().unwrap().recv() {
+                        Ok(handle) => {
+                            let result = (handle.job)();
+                            handle.result.as_ref().send(result).unwrap();
Member
@jszwedko Jun 12, 2024

Could we change all of these unwraps to expects?
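
As an illustration of that suggestion, a small sketch of a channel round trip where each `unwrap` is replaced by an `expect` that states the broken invariant. The `round_trip` helper and the messages are hypothetical, not text from this PR.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends a value to a thread and reads back its double.
fn round_trip(input: u32) -> u32 {
    let (job_tx, job_rx) = mpsc::channel::<u32>();
    let (result_tx, result_rx) = mpsc::channel::<u32>();

    let worker = thread::spawn(move || {
        // Each message says why a failure at this point would be a bug.
        let value = job_rx.recv().expect("job sender dropped before sending");
        result_tx
            .send(value * 2)
            .expect("result receiver dropped before the result was sent");
    });

    job_tx
        .send(input)
        .expect("worker thread stopped receiving jobs");
    let result = result_rx
        .recv()
        .expect("worker thread exited without a result");
    worker.join().expect("worker thread panicked");
    result
}
```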

+                        }
+                        Err(_) => todo!(),
Member

What should we do here? Panic?
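
One possible answer, sketched under assumptions: `Receiver::recv` returns `Err` only after every sender has been dropped, so the worker can treat it as a shutdown signal and leave the loop rather than panic. `spawn_worker` is a hypothetical helper that counts jobs so the behavior is observable.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical helper: the worker counts jobs until the channel closes.
fn spawn_worker() -> (Sender<u32>, JoinHandle<u32>) {
    let (sender, receiver) = mpsc::channel::<u32>();
    let handle = thread::spawn(move || {
        let mut processed = 0;
        loop {
            match receiver.recv() {
                Ok(_job) => processed += 1,
                // Every sender is gone: no more jobs can arrive, so exit cleanly.
                Err(_) => break,
            }
        }
        processed
    });
    (sender, handle)
}
```

With this shape, dropping the sender (as the `Drop` impl in this diff does with `self.queue.take()`) is enough to let `thread.join()` return.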

+                    }
+                })),
+                queue: Some(sender),
+            }
+        }
+
+        fn execute<F>(&self, f: F) -> Result<Answer, Error>
+        where
+            F: FnOnce() -> Result<Answer, Error> + Send + 'static,
+        {
+            let job = Box::new(f);
+            let (sender, receiver) = mpsc::channel();
+            let receiver = Arc::new(Mutex::new(receiver));
+            let handle = JobHandle {
+                job,
+                result: Arc::new(sender),
+            };
Member

Similarly to my above question, did you borrow this from somewhere? It seems like a lot of "work" to do for each execute function such that I'm wondering what the performance looks like. Maybe we could add a benchmark for it in benches/stdlib.rs to see?

Contributor Author

There are already some benchmarks there for the dns_lookup function. Did you mean we should add some benchmarks for the Worker itself?
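
To make the question concrete, a rough std-only sketch of measuring the worker overhead in isolation, separate from DNS resolution. A real benchmark would presumably go through the harness already used in benches/stdlib.rs. `average_round_trip` and its echo job are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical measurement: average cost of one job round trip through a worker thread.
fn average_round_trip(iterations: u32) -> Duration {
    let (job_tx, job_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    let worker = thread::spawn(move || {
        // Echo each job back after a trivial amount of work.
        while let Ok(n) = job_rx.recv() {
            if result_tx.send(n + 1).is_err() {
                break;
            }
        }
    });

    let start = Instant::now();
    for i in 0..iterations {
        job_tx.send(u64::from(i)).expect("worker stopped receiving");
        let echoed = result_rx.recv().expect("worker stopped sending");
        assert_eq!(echoed, u64::from(i) + 1);
    }
    let elapsed = start.elapsed();

    drop(job_tx); // closing the channel lets the worker exit
    worker.join().expect("worker thread panicked");
    // `max(1)` avoids dividing by zero when called with 0 iterations.
    elapsed / iterations.max(1)
}
```

The number this returns is only the thread hand-off cost; comparing it with the latency of an actual lookup would show whether the extra work per `execute` call matters.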


+            self.queue.as_ref().unwrap().send(handle).unwrap();
+            return receiver.lock().unwrap().recv().unwrap();
+        }
+    }
+
+    impl Drop for Worker {
+        fn drop(&mut self) {
+            drop(self.queue.take());
+            if let Some(thread) = self.thread.take() {
+                thread.join().unwrap();
+            }
+        }
+    }
Member
@pront Jun 12, 2024

Hi @esensar, thank you for this patch. I think it's an acceptable workaround for an undocumented function. It would be good to add some comments here, plus a module-level doc (//!) explaining what this function does and the limitations of a single-threaded solution, especially when it comes to:

  • Blocking
  • Worker Thread Saturation
  • Unbounded Job Queue
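
A sketch of what such module-level docs might look like, covering the three points above. The wording is an assumption based on the code in this diff, and the `worker_sketch` module with its placeholder function exists only so the snippet compiles on its own.

```rust
mod worker_sketch {
    //! Hypothetical module docs for a single-threaded job worker.
    //!
    //! Jobs run one at a time on a dedicated thread, which has consequences:
    //!
    //! - **Blocking**: `execute` blocks the calling thread until its job finishes.
    //! - **Worker thread saturation**: one slow job delays every job queued behind it.
    //! - **Unbounded job queue**: with `mpsc::channel`, pending jobs can pile up
    //!   without limit while the worker is busy.

    /// Placeholder so the sketch compiles on its own; not part of the PR.
    pub fn limitations() -> [&'static str; 3] {
        ["blocking", "worker thread saturation", "unbounded job queue"]
    }
}
```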

Member

Another reasonable improvement here is to use a bounded channel.
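
For context, a minimal sketch of how a bounded channel differs from `mpsc::channel`: `mpsc::sync_channel(n)` holds at most `n` pending items, after which `send` blocks and `try_send` reports `Full`. The `fill_queue` helper is hypothetical and uses `try_send` only so the limit can be observed without blocking.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Hypothetical demo: fill a bounded queue and count how many jobs it accepts.
fn fill_queue(capacity: usize) -> usize {
    // `_receiver` stays alive for the whole function, so the channel is not disconnected.
    let (sender, _receiver) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match sender.try_send(0) {
            Ok(()) => accepted += 1,
            // The queue is full; a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => panic!("receiver dropped"),
        }
    }
    accepted
}
```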

Contributor Author

Alright, that makes sense. I will try to add these as soon as possible, but I will not be available for the next 2-3 days. When is the release planned? I would love to get this ready for that release.

Member

The release is planned for the 17th, which doesn't leave us too much time 😓 We do minor releases every 6 weeks though so it wouldn't have to wait too long if it didn't make it.


     fn dns_lookup(value: Value, qtype: Value, qclass: Value, options: Value) -> Resolved {
         let host: Name<Vec<_>> = value
             .try_bytes_utf8_lossy()?
@@ -34,9 +96,16 @@ mod non_wasm {
             .map_err(|err| format!("parsing query class failed: {err}"))?;

         let conf = build_options(options.try_object()?)?;
-        let answer = StubResolver::run_with_conf(conf, move |stub| async move {
-            stub.query((host, qtype, qclass)).await
-        })
+        let answer = match Handle::try_current() {
+            Ok(_) => WORKER.execute(move || {
+                StubResolver::run_with_conf(conf, move |stub| async move {
+                    stub.query((host, qtype, qclass)).await
+                })
+            }),
+            Err(_) => StubResolver::run_with_conf(conf, move |stub| async move {
+                stub.query((host, qtype, qclass)).await
+            }),
+        }
         .map_err(|err| format!("query failed: {err}"))?;

         Ok(parse_answer(answer)?.into())