From 2fed420c439557101e59ade11b32c48fd0f201fe Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 4 Jun 2022 18:51:27 -0300 Subject: [PATCH] fix: Upgrade Pallas to fix tx hash mismatch (#312) --- Cargo.lock | 47 +++++++++++++++--------------- Cargo.toml | 2 +- src/mapper/collect.rs | 5 ++-- src/mapper/map.rs | 2 +- src/mapper/shelley.rs | 16 +++++------ src/sources/common.rs | 40 +++++++++----------------- src/sources/n2c/blocks.rs | 58 ++++++++++++++++++++++---------------- src/sources/n2c/run.rs | 28 +++++++++--------- src/sources/n2c/setup.rs | 22 +++++++-------- src/sources/n2n/headers.rs | 4 +-- src/sources/n2n/run.rs | 6 ++-- src/sources/n2n/setup.rs | 18 ++++++------ 12 files changed, 124 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 584db8b3..d6d3d464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1320,9 +1320,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "minicbor" -version = "0.15.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b419e66bd98ccf5824dd4b8f4141f7431d1d07733c3e13c946278204a437d2b" +checksum = "a5e575910763b21a0db7df5e142907fe944bff84d1dfc78e2ba92e7f3bdfd36b" dependencies = [ "half", "minicbor-derive", @@ -1330,9 +1330,9 @@ dependencies = [ [[package]] name = "minicbor-derive" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c32aff53852dc6dd3817559d603734e4f7a65411702953238aafdcb3a7fd47" +checksum = "d0a86c5f04def8fb7735ae918bb589af82f985526f4c62e0249544b668b2f456" dependencies = [ "proc-macro2", "quote", @@ -1593,9 +1593,9 @@ dependencies = [ [[package]] name = "pallas" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "930fb75d557eff3e9dbf7ca215816c834b2a0769e542764a99f01ae8daae7fde" +checksum = "e754376e24ea897ccaf8cde8c404c8a5baca798b03a4fdba4a48ca99077a6d3d" dependencies = [ "pallas-codec", "pallas-crypto", @@ -1606,56 +1606,59 @@ dependencies = [ [[package]] name = "pallas-codec" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6b72d36979c540e639cc0b04cbeb922367df2315910b6cafb4bf61a44f3baf" +checksum = "6503e55e726847aa40cae46e190ddfc48f169c29c94ec19aa9cabe47d33a9660" dependencies = [ "minicbor", ] [[package]] name = "pallas-crypto" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa690b0e5cbc874ee5f9024f33e2743ddd5034b158f9f1947b4e5675a2ebb99f" +checksum = "72884124f88227e26802667f8a18b303e47c7fa7afd3852234d07c3379a8e2b9" dependencies = [ "cryptoxide", "hex", - "minicbor", + "pallas-codec", "rand_core", "thiserror", ] [[package]] name = "pallas-miniprotocols" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f7f73bdfaaef9c42034a4cb3bce48c07b4d8862371b7d6eee8f9162416a30f9" +checksum = "4b48feaccb2c86871c0c8f9fdd78c7a9246979da979bb8764d4deef4b75475dd" dependencies = [ "hex", "itertools", "log 0.4.17", - "net2", "pallas-codec", "pallas-multiplexer", + "thiserror", ] [[package]] name = "pallas-multiplexer" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0862ffc0600471b6bbf3b7b237842f94a1141f578f5d5426c202ce58e391e158" +checksum = "7c5c69d4c2e6dc2a337573296b13d071bee6c2add517425a94a197587533ccff" dependencies = [ "byteorder 1.4.3", "hex", "log 0.4.17", + "pallas-codec", + "rand", + "thiserror", ] [[package]] name = "pallas-primitives" -version = "0.9.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "000b7c2fe9e8cbcf00853d3dcb690f08a2038cf7ddba9d7c419c749530d6916e" +checksum = "6369069cf3a96ec12053e3902e1a15f77de494eb4dabf90d921bd6b0b8003c3a" dependencies = [ "base58", "bech32", @@ -2412,18 +2415,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7438a26c..e4588c5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.9.1" +pallas = "0.10.0" # pallas = { path = "../pallas/pallas" } hex = "0.4.3" net2 = "0.2.37" diff --git a/src/mapper/collect.rs b/src/mapper/collect.rs index 4358eeba..ad5b93f5 100644 --- a/src/mapper/collect.rs +++ b/src/mapper/collect.rs @@ -1,5 +1,6 @@ -use pallas::ledger::primitives::alonzo::{ - AuxiliaryData, Block, Multiasset, TransactionInput, TransactionOutput, Value, +use pallas::ledger::primitives::{ + alonzo::{AuxiliaryData, Block, Multiasset, TransactionInput, TransactionOutput, Value}, + ToHash, }; use crate::{ diff --git a/src/mapper/map.rs b/src/mapper/map.rs index 469e5d78..b9ac0a1f 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -8,7 +8,7 @@ use pallas::ledger::primitives::alonzo::{ TransactionOutput, Value, }; use pallas::ledger::primitives::alonzo::{NetworkId, TransactionBody, TransactionBodyComponent}; -use pallas::ledger::primitives::ToCanonicalJson; +use pallas::ledger::primitives::{ToCanonicalJson, ToHash}; use pallas::network::miniprotocols::Point; use serde_json::{json, Value as JsonValue}; diff --git a/src/mapper/shelley.rs b/src/mapper/shelley.rs index cfafcb2f..c1eca4f5 100644 --- a/src/mapper/shelley.rs +++ b/src/mapper/shelley.rs @@ -1,7 +1,7 @@ -use pallas::ledger::primitives::Fragment; +use pallas::ledger::primitives::{Fragment, ToHash}; use pallas::ledger::primitives::alonzo::{ - self, crypto, AuxiliaryData, Block, Certificate, Metadata, Multiasset, TransactionBody, + self, AuxiliaryData, Block, Certificate, Metadata, Multiasset, TransactionBody, TransactionBodyComponent, TransactionInput, TransactionOutput, TransactionWitnessSet, Value, }; @@ -272,7 +272,7 @@ impl EventWriter { #[deprecated(note = "use crawl_from_shelley_cbor instead")] pub fn crawl_with_cbor(&self, block: &Block, cbor: &[u8]) -> Result<(), Error> { - let hash = crypto::hash_block_header(&block.header); + let hash = block.header.to_hash(); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(&hash)), @@ -287,7 +287,7 @@ impl EventWriter { #[deprecated(note = "use crawl_from_shelley_cbor instead")] pub fn crawl(&self, block: &Block) -> Result<(), Error> { - let hash = crypto::hash_block_header(&block.header); + let hash = block.header.to_hash(); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(&hash)), @@ -305,13 +305,13 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we already have a decoded block (for example, N2C). The raw CBOR is also /// passed through in case we need to attach it to outbound events. - pub fn crawl_shelley_with_cbor( + pub fn crawl_shelley_with_cbor<'b>( &self, - block: &Block, - cbor: &[u8], + block: &'b Block, + cbor: &'b [u8], era: Era, ) -> Result<(), Error> { - let hash = crypto::hash_block_header(&block.header); + let hash = block.header.to_hash(); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(&hash)), diff --git a/src/sources/common.rs b/src/sources/common.rs index ee535129..aee99ad2 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -1,14 +1,9 @@ use core::fmt; -use std::{net::TcpStream, ops::Deref, str::FromStr, time::Duration}; +use std::{ops::Deref, str::FromStr, time::Duration}; -#[cfg(target_family = "unix")] -use std::os::unix::net::UnixStream; - -use log::info; -use net2::TcpStreamExt; use pallas::network::{ miniprotocols::{chainsync::TipFinder, run_agent, Point, MAINNET_MAGIC, TESTNET_MAGIC}, - multiplexer::{Channel, Multiplexer}, + multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, }; use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; @@ -151,24 +146,16 @@ pub struct RetryPolicy { connection_max_backoff: u32, } -pub fn setup_multiplexer_attempt( - bearer: &BearerKind, - address: &str, - protocols: &[u16], -) -> Result { +pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result { match bearer { BearerKind::Tcp => { - let tcp = TcpStream::connect(address)?; - tcp.set_nodelay(true)?; - tcp.set_keepalive_ms(Some(30_000u32))?; - - Multiplexer::setup(tcp, protocols) + let bearer = Bearer::connect_tcp(address)?; + Ok(StdPlexer::new(bearer)) } #[cfg(target_family = "unix")] BearerKind::Unix => { - let unix = UnixStream::connect(address)?; - - Multiplexer::setup(unix, protocols) + let unix = Bearer::connect_unix(address)?; + Ok(StdPlexer::new(unix)) } } } @@ -176,12 +163,11 @@ pub fn setup_multiplexer_attempt( pub fn setup_multiplexer( bearer: &BearerKind, address: &str, - protocols: &[u16], retry: &Option, -) -> Result { +) -> Result { match retry { Some(policy) => retry::retry_operation( - || setup_multiplexer_attempt(bearer, address, protocols), + || setup_multiplexer_attempt(bearer, address), &retry::Policy { max_retries: policy.connection_max_retries, backoff_unit: Duration::from_secs(1), @@ -189,7 +175,7 @@ pub fn setup_multiplexer( max_backoff: Duration::from_secs(policy.connection_max_backoff as u64), }, ), - None => setup_multiplexer_attempt(bearer, address, protocols), + None => setup_multiplexer_attempt(bearer, address), } } @@ -241,7 +227,7 @@ pub fn should_finalize( } pub(crate) fn find_end_of_chain( - channel: &mut Channel, + channel: &mut StdChannel, well_known: &ChainWellKnownInfo, ) -> Result { let point = Point::Specific( @@ -251,7 +237,7 @@ pub(crate) fn find_end_of_chain( let agent = TipFinder::initial(point); let agent = run_agent(agent, channel)?; - info!("chain point query output: {:?}", agent.output); + log::info!("chain point query output: {:?}", agent.output); match agent.output { Some(tip) => Ok(tip.0), @@ -263,7 +249,7 @@ pub(crate) fn define_start_point( intersect: &Option, since: &Option, utils: &Utils, - cs_channel: &mut Channel, + cs_channel: &mut StdChannel, ) -> Result>, Error> { let cursor = utils.get_cursor_if_any(); diff --git a/src/sources/n2c/blocks.rs b/src/sources/n2c/blocks.rs index ea7d8b5f..af52e023 100644 --- a/src/sources/n2c/blocks.rs +++ b/src/sources/n2c/blocks.rs @@ -2,50 +2,58 @@ use std::ops::Deref; use pallas::{ codec::minicbor::decode, - ledger::primitives::{alonzo, byron, probing, Era}, - network::miniprotocols::{chainsync::BlockContent, Point}, + ledger::primitives::{alonzo, byron, probing, Era, ToHash}, + network::miniprotocols::Point, }; use crate::Error; -#[derive(Debug)] -pub(crate) enum MultiEraBlock { - Byron(Box), - AlonzoCompatible(Box, Era), -} - -impl TryFrom for MultiEraBlock { - type Error = Error; +pub(crate) struct CborHolder(Vec); - fn try_from(value: BlockContent) -> Result { - let bytes = value.deref(); +impl<'b> CborHolder { + pub fn new(bytes: Vec) -> Self { + Self(bytes) + } - match probing::probe_block_cbor_era(bytes) { + pub fn parse(&'b self) -> Result, Error> { + let block = match probing::probe_block_cbor_era(&self.0) { probing::Outcome::Matched(era) => match era { pallas::ledger::primitives::Era::Byron => { - let block = decode(bytes)?; - Ok(MultiEraBlock::Byron(Box::new(block))) + let block = decode(&self.0)?; + MultiEraBlock::Byron(Box::new(block)) } _ => { - let alonzo::BlockWrapper(_, block) = decode(bytes)?; - Ok(MultiEraBlock::AlonzoCompatible(Box::new(block), era)) + let alonzo::BlockWrapper(_, block) = decode(&self.0)?; + MultiEraBlock::AlonzoCompatible(Box::new(block), era) } }, - // TODO: we're assuming that the genesis block is Byron-compatible. Is this a safe + // TODO: we're assuming that the geenesis block is Byron-compatible. Is this a safe // assumption? probing::Outcome::GenesisBlock => { - let block = decode(bytes)?; - Ok(MultiEraBlock::Byron(Box::new(block))) + let block = decode(&self.0)?; + MultiEraBlock::Byron(Box::new(block)) } probing::Outcome::Inconclusive => { - log::error!("CBOR hex for debugging: {}", hex::encode(bytes)); - Err("can't infer primitive block from cbor, inconclusive probing".into()) + log::error!("CBOR hex for debugging: {}", hex::encode(&self.0)); + return Err("can't infer primitive block from cbor, inconclusive probing".into()); } - } + }; + + Ok(block) } + + pub fn cbor(&'b self) -> &'b [u8] { + &self.0 + } +} + +#[derive(Debug)] +pub(crate) enum MultiEraBlock<'b> { + Byron(Box), + AlonzoCompatible(Box>, Era), } -impl MultiEraBlock { +impl MultiEraBlock<'_> { pub(crate) fn read_cursor(&self) -> Result { match self { MultiEraBlock::Byron(x) => match x.deref() { @@ -61,7 +69,7 @@ impl MultiEraBlock { } }, MultiEraBlock::AlonzoCompatible(x, _) => { - let hash = alonzo::crypto::hash_block_header(&x.header); + let hash = x.header.to_hash(); Ok(Point::Specific(x.header.header_body.slot, hash.to_vec())) } } diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 8a5524ff..2f0f7e8e 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,18 +1,18 @@ -use std::{collections::HashMap, fmt::Debug, ops::Deref}; +use std::{collections::HashMap, fmt::Debug}; use pallas::network::{ miniprotocols::{chainsync, run_agent, Point}, - multiplexer::Channel, + multiplexer::StdChannel, }; -use crate::{mapper::EventWriter, Error}; +use crate::{mapper::EventWriter, sources::n2c::blocks::CborHolder, Error}; use super::blocks::MultiEraBlock; struct ChainObserver { chain_buffer: chainsync::RollbackBuffer, min_depth: usize, - blocks: HashMap, + blocks: HashMap, event_writer: EventWriter, } @@ -32,16 +32,16 @@ fn log_buffer_state(buffer: &chainsync::RollbackBuffer) { ); } -impl chainsync::Observer for ChainObserver { +impl<'b> chainsync::Observer for ChainObserver { fn on_roll_forward( &mut self, content: chainsync::BlockContent, tip: &chainsync::Tip, ) -> Result> { // parse the block and extract the point of the chain - let cbor = Vec::from(content.deref()); - let block = MultiEraBlock::try_from(content)?; - let point = block.read_cursor()?; + let cbor = content.into(); + let block = CborHolder::new(cbor); + let point = block.parse()?.read_cursor()?; // store the block for later retrieval self.blocks.insert(point.clone(), block); @@ -61,13 +61,13 @@ impl chainsync::Observer for ChainObserver { .remove(&point) .expect("required block not found in memory"); - match block { - MultiEraBlock::Byron(model) => { - self.event_writer.crawl_byron_with_cbor(&model, &cbor)? - } + match block.parse()? { + MultiEraBlock::Byron(model) => self + .event_writer + .crawl_byron_with_cbor(&model, block.cbor())?, MultiEraBlock::AlonzoCompatible(model, era) => self .event_writer - .crawl_shelley_with_cbor(&model, &cbor, era.into())?, + .crawl_shelley_with_cbor(&model, block.cbor(), era.into())?, }; } @@ -107,7 +107,7 @@ impl chainsync::Observer for ChainObserver { } pub(crate) fn observe_forever( - mut channel: Channel, + mut channel: StdChannel, event_writer: EventWriter, known_points: Option>, min_depth: usize, diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index baa196ed..b1b79acb 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -1,10 +1,8 @@ use std::ops::Deref; -use log::info; - use pallas::network::{ miniprotocols::{handshake, run_agent, MAINNET_MAGIC}, - multiplexer::Channel, + multiplexer::StdChannel, }; use serde::Deserialize; @@ -53,10 +51,10 @@ pub struct Config { pub retry_policy: Option, } -fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { +fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> { let versions = handshake::n2c::VersionTable::v1_and_above(magic); let agent = run_agent(handshake::Initiator::initial(versions), channel)?; - info!("handshake output: {:?}", agent.output); + log::info!("handshake output: {:?}", agent.output); match agent.output { handshake::Output::Accepted(_, _) => Ok(()), @@ -68,13 +66,18 @@ impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut muxer = setup_multiplexer( + let mut plexer = setup_multiplexer( &self.inner.address.0, &self.inner.address.1, - &[0, 5], &self.inner.retry_policy, )?; + let mut hs_channel = plexer.use_channel(0); + let mut cs_channel = plexer.use_channel(5); + + plexer.muxer.spawn(); + plexer.demuxer.spawn(); + let magic = match &self.inner.magic { Some(m) => *m.deref(), None => MAINNET_MAGIC, @@ -82,11 +85,8 @@ impl SourceProvider for WithUtils { let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); - let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; - let mut cs_channel = muxer.use_channel(5); - let known_points = define_start_point( &self.inner.intersect, #[allow(deprecated)] @@ -95,7 +95,7 @@ impl SourceProvider for WithUtils { &mut cs_channel, )?; - info!("starting chain sync from: {:?}", &known_points); + log::info!("starting chain sync from: {:?}", &known_points); let min_depth = self.inner.min_depth; let handle = std::thread::spawn(move || { diff --git a/src/sources/n2n/headers.rs b/src/sources/n2n/headers.rs index 1e524a47..af2fd8a2 100644 --- a/src/sources/n2n/headers.rs +++ b/src/sources/n2n/headers.rs @@ -1,6 +1,6 @@ use pallas::{ codec::minicbor::decode, - ledger::primitives::{alonzo, byron}, + ledger::primitives::{alonzo, byron, ToHash}, network::miniprotocols::{chainsync::HeaderContent, Point}, }; @@ -50,7 +50,7 @@ impl MultiEraHeader { Ok(Point::Specific(slot, hash.to_vec())) } MultiEraHeader::AlonzoCompatible(x) => { - let hash = alonzo::crypto::hash_block_header(x); + let hash = x.to_hash(); Ok(Point::Specific(x.header_body.slot, hash.to_vec())) } } diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index 8ce9b6f3..5cc6f7aa 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -4,7 +4,7 @@ use pallas::{ ledger::primitives::{probing, Era}, network::{ miniprotocols::{blockfetch, chainsync, run_agent, Point}, - multiplexer::Channel, + multiplexer::StdChannel, }, }; @@ -143,7 +143,7 @@ impl chainsync::Observer for &mut ChainObserver { } pub(crate) fn fetch_blocks_forever( - mut channel: Channel, + mut channel: StdChannel, event_writer: EventWriter, input: Receiver, ) -> Result<(), Error> { @@ -156,7 +156,7 @@ pub(crate) fn fetch_blocks_forever( } pub(crate) fn observe_headers_forever( - mut channel: Channel, + mut channel: StdChannel, event_writer: EventWriter, known_points: Option>, block_requests: SyncSender, diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index da8638ef..34c95c73 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -4,7 +4,7 @@ use log::info; use pallas::network::{ miniprotocols::{handshake, run_agent, MAINNET_MAGIC}, - multiplexer::Channel, + multiplexer::StdChannel, }; use serde::Deserialize; @@ -55,7 +55,7 @@ pub struct Config { pub finalize: Option, } -fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { +fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> { let versions = handshake::n2n::VersionTable::v6_and_above(magic); let agent = run_agent(handshake::Initiator::initial(versions), channel)?; info!("handshake output: {:?}", agent.output); @@ -70,13 +70,19 @@ impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut muxer = setup_multiplexer( + let mut plexer = setup_multiplexer( &self.inner.address.0, &self.inner.address.1, - &[0, 2, 3], &self.inner.retry_policy, )?; + let mut hs_channel = plexer.use_channel(0); + let mut cs_channel = plexer.use_channel(2); + let bf_channel = plexer.use_channel(3); + + plexer.muxer.spawn(); + plexer.demuxer.spawn(); + let magic = match &self.inner.magic { Some(m) => *m.deref(), None => MAINNET_MAGIC, @@ -84,11 +90,8 @@ impl SourceProvider for WithUtils { let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); - let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; - let mut cs_channel = muxer.use_channel(2); - let known_points = define_start_point( &self.inner.intersect, #[allow(deprecated)] @@ -118,7 +121,6 @@ impl SourceProvider for WithUtils { log::info!("observe headers thread ended"); }); - let bf_channel = muxer.use_channel(3); let bf_writer = writer; let bf_handle = std::thread::spawn(move || { fetch_blocks_forever(bf_channel, bf_writer, headers_rx)