This repository has been archived by the owner on Oct 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
main.rs
111 lines (96 loc) · 2.64 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use rocksdb::DB;
use std::fs;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
mod logging;
use parity_processbot::{
config::MainConfig, constants::*, github::Payload, github_bot, server,
webhook::*,
};
fn main() -> anyhow::Result<()> {
env_logger::from_env(env_logger::Env::default().default_filter_or("info"))
.format(logging::gke::format)
.init();
let config = MainConfig::from_env();
let socket = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
config.webhook_port.parse::<u16>().expect("webhook port"),
);
let db_version_path =
Path::new(&config.db_path).join("__PROCESSBOT_VERSION__");
let is_at_current_db_version = match db_version_path.exists() {
true => {
let str = fs::read_to_string(&db_version_path)?;
str == DATABASE_VERSION
}
false => false,
};
if !is_at_current_db_version {
log::info!(
"Clearing database to start from version {}",
DATABASE_VERSION
);
for entry in fs::read_dir(&config.db_path)? {
let entry = entry?;
if entry.path() == db_version_path {
continue;
}
if entry.metadata()?.is_dir() {
fs::remove_dir_all(entry.path())?;
} else {
fs::remove_file(entry.path())?;
}
}
fs::write(db_version_path, DATABASE_VERSION)?;
}
let db = DB::open_default(&config.db_path)?;
let github_bot = github_bot::GithubBot::new(&config);
let webhook_proxy_url = config.webhook_proxy_url.clone();
let app_state = Arc::new(Mutex::new(AppState {
db,
github_bot,
config,
}));
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
if let Some(webhook_proxy_url) = webhook_proxy_url {
use eventsource::reqwest::Client;
use reqwest::Url;
let client = Client::new(Url::parse(&webhook_proxy_url).unwrap());
#[derive(serde::Deserialize)]
struct SmeePayload {
body: Payload,
}
for event in client {
let state = app_state.clone();
rt.block_on(async move {
let event = event.unwrap();
if let Ok(payload) =
serde_json::from_str::<SmeePayload>(event.data.as_str())
{
log::info!("Acquiring lock");
let state = &*state.lock().await;
let (merge_cancel_outcome, result) =
handle_payload(payload.body, state).await;
if let Err(err) = result {
handle_error(merge_cancel_outcome, err, state).await;
}
log::info!("Releasing lock");
} else {
match event.event_type.as_deref() {
Some("ping") => (),
Some("ready") => log::info!("Webhook proxy is ready!"),
_ => log::info!("Not parsed {:?}", event),
}
}
});
}
} else {
rt.block_on(server::init(socket, app_state))?;
}
Ok(())
}