diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 63905fad77..8e6bdbdb1a 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5924,6 +5924,7 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
+ "sc-network",
"sp-core",
"sp-keyring",
"sp-keystore",
diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml
index 971e4903f2..59fa1f4212 100644
--- a/polkadot/node/network/collator-protocol/Cargo.toml
+++ b/polkadot/node/network/collator-protocol/Cargo.toml
@@ -28,5 +28,6 @@ assert_matches = "1.4.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index 6baa865232..6928ed7483 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};
+use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin, time::Duration};
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
use sp_core::Pair;
@@ -38,6 +38,7 @@ use polkadot_node_network_protocol::{
v1 as protocol_v1,
};
use polkadot_node_subsystem_util::{
+ TimeoutExt,
metrics::{self, prometheus},
runtime::{RuntimeInfo, get_availability_cores, get_group_rotation_info}
};
@@ -50,6 +51,19 @@ use super::{LOG_TARGET, Result};
mod tests;
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
+const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Message received when previous one was still being processed");
+
+/// Time after starting an upload to a validator we will start another one to the next validator,
+/// even if the upload was not finished yet.
+///
+/// This is to protect from a single slow validator preventing collations from happening.
+///
+/// With a collation size of 5Meg and bandwidth of 500Mbit/s (requirement for Kusama validators),
+/// the transfer should be possible within 0.1 seconds. 400 milliseconds should therefore be
+/// plenty and should be low enough for later validators to still be able to finish on time.
+///
+/// There is debug logging output, so we can adjust this value based on production results.
+const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(400);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
@@ -208,9 +222,14 @@ struct WaitingCollationFetches {
collation_fetch_active: bool,
/// The collation fetches waiting to be fulfilled.
waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
+ /// All peers that are waiting or actively uploading.
+ ///
+ /// We will not accept multiple requests from the same peer, otherwise our DoS protection of
+ /// moving on to the next peer after `MAX_UNSHARED_UPLOAD_TIME` would be pointless.
+ waiting_peers: HashSet<PeerId>,
}
-type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;
+type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;
struct State {
/// Our network peer id.
@@ -684,6 +703,17 @@ where
let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();
+ if !waiting.waiting_peers.insert(incoming.peer) {
+ tracing::debug!(
+ target: LOG_TARGET,
+ "Dropping incoming request as peer has a request in flight already."
+ );
+ ctx.send_message(
+ NetworkBridgeMessage::ReportPeer(incoming.peer, COST_APPARENT_FLOOD)
+ ).await;
+ return Ok(())
+ }
+
if waiting.collation_fetch_active {
waiting.waiting.push_back(incoming);
} else {
@@ -724,6 +754,7 @@ async fn send_collation(
let (tx, rx) = oneshot::channel();
let relay_parent = request.payload.relay_parent;
+ let peer_id = request.peer;
let response = OutgoingResponse {
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
@@ -739,8 +770,16 @@ async fn send_collation(
}
state.active_collation_fetches.push(async move {
- let _ = rx.await;
- relay_parent
+ let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
+ if r.is_none() {
+ tracing::debug!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?peer_id,
+ "Sending collation to validator timed out, carrying on with next validator."
+ );
+ }
+ (relay_parent, peer_id)
}.boxed());
state.metrics.on_collation_sent();
@@ -986,8 +1025,9 @@ where
FromOverseer::Signal(BlockFinalized(..)) => {}
FromOverseer::Signal(Conclude) => return Ok(()),
},
- relay_parent = state.active_collation_fetches.select_next_some() => {
+ (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
+ waiting.waiting_peers.remove(&peer_id);
if let Some(next) = waiting.waiting.pop_front() {
next
} else {
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
index 96319d8c15..40ab2610b1 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
@@ -560,6 +560,34 @@ fn advertise_and_send_collation() {
)
)
).await;
+ // Second request by same validator should get dropped and peer reported:
+ {
+ let (tx, rx) = oneshot::channel();
+ overseer_send(
+ &mut virtual_overseer,
+ CollatorProtocolMessage::CollationFetchingRequest(
+ IncomingRequest::new(
+ peer,
+ CollationFetchingRequest {
+ relay_parent: test_state.relay_parent,
+ para_id: test_state.para_id,
+ },
+ tx,
+ )
+ )
+ ).await;
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(bad_peer, _)) => {
+ assert_eq!(bad_peer, peer);
+ }
+ );
+ assert_matches!(
+ rx.await,
+ Err(_),
+ "Multiple concurrent requests by the same validator should get dropped."
+ );
+ }
assert_matches!(
rx.await,
@@ -620,124 +648,30 @@ fn advertise_and_send_collation() {
#[test]
fn send_only_one_collation_per_relay_parent_at_a_time() {
- let test_state = TestState::default();
- let local_peer_id = test_state.local_peer_id.clone();
- let collator_pair = test_state.collator_pair.clone();
+ test_validator_send_sequence(|mut second_response_receiver, feedback_first_tx| async move {
+ Delay::new(Duration::from_millis(100)).await;
+ assert!(
+ second_response_receiver.try_recv().unwrap().is_none(),
+ "We should not have send the collation yet to the second validator",
+ );
- test_harness(local_peer_id, collator_pair, |test_harness| async move {
- let mut virtual_overseer = test_harness.virtual_overseer;
-
- setup_system(&mut virtual_overseer, &test_state).await;
-
- let DistributeCollation { candidate, pov_block } =
- distribute_collation(&mut virtual_overseer, &test_state, true).await;
-
- for (val, peer) in test_state.current_group_validator_authority_ids()
- .into_iter()
- .zip(test_state.current_group_validator_peer_ids())
- {
- connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
+ // Signal that the collation fetch is finished
+ feedback_first_tx.send(()).expect("Sending collation fetch finished");
+ second_response_receiver
}
-
- // We declare to the connected validators that we are a collator.
- // We need to catch all `Declare` messages to the validators we've
- // previosly connected to.
- for peer_id in test_state.current_group_validator_peer_ids() {
- expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
- }
-
- let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
- let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();
-
- // Send info about peer's view.
- send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
- send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;
-
- // The peer is interested in a leaf that we have a collation for;
- // advertise it.
- expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
- expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;
-
- // Request a collation.
- let (tx, rx) = oneshot::channel();
- overseer_send(
- &mut virtual_overseer,
- CollatorProtocolMessage::CollationFetchingRequest(
- IncomingRequest::new(
- validator_0,
- CollationFetchingRequest {
- relay_parent: test_state.relay_parent,
- para_id: test_state.para_id,
- },
- tx,
- )
- )
- ).await;
-
- // Keep the feedback channel alive because we need to use it to inform about the finished transfer.
- let feedback_tx = assert_matches!(
- rx.await,
- Ok(full_response) => {
- let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
- = CollationFetchingResponse::decode(
- &mut full_response.result
- .expect("We should have a proper answer").as_ref()
- )
- .expect("Decoding should work");
- assert_eq!(receipt, candidate);
- assert_eq!(pov, pov_block);
-
- full_response.sent_feedback.expect("Feedback channel is always set")
- }
- );
-
- // Let the second validator request the collation.
- let (tx, mut rx) = oneshot::channel();
- overseer_send(
- &mut virtual_overseer,
- CollatorProtocolMessage::CollationFetchingRequest(
- IncomingRequest::new(
- validator_1,
- CollationFetchingRequest {
- relay_parent: test_state.relay_parent,
- para_id: test_state.para_id,
- },
- tx,
- )
- )
- ).await;
-
- Delay::new(Duration::from_millis(500)).await;
-
- assert!(
- rx.try_recv().unwrap().is_none(),
- "We should not have send the collation yet to the second validator",
- );
-
- // Signal that the collation fetch is finished
- feedback_tx.send(()).expect("Sending collation fetch finished");
-
- // Now we should send it to the second validator
- assert_matches!(
- rx.await,
- Ok(full_response) => {
- let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
- = CollationFetchingResponse::decode(
- &mut full_response.result
- .expect("We should have a proper answer").as_ref()
- )
- .expect("Decoding should work");
- assert_eq!(receipt, candidate);
- assert_eq!(pov, pov_block);
-
- full_response.sent_feedback.expect("Feedback channel is always set")
- }
- );
-
- virtual_overseer
- });
+ );
}
+#[test]
+fn send_next_collation_after_max_unshared_upload_time() {
+ test_validator_send_sequence(|second_response_receiver, _| async move {
+ Delay::new(MAX_UNSHARED_UPLOAD_TIME + Duration::from_millis(50)).await;
+ second_response_receiver
+ }
+ );
+}
+
+
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
@@ -924,3 +858,127 @@ fn collators_reject_declare_messages() {
virtual_overseer
})
}
+
+/// Run tests on validator response sequence.
+///
+/// After the first response is done, the passed in lambda will be called with the receiver for the
+/// next response and a sender for giving feedback on the response of the first transmission. After
+/// the lambda has passed it is assumed that the second response is sent, which is checked by this
+/// function.
+///
+/// The lambda can trigger occasions on which the second response should be sent, like timeouts,
+/// successful completion.
+fn test_validator_send_sequence(handle_first_response: T)
+where
+ T: FnOnce(oneshot::Receiver, oneshot::Sender<()>) -> F,
+ F: Future