Allow remotes to not open a legacy substream (#7075)

* Allow remotes to not open a legacy substream

* Misc fixes

* Special case first protocol as the one bearing the handshake
This commit is contained in:
Pierre Krieger
2020-09-14 17:08:23 +02:00
committed by GitHub
parent e18ddc6d4a
commit eadf5b09f9
8 changed files with 168 additions and 154 deletions
+117 -118
View File
@@ -36,7 +36,7 @@ use sp_consensus::{
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
@@ -53,7 +53,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;
@@ -271,8 +271,6 @@ struct Peer<B: BlockT, H: ExHashT> {
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
@@ -391,14 +389,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let mut behaviour = GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
);
let mut legacy_equiv_by_name = HashMap::new();
@@ -409,7 +399,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/transactions/1");
proto
});
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);
let block_announces_protocol: Cow<'static, str> = Cow::from({
@@ -419,12 +408,24 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/block-announces/1");
proto
});
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);
let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode();
GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
// As documented in `GenericProto`, the first protocol in the list is always the
// one carrying the handshake reported in the `CustomProtocolOpen` event.
iter::once((block_announces_protocol.clone(), block_announces_handshake))
.chain(iter::once((transactions_protocol.clone(), vec![]))),
)
};
let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
@@ -829,99 +830,86 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
status: BlockAnnouncesHandshake<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);
if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
}
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
log!(
if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer {:?} using unsupported protocol version {}", who, status.version
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
}
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
}
let peer = Peer {
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
debug!(target: "sync", "Connected {}", who);
status.version
let peer = Peer {
info: PeerInfo {
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
debug!(target: "sync", "Connected {}", who);
let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
@@ -1151,20 +1139,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if inserted || force {
let message = message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
}
} else {
None
},
data: if peer.info.protocol_version >= 4 {
Some(data.clone())
state: if is_best {
Some(message::BlockState::Best)
} else {
None
Some(message::BlockState::Normal)
},
data: Some(data.clone()),
};
self.behaviour.write_notification(
@@ -1588,9 +1568,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let handshake = BlockAnnouncesHandshake {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
self.on_peer_connected(peer_id, handshake, notifications_sink)
},
Ok(msg) => {
debug!(
target: "sync",
@@ -1602,15 +1593,23 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(handshake) => {
self.on_peer_connected(peer_id, handshake, notifications_sink)
}
Err(err2) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {} & {}",
peer_id,
received_handshake,
err.what(),
err2,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
}
}
@@ -336,14 +336,21 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.collect::<Vec<_>>();
assert!(!notif_protocols.is_empty());
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);
GenericProto {
local_peer_id,
legacy_protocol,
notif_protocols: Vec::new(),
notif_protocols,
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
@@ -113,10 +113,11 @@ pub struct NotifsHandler {
/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,
/// In the situation where `legacy.is_open()` is true, but we haven't sent out any
/// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy
/// substream.
pending_legacy_handshake: Option<Vec<u8>>,
/// In the situation where either the legacy substream has been opened or the handshake-bearing
/// notifications protocol is open, but we haven't sent out any [`NotifsHandlerOut::Open`]
/// event yet, this contains the received handshake waiting to be reported through the
/// external API.
pending_handshake: Option<Vec<u8>>,
/// State of this handler.
enabled: EnabledState,
@@ -172,7 +173,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
.collect(),
endpoint: connected_point.clone(),
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
pending_legacy_handshake: None,
pending_handshake: None,
enabled: EnabledState::Initial,
pending_in: Vec::new(),
notifications_sink_rx: None,
@@ -360,11 +361,20 @@ impl NotifsHandlerProto {
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
///
/// The first protocol in `list` is special-cased as the protocol that contains the handshake
/// to report through the [`NotifsHandlerOut::Open`] event.
///
/// # Panic
///
/// - Panics if `list` is empty.
///
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
) -> Self {
let list = list.into();
assert!(!list.is_empty());
let out_handlers = list
.clone()
@@ -614,11 +624,12 @@ impl ProtocolsHandler for NotifsHandler {
}
}
// If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy
// substream is open but the user isn't aware yet of the substreams being open.
// If `self.pending_handshake` is `Some`, we are in a state where the handshake-bearing
// substream (either the legacy substream or the one special-cased as providing the
// handshake) is open but the user isn't aware yet of the substreams being open.
// When that is the case, neither the legacy substream nor the incoming notifications
// substreams should be polled, otherwise there is a risk of receiving messages from them.
if self.pending_legacy_handshake.is_none() {
if self.pending_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
@@ -631,14 +642,16 @@ impl ProtocolsHandler for NotifsHandler {
received_handshake,
..
}) => {
self.pending_legacy_handshake = Some(received_handshake);
if self.notifications_sink_rx.is_none() {
debug_assert!(self.pending_handshake.is_none());
self.pending_handshake = Some(received_handshake);
}
cx.waker().wake_by_ref();
return Poll::Pending;
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {
// We consciously drop the receivers despite notifications being potentially
// still buffered up.
debug_assert!(self.notifications_sink_rx.is_some());
self.notifications_sink_rx = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
@@ -646,7 +659,6 @@ impl ProtocolsHandler for NotifsHandler {
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => {
debug_assert!(self.notifications_sink_rx.is_some());
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
))
@@ -663,7 +675,7 @@ impl ProtocolsHandler for NotifsHandler {
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
loop {
let poll = if self.pending_legacy_handshake.is_none() {
let poll = if self.notifications_sink_rx.is_some() {
handler.poll(cx)
} else {
handler.poll_process(cx)
@@ -692,7 +704,7 @@ impl ProtocolsHandler for NotifsHandler {
},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
debug_assert!(self.pending_legacy_handshake.is_none());
debug_assert!(self.pending_handshake.is_none());
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
@@ -716,12 +728,17 @@ impl ProtocolsHandler for NotifsHandler {
}),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
// At the moment we don't actually care whether any notifications protocol
// opens or closes.
// Whether our communications with the remote are open or closed entirely
// depends on the legacy substream, because as long as we are open the user of
// this struct might try to send legacy protocol messages which we need to
// deliver for things to work properly.
// Opened substream on the handshake-bearing notification protocol.
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { handshake })
if handler_num == 0 =>
{
if self.notifications_sink_rx.is_none() && self.pending_handshake.is_none() {
self.pending_handshake = Some(handshake);
}
},
// Nothing to do in response to other notification substreams being opened
// or closed.
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { .. }) => {},
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Refused) => {},
@@ -730,7 +747,7 @@ impl ProtocolsHandler for NotifsHandler {
}
if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) {
if let Some(handshake) = self.pending_legacy_handshake.take() {
if let Some(handshake) = self.pending_handshake.take() {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
@@ -32,7 +32,7 @@ use libp2p::swarm::{
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
NetworkBehaviour, NetworkBehaviourAction
};
use std::{error, io, task::Context, task::Poll, time::Duration};
use std::{error, io, iter, task::{Context, Poll}, time::Duration};
/// Builds two nodes that have each other as bootstrap nodes.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
@@ -78,7 +78,10 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, "test", &[1], vec![], peerset),
inner: GenericProto::new(
local_peer_id, "test", &[1], vec![], peerset,
iter::once(("/foo".into(), Vec::new()))
),
addrs: addrs
.iter()
.enumerate()
@@ -41,12 +41,6 @@ pub type Message<B> = generic::Message<
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the status type using block type parameters.
pub type Status<B> = generic::Status<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the block request type using block type parameters.
pub type BlockRequest<B> = generic::BlockRequest<
<B as BlockT>::Hash,