Skip to content

Commit

Permalink
Add Worker Loader (#349)
Browse files Browse the repository at this point in the history
* Add worker loader.

* Fix worker loader traits.

* Add spawn with loader to reactor and oneshot workers.

* Add changelog.
  • Loading branch information
futursolo authored Jul 30, 2023
1 parent ffdd2d0 commit 26caf55
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@

## `worker`

### Next Version

- Add Worker Loader (#349)

### Version 0.3.0

- Function Worker (#267)
Expand Down
67 changes: 42 additions & 25 deletions crates/worker/src/actor/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ where
self
}

/// Spawns a Worker.
pub fn spawn(&self, path: &str) -> WorkerBridge<W>
fn spawn_inner(&self, worker: DedicatedWorker) -> WorkerBridge<W>
where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
Expand All @@ -121,40 +120,36 @@ where

let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));

let worker = {
let handler = {
let pending_queue = pending_queue.clone();
let callbacks = callbacks.clone();
let worker = create_worker(path);

let handler = {
let worker = worker.clone();
let worker = worker.clone();

move |msg: FromWorker<W>| match msg {
FromWorker::WorkerLoaded => {
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
for to_worker in pending_queue.into_iter() {
worker.post_packed_message::<_, CODEC>(to_worker);
}
move |msg: FromWorker<W>| match msg {
FromWorker::WorkerLoaded => {
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
for to_worker in pending_queue.into_iter() {
worker.post_packed_message::<_, CODEC>(to_worker);
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
callbacks.remove(&id);
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
callbacks.remove(&id);
}
}
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);
worker
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);

WorkerBridge::<W>::new::<CODEC>(
handler_id,
worker,
Expand All @@ -163,4 +158,26 @@ where
self.callback.clone(),
)
}

/// Spawns a Worker.
pub fn spawn(&self, path: &str) -> WorkerBridge<W>
where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let worker = create_worker(path);

self.spawn_inner(worker)
}

/// Spawns a Worker with a loader shim script.
pub fn spawn_with_loader(&self, loader_path: &str) -> WorkerBridge<W>
where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let worker = DedicatedWorker::new(loader_path).expect("failed to spawn worker");

self.spawn_inner(worker)
}
}
13 changes: 13 additions & 0 deletions crates/worker/src/oneshot/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,17 @@ where

OneshotBridge::new(inner, rx)
}

/// Spawns an Oneshot Worker with a loader shim script.
pub fn spawn_with_loader(mut self, loader_path: &str) -> OneshotBridge<N>
where
N::Input: Serialize + for<'de> Deserialize<'de>,
N::Output: Serialize + for<'de> Deserialize<'de>,
{
let rx = OneshotBridge::register_callback(&mut self.inner);

let inner = self.inner.spawn_with_loader(loader_path);

OneshotBridge::new(inner, rx)
}
}
13 changes: 13 additions & 0 deletions crates/worker/src/reactor/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,17 @@ where

ReactorBridge::new(inner, rx)
}

/// Spawns a Reactor Worker with a loader shim script.
pub fn spawn_with_loader(mut self, loader_path: &str) -> ReactorBridge<R>
where
<R::Scope as ReactorScoped>::Input: Serialize + for<'de> Deserialize<'de>,
<R::Scope as ReactorScoped>::Output: Serialize + for<'de> Deserialize<'de>,
{
let rx = ReactorBridge::register_callback(&mut self.inner);

let inner = self.inner.spawn_with_loader(loader_path);

ReactorBridge::new(inner, rx)
}
}
2 changes: 1 addition & 1 deletion examples/markdown/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Markdown</title>

<link data-trunk rel="rust" data-type="worker" data-wasm-opt="z" data-bin="example_markdown_worker" />
<link data-trunk rel="rust" data-type="worker" data-wasm-opt="z" data-bin="example_markdown_worker" data-loader-shim />
<link data-trunk rel="rust" data-wasm-opt="z" data-bin="example_markdown_app" />
</head>

Expand Down
3 changes: 2 additions & 1 deletion examples/markdown/src/bin/example_markdown_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ fn main() {
.flatten()
.expect_throw("failed to query root element");

let mut bridge = MarkdownWorker::spawner().spawn("/example_markdown_worker.js");
let mut bridge =
MarkdownWorker::spawner().spawn_with_loader("/example_markdown_worker_loader.js");

spawn_local(async move {
let content = bridge.run(MARKDOWN_CONTENT.to_owned()).await;
Expand Down

0 comments on commit 26caf55

Please sign in to comment.