Skip to content

Commit

Permalink
support multiple config filters, fix transaction filter (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 11, 2023
1 parent 9a1fa11 commit 9d21d53
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 148 deletions.
88 changes: 55 additions & 33 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,22 @@ use {
};

/// Plugin config.
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[allow(dead_code)]
libpath: String,

/// Kafka config.
pub kafka: HashMap<String, String>,

/// Graceful shutdown timeout.
#[serde(default)]
pub shutdown_timeout_ms: u64,
/// Kafka topic to send account updates to.
#[serde(default)]
pub update_account_topic: String,
/// Kafka topic to send slot status updates to.
#[serde(default)]
pub slot_status_topic: String,
/// Kafka topic to send transaction to.
#[serde(default)]
pub transaction_topic: String,
/// List of programs to ignore.
#[serde(default)]
pub program_ignores: Vec<String>,
/// List of programs to include
#[serde(default)]
pub program_filters: Vec<String>,
// List of accounts to include
#[serde(default)]
pub account_filters: Vec<String>,
/// Publish all accounts on startup.
#[serde(default)]
pub publish_all_accounts: bool,
/// Wrap all event message in a single message type.
#[serde(default)]
pub wrap_messages: bool,

/// Accounts, transactions filters
pub filters: Vec<ConfigFilter>,

/// Prometheus endpoint.
#[serde(default)]
pub prometheus: Option<SocketAddr>,
Expand All @@ -67,16 +52,10 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
libpath: "".to_owned(),
kafka: HashMap::new(),
shutdown_timeout_ms: 30_000,
update_account_topic: "".to_owned(),
slot_status_topic: "".to_owned(),
transaction_topic: "".to_owned(),
program_ignores: Vec::new(),
program_filters: Vec::new(),
account_filters: Vec::new(),
publish_all_accounts: false,
wrap_messages: false,
filters: vec![],
prometheus: None,
}
}
Expand Down Expand Up @@ -119,4 +98,47 @@ impl Config {
}
}

/// Plugin config.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfigFilter {
/// Kafka topic to send account updates to.
pub update_account_topic: String,
/// Kafka topic to send slot status updates to.
pub slot_status_topic: String,
/// Kafka topic to send transaction to.
pub transaction_topic: String,
/// List of programs to ignore.
pub program_ignores: Vec<String>,
/// List of programs to include
pub program_filters: Vec<String>,
// List of accounts to include
pub account_filters: Vec<String>,
/// Publish all accounts on startup.
pub publish_all_accounts: bool,
/// Publish vote transactions.
pub include_vote_transactions: bool,
/// Publish failed transactions.
pub include_failed_transactions: bool,
/// Wrap all event message in a single message type.
pub wrap_messages: bool,
}

impl Default for ConfigFilter {
fn default() -> Self {
Self {
update_account_topic: "".to_owned(),
slot_status_topic: "".to_owned(),
transaction_topic: "".to_owned(),
program_ignores: Vec::new(),
program_filters: Vec::new(),
account_filters: Vec::new(),
publish_all_accounts: false,
include_vote_transactions: true,
include_failed_transactions: true,
wrap_messages: false,
}
}
}

pub type Producer = ThreadedProducer<DefaultProducerContext>;
50 changes: 38 additions & 12 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,30 @@
// limitations under the License.

use {
crate::Config,
crate::ConfigFilter,
solana_program::pubkey::Pubkey,
std::{collections::HashSet, str::FromStr},
};

pub struct Filter {
program_ignores: HashSet<[u8; 32]>,
program_filters: HashSet<[u8; 32]>,
account_filters: HashSet<[u8; 32]>,
pub publish_all_accounts: bool,
pub program_ignores: HashSet<[u8; 32]>,
pub program_filters: HashSet<[u8; 32]>,
pub account_filters: HashSet<[u8; 32]>,
pub include_vote_transactions: bool,
pub include_failed_transactions: bool,

pub update_account_topic: String,
pub slot_status_topic: String,
pub transaction_topic: String,

pub wrap_messages: bool,
}

impl Filter {
pub fn new(config: &Config) -> Self {
pub fn new(config: &ConfigFilter) -> Self {
Self {
publish_all_accounts: config.publish_all_accounts,
program_ignores: config
.program_ignores
.iter()
Expand All @@ -42,6 +52,14 @@ impl Filter {
.iter()
.flat_map(|p| Pubkey::from_str(p).ok().map(|p| p.to_bytes()))
.collect(),
include_vote_transactions: config.include_vote_transactions,
include_failed_transactions: config.include_failed_transactions,

update_account_topic: config.update_account_topic.clone(),
slot_status_topic: config.slot_status_topic.clone(),
transaction_topic: config.transaction_topic.clone(),

wrap_messages: config.wrap_messages,
}
}

Expand All @@ -61,25 +79,33 @@ impl Filter {
Err(_error) => true,
}
}

pub fn wants_vote_tx(&self) -> bool {
self.include_vote_transactions
}

pub fn wants_failed_tx(&self) -> bool {
self.include_failed_transactions
}
}

#[cfg(test)]
mod tests {
use {
crate::{Config, Filter},
crate::{ConfigFilter, Filter},
solana_program::pubkey::Pubkey,
std::str::FromStr,
};

#[test]
fn test_filter() {
let config = Config {
let config = ConfigFilter {
program_ignores: vec![
"Sysvar1111111111111111111111111111111111111".to_owned(),
"Vote111111111111111111111111111111111111111".to_owned(),
],
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand All @@ -99,13 +125,13 @@ mod tests {

#[test]
fn test_owner_filter() {
let config = Config {
let config = ConfigFilter {
program_ignores: vec![
"Sysvar1111111111111111111111111111111111111".to_owned(),
"Vote111111111111111111111111111111111111111".to_owned(),
],
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand All @@ -131,10 +157,10 @@ mod tests {

#[test]
fn test_account_filter() {
let config = Config {
let config = ConfigFilter {
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
account_filters: vec!["5KKsLVU6TcbVDK4BS6K1DGDxnh4Q9xjYJ8XaDCG5t8ht".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod publisher;
mod version;

pub use {
config::{Config, Producer},
config::{Config, ConfigFilter, Producer},
event::*,
filter::Filter,
plugin::KafkaPlugin,
Expand Down
Loading

0 comments on commit 9d21d53

Please sign in to comment.