Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasf committed Nov 28, 2020
1 parent 410ff54 commit 619785d
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 69 deletions.
28 changes: 14 additions & 14 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ impl Work {
}

pub fn node_limit(&self) -> Option<NodeLimit> {
match self {
Work::Analysis { nodes, .. } => nodes.clone(),
match *self {
Work::Analysis { nodes, .. } => nodes,
Work::Move { .. } => None,
}
}
Expand Down Expand Up @@ -543,18 +543,16 @@ impl ApiActor {
}

async fn abort(&mut self, batch_id: BatchId) -> reqwest::Result<()> {
Ok({
let url = format!("{}/abort/{}", self.endpoint, batch_id);
self.logger.warn(&format!("Aborting batch {}.", batch_id));
self.client.post(&url).json(&VoidRequestBody {
fishnet: Fishnet::authenticated(self.key.clone()),
stockfish: Stockfish::without_flavor(),
}).send().await?.error_for_status()?;
})
let url = format!("{}/abort/{}", self.endpoint, batch_id);
self.logger.warn(&format!("Aborting batch {}.", batch_id));
self.client.post(&url).json(&VoidRequestBody {
fishnet: Fishnet::authenticated(self.key.clone()),
stockfish: Stockfish::without_flavor(),
}).send().await?.error_for_status().map(|_| ())
}

async fn handle_message_inner(&mut self, msg: ApiMessage) -> reqwest::Result<()> {
Ok(match msg {
match msg {
ApiMessage::CheckKey { key, callback } => {
let url = format!("{}/key/{}", self.endpoint, key.0);
let res = self.client.get(&url).send().await?;
Expand Down Expand Up @@ -594,7 +592,7 @@ impl ApiActor {
StatusCode::BAD_REQUEST => callback.send(Acquired::BadRequest).nevermind("callback dropped"),
StatusCode::OK | StatusCode::ACCEPTED => {
if let Err(Acquired::Accepted(res)) = callback.send(Acquired::Accepted(res.json().await?)) {
self.logger.error(&format!("Acquired a batch, but callback dropped. Aborting."));
self.logger.error("Acquired a batch, but callback dropped. Aborting.");
self.abort(res.work.id()).await?;
}
}
Expand Down Expand Up @@ -632,7 +630,7 @@ impl ApiActor {
StatusCode::NO_CONTENT => callback.send(Acquired::NoContent).nevermind("callback dropped"),
StatusCode::OK | StatusCode::ACCEPTED => {
if let Err(Acquired::Accepted(res)) = callback.send(Acquired::Accepted(res.json().await?)) {
self.logger.error(&format!("Acquired a batch while submitting move, but callback dropped. Aborting."));
self.logger.error("Acquired a batch while submitting move, but callback dropped. Aborting.");
self.abort(res.work.id()).await?;
}
}
Expand All @@ -641,6 +639,8 @@ impl ApiActor {
}
}
}
})
}

Ok(())
}
}
20 changes: 10 additions & 10 deletions src/assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const NNUE: Asset = Asset {
};

#[cfg(all(unix, target_arch = "x86_64", not(target_os = "macos")))]
const STOCKFISH: &'static [Asset] = &[
const STOCKFISH: &[Asset] = &[
Asset {
name: "stockfish-x86-64-bmi2",
data: include_bytes!("../assets/stockfish-x86-64-bmi2.xz"),
Expand Down Expand Up @@ -143,7 +143,7 @@ const STOCKFISH: &'static [Asset] = &[
];

#[cfg(all(unix, target_arch = "x86_64", not(target_os = "macos")))]
const STOCKFISH_MV: &'static [Asset] = &[
const STOCKFISH_MV: &[Asset] = &[
Asset {
name: "stockfish-mv-x86-64-bmi2",
data: include_bytes!("../assets/stockfish-mv-x86-64-bmi2.xz"),
Expand Down Expand Up @@ -177,7 +177,7 @@ const STOCKFISH_MV: &'static [Asset] = &[
];

#[cfg(all(windows, target_arch = "x86_64"))]
const STOCKFISH: &'static [Asset] = &[
const STOCKFISH: &[Asset] = &[
Asset {
name: "stockfish-x86-64-bmi2.exe",
data: include_bytes!("../assets/stockfish-x86-64-bmi2.exe.xz"),
Expand Down Expand Up @@ -211,7 +211,7 @@ const STOCKFISH: &'static [Asset] = &[
];

#[cfg(all(windows, target_arch = "x86_64"))]
const STOCKFISH_MV: &'static [Asset] = &[
const STOCKFISH_MV: &[Asset] = &[
Asset {
name: "stockfish-mv-x86-64-bmi2.exe",
data: include_bytes!("../assets/stockfish-mv-x86-64-bmi2.exe.xz"),
Expand Down Expand Up @@ -245,7 +245,7 @@ const STOCKFISH_MV: &'static [Asset] = &[
];

#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
const STOCKFISH: &'static [Asset] = &[
const STOCKFISH: &[Asset] = &[
Asset {
name: "stockfish-macos-x86-64",
data: include_bytes!("../assets/stockfish-macos-x86-64.xz"),
Expand All @@ -255,7 +255,7 @@ const STOCKFISH: &'static [Asset] = &[
];

#[cfg(all(target_os = "macos", target_arch = "x86_64"))]
const STOCKFISH_MV: &'static [Asset] = &[
const STOCKFISH_MV: &[Asset] = &[
Asset {
name: "stockfish-mv-macos-x86-64",
data: include_bytes!("../assets/stockfish-mv-macos-x86-64.xz"),
Expand All @@ -265,7 +265,7 @@ const STOCKFISH_MV: &'static [Asset] = &[
];

#[cfg(all(unix, target_arch = "aarch64"))]
const STOCKFISH: &'static [Asset] = &[
const STOCKFISH: &[Asset] = &[
Asset {
name: "stockfish-mv-armv8",
data: include_bytes!("../assets/stockfish-armv8.xz"),
Expand All @@ -275,7 +275,7 @@ const STOCKFISH: &'static [Asset] = &[
];

#[cfg(all(unix, target_arch = "aarch64"))]
const STOCKFISH_MV: &'static [Asset] = &[
const STOCKFISH_MV: &[Asset] = &[
Asset {
name: "stockfish-mv-armv8",
data: include_bytes!("../assets/stockfish-mv-armv8.xz"),
Expand Down Expand Up @@ -342,8 +342,8 @@ impl Assets {
Ok(Assets {
nnue: NNUE.create(dir.path())?.to_str().expect("nnue path printable").to_owned(),
stockfish: ByEngineFlavor {
official: STOCKFISH.iter().filter(|a| cpu.contains(a.needs)).next().expect("stockfish").create(dir.path())?,
multi_variant: STOCKFISH_MV.iter().filter(|a| cpu.contains(a.needs)).next().expect("stockfish").create(dir.path())?,
official: STOCKFISH.iter().find(|a| cpu.contains(a.needs)).expect("compatible stockfish").create(dir.path())?,
multi_variant: STOCKFISH_MV.iter().find(|a| cpu.contains(a.needs)).expect("compatible stockfish").create(dir.path())?,
},
dir,
})
Expand Down
4 changes: 2 additions & 2 deletions src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ pub async fn parse_and_configure() -> Opt {
// Step 1: Endpoint.
let endpoint = loop {
let mut endpoint = String::new();
eprint!("Endpoint (default: {}): ", ini.get("Fishnet", "Endpoint").unwrap_or(DEFAULT_ENDPOINT.to_owned()));
eprint!("Endpoint (default: {}): ", ini.get("Fishnet", "Endpoint").unwrap_or_else(|| DEFAULT_ENDPOINT.to_owned()));
io::stderr().flush().expect("flush stderr");
io::stdin().read_line(&mut endpoint).expect("read endpoint from stdin");

let endpoint = Some(endpoint.trim().to_owned())
.filter(|e| !e.is_empty())
.or_else(|| ini.get("Fishnet", "Endpoint"))
.unwrap_or(DEFAULT_ENDPOINT.to_owned());
.unwrap_or_else(|| DEFAULT_ENDPOINT.to_owned());

match endpoint.parse() {
Ok(url) => {
Expand Down
7 changes: 2 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn run(opt: Opt, logger: &Logger) {
break;
}
_ = time::sleep(timeout) => {
logger.warn(&format!("Engine timed out in worker. If this happens frequently it is better to stop and defer to clients with better hardware."));
logger.warn(&format!("Engine timed out in worker {}. If this happens frequently it is better to stop and defer to clients with better hardware.", i));
drop(sf);
join_handle.await.expect("join");
break;
Expand All @@ -222,10 +222,7 @@ async fn run(opt: Opt, logger: &Logger) {

let (callback, waiter) = oneshot::channel();

if let Err(_) = tx.send(Pull {
response,
callback,
}).await {
if tx.send(Pull { response, callback }).await.is_err() {
logger.debug(&format!("Worker {} was about to send result, but shutting down", i));
break;
}
Expand Down
57 changes: 29 additions & 28 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::{min, max};
use std::convert::TryInto;
use std::collections::{VecDeque, HashMap};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::{Duration, Instant};
use shakmaty::uci::Uci;
Expand Down Expand Up @@ -116,33 +117,33 @@ impl QueueState {
}

fn add_incoming_batch(&mut self, batch: IncomingBatch) {
let batch_id = batch.work.id();
if self.pending.contains_key(&batch_id) {
self.logger.error(&format!("Dropping duplicate incoming batch {}", batch_id));
} else {
let progress_at = ProgressAt::from(&batch);

// Reversal only for cosmetics when displaying progress.
let mut positions = Vec::with_capacity(batch.positions.len());
for pos in batch.positions.into_iter().rev() {
positions.insert(0, match pos {
Skip::Present(pos) => {
self.incoming.push_back(pos);
None
}
Skip::Skip => Some(Skip::Skip),
});
}
match self.pending.entry(batch.work.id()) {
Entry::Occupied(entry) => self.logger.error(&format!("Dropping duplicate incoming batch {}", entry.key())),
Entry::Vacant(entry) => {
let progress_at = ProgressAt::from(&batch);

// Reversal only for cosmetics when displaying progress.
let mut positions = Vec::with_capacity(batch.positions.len());
for pos in batch.positions.into_iter().rev() {
positions.insert(0, match pos {
Skip::Present(pos) => {
self.incoming.push_back(pos);
None
}
Skip::Skip => Some(Skip::Skip),
});
}

self.pending.insert(batch_id, PendingBatch {
work: batch.work,
flavor: batch.flavor,
url: batch.url,
positions,
started_at: Instant::now(),
});
entry.insert(PendingBatch {
work: batch.work,
flavor: batch.flavor,
url: batch.url,
positions,
started_at: Instant::now(),
});

self.logger.progress(self.status_bar(), progress_at);
self.logger.progress(self.status_bar(), progress_at);
}
}
}

Expand Down Expand Up @@ -267,8 +268,8 @@ impl QueueActor {

if user_backlog >= sec || system_backlog >= sec {
if let Some(status) = self.api.status().await {
let user_wait = user_backlog.checked_sub(status.user.oldest).unwrap_or(Duration::default());
let system_wait = system_backlog.checked_sub(status.system.oldest).unwrap_or(Duration::default());
let user_wait = user_backlog.checked_sub(status.user.oldest).unwrap_or_default();
let system_wait = system_backlog.checked_sub(status.system.oldest).unwrap_or_default();
self.logger.debug(&format!("User wait: {:?} due to {:?} for oldest {:?}, system wait: {:?} due to {:?} for oldest {:?}",
user_wait, user_backlog, status.user.oldest,
system_wait, system_backlog, status.system.oldest));
Expand Down Expand Up @@ -484,7 +485,7 @@ impl IncomingBatch {
if positions.iter().all(Skip::is_skipped) {
let now = Instant::now();
return Err(CompletedBatch {
work: body.work.clone(),
work: body.work,
url,
flavor,
positions: positions.into_iter().map(|_| Skip::Skip).collect(),
Expand Down
19 changes: 12 additions & 7 deletions src/stockfish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl StockfishActor {
let mut stdout = Stdout::new(child.stdout.take().ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "stdout closed"))?);
let mut stdin = Stdin::new(child.stdin.take().ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed"))?);

Ok(loop {
loop {
tokio::select! {
msg = self.rx.recv() => {
if let Some(msg) = msg {
Expand All @@ -166,18 +166,23 @@ impl StockfishActor {
break;
}
}
})
}

Ok(())
}

async fn handle_message(&mut self, stdout: &mut Stdout, stdin: &mut Stdin, msg: StockfishMessage) -> Result<(), EngineError> {
Ok(match msg {
match msg {
StockfishMessage::Go { mut callback, position } => {
tokio::select! {
_ = callback.closed() => return Err(EngineError::Shutdown),
res = self.go(stdout, stdin, position) => callback.send(res?).nevermind("go receiver dropped"),
_ = callback.closed() => Err(EngineError::Shutdown),
res = self.go(stdout, stdin, position) => {
callback.send(res?).nevermind("go receiver dropped");
Ok(())
}
}
}
})
}
}

async fn go(&mut self, stdout: &mut Stdout, stdin: &mut Stdin, position: Position) -> io::Result<PositionResponse> {
Expand Down Expand Up @@ -264,7 +269,7 @@ impl StockfishActor {

loop {
let line = stdout.read_line().await?;
let mut parts = line.split(" ");
let mut parts = line.split(' ');
match parts.next() {
Some("bestmove") => {
return Ok(PositionResponse {
Expand Down
7 changes: 4 additions & 3 deletions src/systemd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn systemd_system(opt: Opt) {
println!("ExecStart={}", exe);
println!("KillMode=mixed");
println!("WorkingDirectory=/tmp");
println!("User={}", env::var("USER").unwrap_or("XXX".to_owned()));
println!("User={}", env::var("USER").unwrap_or_else(|_| "XXX".to_owned()));
println!("Nice=5");
println!("CapabilityBoundingSet=");
println!("PrivateTmp=true");
Expand All @@ -33,7 +33,7 @@ pub fn systemd_system(opt: Opt) {
println!("WantedBy=multi-user.target");

if atty::is(Stream::Stdout) {
let command = env::args().next().unwrap_or("./fishnet".to_owned());
let command = env::args().next().unwrap_or_else(|| "./fishnet".to_owned());
eprintln!();
eprintln!("# Example usage:");
eprintln!("# {} systemd | sudo tee /etc/systemd/system/fishnet.service", command);
Expand Down Expand Up @@ -69,9 +69,10 @@ pub fn systemd_user(opt: Opt) {
println!("WantedBy=default.target");

if atty::is(Stream::Stdout) {
let command = env::args().next().unwrap_or_else(|| "./fishnet".to_owned());
eprintln!();
eprintln!("# Example usage:");
eprintln!("# {} systemd-user | tee ~/.config/systemd/user/fishnet.service", env::args().next().unwrap_or("./fishnet".to_owned()));
eprintln!("# {} systemd-user | tee ~/.config/systemd/user/fishnet.service", command);
eprintln!("# systemctl enable --user fishnet.service");
eprintln!("# systemctl start --user fishnet.service");
eprintln!("# Live view of log: journalctl --follow --user-unit fishnet");
Expand Down

0 comments on commit 619785d

Please sign in to comment.