Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(listener): listener fixes #2249

Merged
merged 17 commits into from
Jun 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ impl ChainListener {
}
tracing::info!(target: "chain-listener", "Subscribed successfully");

// Proof id should be loaded once on start, there is no reason to update it on refresh
// TODO: associate proof id with nonce, not current epoch
if let Err(err) = self.load_proof_id().await {
justprosh marked this conversation as resolved.
Show resolved Hide resolved
tracing::error!(target: "chain-listener", "Failed to load persisted proof id: {err}; Stopping...");
exit(1);
}

if let Err(err) = self.refresh_state().await {
tracing::error!(target: "chain-listener", "Failed to refresh state: {err}; Stopping...");
exit(1);
Expand Down Expand Up @@ -308,12 +315,13 @@ impl ChainListener {

self.difficulty = init_params.difficulty;
self.init_timestamp = init_params.init_timestamp;
self.global_nonce = init_params.global_nonce;

self.epoch_duration = init_params.epoch_duration;
self.min_proofs_per_epoch = init_params.min_proofs_per_epoch;
self.max_proofs_per_epoch = init_params.max_proofs_per_epoch;

self.set_current_epoch(init_params.current_epoch);
self.set_global_nonce(init_params.global_nonce).await?;

Ok(())
}
Expand Down Expand Up @@ -360,12 +368,10 @@ impl ChainListener {
_ => {}
}
}

self.load_proof_id().await?;
};

if let Err(e) = result {
tracing::warn!(target: "chain-listener", "Failed to refresh compute units: {e}");
tracing::warn!(target: "chain-listener", "Failed to refresh state: {e}");
tracing::info!(target: "chain-listener", "Retrying in 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await;
} else {
Expand All @@ -390,7 +396,7 @@ impl ChainListener {
let write = retry(backoff, || async {
persistence::persist_proof_id(
&self.persisted_proof_id_dir,
self.last_submitted_proof_id,
proof_id,
self.current_epoch,
).await.map_err(|err|{
tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Retrying...");
Expand All @@ -399,12 +405,13 @@ impl ChainListener {
Ok(())
}).await;

self.last_submitted_proof_id = proof_id;

if let Err(err) = write {
tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Ignoring..");
}

self.last_submitted_proof_id = proof_id;
tracing::info!(target: "chain-listener", "Persisted proof id {proof_id} on epoch {}", self.current_epoch);
tracing::info!(target: "chain-listener", "Persisted proof id {} on epoch {}", self.last_submitted_proof_id, self.current_epoch);
Ok(())
}

Expand All @@ -415,13 +422,9 @@ impl ChainListener {
if let Some(persisted_proof_id) = persisted_proof_id {
self.last_submitted_proof_id = persisted_proof_id.proof_id;
tracing::info!(target: "chain-listener", "Loaded persisted proof id {} saved on epoch {}", persisted_proof_id.proof_id, persisted_proof_id.epoch);
if persisted_proof_id.epoch != self.current_epoch {
tracing::info!(target: "chain-listener","Persisted proof id epoch is different from current epoch {}, resetting proof id", self.current_epoch);
self.reset_proof_id().await?;
}
} else {
tracing::info!(target: "chain-listener","No persisted proof id found, starting from zero");
self.reset_proof_id().await?;
tracing::info!(target: "chain-listener", "No persisted proof id found, starting from zero");
self.last_submitted_proof_id = ProofIdx::zero();
}

Ok(())
Expand Down Expand Up @@ -559,27 +562,33 @@ impl ChainListener {

let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();

self.cc_compute_units
.extend(units.into_iter().map(|unit| (CUID::new(unit.id.0), unit)));

for cu in in_deal {
self.active_deals
.insert(cu.deal.to_string().into(), CUID::new(cu.id.0));
}
self.cc_compute_units = units
.into_iter()
.map(|unit| (CUID::new(unit.id.0), unit))
.collect();

let active = self
.cc_compute_units
.values()
.filter(|unit| unit.startEpoch <= self.current_epoch);

let pending = self
.cc_compute_units
.values()
.filter(|unit| unit.startEpoch > self.current_epoch);

for cu in &in_deal {
let cu_id = CUID::new(cu.id.0);
// TODO: in the future it should be BTreeMap<DealId, Vec<CUID>>, because deal will be able
// to use multiple CUs from one peer
self.active_deals.insert(cu.deal.to_string().into(), cu_id);
}

tracing::info!(target: "chain-listener",
"Compute units mapping: in cc {}/[{} pending], in deal {}",
self.cc_compute_units.len(),
pending.clone().count(),
self.active_deals.len()
in_deal.len()
);

tracing::info!(target: "chain-listener",
Expand All @@ -599,6 +608,12 @@ impl ChainListener {
.collect::<Vec<_>>()
);

// NOTE: cores are released after all the logs to simplify debug on failure
for cu_id in self.active_deals.values() {
self.core_distributor.release_worker_cores(&[*cu_id]);
self.acquire_core_for_deal(*cu_id)?;
}

Ok(())
}

Expand Down Expand Up @@ -677,15 +692,10 @@ impl ChainListener {
if epoch_changed {
// TODO: add epoch_number to metrics

// nonce changes every epoch
self.global_nonce = self.chain_connector.get_global_nonce().await?;
tracing::info!(target: "chain-listener",
"New global nonce: {}",
self.global_nonce
);

self.set_current_epoch(epoch_number);
self.reset_proof_id().await?;
self.set_global_nonce(self.chain_connector.get_global_nonce().await?)
.await?;
tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce);

if let Some(status) = self.get_commitment_status().await? {
tracing::info!(target: "chain-listener", "Current commitment status: {status:?}");
Expand Down Expand Up @@ -864,6 +874,12 @@ impl ChainListener {
return Ok(());
}

if self.active_units_count() == 0 {
tracing::info!(target: "chain-listener", "No active units found in this epoch {}", self.current_epoch);
self.stop_commitment().await?;
return Ok(());
}

tracing::info!(target: "chain-listener",
"Refreshing commitment, active compute units: {}",
self.cc_compute_units
Expand Down Expand Up @@ -1351,6 +1367,16 @@ impl ChainListener {
}
}

async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> eyre::Result<()> {
if self.global_nonce != global_nonce {
tracing::info!(target: "chain-listener", "Global changed, was {}, new global nonce is {global_nonce}", self.global_nonce);
self.global_nonce = global_nonce;
self.reset_proof_id().await?;
}

Ok(())
}

fn observe<F>(&self, f: F)
where
F: FnOnce(&ChainListenerMetrics),
Expand All @@ -1359,6 +1385,13 @@ impl ChainListener {
f(metrics);
}
}

fn active_units_count(&self) -> usize {
self.cc_compute_units
.iter()
.filter(|(_, cu)| cu.startEpoch <= self.current_epoch)
.count()
}
}

struct CUGroups {
Expand Down
Loading