Switch GrandPa to std futures (replaces #3909) (#4612)

* Switch GrandPa to new futures

* Work on making tests work

* until_imported tests working again

* Work on switching tests to stable futures

* Modifications

* Re-add test as #[ignore]

* Don't ignore

* Add manual unpins

* Remove Header import

* Return concrete Sink type

* Switch to crates.io finality-grandpa version

* Remove use statement that slipped in

* Fix some nitpicks

* Remove unpin from i

* Fixed typo

* Move futures01 to dev-deps

* Fix nitpicks

* Update client/finality-grandpa/src/communication/mod.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>

* nitpicking

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
Ashley
2020-01-24 13:34:42 +01:00
committed by GitHub
parent 14e95f3398
commit c2c429877e
16 changed files with 468 additions and 513 deletions
@@ -90,8 +90,7 @@ use sp_finality_grandpa::AuthorityId;
use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug};
use futures::prelude::*;
use futures03::channel::mpsc;
use futures::channel::mpsc;
use rand::seq::SliceRandom;
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
@@ -27,17 +27,10 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.
use futures::prelude::*;
use futures03::{
channel::mpsc as mpsc03,
compat::Compat,
future::{self as future03, Future as Future03},
sink::Sink as Sink03,
stream::{Stream as Stream03, StreamExt},
};
use futures::{prelude::*, channel::mpsc};
use log::{debug, trace};
use parking_lot::Mutex;
use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}};
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use finality_grandpa::{voter, voter_set::VoterSet};
@@ -49,8 +42,8 @@ use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, Numb
use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use crate::{
CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error,
Message, SignedMessage,
CatchUp, Commit, CommunicationIn, CommunicationOutH,
CompactCommit, Error, Message, SignedMessage,
};
use crate::environment::HasVoted;
use gossip::{
@@ -171,7 +164,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
// channel implementation.
gossip_validator_report_stream: Arc<Mutex<mpsc03::UnboundedReceiver<PeerReport>>>,
gossip_validator_report_stream: Arc<Mutex<mpsc::UnboundedReceiver<PeerReport>>>,
}
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
@@ -185,7 +178,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
executor: &impl futures03::task::Spawn,
executor: &impl futures::task::Spawn,
) -> Self {
let (validator, report_stream) = GossipValidator::new(
config,
@@ -277,7 +270,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
local_key: Option<AuthorityPair>,
has_voted: HasVoted<B>,
) -> (
impl Stream03<Item=SignedMessage<B>> + Unpin,
impl Stream<Item = SignedMessage<B>> + Unpin,
OutgoingMessages<B>,
) {
self.note_round(
@@ -303,13 +296,13 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
match decoded {
Err(ref e) => {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
return future03::ready(None);
return future::ready(None);
}
Ok(GossipMessage::Vote(msg)) => {
// check signature.
if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return future03::ready(None);
return future::ready(None);
}
if voters.len() <= TELEMETRY_VOTERS_LIMIT {
@@ -338,16 +331,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
};
}
future03::ready(Some(msg.message))
future::ready(Some(msg.message))
}
_ => {
debug!(target: "afg", "Skipping unknown message type");
return future03::ready(None);
return future::ready(None);
}
}
});
let (tx, out_rx) = mpsc03::channel(0);
let (tx, out_rx) = mpsc::channel(0);
let outgoing = OutgoingMessages::<B> {
round: round.0,
set_id: set_id.0,
@@ -360,7 +353,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = futures03::stream::select(incoming, out_rx);
let incoming = stream::select(incoming, out_rx);
(incoming, outgoing)
}
@@ -372,8 +365,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
voters: Arc<VoterSet<AuthorityId>>,
is_voter: bool,
) -> (
impl Stream<Item = CommunicationIn<B>, Error = Error>,
impl Sink<SinkItem = CommunicationOut<B>, SinkError = Error>,
impl Stream<Item = CommunicationIn<B>>,
impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin,
) {
self.validator.note_set(
set_id,
@@ -401,7 +394,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let outgoing = outgoing.with(|out| {
let voter::CommunicationOut::Commit(round, commit) = out;
Ok((round, commit))
future::ok((round, commit))
});
(incoming, outgoing)
@@ -423,35 +416,35 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}
impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll03::Ready(Some((to, packet))) => {
Poll::Ready(Some((to, packet))) => {
self.gossip_engine.send_message(to, packet.encode());
},
Poll03::Ready(None) => return Poll03::Ready(
Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Neighbor packet worker stream closed.".into()))
),
Poll03::Pending => break,
Poll::Pending => break,
}
}
loop {
match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
Poll03::Ready(Some(PeerReport { who, cost_benefit })) => {
Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
self.gossip_engine.report(who, cost_benefit);
},
Poll03::Ready(None) => return Poll03::Ready(
Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Gossip validator report stream closed.".into()))
),
Poll03::Pending => break,
Poll::Pending => break,
}
}
Poll03::Pending
Poll::Pending
}
}
@@ -461,7 +454,7 @@ fn incoming_global<B: BlockT>(
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
) -> impl Stream<Item = CommunicationIn<B>, Error = Error> {
) -> impl Stream<Item = CommunicationIn<B>> {
let process_commit = move |
msg: FullCommitMessage<B>,
mut notification: sc_network_gossip::TopicNotification,
@@ -564,29 +557,27 @@ fn incoming_global<B: BlockT>(
Some(voter::CommunicationIn::CatchUp(msg.message, cb))
};
Compat::new(gossip_engine.messages_for(topic)
.map(|m| Ok::<_, ()>(m)))
gossip_engine.messages_for(topic)
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
trace!(target: "afg", "Skipping malformed commit message {:?}: {}", notification, e);
}
decoded.map(move |d| (notification, d)).ok()
future::ready(decoded.map(move |d| (notification, d)).ok())
})
.filter_map(move |(notification, msg)| {
match msg {
future::ready(match msg {
GossipMessage::Commit(msg) =>
process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
GossipMessage::CatchUp(msg) =>
process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
None
}
}
})
})
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
@@ -687,19 +678,19 @@ pub(crate) struct OutgoingMessages<Block: BlockT> {
round: RoundNumber,
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc03::Sender<SignedMessage<Block>>,
sender: mpsc::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>,
has_voted: HasVoted<Block>,
}
impl<B: BlockT> Unpin for OutgoingMessages<B> {}
impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
impl<Block: BlockT> Sink<Message<Block>> for OutgoingMessages<Block>
{
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_ready(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
})})
@@ -769,12 +760,12 @@ impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Poll03::Ready(Ok(()))
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_close(Pin::new(&mut self.sender), cx)
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
})})
@@ -985,13 +976,16 @@ impl<Block: BlockT> CommitsOut<Block> {
}
}
impl<Block: BlockT> Sink for CommitsOut<Block> {
type SinkItem = (RoundNumber, Commit<Block>);
type SinkError = Error;
impl<Block: BlockT> Sink<(RoundNumber, Commit<Block>)> for CommitsOut<Block> {
type Error = Error;
fn start_send(&mut self, input: (RoundNumber, Commit<Block>)) -> StartSend<Self::SinkItem, Error> {
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, input: (RoundNumber, Commit<Block>)) -> Result<(), Self::Error> {
if !self.is_voter {
return Ok(AsyncSink::Ready);
return Ok(());
}
let (round, commit) = input;
@@ -1027,9 +1021,14 @@ impl<Block: BlockT> Sink for CommitsOut<Block> {
);
self.network.gossip_message(topic, message.encode(), false);
Ok(AsyncSink::Ready)
Ok(())
}
fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
@@ -17,7 +17,7 @@
//! Periodic rebroadcast of neighbor packets.
use futures_timer::Delay;
use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use log::debug;
use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}};
@@ -16,12 +16,11 @@
//! Tests for the communication portion of the GRANDPA crate.
use futures::sync::mpsc;
use futures::channel::mpsc;
use futures::prelude::*;
use sc_network::{Event as NetworkEvent, PeerId, config::Roles};
use sc_network_test::{Block, Hash};
use sc_network_gossip::Validator;
use tokio::runtime::current_thread;
use std::sync::Arc;
use sp_keyring::Ed25519Keyring;
use parity_scale_codec::Encode;
@@ -44,11 +43,19 @@ struct TestNetwork {
sender: mpsc::UnboundedSender<Event>,
}
impl sc_network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self) -> Box<dyn futures::Stream<Item = NetworkEvent, Error = ()> + Send> {
impl TestNetwork {
fn event_stream_03(&self) -> Pin<Box<dyn futures::Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = mpsc::unbounded();
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::new(rx)
Box::pin(rx)
}
}
impl sc_network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = NetworkEvent, Error = ()> + Send> {
Box::new(
self.event_stream_03().map(Ok::<_, ()>).compat()
)
}
fn report_peer(&self, who: sc_network::PeerId, cost_benefit: sc_network::ReputationChange) {
@@ -101,17 +108,17 @@ struct Tester {
}
impl Tester {
fn filter_network_events<F>(self, mut pred: F) -> impl Future<Item=Self,Error=()>
fn filter_network_events<F>(self, mut pred: F) -> impl Future<Output = Self>
where F: FnMut(Event) -> bool
{
let mut s = Some(self);
futures::future::poll_fn(move || loop {
match s.as_mut().unwrap().events.poll().expect("concluded early") {
Async::Ready(None) => panic!("concluded early"),
Async::Ready(Some(item)) => if pred(item) {
return Ok(Async::Ready(s.take().unwrap()))
futures::future::poll_fn(move |cx| loop {
match Stream::poll_next(Pin::new(&mut s.as_mut().unwrap().events), cx) {
Poll::Ready(None) => panic!("concluded early"),
Poll::Ready(Some(item)) => if pred(item) {
return Poll::Ready(s.take().unwrap())
},
Async::NotReady => return Ok(Async::NotReady),
Poll::Pending => return Poll::Pending,
}
})
}
@@ -149,8 +156,8 @@ fn voter_set_state() -> SharedVoterSetState<Block> {
}
// needs to run in a tokio runtime.
fn make_test_network(executor: &impl futures03::task::Spawn) -> (
impl Future<Item=Tester,Error=()>,
fn make_test_network(executor: &impl futures::task::Spawn) -> (
impl Future<Output = Tester>,
TestNetwork,
) {
let (tx, rx) = mpsc::unbounded();
@@ -159,7 +166,7 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> (
#[derive(Clone)]
struct Exit;
impl futures03::Future for Exit {
impl futures::Future for Exit {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
@@ -175,7 +182,7 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> (
);
(
futures::future::ok(Tester {
futures::future::ready(Tester {
gossip_validator: bridge.validator.clone(),
net_handle: bridge,
events: rx,
@@ -245,14 +252,14 @@ fn good_commit_leads_to_relay() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
.and_then(move |tester| {
.then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Ok((tester, id))
future::ready((tester, id))
})
.and_then(move |(tester, id)| {
.then(move |(tester, id)| {
// start round, dispatch commit, and wait for broadcast.
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
@@ -305,12 +312,11 @@ fn good_commit_leads_to_relay() {
},
_ => panic!("commit expected"),
}
})
.map_err(|_| panic!("could not process commit"));
});
// once the message is sent and commit is "handled" we should have
// a repropagation event coming from the network.
send_message.join(handle_commit).and_then(move |(tester, ())| {
future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::WriteNotification(_, data) => {
data == encoded_commit
@@ -318,11 +324,10 @@ fn good_commit_leads_to_relay() {
_ => false,
})
})
.map_err(|_| panic!("could not watch for gossip message"))
.map(|_| ())
});
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
futures::executor::block_on(test);
}
#[test]
@@ -371,14 +376,14 @@ fn bad_commit_leads_to_report() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
.and_then(move |tester| {
.map(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Ok((tester, id))
(tester, id)
})
.and_then(move |(tester, id)| {
.then(move |(tester, id)| {
// start round, dispatch commit, and wait for broadcast.
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
@@ -422,12 +427,11 @@ fn bad_commit_leads_to_report() {
},
_ => panic!("commit expected"),
}
})
.map_err(|_| panic!("could not process commit"));
});
// once the message is sent and commit is "handled" we should have
// a report event coming from the network.
send_message.join(handle_commit).and_then(move |(tester, ())| {
future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::Report(who, cost_benefit) => {
who == id && cost_benefit == super::cost::INVALID_COMMIT
@@ -435,26 +439,25 @@ fn bad_commit_leads_to_report() {
_ => false,
})
})
.map_err(|_| panic!("could not watch for peer report"))
.map(|_| ())
});
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
futures::executor::block_on(test);
}
#[test]
fn peer_with_higher_view_leads_to_catch_up_request() {
let id = sc_network::PeerId::random();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let (tester, mut net) = make_test_network(&threads_pool);
let test = tester
.and_then(move |tester| {
.map(move |tester| {
// register a peer with authority role.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::AUTHORITY);
Ok((tester, id))
((tester, id))
})
.and_then(move |(tester, id)| {
.then(move |(tester, id)| {
// send neighbor message at round 10 and height 50
let result = tester.gossip_validator.validate(
&mut net,
@@ -494,9 +497,8 @@ fn peer_with_higher_view_leads_to_catch_up_request() {
},
_ => false,
})
.map_err(|_| panic!("could not watch for peer send message"))
.map(|_| ())
});
current_thread::Runtime::new().unwrap().block_on(test).unwrap();
futures::executor::block_on(test);
}
@@ -16,17 +16,13 @@
use std::collections::BTreeMap;
use std::iter::FromIterator;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use log::{debug, warn, info};
use parity_scale_codec::{Decode, Encode};
use futures::prelude::*;
use futures03::{
compat::{Compat, CompatSink},
future::{FutureExt as _, TryFutureExt as _},
stream::StreamExt as _,
};
use futures_timer::Delay;
use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError};
@@ -568,19 +564,18 @@ where
NumberFor<Block>: BlockNumberOps,
Client<B, E, Block, RA>: AuxStore,
{
type Timer = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type Timer = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
type Id = AuthorityId;
type Signature = AuthoritySignature;
// regular round message streams
type In = Box<dyn Stream<
Item = ::finality_grandpa::SignedMessage<Block::Hash, NumberFor<Block>, Self::Signature, Self::Id>,
type In = Pin<Box<dyn Stream<
Item = Result<::finality_grandpa::SignedMessage<Block::Hash, NumberFor<Block>, Self::Signature, Self::Id>, Self::Error>
> + Send>>;
type Out = Pin<Box<dyn Sink<
::finality_grandpa::Message<Block::Hash, NumberFor<Block>>,
Error = Self::Error,
> + Send>;
type Out = Box<dyn Sink<
SinkItem = ::finality_grandpa::Message<Block::Hash, NumberFor<Block>>,
SinkError = Self::Error,
> + Send>;
> + Send>>;
type Error = CommandOrError<Block::Hash, NumberFor<Block>>;
@@ -612,12 +607,9 @@ where
has_voted,
);
let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
let outgoing = CompatSink::new(outgoing);
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
let incoming = Box::pin(UntilVoteTargetImported::new(
self.client.import_notification_stream(),
self.network.clone(),
self.client.clone(),
@@ -626,12 +618,12 @@ where
).map_err(Into::into));
// schedule network message cleanup when sink drops.
let outgoing = Box::new(outgoing.sink_map_err(Into::into));
let outgoing = Box::pin(outgoing.sink_err_into());
voter::RoundData {
voter_id: local_key.map(|pair| pair.public()),
prevote_timer: Box::new(prevote_timer.map(Ok).compat()),
precommit_timer: Box::new(precommit_timer.map(Ok).compat()),
prevote_timer: Box::pin(prevote_timer.map(Ok)),
precommit_timer: Box::pin(precommit_timer.map(Ok)),
incoming,
outgoing,
}
@@ -905,7 +897,7 @@ where
//random between 0-1 seconds.
let delay: u64 = thread_rng().gen_range(0, 1000);
Box::new(Delay::new(Duration::from_millis(delay)).map(Ok).compat())
Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok))
}
fn prevote_equivocation(
@@ -18,7 +18,7 @@ use std::{sync::Arc, collections::HashMap};
use log::{debug, trace, info};
use parity_scale_codec::Encode;
use futures::sync::mpsc;
use futures::channel::mpsc;
use parking_lot::RwLockWriteGuard;
use sp_blockchain::{HeaderBackend, BlockStatus, well_known_cache_keys};
+47 -68
View File
@@ -53,9 +53,9 @@
//! included in the newly-finalized chain.
use futures::prelude::*;
use futures03::{StreamExt, future::ready};
use log::{debug, error, info};
use futures::sync::mpsc;
use futures::StreamExt;
use log::{debug, info};
use futures::channel::mpsc;
use sc_client_api::{BlockchainEvents, CallExecutor, backend::{AuxStore, Backend}, ExecutionStrategy};
use sp_blockchain::{HeaderBackend, Error as ClientError};
use sc_client::Client;
@@ -66,7 +66,7 @@ use sc_keystore::KeyStorePtr;
use sp_inherents::InherentDataProviders;
use sp_consensus::SelectChain;
use sp_core::Pair;
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN};
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG};
use serde_json;
use sp_finality_tracker;
@@ -77,6 +77,8 @@ use finality_grandpa::{voter, BlockNumberOps, voter_set::VoterSet};
use std::{fmt, io};
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use std::task::{Poll, Context};
mod authorities;
mod aux_schema;
@@ -456,13 +458,12 @@ fn global_communication<Block: BlockT, B, E, N, RA>(
keystore: &Option<KeyStorePtr>,
) -> (
impl Stream<
Item = CommunicationInH<Block, Block::Hash>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
Item = Result<CommunicationInH<Block, Block::Hash>, CommandOrError<Block::Hash, NumberFor<Block>>>,
>,
impl Sink<
SinkItem = CommunicationOutH<Block, Block::Hash>,
SinkError = CommandOrError<Block::Hash, NumberFor<Block>>,
>,
CommunicationOutH<Block, Block::Hash>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
> + Unpin,
) where
B: Backend<Block>,
E: CallExecutor<Block> + Send + Sync,
@@ -536,7 +537,7 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
/// Handle to a future that will resolve on exit.
pub on_exit: X,
/// If supplied, can be used to hook on telemetry connection established events.
pub telemetry_on_connect: Option<futures03::channel::mpsc::UnboundedReceiver<()>>,
pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// How to spawn background tasks.
@@ -547,7 +548,7 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
@@ -557,9 +558,9 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures03::task::Spawn + 'static,
Sp: futures::task::Spawn + 'static,
{
let GrandpaParams {
config,
@@ -608,13 +609,11 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
.expect("authorities is always at least an empty vector; elements are always of type string")
}
);
ready(())
})
.unit_error()
.compat();
futures::future::Either::A(events)
future::ready(())
});
future::Either::Left(events)
} else {
futures::future::Either::B(futures::future::empty())
future::Either::Right(future::pending())
};
let voter_work = VoterWork::new(
@@ -628,28 +627,22 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
);
let voter_work = voter_work
.map(|_| ())
.map_err(|e| {
error!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
.map(|_| ());
// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());
.then(|_| future::pending::<()>());
use futures03::{FutureExt, TryFutureExt};
Ok(voter_work.select(on_exit.map(Ok).compat()).select2(telemetry_task).then(|_| Ok(())))
Ok(future::select(future::select(voter_work, on_exit), telemetry_task).map(drop))
}
/// Future that powers the voter.
#[must_use]
struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
voter: Pin<Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: futures03::compat::Compat<NetworkBridge<Block, N>>,
network: NetworkBridge<Block, N>,
}
impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
@@ -691,10 +684,10 @@ where
let mut work = VoterWork {
// `voter` is set to a temporary value and replaced below when
// calling `rebuild_voter`.
voter: Box::new(futures::empty()) as Box<_>,
voter: Box::pin(future::pending()),
env,
voter_commands_rx,
network: futures03::future::TryFutureExt::compat(network),
network,
};
work.rebuild_voter();
work
@@ -757,10 +750,10 @@ where
last_finalized,
);
self.voter = Box::new(voter);
self.voter = Box::pin(voter);
},
VoterSetState::Paused { .. } =>
self.voter = Box::new(futures::empty()),
self.voter = Box::pin(future::pending()),
};
}
@@ -841,61 +834,47 @@ where
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
Client<B, E, Block, RA>: AuxStore,
{
type Item = ();
type Error = Error;
type Output = Result<(), Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.voter.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) => {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.voter), cx) {
Poll::Pending => {}
Poll::Ready(Ok(())) => {
// voters don't conclude naturally
return Err(Error::Safety("GRANDPA voter has concluded.".into()))
return Poll::Ready(Err(Error::Safety("GRANDPA voter has concluded.".into())))
}
Err(CommandOrError::Error(e)) => {
Poll::Ready(Err(CommandOrError::Error(e))) => {
// return inner observer error
return Err(e)
return Poll::Ready(Err(e))
}
Err(CommandOrError::VoterCommand(command)) => {
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
// some command issued internally
self.handle_voter_command(command)?;
futures::task::current().notify();
cx.waker().wake_by_ref();
}
}
match self.voter_commands_rx.poll() {
Ok(Async::NotReady) => {}
Err(_) => {
// the `voter_commands_rx` stream should not fail.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {}
Poll::Ready(None) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
return Ok(Async::Ready(()))
return Poll::Ready(Ok(()))
}
Ok(Async::Ready(Some(command))) => {
Poll::Ready(Some(command)) => {
// some command issued externally
self.handle_voter_command(command)?;
futures::task::current().notify();
cx.waker().wake_by_ref();
}
}
match self.network.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// the network bridge future should never conclude.
return Ok(Async::Ready(()))
}
e @ Err(_) => return e,
};
Ok(Async::NotReady)
Future::poll(Pin::new(&mut self.network), cx)
}
}
#[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")]
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
) -> sp_blockchain::Result<impl Future<Output=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
@@ -905,9 +884,9 @@ pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures03::task::Spawn + 'static,
Sp: futures::task::Spawn + 'static,
{
run_grandpa_voter(grandpa_params)
}
@@ -14,10 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::prelude::*;
use futures::{future, sync::mpsc};
use futures::{prelude::*, channel::mpsc};
use finality_grandpa::{
BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet
@@ -64,14 +65,13 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>(
last_finalized_number: NumberFor<Block>,
commits: S,
note_round: F,
) -> impl Future<Item=(), Error=CommandOrError<Block::Hash, NumberFor<Block>>> where
) -> impl Future<Output=Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> where
NumberFor<Block>: BlockNumberOps,
B: Backend<Block>,
E: CallExecutor<Block> + Send + Sync + 'static,
RA: Send + Sync,
S: Stream<
Item = CommunicationIn<Block>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>,
>,
F: Fn(u64),
{
@@ -80,7 +80,7 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>(
let client = client.clone();
let voters = voters.clone();
let observer = commits.fold(last_finalized_number, move |last_finalized_number, global| {
let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| {
let (round, commit, callback) = match global {
voter::CommunicationIn::Commit(round, commit, callback) => {
let commit = finality_grandpa::Commit::from(commit);
@@ -143,7 +143,7 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>(
}
});
observer.map(|_| ())
observer.map_ok(|_| ())
}
/// Run a GRANDPA observer as a task, the observer will finalize blocks only by
@@ -154,16 +154,16 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
config: Config,
link: LinkHalf<B, E, Block, RA, SC>,
network: N,
on_exit: impl futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
on_exit: impl futures::Future<Output=()> + Clone + Send + Unpin + 'static,
executor: Sp,
) -> sp_blockchain::Result<impl Future<Item=(), Error=()> + Send + 'static> where
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
RA: Send + Sync + 'static,
Sp: futures03::task::Spawn + 'static,
Sp: futures::task::Spawn + 'static,
Client<B, E, Block, RA>: AuxStore,
{
let LinkHalf {
@@ -189,20 +189,18 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
);
let observer_work = observer_work
.map(|_| ())
.map_ok(|_| ())
.map_err(|e| {
warn!("GRANDPA Observer failed: {:?}", e);
});
use futures03::{FutureExt, TryFutureExt};
Ok(observer_work.select(on_exit.map(Ok).compat()).map(|_| ()).map_err(|_| ()))
Ok(future::select(observer_work, on_exit).map(drop))
}
/// Future that powers the observer.
#[must_use]
struct ObserverWork<B: BlockT, N: NetworkT<B>, E, Backend, RA> {
observer: Box<dyn Future<Item = (), Error = CommandOrError<B::Hash, NumberFor<B>>> + Send>,
observer: Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>,
client: Arc<Client<Backend, E, B, RA>>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
@@ -231,7 +229,7 @@ where
let mut work = ObserverWork {
// `observer` is set to a temporary value and replaced below when
// calling `rebuild_observer`.
observer: Box::new(futures::empty()) as Box<_>,
observer: Box::pin(future::pending()) as Pin<Box<_>>,
client,
network,
persistent_data,
@@ -286,7 +284,7 @@ where
note_round,
);
self.observer = Box::new(observer);
self.observer = Box::pin(observer);
}
fn handle_voter_command(
@@ -336,44 +334,41 @@ where
Bk: Backend<B> + 'static,
Client<Bk, E, B, RA>: AuxStore,
{
type Item = ();
type Error = Error;
type Output = Result<(), Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.observer.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) => {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
match Future::poll(Pin::new(&mut this.observer), cx) {
Poll::Pending => {}
Poll::Ready(Ok(())) => {
// observer commit stream doesn't conclude naturally; this could reasonably be an error.
return Ok(Async::Ready(()))
return Poll::Ready(Ok(()))
}
Err(CommandOrError::Error(e)) => {
Poll::Ready(Err(CommandOrError::Error(e))) => {
// return inner observer error
return Err(e)
return Poll::Ready(Err(e))
}
Err(CommandOrError::VoterCommand(command)) => {
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
// some command issued internally
self.handle_voter_command(command)?;
futures::task::current().notify();
this.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
match self.voter_commands_rx.poll() {
Ok(Async::NotReady) => {}
Err(_) => {
// the `voter_commands_rx` stream should not fail.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
match Stream::poll_next(Pin::new(&mut this.voter_commands_rx), cx) {
Poll::Pending => {}
Poll::Ready(None) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
return Ok(Async::Ready(()))
return Poll::Ready(Ok(()))
}
Ok(Async::Ready(Some(command))) => {
Poll::Ready(Some(command)) => {
// some command issued externally
self.handle_voter_command(command)?;
futures::task::current().notify();
this.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
Ok(Async::NotReady)
Poll::Pending
}
}
+179 -191
View File
@@ -25,7 +25,6 @@ use sc_network_test::{
use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures_timer::Delay;
use futures03::TryStreamExt as _;
use tokio::runtime::current_thread;
use sp_keyring::Ed25519Keyring;
use sc_client::LongestChain;
@@ -48,6 +47,8 @@ use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
use futures01::Async;
use futures::compat::Future01CompatExt;
use authorities::AuthoritySet;
use finality_proof::{
@@ -196,7 +197,7 @@ impl TestNetFactory for GrandpaTestNet {
#[derive(Clone)]
struct Exit;
impl futures03::Future for Exit {
impl futures::Future for Exit {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut task::Context) -> task::Poll<()> {
@@ -371,17 +372,28 @@ fn create_keystore(authority: Ed25519Keyring) -> (KeyStorePtr, tempfile::TempDir
(keystore, keystore_path)
}
fn block_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<GrandpaTestNet>>, runtime: &mut current_thread::Runtime) {
let drive_to_completion = futures01::future::poll_fn(|| {
net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady)
});
runtime.block_on(
future::select(future, drive_to_completion.compat())
.map(|_| Ok::<(), ()>(()))
.compat()
).unwrap();
}
// run the voters to completion. provide a closure to be invoked after
// the voters are spawned but before blocking on them.
fn run_to_completion_with<F>(
runtime: &mut current_thread::Runtime,
threads_pool: &futures03::executor::ThreadPool,
threads_pool: &futures::executor::ThreadPool,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring],
with: F,
) -> u64 where
F: FnOnce(current_thread::Handle) -> Option<Box<dyn Future<Item=(), Error=()>>>
F: FnOnce(current_thread::Handle) -> Option<Pin<Box<dyn Future<Output = ()>>>>
{
use parking_lot::RwLock;
@@ -411,17 +423,16 @@ fn run_to_completion_with<F>(
};
wait_for.push(
Box::new(
Box::pin(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(move |n| {
let mut highest_finalized = highest_finalized.write();
if *n.header.number() > *highest_finalized {
*highest_finalized = *n.header.number();
}
Ok(n.header.number() < &blocks)
future::ready(n.header.number() < &blocks)
})
.collect()
.collect::<Vec<_>>()
.map(|_| ())
)
);
@@ -449,24 +460,20 @@ fn run_to_completion_with<F>(
assert_send(&voter);
runtime.spawn(voter);
runtime.spawn(voter.unit_error().compat());
}
// wait for all finalized on each.
let wait_for = ::futures::future::join_all(wait_for)
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let wait_for = ::futures::future::join_all(wait_for);
block_until_complete(wait_for, &net, runtime);
let highest_finalized = *highest_finalized.read();
highest_finalized
}
fn run_to_completion(
runtime: &mut current_thread::Runtime,
threads_pool: &futures03::executor::ThreadPool,
threads_pool: &futures::executor::ThreadPool,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring]
@@ -496,7 +503,7 @@ fn add_forced_change(
fn finalize_3_voters_no_observers() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -522,7 +529,7 @@ fn finalize_3_voters_no_observers() {
#[test]
fn finalize_3_voters_1_full_observer() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -555,9 +562,8 @@ fn finalize_3_voters_1_full_observer() {
};
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(n.header.number() < &20))
.for_each(move |_| future::ready(()))
);
let keystore = if let Some(local_key) = local_key {
@@ -590,16 +596,14 @@ fn finalize_3_voters_1_full_observer() {
}
for voter in voters {
runtime.spawn(voter);
runtime.spawn(voter.unit_error().compat());
}
// wait for all finalized on each.
let wait_for = futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
block_until_complete(wait_for, &net, &mut runtime);
}
#[test]
@@ -631,7 +635,7 @@ fn transition_3_voters_twice_1_full_observer() {
let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8)));
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
net.lock().peer(0).push_blocks(1, false);
net.lock().block_until_sync(&mut runtime);
@@ -654,8 +658,7 @@ fn transition_3_voters_twice_1_full_observer() {
// wait for blocks to be finalized before generating new ones
let block_production = client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.take_while(|n| future::ready(n.header.number() < &30))
.for_each(move |n| {
match n.header.number() {
1 => {
@@ -692,10 +695,10 @@ fn transition_3_voters_twice_1_full_observer() {
_ => {},
}
Ok(())
future::ready(())
});
runtime.spawn(block_production);
runtime.spawn(block_production.unit_error().compat());
}
let mut finality_notifications = Vec::new();
@@ -725,9 +728,8 @@ fn transition_3_voters_twice_1_full_observer() {
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(n.header.number() < &30))
.for_each(move |_| future::ready(()))
.map(move |()| {
let full_client = client.as_full().expect("only full clients are used in test");
let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities(&*full_client).unwrap();
@@ -756,22 +758,19 @@ fn transition_3_voters_twice_1_full_observer() {
};
let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
runtime.spawn(voter);
runtime.spawn(voter.unit_error().compat());
}
// wait for all finalized on each.
let wait_for = ::futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
let wait_for = ::futures::future::join_all(finality_notifications);
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
block_until_complete(wait_for, &net, &mut runtime);
}
#[test]
fn justification_is_emitted_when_consensus_data_changes() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3);
@@ -790,7 +789,7 @@ fn justification_is_emitted_when_consensus_data_changes() {
#[test]
fn justification_is_generated_periodically() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
@@ -830,7 +829,7 @@ fn consensus_changes_works() {
#[test]
fn sync_justifications_on_change_blocks() {
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers_b);
@@ -871,21 +870,21 @@ fn sync_justifications_on_change_blocks() {
}
// the last peer should get the justification by syncing from other peers
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
futures::executor::block_on(futures::future::poll_fn(move |_| {
if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().poll();
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap()
}))
}
#[test]
fn finalizes_multiple_pending_changes_in_order() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Dave, Ed25519Keyring::Eve, Ed25519Keyring::Ferdie];
@@ -946,7 +945,7 @@ fn finalizes_multiple_pending_changes_in_order() {
fn force_change_to_new_set() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
// two of these guys are offline.
let genesis_authorities = &[
Ed25519Keyring::Alice,
@@ -1123,11 +1122,11 @@ fn voter_persists_its_votes() {
use std::iter::FromIterator;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future;
use futures::sync::mpsc;
use futures::channel::mpsc;
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
// we have two authorities but we'll only be running the voter for alice
// we are going to be listening for the prevotes it casts
@@ -1161,56 +1160,56 @@ fn voter_persists_its_votes() {
keystore_paths.push(keystore_path);
struct ResettableVoter {
voter: Box<dyn Future<Item = (), Error = ()> + Send>,
voter: Pin<Box<dyn Future<Output = ()> + Send + Unpin>>,
voter_rx: mpsc::UnboundedReceiver<()>,
net: Arc<Mutex<GrandpaTestNet>>,
client: PeersClient,
keystore: KeyStorePtr,
threads_pool: futures03::executor::ThreadPool,
threads_pool: futures::executor::ThreadPool,
}
impl Future for ResettableVoter {
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.voter.poll() {
Ok(Async::Ready(())) | Err(_) => panic!("error in the voter"),
Ok(Async::NotReady) => {},
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Poll::Ready(()) = Pin::new(&mut this.voter).poll(cx) {
panic!("error in the voter");
}
match self.voter_rx.poll() {
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => {}
Ok(Async::Ready(Some(()))) => {
match Pin::new(&mut this.voter_rx).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(())) => {
let (_block_import, _, _, _, link) =
self.net.lock()
this.net.lock()
.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(self.client.clone());
>(this.client.clone());
let link = link.lock().take().unwrap();
let grandpa_params = GrandpaParams {
config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
keystore: Some(self.keystore.clone()),
keystore: Some(this.keystore.clone()),
name: Some(format!("peer#{}", 0)),
is_authority: true,
observer_enabled: true,
},
link,
network: self.net.lock().peers[0].network_service().clone(),
network: this.net.lock().peers[0].network_service().clone(),
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
voting_rule: VotingRulesBuilder::default().build(),
executor: self.threads_pool.clone(),
executor: this.threads_pool.clone(),
};
let voter = run_grandpa_voter(grandpa_params)
.expect("all in order with client and network")
.then(move |r| {
.map(move |r| {
// we need to keep the block_import alive since it owns the
// sender for the voter commands channel, if that gets dropped
// then the voter will stop
@@ -1218,30 +1217,30 @@ fn voter_persists_its_votes() {
r
});
self.voter = Box::new(voter);
this.voter = Box::pin(voter);
// notify current task in order to poll the voter
futures::task::current().notify();
cx.waker().wake_by_ref();
}
};
Ok(Async::NotReady)
Poll::Pending
}
}
// we create a "dummy" voter by setting it to `empty` and triggering the `tx`.
// we create a "dummy" voter by setting it to `pending` and triggering the `tx`.
// this way, the `ResettableVoter` will reset its `voter` field to a value ASAP.
voter_tx.unbounded_send(()).unwrap();
runtime.spawn(ResettableVoter {
voter: Box::new(futures::future::empty()),
voter: Box::pin(futures::future::pending()),
voter_rx,
net: net.clone(),
client: client.clone(),
keystore,
threads_pool: threads_pool.clone(),
});
}.unit_error().compat());
}
let (exit_tx, exit_rx) = futures::sync::oneshot::channel::<()>();
let (exit_tx, exit_rx) = futures::channel::oneshot::channel::<()>();
// create the communication layer for bob, but don't start any
// voter. instead we'll listen for the prevote that alice casts
@@ -1284,122 +1283,107 @@ fn voter_persists_its_votes() {
HasVoted::No,
);
let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
let round_tx = futures03::compat::CompatSink::new(round_tx);
let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
let net = net.clone();
let state = AtomicUsize::new(0);
let state = Arc::new(AtomicUsize::new(0));
runtime.spawn(round_rx.for_each(move |signed| {
if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
// the first message we receive should be a prevote from alice.
let prevote = match signed.message {
finality_grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("voter should prevote."),
};
let net2 = net.clone();
let net = net.clone();
let voter_tx = voter_tx.clone();
let round_tx = round_tx.clone();
let state = state.clone();
let exit_tx = exit_tx.clone();
// its chain has 20 blocks and the voter targets 3/4 of the
// unfinalized chain, so the vote should be for block 15
assert!(prevote.target_number == 15);
async move {
if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
// the first message we receive should be a prevote from alice.
let prevote = match signed.message {
finality_grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("voter should prevote."),
};
// we push 20 more blocks to alice's chain
net.lock().peer(0).push_blocks(20, false);
// its chain has 20 blocks and the voter targets 3/4 of the
// unfinalized chain, so the vote should be for block 15
assert!(prevote.target_number == 15);
let net2 = net.clone();
let net = net.clone();
let voter_tx = voter_tx.clone();
let round_tx = round_tx.clone();
// we push 20 more blocks to alice's chain
net.lock().peer(0).push_blocks(20, false);
let interval = futures03::stream::unfold(Delay::new(Duration::from_millis(200)), |delay|
Box::pin(async move {
delay.await;
Some(((), Delay::new(Duration::from_millis(200))))
})).map(Ok::<_, ()>).compat();
let interval = futures::stream::unfold(Delay::new(Duration::from_millis(200)), |delay|
Box::pin(async move {
delay.await;
Some(((), Delay::new(Duration::from_millis(200))))
})
);
future::Either::A(interval
.take_while(move |_| {
Ok(net2.lock().peer(1).client().info().best_number != 40)
})
.for_each(|_| Ok(()))
.and_then(move |_| {
let block_30_hash =
net.lock().peer(0).client().as_full().unwrap().hash(30).unwrap().unwrap();
interval
.take_while(move |_| {
future::ready(net2.lock().peer(1).client().info().best_number != 40)
})
.for_each(|_| future::ready(()))
.await;
// we restart alice's voter
voter_tx.unbounded_send(()).unwrap();
let block_30_hash =
net.lock().peer(0).client().as_full().unwrap().hash(30).unwrap().unwrap();
// and we push our own prevote for block 30
let prevote = finality_grandpa::Prevote {
target_number: 30,
target_hash: block_30_hash,
};
// we restart alice's voter
voter_tx.unbounded_send(()).unwrap();
// One should either be calling `Sink::send` or `Sink::start_send` followed
// by `Sink::poll_complete` to make sure items are being flushed. Given that
// we send in a loop including a delay until items are received, this can be
// ignored for the sake of reduced complexity.
if !round_tx.lock()
.start_send(finality_grandpa::Message::Prevote(prevote))
.unwrap()
.is_ready() {
panic!("expected sink to be ready to write to.");
}
// and we push our own prevote for block 30
let prevote = finality_grandpa::Prevote {
target_number: 30,
target_hash: block_30_hash,
};
Ok(())
}).map_err(|_| panic!()))
// One should either be calling `Sink::send` or `Sink::start_send` followed
// by `Sink::poll_complete` to make sure items are being flushed. Given that
// we send in a loop including a delay until items are received, this can be
// ignored for the sake of reduced complexity.
Pin::new(&mut *round_tx.lock()).start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
// the next message we receive should be our own prevote
let prevote = match signed.message {
finality_grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("We should receive our own prevote."),
};
} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
// the next message we receive should be our own prevote
let prevote = match signed.message {
finality_grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("We should receive our own prevote."),
};
// targeting block 30
assert!(prevote.target_number == 30);
// targeting block 30
assert!(prevote.target_number == 30);
// after alice restarts it should send its previous prevote
// therefore we won't ever receive it again since it will be a
// known message on the gossip layer
// after alice restarts it should send its previous prevote
// therefore we won't ever receive it again since it will be a
// known message on the gossip layer
} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
// we then receive a precommit from alice for block 15
// even though we casted a prevote for block 30
let precommit = match signed.message {
finality_grandpa::Message::Precommit(precommit) => precommit,
_ => panic!("voter should precommit."),
};
future::Either::B(future::ok(()))
assert!(precommit.target_number == 15);
} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
// we then receive a precommit from alice for block 15
// even though we casted a prevote for block 30
let precommit = match signed.message {
finality_grandpa::Message::Precommit(precommit) => precommit,
_ => panic!("voter should precommit."),
};
assert!(precommit.target_number == 15);
// signal exit
exit_tx.clone().lock().take().unwrap().send(()).unwrap();
future::Either::B(future::ok(()))
} else {
panic!()
// signal exit
exit_tx.clone().lock().take().unwrap().send(()).unwrap();
} else {
panic!()
}
}
}).map_err(|_| ()));
}).map(Ok).boxed().compat());
}
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ());
runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
block_until_complete(exit_rx.into_future(), &net, &mut runtime);
}
#[test]
fn finalize_3_voters_1_light_observer() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let authorities = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(authorities);
@@ -1416,9 +1400,8 @@ fn finalize_3_voters_1_light_observer() {
let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed");
let finality_notifications = net.lock().peer(3).client().finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.collect();
.take_while(|n| future::ready(n.header.number() < &20))
.collect::<Vec<_>>();
run_to_completion_with(&mut runtime, &threads_pool, 20, net.clone(), authorities, |executor| {
executor.spawn(
@@ -1435,10 +1418,10 @@ fn finalize_3_voters_1_light_observer() {
net.lock().peers[3].network_service().clone(),
Exit,
threads_pool.clone(),
).unwrap()
).unwrap().unit_error().compat()
).unwrap();
Some(Box::new(finality_notifications.map(|_| ())))
Some(Box::pin(finality_notifications.map(|_| ())))
});
}
@@ -1446,7 +1429,7 @@ fn finalize_3_voters_1_light_observer() {
fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1);
@@ -1460,14 +1443,15 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
net.lock().block_until_sync(&mut runtime);
// check that the block#1 is finalized on light client
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
let mut runtime = current_thread::Runtime::new().unwrap();
let _ = runtime.block_on(futures::future::poll_fn(move |_| {
if net.lock().peer(1).client().info().finalized_number == 1 {
Ok(Async::Ready(()))
Poll::Ready(())
} else {
net.lock().poll();
Ok(Async::NotReady)
Poll::Pending
}
})).unwrap()
}).unit_error().compat());
}
#[test]
@@ -1477,7 +1461,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
// two of these guys are offline.
let genesis_authorities = if FORCE_CHANGE {
@@ -1542,7 +1526,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
fn voter_catches_up_to_latest_round_when_behind() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers);
@@ -1554,7 +1538,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Box<dyn Future<Item=(), Error=()> + Send> {
let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let grandpa_params = GrandpaParams {
config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
@@ -1573,7 +1557,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
executor: threads_pool.clone(),
};
Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
Box::pin(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
};
let mut keystore_paths = Vec::new();
@@ -1591,9 +1575,8 @@ fn voter_catches_up_to_latest_round_when_behind() {
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &50))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(n.header.number() < &50))
.for_each(move |_| future::ready(()))
);
let (keystore, keystore_path) = create_keystore(*key);
@@ -1601,14 +1584,13 @@ fn voter_catches_up_to_latest_round_when_behind() {
let voter = voter(Some(keystore), peer_id, link, net.clone());
runtime.spawn(voter);
runtime.spawn(voter.unit_error().compat());
}
// wait for them to finalize block 50. since they'll vote on 3/4 of the
// unfinalized chain it will take at least 4 rounds to do it.
let wait_for_finality = ::futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());
// spawn a new voter, it should be behind by at least 4 rounds and should be
// able to catch up to the latest round
@@ -1616,7 +1598,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
let net = net.clone();
let runtime = runtime.handle();
wait_for_finality.and_then(move |_| {
wait_for_finality.then(move |_| {
let peer_id = 2;
let link = {
let net = net.lock();
@@ -1628,20 +1610,20 @@ fn voter_catches_up_to_latest_round_when_behind() {
let voter = voter(None, peer_id, link, net);
runtime.spawn(voter).unwrap();
runtime.spawn(voter.unit_error().compat()).unwrap();
let start_time = std::time::Instant::now();
let timeout = Duration::from_secs(5 * 60);
let wait_for_catch_up = futures::future::poll_fn(move || {
let wait_for_catch_up = futures::future::poll_fn(move |_| {
// The voter will start at round 1 and since everyone else is
// already at a later round the only way to get to round 4 (or
// later) is by issuing a catch up request.
if set_state.read().last_completed_round().number >= 4 {
Ok(Async::Ready(()))
Poll::Ready(())
} else if start_time.elapsed() > timeout {
panic!("Timed out while waiting for catch up to happen")
} else {
Ok(Async::NotReady)
Poll::Pending
}
});
@@ -1649,8 +1631,14 @@ fn voter_catches_up_to_latest_round_when_behind() {
})
};
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = futures01::future::poll_fn(|| {
net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady)
});
runtime.block_on(
future::select(test, drive_to_completion.compat())
.map(|_| Ok::<(), ()>(()))
.compat()
).unwrap();
}
#[test]
@@ -1658,7 +1646,7 @@ fn grandpa_environment_respects_voting_rules() {
use finality_grandpa::Chain;
use sc_network_test::TestClient;
let threads_pool = futures03::executor::ThreadPool::new().unwrap();
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let peers = &[Ed25519Keyring::Alice];
let voters = make_ids(peers);
@@ -33,13 +33,15 @@ use sc_client_api::{BlockImportNotification, ImportNotifications};
use futures::prelude::*;
use futures::stream::Fuse;
use futures_timer::Delay;
use futures03::{StreamExt as _, TryStreamExt as _};
use futures::channel::mpsc::UnboundedReceiver;
use finality_grandpa::voter;
use parking_lot::Mutex;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use sp_finality_grandpa::AuthorityId;
@@ -70,13 +72,15 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
}
/// Buffering imported messages until blocks with given hashes are imported.
#[pin_project::pin_project]
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
import_notifications: Fuse<Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>>,
import_notifications: Fuse<UnboundedReceiver<BlockImportNotification<Block>>>,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
#[pin]
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
check_pending: Box<dyn Stream<Item = (), Error = std::io::Error> + Send>,
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
/// Mapping block hashes to their block number, the point in time it was
/// first encountered (Instant) and a list of GRANDPA messages referencing
/// the block hash.
@@ -87,8 +91,9 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
BlockStatus: BlockStatusT<Block>,
BlockSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>,
M: BlockUntilImported<Block>,
I: Stream,
{
/// Create a new `UntilImported` wrapper.
pub(crate) fn new(
@@ -105,22 +110,19 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
// used in the event of missed import notifications
const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
let check_pending = futures03::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay|
let check_pending = futures::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay|
Box::pin(async move {
delay.await;
Some(((), Delay::new(CHECK_PENDING_INTERVAL)))
})).map(Ok).compat();
Some((Ok(()), Delay::new(CHECK_PENDING_INTERVAL)))
}));
UntilImported {
import_notifications: {
let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat();
Box::new(stream) as Box<dyn Stream<Item = _, Error = _> + Send>
}.fuse(),
import_notifications: import_notifications.fuse(),
block_sync_requester,
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
check_pending: Box::new(check_pending),
check_pending: Box::pin(check_pending),
pending: HashMap::new(),
identifier,
}
@@ -131,24 +133,27 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Block: BlockT,
BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item=M::Blocked,Error=Error>,
I: Stream<Item = M::Blocked>,
M: BlockUntilImported<Block>,
{
type Item = M::Blocked;
type Error = Error;
type Item = Result<M::Blocked, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow
// to `self`.
let mut this = self.project();
fn poll(&mut self) -> Poll<Option<M::Blocked>, Error> {
loop {
match self.inner.poll()? {
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::Ready(Some(input)) => {
match Stream::poll_next(Pin::new(&mut this.inner), cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(input)) => {
// new input: schedule wait of any parts which require
// blocks to be known.
let ready = &mut self.ready;
let pending = &mut self.pending;
let ready = &mut this.ready;
let pending = &mut this.pending;
M::schedule_wait(
input,
&self.status_check,
this.status_check,
|target_hash, target_number, wait| pending
.entry(target_hash)
.or_insert_with(|| (target_number, Instant::now(), Vec::new()))
@@ -157,37 +162,36 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
|ready_item| ready.push_back(ready_item),
)?;
}
Async::NotReady => break,
Poll::Pending => break,
}
}
loop {
match self.import_notifications.poll() {
Err(_) => return Err(Error::Network(format!("Failed to get new message"))),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(notification))) => {
match Stream::poll_next(Pin::new(&mut this.import_notifications), cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(notification)) => {
// new block imported. queue up all messages tied to that hash.
if let Some((_, _, messages)) = self.pending.remove(&notification.hash) {
if let Some((_, _, messages)) = this.pending.remove(&notification.hash) {
let canon_number = notification.header.number().clone();
let ready_messages = messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
self.ready.extend(ready_messages);
this.ready.extend(ready_messages);
}
}
Ok(Async::NotReady) => break,
Poll::Pending => break,
}
}
let mut update_interval = false;
while let Async::Ready(Some(_)) = self.check_pending.poll().map_err(Error::Timer)? {
while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) {
update_interval = true;
}
if update_interval {
let mut known_keys = Vec::new();
for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in &mut self.pending {
if let Some(number) = self.status_check.block_number(block_hash)? {
for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in this.pending.iter_mut() {
if let Some(number) = this.status_check.block_number(block_hash)? {
known_keys.push((block_hash, number));
} else {
let next_log = *last_log + LOG_PENDING_INTERVAL;
@@ -199,13 +203,13 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Possible fork?",
block_hash,
v.len(),
self.identifier,
this.identifier,
);
// NOTE: when sending an empty vec of peers the
// underlying should make a best effort to sync the
// block from any peers it knows about.
self.block_sync_requester.set_sync_fork_request(
this.block_sync_requester.set_sync_fork_request(
vec![],
block_hash,
block_number,
@@ -217,23 +221,23 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
}
for (known_hash, canon_number) in known_keys {
if let Some((_, _, pending_messages)) = self.pending.remove(&known_hash) {
if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
let ready_messages = pending_messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
self.ready.extend(ready_messages);
this.ready.extend(ready_messages);
}
}
}
if let Some(ready) = self.ready.pop_front() {
return Ok(Async::Ready(Some(ready)))
if let Some(ready) = this.ready.pop_front() {
return Poll::Ready(Some(Ok(ready)))
}
if self.import_notifications.is_done() && self.inner.is_done() {
Ok(Async::Ready(None))
if this.import_notifications.is_done() && this.inner.is_done() {
Poll::Ready(None)
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
@@ -308,6 +312,8 @@ pub(crate) struct BlockGlobalMessage<Block: BlockT> {
target_number: NumberFor<Block>,
}
impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
type Blocked = CommunicationIn<Block>;
@@ -474,13 +480,12 @@ pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRe
mod tests {
use super::*;
use crate::{CatchUp, CompactCommit};
use tokio::runtime::current_thread::Runtime;
use substrate_test_runtime_client::runtime::{Block, Hash, Header};
use sp_consensus::BlockOrigin;
use sc_client_api::BlockImportNotification;
use futures::future::Either;
use futures_timer::Delay;
use futures03::{channel::mpsc, future::FutureExt as _, future::TryFutureExt as _};
use futures::channel::mpsc;
use finality_grandpa::Precommit;
#[derive(Clone)]
@@ -588,13 +593,13 @@ mod tests {
// enact all dependencies before importing the message
enact_dependencies(&chain_state);
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
TestBlockSyncRequester::default(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
global_rx,
"global",
);
@@ -602,8 +607,7 @@ mod tests {
let work = until_imported.into_future();
let mut runtime = Runtime::new().unwrap();
runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap()
futures::executor::block_on(work).0.unwrap().unwrap()
}
fn blocking_message_on_dependencies<F>(
@@ -615,13 +619,13 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
TestBlockSyncRequester::default(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
global_rx,
"global",
);
@@ -630,13 +634,10 @@ mod tests {
// NOTE: needs to be cloned otherwise it is moved to the stream and
// dropped too early.
let inner_chain_state = chain_state.clone();
let work = until_imported
.into_future()
.select2(Delay::new(Duration::from_millis(100)).unit_error().compat())
let work = future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100)))
.then(move |res| match res {
Err(_) => panic!("neither should have had error"),
Ok(Either::A(_)) => panic!("timeout should have fired first"),
Ok(Either::B((_, until_imported))) => {
Either::Left(_) => panic!("timeout should have fired first"),
Either::Right((_, until_imported)) => {
// timeout fired. push in the headers.
enact_dependencies(&inner_chain_state);
@@ -644,8 +645,7 @@ mod tests {
}
});
let mut runtime = Runtime::new().unwrap();
runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap()
futures::executor::block_on(work).0.unwrap().unwrap()
}
#[test]
@@ -871,7 +871,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
let block_sync_requester = TestBlockSyncRequester::default();
@@ -879,7 +879,7 @@ mod tests {
import_notifications,
block_sync_requester.clone(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
global_rx,
"global",
);
@@ -914,31 +914,31 @@ mod tests {
// we send the commit message and spawn the until_imported stream
global_tx.unbounded_send(unknown_commit()).unwrap();
let mut runtime = Runtime::new().unwrap();
runtime.spawn(until_imported.into_future().map(|_| ()).map_err(|_| ()));
let threads_pool = futures::executor::ThreadPool::new().unwrap();
threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));
// assert that we will make sync requests
let assert = futures::future::poll_fn::<(), (), _>(|| {
let assert = futures::future::poll_fn(|_| {
let block_sync_requests = block_sync_requester.requests.lock();
// we request blocks targeted by the precommits that aren't imported
if block_sync_requests.contains(&(h2.hash(), *h2.number())) &&
block_sync_requests.contains(&(h3.hash(), *h3.number()))
{
return Ok(Async::Ready(()));
return Poll::Ready(());
}
Ok(Async::NotReady)
Poll::Pending
});
// the `until_imported` stream doesn't request the blocks immediately,
// but it should request them after a small timeout
let timeout = Delay::new(Duration::from_secs(60)).unit_error().compat();
let test = assert.select2(timeout).map(|res| match res {
Either::A(_) => {},
Either::B(_) => panic!("timed out waiting for block sync request"),
}).map_err(|_| ());
let timeout = Delay::new(Duration::from_secs(60));
let test = future::select(assert, timeout).map(|res| match res {
Either::Left(_) => {},
Either::Right(_) => panic!("timed out waiting for block sync request"),
}).map(drop);
runtime.block_on(test).unwrap();
futures::executor::block_on(test);
}
}