Switch consensus crates to new futures (#3146)

* Switch consensus-common to new futures

* Fix tests

* More tests fixing

* Fix Babe tests

* Fix Babe tests
This commit is contained in:
Pierre Krieger
2019-07-25 04:55:50 +02:00
committed by Gavin Wood
parent 3a6a309d84
commit b31dcdf342
21 changed files with 208 additions and 184 deletions
+25 -22
View File
@@ -32,7 +32,7 @@ use runtime_primitives::traits::{
Block as BlockT, Header, DigestItemFor, NumberFor, ProvideRuntimeApi,
SimpleBitOps, Zero,
};
use std::{collections::HashMap, sync::Arc, u64, fmt::{Debug, Display}, time::{Instant, Duration}};
use std::{collections::HashMap, sync::Arc, u64, fmt::{Debug, Display}, pin::Pin, time::{Instant, Duration}};
use runtime_support::serde::{Serialize, Deserialize};
use parity_codec::{Decode, Encode};
use parking_lot::{Mutex, MutexGuard};
@@ -74,9 +74,9 @@ use client::{
};
use fork_tree::ForkTree;
use slots::{CheckedHeader, check_equivocation};
use futures::{Future, IntoFuture, future, stream::Stream};
use futures03::{StreamExt as _, TryStreamExt as _};
use tokio_timer::Timeout;
use futures::{prelude::*, future};
use futures01::Stream as _;
use futures_timer::Delay;
use log::{error, warn, debug, info, trace};
use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, SignedDuration};
@@ -182,7 +182,7 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, H>(BabeParams {
force_authoring,
time_source,
}: BabeParams<C, E, I, SO, SC>) -> Result<
impl Future<Item=(), Error=()>,
impl futures01::Future<Item=(), Error=()>,
consensus_common::Error,
> where
B: BlockT<Header=H>,
@@ -190,7 +190,7 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, H>(BabeParams {
C::Api: BabeApi<B>,
SC: SelectChain<B>,
E::Proposer: Proposer<B, Error=Error>,
<<E::Proposer as Proposer<B>>::Create as IntoFuture>::Future: Send + 'static,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
H: Header<Hash=B::Hash>,
E: Environment<B, Error=Error>,
I: BlockImport<B> + Send + Sync + 'static,
@@ -214,7 +214,7 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, H>(BabeParams {
sync_oracle,
inherent_data_providers,
time_source,
))
).map(|()| Ok::<(), ()>(())).compat())
}
struct BabeWorker<C, E, I, SO> {
@@ -233,7 +233,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
C::Api: BabeApi<B>,
E: Environment<B, Error=Error>,
E::Proposer: Proposer<B, Error=Error>,
<<E::Proposer as Proposer<B>>::Create as IntoFuture>::Future: Send + 'static,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
Hash: Debug + Eq + Copy + SimpleBitOps + Encode + Decode + Serialize +
for<'de> Deserialize<'de> + Debug + Default + AsRef<[u8]> + AsMut<[u8]> +
std::hash::Hash + Display + Send + Sync + 'static,
@@ -242,7 +242,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
SO: SyncOracle + Send + Clone,
Error: std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
{
type OnSlot = Box<dyn Future<Item=(), Error=consensus_common::Error> + Send>;
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;
fn on_slot(
&self,
@@ -269,7 +269,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
telemetry!(CONSENSUS_WARN; "babe.unable_fetching_authorities";
"slot" => ?chain_head.hash(), "err" => ?e
);
return Box::new(future::ok(()));
return Box::pin(future::ready(Ok(())));
}
};
@@ -284,7 +284,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
telemetry!(CONSENSUS_DEBUG; "babe.skipping_proposal_slot";
"authorities_len" => authorities.len()
);
return Box::new(future::ok(()));
return Box::pin(future::ready(Ok(())));
}
let proposal_work = if let Some(claim) = claim_slot(
@@ -316,7 +316,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
telemetry!(CONSENSUS_WARN; "babe.unable_authoring_block";
"slot" => slot_number, "err" => ?e
);
return Box::new(future::ok(()))
return Box::pin(future::ready(Ok(())))
}
};
@@ -329,7 +329,7 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
// deadline our production to approx. the end of the slot
let remaining_duration = slot_info.remaining_duration();
Timeout::new(
futures::future::select(
proposer.propose(
slot_info.inherent_data,
generic::Digest {
@@ -338,14 +338,20 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
],
},
remaining_duration,
).into_future(),
remaining_duration,
)
).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()),
Delay::new(remaining_duration)
.map_err(|err| consensus_common::Error::FaultyTimer(err).into())
).map(|v| match v {
futures::future::Either::Left((v, _)) => v,
futures::future::Either::Right((Ok(_), _)) =>
Err(consensus_common::Error::ClientImport("Timeout in the BaBe proposer".into())),
futures::future::Either::Right((Err(err), _)) => Err(err),
})
} else {
return Box::new(future::ok(()));
return Box::pin(future::ready(Ok(())));
};
Box::new(proposal_work.map(move |b| {
Box::pin(proposal_work.map_ok(move |b| {
// minor hack since we don't have access to the timestamp
// that is actually set by the proposer.
let slot_after_building = SignedDuration::default().slot_now(slot_duration);
@@ -402,9 +408,6 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
"hash" => ?parent_hash, "err" => ?e
);
}
}).map_err(|e| {
warn!("Client import failed: {:?}", e);
consensus_common::Error::ClientImport(format!("{:?}", e)).into()
}))
}
}
@@ -1137,7 +1140,7 @@ pub fn import_queue<B, E, Block: BlockT<Hash=H256>, I, RA, PRA>(
BabeImportQueue<Block>,
BabeLink,
BabeBlockImport<B, E, Block, I, RA, PRA>,
impl Future<Item = (), Error = ()>,
impl futures01::Future<Item = (), Error = ()>,
)> where
B: Backend<Block, Blake2Hasher> + 'static,
I: BlockImport<Block> + Clone + Send + Sync + 'static,
+11 -14
View File
@@ -32,7 +32,6 @@ use keyring::sr25519::Keyring;
use super::generic::DigestItem;
use client::BlockchainEvents;
use test_client;
use futures::Async;
use log::debug;
use std::{time::Duration, borrow::Borrow, cell::RefCell};
type Item = generic::DigestItem<Hash>;
@@ -62,15 +61,15 @@ impl Environment<TestBlock> for DummyFactory {
impl Proposer<TestBlock> for DummyProposer {
type Error = Error;
type Create = Result<TestBlock, Error>;
type Create = future::Ready<Result<TestBlock, Error>>;
fn propose(
&self,
_: InherentData,
digests: DigestFor<TestBlock>,
_: Duration,
) -> Result<TestBlock, Error> {
self.1.new_block(digests).unwrap().bake().map_err(|e| e.into())
) -> Self::Create {
future::ready(self.1.new_block(digests).unwrap().bake().map_err(|e| e.into()))
}
}
@@ -203,9 +202,8 @@ fn run_one_test() {
let environ = Arc::new(DummyFactory(client.clone()));
import_notifications.push(
client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &5))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(!(n.origin != BlockOrigin::Own && n.header.number() < &5)))
.for_each(move |_| future::ready(()))
);
let config = Config::get_or_compute(&*client)
@@ -234,14 +232,13 @@ fn run_one_test() {
}).expect("Starts babe"));
}
// wait for all finalized on each.
let wait_for = futures::future::join_all(import_notifications);
let drive_to_completion = futures::future::poll_fn(|| {
runtime.spawn(futures01::future::poll_fn(move || {
net.lock().poll();
Ok(Async::NotReady)
});
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
Ok::<_, ()>(futures01::Async::NotReady::<()>)
}));
runtime.block_on(future::join_all(import_notifications)
.map(|_| Ok::<(), ()>(())).compat()).unwrap();
}
#[test]