mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 07:37:57 +00:00
DoS protection on the collator protocol (#3446)
* Move on to next validator after timeout. * Better naming. * Wrong implementation of validator fetch timeouts. * Validator side: Move on to next collator if download takes too long. * Drop multiple requests from same validator. * Add test that next response is sent after timeout. * Multiple requests by same validator should get dropped. * Test that another collator is tried after exclusive download time. * Add dep. * Cleanup. * Merge fix. * Review remarks. * Fixes. * Add log targets to trace logs Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};
|
||||
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin, time::Duration};
|
||||
|
||||
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
|
||||
use sp_core::Pair;
|
||||
@@ -38,6 +38,7 @@ use polkadot_node_network_protocol::{
|
||||
v1 as protocol_v1,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
TimeoutExt,
|
||||
metrics::{self, prometheus},
|
||||
runtime::{RuntimeInfo, get_availability_cores, get_group_rotation_info}
|
||||
};
|
||||
@@ -50,6 +51,19 @@ use super::{LOG_TARGET, Result};
|
||||
mod tests;
|
||||
|
||||
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
|
||||
const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Message received when previous one was still being processed");
|
||||
|
||||
/// Time after starting an upload to a validator we will start another one to the next validator,
|
||||
/// even if the upload was not finished yet.
|
||||
///
|
||||
/// This is to protect from a single slow validator preventing collations from happening.
|
||||
///
|
||||
/// With a collation size of 5Meg and bandwidth of 500Mbit/s (requirement for Kusama validators),
|
||||
/// the transfer should be possible within 0.1 seconds. 400 milliseconds should therefore be
|
||||
/// plenty and should be low enough for later validators to still be able to finish on time.
|
||||
///
|
||||
/// There is debug logging output, so we can adjust this value based on production results.
|
||||
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(400);
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
@@ -208,9 +222,14 @@ struct WaitingCollationFetches {
|
||||
collation_fetch_active: bool,
|
||||
/// The collation fetches waiting to be fulfilled.
|
||||
waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
|
||||
/// All peers that are waiting or actively uploading.
|
||||
///
|
||||
/// We will not accept multiple requests from the same peer, otherwise our DoS protection of
|
||||
/// moving on to the next peer after `MAX_UNSHARED_UPLOAD_TIME` would be pointless.
|
||||
waiting_peers: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;
|
||||
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;
|
||||
|
||||
struct State {
|
||||
/// Our network peer id.
|
||||
@@ -684,6 +703,17 @@ where
|
||||
|
||||
let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();
|
||||
|
||||
if !waiting.waiting_peers.insert(incoming.peer) {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Dropping incoming request as peer has a request in flight already."
|
||||
);
|
||||
ctx.send_message(
|
||||
NetworkBridgeMessage::ReportPeer(incoming.peer, COST_APPARENT_FLOOD)
|
||||
).await;
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
if waiting.collation_fetch_active {
|
||||
waiting.waiting.push_back(incoming);
|
||||
} else {
|
||||
@@ -724,6 +754,7 @@ async fn send_collation(
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let relay_parent = request.payload.relay_parent;
|
||||
let peer_id = request.peer;
|
||||
|
||||
let response = OutgoingResponse {
|
||||
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
|
||||
@@ -739,8 +770,16 @@ async fn send_collation(
|
||||
}
|
||||
|
||||
state.active_collation_fetches.push(async move {
|
||||
let _ = rx.await;
|
||||
relay_parent
|
||||
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
|
||||
if r.is_none() {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
?peer_id,
|
||||
"Sending collation to validator timed out, carrying on with next validator."
|
||||
);
|
||||
}
|
||||
(relay_parent, peer_id)
|
||||
}.boxed());
|
||||
|
||||
state.metrics.on_collation_sent();
|
||||
@@ -986,8 +1025,9 @@ where
|
||||
FromOverseer::Signal(BlockFinalized(..)) => {}
|
||||
FromOverseer::Signal(Conclude) => return Ok(()),
|
||||
},
|
||||
relay_parent = state.active_collation_fetches.select_next_some() => {
|
||||
(relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
|
||||
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
|
||||
waiting.waiting_peers.remove(&peer_id);
|
||||
if let Some(next) = waiting.waiting.pop_front() {
|
||||
next
|
||||
} else {
|
||||
|
||||
@@ -560,6 +560,34 @@ fn advertise_and_send_collation() {
|
||||
)
|
||||
)
|
||||
).await;
|
||||
// Second request by same validator should get dropped and peer reported:
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollationFetchingRequest(
|
||||
IncomingRequest::new(
|
||||
peer,
|
||||
CollationFetchingRequest {
|
||||
relay_parent: test_state.relay_parent,
|
||||
para_id: test_state.para_id,
|
||||
},
|
||||
tx,
|
||||
)
|
||||
)
|
||||
).await;
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(bad_peer, _)) => {
|
||||
assert_eq!(bad_peer, peer);
|
||||
}
|
||||
);
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
Err(_),
|
||||
"Multiple concurrent requests by the same validator should get dropped."
|
||||
);
|
||||
}
|
||||
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
@@ -620,124 +648,30 @@ fn advertise_and_send_collation() {
|
||||
|
||||
#[test]
|
||||
fn send_only_one_collation_per_relay_parent_at_a_time() {
|
||||
let test_state = TestState::default();
|
||||
let local_peer_id = test_state.local_peer_id.clone();
|
||||
let collator_pair = test_state.collator_pair.clone();
|
||||
test_validator_send_sequence(|mut second_response_receiver, feedback_first_tx| async move {
|
||||
Delay::new(Duration::from_millis(100)).await;
|
||||
assert!(
|
||||
second_response_receiver.try_recv().unwrap().is_none(),
|
||||
"We should not have send the collation yet to the second validator",
|
||||
);
|
||||
|
||||
test_harness(local_peer_id, collator_pair, |test_harness| async move {
|
||||
let mut virtual_overseer = test_harness.virtual_overseer;
|
||||
|
||||
setup_system(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let DistributeCollation { candidate, pov_block } =
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
for (val, peer) in test_state.current_group_validator_authority_ids()
|
||||
.into_iter()
|
||||
.zip(test_state.current_group_validator_peer_ids())
|
||||
{
|
||||
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
|
||||
// Signal that the collation fetch is finished
|
||||
feedback_first_tx.send(()).expect("Sending collation fetch finished");
|
||||
second_response_receiver
|
||||
}
|
||||
|
||||
// We declare to the connected validators that we are a collator.
|
||||
// We need to catch all `Declare` messages to the validators we've
|
||||
// previosly connected to.
|
||||
for peer_id in test_state.current_group_validator_peer_ids() {
|
||||
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
|
||||
}
|
||||
|
||||
let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
|
||||
let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();
|
||||
|
||||
// Send info about peer's view.
|
||||
send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
|
||||
send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;
|
||||
|
||||
// The peer is interested in a leaf that we have a collation for;
|
||||
// advertise it.
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;
|
||||
|
||||
// Request a collation.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollationFetchingRequest(
|
||||
IncomingRequest::new(
|
||||
validator_0,
|
||||
CollationFetchingRequest {
|
||||
relay_parent: test_state.relay_parent,
|
||||
para_id: test_state.para_id,
|
||||
},
|
||||
tx,
|
||||
)
|
||||
)
|
||||
).await;
|
||||
|
||||
// Keep the feedback channel alive because we need to use it to inform about the finished transfer.
|
||||
let feedback_tx = assert_matches!(
|
||||
rx.await,
|
||||
Ok(full_response) => {
|
||||
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
|
||||
= CollationFetchingResponse::decode(
|
||||
&mut full_response.result
|
||||
.expect("We should have a proper answer").as_ref()
|
||||
)
|
||||
.expect("Decoding should work");
|
||||
assert_eq!(receipt, candidate);
|
||||
assert_eq!(pov, pov_block);
|
||||
|
||||
full_response.sent_feedback.expect("Feedback channel is always set")
|
||||
}
|
||||
);
|
||||
|
||||
// Let the second validator request the collation.
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollationFetchingRequest(
|
||||
IncomingRequest::new(
|
||||
validator_1,
|
||||
CollationFetchingRequest {
|
||||
relay_parent: test_state.relay_parent,
|
||||
para_id: test_state.para_id,
|
||||
},
|
||||
tx,
|
||||
)
|
||||
)
|
||||
).await;
|
||||
|
||||
Delay::new(Duration::from_millis(500)).await;
|
||||
|
||||
assert!(
|
||||
rx.try_recv().unwrap().is_none(),
|
||||
"We should not have send the collation yet to the second validator",
|
||||
);
|
||||
|
||||
// Signal that the collation fetch is finished
|
||||
feedback_tx.send(()).expect("Sending collation fetch finished");
|
||||
|
||||
// Now we should send it to the second validator
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
Ok(full_response) => {
|
||||
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
|
||||
= CollationFetchingResponse::decode(
|
||||
&mut full_response.result
|
||||
.expect("We should have a proper answer").as_ref()
|
||||
)
|
||||
.expect("Decoding should work");
|
||||
assert_eq!(receipt, candidate);
|
||||
assert_eq!(pov, pov_block);
|
||||
|
||||
full_response.sent_feedback.expect("Feedback channel is always set")
|
||||
}
|
||||
);
|
||||
|
||||
virtual_overseer
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_next_collation_after_max_unshared_upload_time() {
|
||||
test_validator_send_sequence(|second_response_receiver, _| async move {
|
||||
Delay::new(MAX_UNSHARED_UPLOAD_TIME + Duration::from_millis(50)).await;
|
||||
second_response_receiver
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn collators_declare_to_connected_peers() {
|
||||
let test_state = TestState::default();
|
||||
@@ -924,3 +858,127 @@ fn collators_reject_declare_messages() {
|
||||
virtual_overseer
|
||||
})
|
||||
}
|
||||
|
||||
/// Run tests on validator response sequence.
|
||||
///
|
||||
/// After the first response is done, the passed in lambda will be called with the receiver for the
|
||||
/// next response and a sender for giving feedback on the response of the first transmission. After
|
||||
/// the lamda has passed it is assumed that the second response is sent, which is checked by this
|
||||
/// function.
|
||||
///
|
||||
/// The lambda can trigger occasions on which the second response should be sent, like timeouts,
|
||||
/// successful completion.
|
||||
fn test_validator_send_sequence<T, F>(handle_first_response: T)
|
||||
where
|
||||
T: FnOnce(oneshot::Receiver<sc_network::config::OutgoingResponse>, oneshot::Sender<()>) -> F,
|
||||
F: Future<Output=oneshot::Receiver<sc_network::config::OutgoingResponse>>
|
||||
{
|
||||
let test_state = TestState::default();
|
||||
let local_peer_id = test_state.local_peer_id.clone();
|
||||
let collator_pair = test_state.collator_pair.clone();
|
||||
|
||||
test_harness(local_peer_id, collator_pair, |test_harness| async move {
|
||||
let mut virtual_overseer = test_harness.virtual_overseer;
|
||||
|
||||
setup_system(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let DistributeCollation { candidate, pov_block } =
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
for (val, peer) in test_state.current_group_validator_authority_ids()
|
||||
.into_iter()
|
||||
.zip(test_state.current_group_validator_peer_ids())
|
||||
{
|
||||
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
|
||||
}
|
||||
|
||||
// We declare to the connected validators that we are a collator.
|
||||
// We need to catch all `Declare` messages to the validators we've
|
||||
// previosly connected to.
|
||||
for peer_id in test_state.current_group_validator_peer_ids() {
|
||||
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
|
||||
}
|
||||
|
||||
let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
|
||||
let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();
|
||||
|
||||
// Send info about peer's view.
|
||||
send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
|
||||
send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;
|
||||
|
||||
// The peer is interested in a leaf that we have a collation for;
|
||||
// advertise it.
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;
|
||||
|
||||
// Request a collation.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollationFetchingRequest(
|
||||
IncomingRequest::new(
|
||||
validator_0,
|
||||
CollationFetchingRequest {
|
||||
relay_parent: test_state.relay_parent,
|
||||
para_id: test_state.para_id,
|
||||
},
|
||||
tx,
|
||||
)
|
||||
)
|
||||
).await;
|
||||
|
||||
// Keep the feedback channel alive because we need to use it to inform about the finished transfer.
|
||||
let feedback_tx = assert_matches!(
|
||||
rx.await,
|
||||
Ok(full_response) => {
|
||||
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
|
||||
= CollationFetchingResponse::decode(
|
||||
&mut full_response.result
|
||||
.expect("We should have a proper answer").as_ref()
|
||||
)
|
||||
.expect("Decoding should work");
|
||||
assert_eq!(receipt, candidate);
|
||||
assert_eq!(pov, pov_block);
|
||||
|
||||
full_response.sent_feedback.expect("Feedback channel is always set")
|
||||
}
|
||||
);
|
||||
|
||||
// Let the second validator request the collation.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollationFetchingRequest(
|
||||
IncomingRequest::new(
|
||||
validator_1,
|
||||
CollationFetchingRequest {
|
||||
relay_parent: test_state.relay_parent,
|
||||
para_id: test_state.para_id,
|
||||
},
|
||||
tx,
|
||||
)
|
||||
)
|
||||
).await;
|
||||
|
||||
let rx = handle_first_response(rx, feedback_tx).await;
|
||||
|
||||
// Now we should send it to the second validator
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
Ok(full_response) => {
|
||||
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
|
||||
= CollationFetchingResponse::decode(
|
||||
&mut full_response.result
|
||||
.expect("We should have a proper answer").as_ref()
|
||||
)
|
||||
.expect("Decoding should work");
|
||||
assert_eq!(receipt, candidate);
|
||||
assert_eq!(pov, pov_block);
|
||||
|
||||
full_response.sent_feedback.expect("Feedback channel is always set")
|
||||
}
|
||||
);
|
||||
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
@@ -66,6 +66,19 @@ const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for
|
||||
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
|
||||
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
|
||||
|
||||
/// Time after starting a collation download from a collator we will start another one from the
|
||||
/// next collator even if the upload was not finished yet.
|
||||
///
|
||||
/// This is to protect from a single slow collator preventing collations from happening.
|
||||
///
|
||||
/// With a collation size of 5Meg and bandwidth of 500Mbit/s (requirement for Kusama validators),
|
||||
/// the transfer should be possible within 0.1 seconds. 400 milliseconds should therefore be
|
||||
/// plenty, even with multiple heads and should be low enough for later collators to still be able
|
||||
/// to finish on time.
|
||||
///
|
||||
/// There is debug logging output, so we can adjust this value based on production results.
|
||||
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(400);
|
||||
|
||||
// How often to check all peers with activity.
|
||||
#[cfg(not(test))]
|
||||
const ACTIVITY_POLL: Duration = Duration::from_secs(1);
|
||||
@@ -178,7 +191,7 @@ struct CollatingPeerState {
|
||||
enum PeerState {
|
||||
// The peer has connected at the given instant.
|
||||
Connected(Instant),
|
||||
// Thepe
|
||||
// Peer is collating.
|
||||
Collating(CollatingPeerState),
|
||||
}
|
||||
|
||||
@@ -514,6 +527,11 @@ impl CollationStatus {
|
||||
struct CollationsPerRelayParent {
|
||||
/// What is the current status in regards to a collation for this relay parent?
|
||||
status: CollationStatus,
|
||||
/// Collation currently being fetched.
|
||||
///
|
||||
/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME`
|
||||
/// yet.
|
||||
waiting_collation: Option<CollatorId>,
|
||||
/// Collation that were advertised to us, but we did not yet fetch.
|
||||
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
|
||||
}
|
||||
@@ -523,14 +541,33 @@ impl CollationsPerRelayParent {
|
||||
///
|
||||
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
|
||||
///
|
||||
/// Returns `Some(_)` if there is any collation to fetch and the `status` is not `Seconded`.
|
||||
pub fn get_next_collation_to_fetch(&mut self) -> Option<(PendingCollation, CollatorId)> {
|
||||
/// Returns `Some(_)` if there is any collation to fetch, the `status` is not `Seconded` and
|
||||
/// the passed in `finished_one` is the currently `waiting_collation`.
|
||||
pub fn get_next_collation_to_fetch(
|
||||
&mut self,
|
||||
finished_one: Option<CollatorId>,
|
||||
) -> Option<(PendingCollation, CollatorId)> {
|
||||
// If finished one does not match waiting_collation, then we already dequeued another fetch
|
||||
// to replace it.
|
||||
if self.waiting_collation != finished_one {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
waiting_collation = ?self.waiting_collation,
|
||||
?finished_one,
|
||||
"Not proceeding to the next collation - has already been done."
|
||||
);
|
||||
return None
|
||||
}
|
||||
self.status.back_to_waiting();
|
||||
|
||||
match self.status {
|
||||
// We don't need to fetch any other collation when we already have seconded one.
|
||||
CollationStatus::Seconded => None,
|
||||
CollationStatus::Waiting => self.unfetched_collations.pop(),
|
||||
CollationStatus::Waiting => {
|
||||
let next = self.unfetched_collations.pop();
|
||||
self.waiting_collation = next.as_ref().map(|(_, collator_id)| collator_id.clone());
|
||||
next
|
||||
}
|
||||
CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
|
||||
unreachable!("We have reset the status above!"),
|
||||
}
|
||||
@@ -565,6 +602,13 @@ struct State {
|
||||
/// Keep track of all fetch collation requests
|
||||
collation_fetches: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
|
||||
|
||||
/// When a timer in this `FuturesUnordered` triggers, we should dequeue the next request
|
||||
/// attempt in the corresponding `collations_per_relay_parent`.
|
||||
///
|
||||
/// A triggering timer means that the fetching took too long for our taste and we should give
|
||||
/// another collator the chance to be faster (dequeue next fetch request as well).
|
||||
collation_fetch_timeouts: FuturesUnordered<BoxFuture<'static, (CollatorId, Hash)>>,
|
||||
|
||||
/// Information about the collations per relay parent.
|
||||
collations_per_relay_parent: HashMap<Hash, CollationsPerRelayParent>,
|
||||
|
||||
@@ -608,6 +652,13 @@ where
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let PendingCollation { relay_parent, para_id, peer_id, .. } = pc;
|
||||
|
||||
let timeout = |collator_id, relay_parent| async move {
|
||||
Delay::new(MAX_UNSHARED_DOWNLOAD_TIME).await;
|
||||
(collator_id, relay_parent)
|
||||
};
|
||||
state.collation_fetch_timeouts.push(timeout(id.clone(), relay_parent.clone()).boxed());
|
||||
|
||||
if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) {
|
||||
request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await;
|
||||
}
|
||||
@@ -815,6 +866,7 @@ where
|
||||
);
|
||||
|
||||
modify_reputation(ctx, origin.clone(), COST_UNNEEDED_COLLATOR).await;
|
||||
tracing::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
|
||||
disconnect_peer(ctx, origin).await;
|
||||
}
|
||||
}
|
||||
@@ -863,7 +915,7 @@ where
|
||||
collations.unfetched_collations.push((pending_collation, id)),
|
||||
CollationStatus::Waiting => {
|
||||
collations.status = CollationStatus::Fetching;
|
||||
drop(collations);
|
||||
collations.waiting_collation = Some(id.clone());
|
||||
|
||||
fetch_collation(ctx, state, pending_collation.clone(), id).await;
|
||||
},
|
||||
@@ -959,6 +1011,7 @@ where
|
||||
// declare.
|
||||
if let Some(para_id) = peer_data.collating_para() {
|
||||
if !state.active_paras.is_current_or_next(para_id) {
|
||||
tracing::trace!(target: LOG_TARGET, "Disconnecting peer on view change");
|
||||
disconnect_peer(ctx, peer_id.clone()).await;
|
||||
}
|
||||
}
|
||||
@@ -1092,14 +1145,9 @@ where
|
||||
Entry::Vacant(_) => return,
|
||||
};
|
||||
|
||||
report_collator(ctx, &state.peer_data, id).await;
|
||||
report_collator(ctx, &state.peer_data, id.clone()).await;
|
||||
|
||||
if let Some((next, id)) = state.collations_per_relay_parent
|
||||
.get_mut(&parent)
|
||||
.and_then(|c| c.get_next_collation_to_fetch())
|
||||
{
|
||||
fetch_collation(ctx, state, next, id).await;
|
||||
}
|
||||
dequeue_next_collation_and_fetch(ctx, state, parent, id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1164,6 +1212,15 @@ where
|
||||
res = state.collation_fetches.select_next_some() => {
|
||||
handle_collation_fetched_result(&mut ctx, &mut state, res).await;
|
||||
}
|
||||
res = state.collation_fetch_timeouts.select_next_some() => {
|
||||
let (collator_id, relay_parent) = res;
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
"Fetch for collation took too long, starting parallel download for next collator as well."
|
||||
);
|
||||
dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
let mut retained_requested = HashSet::new();
|
||||
@@ -1181,6 +1238,22 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Dequeue another collation and fetch.
|
||||
async fn dequeue_next_collation_and_fetch(
|
||||
ctx: &mut (impl SubsystemContext<Message = CollatorProtocolMessage> + overseer::SubsystemContext<Message = CollatorProtocolMessage>),
|
||||
state: &mut State,
|
||||
relay_parent: Hash,
|
||||
// The collator we tried to fetch from last.
|
||||
previous_fetch: CollatorId,
|
||||
) {
|
||||
if let Some((next, id)) = state.collations_per_relay_parent
|
||||
.get_mut(&relay_parent)
|
||||
.and_then(|c| c.get_next_collation_to_fetch(Some(previous_fetch)))
|
||||
{
|
||||
fetch_collation(ctx, state, next, id).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a fetched collation result.
|
||||
async fn handle_collation_fetched_result<Context>(
|
||||
ctx: &mut Context,
|
||||
@@ -1209,18 +1282,20 @@ where
|
||||
"Failed to fetch collation.",
|
||||
);
|
||||
|
||||
if let Some((next, id)) = state.collations_per_relay_parent
|
||||
.get_mut(&relay_parent)
|
||||
.and_then(|c| c.get_next_collation_to_fetch())
|
||||
{
|
||||
fetch_collation(ctx, state, next, id).await;
|
||||
}
|
||||
|
||||
dequeue_next_collation_and_fetch(ctx, state, relay_parent, collation_event.0).await;
|
||||
return
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
|
||||
if let CollationStatus::Seconded = collations.status {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
"Already seconded - no longer interested in collation fetch result."
|
||||
);
|
||||
return
|
||||
}
|
||||
collations.status = CollationStatus::WaitingOnValidation;
|
||||
}
|
||||
|
||||
@@ -1236,11 +1311,11 @@ where
|
||||
|
||||
entry.insert(collation_event);
|
||||
} else {
|
||||
tracing::error!(
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
candidate = ?candidate_receipt.hash(),
|
||||
"Trying to insert a pending candidate failed, because there is already one!",
|
||||
"Trying to insert a pending candidate failed, because there is already one.",
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1259,6 +1334,7 @@ where
|
||||
{
|
||||
for (peer, peer_data) in peers {
|
||||
if peer_data.is_inactive(&eviction_policy) {
|
||||
tracing::trace!(target: LOG_TARGET, "Disconnecting inactive peer");
|
||||
disconnect_peer(ctx, peer.clone()).await;
|
||||
}
|
||||
}
|
||||
@@ -1324,9 +1400,9 @@ where
|
||||
"Fetching collation failed due to network error"
|
||||
);
|
||||
// A minor decrease in reputation for any network failure seems
|
||||
// sensbile. In theory this could be exploited, by DoSing this node,
|
||||
// sensible. In theory this could be exploited, by DoSing this node,
|
||||
// which would result in reduced reputation for proper nodes, but the
|
||||
// same can happen for penalities on timeouts, which we also have.
|
||||
// same can happen for penalties on timeouts, which we also have.
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await;
|
||||
}
|
||||
Err(RequestError::Canceled(_)) => {
|
||||
@@ -1338,9 +1414,9 @@ where
|
||||
"Request timed out"
|
||||
);
|
||||
// A minor decrease in reputation for any network failure seems
|
||||
// sensbile. In theory this could be exploited, by DoSing this node,
|
||||
// sensible. In theory this could be exploited, by DoSing this node,
|
||||
// which would result in reduced reputation for proper nodes, but the
|
||||
// same can happen for penalities on timeouts, which we also have.
|
||||
// same can happen for penalties on timeouts, which we also have.
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT).await;
|
||||
}
|
||||
Ok(CollationFetchingResponse::Collation(receipt, _))
|
||||
|
||||
@@ -60,7 +60,7 @@ impl Default for TestState {
|
||||
let relay_parent = Hash::repeat_byte(0x05);
|
||||
let collators = iter::repeat(())
|
||||
.map(|_| CollatorPair::generate().0)
|
||||
.take(4)
|
||||
.take(5)
|
||||
.collect();
|
||||
|
||||
let validators = vec![
|
||||
@@ -512,9 +512,10 @@ fn collator_authentication_verification_works() {
|
||||
// our view.
|
||||
// - Collation protocol should request one PoV.
|
||||
// - Collation protocol should disconnect both collators after having received the collation.
|
||||
// - The same collators connect again and send povs for a different relay parent.
|
||||
// - The same collators plus an additional collator connect again and send povs for a different relay parent.
|
||||
// - Collation protocol will request one PoV, but we will cancel it.
|
||||
// - Collation protocol should request the second PoV.
|
||||
// - Collation protocol should request the second PoV which does not succeed in time.
|
||||
// - Collation protocol should request third PoV.
|
||||
#[test]
|
||||
fn fetch_collations_works() {
|
||||
let test_state = TestState::default();
|
||||
@@ -564,7 +565,7 @@ fn fetch_collations_works() {
|
||||
|
||||
assert!(
|
||||
overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30)).await.is_none(),
|
||||
"There should not be sent any other PoV request while the first one wasn't finished",
|
||||
"There should not be sent any other PoV request while the first one wasn't finished or timed out.",
|
||||
);
|
||||
|
||||
let pov = PoV { block_data: BlockData(vec![]) };
|
||||
@@ -597,6 +598,7 @@ fn fetch_collations_works() {
|
||||
|
||||
let peer_b = PeerId::random();
|
||||
let peer_c = PeerId::random();
|
||||
let peer_d = PeerId::random();
|
||||
|
||||
connect_and_declare_collator(
|
||||
&mut virtual_overseer,
|
||||
@@ -612,8 +614,16 @@ fn fetch_collations_works() {
|
||||
test_state.chain_ids[0].clone(),
|
||||
).await;
|
||||
|
||||
connect_and_declare_collator(
|
||||
&mut virtual_overseer,
|
||||
peer_d.clone(),
|
||||
test_state.collators[4].clone(),
|
||||
test_state.chain_ids[0].clone(),
|
||||
).await;
|
||||
|
||||
advertise_collation(&mut virtual_overseer, peer_b.clone(), second).await;
|
||||
advertise_collation(&mut virtual_overseer, peer_c.clone(), second).await;
|
||||
advertise_collation(&mut virtual_overseer, peer_d.clone(), second).await;
|
||||
|
||||
// Dropping the response channel should lead to fetching the second collation.
|
||||
assert_fetch_collation_request(
|
||||
@@ -633,6 +643,15 @@ fn fetch_collations_works() {
|
||||
}
|
||||
);
|
||||
|
||||
let response_channel_non_exclusive = assert_fetch_collation_request(
|
||||
&mut virtual_overseer,
|
||||
second,
|
||||
test_state.chain_ids[0],
|
||||
).await;
|
||||
|
||||
// Third collator should receive response after that timeout:
|
||||
Delay::new(MAX_UNSHARED_DOWNLOAD_TIME + Duration::from_millis(50)).await;
|
||||
|
||||
let response_channel = assert_fetch_collation_request(
|
||||
&mut virtual_overseer,
|
||||
second,
|
||||
@@ -643,6 +662,15 @@ fn fetch_collations_works() {
|
||||
let mut candidate_a = CandidateReceipt::default();
|
||||
candidate_a.descriptor.para_id = test_state.chain_ids[0];
|
||||
candidate_a.descriptor.relay_parent = second;
|
||||
|
||||
// First request finishes now:
|
||||
response_channel_non_exclusive.send(Ok(
|
||||
CollationFetchingResponse::Collation(
|
||||
candidate_a.clone(),
|
||||
pov.clone(),
|
||||
).encode()
|
||||
)).expect("Sending response should succeed");
|
||||
|
||||
response_channel.send(Ok(
|
||||
CollationFetchingResponse::Collation(
|
||||
candidate_a.clone(),
|
||||
|
||||
Reference in New Issue
Block a user