-
Notifications
You must be signed in to change notification settings - Fork 8
/
lib.rs
100 lines (80 loc) · 2.96 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::sync::Arc;
use std::time::Duration;
use actix_web::middleware;
use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::App;
use actix_web::HttpServer;
use openraft::Config;
use openraft::Raft;
use openraft::SnapshotPolicy;
use crate::app::ExampleApp;
use crate::network::api;
use crate::network::management;
use crate::network::raft;
use crate::network::raft_network_impl::ExampleNetwork;
use crate::store::ExampleRequest;
use crate::store::ExampleResponse;
use crate::store::ExampleStore;
use crate::store::Restore;
pub mod app;
pub mod client;
pub mod network;
pub mod store;
pub mod matchengine;
pub type ExampleNodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId
);
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
pub async fn start_example_raft_node(node_id: ExampleNodeId, http_addr: String) -> std::io::Result<()> {
// Create a configuration for the raft instance.
let mut config = Config::default().validate().unwrap();
config.snapshot_policy = SnapshotPolicy::LogsSinceLast(500);
config.max_applied_log_to_keep = 20000;
config.install_snapshot_timeout = 400;
let config = Arc::new(config);
// Create a instance of where the Raft data will be stored.
let es = ExampleStore::open_create(node_id);
//es.load_latest_snapshot().await.unwrap();
let mut store = Arc::new(es);
store.restore().await;
// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = ExampleNetwork::new();
// Create a local raft instance.
let raft = Raft::new(node_id, config.clone(), network, store.clone());
// Create an application that will store all the instances created above, this will
// be later used on the actix-web services.
let app = Data::new(ExampleApp {
id: node_id,
addr: http_addr.clone(),
raft,
store,
config,
});
// Start the actix-web server.
let server = HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.wrap(middleware::Compress::default())
.app_data(app.clone())
// raft internal RPC
.service(raft::append)
.service(raft::snapshot)
.service(raft::vote)
// admin API
.service(management::init)
.service(management::add_learner)
.service(management::change_membership)
.service(management::metrics)
// application API
.service(api::write)
.service(api::read)
.service(api::consistent_read)
}).keep_alive(Duration::from_secs(5));
let x = server.bind(http_addr)?;
x.run().await
}