fix(stdlib): properly detect tokio runtime in dns_lookup #882
Conversation
When testing `dns_lookup`, it panics, which happens because VRL is called in a tokio-managed thread. The solution here was to move the call to another thread, because internally the lookup blocks on an async query. Let me know if you have a better solution for this, since spawning a thread for each lookup might be too much.
Hi @esensar, thanks for suggesting a fix.
This might be problematic; could we have a pool or a dedicated thread for this?
src/stdlib/dns_lookup.rs
Outdated
        stub.query((host, qtype, qclass)).await
    })
let answer = match Handle::try_current() {
    Ok(_) => thread::spawn(move || {
I think if it returns `Ok` you can use something like:

futures::executor::block_on(async {
    handle
        .spawn(async {
            ...
        })
})
to just execute in the current Tokio runtime instead of spawning a separate thread per https://stackoverflow.com/a/62536772
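(For reference, a fuller version of that suggestion might look like the sketch below; `blocking_lookup`, `do_lookup`, `Answer`, and `Error` are hypothetical stand-ins for the lookup code in this file, and the next comment explains why this still isn't enough.)

use tokio::runtime::Handle;

// Sketch of the suggested pattern, with hypothetical stand-ins: if a tokio
// runtime is already running, re-enter it via `spawn`; otherwise build a
// throwaway runtime and block on it.
fn blocking_lookup(host: String) -> Result<Answer, Error> {
    match Handle::try_current() {
        Ok(handle) => futures::executor::block_on(async move {
            handle
                .spawn(async move { do_lookup(host).await })
                .await
                .expect("lookup task panicked")
        }),
        Err(_) => tokio::runtime::Runtime::new()
            .expect("failed to build tokio runtime")
            .block_on(do_lookup(host)),
    }
}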
I still have to await on the return value of `block_on`, which brings us back to the same issue. I think we will have to go with a pool or a dedicated thread until proper async support is added to VRL (if it ever gets added, since it may clash with the original goals).
Ah, gotcha. Then yeah, a pool seems like a good approach. If/when VRL functions become async we can revisit this. I was going to suggest we thread the runtime through from Vector into VRL, but I think I'd rather just make the functions async than do that.
I'm realizing this might be the first example of an "async VRL function". Per vectordotdev/vector#20495 we've tried to avoid that until now. I think we can just hack it as you currently have with sync lookups, but if we gather more examples like this it might motivate adding async VRL function variants (internally, not exposing the async nature to users).
@esensar just a heads up that we'll be releasing Vector next week in case you want to try to fix this before then.
I have implemented just a single dedicated thread for this. I know that is probably not the best idea, but this close to the release I didn't want to risk breaking stuff by adding a thread pool (which would probably mean pulling in a crate, since it would be better to rely on something that already exists).
src/stdlib/dns_lookup.rs
Outdated
static WORKER: Lazy<Worker> = Lazy::new(|| Worker::new());

type Job<T> = Box<dyn FnOnce() -> T + Send + 'static>;

struct JobHandle<T> {
    job: Job<T>,
    result: Arc<mpsc::Sender<T>>,
}

struct Worker {
    thread: Option<thread::JoinHandle<()>>,
    queue: Option<mpsc::Sender<JobHandle<Result<Answer, Error>>>>,
}

impl Worker {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<JobHandle<Result<Answer, Error>>>();
        let receiver = Arc::new(Mutex::new(receiver));
        Self {
            thread: Some(thread::spawn(move || loop {
                match receiver.lock().unwrap().recv() {
                    Ok(handle) => {
                        let result = (handle.job)();
                        handle.result.as_ref().send(result).unwrap();
                    }
                    Err(_) => todo!(),
                }
            })),
            queue: Some(sender),
        }
    }

    fn execute<F>(&self, f: F) -> Result<Answer, Error>
    where
        F: FnOnce() -> Result<Answer, Error> + Send + 'static,
    {
        let job = Box::new(f);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let handle = JobHandle {
            job,
            result: Arc::new(sender),
        };

        self.queue.as_ref().unwrap().send(handle).unwrap();
        return receiver.lock().unwrap().recv().unwrap();
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        drop(self.queue.take());
        if let Some(thread) = self.thread.take() {
            thread.join().unwrap();
        }
    }
}
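(For context, the call site in `dns_lookup` then funnels each lookup through this worker, roughly like the sketch below; it reuses the `stub.query` call from the earlier diff, with the surrounding setup elided.)

// Every lookup is shipped to the single worker thread; the calling
// thread blocks on the result channel until the answer comes back.
let answer = WORKER.execute(move || {
    // The worker thread has no ambient tokio runtime, so building one
    // and calling `block_on` here is safe.
    tokio::runtime::Runtime::new()
        .expect("failed to build tokio runtime")
        .block_on(async { stub.query((host, qtype, qclass)).await })
})?;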
Hi @esensar, thank you for this patch. I think it's an acceptable workaround for an undocumented function. I think it's good to add some comments here and also a module-level doc (`//!`) to explain what this function does and the limitations of a single-threaded solution, especially when it comes to:
- Blocking
- Worker thread saturation
- Unbounded job queue
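(A module-level doc covering those points might read something like this; just a sketch of the suggested `//!` comment, not final wording.)

//! Blocking DNS lookup support for VRL.
//!
//! All lookups are funneled through a single dedicated worker thread,
//! because VRL functions are synchronous and may be invoked from inside
//! a tokio runtime. Known limitations of this single-threaded design:
//!
//! - Blocking: each call blocks its calling thread until a result arrives.
//! - Worker thread saturation: one slow lookup delays every queued lookup.
//! - Unbounded job queue: without a bounded channel, a burst of lookups
//!   can queue arbitrarily many jobs and grow memory without limit.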
Another reasonable improvement here is to use a bounded channel.
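(In std terms that means swapping `mpsc::channel()` for `mpsc::sync_channel(n)`; a minimal sketch, reusing the types from the diff above and an illustrative capacity value:)

// Bounded job queue: once CHANNEL_CAPACITY jobs are waiting, further
// `send` calls block, backpressuring callers instead of growing memory.
const CHANNEL_CAPACITY: usize = 100; // illustrative value
let (sender, receiver) =
    mpsc::sync_channel::<JobHandle<Result<Answer, Error>>>(CHANNEL_CAPACITY);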
Alright, that makes sense. I will try to add these as soon as possible, but I will not be available for the next 2-3 days. When is the release planned? I would love to get this ready for that release.
The release is planned for the 17th, which doesn't leave us too much time 😓 We do minor releases every 6 weeks though so it wouldn't have to wait too long if it didn't make it.
Thanks @esensar. I think we are OK including a single-threaded implementation to unblock use of this function for the release while we think about a better model. I left some inline comments below, but we'd also like to see a warning added as a rustdoc just to make people aware of the potential bottleneck. Do you think you could add that?
src/stdlib/dns_lookup.rs
Outdated
match receiver.lock().unwrap().recv() {
    Ok(handle) => {
        let result = (handle.job)();
        handle.result.as_ref().send(result).unwrap();
Could we change all of these `unwrap`s to `expect`s?
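(For instance, on the worker-loop lines quoted above, that might look like this sketch:)

// Same behavior, but each panic message now names the invariant that broke.
match receiver.lock().expect("worker mutex poisoned").recv() {
    Ok(handle) => {
        let result = (handle.job)();
        handle
            .result
            .as_ref()
            .send(result)
            .expect("result receiver was dropped");
    }
    Err(_) => todo!(),
}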
src/stdlib/dns_lookup.rs
Outdated
        let result = (handle.job)();
        handle.result.as_ref().send(result).unwrap();
    }
    Err(_) => todo!(),
What should we do here? Panic?
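(For reference, one non-panicking option is to treat a closed channel as shutdown, since `recv` only fails once every sender, i.e. the `Worker` itself, has been dropped; a sketch:)

loop {
    match receiver.lock().expect("worker mutex poisoned").recv() {
        Ok(handle) => {
            let result = (handle.job)();
            // The caller may have given up waiting; ignore send errors.
            let _ = handle.result.as_ref().send(result);
        }
        // All senders are gone: the Worker was dropped, so exit the loop
        // cleanly and let Drop::drop join this thread.
        Err(_) => break,
    }
}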
    queue: Option<mpsc::Sender<JobHandle<Result<Answer, Error>>>>,
}

impl Worker {
Did you borrow this implementation from somewhere? It's slightly more complicated than I might have expected.
It is roughly based on https://doc.rust-lang.org/book/ch20-02-multithreaded.html
I tried to simplify it as much as I could, but I thought I had to have some kind of channel to take in the function and to send back the result.
Ah, gotcha, yeah I see. It seems like that example just creates one channel that is reused, where this implementation creates a channel per call to `execute`. Is there a reason we need to create one channel per `execute` call?
Oh, right. My bad, one should be enough.
Right, a single `mpsc::sync_channel(CHANNEL_CAPACITY)` should be enough. Both the sender and the receiver should be lazily instantiated instances.
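(Concretely, the per-call channel in `execute` would move into the `Worker` itself, roughly like the sketch below, reusing the names from the diff above; `new()` is elided:)

struct Worker {
    thread: Option<thread::JoinHandle<()>>,
    jobs: Option<mpsc::SyncSender<Job<Result<Answer, Error>>>>,
    // Single shared result channel, created once alongside the thread.
    results: Mutex<mpsc::Receiver<Result<Answer, Error>>>,
}

impl Worker {
    fn execute<F>(&self, f: F) -> Result<Answer, Error>
    where
        F: FnOnce() -> Result<Answer, Error> + Send + 'static,
    {
        // Hold the results lock across send + recv so concurrent callers
        // cannot pick up each other's answers.
        let results = self.results.lock().expect("results mutex poisoned");
        self.jobs
            .as_ref()
            .expect("job queue already shut down")
            .send(Box::new(f))
            .expect("worker thread exited");
        results.recv().expect("worker thread exited")
    }
}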
It is all contained in the `Worker` instance now, so both of them are lazily instantiated (since `Worker` is lazily instantiated). There are 2 bounded channels though, one for jobs and one for results. I hope that is alright.
I have made the capacity 0 for testing (meaning it is always blocking), but on the other hand, I am not sure it makes sense to make it any bigger, considering there is only 1 thread handling jobs. I might be missing something, though.
src/stdlib/dns_lookup.rs
Outdated
let job = Box::new(f);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let handle = JobHandle {
    job,
    result: Arc::new(sender),
};
Similarly to my question above, did you borrow this from somewhere? It seems like a lot of "work" to do for each call to `execute`, such that I'm wondering what the performance looks like. Maybe we could add a benchmark for it in benches/stdlib.rs to see?
There are already some benchmarks there for the `dns_lookup` function. Did you mean we should add some benchmarks for the `Worker` itself?
Thanks for the updates @esensar! I think this looks good to me now.
It seems possible that having a higher channel capacity could improve performance by letting the worker thread process more messages before it switches out, but we'd need to do some benchmarking to prove that out.
I'd like to get @pront's review again too.
src/stdlib/dns_lookup.rs
Outdated
// Currently blocks on each request until result is received
// It should be avoided unless absolutely needed
static WORKER: Lazy<Worker> = Lazy::new(Worker::new);
const CHANNEL_CAPACITY: usize = 0;
Let's set a sufficiently large number here?
I had trouble figuring out the right number for this, and then I thought it would make sense to keep it 0, considering that it is just 1 thread, but that is probably wrong.
Do you have a suggestion for a number?
What you have works, but we could probably go a bit further by allowing some buffering (without worrying about memory explosion), based on the docs:

This channel has an internal buffer on which messages will be queued. `bound` specifies the buffer size. When the internal buffer becomes full, future sends will block waiting for the buffer to open up.
Note that a buffer size of 0 is valid, in which case this becomes a "rendezvous channel" where each [send](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html#method.send) will not return until a [recv](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv) is paired with it.
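(The difference is easy to see in isolation; a small self-contained sketch:)

use std::sync::mpsc;
use std::thread;

fn main() {
    // Capacity 0 ("rendezvous"): each send blocks until a recv pairs with it.
    let (tx, rx) = mpsc::sync_channel::<u32>(0);
    let t = thread::spawn(move || tx.send(1).unwrap()); // blocks until recv below
    assert_eq!(rx.recv().unwrap(), 1);
    t.join().unwrap();

    // Capacity 100: sends complete immediately while the buffer has room.
    let (tx, rx) = mpsc::sync_channel::<u32>(100);
    for i in 0..100 {
        tx.send(i).unwrap();
    }
    drop(tx);
    assert_eq!(rx.iter().count(), 100);
}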
Tying it to the number of concurrent executions of the VRL runtime could make sense. For example, the `remap` transform runs one transform per available thread: https://github.com/vectordotdev/vector/blob/b3276b4cc73dee6d3854469562f1b1fcf15a419c/src/topology/builder.rs#L68-L73. To do that "right" though, it should probably be configurable on the VRL runtime.
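(If the capacity were derived rather than hardcoded, something like the sketch below could approximate that; `available_parallelism` is only a stand-in for however many threads actually run VRL concurrently, and the fallback value is arbitrary.)

use std::thread;

// Hypothetical sizing: one queue slot per thread that could be running
// VRL concurrently, with a small fallback if the count is unavailable.
fn channel_capacity() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4)
}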
I guess any number higher than that would be alright, so for now we can hardcode something arbitrary, like 100, since it won't really add more jobs than threads, due to the way the function blocks the current thread while waiting for the result.
> Tying it to the number of concurrent executions of the VRL runtime could make sense. For example, the `remap` transform runs one transform per available thread: https://github.com/vectordotdev/vector/blob/b3276b4cc73dee6d3854469562f1b1fcf15a419c/src/topology/builder.rs#L68-L73. To do that "right" though, it should probably be configurable on the VRL runtime.

Sorry for just going for the hard-coded number, but I thought we could go for that configurable approach when we update this to use multiple threads (or, hopefully, sometime in the future make it properly async).
Thank you @esensar!