Skip to content

Commit

Permalink
feat: upgrade service start implementation + documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Apr 4, 2023
1 parent 6516155 commit 02db65e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 79 deletions.
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,24 @@ Tbe first time this command run, a chainstate archive will be downloaded, uncomp
The subsequent scans will use the cached chainstate if already present, speeding up iterations and the overall feedback loop.

---
## Running `chainhook` in production mode
## Run `chainhook` as a service for streaming new blocks

To be documented.
`chainhook` can be ran as a background service for streaming and processing new canonical blocks appended to the Bitcoin and Stacks blockchains.

When running chainhook as a service, `if_this` / `then_that` predicates can be registered by passing the path of the `json` file in the command line:

```bash
$ chainhook service start --predicate-path=./path/to/predicate-1.json --predicate-path=./path/to/predicate-2.json --config-path=./path/to/config.toml
```

Predicates can also be added dynamically. When the `--predicate-path` option is not passed or when the `--start-http-api` option is passed, `chainhook` will instantiate a REST API allowing developers to list, add and removes preducates at runtime:

```bash
$ chainhook service start --config-path=./path/to/config.toml
```

```bash
$ chainhook service start --predicate-path=./path/to/predicate-1.json --start-http-api --config-path=./path/to/config.toml
```

A comprehensive OpenAPI spcification explaining how to interact with the Chainhook REST API can be found [here](./docs/chainhook-openapi.json).
46 changes: 33 additions & 13 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ struct StartCommand {
conflicts_with = "devnet"
)]
pub config_path: Option<String>,
/// Specify relative path of the chainhooks (yaml format) to evaluate
#[clap(long = "predicate-path")]
pub predicates_paths: Vec<String>,
/// Start REST API for managing predicates
#[clap(long = "start-http-api")]
pub start_http_api: bool,
}

