Skip to content

Commit

Permalink
chore:
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Jan 3, 2025
1 parent 532b5ec commit bde713c
Show file tree
Hide file tree
Showing 8 changed files with 369 additions and 245 deletions.
465 changes: 266 additions & 199 deletions src/tests.rs

Large diffs are not rendered by default.

39 changes: 14 additions & 25 deletions stem.spec
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

ُREAD: desktop books for quantum computing and neuroscience
READ: algo coding: gaming, quantum computing, codeforces, graph and nalgebra
TODOs:
0 -> loadbalancer for container services (Container{}) with weighted requests, deploy BPF object
1 -> uniqueDhtFileobject, worker!{event -> publish}, #[event] and OnionStreamer for Event supports stream (p2p and rmq), request response (p2p and rmq-rpc), kademlia, gossipsub:
startP2pSwarmEventLoop(), receiveP2pResponse(), receiveRpcResponse(), sendP2pRequest(), sendRpcRequest()
2 -> serverless stemlib based iot, opentable, khadangApp, exchange, stockbot, broker and game:
TASKS:
0 -> #[derive(Service(prot=http, host=0.0.0.0, port=2535))] pub struct OtpDto; gives the Otp, deploy(), routers() and stop() methods
1 -> complete impl OnionStream for Container{} and impl OnionStream for Event{} and #[event] proc macro for event varaints
2 -> serverless deployement of container objects and their services as BPF also impl different services for different dtos like tcp service for MinIoDriver dto
3 -> loadbalancer for container services (Container{}) with round robing and weighted round robin,
4 -> stream (p2p and rmq), request response (p2p and rmq-rpc), kademlia, gossipsub, wrtc, quic, tcp, ws, http2:
startP2pSwarmEventLoop(), receiveP2pResponse(), receiveRpcResponse(),
sendP2pRequest(), sendRpcRequest()
5 -> serverless stemlib based iot, opentable, khadangApp, exchange, stockbot, broker and game mmq:
cloudflare wasm worker wrangler with neuron actor cli for the p2p based Dex and Cex
infra: ci/cd(build,bumpV,push,pull) /> services: KycWorker, walletWroker(apiSigning), MarketWorker(MatchEngine), MainServer(ws,Http2), stemlib NeuronActor
OTC:
Expand All @@ -19,8 +23,10 @@ TODOs:
find the match with that order based on amount, quantity, side and the base/qoute
build atomic tx and execute the tx inside a lock and a light thread
neuron crypter based operations:
update atomic object in qrcode and zkp noir coding
impl ContractInterface for Event{} to do wallet ops
Hiding correct information between wrong ones and its combination with zkp
neuron ed25519 wallet to sign each message and verify in its handlers
neuron ed25519 wallet to sign each message and verify in all actor msg handlers
contract and wallet over zk
encrypt the neuron object instance using aes256 encryption
#[inject(ed25519WalletSecure)]
Expand All @@ -35,7 +41,7 @@ TODOs:
Cex broker order book and MatchEngineActorWorker using neuron actor rmq which contains all orders
Atomic orderTx in WalletServiceActorWorker and neuron actor
live orders with IPFS raft crypter graph concept through Ws, wrtc, tcp, udp, ed25519 noise
wrangler with raft, wasm, ws, wrtc, http2, p2p, tcp, udp, grpc, rpc, redis, sqlx, rmq, chan, spawn, cb(event)
serverless smart contract wrangler with raft, wasm, ws, wrtc, http2, p2p, tcp, udp, grpc, rpc, redis, sqlx, rmq, chan, spawn, cb(event)
streaming with rmq and p2p gossipsub kad + req-rep with rmq rpc and p2p req-res + main server with salvo http2 and ws
wait-for-it worker using stemlib actor: s1 must wait for s2 to be up to execute its codes (use it to test if a given TCP host/port are available); if it's up already execute the codes also handle notif signal and interval exec with timeout
custom error handler and log the error using logger neuron broadcaster
Expand Down Expand Up @@ -70,7 +76,7 @@ TODOs:
services must talk with each other based on their wallet and data signing
stemlib, lunatic, wrangler wasm actors, dyn stat dist, poly and dep injection
Actor: Vec<joinHandle>, interval executor, eventloop receiver, message passing for executing arbitrary tasks inside a thread of the actor, actor address
3 -> other features inside the stemlib
6 -> build onion based protocols for neuron stemlib:
SYNAPSE protocol network behavior features1: file sharing, vpn like tor, ton and v2ray, firewall, gateway like nginx and traefik
SYNAPSE protocol network behavior features2: loadbalancer, ingress listener like ngrok, reverse proxy and dns/cdn server, packet sniffer
▶ onion protocol with noise, tcp, quic, wrtc, ws, udp and p2p, os, codec like ffmpeg and Gstreamer (streaming over video using grpc, p2p, tcp using while let some)
Expand Down Expand Up @@ -114,23 +120,6 @@ TODOs:
▶ our VPS must detect the amount of CPU and RAM that every servers needs to get, without running the app
▶ our VPS must detect the number of instances of every servers needs to be run and the load balancing algorithm
bpf based proxy, firewall, vpns, packet sniffer and load balancer
Concepts:
dynamic dispatch use cases:
- used for coding polymorphism to support multiple types within a single type
- the implementor of the trait would be specified at runtime
- since traits are dynamically sized on the heap so the trait must be object safe trait
- methods of the trait on the instance will be called dynamically through vtable pointers
- used with Pin<Arc<dyn Trait>> and Pin<Box<dyn Trait>> for self-ref types like future traits
- used with Box or Arc for dynamic memory allocation for the pinned location (since Rust moves types inside the ram)
- used with Arc<dyn Trait> and Box<dyn Trait> for regular trait interfaces
- accessing multiple types through a single interface to register them as a service
- dependency injection, sdk writing like object storage and otp, testing, proxy design pattern
- example:
protobuf codes holds interfaces and contracts between server and clients
grpc has protobuf data codec the proto should be compiled into Rust codes and services
into trait interfaces then we could implement services for structs inside the Rust codes
which is the design pattern proxy and dependency injection that enables to call service
methods on struct instance allows us calling object methods directly through http2
https://shivangsnewsletter.com/p/why-doesnt-cloudflare-use-containers
https://www.youtube.com/watch?v=rht1vO2MBIg
https://medium.com/@harshiljani2002/building-stock-market-engine-from-scratch-in-rust-ii-0c7b5d8a60b6
Expand Down
24 changes: 24 additions & 0 deletions stemlib/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub struct Stage{

#[derive(Clone)]
pub struct Job{ // Job tree
pub id: String,
pub task: IoEvent, // an io task with the event instance
pub weight: u32,
pub executorId: std::thread::ThreadId,
Expand Down Expand Up @@ -315,6 +316,19 @@ pub struct Event{
pub offset: Arc<AtomicU64>, // the position of the event inside the brain network
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub enum ChannelType{
#[default]
Local,
Remote(String) // p2p, rmq, ws
}

#[derive(Clone)]
pub struct JobResult<T: Send + Sync + 'static>{
pub jobId: String,
pub data: T
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub enum EventStatus{
#[default]
Expand Down Expand Up @@ -404,13 +418,23 @@ pub struct LocalFileDriver{
#[derive(Clone)]
pub struct WalletDto;

#[derive(Clone)]
pub struct Otp;

#[derive(Clone)]
pub struct WebHookHandler;

#[derive(Clone)]
pub struct RateLimiter;

#[derive(Clone)]
pub struct Container{
// Arc is a reference-counted smart pointer used for thread-safe shared ownership of data
// Arc makes the whole service field cloneable cause the container must be cloneable
// to return updated context when pushing new container into its vector
pub service: Arc<dyn Service>, // dependency injection through dynamic dispatching
pub id: String,
pub requests: Arc<Vec<salvo::Request>>,
// a service must have host and port
pub host: String,
pub port: u16
Expand Down
68 changes: 55 additions & 13 deletions stemlib/src/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ impl<T: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync> ObjectStora
let data = self.clone();
let mut objId = Uuid::new_v4().to_string();
objId.hashMe();
let string = serde_json::to_string(&data).unwrap();
// T implements the Serialize and Deserialize traits
// so we can simply convert it into string
let string = serde_json::to_string(&data).unwrap(); // convert the data into string
let _: () = conn.set(&objId, &string).await.unwrap();

objId
Expand All @@ -788,9 +790,8 @@ impl<T: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync> ObjectStora
let mut conn = redisPool.get().await.unwrap();

let value: String = conn.get(key).await.unwrap();
let data = value.as_bytes();
data.to_vec()

let data = serde_json::to_vec(&value).unwrap(); // convert the loaded string back to bytes
data
}

/// check that either two objects are the same or not
Expand All @@ -800,24 +801,39 @@ impl<T: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync> ObjectStora

}

/// this would allows us to stream over an event like sending and receiving events and executing callbacks
/// this would allows us to stream over an event like sending and receiving events and executing callbacks on those events
/// supports stream (p2p and rmq), request response (p2p and rmq-rpc), kademlia, gossipsub:
impl OnionStream for Event{
type Model = Event;
type Channel = ChannelType;

/// a method to stream over various channels and execute callback based on those events
async fn on<R: std::future::Future<Output = ()> + Send + Sync + 'static,
F: Clone + Fn(Event, Option<StreamError>) -> R + Send + Sync + 'static>
(&mut self, streamer: &str, eventType: &str, callback: F) -> Self::Model {

// execute callback instead of directly caching and storing the received
// or sent events on redis or inside db , the process can be done inside
// the callback instead of handling it in here
(&mut self, streamer: &str, eventType: &str, callback: F) -> Self {

// execute callback in the background thread, it can be storing
// and caching the event on redis and db.

todo!()

}

}


impl OnionStream for Container{

type Channel = ChannelType;

async fn on<R: std::future::Future<Output = ()> + Send + Sync + 'static, // the io task
F: Clone + Fn(Event, Option<StreamError>) -> R + Send + Sync + 'static> // the callback with event and optional streaming error
(&mut self, streamer: &str, eventType: &str, callback: F) -> Self {

todo!()

}
}

// used for en(de)crypting data in form of string
impl Crypter for String{
fn decrypt(&mut self, secure_cell_config: &mut SecureCellConfig){
Expand Down Expand Up @@ -978,17 +994,18 @@ impl Container{
let host = &self.host;
let port = self.port;
let mut service = Arc::clone(&self.service);
service.startService(host, port);
service.startService(host, port); // calling the startService method of Service trait on the service dependency
}
}

// impl Service for dtos
impl Service for WalletDto{
fn startService(&self, host: &str, port: u16){
let host = host.to_string();
// start an http server for the WalletDto model
go!{
{
let router = routers::buildRouters();
let router = routers::buildWalletDtoRouters();
let acceptor = TcpListener::new(&format!("{}:{}", host, port)).bind().await;
Server::new(acceptor).serve(router).await;
}
Expand All @@ -998,12 +1015,36 @@ impl Service for WalletDto{

impl Service for MinIoDriver{
fn startService(&self, host: &str, port: u16) {
// we can start a ws server in here
// ...
}
}

impl Service for Otp{
fn startService(&self, host: &str, port: u16) {
// we can start a http server in here
// ...
}
}

impl Service for WebHookHandler{
fn startService(&self, host: &str, port: u16) {
// start a webhook handler server
// build its routers in here
// ...
}
}

impl Service for RateLimiter{
fn startService(&self, host: &str, port: u16) {
// we can start a tcp, quic, udp, http, ws server in here
// ...
}
}

impl Service for LocalFileDriver{
fn startService(&self, host: &str, port: u16) {
// we can start a tcp/quic/udp server in here
// ...
}
}
Expand Down Expand Up @@ -1173,6 +1214,7 @@ impl Worker{
impl Job {
pub fn new(task: IoEvent, parent: Option<Arc<Job>>) -> Arc<Self> {
Arc::new(Job {
id: Uuid::new_v4().to_string(),
task,
weight: 100,
executorId: std::thread::current().id(), // initially we've considered the current thead id for this
Expand Down
6 changes: 3 additions & 3 deletions stemlib/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ pub trait Crypter{
}

pub trait OnionStream{
type Model;
type Channel;
async fn on<R: std::future::Future<Output = ()> + Send + Sync + 'static,
F: Clone + Fn(Event, Option<StreamError>) -> R + Send + Sync + 'static>
(&mut self, streamer: &str, eventType: &str, callback: F) -> Self::Model;
(&mut self, streamer: &str, eventType: &str, callback: F) -> Self;
}

/// a distributed object storage interface supports object and instances and files (video, audio and image)
Expand All @@ -33,7 +33,7 @@ pub trait ObjectStorage{ // it can be any bytes io or &[u8], an encoded instance
async fn store(&mut self) -> String;
/// load the object from the storage as u8 bytes
async fn fetch(key: &str) -> Vec<u8>;
/// comapare the current checksum against the passed in file this is useful to detect steghided pictures and files
/// comapare the current checksum against the passed in object id, this is useful to detect steghided object
fn checksum(&mut self, objId: &str) -> bool;
}

Expand Down
6 changes: 3 additions & 3 deletions stemlib/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ pub struct WakeUp{

#[derive(Message, Clone, Debug)]
#[rtype(result = "()")]
pub struct Deploy{
}
pub struct Deploy;

#[derive(Clone, Debug)]
pub enum MsgType{
Serve,
Stop
Stop,
Event(Event)
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default)]
Expand Down
2 changes: 1 addition & 1 deletion stemlib/src/routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn getAllEntitiesHandler(



pub fn buildRouters() -> Router{
pub fn buildWalletDtoRouters() -> Router{

let routers = Router::with_path("/report/")
.hoop(countCall)
Expand Down
4 changes: 3 additions & 1 deletion stemlib/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ pub struct Transaction{
hash: Option<String>, // sha256 ash of the transaction
tx_sig: Option<String>, // the signature result of signing the tx hash with private key, this will use to verify the tx along with the pubkey of the signer
signer: String, // the one who has signed the tx
payer: String, // the one who has paid for the gas fee
}

#[derive(Serialize, Deserialize, Clone, Default, Debug)]
Expand Down Expand Up @@ -164,7 +165,8 @@ impl Transaction{
treasury_type: TreasuryType::Credit,
hash: Some(String::from("")), // stringify the whole tx object then hash it
tx_sig: Some(String::from("")), // sign the the stringified_tx_object with prvkey
signer: String::from("") // the one who has signed with the prv key usually the server
signer: String::from(""), // the one who has signed with the prv key usually the server
payer: String::from("")
};

tx_data
Expand Down

0 comments on commit bde713c

Please sign in to comment.