Skip to content

Commit

Permalink
compiliert wieder
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidma committed May 1, 2024
1 parent 1ada200 commit 68e2de5
Show file tree
Hide file tree
Showing 149 changed files with 2,507 additions and 1,330 deletions.
400 changes: 222 additions & 178 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ members = [
"crates/opn",
"crates/parameter_tester",
"crates/parameters",
"crates/path_serde",
"crates/path_serde_derive",
"crates/projection",
"crates/repository",
"crates/serialize_hierarchy",
"crates/serialize_hierarchy_derive",
"crates/source_analyzer",
"crates/spl_network",
"crates/spl_network_messages",
Expand Down Expand Up @@ -142,6 +142,8 @@ opusfile-ng = "0.1.0"
ordered-float = "3.1.0"
parameters = { path = "crates/parameters" }
parking_lot = "0.12.1"
path_serde = { path = "crates/path_serde" }
path_serde_derive = { path = "crates/path_serde_derive" }
petgraph = "0.6.2"
png = "0.17.6"
proc-macro-error = "1.0.4"
Expand All @@ -161,8 +163,6 @@ serde_bytes = "0.11.8"
serde_derive = "1.0.195"
serde_json = "1.0.107"
serde_test = "1.0.152"
serialize_hierarchy = { path = "crates/serialize_hierarchy" }
serialize_hierarchy_derive = { path = "crates/serialize_hierarchy_derive" }
sha2 = "0.10.8"
smallvec = "1.9.0"
source_analyzer = { path = "crates/source_analyzer" }
Expand Down
9 changes: 8 additions & 1 deletion crates/code_generation/src/cyclers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ fn generate_cycler_instance(cycler: &Cycler) -> TokenStream {

fn generate_database_struct() -> TokenStream {
quote! {
#[derive(Default, serde::Deserialize, serde::Serialize, serialize_hierarchy::SerializeHierarchy)]
#[derive(
Default,
serde::Deserialize,
serde::Serialize,
path_serde::PathSerialize,
path_serde::PathDeserialize,
path_serde::PathIntrospect,
)]
pub(crate) struct Database {
pub main_outputs: MainOutputs,
pub additional_outputs: AdditionalOutputs,
Expand Down
4 changes: 3 additions & 1 deletion crates/code_generation/src/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub fn generate_structs(structs: &Structs) -> TokenStream {
Default,
serde::Deserialize,
serde::Serialize,
serialize_hierarchy::SerializeHierarchy,
path_serde::PathSerialize,
path_serde::PathDeserialize,
path_serde::PathIntrospect,
)]
};
let parameters =
Expand Down
2 changes: 1 addition & 1 deletion crates/communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ framework = { workspace = true, optional = true}
futures-util = { workspace = true }
log = { workspace = true }
parameters = { workspace = true }
path_serde = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serialize_hierarchy = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
Expand Down
174 changes: 90 additions & 84 deletions crates/communication/src/server/outputs/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
num::Wrapping,
sync::Arc,
};
Expand All @@ -8,7 +8,7 @@ use bincode::{DefaultOptions, Options};
use framework::{Reader, Writer};
use futures_util::{stream::FuturesUnordered, StreamExt};
use log::error;
use serialize_hierarchy::SerializeHierarchy;
use path_serde::{PathIntrospect, PathSerialize};
use tokio::{
select, spawn,
sync::{
Expand Down Expand Up @@ -36,15 +36,16 @@ pub fn provider<Outputs>(
subscribed_outputs_writer: Writer<HashSet<String>>,
) -> JoinHandle<()>
where
Outputs: SerializeHierarchy + Send + Sync + 'static,
Outputs: PathIntrospect + PathSerialize + Send + Sync + 'static,
{
spawn(async move {
let (request_sender, mut request_receiver) = channel(1);

let fields = Outputs::get_fields();
outputs_sender
.send(Request::RegisterCycler {
cycler_instance: cycler_instance.to_string(),
fields: Outputs::get_fields(),
fields: fields.clone(),
request_sender,
})
.await
Expand All @@ -58,10 +59,11 @@ where
request = request_receiver.recv() => {
match request {
Some(request) => {
handle_client_request::<Outputs>(
handle_client_request(
request,
cycler_instance,
&mut subscriptions,
&fields,
).await
},
None => break,
Expand All @@ -87,14 +89,12 @@ enum SubscriptionsState {
Unchanged,
}

async fn handle_client_request<Outputs>(
async fn handle_client_request(
request: ClientRequest<OutputsRequest>,
cycler_instance: &'static str,
subscriptions: &mut HashMap<(Client, usize), Subscription>,
) -> SubscriptionsState
where
Outputs: SerializeHierarchy,
{
fields: &BTreeSet<String>,
) -> SubscriptionsState {
let is_get_next = matches!(request.request, OutputsRequest::GetNext { .. });
match request.request {
OutputsRequest::GetFields { .. } => {
Expand All @@ -113,50 +113,7 @@ where
format,
} => {
assert_eq!(cycler_instance, received_cycler_instance);
if Outputs::exists(&path) {
match subscriptions.entry((request.client.clone(), id)) {
Entry::Occupied(_) => {
let error_message = format!("already subscribed with id {id}");
request
.client
.response_sender
.send(Response::Textual(TextualResponse::Outputs(
if is_get_next {
TextualOutputsResponse::GetNext {
id,
result: Err(error_message),
}
} else {
TextualOutputsResponse::Subscribe {
id,
result: Err(error_message),
}
},
)))
.await
.expect("receiver should always wait for all senders");
SubscriptionsState::Unchanged
}
Entry::Vacant(entry) => {
entry.insert(Subscription {
path,
format,
once: is_get_next,
});
if !is_get_next {
request
.client
.response_sender
.send(Response::Textual(TextualResponse::Outputs(
TextualOutputsResponse::Subscribe { id, result: Ok(()) },
)))
.await
.expect("receiver should always wait for all senders");
}
SubscriptionsState::Changed
}
}
} else {
if !fields.contains(&path) {
request
.client
.response_sender
Expand All @@ -168,7 +125,49 @@ where
)))
.await
.expect("receiver should always wait for all senders");
SubscriptionsState::Unchanged
return SubscriptionsState::Unchanged;
}
match subscriptions.entry((request.client.clone(), id)) {
Entry::Occupied(_) => {
let error_message = format!("already subscribed with id {id}");
request
.client
.response_sender
.send(Response::Textual(TextualResponse::Outputs(
if is_get_next {
TextualOutputsResponse::GetNext {
id,
result: Err(error_message),
}
} else {
TextualOutputsResponse::Subscribe {
id,
result: Err(error_message),
}
},
)))
.await
.expect("receiver should always wait for all senders");
SubscriptionsState::Unchanged
}
Entry::Vacant(entry) => {
entry.insert(Subscription {
path,
format,
once: is_get_next,
});
if !is_get_next {
request
.client
.response_sender
.send(Response::Textual(TextualResponse::Outputs(
TextualOutputsResponse::Subscribe { id, result: Ok(()) },
)))
.await
.expect("receiver should always wait for all senders");
}
SubscriptionsState::Changed
}
}
}
OutputsRequest::Unsubscribe {
Expand Down Expand Up @@ -231,7 +230,7 @@ fn write_subscribed_outputs_from_subscriptions(
}

async fn handle_notified_output(
outputs_reader: &Reader<impl SerializeHierarchy>,
outputs_reader: &Reader<impl PathSerialize>,
subscriptions: &mut HashMap<(Client, usize), Subscription>,
next_binary_reference_id: &mut Wrapping<usize>,
) -> SubscriptionsState {
Expand Down Expand Up @@ -354,9 +353,9 @@ mod tests {

use bincode::serialize;
use framework::multiple_buffer_with_slots;
use serde::{de::Deserialize, Deserializer, Serialize, Serializer};
use path_serde::serialize;
use serde::{Serialize, Serializer};
use serde_json::Value;
use serialize_hierarchy::Error;
use tokio::{sync::mpsc::error::TryRecvError, task::yield_now, time::timeout};

use crate::messages::Format;
Expand All @@ -367,42 +366,49 @@ mod tests {
existing_fields: HashMap<String, T>,
}

impl<T> SerializeHierarchy for OutputsFake<T>
impl<T> PathSerialize for OutputsFake<T>
where
for<'a> T: Deserialize<'a> + Serialize,
T: Serialize,
{
fn serialize_path<S>(&self, path: &str, serializer: S) -> Result<S::Ok, Error<S::Error>>
fn serialize_path<S>(
&self,
path: &str,
serializer: S,
) -> Result<S::Ok, serialize::Error<S::Error>>
where
S: Serializer,
{
self.existing_fields
.get(path)
.ok_or(Error::UnexpectedPathSegment {
segment: path.to_string(),
.ok_or(serialize::Error::UnexpectedPath {
path: path.to_owned(),
})?
.serialize(serializer)
.map_err(Error::SerializationFailed)
}

fn deserialize_path<'de, D>(
&mut self,
path: &str,
deserializer: D,
) -> Result<(), Error<D::Error>>
where
D: Deserializer<'de>,
{
self.existing_fields.insert(
path.to_string(),
T::deserialize(deserializer).map_err(Error::DeserializationFailed)?,
);
Ok(())
}

fn exists(field_path: &str) -> bool {
field_path == "a.b.c"
.map_err(serialize::Error::SerializationFailed)
}
}

// impl<T> PathDeserialize for OutputsFake<T>
// where
// for<'a> T: Deserialize<'a> + Serialize,
// {
// fn deserialize_path<'de, D>(
// &mut self,
// path: &str,
// deserializer: D,
// ) -> Result<(), deserialize::Error<D::Error>>
// where
// D: Deserializer<'de>,
// {
// self.existing_fields.insert(
// path.to_string(),
// T::deserialize(deserializer).map_err(deserialize::Error::DeserializationFailed)?,
// );
// Ok(())
// }
// }

impl<T> PathIntrospect for OutputsFake<T> {
fn extend_with_fields(fields: &mut BTreeSet<String>, _prefix: &str) {
fields.insert("a".to_string());
fields.insert("a.b".to_string());
Expand All @@ -413,7 +419,7 @@ mod tests {
async fn get_registered_request_sender_from_provider(
cycler_instance: &'static str,
outputs_changed: Arc<Notify>,
output: Reader<impl SerializeHierarchy + Send + Sync + 'static>,
output: Reader<impl PathIntrospect + PathSerialize + Send + Sync + 'static>,
) -> (
JoinHandle<()>,
BTreeSet<String>,
Expand Down
Loading

0 comments on commit 68e2de5

Please sign in to comment.