mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 07:37:57 +00:00
collator-side: send parent head data (#3521)
On top of #3302. We want the validators to upgrade first before we add changes to the collation side to send the new variants, which is why this part is extracted into a separate PR. The detection of when to send the parent head is based on the core assignments at the relay parent of the candidate. We probably want to make it more flexible in the future, but for now, it will work for a simple use case when a para always has multiple cores assigned to it. --------- Signed-off-by: Matteo Muraca <mmuraca247@gmail.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Matteo Muraca <56828990+muraca@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Juan Ignacio Rios <54085674+JuaniRios@users.noreply.github.com> Co-authored-by: Branislav Kontur <bkontur@gmail.com> Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
@@ -27,7 +27,8 @@ use polkadot_node_network_protocol::{
|
||||
PeerId,
|
||||
};
|
||||
use polkadot_node_primitives::PoV;
|
||||
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};
|
||||
use polkadot_node_subsystem::messages::ParentHeadData;
|
||||
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};
|
||||
|
||||
/// The status of a collation as seen from the collator.
|
||||
pub enum CollationStatus {
|
||||
@@ -59,12 +60,10 @@ impl CollationStatus {
|
||||
pub struct Collation {
|
||||
/// Candidate receipt.
|
||||
pub receipt: CandidateReceipt,
|
||||
/// Parent head-data hash.
|
||||
pub parent_head_data_hash: Hash,
|
||||
/// Proof to verify the state transition of the parachain.
|
||||
pub pov: PoV,
|
||||
/// Parent head-data needed for elastic scaling.
|
||||
pub parent_head_data: HeadData,
|
||||
/// Parent head-data (or just hash).
|
||||
pub parent_head_data: ParentHeadData,
|
||||
/// Collation status.
|
||||
pub status: CollationStatus,
|
||||
}
|
||||
|
||||
@@ -40,7 +40,8 @@ use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, RuntimeApiMessage,
|
||||
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData,
|
||||
RuntimeApiMessage,
|
||||
},
|
||||
overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, PerLeafSpan,
|
||||
};
|
||||
@@ -395,12 +396,11 @@ async fn distribute_collation<Context>(
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Determine which core the para collated-on is assigned to.
|
||||
// Determine which core(s) the para collated-on is assigned to.
|
||||
// If it is not scheduled then ignore the message.
|
||||
let (our_core, num_cores) =
|
||||
match determine_core(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
|
||||
Some(core) => core,
|
||||
None => {
|
||||
let (our_cores, num_cores) =
|
||||
match determine_cores(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
|
||||
(cores, _num_cores) if cores.is_empty() => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
para_id = %id,
|
||||
@@ -409,8 +409,20 @@ async fn distribute_collation<Context>(
|
||||
|
||||
return Ok(())
|
||||
},
|
||||
(cores, num_cores) => (cores, num_cores),
|
||||
};
|
||||
|
||||
let elastic_scaling = our_cores.len() > 1;
|
||||
if elastic_scaling {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
para_id = %id,
|
||||
cores = ?our_cores,
|
||||
"{} is assigned to {} cores at {}", id, our_cores.len(), candidate_relay_parent,
|
||||
);
|
||||
}
|
||||
|
||||
let our_core = our_cores[0];
|
||||
// Determine the group on that core.
|
||||
//
|
||||
// When prospective parachains are disabled, candidate relay parent here is
|
||||
@@ -464,15 +476,15 @@ async fn distribute_collation<Context>(
|
||||
state.collation_result_senders.insert(candidate_hash, result_sender);
|
||||
}
|
||||
|
||||
let parent_head_data = if elastic_scaling {
|
||||
ParentHeadData::WithData { hash: parent_head_data_hash, head_data: parent_head_data }
|
||||
} else {
|
||||
ParentHeadData::OnlyHash(parent_head_data_hash)
|
||||
};
|
||||
|
||||
per_relay_parent.collations.insert(
|
||||
candidate_hash,
|
||||
Collation {
|
||||
receipt,
|
||||
parent_head_data_hash,
|
||||
pov,
|
||||
parent_head_data,
|
||||
status: CollationStatus::Created,
|
||||
},
|
||||
Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
|
||||
);
|
||||
|
||||
// If prospective parachains are disabled, a leaf should be known to peer.
|
||||
@@ -513,15 +525,17 @@ async fn distribute_collation<Context>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the Id of the Core that is assigned to the para being collated on if any
|
||||
/// Get the core indices that are assigned to the para being collated on if any
|
||||
/// and the total number of cores.
|
||||
async fn determine_core(
|
||||
async fn determine_cores(
|
||||
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
|
||||
para_id: ParaId,
|
||||
relay_parent: Hash,
|
||||
relay_parent_mode: ProspectiveParachainsMode,
|
||||
) -> Result<Option<(CoreIndex, usize)>> {
|
||||
) -> Result<(Vec<CoreIndex>, usize)> {
|
||||
let cores = get_availability_cores(sender, relay_parent).await?;
|
||||
let n_cores = cores.len();
|
||||
let mut assigned_cores = Vec::new();
|
||||
|
||||
for (idx, core) in cores.iter().enumerate() {
|
||||
let core_para_id = match core {
|
||||
@@ -538,11 +552,11 @@ async fn determine_core(
|
||||
};
|
||||
|
||||
if core_para_id == Some(para_id) {
|
||||
return Ok(Some(((idx as u32).into(), cores.len())))
|
||||
assigned_cores.push(CoreIndex::from(idx as u32));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
Ok((assigned_cores, n_cores))
|
||||
}
|
||||
|
||||
/// Validators of a particular group index.
|
||||
@@ -725,7 +739,7 @@ async fn advertise_collation<Context>(
|
||||
let wire_message = protocol_v2::CollatorProtocolMessage::AdvertiseCollation {
|
||||
relay_parent,
|
||||
candidate_hash: *candidate_hash,
|
||||
parent_head_data_hash: collation.parent_head_data_hash,
|
||||
parent_head_data_hash: collation.parent_head_data.hash(),
|
||||
};
|
||||
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message))
|
||||
},
|
||||
@@ -849,7 +863,7 @@ async fn send_collation(
|
||||
request: VersionedCollationRequest,
|
||||
receipt: CandidateReceipt,
|
||||
pov: PoV,
|
||||
_parent_head_data: HeadData,
|
||||
parent_head_data: ParentHeadData,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
@@ -857,20 +871,25 @@ async fn send_collation(
|
||||
let peer_id = request.peer_id();
|
||||
let candidate_hash = receipt.hash();
|
||||
|
||||
// The response payload is the same for v1 and v2 versions of protocol
|
||||
// and doesn't have v2 alias for simplicity.
|
||||
// For now, we don't send parent head data to the collation requester.
|
||||
let result =
|
||||
// if assigned_multiple_cores {
|
||||
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
|
||||
// receipt,
|
||||
// pov,
|
||||
// parent_head_data,
|
||||
// })
|
||||
// } else {
|
||||
#[cfg(feature = "elastic-scaling-experimental")]
|
||||
let result = match parent_head_data {
|
||||
ParentHeadData::WithData { head_data, .. } =>
|
||||
Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
|
||||
receipt,
|
||||
pov,
|
||||
parent_head_data: head_data,
|
||||
}),
|
||||
ParentHeadData::OnlyHash(_) =>
|
||||
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
|
||||
};
|
||||
#[cfg(not(feature = "elastic-scaling-experimental"))]
|
||||
let result = {
|
||||
// suppress unused warning
|
||||
let _parent_head_data = parent_head_data;
|
||||
|
||||
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
|
||||
// }
|
||||
;
|
||||
};
|
||||
|
||||
let response =
|
||||
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };
|
||||
|
||||
|
||||
@@ -142,6 +142,21 @@ impl Default for TestState {
|
||||
}
|
||||
|
||||
impl TestState {
|
||||
/// Adds a few more scheduled cores to the state for the same para id
|
||||
/// compared to the default.
|
||||
#[cfg(feature = "elastic-scaling-experimental")]
|
||||
pub fn with_elastic_scaling() -> Self {
|
||||
let mut state = Self::default();
|
||||
let para_id = state.para_id;
|
||||
state
|
||||
.availability_cores
|
||||
.push(CoreState::Scheduled(ScheduledCore { para_id, collator: None }));
|
||||
state
|
||||
.availability_cores
|
||||
.push(CoreState::Scheduled(ScheduledCore { para_id, collator: None }));
|
||||
state
|
||||
}
|
||||
|
||||
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
|
||||
let core_num = self.availability_cores.len();
|
||||
let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
|
||||
|
||||
+113
@@ -372,6 +372,119 @@ fn distribute_collation_up_to_limit() {
|
||||
)
|
||||
}
|
||||
|
||||
/// Tests that collator send the parent head data in
|
||||
/// case the para is assigned to multiple cores (elastic scaling).
|
||||
#[test]
|
||||
#[cfg(feature = "elastic-scaling-experimental")]
|
||||
fn send_parent_head_data_for_elastic_scaling() {
|
||||
let test_state = TestState::with_elastic_scaling();
|
||||
|
||||
let local_peer_id = test_state.local_peer_id;
|
||||
let collator_pair = test_state.collator_pair.clone();
|
||||
|
||||
test_harness(
|
||||
local_peer_id,
|
||||
collator_pair,
|
||||
ReputationAggregator::new(|_| true),
|
||||
|test_harness| async move {
|
||||
let mut virtual_overseer = test_harness.virtual_overseer;
|
||||
let req_v1_cfg = test_harness.req_v1_cfg;
|
||||
let mut req_v2_cfg = test_harness.req_v2_cfg;
|
||||
|
||||
let head_b = Hash::from_low_u64_be(129);
|
||||
let head_b_num: u32 = 63;
|
||||
|
||||
// Set collating para id.
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollateOn(test_state.para_id),
|
||||
)
|
||||
.await;
|
||||
update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await;
|
||||
|
||||
let pov_data = PoV { block_data: BlockData(vec![1 as u8]) };
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.para_id,
|
||||
relay_parent: head_b,
|
||||
pov_hash: pov_data.hash(),
|
||||
..Default::default()
|
||||
}
|
||||
.build();
|
||||
|
||||
let phd = HeadData(vec![1, 2, 3]);
|
||||
let phdh = phd.hash();
|
||||
|
||||
distribute_collation_with_receipt(
|
||||
&mut virtual_overseer,
|
||||
&test_state,
|
||||
head_b,
|
||||
true,
|
||||
candidate.clone(),
|
||||
pov_data.clone(),
|
||||
phdh,
|
||||
)
|
||||
.await;
|
||||
|
||||
let peer = test_state.validator_peer_id[0];
|
||||
let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
|
||||
connect_peer(
|
||||
&mut virtual_overseer,
|
||||
peer,
|
||||
CollationVersion::V2,
|
||||
Some(validator_id.clone()),
|
||||
)
|
||||
.await;
|
||||
expect_declare_msg_v2(&mut virtual_overseer, &test_state, &peer).await;
|
||||
|
||||
send_peer_view_change(&mut virtual_overseer, &peer, vec![head_b]).await;
|
||||
let hashes: Vec<_> = vec![candidate.hash()];
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &peer, head_b, Some(hashes))
|
||||
.await;
|
||||
|
||||
let (pending_response, rx) = oneshot::channel();
|
||||
req_v2_cfg
|
||||
.inbound_queue
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.send(RawIncomingRequest {
|
||||
peer,
|
||||
payload: request_v2::CollationFetchingRequest {
|
||||
relay_parent: head_b,
|
||||
para_id: test_state.para_id,
|
||||
candidate_hash: candidate.hash(),
|
||||
}
|
||||
.encode(),
|
||||
pending_response,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
Ok(full_response) => {
|
||||
let response: request_v2::CollationFetchingResponse =
|
||||
request_v2::CollationFetchingResponse::decode(&mut
|
||||
full_response.result
|
||||
.expect("We should have a proper answer").as_ref()
|
||||
).expect("Decoding should work");
|
||||
assert_matches!(
|
||||
response,
|
||||
request_v1::CollationFetchingResponse::CollationWithParentHeadData {
|
||||
receipt, pov, parent_head_data
|
||||
} => {
|
||||
assert_eq!(receipt, candidate);
|
||||
assert_eq!(pov, pov_data);
|
||||
assert_eq!(parent_head_data, phd);
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
TestHarness { virtual_overseer, req_v1_cfg, req_v2_cfg }
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Tests that collator correctly handles peer V2 requests.
|
||||
#[test]
|
||||
fn advertise_and_send_collation_by_hash() {
|
||||
|
||||
Reference in New Issue
Block a user