Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise memory usage while loading genesis #4269

Merged
merged 30 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d595fab
opt out of records loading while loading config
posvyatokum Apr 27, 2021
a3c5cd3
create GenesisValidator to allow validation while streaming records
posvyatokum Apr 27, 2021
fb76e11
adding streaming records from genesis and records files
posvyatokum Apr 27, 2021
b3a61de
adding records file path and streaming to Genesis
posvyatokum Apr 28, 2021
46f6d7d
stream records from file in json_hash if records field is empty
posvyatokum Apr 28, 2021
b1f9e62
stream records from file in genesis validation
posvyatokum Apr 28, 2021
9db4ba3
encapsulate records iteration vs streaming logic
posvyatokum Apr 29, 2021
6408175
apply genesis state in chunks
posvyatokum May 1, 2021
f58dc4c
refactor process_records usage
posvyatokum May 1, 2021
9ed31e5
switch to streaming genesis in neard run
posvyatokum May 1, 2021
e995761
remove unused import
posvyatokum May 1, 2021
cb1677c
Merge branch 'master' into optimize-genesis-loading
EgorKulikov May 5, 2021
c64669d
increase chunk size
posvyatokum May 5, 2021
b546bf5
punctuation
posvyatokum May 5, 2021
b26d7de
refactor genesis streaming
posvyatokum May 5, 2021
87d2f60
refactor ref
posvyatokum May 5, 2021
455b786
remove pub from GenesisValidator
posvyatokum May 5, 2021
0f5d445
Borrow<StateRecord> -> &StateRecord
posvyatokum May 5, 2021
4f606ec
process_records -> for_each_record
posvyatokum May 5, 2021
3fe0888
move genesis processing to separate file & genesis chunk -> genesis b…
posvyatokum May 5, 2021
9c4ac48
make GenesisStateApplier stateless
posvyatokum May 5, 2021
d63d621
pass error from File::open
posvyatokum May 5, 2021
fb04fd7
make RecordsProcessor visitor universal
posvyatokum May 5, 2021
2597ee1
Merge branch 'master' into optimize-genesis-loading
posvyatokum May 5, 2021
b0ad7f3
fix imports
posvyatokum May 5, 2021
2959101
move imports to tests
posvyatokum May 6, 2021
4587d00
Merge branch 'master' into optimize-genesis-loading
posvyatokum May 6, 2021
acfdf61
new_as_is -> new_with_path
posvyatokum May 7, 2021
fd2e67b
Merge branch 'master' into optimize-genesis-loading
posvyatokum May 7, 2021
7c918ca
minor fixes: imports and visibility
posvyatokum May 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 109 additions & 7 deletions core/chain-configs/src/genesis_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
//! contains `RuntimeConfig`, but we keep it here for now until we figure
//! out the better place.
use std::fs::File;
use std::io::BufReader;
use std::io::{BufReader, Read};
use std::marker::PhantomData;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::{fmt, io};

use chrono::{DateTime, Utc};
use num_rational::Rational;
use serde::{Deserialize, Serialize};
use serde::de::{self, DeserializeSeed, IgnoredAny, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Serializer;
use smart_default::SmartDefault;

Expand Down Expand Up @@ -180,6 +182,12 @@ pub struct Genesis {
#[serde(flatten)]
pub config: GenesisConfig,
pub records: GenesisRecords,
/// Genesis object may not contain records.
/// In this case records can be found in records_file.
/// The idea is that all records consume too much memory,
/// so they should be processed in streaming fashion with for_each_record.
#[serde(skip)]
pub records_file: PathBuf,
/// Using zero-size PhantomData is a Rust pattern preventing a structure being constructed
/// without calling `new` method, which has some initialization routine.
#[serde(skip)]
Expand Down Expand Up @@ -266,6 +274,74 @@ impl GenesisRecords {
}
}

/// Visitor for records.
/// Reads records one by one and passes them to sink.
/// If full genesis file is passed, reads records from "records" field and
/// IGNORES OTHER FIELDS.
struct RecordsProcessor<F> {
sink: F,
}

impl<'de, F: FnMut(StateRecord)> Visitor<'de> for RecordsProcessor<&'_ mut F> {
type Value = ();

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(
"either:\
1. array of StateRecord\
2. map with records field which is array of StateRecord",
)
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
while let Some(record) = seq.next_element::<StateRecord>()? {
(self.sink)(record)
}
Ok(())
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"records" => {
map.next_value_seed(self)?;
return Ok(());
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
Err(de::Error::custom("missing field: records"))
}
}

impl<'de, F: FnMut(StateRecord)> DeserializeSeed<'de> for RecordsProcessor<&'_ mut F> {
type Value = ();

fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_seq(self)
}
}

fn stream_records_from_file(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is logically static method of RecordsProcessor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(not really important): I'd say it's the other way around -- this function is the public API, and RecordsProcessor is it's private implementation detail. If it were a separate module (it doesn't have to be), it's best written as

// genesis_streaming.rs
pub fn stream_records_from_file(
    reader: impl Read,
    mut callback: impl FnMut(StateRecord),
) -> serde_json::Result<()> {
    let mut deserializer = serde_json::Deserializer::from_reader(reader);
    let records_processor = RecordsProcessor { sink: &mut callback };
    deserializer.deserialize_any(records_processor)
}

