mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 15:18:00 +00:00
Make Proposer instantiation potentially async. (#4630)
* Make Proposer instantiation potentially async. * fix node-service test * fix basic-authority doc-test * only block once on futures in test * use async/await
This commit is contained in:
committed by
GitHub
parent
ab1be250bc
commit
c7069de044
@@ -535,13 +535,15 @@ mod tests {
|
||||
|
||||
digest.push(<DigestItem as CompatibleDigestItem>::babe_pre_digest(babe_pre_digest));
|
||||
|
||||
let mut proposer = proposer_factory.init(&parent_header).unwrap();
|
||||
let new_block = futures::executor::block_on(proposer.propose(
|
||||
inherent_data,
|
||||
digest,
|
||||
std::time::Duration::from_secs(1),
|
||||
RecordProof::Yes,
|
||||
)).expect("Error making test block").block;
|
||||
let new_block = futures::executor::block_on(async move {
|
||||
let proposer = proposer_factory.init(&parent_header).await;
|
||||
proposer.unwrap().propose(
|
||||
inherent_data,
|
||||
digest,
|
||||
std::time::Duration::from_secs(1),
|
||||
RecordProof::Yes,
|
||||
).await
|
||||
}).expect("Error making test block").block;
|
||||
|
||||
let (new_header, new_body) = new_block.deconstruct();
|
||||
let pre_hash = new_header.hash();
|
||||
|
||||
@@ -34,6 +34,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
|
||||
use sc_telemetry::{telemetry, CONSENSUS_INFO};
|
||||
use sc_block_builder::BlockBuilderApi;
|
||||
use sp_api::{ProvideRuntimeApi, ApiExt};
|
||||
use futures::prelude::*;
|
||||
|
||||
/// Proposer factory.
|
||||
pub struct ProposerFactory<C, A> where A: TransactionPool {
|
||||
@@ -59,7 +60,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
|
||||
&mut self,
|
||||
parent_header: &<Block as BlockT>::Header,
|
||||
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
|
||||
) -> Result<Proposer<Block, SubstrateClient<B, E, Block, RA>, A>, sp_blockchain::Error> {
|
||||
) -> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> {
|
||||
let parent_hash = parent_header.hash();
|
||||
|
||||
let id = BlockId::hash(parent_hash);
|
||||
@@ -77,7 +78,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
|
||||
}),
|
||||
};
|
||||
|
||||
Ok(proposer)
|
||||
proposer
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,14 +95,15 @@ impl<B, E, Block, RA, A> sp_consensus::Environment<Block> for
|
||||
BlockBuilderApi<Block, Error = sp_blockchain::Error> +
|
||||
ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>,
|
||||
{
|
||||
type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
|
||||
type Proposer = Proposer<Block, SubstrateClient<B, E, Block, RA>, A>;
|
||||
type Error = sp_blockchain::Error;
|
||||
|
||||
fn init(
|
||||
&mut self,
|
||||
parent_header: &<Block as BlockT>::Header,
|
||||
) -> Result<Self::Proposer, sp_blockchain::Error> {
|
||||
self.init_with_now(parent_header, Box::new(time::Instant::now))
|
||||
) -> Self::CreateProposer {
|
||||
future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,7 +326,7 @@ mod tests {
|
||||
*value = new;
|
||||
old
|
||||
})
|
||||
).unwrap();
|
||||
);
|
||||
|
||||
// when
|
||||
let deadline = time::Duration::from_secs(3);
|
||||
@@ -359,7 +361,7 @@ mod tests {
|
||||
let mut proposer = proposer_factory.init_with_now(
|
||||
&client.header(&block_id).unwrap().unwrap(),
|
||||
Box::new(move || time::Instant::now()),
|
||||
).unwrap();
|
||||
);
|
||||
|
||||
let deadline = time::Duration::from_secs(9);
|
||||
let proposal = futures::executor::block_on(
|
||||
|
||||
@@ -34,9 +34,12 @@
|
||||
//! };
|
||||
//!
|
||||
//! // From this factory, we create a `Proposer`.
|
||||
//! let mut proposer = proposer_factory.init(
|
||||
//! let proposer = proposer_factory.init(
|
||||
//! &client.header(&BlockId::number(0)).unwrap().unwrap(),
|
||||
//! ).unwrap();
|
||||
//! );
|
||||
//!
|
||||
//! // The proposer is created asynchronously.
|
||||
//! let mut proposer = futures::executor::block_on(proposer).unwrap();
|
||||
//!
|
||||
//! // This `Proposer` allows us to create a block proposition.
|
||||
//! // The proposer will grab transactions from the transaction pool, and put them into the block.
|
||||
|
||||
@@ -217,6 +217,9 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
|
||||
{
|
||||
type BlockImport = I;
|
||||
type SyncOracle = SO;
|
||||
type CreateProposer = Pin<Box<
|
||||
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
|
||||
>>;
|
||||
type Proposer = E::Proposer;
|
||||
type Claim = P;
|
||||
type EpochData = Vec<AuthorityId<P>>;
|
||||
@@ -302,10 +305,10 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
|
||||
&mut self.sync_oracle
|
||||
}
|
||||
|
||||
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
|
||||
self.env.init(block).map_err(|e| {
|
||||
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
|
||||
Box::pin(self.env.init(block).map_err(|e| {
|
||||
sp_consensus::Error::ClientImport(format!("{:?}", e)).into()
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn proposing_remaining_duration(
|
||||
@@ -874,12 +877,13 @@ mod tests {
|
||||
|
||||
impl Environment<TestBlock> for DummyFactory {
|
||||
type Proposer = DummyProposer;
|
||||
type CreateProposer = futures::future::Ready<Result<DummyProposer, Error>>;
|
||||
type Error = Error;
|
||||
|
||||
fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
|
||||
-> Result<DummyProposer, Error>
|
||||
-> Self::CreateProposer
|
||||
{
|
||||
Ok(DummyProposer(parent_header.number + 1, self.0.clone()))
|
||||
futures::future::ready(Ok(DummyProposer(parent_header.number + 1, self.0.clone())))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -363,6 +363,9 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
|
||||
type EpochData = Epoch;
|
||||
type Claim = (BabePreDigest, AuthorityPair);
|
||||
type SyncOracle = SO;
|
||||
type CreateProposer = Pin<Box<
|
||||
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
|
||||
>>;
|
||||
type Proposer = E::Proposer;
|
||||
type BlockImport = I;
|
||||
|
||||
@@ -466,10 +469,10 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
|
||||
&mut self.sync_oracle
|
||||
}
|
||||
|
||||
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
|
||||
self.env.init(block).map_err(|e| {
|
||||
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
|
||||
Box::pin(self.env.init(block).map_err(|e| {
|
||||
sp_consensus::Error::ClientImport(format!("{:?}", e))
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn proposing_remaining_duration(
|
||||
|
||||
@@ -72,23 +72,24 @@ struct DummyProposer {
|
||||
}
|
||||
|
||||
impl Environment<TestBlock> for DummyFactory {
|
||||
type CreateProposer = future::Ready<Result<DummyProposer, Error>>;
|
||||
type Proposer = DummyProposer;
|
||||
type Error = Error;
|
||||
|
||||
fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
|
||||
-> Result<DummyProposer, Error>
|
||||
-> Self::CreateProposer
|
||||
{
|
||||
|
||||
let parent_slot = crate::find_pre_digest::<TestBlock>(parent_header)
|
||||
.expect("parent header has a pre-digest")
|
||||
.slot_number();
|
||||
|
||||
Ok(DummyProposer {
|
||||
future::ready(Ok(DummyProposer {
|
||||
factory: self.clone(),
|
||||
parent_hash: parent_header.hash(),
|
||||
parent_number: *parent_header.number(),
|
||||
parent_slot,
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -547,7 +548,7 @@ fn propose_and_import_block<Transaction>(
|
||||
proposer_factory: &mut DummyFactory,
|
||||
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
|
||||
) -> sp_core::H256 {
|
||||
let mut proposer = proposer_factory.init(parent).unwrap();
|
||||
let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();
|
||||
|
||||
let slot_number = slot_number.unwrap_or_else(|| {
|
||||
let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
|
||||
|
||||
@@ -486,7 +486,7 @@ fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
|
||||
}
|
||||
|
||||
let mut aux = PowAux::read(client, &best_hash)?;
|
||||
let mut proposer = env.init(&best_header)
|
||||
let mut proposer = futures::executor::block_on(env.init(&best_header))
|
||||
.map_err(|e| Error::Environment(format!("{:?}", e)))?;
|
||||
|
||||
let inherent_data = inherent_data_providers
|
||||
|
||||
@@ -70,6 +70,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
|
||||
/// A handle to a `SyncOracle`.
|
||||
type SyncOracle: SyncOracle;
|
||||
|
||||
/// The type of future resolving to the proposer.
|
||||
type CreateProposer: Future<Output = Result<Self::Proposer, sp_consensus::Error>>
|
||||
+ Send + Unpin + 'static;
|
||||
|
||||
/// The type of proposer to use to build blocks.
|
||||
type Proposer: Proposer<B>;
|
||||
|
||||
@@ -129,7 +133,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
|
||||
fn sync_oracle(&mut self) -> &mut Self::SyncOracle;
|
||||
|
||||
/// Returns a `Proposer` to author on top of the given block.
|
||||
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error>;
|
||||
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer;
|
||||
|
||||
/// Remaining duration of the slot.
|
||||
fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration {
|
||||
@@ -216,32 +220,30 @@ pub trait SimpleSlotWorker<B: BlockT> {
|
||||
"timestamp" => timestamp,
|
||||
);
|
||||
|
||||
let mut proposer = match self.proposer(&chain_head) {
|
||||
Ok(proposer) => proposer,
|
||||
Err(err) => {
|
||||
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
|
||||
let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| {
|
||||
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
|
||||
|
||||
telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
|
||||
"slot" => slot_number, "err" => ?err
|
||||
);
|
||||
telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
|
||||
"slot" => slot_number, "err" => ?err
|
||||
);
|
||||
|
||||
return Box::pin(future::ready(Ok(())));
|
||||
},
|
||||
};
|
||||
err
|
||||
});
|
||||
|
||||
let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
|
||||
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
|
||||
let logs = self.pre_digest_data(slot_number, &claim);
|
||||
|
||||
// deadline our production to approx. the end of the slot
|
||||
let proposing = proposer.propose(
|
||||
let proposing = awaiting_proposer.and_then(move |mut proposer| proposer.propose(
|
||||
slot_info.inherent_data,
|
||||
sp_runtime::generic::Digest {
|
||||
logs,
|
||||
},
|
||||
slot_remaining_duration,
|
||||
RecordProof::No,
|
||||
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
|
||||
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
|
||||
|
||||
let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration {
|
||||
Some(r) => Box::new(Delay::new(r)),
|
||||
None => Box::new(future::pending()),
|
||||
|
||||
@@ -74,13 +74,16 @@ pub enum BlockStatus {
|
||||
/// Environment producer for a Consensus instance. Creates proposer instance and communication streams.
|
||||
pub trait Environment<B: BlockT> {
|
||||
/// The proposer type this creates.
|
||||
type Proposer: Proposer<B> + 'static;
|
||||
type Proposer: Proposer<B> + Send + 'static;
|
||||
/// A future that resolves to the proposer.
|
||||
type CreateProposer: Future<Output = Result<Self::Proposer, Self::Error>>
|
||||
+ Send + Unpin + 'static;
|
||||
/// Error which can occur upon creation.
|
||||
type Error: From<Error> + std::fmt::Debug + 'static;
|
||||
|
||||
/// Initialize the proposal logic on top of a specific header. Provide
|
||||
/// the authorities at that header.
|
||||
fn init(&mut self, parent_header: &B::Header) -> Result<Self::Proposer, Self::Error>;
|
||||
fn init(&mut self, parent_header: &B::Header) -> Self::CreateProposer;
|
||||
}
|
||||
|
||||
/// A proposal that is created by a [`Proposer`].
|
||||
|
||||
Reference in New Issue
Block a user