diff --git a/example/demo/automatic_lighting.yaml b/example/demo/automatic_lighting.yaml new file mode 100644 index 0000000..050d784 --- /dev/null +++ b/example/demo/automatic_lighting.yaml @@ -0,0 +1,67 @@ +asyncapi: '2.6.0' +info: + title: automatic lighting + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + ### Check out its awesome features: + * Turn a specific streetlight on/off 🌃 + * Receive real-time information about environmental lighting conditions 📈 +servers: + dev: + url: localhost:4022 + protocol: nats + description: Nats message broker +channels: + lightMeasured: + description: The topic on which measured values may be produced and consumed. + publish: + summary: Inform about environmental lighting conditions of a particular streetlight. + operationId: receiveLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + turnOnStreetlight: + subscribe: + operationId: turnOn + message: + $ref: '#/components/messages/turnOnOff' + turnOffStreetlight: + subscribe: + operationId: turnOff + message: + $ref: '#/components/messages/turnOnOff' +components: + messages: + lightMeasured: + name: lightMeasured + title: Light measured + summary: Inform about environmental lighting conditions of a particular streetlight. + contentType: application/json + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + turnOnOff: + name: turnOnOff + title: Turn on/off + summary: Command a particular streetlight to turn the lights on or off. + payload: + $ref: "#/components/schemas/turnOnOffPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + turnOnOffPayload: + type: object + properties: + lightId: + type: string + command: + type: string + enum: + - on + - off + description: Whether to turn on or off the light. + diff --git a/example/specs/basic_queues_and_stream.yaml b/example/specs/basic_queues_and_stream.yaml index fa8cef9..b5f7144 100644 --- a/example/specs/basic_queues_and_stream.yaml +++ b/example/specs/basic_queues_and_stream.yaml @@ -1,6 +1,6 @@ asyncapi: 2.1.0 info: - title: My_API + title: queue_and_stream_api version: 1.0.0 servers: production: @@ -11,7 +11,7 @@ channels: subscribe: bindings: nats: - streamname: testStream + x-streamname: testStream operationId: onUserSignup summary: User signup notification message: @@ -27,7 +27,7 @@ channels: publish: bindings: nats: - streamname: testStream + x-streamname: testStream operationId: userSignedUp summary: send welcome email to user message: @@ -53,6 +53,10 @@ channels: properties: userBought: type: string + quantity: + type: number + timestamp: + type: string publish: bindings: nats: @@ -65,6 +69,10 @@ channels: properties: userBought: type: string + quantity: + type: number + credit: + type: boolean user/sell: subscribe: operationId: userSold diff --git a/example/specs/basic_queues_and_stream_messages.yaml b/example/specs/basic_queues_and_stream_messages.yaml new file mode 100644 index 0000000..f4a3349 --- /dev/null +++ b/example/specs/basic_queues_and_stream_messages.yaml @@ -0,0 +1,110 @@ +asyncapi: 2.1.0 +info: + title: queue_and_stream_api + version: 1.0.0 +servers: + production: + url: demo.nats.io + protocol: nats +channels: + user/signedup: + subscribe: + bindings: + nats: + x-streamname: testStream + operationId: onUserSignup + summary: User signup notification + message: + payload: + type: object + properties: + userSingnedUp: + type: string + userID: + type: number + timestamp: + type: string + publish: + bindings: + nats: + x-streamname: testStream + operationId: userSignedUp + summary: send welcome email to user + message: + payload: + type: object + properties: + userSingnedUp: + type: string + userID: + type: number + timestamp: + type: string + user/buy: + subscribe: + bindings: + nats: + queue: MyQueue + operationId: userBought + summary: User bought something + message: + payload: + type: object + properties: + userBought: + type: string + quantity: + type: number + timestamp: + type: string + examples: + - name: simpleExample + payload: + userBought: "something" + quantity: 1 + timestamp: "2020-01-01T00:00:00Z" + publish: + bindings: + nats: + queue: MyQueue + operationId: onUserBought + summary: send email to user + message: + payload: + type: object + properties: + userBought: + type: string + quantity: + type: number + credit: + type: boolean + examples: + - name: secondExample + payload: + userBought: "something else" + quantity: 3 + credit: true + user/sell: + subscribe: + operationId: userSold + summary: User sold something + message: + payload: + type: object + properties: + soldItem: + type: string + timestamp: + type: string + publish: + operationId: onUserSold + summary: send email to user + message: + payload: + type: object + properties: + soldItem: + type: string + timestamp: + type: string \ No newline at end of file diff --git a/src/asyncapi_model/operation_binding.rs b/src/asyncapi_model/operation_binding.rs index 2824066..79f9a18 100644 --- a/src/asyncapi_model/operation_binding.rs +++ b/src/asyncapi_model/operation_binding.rs @@ -247,6 +247,8 @@ pub struct NATSOperationBinding { /// The version of this binding. If omitted, "latest" MUST be assumed. #[serde(skip_serializing_if = "Option::is_none")] pub binding_version: Option, + + #[serde(rename(deserialize = "x-streamname", serialize = "streamname"))] pub streamname: Option, } diff --git a/src/generator/common.rs b/src/generator/common.rs index 7aa5bac..827b8f7 100644 --- a/src/generator/common.rs +++ b/src/generator/common.rs @@ -1,6 +1,6 @@ use gtmpl::Context; -use crate::utils; +use crate::{template_context::TemplateContext, utils}; use std::{ ffi::OsStr, fs, @@ -8,6 +8,29 @@ use std::{ process::{Command, Output}, vec, }; + +pub fn check_for_overwrite(output_path: &Path, project_title: &str) { + //check if project with name already exists, if yes ask for permission to overwrite + if output_path.exists() { + let warn_message = format!("A project with the name {} already exists in the current directory, do you want to overwrite the existing project? \nWARNING: This will delete all files in the directory and all applied. \nType 'y' to continue or anything else to exit.",project_title); + println!("{}", warn_message); + let mut input = String::new(); + match std::io::stdin().read_line(&mut input) { + Ok(_) => { + if input.trim() != "y" { + println!("Aborting generation..."); + std::process::exit(0); + } + std::fs::remove_dir_all(output_path).unwrap(); + } + Err(err) => { + println!("❌ Error reading input: {}", err); + std::process::exit(1); + } + } + } +} + /// initialize a cargo project in path pub fn cargo_init_project(path: impl AsRef) -> Output { Command::new("cargo") @@ -39,7 +62,6 @@ pub fn cargo_fix(path: &PathBuf) -> Output { } fn key_exists(args: &[gtmpl_value::Value]) -> Result { - println!("{:?}", args); if args.is_empty() { return Err(gtmpl_value::FuncError::AtLeastXArgs( "Need at least 1 arg for key exists".to_string(), @@ -94,7 +116,11 @@ pub fn template_render_write( let template = match fs::read_to_string(template_path) { Ok(template) => template, Err(e) => { - eprintln!("❌ Error reading template: {}", e); + eprintln!( + "❌ Error reading template {:#?}: {}", + template_path.to_str(), + e + ); std::process::exit(1); } }; @@ -103,13 +129,19 @@ pub fn template_render_write( base_template .parse(&template) .expect("failed to parse template"); - let render = match base_template.render(&Context::from(context_ref.into())) { + let mut render = match base_template.render(&Context::from(context_ref.into())) { Ok(render) => render, Err(e) => { eprintln!("❌ Error rendering template: {}", e); std::process::exit(1); } }; + if output_path.ends_with(".env") { + let mut lines: Vec<&str> = render.split('\n').collect(); + lines.retain(|&x| x.trim() != ""); + render = lines.join("\n"); + } + match utils::write_to_path_create_dir(&render, output_path) { Ok(_) => (), Err(e) => { @@ -119,6 +151,29 @@ pub fn template_render_write( } } +pub fn write_multiple_templates( + template_path: &Path, + context_ref: &TemplateContext, + output_path: &Path, + endings: &[&str], +) { + for ending in endings { + if ending.ends_with(".go") { + template_render_write( + &template_path.join(ending), + context_ref, + &output_path.join(ending).with_extension("rs"), + ); + } else { + template_render_write( + &template_path.join(ending), + context_ref, + &output_path.join(ending), + ); + } + } +} + pub fn cargo_generate_rustdoc(path: &Path) { Command::new("cargo") .current_dir(path) diff --git a/src/generator/mod.rs b/src/generator/mod.rs index 7260d1b..88a5361 100644 --- a/src/generator/mod.rs +++ b/src/generator/mod.rs @@ -1,4 +1,7 @@ mod common; +mod model; pub use common::{ - cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, template_render_write, + cargo_fix, cargo_fmt, cargo_generate_rustdoc, cargo_init_project, check_for_overwrite, + template_render_write, write_multiple_templates, }; +pub use model::generate_models_folder; diff --git a/src/generator/model.rs b/src/generator/model.rs new file mode 100644 index 0000000..fc77588 --- /dev/null +++ b/src/generator/model.rs @@ -0,0 +1,48 @@ +use super::template_render_write; +use crate::template_context::TemplateContext; +use crate::{parser::common::validate_identifier_string, utils::write_to_path_create_dir}; + +use std::path::Path; + +pub fn generate_models_folder( + async_config: &TemplateContext, + template_path: &Path, + output_path: &Path, +) { + async_config + .model + .message_models + .iter() + .for_each(|message_model| { + if !message_model.model_definition.is_empty() { + template_render_write( + &template_path.join("src/model.go"), + message_model.clone(), + &output_path.join(format!( + "src/model/{}.rs", + validate_identifier_string(&message_model.unique_id, false) + )), + ); + } + }); + + let imports = async_config + .model + .message_models + .iter() + .map(|message_model| { + if !message_model.model_definition.is_empty() { + format!( + "pub mod {}; \n pub use {}::*; \n", + validate_identifier_string(&message_model.unique_id, false), + validate_identifier_string(&message_model.unique_id, false) + ) + } else { + "".to_string() + } + }) + .collect::>() + .join("\n"); + + write_to_path_create_dir(&imports, &output_path.join("src/model/mod.rs")).unwrap(); +} diff --git a/src/main.rs b/src/main.rs index 770e582..a99a4a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,10 @@ mod utils; use crate::{ asyncapi_model::AsyncAPI, - generator::{cargo_fix, cargo_generate_rustdoc, template_render_write}, + generator::{ + cargo_fix, cargo_generate_rustdoc, check_for_overwrite, generate_models_folder, + write_multiple_templates, + }, utils::append_file_to_file, }; @@ -33,6 +36,7 @@ fn main() { std::process::exit(1); } }; + let title: &str = match &args.title { Some(t) => t, None => &spec.info.title, @@ -49,27 +53,26 @@ fn main() { } }; - template_render_write( - &template_path.join("main.go"), - &async_config, - &output_path.join("src/main.rs"), - ); - template_render_write( - &template_path.join("handler.go"), - &async_config, - &output_path.join("src/handler.rs"), - ); + check_for_overwrite(output_path, title); - template_render_write( - &template_path.join("model.go"), - &async_config, - &output_path.join("src/model.rs"), - ); - template_render_write( - &template_path.join("Readme.md"), + write_multiple_templates( + template_path, &async_config, - &output_path.join("Readme.md"), + output_path, + &[ + "src/main.go", + "src/handler.go", + "src/cli.go", + "Readme.md", + ".env", + "src/utils/mod.go", + "src/utils/streams.go", + "src/utils/common.go", + "src/config/mod.go", + ], ); + + generate_models_folder(&async_config, template_path, output_path); println!("🚀 File generation finished, adding dependencies..."); // make output a compilable project diff --git a/src/parser/json_schema_parser/types/rust_schema_representation.rs b/src/parser/json_schema_parser/types/rust_schema_representation.rs index 8a492e5..b78ec78 100644 --- a/src/parser/json_schema_parser/types/rust_schema_representation.rs +++ b/src/parser/json_schema_parser/types/rust_schema_representation.rs @@ -1,5 +1,6 @@ +use gtmpl_derive::Gtmpl; use serde::Serialize; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Gtmpl)] pub struct RustSchemaRepresentation { // the unique identifier (e.g. UserSignupMessage) pub unique_id: String, diff --git a/templates/.env b/templates/.env new file mode 100644 index 0000000..b236303 --- /dev/null +++ b/templates/.env @@ -0,0 +1,34 @@ +################General Config################ + +SERVICE_PORT = "http://localhost:8080" +SERVER_URL = "{{ .server.url }}" +LOG_LEVEL = "DEBUG" +OPA_RULES= "path/to/admin/policy" + +################Channel wise Config################ +{{ range .subscribe_channels }} +################{{ (index . 1).unique_id }}################ + {{ if (index . 1).original_operation.bindings }} + {{ if (index . 1).original_operation.bindings.nats.queue }} +{{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" + {{else}} +{{ (index . 1).unique_id}}_STREAM = "{{ (index . 1).original_operation.bindings.nats.streamname}}" + {{ end }} + {{ end }} +{{ (index . 1).unique_id }}_SUBJECT = "{{ (index . 0) }}" +{{ end }} + +{{ range .publish_channels }} +################{{ (index . 1).unique_id }}################ + {{ if (index . 1).original_operation.bindings }} + {{ if (index . 1).original_operation.bindings.nats.queue }} +{{ (index . 1).unique_id}}_QUEUE = "{{ (index . 1).original_operation.bindings.nats.queue}}" + {{else}} +{{ (index . 1).unique_id}}_STREAM = "{{ (index . 1).original_operation.bindings.nats.streamname}}" + {{ end }} + {{ end }} +{{ (index . 1).unique_id }}_SUBJECT = "{{ (index . 0) }}" +{{ end }} + + + diff --git a/templates/dependencies.toml b/templates/dependencies.toml index a7581e9..de2d807 100644 --- a/templates/dependencies.toml +++ b/templates/dependencies.toml @@ -3,3 +3,6 @@ futures = "0.3.28" serde = "1.0.164" serde_json = "1.0.97" tokio = { version = "1.28.2", features = ["full"] } +dotenv = "0.15.0" +clap = {version = "4.3.0", features = ["derive"]} + diff --git a/templates/example.go b/templates/example.go deleted file mode 100644 index 5d1899f..0000000 --- a/templates/example.go +++ /dev/null @@ -1,30 +0,0 @@ -use async_nats::{Client, Subscriber}; -use futures::StreamExt; - -async fn listen_for_message(sub1: &mut Subscriber) { - while let Some(message) = sub1.next().await { - println!("Received message {:#?}", message); - } -} - -async fn publish_messages(client: &Client, amount: i32, channel: &str, data: &'static str) { - for _ in 0..amount { - client.publish(channel.into(), data.into()).await.unwrap(); - } -} - -#[tokio::main] -async fn main() -> Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server.url }}").await?; - // declare subscribers - - // subscribe - tokio::select! { - {{range .subscribe_channels}} - _=listen_for_message(&mut {{ (index . 1).operationId }}) => {} - {{end}} - } - println!("fin"); - - Ok(()) -} diff --git a/templates/main.go b/templates/main.go deleted file mode 100644 index 0e10e82..0000000 --- a/templates/main.go +++ /dev/null @@ -1,128 +0,0 @@ -mod handler; -mod model; -use async_nats::{Client, Message, Subscriber}; -use async_nats::jetstream::{self, Context, stream}; -use async_nats::jetstream::consumer::{pull::{self, Config}, Consumer}; -use std::time::Duration; -use futures::StreamExt; - -use crate::handler::*; - -pub struct Producer {} - -async fn listen_for_message(sub: &mut Subscriber, handler: impl Fn(Message)) { - while let Some(message) = sub.next().await { - handler(message); - println!("Message received by Subscriber: {:?}", sub); - } -} -async fn publish_message(client: &Client, channel: &str, payload: &str) { - let owned_payload = payload.to_owned().into(); // Convert to Bytes - client - .publish(channel.to_string(), owned_payload) - .await - .unwrap(); - println!("sent"); -} - -pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result, async_nats::Error>{ - - let stream = jetstream.get_or_create_stream(jetstream::stream::Config { - name: stream_name.to_string(), - ..Default::default() - }).await?; - let consumer = stream.get_or_create_consumer("consumer", Config { - durable_name: Some("consumer".to_string()), - ..Default::default() - }).await?; - return Ok(consumer); -} - -{{ range .subscribe_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats}} - {{ if (index . 1).original_operation.bindings.nats.streamname }} - async fn stream_publish_message(client: &Context, channel: &str, payload: &str) { - let owned_payload = payload.to_owned().into(); // Convert to Bytes - client - .publish(channel.to_string(), owned_payload) - .await - .unwrap(); - println!("sent"); - } - {{end}} - {{end}} - {{end}} -{{end}} //supports only one stream, otherwise no unique function name(one function is enough for any number of streams, but you have to check if there is one) - - -{{range .publish_channels}} - {{ if (index . 1).original_operation.bindings }} - {{ if (index . 1).original_operation.bindings.nats.streamname }} - async fn stream_listen_for_message(sub: &Consumer, handler: impl Fn(jetstream::Message)) -> Result<(), async_nats::Error>{ - loop{ - tokio::time::sleep(Duration::from_millis(1000)).await; - let mut messages = sub.messages().await?.take(10); - while let Some(message) = messages.next().await { - let message = message?; - handler(message); - println!("Message received by Subscriber: {:?}", sub.cached_info().name); // if you show sub its a mess, is now a Context - } - } - Ok(()) - } - {{end}} - {{end}} -{{end}} - -#[tokio::main] -async fn main() -> Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server.url }}").await?; - - {{ range .publish_channels }} - {{ if (index . 1).original_operation.bindings }} - {{ if (index . 1).original_operation.bindings.nats.queue }} - let mut {{ (index . 1).unique_id }} = client.queue_subscribe("{{ index . 0 }}".into(), "{{ (index . 1).original_operation.bindings.nats.queue }}".into()).await?; - {{ else }} - let clientcpy = client.clone(); - let context_jetstream = jetstream::new(clientcpy); - let {{ (index . 1).unique_id }} = "{{ (index . 1).original_operation.bindings.nats.streamname }}"; - let consumer = get_consumer(&context_jetstream, &{{ (index . 1).unique_id }}).await?; - {{end}} - {{ else }} - let mut {{ (index . 1).unique_id }} = client.subscribe("{{ index . 0 }}".into()).await?; - {{end}} - {{end}} - - //test(&client, "foo").await; - - tokio::join!( - {{ range .subscribe_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_producer_{{ (index . 1).unique_id }}(&context_jetstream, "{{ (index . 1).original_operation.bindings.nats.streamname }}"), - {{ else }} - producer_{{ (index . 1).unique_id }}(&client, "{{ index . 0 }}"), - {{ end }} - {{ else }} - producer_{{ (index . 1).unique_id }}(&client, "{{ index . 0 }}"), - {{ end }} - {{ end }} - {{ range .publish_channels }} - {{ if (index . 1).original_operation.bindings }} - {{if (index . 1).original_operation.bindings.nats.streamname}} - stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}), - - {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), - {{ end }} - - {{ else }} - listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}), - {{ end }} - {{ end }} - ); - - println!("fin"); - Ok(()) -} diff --git a/templates/model.go b/templates/model.go deleted file mode 100644 index 1ddabc8..0000000 --- a/templates/model.go +++ /dev/null @@ -1,7 +0,0 @@ -use serde::{Deserialize, Serialize}; - -// All models from the specification are defined here -{{ range .model.message_models }} - {{ .model_definition }} -{{ end }} - diff --git a/templates/pubsub.go b/templates/pubsub.go deleted file mode 100644 index 8bf0d0b..0000000 --- a/templates/pubsub.go +++ /dev/null @@ -1,21 +0,0 @@ -use futures::StreamExt; -use serde::{Deserialize, Serialize}; - - -{{ .schema }} - -#[tokio::main] -async fn main() -> Result<(), async_nats::Error> { - let client = async_nats::connect("{{ .server_url }}").await?; - let mut subscriber = client.subscribe("{{ .channel_name }}".into()).await?.take(10); - - for _ in 0..10 { - client.publish("{{ .channel_name }}".into(), "Hi mom".into()).await?; - } - - while let Some(message) = subscriber.next().await { - println!("Received message {:?}", message); - } - - Ok(()) -} diff --git a/templates/src/cli.go b/templates/src/cli.go new file mode 100644 index 0000000..f6f4bde --- /dev/null +++ b/templates/src/cli.go @@ -0,0 +1,43 @@ +use clap::Parser; +use crate::{model::*, utils::*}; +use async_nats::jetstream::Context; +use async_nats::{jetstream, Client, Message}; + +/// specify Messages to send using your new Microservice! +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct Args { + ///Command to either send all example messages specified in the spec or a single custom message + /// use with --c all to send all example messages (potential custom messages will be ignored) + /// use with --c destination/channel -message {myMessageJson} to send a custom message to a specific destination/channel + #[arg(short, long, default_value = "" )] + pub command: String, + + ///specify the message according to the specified message schema + #[arg(short, long, default_value="" )] + pub message: String, +} + + + +pub async fn send_all_messages(client: &Client)-> Result<(), async_nats::Error>{ + //TODO: modify this template to iterate over .subscribe channels so that they are sent to their respective channels + Ok(()) + +} + +pub async fn handle_cli(client: &Client, command: &String, message: &String)-> Result<(), async_nats::Error> { + match command.as_str(){ + "all" => { + send_all_messages(&client).await?; + }, + "" => { + (); + }, + _ => { + client.publish(command.into(), message.to_owned().into()).await?; + println!("Sent message {:?} to {}",message, command); + } + } + Ok(()) +} \ No newline at end of file diff --git a/templates/src/config/mod.go b/templates/src/config/mod.go new file mode 100644 index 0000000..13ab37d --- /dev/null +++ b/templates/src/config/mod.go @@ -0,0 +1,7 @@ +use dotenv::dotenv; +use std::{collections::HashMap, env}; + +pub fn get_env() -> HashMap { + dotenv().ok(); + env::vars().collect() +} \ No newline at end of file diff --git a/templates/handler.go b/templates/src/handler.go similarity index 63% rename from templates/handler.go rename to templates/src/handler.go index ca52fba..9fd8dd8 100644 --- a/templates/handler.go +++ b/templates/src/handler.go @@ -1,12 +1,6 @@ use async_nats::{Client, Message, jetstream}; use async_nats::jetstream::Context; -use crate::{publish_message,stream_publish_message, -{{ range .model.message_models }} - {{ if ne .model_definition "" }} - model::{{ .unique_id }}, - {{ end }} -{{ end }} -}; +use crate::{publish_message,stream_publish_message,model::*,config::*}; use std::time; @@ -23,7 +17,7 @@ use std::time; {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} {{end}} {{if $isStream}} - pub fn stream_handler_{{ (index . 1).unique_id }}(message: jetstream::Message) { + pub fn stream_handler_{{ (index . 1).unique_id }}(message: jetstream::Message, client: &Client) { {{ range (index . 1).messages }} {{ if .payload}} match serde_json::from_slice::<{{ .payload.struct_reference }}>(&message.message.payload.as_ref()) { @@ -51,13 +45,11 @@ use std::time; {{ end }} } {{else}} - pub fn handler_{{ (index . 1).unique_id }}(message: Message) { + pub async fn handler_{{ (index . 1).unique_id }}(message: Message, client: &Client) { {{ range (index . 1).messages }} {{ if .payload}} match serde_json::from_slice::<{{ .payload.struct_reference }}>(&message.payload.as_ref()) { Ok(deserialized_message) => { - println!("Received message {:#?}", deserialized_message); - // TODO: Replace this with your own handler code {{ if eq .payload.model_type "enum"}} match deserialized_message { {{$enumName := .payload.unique_id}} @@ -68,6 +60,9 @@ use std::time; } {{ end }} } + {{else}} + println!("Received message {:#?}", deserialized_message); + // TODO: Replace this with your own handler code {{ end }} }, Err(_) => { @@ -84,6 +79,7 @@ use std::time; {{ range .subscribe_channels }} {{ $isStream := false }} + {{ $channel := . }} /// Publish a message in the {{ (index . 1).unique_id }} channel /// Channel messages: @@ -95,22 +91,40 @@ use std::time; {{ end }} {{ if $isStream }} - pub async fn stream_producer_{{ (index . 1).unique_id }}(context_stream: &Context, channel: &str) { //context instead of client - // This is just an example producer, publishing a message every 2 seconds - // TODO: replace this with your own producer code - loop { - tokio::time::sleep(time::Duration::from_secs(2)).await; - stream_publish_message(context_stream, channel, "{\"test\":\"serialized\"}").await; - } - } + {{ range (index . 1).messages }} + pub async fn stream_producer_{{ (index $channel 1).unique_id }}(context_stream: &Context, payload : {{ if .payload}} {{ .payload.struct_reference }} {{ else }} () {{ end }}) { //context instead of client + let subject = get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + {{ if .payload }} + let payload = match serde_json::to_string(&payload) { + Ok(payload) => payload, + Err(_) => { + println!("Failed to serialize message payload: {{ .payload.struct_reference }}"); + return; + } + }; + stream_publish_message(context_stream, &subject, &payload).await; + {{else}} + stream_publish_message(context_stream, &subject, &"").await; + {{end}} + } + {{end}} {{ else }} - pub async fn producer_{{ (index . 1).unique_id }}(client: &Client, channel: &str) { - // This is just an example producer, publishing a message every 2 seconds - // TODO: replace this with your own producer code - loop { - tokio::time::sleep(time::Duration::from_secs(2)).await; - publish_message(client, channel, "{\"test\":\"serialized\"}").await; + {{ range (index . 1).messages }} + pub async fn producer_{{ (index $channel 1).unique_id }}(client: &Client, payload: {{ if .payload }} {{.payload.struct_reference}} {{else}} () {{end}}) { + let subject = get_env().get("{{ (index $channel 1).unique_id }}_SUBJECT").unwrap().clone(); + {{ if .payload }} + let payload = match serde_json::to_string(&payload) { + Ok(payload) => payload, + Err(_) => { + println!("Failed to serialize message payload: {{ .payload.struct_reference }}"); + return; + } + }; + publish_message(client, &subject, &payload).await; + {{else}} + publish_message(client, &subject, &"").await; + {{end}} } - } + {{ end }} {{ end }} -{{ end }} +{{ end }} diff --git a/templates/src/main.go b/templates/src/main.go new file mode 100644 index 0000000..39fd979 --- /dev/null +++ b/templates/src/main.go @@ -0,0 +1,62 @@ +mod handler; +mod model; +mod utils; +mod cli; +use clap::Parser; +use crate::cli::*; +use utils::*; +use crate::handler::*; +use async_nats::jetstream::{self}; +use std::{collections::HashMap}; +use dotenv::dotenv; +mod config; + + +#[tokio::main] +async fn main() -> Result<(), async_nats::Error> { + let env: HashMap = config::get_env(); + let args = cli::Args::parse(); + + let client = async_nats::connect(env.get("SERVER_URL").unwrap()).await?; + handle_cli(&client, &args.command, &args.message).await?; + + {{ range .publish_channels }} + {{ if (index . 1).original_operation.bindings }} + {{ if (index . 1).original_operation.bindings.nats.queue }} + let mut {{ (index . 1).unique_id }} = client.queue_subscribe(env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap().into(), + env.get("{{ (index . 1).unique_id}}_QUEUE").unwrap().into()).await?; + {{ else }} + let clientcpy = client.clone(); + let context_jetstream = jetstream::new(clientcpy); + let {{ (index . 1).unique_id }} = env.get("{{ (index . 1).unique_id }}_STREAM").unwrap(); + let consumer = get_consumer(&context_jetstream, &{{ (index . 1).unique_id }}).await?; + {{end}} + {{ else }} + let mut {{ (index . 1).unique_id }} = client.subscribe(env.get("{{ (index . 1).unique_id}}_SUBJECT").unwrap().into()).await?; + {{end}} + {{end}} + + + tokio::join!( + {{ range .subscribe_channels }} + {{ $isStream := false }} + {{if key_exists (index . 1) "original_operation" "bindings" "nats" "streamname"}} + {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} + {{end}} + {{ end }} + {{ range .publish_channels }} + {{ $isStream := false }} + {{if key_exists (index . 1) "original_operation" "bindings" "nats" "streamname"}} + {{ $isStream := ((index . 1).original_operation.bindings.nats.streamname) }} + {{end}} + {{if $isStream}} + stream_listen_for_message(&consumer, stream_handler_{{ (index . 1).unique_id }}, &client), + {{ else }} + listen_for_message(&mut {{ (index . 1).unique_id }}, handler_{{ (index . 1).unique_id }}, &client), + {{ end }} + {{ end }} + ); + + println!("fin"); + Ok(()) +} diff --git a/templates/src/model.go b/templates/src/model.go new file mode 100644 index 0000000..b08d168 --- /dev/null +++ b/templates/src/model.go @@ -0,0 +1,5 @@ +use serde::{Deserialize, Serialize}; +use super::*; + +{{ .model_definition }} + diff --git a/templates/src/utils/common.go b/templates/src/utils/common.go new file mode 100644 index 0000000..7b93693 --- /dev/null +++ b/templates/src/utils/common.go @@ -0,0 +1,23 @@ +use async_nats::{Client, Message, Subscriber}; +use futures::StreamExt; + +pub async fn listen_for_message<'a, F, Fut>(sub: &mut Subscriber, handler: F, client: &'a Client) +where + F: Fn(Message,&'a Client) -> Fut + 'a, + Fut: std::future::Future + 'a, +{ + while let Some(message) = sub.next().await { + handler(message, client).await; + println!("Message received by Subscriber: {:?}", sub); + } +} + +pub async fn publish_message(client: &Client, channel: &str, payload: &str) { + let owned_payload = payload.to_owned().into(); // Convert to Bytes + client + .publish(channel.to_string(), owned_payload) + .await + .unwrap(); + println!("sent"); +} + diff --git a/templates/src/utils/mod.go b/templates/src/utils/mod.go new file mode 100644 index 0000000..e16f31a --- /dev/null +++ b/templates/src/utils/mod.go @@ -0,0 +1,4 @@ +pub mod common; +pub use common::*; +pub mod streams; +pub use streams::*; \ No newline at end of file diff --git a/templates/src/utils/streams.go b/templates/src/utils/streams.go new file mode 100644 index 0000000..c6a4ad2 --- /dev/null +++ b/templates/src/utils/streams.go @@ -0,0 +1,49 @@ +use async_nats::jetstream::{self, Context}; +use async_nats::Client; +use async_nats::jetstream::consumer::{pull::{Config}, Consumer}; +use std::time::Duration; +use futures::StreamExt; + +pub async fn stream_publish_message(client: &Context, channel: &str, payload: &str) { + let owned_payload = payload.to_owned().into(); // Convert to Bytes + client + .publish(channel.to_string(), owned_payload) + .await + .unwrap(); + println!("sent"); +} + + +pub async fn stream_listen_for_message( + sub: &Consumer, + handler: impl Fn(jetstream::Message, &Client), + client: &Client +) -> Result<(), async_nats::Error> { + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + let mut messages = sub.messages().await?.take(10); + while let Some(message) = messages.next().await { + let message = message?; + handler(message, client); + println!( + "Message received by Subscriber: {:?}", + sub.cached_info().name + ); // if you show sub its a mess, is now a Context + } + } + Ok(()) +} + + +pub async fn get_consumer(jetstream: &Context, stream_name: &str) -> Result, async_nats::Error>{ + + let stream = jetstream.get_or_create_stream(jetstream::stream::Config { + name: stream_name.to_string(), + ..Default::default() + }).await?; + let consumer = stream.get_or_create_consumer("consumer", Config { + durable_name: Some("consumer".to_string()), + ..Default::default() + }).await?; + return Ok(consumer); +} \ No newline at end of file