Introduce transform::manager #13270
Conversation
Force push: 98d464d to d108903
looks great. several questions but i don't think there are any blockers.
void work_queue::submit(ss::noncopyable_function<ss::future<>()> fn) {
    if (_as.abort_requested()) {
        return;
i wonder if the signature here should be
ss::noncopyable_function<ss::future<>(seastar::abort_source*)> fn
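For illustration only, a hedged sketch of what that alternative could look like (the `_tail`/`_as` members are assumptions, not the PR's actual implementation):

```cpp
// Sketch: the queue hands its abort_source to each task, so long-running
// work (e.g. starting a wasm engine) could observe shutdown and bail early.
void work_queue::submit(
  ss::noncopyable_function<ss::future<>(ss::abort_source*)> fn) {
    if (_as.abort_requested()) {
        return; // queue is shutting down; drop the task
    }
    _tail = _tail.then([this, fn = std::move(fn)]() mutable {
        return fn(&_as); // the task may poll or subscribe to the abort_source
    });
}
```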
I thought about this - right now there isn't a use case because starting a wasm engine isn't interruptible at the moment and that's the main source of latency I'd assume. Do you think it's worth future proofing?
By future proofing I mean adding it now.
Do you think it's worth future proofing?
nah
@@ -121,4 +121,6 @@ ss::future<> processor::do_run_transform_loop() {

model::transform_id processor::id() const { return _id; }
const model::ntp& processor::ntp() const { return _ntp; }
const model::transform_metadata& processor::meta() const { return _meta; }
bool processor::is_running() const { return !_task.available(); }
i'll be interested to see later in the pr how is_running is used, but it feels like an odd property to expose. for example, if it's false then the only move is to inspect whether there was an exception, but that isn't exposed. it could be restarted, but that could be hidden behind an automatic restart policy. also, it's true before start() and after stop() (ie contains ss::now()).
I think you saw this :)
I'm open to alternatives, but I'd like to colocate the processor with the backoff state, and this feels like the easiest thing? I guess an optional (or nullptr) is another option too...
it makes sense to have this or something like it if the restart policy is extracted out of the processor itself. it just wasn't clear what was coming up in a commit-by-commit review, so it's a bit more of a stream of consciousness.
stream of consciousness
love stream of consciousness reviews!
// enqueued to ensure that task execution is deterministic, because in
// debug mode seastar randomizes task order, so there is no way to wait
// for those tasks to be executed other than draining the seastar queue.
ss::set_idle_cpu_handler([this](ss::work_waiting_on_reactor) {
I'm curious if you saw this technique in the Redpanda source tree. I ask because we had to solve a similar problem at one point in the past, and I don't recall that we had support from Seastar for it like you've managed to get with this code.
I did not. I studied the reactor source in seastar a lot until I stumbled upon this. AFAIK this would be the first usage of ss::set_idle_cpu_handler in Redpanda.
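For context, a minimal sketch of the technique, assuming a hypothetical `_waiting` member (a `std::deque<ss::promise<>>` of test fibers parked until the reactor runs dry):

```cpp
// Sketch: only release parked waiters once the reactor reports it has no
// other runnable tasks, giving deterministic ordering even though debug-mode
// seastar randomizes its task queue.
ss::set_idle_cpu_handler([this](ss::work_waiting_on_reactor) {
    if (_waiting.empty()) {
        // Nothing parked on our side either; let the reactor go idle.
        return ss::idle_cpu_handler_result::no_more_work;
    }
    // Wake one waiter; its continuation becomes a normal reactor task.
    _waiting.front().set_value();
    _waiting.pop_front();
    return ss::idle_cpu_handler_result::interrupted_by_higher_priority_task;
});
```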
So cool
Force push (rebase with dev): 4306089 to a1e478f
This is a small utility that manages running tasks sequentially on a single fiber. This is useful for simplifying control loops by executing everything on a single fiber without the need for locking/etc. As a potential future extension: for long-running async work, this queue could have the ability to manage spinning off background fibers to handle work, then re-enqueuing the result of that work. The advantage of having the work_queue track that is that the bookkeeping is consolidated into a single place and we can cleanly handle shutdown. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
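A hedged sketch of the idea (member names and error handling are illustrative, not the PR's exact code):

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/later.hh>
#include <seastar/util/noncopyable_function.hh>

#include <utility>

namespace ss = seastar;

// Sketch of a single-fiber work queue: each submitted task is chained onto
// one future, so tasks run strictly in submission order with no locking.
class work_queue {
public:
    // Enqueue a task to run after all previously submitted tasks complete.
    void submit(ss::noncopyable_function<ss::future<>()> fn) {
        if (_as.abort_requested()) {
            return; // shutting down: drop new work
        }
        _tail = _tail.then([fn = std::move(fn)]() mutable { return fn(); })
                  .handle_exception([](const std::exception_ptr&) {
                      // swallow so one failing task doesn't break the chain
                  });
    }

    // Stop accepting new work and wait for already queued tasks to finish.
    ss::future<> shutdown() {
        _as.request_abort();
        return std::exchange(_tail, ss::now());
    }

private:
    ss::future<> _tail = ss::now();
    ss::abort_source _as;
};
```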
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This is the main controller for processors. It ensures that processors are running on its core for all of the leader ntps that have transforms attached as inputs. In the case of failures we retry with exponential backoff; this state is tracked alongside its processor in a table that also includes probes (as these must be shared on a core if a core owns multiple partitions for an input topic that has a transform). The many-transforms-to-many-partitions relationship means that the logic for keeping track of all the state, as well as whether a transform is running or not, is non-trivial. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
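A hedged sketch of the per-processor bookkeeping idea (names and constants are illustrative):

```cpp
#include <algorithm>
#include <chrono>

// Retry state colocated with the processor it belongs to: each failure
// doubles the delay up to a cap, and a healthy run resets it.
struct backoff_state {
    std::chrono::milliseconds next_delay{100};
    static constexpr std::chrono::milliseconds max_delay{30'000};

    // Delay to use for the next restart attempt.
    std::chrono::milliseconds next() {
        auto delay = next_delay;
        next_delay = std::min(next_delay * 2, max_delay);
        return delay;
    }
    // Called once the processor has been running healthily again.
    void reset() { next_delay = std::chrono::milliseconds{100}; }
};
```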
This will help in the transform manager tests, where we can plug in a processor that reports its lifecycle state to the tests. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Useful so we don't need to remember which units latency should be in. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Mostly just applying clang-tidy auto fixes (range loops and no C-style arrays). Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This will allow deterministic tests by using `ss::manual_clock` instead of `ss::lowres_clock`, which would require sleeping and all sorts of not-fun patterns. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
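A minimal sketch of the clock-templating pattern with an illustrative manager shape (not the actual interface):

```cpp
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/timer.hh>

namespace ss = seastar;

// Production instantiates with ss::lowres_clock; tests use ss::manual_clock
// so that time only moves when the test advances it explicitly.
template<typename ClockType = ss::lowres_clock>
class manager {
public:
    manager() {
        _retry_timer.set_callback([this] { retry_failed_processors(); });
    }
    void schedule_retry(typename ClockType::duration delay) {
        _retry_timer.arm(delay);
    }

private:
    void retry_failed_processors() { /* restart processors with backoff */ }
    ss::timer<ClockType> _retry_timer;
};

// In a test: manager<ss::manual_clock> mgr;
//            ss::manual_clock::advance(std::chrono::seconds(30));
```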
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Make sure tasks from ss::manual_clock are completed when the clock is advanced. We do this by asking the reactor to tell us when there are no more tasks before we explicitly add tasks from the transform_manager queue into seastar's queue. Otherwise seastar's task order randomizer can cause tasks to not be enqueued as the tests expect. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
When we start tasks to create processors in parallel, we will create multiple probes in `create_processor`, so instead create the probe once in the callers of those methods so that we don't get duplicate registration. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
In release mode seastar can execute tasks inline (when created using `ss::future::then`) instead of submitting the task to the scheduler, which causes these tests to fail, as we require control over when the callbacks are executed. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force push (fix #13270 (comment)): a1e478f to f5df616
Use the standard seastar mechanism via a gate. Much simpler. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
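A hedged sketch of what the gate-based variant could look like (same illustrative work_queue shape as the earlier sketch, not the PR's exact code):

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/later.hh>
#include <seastar/util/noncopyable_function.hh>

namespace ss = seastar;

// The gate tracks in-flight tasks: close() rejects new submissions and
// resolves once everything already queued has finished.
class work_queue {
public:
    void submit(ss::noncopyable_function<ss::future<>()> fn) {
        if (_gate.is_closed()) {
            return; // shutting down: drop new work
        }
        _gate.enter();
        _tail = _tail.then([fn = std::move(fn)]() mutable { return fn(); })
                  .handle_exception([](const std::exception_ptr&) {})
                  .finally([this] { _gate.leave(); });
    }

    ss::future<> shutdown() { return _gate.close(); }

private:
    ss::future<> _tail = ss::now();
    ss::gate _gate;
};
```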
The concurrency is premature and prevents easy reasoning about concurrent data structure modification. If there are futures that take a long time, we should not do them in parallel but move them off the queue (and extend the work queue to support tracking these too). Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force push (remove unused header): 9573cfa to 9b7b108
Force push (remove unused variable): cb7f218 to e6c7763
For predictable memory usage and performance. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
There is no real tuning happening here, but this backoff should only kick in if there are network failures fetching the binary (mitigated by retries in the implementation) or if compilation fails (which is not recoverable), so we can probably afford to delay longer. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force push (fix oss build): e6c7763 to 2f49ee9
The transform manager is the component that ensures the correct transform::processor is running for a given core. It receives notifications from the rest of the system and processes them on a single-fiber queue.
The queue makes shutdown a little simpler, as we can move most of the logic for handling shutdown correctly into another class. Additionally, using a single fiber means we don't need to worry about concurrent modification of data structures or duplicate notifications being scheduled.
We also template the manager on the clock type. This is a pattern used in other places that allows us to use the seastar manual clock for fully deterministic testing, while using the lowres clock in production.
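As an illustration of the single-fiber idea (hypothetical handler names; the work_queue is the sketch from the commit description above):

```cpp
#include <seastar/core/sstring.hh>

namespace ss = seastar;

// Every external notification becomes a task on the single-fiber queue, so
// all bookkeeping is mutated sequentially and handlers never race.
class transform_manager_sketch {
public:
    void on_leadership_change(ss::sstring ntp, bool leader) {
        _queue.submit([this, ntp = std::move(ntp), leader] {
            return leader ? start_processors_for(ntp)
                          : stop_processors_for(ntp);
        });
    }

private:
    ss::future<> start_processors_for(const ss::sstring&) { return ss::now(); }
    ss::future<> stop_processors_for(const ss::sstring&) { return ss::now(); }
    work_queue _queue; // single-fiber queue from the earlier sketch
};
```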
Backports Required
Release Notes