Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

splitted model and helper functions into own folders #74

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/generator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod common;
mod model;
pub use common::{
cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, template_render_write,
};
pub use model::generate_models_folder;
48 changes: 48 additions & 0 deletions src/generator/model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use super::template_render_write;
use crate::template_context::TemplateContext;
use crate::{parser::common::validate_identifier_string, utils::write_to_path_create_dir};

use std::path::Path;

pub fn generate_models_folder(
async_config: &TemplateContext,
template_path: &Path,
output_path: &Path,
) {
async_config
.model
.message_models
.iter()
.for_each(|message_model| {
if !message_model.model_definition.is_empty() {
template_render_write(
&template_path.join("model.go"),
message_model.clone(),
&output_path.join(format!(
"src/model/{}.rs",
validate_identifier_string(&message_model.unique_id, false)
)),
);
}
});

let imports = async_config
.model
.message_models
.iter()
.map(|message_model| {
if !message_model.model_definition.is_empty() {
format!(
"pub mod {}; \n pub use {}::*; \n",
validate_identifier_string(&message_model.unique_id, false),
validate_identifier_string(&message_model.unique_id, false)
)
} else {
"".to_string()
}
})
.collect::<Vec<String>>()
.join("\n");

write_to_path_create_dir(&imports, &output_path.join("src/model/mod.rs")).unwrap();
}
24 changes: 19 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod utils;

use crate::{
asyncapi_model::AsyncAPI,
generator::{cargo_fix, cargo_generate_rustdoc, template_render_write},
generator::{cargo_fix, cargo_generate_rustdoc, generate_models_folder, template_render_write},
utils::append_file_to_file,
};

Expand Down Expand Up @@ -61,15 +61,29 @@ fn main() {
);

template_render_write(
&template_path.join("model.go"),
&template_path.join("Readme.md"),
&async_config,
&output_path.join("src/model.rs"),
&output_path.join("Readme.md"),
);

template_render_write(
&template_path.join("Readme.md"),
&template_path.join("utils/mod.go"),
&async_config,
&output_path.join("Readme.md"),
&output_path.join("src/utils/mod.rs"),
);

template_render_write(
&template_path.join("utils/streams.go"),
&async_config,
&output_path.join("src/utils/streams.rs"),
);
template_render_write(
&template_path.join("utils/common.go"),
&async_config,
&output_path.join("src/utils/common.rs"),
);

generate_models_folder(&async_config, template_path, output_path);
println!("🚀 File generation finished, adding dependencies...");

// make output a compilable project
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use gtmpl_derive::Gtmpl;
use serde::Serialize;
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Gtmpl)]
pub struct RustSchemaRepresentation {
// the unique identifier (e.g. UserSignupMessage)
pub unique_id: String,
Expand Down
8 changes: 1 addition & 7 deletions templates/handler.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
use async_nats::{Client, Message, jetstream};
use async_nats::jetstream::Context;
use crate::{publish_message,stream_publish_message,
{{ range .model.message_models }}
{{ if ne .model_definition "" }}
model::{{ .unique_id }},
{{ end }}
{{ end }}
};
use crate::{publish_message,utils::stream_publish_message,model::*};
use std::time;


Expand Down
74 changes: 3 additions & 71 deletions templates/main.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,12 @@
mod handler;
mod model;
use async_nats::{Client, Message, Subscriber};
use async_nats::jetstream::{self, Context, stream};
use async_nats::jetstream::consumer::{pull::{self, Config}, Consumer};
use std::time::Duration;
use futures::StreamExt;

mod utils;
use utils::*;
use crate::handler::*;
use async_nats::jetstream::{self};

pub struct Producer {}

async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) {
while let Some(message) = sub.next().await {
handler(message);
println!("Message received by Subscriber: {:?}", sub);
}
}
async fn publish_message(client: &Client, channel: &str, payload: &str) {
let owned_payload = payload.to_owned().into(); // Convert to Bytes
client
.publish(channel.to_string(), owned_payload)
.await
.unwrap();
println!("sent");
}

pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result<Consumer<Config>, async_nats::Error>{

let stream = jetstream.get_or_create_stream(jetstream::stream::Config {
name: stream_name.to_string(),
..Default::default()
}).await?;
let consumer = stream.get_or_create_consumer("consumer", Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
return Ok(consumer);
}

{{ range .subscribe_channels }}
{{ if (index . 1).original_operation.bindings }}
{{if (index . 1).original_operation.bindings.nats}}
{{ if (index . 1).original_operation.bindings.nats.streamname }}
async fn stream_publish_message(client: &Context, channel: &str, payload: &str) {
let owned_payload = payload.to_owned().into(); // Convert to Bytes
client
.publish(channel.to_string(), owned_payload)
.await
.unwrap();
println!("sent");
}
{{end}}
{{end}}
{{end}}
{{end}} //supports only one stream, otherwise no unique function name(one function is enough for any number of streams, but you have to check if there is one)


{{range .publish_channels}}
{{ if (index . 1).original_operation.bindings }}
{{ if (index . 1).original_operation.bindings.nats.streamname }}
async fn stream_listen_for_message(sub: &Consumer<Config>, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{
loop{
tokio::time::sleep(Duration::from_millis(1000)).await;
let mut messages = sub.messages().await?.take(10);
while let Some(message) = messages.next().await {
let message = message?;
handler(message);
println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context
}
}
Ok(())
}
{{end}}
{{end}}
{{end}}

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect("{{ .server.url }}").await?;
Expand Down
8 changes: 3 additions & 5 deletions templates/model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use serde::{Deserialize, Serialize};

// All models from the specification are defined here
{{ range .model.message_models }}
{{ .model_definition }}
{{ end }}
use super::*;

{{ .model_definition }}

19 changes: 19 additions & 0 deletions templates/utils/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

use async_nats::{Client, Message, Subscriber};
use futures::StreamExt;

pub async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) {
while let Some(message) = sub.next().await {
handler(message);
println!("Message received by Subscriber: {:?}", sub);
}
}
pub async fn publish_message(client: &Client, channel: &str, payload: &str) {
let owned_payload = payload.to_owned().into(); // Convert to Bytes
client
.publish(channel.to_string(), owned_payload)
.await
.unwrap();
println!("sent");
}

4 changes: 4 additions & 0 deletions templates/utils/mod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod common;
pub use common::*;
pub mod streams;
pub use streams::*;
41 changes: 41 additions & 0 deletions templates/utils/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use async_nats::jetstream::{self, Context};
use async_nats::jetstream::consumer::{pull::{Config}, Consumer};
use std::time::Duration;
use futures::StreamExt;

pub async fn stream_publish_message(client: &Context, channel: &str, payload: &str) {
let owned_payload = payload.to_owned().into(); // Convert to Bytes
client
.publish(channel.to_string(), owned_payload)
.await
.unwrap();
println!("sent");
}


pub async fn stream_listen_for_message(sub: &Consumer<Config>, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{
loop{
tokio::time::sleep(Duration::from_millis(1000)).await;
let mut messages = sub.messages().await?.take(10);
while let Some(message) = messages.next().await {
let message = message?;
handler(message);
println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context
}
}
Ok(())
}


pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result<Consumer<Config>, async_nats::Error>{

let stream = jetstream.get_or_create_stream(jetstream::stream::Config {
name: stream_name.to_string(),
..Default::default()
}).await?;
let consumer = stream.get_or_create_consumer("consumer", Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
return Ok(consumer);
}