Switch AuRa to dynamic keystore lookup (#3359)

This commit is contained in:
Bastian Köcher
2019-08-10 00:10:53 +02:00
committed by André Silva
parent a29f5763a7
commit d1161b7d36
4 changed files with 63 additions and 52 deletions
+1
View File
@@ -4407,6 +4407,7 @@ dependencies = [
"substrate-service 2.0.0", "substrate-service 2.0.0",
"substrate-telemetry 2.0.0", "substrate-telemetry 2.0.0",
"substrate-test-runtime-client 2.0.0", "substrate-test-runtime-client 2.0.0",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
+2 -1
View File
@@ -18,7 +18,7 @@ inherents = { package = "substrate-inherents", path = "../../inherents" }
srml-aura = { path = "../../../srml/aura" } srml-aura = { path = "../../../srml/aura" }
client = { package = "substrate-client", path = "../../client" } client = { package = "substrate-client", path = "../../client" }
substrate-telemetry = { path = "../../telemetry" } substrate-telemetry = { path = "../../telemetry" }
substrate-keystore = { path = "../../keystore" } keystore = { package = "substrate-keystore", path = "../../keystore" }
consensus_common = { package = "substrate-consensus-common", path = "../common" } consensus_common = { package = "substrate-consensus-common", path = "../common" }
sr-primitives = { path = "../../sr-primitives" } sr-primitives = { path = "../../sr-primitives" }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
@@ -35,3 +35,4 @@ service = { package = "substrate-service", path = "../../service" }
test-client = { package = "substrate-test-runtime-client", path = "../../test-runtime/client" } test-client = { package = "substrate-test-runtime-client", path = "../../test-runtime/client" }
tokio = "0.1.7" tokio = "0.1.7"
env_logger = "0.6" env_logger = "0.6"
tempfile = "3.1"
+55 -44
View File
@@ -39,12 +39,8 @@ use consensus_common::import_queue::{
Verifier, BasicQueue, BoxBlockImport, BoxJustificationImport, BoxFinalityProofImport, Verifier, BasicQueue, BoxBlockImport, BoxJustificationImport, BoxFinalityProofImport,
}; };
use client::{ use client::{
block_builder::api::BlockBuilder as BlockBuilderApi, block_builder::api::BlockBuilder as BlockBuilderApi, blockchain::ProvideCache,
blockchain::ProvideCache, runtime_api::ApiExt, error::Result as CResult, backend::AuxStore, BlockOf,
runtime_api::ApiExt,
error::Result as CResult,
backend::AuxStore,
BlockOf
}; };
use sr_primitives::{generic::{self, BlockId, OpaqueDigestItemId}, Justification}; use sr_primitives::{generic::{self, BlockId, OpaqueDigestItemId}, Justification};
@@ -67,6 +63,8 @@ use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS
use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible}; use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible};
use slots::{SignedDuration, check_equivocation}; use slots::{SignedDuration, check_equivocation};
use keystore::KeyStorePtr;
pub use aura_primitives::*; pub use aura_primitives::*;
pub use consensus_common::SyncOracle; pub use consensus_common::SyncOracle;
pub use digest::CompatibleDigestItem; pub use digest::CompatibleDigestItem;
@@ -103,8 +101,10 @@ fn slot_author<P: Pair>(slot_num: u64, authorities: &[AuthorityId<P>]) -> Option
if authorities.is_empty() { return None } if authorities.is_empty() { return None }
let idx = slot_num % (authorities.len() as u64); let idx = slot_num % (authorities.len() as u64);
assert!(idx <= usize::max_value() as u64, assert!(
"It is impossible to have a vector with length beyond the address space; qed"); idx <= usize::max_value() as u64,
"It is impossible to have a vector with length beyond the address space; qed",
);
let current_author = authorities.get(idx as usize) let current_author = authorities.get(idx as usize)
.expect("authorities not empty; index constrained to list length;\ .expect("authorities not empty; index constrained to list length;\
@@ -132,7 +132,6 @@ impl SlotCompatible for AuraSlotCompatible {
/// Start the aura worker. The returned future should be run in a futures executor. /// Start the aura worker. The returned future should be run in a futures executor.
pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>( pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
slot_duration: SlotDuration, slot_duration: SlotDuration,
local_key: Arc<P>,
client: Arc<C>, client: Arc<C>,
select_chain: SC, select_chain: SC,
block_import: I, block_import: I,
@@ -140,6 +139,7 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
sync_oracle: SO, sync_oracle: SO,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
force_authoring: bool, force_authoring: bool,
keystore: Option<KeyStorePtr>,
) -> Result<impl futures01::Future<Item = (), Error = ()>, consensus_common::Error> where ) -> Result<impl futures01::Future<Item = (), Error = ()>, consensus_common::Error> where
B: BlockT<Header=H>, B: BlockT<Header=H>,
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + AuxStore + Send + Sync, C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + AuxStore + Send + Sync,
@@ -160,9 +160,10 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
client: client.clone(), client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)), block_import: Arc::new(Mutex::new(block_import)),
env, env,
local_key, keystore,
sync_oracle: sync_oracle.clone(), sync_oracle: sync_oracle.clone(),
force_authoring, force_authoring,
_key_type: PhantomData::<P>,
}; };
register_aura_inherent_data_provider( register_aura_inherent_data_provider(
&inherent_data_providers, &inherent_data_providers,
@@ -182,9 +183,10 @@ struct AuraWorker<C, E, I, P, SO> {
client: Arc<C>, client: Arc<C>,
block_import: Arc<Mutex<I>>, block_import: Arc<Mutex<I>>,
env: E, env: E,
local_key: Arc<P>, keystore: Option<KeyStorePtr>,
sync_oracle: SO, sync_oracle: SO,
force_authoring: bool, force_authoring: bool,
_key_type: PhantomData<P>,
} }
impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
@@ -209,8 +211,6 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
chain_head: B::Header, chain_head: B::Header,
slot_info: SlotInfo, slot_info: SlotInfo,
) -> Self::OnSlot { ) -> Self::OnSlot {
let pair = self.local_key.clone();
let public_key = self.local_key.public();
let client = self.client.clone(); let client = self.client.clone();
let block_import = self.block_import.clone(); let block_import = self.block_import.clone();
@@ -220,13 +220,12 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
let authorities = match authorities(client.as_ref(), &BlockId::Hash(chain_head.hash())) { let authorities = match authorities(client.as_ref(), &BlockId::Hash(chain_head.hash())) {
Ok(authorities) => authorities, Ok(authorities) => authorities,
Err(e) => { Err(e) => {
warn!( warn!("Unable to fetch authorities at block {:?}: {:?}", chain_head.hash(), e);
"Unable to fetch authorities at block {:?}: {:?}",
chain_head.hash(), telemetry!(
e CONSENSUS_WARN; "aura.unable_fetching_authorities";
); "slot" => ?chain_head.hash(),
telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities"; "err" => ?e,
"slot" => ?chain_head.hash(), "err" => ?e
); );
return Box::pin(future::ready(Ok(()))); return Box::pin(future::ready(Ok(())));
} }
@@ -234,22 +233,30 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 { if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "aura", "Skipping proposal slot. Waiting for the network."); debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot"; telemetry!(
"authorities_len" => authorities.len() CONSENSUS_DEBUG;
"aura.skipping_proposal_slot";
"authorities_len" => authorities.len(),
); );
return Box::pin(future::ready(Ok(()))); return Box::pin(future::ready(Ok(())));
} }
let maybe_author = slot_author::<P>(slot_num, &authorities); let maybe_author = slot_author::<P>(slot_num, &authorities);
let proposal_work = match maybe_author { let maybe_pair = maybe_author.and_then(|p|
self.keystore.as_ref().and_then(|k|
k.read().key_pair_by_type::<P>(&p, app_crypto::key_types::AURA).ok()
)
);
let proposal_work = match maybe_pair {
None => return Box::pin(future::ready(Ok(()))), None => return Box::pin(future::ready(Ok(()))),
Some(author) => if author == &public_key { Some(pair) => {
debug!( debug!(
target: "aura", "Starting authorship at slot {}; timestamp = {}", target: "aura", "Starting authorship at slot {}; timestamp = {}",
slot_num, slot_num,
timestamp timestamp,
); );
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship"; telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
"slot_num" => slot_num, "timestamp" => timestamp "slot_num" => slot_num,
"timestamp" => timestamp,
); );
// we are the slot author. make a block and sign it. // we are the slot author. make a block and sign it.
@@ -280,27 +287,22 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
Delay::new(remaining_duration) Delay::new(remaining_duration)
.map_err(|err| consensus_common::Error::FaultyTimer(err).into()) .map_err(|err| consensus_common::Error::FaultyTimer(err).into())
).map(|v| match v { ).map(|v| match v {
futures::future::Either::Left((v, _)) => v, futures::future::Either::Left((v, _)) => v.map(|v| (v, pair)),
futures::future::Either::Right((Ok(_), _)) => futures::future::Either::Right((Ok(_), _)) =>
Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())), Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())),
futures::future::Either::Right((Err(err), _)) => Err(err), futures::future::Either::Right((Err(err), _)) => Err(err),
}) })
} else {
return Box::pin(future::ready(Ok(())));
} }
}; };
Box::pin(proposal_work.map_ok(move |b| { Box::pin(proposal_work.map_ok(move |(b, pair)| {
// minor hack since we don't have access to the timestamp // minor hack since we don't have access to the timestamp
// that is actually set by the proposer. // that is actually set by the proposer.
let slot_after_building = SignedDuration::default().slot_now(slot_duration); let slot_after_building = SignedDuration::default().slot_now(slot_duration);
if slot_after_building != slot_num { if slot_after_building != slot_num {
info!( info!("Discarding proposal for slot {}; block production took too long", slot_num);
"Discarding proposal for slot {}; block production took too long",
slot_num
);
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long"; telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
"slot" => slot_num "slot" => slot_num,
); );
return return
} }
@@ -311,7 +313,7 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
error!(target: "aura", "FATAL ERROR: Invalid pre-digest: {}!", e); error!(target: "aura", "FATAL ERROR: Invalid pre-digest: {}!", e);
return return
} else { } else {
trace!(target: "aura", "Got correct number of seals. Good!") trace!(target: "aura", "Got correct number of seals. Good!")
}; };
let header_num = header.number().clone(); let header_num = header.number().clone();
@@ -342,14 +344,14 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block"; telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
"header_num" => ?header_num, "header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(), "hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?header_hash "hash_previously" => ?header_hash,
); );
if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { if let Err(e) = block_import.lock().import_block(import_block, Default::default()) {
warn!(target: "aura", "Error with block built on {:?}: {:?}", warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e);
parent_hash, e);
telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on"; telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e "hash" => ?parent_hash, "err" => ?e,
); );
} }
})) }))
@@ -358,8 +360,9 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
macro_rules! aura_err { macro_rules! aura_err {
($($i: expr),+) => { ($($i: expr),+) => {
{ debug!(target: "aura", $($i),+) {
; format!($($i),+) debug!(target: "aura", $( $i ),+);
format!($( $i ),+)
} }
}; };
} }
@@ -820,7 +823,7 @@ mod tests {
#[test] #[test]
#[allow(deprecated)] #[allow(deprecated)]
fn authoring_blocks() { fn authoring_blocks() {
let _ = ::env_logger::try_init(); let _ = env_logger::try_init();
let net = AuraTestNet::new(3); let net = AuraTestNet::new(3);
let peers = &[ let peers = &[
@@ -833,7 +836,15 @@ mod tests {
let mut import_notifications = Vec::new(); let mut import_notifications = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap(); let mut runtime = current_thread::Runtime::new().unwrap();
let mut keystore_paths = Vec::new();
for (peer_id, key) in peers { for (peer_id, key) in peers {
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
let keystore = keystore::Store::open(keystore_path.path(), None).expect("Creates keystore.");
keystore.write().insert_ephemeral_from_seed::<AuthorityPair>(&key.to_seed())
.expect("Creates authority key");
keystore_paths.push(keystore_path);
let client = net.lock().peer(*peer_id).client().as_full().expect("full clients are created").clone(); let client = net.lock().peer(*peer_id).client().as_full().expect("full clients are created").clone();
#[allow(deprecated)] #[allow(deprecated)]
let select_chain = LongestChain::new( let select_chain = LongestChain::new(
@@ -856,7 +867,6 @@ mod tests {
let aura = start_aura::<_, _, _, _, _, AuthorityPair, _, _, _>( let aura = start_aura::<_, _, _, _, _, AuthorityPair, _, _, _>(
slot_duration, slot_duration,
Arc::new(key.clone().pair().into()),
client.clone(), client.clone(),
select_chain, select_chain,
client, client,
@@ -864,6 +874,7 @@ mod tests {
DummyOracle, DummyOracle,
inherent_data_providers, inherent_data_providers,
false, false,
Some(keystore),
).expect("Starts aura"); ).expect("Starts aura");
runtime.spawn(aura); runtime.spawn(aura);
+5 -7
View File
@@ -3,7 +3,6 @@
#![warn(unused_extern_crates)] #![warn(unused_extern_crates)]
use std::sync::Arc; use std::sync::Arc;
use log::info;
use transaction_pool::{self, txpool::{Pool as TransactionPool}}; use transaction_pool::{self, txpool::{Pool as TransactionPool}};
use node_template_runtime::{self, GenesisConfig, opaque::Block, RuntimeApi, WASM_BINARY}; use node_template_runtime::{self, GenesisConfig, opaque::Block, RuntimeApi, WASM_BINARY};
use substrate_service::{ use substrate_service::{
@@ -15,11 +14,11 @@ use basic_authorship::ProposerFactory;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration}; use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration};
use futures::prelude::*; use futures::prelude::*;
use substrate_client::{self as client, LongestChain}; use substrate_client::{self as client, LongestChain};
use primitives::{Pair as PairT};
use inherents::InherentDataProviders; use inherents::InherentDataProviders;
use network::{config::DummyFinalityProofRequestBuilder, construct_simple_protocol}; use network::{config::DummyFinalityProofRequestBuilder, construct_simple_protocol};
use substrate_executor::native_executor_instance; use substrate_executor::native_executor_instance;
use substrate_service::construct_service_factory; use substrate_service::construct_service_factory;
use aura_primitives::sr25519::AuthorityPair as AuraAuthorityPair;
pub use substrate_executor::NativeExecutor; pub use substrate_executor::NativeExecutor;
// Our native executor instance. // Our native executor instance.
@@ -66,8 +65,7 @@ construct_service_factory! {
}, },
AuthoritySetup = { AuthoritySetup = {
|service: Self::FullService| { |service: Self::FullService| {
if let Some(key) = None::<aura_primitives::sr25519::AuthorityPair> { if service.config().roles.is_authority() {
info!("Using authority key {}", key.public());
let proposer = ProposerFactory { let proposer = ProposerFactory {
client: service.client(), client: service.client(),
transaction_pool: service.transaction_pool(), transaction_pool: service.transaction_pool(),
@@ -75,9 +73,8 @@ construct_service_factory! {
let client = service.client(); let client = service.client();
let select_chain = service.select_chain() let select_chain = service.select_chain()
.ok_or_else(|| ServiceError::SelectChainRequired)?; .ok_or_else(|| ServiceError::SelectChainRequired)?;
let aura = start_aura( let aura = start_aura::<_, _, _, _, _, AuraAuthorityPair, _, _, _>(
SlotDuration::get_or_compute(&*client)?, SlotDuration::get_or_compute(&*client)?,
Arc::new(key),
client.clone(), client.clone(),
select_chain, select_chain,
client, client,
@@ -85,6 +82,7 @@ construct_service_factory! {
service.network(), service.network(),
service.config().custom.inherent_data_providers.clone(), service.config().custom.inherent_data_providers.clone(),
service.config().force_authoring, service.config().force_authoring,
Some(service.keystore()),
)?; )?;
service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(())))); service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(()))));
} }
@@ -113,7 +111,7 @@ construct_service_factory! {
> >
{ |config: &mut FactoryFullConfiguration<Self>, client: Arc<LightClient<Self>>| { { |config: &mut FactoryFullConfiguration<Self>, client: Arc<LightClient<Self>>| {
let fprb = Box::new(DummyFinalityProofRequestBuilder::default()) as Box<_>; let fprb = Box::new(DummyFinalityProofRequestBuilder::default()) as Box<_>;
import_queue::<_, _, aura_primitives::sr25519::AuthorityPair>( import_queue::<_, _, AuraAuthorityPair>(
SlotDuration::get_or_compute(&*client)?, SlotDuration::get_or_compute(&*client)?,
Box::new(client.clone()), Box::new(client.clone()),
None, None,