Skip to content

Commit

Permalink
Merge branch 'master' into provide-public-ip
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Sep 13, 2024
2 parents 19b9085 + 127fac5 commit 9db7752
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/release-please/manifest.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
".": "0.27.1"
".": "0.27.5"
}
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
# Changelog

## [0.27.5](https://github.com/fluencelabs/nox/compare/nox-v0.27.4...nox-v0.27.5) (2024-09-12)


### Bug Fixes

* **listener:** next epoch calculation ([#2371](https://github.com/fluencelabs/nox/issues/2371)) ([2d29b30](https://github.com/fluencelabs/nox/commit/2d29b30973fa7bac4f143eedb6b360860dc08815))

## [0.27.4](https://github.com/fluencelabs/nox/compare/nox-v0.27.3...nox-v0.27.4) (2024-09-12)


### Bug Fixes

* **listener:** last window calculation + don't send empty proofs ([#2369](https://github.com/fluencelabs/nox/issues/2369)) ([da70f7f](https://github.com/fluencelabs/nox/commit/da70f7f597d33c2506a85ce96aeb2ca7a79c096d))

## [0.27.3](https://github.com/fluencelabs/nox/compare/nox-v0.27.2...nox-v0.27.3) (2024-09-12)


### Bug Fixes

* **listener:** fix batch request creation ([#2367](https://github.com/fluencelabs/nox/issues/2367)) ([e8ee2f8](https://github.com/fluencelabs/nox/commit/e8ee2f84d79f38ea3a34a179a150ef681a838e0e))

## [0.27.2](https://github.com/fluencelabs/nox/compare/nox-v0.27.1...nox-v0.27.2) (2024-09-12)


### Features

* **listener:** skip proofs with wrong nonce ([#2365](https://github.com/fluencelabs/nox/issues/2365)) ([fe0ec7c](https://github.com/fluencelabs/nox/commit/fe0ec7c54030c7812df6493df0d975a9d6e197c7))

## [0.27.1](https://github.com/fluencelabs/nox/compare/nox-v0.27.0...nox-v0.27.1) (2024-09-10)


Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 56 additions & 49 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use alloy_primitives::{Address, FixedBytes, Uint, U256, U64};
use alloy_primitives::{Address, FixedBytes, Uint, U256};
use alloy_sol_types::SolEvent;
use backoff::Error::Permanent;
use std::cmp::min;
Expand Down Expand Up @@ -217,7 +217,6 @@ impl ChainListener {
let result = tokio::task::Builder::new()
.name("ChainListener")
.spawn(async move {

if let Err(err) = self.set_utility_core().await {
tracing::error!(target: "chain-listener", "Failed to set utility core: {err}; Stopping...");
exit(1);
Expand Down Expand Up @@ -427,16 +426,16 @@ impl ChainListener {
.cloned()
.ok_or(eyre::eyre!("No utility core id"))?;
measured_request(&self.metrics,
retry(ExponentialBackoff::default(), || async {
ccp_client
.realloc_utility_cores(vec![utility_core])
.await
.map_err(|err| {
tracing::warn!(target: "chain-listener", "Error reallocating utility core {utility_core} to CCP, error: {err}. Retrying...");
eyre::eyre!("Error reallocating utility core {utility_core} to CCP, error: {err}")
})?;
Ok(())
})
retry(ExponentialBackoff::default(), || async {
ccp_client
.realloc_utility_cores(vec![utility_core])
.await
.map_err(|err| {
tracing::warn!(target: "chain-listener", "Error reallocating utility core {utility_core} to CCP, error: {err}. Retrying...");
eyre::eyre!("Error reallocating utility core {utility_core} to CCP, error: {err}")
})?;
Ok(())
}),
).await?;

tracing::info!("Utility core {utility_core} successfully reallocated");
Expand Down Expand Up @@ -611,17 +610,18 @@ impl ChainListener {
params: ArrayParams,
) -> Result<Subscription<JsonValue>, client::Error> {
let sub = retry(ExponentialBackoff::default(), || async {
self
self
.ws_client
.subscribe("eth_subscribe", params.clone(), "eth_unsubscribe")
.await.map_err(|err| {
.await.map_err(|err| {
if let client::Error::RestartNeeded(_) = err {
tracing::error!(target: "chain-listener", "Failed to subscribe to {method}: {err};");
Permanent(err)
} else {
tracing::warn!(target: "chain-listener", "Failed to subscribe to {method}: {err}; Retrying...");
backoff::Error::transient(err)
}})
}
})
}).await?;

Ok(sub)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Stopping current commitment");
if let Some(ref ccp_client) = self.ccp_client {
measured_request(&self.metrics,
ccp_client.on_no_active_commitment()
ccp_client.on_no_active_commitment(),
).await.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to send no active commitment to CCP: {err}");
eyre::eyre!("Failed to send no active commitment to CCP: {err}")
Expand Down Expand Up @@ -1118,9 +1118,12 @@ impl ChainListener {

fn is_epoch_ending(&self) -> bool {
let window = Uint::from(self.listener_config.epoch_end_window.as_secs());
let next_epoch_start =
self.init_timestamp + self.epoch_duration * (self.current_epoch + Uint::from(1));
next_epoch_start - self.last_observed_block_timestamp < window
let next_epoch_start = self.init_timestamp + self.epoch_duration * self.current_epoch;
tracing::debug!(target: "chain-listener", "Next epoch start: {}, last observed block timestamp: {}", next_epoch_start, self.last_observed_block_timestamp);
next_epoch_start
.checked_sub(self.last_observed_block_timestamp)
.unwrap_or(Uint::ZERO)
< window
}

async fn poll_proofs(&mut self) -> eyre::Result<()> {
Expand All @@ -1131,6 +1134,11 @@ impl ChainListener {
if let Some(ref ccp_client) = self.ccp_client {
let batch_requests = self.get_batch_request();

if batch_requests.is_empty() {
tracing::debug!(target: "chain-listener", "No compute units to poll proofs for. Probably all CUs have reached max proofs count");
return Ok(());
}

let proof_batches = if self.is_epoch_ending() {
let last_known_proofs = batch_requests
.into_iter()
Expand Down Expand Up @@ -1158,30 +1166,22 @@ impl ChainListener {
.map_err(|err| eyre::eyre!("Failed to poll batched proofs from ccp: {err}"))?
};

// TODO: maybe filter out proofs that are not related to current epoch
// // Filter proofs related to current epoch only
// let proof_batches: Vec<BatchResponse> = proofs
// .into_iter()
// .for_each(move |p| {
// p.proof_batches
// .into_iter()
// .filter(|p| p.id.global_nonce == self.global_nonce)
// .collect()
// })
// .collect();

if !proof_batches.is_empty() {
let total_proofs = proof_batches
.iter()
.map(|p| p.proof_batches.len())
.sum::<usize>();
tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling", total_proofs, proof_batches.len());

let mut unit_ids = Vec::new();
let mut local_nonces = Vec::new();
let mut result_hashes = Vec::new();
let batch_count = proof_batches.len();
let mut skipped_proofs_count = 0;
for batch in proof_batches.into_iter() {
for proof in batch.proof_batches.into_iter() {
if proof.id.global_nonce != self.global_nonce
|| proof.id.difficulty != self.difficulty
{
tracing::debug!(target: "chain-listener", "Proof (id={}, global nonce={}, difficulty={}) doesn't match current nonce {} and/or difficulty {}. Skipping..", proof.id.idx, proof.id.global_nonce, proof.id.difficulty, self.global_nonce, self.difficulty);
skipped_proofs_count += 1;
continue;
}

unit_ids.push(proof.cu_id);
local_nonces.push(proof.local_nonce);
result_hashes.push(proof.result_hash);
Expand All @@ -1191,12 +1191,17 @@ impl ChainListener {
}
}

self.submit_proofs(unit_ids, local_nonces, result_hashes)
.await?;
tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling, skipped {} proofs", result_hashes.len(), batch_count, skipped_proofs_count);

if !result_hashes.is_empty() {
self.submit_proofs(unit_ids, local_nonces, result_hashes)
.await?;
}
} else {
tracing::debug!(target: "chain-listener", "No proofs found from polling");
}
}

Ok(())
}

Expand All @@ -1210,14 +1215,14 @@ impl ChainListener {
self.chain_connector.submit_proofs(unit_ids.clone(), local_nonces.clone(), result_hashes.clone()).await.map_err(|err| {
match err {
ConnectorError::RpcCallError { .. } => { Permanent(err) }
_ => {
_ => {
tracing::warn!(target: "chain-listener", "Failed to submit proof: {err}. Retrying..");
backoff::Error::transient(err)
}
}
})
})
.await;
.await;

match submit {
Err(err) => {
Expand Down Expand Up @@ -1278,7 +1283,7 @@ impl ChainListener {

Ok(s)
})
.await?;
.await?;

for (status, (deal_id, worker)) in statuses
.into_iter()
Expand Down Expand Up @@ -1425,17 +1430,19 @@ impl ChainListener {
let mut batch_request = HashMap::new();
for cu_id in self.cc_compute_units.keys() {
let sent_proofs_count = self.proof_tracker.get_proof_counter(cu_id);
let proofs_needed = U64::from(
self.max_proofs_per_epoch
.checked_add(-sent_proofs_count)
.unwrap_or(Uint::ZERO),
)
.as_limbs()[0] as usize;
let proofs_needed = self
.max_proofs_per_epoch
.checked_sub(sent_proofs_count)
.unwrap_or(Uint::ZERO)
.as_limbs()[0];

if proofs_needed > 0 {
let request = BatchRequest {
last_seen_proof_idx: self.proof_tracker.get_last_submitted_proof_id(cu_id),
proof_batch_size: min(proofs_needed, self.listener_config.max_proof_batch_size),
proof_batch_size: min(
proofs_needed as usize,
self.listener_config.max_proof_batch_size,
),
};

batch_request.insert(*cu_id, request);
Expand Down
2 changes: 1 addition & 1 deletion nox/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nox"
version = "0.27.1"
version = "0.27.5"
authors = ["Fluence DAO", "Cloudless Labs"]
description = "Node implementing peer functionality in the Fluence p2p network"
edition = "2021"
Expand Down

0 comments on commit 9db7752

Please sign in to comment.