Upgrade libp2p to 0.49.0 (#12256)

* cargo upgrade libp2p

* Get rid of `NetworkBehaviourEventProcess` in handling of `CustomMessageOutcome`

* Get rid of `NetworkBehaviourEventProcess` in handling of `request_responses::Event`

* Get rid of `NetworkBehaviourEventProcess` in handling of `peer_info::PeerInfoEvent`

* Get rid of `NetworkBehaviourEventProcess` in handling of `DiscoveryOut`

* Get rid of `poll()` method in `Bahaviour`

* minor: comments

* Upgrade libp2p to 0.49.0 (unreleased)

* Support multiple Kad protocol names

* Make borrow checker happy

* minor: wording

* Make substrate build with libp2p-0.49.0

* rustfmt

* Get rid of MdnsWrapper

* Resolve deprecation warnings

* Fix documentation

* Apply suggestions from code review: fix typos

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>

* Apply suggestion: simplify kad protocol name matching

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
This commit is contained in:
Dmitrii Markin
2022-10-17 14:02:37 +03:00
committed by GitHub
parent 882023f570
commit bb175f0373
21 changed files with 543 additions and 846 deletions
+123 -231
View File
@@ -24,18 +24,13 @@ use crate::{
};
use bytes::Bytes;
use codec::Encode;
use futures::channel::oneshot;
use libp2p::{
core::{Multiaddr, PeerId, PublicKey},
identify::IdentifyInfo,
identify::Info as IdentifyInfo,
kad::record,
swarm::{
NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters,
},
NetworkBehaviour,
};
use log::debug;
use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin};
use sc_network_common::{
@@ -46,26 +41,22 @@ use sc_network_common::{
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
sync::{warp::WarpProofRequest, OpaqueBlockRequest, OpaqueStateRequest},
};
use sc_peerset::PeersetHandle;
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{
collections::{HashSet, VecDeque},
iter,
task::{Context, Poll},
time::Duration,
};
use std::{collections::HashSet, time::Duration};
pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure};
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll", event_process = true)]
#[behaviour(out_event = "BehaviourOut<B>")]
pub struct Behaviour<B, Client>
where
B: BlockT,
@@ -80,25 +71,6 @@ where
discovery: DiscoveryBehaviour,
/// Generic request-response protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: VecDeque<BehaviourOut<B>>,
/// Protocol name used to send out block requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
block_request_protocol_name: String,
/// Protocol name used to send out state requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
state_request_protocol_name: String,
/// Protocol name used to send out warp sync requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
warp_sync_protocol_name: Option<String>,
}
/// Event generated by `Behaviour`.
@@ -107,7 +79,7 @@ pub enum BehaviourOut<B: BlockT> {
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted(ProtocolId),
RandomKademliaStarted(Vec<ProtocolId>),
/// We have received a request from a peer and answered it.
///
@@ -136,6 +108,12 @@ pub enum BehaviourOut<B: BlockT> {
result: Result<(), RequestFailure>,
},
/// A request protocol handler issued reputation changes for the given peer.
ReputationChanges {
peer: PeerId,
changes: Vec<ReputationChange>,
},
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
@@ -186,15 +164,60 @@ pub enum BehaviourOut<B: BlockT> {
messages: Vec<(ProtocolName, Bytes)>,
},
/// A new block request must be emitted.
BlockRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific block request.
request: OpaqueBlockRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new state request must be emitted.
StateRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific state request.
request: OpaqueStateRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new warp sync request must be emitted.
WarpSyncRequest {
/// Node we send the request to.
target: PeerId,
/// Warp sync request.
request: WarpProofRequest<B>,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Now connected to a new peer for syncing purposes.
SyncConnected(PeerId),
/// No longer connected to a peer for syncing purposes.
SyncDisconnected(PeerId),
/// We have obtained identity information from a peer, including the addresses it is listening
/// on.
PeerIdentify {
/// Id of the peer that has been identified.
peer_id: PeerId,
/// Information about the peer.
info: IdentifyInfo,
},
/// We have learned about the existence of a node on the default set.
Discovered(PeerId),
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),
/// Ignored event generated by lower layers.
None,
}
impl<B, Client> Behaviour<B, Client>
@@ -216,17 +239,9 @@ where
mut request_response_protocols: Vec<ProtocolConfig>,
peerset: PeersetHandle,
) -> Result<Self, request_responses::RegisterError> {
// Extract protocol name and add to `request_response_protocols`.
let block_request_protocol_name = block_request_protocol_config.name.to_string();
let state_request_protocol_name = state_request_protocol_config.name.to_string();
let warp_sync_protocol_name = match warp_sync_protocol_config {
Some(config) => {
let name = config.name.to_string();
request_response_protocols.push(config);
Some(name)
},
None => None,
};
if let Some(config) = warp_sync_protocol_config {
request_response_protocols.push(config);
}
request_response_protocols.push(block_request_protocol_config);
request_response_protocols.push(state_request_protocol_config);
request_response_protocols.push(light_client_request_protocol_config);
@@ -239,10 +254,6 @@ where
request_response_protocols.into_iter(),
peerset,
)?,
events: VecDeque::new(),
block_request_protocol_name,
state_request_protocol_name,
warp_sync_protocol_name,
})
}
@@ -310,6 +321,17 @@ where
&mut self.substrate
}
/// Add a self-reported address of a remote peer to the k-buckets of the supported
/// DHTs (`supported_protocols`).
pub fn add_self_reported_address_to_dht(
&mut self,
peer_id: &PeerId,
supported_protocols: &[impl AsRef<[u8]>],
addr: Multiaddr,
) {
self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
}
/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a
/// `ValueNotFound` event.
pub fn get_value(&mut self, key: record::Key) {
@@ -333,221 +355,91 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
}
}
impl<B, Client> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
impl<B: BlockT> From<CustomMessageOutcome<B>> for BehaviourOut<B> {
fn from(event: CustomMessageOutcome<B>) -> Self {
match event {
CustomMessageOutcome::BlockImport(origin, blocks) =>
self.events.push_back(BehaviourOut::BlockImport(origin, blocks)),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => self
.events
.push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
CustomMessageOutcome::BlockRequest { target, request, pending_response } => {
match self.substrate.encode_block_request(&request) {
Ok(data) => {
self.request_responses.send_request(
&target,
&self.block_request_protocol_name,
data,
pending_response,
IfDisconnected::ImmediateError,
);
},
Err(err) => {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
request, err
);
},
}
},
CustomMessageOutcome::StateRequest { target, request, pending_response } => {
match self.substrate.encode_state_request(&request) {
Ok(data) => {
self.request_responses.send_request(
&target,
&self.state_request_protocol_name,
data,
pending_response,
IfDisconnected::ImmediateError,
);
},
Err(err) => {
log::warn!(
target: "sync",
"Failed to encode state request {:?}: {:?}",
request, err
);
},
}
},
BehaviourOut::BlockImport(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
BehaviourOut::JustificationImport(origin, hash, nb, justification),
CustomMessageOutcome::BlockRequest { target, request, pending_response } =>
BehaviourOut::BlockRequest { target, request, pending_response },
CustomMessageOutcome::StateRequest { target, request, pending_response } =>
BehaviourOut::StateRequest { target, request, pending_response },
CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } =>
match &self.warp_sync_protocol_name {
Some(name) => self.request_responses.send_request(
&target,
name,
request.encode(),
pending_response,
IfDisconnected::ImmediateError,
),
None => {
log::warn!(
target: "sync",
"Trying to send warp sync request when no protocol is configured {:?}",
request,
);
},
},
BehaviourOut::WarpSyncRequest { target, request, pending_response },
CustomMessageOutcome::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
roles,
notifications_sink,
} => {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
role: reported_roles_to_observed_role(roles),
notifications_sink,
});
} => BehaviourOut::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
role: reported_roles_to_observed_role(roles),
notifications_sink,
},
CustomMessageOutcome::NotificationStreamReplaced {
remote,
protocol,
notifications_sink,
} => self.events.push_back(BehaviourOut::NotificationStreamReplaced {
remote,
protocol,
notifications_sink,
}),
CustomMessageOutcome::NotificationStreamClosed { remote, protocol } => self
.events
.push_back(BehaviourOut::NotificationStreamClosed { remote, protocol }),
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(_peer_id, _number) => {},
CustomMessageOutcome::SyncConnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncConnected(peer_id)),
} => BehaviourOut::NotificationStreamReplaced { remote, protocol, notifications_sink },
CustomMessageOutcome::NotificationStreamClosed { remote, protocol } =>
BehaviourOut::NotificationStreamClosed { remote, protocol },
CustomMessageOutcome::NotificationsReceived { remote, messages } =>
BehaviourOut::NotificationsReceived { remote, messages },
CustomMessageOutcome::PeerNewBest(_peer_id, _number) => BehaviourOut::None,
CustomMessageOutcome::SyncConnected(peer_id) => BehaviourOut::SyncConnected(peer_id),
CustomMessageOutcome::SyncDisconnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)),
CustomMessageOutcome::None => {},
BehaviourOut::SyncDisconnected(peer_id),
CustomMessageOutcome::None => BehaviourOut::None,
}
}
}
impl<B, Client> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: request_responses::Event) {
impl<B: BlockT> From<request_responses::Event> for BehaviourOut<B> {
fn from(event: request_responses::Event) -> Self {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } => {
self.events.push_back(BehaviourOut::InboundRequest { peer, protocol, result });
},
request_responses::Event::RequestFinished { peer, protocol, duration, result } => {
self.events.push_back(BehaviourOut::RequestFinished {
peer,
protocol,
duration,
result,
});
},
request_responses::Event::InboundRequest { peer, protocol, result } =>
BehaviourOut::InboundRequest { peer, protocol, result },
request_responses::Event::RequestFinished { peer, protocol, duration, result } =>
BehaviourOut::RequestFinished { peer, protocol, duration, result },
request_responses::Event::ReputationChanges { peer, changes } =>
for change in changes {
self.substrate.report_peer(peer, change);
},
BehaviourOut::ReputationChanges { peer, changes },
}
}
}
impl<B, Client> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
let peer_info::PeerInfoEvent::Identified {
peer_id,
info: IdentifyInfo { protocol_version, agent_version, mut listen_addrs, protocols, .. },
} = event;
if listen_addrs.len() > 30 {
debug!(
target: "sub-libp2p",
"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
peer_id, protocol_version, agent_version
);
listen_addrs.truncate(30);
}
for addr in listen_addrs {
self.discovery.add_self_reported_address(&peer_id, protocols.iter(), addr);
}
self.substrate.add_default_set_discovered_nodes(iter::once(peer_id));
impl<B: BlockT> From<peer_info::PeerInfoEvent> for BehaviourOut<B> {
fn from(event: peer_info::PeerInfoEvent) -> Self {
let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
BehaviourOut::PeerIdentify { peer_id, info }
}
}
impl<B, Client> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
impl<B: BlockT> From<DiscoveryOut> for BehaviourOut<B> {
fn from(event: DiscoveryOut) -> Self {
match event {
DiscoveryOut::UnroutablePeer(_peer_id) => {
// Obtaining and reporting listen addresses for unroutable peers back
// to Kademlia is handled by the `Identify` protocol, part of the
// `PeerInfoBehaviour`. See the `NetworkBehaviourEventProcess`
// implementation for `PeerInfoEvent`.
},
DiscoveryOut::Discovered(peer_id) => {
self.substrate.add_default_set_discovered_nodes(iter::once(peer_id));
},
DiscoveryOut::ValueFound(results, duration) => {
self.events
.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results), duration));
},
DiscoveryOut::ValueNotFound(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key), duration));
},
DiscoveryOut::ValuePut(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key), duration));
},
DiscoveryOut::ValuePutFailed(key, duration) => {
self.events
.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration));
// `PeerInfoBehaviour`. See the `From<peer_info::PeerInfoEvent>`
// implementation.
BehaviourOut::None
},
DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id),
DiscoveryOut::ValueFound(results, duration) =>
BehaviourOut::Dht(DhtEvent::ValueFound(results), duration),
DiscoveryOut::ValueNotFound(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValueNotFound(key), duration),
DiscoveryOut::ValuePut(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePut(key), duration),
DiscoveryOut::ValuePutFailed(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration),
DiscoveryOut::RandomKademliaStarted(protocols) =>
for protocol in protocols {
self.events.push_back(BehaviourOut::RandomKademliaStarted(protocol));
},
BehaviourOut::RandomKademliaStarted(protocols),
}
}
}
impl<B, Client> Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
fn poll(
&mut self,
_cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<BehaviourOut<B>, <Self as NetworkBehaviour>::ConnectionHandler>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
Poll::Pending
}
}