client/finality-grandpa: Make round_communication use bounded channel (#4691)

* clinet/finality-grandpa: Make round_communication use bounded channel

`round_communication` returns a `Sink` and a `Stream` for outgoing and
incoming messages. The messages send into the `Sink` are forwarded down
to the network as well as send back into the `Stream` to ensure the node
processes its own messages.

So far, to send messages into the `Sink` back into the `Stream`, an
unbounded channel was used. This patch updates `round_communication` and
`OutgoingMessages` to use a bounded channel.

This is part of a greater effort to reduce the number of owners of
components within `finality-grandpa` and `network` as well as to reduce
the amount of unbounded channels. For details see d9837d7dd and
5f80929dc.

* client/finality-grandpa: Import futures03::future::ready at the top

* client/finality-grandpa: Make tests use compat of future 03

* client/finality-grandpa: Do not import ready into scope

Instead of importing futures03::future::ready into the scope, only
import futures::future03 into scope and call ready as furure03::ready.
This commit is contained in:
Max Inden
2020-01-23 12:58:36 +01:00
committed by Robert Habermeier
parent 4b2e6a5be2
commit 597c0a6c4f
3 changed files with 77 additions and 48 deletions
@@ -27,12 +27,13 @@
//! In the future, there will be a fallback for allowing sending the same message //! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol. //! under certain conditions that are used to un-stick the protocol.
use futures::{prelude::*, sync::mpsc}; use futures::prelude::*;
use futures03::{ use futures03::{
channel::mpsc as mpsc03, channel::mpsc as mpsc03,
compat::Compat, compat::Compat,
future::{Future as Future03}, future::{self as future03, Future as Future03},
stream::StreamExt, sink::Sink as Sink03,
stream::{Stream as Stream03, StreamExt},
}; };
use log::{debug, trace}; use log::{debug, trace};
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -276,8 +277,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
local_key: Option<AuthorityPair>, local_key: Option<AuthorityPair>,
has_voted: HasVoted<B>, has_voted: HasVoted<B>,
) -> ( ) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>, impl Stream03<Item=SignedMessage<B>> + Unpin,
impl Sink<SinkItem=Message<B>,SinkError=Error>, OutgoingMessages<B>,
) { ) {
self.note_round( self.note_round(
round, round,
@@ -295,22 +296,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}); });
let topic = round_topic::<B>(round.0, set_id.0); let topic = round_topic::<B>(round.0, set_id.0);
let incoming = Compat::new(self.gossip_engine.messages_for(topic) let incoming = self.gossip_engine.messages_for(topic)
.map(|item| Ok::<_, ()>(item))) .filter_map(move |notification| {
.filter_map(|notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]); let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
match decoded {
Err(ref e) => {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
return future03::ready(None);
} }
decoded.ok() Ok(GossipMessage::Vote(msg)) => {
})
.and_then(move |msg| {
match msg {
GossipMessage::Vote(msg) => {
// check signature. // check signature.
if !voters.contains_key(&msg.message.id) { if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return Ok(None); return future03::ready(None);
} }
if voters.len() <= TELEMETRY_VOTERS_LIMIT { if voters.len() <= TELEMETRY_VOTERS_LIMIT {
@@ -339,18 +338,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}; };
} }
Ok(Some(msg.message)) future03::ready(Some(msg.message))
} }
_ => { _ => {
debug!(target: "afg", "Skipping unknown message type"); debug!(target: "afg", "Skipping unknown message type");
return Ok(None); return future03::ready(None);
} }
} }
}) });
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
let (tx, out_rx) = mpsc::unbounded(); let (tx, out_rx) = mpsc03::channel(0);
let outgoing = OutgoingMessages::<B> { let outgoing = OutgoingMessages::<B> {
round: round.0, round: round.0,
set_id: set_id.0, set_id: set_id.0,
@@ -360,14 +357,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
has_voted, has_voted,
}; };
let out_rx = out_rx.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round.0)
));
// Combine incoming votes from external GRANDPA nodes with outgoing // Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single // votes from our own GRANDPA voter to have a single
// vote-import-pipeline. // vote-import-pipeline.
let incoming = incoming.select(out_rx); let incoming = futures03::stream::select(incoming, out_rx);
(incoming, outgoing) (incoming, outgoing)
} }
@@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer<Block: BlockT>(
/// use the same raw message and key to sign. This is currently true for /// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must /// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types. /// be taken when switching to different key types.
struct OutgoingMessages<Block: BlockT> { pub(crate) struct OutgoingMessages<Block: BlockT> {
round: RoundNumber, round: RoundNumber,
set_id: SetIdNumber, set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>, locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>, sender: mpsc03::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>, network: GossipEngine<Block>,
has_voted: HasVoted<Block>, has_voted: HasVoted<Block>,
} }
impl<Block: BlockT> Sink for OutgoingMessages<Block> impl<B: BlockT> Unpin for OutgoingMessages<B> {}
{
type SinkItem = Message<Block>;
type SinkError = Error;
fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> { impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
{
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
})})
}
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
// if we've voted on this round previously under the same key, send that vote instead // if we've voted on this round previously under the same key, send that vote instead
match &mut msg { match &mut msg {
finality_grandpa::Message::PrimaryPropose(ref mut vote) => finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
@@ -760,17 +761,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
self.network.gossip_message(topic, message.encode(), false); self.network.gossip_message(topic, message.encode(), false);
// forward the message to the inner sender. // forward the message to the inner sender.
let _ = self.sender.unbounded_send(signed); return self.sender.start_send(signed).map_err(|e| {
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
});
};
Ok(())
} }
Ok(AsyncSink::Ready) fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Poll03::Ready(Ok(()))
} }
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_close(Pin::new(&mut self.sender), cx)
fn close(&mut self) -> Poll<(), Error> { .map(|elem| { elem.map_err(|e| {
// ignore errors since we allow this inner sender to be closed already. Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
self.sender.close().or_else(|_| Ok(Async::Ready(()))) })})
} }
} }
@@ -22,7 +22,11 @@ use std::time::Duration;
use log::{debug, warn, info}; use log::{debug, warn, info};
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use futures::prelude::*; use futures::prelude::*;
use futures03::future::{FutureExt as _, TryFutureExt as _}; use futures03::{
compat::{Compat, CompatSink},
future::{FutureExt as _, TryFutureExt as _},
stream::StreamExt as _,
};
use futures_timer::Delay; use futures_timer::Delay;
use parking_lot::RwLock; use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError}; use sp_blockchain::{HeaderBackend, Error as ClientError};
@@ -608,6 +612,9 @@ where
has_voted, has_voted,
); );
let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
let outgoing = CompatSink::new(outgoing);
// schedule incoming messages from the network to be held until // schedule incoming messages from the network to be held until
// corresponding blocks are imported. // corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new( let incoming = Box::new(UntilVoteTargetImported::new(
+20 -5
View File
@@ -37,15 +37,17 @@ use sp_consensus::{
BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport, BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
import_queue::{BoxJustificationImport, BoxFinalityProofImport}, import_queue::{BoxJustificationImport, BoxFinalityProofImport},
}; };
use std::collections::{HashMap, HashSet}; use std::{
use std::result; collections::{HashMap, HashSet},
result,
pin::Pin, task,
};
use parity_scale_codec::Decode; use parity_scale_codec::Decode;
use sp_runtime::traits::{Header as HeaderT, HasherFor}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
use sp_runtime::generic::{BlockId, DigestItem}; use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public}; use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi}; use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check}; use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
use std::{pin::Pin, task};
use authorities::AuthoritySet; use authorities::AuthoritySet;
use finality_proof::{ use finality_proof::{
@@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() {
HasVoted::No, HasVoted::No,
); );
let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
let round_tx = futures03::compat::CompatSink::new(round_tx);
let round_tx = Arc::new(Mutex::new(round_tx)); let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx))); let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
@@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() {
target_hash: block_30_hash, target_hash: block_30_hash,
}; };
round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap(); // One should either be calling `Sink::send` or `Sink::start_send` followed
// by `Sink::poll_complete` to make sure items are being flushed. Given that
// we send in a loop including a delay until items are received, this can be
// ignored for the sake of reduced complexity.
if !round_tx.lock()
.start_send(finality_grandpa::Message::Prevote(prevote))
.unwrap()
.is_ready() {
panic!("expected sink to be ready to write to.");
}
Ok(()) Ok(())
}).map_err(|_| panic!())) }).map_err(|_| panic!()))