Network bridge refactoring impl (#1537)

* update networking types

* port over overseer-protocol message types

* Add the collation protocol to network bridge

* message sending

* stub for ConnectToValidators

* add some helper traits and methods to protocol types

* add collator protocol message

* leaves-updating

* peer connection and disconnection

* add utilities for dispatching multiple events

* implement message handling

* add an observedrole enum with equality and no sentry nodes

* derive partial-eq on network bridge event

* add PartialEq impls for network message types

* add Into implementation for observedrole

* port over existing network bridge tests

* add some more tests

* port bitfield distribution

* port over bitfield distribution tests

* add codec indices

* port PoV distribution

* port over PoV distribution tests

* port over statement distribution

* port over statement distribution tests

* update overseer and service-new

* address review comments

* port availability distribution

* port over availability distribution tests
This commit is contained in:
Robert Habermeier
2020-08-12 13:16:28 +02:00
committed by GitHub
parent 8e60a5197f
commit a6b1d91d6e
21 changed files with 1557 additions and 815 deletions
@@ -24,15 +24,16 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage,
PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage,
RuntimeApiRequest,
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
};
use node_primitives::{ProtocolId, View, SignedFullStatement};
use node_primitives::SignedFullStatement;
use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature,
};
use parity_scale_codec::{Encode, Decode};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent,
};
use futures::prelude::*;
use futures::channel::oneshot;
@@ -40,11 +41,8 @@ use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
const PROTOCOL_V1: ProtocolId = *b"sdn1";
const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement");
const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature");
const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message");
const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements");
@@ -77,10 +75,6 @@ impl<C> Subsystem<C> for StatementDistribution
}
}
fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n))
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
@@ -480,13 +474,6 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}
#[derive(Encode, Decode)]
enum WireMessage {
/// relay-parent, full statement.
#[codec(index = "0")]
Statement(Hash, SignedFullStatement),
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
@@ -534,6 +521,14 @@ async fn circulate_statement_and_dependents(
Ok(())
}
fn statement_message(relay_parent: Hash, statement: SignedFullStatement)
-> protocol_v1::ValidationProtocol
{
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement)
)
}
/// Circulates a statement to all peers who have not seen it yet, and returns
/// an iterator over peers who need to have dependent statements sent.
async fn circulate_statement(
@@ -554,10 +549,9 @@ async fn circulate_statement(
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode();
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
let payload = statement_message(relay_parent, stored.statement.clone());
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
PROTOCOL_V1,
payload,
))).await?;
}
@@ -580,16 +574,14 @@ async fn send_statements_about(
) -> SubsystemResult<()> {
for statement in active_head.statements_about(candidate_hash) {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = WireMessage::Statement(
let payload = statement_message(
relay_parent,
statement.statement.clone(),
).encode();
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
vec![peer.clone()],
PROTOCOL_V1,
payload,
))).await?;
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
}
}
@@ -606,16 +598,14 @@ async fn send_statements(
) -> SubsystemResult<()> {
for statement in active_head.statements() {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = WireMessage::Statement(
let payload = statement_message(
relay_parent,
statement.statement.clone(),
).encode();
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
vec![peer.clone()],
PROTOCOL_V1,
payload,
))).await?;
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
}
}
@@ -643,11 +633,10 @@ async fn handle_incoming_message<'a>(
our_view: &View,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: Vec<u8>,
message: protocol_v1::StatementDistributionMessage,
) -> SubsystemResult<Option<(Hash, &'a StoredStatement)>> {
let (relay_parent, statement) = match WireMessage::decode(&mut &message[..]) {
Err(_) => return report_peer(ctx, peer, COST_INVALID_MESSAGE).await.map(|_| None),
Ok(WireMessage::Statement(r, s)) => (r, s),
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
};
if !our_view.contains(&relay_parent) {
@@ -750,7 +739,7 @@ async fn handle_network_update(
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
our_view: &mut View,
update: NetworkBridgeEvent,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
) -> SubsystemResult<()> {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
@@ -827,12 +816,6 @@ async fn handle_network_update(
async fn run(
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
) -> SubsystemResult<()> {
// startup: register the network protocol with the bridge.
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer(
PROTOCOL_V1,
network_update_message,
))).await?;
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
@@ -897,13 +880,14 @@ async fn run(
relay_parent,
statement,
).await?,
StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update(
&mut peers,
&mut active_heads,
&mut ctx,
&mut our_view,
event,
).await?,
StatementDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut peers,
&mut active_heads,
&mut ctx,
&mut our_view,
event,
).await?,
}
}
}
@@ -1272,19 +1256,16 @@ mod tests {
for statement in active_head.statements_about(candidate_hash) {
let message = handle.recv().await;
let expected_to = vec![peer.clone()];
let expected_protocol = PROTOCOL_V1;
let expected_payload
= WireMessage::Statement(hash_c, statement.statement.clone()).encode();
= statement_message(hash_c, statement.statement.clone());
assert_matches!(
message,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
to,
protocol,
payload,
)) => {
assert_eq!(to, expected_to);
assert_eq!(protocol, expected_protocol);
assert_eq!(payload, expected_payload)
}
)
@@ -1383,19 +1364,17 @@ mod tests {
let message = handle.recv().await;
assert_matches!(
message,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
to,
protocol,
payload,
)) => {
assert_eq!(to.len(), 2);
assert!(to.contains(&peer_b));
assert!(to.contains(&peer_c));
assert_eq!(protocol, PROTOCOL_V1);
assert_eq!(
payload,
WireMessage::Statement(hash_b, statement.statement.clone()).encode(),
statement_message(hash_b, statement.statement.clone()),
);
}
)