Skip to content

Commit

Permalink
refactor(statedb/export): refactor export logic and enhance logging (#…
Browse files Browse the repository at this point in the history
…2408)

1. Export the top-level fields of an object.
2. Implement detailed logging during the export process.
3. Improve the CSV writer configuration for better performance.
4. Remove dead code.
  • Loading branch information
popcnt1 authored Aug 11, 2024
1 parent 8289948 commit 112df71
Showing 1 changed file with 57 additions and 148 deletions.
205 changes: 57 additions & 148 deletions crates/rooch/src/commands/statedb/commands/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use rooch_types::error::{RoochError, RoochResult};
use rooch_types::framework::address_mapping::RoochToBitcoinAddressMapping;
use rooch_types::rooch_network::RoochChainID;

use crate::commands::statedb::commands::{
init_job, GLOBAL_STATE_TYPE_FIELD, GLOBAL_STATE_TYPE_OBJECT, GLOBAL_STATE_TYPE_ROOT,
};
use crate::commands::statedb::commands::{init_job, GLOBAL_STATE_TYPE_ROOT};

/// Export statedb
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -131,9 +129,36 @@ impl FromStr for ExportID {
#[serde(rename_all = "kebab-case")]
/// Names the well-known objects that can be selected for export.
pub enum ExportObjectName {
/// The root object (`ObjectID::root()`); rendered as "root".
#[default]
Root,
/// The Bitcoin UTXO store object; rendered as "utxo-store".
UtxoStore,
/// The inscription store object; rendered as "inscription-store".
InscriptionStore,
/// The Rooch-to-Bitcoin address mapping object; rendered as "address-map".
AddressMap,
/// Any object that is not one of the well-known objects above; it has no associated `ObjectID`.
Unknown,
}

impl ExportObjectName {
    /// Returns the `ObjectID` of the well-known object this name stands for.
    ///
    /// `Unknown` is not tied to any object, so it yields `None`; every other
    /// variant yields `Some(id)`.
    pub fn object_id(&self) -> Option<ObjectID> {
        let id = match self {
            Self::Unknown => return None,
            Self::Root => ObjectID::root(),
            Self::UtxoStore => BitcoinUTXOStore::object_id(),
            Self::InscriptionStore => InscriptionStore::object_id(),
            Self::AddressMap => RoochToBitcoinAddressMapping::object_id(),
        };
        Some(id)
    }

    /// Maps an `ObjectID` back to the name of the well-known object it identifies.
    ///
    /// Falls back to `Unknown` when the id matches none of the well-known objects.
    pub fn from_object_id(object_id: ObjectID) -> Self {
        // Probe the named objects in a fixed order: root, UTXO store,
        // inscription store, then the address mapping.
        for candidate in [
            Self::Root,
            Self::UtxoStore,
            Self::InscriptionStore,
            Self::AddressMap,
        ] {
            if candidate.object_id().as_ref() == Some(&object_id) {
                return candidate;
            }
        }
        Self::Unknown
    }
}

impl Display for ExportObjectName {
Expand All @@ -142,6 +167,8 @@ impl Display for ExportObjectName {
ExportObjectName::UtxoStore => write!(f, "utxo-store"),
ExportObjectName::InscriptionStore => write!(f, "inscription-store"),
ExportObjectName::AddressMap => write!(f, "address-map"),
ExportObjectName::Root => write!(f, "root"),
_ => write!(f, "unknown"),
}
}
}
Expand All @@ -154,7 +181,8 @@ impl FromStr for ExportObjectName {
"utxo-store" => Ok(ExportObjectName::UtxoStore),
"inscription-store" => Ok(ExportObjectName::InscriptionStore),
"address-map" => Ok(ExportObjectName::AddressMap),
_ => Err("object-name no match"),
"root" => Ok(ExportObjectName::Root),
_ => Ok(ExportObjectName::Unknown),
}
}
}
Expand Down Expand Up @@ -199,7 +227,10 @@ impl ExportCommand {

let file_name = self.output.display().to_string();
let mut writer_builder = csv::WriterBuilder::new();
let writer_builder = writer_builder.delimiter(b',').double_quote(false);
let writer_builder = writer_builder
.delimiter(b',')
.double_quote(false)
.buffer_capacity(1 << 23);
let mut writer = writer_builder.from_path(file_name).map_err(|e| {
RoochError::from(anyhow::Error::msg(format!("Invalid output path: {}", e)))
})?;
Expand All @@ -223,15 +254,11 @@ impl ExportCommand {
Self::export_indexer(&moveos_store, root_state_root, &mut writer)?;
}
ExportMode::Object => {
let obj_id = self.object_id.unwrap_or_else(|| {
match self
.object_name
let obj_id: ObjectID = self.object_id.unwrap_or_else(|| {
self.object_name
.expect("object name must be existed if object id not provided")
{
ExportObjectName::UtxoStore => BitcoinUTXOStore::object_id(),
ExportObjectName::InscriptionStore => InscriptionStore::object_id(),
ExportObjectName::AddressMap => RoochToBitcoinAddressMapping::object_id(),
}
.object_id()
.expect("object id must be existed")
});
Self::export_object(&moveos_store, root_state_root, obj_id, &mut writer)?;
}
Expand Down Expand Up @@ -322,6 +349,9 @@ impl ExportCommand {
let starting_key = None;
let mut count: u64 = 0;

let object_name =
object_name.unwrap_or(ExportObjectName::from_object_id(object_id.clone()).to_string());

let iter = moveos_store
.get_state_store()
.iter(obj_state_root, starting_key)?;
Expand All @@ -330,12 +360,18 @@ impl ExportCommand {
let (k, v) = item?;
writer.write_record([k.to_string().as_str(), v.to_string().as_str()])?;
count += 1;
if count % 1_000_000 == 0 {
println!(
"exporting top_level_fields of object_id: {:?}({}), exported count: {}",
object_id, object_name, count
)
}
}

println!(
"export_top_level_fields object_id {:?}({}), state_root: {:?} export field counts {}",
"Done. export_top_level_fields of object_id: {:?}({}), state_root: {:?}, exported count: {}",
object_id,
object_name.unwrap_or("unknown".to_string()),
object_name,
obj_state_root,
count
);
Expand Down Expand Up @@ -369,144 +405,17 @@ impl ExportCommand {
object_id: ObjectID,
writer: &mut Writer<W>,
) -> Result<()> {
println!("export_object object_id: {:?}", object_id);

let obj = moveos_store
.get_field_at(root_state_root, &object_id.field_key())?
.expect("state should exist.");

let state_root = obj.state_root();
let timestamp = obj.updated_at();
// write csv field states
Self::export_field_states(
moveos_store,
state_root,
root_state_root,
object_id.clone(),
false,
true,
writer,
)?;

// write csv object states.
{
let export_id =
ExportID::new(object_id.clone(), state_root, root_state_root, timestamp);
writer.write_field(GLOBAL_STATE_TYPE_OBJECT)?;
writer.write_field(export_id.to_string())?;
writer.write_record(None::<&[u8]>)?;
}
writer.write_field(object_id.field_key().to_string())?;
//TODO
//writer.write_field(obj.to_string())?;
writer.write_record(None::<&[u8]>)?;

// flush csv writer
let state_root = if object_id == ObjectID::root() {
root_state_root
} else {
get_state_root(moveos_store, root_state_root, object_id.field_key())
};
Self::export_top_level_fields(moveos_store, state_root, object_id, None, writer)?;
writer.flush()?;
println!("export_object root state_root: {:?}", root_state_root);

Ok(())
}

fn export_field_states<W: std::io::Write>(
moveos_store: &MoveOSStore,
state_root: H256,
parent_state_root: H256,
object_id: ObjectID,
// export child field as object state under indexer mode
is_child_field_as_object_state: bool,
is_recursive_export_child_field: bool,
writer: &mut Writer<W>,
) -> Result<()> {
let starting_key = None;
let mut count: u64 = 0;

let mut iter = moveos_store
.get_state_store()
.iter(state_root, starting_key)?;

if is_recursive_export_child_field && object_id.has_child() {
for item in iter {
let (_k, _v) = item?;
//TODO
// if v.is_object() {
// let object = v.clone().as_raw_object()?;
// if object.size > 0 {
// Self::export_field_states(
// moveos_store,
// object.state_root(),
// state_root,
// object.id,
// false,
// false,
// writer,
// )?;
// }
// }
}

// seek from starting_key
iter = moveos_store
.get_state_store()
.iter(state_root, starting_key)?;
}

// write csv header.
{
let state_type = if is_child_field_as_object_state {
GLOBAL_STATE_TYPE_OBJECT
} else {
GLOBAL_STATE_TYPE_FIELD
};
let export_id = ExportID::new(object_id.clone(), state_root, parent_state_root, 0);
writer.write_field(state_type)?;
writer.write_field(export_id.to_string())?;
writer.write_record(None::<&[u8]>)?;
}

for item in iter {
let (k, _v) = item?;
writer.write_field(k.to_string())?;
//TODO
//writer.write_field(v.to_string())?;
writer.write_record(None::<&[u8]>)?;

count += 1;
}

println!(
"export_field_states object_id {:?}, state_root: {:?} export field counts {}",
object_id, state_root, count
);
Ok(())
}
}

#[allow(dead_code)]
// export root's export id for further checking in import job.
fn export_root_export_id<W: std::io::Write>(
root_state_root: H256,
writer: &mut Writer<W>,
) -> Result<()> {
let root_export_id = ExportID::new(ObjectID::root(), root_state_root, root_state_root, 0);
writer.write_record([GLOBAL_STATE_TYPE_ROOT, root_export_id.to_string().as_str()])?;
Ok(())
}

#[allow(dead_code)]
fn export_fields<W: std::io::Write>(
moveos_store: &MoveOSStore,
state_root: H256,
writer: &mut Writer<W>,
field_keys: Vec<FieldKey>,
) -> Result<()> {
let state_kvs = get_object_states(moveos_store, state_root, field_keys);
for (k, v) in state_kvs.into_iter() {
writer.write_record([k.to_string().as_str(), v.to_string().as_str()])?;
}
Ok(())
}

fn get_object_states(
moveos_store: &MoveOSStore,
state_root: H256,
Expand Down

0 comments on commit 112df71

Please sign in to comment.