struct RecordsProcessor<F> { ... }

reader: impl Read,
mut callback: impl FnMut(StateRecord),
) -> serde_json::Result<()> {
let mut deserializer = serde_json::Deserializer::from_reader(reader);
let records_processor = RecordsProcessor { sink: &mut callback };
deserializer.deserialize_any(records_processor)
}

pub struct GenesisJsonHasher {
digest: sha2::Sha256,
}
Expand All @@ -287,9 +363,9 @@ impl GenesisJsonHasher {

pub fn process_genesis(&mut self, genesis: &Genesis) {
self.process_config(&genesis.config);
for record in genesis.records.as_ref() {
self.process_record(record)
}
genesis.for_each_record(|record: &StateRecord| {
self.process_record(record);
});
}

pub fn finalize(self) -> CryptoHash {
Expand All @@ -299,11 +375,16 @@ impl GenesisJsonHasher {

impl Genesis {
pub fn new(config: GenesisConfig, records: GenesisRecords) -> Self {
let mut genesis = Self { config, records, phantom: PhantomData };
let mut genesis =
Self { config, records, records_file: PathBuf::new(), phantom: PhantomData };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder it this should be records_file: None? That is, use Option to relfect in the type system that we might not have records_file at all.

genesis.config.total_supply = get_initial_supply(&genesis.records.as_ref());
genesis
}

pub fn new_with_path(config: GenesisConfig, records_file: PathBuf) -> Self {
Self { config, records: GenesisRecords(vec![]), records_file, phantom: PhantomData }
}

/// Reads Genesis from a single file.
pub fn from_file<P: AsRef<Path>>(path: P) -> Self {
let reader = BufReader::new(File::open(path).expect("Could not open genesis config file."));
Expand Down Expand Up @@ -337,6 +418,27 @@ impl Genesis {
hasher.process_genesis(self);
hasher.finalize()
}

fn stream_records_with_callback(&self, callback: impl FnMut(StateRecord)) -> io::Result<()> {
let reader = BufReader::new(File::open(&self.records_file)?);
stream_records_from_file(reader, callback).map_err(io::Error::from)
}

/// If records vector is empty processes records stream from records_file.
/// May panic if records_file is removed or is in wrong format.
pub fn for_each_record(&self, mut callback: impl FnMut(&StateRecord)) {
if self.records.as_ref().is_empty() {
let callback_move = |record: StateRecord| {
callback(&record);
};
self.stream_records_with_callback(callback_move)
.expect("error while streaming records");
} else {
for record in self.records.as_ref() {
callback(record);
}
}
}
}

pub fn get_initial_supply(records: &[StateRecord]) -> Balance {
Expand Down
13 changes: 13 additions & 0 deletions core/primitives/src/state_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,16 @@ fn to_printable(blob: &[u8]) -> String {
}
}
}

pub fn state_record_to_account_id(state_record: &StateRecord) -> &AccountId {
match state_record {
StateRecord::Account { account_id, .. }
| StateRecord::AccessKey { account_id, .. }
| StateRecord::Contract { account_id, .. }
| StateRecord::ReceivedData { account_id, .. }
| StateRecord::Data { account_id, .. } => account_id,
StateRecord::PostponedReceipt(receipt) | StateRecord::DelayedReceipt(receipt) => {
&receipt.receiver_id
}
}
}
30 changes: 25 additions & 5 deletions neard/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,12 +1046,13 @@ pub fn download_genesis(url: &String, path: &PathBuf) {
});
}

pub fn load_config(dir: &Path) -> NearConfig {
pub fn load_config_without_genesis_records(dir: &Path) -> NearConfig {
let config = Config::from_file(&dir.join(CONFIG_FILENAME));
let genesis = if let Some(ref genesis_records_file) = config.genesis_records_file {
Genesis::from_files(&dir.join(&config.genesis_file), &dir.join(genesis_records_file))
let genesis_config = GenesisConfig::from_file(&dir.join(&config.genesis_file));
let genesis_records_file = if let Some(genesis_records_file) = &config.genesis_records_file {
dir.join(genesis_records_file)
} else {
Genesis::from_file(&dir.join(&config.genesis_file))
dir.join(&config.genesis_file)
};
let validator_signer = if dir.join(&config.validator_key_file).exists() {
let signer =
Expand All @@ -1062,7 +1063,26 @@ pub fn load_config(dir: &Path) -> NearConfig {
None
};
let network_signer = InMemorySigner::from_file(&dir.join(&config.node_key_file));
NearConfig::new(config, genesis, (&network_signer).into(), validator_signer)
NearConfig::new(
config,
Genesis::new_with_path(genesis_config, genesis_records_file),
(&network_signer).into(),
validator_signer,
)
}

pub fn load_config(dir: &Path) -> NearConfig {
let mut near_config = load_config_without_genesis_records(dir);
near_config.genesis =
if let Some(ref genesis_records_file) = near_config.config.genesis_records_file {
Genesis::from_files(
&dir.join(&near_config.config.genesis_file),
&dir.join(genesis_records_file),
)
} else {
Genesis::from_file(&dir.join(&near_config.config.genesis_file))
};
near_config
}

pub fn load_test_config(seed: &str, port: u16, genesis: Genesis) -> NearConfig {
Expand Down
Loading