consensus: refactor aura and babe proposer (#3377)

This commit is contained in:
André Silva
2019-08-15 07:20:39 +01:00
committed by Bastian Köcher
parent 5b83e6426f
commit 6058207935
6 changed files with 372 additions and 291 deletions
+85 -151
View File
@@ -186,10 +186,10 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, H>(BabeParams {
C: ProvideRuntimeApi + ProvideCache<B> + ProvideUncles<B> + Send + Sync + 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error=Error> + Send + Sync,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
H: Header<Hash=B::Hash>,
E: Environment<B, Error=Error>,
I: BlockImport<B> + Send + Sync + 'static,
Error: std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
@@ -229,155 +229,77 @@ struct BabeWorker<C, E, I, SO> {
keystore: KeyStorePtr,
}
impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> where
B: BlockT<Header=H, Hash=Hash>,
impl<H, B, C, E, I, Error, SO> slots::SimpleSlotWorker<B> for BabeWorker<C, E, I, SO> where
B: BlockT<Header=H>,
C: ProvideRuntimeApi + ProvideCache<B>,
C::Api: BabeApi<B>,
E: Environment<B, Error=Error>,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
Hash: Debug + Eq + Copy + SimpleBitOps + Encode + Decode + Serialize +
for<'de> Deserialize<'de> + Debug + Default + AsRef<[u8]> + AsMut<[u8]> +
std::hash::Hash + Display + Send + Sync + 'static,
H: Header<Hash=B::Hash>,
I: BlockImport<B> + Send + Sync + 'static,
SO: SyncOracle + Send + Clone,
Error: std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;
type EpochData = Epoch;
type Claim = (VRFInOut, VRFProof, u32, AuthorityPair);
type SyncOracle = SO;
type Proposer = E::Proposer;
type BlockImport = I;
fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Self::OnSlot {
let ref client = self.client;
let block_import = self.block_import.clone();
fn logging_target(&self) -> &'static str {
"babe"
}
let (timestamp, slot_number, slot_duration) =
(slot_info.timestamp, slot_info.number, slot_info.duration);
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>> {
self.block_import.clone()
}
let epoch = match epoch(client.as_ref(), &BlockId::Hash(chain_head.hash())) {
Ok(authorities) => authorities,
Err(e) => {
error!(
target: "babe",
"Unable to fetch authorities at block {:?}: {:?}",
chain_head.hash(),
e
);
telemetry!(CONSENSUS_WARN; "babe.unable_fetching_authorities";
"slot" => ?chain_head.hash(), "err" => ?e
);
return Box::pin(future::ready(Ok(())));
}
};
fn epoch_data(&self, block: &B::Hash) -> Result<Self::EpochData, consensus_common::Error> {
epoch(self.client.as_ref(), &BlockId::Hash(*block))
}
let Epoch { ref authorities, .. } = epoch;
fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize {
epoch_data.authorities.len()
}
if authorities.is_empty() {
error!(target: "babe", "No authorities at block {:?}", chain_head.hash());
}
if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "babe", "Skipping proposal slot. Waiting for the network.");
telemetry!(CONSENSUS_DEBUG; "babe.skipping_proposal_slot";
"authorities_len" => authorities.len()
);
return Box::pin(future::ready(Ok(())));
}
let proposal_work = if let Some(claim) = claim_slot(
slot_info.number,
epoch,
fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option<Self::Claim> {
claim_slot(
slot_number,
epoch_data,
self.c,
&self.keystore,
) {
let ((inout, vrf_proof, _batchable_proof), authority_index, key) = claim;
).map(|((inout, vrf_proof, _), authority_index, key)| {
(inout, vrf_proof, authority_index as u32, key)
})
}
debug!(
target: "babe", "Starting authorship at slot {}; timestamp = {}",
slot_number,
timestamp,
);
telemetry!(CONSENSUS_DEBUG; "babe.starting_authorship";
"slot_number" => slot_number, "timestamp" => timestamp
);
// we are the slot author. make a block and sign it.
let mut proposer = match self.env.init(&chain_head) {
Ok(p) => p,
Err(e) => {
warn!(target: "babe",
"Unable to author block in slot {:?}: {:?}",
slot_number,
e,
);
telemetry!(CONSENSUS_WARN; "babe.unable_authoring_block";
"slot" => slot_number, "err" => ?e
);
return Box::pin(future::ready(Ok(())))
}
};
let inherent_digest = BabePreDigest {
vrf_proof,
vrf_output: inout.to_output(),
authority_index: authority_index as u32,
slot_number,
};
// deadline our production to approx. the end of the slot
let remaining_duration = slot_info.remaining_duration();
futures::future::select(
proposer.propose(
slot_info.inherent_data,
generic::Digest {
logs: vec![
generic::DigestItem::babe_pre_digest(inherent_digest.clone()),
],
},
remaining_duration,
).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()),
Delay::new(remaining_duration)
.map_err(|err| consensus_common::Error::FaultyTimer(err).into())
).map(|v| match v {
futures::future::Either::Left((v, _)) => v.map(|v| (v, key)),
futures::future::Either::Right((Ok(_), _)) =>
Err(consensus_common::Error::ClientImport("Timeout in the BaBe proposer".into())),
futures::future::Either::Right((Err(err), _)) => Err(err),
})
} else {
return Box::pin(future::ready(Ok(())));
fn pre_digest_data(&self, slot_number: u64, claim: &Self::Claim) -> Vec<sr_primitives::DigestItem<B::Hash>> {
let inherent_digest = BabePreDigest {
vrf_proof: claim.1.clone(),
vrf_output: claim.0.to_output(),
authority_index: claim.2,
slot_number,
};
Box::pin(proposal_work.map_ok(move |(b, key)| {
// minor hack since we don't have access to the timestamp
// that is actually set by the proposer.
let slot_after_building = SignedDuration::default().slot_now(slot_duration);
if slot_after_building != slot_number {
info!(
target: "babe",
"Discarding proposal for slot {}; block production took too long",
slot_number
);
telemetry!(CONSENSUS_INFO; "babe.discarding_proposal_took_too_long";
"slot" => slot_number
);
return;
}
let (header, body) = b.deconstruct();
let header_num = header.number().clone();
let parent_hash = header.parent_hash().clone();
vec![
<DigestItemFor<B> as CompatibleDigestItem>::babe_pre_digest(inherent_digest),
]
}
fn import_block(&self) -> Box<dyn Fn(
B::Header,
&B::Hash,
Vec<B::Extrinsic>,
Self::Claim,
) -> consensus_common::BlockImportParams<B> + Send> {
Box::new(|header, header_hash, body, (_, _, _, pair)| {
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let header_hash = header.hash();
let signature = key.sign(header_hash.as_ref());
let signature_digest_item = DigestItemFor::<B>::babe_seal(signature);
let signature = pair.sign(header_hash.as_ref());
let signature_digest_item = <DigestItemFor<B> as CompatibleDigestItem>::babe_seal(signature);
let import_block = BlockImportParams::<B> {
BlockImportParams {
origin: BlockOrigin::Own,
header,
justification: None,
@@ -386,29 +308,41 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
finalized: false,
auxiliary: Vec::new(),
fork_choice: ForkChoiceStrategy::LongestChain,
};
info!(target: "babe",
"Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
header_num,
import_block.post_header().hash(),
header_hash,
);
telemetry!(CONSENSUS_INFO; "babe.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?header_hash,
);
if let Err(e) = block_import.lock().import_block(import_block, Default::default()) {
warn!(target: "babe", "Error with block built on {:?}: {:?}",
parent_hash, e);
telemetry!(CONSENSUS_WARN; "babe.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e
);
}
}))
})
}
fn force_authoring(&self) -> bool {
self.force_authoring
}
fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
&mut self.sync_oracle
}
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, consensus_common::Error> {
self.env.init(block).map_err(|e| {
consensus_common::Error::ClientImport(format!("{:?}", e)).into()
})
}
}
impl<H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> where
B: BlockT<Header=H>,
C: ProvideRuntimeApi + ProvideCache<B> + Send + Sync,
C::Api: BabeApi<B>,
E: Environment<B, Error=Error> + Send + Sync,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
H: Header<Hash=B::Hash>,
I: BlockImport<B> + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + Clone,
Error: std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
<Self as slots::SimpleSlotWorker<B>>::on_slot(self, chain_head, slot_info)
}
}
@@ -831,7 +765,7 @@ fn calculate_threshold(
/// so it returns `Some(_)`. Otherwise, it returns `None`.
fn claim_slot(
slot_number: u64,
Epoch { ref authorities, ref randomness, epoch_index, .. }: Epoch,
Epoch { authorities, randomness, epoch_index, .. }: &Epoch,
c: (u64, u64),
keystore: &KeyStorePtr,
) -> Option<((VRFInOut, VRFProof, VRFProofBatchable), usize, AuthorityPair)> {
@@ -841,7 +775,7 @@ fn claim_slot(
.find_map(|(i, a)| {
keystore.key_pair::<AuthorityPair>(&a.0).ok().map(|kp| (kp, i))
})?;
let transcript = make_transcript(randomness, slot_number, epoch_index);
let transcript = make_transcript(randomness, slot_number, *epoch_index);
// Compute the threshold we will use.
//
@@ -1228,7 +1162,7 @@ pub mod test_helpers {
super::claim_slot(
slot_number,
epoch,
&epoch,
c,
keystore,
).map(|((inout, vrf_proof, _), authority_index, _)| {
+1 -1
View File
@@ -325,7 +325,7 @@ fn can_author_block() {
duration: 100,
};
loop {
match claim_slot(i, epoch.clone(), (3, 10), &keystore) {
match claim_slot(i, &epoch.clone(), (3, 10), &keystore) {
None => i += 1,
Some(s) => {
debug!(target: "babe", "Authored block {:?}", s.0);