Prepare for network protocol version upgrades (#5084)

* explicitly tag network requests with version

* fmt

* make PeerSet more aware of versioning

* some generalization of the network bridge to support upgrades

* walk back some renaming

* walk back some version stuff

* extract version from fallback

* remove V1 from NetworkBridgeUpdate

* add accidentally-removed timer

* implement focusing for versioned messages

* fmt

* fix up network bridge & tests

* remove inaccurate version check in bridge

* remove some TODO [now]s

* fix fallout in statement distribution

* fmt

* fallout in gossip-support

* fix fallout in collator-protocol

* fix fallout in bitfield-distribution

* fix fallout in approval-distribution

* fmt

* use never!

* fmt
This commit is contained in:
asynchronous rob
2022-04-21 11:34:59 -05:00
committed by GitHub
parent 203441981f
commit fc4b04db20
45 changed files with 942 additions and 594 deletions
+271 -130
View File
@@ -19,15 +19,19 @@
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use always_assert::never;
use bytes::Bytes;
use futures::{prelude::*, stream::BoxStream};
use parity_scale_codec::{Decode, Encode};
use parity_scale_codec::{Decode, DecodeAll, Encode};
use parking_lot::Mutex;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, ObservedRole, OurView, PeerId,
UnifiedReputationChange as Rep, View,
self as net_protocol,
peer_set::{PeerSet, PerPeerSet},
v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion,
UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::gen::{OverseerError, Subsystem};
@@ -83,58 +87,69 @@ const LOG_TARGET: &'static str = "parachain::network-bridge";
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str {
// Higher level code is meant to protect against this ever happening.
peer_set.get_protocol_name_static(version).unwrap_or("<internal error>")
}
impl Metrics {
fn on_peer_connected(&self, peer_set: PeerSet) {
fn on_peer_connected(&self, peer_set: PeerSet, version: ProtocolVersion) {
self.0.as_ref().map(|metrics| {
metrics
.connected_events
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc()
});
}
fn on_peer_disconnected(&self, peer_set: PeerSet) {
fn on_peer_disconnected(&self, peer_set: PeerSet, version: ProtocolVersion) {
self.0.as_ref().map(|metrics| {
metrics
.disconnected_events
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc()
});
}
fn note_peer_count(&self, peer_set: PeerSet, count: usize) {
fn note_peer_count(&self, peer_set: PeerSet, version: ProtocolVersion, count: usize) {
self.0.as_ref().map(|metrics| {
metrics
.peer_count
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.set(count as u64)
});
}
fn on_notification_received(&self, peer_set: PeerSet, size: usize) {
fn on_notification_received(&self, peer_set: PeerSet, version: ProtocolVersion, size: usize) {
if let Some(metrics) = self.0.as_ref() {
metrics
.notifications_received
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc();
metrics
.bytes_received
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc_by(size as u64);
}
}
fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) {
fn on_notification_sent(
&self,
peer_set: PeerSet,
version: ProtocolVersion,
size: usize,
to_peers: usize,
) {
if let Some(metrics) = self.0.as_ref() {
metrics
.notifications_sent
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc_by(to_peers as u64);
metrics
.bytes_sent
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set_label(peer_set, version)])
.inc_by((size * to_peers) as u64);
}
}
@@ -143,7 +158,7 @@ impl Metrics {
self.0.as_ref().map(|metrics| {
metrics
.desired_peer_count
.with_label_values(&[peer_set.get_protocol_name_static()])
.with_label_values(&[peer_set.get_default_protocol_name()])
.set(size as u64)
});
}
@@ -329,6 +344,7 @@ where
struct PeerData {
/// The Latest view sent by the peer.
view: View,
version: ProtocolVersion,
}
#[derive(Debug)]
@@ -467,22 +483,24 @@ where
?peer,
peer_set = ?peer_set,
);
network_service.disconnect_peer(peer, peer_set);
}
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
gum::trace!(
target: LOG_TARGET,
action = "SendValidationMessages",
num_messages = 1,
num_messages = 1usize,
);
send_message(
&mut network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
);
match msg {
Versioned::V1(msg) => send_validation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
NetworkBridgeMessage::SendValidationMessages(msgs) => {
gum::trace!(
@@ -492,29 +510,31 @@ where
);
for (peers, msg) in msgs {
send_message(
&mut network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
);
match msg {
Versioned::V1(msg) => send_validation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
}
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
gum::trace!(
target: LOG_TARGET,
action = "SendCollationMessages",
num_messages = 1,
num_messages = 1usize,
);
send_message(
&mut network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
);
match msg {
Versioned::V1(msg) => send_collation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
NetworkBridgeMessage::SendCollationMessages(msgs) => {
gum::trace!(
@@ -524,13 +544,14 @@ where
);
for (peers, msg) in msgs {
send_message(
&mut network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
);
match msg {
Versioned::V1(msg) => send_collation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
}
NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
@@ -672,18 +693,58 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
Some(NetworkEvent::SyncConnected { .. }) |
Some(NetworkEvent::SyncDisconnected { .. }) => {},
Some(NetworkEvent::NotificationStreamOpened {
remote: peer, protocol, role, ..
remote: peer,
protocol,
role,
negotiated_fallback,
}) => {
let role = ObservedRole::from(role);
let peer_set = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(peer_set) => peer_set,
let (peer_set, version) = {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(p) => p,
};
if let Some(fallback) = negotiated_fallback {
match PeerSet::try_from_protocol_name(&fallback) {
None => {
gum::debug!(
target: LOG_TARGET,
fallback = &*fallback,
?peer,
?peer_set,
"Unknown fallback",
);
continue
},
Some((p2, v2)) => {
if p2 != peer_set {
gum::debug!(
target: LOG_TARGET,
fallback = &*fallback,
fallback_peerset = ?p2,
protocol = &*protocol,
peerset = ?peer_set,
"Fallback mismatched peer-set",
);
continue
}
(p2, v2)
},
}
} else {
(peer_set, version)
}
};
gum::debug!(
target: LOG_TARGET,
action = "PeerConnected",
peer_set = ?peer_set,
version,
peer = ?peer,
role = ?role
);
@@ -698,12 +759,12 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
match peer_map.entry(peer.clone()) {
hash_map::Entry::Occupied(_) => continue,
hash_map::Entry::Vacant(vacant) => {
vacant.insert(PeerData { view: View::default() });
vacant.insert(PeerData { view: View::default(), version });
},
}
metrics.on_peer_connected(peer_set);
metrics.note_peer_count(peer_set, peer_map.len());
metrics.on_peer_connected(peer_set, version);
metrics.note_peer_count(peer_set, version, peer_map.len());
shared.local_view.clone().unwrap_or(View::default())
};
@@ -718,6 +779,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
NetworkBridgeEvent::PeerConnected(
peer.clone(),
role,
1,
maybe_authority,
),
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
@@ -730,6 +792,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
&mut network_service,
vec![peer],
PeerSet::Validation,
version,
WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(local_view),
&metrics,
);
@@ -740,6 +803,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
NetworkBridgeEvent::PeerConnected(
peer.clone(),
role,
1,
maybe_authority,
),
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
@@ -752,6 +816,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
&mut network_service,
vec![peer],
PeerSet::Collation,
version,
WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(local_view),
&metrics,
);
@@ -759,7 +824,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
}
},
Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => {
let peer_set = match PeerSet::try_from_protocol_name(&protocol) {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(peer_set) => peer_set,
};
@@ -780,13 +845,13 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
let w = peer_map.remove(&peer).is_some();
metrics.on_peer_disconnected(peer_set);
metrics.note_peer_count(peer_set, peer_map.len());
metrics.on_peer_disconnected(peer_set, version);
metrics.note_peer_count(peer_set, version, peer_map.len());
w
};
if was_connected {
if was_connected && version == peer_set.get_default_version() {
match peer_set {
PeerSet::Validation =>
dispatch_validation_event_to_all(
@@ -804,83 +869,151 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
}
},
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let expected_versions = {
let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
let shared = shared.0.lock();
if let Some(peer_data) = shared.validation_peers.get(&remote) {
versions[PeerSet::Validation] = Some(peer_data.version);
}
if let Some(peer_data) = shared.collation_peers.get(&remote) {
versions[PeerSet::Collation] = Some(peer_data.version);
}
versions
};
// non-decoded, but version-checked validation messages.
let v_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
.map(|(_, msg_bytes)| {
WireMessage::decode(&mut msg_bytes.as_ref()).map(|m| (m, msg_bytes.len()))
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
if peer_set == PeerSet::Validation {
if expected_versions[PeerSet::Validation].is_none() {
return Some(Err(UNCONNECTED_PEERSET_COST))
}
Some(Ok(msg_bytes.clone()))
} else {
None
}
})
.collect();
let v_messages = match v_messages {
Err(_) => {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
continue
},
Ok(v) => v,
};
// non-decoded, but version-checked colldation messages.
let c_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
.map(|(_, msg_bytes)| {
WireMessage::decode(&mut msg_bytes.as_ref()).map(|m| (m, msg_bytes.len()))
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
if peer_set == PeerSet::Collation {
if expected_versions[PeerSet::Collation].is_none() {
return Some(Err(UNCONNECTED_PEERSET_COST))
}
Some(Ok(msg_bytes.clone()))
} else {
None
}
})
.collect();
match c_messages {
Err(_) => {
let c_messages = match c_messages {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
continue
},
Ok(c_messages) =>
if v_messages.is_empty() && c_messages.is_empty() {
continue
Ok(v) => v,
};
if v_messages.is_empty() && c_messages.is_empty() {
continue
}
gum::trace!(
target: LOG_TARGET,
action = "PeerMessages",
peer = ?remote,
num_validation_messages = %v_messages.len(),
num_collation_messages = %c_messages.len()
);
if !v_messages.is_empty() {
let (events, reports) =
if expected_versions[PeerSet::Validation] == Some(1) {
handle_v1_peer_messages::<protocol_v1::ValidationProtocol, _>(
remote.clone(),
PeerSet::Validation,
&mut shared.0.lock().validation_peers,
v_messages,
&metrics,
)
} else {
gum::trace!(
gum::warn!(
target: LOG_TARGET,
action = "PeerMessages",
peer = ?remote,
num_validation_messages = %v_messages.len(),
num_collation_messages = %c_messages.len()
version = ?expected_versions[PeerSet::Validation],
"Major logic bug. Peer somehow has unsupported validation protocol version."
);
if !v_messages.is_empty() {
let (events, reports) = handle_peer_messages(
remote.clone(),
PeerSet::Validation,
&mut shared.0.lock().validation_peers,
v_messages,
&metrics,
);
never!("Only version 1 is supported; peer set connection checked above; qed");
for report in reports {
network_service.report_peer(remote.clone(), report);
}
// If a peer somehow triggers this, we'll disconnect them
// eventually.
(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
};
dispatch_validation_events_to_all(events, &mut sender).await;
}
for report in reports {
network_service.report_peer(remote.clone(), report);
}
if !c_messages.is_empty() {
let (events, reports) = handle_peer_messages(
remote.clone(),
PeerSet::Collation,
&mut shared.0.lock().collation_peers,
c_messages,
&metrics,
);
dispatch_validation_events_to_all(events, &mut sender).await;
}
for report in reports {
network_service.report_peer(remote.clone(), report);
}
if !c_messages.is_empty() {
let (events, reports) =
if expected_versions[PeerSet::Collation] == Some(1) {
handle_v1_peer_messages::<protocol_v1::CollationProtocol, _>(
remote.clone(),
PeerSet::Collation,
&mut shared.0.lock().collation_peers,
c_messages,
&metrics,
)
} else {
gum::warn!(
target: LOG_TARGET,
version = ?expected_versions[PeerSet::Collation],
"Major logic bug. Peer somehow has unsupported collation protocol version."
);
dispatch_collation_events_to_all(events, &mut sender).await;
}
},
never!("Only version 1 is supported; peer set connection checked above; qed");
// If a peer somehow triggers this, we'll disconnect them
// eventually.
(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
};
for report in reports {
network_service.report_peer(remote.clone(), report);
}
dispatch_collation_events_to_all(events, &mut sender).await;
}
},
}
@@ -1007,14 +1140,14 @@ fn update_our_view(
)
};
send_validation_message(
send_validation_message_v1(
net,
validation_peers,
WireMessage::ViewUpdate(new_view.clone()),
metrics,
);
send_collation_message(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics);
send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics);
let our_view = OurView::new(
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
@@ -1032,27 +1165,34 @@ fn update_our_view(
);
}
// Handle messages on a specific peer-set. The peer is expected to be connected on that
// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that
// peer-set.
fn handle_peer_messages<M>(
fn handle_v1_peer_messages<RawMessage: Decode, OutMessage: From<RawMessage>>(
peer: PeerId,
peer_set: PeerSet,
peers: &mut HashMap<PeerId, PeerData>,
messages: Vec<(WireMessage<M>, usize)>,
messages: Vec<Bytes>,
metrics: &Metrics,
) -> (Vec<NetworkBridgeEvent<M>>, Vec<Rep>) {
) -> (Vec<NetworkBridgeEvent<OutMessage>>, Vec<Rep>) {
let peer_data = match peers.get_mut(&peer) {
None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]),
Some(d) => d,
};
let mut outgoing_messages = Vec::with_capacity(messages.len());
let mut outgoing_events = Vec::with_capacity(messages.len());
let mut reports = Vec::new();
for (message, size_bytes) in messages {
metrics.on_notification_received(peer_set, size_bytes);
for message in messages {
metrics.on_notification_received(peer_set, peer_data.version, message.len());
let message = match WireMessage::<RawMessage>::decode_all(&mut message.as_ref()) {
Err(_) => {
reports.push(MALFORMED_MESSAGE_COST);
continue
},
Ok(m) => m,
};
outgoing_messages.push(match message {
outgoing_events.push(match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.len() > MAX_VIEW_HEADS ||
new_view.finalized_number < peer_data.view.finalized_number
@@ -1071,47 +1211,47 @@ fn handle_peer_messages<M>(
}
},
WireMessage::ProtocolMessage(message) =>
NetworkBridgeEvent::PeerMessage(peer.clone(), message),
NetworkBridgeEvent::PeerMessage(peer.clone(), message.into()),
})
}
(outgoing_messages, reports)
(outgoing_events, reports)
}
fn send_validation_message(
fn send_validation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Validation, message, metrics);
send_message(net, peers, PeerSet::Validation, 1, message, metrics);
}
fn send_collation_message(
fn send_collation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Collation, message, metrics)
send_message(net, peers, PeerSet::Collation, 1, message, metrics)
}
async fn dispatch_validation_event_to_all(
event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
ctx: &mut impl SubsystemSender,
) {
dispatch_validation_events_to_all(std::iter::once(event), ctx).await
}
async fn dispatch_collation_event_to_all(
event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
ctx: &mut impl SubsystemSender,
) {
dispatch_collation_events_to_all(std::iter::once(event), ctx).await
}
fn dispatch_validation_event_to_all_unbounded(
event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
ctx: &mut impl SubsystemSender,
) {
for msg in AllMessages::dispatch_iter(event) {
@@ -1120,17 +1260,17 @@ fn dispatch_validation_event_to_all_unbounded(
}
fn dispatch_collation_event_to_all_unbounded(
event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
ctx: &mut impl SubsystemSender,
) {
if let Some(msg) = event.focus().ok().map(CollatorProtocolMessage::NetworkBridgeUpdateV1) {
if let Some(msg) = event.focus().ok().map(CollatorProtocolMessage::NetworkBridgeUpdate) {
ctx.send_unbounded_message(msg.into());
}
}
async fn dispatch_validation_events_to_all<I>(events: I, ctx: &mut impl SubsystemSender)
where
I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::ValidationProtocol>>,
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
ctx.send_messages(events.into_iter().flat_map(AllMessages::dispatch_iter)).await
@@ -1138,13 +1278,14 @@ where
async fn dispatch_collation_events_to_all<I>(events: I, ctx: &mut impl SubsystemSender)
where
I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::CollationProtocol>>,
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>>,
I::IntoIter: Send,
{
let messages_for = |event: NetworkBridgeEvent<protocol_v1::CollationProtocol>| {
event.focus().ok().map(|m| {
AllMessages::CollatorProtocol(CollatorProtocolMessage::NetworkBridgeUpdateV1(m))
})
let messages_for = |event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>| {
event
.focus()
.ok()
.map(|m| AllMessages::CollatorProtocol(CollatorProtocolMessage::NetworkBridgeUpdate(m)))
};
ctx.send_messages(events.into_iter().flat_map(messages_for)).await