Skip to content

Commit

Permalink
chore: Fix lint warnings across the board (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Jun 3, 2022
1 parent 5153f3c commit 277bb19
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 61 deletions.
2 changes: 1 addition & 1 deletion book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@
- [Intersect Options](./advanced/intersect_options.md)
- [Guides](./guides/README.md)
- [Cardano => Kafka](./guides/cardano_2_kafka.md)
- [Custom networks](./guides/connecting_to_custom_networks.md)
- [Custom networks](./guides/connecting_to_custom_networks.md)
28 changes: 14 additions & 14 deletions book/src/guides/connecting_to_custom_networks.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ adahandle_policy = String

Some details on the cofigurataion:

| Name | DataType | Description |
| :--- | :--- | :--- |
| byron_epoch_length | u32 | .... |
| byron_slot_length | u32 | .... |
| byron_known_slot | u32 | .... |
| byron_known_hash | String | .... |
| byron_known_time | u64 | .... |
| shelley_epoch_length | u32 | .... |
| shelley_slot_length | u32 | .... |
| shelley_known_slot | u32 | .... |
| shelley_known_hash | String | .... |
| shelley_known_time | u64 | .... |
| address_hrp | String | .... |
| adahandle_policy | String | Minting policy of AdaHandle on the network. |
| Name | DataType | Description |
| :------------------- | :------- | :------------------------------------------ |
| byron_epoch_length | u32 | .... |
| byron_slot_length | u32 | .... |
| byron_known_slot | u32 | .... |
| byron_known_hash | String | .... |
| byron_known_time | u64 | .... |
| shelley_epoch_length | u32 | .... |
| shelley_slot_length | u32 | .... |
| shelley_known_slot | u32 | .... |
| shelley_known_hash | String | .... |
| shelley_known_time | u64 | .... |
| address_hrp | String | .... |
| adahandle_policy | String | Minting policy of AdaHandle on the network. |
2 changes: 1 addition & 1 deletion book/src/sinks/redis_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ stream_strategy = "ByEventType"
- `type`: the literal value `Redis`.
- `redis_server`: the redis server in the format `redis://[<username>][:<password>]@<hostname>[:port][/<db>]`
- `stream_name` : the name of the redis stream for StreamStrategy `None`, default is "oura" if not specified
- `stream_strategy` : `None` or `ByEventType`
- `stream_strategy` : `None` or `ByEventType`
5 changes: 1 addition & 4 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,11 @@ enum Sink {
#[cfg(feature = "aws")]
AwsS3(AwsS3Config),


#[cfg(feature = "redissink")]
Redis(RedisConfig),

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),

}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
Expand Down Expand Up @@ -165,7 +163,6 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),

}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ pub mod aws_s3;
pub mod redis;

#[cfg(feature = "gcp")]
pub mod gcp_pubsub;
pub mod gcp_pubsub;
2 changes: 1 addition & 1 deletion src/sinks/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod run;
mod setup;
pub use setup::*;
pub use setup::*;
59 changes: 32 additions & 27 deletions src/sinks/redis/run.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
#![allow(unused_variables)]
use std::sync::Arc;
use super::StreamStrategy;
use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};
use serde::Serialize;
use serde_json::{json};
use crate::{pipelining::StageReceiver, utils::Utils, Error, model::Event};
use super::{StreamStrategy};
use serde_json::json;
use std::sync::Arc;

#[derive(Serialize)]
pub struct RedisRecord {
pub event: Event,
pub event: Event,
pub key: String,
}

impl From<Event> for RedisRecord {
fn from(event: Event) -> Self {
let key = key(&event);
RedisRecord {
event,
key,
}
RedisRecord { event, key }
}
}

fn key(event : &Event) -> String {
fn key(event: &Event) -> String {
if let Some(fingerprint) = &event.fingerprint {
fingerprint.clone()
} else {
Expand All @@ -30,26 +27,34 @@ fn key(event : &Event) -> String {
}

pub fn producer_loop(
input : StageReceiver,
utils : Arc<Utils>,
conn : &mut redis::Connection,
stream_strategy : StreamStrategy,
redis_stream : String,
input: StageReceiver,
utils: Arc<Utils>,
conn: &mut redis::Connection,
stream_strategy: StreamStrategy,
redis_stream: String,
) -> Result<(), Error> {
for event in input.iter() {
utils.track_sink_progress(&event);
let payload = RedisRecord::from(event);
let stream : String;
match stream_strategy {
StreamStrategy::ByEventType => {
stream = payload.event.data.clone().to_string().to_lowercase();
}
_ => {
stream = redis_stream.clone();
}
}
log::debug!("Stream: {:?}, Key: {:?}, Event: {:?}", stream, payload.key, payload.event);
let _ : () = redis::cmd("XADD").arg(stream).arg("*").arg(&[(payload.key,json!(payload.event).to_string())]).query(conn)?;

let stream = match stream_strategy {
StreamStrategy::ByEventType => payload.event.data.clone().to_string().to_lowercase(),
_ => redis_stream.clone(),
};

log::debug!(
"Stream: {:?}, Key: {:?}, Event: {:?}",
stream,
payload.key,
payload.event
);

let _: () = redis::cmd("XADD")
.arg(stream)
.arg("*")
.arg(&[(payload.key, json!(payload.event).to_string())])
.query(conn)?;
}

Ok(())
}
}
31 changes: 19 additions & 12 deletions src/sinks/redis/setup.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use redis::{Client};
use redis::Client;
use serde::Deserialize;

use crate::{
Expand All @@ -11,32 +11,39 @@ use super::run::*;
#[derive(Debug, Clone, Deserialize)]
pub enum StreamStrategy {
ByEventType,
None
}
None,
}

#[derive(Debug, Deserialize)]
pub struct Config {
pub redis_server : String,
pub stream_strategy : Option<StreamStrategy>,
pub stream_name : Option<String>,
pub redis_server: String,
pub stream_strategy: Option<StreamStrategy>,
pub stream_name: Option<String>,
}

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let client = Client::open(self.inner.redis_server.clone())?;
let mut connection = client.get_connection()?;
log::debug!("Connected to Redis Database!");

let stream_strategy = match self.inner.stream_strategy.clone() {
Some(strategy) => {
strategy
},
_ => StreamStrategy::None
Some(strategy) => strategy,
_ => StreamStrategy::None,
};
let redis_stream = self.inner.stream_name.clone().unwrap_or("oura".to_string());

let redis_stream = self
.inner
.stream_name
.clone()
.unwrap_or_else(|| "oura".to_string());

let utils = self.utils.clone();
let handle = std::thread::spawn(move || {
producer_loop(input, utils, &mut connection, stream_strategy, redis_stream).expect("redis sink loop failed");
producer_loop(input, utils, &mut connection, stream_strategy, redis_stream)
.expect("redis sink loop failed");
});

Ok(handle)
}
}

0 comments on commit 277bb19

Please sign in to comment.