Extract consensus_gossip.rs and put it in its own crate (#4284)

* Extract gossiping system from network

* Finish porting GRANDPA tests

* Try put correct engine ID

* Fix messages encoding

* Fix communication tests

* Use a threads pool to spawn stuff

* Fix compilation everywhere

* Fix bad merge conflict

* Remove dependency on async-std

* Apply suggestions from code review

Co-Authored-By: Robert Habermeier <rphmeier@gmail.com>

* More suggestions

* Remove network startup GP future

* Update to futures_timer

* adjust wait_when_behind test

* Pass correct Roles after handshake

* Revert "adjust wait_when_behind test"

This reverts commit 23cb3a0a6d25ed732c2cd648607bc44ef2ab0919.

* Crate root documentation

* Remove MessageRecipient

* Address concerns

* Fix more concerns

* Forgot Cargo.lock
This commit is contained in:
Pierre Krieger
2019-12-13 19:16:10 +01:00
committed by Ashley
parent 21cbd80f8c
commit c66c191b68
24 changed files with 1087 additions and 624 deletions
+32 -2
View File
@@ -1780,6 +1780,11 @@ dependencies = [
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "hashbrown"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "hashbrown"
version = "0.6.3"
@@ -2775,6 +2780,14 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lru"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lru"
version = "0.4.3"
@@ -5302,6 +5315,7 @@ dependencies = [
"fork-tree 2.0.0",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5310,6 +5324,7 @@ dependencies = [
"sc-client-api 2.0.0",
"sc-keystore 2.0.0",
"sc-network 2.0.0",
"sc-network-gossip 2.0.0",
"sc-network-test 2.0.0",
"sc-telemetry 2.0.0",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5327,8 +5342,6 @@ dependencies = [
"substrate-test-runtime-client 2.0.0",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -5398,6 +5411,21 @@ dependencies = [
"zeroize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sc-network-gossip"
version = "2.0.0"
dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"lru 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sc-network 2.0.0",
"sp-runtime 2.0.0",
]
[[package]]
name = "sc-network-test"
version = "2.0.0"
@@ -8233,6 +8261,7 @@ dependencies = [
"checksum hash-db 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d23bd4e7b5eda0d0f3a307e8b381fdc8ba9000f26fbe912250c0a4cc3956364a"
"checksum hash256-std-hasher 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)" = "92c171d55b98633f4ed3860808f004099b36c1cc29c42cfc53aa8591b21efcf2"
"checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da"
"checksum hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1de41fb8dba9714efd92241565cdff73f78508c95697dd56787d3cba27e2353"
"checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
"checksum heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1679e6ea370dee694f91f1dc469bf94cf8f52051d147aec3e1f9497c6fc22461"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
@@ -8317,6 +8346,7 @@ dependencies = [
"checksum lock_api 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e57b3997725d2b60dbec1297f6c2e2957cc383db1cebd6be812163f969c7d586"
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum lru 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "5d8f669d42c72d18514dfca8115689c5f6370a17d980cb5bd777a67f404594c8"
"checksum lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0609345ddee5badacf857d4f547e0e5a2e987db77085c24cd887f73573a04237"
"checksum mach 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "86dd2487cdfea56def77b88438a2c915fb45113c5319bfe7e14306ca4cd0b0e1"
"checksum malloc_size_of_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e37c5d4cd9473c5f4c9c111f033f15d4df9bd378fdf615944e360a4f55a05f0b"
+1
View File
@@ -33,6 +33,7 @@ members = [
"client/keystore",
"client/network",
"client/network/test",
"client/network-gossip",
"client/offchain",
"client/peerset",
"client/rpc-servers",
@@ -158,6 +158,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
@@ -170,6 +171,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};
// the GRANDPA voter task is considered infallible, i.e.
+2
View File
@@ -231,6 +231,7 @@ macro_rules! new_full {
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
@@ -243,6 +244,7 @@ macro_rules! new_full {
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
+2 -2
View File
@@ -8,10 +8,9 @@ edition = "2018"
fork-tree = { path = "../../utils/fork-tree" }
futures = "0.1.29"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures-timer = "2.0.2"
log = "0.4.8"
parking_lot = "0.9.0"
tokio-executor = "0.1.8"
tokio-timer = "0.2.11"
rand = "0.7.2"
codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] }
sp-runtime = { path = "../../primitives/runtime" }
@@ -25,6 +24,7 @@ client = { package = "sc-client", path = "../" }
inherents = { package = "sp-inherents", path = "../../primitives/inherents" }
sp-blockchain = { path = "../../primitives/blockchain" }
network = { package = "sc-network", path = "../network" }
network-gossip = { package = "sc-network-gossip", path = "../network-gossip" }
sp-finality-tracker = { path = "../../primitives/finality-tracker" }
fg_primitives = { package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
grandpa = { package = "finality-grandpa", version = "0.10.1", features = ["derive-codec"] }
@@ -83,7 +83,7 @@
//! We only send polite messages to peers,
use sp_runtime::traits::{NumberFor, Block as BlockT, Zero};
use network::consensus_gossip::{self as network_gossip, MessageIntent, ValidatorContext};
use network_gossip::{GossipEngine, MessageIntent, ValidatorContext};
use network::{config::Roles, PeerId, ReputationChange};
use codec::{Encode, Decode};
use fg_primitives::AuthorityId;
@@ -1459,29 +1459,26 @@ pub(super) struct ReportStream {
impl ReportStream {
/// Consume the report stream, converting it into a future that
/// handles all reports.
pub(super) fn consume<B, N>(self, net: N)
pub(super) fn consume<B>(self, net: GossipEngine<B>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
B: BlockT,
N: super::Network<B> + Send + 'static,
{
ReportingTask {
reports: self.reports,
net,
_marker: Default::default(),
}
}
}
/// A future for reporting peers.
#[must_use = "Futures do nothing unless polled"]
struct ReportingTask<B, N> {
struct ReportingTask<B: BlockT> {
reports: mpsc::UnboundedReceiver<PeerReport>,
net: N,
_marker: std::marker::PhantomData<B>,
net: GossipEngine<B>,
}
impl<B: BlockT, N: super::Network<B>> Future for ReportingTask<B, N> {
impl<B: BlockT> Future for ReportingTask<B> {
type Item = ();
type Error = ();
@@ -29,19 +29,17 @@
use std::sync::Arc;
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
use futures03::stream::{StreamExt, TryStreamExt};
use futures::{prelude::*, future::Executor as _, sync::mpsc};
use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _};
use grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use grandpa::{voter, voter_set::VoterSet};
use log::{debug, trace};
use network::{consensus_gossip as network_gossip, NetworkService, ReputationChange};
use network_gossip::ConsensusMessage;
use network::ReputationChange;
use network_gossip::{GossipEngine, Network};
use codec::{Encode, Decode};
use primitives::Pair;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use tokio_executor::Executor;
use crate::{
CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error,
@@ -97,50 +95,6 @@ mod benefit {
pub(super) const PER_EQUIVOCATION: i32 = 10;
}
/// A handle to the network. This is generally implemented by providing some
/// handle to a gossip service or similar.
///
/// Intended to be a lightweight handle such as an `Arc`.
pub trait Network<Block: BlockT>: Clone + Send + 'static {
/// A stream of input messages for a topic.
type In: Stream<Item = network_gossip::TopicNotification, Error = ()>;
/// Get a stream of messages for a specific gossip topic.
fn messages_for(&self, topic: Block::Hash) -> Self::In;
/// Register a gossip validator.
fn register_validator(&self, validator: Arc<dyn network_gossip::Validator<Block>>);
/// Gossip a message out to all connected peers.
///
/// Force causes it to be sent to all peers, even if they've seen it already.
/// Only should be used in case of consensus stall.
fn gossip_message(&self, topic: Block::Hash, data: Vec<u8>, force: bool);
/// Register a message with the gossip service, it isn't broadcast right
/// away to any peers, but may be sent to new peers joining or when asked to
/// broadcast the topic. Useful to register previous messages on node
/// startup.
fn register_gossip_message(&self, topic: Block::Hash, data: Vec<u8>);
/// Send a message to a bunch of specific peers, even if they've seen it already.
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>);
/// Report a peer's cost or benefit after some action.
fn report(&self, who: network::PeerId, cost_benefit: ReputationChange);
/// Inform peers that a block with given hash should be downloaded.
fn announce(&self, block: Block::Hash, associated_data: Vec<u8>);
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
/// Create a unique topic for a round and set-id combo.
pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
@@ -151,157 +105,32 @@ pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
}
impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
B: BlockT,
S: network::specialization::NetworkSpecialization<B>,
H: network::ExHashT,
{
type In = NetworkStream<
Box<dyn Stream<Item = network_gossip::TopicNotification, Error = ()> + Send + 'static>,
>;
fn messages_for(&self, topic: B::Hash) -> Self::In {
// Given that one can only communicate with the Substrate network via the `NetworkService` via message-passing,
// and given that methods on the network consensus gossip are not exposed but only reachable by passing a
// closure into `with_gossip` on the `NetworkService` this function needs to make use of the `NetworkStream`
// construction.
//
// We create a oneshot channel and pass the sender within a closure to the network. At some point in the future
// the network passes the message channel back through the oneshot channel. But the consumer of this function
// expects a stream, not a stream within a oneshot. This complexity is abstracted within `NetworkStream`,
// waiting for the oneshot to resolve and from there on acting like a normal message channel.
let (tx, rx) = oneshot::channel();
self.with_gossip(move |gossip, _| {
let inner_rx: Box<dyn Stream<Item = _, Error = ()> + Send> = Box::new(gossip
.messages_for(GRANDPA_ENGINE_ID, topic)
.map(|x| Ok(x))
.compat()
);
let _ = tx.send(inner_rx);
});
NetworkStream::PollingOneshot(rx)
}
fn register_validator(&self, validator: Arc<dyn network_gossip::Validator<B>>) {
self.with_gossip(
move |gossip, context| gossip.register_validator(context, GRANDPA_ENGINE_ID, validator)
)
}
fn gossip_message(&self, topic: B::Hash, data: Vec<u8>, force: bool) {
let msg = ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID,
data,
};
self.with_gossip(
move |gossip, ctx| gossip.multicast(ctx, topic, msg, force)
)
}
fn register_gossip_message(&self, topic: B::Hash, data: Vec<u8>) {
let msg = ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID,
data,
};
self.with_gossip(move |gossip, _| gossip.register_message(topic, msg))
}
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let msg = ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID,
data,
};
self.with_gossip(move |gossip, ctx| for who in &who {
gossip.send_message(ctx, who, msg.clone())
})
}
fn report(&self, who: network::PeerId, cost_benefit: ReputationChange) {
self.report_peer(who, cost_benefit)
}
fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
self.announce_block(block, associated_data)
}
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
NetworkService::set_sync_fork_request(self, peers, hash, number)
}
}
/// A stream used by NetworkBridge in its implementation of Network. Given a oneshot that eventually returns a channel
/// which eventually returns messages, instead of:
///
/// 1. polling the oneshot until it returns a message channel
///
/// 2. polling the message channel for messages
///
/// `NetworkStream` combines the two steps into one, requiring a consumer to only poll `NetworkStream` to retrieve
/// messages directly.
pub enum NetworkStream<R> {
PollingOneshot(oneshot::Receiver<R>),
PollingTopicNotifications(R),
}
impl<R> Stream for NetworkStream<R>
where
R: Stream<Item = network_gossip::TopicNotification, Error = ()>,
{
type Item = R::Item;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
NetworkStream::PollingOneshot(oneshot) => {
match oneshot.poll() {
Ok(futures::Async::Ready(mut stream)) => {
let poll_result = stream.poll();
*self = NetworkStream::PollingTopicNotifications(stream);
poll_result
},
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Err(_) => Err(())
}
},
NetworkStream::PollingTopicNotifications(stream) => {
stream.poll()
},
}
}
}
/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
pub(crate) struct NetworkBridge<B: BlockT> {
gossip_engine: GossipEngine<B>,
validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
}
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT> NetworkBridge<B> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup.
/// handle.
/// On creation it will register previous rounds' votes with the gossip
/// service taken from the VoterSetState.
pub(crate) fn new(
pub(crate) fn new<N: Network<B> + Clone + Send + 'static>(
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
executor: &impl futures03::task::Spawn,
on_exit: impl futures03::Future<Output = ()> + Clone + Send + Unpin + 'static,
) -> (
Self,
impl Future<Item = (), Error = ()> + Send + 'static,
) {
) -> Self {
let (validator, report_stream) = GossipValidator::new(
config,
set_state.clone(),
);
let validator = Arc::new(validator);
service.register_validator(validator.clone());
let gossip_engine = GossipEngine::new(service, executor, GRANDPA_ENGINE_ID, validator.clone());
{
// register all previous votes with the gossip service so that they're
@@ -325,7 +154,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
);
service.register_gossip_message(
gossip_engine.register_gossip_message(
topic,
message.encode(),
);
@@ -341,34 +170,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
let reporting_job = report_stream.consume(service.clone());
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
let reporting_job = report_stream.consume(gossip_engine.clone());
let bridge = NetworkBridge { service, validator, neighbor_sender };
let bridge = NetworkBridge { gossip_engine, validator, neighbor_sender };
let startup_work = futures::future::lazy(move || {
// lazily spawn these jobs onto their own tasks. the lazy future has access
// to tokio globals, which aren't available outside.
let mut executor = tokio_executor::DefaultExecutor::current();
let executor = Compat::new(executor);
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa rebroadcast job task");
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");
use futures03::{FutureExt, TryFutureExt};
let rebroadcast_job = rebroadcast_job
.select(on_exit.clone().map(Ok).compat())
.then(|_| Ok(()));
let reporting_job = reporting_job
.select(on_exit.clone().map(Ok).compat())
.then(|_| Ok(()));
executor.spawn(Box::new(rebroadcast_job))
.expect("failed to spawn grandpa rebroadcast job task");
executor.spawn(Box::new(reporting_job))
.expect("failed to spawn grandpa reporting job task");
Ok(())
});
(bridge, startup_work)
bridge
}
/// Note the beginning of a new round to the `GossipValidator`.
@@ -420,7 +233,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
});
let topic = round_topic::<B>(round.0, set_id.0);
let incoming = self.service.messages_for(topic)
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
.map(|item| Ok::<_, ()>(item)))
.filter_map(|notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
@@ -473,10 +287,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
let (tx, out_rx) = mpsc::unbounded();
let outgoing = OutgoingMessages::<B, N> {
let outgoing = OutgoingMessages::<B> {
round: round.0,
set_id: set_id.0,
network: self.service.clone(),
network: self.gossip_engine.clone(),
locals,
sender: tx,
has_voted,
@@ -510,7 +324,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
let service = self.service.clone();
let service = self.gossip_engine.clone();
let topic = global_topic::<B>(set_id.0);
let incoming = incoming_global(
service,
@@ -520,8 +334,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
self.neighbor_sender.clone(),
);
let outgoing = CommitsOut::<B, N>::new(
self.service.clone(),
let outgoing = CommitsOut::<B>::new(
self.gossip_engine.clone(),
set_id.0,
is_voter,
self.validator.clone(),
@@ -543,12 +357,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
pub(crate) fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
self.service.set_sync_fork_request(peers, hash, number)
self.gossip_engine.set_sync_fork_request(peers, hash, number)
}
}
fn incoming_global<B: BlockT, N: Network<B>>(
mut service: N,
fn incoming_global<B: BlockT>(
mut gossip_engine: GossipEngine<B>,
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
@@ -557,7 +371,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
let process_commit = move |
msg: FullCommitMessage<B>,
mut notification: network_gossip::TopicNotification,
service: &mut N,
gossip_engine: &mut GossipEngine<B>,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
@@ -579,7 +393,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
gossip_engine.report(who, cost);
}
return None;
@@ -589,7 +403,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
let commit = msg.message;
let finalized_number = commit.target_number;
let gossip_validator = gossip_validator.clone();
let service = service.clone();
let gossip_engine = gossip_engine.clone();
let neighbor_sender = neighbor_sender.clone();
let cb = move |outcome| match outcome {
voter::CommitProcessingOutcome::Good(_) => {
@@ -601,12 +415,12 @@ fn incoming_global<B: BlockT, N: Network<B>>(
|to, neighbor| neighbor_sender.send(to, neighbor),
);
service.gossip_message(topic, notification.message.clone(), false);
gossip_engine.gossip_message(topic, notification.message.clone(), false);
}
voter::CommitProcessingOutcome::Bad(_) => {
// report peer and do not gossip.
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_COMMIT);
gossip_engine.report(who, cost::INVALID_COMMIT);
}
}
};
@@ -619,12 +433,12 @@ fn incoming_global<B: BlockT, N: Network<B>>(
let process_catch_up = move |
msg: FullCatchUpMessage<B>,
mut notification: network_gossip::TopicNotification,
service: &mut N,
gossip_engine: &mut GossipEngine<B>,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
let gossip_validator = gossip_validator.clone();
let service = service.clone();
let gossip_engine = gossip_engine.clone();
if let Err(cost) = check_catch_up::<B>(
&msg.message,
@@ -632,7 +446,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
gossip_engine.report(who, cost);
}
return None;
@@ -642,7 +456,7 @@ fn incoming_global<B: BlockT, N: Network<B>>(
if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
// report peer
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_CATCH_UP);
gossip_engine.report(who, cost::INVALID_CATCH_UP);
}
}
@@ -654,7 +468,8 @@ fn incoming_global<B: BlockT, N: Network<B>>(
Some(voter::CommunicationIn::CatchUp(msg.message, cb))
};
service.messages_for(topic)
Compat::new(gossip_engine.messages_for(topic)
.map(|m| Ok::<_, ()>(m)))
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
@@ -666,9 +481,9 @@ fn incoming_global<B: BlockT, N: Network<B>>(
.filter_map(move |(notification, msg)| {
match msg {
GossipMessage::Commit(msg) =>
process_commit(msg, notification, &mut service, &gossip_validator, &*voters),
process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
GossipMessage::CatchUp(msg) =>
process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters),
process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
@@ -678,10 +493,10 @@ fn incoming_global<B: BlockT, N: Network<B>>(
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
impl<B: BlockT> Clone for NetworkBridge<B> {
fn clone(&self) -> Self {
NetworkBridge {
service: self.service.clone(),
gossip_engine: self.gossip_engine.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
}
@@ -725,16 +540,16 @@ pub(crate) fn check_message_sig<Block: BlockT>(
/// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types.
struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
struct OutgoingMessages<Block: BlockT> {
round: RoundNumber,
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
network: N,
network: GossipEngine<Block>,
has_voted: HasVoted<Block>,
}
impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
impl<Block: BlockT> Sink for OutgoingMessages<Block>
{
type SinkItem = Message<Block>;
type SinkError = Error;
@@ -978,18 +793,18 @@ fn check_catch_up<Block: BlockT>(
}
/// An output sink for commit messages.
struct CommitsOut<Block: BlockT, N: Network<Block>> {
network: N,
struct CommitsOut<Block: BlockT> {
network: GossipEngine<Block>,
set_id: SetId,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
neighbor_sender: periodic::NeighborPacketSender<Block>,
}
impl<Block: BlockT, N: Network<Block>> CommitsOut<Block, N> {
impl<Block: BlockT> CommitsOut<Block> {
/// Create a new commit output stream.
pub(crate) fn new(
network: N,
network: GossipEngine<Block>,
set_id: SetIdNumber,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
@@ -1005,7 +820,7 @@ impl<Block: BlockT, N: Network<Block>> CommitsOut<Block, N> {
}
}
impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> {
impl<Block: BlockT> Sink for CommitsOut<Block> {
type SinkItem = (RoundNumber, Commit<Block>);
type SinkError = Error;
@@ -21,12 +21,14 @@ use std::time::{Instant, Duration};
use codec::Encode;
use futures::prelude::*;
use futures::sync::mpsc;
use futures_timer::Delay;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use log::{debug, warn};
use tokio_timer::Delay;
use network::PeerId;
use network_gossip::GossipEngine;
use sp_runtime::traits::{NumberFor, Block as BlockT};
use super::{gossip::{NeighborPacket, GossipMessage}, Network};
use super::gossip::{NeighborPacket, GossipMessage};
// how often to rebroadcast, if no other
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
@@ -58,16 +60,15 @@ impl<B: BlockT> NeighborPacketSender<B> {
///
/// It may rebroadcast the last neighbor packet periodically when no
/// progress is made.
pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
pub(super) fn neighbor_packet_worker<B>(net: GossipEngine<B>) -> (
impl Future<Item = (), Error = ()> + Send + 'static,
NeighborPacketSender<B>,
) where
B: BlockT,
N: Network<B>,
{
let mut last = None;
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let mut delay = Delay::new(rebroadcast_instant());
let mut delay = Delay::new(REBROADCAST_AFTER);
let work = futures::future::poll_fn(move || {
loop {
@@ -88,7 +89,7 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
// has to be done in a loop because it needs to be polled after
// re-scheduling.
loop {
match delay.poll() {
match (&mut delay).unit_error().compat().poll() {
Err(e) => {
warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e);
delay.reset(rebroadcast_instant());
@@ -18,25 +18,23 @@
use futures::sync::mpsc;
use futures::prelude::*;
use network::consensus_gossip as network_gossip;
use network::{Event as NetworkEvent, PeerId, config::Roles};
use sc_network_test::{Block, Hash};
use network_gossip::Validator;
use tokio::runtime::current_thread;
use std::sync::Arc;
use keyring::Ed25519Keyring;
use codec::Encode;
use sp_runtime::traits::NumberFor;
use sp_runtime::{ConsensusEngineId, traits::NumberFor};
use std::{pin::Pin, task::{Context, Poll}};
use crate::environment::SharedVoterSetState;
use fg_primitives::AuthorityList;
use fg_primitives::{AuthorityList, GRANDPA_ENGINE_ID};
use super::gossip::{self, GossipValidator};
use super::{AuthorityId, VoterSet, Round, SetId};
enum Event {
MessagesFor(Hash, mpsc::UnboundedSender<network_gossip::TopicNotification>),
RegisterValidator(Arc<dyn network_gossip::Validator<Block>>),
GossipMessage(Hash, Vec<u8>, bool),
SendMessage(Vec<network::PeerId>, Vec<u8>),
EventStream(mpsc::UnboundedSender<NetworkEvent>),
WriteNotification(network::PeerId, Vec<u8>),
Report(network::PeerId, network::ReputationChange),
Announce(Hash),
}
@@ -46,56 +44,36 @@ struct TestNetwork {
sender: mpsc::UnboundedSender<Event>,
}
impl super::Network<Block> for TestNetwork {
type In = mpsc::UnboundedReceiver<network_gossip::TopicNotification>;
/// Get a stream of messages for a specific gossip topic.
fn messages_for(&self, topic: Hash) -> Self::In {
impl network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self)
-> Box<dyn futures::Stream<Item = NetworkEvent, Error = ()> + Send> {
let (tx, rx) = mpsc::unbounded();
let _ = self.sender.unbounded_send(Event::MessagesFor(topic, tx));
rx
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::new(rx)
}
/// Register a gossip validator.
fn register_validator(&self, validator: Arc<dyn network_gossip::Validator<Block>>) {
let _ = self.sender.unbounded_send(Event::RegisterValidator(validator));
}
/// Gossip a message out to all connected peers.
///
/// Force causes it to be sent to all peers, even if they've seen it already.
/// Only should be used in case of consensus stall.
fn gossip_message(&self, topic: Hash, data: Vec<u8>, force: bool) {
let _ = self.sender.unbounded_send(Event::GossipMessage(topic, data, force));
}
/// Send a message to a bunch of specific peers, even if they've seen it already.
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::SendMessage(who, data));
}
/// Register a message with the gossip service, it isn't broadcast right
/// away to any peers, but may be sent to new peers joining or when asked to
/// broadcast the topic. Useful to register previous messages on node
/// startup.
fn register_gossip_message(&self, _topic: Hash, _data: Vec<u8>) {
// NOTE: only required to restore previous state on startup
// not required for tests currently
}
/// Report a peer's cost or benefit after some action.
fn report(&self, who: network::PeerId, cost_benefit: network::ReputationChange) {
fn report_peer(&self, who: network::PeerId, cost_benefit: network::ReputationChange) {
let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit));
}
/// Inform peers that a block with given hash should be downloaded.
fn disconnect_peer(&self, _: PeerId) {}
fn write_notification(&self, who: PeerId, _: ConsensusEngineId, message: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::WriteNotification(who, message));
}
fn register_notifications_protocol(&self, _: ConsensusEngineId) {}
fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
}
/// Notify the sync service to try syncing the given chain.
fn set_sync_fork_request(&self, _peers: Vec<network::PeerId>, _hash: Hash, _number: NumberFor<Block>) {}
fn set_sync_fork_request(
&self,
_peers: Vec<network::PeerId>,
_hash: Hash,
_number: NumberFor<Block>,
) {}
}
impl network_gossip::ValidatorContext<Block> for TestNetwork {
@@ -104,14 +82,19 @@ impl network_gossip::ValidatorContext<Block> for TestNetwork {
fn broadcast_message(&mut self, _: Hash, _: Vec<u8>, _: bool) { }
fn send_message(&mut self, who: &network::PeerId, data: Vec<u8>) {
<Self as super::Network<Block>>::send_message(self, vec![who.clone()], data);
<Self as network_gossip::Network<Block>>::write_notification(
self,
who.clone(),
GRANDPA_ENGINE_ID,
data,
);
}
fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { }
}
struct Tester {
net_handle: super::NetworkBridge<Block, TestNetwork>,
net_handle: super::NetworkBridge<Block>,
gossip_validator: Arc<GossipValidator<Block>>,
events: mpsc::UnboundedReceiver<Event>,
}
@@ -165,7 +148,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {
}
// needs to run in a tokio runtime.
fn make_test_network() -> (
fn make_test_network(executor: &impl futures03::task::Spawn) -> (
impl Future<Item=Tester,Error=()>,
TestNetwork,
) {
@@ -183,15 +166,16 @@ fn make_test_network() -> (
}
}
let (bridge, startup_work) = super::NetworkBridge::new(
let bridge = super::NetworkBridge::new(
net.clone(),
config(),
voter_set_state(),
executor,
Exit,
);
(
startup_work.map(move |()| Tester {
futures::future::ok(Tester {
gossip_validator: bridge.validator.clone(),
net_handle: bridge,
events: rx,
@@ -261,7 +245,8 @@ fn good_commit_leads_to_relay() {
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network().0
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
@@ -286,11 +271,15 @@ fn good_commit_leads_to_relay() {
// send a message.
let sender_id = id.clone();
let send_message = tester.filter_network_events(move |event| match event {
Event::MessagesFor(topic, sender) => {
if topic != global_topic { return false }
let _ = sender.unbounded_send(network_gossip::TopicNotification {
message: commit_to_send.clone(),
sender: Some(sender_id.clone()),
Event::EventStream(sender) => {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
roles: Roles::FULL,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
});
true
@@ -314,12 +303,8 @@ fn good_commit_leads_to_relay() {
// a repropagation event coming from the network.
send_message.join(handle_commit).and_then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::GossipMessage(topic, data, false) => {
if topic == global_topic && data == encoded_commit {
true
} else {
panic!("Trying to gossip something strange")
}
Event::WriteNotification(_, data) => {
data == encoded_commit
}
_ => false,
})
@@ -328,11 +313,12 @@ fn good_commit_leads_to_relay() {
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
}
#[test]
fn bad_commit_leads_to_report() {
env_logger::init();
let private = [Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let public = make_ids(&private[..]);
let voter_set = Arc::new(public.iter().cloned().collect::<VoterSet<AuthorityId>>());
@@ -376,7 +362,8 @@ fn bad_commit_leads_to_report() {
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network().0
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
@@ -401,11 +388,15 @@ fn bad_commit_leads_to_report() {
// send a message.
let sender_id = id.clone();
let send_message = tester.filter_network_events(move |event| match event {
Event::MessagesFor(topic, sender) => {
if topic != global_topic { return false }
let _ = sender.unbounded_send(network_gossip::TopicNotification {
message: commit_to_send.clone(),
sender: Some(sender_id.clone()),
Event::EventStream(sender) => {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
roles: Roles::FULL,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
});
true
@@ -430,11 +421,7 @@ fn bad_commit_leads_to_report() {
send_message.join(handle_commit).and_then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::Report(who, cost_benefit) => {
if who == id && cost_benefit == super::cost::INVALID_COMMIT {
true
} else {
panic!("reported unknown peer or unexpected cost");
}
who == id && cost_benefit == super::cost::INVALID_COMMIT
}
_ => false,
})
@@ -443,14 +430,15 @@ fn bad_commit_leads_to_report() {
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
}
#[test]
fn peer_with_higher_view_leads_to_catch_up_request() {
let id = network::PeerId::random();
let (tester, mut net) = make_test_network();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let (tester, mut net) = make_test_network(&threads_pool);
let test = tester
.and_then(move |tester| {
// register a peer with authority role.
@@ -477,10 +465,10 @@ fn peer_with_higher_view_leads_to_catch_up_request() {
// a catch up request should be sent to the peer for round - 1
tester.filter_network_events(move |event| match event {
Event::SendMessage(peers, message) => {
Event::WriteNotification(peer, message) => {
assert_eq!(
peers,
vec![id.clone()],
peer,
id,
);
assert_eq!(
@@ -501,5 +489,5 @@ fn peer_with_higher_view_leads_to_catch_up_request() {
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
}
@@ -17,12 +17,13 @@
use std::collections::BTreeMap;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use log::{debug, warn, info};
use codec::{Decode, Encode};
use futures::prelude::*;
use tokio_timer::Delay;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use futures_timer::Delay;
use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError};
@@ -48,7 +49,7 @@ use sp_runtime::traits::{
use sc_telemetry::{telemetry, CONSENSUS_INFO};
use crate::{
CommandOrError, Commit, Config, Error, Network, Precommit, Prevote,
CommandOrError, Commit, Config, Error, Precommit, Prevote,
PrimaryPropose, SignedMessage, NewAuthoritySet, VoterCommand,
};
@@ -375,20 +376,20 @@ impl<Block: BlockT> SharedVoterSetState<Block> {
}
/// The environment we run GRANDPA in.
pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> {
pub(crate) struct Environment<B, E, Block: BlockT, RA, SC, VR> {
pub(crate) client: Arc<Client<B, E, Block, RA>>,
pub(crate) select_chain: SC,
pub(crate) voters: Arc<VoterSet<AuthorityId>>,
pub(crate) config: Config,
pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
pub(crate) consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
pub(crate) network: crate::communication::NetworkBridge<Block, N>,
pub(crate) network: crate::communication::NetworkBridge<Block>,
pub(crate) set_id: SetId,
pub(crate) voter_set_state: SharedVoterSetState<Block>,
pub(crate) voting_rule: VR,
}
impl<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> Environment<B, E, Block, N, RA, SC, VR> {
impl<B, E, Block: BlockT, RA, SC, VR> Environment<B, E, Block, RA, SC, VR> {
/// Updates the voter set state using the given closure. The write lock is
/// held during evaluation of the closure and the environment's voter set
/// state is set to its result if successful.
@@ -404,15 +405,13 @@ impl<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> Environment<B, E, Block
}
}
impl<Block: BlockT<Hash=H256>, B, E, N, RA, SC, VR>
impl<Block: BlockT<Hash=H256>, B, E, RA, SC, VR>
grandpa::Chain<Block::Hash, NumberFor<Block>>
for Environment<B, E, Block, N, RA, SC, VR>
for Environment<B, E, Block, RA, SC, VR>
where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + 'static,
N::In: 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>>,
RA: Send + Sync,
@@ -555,15 +554,13 @@ pub(crate) fn ancestry<B, Block: BlockT<Hash=H256>, E, RA>(
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
}
impl<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR>
impl<B, E, Block: BlockT<Hash=H256>, RA, SC, VR>
voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, E, Block, N, RA, SC, VR>
for Environment<B, E, Block, RA, SC, VR>
where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Send + Sync,
N: Network<Block> + 'static + Send,
N::In: 'static + Send,
RA: 'static + Send + Sync,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>>,
@@ -589,9 +586,8 @@ where
&self,
round: RoundNumber,
) -> voter::RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
let now = Instant::now();
let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
let prevote_timer = Delay::new(self.config.gossip_duration * 2);
let precommit_timer = Delay::new(self.config.gossip_duration * 4);
let local_key = crate::is_voter(&self.voters, &self.config.keystore);
@@ -629,8 +625,8 @@ where
voter::RoundData {
voter_id: local_key.map(|pair| pair.public()),
prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
prevote_timer: Box::new(prevote_timer.map(Ok).compat()),
precommit_timer: Box::new(precommit_timer.map(Ok).compat()),
incoming,
outgoing,
}
@@ -904,9 +900,7 @@ where
//random between 0-1 seconds.
let delay: u64 = thread_rng().gen_range(0, 1000);
Box::new(Delay::new(
Instant::now() + Duration::from_millis(delay)
).map_err(|e| Error::Timer(e).into()))
Box::new(Delay::new(Duration::from_millis(delay)).map(Ok).compat())
}
fn prevote_equivocation(
+30 -32
View File
@@ -73,7 +73,7 @@ use sp_finality_tracker;
use grandpa::Error as GrandpaError;
use grandpa::{voter, BlockNumberOps, voter_set::VoterSet};
use std::fmt;
use std::{fmt, io};
use std::sync::Arc;
use std::time::Duration;
@@ -90,7 +90,7 @@ mod observer;
mod until_imported;
mod voting_rule;
pub use communication::Network;
pub use network_gossip::Network;
pub use finality_proof::FinalityProofProvider;
pub use justification::GrandpaJustification;
pub use light_import::light_block_import;
@@ -230,7 +230,7 @@ pub enum Error {
/// An invariant has been violated (e.g. not finalizing pending change blocks in-order)
Safety(String),
/// A timer failed to fire.
Timer(tokio_timer::Error),
Timer(io::Error),
}
impl From<GrandpaError> for Error {
@@ -276,9 +276,8 @@ pub(crate) trait BlockSyncRequester<Block: BlockT> {
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
impl<Block, N> BlockSyncRequester<Block> for NetworkBridge<Block, N> where
impl<Block> BlockSyncRequester<Block> for NetworkBridge<Block> where
Block: BlockT,
N: communication::Network<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
@@ -447,11 +446,11 @@ where
))
}
fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
fn global_communication<Block: BlockT<Hash=H256>, B, E, RA>(
set_id: SetId,
voters: &Arc<VoterSet<AuthorityId>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &NetworkBridge<Block, N>,
network: &NetworkBridge<Block>,
keystore: &Option<KeyStorePtr>,
) -> (
impl Stream<
@@ -465,7 +464,6 @@ fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
) where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
N: Network<Block>,
RA: Send + Sync,
NumberFor<Block>: BlockNumberOps,
{
@@ -523,7 +521,7 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H25
}
/// Parameters used to run Grandpa.
pub struct GrandpaParams<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X> {
pub struct GrandpaParams<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// A link to the block import worker.
@@ -538,24 +536,26 @@ pub struct GrandpaParams<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X> {
pub telemetry_on_connect: Option<mpsc::UnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// How to spawn background tasks.
pub executor: Sp,
}
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Sync + 'static,
N::In: Send + 'static,
N: Network<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
Sp: futures03::task::Spawn + 'static,
{
let GrandpaParams {
config,
@@ -565,6 +565,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
on_exit,
telemetry_on_connect,
voting_rule,
executor,
} = grandpa_params;
let LinkHalf {
@@ -574,10 +575,11 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
voter_commands_rx,
} = link;
let (network, network_startup) = NetworkBridge::new(
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
on_exit.clone(),
);
@@ -628,8 +630,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
let voter_work = network_startup.and_then(move |()| voter_work);
// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());
@@ -641,17 +641,15 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
/// Future that powers the voter.
#[must_use]
struct VoterWork<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> {
struct VoterWork<B, E, Block: BlockT, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
env: Arc<Environment<B, E, Block, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}
impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
impl<B, E, Block, RA, SC, VR> VoterWork<B, E, Block, RA, SC, VR>
where
Block: BlockT<Hash=H256>,
N: Network<Block> + Sync,
N::In: Send + 'static,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
@@ -662,7 +660,7 @@ where
fn new(
client: Arc<Client<B, E, Block, RA>>,
config: Config,
network: NetworkBridge<Block, N>,
network: NetworkBridge<Block>,
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
@@ -823,11 +821,9 @@ where
}
}
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
impl<B, E, Block, RA, SC, VR> Future for VoterWork<B, E, Block, RA, SC, VR>
where
Block: BlockT<Hash=H256>,
N: Network<Block> + Sync,
N::In: Send + 'static,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
@@ -878,20 +874,20 @@ where
}
#[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")]
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> ::sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Sync + 'static,
N::In: Send + 'static,
N: Network<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
Sp: futures03::task::Spawn + 'static,
{
run_grandpa_voter(grandpa_params)
}
@@ -910,15 +906,17 @@ pub fn setup_disabled_grandpa<B, E, Block: BlockT<Hash=H256>, RA, N>(
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
RA: Send + Sync + 'static,
N: Network<Block> + Send + Sync + 'static,
N::In: Send + 'static,
N: Network<Block> + Send + Clone + 'static,
{
register_finality_tracker_inherent_data_provider(
client,
inherent_data_providers,
)?;
network.register_validator(Arc::new(network::consensus_gossip::DiscardAll));
// We register the GRANDPA protocol so that we don't consider it an anomaly
// to receive GRANDPA messages on the network. We don't process the
// messages.
network.register_notifications_protocol(communication::GRANDPA_ENGINE_ID);
Ok(())
}
@@ -151,19 +151,20 @@ fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S, F>(
/// listening for and validating GRANDPA commits instead of following the full
/// protocol. Provide configuration and a link to a block import worker that has
/// already been instantiated with `block_import`.
pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC, Sp>(
config: Config,
link: LinkHalf<B, E, Block, RA, SC>,
network: N,
on_exit: impl futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
executor: Sp,
) -> ::sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Sync + 'static,
N::In: Send + 'static,
N: Network<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
RA: Send + Sync + 'static,
Sp: futures03::task::Spawn + 'static,
{
let LinkHalf {
client,
@@ -172,10 +173,11 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
voter_commands_rx,
} = link;
let (network, network_startup) = NetworkBridge::new(
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
on_exit.clone(),
);
@@ -193,8 +195,6 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
warn!("GRANDPA Observer failed: {:?}", e);
});
let observer_work = network_startup.and_then(move |()| observer_work);
use futures03::{FutureExt, TryFutureExt};
Ok(observer_work.select(on_exit.map(Ok).compat()).map(|_| ()).map_err(|_| ()))
@@ -202,20 +202,18 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
/// Future that powers the observer.
#[must_use]
struct ObserverWork<B: BlockT<Hash=H256>, N: Network<B>, E, Backend, RA> {
struct ObserverWork<B: BlockT<Hash=H256>, E, Backend, RA> {
observer: Box<dyn Future<Item = (), Error = CommandOrError<B::Hash, NumberFor<B>>> + Send>,
client: Arc<Client<Backend, E, B, RA>>,
network: NetworkBridge<B, N>,
network: NetworkBridge<B>,
persistent_data: PersistentData<B>,
keystore: Option<keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
}
impl<B, N, E, Bk, RA> ObserverWork<B, N, E, Bk, RA>
impl<B, E, Bk, RA> ObserverWork<B, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: Network<B>,
N::In: Send + 'static,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
@@ -223,7 +221,7 @@ where
{
fn new(
client: Arc<Client<Bk, E, B, RA>>,
network: NetworkBridge<B, N>,
network: NetworkBridge<B>,
persistent_data: PersistentData<B>,
keystore: Option<keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
@@ -327,11 +325,9 @@ where
}
}
impl<B, N, E, Bk, RA> Future for ObserverWork<B, N, E, Bk, RA>
impl<B, E, Bk, RA> Future for ObserverWork<B, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: Network<B>,
N::In: Send + 'static,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
+55 -15
View File
@@ -22,6 +22,7 @@ use sc_network_test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, Pe
use sc_network_test::{PassThroughVerifier};
use network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures_timer::Delay;
use futures03::{StreamExt as _, TryStreamExt as _};
use tokio::runtime::current_thread;
use keyring::Ed25519Keyring;
@@ -338,6 +339,7 @@ fn create_keystore(authority: Ed25519Keyring) -> (KeyStorePtr, tempfile::TempDir
// the voters are spawned but before blocking on them.
fn run_to_completion_with<F>(
runtime: &mut current_thread::Runtime,
threads_pool: &futures03::executor::ThreadPool,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring],
@@ -405,6 +407,7 @@ fn run_to_completion_with<F>(
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: (),
executor: threads_pool.clone(),
};
let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
@@ -427,11 +430,12 @@ fn run_to_completion_with<F>(
fn run_to_completion(
runtime: &mut current_thread::Runtime,
threads_pool: &futures03::executor::ThreadPool,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring]
) -> u64 {
run_to_completion_with(runtime, blocks, net, peers, |_| None)
run_to_completion_with(runtime, threads_pool, blocks, net, peers, |_| None)
}
fn add_scheduled_change(block: &mut Block, change: ScheduledChange<BlockNumber>) {
@@ -456,6 +460,7 @@ fn add_forced_change(
fn finalize_3_voters_no_observers() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -469,7 +474,7 @@ fn finalize_3_voters_no_observers() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 20, net.clone(), peers);
run_to_completion(&mut runtime, &threads_pool, 20, net.clone(), peers);
// normally there's no justification for finalized blocks
assert!(
@@ -481,6 +486,7 @@ fn finalize_3_voters_no_observers() {
#[test]
fn finalize_3_voters_1_full_observer() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -499,6 +505,8 @@ fn finalize_3_voters_1_full_observer() {
let mut keystore_paths = Vec::new();
let mut voters = Vec::new();
for (peer_id, local_key) in all_peers.enumerate() {
let (client, net_service, link) = {
let net = net.lock();
@@ -539,9 +547,13 @@ fn finalize_3_voters_1_full_observer() {
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: (),
executor: threads_pool.clone(),
};
let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
voters.push(run_grandpa_voter(grandpa_params).expect("all in order with client and network"));
}
for voter in voters {
runtime.spawn(voter);
}
@@ -583,6 +595,7 @@ fn transition_3_voters_twice_1_full_observer() {
let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8)));
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
net.lock().peer(0).push_blocks(1, false);
net.lock().block_until_sync(&mut runtime);
@@ -687,6 +700,7 @@ fn transition_3_voters_twice_1_full_observer() {
assert_eq!(set.pending_changes().count(), 0);
})
);
let grandpa_params = GrandpaParams {
config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
@@ -702,6 +716,7 @@ fn transition_3_voters_twice_1_full_observer() {
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: (),
executor: threads_pool.clone(),
};
let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
@@ -720,6 +735,7 @@ fn transition_3_voters_twice_1_full_observer() {
#[test]
fn justification_is_emitted_when_consensus_data_changes() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3);
@@ -728,7 +744,7 @@ fn justification_is_emitted_when_consensus_data_changes() {
net.peer(0).push_authorities_change_block(new_authorities);
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 1, net.clone(), peers);
run_to_completion(&mut runtime, &threads_pool, 1, net.clone(), peers);
// ... and check that there's justification for block#1
assert!(net.lock().peer(0).client().justification(&BlockId::Number(1)).unwrap().is_some(),
@@ -738,6 +754,7 @@ fn justification_is_emitted_when_consensus_data_changes() {
#[test]
fn justification_is_generated_periodically() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -746,7 +763,7 @@ fn justification_is_generated_periodically() {
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 32, net.clone(), peers);
run_to_completion(&mut runtime, &threads_pool, 32, net.clone(), peers);
// when block#32 (justification_period) is finalized, justification
// is required => generated
@@ -777,6 +794,7 @@ fn consensus_changes_works() {
#[test]
fn sync_justifications_on_change_blocks() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers_b);
@@ -808,7 +826,7 @@ fn sync_justifications_on_change_blocks() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 25, net.clone(), peers_a);
run_to_completion(&mut runtime, &threads_pool, 25, net.clone(), peers_a);
// the first 3 peers are grandpa voters and therefore have already finalized
// block 21 and stored a justification
@@ -831,6 +849,7 @@ fn sync_justifications_on_change_blocks() {
fn finalizes_multiple_pending_changes_in_order() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Dave, Ed25519Keyring::Eve, Ed25519Keyring::Ferdie];
@@ -884,13 +903,14 @@ fn finalizes_multiple_pending_changes_in_order() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 30, net.clone(), all_peers);
run_to_completion(&mut runtime, &threads_pool, 30, net.clone(), all_peers);
}
#[test]
fn force_change_to_new_set() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
// two of these guys are offline.
let genesis_authorities = &[
Ed25519Keyring::Alice,
@@ -941,7 +961,7 @@ fn force_change_to_new_set() {
// it will only finalize if the forced transition happens.
// we add_blocks after the voters are spawned because otherwise
// the link-halfs have the wrong AuthoritySet
run_to_completion(&mut runtime, 25, net, peers_a);
run_to_completion(&mut runtime, &threads_pool, 25, net, peers_a);
}
#[test]
@@ -1059,6 +1079,7 @@ fn voter_persists_its_votes() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
// we have two authorities but we'll only be running the voter for alice
// we are going to be listening for the prevotes it casts
@@ -1097,6 +1118,7 @@ fn voter_persists_its_votes() {
net: Arc<Mutex<GrandpaTestNet>>,
client: PeersClient,
keystore: KeyStorePtr,
threads_pool: futures03::executor::ThreadPool,
}
impl Future for ResettableVoter {
@@ -1132,6 +1154,7 @@ fn voter_persists_its_votes() {
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: VotingRulesBuilder::default().build(),
executor: self.threads_pool.clone(),
};
let voter = run_grandpa_voter(grandpa_params)
@@ -1163,6 +1186,7 @@ fn voter_persists_its_votes() {
net: net.clone(),
client: client.clone(),
keystore,
threads_pool: threads_pool.clone(),
});
}
@@ -1191,13 +1215,13 @@ fn voter_persists_its_votes() {
set_state
};
let (network, routing_work) = communication::NetworkBridge::new(
let network = communication::NetworkBridge::new(
net.lock().peers[1].network_service().clone(),
config.clone(),
set_state,
&threads_pool,
Exit,
);
runtime.block_on(routing_work).unwrap();
let (round_rx, round_tx) = network.round_communication(
communication::Round(1),
@@ -1232,7 +1256,14 @@ fn voter_persists_its_votes() {
let net = net.clone();
let voter_tx = voter_tx.clone();
let round_tx = round_tx.clone();
future::Either::A(tokio_timer::Interval::new_interval(Duration::from_millis(200))
let interval = futures03::stream::unfold(Delay::new(Duration::from_millis(200)), |delay|
Box::pin(async move {
delay.await;
Some(((), Delay::new(Duration::from_millis(200))))
})).map(Ok::<_, ()>).compat();
future::Either::A(interval
.take_while(move |_| {
Ok(net2.lock().peer(1).client().info().chain.best_number != 40)
})
@@ -1302,6 +1333,7 @@ fn voter_persists_its_votes() {
fn finalize_3_voters_1_light_observer() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let authorities = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(authorities);
@@ -1322,7 +1354,7 @@ fn finalize_3_voters_1_light_observer() {
.take_while(|n| Ok(n.header.number() < &20))
.collect();
run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| {
run_to_completion_with(&mut runtime, &threads_pool, 20, net.clone(), authorities, |executor| {
executor.spawn(
run_grandpa_observer(
Config {
@@ -1336,6 +1368,7 @@ fn finalize_3_voters_1_light_observer() {
link,
net.lock().peers[3].network_service().clone(),
Exit,
threads_pool.clone(),
).unwrap()
).unwrap();
@@ -1347,6 +1380,7 @@ fn finalize_3_voters_1_light_observer() {
fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1);
@@ -1356,7 +1390,7 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
// && instead fetches finality proof for block #1
net.peer(0).push_authorities_change_block(vec![babe_primitives::AuthorityId::from_slice(&[42; 32])]);
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 1, net.clone(), peers);
run_to_completion(&mut runtime, &threads_pool, 1, net.clone(), peers);
net.lock().block_until_sync(&mut runtime);
// check that the block#1 is finalized on light client
@@ -1377,6 +1411,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
// two of these guys are offline.
let genesis_authorities = if FORCE_CHANGE {
@@ -1424,7 +1459,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
net.lock().block_until_sync(&mut runtime);
// finalize block #11 on full clients
run_to_completion(&mut runtime, 11, net.clone(), peers_a);
run_to_completion(&mut runtime, &threads_pool, 11, net.clone(), peers_a);
// request finalization by light client
net.lock().add_light_peer(&GrandpaTestNet::default_config());
@@ -1441,6 +1476,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
fn voter_catches_up_to_latest_round_when_behind() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers);
@@ -1468,6 +1504,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: (),
executor: threads_pool.clone(),
};
Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
@@ -1555,6 +1592,8 @@ fn grandpa_environment_respects_voting_rules() {
use grandpa::Chain;
use sc_network_test::TestClient;
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice];
let voters = make_ids(peers);
@@ -1581,10 +1620,11 @@ fn grandpa_environment_respects_voting_rules() {
observer_enabled: true,
};
let (network, _) = NetworkBridge::new(
let network = NetworkBridge::new(
network_service.clone(),
config.clone(),
set_state.clone(),
&threads_pool,
Exit,
);
@@ -32,11 +32,11 @@ use log::{debug, warn};
use client_api::{BlockImportNotification, ImportNotifications};
use futures::prelude::*;
use futures::stream::Fuse;
use futures_timer::Delay;
use futures03::{StreamExt as _, TryStreamExt as _};
use grandpa::voter;
use parking_lot::Mutex;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use tokio_timer::Interval;
use std::collections::{HashMap, VecDeque};
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
@@ -76,7 +76,7 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
status_check: BlockStatus,
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
check_pending: Interval,
check_pending: Box<dyn Stream<Item = (), Error = std::io::Error> + Send>,
/// Mapping block hashes to their block number, the point in time it was
/// first encountered (Instant) and a list of GRANDPA messages referencing
/// the block hash.
@@ -104,9 +104,13 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
// the import notifications interval takes care of most of this; this is
// used in the event of missed import notifications
const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
let now = Instant::now();
let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL);
let check_pending = futures03::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay|
Box::pin(async move {
delay.await;
Some(((), Delay::new(CHECK_PENDING_INTERVAL)))
})).map(Ok).compat();
UntilImported {
import_notifications: {
let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat();
@@ -116,7 +120,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
check_pending,
check_pending: Box::new(check_pending),
pending: HashMap::new(),
identifier,
}
@@ -471,12 +475,12 @@ mod tests {
use super::*;
use crate::{CatchUp, CompactCommit};
use tokio::runtime::current_thread::Runtime;
use tokio_timer::Delay;
use test_client::runtime::{Block, Hash, Header};
use consensus_common::BlockOrigin;
use client_api::BlockImportNotification;
use futures::future::Either;
use futures03::channel::mpsc;
use futures_timer::Delay;
use futures03::{channel::mpsc, future::FutureExt as _, future::TryFutureExt as _};
use grandpa::Precommit;
#[derive(Clone)]
@@ -628,7 +632,7 @@ mod tests {
let inner_chain_state = chain_state.clone();
let work = until_imported
.into_future()
.select2(Delay::new(Instant::now() + Duration::from_millis(100)))
.select2(Delay::new(Duration::from_millis(100)).unit_error().compat())
.then(move |res| match res {
Err(_) => panic!("neither should have had error"),
Ok(Either::A(_)) => panic!("timeout should have fired first"),
@@ -929,7 +933,7 @@ mod tests {
// the `until_imported` stream doesn't request the blocks immediately,
// but it should request them after a small timeout
let timeout = Delay::new(Instant::now() + Duration::from_secs(60));
let timeout = Delay::new(Duration::from_secs(60)).unit_error().compat();
let test = assert.select2(timeout).map(|res| match res {
Either::A(_) => {},
Either::B(_) => panic!("timed out waiting for block sync request"),
@@ -0,0 +1,18 @@
[package]
description = "Gossiping for the Substrate network protocol"
name = "sc-network-gossip"
version = "2.0.0"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
log = "0.4.8"
futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "0.4.0"
lru = "0.1.2"
libp2p = { version = "0.13.0", default-features = false, features = ["libp2p-websocket"] }
network = { package = "sc-network", path = "../network" }
parking_lot = "0.9.0"
sp-runtime = { path = "../../primitives/runtime" }
@@ -0,0 +1,301 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::Network;
use crate::state_machine::{ConsensusGossip, Validator, TopicNotification};
use network::Context;
use network::message::generic::ConsensusMessage;
use network::{Event, ReputationChange};
use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use std::{sync::Arc, time::Duration};
/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
/// top of it.
pub struct GossipEngine<B: BlockT> {
inner: Arc<Mutex<GossipEngineInner<B>>>,
engine_id: ConsensusEngineId,
}
struct GossipEngineInner<B: BlockT> {
state_machine: ConsensusGossip<B>,
context: Box<dyn Context<B> + Send>,
context_ext: Box<dyn ContextExt<B> + Send>,
}
impl<B: BlockT> GossipEngine<B> {
/// Create a new instance.
pub fn new<N: Network<B> + Send + Clone + 'static>(
network: N,
executor: &impl futures::task::Spawn,
engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static {
let mut state_machine = ConsensusGossip::new();
let mut context = Box::new(ContextOverService {
network: network.clone(),
});
let context_ext = Box::new(ContextOverService {
network: network.clone(),
});
// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let event_stream = network.event_stream();
network.register_notifications_protocol(engine_id);
state_machine.register_validator(&mut *context, engine_id, validator);
let inner = Arc::new(Mutex::new(GossipEngineInner {
state_machine,
context,
context_ext,
}));
let gossip_engine = GossipEngine {
inner: inner.clone(),
engine_id,
};
let res = executor.spawn({
let inner = Arc::downgrade(&inner);
async move {
loop {
let _ = futures_timer::Delay::new(Duration::from_millis(1100)).await;
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.tick(&mut *inner.context);
} else {
// We reach this branch if the `Arc<GossipEngineInner>` has no reference
// left. We can now let the task end.
break;
}
}
}
});
// Note: we consider the chances of an error to spawn a background task almost null.
if res.is_err() {
log::error!(target: "gossip", "Failed to spawn background task");
}
let res = executor.spawn(async move {
let mut stream = Compat01As03::new(event_stream);
while let Some(Ok(event)) = stream.next().await {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != engine_id {
continue;
}
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.new_peer(&mut *inner.context, remote, roles);
}
Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => {
if msg_engine_id != engine_id {
continue;
}
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.peer_disconnected(&mut *inner.context, remote);
},
Event::NotificationsReceived { remote, messages } => {
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.on_incoming(
&mut *inner.context,
remote,
messages.into_iter()
.filter_map(|(engine, data)| if engine == engine_id {
Some(ConsensusMessage { engine_id: engine, data: data.to_vec() })
} else { None })
.collect()
);
},
Event::Dht(_) => {}
}
}
});
// Note: we consider the chances of an error to spawn a background task almost null.
if res.is_err() {
log::error!(target: "gossip", "Failed to spawn background task");
}
gossip_engine
}
/// Closes all notification streams.
pub fn abort(&self) {
self.inner.lock().state_machine.abort();
}
pub fn report(&self, who: PeerId, reputation: ReputationChange) {
self.inner.lock().context.report_peer(who, reputation);
}
/// Registers a message without propagating it to any peers. The message
/// becomes available to new peers or when the service is asked to gossip
/// the message's topic. No validation is performed on the message, if the
/// message is already expired it should be dropped on the next garbage
/// collection.
pub fn register_gossip_message(
&self,
topic: B::Hash,
message: Vec<u8>,
) {
let message = ConsensusMessage {
engine_id: self.engine_id,
data: message,
};
self.inner.lock().state_machine.register_message(topic, message);
}
/// Broadcast all messages with given topic.
pub fn broadcast_topic(&self, topic: B::Hash, force: bool) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
inner.state_machine.broadcast_topic(&mut *inner.context, topic, force);
}
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
pub fn messages_for(&self, topic: B::Hash)
-> mpsc::UnboundedReceiver<TopicNotification>
{
self.inner.lock().state_machine.messages_for(self.engine_id, topic)
}
/// Send all messages with given topic to a peer.
pub fn send_topic(
&self,
who: &PeerId,
topic: B::Hash,
force: bool
) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
inner.state_machine.send_topic(&mut *inner.context, who, topic, self.engine_id, force)
}
/// Multicast a message to all peers.
pub fn gossip_message(
&self,
topic: B::Hash,
message: Vec<u8>,
force: bool,
) {
let message = ConsensusMessage {
engine_id: self.engine_id,
data: message,
};
let mut inner = self.inner.lock();
let inner = &mut *inner;
inner.state_machine.multicast(&mut *inner.context, topic, message, force)
}
/// Send addressed message to the given peers. The message is not kept or multicast
/// later on.
pub fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
for who in &who {
inner.state_machine.send_message(&mut *inner.context, who, ConsensusMessage {
engine_id: self.engine_id,
data: data.clone(),
});
}
}
/// Notify everyone we're connected to that we have the given block.
///
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
pub fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
self.inner.lock().context_ext.announce(block, associated_data);
}
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
///
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
pub fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
self.inner.lock().context_ext.set_sync_fork_request(peers, hash, number);
}
}
impl<B: BlockT> Clone for GossipEngine<B> {
fn clone(&self) -> Self {
GossipEngine {
inner: self.inner.clone(),
engine_id: self.engine_id.clone(),
}
}
}
struct ContextOverService<N> {
network: N,
}
impl<B: BlockT, N: Network<B>> Context<B> for ContextOverService<N> {
fn report_peer(&mut self, who: PeerId, reputation: ReputationChange) {
self.network.report_peer(who, reputation);
}
fn disconnect_peer(&mut self, who: PeerId) {
self.network.disconnect_peer(who)
}
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
for message in messages {
self.network.write_notification(who.clone(), message.engine_id, message.data);
}
}
fn send_chain_specific(&mut self, _: PeerId, _: Vec<u8>) {
log::error!(
target: "sub-libp2p",
"send_chain_specific has been called in a context where it shouldn't"
);
}
}
trait ContextExt<B: BlockT> {
fn announce(&self, block: B::Hash, associated_data: Vec<u8>);
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>);
}
impl<B: BlockT, N: Network<B>> ContextExt<B> for ContextOverService<N> {
fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
Network::announce(&self.network, block, associated_data)
}
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
Network::set_sync_fork_request(&self.network, peers, hash, number)
}
}
+140
View File
@@ -0,0 +1,140 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Polite gossiping.
//!
//! This crate provides gossiping capabilities on top of a network.
//!
//! Gossip messages are separated by two categories: "topics" and consensus engine ID.
//! The consensus engine ID is sent over the wire with the message, while the topic is not,
//! with the expectation that the topic can be derived implicitly from the content of the
//! message, assuming it is valid.
//!
//! Topics are a single 32-byte tag associated with a message, used to group those messages
//! in an opaque way. Consensus code can invoke `broadcast_topic` to attempt to send all messages
//! under a single topic to all peers who don't have them yet, and `send_topic` to
//! send all messages under a single topic to a specific peer.
//!
//! # Usage
//!
//! - Implement the `Network` trait, representing the low-level networking primitives. It is
//! already implemented on `sc_network::NetworkService`.
//! - Implement the `Validator` trait. See the section below.
//! - Decide on a `ConsensusEngineId`. Each gossiping protocol should have a different one.
//! - Build a `GossipEngine` using these three elements.
//! - Use the methods of the `GossipEngine` in order to send out messages and receive incoming
//! messages.
//!
//! # What is a validator?
//!
//! The primary role of a `Validator` is to process incoming messages from peers, and decide
//! whether to discard them or process them. It also decides whether to re-broadcast the message.
//!
//! The secondary role of the `Validator` is to check if a message is allowed to be sent to a given
//! peer. All messages, before being sent, will be checked against this filter.
//! This enables the validator to use information it's aware of about connected peers to decide
//! whether to send messages to them at any given moment in time - In particular, to wait until
//! peers can accept and process the message before sending it.
//!
//! Lastly, the fact that gossip validators can decide not to rebroadcast messages
//! opens the door for neighbor status packets to be baked into the gossip protocol.
//! These status packets will typically contain light pieces of information
//! used to inform peers of a current view of protocol state.
pub use self::bridge::GossipEngine;
pub use self::state_machine::{TopicNotification, MessageIntent};
pub use self::state_machine::{Validator, ValidatorContext, ValidationResult};
pub use self::state_machine::DiscardAll;
use network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use std::sync::Arc;
mod bridge;
mod state_machine;
/// Abstraction over a network.
pub trait Network<B: BlockT> {
/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send>;
/// Adjust the reputation of a node.
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange);
/// Force-disconnect a peer.
fn disconnect_peer(&self, who: PeerId);
/// Send a notification to a peer.
fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>);
/// Registers a notifications protocol.
///
/// See the documentation of [`NetworkService:register_notifications_protocol`] for more information.
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId
);
/// Notify everyone we're connected to that we have the given block.
///
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
fn announce(&self, block: B::Hash, associated_data: Vec<u8>);
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
///
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<NetworkService<B, S, H>> {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send> {
Box::new(NetworkService::event_stream(self))
}
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
NetworkService::report_peer(self, peer_id, reputation);
}
fn disconnect_peer(&self, who: PeerId) {
NetworkService::disconnect_peer(self, who)
}
fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
NetworkService::write_notification(self, who, engine_id, message)
}
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
) {
NetworkService::register_notifications_protocol(self, engine_id)
}
fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
NetworkService::announce_block(self, block, associated_data)
}
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
NetworkService::set_sync_fork_request(self, peers, hash, number)
}
}
@@ -14,48 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Utility for gossip of network messages between nodes.
//! Handles chain-specific and standard BFT messages.
//!
//! Gossip messages are separated by two categories: "topics" and consensus engine ID.
//! The consensus engine ID is sent over the wire with the message, while the topic is not,
//! with the expectation that the topic can be derived implicitly from the content of the
//! message, assuming it is valid.
//!
//! Topics are a single 32-byte tag associated with a message, used to group those messages
//! in an opaque way. Consensus code can invoke `broadcast_topic` to attempt to send all messages
//! under a single topic to all peers who don't have them yet, and `send_topic` to
//! send all messages under a single topic to a specific peer.
//!
//! Each consensus engine ID must have an associated,
//! registered `Validator` for all gossip messages. The primary role of this `Validator` is
//! to process incoming messages from peers, and decide whether to discard them or process
//! them. It also decides whether to re-broadcast the message.
//!
//! The secondary role of the `Validator` is to check if a message is allowed to be sent to a given
//! peer. All messages, before being sent, will be checked against this filter.
//! This enables the validator to use information it's aware of about connected peers to decide
//! whether to send messages to them at any given moment in time - In particular, to wait until
//! peers can accept and process the message before sending it.
//!
//! Lastly, the fact that gossip validators can decide not to rebroadcast messages
//! opens the door for neighbor status packets to be baked into the gossip protocol.
//! These status packets will typically contain light pieces of information
//! used to inform peers of a current view of protocol state.
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::sync::Arc;
use std::iter;
use std::time;
use log::{trace, debug};
use futures03::channel::mpsc;
use futures::channel::mpsc;
use lru::LruCache;
use libp2p::PeerId;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sp_runtime::ConsensusEngineId;
pub use crate::message::generic::{Message, ConsensusMessage};
use crate::protocol::Context;
use crate::config::Roles;
pub use network::message::generic::{Message, ConsensusMessage};
use network::Context;
use network::config::Roles;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
@@ -63,7 +34,7 @@ const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
mod rep {
use peerset::ReputationChange as Rep;
use network::ReputationChange as Rep;
/// Reputation change when a peer sends us a gossip message that we didn't know about.
pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip");
/// Reputation change when a peer sends us a gossip message that we already knew about.
@@ -96,16 +67,6 @@ struct MessageEntry<B: BlockT> {
sender: Option<PeerId>,
}
/// Consensus message destination.
pub enum MessageRecipient {
/// Send to all peers.
BroadcastToAll,
/// Send to peers that don't have that message already.
BroadcastNew,
/// Send to specific peer.
Peer(PeerId),
}
/// The reason for sending out the message.
#[derive(Eq, PartialEq, Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
@@ -190,7 +151,7 @@ fn propagate<'a, B: BlockT, I>(
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
)
// (msg_hash, topic, message)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>,
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>,
{
let mut check_fns = HashMap::new();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
@@ -637,7 +598,7 @@ impl<B: BlockT> Validator<B> for DiscardAll {
mod tests {
use std::sync::Arc;
use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use futures03::executor::block_on_stream;
use futures::executor::block_on_stream;
use super::*;
+39 -9
View File
@@ -16,10 +16,11 @@
use crate::{
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour,
protocol::event::DhtEvent
Event, protocol::event::DhtEvent
};
use crate::{ExHashT, specialization::NetworkSpecialization};
use crate::protocol::{CustomMessageOutcome, Protocol};
use consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
@@ -27,7 +28,7 @@ use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess};
use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
use log::{debug, warn};
use sp_runtime::traits::Block as BlockT;
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::iter;
use void;
@@ -50,8 +51,10 @@ pub struct Behaviour<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// Event generated by `Behaviour`.
pub enum BehaviourOut<B: BlockT> {
SubstrateAction(CustomMessageOutcome<B>),
Dht(DhtEvent),
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
Event(Event),
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
@@ -127,7 +130,34 @@ Behaviour<B, S, H> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for
Behaviour<B, S, H> {
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
self.events.push(BehaviourOut::SubstrateAction(event));
match event {
CustomMessageOutcome::BlockImport(origin, blocks) =>
self.events.push(BehaviourOut::BlockImport(origin, blocks)),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)),
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } =>
for engine_id in protocols {
self.events.push(BehaviourOut::Event(Event::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
roles,
}));
},
CustomMessageOutcome::NotificationsStreamClosed { remote, protocols } =>
for engine_id in protocols {
self.events.push(BehaviourOut::Event(Event::NotificationsStreamClosed {
remote: remote.clone(),
engine_id,
}));
},
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
let ev = Event::NotificationsReceived { remote, messages };
self.events.push(BehaviourOut::Event(ev));
},
CustomMessageOutcome::None => {}
}
}
}
@@ -166,16 +196,16 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventPr
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results))));
}
DiscoveryOut::ValueNotFound(key) => {
self.events.push(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key))));
}
DiscoveryOut::ValuePut(key) => {
self.events.push(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key))));
}
DiscoveryOut::ValuePutFailed(key) => {
self.events.push(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
}
}
}
+1 -1
View File
@@ -186,7 +186,7 @@ pub use service::{
NetworkService, NetworkWorker, TransactionPool, ExHashT, ReportHandle,
NetworkStateInfo,
};
pub use protocol::{PeerInfo, Context, ProtocolConfig, consensus_gossip, message, specialization};
pub use protocol::{PeerInfo, Context, ProtocolConfig, message, specialization};
pub use protocol::event::{Event, DhtEvent};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
+114 -68
View File
@@ -17,7 +17,7 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use legacy_proto::{LegacyProto, LegacyProtoOut};
use crate::utils::interval;
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use libp2p::{Multiaddr, PeerId};
@@ -38,7 +38,6 @@ use sp_runtime::traits::{
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::{Message as GenericMessage, ConsensusMessage};
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use specialization::NetworkSpecialization;
use sync::{ChainSync, SyncState};
@@ -58,7 +57,6 @@ use util::LruHashSet;
mod legacy_proto;
mod util;
pub mod consensus_gossip;
pub mod message;
pub mod event;
pub mod light_dispatch;
@@ -135,7 +133,6 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
genesis_hash: B::Hash,
sync: ChainSync<B>,
specialization: S,
consensus_gossip: ConsensusGossip<B>,
context_data: ContextData<B, H>,
/// List of nodes for which we perform additional logging because they are important for the
/// user.
@@ -149,6 +146,8 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: LegacyProto<Substream<StreamMuxerBox>>,
/// List of notification protocols that have been registered.
registered_notif_protocols: HashSet<ConsensusEngineId>,
}
#[derive(Default)]
@@ -473,13 +472,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
genesis_hash: info.chain.genesis_hash,
sync,
specialization,
consensus_gossip: ConsensusGossip::new(),
handshaking_peers: HashMap::new(),
important_peers,
transaction_pool,
finality_proof_provider,
peerset_handle: peerset_handle.clone(),
behaviour,
registered_notif_protocols: HashSet::new(),
};
Ok((protocol, peerset_handle))
@@ -614,7 +613,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
stats.count_in += 1;
match message {
GenericMessage::Status(s) => self.on_status_message(who, s),
GenericMessage::Status(s) => return self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
@@ -656,20 +655,38 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
return self.on_finality_proof_response(who, response),
GenericMessage::RemoteReadChildRequest(request) =>
self.on_remote_read_child_request(who, request),
GenericMessage::Consensus(msg) => {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
vec![msg],
);
}
GenericMessage::Consensus(msg) =>
return if self.registered_notif_protocols.contains(&msg.engine_id) {
CustomMessageOutcome::NotificationsReceived {
remote: who.clone(),
messages: vec![(msg.engine_id, From::from(msg.data))],
}
} else {
warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
CustomMessageOutcome::None
},
GenericMessage::ConsensusBatch(messages) => {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
messages,
);
}
let messages = messages
.into_iter()
.filter_map(|msg| {
if self.registered_notif_protocols.contains(&msg.engine_id) {
Some((msg.engine_id, From::from(msg.data)))
} else {
warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
None
}
})
.collect::<Vec<_>>();
return if !messages.is_empty() {
CustomMessageOutcome::NotificationsReceived {
remote: who.clone(),
messages,
}
} else {
CustomMessageOutcome::None
};
},
GenericMessage::ChainSpecific(msg) => self.specialization.on_message(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
@@ -699,14 +716,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
/// Locks `self` and returns a context plus the `ConsensusGossip` struct.
pub fn consensus_gossip_lock<'a>(
&'a mut self,
) -> (impl Context<B> + 'a, &'a mut ConsensusGossip<B>) {
let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
(context, &mut self.consensus_gossip)
}
/// Locks `self` and returns a context plus the network specialization.
pub fn specialization_lock<'a>(
&'a mut self,
@@ -715,26 +724,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
(context, &mut self.specialization)
}
/// Gossip a consensus message to the network.
pub fn gossip_consensus_message(
&mut self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
recipient: GossipMessageRecipient,
) {
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
let message = ConsensusMessage { data: message, engine_id };
match recipient {
GossipMessageRecipient::BroadcastToAll =>
self.consensus_gossip.multicast(&mut context, topic, message, true),
GossipMessageRecipient::BroadcastNew =>
self.consensus_gossip.multicast(&mut context, topic, message, false),
GossipMessageRecipient::Peer(who) =>
self.send_message(&who, GenericMessage::Consensus(message)),
}
}
/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
@@ -755,11 +744,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.handshaking_peers.remove(&peer);
self.context_data.peers.remove(&peer)
};
if let Some(peer_data) = removed {
if let Some(_peer_data) = removed {
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
if peer_data.info.protocol_version > 2 {
self.consensus_gossip.peer_disconnected(&mut context, peer.clone());
}
self.sync.peer_disconnected(peer.clone());
self.specialization.on_disconnect(&mut context, peer.clone());
self.light_dispatch.on_disconnect(LightDispatchIn {
@@ -922,9 +908,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self) {
self.consensus_gossip.tick(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)
);
self.maintain_peers();
self.light_dispatch.maintain_peers(LightDispatchIn {
behaviour: &mut self.behaviour,
@@ -975,9 +958,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
/// Called by peer to report status
fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) {
fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let protocol_version = {
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
log!(
target: "sync",
@@ -985,7 +968,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
"Unexpected status packet from {}", who
);
self.peerset_handle.report_peer(who, rep::UNEXPECTED_STATUS);
return;
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
@@ -996,7 +979,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);
return;
return CustomMessageOutcome::None;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
log!(
@@ -1006,7 +989,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(&who);
return;
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
@@ -1015,7 +998,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return;
return CustomMessageOutcome::None;
}
// we don't interested in peers that are far behind us
@@ -1032,7 +1015,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return;
return CustomMessageOutcome::None;
}
}
@@ -1047,7 +1030,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
},
None => {
error!(target: "sync", "Received status from previously unconnected node {}", who);
return;
return CustomMessageOutcome::None;
},
};
@@ -1082,11 +1065,64 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
if protocol_version > 2 {
self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles);
self.specialization.on_connect(&mut context, who.clone(), status);
// Notify all the notification protocols as open.
CustomMessageOutcome::NotificationStreamOpened {
remote: who,
protocols: self.registered_notif_protocols.iter().cloned().collect(),
roles: info.roles,
}
self.specialization.on_connect(&mut context, who, status);
}
/// Send a notification to the given peer we're connected to.
///
/// Doesn't do anything if we don't have a notifications substream for that protocol with that
/// peer.
pub fn write_notification(
&mut self,
target: PeerId,
engine_id: ConsensusEngineId,
message: impl Into<Vec<u8>>
) {
if !self.registered_notif_protocols.contains(&engine_id) {
error!(
target: "sub-libp2p",
"Sending a notification with a protocol that wasn't registered: {:?}",
engine_id
);
}
self.send_message(&target, GenericMessage::Consensus(ConsensusMessage {
engine_id,
data: message.into(),
}));
}
/// Registers a new notifications protocol.
///
/// You are very strongly encouraged to call this method very early on. Any connection open
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notifications_protocol(
&mut self,
engine_id: ConsensusEngineId,
) -> Vec<event::Event> {
if !self.registered_notif_protocols.insert(engine_id) {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", engine_id);
}
// Registering a protocol while we already have open connections isn't great, but for now
// we handle it by notifying that we opened channels with everyone.
self.context_data.peers.iter()
.map(|(peer_id, peer)|
event::Event::NotificationStreamOpened {
remote: peer_id.clone(),
engine_id,
roles: peer.info.roles,
})
.collect()
}
/// Called when peer sends us new extrinsics
@@ -1758,6 +1794,12 @@ pub enum CustomMessageOutcome<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Notification protocols have been opened with a remote.
NotificationStreamOpened { remote: PeerId, protocols: Vec<ConsensusEngineId>, roles: Roles },
/// Notification protocols have been closed with a remote.
NotificationsStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> },
None,
}
@@ -1887,12 +1929,16 @@ Protocol<B, S, H> {
version <= CURRENT_VERSION as u8
&& version >= MIN_VERSION as u8
);
self.on_peer_connected(peer_id);
self.on_peer_connected(peer_id.clone());
CustomMessageOutcome::None
}
LegacyProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id);
CustomMessageOutcome::None
self.on_peer_disconnected(peer_id.clone());
// Notify all the notification protocols as closed.
CustomMessageOutcome::NotificationsStreamClosed {
remote: peer_id,
protocols: self.registered_notif_protocols.iter().cloned().collect(),
}
},
LegacyProtoOut::CustomMessage { peer_id, message } =>
self.on_custom_message(peer_id, message),
@@ -17,10 +17,15 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use crate::config::Roles;
use bytes::Bytes;
use libp2p::core::PeerId;
use libp2p::kad::record::Key;
use sp_runtime::ConsensusEngineId;
/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
#[must_use]
pub enum DhtEvent {
/// The value was found.
ValueFound(Vec<(Key, Vec<u8>)>),
@@ -37,7 +42,37 @@ pub enum DhtEvent {
/// Type for events generated by networking layer.
#[derive(Debug, Clone)]
#[must_use]
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Roles that the remote .
roles: Roles,
},
/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationsStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
},
/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<(ConsensusEngineId, Bytes)>,
},
}
+97 -54
View File
@@ -45,8 +45,7 @@ use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
use crate::{transport, config::NonReservedPeerMode, ReputationChange};
use crate::config::{Params, TransportConfig};
use crate::error::Error;
use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo};
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::protocol::{self, Protocol, Context, PeerInfo};
use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}};
use crate::protocol::specialization::NetworkSpecialization;
use crate::protocol::sync::SyncState;
@@ -276,6 +275,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
import_queue: params.import_queue,
from_worker,
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
event_streams: Vec::new(),
})
}
@@ -416,6 +416,55 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
self.local_peer_id.clone()
}
/// Writes a message on an open notifications channel. Has no effect if the notifications
/// channel with this protocol name is closed.
///
/// > **Note**: The reason why this is a no-op in the situation where we have no channel is
/// > that we don't guarantee message delivery anyway. Networking issues can cause
/// > connections to drop at any time, and higher-level logic shouldn't differentiate
/// > between the remote voluntarily closing a substream or a network error
/// > preventing the message from being delivered.
///
/// The protocol must have been registered with `register_notifications_protocol`.
///
pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::WriteNotification {
target,
engine_id,
message,
});
}
/// Returns a stream containing the events that happen on the network.
///
/// If this method is called multiple times, the events are duplicated.
///
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event, Error = ()> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = mpsc::unbounded();
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::EventStream(tx));
rx
}
/// Registers a new notifications protocol.
///
/// After that, you can call `write_notifications`.
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
/// about the protocol that you have registered.
///
/// You are very strongly encouraged to call this method very early on. Any connection open
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::RegisterNotifProtocol {
engine_id,
});
}
/// You must call this when new transactons are imported by the transaction pool.
///
/// The latest transactions will be fetched from the `TransactionPool` that was passed at
@@ -432,27 +481,19 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash, data));
}
/// Send a consensus message through the gossip
pub fn gossip_consensus_message(
&self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
recipient: GossipMessageRecipient,
) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::GossipConsensusMessage(
topic, engine_id, message, recipient,
));
}
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
self.peerset.report_peer(who, cost_benefit);
}
/// Disconnect from a node as soon as possible.
///
/// This triggers the same effects as if the connection had closed itself spontaneously.
pub fn disconnect_peer(&self, who: PeerId) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::DisconnectPeer(who));
}
/// Request a justification for the given block from the network.
///
/// On success, the justification will be passed to the import queue that was part at
@@ -472,15 +513,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
.unbounded_send(ServerToWorkerMsg::ExecuteWithSpec(Box::new(f)));
}
/// Execute a closure with the consensus gossip.
pub fn with_gossip<F>(&self, f: F)
where F: FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send + 'static
{
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::ExecuteWithGossip(Box::new(f)));
}
/// Are we in the process of downloading the chain?
pub fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
@@ -630,12 +662,20 @@ enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>),
ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
ExecuteWithGossip(Box<dyn FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send>),
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
GetValue(record::Key),
PutValue(record::Key, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(mpsc::UnboundedSender<Event>),
WriteNotification {
message: Vec<u8>,
engine_id: ConsensusEngineId,
target: PeerId,
},
RegisterNotifProtocol {
engine_id: ConsensusEngineId,
},
DisconnectPeer(PeerId),
}
/// Main network worker. Must be polled in order for the network to advance.
@@ -659,13 +699,15 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
from_worker: mpsc::UnboundedReceiver<ServerToWorkerMsg<B, S>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Senders for events that happen on the network.
event_streams: Vec<mpsc::UnboundedSender<Event>>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for NetworkWorker<B, S, H> {
type Item = Event;
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Poll the import queue for actions to perform.
let _ = futures03::future::poll_fn(|cx| {
self.import_queue.poll_actions(cx, &mut NetworkLink {
@@ -685,7 +727,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
// Process the next message coming from the `NetworkService`.
let msg = match self.from_worker.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)),
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
};
@@ -695,13 +737,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
let (mut context, spec) = protocol.specialization_lock();
task(spec, &mut context);
},
ServerToWorkerMsg::ExecuteWithGossip(task) => {
let protocol = self.network_service.user_protocol_mut();
let (mut context, gossip) = protocol.consensus_gossip_lock();
task(gossip, &mut context);
}
ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient),
ServerToWorkerMsg::AnnounceBlock(hash, data) =>
self.network_service.user_protocol_mut().announce_block(hash, data),
ServerToWorkerMsg::RequestJustification(hash, number) =>
@@ -716,6 +751,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
self.network_service.add_known_address(peer_id, addr),
ServerToWorkerMsg::SyncFork(peer_ids, hash, number) =>
self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServerToWorkerMsg::EventStream(sender) =>
self.event_streams.push(sender),
ServerToWorkerMsg::WriteNotification { message, engine_id, target } =>
self.network_service.user_protocol_mut().write_notification(target, engine_id, message),
ServerToWorkerMsg::RegisterNotifProtocol { engine_id } => {
let events = self.network_service.user_protocol_mut().register_notifications_protocol(engine_id);
for event in events {
self.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
}
},
ServerToWorkerMsg::DisconnectPeer(who) =>
self.network_service.user_protocol_mut().disconnect_peer(&who),
}
}
@@ -723,27 +770,23 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
// Process the next action coming from the network.
let poll_value = self.network_service.poll();
let outcome = match poll_value {
match poll_value {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) =>
return Ok(Async::Ready(Some(Event::Dht(ev)))),
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
Ok(Async::Ready(Some(BehaviourOut::BlockImport(origin, blocks)))) =>
self.import_queue.import_blocks(origin, blocks),
Ok(Async::Ready(Some(BehaviourOut::JustificationImport(origin, hash, nb, justification)))) =>
self.import_queue.import_justification(origin, hash, nb, justification),
Ok(Async::Ready(Some(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)))) =>
self.import_queue.import_finality_proof(origin, hash, nb, proof),
Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => {
self.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
},
Ok(Async::Ready(None)) => {},
Err(err) => {
error!(target: "sync", "Error in the network: {:?}", err);
return Err(err)
}
};
match outcome {
CustomMessageOutcome::BlockImport(origin, blocks) =>
self.import_queue.import_blocks(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
self.import_queue.import_justification(origin, hash, nb, justification),
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.import_queue.import_finality_proof(origin, hash, nb, proof),
CustomMessageOutcome::None => {}
}
}
// Update the variables shared with the `NetworkService`.
+34 -13
View File
@@ -131,6 +131,14 @@ impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle
}
}
impl futures03::task::Spawn for SpawnTaskHandle {
fn spawn_obj(&self, future: futures03::task::FutureObj<'static, ()>)
-> Result<(), futures03::task::SpawnError> {
self.execute(Box::new(futures03::compat::Compat::new(future.unit_error())))
.map_err(|_| futures03::task::SpawnError::shutdown())
}
}
/// Abstraction over a Substrate service.
pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send {
@@ -375,6 +383,9 @@ fn build_network_future<
let mut finality_notification_stream = client.finality_notification_stream().fuse()
.map(|v| Ok::<_, ()>(v)).compat();
// Initializing a stream in order to obtain DHT events from the network.
let mut event_stream = network.service().event_stream();
futures::future::poll_fn(move || {
let before_polling = Instant::now();
@@ -451,22 +462,32 @@ fn build_network_future<
(status, state)
});
// Processing DHT events.
while let Ok(Async::Ready(Some(event))) = event_stream.poll() {
match event {
Event::Dht(event) => {
// Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht
// events are being passed on to the authority-discovery module. In the future there might be multiple
// consumers of these events. In that case this would need to be refactored to properly dispatch the events,
// e.g. via a subscriber model.
if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) {
if e.is_full() {
warn!(target: "service", "Dht event channel to authority discovery is full, dropping event.");
} else if e.is_disconnected() {
warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event.");
}
}
}
_ => {}
}
}
// Main network polling.
while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| {
if let Ok(Async::Ready(())) = network.poll().map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
// Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht
// events are being passed on to the authority-discovery module. In the future there might be multiple
// consumers of these events. In that case this would need to be refactored to properly dispatch the events,
// e.g. via a subscriber model.
if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) {
if e.is_full() {
warn!(target: "service", "Dht event channel to authority discovery is full, dropping event.");
} else if e.is_disconnected() {
warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event.");
}
}
};
return Ok(Async::Ready(()));
}
// Now some diagnostic for performances.
let polling_dur = before_polling.elapsed();