mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 09:57:56 +00:00
client/finality-grandpa: Make round_communication use bounded channel (#4691)
* clinet/finality-grandpa: Make round_communication use bounded channel `round_communication` returns a `Sink` and a `Stream` for outgoing and incoming messages. The messages send into the `Sink` are forwarded down to the network as well as send back into the `Stream` to ensure the node processes its own messages. So far, to send messages into the `Sink` back into the `Stream`, an unbounded channel was used. This patch updates `round_communication` and `OutgoingMessages` to use a bounded channel. This is part of a greater effort to reduce the number of owners of components within `finality-grandpa` and `network` as well as to reduce the amount of unbounded channels. For details seed9837d7ddand5f80929dc. * client/finality-grandpa: Import futures03::future::ready at the top * client/finality-grandpa: Make tests use compat of future 03 * client/finality-grandpa: Do not import ready into scope Instead of importing futures03::future::ready into the scope, only import futures::future03 into scope and call ready as furure03::ready.
This commit is contained in:
committed by
Robert Habermeier
parent
4b2e6a5be2
commit
597c0a6c4f
@@ -27,12 +27,13 @@
|
||||
//! In the future, there will be a fallback for allowing sending the same message
|
||||
//! under certain conditions that are used to un-stick the protocol.
|
||||
|
||||
use futures::{prelude::*, sync::mpsc};
|
||||
use futures::prelude::*;
|
||||
use futures03::{
|
||||
channel::mpsc as mpsc03,
|
||||
compat::Compat,
|
||||
future::{Future as Future03},
|
||||
stream::StreamExt,
|
||||
future::{self as future03, Future as Future03},
|
||||
sink::Sink as Sink03,
|
||||
stream::{Stream as Stream03, StreamExt},
|
||||
};
|
||||
use log::{debug, trace};
|
||||
use parking_lot::Mutex;
|
||||
@@ -276,8 +277,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
local_key: Option<AuthorityPair>,
|
||||
has_voted: HasVoted<B>,
|
||||
) -> (
|
||||
impl Stream<Item=SignedMessage<B>,Error=Error>,
|
||||
impl Sink<SinkItem=Message<B>,SinkError=Error>,
|
||||
impl Stream03<Item=SignedMessage<B>> + Unpin,
|
||||
OutgoingMessages<B>,
|
||||
) {
|
||||
self.note_round(
|
||||
round,
|
||||
@@ -295,22 +296,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
});
|
||||
|
||||
let topic = round_topic::<B>(round.0, set_id.0);
|
||||
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
|
||||
.map(|item| Ok::<_, ()>(item)))
|
||||
.filter_map(|notification| {
|
||||
let incoming = self.gossip_engine.messages_for(topic)
|
||||
.filter_map(move |notification| {
|
||||
let decoded = GossipMessage::<B>::decode(&mut ¬ification.message[..]);
|
||||
if let Err(ref e) = decoded {
|
||||
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
|
||||
}
|
||||
decoded.ok()
|
||||
})
|
||||
.and_then(move |msg| {
|
||||
match msg {
|
||||
GossipMessage::Vote(msg) => {
|
||||
|
||||
match decoded {
|
||||
Err(ref e) => {
|
||||
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
|
||||
return future03::ready(None);
|
||||
}
|
||||
Ok(GossipMessage::Vote(msg)) => {
|
||||
// check signature.
|
||||
if !voters.contains_key(&msg.message.id) {
|
||||
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
|
||||
return Ok(None);
|
||||
return future03::ready(None);
|
||||
}
|
||||
|
||||
if voters.len() <= TELEMETRY_VOTERS_LIMIT {
|
||||
@@ -339,18 +338,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
};
|
||||
}
|
||||
|
||||
Ok(Some(msg.message))
|
||||
future03::ready(Some(msg.message))
|
||||
}
|
||||
_ => {
|
||||
debug!(target: "afg", "Skipping unknown message type");
|
||||
return Ok(None);
|
||||
return future03::ready(None);
|
||||
}
|
||||
}
|
||||
})
|
||||
.filter_map(|x| x)
|
||||
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
|
||||
});
|
||||
|
||||
let (tx, out_rx) = mpsc::unbounded();
|
||||
let (tx, out_rx) = mpsc03::channel(0);
|
||||
let outgoing = OutgoingMessages::<B> {
|
||||
round: round.0,
|
||||
set_id: set_id.0,
|
||||
@@ -360,14 +357,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|
||||
has_voted,
|
||||
};
|
||||
|
||||
let out_rx = out_rx.map_err(move |()| Error::Network(
|
||||
format!("Failed to receive on unbounded receiver for round {}", round.0)
|
||||
));
|
||||
|
||||
// Combine incoming votes from external GRANDPA nodes with outgoing
|
||||
// votes from our own GRANDPA voter to have a single
|
||||
// vote-import-pipeline.
|
||||
let incoming = incoming.select(out_rx);
|
||||
let incoming = futures03::stream::select(incoming, out_rx);
|
||||
|
||||
(incoming, outgoing)
|
||||
}
|
||||
@@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer<Block: BlockT>(
|
||||
/// use the same raw message and key to sign. This is currently true for
|
||||
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
|
||||
/// be taken when switching to different key types.
|
||||
struct OutgoingMessages<Block: BlockT> {
|
||||
pub(crate) struct OutgoingMessages<Block: BlockT> {
|
||||
round: RoundNumber,
|
||||
set_id: SetIdNumber,
|
||||
locals: Option<(AuthorityPair, AuthorityId)>,
|
||||
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
|
||||
sender: mpsc03::Sender<SignedMessage<Block>>,
|
||||
network: GossipEngine<Block>,
|
||||
has_voted: HasVoted<Block>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> Sink for OutgoingMessages<Block>
|
||||
{
|
||||
type SinkItem = Message<Block>;
|
||||
type SinkError = Error;
|
||||
impl<B: BlockT> Unpin for OutgoingMessages<B> {}
|
||||
|
||||
fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
|
||||
impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
|
||||
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
|
||||
.map(|elem| { elem.map_err(|e| {
|
||||
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
|
||||
})})
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
|
||||
// if we've voted on this round previously under the same key, send that vote instead
|
||||
match &mut msg {
|
||||
finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
|
||||
@@ -760,17 +761,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
|
||||
self.network.gossip_message(topic, message.encode(), false);
|
||||
|
||||
// forward the message to the inner sender.
|
||||
let _ = self.sender.unbounded_send(signed);
|
||||
}
|
||||
return self.sender.start_send(signed).map_err(|e| {
|
||||
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
|
||||
});
|
||||
};
|
||||
|
||||
Ok(AsyncSink::Ready)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
|
||||
Poll03::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Error> {
|
||||
// ignore errors since we allow this inner sender to be closed already.
|
||||
self.sender.close().or_else(|_| Ok(Async::Ready(())))
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
|
||||
Sink03::poll_close(Pin::new(&mut self.sender), cx)
|
||||
.map(|elem| { elem.map_err(|e| {
|
||||
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
|
||||
})})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,11 @@ use std::time::Duration;
|
||||
use log::{debug, warn, info};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use futures::prelude::*;
|
||||
use futures03::future::{FutureExt as _, TryFutureExt as _};
|
||||
use futures03::{
|
||||
compat::{Compat, CompatSink},
|
||||
future::{FutureExt as _, TryFutureExt as _},
|
||||
stream::StreamExt as _,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
use parking_lot::RwLock;
|
||||
use sp_blockchain::{HeaderBackend, Error as ClientError};
|
||||
@@ -608,6 +612,9 @@ where
|
||||
has_voted,
|
||||
);
|
||||
|
||||
let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
|
||||
let outgoing = CompatSink::new(outgoing);
|
||||
|
||||
// schedule incoming messages from the network to be held until
|
||||
// corresponding blocks are imported.
|
||||
let incoming = Box::new(UntilVoteTargetImported::new(
|
||||
|
||||
@@ -37,15 +37,17 @@ use sp_consensus::{
|
||||
BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
|
||||
import_queue::{BoxJustificationImport, BoxFinalityProofImport},
|
||||
};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::result;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
result,
|
||||
pin::Pin, task,
|
||||
};
|
||||
use parity_scale_codec::Decode;
|
||||
use sp_runtime::traits::{Header as HeaderT, HasherFor};
|
||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
|
||||
use sp_runtime::generic::{BlockId, DigestItem};
|
||||
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
|
||||
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
|
||||
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
|
||||
use std::{pin::Pin, task};
|
||||
|
||||
use authorities::AuthoritySet;
|
||||
use finality_proof::{
|
||||
@@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() {
|
||||
HasVoted::No,
|
||||
);
|
||||
|
||||
let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
|
||||
let round_tx = futures03::compat::CompatSink::new(round_tx);
|
||||
|
||||
let round_tx = Arc::new(Mutex::new(round_tx));
|
||||
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
|
||||
|
||||
@@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() {
|
||||
target_hash: block_30_hash,
|
||||
};
|
||||
|
||||
round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
|
||||
// One should either be calling `Sink::send` or `Sink::start_send` followed
|
||||
// by `Sink::poll_complete` to make sure items are being flushed. Given that
|
||||
// we send in a loop including a delay until items are received, this can be
|
||||
// ignored for the sake of reduced complexity.
|
||||
if !round_tx.lock()
|
||||
.start_send(finality_grandpa::Message::Prevote(prevote))
|
||||
.unwrap()
|
||||
.is_ready() {
|
||||
panic!("expected sink to be ready to write to.");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}).map_err(|_| panic!()))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user