From db8fc439a082088ffd0c61694cc2834564345a54 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sat, 1 Jul 2023 18:55:39 +0200 Subject: [PATCH 01/14] added check if output folder exists then prompts for if it should be repalced --- src/main.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/main.rs b/src/main.rs index 770e582..730de55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,6 +49,26 @@ fn main() { } }; + //check if project with name already exists, if yes ask for permission to overwrite + if output_path.exists() { + let warn_message = format!("A project with the name {} already exists in the current directory, do you want to overwrite the existing project? \nWARNING: This will delete all files in the directory and all applied. \nType 'y' to continue or anything else to exit.",title); + println!("{}", warn_message); + let mut input = String::new(); + match std::io::stdin().read_line(&mut input) { + Ok(_) => { + if input.trim() != "y" { + println!("Aborting generation..."); + std::process::exit(0); + } + std::fs::remove_dir_all(output_path).unwrap(); + } + Err(err) => { + println!("❌ Error reading input: {}", err); + std::process::exit(1); + } + } + } + template_render_write( &template_path.join("main.go"), &async_config, From 467f857eeb28972de1a79d1449743338180ba936 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sat, 1 Jul 2023 19:03:27 +0200 Subject: [PATCH 02/14] refactored function out of main --- src/generator/common.rs | 23 +++++++++++++++++++++++ src/generator/mod.rs | 3 ++- src/main.rs | 22 ++-------------------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/generator/common.rs b/src/generator/common.rs index 7aa5bac..5daa707 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -8,6 +8,29 @@ use std::{ process::{Command, Output}, vec, }; + +pub fn check_for_overwrite(output_path: &Path, project_title: &str) { + //check if project with name already exists, if yes ask for permission to overwrite + if output_path.exists() { + let warn_message = format!("A project with the name {} already exists in the current directory, do you want to overwrite the existing project? \nWARNING: This will delete all files in the directory and all applied. \nType 'y' to continue or anything else to exit.",project_title); + println!("{}", warn_message); + let mut input = String::new(); + match std::io::stdin().read_line(&mut input) { + Ok(_) => { + if input.trim() != "y" { + println!("Aborting generation..."); + std::process::exit(0); + } + std::fs::remove_dir_all(output_path).unwrap(); + } + Err(err) => { + println!("❌ Error reading input: {}", err); + std::process::exit(1); + } + } + } +} + /// initialize a cargo project in path pub fn cargo_init_project(path: impl AsRef) -> Output { Command::new("cargo") diff --git a/src/generator/mod.rs b/src/generator/mod.rs index 7260d1b..1f1c531 100644 --- a/src/generator/mod.rs +++ b/src/generator/mod.rs @@ -1,4 +1,5 @@ mod common; pub use common::{ - cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, template_render_write, + cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, check_for_overwrite, + template_render_write, }; diff --git a/src/main.rs b/src/main.rs index 730de55..49af9e1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod utils; use crate::{ asyncapi_model::AsyncAPI, - generator::{cargo_fix, cargo_generate_rustdoc, template_render_write}, + generator::{cargo_fix, cargo_generate_rustdoc, check_for_overwrite, template_render_write}, utils::append_file_to_file, }; @@ -49,25 +49,7 @@ fn main() { } }; - //check if project with name already exists, if yes ask for permission to overwrite - if output_path.exists() { - let warn_message = format!("A project with the name {} already exists in the current directory, do you want to overwrite the existing project? \nWARNING: This will delete all files in the directory and all applied. \nType 'y' to continue or anything else to exit.",title); - println!("{}", warn_message); - let mut input = String::new(); - match std::io::stdin().read_line(&mut input) { - Ok(_) => { - if input.trim() != "y" { - println!("Aborting generation..."); - std::process::exit(0); - } - std::fs::remove_dir_all(output_path).unwrap(); - } - Err(err) => { - println!("❌ Error reading input: {}", err); - std::process::exit(1); - } - } - } + check_for_overwrite(output_path, title); template_render_write( &template_path.join("main.go"), From 430b7a103b11c6999b161d13042a620663e78e96 Mon Sep 17 00:00:00 2001 From: Aro Date: Sat, 1 Jul 2023 20:15:07 +0200 Subject: [PATCH 03/14] load data into .env file, write multiple templates, tree structure in template folder, service reads .env file --- example/specs/basic_queues_and_stream.yaml | 4 +-- src/asyncapi_model/operation_binding.rs | 2 ++ src/generator/common.rs | 25 ++++++++++++++++- src/generator/mod.rs | 1 + src/main.rs | 30 ++++++++------------ templates/.env | 32 ++++++++++++++++++++++ templates/dependencies.toml | 1 + templates/example.go | 30 -------------------- templates/pubsub.go | 21 -------------- templates/{ => src}/handler.go | 0 templates/{ => src}/main.go | 31 +++++++++++---------- templates/{ => src}/model.go | 0 12 files changed, 89 insertions(+), 88 deletions(-) create mode 100644 templates/.env delete mode 100644 templates/example.go delete mode 100644 templates/pubsub.go rename templates/{ => src}/handler.go (100%) rename templates/{ => src}/main.go (78%) rename templates/{ => src}/model.go (100%) diff --git a/example/specs/basic_queues_and_stream.yaml b/example/specs/basic_queues_and_stream.yaml index fa8cef9..562d30b 100644 --- a/example/specs/basic_queues_and_stream.yaml +++ b/example/specs/basic_queues_and_stream.yaml @@ -11,7 +11,7 @@ channels: subscribe: bindings: nats: - streamname: testStream + x-streamname: testStream operationId: onUserSignup summary: User signup notification message: @@ -27,7 +27,7 @@ channels: publish: bindings: nats: - streamname: testStream + x-streamname: testStream operationId: userSignedUp summary: send welcome email to user message: diff --git a/src/asyncapi_model/operation_binding.rs b/src/asyncapi_model/operation_binding.rs index 2824066..79f9a18 100644 --- a/src/asyncapi_model/operation_binding.rs +++ b/src/asyncapi_model/operation_binding.rs @@ -247,6 +247,8 @@ pub struct NATSOperationBinding { /// The version of this binding. If omitted, "latest" MUST be assumed. #[serde(skip_serializing_if = "Option::is_none")] pub binding_version: Option, + + #[serde(rename(deserialize = "x-streamname", serialize = "streamname"))] pub streamname: Option, } diff --git a/src/generator/common.rs b/src/generator/common.rs index 7aa5bac..90ef48b 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -1,6 +1,6 @@ use gtmpl::Context; -use crate::utils; +use crate::{template_context::TemplateContext, utils}; use std::{ ffi::OsStr, fs, @@ -119,6 +119,29 @@ pub fn template_render_write( } } +pub fn write_multiple_templates( + template_path: &Path, + context_ref: &TemplateContext, + output_path: &Path, + endings: &[&str], +) { + for ending in endings { + if ending.ends_with(".go") { + template_render_write( + &template_path.join(ending), + context_ref, + &output_path.join(ending).with_extension("rs"), + ); + } else { + template_render_write( + &template_path.join(ending), + context_ref, + &output_path.join(ending), + ); + } + } +} + pub fn cargo_generate_rustdoc(path: &Path) { Command::new("cargo") .current_dir(path) diff --git a/src/generator/mod.rs b/src/generator/mod.rs index 7260d1b..967b074 100644 --- a/src/generator/mod.rs +++ b/src/generator/mod.rs @@ -1,4 +1,5 @@ mod common; pub use common::{ cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, template_render_write, + write_multiple_templates, }; diff --git a/src/main.rs b/src/main.rs index 770e582..f5dc7ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod utils; use crate::{ asyncapi_model::AsyncAPI, - generator::{cargo_fix, cargo_generate_rustdoc, template_render_write}, + generator::{cargo_fix, cargo_generate_rustdoc, write_multiple_templates}, utils::append_file_to_file, }; @@ -49,27 +49,19 @@ fn main() { } }; - template_render_write( - &template_path.join("main.go"), + write_multiple_templates( + &template_path, &async_config, - &output_path.join("src/main.rs"), - ); - template_render_write( - &template_path.join("handler.go"), - &async_config, - &output_path.join("src/handler.rs"), + &output_path, + &[ + "src/main.go", + "src/handler.go", + "src/model.go", + "Readme.md", + ".env", + ], ); - template_render_write( - &template_path.join("model.go"), - &async_config, - &output_path.join("src/model.rs"), - ); - template_render_write( - &template_path.join("Readme.md"), - &async_config, - &output_path.join("Readme.md"), - ); println!("🚀 File generation finished, adding dependencies..."); // make output a compilable project diff --git a/templates/.env b/templates/.env new file mode 100644 index 0000000..8b8a515 --- /dev/null +++ b/templates/.env @@ -0,0 +1,32 @@ +################General Config################ + +SERVICE_PORT = "http://localhost:8080" +SERVER_URL = "{{ .server.url }}" +LOG_LEVEL = "DEBUG" +OPA_RULES= "path/to/admin/policy" + +################Channel wise Config################ +{{ range .subscribe_channels }} + {{ if (index . 1).original_operation.bindings }} + {{ if (index . 1).original_operation.bindings.nats.queue }} +{{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" + {{else}} +{{ (index . 1).unique_id}}_STREAM = "{{ (index . 1).original_operation.bindings.nats.streamname}}" + {{ end }} + {{ end }} +{{ (index . 1).unique_id }}_SUBJECT = "{{ (index . 0) }}" +{{ end }} + +{{ range .publish_channels }} + {{ if (index . 1).original_operation.bindings }} + {{ if (index . 1).original_operation.bindings.nats.queue }} +{{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" + {{else}} +{{ (index . 1).unique_id}}_STREAM = "{{ (index . 1).original_operation.bindings.nats.streamname}}" + {{ end }} + {{ end }} +{{ (index . 1).unique_id }}_SUBJECT = "{{ (index . 0) }}" +{{ end }} + + + diff --git a/templates/dependencies.toml b/templates/dependencies.toml index a7581e9..bf991d2 100644 --- a/templates/dependencies.toml +++ b/templates/dependencies.toml @@ -3,3 +3,4 @@ futures = "0.3.28" serde = "1.0.164" serde_json = "1.0.97" tokio = { version = "1.28.2", features = ["full"] } +dotenv = "0.15.0" diff --git a/templates/example.go b/templates/example.go deleted file mode 100644 index 5d1899f..0000000 --- a/templates/example.go +++ /dev/null @@ -1,30 +0,0 @@ -use async_nats::{Client, Subscriber}; -use futures::StreamExt; - -async fn listen_for_message(sub1: &mut Subscriber) { - while let Some(message) = sub1.next().await { - println!("Received message {:#?}", message); - } -} - -async fn publish_messages(client: &Client, amount: i32, channel: &str, data: &'static str) { - for _ in 0..amount { - client.publish(channel.into(), data.into()).await.unwrap(); - } -} - -#[tokio::main] -async fn main() -> Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server.url }}").await?; - // declare subscribers - - // subscribe - tokio::select! { - {{range .subscribe_channels}} - _=listen_for_message(&mut {{ (index . 1).operationId }}) => {} - {{end}} - } - println!("fin"); - - Ok(()) -} diff --git a/templates/pubsub.go b/templates/pubsub.go deleted file mode 100644 index 8bf0d0b..0000000 --- a/templates/pubsub.go +++ /dev/null @@ -1,21 +0,0 @@ -use futures::StreamExt; -use serde::{Deserialize, Serialize}; - - -{{ .schema }} - -#[tokio::main] -async fn main() -> Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server_url }}").await?; - let mut subscriber = client.subscribe("{{ .channel_name }}".into()).await?.take(10); - - for _ in 0..10 { - client.publish("{{ .channel_name }}".into(), "Hi mom".into()).await?; - } - - while let Some(message) = subscriber.next().await { - println!("Received message {:?}", message); - } - - Ok(()) -} diff --git a/templates/handler.go b/templates/src/handler.go similarity index 100% rename from templates/handler.go rename to templates/src/handler.go diff --git a/templates/main.go b/templates/src/main.go similarity index 78% rename from templates/main.go rename to templates/src/main.go index 0e10e82..01d631c 100644 --- a/templates/main.go +++ b/templates/src/main.go @@ -3,13 +3,12 @@ mod model; use async_nats::{Client, Message, Subscriber}; use async_nats::jetstream::{self, Context, stream}; use async_nats::jetstream::consumer::{pull::{self, Config}, Consumer}; -use std::time::Duration; +use std::{time::Duration, env, collections::HashMap}; use futures::StreamExt; +use dotenv::dotenv; use crate::handler::*; -pub struct Producer {} - async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) { while let Some(message) = sub.next().await { handler(message); @@ -77,48 +76,50 @@ pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server.url }}").await?; + + dotenv().ok(); + let env: HashMap = env::vars().collect(); + + let client = async_nats::connect(env.get("SERVER_URL").unwrap()).await?; {{ range .publish_channels }} {{ if (index . 1).original_operation.bindings }} {{ if (index . 1).original_operation.bindings.nats.queue }} - let mut {{ (index . 1).unique_id }} = client.queue_subscribe("{{ index . 0 }}".into(), "{{ (index . 1).original_operation.bindings.nats.queue }}".into()).await?; + let mut {{ (index . 1).unique_id }} = client.queue_subscribe(env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap().into(), + env.get("{{ (index . 1).unique_id}}_QUEUE").unwrap().into()).await?; {{ else }} let clientcpy = client.clone(); let context_jetstream = jetstream::new(clientcpy); - let {{ (index . 1).unique_id }} = "{{ (index . 1).original_operation.bindings.nats.streamname }}"; + let {{ (index . 1).unique_id }} = env.get("{{ (index . 1).unique_id }}_STREAM").unwrap(); let consumer = get_consumer(&context_jetstream, &{{ (index . 1).unique_id }}).await?; {{end}} {{ else }} - let mut {{ (index . 1).unique_id }} = client.subscribe("{{ index . 0 }}".into()).await?; + let mut {{ (index . 1).unique_id }} = client.subscribe(env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap().into()).await?; {{end}} {{end}} - //test(&client, "foo").await; tokio::join!( {{ range .subscribe_channels }} {{ if (index . 1).original_operation.bindings }} {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, "{{ (index . 1).original_operation.bindings.nats.streamname }}"), - {{ else }} - producer_{{ (index . 1).unique_id }}(&client, "{{ index . 0 }}"), + stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, env.get("{{ (index . 1).unique_id}}_STREAM").unwrap()), {{ end }} {{ else }} - producer_{{ (index . 1).unique_id }}(&client, "{{ index . 0 }}"), + producer_{{ (index . 1).unique_id }}(&client, env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap() ), {{ end }} {{ end }} {{ range .publish_channels }} {{ if (index . 1).original_operation.bindings }} {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), + stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), + listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), {{ end }} {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), + listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), {{ end }} {{ end }} ); diff --git a/templates/model.go b/templates/src/model.go similarity index 100% rename from templates/model.go rename to templates/src/model.go From d95e0f4c767e653e22a41db64ba60d9a7331bbf1 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sat, 1 Jul 2023 21:17:26 +0200 Subject: [PATCH 04/14] splitted model and helper functions into own folders --- src/generator/mod.rs | 2 + src/generator/model.rs | 48 ++++++++++++ src/main.rs | 24 ++++-- .../types/rust_schema_representation.rs | 3 +- templates/handler.go | 8 +- templates/main.go | 74 +------------------ templates/model.go | 8 +- templates/utils/common.go | 19 +++++ templates/utils/mod.go | 4 + templates/utils/streams.go | 41 ++++++++++ 10 files changed, 142 insertions(+), 89 deletions(-) create mode 100644 src/generator/model.rs create mode 100644 templates/utils/common.go create mode 100644 templates/utils/mod.go create mode 100644 templates/utils/streams.go diff --git a/src/generator/mod.rs b/src/generator/mod.rs index 7260d1b..6938c0c 100644 --- a/src/generator/mod.rs +++ b/src/generator/mod.rs @@ -1,4 +1,6 @@ mod common; +mod model; pub use common::{ cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, template_render_write, }; +pub use model::generate_models_folder; diff --git a/src/generator/model.rs b/src/generator/model.rs new file mode 100644 index 0000000..1543a13 --- /dev/null +++ b/src/generator/model.rs @@ -0,0 +1,48 @@ +use super::template_render_write; +use crate::template_context::TemplateContext; +use crate::{parser::common::validate_identifier_string, utils::write_to_path_create_dir}; + +use std::path::Path; + +pub fn generate_models_folder( + async_config: &TemplateContext, + template_path: &Path, + output_path: &Path, +) { + async_config + .model + .message_models + .iter() + .for_each(|message_model| { + if !message_model.model_definition.is_empty() { + template_render_write( + &template_path.join("model.go"), + message_model.clone(), + &output_path.join(format!( + "src/model/{}.rs", + validate_identifier_string(&message_model.unique_id, false) + )), + ); + } + }); + + let imports = async_config + .model + .message_models + .iter() + .map(|message_model| { + if !message_model.model_definition.is_empty() { + format!( + "pub mod {}; \n pub use {}::*; \n", + validate_identifier_string(&message_model.unique_id, false), + validate_identifier_string(&message_model.unique_id, false) + ) + } else { + "".to_string() + } + }) + .collect::>() + .join("\n"); + + write_to_path_create_dir(&imports, &output_path.join("src/model/mod.rs")).unwrap(); +} diff --git a/src/main.rs b/src/main.rs index 770e582..9fbb20f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod utils; use crate::{ asyncapi_model::AsyncAPI, - generator::{cargo_fix, cargo_generate_rustdoc, template_render_write}, + generator::{cargo_fix, cargo_generate_rustdoc, generate_models_folder, template_render_write}, utils::append_file_to_file, }; @@ -61,15 +61,29 @@ fn main() { ); template_render_write( - &template_path.join("model.go"), + &template_path.join("Readme.md"), &async_config, - &output_path.join("src/model.rs"), + &output_path.join("Readme.md"), ); + template_render_write( - &template_path.join("Readme.md"), + &template_path.join("utils/mod.go"), &async_config, - &output_path.join("Readme.md"), + &output_path.join("src/utils/mod.rs"), ); + + template_render_write( + &template_path.join("utils/streams.go"), + &async_config, + &output_path.join("src/utils/streams.rs"), + ); + template_render_write( + &template_path.join("utils/common.go"), + &async_config, + &output_path.join("src/utils/common.rs"), + ); + + generate_models_folder(&async_config, template_path, output_path); println!("🚀 File generation finished, adding dependencies..."); // make output a compilable project diff --git a/src/parser/json_schema_parser/types/rust_schema_representation.rs b/src/parser/json_schema_parser/types/rust_schema_representation.rs index 8a492e5..b78ec78 100644 --- a/src/parser/json_schema_parser/types/rust_schema_representation.rs +++ b/src/parser/json_schema_parser/types/rust_schema_representation.rs @@ -1,5 +1,6 @@ +use gtmpl_derive::Gtmpl; use serde::Serialize; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Gtmpl)] pub struct RustSchemaRepresentation { // the unique identifier (e.g. UserSignupMessage) pub unique_id: String, diff --git a/templates/handler.go b/templates/handler.go index ca52fba..5a4497d 100644 --- a/templates/handler.go +++ b/templates/handler.go @@ -1,12 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,stream_publish_message, -{{ range .model.message_models }} - {{ if ne .model_definition "" }} - model::{{ .unique_id }}, - {{ end }} -{{ end }} -}; +use crate::{publish_message,utils::stream_publish_message,model::*}; use std::time; diff --git a/templates/main.go b/templates/main.go index 0e10e82..15638fb 100644 --- a/templates/main.go +++ b/templates/main.go @@ -1,80 +1,12 @@ mod handler; mod model; -use async_nats::{Client, Message, Subscriber}; -use async_nats::jetstream::{self, Context, stream}; -use async_nats::jetstream::consumer::{pull::{self, Config}, Consumer}; -use std::time::Duration; -use futures::StreamExt; - +mod utils; +use utils::*; use crate::handler::*; +use async_nats::jetstream::{self}; pub struct Producer {} -async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) { - while let Some(message) = sub.next().await { - handler(message); - println!("Message received by Subscriber: {:?}", sub); - } -} -async fn publish_message(client: &Client, channel: &str, payload: &str) { - let owned_payload = payload.to_owned().into(); // Convert to Bytes - client - .publish(channel.to_string(), owned_payload) - .await - .unwrap(); - println!("sent"); -} - -pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result, async_nats::Error>{ - - let stream = jetstream.get_or_create_stream(jetstream::stream::Config { - name: stream_name.to_string(), - ..Default::default() - }).await?; - let consumer = stream.get_or_create_consumer("consumer", Config { - durable_name: Some("consumer".to_string()), - ..Default::default() - }).await?; - return Ok(consumer); -} - -{{ range .subscribe_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats}} - {{ if (index . 1).original_operation.bindings.nats.streamname }} - async fn stream_publish_message(client: &Context, channel: &str, payload: &str) { - let owned_payload = payload.to_owned().into(); // Convert to Bytes - client - .publish(channel.to_string(), owned_payload) - .await - .unwrap(); - println!("sent"); - } - {{end}} - {{end}} - {{end}} -{{end}} //supports only one stream, otherwise no unique function name(one function is enough for any number of streams, but you have to check if there is one) - - -{{range .publish_channels}} - {{ if (index . 1).original_operation.bindings }} - {{ if (index . 1).original_operation.bindings.nats.streamname }} - async fn stream_listen_for_message(sub: &Consumer, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{ - loop{ - tokio::time::sleep(Duration::from_millis(1000)).await; - let mut messages = sub.messages().await?.take(10); - while let Some(message) = messages.next().await { - let message = message?; - handler(message); - println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context - } - } - Ok(()) - } - {{end}} - {{end}} -{{end}} - #[tokio::main] async fn main() -> Result<(), async_nats::Error> { let client = async_nats::connect("{{ .server.url }}").await?; diff --git a/templates/model.go b/templates/model.go index 1ddabc8..b08d168 100644 --- a/templates/model.go +++ b/templates/model.go @@ -1,7 +1,5 @@ use serde::{Deserialize, Serialize}; - -// All models from the specification are defined here -{{ range .model.message_models }} - {{ .model_definition }} -{{ end }} +use super::*; + +{{ .model_definition }} diff --git a/templates/utils/common.go b/templates/utils/common.go new file mode 100644 index 0000000..bbf88f5 --- /dev/null +++ b/templates/utils/common.go @@ -0,0 +1,19 @@ + +use async_nats::{Client, Message, Subscriber}; +use futures::StreamExt; + +pub async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) { + while let Some(message) = sub.next().await { + handler(message); + println!("Message received by Subscriber: {:?}", sub); + } +} +pub async fn publish_message(client: &Client, channel: &str, payload: &str) { + let owned_payload = payload.to_owned().into(); // Convert to Bytes + client + .publish(channel.to_string(), owned_payload) + .await + .unwrap(); + println!("sent"); +} + diff --git a/templates/utils/mod.go b/templates/utils/mod.go new file mode 100644 index 0000000..e16f31a --- /dev/null +++ b/templates/utils/mod.go @@ -0,0 +1,4 @@ +pub mod common; +pub use common::*; +pub mod streams; +pub use streams::*; \ No newline at end of file diff --git a/templates/utils/streams.go b/templates/utils/streams.go new file mode 100644 index 0000000..381cb8d --- /dev/null +++ b/templates/utils/streams.go @@ -0,0 +1,41 @@ +use async_nats::jetstream::{self, Context}; +use async_nats::jetstream::consumer::{pull::{Config}, Consumer}; +use std::time::Duration; +use futures::StreamExt; + +pub async fn stream_publish_message(client: &Context, channel: &str, payload: &str) { + let owned_payload = payload.to_owned().into(); // Convert to Bytes + client + .publish(channel.to_string(), owned_payload) + .await + .unwrap(); + println!("sent"); +} + + +pub async fn stream_listen_for_message(sub: &Consumer, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{ + loop{ + tokio::time::sleep(Duration::from_millis(1000)).await; + let mut messages = sub.messages().await?.take(10); + while let Some(message) = messages.next().await { + let message = message?; + handler(message); + println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context + } + } + Ok(()) +} + + +pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result, async_nats::Error>{ + + let stream = jetstream.get_or_create_stream(jetstream::stream::Config { + name: stream_name.to_string(), + ..Default::default() + }).await?; + let consumer = stream.get_or_create_consumer("consumer", Config { + durable_name: Some("consumer".to_string()), + ..Default::default() + }).await?; + return Ok(consumer); +} \ No newline at end of file From 27f4d6d3d6a395a2fa93f903ea8782da2f370d9d Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 10:18:14 +0200 Subject: [PATCH 05/14] moved utils into correct folder --- templates/{ => src}/utils/common.go | 0 templates/{ => src}/utils/mod.go | 0 templates/{ => src}/utils/streams.go | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename templates/{ => src}/utils/common.go (100%) rename templates/{ => src}/utils/mod.go (100%) rename templates/{ => src}/utils/streams.go (100%) diff --git a/templates/utils/common.go b/templates/src/utils/common.go similarity index 100% rename from templates/utils/common.go rename to templates/src/utils/common.go diff --git a/templates/utils/mod.go b/templates/src/utils/mod.go similarity index 100% rename from templates/utils/mod.go rename to templates/src/utils/mod.go diff --git a/templates/utils/streams.go b/templates/src/utils/streams.go similarity index 100% rename from templates/utils/streams.go rename to templates/src/utils/streams.go From affb30bb7ebcb53548cfbb3c1aba87615ef830d0 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 10:20:40 +0200 Subject: [PATCH 06/14] removed unneeded log --- src/generator/common.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/generator/common.rs b/src/generator/common.rs index d41f79f..e6c8461 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -62,7 +62,6 @@ pub fn cargo_fix(path: &PathBuf) -> Output { } fn key_exists(args: &[gtmpl_value::Value]) -> Result { - println!("{:?}", args); if args.is_empty() { return Err(gtmpl_value::FuncError::AtLeastXArgs( "Need at least 1 arg for key exists".to_string(), From 3b92ae1816515521b75b6fdf0926b26e46c62425 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 10:34:08 +0200 Subject: [PATCH 07/14] completed merge --- example/specs/basic_queues_and_stream.yaml | 2 +- src/generator/common.rs | 6 +++++- src/generator/model.rs | 2 +- src/main.rs | 12 +++++++----- templates/src/main.go | 2 ++ 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/example/specs/basic_queues_and_stream.yaml b/example/specs/basic_queues_and_stream.yaml index 562d30b..55d376e 100644 --- a/example/specs/basic_queues_and_stream.yaml +++ b/example/specs/basic_queues_and_stream.yaml @@ -1,6 +1,6 @@ asyncapi: 2.1.0 info: - title: My_API + title: queque_and_stream_api version: 1.0.0 servers: production: diff --git a/src/generator/common.rs b/src/generator/common.rs index e6c8461..3a4fb1e 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -116,7 +116,11 @@ pub fn template_render_write( let template = match fs::read_to_string(template_path) { Ok(template) => template, Err(e) => { - eprintln!("❌ Error reading template: {}", e); + eprintln!( + "❌ Error reading template {:#?}: {}", + template_path.to_str(), + e + ); std::process::exit(1); } }; diff --git a/src/generator/model.rs b/src/generator/model.rs index 1543a13..fc77588 100644 --- a/src/generator/model.rs +++ b/src/generator/model.rs @@ -16,7 +16,7 @@ pub fn generate_models_folder( .for_each(|message_model| { if !message_model.model_definition.is_empty() { template_render_write( - &template_path.join("model.go"), + &template_path.join("src/model.go"), message_model.clone(), &output_path.join(format!( "src/model/{}.rs", diff --git a/src/main.rs b/src/main.rs index d597e12..1ee3f2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,10 @@ mod utils; use crate::{ asyncapi_model::AsyncAPI, - generator::{cargo_fix, cargo_generate_rustdoc, check_for_overwrite, generate_models_folder, template_render_write, write_multiple_templates}, + generator::{ + cargo_fix, cargo_generate_rustdoc, check_for_overwrite, generate_models_folder, + write_multiple_templates, + }, utils::append_file_to_file, }; @@ -51,11 +54,10 @@ fn main() { check_for_overwrite(output_path, title); - write_multiple_templates( - &template_path, + template_path, &async_config, - &output_path, + output_path, &[ "src/main.go", "src/handler.go", @@ -63,7 +65,7 @@ fn main() { ".env", "src/utils/mod.go", "src/utils/streams.go", - "src/utils/common.rs" + "src/utils/common.go", ], ); diff --git a/templates/src/main.go b/templates/src/main.go index 2f6cc12..04e942d 100644 --- a/templates/src/main.go +++ b/templates/src/main.go @@ -4,6 +4,8 @@ mod utils; use utils::*; use crate::handler::*; use async_nats::jetstream::{self}; +use std::{env, collections::HashMap}; +use dotenv::dotenv; #[tokio::main] async fn main() -> Result<(), async_nats::Error> { From da8dcbf14f7be6965016b33f033dced621c64622 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 10:41:42 +0200 Subject: [PATCH 08/14] fixed bug with some producer handlers not being created --- templates/src/main.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/templates/src/main.go b/templates/src/main.go index 04e942d..ca26f99 100644 --- a/templates/src/main.go +++ b/templates/src/main.go @@ -34,25 +34,25 @@ async fn main() -> Result<(), async_nats::Error> { tokio::join!( {{ range .subscribe_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, env.get("{{ (index . 1).unique_id}}_STREAM").unwrap()), - {{ end }} + {{ $isStream := false }} + {{if key_exists (index . 1) "original_operation" "bindings" "nats" "streamname"}} + {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} + {{end}} + {{if $isStream}} + stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, env.get("{{ (index . 1).unique_id}}_STREAM").unwrap()), {{ else }} - producer_{{ (index . 1).unique_id }}(&client, env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap() ), - {{ end }} - {{ end }} + producer_{{ (index . 1).unique_id }}(&client, env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap() ), + {{ end }} + {{ end }} {{ range .publish_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), - - {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), - {{ end }} - + {{ $isStream := false }} + {{if key_exists (index . 1) "original_operation" "bindings" "nats" "streamname"}} + {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} + {{end}} + {{if $isStream}} + stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), + listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), {{ end }} {{ end }} ); From 0984e3e84949ce9758d5df2993409aea627b75c3 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 14:12:03 +0200 Subject: [PATCH 09/14] made publish handlers more usable --- example/demo/automatic_lighting.yaml | 68 ++++++++++++++++++++++++++++ src/main.rs | 1 + templates/src/config/mod.go | 7 +++ templates/src/handler.go | 62 ++++++++++++++++--------- templates/src/main.go | 17 +++---- templates/src/utils/common.go | 12 +++-- 6 files changed, 131 insertions(+), 36 deletions(-) create mode 100644 example/demo/automatic_lighting.yaml create mode 100644 templates/src/config/mod.go diff --git a/example/demo/automatic_lighting.yaml b/example/demo/automatic_lighting.yaml new file mode 100644 index 0000000..c9e741b --- /dev/null +++ b/example/demo/automatic_lighting.yaml @@ -0,0 +1,68 @@ +asyncapi: '2.6.0' +info: + title: automatic lighting + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + ### Check out its awesome features: + * Turn a specific streetlight on/off 🌃 + * Dim a specific streetlight 😎 + * Receive real-time information about environmental lighting conditions 📈 +servers: + dev: + url: localhost:4022 + protocol: nats + description: Nats message broker +channels: + lightMeasured: + description: The topic on which measured values may be produced and consumed. + publish: + summary: Inform about environmental lighting conditions of a particular streetlight. + operationId: receiveLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + turnOnStreetlight: + subscribe: + operationId: turnOn + message: + $ref: '#/components/messages/turnOnOff' + turnOffStreetlight: + subscribe: + operationId: turnOff + message: + $ref: '#/components/messages/turnOnOff' +components: + messages: + lightMeasured: + name: lightMeasured + title: Light measured + summary: Inform about environmental lighting conditions of a particular streetlight. + contentType: application/json + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + turnOnOff: + name: turnOnOff + title: Turn on/off + summary: Command a particular streetlight to turn the lights on or off. + payload: + $ref: "#/components/schemas/turnOnOffPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + turnOnOffPayload: + type: object + properties: + lightId: + type: string + command: + type: string + enum: + - on + - off + description: Whether to turn on or off the light. + diff --git a/src/main.rs b/src/main.rs index 1ee3f2e..bfb8645 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ fn main() { "src/utils/mod.go", "src/utils/streams.go", "src/utils/common.go", + "src/config/mod.go", ], ); diff --git a/templates/src/config/mod.go b/templates/src/config/mod.go new file mode 100644 index 0000000..13ab37d --- /dev/null +++ b/templates/src/config/mod.go @@ -0,0 +1,7 @@ +use dotenv::dotenv; +use std::{collections::HashMap, env}; + +pub fn get_env() -> HashMap { + dotenv().ok(); + env::vars().collect() +} \ No newline at end of file diff --git a/templates/src/handler.go b/templates/src/handler.go index 5a4497d..6133eed 100644 --- a/templates/src/handler.go +++ b/templates/src/handler.go @@ -1,6 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,utils::stream_publish_message,model::*}; +use crate::{publish_message,stream_publish_message,model::*}; use std::time; @@ -17,7 +17,7 @@ use std::time; {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} {{end}} {{if $isStream}} - pub fn stream_handler_{{ (index . 1).unique_id }}(message: jetstream::Message) { + pub fn stream_handler_{{ (index . 1).unique_id }}(message: jetstream::Message, client: &Client) { {{ range (index . 1).messages }} {{ if .payload}} match serde_json::from_slice::<{{ .payload.struct_reference }}>(&message.message.payload.as_ref()) { @@ -45,13 +45,11 @@ use std::time; {{ end }} } {{else}} - pub fn handler_{{ (index . 1).unique_id }}(message: Message) { + pub async fn handler_{{ (index . 1).unique_id }}(message: Message, client: &Client) { {{ range (index . 1).messages }} {{ if .payload}} match serde_json::from_slice::<{{ .payload.struct_reference }}>(&message.payload.as_ref()) { Ok(deserialized_message) => { - println!("Received message {:#?}", deserialized_message); - // TODO: Replace this with your own handler code {{ if eq .payload.model_type "enum"}} match deserialized_message { {{$enumName := .payload.unique_id}} @@ -62,6 +60,9 @@ use std::time; } {{ end }} } + {{else}} + println!("Received message {:#?}", deserialized_message); + // TODO: Replace this with your own handler code {{ end }} }, Err(_) => { @@ -78,6 +79,7 @@ use std::time; {{ range .subscribe_channels }} {{ $isStream := false }} + {{ $channel := . }} /// Publish a message in the {{ (index . 1).unique_id }} channel /// Channel messages: @@ -89,22 +91,40 @@ use std::time; {{ end }} {{ if $isStream }} - pub async fn stream_producer_{{ (index . 1).unique_id }}(context_stream: &Context, channel: &str) { //context instead of client - // This is just an example producer, publishing a message every 2 seconds - // TODO: replace this with your own producer code - loop { - tokio::time::sleep(time::Duration::from_secs(2)).await; - stream_publish_message(context_stream, channel, "{\"test\":\"serialized\"}").await; - } - } + {{ range (index . 1).messages }} + pub async fn stream_producer_{{ (index $channel 1).unique_id }}(context_stream: &Context, payload : {{ if .payload}} {{ .payload.struct_reference }} {{ else }} () {{ end }}) { //context instead of client + let subject = config::get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + {{ if .payload }} + let payload = match serde_json::to_string(&payload) { + Ok(payload) => payload, + Err(_) => { + println!("Failed to serialize message payload: {{ .payload.struct_reference }}"); + return; + } + }; + publish_message(client, &subject, &payload).await; + {{else}} + publish_message(client, &subject, &"").await; + {{end}} + } + {{end}} {{ else }} - pub async fn producer_{{ (index . 1).unique_id }}(client: &Client, channel: &str) { - // This is just an example producer, publishing a message every 2 seconds - // TODO: replace this with your own producer code - loop { - tokio::time::sleep(time::Duration::from_secs(2)).await; - publish_message(client, channel, "{\"test\":\"serialized\"}").await; + {{ range (index . 1).messages }} + pub async fn producer_{{ (index $channel 1).unique_id }}(client: &Client, payload: {{ if .payload }} {{.payload.struct_reference}} {{else}} () {{end}}) { + let subject = config::get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + {{ if .payload }} + let payload = match serde_json::to_string(&payload) { + Ok(payload) => payload, + Err(_) => { + println!("Failed to serialize message payload: {{ .payload.struct_reference }}"); + return; + } + }; + publish_message(client, &subject, &payload).await; + {{else}} + publish_message(client, &subject, &"").await; + {{end}} } - } + {{ end }} {{ end }} -{{ end }} +{{ end }} diff --git a/templates/src/main.go b/templates/src/main.go index ca26f99..d1f08b2 100644 --- a/templates/src/main.go +++ b/templates/src/main.go @@ -4,14 +4,14 @@ mod utils; use utils::*; use crate::handler::*; use async_nats::jetstream::{self}; -use std::{env, collections::HashMap}; +use std::{collections::HashMap}; use dotenv::dotenv; +mod config; + #[tokio::main] async fn main() -> Result<(), async_nats::Error> { - - dotenv().ok(); - let env: HashMap = env::vars().collect(); + let env: HashMap = config::get_env(); let client = async_nats::connect(env.get("SERVER_URL").unwrap()).await?; @@ -38,11 +38,6 @@ async fn main() -> Result<(), async_nats::Error> { {{if key_exists (index . 1) "original_operation" "bindings" "nats" "streamname"}} {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} {{end}} - {{if $isStream}} - stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, env.get("{{ (index . 1).unique_id}}_STREAM").unwrap()), - {{ else }} - producer_{{ (index . 1).unique_id }}(&client, env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap() ), - {{ end }} {{ end }} {{ range .publish_channels }} {{ $isStream := false }} @@ -50,9 +45,9 @@ async fn main() -> Result<(), async_nats::Error> { {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} {{end}} {{if $isStream}} - stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), + stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}, &client), {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), + listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}, &client), {{ end }} {{ end }} ); diff --git a/templates/src/utils/common.go b/templates/src/utils/common.go index bbf88f5..7b93693 100644 --- a/templates/src/utils/common.go +++ b/templates/src/utils/common.go @@ -1,13 +1,17 @@ - use async_nats::{Client, Message, Subscriber}; use futures::StreamExt; -pub async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) { +pub async fn listen_for_message<'a, F, Fut>(sub: &mut Subscriber, handler: F, client: &'a Client) +where + F: Fn(Message,&'a Client) -> Fut + 'a, + Fut: std::future::Future + 'a, +{ while let Some(message) = sub.next().await { - handler(message); - println!("Message received by Subscriber: {:?}", sub); + handler(message, client).await; + println!("Message received by Subscriber: {:?}", sub); } } + pub async fn publish_message(client: &Client, channel: &str, payload: &str) { let owned_payload = payload.to_owned().into(); // Convert to Bytes client From 570f34e769d7db2e18b0db03f31c1f62a4e22dfb Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 14:23:22 +0200 Subject: [PATCH 10/14] corrected path --- templates/src/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/templates/src/handler.go b/templates/src/handler.go index 6133eed..ada52bb 100644 --- a/templates/src/handler.go +++ b/templates/src/handler.go @@ -1,6 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,stream_publish_message,model::*}; +use crate::{publish_message,stream_publish_message,model::*, ,config::*}; use std::time; @@ -93,7 +93,7 @@ use std::time; {{ if $isStream }} {{ range (index . 1).messages }} pub async fn stream_producer_{{ (index $channel 1).unique_id }}(context_stream: &Context, payload : {{ if .payload}} {{ .payload.struct_reference }} {{ else }} () {{ end }}) { //context instead of client - let subject = config::get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + let subject = get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); {{ if .payload }} let payload = match serde_json::to_string(&payload) { Ok(payload) => payload, @@ -111,7 +111,7 @@ use std::time; {{ else }} {{ range (index . 1).messages }} pub async fn producer_{{ (index $channel 1).unique_id }}(client: &Client, payload: {{ if .payload }} {{.payload.struct_reference}} {{else}} () {{end}}) { - let subject = config::get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + let subject = get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); {{ if .payload }} let payload = match serde_json::to_string(&payload) { Ok(payload) => payload, From 77eafd5df804d6851da46256aa1db26fba6cd5fc Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Sun, 2 Jul 2023 14:34:06 +0200 Subject: [PATCH 11/14] fix --- templates/src/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/src/handler.go b/templates/src/handler.go index ada52bb..ed0974a 100644 --- a/templates/src/handler.go +++ b/templates/src/handler.go @@ -1,6 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,stream_publish_message,model::*, ,config::*}; +use crate::{publish_message,stream_publish_message,model::*,config::*}; use std::time; From dbd507942f24beae3323d77eac351f9ada625eab Mon Sep 17 00:00:00 2001 From: Aro Date: Sun, 2 Jul 2023 18:44:07 +0200 Subject: [PATCH 12/14] added simple cli to microservice --- example/specs/basic_queues_and_stream.yaml | 10 +- .../basic_queues_and_stream_messages.yaml | 110 ++++++++++++++++++ src/generator/common.rs | 8 +- src/main.rs | 2 + templates/.env | 2 + templates/dependencies.toml | 2 + templates/src/cli.go | 43 +++++++ templates/src/handler.go | 2 +- templates/src/main.go | 5 + 9 files changed, 181 insertions(+), 3 deletions(-) create mode 100644 example/specs/basic_queues_and_stream_messages.yaml create mode 100644 templates/src/cli.go diff --git a/example/specs/basic_queues_and_stream.yaml b/example/specs/basic_queues_and_stream.yaml index 55d376e..b5f7144 100644 --- a/example/specs/basic_queues_and_stream.yaml +++ b/example/specs/basic_queues_and_stream.yaml @@ -1,6 +1,6 @@ asyncapi: 2.1.0 info: - title: queque_and_stream_api + title: queue_and_stream_api version: 1.0.0 servers: production: @@ -53,6 +53,10 @@ channels: properties: userBought: type: string + quantity: + type: number + timestamp: + type: string publish: bindings: nats: @@ -65,6 +69,10 @@ channels: properties: userBought: type: string + quantity: + type: number + credit: + type: boolean user/sell: subscribe: operationId: userSold diff --git a/example/specs/basic_queues_and_stream_messages.yaml b/example/specs/basic_queues_and_stream_messages.yaml new file mode 100644 index 0000000..f4a3349 --- /dev/null +++ b/example/specs/basic_queues_and_stream_messages.yaml @@ -0,0 +1,110 @@ +asyncapi: 2.1.0 +info: + title: queue_and_stream_api + version: 1.0.0 +servers: + production: + url: demo.nats.io + protocol: nats +channels: + user/signedup: + subscribe: + bindings: + nats: + x-streamname: testStream + operationId: onUserSignup + summary: User signup notification + message: + payload: + type: object + properties: + userSingnedUp: + type: string + userID: + type: number + timestamp: + type: string + publish: + bindings: + nats: + x-streamname: testStream + operationId: userSignedUp + summary: send welcome email to user + message: + payload: + type: object + properties: + userSingnedUp: + type: string + userID: + type: number + timestamp: + type: string + user/buy: + subscribe: + bindings: + nats: + queue: MyQueue + operationId: userBought + summary: User bought something + message: + payload: + type: object + properties: + userBought: + type: string + quantity: + type: number + timestamp: + type: string + examples: + - name: simpleExample + payload: + userBought: "something" + quantity: 1 + timestamp: "2020-01-01T00:00:00Z" + publish: + bindings: + nats: + queue: MyQueue + operationId: onUserBought + summary: send email to user + message: + payload: + type: object + properties: + userBought: + type: string + quantity: + type: number + credit: + type: boolean + examples: + - name: secondExample + payload: + userBought: "something else" + quantity: 3 + credit: true + user/sell: + subscribe: + operationId: userSold + summary: User sold something + message: + payload: + type: object + properties: + soldItem: + type: string + timestamp: + type: string + publish: + operationId: onUserSold + summary: send email to user + message: + payload: + type: object + properties: + soldItem: + type: string + timestamp: + type: string \ No newline at end of file diff --git a/src/generator/common.rs b/src/generator/common.rs index 3a4fb1e..d861acc 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -129,13 +129,19 @@ pub fn template_render_write( base_template .parse(&template) .expect("failed to parse template"); - let render = match base_template.render(&Context::from(context_ref.into())) { + let mut render = match base_template.render(&Context::from(context_ref.into())) { Ok(render) => render, Err(e) => { eprintln!("❌ Error rendering template: {}", e); std::process::exit(1); } }; + if output_path.ends_with(".env") { + let mut lines: Vec<&str> = render.split("\n").collect(); + lines.retain(|&x| x.trim() != ""); + render = lines.join("\n"); + } + match utils::write_to_path_create_dir(&render, output_path) { Ok(_) => (), Err(e) => { diff --git a/src/main.rs b/src/main.rs index bfb8645..a99a4a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ fn main() { std::process::exit(1); } }; + let title: &str = match &args.title { Some(t) => t, None => &spec.info.title, @@ -61,6 +62,7 @@ fn main() { &[ "src/main.go", "src/handler.go", + "src/cli.go", "Readme.md", ".env", "src/utils/mod.go", diff --git a/templates/.env b/templates/.env index 8b8a515..b236303 100644 --- a/templates/.env +++ b/templates/.env @@ -7,6 +7,7 @@ OPA_RULES= "path/to/admin/policy" ################Channel wise Config################ {{ range .subscribe_channels }} +################{{ (index . 1).unique_id }}################ {{ if (index . 1).original_operation.bindings }} {{ if (index . 1).original_operation.bindings.nats.queue }} {{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" @@ -18,6 +19,7 @@ OPA_RULES= "path/to/admin/policy" {{ end }} {{ range .publish_channels }} +################{{ (index . 1).unique_id }}################ {{ if (index . 1).original_operation.bindings }} {{ if (index . 1).original_operation.bindings.nats.queue }} {{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" diff --git a/templates/dependencies.toml b/templates/dependencies.toml index bf991d2..de2d807 100644 --- a/templates/dependencies.toml +++ b/templates/dependencies.toml @@ -4,3 +4,5 @@ serde = "1.0.164" serde_json = "1.0.97" tokio = { version = "1.28.2", features = ["full"] } dotenv = "0.15.0" +clap = {version = "4.3.0", features = ["derive"]} + diff --git a/templates/src/cli.go b/templates/src/cli.go new file mode 100644 index 0000000..f6f4bde --- /dev/null +++ b/templates/src/cli.go @@ -0,0 +1,43 @@ +use clap::Parser; +use crate::{model::*, utils::*}; +use async_nats::jetstream::Context; +use async_nats::{jetstream, Client, Message}; + +/// specify Messages to send using your new Microservice! +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct Args { + ///Command to either send all example messages specified in the spec or a single custom message + /// use with --c all to send all example messages (potential custom messages will be ignored) + /// use with --c destination/channel -message {myMessageJson} to send a custom message to a specific destination/channel + #[arg(short, long, default_value = "" )] + pub command: String, + + ///specify the message according to the specified message schema + #[arg(short, long, default_value="" )] + pub message: String, +} + + + +pub async fn send_all_messages(client: &Client)-> Result<(), async_nats::Error>{ + //TODO: modify this template to iterate over .subscribe channels so that they are sent to their respective channels + Ok(()) + +} + +pub async fn handle_cli(client: &Client, command: &String, message: &String)-> Result<(), async_nats::Error> { + match command.as_str(){ + "all" => { + send_all_messages(&client).await?; + }, + "" => { + (); + }, + _ => { + client.publish(command.into(), message.to_owned().into()).await?; + println!("Sent message {:?} to {}",message, command); + } + } + Ok(()) +} \ No newline at end of file diff --git a/templates/src/handler.go b/templates/src/handler.go index ada52bb..7d60c0b 100644 --- a/templates/src/handler.go +++ b/templates/src/handler.go @@ -1,6 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,stream_publish_message,model::*, ,config::*}; +use crate::{publish_message,stream_publish_message,model::* ,config::*}; use std::time; diff --git a/templates/src/main.go b/templates/src/main.go index d1f08b2..39fd979 100644 --- a/templates/src/main.go +++ b/templates/src/main.go @@ -1,6 +1,9 @@ mod handler; mod model; mod utils; +mod cli; +use clap::Parser; +use crate::cli::*; use utils::*; use crate::handler::*; use async_nats::jetstream::{self}; @@ -12,8 +15,10 @@ mod config; #[tokio::main] async fn main() -> Result<(), async_nats::Error> { let env: HashMap = config::get_env(); + let args = cli::Args::parse(); let client = async_nats::connect(env.get("SERVER_URL").unwrap()).await?; + handle_cli(&client, &args.command, &args.message).await?; {{ range .publish_channels }} {{ if (index . 1).original_operation.bindings }} From d71801c33c539b0c12f5f76329dcdfb1fa196706 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Mon, 3 Jul 2023 18:13:28 +0200 Subject: [PATCH 13/14] clippy --- example/demo/automatic_lighting.yaml | 1 - src/generator/common.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/example/demo/automatic_lighting.yaml b/example/demo/automatic_lighting.yaml index c9e741b..050d784 100644 --- a/example/demo/automatic_lighting.yaml +++ b/example/demo/automatic_lighting.yaml @@ -6,7 +6,6 @@ info: The Smartylighting Streetlights API allows you to remotely manage the city lights. ### Check out its awesome features: * Turn a specific streetlight on/off 🌃 - * Dim a specific streetlight 😎 * Receive real-time information about environmental lighting conditions 📈 servers: dev: diff --git a/src/generator/common.rs b/src/generator/common.rs index d861acc..827b8f7 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -137,7 +137,7 @@ pub fn template_render_write( } }; if output_path.ends_with(".env") { - let mut lines: Vec<&str> = render.split("\n").collect(); + let mut lines: Vec<&str> = render.split('\n').collect(); lines.retain(|&x| x.trim() != ""); render = lines.join("\n"); } From c44ec46a7c251e7332c02dadee0ceb93e32c5d48 Mon Sep 17 00:00:00 2001 From: Niclas Doepner Date: Mon, 3 Jul 2023 18:42:11 +0200 Subject: [PATCH 14/14] fixed streams compiling --- templates/src/handler.go | 4 ++-- templates/src/utils/streams.go | 30 +++++++++++++++++++----------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/templates/src/handler.go b/templates/src/handler.go index ed0974a..9fd8dd8 100644 --- a/templates/src/handler.go +++ b/templates/src/handler.go @@ -102,9 +102,9 @@ use std::time; return; } }; - publish_message(client, &subject, &payload).await; + stream_publish_message(context_stream, &subject, &payload).await; {{else}} - publish_message(client, &subject, &"").await; + stream_publish_message(context_stream, &subject, &"").await; {{end}} } {{end}} diff --git a/templates/src/utils/streams.go b/templates/src/utils/streams.go index 381cb8d..c6a4ad2 100644 --- a/templates/src/utils/streams.go +++ b/templates/src/utils/streams.go @@ -1,4 +1,5 @@ use async_nats::jetstream::{self, Context}; +use async_nats::Client; use async_nats::jetstream::consumer::{pull::{Config}, Consumer}; use std::time::Duration; use futures::StreamExt; @@ -13,17 +14,24 @@ pub async fn stream_publish_message(client: &Context, channel: &str, payload: &s } -pub async fn stream_listen_for_message(sub: &Consumer, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{ - loop{ - tokio::time::sleep(Duration::from_millis(1000)).await; - let mut messages = sub.messages().await?.take(10); - while let Some(message) = messages.next().await { - let message = message?; - handler(message); - println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context - } - } - Ok(()) +pub async fn stream_listen_for_message( + sub: &Consumer, + handler: impl Fn(jetstream::Message, &Client), + client: &Client +) -> Result<(), async_nats::Error> { + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + let mut messages = sub.messages().await?.take(10); + while let Some(message) = messages.next().await { + let message = message?; + handler(message, client); + println!( + "Message received by Subscriber: {:?}", + sub.cached_info().name + ); // if you show sub its a mess, is now a Context + } + } + Ok(()) }