Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Activity support #161

Merged
merged 8 commits into from
Oct 17, 2024
Merged

Activity support #161

merged 8 commits into from
Oct 17, 2024

Conversation

cretz
Copy link
Member

@cretz cretz commented Oct 3, 2024

What was changed:

Added full activity support including:

  • Worker with ability to run multiple concurrently
  • Thread-pool-based and fiber-based activities
  • Class, instance, and definition-based activities
  • Ability to customize activity name, executor, and whether it raises on cancel
  • Cancellation token like concept with Temporalio::Cancellation
  • Full-featured context with cancellation, heartbeating, info, logging, etc
  • Worker interceptors
  • Sigs and tests for all

Want to help review?

Great! We welcome all reviews/feedback from everyone. If the PR gets too many comments on it, we may create a new PR for another round of comments. I know the 100-file-count can seem daunting, but a lot of it is generated or unimportant code.

What type of reviewer do you want to be?

I want to review high-level design only

Review README.md (rendered here).

I want to review how the activities work, but do not care about the implementation

In addition to README.md, also review temporalio/test/worker_activity_test.rb.

I want to review the Ruby implementation but do not want to dig into the Rust side

Review everything but what's in temporalio/ext

I want to help review the Rust side including

Review everything. Maybe even go one further and help us solve/understand #162 and why Ruby 3.1/3.2 wakes up queue-pop
fibers on some queue pushes but not others (and if there's a better way for us to wake up a fiber from another thread).

@cretz cretz force-pushed the activity-worker branch 2 times, most recently from 07d9668 to d52d600 Compare October 4, 2024 16:59
@cretz cretz force-pushed the activity-worker branch 3 times, most recently from e638dd5 to 97fd76a Compare October 7, 2024 18:19
@cretz cretz requested a review from a team October 7, 2024 19:25
@cretz cretz marked this pull request as ready for review October 7, 2024 19:26
Copy link
Member Author

@cretz cretz Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically just a file move from temporalio/lib/temporalio/client/implementation.rb. Only the async activity stuff at the bottom is new.

@cretz cretz linked an issue Oct 8, 2024 that may be closed by this pull request
client = Temporalio::Client.connect('localhost:7233', 'my-namespace')

# Run workflow
result = client.execute_workflow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add EOL comments to indicate what arguments are? In particular, the 'Temporal' arg is non-obvious.

Copy link
Member Author

@cretz cretz Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's that common to have to annotate all uses of Ruby code for these arguments. Like Python and Go before it, the argument comes after the workflow but we similarly do not add EOL comments there to say what comes after the workflow is its argument. There will be many many examples and docs showing calling a workflow this way, I am unsure it's reasonable to expect those to all have comments and I don't really want to treat this sample as any more special than the others.

README.md Show resolved Hide resolved
on a workflow. To get a handle to an existing workflow, use `workflow_handle` on the client.
* Clients are thread safe and are fiber-compatible (but fiber compatibility only supported for Ruby 3.3+ at this time).

#### Cloud Client Using mTLS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also document API Key

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API key is not GA yet and we don't document it in Python or .NET SDK READMEs, nor do we document in TypeScript places like https://docs.temporal.io/develop/typescript/temporal-clients#connect-to-temporal-cloud. I would expect API key documentation for each language to be done across languages when available/ready.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not GA, but it's Public Preview, which mandates that we should have basic documentation. And I expect it will be a few months before we add Ruby docs on docs.temporal.io. Till then, this README file is the main doc.

But I honestly don't mind. Your call.

Copy link
Member Author

@cretz cretz Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We should update all languages together (docs site and README) for API key, Ruby should not be special here.

README.md Outdated Show resolved Hide resolved
README.md Outdated

#### Activity Testing

Unit testing an activity can is done via the `Temporalio::Testing::ActivityEnvironment` class. Simply instantiate the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Unit testing an activity can is done via the `Temporalio::Testing::ActivityEnvironment` class. Simply instantiate the
Unit testing an activity can be done via the `Temporalio::Testing::ActivityEnvironment` class. Simply instantiate the

#
# @param parents [Array<Cancellation>] Parent cancellations to link this one to. This cancellation will be canceled
# when any parents are canceled.
def initialize(*parents)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why did you made it plural?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason. In Go it's singular but in .NET it's plural. I could also change the name of this var to "linked" though the idea of "parent" is clearer I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

There's technically a risk of creating memory leaks if the listener graph is allowed to grow in a way that a high number of cancellation scopes tokens (presumably short lives) registered on a single long-lived cancellation scope. There's definitely no such case at the moment (and might probably never be any case of this worth being concerned about), but the possibility of having "multiple parents" was kind of pointing toward use cases that could have been problematic.

temporalio/ext/src/util.rs Outdated Show resolved Hide resolved
Comment on lines 110 to 113
// We trust our usage of this across threads. We would use Opaque but we can't
// box that properly/safely.
unsafe impl Send for AsyncCallback {}
unsafe impl Sync for AsyncCallback {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should explain that the inner queue Value is only ever supposed to be a Ruby Queue and that usage of that is threadsafe. The actual Value implementation uses a raw pointer, and hence is explicitly un-Send and Sync, but it seems like this should be fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I was hoping the the method/field names were enough to clarify it is only queue. It's actually not general-purpose thread safe, heh, only in-Ruby-thread safe. They have an Opaque for this Ruby-only-thread thing. The problem with Opaque is that we can't "box" it (which basically tells Ruby not to allow it be GC'd until dropped). If they had a BoxOpaque our problem would be solved, heh.

Will clarify in comment here too. We may end up changing this to a Block or Proc if #162 shows that we need to do more than a queue push.

Comment on lines +51 to +54
// This needs to be a RefCell of an Option of an Arc because we need to
// mutably take it out of the option at finalize time but we don't have
// a mutable reference of self at that time.
core: RefCell<Option<Arc<temporal_sdk_core::Worker>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the workers array you store them in is already protected by using mutexes from the Ruby side.

Rather than doing this, it would make sense to me to store RefCell<Worker>s in that array, so you can use mutable references here when finalizing and avoid the option dance

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here is the thinking:

  • RefCell - needed so I can have a mutable reference
  • Option - needed because I need to take the worker out and obtain ownership at finalize time and leave nothing in its place (because I don't own the outer object)
  • Arc - Because I need to access this in multiple Rust threads

Can you explain a bit more about removing the Option and Arc parts? How do I get the ability to take the Arc out and unwrap the Arc as the final reference otherwise? Also, how do I have this for use across Rust threads without Arc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is rather than storing an array of these workers here: https://github.com/temporalio/sdk-ruby/pull/161/files/49d4fc6c62f0ce15d5e3b8f1add3b524de1657a4#diff-3d43a321618f2737c54966b8cad9758e44e5919da7f5bc656c0a3f196ef682ccR5

You could instead store an array of RefCell<Arc<Worker>>s which would allow you to use into_inner() on the ref cell when finalizing, because you can remove them from the array. That way you can call finalize with self instead of a ref, I think.

Copy link
Member Author

@cretz cretz Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user provides that worker array at runtime, we don't know it ahead of time. This is part of the Worker.run_all support because Ruby doesn't have a good way to run multiple things concurrently in the standard library without eating a thread for each (fiber stuff is basically non-usable by users in the stdlib without https://github.com/socketry/async which run_all also happens to support).

But if I'm understanding you correctly, yes I could make a separate WorkerGroup object that has a poll and a finalize. But I'd still have to move the core things out of the worker to the worker group I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's more or less what I'm suggesting. That way you can own every worker in the group when it's time to shut down and avoid needing option unwrapping for the more frequently used methods.

Copy link
Member Author

@cretz cretz Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have it yet, but workers still need to be individually owned to do things like update the client on the individual worker. I can't really move it to the group. (though I do need to add code probably on the Ruby side to ensure that a worker isn't run twice)

temporalio/ext/src/worker.rs Outdated Show resolved Hide resolved
}
})
.collect::<Vec<_>>();
let mut worker_stream = stream::select_all(worker_streams);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably avoid making the boxed inner streams and this step by using https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.for_each on a stream made from https://docs.rs/futures/latest/futures/stream/fn.iter.html of worker refs and then basically inlining the below stuff into here after calling the poll.

Not a big deal though, but might read a bit nicer and be slightly less overhead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the logic split into Ruby thread and Rust thread because some Ruby things are needed to build the polling streams and then I combine multiple polling streams into one polling stream. I am not sure of a good way to combine multiple streams until all done without using select_all which requires the stream be pinned.

module Temporalio
# Cancellation representation, often known as a "cancellation token". This is used by clients, activities, and
# workflows to represent cancellation in a thread/fiber-safe way.
class Cancellation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to just call this CancellationToken?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong reason. Usually Ruby isn't as wordy as .NET and some people may have familiarity with https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html which this is similar to (though ours is more feature rich). I can change to calling it a token everywhere if we want, though while in theory it may seem confusing to call this "a cancellation" when "a cancellation" also refers to an event, I haven't been tripped up on that in practice documenting it.

end

# @!visibility private
class Worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this a bit confusing with our overall Worker name. Maybe PoolWorker might be nice here? Not a big problem either way

Copy link
Member Author

@cretz cretz Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's private to this class and in Ruby names of things are often ambiguous if viewed in only their unqualified form. Things are meant to be understood in their hierarchical place. This is not Worker, this is Temporal::Worker::ActivityExecutor::ThreadPool::Worker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I understand that logically, but, it was still not immediately obvious to me

class ActivityExecutor
# Activity executor for scheduling activities in their own thread. This implementation is a stripped down form of
# Concurrent Ruby's `CachedThreadPool`.
class ThreadPool < ActivityExecutor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like large part of what was inherited from the original implementation is not pertinent for what we're doing. Unless you plan to make the fiber executor honor a similar API, I'd suggest trimming this one down. That's API surface and maintainance burden we don't need.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did trim some logic down, but the remaining API surface has more value than if hidden IMO. If it were extra logic I'd agree it adds a maintenance burden, but just exposing extra info I do not think adds much burden and I think it's valuable. It's just read-only stats (unless you're referring to something else?) and I am not too concerned with the contents of the class changing significantly to the point where we will regret exposing the stats.


