Skip to content

Commit

Permalink
Implement process_event function for SignerEvent<T>
Browse files Browse the repository at this point in the history
Signed-off-by: Jacinta Ferrant <jacinta@trustmachines.co>
  • Loading branch information
jferrant committed Dec 12, 2024
1 parent 5c1ae90 commit f96a33f
Showing 1 changed file with 68 additions and 124 deletions.
192 changes: 68 additions & 124 deletions libsigner/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,21 +305,18 @@ impl<T: SignerEventTrait> EventReceiver<T> for SignerEventReceiver<T> {
&request.method(),
)));
}
debug!("Processing {} event", request.url());
if request.url() == "/stackerdb_chunks" {
process_stackerdb_event(event_receiver.local_addr, request)
.map_err(|e| {
error!("Error processing stackerdb_chunks message"; "err" => ?e);
e
})
process_event::<T, StackerDBChunksEvent>(request)
} else if request.url() == "/proposal_response" {
process_proposal_response(request)
process_event::<T, BlockValidateResponse>(request)
} else if request.url() == "/new_burn_block" {
process_new_burn_block_event(request)
process_event::<T, BurnBlockEvent>(request)
} else if request.url() == "/shutdown" {
event_receiver.stop_signal.store(true, Ordering::SeqCst);
return Err(EventError::Terminated);
Err(EventError::Terminated)
} else if request.url() == "/new_block" {
process_new_block(request)
process_event::<T, BlockEvent>(request)
} else {
let url = request.url().to_string();
debug!(
Expand Down Expand Up @@ -391,12 +388,13 @@ fn ack_dispatcher(request: HttpRequest) {

// TODO: add tests from mutation testing results #4835
#[cfg_attr(test, mutants::skip)]
/// Process a stackerdb event from the node
fn process_stackerdb_event<T: SignerEventTrait>(
local_addr: Option<SocketAddr>,
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
fn process_event<T, E>(mut request: HttpRequest) -> Result<SignerEvent<T>, EventError>
where
T: SignerEventTrait,
E: serde::de::DeserializeOwned + TryInto<SignerEvent<T>, Error = EventError>,
{
let mut body = String::new();

if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
ack_dispatcher(request);
Expand All @@ -405,27 +403,12 @@ fn process_stackerdb_event<T: SignerEventTrait>(
&e
)));
}

debug!("Got stackerdb_chunks event"; "chunks_event_body" => %body);
let event: StackerDBChunksEvent = serde_json::from_slice(body.as_bytes())
// Regardless of whether we successfully deserialize, we should ack the dispatcher so they don't keep resending it
ack_dispatcher(request);
let json_event: E = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;

let event_contract_id = event.contract_id.clone();

let signer_event = match SignerEvent::try_from(event) {
Err(e) => {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
local_addr,
event_contract_id
);
ack_dispatcher(request);
return Err(e);
}
Ok(x) => x,
};

ack_dispatcher(request);
let signer_event: SignerEvent<T> = json_event.try_into()?;

Ok(signer_event)
}
Expand Down Expand Up @@ -472,108 +455,69 @@ impl<T: SignerEventTrait> TryFrom<StackerDBChunksEvent> for SignerEvent<T> {
}
}

/// Process a proposal response from the node
fn process_proposal_response<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got proposal_response event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BlockValidateResponse> for SignerEvent<T> {
type Error = EventError;

ack_dispatcher(request);
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
fn try_from(block_validate_response: BlockValidateResponse) -> Result<Self, Self::Error> {
Ok(SignerEvent::BlockValidationResponse(
block_validate_response,
))
}
}

let event: BlockValidateResponse = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;

ack_dispatcher(request);
Ok(SignerEvent::BlockValidationResponse(event))
#[derive(Debug, Deserialize)]
struct BurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
}

/// Process a new burn block event from the node
fn process_new_burn_block_event<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got burn_block event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BurnBlockEvent> for SignerEvent<T> {
type Error = EventError;

ack_dispatcher(request);
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}
#[derive(Debug, Deserialize)]
struct TempBurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
fn try_from(burn_block_event: BurnBlockEvent) -> Result<Self, Self::Error> {
let burn_header_hash = burn_block_event
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;

Ok(SignerEvent::NewBurnBlock {
burn_height: burn_block_event.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
})
}
let temp: TempBurnBlockEvent = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
let burn_header_hash = temp
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
let event = SignerEvent::NewBurnBlock {
burn_height: temp.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
};
ack_dispatcher(request);
Ok(event)
}

/// Process a new burn block event from the node
fn process_new_block<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got new_block event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
#[derive(Debug, Deserialize)]
struct BlockEvent {
block_hash: String,
block_height: u64,
}

ack_dispatcher(request);
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}
#[derive(Debug, Deserialize)]
struct TempBlockEvent {
block_hash: String,
block_height: u64,
}
impl<T: SignerEventTrait> TryFrom<BlockEvent> for SignerEvent<T> {
type Error = EventError;

let temp: TempBlockEvent = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
let block_hash: Sha512Trunc256Sum = temp
.block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
Sha512Trunc256Sum::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
let event = SignerEvent::NewBlock {
block_hash,
block_height: temp.block_height,
};
ack_dispatcher(request);
Ok(event)
fn try_from(block_event: BlockEvent) -> Result<Self, Self::Error> {
let block_hash: Sha512Trunc256Sum = block_event
.block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
Sha512Trunc256Sum::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
Ok(SignerEvent::NewBlock {
block_hash,
block_height: block_event.block_height,
})
}
}

pub fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
Expand Down

0 comments on commit f96a33f

Please sign in to comment.