Skip to content

Commit

Permalink
commit file sinks after rewards db purged
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Apr 24, 2023
1 parent 9913e1f commit e4dca48
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 31 deletions.
32 changes: 14 additions & 18 deletions iot_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,11 @@ impl Rewarder {
.await?
// Await the returned oneshot to ensure we wrote the file
.await??;

let written_files = self.rewards_sink.commit().await?.await??;
// Write the rewards manifest for the completed period
self.reward_manifests_sink
.write(
RewardManifest {
start_timestamp: scheduler.reward_period.start.encode_timestamp(),
end_timestamp: scheduler.reward_period.end.encode_timestamp(),
written_files,
},
[],
)
.await?
.await??;

self.reward_manifests_sink.commit().await?;

let mut transaction = self.pool.begin().await?;

// Clear gateway shares table period to end of reward period
GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.end).await?;

save_rewarded_timestamp(
"last_rewarded_end_time",
&scheduler.reward_period.end,
Expand All @@ -122,9 +105,22 @@ impl Rewarder {
&mut transaction,
)
.await?;

transaction.commit().await?;

// now that the db has been purged, safe to write out the manifest
self.reward_manifests_sink
.write(
RewardManifest {
start_timestamp: scheduler.reward_period.start.encode_timestamp(),
end_timestamp: scheduler.reward_period.end.encode_timestamp(),
written_files,
},
[],
)
.await?
.await??;
self.reward_manifests_sink.commit().await?;

Ok(())
}
}
Expand Down
24 changes: 11 additions & 13 deletions mobile_verifier/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,17 @@ impl VerifierDaemon {

let written_files = self.mobile_rewards.commit().await?.await??;

// Write out the manifest file
let mut transaction = self.pool.begin().await?;
// Clear the heartbeats table:
sqlx::query("TRUNCATE TABLE heartbeats;")
.execute(&mut transaction)
.await?;

save_last_rewarded_end_time(&mut transaction, &scheduler.reward_period.end).await?;
save_next_rewarded_end_time(&mut transaction, &scheduler.next_reward_period().end).await?;
transaction.commit().await?;

// now that the db has been purged, safe to write out the manifest
self.reward_manifests
.write(
RewardManifest {
Expand All @@ -185,18 +195,6 @@ impl VerifierDaemon {

self.reward_manifests.commit().await?;

let mut transaction = self.pool.begin().await?;

// Clear the heartbeats table:
sqlx::query("TRUNCATE TABLE heartbeats;")
.execute(&mut transaction)
.await?;

save_last_rewarded_end_time(&mut transaction, &scheduler.reward_period.end).await?;
save_next_rewarded_end_time(&mut transaction, &scheduler.next_reward_period().end).await?;

transaction.commit().await?;

Ok(())
}
}
Expand Down

0 comments on commit e4dca48

Please sign in to comment.