Collators: Declare to all peers (#2816)

* fix tests

* add test for rejecting declares on collators

* fix bad test
This commit is contained in:
Robert Habermeier
2021-04-04 18:59:00 +02:00
committed by GitHub
parent 15dfdd68c0
commit bfc8f4fcf3
@@ -33,7 +33,7 @@ use polkadot_subsystem::{
use polkadot_node_network_protocol::{
OurView, PeerId, View, peer_set::PeerSet,
request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}},
v1 as protocol_v1
v1 as protocol_v1, UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
validator_discovery,
@@ -44,6 +44,8 @@ use polkadot_node_subsystem_util::{
};
use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
@@ -241,9 +243,6 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,
/// List of peers where we declared ourself as a collator.
declared_at: HashSet<PeerId>,
/// The connection requests to validators per relay parent.
connection_requests: validator_discovery::ConnectionRequests,
@@ -266,7 +265,6 @@ impl State {
collations: Default::default(),
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
declared_at: Default::default(),
connection_requests: Default::default(),
}
}
@@ -486,7 +484,7 @@ async fn advertise_collation(
match (state.collations.get_mut(&relay_parent), should_advertise) {
(None, _) => {
tracing::debug!(
tracing::trace!(
target: LOG_TARGET,
?relay_parent,
peer_id = %peer,
@@ -693,8 +691,9 @@ async fn send_collation(
}
/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message(
ctx: &mut impl SubsystemContext,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
@@ -703,18 +702,32 @@ async fn handle_incoming_peer_message(
match msg {
Declare(_, _, _) => {
tracing::warn!(
tracing::trace!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
// If we are declared to, this is another collator, and we should disconnect.
ctx.send_message(
NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation).into()
).await;
}
AdvertiseCollation(_) => {
tracing::warn!(
tracing::trace!(
target: LOG_TARGET,
?origin,
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
ctx.send_message(
NetworkBridgeMessage::ReportPeer(origin.clone(), COST_UNEXPECTED_MESSAGE).into()
).await;
// If we are advertised to, this is another collator, and we should disconnect.
ctx.send_message(
NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation).into()
).await;
}
CollationSeconded(statement) => {
if !matches!(statement.payload(), Statement::Seconded(_)) {
@@ -775,12 +788,6 @@ async fn handle_validator_connected(
"Connected to requested validator"
);
let not_declared = state.declared_at.insert(peer_id.clone());
if not_declared {
declare(ctx, state, peer_id.clone()).await;
}
// Store the PeerId and find out if we should advertise to this peer.
//
// If this peer does not belong to the para validators, we also don't need to try to advertise our collation.
@@ -814,6 +821,9 @@ async fn handle_network_msg(
?observed_role,
"Peer connected",
);
// Always declare to every peer. We should be connecting only to validators.
declare(ctx, state, peer_id.clone()).await;
}
PeerViewChange(peer_id, view) => {
tracing::trace!(
@@ -831,7 +841,6 @@ async fn handle_network_msg(
"Peer disconnected",
);
state.peer_views.remove(&peer_id);
state.declared_at.remove(&peer_id);
}
OurViewChange(view) => {
tracing::trace!(
@@ -842,7 +851,7 @@ async fn handle_network_msg(
handle_our_view_change(state, view).await?;
}
PeerMessage(remote, msg) => {
handle_incoming_peer_message(state, remote, msg).await?;
handle_incoming_peer_message(ctx, state, remote, msg).await?;
}
}
@@ -1135,18 +1144,6 @@ mod tests {
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_collator_protocol"),
log::LevelFilter::Trace,
)
.filter(
Some(LOG_TARGET),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
@@ -1468,10 +1465,14 @@ mod tests {
let DistributeCollation { mut connected, candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state).await;
test_state.current_group_validator_authority_ids()
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
.for_each(|r| connected.try_send(r).unwrap());
{
connect_peer(&mut virtual_overseer, peer.clone()).await;
connected.try_send((val, peer)).unwrap();
}
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
@@ -1549,10 +1550,13 @@ mod tests {
let DistributeCollation { mut connected, .. } =
distribute_collation(&mut virtual_overseer, &test_state).await;
test_state.current_group_validator_authority_ids()
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
.for_each(|r| connected.try_send(r).unwrap());
{
connected.try_send((val, peer)).unwrap();
}
// Send info about peer's view.
overseer_send(
@@ -1569,10 +1573,8 @@ mod tests {
});
}
/// This test ensures that we declare a collator at a validator by sending the `Declare` message as soon as the
/// collator is aware of the validator being connected.
#[test]
fn collators_are_registered_correctly_at_validators() {
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
@@ -1581,16 +1583,11 @@ mod tests {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.validator_peer_id[0].clone();
let validator_id = test_state.validator_authority_id[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone()).await;
let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
connected.try_send((validator_id, peer.clone())).unwrap();
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
})
}
@@ -1618,6 +1615,9 @@ mod tests {
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone()).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
// And let it tell us that it is has the same view.
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
@@ -1625,9 +1625,6 @@ mod tests {
connected.try_send((validator_id, peer.clone())).unwrap();
connected.try_send((validator_id2, peer2.clone())).unwrap();
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
// The other validator announces that it changed its view.
@@ -1661,13 +1658,13 @@ mod tests {
// Connect the second validator
connect_peer(&mut virtual_overseer, peer2.clone()).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
connected.try_send((validator_id.clone(), peer.clone())).unwrap();
connected.try_send((validator_id2.clone(), peer2.clone())).unwrap();
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
let old_relay_parent = test_state.relay_parent;
// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
@@ -1701,20 +1698,64 @@ mod tests {
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone()).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
connected.try_send((validator_id.clone(), peer.clone())).unwrap();
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Disconnect and reconnect directly
disconnect_peer(&mut virtual_overseer, peer.clone()).await;
connect_peer(&mut virtual_overseer, peer.clone()).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
})
}
#[test]
fn collators_reject_declare_messages() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
let collator_pair2 = CollatorPair::generate().0;
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
setup_system(&mut virtual_overseer, &test_state).await;
// A validator connected to us
connect_peer(&mut virtual_overseer, peer.clone()).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
collator_pair2.public(),
ParaId::from(5),
collator_pair2.sign(b"garbage"),
),
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
p,
PeerSet::Collation,
)) if p == peer
);
})
}
}