Skip to content

Commit

Permalink
support multiple config filters, fix transaction filter (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 11, 2023
1 parent c81c968 commit faaa4c2
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 166 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "solana-accountsdb-plugin-kafka"
description = "Solana AccountsDb plugin for Kafka"
authors = ["Blockdaemon", "Triton One"]
version = "0.1.8+solana.1.16.15"
version = "0.3.0+solana.1.16.15"
edition = "2021"
repository = "https://github.com/rpcpool/solana-accountsdb-plugin-kafka"
license = "Apache-2.0"
Expand Down
96 changes: 55 additions & 41 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,22 @@ use {
};

/// Plugin config.
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[allow(dead_code)]
libpath: String,

/// Kafka config.
pub kafka: HashMap<String, String>,

/// Graceful shutdown timeout.
#[serde(default)]
pub shutdown_timeout_ms: u64,
/// Kafka topic to send account updates to.
#[serde(default)]
pub update_account_topic: String,
/// Kafka topic to send slot status updates to.
#[serde(default)]
pub slot_status_topic: String,
/// Kafka topic to send transaction to.
#[serde(default)]
pub transaction_topic: String,
/// List of programs to ignore.
#[serde(default)]
pub program_ignores: Vec<String>,
/// List of programs to include
#[serde(default)]
pub program_filters: Vec<String>,
// List of accounts to include
#[serde(default)]
pub account_filters: Vec<String>,
/// Publish all accounts on startup.
#[serde(default)]
pub publish_all_accounts: bool,
/// Publish vote transactions.
#[serde(default)]
pub include_vote_transactions: bool,
/// Publish failed transactions.
#[serde(default)]
pub include_failed_transactions: bool,
/// Wrap all event message in a single message type.
#[serde(default)]
pub wrap_messages: bool,

/// Accounts, transactions filters
pub filters: Vec<ConfigFilter>,

/// Prometheus endpoint.
#[serde(default)]
pub prometheus: Option<SocketAddr>,
Expand All @@ -73,18 +52,10 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
libpath: "".to_owned(),
kafka: HashMap::new(),
shutdown_timeout_ms: 30_000,
update_account_topic: "".to_owned(),
slot_status_topic: "".to_owned(),
transaction_topic: "".to_owned(),
program_ignores: Vec::new(),
program_filters: Vec::new(),
account_filters: Vec::new(),
publish_all_accounts: false,
include_vote_transactions: true,
include_failed_transactions: true,
wrap_messages: false,
filters: vec![],
prometheus: None,
}
}
Expand Down Expand Up @@ -127,4 +98,47 @@ impl Config {
}
}

/// Plugin config.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfigFilter {
/// Kafka topic to send account updates to.
pub update_account_topic: String,
/// Kafka topic to send slot status updates to.
pub slot_status_topic: String,
/// Kafka topic to send transaction to.
pub transaction_topic: String,
/// List of programs to ignore.
pub program_ignores: Vec<String>,
/// List of programs to include
pub program_filters: Vec<String>,
// List of accounts to include
pub account_filters: Vec<String>,
/// Publish all accounts on startup.
pub publish_all_accounts: bool,
/// Publish vote transactions.
pub include_vote_transactions: bool,
/// Publish failed transactions.
pub include_failed_transactions: bool,
/// Wrap all event message in a single message type.
pub wrap_messages: bool,
}

impl Default for ConfigFilter {
fn default() -> Self {
Self {
update_account_topic: "".to_owned(),
slot_status_topic: "".to_owned(),
transaction_topic: "".to_owned(),
program_ignores: Vec::new(),
program_filters: Vec::new(),
account_filters: Vec::new(),
publish_all_accounts: false,
include_vote_transactions: true,
include_failed_transactions: true,
wrap_messages: false,
}
}
}

pub type Producer = ThreadedProducer<DefaultProducerContext>;
41 changes: 27 additions & 14 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,30 @@
// limitations under the License.

use {
crate::Config,
crate::ConfigFilter,
solana_program::pubkey::Pubkey,
std::{collections::HashSet, str::FromStr},
};

pub struct Filter {
program_ignores: HashSet<[u8; 32]>,
program_filters: HashSet<[u8; 32]>,
account_filters: HashSet<[u8; 32]>,
include_vote_transactions: bool,
include_failed_transactions: bool,
pub publish_all_accounts: bool,
pub program_ignores: HashSet<[u8; 32]>,
pub program_filters: HashSet<[u8; 32]>,
pub account_filters: HashSet<[u8; 32]>,
pub include_vote_transactions: bool,
pub include_failed_transactions: bool,

pub update_account_topic: String,
pub slot_status_topic: String,
pub transaction_topic: String,

pub wrap_messages: bool,
}

impl Filter {
pub fn new(config: &Config) -> Self {
pub fn new(config: &ConfigFilter) -> Self {
Self {
publish_all_accounts: config.publish_all_accounts,
program_ignores: config
.program_ignores
.iter()
Expand All @@ -46,6 +54,11 @@ impl Filter {
.collect(),
include_vote_transactions: config.include_vote_transactions,
include_failed_transactions: config.include_failed_transactions,

update_account_topic: config.update_account_topic.clone(),
slot_status_topic: config.slot_status_topic.clone(),
transaction_topic: config.transaction_topic.clone(),
wrap_messages: config.wrap_messages,
}
}

Expand Down Expand Up @@ -78,20 +91,20 @@ impl Filter {
#[cfg(test)]
mod tests {
use {
crate::{Config, Filter},
crate::{ConfigFilter, Filter},
solana_program::pubkey::Pubkey,
std::str::FromStr,
};

#[test]
fn test_filter() {
let config = Config {
let config = ConfigFilter {
program_ignores: vec![
"Sysvar1111111111111111111111111111111111111".to_owned(),
"Vote111111111111111111111111111111111111111".to_owned(),
],
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand All @@ -111,13 +124,13 @@ mod tests {

#[test]
fn test_owner_filter() {
let config = Config {
let config = ConfigFilter {
program_ignores: vec![
"Sysvar1111111111111111111111111111111111111".to_owned(),
"Vote111111111111111111111111111111111111111".to_owned(),
],
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand All @@ -143,10 +156,10 @@ mod tests {

#[test]
fn test_account_filter() {
let config = Config {
let config = ConfigFilter {
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
account_filters: vec!["5KKsLVU6TcbVDK4BS6K1DGDxnh4Q9xjYJ8XaDCG5t8ht".to_owned()],
..Config::default()
..Default::default()
};

let filter = Filter::new(&config);
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod publisher;
mod version;

pub use {
config::{Config, Producer},
config::{Config, ConfigFilter, Producer},
event::*,
filter::Filter,
plugin::KafkaPlugin,
Expand Down
Loading

0 comments on commit faaa4c2

Please sign in to comment.