Skip to content

Commit

Permalink
Update to zenoh-0.7.0-rc
Browse files Browse the repository at this point in the history
  • Loading branch information
phil-opp committed Sep 7, 2023
1 parent 2cd9b8e commit 0e35a1e
Show file tree
Hide file tree
Showing 10 changed files with 565 additions and 554 deletions.
993 changes: 493 additions & 500 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion anna-rs
Submodule anna-rs updated 1 files
+2 −2 Cargo.lock
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ license = "Apache-2.0"

[dependencies]
# keep in sync with anna-rs
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "d4b00540cd0faa6ce585e11862cf9740ca226489" }
zenoh = "0.7.0-rc"
19 changes: 10 additions & 9 deletions function-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ version = "0.1.0"
anyhow = "1.0.45"
argh = "0.1.6"
eyre = "0.6.5"
wasmedge-sdk = {version = "0.8.1", optional = true}
wasmtime = {version = "9.0.3", optional = true}
wasmtime-wasi = {version = "9.0.3", optional = true}
wasmedge-sdk = { version = "0.8.1", optional = true }
wasmtime = { version = "9.0.3", optional = true }
wasmtime-wasi = { version = "9.0.3", optional = true }
# keep in sync with anna-rs
anna = {git = "https://github.com/essa-project/anna-rs.git"}
anna = { git = "https://github.com/essa-project/anna-rs.git" }
bincode = "1.3.3"
chrono = {version = "0.4.19", default-features = false}
essa-common = {path = "../common"}
chrono = { version = "0.4.19", default-features = false }
essa-common = { path = "../common" }
fern = "0.6.0"
log = "0.4.14"
serde = {version = "1.0.130", features = ["derive"]}
serde = { version = "1.0.130", features = ["derive"] }
smol = "1.2.5"
uuid = {version = "0.8.2", features = ["v4"]}
zenoh = {git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "d4b00540cd0faa6ce585e11862cf9740ca226489"}
uuid = { version = "0.8.2", features = ["v4"] }
zenoh = "0.7.0-rc"
flume = "0.10.14"

[features]
default = ["wasmedge_executor"]
Expand Down
21 changes: 11 additions & 10 deletions function-executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use anyhow::Context;
use essa_common::{
essa_default_zenoh_prefix, executor_run_function_subscribe_topic, executor_run_module_topic,
};
use flume::RecvError;
use std::{
sync::Arc,
thread,
time::{Duration, Instant},
};
use uuid::Uuid;
use zenoh::prelude::{Receiver, SplitBuffer, ZFuture};
use zenoh::prelude::{sync::SyncResolve, SplitBuffer};

