Skip to content

Commit

Permalink
Merge pull request #1 from zianksm/feature-adjust-interface
Browse files Browse the repository at this point in the history
Feature adjust interface
  • Loading branch information
zianksm authored Jun 12, 2023
2 parents 3868bb3 + 3bbf77a commit cead3cc
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ impl MigrationTransaction {
.take()
.expect("inner transaction should have been built");

let (progress, _) = ExtrinsicSubmitter::submit(tx.into()).await.unwrap();
let progress = ExtrinsicSubmitter::submit(tx.into()).await.unwrap();
let notifier_channel = self.notifier.clone();

extrinsics::Transaction::new(progress, notifier_channel, None)
extrinsics::Transaction::new(progress, Some(notifier_channel), None)
}
}

Expand Down
10 changes: 5 additions & 5 deletions dhatu/src/tx/dhatu_assets/migration_transaction/types.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use subxt::{tx::SubmittableExtrinsic, OnlineClient, PolkadotConfig};

use crate::tx::extrinsics::prelude::{
transfer_nft_contract::types::ContractTransactionPayload, NotificationMessage,
};
use crate::{tx::extrinsics::prelude::{
transfer_nft_contract::types::ContractTransactionPayload, extrinsics::TransactionMessage,
}, types::{SenderChannel, ReceiverChannel}};

pub type MigrationTask<T> = std::pin::Pin<Box<dyn futures::Future<Output = T>>>;
pub type MigrationTransactionPayload = ContractTransactionPayload;
pub type MigrationTransactionResultNotifier =
tokio::sync::mpsc::UnboundedSender<NotificationMessage>;
SenderChannel<TransactionMessage>;
pub type MigrationTransactionResultReceiver =
tokio::sync::mpsc::UnboundedReceiver<NotificationMessage>;
ReceiverChannel<TransactionMessage>;
pub type MigrationTransaction = SubmittableExtrinsic<PolkadotConfig, OnlineClient<PolkadotConfig>>;
49 changes: 44 additions & 5 deletions dhatu/src/tx/extrinsics/callback_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{str::FromStr};
use std::str::FromStr;

use serde_json::Value;
use serde::Serialize;


use crate::{error::Error};
use super::prelude::enums::{ExtrinsicStatus, Hash};
use crate::error::Error;

#[cfg(feature = "tokio")]
pub struct Executor {
http_connection_pool: reqwest::Client,
}
Expand Down Expand Up @@ -43,14 +45,51 @@ impl Executor {
}
}

pub fn execute(&self, body: Value, callback: &str) -> Result<(), Error> {
#[cfg(feature = "tokio")]
#[cfg(feature = "serde")]
pub fn execute(&self, status: ExtrinsicStatus, callback_url: &str) -> Result<(), Error> {
let client = self.http_connection_pool.clone();

let callback = Url::from_str(callback)?;
let body = Self::infer_callback_body(status);

let callback = Url::from_str(callback_url)?;
let task = client.post(callback.0).json(&body).send();

tokio::task::spawn(task);

Ok(())
}

fn infer_callback_body(status: ExtrinsicStatus) -> CallBackBody<Hash> {
match status {
ExtrinsicStatus::Pending => CallBackBody::new(false, String::from("pending"), None),
ExtrinsicStatus::Failed(reason) => CallBackBody::new(
false,
format!("failed with reason : {}", reason.inner()),
None,
),
ExtrinsicStatus::Success(result) => {
CallBackBody::new(true, String::from("success"), Some(result.hash()))
}
}
}
}

#[cfg(feature = "serde")]
#[derive(Serialize)]
pub struct CallBackBody<Data: Serialize> {
status: bool,
message: String,
data: Option<Data>,
}

