Do not force collators to update after enabling async backing (#1920)

The validators are checking if async backing is enabled by checking the
version of the runtime api. If the runtime api is upgraded by a runtime
upgrade, the validators start to also enable the async backing logic.
However, just because async backing is enabled, it doesn't mean that all
collators and parachain runtimes have upgraded. This pull request fixes
an issue about advertising collations to the relay chain when it has
async backing enabled, but the collator is still using the old
networking protocol. The implementation is actually backwards compatible
as we can not expect that everyone directly upgrades. However, the
collation advertisement logic was requiring V2 networking messages after
async backing was enabled, which was wrong. This is now fixed by this
pull request.

Closes: https://github.com/paritytech/polkadot-sdk/issues/1923

---------

Co-authored-by: eskimor <eskimor@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2023-10-19 21:45:33 +02:00
committed by GitHub
parent 961a8fd77e
commit b967ba53d3
4 changed files with 267 additions and 100 deletions
@@ -31,6 +31,7 @@ use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
peer_set::CollationVersion,
request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult},
PeerId,
};
@@ -160,6 +161,8 @@ pub fn fetched_collation_sanity_check(
pub struct CollationEvent {
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// The requested collation data.
pub pending_collation: PendingCollation,
}
@@ -307,6 +310,8 @@ pub(super) struct CollationFetchRequest {
pub pending_collation: PendingCollation,
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// Responses from collator.
pub from_collator: BoxFuture<'static, OutgoingResult<request_v1::CollationFetchingResponse>>,
/// Handle used for checking if this request was cancelled.
@@ -334,6 +339,7 @@ impl Future for CollationFetchRequest {
self.span.as_mut().map(|s| s.add_string_tag("success", "false"));
return Poll::Ready((
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
@@ -344,6 +350,7 @@ impl Future for CollationFetchRequest {
let res = self.from_collator.poll_unpin(cx).map(|res| {
(
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
@@ -85,6 +85,8 @@ const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para");
const COST_PROTOCOL_MISUSE: Rep =
Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1");
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
const BENEFIT_NOTIFY_GOOD: Rep =
Rep::BenefitMinor("A collator was noted good by another subsystem");
@@ -144,9 +146,6 @@ enum InsertAdvertisementError {
UndeclaredCollator,
/// A limit for announcements per peer is reached.
PeerLimitReached,
/// Mismatch of relay parent mode and advertisement arguments.
/// An internal error that should not happen.
ProtocolMismatch,
}
#[derive(Debug)]
@@ -252,23 +251,41 @@ impl PeerData {
},
(
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. },
Some(candidate_hash),
candidate_hash,
) => {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}
let candidates = state.advertisements.entry(on_relay_parent).or_default();
if let Some(candidate_hash) = candidate_hash {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}
if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
let candidates =
state.advertisements.entry(on_relay_parent).or_default();
if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
} else {
if self.version != CollationVersion::V1 {
gum::error!(
target: LOG_TARGET,
"Programming error, `candidate_hash` can not be `None` \
for non `V1` networking.",
);
}
if state.advertisements.contains_key(&on_relay_parent) {
return Err(InsertAdvertisementError::Duplicate)
}
state
.advertisements
.insert(on_relay_parent, HashSet::from_iter(candidate_hash));
};
},
_ => return Err(InsertAdvertisementError::ProtocolMismatch),
}
state.last_active = Instant::now();
@@ -705,6 +722,7 @@ async fn request_collation(
let collation_request = CollationFetchRequest {
pending_collation,
collator_id: collator_id.clone(),
collator_protocol_version: peer_protocol_version,
from_collator: response_recv.boxed(),
cancellation_token: cancellation_token.clone(),
span: state
@@ -920,10 +938,11 @@ enum AdvertisementError {
UndeclaredCollator,
/// We're assigned to a different para at the given relay parent.
InvalidAssignment,
/// An advertisement format doesn't match the relay parent.
ProtocolMismatch,
/// Para reached a limit of seconded candidates for this relay parent.
SecondedLimitReached,
/// Collator trying to advertise a collation using V1 protocol for an async backing relay
/// parent.
ProtocolMisuse,
/// Advertisement is invalid.
Invalid(InsertAdvertisementError),
}
@@ -933,8 +952,9 @@ impl AdvertisementError {
use AdvertisementError::*;
match self {
InvalidAssignment => Some(COST_WRONG_PARA),
ProtocolMisuse => Some(COST_PROTOCOL_MISUSE),
RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE),
UnknownPeer | ProtocolMismatch | SecondedLimitReached => None,
UnknownPeer | SecondedLimitReached => None,
}
}
}
@@ -1042,6 +1062,13 @@ where
.get(&relay_parent)
.map(|s| s.child("advertise-collation"));
let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;
if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains_key(&relay_parent)
{
return Err(AdvertisementError::ProtocolMisuse)
}
let per_relay_parent = state
.per_relay_parent
.get(&relay_parent)
@@ -1050,20 +1077,12 @@ where
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;
let assignment = &per_relay_parent.assignment;
let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;
let collator_para_id =
peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?;
match assignment.current {
Some(id) if id == collator_para_id => {
// Our assignment.
},
_ => return Err(AdvertisementError::InvalidAssignment),
};
if relay_parent_mode.is_enabled() && prospective_candidate.is_none() {
// Expected v2 advertisement.
return Err(AdvertisementError::ProtocolMismatch)
// Check if this is assigned to us.
if assignment.current.map_or(true, |id| id != collator_para_id) {
return Err(AdvertisementError::InvalidAssignment)
}
// Always insert advertisements that pass all the checks for spam protection.
@@ -1077,13 +1096,17 @@ where
&state.active_leaves,
)
.map_err(AdvertisementError::Invalid)?;
if !per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) {
return Err(AdvertisementError::SecondedLimitReached)
}
if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate {
let is_seconding_allowed = !relay_parent_mode.is_enabled() ||
can_second(
// We need to queue the advertisement if we are not allowed to second it.
//
// This is also only important when async backing is enabled.
let queue_advertisement = relay_parent_mode.is_enabled() &&
!can_second(
sender,
collator_para_id,
relay_parent,
@@ -1092,7 +1115,7 @@ where
)
.await;
if !is_seconding_allowed {
if queue_advertisement {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
@@ -1125,6 +1148,7 @@ where
prospective_candidate,
)
.await;
if let Err(fetch_error) = result {
gum::debug!(
target: LOG_TARGET,
@@ -1477,7 +1501,7 @@ async fn process_msg<Context>(
},
};
let fetched_collation = FetchedCollation::from(&receipt.to_plain());
if let Some(CollationEvent { collator_id, pending_collation }) =
if let Some(CollationEvent { collator_id, pending_collation, .. }) =
state.fetched_candidates.remove(&fetched_collation)
{
let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } =
@@ -1635,7 +1659,7 @@ async fn run_inner<Context>(
Ok(res) => res
};
let CollationEvent {collator_id, pending_collation} = res.collation_event.clone();
let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();
if let Err(err) = kick_off_seconding(&mut ctx, &mut state, res).await {
gum::warn!(
target: LOG_TARGET,
@@ -1783,39 +1807,39 @@ async fn kick_off_seconding<Context>(
},
};
let collations = &mut per_relay_parent.collations;
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;
let fetched_collation = FetchedCollation::from(&candidate_receipt);
if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) {
collation_event.pending_collation.commitments_hash =
Some(candidate_receipt.commitments_hash);
let pvd =
match (relay_parent_mode, collation_event.pending_collation.prospective_candidate) {
(
ProspectiveParachainsMode::Enabled { .. },
Some(ProspectiveCandidate { parent_head_data_hash, .. }),
) =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
(ProspectiveParachainsMode::Disabled, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;
let pvd = match (
collation_event.collator_protocol_version,
collation_event.pending_collation.prospective_candidate,
) {
(CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. }))
if per_relay_parent.prospective_parachains_mode.is_enabled() =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
// Support V2 collators without async backing enabled.
(CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;
fetched_collation_sanity_check(
&collation_event.pending_collation,
@@ -1864,7 +1888,8 @@ async fn handle_collation_fetch_response(
network_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> std::result::Result<PendingCollationFetch, Option<(PeerId, Rep)>> {
let (CollationEvent { collator_id, pending_collation }, response) = response;
let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) =
response;
// Remove the cancellation handle, as the future already completed.
state.collation_requests_cancel_handles.remove(&pending_collation);
@@ -1970,7 +1995,11 @@ async fn handle_collation_fetch_response(
metrics_result = Ok(());
Ok(PendingCollationFetch {
collation_event: CollationEvent { collator_id, pending_collation },
collation_event: CollationEvent {
collator_id,
pending_collation,
collator_protocol_version,
},
candidate_receipt,
pov,
})
@@ -269,15 +269,15 @@ async fn assert_candidate_backing_second(
expected_relay_parent: Hash,
expected_para_id: ParaId,
expected_pov: &PoV,
mode: ProspectiveParachainsMode,
version: CollationVersion,
) -> CandidateReceipt {
let pvd = dummy_pvd();
// Depending on relay parent mode pvd will be either requested
// from the Runtime API or Prospective Parachains.
let msg = overseer_recv(virtual_overseer).await;
match mode {
ProspectiveParachainsMode::Disabled => assert_matches!(
match version {
CollationVersion::V1 => assert_matches!(
msg,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
@@ -289,7 +289,7 @@ async fn assert_candidate_backing_second(
tx.send(Ok(Some(pvd.clone()))).unwrap();
}
),
ProspectiveParachainsMode::Enabled { .. } => assert_matches!(
CollationVersion::V2 => assert_matches!(
msg,
AllMessages::ProspectiveParachains(
ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx),
@@ -532,7 +532,14 @@ fn act_on_advertisement_v2() {
)
.await;
let candidate_hash = CandidateHash::default();
let pov = PoV { block_data: BlockData(vec![]) };
let mut candidate_a =
dummy_candidate_receipt_bad_sig(dummy_hash(), Some(Default::default()));
candidate_a.descriptor.para_id = test_state.chain_ids[0];
candidate_a.descriptor.relay_parent = test_state.relay_parent;
candidate_a.descriptor.persisted_validation_data_hash = dummy_pvd().hash();
let candidate_hash = candidate_a.hash();
let parent_head_data_hash = Hash::zero();
// v2 advertisement.
advertise_collation(
@@ -543,7 +550,7 @@ fn act_on_advertisement_v2() {
)
.await;
assert_fetch_collation_request(
let response_channel = assert_fetch_collation_request(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
@@ -551,6 +558,24 @@ fn act_on_advertisement_v2() {
)
.await;
response_channel
.send(Ok(request_v1::CollationFetchingResponse::Collation(
candidate_a.clone(),
pov.clone(),
)
.encode()))
.expect("Sending response should succeed");
assert_candidate_backing_second(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
&pov,
// Async backing isn't enabled and thus it should do it the old way.
CollationVersion::V1,
)
.await;
virtual_overseer
});
}
@@ -748,7 +773,7 @@ fn fetch_one_collation_at_a_time() {
test_state.relay_parent,
test_state.chain_ids[0],
&pov,
ProspectiveParachainsMode::Disabled,
CollationVersion::V1,
)
.await;
@@ -880,7 +905,7 @@ fn fetches_next_collation() {
second,
test_state.chain_ids[0],
&pov,
ProspectiveParachainsMode::Disabled,
CollationVersion::V1,
)
.await;
@@ -1010,7 +1035,7 @@ fn fetch_next_collation_on_invalid_collation() {
test_state.relay_parent,
test_state.chain_ids[0],
&pov,
ProspectiveParachainsMode::Disabled,
CollationVersion::V1,
)
.await;
@@ -74,7 +74,7 @@ async fn assert_assign_incoming(
}
/// Handle a view update.
async fn update_view(
pub(super) async fn update_view(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
new_view: Vec<(Hash, u32)>, // Hash and block number.
@@ -212,6 +212,7 @@ async fn assert_collation_seconded(
virtual_overseer: &mut VirtualOverseer,
relay_parent: Hash,
peer_id: PeerId,
version: CollationVersion,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
@@ -222,29 +223,51 @@ async fn assert_collation_seconded(
assert_eq!(rep.value, BENEFIT_NOTIFY_GOOD.cost_or_benefit());
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage(
peers,
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(
protocol_v2::CollatorProtocolMessage::CollationSeconded(
_relay_parent,
..,
),
)),
)) => {
assert_eq!(peers, vec![peer_id]);
assert_eq!(relay_parent, _relay_parent);
}
);
match version {
CollationVersion::V1 => {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage(
peers,
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(
protocol_v1::CollatorProtocolMessage::CollationSeconded(
_relay_parent,
..,
),
)),
)) => {
assert_eq!(peers, vec![peer_id]);
assert_eq!(relay_parent, _relay_parent);
}
);
},
CollationVersion::V2 => {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage(
peers,
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(
protocol_v2::CollatorProtocolMessage::CollationSeconded(
_relay_parent,
..,
),
)),
)) => {
assert_eq!(peers, vec![peer_id]);
assert_eq!(relay_parent, _relay_parent);
}
);
},
}
}
#[test]
fn v1_advertisement_rejected() {
fn v1_advertisement_accepted_and_seconded() {
let test_state = TestState::default();
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer, .. } = test_harness;
let TestHarness { mut virtual_overseer, keystore } = test_harness;
let pair_a = CollatorPair::generate().0;
@@ -267,9 +290,94 @@ fn v1_advertisement_rejected() {
advertise_collation(&mut virtual_overseer, peer_a, head_b, None).await;
// Not reported.
test_helpers::Yield::new().await;
assert_matches!(virtual_overseer.recv().now_or_never(), None);
let response_channel = assert_fetch_collation_request(
&mut virtual_overseer,
head_b,
test_state.chain_ids[0],
None,
)
.await;
let mut candidate = dummy_candidate_receipt_bad_sig(head_b, Some(Default::default()));
candidate.descriptor.para_id = test_state.chain_ids[0];
candidate.descriptor.persisted_validation_data_hash = dummy_pvd().hash();
let commitments = CandidateCommitments {
head_data: HeadData(vec![1 as u8]),
horizontal_messages: Default::default(),
upward_messages: Default::default(),
new_validation_code: None,
processed_downward_messages: 0,
hrmp_watermark: 0,
};
candidate.commitments_hash = commitments.hash();
let pov = PoV { block_data: BlockData(vec![1]) };
response_channel
.send(Ok(request_v2::CollationFetchingResponse::Collation(
candidate.clone(),
pov.clone(),
)
.encode()))
.expect("Sending response should succeed");
assert_candidate_backing_second(
&mut virtual_overseer,
head_b,
test_state.chain_ids[0],
&pov,
CollationVersion::V1,
)
.await;
let candidate = CommittedCandidateReceipt { descriptor: candidate.descriptor, commitments };
send_seconded_statement(&mut virtual_overseer, keystore.clone(), &candidate).await;
assert_collation_seconded(&mut virtual_overseer, head_b, peer_a, CollationVersion::V1)
.await;
virtual_overseer
});
}
#[test]
fn v1_advertisement_rejected_on_non_active_leave() {
let test_state = TestState::default();
test_harness(ReputationAggregator::new(|_| true), |test_harness| async move {
let TestHarness { mut virtual_overseer, .. } = test_harness;
let pair_a = CollatorPair::generate().0;
let head_b = Hash::from_low_u64_be(128);
let head_b_num: u32 = 5;
update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;
let peer_a = PeerId::random();
// Accept both collators from the implicit view.
connect_and_declare_collator(
&mut virtual_overseer,
peer_a,
pair_a.clone(),
test_state.chain_ids[0],
CollationVersion::V1,
)
.await;
advertise_collation(&mut virtual_overseer, peer_a, get_parent_hash(head_b), None).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)),
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep.value, COST_PROTOCOL_MISUSE.cost_or_benefit());
}
);
virtual_overseer
});
@@ -469,10 +577,7 @@ fn second_multiple_candidates_per_relay_parent() {
head_c,
test_state.chain_ids[0],
&pov,
ProspectiveParachainsMode::Enabled {
max_candidate_depth: ASYNC_BACKING_PARAMETERS.max_candidate_depth as _,
allowed_ancestry_len: ASYNC_BACKING_PARAMETERS.allowed_ancestry_len as _,
},
CollationVersion::V2,
)
.await;
@@ -481,7 +586,8 @@ fn second_multiple_candidates_per_relay_parent() {
send_seconded_statement(&mut virtual_overseer, keystore.clone(), &candidate).await;
assert_collation_seconded(&mut virtual_overseer, head_c, peer_a).await;
assert_collation_seconded(&mut virtual_overseer, head_c, peer_a, CollationVersion::V2)
.await;
}
// No more advertisements can be made for this relay parent.