mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 07:37:57 +00:00
collator-protocol/validator_side: a couple of fixes (#4179)
* collator-protocol/validator: do not wait 1s to poll requested collations * collator-protocol/validator: do not request collation for the next group * put everything into select * fmt * more hacks yay * a test * review nits * remove outdated comment
This commit is contained in:
@@ -19,7 +19,7 @@ use futures::{
|
||||
channel::oneshot,
|
||||
future::{BoxFuture, Fuse, FusedFuture},
|
||||
select,
|
||||
stream::FuturesUnordered,
|
||||
stream::{FusedStream, FuturesUnordered},
|
||||
FutureExt, StreamExt,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
@@ -94,6 +94,11 @@ const ACTIVITY_POLL: Duration = Duration::from_secs(1);
|
||||
#[cfg(test)]
|
||||
const ACTIVITY_POLL: Duration = Duration::from_millis(10);
|
||||
|
||||
// How often to poll collation responses.
|
||||
// This is a hack that should be removed in a refactoring.
|
||||
// See https://github.com/paritytech/polkadot/issues/4182
|
||||
const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
@@ -467,6 +472,10 @@ impl ActiveParas {
|
||||
fn is_current_or_next(&self, id: ParaId) -> bool {
|
||||
self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id)
|
||||
}
|
||||
|
||||
fn is_current(&self, id: &ParaId) -> bool {
|
||||
self.current_assignments.contains_key(id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
|
||||
@@ -886,6 +895,20 @@ async fn process_incoming_peer_message<Context>(
|
||||
Some(p) => p,
|
||||
};
|
||||
|
||||
if let PeerState::Collating(ref collating_state) = peer_data.state {
|
||||
let para_id = collating_state.para_id;
|
||||
if !state.active_paras.is_current(¶_id) {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
peer_id = ?origin,
|
||||
%para_id,
|
||||
?relay_parent,
|
||||
"Received advertise collation, but we are assigned to the next group",
|
||||
);
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
match peer_data.insert_advertisement(relay_parent, &state.view) {
|
||||
Ok((id, para_id)) => {
|
||||
tracing::debug!(
|
||||
@@ -1136,6 +1159,13 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant {
|
||||
Instant::now()
|
||||
}
|
||||
|
||||
fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
|
||||
futures::stream::unfold(Instant::now() + every, |next_check| async move {
|
||||
Some(((), wait_until_next_check(next_check).await))
|
||||
})
|
||||
.fuse()
|
||||
}
|
||||
|
||||
/// The main run loop.
|
||||
pub(crate) async fn run<Context>(
|
||||
mut ctx: Context,
|
||||
@@ -1147,18 +1177,14 @@ where
|
||||
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
{
|
||||
use OverseerSignal::*;
|
||||
|
||||
let mut state = State { metrics, ..Default::default() };
|
||||
|
||||
let next_inactivity_stream =
|
||||
futures::stream::unfold(Instant::now() + ACTIVITY_POLL, |next_check| async move {
|
||||
Some(((), wait_until_next_check(next_check).await))
|
||||
})
|
||||
.fuse();
|
||||
|
||||
let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
|
||||
futures::pin_mut!(next_inactivity_stream);
|
||||
|
||||
let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
|
||||
futures::pin_mut!(check_collations_stream);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = ctx.recv().fuse() => {
|
||||
@@ -1172,8 +1198,8 @@ where
|
||||
&mut state,
|
||||
).await;
|
||||
}
|
||||
Ok(FromOverseer::Signal(Conclude)) => break,
|
||||
_ => {},
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) | Err(_) => break,
|
||||
Ok(FromOverseer::Signal(_)) => continue,
|
||||
}
|
||||
}
|
||||
_ = next_inactivity_stream.next() => {
|
||||
@@ -1191,28 +1217,47 @@ where
|
||||
);
|
||||
dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
|
||||
}
|
||||
}
|
||||
_ = check_collations_stream.next() => {
|
||||
let reputation_changes = poll_requests(
|
||||
&mut state.requested_collations,
|
||||
&state.metrics,
|
||||
&state.span_per_relay_parent,
|
||||
).await;
|
||||
|
||||
let mut retained_requested = HashSet::new();
|
||||
for (pending_collation, per_req) in state.requested_collations.iter_mut() {
|
||||
// Despite the await, this won't block on the response itself.
|
||||
let finished = poll_collation_response(
|
||||
&mut ctx,
|
||||
&state.metrics,
|
||||
&state.span_per_relay_parent,
|
||||
pending_collation,
|
||||
per_req,
|
||||
)
|
||||
.await;
|
||||
if !finished {
|
||||
retained_requested.insert(pending_collation.clone());
|
||||
}
|
||||
for (peer_id, rep) in reputation_changes {
|
||||
modify_reputation(&mut ctx, peer_id, rep).await;
|
||||
}
|
||||
},
|
||||
}
|
||||
state.requested_collations.retain(|k, _| retained_requested.contains(k));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn poll_requests(
|
||||
requested_collations: &mut HashMap<PendingCollation, PerRequest>,
|
||||
metrics: &Metrics,
|
||||
span_per_relay_parent: &HashMap<Hash, PerLeafSpan>,
|
||||
) -> Vec<(PeerId, Rep)> {
|
||||
let mut retained_requested = HashSet::new();
|
||||
let mut reputation_changes = Vec::new();
|
||||
for (pending_collation, per_req) in requested_collations.iter_mut() {
|
||||
// Despite the await, this won't block on the response itself.
|
||||
let result =
|
||||
poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req)
|
||||
.await;
|
||||
|
||||
if !result.is_ready() {
|
||||
retained_requested.insert(pending_collation.clone());
|
||||
}
|
||||
if let CollationFetchResult::Error(rep) = result {
|
||||
reputation_changes.push((pending_collation.peer_id.clone(), rep));
|
||||
}
|
||||
}
|
||||
requested_collations.retain(|k, _| retained_requested.contains(k));
|
||||
reputation_changes
|
||||
}
|
||||
|
||||
/// Dequeue another collation and fetch.
|
||||
async fn dequeue_next_collation_and_fetch(
|
||||
ctx: &mut (impl SubsystemContext<Message = CollatorProtocolMessage>
|
||||
@@ -1314,29 +1359,38 @@ async fn disconnect_inactive_peers<Context>(
|
||||
}
|
||||
}
|
||||
|
||||
enum CollationFetchResult {
|
||||
/// The collation is still being fetched.
|
||||
Pending,
|
||||
/// The collation was fetched successfully.
|
||||
Success,
|
||||
/// An error occurred when fetching a collation or it was invalid.
|
||||
/// A reputation change should be applied to the peer.
|
||||
Error(Rep),
|
||||
}
|
||||
|
||||
impl CollationFetchResult {
|
||||
fn is_ready(&self) -> bool {
|
||||
!matches!(self, Self::Pending)
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll collation response, return immediately if there is none.
|
||||
///
|
||||
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
|
||||
/// Ready responses are handled, by logging and by
|
||||
/// forwarding proper responses to the requester.
|
||||
///
|
||||
/// Returns: `true` if `from_collator` future was ready.
|
||||
async fn poll_collation_response<Context>(
|
||||
ctx: &mut Context,
|
||||
async fn poll_collation_response(
|
||||
metrics: &Metrics,
|
||||
spans: &HashMap<Hash, PerLeafSpan>,
|
||||
pending_collation: &PendingCollation,
|
||||
per_req: &mut PerRequest,
|
||||
) -> bool
|
||||
where
|
||||
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
Context: SubsystemContext,
|
||||
{
|
||||
) -> CollationFetchResult {
|
||||
if never!(per_req.from_collator.is_terminated()) {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
"We remove pending responses once received, this should not happen."
|
||||
);
|
||||
return true
|
||||
return CollationFetchResult::Success
|
||||
}
|
||||
|
||||
if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
|
||||
@@ -1348,7 +1402,7 @@ where
|
||||
let mut metrics_result = Err(());
|
||||
let mut success = "false";
|
||||
|
||||
match response {
|
||||
let result = match response {
|
||||
Err(RequestError::InvalidResponse(err)) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -1358,8 +1412,7 @@ where
|
||||
err = ?err,
|
||||
"Collator provided response that could not be decoded"
|
||||
);
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_CORRUPTED_MESSAGE)
|
||||
.await;
|
||||
CollationFetchResult::Error(COST_CORRUPTED_MESSAGE)
|
||||
},
|
||||
Err(RequestError::NetworkError(err)) => {
|
||||
tracing::debug!(
|
||||
@@ -1374,7 +1427,7 @@ where
|
||||
// sensible. In theory this could be exploited, by DoSing this node,
|
||||
// which would result in reduced reputation for proper nodes, but the
|
||||
// same can happen for penalties on timeouts, which we also have.
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await;
|
||||
CollationFetchResult::Error(COST_NETWORK_ERROR)
|
||||
},
|
||||
Err(RequestError::Canceled(_)) => {
|
||||
tracing::debug!(
|
||||
@@ -1388,8 +1441,7 @@ where
|
||||
// sensible. In theory this could be exploited, by DoSing this node,
|
||||
// which would result in reduced reputation for proper nodes, but the
|
||||
// same can happen for penalties on timeouts, which we also have.
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT)
|
||||
.await;
|
||||
CollationFetchResult::Error(COST_REQUEST_TIMED_OUT)
|
||||
},
|
||||
Ok(CollationFetchingResponse::Collation(receipt, _))
|
||||
if receipt.descriptor().para_id != pending_collation.para_id =>
|
||||
@@ -1402,7 +1454,7 @@ where
|
||||
"Got wrong para ID for requested collation."
|
||||
);
|
||||
|
||||
modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await;
|
||||
CollationFetchResult::Error(COST_WRONG_PARA)
|
||||
}
|
||||
Ok(CollationFetchingResponse::Collation(receipt, pov)) => {
|
||||
tracing::debug!(
|
||||
@@ -1430,12 +1482,15 @@ where
|
||||
metrics_result = Ok(());
|
||||
success = "true";
|
||||
}
|
||||
|
||||
CollationFetchResult::Success
|
||||
},
|
||||
};
|
||||
metrics.on_request(metrics_result);
|
||||
per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
|
||||
true
|
||||
|
||||
result
|
||||
} else {
|
||||
false
|
||||
CollationFetchResult::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -674,6 +674,46 @@ fn fetch_collations_works() {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dont_fetch_collation_if_assigned_to_next_group() {
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(|test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
|
||||
our_view![test_state.relay_parent],
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
|
||||
respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let peer_b = PeerId::random();
|
||||
|
||||
connect_and_declare_collator(
|
||||
&mut virtual_overseer,
|
||||
peer_b.clone(),
|
||||
test_state.collators[0].clone(),
|
||||
test_state.chain_ids[1].clone(), // next, not current para_id
|
||||
)
|
||||
.await;
|
||||
|
||||
advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
|
||||
|
||||
assert!(
|
||||
overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30))
|
||||
.await
|
||||
.is_none(),
|
||||
"There should be no PoV fetching request.",
|
||||
);
|
||||
|
||||
virtual_overseer
|
||||
})
|
||||
}
|
||||
|
||||
// Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
|
||||
#[test]
|
||||
fn fetch_next_collation_on_invalid_collation() {
|
||||
|
||||
Reference in New Issue
Block a user