mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 14:37:57 +00:00
Advertise to already connected validators (#1790)
* Advertise to already connected validators * Merge the loops and check the view * Extend a test to capture new logic * Fix a comment * Update node/network/collator-protocol/src/collator_side.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Update comment Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
@@ -122,7 +122,7 @@ struct State {
|
||||
/// all actions from peers not in this map will be ignored.
|
||||
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
|
||||
/// go out of scope with their respective deactivated leafs.
|
||||
known_validators: HashMap<PeerId, ValidatorId>,
|
||||
known_validators: HashMap<ValidatorId, PeerId>,
|
||||
|
||||
/// Use to await for the next validator connection and revoke the request.
|
||||
last_connection_request: Option<validator_discovery::ConnectionRequest>,
|
||||
@@ -195,6 +195,19 @@ where
|
||||
|
||||
state.our_validators_groups.insert(relay_parent, our_validators.clone());
|
||||
|
||||
// We may be already connected to some of the validators. In that case,
|
||||
// advertise a collation to them right away.
|
||||
for validator in our_validators.iter() {
|
||||
if let Some(peer) = state.known_validators.get(&validator) {
|
||||
if let Some(view) = state.peer_views.get(peer) {
|
||||
if view.contains(&relay_parent) {
|
||||
let peer = peer.clone();
|
||||
advertise_collation(ctx, state, relay_parent, vec![peer]).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Issue a discovery request for the validators of the current group and the next group.
|
||||
connect_to_validators(ctx, relay_parent, state, our_validators).await?;
|
||||
|
||||
@@ -554,9 +567,11 @@ async fn handle_validator_connected<Context>(
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
state.peer_views.entry(peer_id.clone()).or_default();
|
||||
|
||||
declare(ctx, state, vec![peer_id]).await?;
|
||||
if !state.peer_views.contains_key(&peer_id) {
|
||||
// Only declare the new peers.
|
||||
declare(ctx, state, vec![peer_id.clone()]).await?;
|
||||
state.peer_views.insert(peer_id, Default::default());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -574,12 +589,14 @@ where
|
||||
|
||||
match bridge_message {
|
||||
PeerConnected(_peer_id, _observed_role) => {
|
||||
// validators first connection is handled by `handle_validator_connected`
|
||||
// If it is possible that a disconnected validator would attempt a reconnect
|
||||
// it should be handled here.
|
||||
}
|
||||
PeerViewChange(peer_id, view) => {
|
||||
handle_peer_view_change(ctx, state, peer_id, view).await?;
|
||||
}
|
||||
PeerDisconnected(peer_id) => {
|
||||
state.known_validators.retain(|_, v| *v != peer_id);
|
||||
state.peer_views.remove(&peer_id);
|
||||
}
|
||||
OurViewChange(view) => {
|
||||
@@ -606,9 +623,7 @@ async fn handle_our_view_change(
|
||||
|
||||
for removed in removed.into_iter() {
|
||||
state.collations.remove(removed);
|
||||
if let Some(group) = state.our_validators_groups.remove(removed) {
|
||||
state.known_validators.retain(|_, v| !group.contains(v));
|
||||
}
|
||||
state.our_validators_groups.remove(removed);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -636,7 +651,7 @@ where
|
||||
loop {
|
||||
if let Some(mut request) = state.last_connection_request.take() {
|
||||
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
|
||||
state.known_validators.insert(peer_id.clone(), validator_id);
|
||||
state.known_validators.insert(validator_id, peer_id.clone());
|
||||
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await {
|
||||
warn!(
|
||||
target: TARGET,
|
||||
@@ -1129,6 +1144,127 @@ mod tests {
|
||||
).await;
|
||||
|
||||
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
|
||||
|
||||
let pov_block = PoV {
|
||||
block_data: BlockData(vec![45, 46, 47]),
|
||||
};
|
||||
|
||||
let pov_hash = pov_block.hash();
|
||||
let current = Hash::repeat_byte(33);
|
||||
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: current,
|
||||
pov_hash,
|
||||
..Default::default()
|
||||
}.build();
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::OurViewChange(View(vec![current])),
|
||||
),
|
||||
).await;
|
||||
|
||||
// Send info about peer's view.
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(
|
||||
test_state.validator_peer_id[2].clone(),
|
||||
View(vec![current]),
|
||||
)
|
||||
)
|
||||
).await;
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()),
|
||||
).await;
|
||||
|
||||
// obtain the availability cores.
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::AvailabilityCores(tx)
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Obtain the validator groups
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorGroups(tx)
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// obtain the validators per relay parent
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
tx.send(Ok(test_state.validator_public.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// The peer is interested in a leaf that we have a collation for;
|
||||
// advertise it.
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendCollationMessage(
|
||||
to,
|
||||
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
|
||||
)
|
||||
) => {
|
||||
assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]);
|
||||
assert_matches!(
|
||||
wire_message,
|
||||
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
|
||||
relay_parent,
|
||||
collating_on,
|
||||
) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
assert_eq!(collating_on, test_state.chain_ids[0]);
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
// obtain the validator_id to authority_id mapping
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
assert_eq!(validators.len(), 4);
|
||||
assert!(validators.contains(&test_state.validator_public[2]));
|
||||
assert!(validators.contains(&test_state.validator_public[0]));
|
||||
assert!(validators.contains(&test_state.validator_public[4]));
|
||||
assert!(validators.contains(&test_state.validator_public[1]));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
Some(test_state.validator_authority_id[0].clone()),
|
||||
Some(test_state.validator_authority_id[4].clone()),
|
||||
Some(test_state.validator_authority_id[1].clone()),
|
||||
];
|
||||
tx.send(Ok(result)).unwrap();
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user