Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Shred filter
Browse files Browse the repository at this point in the history
Thread bank_forks into shred fetch
  • Loading branch information
sakridge committed Mar 20, 2020
1 parent c638e83 commit 867b5e9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 9 deletions.
1 change: 1 addition & 0 deletions archiver-lib/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl Archiver {
shred_forward_sockets,
repair_socket.clone(),
&shred_fetch_sender,
None,
&exit,
);
let (slot_sender, slot_receiver) = channel();
Expand Down
108 changes: 99 additions & 9 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,131 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use bv::BitVec;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
use solana_ledger::shred::{OFFSET_OF_SHRED_INDEX, SIZE_OF_SHRED_INDEX};
use solana_ledger::shred::{
OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT,
};
use solana_perf::cuda_runtime::PinnedVec;
use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler};
use solana_perf::recycler::Recycler;
use solana_sdk::clock::Slot;
use solana_streamer::streamer::{self, PacketReceiver, PacketSender};
use std::collections::HashMap;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::RwLock;
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;

pub type ShredsReceived = HashMap<Slot, BitVec<u64>>;

pub struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}

impl ShredFetchStage {
// updates packets received on a channel and sends them on another channel
fn modify_packets<F>(recvr: PacketReceiver, sendr: PacketSender, modify: F)
where
fn modify_packets<F>(
recvr: PacketReceiver,
sendr: PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
modify: F,
) where
F: Fn(&mut Packet),
{
let mut shreds_received = ShredsReceived::default();
let mut count = 0;
let mut last_print = Instant::now();
let mut last_cleared = Instant::now();

// In the case of bank_forks=None, setup to accept any slot range
let mut last_root = 0;
let mut last_slot = std::u64::MAX;
let mut slots_per_epoch = 0;
let mut passed = 0;
let mut filtered = 0;
let mut slot_filtered = 0;
let mut index_overrun_total = 0;

let index_start = OFFSET_OF_SHRED_INDEX;
let index_end = index_start + SIZE_OF_SHRED_INDEX;
let slot_start = OFFSET_OF_SHRED_SLOT;
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
while let Some(mut p) = recvr.iter().next() {
let index_start = OFFSET_OF_SHRED_INDEX;
let index_end = index_start + SIZE_OF_SHRED_INDEX;
if last_cleared.elapsed().as_millis() > 200 {
shreds_received.clear();
last_cleared = Instant::now();
if let Some(bank_forks) = bank_forks.as_ref() {
let bank_forks_r = bank_forks.read().unwrap();
last_root = bank_forks_r.root();
let working_bank = bank_forks_r.working_bank();
last_slot = working_bank.slot();
let root_bank = bank_forks_r.root_bank();
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
}
}
let mut index_overrun = 0;
let mut shred_count = 0;
p.packets.iter_mut().for_each(|p| {
count += 1;
shred_count += 1;
p.meta.discard = true;
if index_end <= p.meta.size {
if let Ok(index) = limited_deserialize::<u32>(&p.data[index_start..index_end]) {
if index < MAX_DATA_SHREDS_PER_SLOT as u32 {
p.meta.discard = false;
modify(p);
if slot_end <= p.meta.size {
if let Ok(slot) =
limited_deserialize::<Slot>(&p.data[slot_start..slot_end])
{
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch)
{
// Shred filter
let slot_received = shreds_received.entry(slot).or_insert(
BitVec::new_fill(
false,
MAX_DATA_SHREDS_PER_SLOT as u64,
),
);
if !slot_received.get(index.into()) {
p.meta.discard = false;
modify(p);
slot_received.set(index.into(), true);
passed += 1;
} else {
filtered += 1;
}
} else {
slot_filtered += 1;
}
}
}
} else {
inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", 1);
index_overrun += 1;
index_overrun_total += 1;
}
}
}
});
inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", index_overrun);
inc_new_counter_info!("shred_fetch_stage-shred_count", shred_count);
if sendr.send(p).is_err() {
break;
}
if last_print.elapsed().as_secs() > 2 {
last_print = Instant::now();
info!(
"shred count: {} passed: {} filtered: {} slot_filtered: {} index_overrun: {}",
count, passed, filtered, slot_filtered, index_overrun_total
);
count = 0;
filtered = 0;
slot_filtered = 0;
passed = 0;
index_overrun_total = 0;
}
}
}

Expand All @@ -49,6 +134,7 @@ impl ShredFetchStage {
exit: &Arc<AtomicBool>,
sender: PacketSender,
recycler: Recycler<PinnedVec<Packet>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
modify: F,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
where
Expand All @@ -68,9 +154,10 @@ impl ShredFetchStage {
})
.collect();

let bank_forks = bank_forks.clone();
let modifier_hdl = Builder::new()
.name("solana-tvu-fetch-stage-packet-modifier".to_string())
.spawn(|| Self::modify_packets(packet_receiver, sender, modify))
.spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, modify))
.unwrap();
(streamers, modifier_hdl)
}
Expand All @@ -80,6 +167,7 @@ impl ShredFetchStage {
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: &PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
Expand All @@ -99,6 +187,7 @@ impl ShredFetchStage {
&exit,
sender.clone(),
recycler.clone(),
bank_forks.clone(),
|p| p.meta.forward = true,
);

Expand All @@ -107,6 +196,7 @@ impl ShredFetchStage {
&exit,
sender.clone(),
recycler.clone(),
bank_forks,
|p| p.meta.repair = true,
);

Expand Down
1 change: 1 addition & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl Tvu {
forward_sockets,
repair_socket.clone(),
&fetch_sender,
Some(bank_forks.clone()),
&exit,
);

Expand Down

0 comments on commit 867b5e9

Please sign in to comment.