// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Incoming message streams that verify signatures, and outgoing message streams
//! that sign or re-shape.
use std::collections::HashMap;
use std::sync::Arc;
use grandpa::VoterSet;
use grandpa::Message::{Prevote, Precommit};
use futures::prelude::*;
use futures::sync::mpsc;
use log::{debug, trace};
use parity_codec::{Encode, Decode};
use substrate_primitives::{ed25519, Pair};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};
use runtime_primitives::traits::Block as BlockT;
use tokio::timer::Interval;
use crate::{Error, Network, Message, SignedMessage, Commit,
CompactCommit, GossipMessage, FullCommitMessage, VoteOrPrecommitMessage};
use ed25519::{Public as AuthorityId, Signature as AuthoritySignature};
fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec {
(message, round, set_id).encode()
}
/// A round number, wrapped in a newtype so that it cannot be confused with a
/// set id. Equality and ordering follow the wrapped `u64`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Round(u64);
/// A voter-set id, wrapped in a newtype so that it cannot be confused with a
/// round number. Equality and ordering follow the wrapped `u64`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SetId(u64);
enum Broadcast {
// round, set id, encoded commit.
Commit(Round, SetId, Vec),
// round, set id, encoded signed message.
Message(Round, SetId, Vec),
// round, set id, announcement of block hash that should be downloaded
Announcement(Round, SetId, Block::Hash),
// round, set id being dropped.
DropRound(Round, SetId),
// set_id being dropped.
DropSet(SetId),
}
impl Broadcast {
fn set_id(&self) -> SetId {
match *self {
Broadcast::Commit(_, s, _) => s,
Broadcast::Message(_, s, _) => s,
Broadcast::Announcement(_, s, _) => s,
Broadcast::DropRound(_, s) => s,
Broadcast::DropSet(s) => s,
}
}
}
/// Produces a future that should be run in the background and proxies
/// and rebroadcasts messages.
pub(crate) fn rebroadcasting_network>(network: N) -> (BroadcastWorker, BroadcastHandle) {
use std::time::Duration;
const REBROADCAST_PERIOD: Duration = Duration::from_secs(60);
let (tx, rx) = mpsc::unbounded();
(
BroadcastWorker {
interval: Interval::new_interval(REBROADCAST_PERIOD),
set_id: SetId(0), // will be overwritten on first item to broadcast.
last_commit: None,
round_messages: (Round(0), Vec::new()),
announcements: HashMap::new(),
network: network.clone(),
incoming_broadcast: rx,
},
BroadcastHandle {
relay: tx,
network,
},
)
}
// A worker which broadcasts messages to the background, potentially
// rebroadcasting.
#[must_use = "network rebroadcast future must be driven to completion"]
pub(crate) struct BroadcastWorker> {
interval: Interval,
set_id: SetId,
last_commit: Option<(Round, Vec)>,
round_messages: (Round, Vec>),
announcements: HashMap,
network: N,
incoming_broadcast: mpsc::UnboundedReceiver>,
}
/// A handle used by communication work to broadcast to network.
#[derive(Clone)]
pub(crate) struct BroadcastHandle {
relay: mpsc::UnboundedSender>,
network: N,
}
impl> Future for BroadcastWorker {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
{
let mut rebroadcast = false;
loop {
match self.interval.poll().map_err(Error::Timer)? {
Async::NotReady => break,
Async::Ready(_) => { rebroadcast = true; }
}
}
if rebroadcast {
let SetId(set_id) = self.set_id;
if let Some((Round(c_round), ref c_commit)) = self.last_commit {
self.network.send_commit(c_round, set_id, c_commit.clone());
}
let Round(round) = self.round_messages.0;
for message in self.round_messages.1.iter().cloned() {
self.network.send_message(round, set_id, message);
}
for (&announce_hash, &Round(round)) in &self.announcements {
self.network.announce(round, set_id, announce_hash);
}
}
}
loop {
match self.incoming_broadcast.poll().expect("UnboundedReceiver does not yield errors; qed") {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Err(Error::Network(
"all broadcast handles dropped, connection to network severed".into()
)),
Async::Ready(Some(item)) => {
if item.set_id() > self.set_id {
self.set_id = item.set_id();
self.last_commit = None;
self.round_messages = (Round(0), Vec::new());
self.announcements.clear();
}
match item {
Broadcast::Commit(round, set_id, commit) => {
if self.set_id == set_id {
if round >= self.last_commit.as_ref()
.map_or(Round(0), |&(r, _)| r)
{
self.last_commit = Some((round, commit.clone()));
}
}
// always send out to network.
self.network.send_commit(round.0, self.set_id.0, commit);
}
Broadcast::Message(round, set_id, message) => {
if self.set_id == set_id {
if round > self.round_messages.0 {
self.round_messages = (round, vec![message.clone()]);
} else if round == self.round_messages.0 {
self.round_messages.1.push(message.clone());
};
// ignore messages from earlier rounds.
}
// always send out to network.
self.network.send_message(round.0, set_id.0, message);
}
Broadcast::Announcement(round, set_id, hash) => {
if self.set_id == set_id {
self.announcements.insert(hash, round);
}
// always send out.
self.network.announce(round.0, set_id.0, hash);
}
Broadcast::DropRound(round, set_id) => {
// stop making announcements for any dead rounds.
self.announcements.retain(|_, &mut r| r > round);
self.network.drop_round_messages(round.0, set_id.0);
}
Broadcast::DropSet(set_id) => {
// stop making announcements for any dead rounds.
self.network.drop_set_messages(set_id.0);
}
}
}
}
}
}
}
impl> Network for BroadcastHandle {
type In = N::In;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
self.network.messages_for(round, set_id)
}
fn send_message(&self, round: u64, set_id: u64, message: Vec) {
let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message));
}
fn drop_round_messages(&self, round: u64, set_id: u64) {
let _ = self.relay.unbounded_send(Broadcast::DropRound(Round(round), SetId(set_id)));
}
fn drop_set_messages(&self, set_id: u64) {
let _ = self.relay.unbounded_send(Broadcast::DropSet(SetId(set_id)));
}
fn commit_messages(&self, set_id: u64) -> Self::In {
self.network.commit_messages(set_id)
}
fn send_commit(&self, round: u64, set_id: u64, message: Vec) {
let _ = self.relay.unbounded_send(Broadcast::Commit(Round(round), SetId(set_id), message));
}
fn announce(&self, round: u64, set_id: u64, block: B::Hash) {
let _ = self.relay.unbounded_send(
Broadcast::Announcement(Round(round), SetId(set_id), block)
);
}
}
// check a message.
pub(crate) fn check_message_sig(
message: &Message,
id: &AuthorityId,
signature: &AuthoritySignature,
round: u64,
set_id: u64,
) -> Result<(), ()> {
let as_public = AuthorityId::from_raw(id.0);
let encoded_raw = localized_payload(round, set_id, message);
if ed25519::Pair::verify(signature, &encoded_raw, as_public) {
Ok(())
} else {
debug!(target: "afg", "Bad signature on message from {:?}", id);
Err(())
}
}
/// converts a message stream into a stream of signed messages.
/// the output stream checks signatures also.
pub(crate) fn checked_message_stream(
inner: S,
voters: Arc>,
)
-> impl Stream- ,Error=Error> where
S: Stream
- ,Error=()>
{
inner
.filter_map(|raw| {
let decoded = GossipMessage::::decode(&mut &raw[..]);
if decoded.is_none() {
debug!(target: "afg", "Skipping malformed message {:?}", raw);
}
decoded
})
.and_then(move |msg| {
match msg {
GossipMessage::VoteOrPrecommit(msg) => {
// check signature.
if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return Ok(None);
}
match &msg.message.message {
Prevote(prevote) => {
telemetry!(CONSENSUS_INFO; "afg.received_prevote";
"voter" => ?format!("{}", msg.message.id),
"target_number" => ?prevote.target_number,
"target_hash" => ?prevote.target_hash,
);
},
Precommit(precommit) => {
telemetry!(CONSENSUS_INFO; "afg.received_precommit";
"voter" => ?format!("{}", msg.message.id),
"target_number" => ?precommit.target_number,
"target_hash" => ?precommit.target_hash,
);
},
};
Ok(Some(msg.message))
}
_ => {
debug!(target: "afg", "Skipping unknown message type");
return Ok(None);
}
}
})
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
pub(crate) struct OutgoingMessages> {
round: u64,
set_id: u64,
locals: Option<(Arc, AuthorityId)>,
sender: mpsc::UnboundedSender>,
network: N,
}
impl> Sink for OutgoingMessages
{
type SinkItem = Message;
type SinkError = Error;
fn start_send(&mut self, msg: Message) -> StartSend, Error> {
// when locals exist, sign messages on import
if let Some((ref pair, ref local_id)) = self.locals {
let encoded = localized_payload(self.round, self.set_id, &msg);
let signature = pair.sign(&encoded[..]);
let target_hash = msg.target().0.clone();
let signed = SignedMessage:: {
message: msg,
signature,
id: local_id.clone(),
};
let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage:: {
message: signed.clone(),
round: self.round,
set_id: self.set_id,
});
// announce our block hash to peers and propagate the
// message.
self.network.announce(self.round, self.set_id, target_hash);
self.network.send_message(self.round, self.set_id, message.encode());
// forward the message to the inner sender.
let _ = self.sender.unbounded_send(signed);
}
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn close(&mut self) -> Poll<(), Error> {
// ignore errors since we allow this inner sender to be closed already.
self.sender.close().or_else(|_| Ok(Async::Ready(())))
}
}
impl> Drop for OutgoingMessages {
fn drop(&mut self) {
self.network.drop_round_messages(self.round, self.set_id);
}
}
/// A sink for outgoing messages. This signs the messages with the key,
/// if we are an authority. A stream for the signed messages is also returned.
///
/// A future can push unsigned messages into the sink. They will be automatically
/// broadcast to the network. The returned stream should be combined with other input.
pub(crate) fn outgoing_messages>(
round: u64,
set_id: u64,
local_key: Option>,
voters: Arc>,
network: N,
) -> (
impl Stream
- ,Error=Error>,
OutgoingMessages,
) {
let locals = local_key.and_then(|pair| {
let public = pair.public();
let id = AuthorityId(public.0);
if voters.contains_key(&id) {
Some((pair, id))
} else {
None
}
});
let (tx, rx) = mpsc::unbounded();
let outgoing = OutgoingMessages:: {
round,
set_id,
network,
locals,
sender: tx,
};
let rx = rx.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round)
));
(rx, outgoing)
}
fn check_compact_commit(
msg: CompactCommit,
voters: &VoterSet,
) -> Option> {
if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() {
debug!(target: "afg", "Skipping malformed compact commit");
return None;
}
// check signatures on all contained precommits.
for (_, ref id) in &msg.auth_data {
if !voters.contains_key(id) {
debug!(target: "afg", "Skipping commit containing unknown voter {}", id);
return None;
}
}
Some(msg)
}
/// A stream for incoming commit messages. This checks all the signatures on the
/// messages.
pub(crate) fn checked_commit_stream(
inner: S,
voters: Arc>,
)
-> impl Stream
- ),Error=Error> where
S: Stream
- ,Error=()>
{
inner
.filter_map(|raw| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::::decode(&mut &raw[..]);
if decoded.is_none() {
trace!(target: "afg", "Skipping malformed commit message {:?}", raw);
}
decoded
})
.filter_map(move |msg| {
match msg {
GossipMessage::Commit(msg) => {
let round = msg.round;
let precommits_signed_by: Vec =
msg.message.auth_data.iter().map(move |(_, a)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.received_commit";
"contains_precommits_signed_by" => ?precommits_signed_by,
"target_number" => ?msg.message.target_number,
"target_hash" => ?msg.message.target_hash,
);
check_compact_commit::(msg.message, &*voters).map(move |c| (round, c))
},
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
}
}
})
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
/// An output sink for commit messages.
pub(crate) struct CommitsOut> {
network: N,
set_id: u64,
_marker: ::std::marker::PhantomData,
is_voter: bool,
}
impl> CommitsOut {
/// Create a new commit output stream.
pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self {
CommitsOut {
network,
set_id,
is_voter,
_marker: Default::default(),
}
}
}
impl> Sink for CommitsOut {
type SinkItem = (u64, Commit);
type SinkError = Error;
fn start_send(&mut self, input: (u64, Commit)) -> StartSend {
if !self.is_voter {
return Ok(AsyncSink::Ready);
}
let (round, commit) = input;
telemetry!(CONSENSUS_INFO; "afg.commit_issued";
"target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash,
);
let (precommits, auth_data) = commit.precommits.into_iter()
.map(|signed| (signed.precommit, (signed.signature, signed.id)))
.unzip();
let compact_commit = CompactCommit:: {
target_hash: commit.target_hash,
target_number: commit.target_number,
precommits,
auth_data
};
let message = GossipMessage::Commit(FullCommitMessage:: {
round: round,
set_id: self.set_id,
message: compact_commit,
});
self.network.send_commit(round, self.set_id, Encode::encode(&message));
Ok(AsyncSink::Ready)
}
fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
}
impl> Drop for CommitsOut {
fn drop(&mut self) {
self.network.drop_set_messages(self.set_id);
}
}