#[derive(Subcommand, PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -321,10 +327,19 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
match opts.command {
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let config =
let mut config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
// We disable the API if a predicate was passed, and the --enable-
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.chainhooks.enable_http_api = false;
}
let mut service = Service::new(config, ctx);
return service.run().await;
let predicates = cmd
.predicates_paths
.iter()
.map(|p| load_predicate_from_path(p))
.collect::<Result<Vec<ChainhookFullSpecification>, _>>()?;
return service.run(predicates).await;
}
},
Command::Config(subcmd) => match subcmd {
Expand Down Expand Up @@ -455,17 +470,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
PredicatesCommand::Scan(cmd) => {
let mut config =
Config::default(false, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let file = std::fs::File::open(&cmd.predicate_path)
.map_err(|e| format!("unable to read file {}\n{:?}", cmd.predicate_path, e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
file_reader
.read_to_end(&mut file_buffer)
.map_err(|e| format!("unable to read file {}\n{:?}", cmd.predicate_path, e))?;
let predicate: ChainhookFullSpecification = serde_json::from_slice(&file_buffer)
.map_err(|e| {
format!("unable to parse json file {}\n{:?}", cmd.predicate_path, e)
})?;
let predicate = load_predicate_from_path(&cmd.predicate_path)?;
match predicate {
ChainhookFullSpecification::Bitcoin(predicate) => {
scan_bitcoin_chain_with_predicate(predicate, &config, &ctx).await?;
Expand Down Expand Up @@ -654,3 +659,18 @@ pub fn install_ctrlc_handler(terminate_tx: Sender<DigestingCommand>, ctx: Contex
})
.expect("Error setting Ctrl-C handler");
}

pub fn load_predicate_from_path(
predicate_path: &str,
) -> Result<ChainhookFullSpecification, String> {
let file = std::fs::File::open(&predicate_path)
.map_err(|e| format!("unable to read file {}\n{:?}", predicate_path, e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
file_reader
.read_to_end(&mut file_buffer)
.map_err(|e| format!("unable to read file {}\n{:?}", predicate_path, e))?;
let predicate: ChainhookFullSpecification = serde_json::from_slice(&file_buffer)
.map_err(|e| format!("unable to parse json file {}\n{:?}", predicate_path, e))?;
Ok(predicate)
}
6 changes: 6 additions & 0 deletions components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct TsvUrlConfig {
pub struct ChainhooksConfig {
pub max_stacks_registrations: u16,
pub max_bitcoin_registrations: u16,
pub enable_http_api: bool,
}

impl Config {
Expand Down Expand Up @@ -97,6 +98,7 @@ impl Config {
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
control_api_enabled: self.chainhooks.enable_http_api,
bitcoind_rpc_username: self.network.bitcoind_rpc_username.clone(),
bitcoind_rpc_password: self.network.bitcoind_rpc_password.clone(),
bitcoind_rpc_url: self.network.bitcoind_rpc_url.clone(),
Expand Down Expand Up @@ -149,6 +151,7 @@ impl Config {
.chainhooks
.max_bitcoin_registrations
.unwrap_or(100),
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: config_file.network.stacks_node_rpc_url.to_string(),
Expand Down Expand Up @@ -273,6 +276,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 50,
max_bitcoin_registrations: 50,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),
Expand Down Expand Up @@ -302,6 +306,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
max_bitcoin_registrations: 10,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),
Expand Down Expand Up @@ -331,6 +336,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
max_bitcoin_registrations: 10,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),
Expand Down
146 changes: 82 additions & 64 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,70 +39,39 @@ impl Service {
Self { config, ctx }
}

pub async fn run(&mut self) -> Result<(), String> {
pub async fn run(
&mut self,
mut predicates: Vec<ChainhookFullSpecification>,
) -> Result<(), String> {
let mut chainhook_config = ChainhookConfig::new();

{
let redis_config = self.config.expected_redis_config();
let client = redis::Client::open(redis_config.uri.clone()).unwrap();
let mut redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
error!(
if predicates.is_empty() {
let mut registered_predicates = load_predicates_from_redis(&self.config, &self.ctx)?;
predicates.append(&mut registered_predicates);
}

for predicate in predicates.into_iter() {
match chainhook_config.register_hook(
(
&self.config.network.bitcoin_network,
&self.config.network.stacks_network,
),
predicate,
&ApiKey(None),
) {
Ok(spec) => {
info!(
self.ctx.expect_logger(),
"Unable to connect to redis server: {}",
message.to_string()
"Predicate {} retrieved from storage and loaded",
spec.uuid(),
);
std::thread::sleep(std::time::Duration::from_secs(1));
std::process::exit(1);
}
};

let chainhooks_to_load: Vec<String> = redis_con
.scan_match("chainhook:*:*:*")
.expect("unable to retrieve prunable entries")
.into_iter()
.collect();

for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => {
ChainhookFullSpecification::deserialize_specification(&spec, key).unwrap()
// todo
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to load chainhook associated with key {}: {}",
key,
e.to_string()
);
continue;
}
};

match chainhook_config.register_hook(
(
&self.config.network.bitcoin_network,
&self.config.network.stacks_network,
),
chainhook,
&ApiKey(None),
) {
Ok(spec) => {
info!(
self.ctx.expect_logger(),
"Predicate {} retrieved from storage and loaded",
spec.uuid(),
);
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Failed loading predicate from storage: {}",
e.to_string()
);
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Failed loading predicate from storage: {}",
e.to_string()
);
}
}
}
Expand Down Expand Up @@ -133,11 +102,13 @@ impl Service {
}
}

info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",
event_observer_config.control_port
);
if self.config.chainhooks.enable_http_api {
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",
event_observer_config.control_port
);
}

// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
// {
Expand Down Expand Up @@ -618,3 +589,50 @@ fn update_storage_with_confirmed_stacks_blocks(
redis_con.set(&format!("stx:tip"), block.block_identifier.index);
}
}

fn load_predicates_from_redis(
config: &Config,
ctx: &Context,
) -> Result<Vec<ChainhookFullSpecification>, String> {
let redis_config = config.expected_redis_config();
let client = redis::Client::open(redis_config.uri.clone()).unwrap();
let mut redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
error!(
ctx.expect_logger(),
"Unable to connect to redis server: {}",
message.to_string()
);
std::thread::sleep(std::time::Duration::from_secs(1));
std::process::exit(1);
}
};

let chainhooks_to_load: Vec<String> = redis_con
.scan_match("chainhook:*:*:*")
.expect("unable to retrieve prunable entries")
.into_iter()
.collect();

let mut predicates = vec![];
for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => {
ChainhookFullSpecification::deserialize_specification(&spec, key).unwrap()
// todo
}
Err(e) => {
error!(
ctx.expect_logger(),
"unable to load chainhook associated with key {}: {}",
key,
e.to_string()
);
continue;
}
};
predicates.push(chainhook);
}
Ok(predicates)
}
1 change: 1 addition & 0 deletions components/chainhook-event-observer/src/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub struct EventObserverConfig {
pub event_handlers: Vec<EventHandler>,
pub ingestion_port: u16,
pub control_port: u16,
pub control_api_enabled: bool,
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub bitcoind_rpc_url: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn generate_test_config() -> (EventObserverConfig, ChainhookStore) {
event_handlers: vec![],
ingestion_port: 0,
control_port: 0,
control_api_enabled: false,
bitcoind_rpc_username: "user".into(),
bitcoind_rpc_password: "user".into(),
bitcoind_rpc_url: "http://localhost:18443".into(),
Expand Down

0 comments on commit 02db65e

Please sign in to comment.