Some tweaks in the network crate (#1108)

* Move Roles to network::config

* Make network::config public

* Move NetworkConfig and NonReservedMode to config

* Move Params to config

* Move node_id() to NetworkManager and fix tests

* Rename Specialization to NetworkSpecialization
This commit is contained in:
Pierre Krieger
2018-11-14 11:25:16 +01:00
committed by Gav Wood
parent 6ad88fc623
commit 74e907f3cb
16 changed files with 117 additions and 109 deletions
+53 -2
View File
@@ -14,9 +14,34 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use service::Roles;
//! Configuration for the networking layer of Substrate.
/// Protocol configuration
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration};
use chain::Client;
use codec;
use on_demand::OnDemandService;
use runtime_primitives::traits::{Block as BlockT};
use service::{ExHashT, TransactionPool};
use std::sync::Arc;
/// Service initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
/// Configuration.
pub config: ProtocolConfig,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
/// Substrate relay chain access point.
pub chain: Arc<Client<B>>,
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool<H, B>>,
/// Protocol specialization.
pub specialization: S,
}
/// Configuration for the Substrate-specific part of the networking layer.
#[derive(Clone)]
pub struct ProtocolConfig {
/// Assigned roles.
@@ -30,3 +55,29 @@ impl Default for ProtocolConfig {
}
}
}
bitflags! {
/// Bitmask of the roles that a node fulfills.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
}
}
impl codec::Encode for Roles {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl codec::Decode for Roles {
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
Self::from_bits(input.read_byte()?)
}
}
@@ -26,8 +26,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashF
use runtime_primitives::generic::BlockId;
use message::generic::{Message, ConsensusMessage};
use protocol::Context;
use service::Roles;
use specialization::Specialization;
use config::Roles;
use specialization::NetworkSpecialization;
use StatusMessage;
use generic_message;
@@ -262,7 +262,7 @@ where
}
}
impl<Block: BlockT> Specialization<Block> for ConsensusGossip<Block> where
impl<Block: BlockT> NetworkSpecialization<Block> for ConsensusGossip<Block> where
Block::Header: HeaderT<Number=u64>
{
fn status(&self) -> Vec<u8> {
+3 -4
View File
@@ -51,10 +51,10 @@ mod sync;
#[macro_use]
mod protocol;
mod io;
mod config;
mod chain;
mod blocks;
mod on_demand;
pub mod config;
pub mod import_queue;
pub mod consensus_gossip;
pub mod error;
@@ -65,13 +65,12 @@ pub mod specialization;
pub mod test;
pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, TransactionPool, Params, ManageNetwork, SyncProvider};
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, SyncProvider};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, ProtocolId, Severity, Protocol};
pub use network_libp2p::{NodeIndex, ProtocolId, Severity, Protocol};
pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
#[doc(hidden)]
pub use runtime_primitives::traits::Block as BlockT;
+1 -1
View File
@@ -125,7 +125,7 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use runtime_primitives::Justification;
use service::Roles;
use config::Roles;
use super::{
BlockAttributes, RemoteCallResponse, RemoteReadResponse,
RequestId, Transactions, Direction
+6 -4
View File
@@ -30,6 +30,7 @@ use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
use io::SyncIo;
use message;
use network_libp2p::{Severity, NodeIndex};
use config::Roles;
use service;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
@@ -41,7 +42,7 @@ const RETRY_COUNT: usize = 1;
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<Block>);
fn on_connect(&self, peer: NodeIndex, role: Roles, best_number: NumberFor<Block>);
/// When block is announced by the peer.
fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor<Block>);
@@ -211,8 +212,8 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<B>) {
if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct?
fn on_connect(&self, peer: NodeIndex, role: Roles, best_number: NumberFor<B>) {
if !role.intersects(Roles::FULL | Roles::AUTHORITY) { // TODO: correct?
return;
}
@@ -511,9 +512,10 @@ pub mod tests {
use client::{self, error::{ErrorKind as ClientErrorKind, Result as ClientResult}};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest};
use config::Roles;
use message;
use network_libp2p::NodeIndex;
use service::{Roles, ExecuteInContext};
use service::ExecuteInContext;
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
use test_client::runtime::{changes_trie_config, Block, Header};
+6 -6
View File
@@ -27,11 +27,11 @@ use codec::{Encode, Decode};
use message::{self, Message};
use message::generic::Message as GenericMessage;
use specialization::Specialization;
use specialization::NetworkSpecialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{Roles, TransactionPool, ExHashT};
use service::{TransactionPool, ExHashT};
use import_queue::ImportQueue;
use config::ProtocolConfig;
use config::{ProtocolConfig, Roles};
use chain::Client;
use on_demand::OnDemandService;
use io::SyncIo;
@@ -50,7 +50,7 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: Specialization<B>, H: ExHashT> {
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>,
genesis_hash: B::Hash,
@@ -184,7 +184,7 @@ pub(crate) struct ContextData<B: BlockT, H: ExHashT> {
pub chain: Arc<Client<B>>,
}
impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new<I: 'static + ImportQueue<B>>(
config: ProtocolConfig,
@@ -761,7 +761,7 @@ macro_rules! construct_simple_protocol {
}
}
impl $crate::specialization::Specialization<$block> for $protocol {
impl $crate::specialization::NetworkSpecialization<$block> for $protocol {
fn status(&self) -> Vec<u8> {
$(
let status = self.$status_protocol_name.status();
+26 -70
View File
@@ -25,11 +25,9 @@ use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as N
use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol};
use io::NetSyncIo;
use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus};
use config::{ProtocolConfig};
use config::Params;
use error::Error;
use chain::Client;
use specialization::Specialization;
use on_demand::OnDemandService;
use specialization::NetworkSpecialization;
use import_queue::ImportQueue;
use runtime_primitives::traits::{Block as BlockT};
use tokio::{runtime::Runtime, timer::Interval};
@@ -40,38 +38,10 @@ pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
bitflags! {
/// Node roles bitmask.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
}
}
impl ::codec::Encode for Roles {
fn encode_to<T: ::codec::Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl ::codec::Decode for Roles {
fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
Self::from_bits(input.read_byte()?)
}
}
/// Sync status
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus<B>;
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
}
pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
@@ -93,24 +63,8 @@ pub trait ExecuteInContext<B: BlockT>: Send + Sync {
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, closure: F);
}
/// Service initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
/// Configuration.
pub config: ProtocolConfig,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
/// Substrate relay chain access point.
pub chain: Arc<Client<B>>,
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool<H, B>>,
/// Protocol specialization.
pub specialization: S,
}
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
/// Network service
network: Arc<Mutex<NetworkService>>,
/// Protocol handler
@@ -123,7 +77,7 @@ pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S, H> {
/// Creates and register protocol with the network service
pub fn new<I: 'static + ImportQueue<B>>(
params: Params<B, S, H>,
@@ -178,13 +132,13 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ::consensus::SyncOracle for Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ::consensus::SyncOracle for Service<B, S, H> {
fn is_major_syncing(&self) -> bool {
self.handler.sync().read().status().is_major_syncing()
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H:ExHashT> Drop for Service<B, S, H> {
fn drop(&mut self) {
self.handler.stop();
if let Some((sender, join)) = self.bg_thread.take() {
@@ -196,30 +150,17 @@ impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ExecuteInContext<B> for Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ExecuteInContext<B> for Service<B, S, H> {
fn execute_in_context<F: Fn(&mut ::protocol::Context<B>)>(&self, closure: F) {
closure(&mut ProtocolContext::new(self.handler.context_data(), &mut NetSyncIo::new(&self.network, self.protocol_id)))
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> SyncProvider<B> for Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> SyncProvider<B> for Service<B, S, H> {
/// Get sync status
fn status(&self) -> ProtocolStatus<B> {
self.handler.status()
}
fn node_id(&self) -> Option<String> {
let network = self.network.lock();
let ret = network
.listeners()
.next()
.map(|addr| {
let mut addr = addr.clone();
addr.append(Libp2pProtocol::P2p(network.peer_id().clone().into()));
addr.to_string()
});
ret
}
}
/// Trait for managing network
@@ -232,9 +173,11 @@ pub trait ManageNetwork: Send + Sync {
fn remove_reserved_peer(&self, peer: PeerId);
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Returns a user-friendly identifier of our node.
fn node_id(&self) -> Option<String>;
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ManageNetwork for Service<B, S, H> {
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ManageNetwork for Service<B, S, H> {
fn accept_unreserved_peers(&self) {
self.network.lock().accept_unreserved_peers();
}
@@ -264,10 +207,23 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ManageNetwork for Se
self.network.lock().add_reserved_peer(addr, peer_id);
Ok(())
}
fn node_id(&self) -> Option<String> {
let network = self.network.lock();
let ret = network
.listeners()
.next()
.map(|addr| {
let mut addr = addr.clone();
addr.append(Libp2pProtocol::P2p(network.peer_id().clone().into()));
addr.to_string()
});
ret
}
}
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: Specialization<B>, H: ExHashT>(
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
config: NetworkConfiguration,
protocol: Arc<Protocol<B, S, H>>,
registered: RegisteredProtocol,
@@ -308,7 +264,7 @@ fn start_thread<B: BlockT + 'static, S: Specialization<B>, H: ExHashT>(
}
/// Runs the background thread that handles the networking.
fn run_thread<B: BlockT + 'static, S: Specialization<B>, H: ExHashT>(
fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
network_service: Arc<Mutex<NetworkService>>,
protocol: Arc<Protocol<B, S, H>>,
protocol_id: ProtocolId,
+1 -1
View File
@@ -21,7 +21,7 @@ use runtime_primitives::traits::Block as BlockT;
use protocol::Context;
/// A specialization of the substrate network protocol. Handles events and sends messages.
pub trait Specialization<B: BlockT>: Send + Sync + 'static {
pub trait NetworkSpecialization<B: BlockT>: Send + Sync + 'static {
/// Get the current specialization-status.
fn status(&self) -> Vec<u8>;
+1 -1
View File
@@ -25,7 +25,7 @@ use blocks::{self, BlockCollection};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use service::Roles;
use config::Roles;
use import_queue::ImportQueue;
// Maximum blocks to request in a single packet.
+2 -2
View File
@@ -35,7 +35,7 @@ use keyring::Keyring;
use codec::Encode;
use import_queue::{SyncImportQueue, PassThroughVerifier, Verifier};
use consensus::BlockOrigin;
use specialization::Specialization;
use specialization::NetworkSpecialization;
use consensus_gossip::ConsensusGossip;
use import_queue::ImportQueue;
use service::ExecuteInContext;
@@ -62,7 +62,7 @@ pub struct DummySpecialization {
pub gossip: ConsensusGossip<Block>,
}
impl Specialization<Block> for DummySpecialization {
impl NetworkSpecialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, status: ::message::Status<Block>) {
+1 -1
View File
@@ -16,9 +16,9 @@
use client::backend::Backend;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use config::Roles;
use consensus::BlockOrigin;
use sync::SyncState;
use Roles;
use super::*;
#[test]