change everything to transaction (#6440)

This commit is contained in:
Nikolay Volf
2020-06-21 13:34:19 +03:00
committed by GitHub
parent 04c62d6c5a
commit 36d2eefdc9
3 changed files with 74 additions and 73 deletions
+61 -60
View File
@@ -72,15 +72,15 @@ pub use generic_proto::LegacyConnectionKillError;
const REQUEST_TIMEOUT_SEC: u64 = 40; const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance /// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate extrinsics; /// Interval at which we propagate transactions;
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
/// Maximim number of known block hashes to keep for a peer. /// Maximim number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximim number of known extrinsic hashes to keep for a peer. /// Maximim number of known transaction hashes to keep for a peer.
/// ///
/// This should be approx. 2 blocks full of transactions for the network to function properly. /// This should be approx. 2 blocks full of transactions for the network to function properly.
const MAX_KNOWN_EXTRINSICS: usize = 10240; // ~300kb per peer + overhead. const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead.
/// Maximim number of transaction validation request we keep at any moment. /// Maximim number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192; const MAX_PENDING_TRANSACTIONS: usize = 8192;
@@ -106,25 +106,25 @@ mod rep {
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
/// Reputation change when we are a light client and a peer is behind us. /// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer"); pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any extrinsic. /// Reputation change when a peer sends us any transaction.
/// ///
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified, /// This forces node to verify it, thus the negative value here. Once transaction is verified,
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND` /// reputation change should be refunded with `ANY_TRANSACTION_REFUND`
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic"); pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
/// Reputation change when a peer sends us any extrinsic that is not invalid. /// Reputation change when a peer sends us any transaction that is not invalid.
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)"); pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
/// Reputation change when a peer sends us an extrinsic that we didn't know about. /// Reputation change when a peer sends us an transaction that we didn't know about.
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic"); pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
/// Reputation change when a peer sends us a bad extrinsic. /// Reputation change when a peer sends us a bad transaction.
pub const BAD_EXTRINSIC: Rep = Rep::new(-(1 << 12), "Bad extrinsic"); pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
/// We sent an RPC query to the given node, but it failed. /// We sent an RPC query to the given node, but it failed.
pub const RPC_FAILED: Rep = Rep::new(-(1 << 12), "Remote call failed"); pub const RPC_FAILED: Rep = Rep::new(-(1 << 12), "Remote call failed");
/// We received a message that failed to decode. /// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// We received an unexpected response. /// We received an unexpected response.
pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet"); pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected extrinsic packet. /// We received an unexpected transaction packet.
pub const UNEXPECTED_EXTRINSICS: Rep = Rep::new_fatal("Unexpected extrinsics packet"); pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// We received an unexpected light node request. /// We received an unexpected light node request.
pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet"); pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet");
/// Peer has different genesis. /// Peer has different genesis.
@@ -145,7 +145,7 @@ struct Metrics {
fork_targets: Gauge<U64>, fork_targets: Gauge<U64>,
finality_proofs: GaugeVec<U64>, finality_proofs: GaugeVec<U64>,
justifications: GaugeVec<U64>, justifications: GaugeVec<U64>,
propagated_extrinsics: Counter<U64>, propagated_transactions: Counter<U64>,
} }
impl Metrics { impl Metrics {
@@ -191,8 +191,8 @@ impl Metrics {
)?; )?;
register(g, r)? register(g, r)?
}, },
propagated_extrinsics: register(Counter::new( propagated_transactions: register(Counter::new(
"sync_propagated_extrinsics", "sync_propagated_transactions",
"Number of transactions propagated to at least one peer", "Number of transactions propagated to at least one peer",
)?, r)?, )?, r)?,
}) })
@@ -221,11 +221,11 @@ impl Future for PendingTransaction {
pub struct Protocol<B: BlockT, H: ExHashT> { pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`. /// Interval at which we call `tick`.
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>, tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Interval at which we call `propagate_extrinsics`. /// Interval at which we call `propagate_transactions`.
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>, propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority. /// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>, pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending extrinsic verification tasks. /// Pending transactions verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>, pending_transactions: FuturesUnordered<PendingTransaction>,
config: ProtocolConfig, config: ProtocolConfig,
genesis_hash: B::Hash, genesis_hash: B::Hash,
@@ -280,7 +280,7 @@ struct Peer<B: BlockT, H: ExHashT> {
/// Requests we are no longer interested in. /// Requests we are no longer interested in.
obsolete_requests: HashMap<message::RequestId, Instant>, obsolete_requests: HashMap<message::RequestId, Instant>,
/// Holds a set of transactions known to this peer. /// Holds a set of transactions known to this peer.
known_extrinsics: LruHashSet<H>, known_transactions: LruHashSet<H>,
/// Holds a set of blocks known to this peer. /// Holds a set of blocks known to this peer.
known_blocks: LruHashSet<B::Hash>, known_blocks: LruHashSet<B::Hash>,
/// Request counter, /// Request counter,
@@ -601,7 +601,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
return outcome; return outcome;
}, },
GenericMessage::Transactions(m) => GenericMessage::Transactions(m) =>
self.on_extrinsics(who, m), self.on_transactions(who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request),
GenericMessage::RemoteCallResponse(_) => GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"), warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
@@ -720,8 +720,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// Print some diagnostics. // Print some diagnostics.
if let Some(peer) = self.context_data.peers.get(&who) { if let Some(peer) = self.context_data.peers.get(&who) {
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", known_transactions: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
who, peer.info.protocol_version, peer.info.roles, peer.known_extrinsics, peer.known_blocks, who, peer.info.protocol_version, peer.info.roles, peer.known_transactions, peer.known_blocks,
peer.info.best_hash, peer.info.best_number); peer.info.best_hash, peer.info.best_number);
} else { } else {
debug!(target: "sync", "Peer clogged before being properly connected"); debug!(target: "sync", "Peer clogged before being properly connected");
@@ -1048,7 +1048,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let peer = Peer { let peer = Peer {
info, info,
block_request: None, block_request: None,
known_extrinsics: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_EXTRINSICS) known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")), .expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")), .expect("Constant is nonzero")),
@@ -1137,28 +1137,29 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.map(|(peer_id, peer)| (peer_id, peer.info.roles)) .map(|(peer_id, peer)| (peer_id, peer.info.roles))
} }
/// Called when peer sends us new extrinsics /// Called when peer sends us new transactions
fn on_extrinsics( fn on_transactions(
&mut self, &mut self,
who: PeerId, who: PeerId,
extrinsics: message::Transactions<B::Extrinsic> transactions: message::Transactions<B::Extrinsic>
) { ) {
// sending extrinsic to light node is considered a bad behavior // sending transaction to light node is considered a bad behavior
if !self.config.roles.is_full() { if !self.config.roles.is_full() {
trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who); trace!(target: "sync", "Peer {} is trying to send transactions to the light node", who);
self.behaviour.disconnect_peer(&who); self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::UNEXPECTED_EXTRINSICS); self.peerset_handle.report_peer(who, rep::UNEXPECTED_TRANSACTIONS);
return; return;
} }
// Accept extrinsics only when fully synced // Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle { if self.sync.status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); trace!(target: "sync", "{} Ignoring transactions while syncing", who);
return; return;
} }
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics { for t in transactions {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!( debug!(
target: "sync", target: "sync",
@@ -1169,9 +1170,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
} }
let hash = self.transaction_pool.hash_of(&t); let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash); peer.known_transactions.insert(hash);
self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC); self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION);
self.pending_transactions.push(PendingTransaction { self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(), peer_id: who.clone(),
@@ -1181,45 +1182,45 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
} }
} }
fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) { fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
match import { match import {
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND), TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_TRANSACTION_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC), TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_TRANSACTION),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC), TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_TRANSACTION),
TransactionImport::None => {}, TransactionImport::None => {},
} }
} }
/// Propagate one extrinsic. /// Propagate one transaction.
pub fn propagate_extrinsic( pub fn propagate_transaction(
&mut self, &mut self,
hash: &H, hash: &H,
) { ) {
debug!(target: "sync", "Propagating extrinsic [{:?}]", hash); debug!(target: "sync", "Propagating transaction [{:?}]", hash);
// Accept transactions only when fully synced // Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle { if self.sync.status().state != SyncState::Idle {
return; return;
} }
if let Some(extrinsic) = self.transaction_pool.transaction(hash) { if let Some(transaction) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_extrinsics(&[(hash.clone(), extrinsic)]); let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
self.transaction_pool.on_broadcasted(propagated_to); self.transaction_pool.on_broadcasted(propagated_to);
} }
} }
fn do_propagate_extrinsics( fn do_propagate_transactions(
&mut self, &mut self,
extrinsics: &[(H, B::Extrinsic)], transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> { ) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new(); let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() { for (who, peer) in self.context_data.peers.iter_mut() {
// never send extrinsics to the light node // never send transactions to the light node
if !peer.info.roles.is_full() { if !peer.info.roles.is_full() {
continue; continue;
} }
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics let (hashes, to_send): (Vec<_>, Vec<_>) = transactions
.iter() .iter()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) .filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone()))
.cloned() .cloned()
.unzip(); .unzip();
@@ -1244,22 +1245,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if propagated_to.len() > 0 { if propagated_to.len() > 0 {
if let Some(ref metrics) = self.metrics { if let Some(ref metrics) = self.metrics {
metrics.propagated_extrinsics.inc(); metrics.propagated_transactions.inc();
} }
} }
propagated_to propagated_to
} }
/// Call when we must propagate ready extrinsics to peers. /// Call when we must propagate ready transactions to peers.
pub fn propagate_extrinsics(&mut self) { pub fn propagate_transactions(&mut self) {
debug!(target: "sync", "Propagating extrinsics"); debug!(target: "sync", "Propagating transactions");
// Accept transactions only when fully synced // Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle { if self.sync.status().state != SyncState::Idle {
return; return;
} }
let extrinsics = self.transaction_pool.transactions(); let transactions = self.transaction_pool.transactions();
let propagated_to = self.do_propagate_extrinsics(&extrinsics); let propagated_to = self.do_propagate_transactions(&transactions);
self.transaction_pool.on_broadcasted(propagated_to); self.transaction_pool.on_broadcasted(propagated_to);
} }
@@ -1983,7 +1984,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
} }
while let Poll::Ready(Some(())) = self.propagate_timeout.poll_next_unpin(cx) { while let Poll::Ready(Some(())) = self.propagate_timeout.poll_next_unpin(cx) {
self.propagate_extrinsics(); self.propagate_transactions();
} }
for (id, mut r) in self.sync.block_requests() { for (id, mut r) in self.sync.block_requests() {
@@ -2011,7 +2012,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.pending_messages.push_back(event); self.pending_messages.push_back(event);
} }
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) { if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_extrinsic_import(peer_id, result); self.on_handle_transaction_import(peer_id, result);
} }
if let Some(message) = self.pending_messages.pop_front() { if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
@@ -2050,7 +2051,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
} }
Some(Fallback::Transactions) => { Some(Fallback::Transactions) => {
if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) { if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
self.on_extrinsics(peer_id, m); self.on_transactions(peer_id, m);
} else { } else {
warn!(target: "sub-libp2p", "Failed to decode transactions list"); warn!(target: "sub-libp2p", "Failed to decode transactions list");
} }
+9 -9
View File
@@ -587,15 +587,15 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// All transactions will be fetched from the `TransactionPool` that was passed at /// All transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers. /// initialization as part of the configuration and propagated to peers.
pub fn trigger_repropagate(&self) { pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions);
} }
/// You must call when new transaction is imported by the transaction pool. /// You must call when new transaction is imported by the transaction pool.
/// ///
/// This transaction will be fetched from the `TransactionPool` that was passed at /// This transaction will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers. /// initialization as part of the configuration and propagated to peers.
pub fn propagate_extrinsic(&self, hash: H) { pub fn propagate_transaction(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsic(hash)); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash));
} }
/// Make sure an important block is propagated to peers. /// Make sure an important block is propagated to peers.
@@ -798,8 +798,8 @@ impl<B, H> NetworkStateInfo for NetworkService<B, H>
/// ///
/// Each entry corresponds to a method of `NetworkService`. /// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> { enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PropagateExtrinsic(H), PropagateTransaction(H),
PropagateExtrinsics, PropagateTransactions,
RequestJustification(B::Hash, NumberFor<B>), RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>), AnnounceBlock(B::Hash, Vec<u8>),
GetValue(record::Key), GetValue(record::Key),
@@ -1119,10 +1119,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().announce_block(hash, data), this.network_service.user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) => ServiceToWorkerMsg::RequestJustification(hash, number) =>
this.network_service.user_protocol_mut().request_justification(&hash, number), this.network_service.user_protocol_mut().request_justification(&hash, number),
ServiceToWorkerMsg::PropagateExtrinsic(hash) => ServiceToWorkerMsg::PropagateTransaction(hash) =>
this.network_service.user_protocol_mut().propagate_extrinsic(&hash), this.network_service.user_protocol_mut().propagate_transaction(&hash),
ServiceToWorkerMsg::PropagateExtrinsics => ServiceToWorkerMsg::PropagateTransactions =>
this.network_service.user_protocol_mut().propagate_extrinsics(), this.network_service.user_protocol_mut().propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) => ServiceToWorkerMsg::GetValue(key) =>
this.network_service.get_value(&key), this.network_service.get_value(&key),
ServiceToWorkerMsg::PutValue(key, value) => ServiceToWorkerMsg::PutValue(key, value) =>
+4 -4
View File
@@ -1067,7 +1067,7 @@ ServiceBuilder<
spawn_handle.spawn( spawn_handle.spawn(
"on-transaction-imported", "on-transaction-imported",
extrinsic_notifications(transaction_pool.clone(), network.clone()), transaction_notifications(transaction_pool.clone(), network.clone()),
); );
// Prometheus metrics. // Prometheus metrics.
@@ -1245,7 +1245,7 @@ ServiceBuilder<
} }
} }
async fn extrinsic_notifications<TBl, TExPool>( async fn transaction_notifications<TBl, TExPool>(
transaction_pool: Arc<TExPool>, transaction_pool: Arc<TExPool>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>> network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>
) )
@@ -1253,10 +1253,10 @@ async fn extrinsic_notifications<TBl, TExPool>(
TBl: BlockT, TBl: BlockT,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>, TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{ {
// extrinsic notifications // transaction notifications
transaction_pool.import_notification_stream() transaction_pool.import_notification_stream()
.for_each(move |hash| { .for_each(move |hash| {
network.propagate_extrinsic(hash); network.propagate_transaction(hash);
let status = transaction_pool.status(); let status = transaction_pool.status();
telemetry!(SUBSTRATE_INFO; "txpool.import"; telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready, "ready" => status.ready,