Network sync refactoring (part 6) (#11940)

* Extract `NetworkKVProvider` trait in `sc-authority-discovery` and remove unnecessary dependency

* Extract `NetworkSyncForkRequest` trait in `sc-finality-grandpa`

* Relax requirements on `SyncOracle` trait, remove extra native methods from `NetworkService` that are already provided by trait impls

* Move `NetworkSigner` trait from `sc-authority-discovery` into `sc-network-common` and de-duplicate methods on `NetworkService`

* Move `NetworkKVProvider` trait from `sc-authority-discovery` into `sc-network-common` and de-duplicate methods on `NetworkService`

* Minimize `sc-authority-discovery` dependency on `sc-network`

* Move `NetworkSyncForkRequest` trait from `sc-finality-grandpa` to `sc-network-common` and de-duplicate methods in `NetworkService`

* Extract `NetworkStatusProvider` trait and de-duplicate methods on `NetworkService`

* Extract `NetworkPeers` trait and de-duplicate methods on `NetworkService`

* Extract `NetworkEventStream` trait and de-duplicate methods on `NetworkService`

* Move more methods from `NetworkService` into `NetworkPeers` trait

* Move `NetworkStateInfo` trait into `sc-network-common`

* Extract `NetworkNotification` trait and de-duplicate methods on `NetworkService`

* Extract `NetworkRequest` trait and de-duplicate methods on `NetworkService`

* Remove `NetworkService::local_peer_id()`, it is already provided by `NetworkStateInfo` impl

* Extract `NetworkTransaction` trait and de-duplicate methods on `NetworkService`

* Extract `NetworkBlock` trait and de-duplicate methods on `NetworkService`

* Remove dependencies on `NetworkService` from most of the methods of `sc-service`

* Address simple review comments
This commit is contained in:
Nazar Mokrynskyi
2022-08-09 21:28:32 +03:00
committed by GitHub
parent 9c56e79c43
commit a685582bfd
49 changed files with 1889 additions and 1029 deletions
+121 -17
View File
@@ -21,7 +21,8 @@ use crate::{
Network, Validator,
};
use sc_network::{Event, ReputationChange};
use sc_network::ReputationChange;
use sc_network_common::protocol::event::Event;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
@@ -85,7 +86,7 @@ impl<B: BlockT> GossipEngine<B> {
B: 'static,
{
let protocol = protocol.into();
let network_event_stream = network.event_stream();
let network_event_stream = network.event_stream("network-gossip");
GossipEngine {
state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
@@ -162,7 +163,7 @@ impl<B: BlockT> GossipEngine<B> {
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
self.network.announce(block, associated_data);
self.network.announce_block(block, associated_data);
}
}
@@ -181,7 +182,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
this.network.add_set_reserved(remote, this.protocol.clone());
},
Event::SyncDisconnected { remote } => {
this.network.remove_set_reserved(remote, this.protocol.clone());
this.network.remove_peers_from_reserved_set(
this.protocol.clone(),
vec![remote],
);
},
Event::NotificationStreamOpened { remote, protocol, role, .. } => {
if protocol != this.protocol {
@@ -304,7 +308,7 @@ impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{ValidationResult, ValidatorContext};
use crate::{multiaddr::Multiaddr, ValidationResult, ValidatorContext};
use async_std::task::spawn;
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
@@ -312,10 +316,20 @@ mod tests {
future::poll_fn,
};
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sc_network::ObservedRole;
use sp_runtime::{testing::H256, traits::Block as BlockT};
use sc_network_common::{
protocol::event::ObservedRole,
service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSender, NotificationSenderError,
},
};
use sp_runtime::{
testing::H256,
traits::{Block as BlockT, NumberFor},
};
use std::{
borrow::Cow,
collections::HashSet,
sync::{Arc, Mutex},
};
use substrate_test_runtime_client::runtime::Block;
@@ -330,29 +344,119 @@ mod tests {
event_senders: Vec<UnboundedSender<Event>>,
}
impl<B: BlockT> Network<B> for TestNetwork {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
impl NetworkPeers for TestNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
unimplemented!();
}
fn set_authorized_only(&self, _reserved_only: bool) {
unimplemented!();
}
fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
unimplemented!();
}
fn report_peer(&self, _who: PeerId, _cost_benefit: ReputationChange) {}
fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) {
unimplemented!();
}
fn accept_unreserved_peers(&self) {
unimplemented!();
}
fn deny_unreserved_peers(&self) {
unimplemented!();
}
fn add_reserved_peer(&self, _peer: String) -> Result<(), String> {
unimplemented!();
}
fn remove_reserved_peer(&self, _peer_id: PeerId) {
unimplemented!();
}
fn set_reserved_peers(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn add_peers_to_reserved_set(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn remove_peers_from_reserved_set(
&self,
_protocol: Cow<'static, str>,
_peers: Vec<PeerId>,
) {
}
fn add_to_peers_set(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec<PeerId>) {
unimplemented!();
}
fn sync_num_connected(&self) -> usize {
unimplemented!();
}
}
impl NetworkEventStream for TestNetwork {
fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
let (tx, rx) = unbounded();
self.inner.lock().unwrap().event_senders.push(tx);
Box::pin(rx)
}
}
fn report_peer(&self, _: PeerId, _: ReputationChange) {}
fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) {
impl NetworkNotification for TestNetwork {
fn write_notification(
&self,
_target: PeerId,
_protocol: Cow<'static, str>,
_message: Vec<u8>,
) {
unimplemented!();
}
fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}
fn notification_sender(
&self,
_target: PeerId,
_protocol: Cow<'static, str>,
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}
}
fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}
fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
unimplemented!();
}
fn announce(&self, _: B::Hash, _: Option<Vec<u8>>) {
fn new_best_block_imported(
&self,
_hash: <Block as BlockT>::Hash,
_number: NumberFor<Block>,
) {
unimplemented!();
}
}
+21 -60
View File
@@ -41,9 +41,9 @@
//! messages.
//!
//! The [`GossipEngine`] will automatically use [`Network::add_set_reserved`] and
//! [`Network::remove_set_reserved`] to maintain a set of peers equal to the set of peers the
//! node is syncing from. See the documentation of `sc-network` for more explanations about the
//! concepts of peer sets.
//! [`NetworkPeers::remove_peers_from_reserved_set`] to maintain a set of peers equal to the set of
//! peers the node is syncing from. See the documentation of `sc-network` for more explanations
//! about the concepts of peer sets.
//!
//! # What is a validator?
//!
@@ -67,74 +67,35 @@ pub use self::{
validator::{DiscardAll, MessageIntent, ValidationResult, Validator, ValidatorContext},
};
use futures::prelude::*;
use sc_network::{multiaddr, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::traits::Block as BlockT;
use std::{borrow::Cow, iter, pin::Pin, sync::Arc};
use sc_network::{multiaddr, PeerId};
use sc_network_common::service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{borrow::Cow, iter};
mod bridge;
mod state_machine;
mod validator;
/// Abstraction over a network.
pub trait Network<B: BlockT> {
/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>>;
/// Adjust the reputation of a node.
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange);
/// Adds the peer to the set of peers to be connected to with this protocol.
fn add_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>);
/// Removes the peer from the set of peers to be connected to with this protocol.
fn remove_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>);
/// Force-disconnect a peer.
fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>);
/// Send a notification to a peer.
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>);
/// Notify everyone we're connected to that we have the given block.
///
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>);
}
impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self, "network-gossip"))
}
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
NetworkService::report_peer(self, peer_id, reputation);
}
pub trait Network<B: BlockT>:
NetworkPeers + NetworkEventStream + NetworkNotification + NetworkBlock<B::Hash, NumberFor<B>>
{
fn add_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>) {
let addr =
iter::once(multiaddr::Protocol::P2p(who.into())).collect::<multiaddr::Multiaddr>();
let result =
NetworkService::add_peers_to_reserved_set(self, protocol, iter::once(addr).collect());
let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect());
if let Err(err) = result {
log::error!(target: "gossip", "add_set_reserved failed: {}", err);
}
}
fn remove_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>) {
NetworkService::remove_peers_from_reserved_set(self, protocol, iter::once(who).collect());
}
fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) {
NetworkService::disconnect_peer(self, who, protocol)
}
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>) {
NetworkService::write_notification(self, who, protocol, message)
}
fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
NetworkService::announce_block(self, block, associated_data)
}
}
impl<T, B: BlockT> Network<B> for T where
T: NetworkPeers
+ NetworkEventStream
+ NetworkNotification
+ NetworkBlock<B::Hash, NumberFor<B>>
{
}
@@ -22,7 +22,7 @@ use ahash::AHashSet;
use libp2p::PeerId;
use lru::LruCache;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::ObservedRole;
use sc_network_common::protocol::event::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use std::{borrow::Cow, collections::HashMap, iter, sync::Arc, time, time::Instant};
@@ -511,11 +511,23 @@ impl Metrics {
#[cfg(test)]
mod tests {
use super::*;
use crate::multiaddr::Multiaddr;
use futures::prelude::*;
use sc_network::{Event, ReputationChange};
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256};
use sc_network::ReputationChange;
use sc_network_common::{
protocol::event::Event,
service::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSender, NotificationSenderError,
},
};
use sp_runtime::{
testing::{Block as RawBlock, ExtrinsicWrapper, H256},
traits::NumberFor,
};
use std::{
borrow::Cow,
collections::HashSet,
pin::Pin,
sync::{Arc, Mutex},
};
@@ -569,28 +581,118 @@ mod tests {
peer_reports: Vec<(PeerId, ReputationChange)>,
}
impl<B: BlockT> Network<B> for NoOpNetwork {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
impl NetworkPeers for NoOpNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
unimplemented!();
}
fn report_peer(&self, peer_id: PeerId, reputation_change: ReputationChange) {
self.inner.lock().unwrap().peer_reports.push((peer_id, reputation_change));
}
fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) {
fn set_authorized_only(&self, _reserved_only: bool) {
unimplemented!();
}
fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}
fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {}
fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
unimplemented!();
}
fn announce(&self, _: B::Hash, _: Option<Vec<u8>>) {
fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
self.inner.lock().unwrap().peer_reports.push((who, cost_benefit));
}
fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) {
unimplemented!();
}
fn accept_unreserved_peers(&self) {
unimplemented!();
}
fn deny_unreserved_peers(&self) {
unimplemented!();
}
fn add_reserved_peer(&self, _peer: String) -> Result<(), String> {
unimplemented!();
}
fn remove_reserved_peer(&self, _peer_id: PeerId) {
unimplemented!();
}
fn set_reserved_peers(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn add_peers_to_reserved_set(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn remove_peers_from_reserved_set(
&self,
_protocol: Cow<'static, str>,
_peers: Vec<PeerId>,
) {
}
fn add_to_peers_set(
&self,
_protocol: Cow<'static, str>,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec<PeerId>) {
unimplemented!();
}
fn sync_num_connected(&self) -> usize {
unimplemented!();
}
}
impl NetworkEventStream for NoOpNetwork {
fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
unimplemented!();
}
}
impl NetworkNotification for NoOpNetwork {
fn write_notification(
&self,
_target: PeerId,
_protocol: Cow<'static, str>,
_message: Vec<u8>,
) {
unimplemented!();
}
fn notification_sender(
&self,
_target: PeerId,
_protocol: Cow<'static, str>,
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}
}
impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
unimplemented!();
}
fn new_best_block_imported(
&self,
_hash: <Block as BlockT>::Hash,
_number: NumberFor<Block>,
) {
unimplemented!();
}
}
@@ -16,7 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sc_network::{ObservedRole, PeerId};
use sc_network::PeerId;
use sc_network_common::protocol::event::ObservedRole;
use sp_runtime::traits::Block as BlockT;
/// Validates consensus messages.