diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
index d9e1de0625..0b050d3c5e 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -19,7 +19,7 @@ use futures::{
 	channel::oneshot,
 	future::{BoxFuture, Fuse, FusedFuture},
 	select,
-	stream::FuturesUnordered,
+	stream::{FusedStream, FuturesUnordered},
 	FutureExt, StreamExt,
 };
 use futures_timer::Delay;
@@ -94,6 +94,11 @@ const ACTIVITY_POLL: Duration = Duration::from_secs(1);
 
 #[cfg(test)]
 const ACTIVITY_POLL: Duration = Duration::from_millis(10);
 
+// How often to poll collation responses.
+// This is a hack that should be removed in a refactoring.
+// See https://github.com/paritytech/polkadot/issues/4182
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+
 #[derive(Clone, Default)]
 pub struct Metrics(Option<MetricsInner>);
@@ -467,6 +472,10 @@ impl ActiveParas {
 	fn is_current_or_next(&self, id: ParaId) -> bool {
 		self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id)
 	}
+
+	fn is_current(&self, id: &ParaId) -> bool {
+		self.current_assignments.contains_key(id)
+	}
 }
 
 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
@@ -886,6 +895,20 @@ async fn process_incoming_peer_message(
 		Some(p) => p,
 	};
 
+	if let PeerState::Collating(ref collating_state) = peer_data.state {
+		let para_id = collating_state.para_id;
+		if !state.active_paras.is_current(&para_id) {
+			tracing::debug!(
+				target: LOG_TARGET,
+				peer_id = ?origin,
+				%para_id,
+				?relay_parent,
+				"Received advertise collation, but we are assigned to the next group",
+			);
+			return
+		}
+	}
+
 	match peer_data.insert_advertisement(relay_parent, &state.view) {
 		Ok((id, para_id)) => {
 			tracing::debug!(
@@ -1136,6 +1159,15 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant {
 	Instant::now()
 }
 
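+/// Returns a fused stream that yields `()` once every `every` interval, used to drive
+/// periodic work in the main run loop.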
+fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
+	futures::stream::unfold(Instant::now() + every, |next_check| async move {
+		Some(((), wait_until_next_check(next_check).await))
+	})
+	.fuse()
+}
+
 /// The main run loop.
 pub(crate) async fn run<Context>(
 	mut ctx: Context,
@@ -1147,18 +1177,14 @@ where
 	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
 	Context: SubsystemContext<Message = CollatorProtocolMessage>,
 {
-	use OverseerSignal::*;
-
 	let mut state = State { metrics, ..Default::default() };
 
-	let next_inactivity_stream =
-		futures::stream::unfold(Instant::now() + ACTIVITY_POLL, |next_check| async move {
-			Some(((), wait_until_next_check(next_check).await))
-		})
-		.fuse();
-
+	let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
 	futures::pin_mut!(next_inactivity_stream);
 
+	let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+	futures::pin_mut!(check_collations_stream);
+
 	loop {
 		select! {
 			res = ctx.recv().fuse() => {
@@ -1172,8 +1198,8 @@ where
 						&mut state,
 					).await;
 				}
-					Ok(FromOverseer::Signal(Conclude)) => break,
-					_ => {},
+					Ok(FromOverseer::Signal(OverseerSignal::Conclude)) | Err(_) => break,
+					Ok(FromOverseer::Signal(_)) => continue,
 				}
 			}
 			_ = next_inactivity_stream.next() => {
@@ -1191,28 +1217,52 @@ where
 				);
 				dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
 			}
-		}
+			_ = check_collations_stream.next() => {
+				let reputation_changes = poll_requests(
+					&mut state.requested_collations,
+					&state.metrics,
+					&state.span_per_relay_parent,
+				).await;
+
-		let mut retained_requested = HashSet::new();
-		for (pending_collation, per_req) in state.requested_collations.iter_mut() {
-			// Despite the await, this won't block on the response itself.
-			let finished = poll_collation_response(
-				&mut ctx,
-				&state.metrics,
-				&state.span_per_relay_parent,
-				pending_collation,
-				per_req,
-			)
-			.await;
-			if !finished {
-				retained_requested.insert(pending_collation.clone());
-			}
+				for (peer_id, rep) in reputation_changes {
+					modify_reputation(&mut ctx, peer_id, rep).await;
+				}
+			},
 		}
-		state.requested_collations.retain(|k, _| retained_requested.contains(k));
 	}
+
+	Ok(())
 }
 
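+/// Polls all pending collation fetch requests once, retaining only those that have not
+/// completed yet.
+///
+/// Returns the reputation changes to apply for failed or invalid responses; applying
+/// them is left to the caller, which owns the subsystem sender.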
+async fn poll_requests(
+	requested_collations: &mut HashMap<PendingCollation, PerRequest>,
+	metrics: &Metrics,
+	span_per_relay_parent: &HashMap<Hash, PerLeafSpan>,
+) -> Vec<(PeerId, Rep)> {
+	let mut retained_requested = HashSet::new();
+	let mut reputation_changes = Vec::new();
+	for (pending_collation, per_req) in requested_collations.iter_mut() {
+		// Despite the await, this won't block on the response itself.
+		let result =
+			poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req)
+				.await;
+
+		if !result.is_ready() {
+			retained_requested.insert(pending_collation.clone());
+		}
+		if let CollationFetchResult::Error(rep) = result {
+			reputation_changes.push((pending_collation.peer_id.clone(), rep));
+		}
+	}
+	requested_collations.retain(|k, _| retained_requested.contains(k));
+	reputation_changes
+}
+
 /// Dequeue another collation and fetch.
 async fn dequeue_next_collation_and_fetch(
 	ctx: &mut (impl SubsystemContext<Message = CollatorProtocolMessage>
@@ -1314,29 +1359,38 @@ async fn disconnect_inactive_peers(
 		}
 	}
 }
 
+enum CollationFetchResult {
+	/// The collation is still being fetched.
+	Pending,
+	/// The collation was fetched successfully.
+	Success,
+	/// An error occurred when fetching a collation or it was invalid.
+	/// A reputation change should be applied to the peer.
+	Error(Rep),
+}
+
+impl CollationFetchResult {
+	fn is_ready(&self) -> bool {
+		!matches!(self, Self::Pending)
+	}
+}
+
 /// Poll collation response, return immediately if there is none.
 ///
-/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
+/// Ready responses are handled by logging and by
 /// forwarding proper responses to the requester.
-///
-/// Returns: `true` if `from_collator` future was ready.
-async fn poll_collation_response<Context>(
-	ctx: &mut Context,
+async fn poll_collation_response(
 	metrics: &Metrics,
 	spans: &HashMap<Hash, PerLeafSpan>,
 	pending_collation: &PendingCollation,
 	per_req: &mut PerRequest,
-) -> bool
-where
-	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
-	Context: SubsystemContext<Message = CollatorProtocolMessage>,
-{
+) -> CollationFetchResult {
 	if never!(per_req.from_collator.is_terminated()) {
 		tracing::error!(
 			target: LOG_TARGET,
 			"We remove pending responses once received, this should not happen."
 		);
-		return true
+		return CollationFetchResult::Success
 	}
 
 	if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
@@ -1348,7 +1402,7 @@ where
 		let mut metrics_result = Err(());
 		let mut success = "false";
 
-		match response {
+		let result = match response {
 			Err(RequestError::InvalidResponse(err)) => {
 				tracing::warn!(
 					target: LOG_TARGET,
@@ -1358,8 +1412,7 @@ where
 					err = ?err,
 					"Collator provided response that could not be decoded"
 				);
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_CORRUPTED_MESSAGE)
-					.await;
+				CollationFetchResult::Error(COST_CORRUPTED_MESSAGE)
 			},
 			Err(RequestError::NetworkError(err)) => {
 				tracing::debug!(
@@ -1374,7 +1427,7 @@ where
 				// sensible. In theory this could be exploited, by DoSing this node,
 				// which would result in reduced reputation for proper nodes, but the
 				// same can happen for penalties on timeouts, which we also have.
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await;
+				CollationFetchResult::Error(COST_NETWORK_ERROR)
 			},
 			Err(RequestError::Canceled(_)) => {
 				tracing::debug!(
@@ -1388,8 +1441,7 @@ where
 				// sensible. In theory this could be exploited, by DoSing this node,
 				// which would result in reduced reputation for proper nodes, but the
 				// same can happen for penalties on timeouts, which we also have.
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT)
-					.await;
+				CollationFetchResult::Error(COST_REQUEST_TIMED_OUT)
 			},
 			Ok(CollationFetchingResponse::Collation(receipt, _))
 				if receipt.descriptor().para_id != pending_collation.para_id => {
@@ -1402,7 +1454,7 @@ where
 					"Got wrong para ID for requested collation."
 				);
 
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await;
+				CollationFetchResult::Error(COST_WRONG_PARA)
 			}
 			Ok(CollationFetchingResponse::Collation(receipt, pov)) => {
 				tracing::debug!(
@@ -1430,12 +1482,15 @@ where
 					metrics_result = Ok(());
 					success = "true";
 				}
+
+				CollationFetchResult::Success
 			},
 		};
 		metrics.on_request(metrics_result);
 		per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
-		true
+
+		result
 	} else {
-		false
+		CollationFetchResult::Pending
 	}
 }
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests.rs
index a4bc8cac68..7c110d67fe 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/tests.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/tests.rs
@@ -674,6 +674,48 @@ fn fetch_collations_works() {
 	});
 }
 
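+// Ensure that we do not fetch a collation from a collator that declared for a para we
+// are only assigned to in the next group rotation.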
+#[test]
+fn dont_fetch_collation_if_assigned_to_next_group() {
+	let test_state = TestState::default();
+
+	test_harness(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_send(
+			&mut virtual_overseer,
+			CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
+				our_view![test_state.relay_parent],
+			)),
+		)
+		.await;
+
+		respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
+
+		let peer_b = PeerId::random();
+
+		connect_and_declare_collator(
+			&mut virtual_overseer,
+			peer_b.clone(),
+			test_state.collators[0].clone(),
+			test_state.chain_ids[1].clone(), // next, not current para_id
+		)
+		.await;
+
+		advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
+
+		assert!(
+			overseer_recv_with_timeout(&mut virtual_overseer, Duration::from_millis(30))
+				.await
+				.is_none(),
+			"There should be no PoV fetching request.",
+		);
+
+		virtual_overseer
+	})
+}
+
 // Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
 #[test]
 fn fetch_next_collation_on_invalid_collation() {