#[cfg(feature = "serde")]
impl<Data: Serialize> CallBackBody<Data> {
pub fn new(status: bool, message: String, data: Option<Data>) -> Self {
Self {
status,
message,
data,
}
}
}
6 changes: 1 addition & 5 deletions dhatu/src/tx/extrinsics/extrinsics_submitter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use crate::{
error::Error,
tx::extrinsics::{
prelude::TransactionId,
types::{Extrinsic, ExtrinsicTracker, GenericError},
},
types::{MandalaExtrinsics, MandalaTransactionProgress},
};

Expand All @@ -14,7 +10,7 @@ impl ExtrinsicSubmitter {
let result = tx.into_inner()
.submit_and_watch()
.await
.map_err(|e| Error::TransactionSubmitError(e))?
.map_err(Error::TransactionSubmitError)?
.into();

Ok(result)
Expand Down
12 changes: 7 additions & 5 deletions dhatu/src/tx/extrinsics/extrinsics_tracker/enums.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use serde::Serialize;
use sp_core::H256;
use subxt::blocks::ExtrinsicEvents;

use crate::types::MandalaConfig;

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "serde")]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct Reason(String);

impl Reason {
Expand Down Expand Up @@ -55,11 +57,11 @@ impl ExtrinsicResult {

impl From<ExtrinsicEvents<MandalaConfig>> for ExtrinsicResult {
fn from(value: ExtrinsicEvents<MandalaConfig>) -> Self {
Self(value)
Self(Arc::new(value))
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg(feature = "serde")]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct Hash(String);

impl From<ExtrinsicResult> for Hash {
Expand Down Expand Up @@ -88,7 +90,7 @@ impl Hash {
// we disable this by default because substrate sp_core does not follow semver
// and we need to have a stable public api!
#[cfg(feature = "unstable_sp_core")]
pub fn hash(&self) -> H256 {
pub fn into_inner(&self) -> H256 {
use std::str::FromStr;

H256::from_str(self.inner_as_str()).expect("internal conversion shouldn't fail!")
Expand Down
52 changes: 37 additions & 15 deletions dhatu/src/tx/extrinsics/extrinsics_tracker/extrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,47 @@ use std::sync::Arc;
use sp_core::H256;

use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex, RwLock,
mpsc::{Receiver, Sender}, RwLock,
};

use crate::{
tx::extrinsics::{
prelude::{NotificationMessage, TransactionId},
types::ExtrinsicTracker,
},
types::{MandalaTransactionProgress, SenderChannel},
};

use super::enums::{ExtrinsicStatus, Hash};

pub struct TransactionMessage {
pub(crate) status: ExtrinsicStatus,
pub(crate) callback: Option<String>,
pub(crate) id: Hash,
}

impl TransactionMessage {
pub fn new(status: ExtrinsicStatus, callback: Option<String>, id: Hash) -> Self {
Self {
status,
callback,
id,
}
}

pub fn inner_status(&self) -> ExtrinsicStatus {
self.status.clone()
}

pub fn callback(&self) -> Option<&String> {
self.callback.as_ref()
}

pub fn id(&self) -> &Hash {
&self.id
}
}

#[cfg(feature = "tokio")]
pub struct Transaction {
id: H256,
status: Arc<RwLock<ExtrinsicStatus>>,
transaction_notifier: SenderChannel<ExtrinsicStatus>,
}

impl Transaction {
Expand All @@ -39,40 +61,40 @@ impl Transaction {
impl Transaction {
pub fn new(
tx: MandalaTransactionProgress,
external_notifier: SenderChannel<ExtrinsicStatus>,
external_notifier: Option<SenderChannel<TransactionMessage>>,
callback: Option<String>,
) -> Self {
let hash = tx.0.extrinsic_hash();
let task_channel = Self::process_transaction(tx, external_notifier.clone(), callback);
let task_channel = Self::process_transaction(tx, external_notifier, callback);

let default_status = Self::watch_transaction_status(task_channel);

Self {
transaction_notifier: external_notifier,
id: hash,
status: default_status,
}
}

fn process_transaction(
tx: MandalaTransactionProgress,
external_status_notifier: SenderChannel<ExtrinsicStatus>,
external_status_notifier: Option<SenderChannel<TransactionMessage>>,
callback: Option<String>,
) -> Receiver<ExtrinsicStatus> {
let (internal_status_notifier, receiver) = Self::create_channel();

let task = async move {

let id = tx.0.extrinsic_hash().into();
let status = Self::wait(tx).await;

internal_status_notifier
.send(status.clone())
.await
.expect("there should be only 1 message sent");

external_status_notifier
.send(status)
.unwrap();
if let Some(external_status_notifier) = external_status_notifier {
let msg = TransactionMessage::new(status, callback, id);
external_status_notifier.send(msg);
}
};
tokio::task::spawn(task);
receiver
Expand Down
42 changes: 19 additions & 23 deletions dhatu/src/tx/extrinsics/extrinsics_tracker/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,55 @@
use std::{collections::HashMap, sync::Arc};

use sp_core::H256;

use tokio::sync::RwLock;

use crate::{tx::extrinsics::{
prelude::{NotificationMessage, TransactionId},
types::{BlockchainClient, ExtrinsicTracker},
}, types::SenderChannel};
use crate::{
types::{MandalaClient, MandalaTransactionProgress, SenderChannel},
};

use super::{enums::{ExtrinsicStatus, Hash}, extrinsics::Transaction};
use super::{
enums::{ExtrinsicStatus, Hash},
extrinsics::{Transaction, TransactionMessage},
};

#[doc(hidden)]
type Inner = Arc<RwLock<HashMap<Hash, Transaction>>>;

pub struct ExtrinsicWatcher {
inner: Inner,
client: BlockchainClient,
transaction_notifier:SenderChannel<ExtrinsicStatus>,
}

impl Clone for ExtrinsicWatcher {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
client: self.client.clone(),
transaction_notifier: self.transaction_notifier.clone(),
}
}
}

impl ExtrinsicWatcher {
pub fn new(
client: BlockchainClient,
transaction_notifier:SenderChannel<ExtrinsicStatus>,
) -> Self {
pub fn new(_client: MandalaClient) -> Self {
let inner = HashMap::new();
let inner = Arc::new(RwLock::new(inner));

Self {
inner,
client,
transaction_notifier,
}
Self { inner }
}

pub async fn watch(&self, tx: ExtrinsicTracker, callback: Option<String>) -> TransactionId {
let tx = Transaction::new(tx.into(), self.transaction_notifier.clone(), callback);
pub async fn watch(
&self,
tx: MandalaTransactionProgress,
external_notifier: Option<SenderChannel<TransactionMessage>>,
callback: Option<String>,
) -> Hash {
let tx = Transaction::new(tx, external_notifier, callback);
let tx_id = tx.id();

self.watch_tx(tx).await;

tx_id
}

pub async fn check(&self, tx_id: &TransactionId) -> Option<ExtrinsicStatus> {
pub async fn check(&self, tx_id: &Hash) -> Option<ExtrinsicStatus> {
let inner = self.inner.read().await;

let Some(tx) = inner.get(tx_id) else {
Expand All @@ -63,7 +59,7 @@ impl ExtrinsicWatcher {
Some(tx.status().await)
}

pub async fn stop_watching(&self, tx_id: &TransactionId) {
pub async fn stop_watching(&self, tx_id: &Hash) {
let mut inner = self.inner.write().await;
inner.remove(tx_id);
}
Expand Down
1 change: 0 additions & 1 deletion dhatu/src/tx/extrinsics/funds_reserve/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod reserve;
pub mod traits;
2 changes: 1 addition & 1 deletion dhatu/src/tx/extrinsics/funds_reserve/reserve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl FundsReserve {
.await
.map_err(FundsReserveError::RpcError)?;

let status = Transaction::wait(tx).await;
let status = Transaction::wait(tx.into()).await;

Ok(status)
}
Expand Down
Empty file.
Loading

0 comments on commit cead3cc

Please sign in to comment.