Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use our storage traits and get rid of sqlx #1174

Merged
merged 29 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
920b0ba
Use our storage traits and get rid of sqlx
TheQuantumPhysicist Sep 13, 2023
3499a8d
Initial database-agnostic testing for API server
TheQuantumPhysicist Sep 14, 2023
af51570
Appease clippy and add license to some files
TheQuantumPhysicist Sep 14, 2023
1f30fe6
Enable transactional database for in-memory by cloning the initial state
TheQuantumPhysicist Sep 14, 2023
575e2b6
Basic API server in-memory integration tests
TheQuantumPhysicist Sep 15, 2023
dcd4765
Add missing license
TheQuantumPhysicist Sep 15, 2023
f25c91b
Implement transactional traits for postgres and prepare the test
TheQuantumPhysicist Sep 15, 2023
f3ee81c
Minor
TheQuantumPhysicist Sep 15, 2023
e2789f2
Cargo.lock changes from rebase
TheQuantumPhysicist Sep 17, 2023
557ac0c
Make transactional async
TheQuantumPhysicist Sep 17, 2023
0f64f56
Make ApiServerTransactionRo async
TheQuantumPhysicist Sep 17, 2023
82c6b07
Make transaction traits async
TheQuantumPhysicist Sep 17, 2023
08e6168
Make the first db method async
TheQuantumPhysicist Sep 17, 2023
7ee53fc
Make all API server storage functions async
TheQuantumPhysicist Sep 17, 2023
37b84c2
Re-enable tests in async mode
TheQuantumPhysicist Sep 17, 2023
afadefa
Replace postgres with tokio-postgres (incomplete due to issues with t…
TheQuantumPhysicist Sep 17, 2023
323b206
Finally, make it all async. All API server database stuff are async
TheQuantumPhysicist Sep 17, 2023
1eb723b
Minor
TheQuantumPhysicist Sep 17, 2023
ed3cec7
Uncomment tests
TheQuantumPhysicist Sep 18, 2023
06e158f
Ensure that transactions are wrapped asynchronously in the parent dat…
TheQuantumPhysicist Sep 19, 2023
423549e
Minor logging edit
TheQuantumPhysicist Sep 19, 2023
d9dc445
Minor
TheQuantumPhysicist Sep 19, 2023
3159799
Minor
TheQuantumPhysicist Sep 19, 2023
a3c90df
Comment added
TheQuantumPhysicist Sep 19, 2023
88aa1b5
Cleanup for api storage traits
TheQuantumPhysicist Sep 19, 2023
8945f2c
Merge pull request #1195 from mintlayer/fix/postgres-async
TheQuantumPhysicist Sep 21, 2023
89b61f1
Merge pull request #1223 from mintlayer/fix/postgres-async-with-tx-dr…
TheQuantumPhysicist Sep 21, 2023
5264ecb
Merge remote-tracking branch 'origin/master' into fix/postgres
TheQuantumPhysicist Sep 22, 2023
b0976f9
Merge pull request #1226 from mintlayer/fix/postgres-async-with-tx-dr…
TheQuantumPhysicist Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
543 changes: 105 additions & 438 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2021"
members = [
"accounting", # Accounting and balances abstractions.
"api-server/api-server-common", # API server, for light-wallets and block explorers: common between web-server and scanner.
"api-server/storage-test-suite",# Test suite for the abstract storage layer of the API server to ensure consistent behavior.
"api-server/scanner-daemon", # API server, for light-wallets and block explorers: blockchain scanner daemon.
"api-server/scanner-lib", # API server, for light-wallets and block explorers: blockchain scanner library.
"api-server/web-server", # API server, for light-wallets and block explorers: web-server.
Expand Down Expand Up @@ -186,7 +187,6 @@ siphasher = "1.0"
slave-pool = "0.2"
snowstorm = "0.4"
socket2 = "0.5"
sqlx = "0.7"
sscanf = "0.4"
static_assertions = "1.1"
syn = "2.0"
Expand Down
5 changes: 4 additions & 1 deletion api-server/api-server-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ crypto = { path = '../../crypto/' }
logging = {path = '../../logging'}
serialization = { path = "../../serialization" }

async-trait.workspace = true
futures = { workspace = true, default-features = false }
parity-scale-codec.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
sqlx = { workspace = true, features = [ "runtime-tokio", "macros", "sqlite", "postgres" ] }
tokio-postgres = "0.7"
bb8-postgres = "0.8"

[dev-dependencies]
chainstate-test-framework = { path = '../../chainstate/test-framework' }
Expand Down
25 changes: 12 additions & 13 deletions api-server/api-server-common/src/storage/impls/in_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,34 @@ use crate::storage::storage_api::{block_aux_data::BlockAuxData, ApiServerStorage

use super::CURRENT_STORAGE_VERSION;

#[derive(Debug, Clone)]
struct ApiServerInMemoryStorage {
block_table: BTreeMap<Id<Block>, Block>,
block_aux_data_table: BTreeMap<Id<Block>, BlockAuxData>,
main_chain_blocks_table: BTreeMap<BlockHeight, Id<Block>>,
transaction_table: BTreeMap<Id<Transaction>, (Option<Id<Block>>, SignedTransaction)>,
best_block: (BlockHeight, Id<GenBlock>),
storage_version: Option<u32>,
storage_version: u32,
}

impl ApiServerInMemoryStorage {
pub fn new(chain_config: &ChainConfig) -> Self {
Self {
let mut result = Self {
block_table: BTreeMap::new(),
block_aux_data_table: BTreeMap::new(),
main_chain_blocks_table: BTreeMap::new(),
transaction_table: BTreeMap::new(),
best_block: (0.into(), chain_config.genesis_block_id()),
storage_version: None,
}
storage_version: super::CURRENT_STORAGE_VERSION,
};
result
.initialize_storage(chain_config)
.expect("In-memory initialization must succeed");
result
}

fn is_initialized(&self) -> Result<bool, ApiServerStorageError> {
let storage_version_handle = self.storage_version;
Ok(storage_version_handle.is_some())
Ok(true)
}

fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError> {
Expand All @@ -74,7 +78,7 @@ impl ApiServerInMemoryStorage {
Ok(Some(tx.clone()))
}

fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError> {
fn get_storage_version(&self) -> Result<u32, ApiServerStorageError> {
let version_table_handle = self.storage_version;
Ok(version_table_handle)
}
Expand Down Expand Up @@ -115,7 +119,7 @@ impl ApiServerInMemoryStorage {
chain_config: &ChainConfig,
) -> Result<(), ApiServerStorageError> {
self.best_block = (0.into(), chain_config.genesis_block_id());
self.storage_version = Some(CURRENT_STORAGE_VERSION);
self.storage_version = CURRENT_STORAGE_VERSION;

Ok(())
}
Expand All @@ -140,11 +144,6 @@ impl ApiServerInMemoryStorage {
Ok(())
}

fn set_storage_version(&mut self, version: u32) -> Result<(), ApiServerStorageError> {
self.storage_version = Some(version);
Ok(())
}

fn set_best_block(
&mut self,
block_height: BlockHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use common::chain::ChainConfig;

use crate::storage::storage_api::{
ApiServerStorage, ApiServerStorageError, ApiServerTransactionRo, ApiTransactionRw,
ApiServerStorage, ApiServerStorageError, ApiServerTransactionRo, ApiServerTransactionRw,
Transactional,
};

Expand All @@ -28,42 +28,54 @@ pub mod read;
pub mod write;

pub struct ApiServerInMemoryStorageTransactionalRo<'t> {
transaction: RwLockReadGuard<'t, ApiServerInMemoryStorage>,
transaction: tokio::sync::RwLockReadGuard<'t, ApiServerInMemoryStorage>,
}

impl<'t> ApiServerInMemoryStorageTransactionalRo<'t> {
fn new(storage: &'t TransactionalApiServerInMemoryStorage) -> Self {
async fn new(
storage: &'t TransactionalApiServerInMemoryStorage,
) -> ApiServerInMemoryStorageTransactionalRo<'t> {
Self {
transaction: storage.tx_ro(),
transaction: storage.tx_ro().await,
}
}
}

#[async_trait::async_trait]
impl<'t> ApiServerTransactionRo for ApiServerInMemoryStorageTransactionalRo<'t> {
fn close(self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
async fn close(self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
Ok(())
}
}

pub struct ApiServerInMemoryStorageTransactionalRw<'t> {
transaction: RwLockWriteGuard<'t, ApiServerInMemoryStorage>,
initial_data_before_tx: ApiServerInMemoryStorage,
}

impl<'t> ApiServerInMemoryStorageTransactionalRw<'t> {
fn new(storage: &'t mut TransactionalApiServerInMemoryStorage) -> Self {
async fn new(
storage: &'t mut TransactionalApiServerInMemoryStorage,
) -> ApiServerInMemoryStorageTransactionalRw<'t> {
let transaction = storage.tx_rw().await;
let initial_data_before_tx = transaction.clone();
Self {
transaction: storage.tx_rw(),
transaction,
initial_data_before_tx,
}
}
}

impl<'t> ApiTransactionRw for ApiServerInMemoryStorageTransactionalRw<'t> {
fn commit(self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
#[async_trait::async_trait]
impl<'t> ApiServerTransactionRw for ApiServerInMemoryStorageTransactionalRw<'t> {
async fn commit(self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
Ok(())
}

fn rollback(self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
unimplemented!()
async fn rollback(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
// We restore the original data that was there when the transaction started
std::mem::swap(&mut *self.transaction, &mut self.initial_data_before_tx);
Ok(())
}
}

Expand All @@ -78,26 +90,31 @@ impl TransactionalApiServerInMemoryStorage {
}
}

fn tx_ro(&self) -> RwLockReadGuard<'_, ApiServerInMemoryStorage> {
self.storage.read().expect("Poisoned mutex")
async fn tx_ro(&self) -> RwLockReadGuard<'_, ApiServerInMemoryStorage> {
self.storage.read().await
}

fn tx_rw(&mut self) -> RwLockWriteGuard<'_, ApiServerInMemoryStorage> {
self.storage.write().expect("Poisoned mutex")
async fn tx_rw(&mut self) -> RwLockWriteGuard<'_, ApiServerInMemoryStorage> {
self.storage.write().await
}
}

#[async_trait::async_trait]
impl<'t> Transactional<'t> for TransactionalApiServerInMemoryStorage {
type TransactionRo = ApiServerInMemoryStorageTransactionalRo<'t>;

type TransactionRw = ApiServerInMemoryStorageTransactionalRw<'t>;

fn transaction_ro<'s: 't>(&'s self) -> Result<Self::TransactionRo, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRo::new(self))
async fn transaction_ro<'s: 't>(
&'s self,
) -> Result<Self::TransactionRo, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRo::new(self).await)
}

fn transaction_rw<'s: 't>(&'s mut self) -> Result<Self::TransactionRw, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRw::new(self))
async fn transaction_rw<'s: 't>(
&'s mut self,
) -> Result<Self::TransactionRw, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRw::new(self).await)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,45 @@ use crate::storage::storage_api::{

use super::ApiServerInMemoryStorageTransactionalRo;

#[async_trait::async_trait]
impl<'t> ApiServerStorageRead for ApiServerInMemoryStorageTransactionalRo<'t> {
fn is_initialized(&self) -> Result<bool, ApiServerStorageError> {
async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
self.transaction.is_initialized()
}

fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError> {
async fn get_block(
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError> {
self.transaction.get_block(block_id)
}

fn get_transaction(
&self,
async fn get_transaction(
&mut self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
self.transaction.get_transaction(transaction_id)
}

fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError> {
self.transaction.get_storage_version()
async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
Ok(Some(self.transaction.get_storage_version()?))
}

fn get_best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
self.transaction.get_best_block()
}

fn get_block_aux_data(
&self,
async fn get_block_aux_data(
&mut self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
self.transaction.get_block_aux_data(block_id)
}

fn get_main_chain_block_id(
&self,
async fn get_main_chain_block_id(
&mut self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
self.transaction.get_main_chain_block_id(block_height)
Expand Down
Loading
Loading