mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 08:11:03 +00:00
Update for Substrate master (#600)
* update substrate for change to palette * change paint to palette * update lock * Fix missing import * change to polkadot-master * Use same commit hash of parity-common * Resolve linking errors * Rename to frame * bump spec * Subsume #602 and #596 * Fix DispatchInfo * Merge `futures03` and `joe-update-to-palette` (#606) * Change repo and branch * Made changes * Bumped async-std version * Fix line width * Bump spec_version * Fix `run_to_block` for Crowdfund module (#603) Probably a copy paste error. * Bump dependencies * Update trie-db to be inline with substrate * Fix documentation warning * Fix test compilation
This commit is contained in:
committed by
Gavin Wood
parent
e229074f79
commit
c9b1e3d959
+111
-96
@@ -49,8 +49,10 @@ use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::{future, Stream, Future, IntoFuture};
|
||||
use futures03::{TryStreamExt as _, StreamExt as _};
|
||||
use futures::{
|
||||
future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
|
||||
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}
|
||||
};
|
||||
use log::{warn, error};
|
||||
use client::BlockchainEvents;
|
||||
use primitives::{Pair, Blake2Hasher};
|
||||
@@ -67,7 +69,6 @@ use polkadot_cli::{
|
||||
use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
|
||||
use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
|
||||
use polkadot_runtime::RuntimeApi;
|
||||
use tokio::timer::Timeout;
|
||||
|
||||
pub use polkadot_cli::{VersionInfo, TaskExecutor};
|
||||
pub use polkadot_network::validation::Incoming;
|
||||
@@ -81,14 +82,14 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
pub trait Network: Send + Sync {
|
||||
/// Convert the given `CollatorId` to a `PeerId`.
|
||||
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
|
||||
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>;
|
||||
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>;
|
||||
|
||||
/// Create a `Stream` of checked statements for the given `relay_parent`.
|
||||
///
|
||||
/// The returned stream will not terminate, so it is required to make sure that the stream is
|
||||
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
|
||||
/// infinitely.
|
||||
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>;
|
||||
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
|
||||
}
|
||||
|
||||
impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
|
||||
@@ -96,13 +97,21 @@ impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecu
|
||||
E: 'static + Send + Sync,
|
||||
{
|
||||
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
|
||||
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>
|
||||
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>
|
||||
{
|
||||
Box::new(Self::collator_id_to_peer_id(self, collator_id))
|
||||
Box::new(
|
||||
Self::collator_id_to_peer_id(self, collator_id)
|
||||
.compat()
|
||||
.map(|res| res.ok().and_then(|id| id))
|
||||
)
|
||||
}
|
||||
|
||||
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>> {
|
||||
Box::new(Self::checked_statements(self, relay_parent))
|
||||
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
|
||||
Box::new(
|
||||
Self::checked_statements(self, relay_parent)
|
||||
.compat()
|
||||
.filter_map(|item| future::ready(item.ok()))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,7 +162,7 @@ pub trait BuildParachainContext {
|
||||
/// This can be implemented through an externally attached service or a stub.
|
||||
/// This is expected to be a lightweight, shared type like an Arc.
|
||||
pub trait ParachainContext: Clone {
|
||||
type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, OutgoingMessages), Error=InvalidHead>;
|
||||
type ProduceCandidate: Future<Output = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
|
||||
|
||||
/// Produce a candidate, given the relay parent hash, the latest ingress queue information
|
||||
/// and the last parachain head.
|
||||
@@ -173,14 +182,14 @@ pub trait RelayChainContext {
|
||||
|
||||
/// Future that resolves to the un-routed egress queues of a parachain.
|
||||
/// The first item is the oldest.
|
||||
type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
|
||||
type FutureEgress: Future<Output = Result<ConsolidatedIngress, Self::Error>>;
|
||||
|
||||
/// Get un-routed egress queues from a parachain to the local parachain.
|
||||
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
|
||||
}
|
||||
|
||||
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
|
||||
pub fn collate<'a, R, P>(
|
||||
pub async fn collate<R, P>(
|
||||
relay_parent: Hash,
|
||||
local_id: ParaId,
|
||||
parachain_status: ParachainStatus,
|
||||
@@ -188,53 +197,45 @@ pub fn collate<'a, R, P>(
|
||||
mut para_context: P,
|
||||
key: Arc<CollatorPair>,
|
||||
)
|
||||
-> impl Future<Item=(parachain::Collation, OutgoingMessages), Error=Error<R::Error>> + 'a
|
||||
-> Result<(parachain::Collation, OutgoingMessages), Error<R::Error>>
|
||||
where
|
||||
R: RelayChainContext,
|
||||
R::Error: 'a,
|
||||
R::FutureEgress: 'a,
|
||||
P: ParachainContext + 'a,
|
||||
<P::ProduceCandidate as IntoFuture>::Future: Send,
|
||||
P: ParachainContext,
|
||||
P::ProduceCandidate: Send,
|
||||
{
|
||||
let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
|
||||
ingress
|
||||
.and_then(move |ingress| {
|
||||
para_context.produce_candidate(
|
||||
relay_parent,
|
||||
parachain_status,
|
||||
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
|
||||
)
|
||||
.into_future()
|
||||
.map(move |x| (ingress, x))
|
||||
.map_err(Error::Collator)
|
||||
})
|
||||
.and_then(move |(ingress, (block_data, head_data, mut outgoing))| {
|
||||
let block_data_hash = block_data.hash();
|
||||
let signature = key.sign(block_data_hash.as_ref()).into();
|
||||
let egress_queue_roots =
|
||||
polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);
|
||||
let ingress = relay_context.unrouted_egress(local_id).await.map_err(Error::Polkadot)?;
|
||||
|
||||
let receipt = parachain::CandidateReceipt {
|
||||
parachain_index: local_id,
|
||||
collator: key.public(),
|
||||
signature,
|
||||
head_data,
|
||||
egress_queue_roots,
|
||||
fees: 0,
|
||||
block_data_hash,
|
||||
upward_messages: Vec::new(),
|
||||
};
|
||||
let (block_data, head_data, mut outgoing) = para_context.produce_candidate(
|
||||
relay_parent,
|
||||
parachain_status,
|
||||
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
|
||||
).map_err(Error::Collator).await?;
|
||||
|
||||
let collation = parachain::Collation {
|
||||
receipt,
|
||||
pov: PoVBlock {
|
||||
block_data,
|
||||
ingress,
|
||||
},
|
||||
};
|
||||
let block_data_hash = block_data.hash();
|
||||
let signature = key.sign(block_data_hash.as_ref());
|
||||
let egress_queue_roots =
|
||||
polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);
|
||||
|
||||
Ok((collation, outgoing))
|
||||
})
|
||||
let receipt = parachain::CandidateReceipt {
|
||||
parachain_index: local_id,
|
||||
collator: key.public(),
|
||||
signature,
|
||||
head_data,
|
||||
egress_queue_roots,
|
||||
fees: 0,
|
||||
block_data_hash,
|
||||
upward_messages: Vec::new(),
|
||||
};
|
||||
|
||||
let collation = parachain::Collation {
|
||||
receipt,
|
||||
pov: PoVBlock {
|
||||
block_data,
|
||||
ingress,
|
||||
},
|
||||
};
|
||||
|
||||
Ok((collation, outgoing))
|
||||
}
|
||||
|
||||
/// Polkadot-api context.
|
||||
@@ -247,10 +248,10 @@ struct ApiContext<P, E> {
|
||||
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
|
||||
P: ProvideRuntimeApi + Send + Sync,
|
||||
P::Api: ParachainHost<Block>,
|
||||
E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
|
||||
E: futures01::Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
|
||||
{
|
||||
type Error = String;
|
||||
type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress, Error=String> + Send>;
|
||||
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
|
||||
|
||||
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/253
|
||||
@@ -260,7 +261,9 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
|
||||
local_session_key: None,
|
||||
parent_hash: self.parent_hash,
|
||||
authorities: self.validators.clone(),
|
||||
}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));
|
||||
})
|
||||
.compat()
|
||||
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
|
||||
|
||||
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
|
||||
}
|
||||
@@ -274,27 +277,27 @@ struct CollationNode<P, E> {
|
||||
}
|
||||
|
||||
impl<P, E> IntoExit for CollationNode<P, E> where
|
||||
E: Future<Item=(),Error=()> + Send + 'static
|
||||
E: futures01::Future<Item=(),Error=()> + Unpin + Send + 'static
|
||||
{
|
||||
type Exit = E;
|
||||
type Exit = future::Map<Compat01As03<E>, fn (Result<(), ()>) -> ()>;
|
||||
fn into_exit(self) -> Self::Exit {
|
||||
self.exit
|
||||
self.exit.compat().map(drop)
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, E> Worker for CollationNode<P, E> where
|
||||
P: BuildParachainContext + Send + 'static,
|
||||
P::ParachainContext: Send + 'static,
|
||||
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
|
||||
E: Future<Item=(), Error=()> + Clone + Send + Sync + 'static,
|
||||
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
|
||||
E: futures01::Future<Item=(),Error=()> + Clone + Unpin + Send + Sync + 'static,
|
||||
{
|
||||
type Work = Box<dyn Future<Item=(), Error=()> + Send>;
|
||||
type Work = Box<dyn Future<Output=()> + Unpin + Send>;
|
||||
|
||||
fn configuration(&self) -> CustomConfiguration {
|
||||
let mut config = CustomConfiguration::default();
|
||||
config.collating_for = Some((
|
||||
self.key.public(),
|
||||
self.para_id.clone(),
|
||||
self.para_id,
|
||||
));
|
||||
config
|
||||
}
|
||||
@@ -321,7 +324,7 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
select_chain
|
||||
} else {
|
||||
error!("The node cannot work because it can't select chain.");
|
||||
return Box::new(future::err(()));
|
||||
return Box::new(future::ready(()));
|
||||
};
|
||||
|
||||
let is_known = move |block_hash: &Hash| {
|
||||
@@ -364,20 +367,18 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
Ok(ctx) => ctx,
|
||||
Err(()) => {
|
||||
error!("Could not build the parachain context!");
|
||||
return Box::new(future::err(()))
|
||||
return Box::new(future::ready(()))
|
||||
}
|
||||
};
|
||||
|
||||
let inner_exit = exit.clone();
|
||||
let work = client.import_notification_stream()
|
||||
.map(|v| Ok::<_, ()>(v))
|
||||
.compat()
|
||||
.for_each(move |notification| {
|
||||
macro_rules! try_fr {
|
||||
($e:expr) => {
|
||||
match $e {
|
||||
Ok(x) => x,
|
||||
Err(e) => return future::Either::A(future::err(Error::Polkadot(
|
||||
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
|
||||
format!("{:?}", e)
|
||||
))),
|
||||
}
|
||||
@@ -393,11 +394,11 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
let parachain_context = parachain_context.clone();
|
||||
let validation_network = validation_network.clone();
|
||||
|
||||
let work = future::lazy(move || {
|
||||
let work = future::lazy(move |_| {
|
||||
let api = client.runtime_api();
|
||||
let status = match try_fr!(api.parachain_status(&id, para_id)) {
|
||||
Some(status) => status,
|
||||
None => return future::Either::A(future::ok(())),
|
||||
None => return future::Either::Left(future::ok(())),
|
||||
};
|
||||
|
||||
let validators = try_fr!(api.validators(&id));
|
||||
@@ -421,7 +422,7 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
context,
|
||||
parachain_context,
|
||||
key,
|
||||
).map(move |(collation, outgoing)| {
|
||||
).map_ok(move |(collation, outgoing)| {
|
||||
network.with_spec(move |spec, ctx| {
|
||||
let res = spec.add_local_collation(
|
||||
ctx,
|
||||
@@ -437,23 +438,36 @@ impl<P, E> Worker for CollationNode<P, E> where
|
||||
})
|
||||
});
|
||||
|
||||
future::Either::B(collation_work)
|
||||
});
|
||||
let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
|
||||
let silenced = deadlined.then(|res| match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => {
|
||||
warn!("Collation failure: timeout");
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
future::Either::Right(collation_work)
|
||||
}).map(|_| Ok::<_, ()>(()));
|
||||
|
||||
tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
|
||||
Ok(())
|
||||
let deadlined = future::select(
|
||||
work,
|
||||
futures_timer::Delay::new(COLLATION_TIMEOUT)
|
||||
);
|
||||
|
||||
let silenced = deadlined
|
||||
.map(|either| {
|
||||
if let future::Either::Right(_) = either {
|
||||
warn!("Collation failure: timeout");
|
||||
}
|
||||
});
|
||||
|
||||
let future = future::select(
|
||||
silenced,
|
||||
inner_exit.clone().map(|_| Ok::<_, ()>(())).compat()
|
||||
).map(|_| Ok::<_, ()>(())).compat();
|
||||
|
||||
tokio::spawn(future);
|
||||
future::ready(())
|
||||
});
|
||||
|
||||
let work_and_exit = work.select(exit).then(|_| Ok(()));
|
||||
Box::new(work_and_exit) as Box<_>
|
||||
let work_and_exit = future::select(
|
||||
work,
|
||||
exit.map(|_| Ok::<_, ()>(())).compat()
|
||||
).map(|_| ());
|
||||
|
||||
Box::new(work_and_exit)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -481,11 +495,10 @@ pub fn run_collator<P, E>(
|
||||
) -> polkadot_cli::error::Result<()> where
|
||||
P: BuildParachainContext + Send + 'static,
|
||||
P::ParachainContext: Send + 'static,
|
||||
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
|
||||
E: IntoFuture<Item=(), Error=()>,
|
||||
E::Future: Send + Clone + Sync + 'static,
|
||||
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
|
||||
E: futures01::Future<Item = (),Error=()> + Unpin + Send + Clone + Sync + 'static,
|
||||
{
|
||||
let node_logic = CollationNode { build_parachain_context, exit: exit.into_future(), para_id, key };
|
||||
let node_logic = CollationNode { build_parachain_context, exit, para_id, key };
|
||||
polkadot_cli::run(node_logic, version)
|
||||
}
|
||||
|
||||
@@ -503,12 +516,12 @@ mod tests {
|
||||
|
||||
impl RelayChainContext for DummyRelayChainContext {
|
||||
type Error = ();
|
||||
type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress,Error=()>>;
|
||||
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress,()>> + Unpin>;
|
||||
|
||||
fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
|
||||
match self.ingress.get(¶_id) {
|
||||
Some(ingress) => Box::new(future::ok(ingress.clone())),
|
||||
None => Box::new(future::empty()),
|
||||
None => Box::new(future::pending()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -517,16 +530,16 @@ mod tests {
|
||||
struct DummyParachainContext;
|
||||
|
||||
impl ParachainContext for DummyParachainContext {
|
||||
type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>;
|
||||
type ProduceCandidate = future::Ready<Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
|
||||
|
||||
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
|
||||
&mut self,
|
||||
_relay_parent: Hash,
|
||||
_status: ParachainStatus,
|
||||
ingress: I,
|
||||
) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead> {
|
||||
) -> Self::ProduceCandidate {
|
||||
// send messages right back.
|
||||
Ok((
|
||||
future::ok((
|
||||
BlockData(vec![1, 2, 3, 4, 5,]),
|
||||
HeadData(vec![9, 9, 9]),
|
||||
OutgoingMessages {
|
||||
@@ -570,7 +583,7 @@ mod tests {
|
||||
(a, messages_from_a),
|
||||
]));
|
||||
|
||||
let collation = collate(
|
||||
let future = collate(
|
||||
Default::default(),
|
||||
id,
|
||||
ParachainStatus {
|
||||
@@ -584,7 +597,9 @@ mod tests {
|
||||
context.clone(),
|
||||
DummyParachainContext,
|
||||
Arc::new(Sr25519Keyring::Alice.pair().into()),
|
||||
).wait().unwrap().0;
|
||||
);
|
||||
|
||||
let collation = futures::executor::block_on(future).unwrap().0;
|
||||
|
||||
// ascending order by root.
|
||||
assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
|
||||
|
||||
Reference in New Issue
Block a user