#[cfg(all(feature = "wasmedge_executor", feature = "wasmtime_executor"))]
compile_error!(
Expand Down Expand Up @@ -61,7 +62,7 @@ fn main() -> anyhow::Result<()> {

let zenoh = Arc::new(
zenoh::open(zenoh::config::Config::default())
.wait()
.res()
.map_err(|e| anyhow::anyhow!(e))
.context("failed to connect to zenoh")?,
);
Expand Down Expand Up @@ -130,13 +131,13 @@ fn module_receive_loop(
zenoh_prefix: String,
) -> anyhow::Result<()> {
let mut new_modules = zenoh
.subscribe(executor_run_module_topic(args.id, &zenoh_prefix))
.wait()
.declare_subscriber(executor_run_module_topic(args.id, &zenoh_prefix))
.res()
.map_err(|e| anyhow::anyhow!(e))
.context("failed to subscribe to new modules")?;

loop {
match new_modules.receiver().recv() {
match new_modules.receiver.recv() {
Ok(change) => {
let wasm_bytes = change.value.payload.contiguous().into_owned();

Expand All @@ -158,7 +159,7 @@ fn module_receive_loop(
log::info!("Module run finished in {:?}", Instant::now() - start);
});
}
Err(zenoh::sync::channel::RecvError::Disconnected) => break,
Err(RecvError::Disconnected) => break,
}
}
Ok(())
Expand All @@ -171,16 +172,16 @@ fn function_call_receive_loop(
zenoh_prefix: String,
) -> anyhow::Result<()> {
let mut function_calls = zenoh
.queryable(executor_run_function_subscribe_topic(
.declare_queryable(executor_run_function_subscribe_topic(
args.id,
&zenoh_prefix,
))
.wait()
.res()
.map_err(|e| anyhow::anyhow!(e))
.context("failed to subscribe to new modules")?;

loop {
match function_calls.receiver().recv() {
match function_calls.receiver.recv() {
Ok(query) => {
// start a new function executor instance to run the requested
// function
Expand All @@ -198,7 +199,7 @@ fn function_call_receive_loop(
}
});
}
Err(zenoh::sync::channel::RecvError::Disconnected) => break,
Err(RecvError::Disconnected) => break,
}
}
Ok(())
Expand Down
19 changes: 10 additions & 9 deletions function-executor/src/wasmedge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{get_args, get_module, kvs_get, kvs_put, EssaResult, FunctionExecutor
use anna::{lattice::LastWriterWinsLattice, nodes::ClientNode, ClientKey};
use anyhow::Context;
use essa_common::scheduler_function_call_topic;
use flume::Receiver;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
Expand All @@ -18,8 +19,8 @@ use wasmedge_sdk::{
ValType, Vm, VmBuilder, WasmValue,
};
use zenoh::{
prelude::{Receiver, Sample, SplitBuffer, ZFuture},
query::ReplyReceiver,
prelude::{sync::SyncResolve, Sample, SplitBuffer},
query::Reply,
queryable::Query,
};

Expand Down Expand Up @@ -64,7 +65,7 @@ impl FunctionExecutor {

/// Runs the given function of a already compiled WASM module.
pub fn handle_function_call(mut self, query: Query) -> anyhow::Result<()> {
let mut topic_split = query.key_selector().as_str().split('/');
let mut topic_split = query.key_expr().as_str().split('/');
let args_key = ClientKey::from(topic_split.next_back().context("no args key in topic")?);
let function_name = topic_split
.next_back()
Expand Down Expand Up @@ -106,8 +107,8 @@ impl FunctionExecutor {
// also improve performance since the receiver would no longer need to
// busy-wait on the result key in the KVS anymore.
if let Some(result_value) = host_state.function_result.take() {
let selector = query.key_selector().to_string();
query.reply(Sample::new(selector, result_value));
let selector = query.key_expr();
query.reply(Ok(Sample::new(selector, result_value)));

Ok(())
} else {
Expand Down Expand Up @@ -702,7 +703,7 @@ struct HostState {
function_result: Option<Vec<u8>>,

next_result_handle: u32,
result_receivers: HashMap<u32, ReplyReceiver>,
result_receivers: HashMap<u32, Receiver<Reply>>,
results: HashMap<u32, Arc<Vec<u8>>>,

zenoh: Arc<zenoh::Session>,
Expand All @@ -717,7 +718,7 @@ impl HostState {
&mut self,
function_name: String,
args: Vec<u8>,
) -> Result<ReplyReceiver, EssaResult> {
) -> Result<Receiver<Reply>, EssaResult> {
// get the requested function and check its signature
let func = self
.module
Expand Down Expand Up @@ -802,13 +803,13 @@ fn call_function_extern(
args_key: ClientKey,
zenoh: Arc<zenoh::Session>,
zenoh_prefix: &str,
) -> anyhow::Result<ReplyReceiver> {
) -> anyhow::Result<Receiver<Reply>> {
let topic = scheduler_function_call_topic(zenoh_prefix, &module_key, &function_name, &args_key);

// send the request to the scheduler node
let reply = zenoh
.get(topic)
.wait()
.res()
.map_err(|e| anyhow::anyhow!(e))
.context("failed to send function call request to scheduler")?;

Expand Down
30 changes: 20 additions & 10 deletions function-executor/src/wasmtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use crate::{get_args, get_module, kvs_get, kvs_put, EssaResult, FunctionExecutor
use anna::{lattice::LastWriterWinsLattice, nodes::ClientNode, ClientKey};
use anyhow::{bail, Context};
use essa_common::scheduler_function_call_topic;
use flume::Receiver;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
use wasmtime::{Caller, Engine, Extern, Linker, Module, Store, ValType};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder};
use zenoh::{
prelude::{Receiver, Sample, SplitBuffer, ZFuture},
query::ReplyReceiver,
prelude::{sync::SyncResolve, Sample, SplitBuffer},
query::Reply,
queryable::Query,
};

Expand Down Expand Up @@ -70,7 +71,7 @@ impl FunctionExecutor {

/// Runs the given function of a already compiled WASM module.
pub fn handle_function_call(mut self, query: Query) -> anyhow::Result<()> {
let mut topic_split = query.key_selector().as_str().split('/');
let mut topic_split = query.key_expr().as_str().split('/');
let args_key = ClientKey::from(topic_split.next_back().context("no args key in topic")?);
let function_name = topic_split
.next_back()
Expand Down Expand Up @@ -117,8 +118,11 @@ impl FunctionExecutor {
// busy-wait on the result key in the KVS anymore.
let mut host_state = store.into_data();
if let Some(result_value) = host_state.function_result.take() {
let selector = query.key_selector().to_string();
query.reply(Sample::new(selector, result_value));
let selector = query.key_expr().clone();
query
.reply(Ok(Sample::new(selector, result_value)))
.res()
.map_err(|err| anyhow::anyhow!(err))?;

Ok(())
} else {
Expand Down Expand Up @@ -580,7 +584,7 @@ struct HostState {
function_result: Option<Vec<u8>>,

next_result_handle: u32,
result_receivers: HashMap<u32, ReplyReceiver>,
result_receivers: HashMap<u32, Receiver<Reply>>,
results: HashMap<u32, Arc<Vec<u8>>>,

zenoh: Arc<zenoh::Session>,
Expand All @@ -595,7 +599,7 @@ impl HostState {
&mut self,
function_name: String,
args: Vec<u8>,
) -> Result<ReplyReceiver, EssaResult> {
) -> Result<Receiver<Reply>, EssaResult> {
// get the requested function and check its signature
let func = self
.module
Expand Down Expand Up @@ -653,7 +657,13 @@ impl HostState {
std::collections::hash_map::Entry::Vacant(entry) => {
if let Some(result) = self.result_receivers.remove(&handle) {
let reply = result.recv().map_err(|_| EssaResult::UnknownError)?;
let value = reply.sample.value.payload.contiguous().into_owned();
let value = reply
.sample
.map_err(|_| EssaResult::UnknownError)?
.value
.payload
.contiguous()
.into_owned();
let value = entry.insert(Arc::new(value));
Ok(value.clone())
} else {
Expand All @@ -675,13 +685,13 @@ fn call_function_extern(
args_key: ClientKey,
zenoh: Arc<zenoh::Session>,
zenoh_prefix: &str,
) -> anyhow::Result<ReplyReceiver> {
) -> anyhow::Result<Receiver<Reply>> {
let topic = scheduler_function_call_topic(zenoh_prefix, &module_key, &function_name, &args_key);

// send the request to the scheduler node
let reply = zenoh
.get(topic)
.wait()
.res()
.map_err(|e| anyhow::anyhow!(e))
.context("failed to send function call request to scheduler")?;

Expand Down
2 changes: 1 addition & 1 deletion function-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ wasmtime = "9.0.3"
wasmtime-wasi = "9.0.3"
argh = "0.1.6"
# keep in sync with anna-rs
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "d4b00540cd0faa6ce585e11862cf9740ca226489" }
zenoh = "0.7.0-rc"
essa-common = { path = "../common" }
smol = "1.2.5"
futures = "0.3.17"
Expand Down
23 changes: 14 additions & 9 deletions function-scheduler/src/bin/function-scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use essa_common::{
};
use futures::{select, StreamExt};
use zenoh::{
prelude::{Receiver, SplitBuffer, ZFuture},
prelude::{r#async::AsyncResolve, SplitBuffer},
queryable::Query,
};

Expand All @@ -25,27 +25,30 @@ fn main() -> anyhow::Result<()> {

async fn run() -> anyhow::Result<()> {
let zenoh = zenoh::open(zenoh::config::Config::default())
.wait()
.res()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("failed to connect to zenoh")?
.into_arc();
let zenoh_prefix = essa_default_zenoh_prefix();

// subscribe to module run requests issued through `run-function`
let mut new_modules_sub = zenoh
.subscribe(scheduler_run_module_topic(zenoh_prefix))
.declare_subscriber(scheduler_run_module_topic(zenoh_prefix))
.res()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("failed to subscribe to new modules")?;
let mut new_modules = new_modules_sub.receiver().fuse();
let mut new_modules = new_modules_sub.receiver.into_stream();

// subscribe to remote function call requests issued by WASM functions
let mut function_calls_sub = zenoh
.queryable(scheduler_function_call_subscribe_topic(zenoh_prefix))
.declare_queryable(scheduler_function_call_subscribe_topic(zenoh_prefix))
.res()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("failed to subscribe to function calls")?;
let mut function_calls = function_calls_sub.receiver().fuse();
let mut function_calls = function_calls_sub.receiver.into_stream();

loop {
select! {
Expand Down Expand Up @@ -81,7 +84,8 @@ async fn run_module(
executor_run_module_topic(executor_id, zenoh_prefix),
wasm_bytes,
)
.wait()
.res()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("failed to send module to executor")?;

Expand All @@ -97,7 +101,7 @@ async fn call_function(
// executor node 0
let executor_id = 0;

let mut topic_split = query.key_selector().as_str().split('/');
let mut topic_split = query.key_expr().as_str().split('/');
let args = topic_split
.next_back()
.context("no args key in topic")?
Expand All @@ -118,13 +122,14 @@ async fn call_function(

let reply = zenoh
.get(topic)
.res()
.await
.expect("failed to forward function call to executor")
.recv_async()
.await
.expect("failed to receive reply");

query.reply_async(reply.sample).await;
query.reply(reply.sample).res().await;
};
smol::spawn(task).detach();

Expand Down
Loading

0 comments on commit 0e35a1e

Please sign in to comment.