# Run everything else in the excecutor
executor = @worker.options.activity_executors[defn.executor]
executor.execute_activity(defn) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Looks like executor.execute_activity has no real need for defn

Copy link
Member Author

@cretz cretz Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only because you're looking at the two implementations today :-) This is a user-facing abstraction and I am concerned about a future where people want to configure their executor on a per-activity basis (granted they may have to extend activity definition or something). I considered not even exposing this abstraction, but there are enough users of different async libraries out there that it seemed worth it. And it seemed wrong to tell an executor to execute a block for an activity without them even being able to know what activity it is, especially if I have it right here. This is similar to how our recent worker slot supplier abstraction provides info to make decisions there even though we don't use all of that info in our limited implementations today.

@mjameswh
Copy link
Contributor

In README.md, build instructions are missing the bundle install (this is not new, but just noticed it now).

@cretz
Copy link
Member Author

cretz commented Oct 11, 2024

Thanks! Will add. EDIT: Done

Copy link
Contributor

@mjameswh mjameswh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work! No blocker from me.

Please wait for more reviews before merging, though.

Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving but I think it's worth trying to see if you can get rid of the option in the bridge worker per that thread

@cretz cretz merged commit dd0da03 into temporalio:main Oct 17, 2024
8 checks passed
@cretz cretz deleted the activity-worker branch October 17, 2024 13:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Activity test environment
3 participants