Polite-grandpa improvements (#2229)

* send neighbor packets in more generic way

* integrate periodic neighbor-packet rebroadcaster

* integrate reporting

* attach callbacks to commit messages for rebroadcasting and reporting

* Tests for commit relay

* crunch up some nice warnings

* exit-scope sub-futures of grandpa

* address small grumbles

* some changes to commit handling
This commit is contained in:
Robert Habermeier
2019-04-16 09:25:46 +02:00
committed by Gav Wood
parent 7e1ac69791
commit 1aa6eb9fa8
10 changed files with 936 additions and 247 deletions
@@ -73,10 +73,12 @@ use network::{config::Roles, PeerId};
use parity_codec::{Encode, Decode};
use substrate_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug};
use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;
use crate::{CompactCommit, SignedMessage};
use super::{Round, SetId, Network};
use super::{cost, benefit, Round, SetId};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
@@ -227,6 +229,12 @@ pub(super) enum GossipMessage<Block: BlockT> {
Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
}
impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
fn from(neighbor: NeighborPacket<NumberFor<Block>>) -> Self {
GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor))
}
}
/// Network level message with topic information.
#[derive(Debug, Encode, Decode)]
pub(super) struct VoteOrPrecommitMessage<Block: BlockT> {
@@ -273,32 +281,12 @@ impl<N> VersionedNeighborPacket<N> {
}
}
// cost scalars for reporting peers.
mod cost {
pub(super) const PAST_REJECTION: i32 = -50;
pub(super) const BAD_SIGNATURE: i32 = -100;
pub(super) const MALFORMED_COMMIT: i32 = -1000;
pub(super) const FUTURE_MESSAGE: i32 = -500;
pub(super) const INVALID_VIEW_CHANGE: i32 = -500;
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
pub(super) const PER_BLOCK_LOADED: i32 = -10;
}
// benefit scalars for reporting peers.
mod benefit {
pub(super) const ROUND_MESSAGE: i32 = 100;
pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100;
pub(super) const PER_EQUIVOCATION: i32 = 10;
}
/// Misbehavior that peers can perform.
///
/// `cost` gives a cost that can be used to perform cost/benefit analysis of a
/// peer.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Misbehavior {
pub(super) enum Misbehavior {
// invalid neighbor message, considering the last one.
InvalidViewChange,
// could not decode neighbor message. bytes-length of the packet.
@@ -315,7 +303,7 @@ enum Misbehavior {
}
impl Misbehavior {
fn cost(&self) -> i32 {
pub(super) fn cost(&self) -> i32 {
use Misbehavior::*;
match *self {
@@ -402,9 +390,13 @@ impl<N: Ord> Peers<N> {
Some(p) => p,
};
if peer.view.last_commit.as_ref() >= Some(&new_height) {
// this doesn't allow a peer to send us unlimited commits with the
// same height, because there is still a misbehavior condition based on
// sending commits that are <= the best we are aware of.
if peer.view.last_commit.as_ref() > Some(&new_height) {
return Err(Misbehavior::InvalidViewChange);
}
peer.view.last_commit = Some(new_height);
Ok(())
@@ -416,7 +408,7 @@ impl<N: Ord> Peers<N> {
}
#[derive(Debug)]
enum Action<H> {
pub(super) enum Action<H> {
// repropagate under given topic, to the given peers, applying cost/benefit to originator.
Keep(H, i32),
// discard and process.
@@ -445,7 +437,9 @@ impl<Block: BlockT> Inner<Block> {
}
/// Note a round in a set has started.
fn note_round<N: Network<Block>>(&mut self, round: Round, set_id: SetId, net: &N) {
fn note_round<F>(&mut self, round: Round, set_id: SetId, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
if self.local_view.round == round && self.local_view.set_id == set_id {
return
}
@@ -457,24 +451,28 @@ impl<Block: BlockT> Inner<Block> {
self.local_view.set_id = set_id;
self.live_topics.push(round, set_id);
self.multicast_neighbor_packet(net);
self.multicast_neighbor_packet(send_neighbor);
}
/// Note that a voter set with given ID has started. Does nothing if the last
/// call to the function was with the same `set_id`.
fn note_set<N: Network<Block>>(&mut self, set_id: SetId, net: &N) {
fn note_set<F>(&mut self, set_id: SetId, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
if self.local_view.set_id == set_id { return }
self.local_view.update_set(set_id);
self.live_topics.push(Round(0), set_id);
self.multicast_neighbor_packet(net);
self.multicast_neighbor_packet(send_neighbor);
}
/// Note that we've imported a commit finalizing a given block.
fn note_commit_finalized<N: Network<Block>>(&mut self, finalized: NumberFor<Block>, net: &N) {
fn note_commit_finalized<F>(&mut self, finalized: NumberFor<Block>, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
if self.local_view.last_commit.as_ref() < Some(&finalized) {
self.local_view.last_commit = Some(finalized);
self.multicast_neighbor_packet(net)
self.multicast_neighbor_packet(send_neighbor)
}
}
@@ -520,7 +518,6 @@ impl<Block: BlockT> Inner<Block> {
fn validate_commit_message(&mut self, who: &PeerId, full: &FullCommitMessage<Block>)
-> Action<Block::Hash>
{
use grandpa::Message as GrandpaMessage;
if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) {
return Action::Discard(misbehavior.cost());
@@ -543,28 +540,6 @@ impl<Block: BlockT> Inner<Block> {
return Action::Discard(cost::MALFORMED_COMMIT);
}
// check signatures on all contained precommits.
for (i, (precommit, &(ref sig, ref id))) in full.message.precommits.iter()
.zip(&full.message.auth_data)
.enumerate()
{
if let Err(()) = super::check_message_sig::<Block>(
&GrandpaMessage::Precommit(precommit.clone()),
id,
sig,
full.round.0,
full.set_id.0,
) {
debug!(target: "afg", "Bad commit message signature {}", id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id);
return Action::Discard(Misbehavior::BadCommitMessage {
signatures_checked: i as i32,
blocks_loaded: 0,
equivocations_caught: 0,
}.cost());
}
}
// always discard commits initially and rebroadcast after doing full
// checking.
let topic = super::global_topic::<Block>(full.set_id.0);
@@ -585,54 +560,64 @@ impl<Block: BlockT> Inner<Block> {
(neighbor_topics, Action::Discard(cb))
}
fn construct_neighbor_packet(&self) -> GossipMessage<Block> {
fn multicast_neighbor_packet<F>(&self, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
let packet = NeighborPacket {
round: self.local_view.round,
set_id: self.local_view.set_id,
commit_finalized_height: self.local_view.last_commit.unwrap_or(Zero::zero()),
};
GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
}
fn multicast_neighbor_packet<N: Network<Block>>(&self, net: &N) {
let packet = self.construct_neighbor_packet();
let peers = self.peers.inner.keys().cloned().collect();
net.send_message(peers, packet.encode());
send_neighbor(peers, packet);
}
}
/// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> {
inner: parking_lot::RwLock<Inner<Block>>,
report_sender: mpsc::UnboundedSender<PeerReport>,
}
impl<Block: BlockT> GossipValidator<Block> {
/// Create a new gossip-validator.
pub(super) fn new(config: crate::Config) -> GossipValidator<Block> {
GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)) }
pub(super) fn new(config: crate::Config) -> (GossipValidator<Block>, ReportStream) {
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
report_sender: tx,
};
(val, ReportStream { reports: rx })
}
/// Note a round in a set has started.
pub(super) fn note_round<N: Network<Block>>(&self, round: Round, set_id: SetId, net: &N) {
self.inner.write().note_round(round, set_id, net);
pub(super) fn note_round<F>(&self, round: Round, set_id: SetId, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
self.inner.write().note_round(round, set_id, send_neighbor);
}
/// Note that a voter set with given ID has started.
pub(super) fn note_set<N: Network<Block>>(&self, set_id: SetId, net: &N) {
self.inner.write().note_set(set_id, net);
pub(super) fn note_set<F>(&self, set_id: SetId, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
self.inner.write().note_set(set_id, send_neighbor);
}
/// Note that we've imported a commit finalizing a given block.
pub(super) fn note_commit_finalized<N: Network<Block>>(&self, finalized: NumberFor<Block>, net: &N) {
self.inner.write().note_commit_finalized(finalized, net);
pub(super) fn note_commit_finalized<F>(&self, finalized: NumberFor<Block>, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
self.inner.write().note_commit_finalized(finalized, send_neighbor);
}
fn report(&self, _who: &PeerId, _cost_benefit: i32) {
// report
fn report(&self, who: PeerId, cost_benefit: i32) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
}
fn do_validate(&self, who: &PeerId, mut data: &[u8])
pub(super) fn do_validate(&self, who: &PeerId, mut data: &[u8])
-> (Action<Block::Hash>, Vec<Block::Hash>)
{
let mut broadcast_topics = Vec::new();
@@ -670,7 +655,14 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
let packet_data = {
let mut inner = self.inner.write();
inner.peers.new_peer(who.clone());
inner.construct_neighbor_packet().encode()
let packet = NeighborPacket {
round: inner.local_view.round,
set_id: inner.local_view.set_id,
commit_finalized_height: inner.local_view.last_commit.unwrap_or(Zero::zero()),
};
GossipMessage::<Block>::from(packet).encode()
};
context.send_message(who, packet_data);
}
@@ -691,15 +683,15 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
match action {
Action::Keep(topic, cb) => {
self.report(who, cb);
self.report(who.clone(), cb);
network_gossip::ValidationResult::ProcessAndKeep(topic)
}
Action::ProcessAndDiscard(topic, cb) => {
self.report(who, cb);
self.report(who.clone(), cb);
network_gossip::ValidationResult::ProcessAndDiscard(topic)
}
Action::Discard(cb) => {
self.report(who, cb);
self.report(who.clone(), cb);
network_gossip::ValidationResult::Discard
}
}
@@ -788,6 +780,62 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
}
}
struct PeerReport {
who: PeerId,
cost_benefit: i32,
}
// wrapper around a stream of reports.
#[must_use = "The report stream must be consumed"]
pub(super) struct ReportStream {
reports: mpsc::UnboundedReceiver<PeerReport>,
}
impl ReportStream {
/// Consume the report stream, converting it into a future that
/// handles all reports.
pub(super) fn consume<B, N>(self, net: N)
-> impl Future<Item=(),Error=()> + Send + 'static
where
B: BlockT,
N: super::Network<B> + Send + 'static,
{
ReportingTask {
reports: self.reports,
net,
_marker: Default::default(),
}
}
}
/// A future for reporting peers.
#[must_use = "Futures do nothing unless polled"]
struct ReportingTask<B, N> {
reports: mpsc::UnboundedReceiver<PeerReport>,
net: N,
_marker: std::marker::PhantomData<B>,
}
impl<B: BlockT, N: super::Network<B>> Future for ReportingTask<B, N> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.reports.poll() {
Err(_) => {
warn!(target: "afg", "Report stream terminated unexpectedly");
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) =>
self.net.report(who, cost_benefit),
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -804,36 +852,6 @@ mod tests {
}
}
#[derive(Clone)]
struct StubNetwork;
impl<Block: BlockT> super::Network<Block> for StubNetwork {
type In = futures::stream::Empty<network_gossip::TopicNotification, ()>;
fn messages_for(&self, _topic: Block::Hash) -> Self::In {
futures::stream::empty()
}
fn register_validator(
&self,
_validator: std::sync::Arc<dyn network_gossip::Validator<Block>>,
) {
}
fn gossip_message(&self, _topic: Block::Hash, _data: Vec<u8>, _force: bool) {
}
fn send_message(&self, _who: Vec<network::PeerId>, _data: Vec<u8>) {
}
fn announce(&self, _block: Block::Hash) {
}
}
#[test]
fn view_vote_rules() {
let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) };
@@ -975,12 +993,12 @@ mod tests {
#[test]
fn messages_not_expired_immediately() {
let val = GossipValidator::<Block>::new(config());
let (val, _) = GossipValidator::<Block>::new(config());
let set_id = 1;
for round_num in 1u64..10 {
val.note_round(Round(round_num), SetId(set_id), &StubNetwork);
val.note_round(Round(round_num), SetId(set_id), |_, _| {});
}
{
@@ -989,14 +1007,12 @@ mod tests {
// messages from old rounds are expired.
for round_num in 1u64..last_kept_round {
println!("{} should be expired?", round_num);
let topic = crate::communication::round_topic::<Block>(round_num, 1);
assert!(is_expired(topic, &[1, 2, 3]));
}
// messages from not-too-old rounds are not expired.
for round_num in last_kept_round..10 {
println!("{} should not be expired?", round_num);
let topic = crate::communication::round_topic::<Block>(round_num, 1);
assert!(!is_expired(topic, &[1, 2, 3]));
}
@@ -38,8 +38,8 @@ use parity_codec::{Encode, Decode};
use substrate_primitives::{ed25519, Pair};
use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use runtime_primitives::ConsensusEngineId;
use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
use network::{consensus_gossip as network_gossip, Service as NetworkService,};
use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use network::{consensus_gossip as network_gossip, Service as NetworkService};
use network_gossip::ConsensusMessage;
use crate::{Error, Message, SignedMessage, Commit, CompactCommit};
@@ -50,15 +50,40 @@ use gossip::{
use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature};
pub mod gossip;
mod periodic;
#[cfg(test)]
mod tests;
/// The consensus engine ID of GRANDPA.
pub const GRANDPA_ENGINE_ID: ConsensusEngineId = [b'a', b'f', b'g', b'1'];
// cost scalars for reporting peers.
mod cost {
pub(super) const PAST_REJECTION: i32 = -50;
pub(super) const BAD_SIGNATURE: i32 = -100;
pub(super) const MALFORMED_COMMIT: i32 = -1000;
pub(super) const FUTURE_MESSAGE: i32 = -500;
pub(super) const INVALID_VIEW_CHANGE: i32 = -500;
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
pub(super) const PER_BLOCK_LOADED: i32 = -10;
pub(super) const INVALID_COMMIT: i32 = -5000;
}
// benefit scalars for reporting peers.
mod benefit {
pub(super) const ROUND_MESSAGE: i32 = 100;
pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100;
pub(super) const PER_EQUIVOCATION: i32 = 10;
}
/// A handle to the network. This is generally implemented by providing some
/// handle to a gossip service or similar.
///
/// Intended to be a lightweight handle such as an `Arc`.
pub trait Network<Block: BlockT>: Clone {
pub trait Network<Block: BlockT>: Clone + Send + 'static {
/// A stream of input messages for a topic.
type In: Stream<Item=network_gossip::TopicNotification,Error=()>;
@@ -77,6 +102,9 @@ pub trait Network<Block: BlockT>: Clone {
/// Send a message to a bunch of specific peers, even if they've seen it already.
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>);
/// Report a peer's cost or benefit after some action.
fn report(&self, who: network::PeerId, cost_benefit: i32);
/// Inform peers that a block with given hash should be downloaded.
fn announce(&self, block: Block::Hash);
}
@@ -133,6 +161,10 @@ impl<B, S> Network<B> for Arc<NetworkService<B, S>> where
})
}
fn report(&self, who: network::PeerId, cost_benefit: i32) {
self.report_peer(who, cost_benefit)
}
fn announce(&self, block: B::Hash) {
self.announce_block(block)
}
@@ -164,18 +196,49 @@ impl Stream for NetworkStream {
}
}
/// The result of processing a commit.
pub(crate) enum CommitProcessingOutcome {
Good,
Bad,
}
/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
}
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService
pub(crate) fn new(service: N, config: crate::Config) -> Self {
let validator = Arc::new(GossipValidator::new(config));
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup.
pub(crate) fn new(
service: N,
config: crate::Config,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
) -> (
Self,
impl futures::Future<Item = (), Error = ()> + Send + 'static,
) {
let (validator, report_stream) = GossipValidator::new(config);
let validator = Arc::new(validator);
service.register_validator(validator.clone());
NetworkBridge { service, validator: validator }
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
let reporting_job = report_stream.consume(service.clone());
let bridge = NetworkBridge { service, validator, neighbor_sender };
let startup_work = futures::future::lazy(move || {
// lazily spawn these jobs onto their own tasks. the lazy future has access
// to tokio globals, which aren't available outside.
tokio::spawn(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())));
tokio::spawn(reporting_job.select(on_exit.clone()).then(|_| Ok(())));
Ok(())
});
(bridge, startup_work)
}
/// Get the round messages for a round in a given set ID. These are signature-checked.
@@ -190,7 +253,14 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
) {
self.validator.note_round(round, set_id, &self.service);
self.validator.note_round(
round,
set_id,
|to, neighbor| self.service.send_message(
to,
GossipMessage::<B>::from(neighbor).encode()
),
);
let locals = local_key.and_then(|pair| {
let public = pair.public();
@@ -275,61 +345,113 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
/// Set up the global communication streams.
pub(crate) fn global_communication(&self,
pub(crate) fn global_communication(
&self,
set_id: SetId,
voters: Arc<VoterSet<AuthorityId>>,
is_voter: bool
is_voter: bool,
) -> (
impl Stream<Item = (u64, CompactCommit<B>), Error = Error>,
impl Stream<Item = (u64, CompactCommit<B>, impl FnMut(CommitProcessingOutcome)), Error = Error>,
impl Sink<SinkItem = (u64, Commit<B>), SinkError = Error>,
) {
self.validator.note_set(set_id, &self.service);
self.validator.note_set(
set_id,
|to, neighbor| self.service.send_message(to, GossipMessage::<B>::from(neighbor).encode()),
);
let service = self.service.clone();
let topic = global_topic::<B>(set_id.0);
let incoming = self.service.messages_for(topic)
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if decoded.is_none() {
trace!(target: "afg", "Skipping malformed commit message {:?}", notification);
}
decoded
})
.filter_map(move |msg| {
match msg {
GossipMessage::Commit(msg) => {
let round = msg.round;
let precommits_signed_by: Vec<String> =
msg.message.auth_data.iter().map(move |(_, a)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.received_commit";
"contains_precommits_signed_by" => ?precommits_signed_by,
"target_number" => ?msg.message.target_number,
"target_hash" => ?msg.message.target_hash,
);
check_compact_commit::<B>(msg.message, &*voters).map(move |c| (round.0, c))
},
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
}
}
})
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
let incoming = incoming_global(service, topic, voters, self.validator.clone());
let outgoing = CommitsOut::<B, N>::new(
self.service.clone(),
set_id.0,
is_voter,
self.validator.clone(),
);
(incoming, outgoing)
}
}
pub(crate) fn note_commit_finalized(&self, number: NumberFor<B>) {
self.validator.note_commit_finalized(number, &self.service);
}
fn incoming_global<B: BlockT, N: Network<B>>(
service: N,
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
) -> impl Stream<Item = (u64, CompactCommit<B>, impl FnMut(CommitProcessingOutcome)), Error = Error> {
service.messages_for(topic)
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if decoded.is_none() {
trace!(target: "afg", "Skipping malformed commit message {:?}", notification);
}
decoded.map(move |d| (notification, d))
})
.filter_map(move |(notification, msg)| {
match msg {
GossipMessage::Commit(msg) => {
let precommits_signed_by: Vec<String> =
msg.message.auth_data.iter().map(move |(_, a)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.received_commit";
"contains_precommits_signed_by" => ?precommits_signed_by,
"target_number" => ?msg.message.target_number.clone(),
"target_hash" => ?msg.message.target_hash.clone(),
);
if let Err(cost) = check_compact_commit::<B>(
&msg.message,
&*voters,
msg.round,
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
}
None
} else {
Some((msg, notification, service.clone()))
}
},
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
}
}
})
.map(move |(msg, mut notification, service)| {
let round = msg.round.0;
let commit = msg.message;
let finalized_number = commit.target_number;
let gossip_validator = gossip_validator.clone();
let cb = move |outcome| match outcome {
CommitProcessingOutcome::Good => {
// if it checks out, gossip it. not accounting for
// any discrepancy between the actual ghost and the claimed
// finalized number.
gossip_validator.note_commit_finalized(
finalized_number,
|to, neighbor_msg| service.send_message(
to,
GossipMessage::<B>::from(neighbor_msg).encode(),
),
);
service.gossip_message(topic, notification.message.clone(), false);
}
CommitProcessingOutcome::Bad => {
// report peer and do not gossip.
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_COMMIT);
}
}
};
(round, commit, cb)
})
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
@@ -337,6 +459,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
NetworkBridge {
service: self.service.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
}
}
}
@@ -462,41 +585,86 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
}
}
// checks a compact commit. returns `None` if it was bad and
fn check_compact_commit<Block: BlockT>(
msg: CompactCommit<Block>,
msg: &CompactCommit<Block>,
voters: &VoterSet<AuthorityId>,
) -> Option<CompactCommit<Block>> {
if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() {
debug!(target: "afg", "Skipping malformed compact commit");
return None;
}
round: Round,
set_id: SetId,
) -> Result<(), i32> {
// 4f + 1 = equivocations from f voters.
let f = voters.total_weight() - voters.threshold();
let full_threshold = voters.total_weight() + f;
// check signatures on all contained precommits.
// check total weight is not too high.
let mut total_weight = 0;
for (_, ref id) in &msg.auth_data {
if !voters.contains_key(id) {
if let Some(weight) = voters.info(id).map(|info| info.weight()) {
total_weight += weight;
if total_weight > full_threshold {
return Err(cost::MALFORMED_COMMIT);
}
} else {
debug!(target: "afg", "Skipping commit containing unknown voter {}", id);
return None;
return Err(cost::MALFORMED_COMMIT);
}
}
Some(msg)
if total_weight < voters.threshold() {
return Err(cost::MALFORMED_COMMIT);
}
// check signatures on all contained precommits.
for (i, (precommit, &(ref sig, ref id))) in msg.precommits.iter()
.zip(&msg.auth_data)
.enumerate()
{
use crate::communication::gossip::Misbehavior;
use grandpa::Message as GrandpaMessage;
if let Err(()) = check_message_sig::<Block>(
&GrandpaMessage::Precommit(precommit.clone()),
id,
sig,
round.0,
set_id.0,
) {
debug!(target: "afg", "Bad commit message signature {}", id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id);
let cost = Misbehavior::BadCommitMessage {
signatures_checked: i as i32,
blocks_loaded: 0,
equivocations_caught: 0,
}.cost();
return Err(cost);
}
}
Ok(())
}
/// An output sink for commit messages.
struct CommitsOut<Block: BlockT, N: Network<Block>> {
network: N,
set_id: SetId,
is_voter: bool,
_marker: ::std::marker::PhantomData<Block>,
gossip_validator: Arc<GossipValidator<Block>>,
}
impl<Block: BlockT, N: Network<Block>> CommitsOut<Block, N> {
/// Create a new commit output stream.
pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self {
pub(crate) fn new(
network: N,
set_id: u64,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
) -> Self {
CommitsOut {
network,
set_id: SetId(set_id),
is_voter,
_marker: Default::default(),
gossip_validator,
}
}
}
@@ -534,6 +702,16 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> {
});
let topic = global_topic::<Block>(self.set_id.0);
// the gossip validator needs to be made aware of the best commit-height we know of
// before gosipping
self.gossip_validator.note_commit_finalized(
commit.target_number,
|to, neighbor| self.network.send_message(
to,
GossipMessage::<Block>::from(neighbor).encode(),
),
);
self.network.gossip_message(topic, message.encode(), false);
Ok(AsyncSink::Ready)
@@ -0,0 +1,93 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Periodic rebroadcast of neighbor packets.
use super::{gossip::{NeighborPacket, GossipMessage}, Network};
use futures::prelude::*;
use futures::sync::mpsc;
use runtime_primitives::traits::{NumberFor, Block as BlockT};
use network::PeerId;
use tokio::timer::Delay;
use log::warn;
use parity_codec::Encode;
use std::time::{Instant, Duration};
// how often to rebroadcast, if no other
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
fn rebroadcast_instant() -> Instant {
Instant::now() + REBROADCAST_AFTER
}
/// A sender used to send neighbor packets to a background job.
pub(super) type NeighborPacketSender<B> = mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>;
/// Does the work of sending neighbor packets, asynchronously.
///
/// It may rebroadcast the last neighbor packet periodically when no
/// progress is made.
pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
impl Future<Item = (), Error = ()> + Send + 'static,
NeighborPacketSender<B>,
) where
B: BlockT,
N: Network<B>,
{
let mut last = None;
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let mut delay = Delay::new(rebroadcast_instant());
let work = futures::future::poll_fn(move || {
loop {
match rx.poll().expect("unbounded receivers do not error; qed") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some((to, packet))) => {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
// rebroadcasting network.
delay.reset(rebroadcast_instant());
last = Some((to, packet));
}
Async::NotReady => break,
}
}
// has to be done in a loop because it needs to be polled after
// re-scheduling.
loop {
match delay.poll() {
Err(e) => {
warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e);
delay.reset(rebroadcast_instant());
}
Ok(Async::Ready(())) => {
delay.reset(rebroadcast_instant());
if let Some((ref to, ref packet)) = last {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
}
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
});
(work, tx)
}
@@ -0,0 +1,385 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the communication portion of the GRANDPA crate.
use futures::sync::mpsc;
use futures::prelude::*;
use network::consensus_gossip as network_gossip;
use network::test::{Block, Hash};
use network_gossip::Validator;
use tokio::runtime::current_thread;
use std::sync::Arc;
use keyring::AuthorityKeyring;
use parity_codec::Encode;
use super::gossip::{self, GossipValidator};
use super::{AuthorityId, VoterSet, Round, SetId};
enum Event {
MessagesFor(Hash, mpsc::UnboundedSender<network_gossip::TopicNotification>),
RegisterValidator(Arc<dyn network_gossip::Validator<Block>>),
GossipMessage(Hash, Vec<u8>, bool),
SendMessage(Vec<network::PeerId>, Vec<u8>),
Report(network::PeerId, i32),
Announce(Hash),
}
#[derive(Clone)]
struct TestNetwork {
sender: mpsc::UnboundedSender<Event>,
}
impl super::Network<Block> for TestNetwork {
type In = mpsc::UnboundedReceiver<network_gossip::TopicNotification>;
/// Get a stream of messages for a specific gossip topic.
fn messages_for(&self, topic: Hash) -> Self::In {
let (tx, rx) = mpsc::unbounded();
let _ = self.sender.unbounded_send(Event::MessagesFor(topic, tx));
rx
}
/// Register a gossip validator.
fn register_validator(&self, validator: Arc<dyn network_gossip::Validator<Block>>) {
let _ = self.sender.unbounded_send(Event::RegisterValidator(validator));
}
/// Gossip a message out to all connected peers.
///
/// Force causes it to be sent to all peers, even if they've seen it already.
/// Only should be used in case of consensus stall.
fn gossip_message(&self, topic: Hash, data: Vec<u8>, force: bool) {
let _ = self.sender.unbounded_send(Event::GossipMessage(topic, data, force));
}
/// Send a message to a bunch of specific peers, even if they've seen it already.
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::SendMessage(who, data));
}
/// Report a peer's cost or benefit after some action.
fn report(&self, who: network::PeerId, cost_benefit: i32) {
let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit));
}
/// Inform peers that a block with given hash should be downloaded.
fn announce(&self, block: Hash) {
let _ = self.sender.unbounded_send(Event::Announce(block));
}
}
struct Tester {
net_handle: super::NetworkBridge<Block, TestNetwork>,
gossip_validator: Arc<GossipValidator<Block>>,
events: mpsc::UnboundedReceiver<Event>,
}
impl Tester {
fn filter_network_events<F>(self, mut pred: F) -> impl Future<Item=Self,Error=()>
where F: FnMut(Event) -> bool
{
let mut s = Some(self);
futures::future::poll_fn(move || loop {
match s.as_mut().unwrap().events.poll().expect("concluded early") {
Async::Ready(None) => panic!("concluded early"),
Async::Ready(Some(item)) => if pred(item) {
return Ok(Async::Ready(s.take().unwrap()))
},
Async::NotReady => return Ok(Async::NotReady),
}
})
}
}
// some random config (not really needed)
fn config() -> crate::Config {
crate::Config {
gossip_duration: std::time::Duration::from_millis(10),
justification_period: 256,
local_key: None,
name: None,
}
}
// needs to run in a tokio runtime.
fn make_test_network() -> impl Future<Item=Tester,Error=()> {
let (tx, rx) = mpsc::unbounded();
let net = TestNetwork { sender: tx };
#[derive(Clone)]
struct Exit;
impl Future for Exit {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
}
}
let (bridge, startup_work) = super::NetworkBridge::new(
net.clone(),
config(),
Exit,
);
startup_work.map(move |()| Tester {
gossip_validator: bridge.validator.clone(),
net_handle: bridge,
events: rx,
})
}
fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(AuthorityId, u64)> {
keys.iter()
.map(|key| AuthorityId(key.to_raw_public()))
.map(|id| (id, 1))
.collect()
}
struct NoopContext;
impl network_gossip::ValidatorContext<Block> for NoopContext {
fn broadcast_topic(&mut self, _: Hash, _: bool) { }
fn broadcast_message(&mut self, _: Hash, _: Vec<u8>, _: bool) { }
fn send_message(&mut self, _: &network::PeerId, _: Vec<u8>) { }
fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { }
}
#[test]
fn good_commit_leads_to_relay() {
let private = [AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let public = make_ids(&private[..]);
let voter_set = Arc::new(public.iter().cloned().collect::<VoterSet<AuthorityId>>());
let round = 0;
let set_id = 1;
let commit = {
let target_hash: Hash = [1; 32].into();
let target_number = 500;
let precommit = grandpa::Precommit { target_hash: target_hash.clone(), target_number };
let payload = super::localized_payload(
round, set_id, &grandpa::Message::Precommit(precommit.clone())
);
let mut precommits = Vec::new();
let mut auth_data = Vec::new();
for (i, key) in private.iter().enumerate() {
precommits.push(precommit.clone());
let signature = key.sign(&payload[..]);
auth_data.push((signature, public[i].0.clone()))
}
grandpa::CompactCommit {
target_hash,
target_number,
precommits,
auth_data,
}
};
let encoded_commit = gossip::GossipMessage::<Block>::Commit(gossip::FullCommitMessage {
round: Round(round),
set_id: SetId(set_id),
message: commit,
}).encode();
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network()
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
Ok((tester, id))
})
.and_then(move |(tester, id)| {
// start round, dispatch commit, and wait for broadcast.
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
{
let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
match action {
gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic),
_ => panic!("wrong expected outcome from initial commit validation"),
}
}
let commit_to_send = encoded_commit.clone();
// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
// send a message.
let sender_id = id.clone();
let send_message = tester.filter_network_events(move |event| match event {
Event::MessagesFor(topic, sender) => {
if topic != global_topic { return false }
let _ = sender.unbounded_send(network_gossip::TopicNotification {
message: commit_to_send.clone(),
sender: Some(sender_id.clone()),
});
true
}
_ => false,
});
// when the commit comes in, we'll tell the callback it was good.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
let (_, _, mut callback) = item.unwrap();
(callback)(super::CommitProcessingOutcome::Good);
})
.map_err(|_| panic!("could not process commit"));
// once the message is sent and commit is "handled" we should have
// a repropagation event coming from the network.
send_message.join(handle_commit).and_then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::GossipMessage(topic, data, false) => {
if topic == global_topic && data == encoded_commit {
true
} else {
panic!("Trying to gossip something strange")
}
}
_ => false,
})
})
.map_err(|_| panic!("could not watch for gossip message"))
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
}
#[test]
fn bad_commit_leads_to_report() {
let private = [AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let public = make_ids(&private[..]);
let voter_set = Arc::new(public.iter().cloned().collect::<VoterSet<AuthorityId>>());
let round = 0;
let set_id = 1;
let commit = {
let target_hash: Hash = [1; 32].into();
let target_number = 500;
let precommit = grandpa::Precommit { target_hash: target_hash.clone(), target_number };
let payload = super::localized_payload(
round, set_id, &grandpa::Message::Precommit(precommit.clone())
);
let mut precommits = Vec::new();
let mut auth_data = Vec::new();
for (i, key) in private.iter().enumerate() {
precommits.push(precommit.clone());
let signature = key.sign(&payload[..]);
auth_data.push((signature, public[i].0.clone()))
}
grandpa::CompactCommit {
target_hash,
target_number,
precommits,
auth_data,
}
};
let encoded_commit = gossip::GossipMessage::<Block>::Commit(gossip::FullCommitMessage {
round: Round(round),
set_id: SetId(set_id),
message: commit,
}).encode();
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network()
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
Ok((tester, id))
})
.and_then(move |(tester, id)| {
// start round, dispatch commit, and wait for broadcast.
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
{
let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
match action {
gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic),
_ => panic!("wrong expected outcome from initial commit validation"),
}
}
let commit_to_send = encoded_commit.clone();
// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
// send a message.
let sender_id = id.clone();
let send_message = tester.filter_network_events(move |event| match event {
Event::MessagesFor(topic, sender) => {
if topic != global_topic { return false }
let _ = sender.unbounded_send(network_gossip::TopicNotification {
message: commit_to_send.clone(),
sender: Some(sender_id.clone()),
});
true
}
_ => false,
});
// when the commit comes in, we'll tell the callback it was good.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
let (_, _, mut callback) = item.unwrap();
(callback)(super::CommitProcessingOutcome::Bad);
})
.map_err(|_| panic!("could not process commit"));
// once the message is sent and commit is "handled" we should have
// a report event coming from the network.
send_message.join(handle_commit).and_then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::Report(who, cost_benefit) => {
if who == id && cost_benefit == super::cost::INVALID_COMMIT {
true
} else {
panic!("reported unknown peer or unexpected cost");
}
}
_ => false,
})
})
.map_err(|_| panic!("could not watch for peer report"))
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
}
@@ -642,7 +642,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
return Ok(());
}
let res = finalize_block(
finalize_block(
&*self.inner,
&self.authority_set,
&self.consensus_changes,
@@ -650,13 +650,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
hash,
number,
(round, commit).into(),
);
if let Ok(_) = res {
self.network.note_commit_finalized(number);
}
res
)
}
fn round_commit_timer(&self) -> Self::Timer {
+31 -52
View File
@@ -337,9 +337,12 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
))
}
fn global_communication<Block: BlockT<Hash=H256>, I, O>(
commits_in: I,
commits_out: O,
fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
local_key: Option<&Arc<ed25519::Pair>>,
set_id: u64,
voters: &Arc<VoterSet<AuthorityId>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &NetworkBridge<Block, N>,
) -> (
impl Stream<
Item = voter::CommunicationIn<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>,
@@ -349,44 +352,6 @@ fn global_communication<Block: BlockT<Hash=H256>, I, O>(
SinkItem = voter::CommunicationOut<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>,
SinkError = CommandOrError<H256, NumberFor<Block>>,
>,
) where
I: Stream<
Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
Error = CommandOrError<H256, NumberFor<Block>>,
>,
O: Sink<
SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
SinkError = CommandOrError<H256, NumberFor<Block>>,
>,
{
let global_in = commits_in.map(|(round, commit)| {
voter::CommunicationIn::Commit(round, commit, voter::Callback::Blank)
});
// NOTE: eventually this will also handle catch-up requests
let global_out = commits_out.with(|global| match global {
voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)),
_ => unimplemented!(),
});
(global_in, global_out)
}
fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
local_key: Option<&Arc<ed25519::Pair>>,
set_id: u64,
voters: &Arc<VoterSet<AuthorityId>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &NetworkBridge<Block, N>,
) -> (
impl Stream<
Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
Error = CommandOrError<H256, NumberFor<Block>>,
>,
impl Sink<
SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
SinkError = CommandOrError<H256, NumberFor<Block>>,
>,
) where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
@@ -395,6 +360,7 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
NumberFor<Block>: BlockNumberOps,
DigestItemFor<Block>: DigestItem<AuthorityId=AuthorityId>,
{
let is_voter = local_key
.map(|pair| voters.contains_key(&pair.public().into()))
.unwrap_or(false);
@@ -413,10 +379,26 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
commit_in,
);
let commit_in = commit_in.map_err(Into::into);
let commit_out = commit_out.sink_map_err(Into::into);
let commits_in = commit_in.map_err(CommandOrError::from);
let commits_out = commit_out.sink_map_err(CommandOrError::from);
(commit_in, commit_out)
let global_in = commits_in.map(|(round, commit, mut callback)| {
let callback = voter::Callback::Work(Box::new(move |outcome| match outcome {
voter::CommitProcessingOutcome::Good(_) =>
callback(communication::CommitProcessingOutcome::Good),
voter::CommitProcessingOutcome::Bad(_) =>
callback(communication::CommitProcessingOutcome::Bad),
}));
voter::CommunicationIn::Commit(round, commit, callback)
});
// NOTE: eventually this will also handle catch-up requests
let global_out = commits_out.with(|global| match global {
voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)),
_ => unimplemented!(),
});
(global_in, global_out)
}
/// Register the finality tracker inherent data provider (which is used by
@@ -456,7 +438,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
link: LinkHalf<B, E, Block, RA>,
network: N,
inherent_data_providers: InherentDataProviders,
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -470,7 +452,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
{
use futures::future::{self, Loop as FutureLoop};
let network = NetworkBridge::new(network, config.clone());
let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone());
let LinkHalf {
client,
@@ -538,7 +520,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
chain_info.chain.finalized_number,
);
let (commit_in, commit_out) = committer_communication(
let global_comms = global_communication(
config.local_key.as_ref(),
env.set_id,
&env.voters,
@@ -546,11 +528,6 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
&network,
);
let global_comms = global_communication::<Block, _, _>(
commit_in,
commit_out,
);
let voters = (*env.voters).clone();
let last_completed_round = completed_rounds.last();
@@ -675,5 +652,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
let voter_work = network_startup.and_then(move |()| voter_work);
Ok(voter_work.select(on_exit).then(|_| Ok(())))
}
+22 -5
View File
@@ -204,11 +204,27 @@ impl Network<Block> for MessageRouting {
})
}
fn report(&self, _who: network::PeerId, _cost_benefit: i32) {
}
fn announce(&self, _block: Hash) {
}
}
#[derive(Clone)]
struct Exit;
impl Future for Exit {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
}
}
#[derive(Default, Clone)]
struct TestApi {
genesis_authorities: Vec<(AuthorityId, u64)>,
@@ -402,7 +418,7 @@ fn run_to_completion_with<F: FnOnce()>(
link,
MessageRouting::new(net.clone(), peer_id),
InherentDataProviders::new(),
futures::empty(),
Exit,
).expect("all in order with client and network");
assert_send(&voter);
@@ -503,7 +519,7 @@ fn finalize_3_voters_1_observer() {
link,
MessageRouting::new(net.clone(), peer_id),
InherentDataProviders::new(),
futures::empty(),
Exit,
).expect("all in order with client and network");
runtime.spawn(voter);
@@ -665,7 +681,7 @@ fn transition_3_voters_twice_1_observer() {
link,
MessageRouting::new(net.clone(), peer_id),
InherentDataProviders::new(),
futures::empty(),
Exit,
).expect("all in order with client and network");
runtime.spawn(voter);
@@ -1065,7 +1081,7 @@ fn voter_persists_its_votes() {
link,
MessageRouting::new(net.clone(), 0),
InherentDataProviders::new(),
futures::empty(),
Exit,
).expect("all in order with client and network");
let voter = future::poll_fn(move || {
@@ -1112,7 +1128,8 @@ fn voter_persists_its_votes() {
name: Some(format!("peer#{}", 1)),
};
let routing = MessageRouting::new(net.clone(), 1);
let network = communication::NetworkBridge::new(routing, config.clone());
let (network, routing_work) = communication::NetworkBridge::new(routing, config.clone(), Exit);
runtime.block_on(routing_work).unwrap();
let (round_rx, round_tx) = network.round_communication(
communication::Round(1),
@@ -258,13 +258,13 @@ pub(crate) type UntilVoteTargetImported<Block, Status, I> = UntilImported<Block,
///
/// This is used for compact commits which have already been checked for
/// structural soundness.
pub(crate) struct BlockCommitMessage<Block: BlockT> {
inner: Arc<(AtomicUsize, Mutex<Option<(u64, CompactCommit<Block>)>>)>,
pub(crate) struct BlockCommitMessage<Block: BlockT, U> {
inner: Arc<(AtomicUsize, Mutex<Option<(u64, CompactCommit<Block>, U)>>)>,
target_number: NumberFor<Block>,
}
impl<Block: BlockT> BlockUntilImported<Block> for BlockCommitMessage<Block> {
type Blocked = (u64, CompactCommit<Block>);
impl<Block: BlockT, U> BlockUntilImported<Block> for BlockCommitMessage<Block, U> {
type Blocked = (u64, CompactCommit<Block>, U);
fn schedule_wait<S, Wait, Ready>(
input: Self::Blocked,
@@ -400,11 +400,11 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockCommitMessage<Block> {
/// A stream which gates off incoming commit messages until all referenced
/// block hashes have been imported.
pub(crate) type UntilCommitBlocksImported<Block, Status, I> = UntilImported<
pub(crate) type UntilCommitBlocksImported<Block, Status, I, U> = UntilImported<
Block,
Status,
I,
BlockCommitMessage<Block>,
BlockCommitMessage<Block, U>,
>;
#[cfg(test)]
@@ -507,7 +507,7 @@ mod tests {
commit_rx.map_err(|_| panic!("should never error")),
);
commit_tx.unbounded_send((0, unknown_commit.clone())).unwrap();
commit_tx.unbounded_send((0, unknown_commit.clone(), ())).unwrap();
let inner_chain_state = chain_state.clone();
let work = until_imported
@@ -527,7 +527,7 @@ mod tests {
});
let mut runtime = Runtime::new().unwrap();
assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit)));
assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit, ())));
}
#[test]
@@ -567,11 +567,11 @@ mod tests {
commit_rx.map_err(|_| panic!("should never error")),
);
commit_tx.unbounded_send((0, known_commit.clone())).unwrap();
commit_tx.unbounded_send((0, known_commit.clone(), ())).unwrap();
let work = until_imported.into_future();
let mut runtime = Runtime::new().unwrap();
assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit)));
assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit, ())));
}
}
+4 -1
View File
@@ -38,7 +38,10 @@ pub mod specialization;
pub mod test;
pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT};
pub use service::{
Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg,
SyncProvider, ExHashT, ReportHandle,
};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{
+25 -1
View File
@@ -45,7 +45,6 @@ pub use network_libp2p::PeerId;
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Sync status
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get a stream of sync statuses.
@@ -129,6 +128,20 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
}
}
/// A cloneable handle for reporting cost/benefits of peers.
#[derive(Clone)]
pub struct ReportHandle {
inner: PeersetHandle, // wraps it so we don't have to worry about breaking API.
}
impl ReportHandle {
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
pub fn report_peer(&self, who: PeerId, cost_benefit: i32) {
self.inner.report_peer(who, cost_benefit);
}
}
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Sinks to propagate status updates.
@@ -268,6 +281,17 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
));
}
/// Return a cloneable handle for reporting peers' benefits or misbehavior.
pub fn report_handle(&self) -> ReportHandle {
ReportHandle { inner: self.peerset.clone() }
}
/// Report a given peer as either beneficial (+) or costly (-) according to the
/// given scalar.
pub fn report_peer(&self, who: PeerId, cost_benefit: i32) {
self.peerset.report_peer(who, cost_benefit);
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut Context<B>) + Send + 'static