Refactor network transactions handling (#5939)

* change propagation

* add bound
This commit is contained in:
Nikolay Volf
2020-05-11 11:36:00 +03:00
committed by GitHub
parent f028c84a5b
commit 679c91e18c
4 changed files with 122 additions and 59 deletions
+64 -12
View File
@@ -17,13 +17,13 @@
use crate::{
ExHashT,
chain::{Client, FinalityProofProvider},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
utils::interval
};
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use futures::{prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
@@ -78,6 +78,9 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximim number of known extrinsic hashes to keep for a peer.
const MAX_KNOWN_EXTRINSICS: usize = 4096; // ~128kb per peer + overhead
/// Maximim number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192;
/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
@@ -101,6 +104,13 @@ mod rep {
pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any extrinsic.
///
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
/// Reputation change when a peer sends us any extrinsic that is not invalid.
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
/// Reputation change when a peer sends us a bad extrinsic.
@@ -182,6 +192,24 @@ impl Metrics {
}
}
struct PendingTransaction {
validation: TransactionImportFuture,
peer_id: PeerId,
}
impl Future for PendingTransaction {
type Output = (PeerId, TransactionImport);
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
return Poll::Ready((this.peer_id.clone(), import_result));
}
Poll::Pending
}
}
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
@@ -190,6 +218,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending extrinsic verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>,
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
@@ -394,6 +424,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
pending_transactions: FuturesUnordered::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
@@ -1118,20 +1149,37 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!(
target: "sync",
"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
MAX_PENDING_TRANSACTIONS,
);
break;
}
let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash);
self.transaction_pool.import(
self.peerset_handle.clone().into(),
who.clone(),
rep::GOOD_EXTRINSIC,
rep::BAD_EXTRINSIC,
t,
);
self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);
self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(),
validation: self.transaction_pool.import(t),
});
}
}
}
fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
match import {
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
TransactionImport::None => {},
}
}
/// Propagate one extrinsic.
pub fn propagate_extrinsic(
&mut self,
@@ -1953,7 +2001,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
@@ -1970,7 +2018,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
@@ -1988,9 +2036,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::FinalityProofRequest(r))
GenericMessage::FinalityProofRequest(r),
)
}
}
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_extrinsic_import(peer_id, result);
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}