mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 21:01:02 +00:00
Rewrite some Future structs as async functions (#679)
* Squashed commit of the following: commit e97a17157ae0887320994661e2f816275fc75b76 Author: Ashley <ashley.ruglys@gmail.com> Date: Tue Dec 10 15:06:28 2019 +0100 Rewrite some functions as async commit 970e485179f1e087cf0a51c6a4e71f923e87df45 Merge: f98966acdf3ea965Author: Ashley <ashley.ruglys@gmail.com> Date: Tue Dec 10 11:19:37 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-futures-update commit f98966ac188067158071d1e3e243c34ea5738f56 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 23:40:20 2019 +0100 Add async blocks back in commit 7fa88af0271db659de9274c94cb8e7eead0e4289 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 23:17:02 2019 +0100 Revert "Asyncify network functions" This reverts commit f20ae6548dc482cb1e75bc80641cfe55c6131a53. commit 82413550cdac40bd14a09f62df12de49dd7e55af Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 19:09:55 2019 +0100 Fix validation test again commit 47e002b08369c9c775b92aea9b6f6ed81b30241b Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 19:07:43 2019 +0100 Switch favicon commit 0c5c1409078fc57120a39e40ec5cb1763d67d593 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:54:10 2019 +0100 Fix validation test commit 8bb6a0189fe824da09054cbf5b06f11a0f87072d Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:53:54 2019 +0100 Nits commit 33410f3a4910d3e688956cecfcca02cc2dfa6a7a Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:43:09 2019 +0100 Fix av store test commit f0c517eb240c42848cdb3305e0b554ef407bdfaa Merge: 938f411a 60e72111 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:21:39 2019 +0100 Merge branch 'ashley-futures-updates' into ashley-futures-update commit 60e72111651f2b366592c1e56756c6bf5d8ce2f1 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:19:40 2019 +0100 Clean up browser validation worker error commit f20ae6548dc482cb1e75bc80641cfe55c6131a53 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 18:16:40 2019 +0100 Asyncify network functions commit b22758d0a3852d701923bd238484e1c9eabec5e2 Merge: 2e8b05edef562cd7Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:47:26 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-futures-updates commit 2e8b05edf1a1fadd6943f967c27b6d34675ba06a Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:45:52 2019 +0100 Box pin changes commit 08bfdf7f2d27721abffee49221213304ebc4fd47 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:15:38 2019 +0100 Update network/src/lib.rs Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> commit d8be456c508d5e5a03178db45d9f272b302a8a65 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:15:32 2019 +0100 Update network/src/lib.rs Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> commit ec7367276fdd374b19f41555fd5985454c559600 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:14:36 2019 +0100 Update availability-store/src/worker.rs Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> commit 938f411a9365e9c5fb16bfedb62aacac4403d063 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 17:05:05 2019 +0100 Revert "Revert removal of tokio_executor that causes tokio version mismatch panic" This reverts commit cfeb50c01d8df5e209483406a711e64761b44ae9. commit f92f58044b4fe04bde73a60820d154080dd64b16 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 15:47:35 2019 +0100 Fix adder test parachain commit cfeb50c01d8df5e209483406a711e64761b44ae9 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 15:31:36 2019 +0100 Revert removal of tokio_executor that causes tokio version mismatch panic commit 5bcb83a122b9a30f240a238ca670c6b658f4ddf1 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 15:17:55 2019 +0100 Fix typo commit fc02b1dc16e277649677396833a8d70e8588a56c Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 15:02:50 2019 +0100 Fix collator commit 6c4ff5b3bf1084a618ffec2d864090c9c8077f0f Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 14:35:37 2019 +0100 Small changes commit e1338cb4450df5377d8c911da56445914d667472 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 14:24:42 2019 +0100 Fix network tests commit 4e458f7a91c1ed5c986795f40ed55e596d176c4b Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 12:25:26 2019 +0100 Remove futures01 from availability-store commit 5729f6cd6b53f061ff155320c815509feb02309e Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 12:22:33 2019 +0100 Fix validation tests commit a820612565b42780f8b6c09c9c1c30f06a9985ba Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 12:01:48 2019 +0100 Fix availability store tests commit 112344faeee5f8f03b3b87c6baf7036a7fcbe415 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 11:36:03 2019 +0100 Update tokio version commit d2de6d8b3f0c3682679fe437d5459ac50a3c3895 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 11:33:25 2019 +0100 Revert cli tokio version to avoid libp2p panic commit 0c5f24e0c1131ac58a947448456e7fb62c869702 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 9 11:27:13 2019 +0100 Switch to polkadot-master commit 2e2311e33a4af87c2c545094ea8cb595cd6cfe2d Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Dec 6 15:07:21 2019 +0100 Re-add release flag commit 6adc1b6114e154a590acf82acfaf0c1265409518 Merge: 9767f832533c80adAuthor: Ashley <ashley.ruglys@gmail.com> Date: Fri Dec 6 13:36:35 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 9767f8325c33211065ef6830becdac0e3cf852de Merge: c528dc6d0bf7d294Author: Ashley <ashley.ruglys@gmail.com> Date: Wed Dec 4 17:11:39 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit c528dc6df8fc31cdcbc10889636355241398debd Author: Ashley <ashley.ruglys@gmail.com> Date: Wed Dec 4 17:07:00 2019 +0100 Fix wasm build commit da233a122c678dc7767dac7cc6e2564575b15cc8 Author: Ashley <ashley.ruglys@gmail.com> Date: Wed Dec 4 16:25:49 2019 +0100 tidy commit 832f8054df78afbcef1903e0f9e7e246b348c10d Merge: 4e1da888121c917dAuthor: Ashley <ashley.ruglys@gmail.com> Date: Wed Dec 4 15:56:56 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 4e1da8888dd2160064dd453782fb05513c65ade4 Author: Ashley <ashley.ruglys@gmail.com> Date: Tue Dec 3 16:47:02 2019 +0100 Temp switch back to substrate/master commit af88a87338688797bbc52315fdd0fc22cf23c6cf Merge: a03a980c7832ad93Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 19:33:14 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit a03a980ce417ec7b446bfcbe7a66ec0ed6458135 Merge: 31a88a930c1ef335Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 13:52:37 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 31a88a930ffdf5da72b3e587ec8c0e6b00922e3e Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 13:52:35 2019 +0100 Tidy commit 5b33b7a7af08d7a3aa3853b8e4995484fb640d52 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 11:55:51 2019 +0100 Add browser-demo commit 868f6e51dfdc0a64252acd9adabe7b9ba436b1f4 Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 10:51:57 2019 +0100 Add initial browser file commit e5e399c20f1dc4e1023ee57773dcdd9ab2a0a14b Author: Ashley <ashley.ruglys@gmail.com> Date: Mon Dec 2 10:45:02 2019 +0100 Add browser-demo commit 408288b05292d952944a6b8e1f2bcf9cf259a040 Author: Ashley <ashley.ruglys@gmail.com> Date: Sun Dec 1 19:28:33 2019 +0100 Get polkadot to compile via wasm! commit 04ffe72e868be57841d31f01eec1b90423a595d6 Author: Ashley <ashley.ruglys@gmail.com> Date: Sun Dec 1 19:28:16 2019 +0100 Migrate service commit 119f0829a53b825a3ebc9efdefa76ae7eabb04aa Merge: 93fb642837fec553Author: Ashley <ashley.ruglys@gmail.com> Date: Sun Dec 1 17:43:49 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit 93fb6428501bac612a1675cf3b6e3d26f5bbc7c2 Author: Ashley <ashley.ruglys@gmail.com> Date: Sun Dec 1 12:21:25 2019 +0100 Switch branch commit 0c4fe8331bdc9665ac2427eb8c795112ac728d70 Author: Ashley <ashley.ruglys@gmail.com> Date: Sat Nov 30 11:45:59 2019 +0100 Tidy up validation commit 73563253d95962657108820ae130a8d3f3093ee8 Author: Ashley <ashley.ruglys@gmail.com> Date: Sat Nov 30 11:39:09 2019 +0100 Tidy up network commit 1c9cf0427c0e2d15c4b6d52b91d67d4a3963e30d Author: Ashley <ashley.ruglys@gmail.com> Date: Sat Nov 30 01:16:35 2019 +0100 Final changes to validation commit 322cca5224fdca0a29d88ff91700ef704a9d0c2a Author: Ashley <ashley.ruglys@gmail.com> Date: Sat Nov 30 00:31:55 2019 +0100 Migrate network to std futures commit 96f1a99491f5ae2957effa58cc1e385014575a32 Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Nov 29 23:31:04 2019 +0100 Migrate validation to std futures commit aaf5e55fffd1367c05687eb34f4365a24e3a34c0 Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Nov 29 17:10:11 2019 +0100 Switch to Spawn trait commit 2ab282f57e8b9a55cf8d285b283cf009216511d2 Merge: cceb6b72ed7ee572Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Nov 29 16:31:24 2019 +0100 Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm commit cceb6b72f5677a1c43d2cd61bd525539054f0c01 Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Nov 29 15:47:14 2019 +0100 Make validation work on wasm! commit b45a95cf7d829a916bf2ad6936d1e7f4b6f3ef77 Merge:4ec635eedb7eaa6b Author: Ashley <ashley.ruglys@gmail.com> Date: Fri Nov 29 13:57:23 2019 +0100 Merge remote-tracking branch 'tomaka/wasm-start' into HEAD commit db7eaa6bd5d3bbcea829570fb47ab4d06f3558ce Merge: 6f97dbb7f826ce53Author: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Thu Nov 28 13:58:15 2019 +0100 Merge branch 'master' into wasm-start commit 6f97dbb786750d854cf8f7a56c6a336ea5979228 Author: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Thu Nov 28 12:47:45 2019 +0100 Use --manifest-path instead commit 20104e98ff1713b6c81b0251b43d060d4e672d55 Author: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Thu Nov 28 10:44:51 2019 +0100 Make availability-store compile for WASM * Fix build * Fix futures blocking panic in validators (again) * Deindent
This commit is contained in:
@@ -48,6 +48,7 @@ use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
|
||||
use log::{warn, error};
|
||||
@@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
|
||||
SP: Spawn + Clone + Send + Sync
|
||||
{
|
||||
type Error = String;
|
||||
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
|
||||
type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;
|
||||
|
||||
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/253
|
||||
//
|
||||
// Fetch ingress and accumulate all unrounted egress
|
||||
let _session = self.network.instantiate_leaf_work(LeafWorkParams {
|
||||
local_session_key: None,
|
||||
parent_hash: self.parent_hash,
|
||||
authorities: self.validators.clone(),
|
||||
})
|
||||
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
|
||||
let network = self.network.clone();
|
||||
let parent_hash = self.parent_hash;
|
||||
let authorities = self.validators.clone();
|
||||
|
||||
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
|
||||
async move {
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/253
|
||||
//
|
||||
// Fetch ingress and accumulate all unrounted egress
|
||||
let _session = network.instantiate_leaf_work(LeafWorkParams {
|
||||
local_session_key: None,
|
||||
parent_hash,
|
||||
authorities,
|
||||
})
|
||||
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
|
||||
|
||||
Ok(ConsolidatedIngress(Vec::new()))
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
);
|
||||
|
||||
let exit = inner_exit_2.clone();
|
||||
tokio::spawn(future::select(res, exit).map(drop));
|
||||
tokio::spawn(future::select(res.boxed(), exit).map(drop));
|
||||
})
|
||||
});
|
||||
|
||||
|
||||
+13
-21
@@ -28,7 +28,6 @@ pub mod gossip;
|
||||
use codec::{Decode, Encode};
|
||||
use futures::channel::{oneshot, mpsc};
|
||||
use futures::prelude::*;
|
||||
use futures::future::Either;
|
||||
use polkadot_primitives::{Block, Hash, Header};
|
||||
use polkadot_primitives::parachain::{
|
||||
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
|
||||
@@ -837,25 +836,6 @@ impl PolkadotProtocol {
|
||||
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
|
||||
relay_parent, collation.info.parachain_index);
|
||||
|
||||
let res = match self.availability_store {
|
||||
Some(ref availability_store) => {
|
||||
let availability_store_cloned = availability_store.clone();
|
||||
let collation_cloned = collation.clone();
|
||||
Either::Left((async move {
|
||||
let _ = availability_store_cloned.make_available(av_store::Data {
|
||||
relay_parent,
|
||||
parachain_id: collation_cloned.info.parachain_index,
|
||||
block_data: collation_cloned.pov.block_data.clone(),
|
||||
outgoing_queues: Some(outgoing_targeted.clone().into()),
|
||||
}).await;
|
||||
}
|
||||
)
|
||||
.boxed()
|
||||
)
|
||||
}
|
||||
None => Either::Right(futures::future::ready(())),
|
||||
};
|
||||
|
||||
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
|
||||
match self.validators.get(&primary) {
|
||||
Some(who) => {
|
||||
@@ -871,7 +851,19 @@ impl PolkadotProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
let availability_store = self.availability_store.clone();
|
||||
let collation_cloned = collation.clone();
|
||||
|
||||
async move {
|
||||
if let Some(availability_store) = availability_store {
|
||||
let _ = availability_store.make_available(av_store::Data {
|
||||
relay_parent,
|
||||
parachain_id: collation_cloned.info.parachain_index,
|
||||
block_data: collation_cloned.pov.block_data.clone(),
|
||||
outgoing_queues: Some(outgoing_targeted.clone().into()),
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Give the network protocol a handle to an availability store, used for
|
||||
|
||||
@@ -41,8 +41,9 @@ use log::{debug, trace};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::validation::{self, LeafWorkDataFetcher, Executor};
|
||||
use crate::validation::{LeafWorkDataFetcher, Executor};
|
||||
use crate::NetworkService;
|
||||
|
||||
/// Compute the gossip topic for attestations on the given parent hash.
|
||||
@@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
|
||||
E: Future<Output=()> + Clone + Send + 'static,
|
||||
{
|
||||
type Error = io::Error;
|
||||
type FetchValidationProof = validation::PoVReceiver;
|
||||
type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;
|
||||
|
||||
// We have fetched from a collator and here the receipt should have been already formed.
|
||||
fn local_collation(
|
||||
|
||||
@@ -41,7 +41,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Poll, Context};
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::{prelude::*, channel::mpsc, future::{select, Either}};
|
||||
use codec::Encode;
|
||||
|
||||
use super::{TestContext, TestChainContext};
|
||||
@@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
|
||||
}
|
||||
}
|
||||
|
||||
struct GossipRouter {
|
||||
incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
|
||||
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
|
||||
outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
|
||||
messages: Vec<(Hash, TopicNotification)>,
|
||||
}
|
||||
async fn gossip_router(
|
||||
mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
|
||||
mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>
|
||||
) {
|
||||
let mut outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)> = Vec::new();
|
||||
let mut messages = Vec::new();
|
||||
|
||||
impl GossipRouter {
|
||||
fn add_message(&mut self, topic: Hash, message: TopicNotification) {
|
||||
self.outgoing.retain(|&(ref o_topic, ref sender)| {
|
||||
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
|
||||
});
|
||||
self.messages.push((topic, message));
|
||||
}
|
||||
loop {
|
||||
match select(incoming_messages.next(), incoming_streams.next()).await {
|
||||
Either::Left((Some((topic, message)), _)) => {
|
||||
outgoing.retain(|&(ref o_topic, ref sender)| {
|
||||
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
|
||||
});
|
||||
messages.push((topic, message));
|
||||
},
|
||||
Either::Right((Some((topic, sender)), _)) => {
|
||||
for message in messages.iter()
|
||||
.filter(|&&(ref t, _)| t == &topic)
|
||||
.map(|&(_, ref msg)| clone_gossip(msg))
|
||||
{
|
||||
if let Err(_) = sender.unbounded_send(message) { return }
|
||||
}
|
||||
|
||||
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) {
|
||||
for message in self.messages.iter()
|
||||
.filter(|&&(ref t, _)| t == &topic)
|
||||
.map(|&(_, ref msg)| clone_gossip(msg))
|
||||
{
|
||||
if let Err(_) = sender.unbounded_send(message) { return }
|
||||
outgoing.push((topic, sender));
|
||||
},
|
||||
Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.")
|
||||
}
|
||||
|
||||
self.outgoing.push((topic, sender));
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for GossipRouter {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
loop {
|
||||
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
|
||||
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
|
||||
Poll::Ready(None) => panic!("ended early."),
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
|
||||
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
|
||||
Poll::Ready(None) => panic!("ended early."),
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
struct GossipHandle {
|
||||
send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
|
||||
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
|
||||
}
|
||||
|
||||
fn make_gossip() -> (GossipRouter, GossipHandle) {
|
||||
fn make_gossip() -> (impl Future<Output = ()>, GossipHandle) {
|
||||
let (message_tx, message_rx) = mpsc::unbounded();
|
||||
let (listener_tx, listener_rx) = mpsc::unbounded();
|
||||
|
||||
(
|
||||
GossipRouter {
|
||||
incoming_messages: message_rx,
|
||||
incoming_streams: listener_rx,
|
||||
outgoing: Vec::new(),
|
||||
messages: Vec::new(),
|
||||
},
|
||||
gossip_router(message_rx, listener_rx),
|
||||
GossipHandle { send_message: message_tx, send_listener: listener_tx },
|
||||
)
|
||||
}
|
||||
@@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
|
||||
>;
|
||||
|
||||
struct Built {
|
||||
gossip: GossipRouter,
|
||||
gossip: Pin<Box<dyn Future<Output = ()>>>,
|
||||
api_handle: Arc<Mutex<ApiData>>,
|
||||
networks: Vec<TestValidationNetwork>,
|
||||
}
|
||||
@@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
|
||||
let networks: Vec<_> = networks.collect();
|
||||
|
||||
Built {
|
||||
gossip: gossip_router,
|
||||
gossip: gossip_router.boxed(),
|
||||
api_handle,
|
||||
networks,
|
||||
}
|
||||
|
||||
@@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{
|
||||
use futures::prelude::*;
|
||||
use futures::task::SpawnExt;
|
||||
pub use futures::task::Spawn as Executor;
|
||||
use futures::channel::oneshot::{self, Receiver};
|
||||
use futures::channel::oneshot;
|
||||
use futures::future::{ready, select};
|
||||
|
||||
use std::collections::hash_map::{HashMap, Entry};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Poll, Context};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use parking_lot::Mutex;
|
||||
@@ -242,47 +241,30 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct NetworkDown;
|
||||
|
||||
/// A future that resolves when a collation is received.
|
||||
pub struct AwaitingCollation {
|
||||
outer: oneshot::Receiver<oneshot::Receiver<Collation>>,
|
||||
inner: Option<oneshot::Receiver<Collation>>
|
||||
}
|
||||
|
||||
impl Future for AwaitingCollation {
|
||||
type Output = Result<Collation, NetworkDown>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
if let Some(ref mut inner) = this.inner {
|
||||
return Pin::new(inner).poll(cx).map_err(|_| NetworkDown)
|
||||
}
|
||||
match Pin::new(&mut this.outer).poll(cx) {
|
||||
Poll::Ready(Ok(inner)) => {
|
||||
this.inner = Some(inner);
|
||||
Pin::new(this).poll(cx)
|
||||
},
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
|
||||
P: ProvideRuntimeApi + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block>,
|
||||
N: NetworkService,
|
||||
{
|
||||
type Error = NetworkDown;
|
||||
type Collation = AwaitingCollation;
|
||||
type Collation = Pin<Box<dyn Future<Output = Result<Collation, NetworkDown>> + Send>>;
|
||||
|
||||
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.network.with_spec(move |spec, _| {
|
||||
let collation = spec.await_collation(relay_parent, parachain);
|
||||
let _ = tx.send(collation);
|
||||
});
|
||||
AwaitingCollation{outer: rx, inner: None}
|
||||
let network = self.network.clone();
|
||||
|
||||
// A future that resolves when a collation is received.
|
||||
async move {
|
||||
network.with_spec(move |spec, _| {
|
||||
let collation = spec.await_collation(relay_parent, parachain);
|
||||
let _ = tx.send(collation);
|
||||
});
|
||||
|
||||
rx.await
|
||||
.map_err(|_| NetworkDown)?
|
||||
.await
|
||||
.map_err(|_| NetworkDown)
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
|
||||
@@ -348,27 +330,6 @@ impl Knowledge {
|
||||
}
|
||||
}
|
||||
|
||||
/// receiver for incoming data.
|
||||
#[derive(Clone)]
|
||||
pub struct IncomingReceiver {
|
||||
inner: future::Shared<Receiver<Incoming>>
|
||||
}
|
||||
|
||||
impl Future for IncomingReceiver {
|
||||
type Output = Result<Incoming, io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) {
|
||||
Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Sending end of channel hung up",
|
||||
))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A current validation leaf-work instance
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct LiveValidationLeaf {
|
||||
@@ -564,36 +525,6 @@ impl LiveValidationLeaves {
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiver for block data.
|
||||
pub struct PoVReceiver {
|
||||
outer: Receiver<Receiver<PoVBlock>>,
|
||||
inner: Option<Receiver<PoVBlock>>
|
||||
}
|
||||
|
||||
impl Future for PoVReceiver {
|
||||
type Output = Result<PoVBlock, io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
let map_err = |_| io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Sending end of channel hung up",
|
||||
);
|
||||
|
||||
if let Some(ref mut inner) = this.inner {
|
||||
return Pin::new(inner).poll(cx).map_err(map_err);
|
||||
}
|
||||
match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? {
|
||||
Poll::Ready(inner) => {
|
||||
this.inner = Some(inner);
|
||||
Pin::new(this).poll(cx)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Can fetch data for a given validation leaf-work instance.
|
||||
pub struct LeafWorkDataFetcher<P, E, N: NetworkService, T> {
|
||||
network: Arc<N>,
|
||||
@@ -658,9 +589,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
|
||||
E: Future<Output=()> + Clone + Send + 'static,
|
||||
{
|
||||
/// Fetch PoV block for the given candidate receipt.
|
||||
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
|
||||
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt)
|
||||
-> Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>> {
|
||||
|
||||
let parachain = candidate.parachain_index;
|
||||
let parent_hash = self.parent_hash;
|
||||
let network = self.network.clone();
|
||||
let candidate = candidate.clone();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let canon_roots = self.api.runtime_api().ingress(
|
||||
&BlockId::hash(parent_hash),
|
||||
@@ -676,15 +612,24 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
|
||||
)
|
||||
);
|
||||
|
||||
let candidate = candidate.clone();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.network.with_spec(move |spec, ctx| {
|
||||
if let Ok(Some(canon_roots)) = canon_roots {
|
||||
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
|
||||
let _ = tx.send(inner_rx);
|
||||
}
|
||||
});
|
||||
PoVReceiver { outer: rx, inner: None }
|
||||
async move {
|
||||
network.with_spec(move |spec, ctx| {
|
||||
if let Ok(Some(canon_roots)) = canon_roots {
|
||||
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
|
||||
let _ = tx.send(inner_rx);
|
||||
}
|
||||
});
|
||||
|
||||
let map_err = |_| io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Sending end of channel hung up",
|
||||
);
|
||||
|
||||
rx.await
|
||||
.map_err(map_err)?
|
||||
.await
|
||||
.map_err(map_err)
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ edition = "2018"
|
||||
futures = "0.3.1"
|
||||
futures-timer = "2.0"
|
||||
parking_lot = "0.9.0"
|
||||
tokio = { version = "0.2.4", features = ["rt-core", "blocking"] }
|
||||
tokio = { version = "0.2.4", features = ["rt-core"] }
|
||||
derive_more = "0.14.1"
|
||||
log = "0.4.8"
|
||||
exit-future = "0.2.0"
|
||||
|
||||
@@ -32,8 +32,6 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag
|
||||
use trie::TrieConfiguration;
|
||||
use futures::prelude::*;
|
||||
use log::debug;
|
||||
use std::task::{Poll, Context};
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Encapsulates connections to collators and allows collation on any parachain.
|
||||
///
|
||||
@@ -58,94 +56,41 @@ pub trait Collators: Clone {
|
||||
}
|
||||
|
||||
/// A future which resolves when a collation is available.
|
||||
///
|
||||
/// This future is fused.
|
||||
pub struct CollationFetch<C: Collators, P> {
|
||||
pub async fn collation_fetch<C: Collators, P>(
|
||||
parachain: ParaId,
|
||||
relay_parent_hash: Hash,
|
||||
relay_parent: BlockId,
|
||||
collators: C,
|
||||
live_fetch: Option<C::Collation>,
|
||||
client: Arc<P>,
|
||||
max_block_data_size: Option<u64>,
|
||||
}
|
||||
|
||||
impl<C: Collators, P> CollationFetch<C, P> {
|
||||
/// Create a new collation fetcher for the given chain.
|
||||
pub fn new(
|
||||
parachain: ParaId,
|
||||
relay_parent_hash: Hash,
|
||||
collators: C,
|
||||
client: Arc<P>,
|
||||
max_block_data_size: Option<u64>,
|
||||
) -> Self {
|
||||
CollationFetch {
|
||||
relay_parent: BlockId::hash(relay_parent_hash),
|
||||
relay_parent_hash,
|
||||
collators,
|
||||
client,
|
||||
parachain,
|
||||
live_fetch: None,
|
||||
max_block_data_size,
|
||||
}
|
||||
}
|
||||
|
||||
/// Access the underlying relay parent hash.
|
||||
pub fn relay_parent(&self) -> Hash {
|
||||
self.relay_parent_hash
|
||||
}
|
||||
|
||||
/// Access the local parachain ID.
|
||||
pub fn parachain(&self) -> ParaId {
|
||||
self.parachain
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, P> Future for CollationFetch<C, P>
|
||||
) -> Result<(Collation, OutgoingMessages, Balance),C::Error>
|
||||
where
|
||||
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||
C: Collators + Unpin,
|
||||
P: ProvideRuntimeApi,
|
||||
<C as Collators>::Collation: Unpin,
|
||||
{
|
||||
type Output = Result<(Collation, OutgoingMessages, Balance),C::Error>;
|
||||
let relay_parent = BlockId::hash(relay_parent_hash);
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
loop {
|
||||
let collation = collators.collate(parachain, relay_parent_hash)
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
let collation = {
|
||||
let parachain = this.parachain.clone();
|
||||
let (r, c) = (this.relay_parent_hash, &this.collators);
|
||||
let res = validate_collation(
|
||||
&*client,
|
||||
&relay_parent,
|
||||
&collation,
|
||||
max_block_data_size,
|
||||
);
|
||||
|
||||
let future = this.live_fetch
|
||||
.get_or_insert_with(move || c.collate(parachain, r));
|
||||
match res {
|
||||
Ok((messages, fees)) => {
|
||||
return Ok((collation, messages, fees))
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to validate parachain due to API error: {}", e);
|
||||
|
||||
match Pin::new(future).poll(cx) {
|
||||
Poll::Ready(Ok(c)) => c,
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
|
||||
Poll::Pending => return Poll::Pending
|
||||
}
|
||||
};
|
||||
|
||||
let res = validate_collation(
|
||||
&*this.client,
|
||||
&this.relay_parent,
|
||||
&collation,
|
||||
this.max_block_data_size,
|
||||
);
|
||||
|
||||
match res {
|
||||
Ok((messages, fees)) => {
|
||||
return Poll::Ready(Ok((collation, messages, fees)))
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to validate parachain due to API error: {}", e);
|
||||
|
||||
// just continue if we got a bad collation or failed to validate
|
||||
this.live_fetch = None;
|
||||
this.collators.note_bad_collator(collation.info.collator)
|
||||
}
|
||||
// just continue if we got a bad collation or failed to validate
|
||||
collators.note_bad_collator(collation.info.collator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+47
-161
@@ -34,8 +34,6 @@ use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
time::{self, Duration, Instant},
|
||||
task::{Poll, Context},
|
||||
mem,
|
||||
};
|
||||
|
||||
use babe_primitives::BabeApi;
|
||||
@@ -60,8 +58,8 @@ use txpool_api::{TransactionPool, InPoolTransaction};
|
||||
|
||||
use attestation_service::ServiceHandle;
|
||||
use futures::prelude::*;
|
||||
use futures::{future::{self, Either, select, ready}, stream::unfold, task::{Spawn, SpawnExt}};
|
||||
use collation::CollationFetch;
|
||||
use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}};
|
||||
use collation::collation_fetch;
|
||||
use dynamic_inclusion::DynamicInclusion;
|
||||
use inherents::InherentData;
|
||||
use sp_timestamp::TimestampInherentData;
|
||||
@@ -396,7 +394,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where
|
||||
|
||||
let with_router = move |router: N::TableRouter| {
|
||||
// fetch a local collation from connected collators.
|
||||
let collation_work = CollationFetch::new(
|
||||
let collation_work = collation_fetch(
|
||||
validation_para,
|
||||
relay_parent,
|
||||
collators,
|
||||
@@ -611,14 +609,13 @@ impl<C, TxPool> consensus::Proposer<Block> for Proposer<C, TxPool> where
|
||||
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||
{
|
||||
type Error = Error;
|
||||
type Create = Either<CreateProposal<C, TxPool>, future::Ready<Result<Block, Error>>>;
|
||||
type Create = Pin<Box<dyn Future<Output = Result<Block, Error>> + Send>>;
|
||||
|
||||
fn propose(&mut self,
|
||||
inherent_data: InherentData,
|
||||
inherent_digests: DigestFor<Block>,
|
||||
max_duration: Duration,
|
||||
) -> Self::Create {
|
||||
const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
|
||||
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
|
||||
|
||||
let initial_included = self.tracker.table.includable_count();
|
||||
@@ -630,56 +627,59 @@ impl<C, TxPool> consensus::Proposer<Block> for Proposer<C, TxPool> where
|
||||
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
|
||||
);
|
||||
|
||||
let enough_candidates = dynamic_inclusion.acceptable_in(
|
||||
now,
|
||||
initial_included,
|
||||
).unwrap_or_else(|| Duration::from_millis(1));
|
||||
let parent_hash = self.parent_hash.clone();
|
||||
let parent_number = self.parent_number.clone();
|
||||
let parent_id = self.parent_id.clone();
|
||||
let client = self.client.clone();
|
||||
let transaction_pool = self.transaction_pool.clone();
|
||||
let table = self.tracker.table.clone();
|
||||
|
||||
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
|
||||
Ok(timestamp) => timestamp,
|
||||
Err(e) => return Either::Right(future::err(Error::InherentError(e))),
|
||||
};
|
||||
async move {
|
||||
let enough_candidates = dynamic_inclusion.acceptable_in(
|
||||
now,
|
||||
initial_included,
|
||||
).unwrap_or_else(|| Duration::from_millis(1));
|
||||
|
||||
// set up delay until next allowed timestamp.
|
||||
let current_timestamp = current_timestamp();
|
||||
let delay_future = if current_timestamp >= believed_timestamp {
|
||||
None
|
||||
} else {
|
||||
Some(Delay::new(Duration::from_millis (current_timestamp - believed_timestamp)))
|
||||
};
|
||||
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
|
||||
Ok(timestamp) => timestamp,
|
||||
Err(e) => return Err(Error::InherentError(e)),
|
||||
};
|
||||
|
||||
let timing = ProposalTiming {
|
||||
minimum: delay_future,
|
||||
attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)),
|
||||
enough_candidates: Delay::new(enough_candidates),
|
||||
dynamic_inclusion,
|
||||
last_included: initial_included,
|
||||
};
|
||||
let deadline_diff = max_duration - max_duration / 3;
|
||||
let deadline = match Instant::now().checked_add(deadline_diff) {
|
||||
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
|
||||
Some(d) => d,
|
||||
};
|
||||
|
||||
let deadline_diff = max_duration - max_duration / 3;
|
||||
let deadline = match Instant::now().checked_add(deadline_diff) {
|
||||
None => return Either::Right(
|
||||
future::err(Error::DeadlineComputeFailure(deadline_diff)),
|
||||
),
|
||||
Some(d) => d,
|
||||
};
|
||||
|
||||
Either::Left(CreateProposal {
|
||||
state: CreateProposalState::Pending(CreateProposalData {
|
||||
parent_hash: self.parent_hash.clone(),
|
||||
parent_number: self.parent_number.clone(),
|
||||
parent_id: self.parent_id.clone(),
|
||||
client: self.client.clone(),
|
||||
transaction_pool: self.transaction_pool.clone(),
|
||||
table: self.tracker.table.clone(),
|
||||
let data = CreateProposalData {
|
||||
parent_hash,
|
||||
parent_number,
|
||||
parent_id,
|
||||
client,
|
||||
transaction_pool,
|
||||
table,
|
||||
believed_minimum_timestamp: believed_timestamp,
|
||||
timing,
|
||||
inherent_data: Some(inherent_data),
|
||||
inherent_digests,
|
||||
// leave some time for the proposal finalisation
|
||||
deadline,
|
||||
};
|
||||
|
||||
// set up delay until next allowed timestamp.
|
||||
let current_timestamp = current_timestamp();
|
||||
if current_timestamp < believed_timestamp {
|
||||
Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
|
||||
.await;
|
||||
}
|
||||
|
||||
Delay::new(enough_candidates).await;
|
||||
|
||||
tokio_executor::blocking::run(move || {
|
||||
let proposed_candidates = data.table.proposed_set();
|
||||
data.propose_with(proposed_candidates)
|
||||
})
|
||||
})
|
||||
.await
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -689,67 +689,6 @@ fn current_timestamp() -> u64 {
|
||||
.as_millis() as u64
|
||||
}
|
||||
|
||||
struct ProposalTiming {
|
||||
minimum: Option<Delay>,
|
||||
attempt_propose: Box<dyn Stream<Item=()> + Send + Unpin>,
|
||||
dynamic_inclusion: DynamicInclusion,
|
||||
enough_candidates: Delay,
|
||||
last_included: usize,
|
||||
}
|
||||
|
||||
impl ProposalTiming {
|
||||
// whether it's time to attempt a proposal.
|
||||
// shouldn't be called outside of the context of a task.
|
||||
fn poll(&mut self, cx: &mut Context, included: usize) -> Poll<()> {
|
||||
// first drain from the interval so when the minimum delay is up
|
||||
// we don't have any notifications built up.
|
||||
//
|
||||
// this interval is just meant to produce periodic task wakeups
|
||||
// that lead to the `dynamic_inclusion` getting updated as necessary.
|
||||
while let Poll::Ready(x) = self.attempt_propose.poll_next_unpin(cx) {
|
||||
x.expect("timer still alive; intervals never end; qed");
|
||||
}
|
||||
|
||||
// wait until the minimum time has passed.
|
||||
if let Some(mut minimum) = self.minimum.take() {
|
||||
if let Poll::Pending = minimum.poll_unpin(cx) {
|
||||
self.minimum = Some(minimum);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
if included == self.last_included {
|
||||
return self.enough_candidates.poll_unpin(cx);
|
||||
}
|
||||
|
||||
// the amount of includable candidates has changed. schedule a wakeup
|
||||
// if it's not sufficient anymore.
|
||||
match self.dynamic_inclusion.acceptable_in(Instant::now(), included) {
|
||||
Some(instant) => {
|
||||
self.last_included = included;
|
||||
self.enough_candidates.reset(Instant::now() + instant);
|
||||
self.enough_candidates.poll_unpin(cx)
|
||||
}
|
||||
None => Poll::Ready(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Future which resolves upon the creation of a proposal.
|
||||
pub struct CreateProposal<C: Send + Sync, TxPool> {
|
||||
state: CreateProposalState<C, TxPool>,
|
||||
}
|
||||
|
||||
/// Current status of the proposal future.
|
||||
enum CreateProposalState<C: Send + Sync, TxPool> {
|
||||
/// Pending inclusion, with given proposal data.
|
||||
Pending(CreateProposalData<C, TxPool>),
|
||||
/// Represents the state when we switch from pending to fired.
|
||||
Switching,
|
||||
/// Block proposing has fired.
|
||||
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
|
||||
}
|
||||
|
||||
/// Inner data of the create proposal.
|
||||
struct CreateProposalData<C: Send + Sync, TxPool> {
|
||||
parent_hash: Hash,
|
||||
@@ -758,7 +697,6 @@ struct CreateProposalData<C: Send + Sync, TxPool> {
|
||||
client: Arc<C>,
|
||||
transaction_pool: Arc<TxPool>,
|
||||
table: Arc<SharedTable>,
|
||||
timing: ProposalTiming,
|
||||
believed_minimum_timestamp: u64,
|
||||
inherent_data: Option<InherentData>,
|
||||
inherent_digests: DigestFor<Block>,
|
||||
@@ -859,58 +797,6 @@ impl<C, TxPool> CreateProposalData<C, TxPool> where
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, TxPool> Future for CreateProposal<C, TxPool> where
|
||||
TxPool: TransactionPool<Block=Block> + 'static,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
|
||||
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||
{
|
||||
type Output = Result<Block, Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let mut state = CreateProposalState::Switching;
|
||||
mem::swap(&mut state, &mut self.state);
|
||||
|
||||
// 1. try to propose if we have enough includable candidates and other
|
||||
// delays have concluded.
|
||||
let data = match state {
|
||||
CreateProposalState::Pending(mut data) => {
|
||||
let included = data.table.includable_count();
|
||||
match data.timing.poll(cx, included) {
|
||||
Poll::Pending => {
|
||||
self.state = CreateProposalState::Pending(data);
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(()) => (),
|
||||
}
|
||||
|
||||
data
|
||||
},
|
||||
CreateProposalState::Switching =>
|
||||
unreachable!(
|
||||
"State Switching are only created on call, \
|
||||
and immediately swapped out; \
|
||||
the data being read is from state; \
|
||||
thus Switching will never be reachable here; qed"
|
||||
),
|
||||
CreateProposalState::Fired(mut future) => {
|
||||
let ret = Pin::new(&mut future).poll(cx);
|
||||
self.state = CreateProposalState::Fired(future);
|
||||
return ret
|
||||
},
|
||||
};
|
||||
|
||||
// 2. propose
|
||||
let mut future = tokio_executor::blocking::run(move || {
|
||||
let proposed_candidates = data.table.proposed_set();
|
||||
data.propose_with(proposed_candidates)
|
||||
});
|
||||
let polled = Pin::new(&mut future).poll(cx);
|
||||
self.state = CreateProposalState::Fired(future);
|
||||
|
||||
polled
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -17,16 +17,13 @@
|
||||
//! Implements a future which resolves when all of the candidates referenced are includable.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::channel::oneshot;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Poll, Context};
|
||||
|
||||
use polkadot_primitives::Hash;
|
||||
|
||||
/// Track includability of a set of candidates,
|
||||
pub(super) fn track<I: IntoIterator<Item=(Hash, bool)>>(candidates: I) -> (IncludabilitySender, Includable) {
|
||||
pub(super) fn track<I: IntoIterator<Item=(Hash, bool)>>(candidates: I)
|
||||
-> (IncludabilitySender, oneshot::Receiver<()>) {
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let tracking: HashMap<_, _> = candidates.into_iter().collect();
|
||||
let includable_count = tracking.values().filter(|x| **x).count();
|
||||
@@ -39,10 +36,7 @@ pub(super) fn track<I: IntoIterator<Item=(Hash, bool)>>(candidates: I) -> (Inclu
|
||||
|
||||
sender.try_complete();
|
||||
|
||||
(
|
||||
sender,
|
||||
Includable(rx),
|
||||
)
|
||||
(sender, rx)
|
||||
}
|
||||
|
||||
/// The sending end of the includability sender.
|
||||
@@ -93,17 +87,6 @@ impl IncludabilitySender {
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves when all the candidates within are includable.
|
||||
pub struct Includable(oneshot::Receiver<()>);
|
||||
|
||||
impl Future for Includable {
|
||||
type Output = Result<(), oneshot::Canceled>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
Pin::new(&mut Pin::into_inner(self).0).poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -30,6 +30,7 @@ use polkadot_primitives::parachain::{
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use futures::prelude::*;
|
||||
use futures::channel::oneshot;
|
||||
use log::{warn, debug};
|
||||
use bitvec::bitvec;
|
||||
|
||||
@@ -40,7 +41,6 @@ use runtime_primitives::traits::ProvideRuntimeApi;
|
||||
|
||||
mod includable;
|
||||
|
||||
pub use self::includable::Includable;
|
||||
pub use table::{SignedStatement, Statement};
|
||||
pub use table::generic::Statement as GenericStatement;
|
||||
|
||||
@@ -543,7 +543,7 @@ impl SharedTable {
|
||||
}
|
||||
|
||||
/// Track includability of a given set of candidate hashes.
|
||||
pub fn track_includability<I>(&self, iterable: I) -> Includable
|
||||
pub fn track_includability<I>(&self, iterable: I) -> oneshot::Receiver<()>
|
||||
where I: IntoIterator<Item=Hash>
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
Reference in New Issue
Block a user