ci: add quick-check with rustfmt (#615)

* ci: add quick-check with clippy and rustfmt

* chore: rustfmt round

* chore: set the same rustfmt config than substrate

* chore: fix formatting

* cI: remove clippy

* ci: switch to nightly for the checks

* ci: fix toolchains and naming

* ci: Limit the check to formatting

* chore: fix formatting

* Update .rustfmt.toml

* Update .rustfmt.toml

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Chevdor
2021-09-16 16:57:52 +02:00
committed by GitHub
parent 035a576008
commit 1dd000a011
98 changed files with 1244 additions and 1872 deletions
+8 -13
View File
@@ -54,10 +54,8 @@ impl PurgeChainCmd {
relay_config: sc_service::Configuration,
) -> sc_cli::Result<()> {
let databases = match (self.parachain, self.relaychain) {
(true, true) | (false, false) => vec![
("parachain", para_config.database),
("relaychain", relay_config.database),
],
(true, true) | (false, false) =>
vec![("parachain", para_config.database), ("relaychain", relay_config.database)],
(true, false) => vec![("parachain", para_config.database)],
(false, true) => vec![("relaychain", relay_config.database)],
};
@@ -86,11 +84,11 @@ impl PurgeChainCmd {
let input = input.trim();
match input.chars().nth(0) {
Some('y') | Some('Y') => {}
Some('y') | Some('Y') => {},
_ => {
println!("Aborted");
return Ok(());
}
return Ok(())
},
}
}
@@ -98,10 +96,10 @@ impl PurgeChainCmd {
match fs::remove_dir_all(&db_path) {
Ok(_) => {
println!("{:?} removed.", &db_path);
}
},
Err(ref err) if err.kind() == io::ErrorKind::NotFound => {
eprintln!("{:?} did not exist.", &db_path);
}
},
Err(err) => return Err(err.into()),
}
}
@@ -155,10 +153,7 @@ impl RunCmd {
new_base.validator = self.base.validator || self.collator;
NormalizedRunCmd {
base: new_base,
parachain_id: self.parachain_id,
}
NormalizedRunCmd { base: new_base, parachain_id: self.parachain_id }
}
}
+19 -41
View File
@@ -81,12 +81,7 @@ where
) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
Self {
block_status,
wait_to_announce,
runtime_api,
parachain_consensus,
}
Self { block_status, wait_to_announce, runtime_api, parachain_consensus }
}
/// Checks the status of the given block hash in the Parachain.
@@ -101,7 +96,7 @@ where
"Skipping candidate production, because block is still queued for import.",
);
false
}
},
Ok(BlockStatus::InChainWithState) => true,
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
@@ -110,7 +105,7 @@ where
hash,
);
false
}
},
Ok(BlockStatus::KnownBad) => {
tracing::error!(
target: LOG_TARGET,
@@ -118,7 +113,7 @@ where
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
);
false
}
},
Ok(BlockStatus::Unknown) => {
if header.number().is_zero() {
tracing::error!(
@@ -134,7 +129,7 @@ where
);
}
false
}
},
Err(e) => {
tracing::error!(
target: LOG_TARGET,
@@ -143,7 +138,7 @@ where
"Failed to get block status.",
);
false
}
},
}
}
@@ -168,8 +163,8 @@ where
error = ?e,
"Failed to collect collation info.",
);
return None;
}
return None
},
};
Some(Collation {
@@ -202,13 +197,13 @@ where
error = ?e,
"Could not decode the head data."
);
return None;
}
return None
},
};
let last_head_hash = last_head.hash();
if !self.check_block_status(last_head_hash, &last_head) {
return None;
return None
}
tracing::info!(
@@ -232,8 +227,8 @@ where
Ok(proof) => proof,
Err(e) => {
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
return None;
}
return None
},
};
// Create the parachain block data for the validators.
@@ -252,20 +247,11 @@ where
let (result_sender, signed_stmt_recv) = oneshot::channel();
self.wait_to_announce
.lock()
.wait_to_announce(block_hash, signed_stmt_recv);
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
tracing::info!(
target: LOG_TARGET,
?block_hash,
"Produced proof-of-validity candidate.",
);
tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
Some(CollationResult {
collation,
result_sender: Some(result_sender),
})
Some(CollationResult { collation, result_sender: Some(result_sender) })
}
}
@@ -322,10 +308,7 @@ pub async fn start_collator<Block, RA, BS, Spawner>(
};
overseer_handle
.send_msg(
CollationGenerationMessage::Initialize(config),
"StartCollator",
)
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
.await;
overseer_handle
@@ -384,10 +367,7 @@ mod tests {
.await
.expect("Imports the block");
Some(ParachainCandidate {
block,
proof: proof.expect("Proof is returned"),
})
Some(ParachainCandidate { block, proof: proof.expect("Proof is returned") })
}
}
@@ -424,9 +404,7 @@ mod tests {
spawner,
para_id,
key: CollatorPair::generate().0,
parachain_consensus: Box::new(DummyParachainConsensus {
client: client.clone(),
}),
parachain_consensus: Box::new(DummyParachainConsensus { client: client.clone() }),
});
block_on(collator_start);
+3 -7
View File
@@ -215,9 +215,8 @@ where
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
let (inherent_data, inherent_data_providers) = self
.inherent_data(parent.hash(), validation_data, relay_parent)
.await?;
let (inherent_data, inherent_data_providers) =
self.inherent_data(parent.hash(), validation_data, relay_parent).await?;
let info = SlotInfo::new(
inherent_data_providers.slot(),
@@ -234,10 +233,7 @@ where
let res = self.aura_worker.lock().await.on_slot(info).await?;
Some(ParachainCandidate {
block: res.block,
proof: res.storage_proof,
})
Some(ParachainCandidate { block: res.block, proof: res.storage_proof })
}
}
+1 -3
View File
@@ -65,9 +65,7 @@ impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send +
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
(*self)
.produce_candidate(parent, relay_parent, validation_data)
.await
(*self).produce_candidate(parent, relay_parent, validation_data).await
}
}
@@ -75,7 +75,7 @@ where
h
} else {
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
return;
return
};
let header = match Block::Header::decode(&mut &finalized_head[..]) {
@@ -86,8 +86,8 @@ where
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue;
}
continue
},
};
let hash = header.hash();
@@ -140,12 +140,8 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
);
let follow_new_best =
follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
@@ -242,12 +238,12 @@ async fn handle_new_block_imported<Block, P>(
};
let unset_hash = if notification.header.number() < unset_best_header.number() {
return;
return
} else if notification.header.number() == unset_best_header.number() {
let unset_hash = unset_best_header.hash();
if unset_hash != notification.hash {
return;
return
} else {
unset_hash
}
@@ -263,7 +259,7 @@ async fn handle_new_block_imported<Block, P>(
.expect("We checked above that the value is set; qed");
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
}
},
state => tracing::debug!(
target: "cumulus-consensus",
?unset_best_header,
@@ -292,8 +288,8 @@ async fn handle_new_best_parachain_head<Block, P>(
error = ?err,
"Could not decode Parachain header while following best heads.",
);
return;
}
return
},
};
let hash = parachain_head.hash();
@@ -311,14 +307,14 @@ async fn handle_new_best_parachain_head<Block, P>(
unset_best_header.take();
import_block_as_new_best(hash, parachain_head, parachain).await;
}
},
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: "cumulus-collator",
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
}
},
Ok(BlockStatus::Unknown) => {
*unset_best_header = Some(parachain_head);
@@ -327,7 +323,7 @@ async fn handle_new_best_parachain_head<Block, P>(
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
}
},
Err(e) => {
tracing::error!(
target: "cumulus-collator",
@@ -335,8 +331,8 @@ async fn handle_new_best_parachain_head<Block, P>(
error = ?e,
"Failed to get block status of block.",
);
}
_ => {}
},
_ => {},
}
}
}
@@ -356,7 +352,7 @@ where
"Skipping importing block as new best block, because there already exists a \
best block with an higher number",
);
return;
return
}
// Make it the new best block
@@ -364,10 +360,7 @@ where
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) = (&*parachain)
.import_block(block_import_params, Default::default())
.await
{
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
tracing::warn!(
target: "cumulus-consensus",
block_hash = ?hash,
@@ -392,10 +385,7 @@ where
self.import_notification_stream()
.filter_map(move |n| {
future::ready(if n.is_new_best {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash), para_id)
.ok()
.flatten()
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
} else {
None
})
@@ -409,10 +399,7 @@ where
self.finality_notification_stream()
.filter_map(move |n| {
future::ready(
relay_chain
.parachain_head_at(&BlockId::hash(n.hash), para_id)
.ok()
.flatten(),
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
)
})
.boxed()
+18 -60
View File
@@ -62,9 +62,7 @@ struct Relaychain {
impl Relaychain {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(RelaychainInner::new())),
}
Self { inner: Arc::new(Mutex::new(RelaychainInner::new())) }
}
}
@@ -125,24 +123,17 @@ fn follow_new_best_works() {
let block = build_and_import_block(client.clone(), false);
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
break
}
}
};
@@ -166,24 +157,17 @@ fn follow_finalized_works() {
let block = build_and_import_block(client.clone(), false);
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
finalized_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
break
}
}
};
@@ -214,32 +198,23 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
};
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
for _ in 0..3usize {
finalized_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
finalized_sender.unbounded_send(unknown_block.header().clone()).unwrap();
Delay::new(Duration::from_millis(100)).await;
}
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
finalized_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
break
}
}
};
@@ -273,32 +248,23 @@ fn follow_new_best_sets_best_after_it_is_imported() {
};
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
break
}
}
// Announce the unknown block
new_best_heads_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
new_best_heads_sender.unbounded_send(unknown_block.header().clone()).unwrap();
// Do some iterations. As this is a local task executor, only one task can run at a time.
// Meaning that it should already have processed the unknown block.
@@ -313,15 +279,12 @@ fn follow_new_best_sets_best_after_it_is_imported() {
block_import_params.body = Some(body);
// Now import the unkown block to make it "known"
client
.import_block(block_import_params, Default::default())
.await
.unwrap();
client.import_block(block_import_params, Default::default()).await.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if unknown_block.hash() == client.usage_info().chain.best_hash {
break;
break
}
}
};
@@ -362,12 +325,7 @@ fn do_not_set_best_block_to_older_block() {
assert_eq!(NUM_BLOCKS as u32, client.usage_info().chain.best_number);
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
@@ -40,11 +40,7 @@ pub struct Verifier<Client, Block, CIDP> {
impl<Client, Block, CIDP> Verifier<Client, Block, CIDP> {
/// Create a new instance.
pub fn new(client: Arc<Client>, create_inherent_data_providers: CIDP) -> Self {
Self {
client,
create_inherent_data_providers,
_marker: PhantomData,
}
Self { client, create_inherent_data_providers, _marker: PhantomData }
}
}
@@ -59,13 +55,7 @@ where
async fn verify(
&mut self,
mut block_params: BlockImportParams<Block, ()>,
) -> Result<
(
BlockImportParams<Block, ()>,
Option<Vec<(CacheKeyId, Vec<u8>)>>,
),
String,
> {
) -> Result<(BlockImportParams<Block, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
if let Some(inner_body) = block_params.body.take() {
let inherent_data_providers = self
.create_inherent_data_providers
@@ -73,9 +63,8 @@ where
.await
.map_err(|e| e.to_string())?;
let inherent_data = inherent_data_providers
.create_inherent_data()
.map_err(|e| format!("{:?}", e))?;
let inherent_data =
inherent_data_providers.create_inherent_data().map_err(|e| format!("{:?}", e))?;
let block = Block::new(block_params.header.clone(), inner_body);
@@ -130,9 +119,7 @@ where
Ok(BasicQueue::new(
verifier,
Box::new(cumulus_client_consensus_common::ParachainBlockImport::new(
block_import,
)),
Box::new(cumulus_client_consensus_common::ParachainBlockImport::new(block_import)),
None,
spawner,
registry,
+4 -9
View File
@@ -180,15 +180,10 @@ where
)
.ok()?;
let inherent_data = self
.inherent_data(parent.hash(), &validation_data, relay_parent)
.await?;
let inherent_data =
self.inherent_data(parent.hash(), &validation_data, relay_parent).await?;
let Proposal {
block,
storage_changes,
proof,
} = proposer
let Proposal { block, storage_changes, proof } = proposer
.propose(
inherent_data,
Default::default(),
@@ -226,7 +221,7 @@ where
"Error importing build block.",
);
return None;
return None
}
Some(ParachainCandidate { block, proof })
+37 -66
View File
@@ -33,13 +33,13 @@ use sp_runtime::{
traits::{Block as BlockT, HashFor, Header as HeaderT},
};
use polkadot_node_primitives::{SignedFullStatement, Statement, CollationSecondedSignal};
use polkadot_client::ClientHandle;
use polkadot_node_primitives::{CollationSecondedSignal, SignedFullStatement, Statement};
use polkadot_parachain::primitives::HeadData;
use polkadot_primitives::v1::{
Block as PBlock, Hash as PHash, CandidateReceipt, CompactStatement, Id as ParaId,
OccupiedCoreAssumption, ParachainHost, UncheckedSigned, SigningContext,
Block as PBlock, CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
OccupiedCoreAssumption, ParachainHost, SigningContext, UncheckedSigned,
};
use polkadot_client::ClientHandle;
use codec::{Decode, Encode};
use futures::{
@@ -85,14 +85,13 @@ impl BlockAnnounceData {
///
/// This will not check the signature, for this you should use [`BlockAnnounceData::check_signature`].
fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
let candidate_hash = if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
let candidate_hash = if let CompactStatement::Seconded(h) =
self.statement.unchecked_payload()
{
h
} else {
tracing::debug!(
target: LOG_TARGET,
"`CompactStatement` isn't the candidate variant!",
);
return Err(Validation::Failure { disconnect: true });
tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
return Err(Validation::Failure { disconnect: true })
};
if *candidate_hash != self.receipt.hash() {
@@ -100,7 +99,7 @@ impl BlockAnnounceData {
target: LOG_TARGET,
"Receipt candidate hash doesn't match candidate hash in statement",
);
return Err(Validation::Failure { disconnect: true });
return Err(Validation::Failure { disconnect: true })
}
if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head {
@@ -108,7 +107,7 @@ impl BlockAnnounceData {
target: LOG_TARGET,
"Receipt para head hash doesn't match the hash of the header in the block announcement",
);
return Err(Validation::Failure { disconnect: true });
return Err(Validation::Failure { disconnect: true })
}
Ok(())
@@ -131,22 +130,16 @@ impl BlockAnnounceData {
let runtime_api_block_id = BlockId::Hash(self.receipt.descriptor.relay_parent);
let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(BlockAnnounceError(format!("{:?}", e)));
}
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let signing_context = SigningContext {
parent_hash: self.receipt.descriptor.relay_parent,
session_index,
};
let signing_context =
SigningContext { parent_hash: self.receipt.descriptor.relay_parent, session_index };
// Check that the signer is a legit validator.
let authorities = match runtime_api.validators(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(BlockAnnounceError(format!("{:?}", e)));
}
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let signer = match authorities.get(validator_index.0 as usize) {
Some(r) => r,
@@ -156,22 +149,18 @@ impl BlockAnnounceData {
"Block announcement justification signer is a validator index out of bound",
);
return Ok(Validation::Failure { disconnect: true });
}
return Ok(Validation::Failure { disconnect: true })
},
};
// Check statement is correctly signed.
if self
.statement
.try_into_checked(&signing_context, &signer)
.is_err()
{
if self.statement.try_into_checked(&signing_context, &signer).is_err() {
tracing::debug!(
target: LOG_TARGET,
"Block announcement justification signature is invalid.",
);
return Ok(Validation::Failure { disconnect: true });
return Ok(Validation::Failure { disconnect: true })
}
Ok(Validation::Success { is_new_best: true })
@@ -185,13 +174,10 @@ impl TryFrom<&'_ SignedFullStatement> for BlockAnnounceData {
let receipt = if let Statement::Seconded(receipt) = stmt.payload() {
receipt.to_plain()
} else {
return Err(());
return Err(())
};
Ok(BlockAnnounceData {
receipt,
statement: stmt.convert_payload().into(),
})
Ok(BlockAnnounceData { receipt, statement: stmt.convert_payload().into() })
}
}
@@ -273,16 +259,13 @@ where
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(BlockAnnounceError(
"Could not find parachain head in relay chain".into(),
)) as Box<_>
Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
as Box<_>
})?;
let para_head =
Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
Box::new(BlockAnnounceError(format!(
"Failed to decode parachain head: {:?}",
e
))) as Box<_>
Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
as Box<_>
})?;
Ok(para_head)
@@ -320,21 +303,15 @@ where
let best_head =
Self::included_block(&*relay_chain_client, &runtime_api_block_id, para_id)?;
let known_best_number = best_head.number();
let backed_block = ||
Self::backed_block_hash(&*relay_chain_client, &runtime_api_block_id, para_id);
let backed_block =
|| Self::backed_block_hash(&*relay_chain_client, &runtime_api_block_id, para_id);
if best_head == header {
tracing::debug!(
target: LOG_TARGET,
"Announced block matches best block.",
);
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block()? {
tracing::debug!(
target: LOG_TARGET,
"Announced block matches latest backed block.",
);
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
Ok(Validation::Success { is_new_best: true })
} else if block_number >= known_best_number {
@@ -367,23 +344,20 @@ where
mut data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
if self.relay_chain_sync_oracle.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed();
return ready(Ok(Validation::Success { is_new_best: false })).boxed()
}
if data.is_empty() {
return self
.handle_empty_block_announce_data(header.clone())
.boxed();
return self.handle_empty_block_announce_data(header.clone()).boxed()
}
let block_announce_data = match BlockAnnounceData::decode(&mut data) {
Ok(r) => r,
Err(_) => {
Err(_) =>
return ready(Err(Box::new(BlockAnnounceError(
"Can not decode the `BlockAnnounceData`".into(),
)) as Box<_>))
.boxed()
}
.boxed(),
};
let relay_chain_client = self.relay_chain_client.clone();
@@ -392,7 +366,7 @@ where
async move {
if let Err(e) = block_announce_data.validate(header_encoded) {
return Ok(e);
return Ok(e)
}
let relay_parent = block_announce_data.receipt.descriptor.relay_parent;
@@ -519,10 +493,7 @@ impl<Block: BlockT> WaitToAnnounce<Block> {
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
) -> WaitToAnnounce<Block> {
WaitToAnnounce {
spawner,
announce_block,
}
WaitToAnnounce { spawner, announce_block }
}
/// Wait for a candidate message for the block, then announce the block. The candidate
@@ -567,8 +538,8 @@ async fn wait_to_announce<Block: BlockT>(
block = ?block_hash,
"Wait to announce stopped, because sender was dropped.",
);
return;
}
return
},
};
if let Ok(data) = BlockAnnounceData::try_from(&statement) {
+23 -68
View File
@@ -15,15 +15,16 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use cumulus_test_service::runtime::{Block, Header, Hash};
use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll};
use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
Block as PBlock, BlockNumber, CandidateCommitments, CandidateDescriptor, CandidateEvent,
CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash as PHash, HeadData, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, ParachainHost,
PersistedValidationData, SessionIndex, SessionInfo, SigningContext, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex,
PersistedValidationData, SessionIndex, SessionInfo, SigningContext, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex,
};
use polkadot_test_client::{
Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
@@ -37,7 +38,6 @@ use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::RuntimeAppPublic;
use std::collections::BTreeMap;
use parking_lot::Mutex;
fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceError) -> bool) {
let error = *error
@@ -61,10 +61,8 @@ impl SyncOracle for DummyCollatorNetwork {
}
}
fn make_validator_and_api() -> (
BlockAnnounceValidator<Block, TestApi, PBackend, PClient>,
Arc<TestApi>,
) {
fn make_validator_and_api(
) -> (BlockAnnounceValidator<Block, TestApi, PBackend, PClient>, Arc<TestApi>) {
let api = Arc::new(TestApi::new());
(
@@ -94,12 +92,7 @@ async fn make_gossip_message_and_header_using_genesis(
api: Arc<TestApi>,
validator_index: u32,
) -> (SignedFullStatement, Header) {
let relay_parent = api
.relay_client
.hash(0)
.ok()
.flatten()
.expect("Genesis hash exists");
let relay_parent = api.relay_client.hash(0).ok().flatten().expect("Genesis hash exists");
make_gossip_message_and_header(api, relay_parent, validator_index).await
}
@@ -116,14 +109,9 @@ async fn make_gossip_message_and_header(
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api
.runtime_api()
.session_index_for_child(&BlockId::Hash(relay_parent))
.unwrap();
let signing_context = SigningContext {
parent_hash: relay_parent,
session_index,
};
let session_index =
api.runtime_api().session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let header = default_header();
let candidate_receipt = CommittedCandidateReceipt {
@@ -156,10 +144,7 @@ async fn make_gossip_message_and_header(
#[test]
fn valid_if_no_data_and_less_than_best_known_number() {
let mut validator = make_validator_and_api().0;
let header = Header {
number: 0,
..default_header()
};
let header = Header { number: 0, ..default_header() };
let res = block_on(validator.validate(&header, &[]));
assert_eq!(
@@ -172,11 +157,7 @@ fn valid_if_no_data_and_less_than_best_known_number() {
#[test]
fn invalid_if_no_data_exceeds_best_known_number() {
let mut validator = make_validator_and_api().0;
let header = Header {
number: 1,
state_root: Hash::random(),
..default_header()
};
let header = Header { number: 1, state_root: Hash::random(), ..default_header() };
let res = block_on(validator.validate(&header, &[]));
assert_eq!(
@@ -219,9 +200,7 @@ fn check_signer_is_legit_validator() {
let (mut validator, api) = make_validator_and_api();
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 1));
let data = BlockAnnounceData::try_from(&signed_statement)
.unwrap()
.encode();
let data = BlockAnnounceData::try_from(&signed_statement).unwrap().encode();
let res = block_on(validator.validate(&header, &data));
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
@@ -233,9 +212,7 @@ fn check_statement_is_correctly_signed() {
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 0));
let mut data = BlockAnnounceData::try_from(&signed_statement)
.unwrap()
.encode();
let mut data = BlockAnnounceData::try_from(&signed_statement).unwrap().encode();
// The signature comes at the end of the type, so change a bit to make the signature invalid.
let last = data.len() - 1;
@@ -258,14 +235,9 @@ fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api
.runtime_api()
.session_index_for_child(&BlockId::Hash(relay_parent))
.unwrap();
let signing_context = SigningContext {
parent_hash: relay_parent,
session_index,
};
let session_index =
api.runtime_api().session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());
@@ -296,9 +268,7 @@ fn check_header_match_candidate_receipt_header() {
let (signed_statement, mut header) =
block_on(make_gossip_message_and_header_using_genesis(api, 0));
let data = BlockAnnounceData::try_from(&signed_statement)
.unwrap()
.encode();
let data = BlockAnnounceData::try_from(&signed_statement).unwrap().encode();
header.number = 300;
let res = block_on(validator.validate(&header, &data));
@@ -315,17 +285,11 @@ fn relay_parent_not_imported_when_block_announce_is_processed() {
let (mut validator, api) = make_validator_and_api();
let mut client = api.relay_client.clone();
let block = client
.init_polkadot_block_builder()
.build()
.expect("Build new block")
.block;
let block = client.init_polkadot_block_builder().build().expect("Build new block").block;
let (signed_statement, header) = make_gossip_message_and_header(api, block.hash(), 0).await;
let data = BlockAnnounceData::try_from(&signed_statement)
.unwrap()
.encode();
let data = BlockAnnounceData::try_from(&signed_statement).unwrap().encode();
let mut validation = validator.validate(&header, &data);
@@ -333,10 +297,7 @@ fn relay_parent_not_imported_when_block_announce_is_processed() {
// that the future is still pending.
assert!(poll!(&mut validation).is_pending());
client
.import(BlockOrigin::Own, block)
.await
.expect("Imports the block");
client.import(BlockOrigin::Own, block).await.expect("Imports the block");
assert!(matches!(
poll!(validation),
@@ -357,10 +318,7 @@ fn block_announced_without_statement_and_block_only_backed() {
let validation = validator.validate(&header, &[]);
assert!(matches!(
validation.await,
Ok(Validation::Success { is_new_best: true })
));
assert!(matches!(validation.await, Ok(Validation::Success { is_new_best: true })));
});
}
@@ -401,10 +359,7 @@ impl ProvideRuntimeApi<PBlock> for TestApi {
type Api = RuntimeApi;
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RuntimeApi {
data: self.data.clone(),
}
.into()
RuntimeApi { data: self.data.clone() }.into()
}
}
+11 -39
View File
@@ -31,10 +31,7 @@ const TIMEOUT_IN_SECONDS: u64 = 6;
/// Custom error type used by [`WaitOnRelayChainBlock`].
#[derive(Debug, derive_more::Display)]
pub enum Error {
#[display(
fmt = "Timeout while waiting for relay-chain block `{}` to be imported.",
_0
)]
#[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
@@ -73,20 +70,14 @@ pub struct WaitOnRelayChainBlock<B, BCE> {
impl<B, BCE> Clone for WaitOnRelayChainBlock<B, BCE> {
fn clone(&self) -> Self {
Self {
backend: self.backend.clone(),
block_chain_events: self.block_chain_events.clone(),
}
Self { backend: self.backend.clone(), block_chain_events: self.block_chain_events.clone() }
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE> {
/// Creates a new instance of `Self`.
pub fn new(backend: Arc<B>, block_chain_events: Arc<BCE>) -> Self {
Self {
backend,
block_chain_events,
}
Self { backend, block_chain_events }
}
}
@@ -103,11 +94,9 @@ where
) -> impl Future<Output = Result<(), Error>> {
let _lock = self.backend.get_import_lock().read();
match self.backend.blockchain().status(BlockId::Hash(hash)) {
Ok(BlockStatus::InChain) => {
return ready(Ok(())).boxed();
}
Ok(BlockStatus::InChain) => return ready(Ok(())).boxed(),
Err(err) => return ready(Err(Error::BlockchainError(hash, err))).boxed(),
_ => {}
_ => {},
}
let mut listener = self.block_chain_events.import_notification_stream();
@@ -171,10 +160,7 @@ mod tests {
block_on(async move {
// Should be ready on the first poll
assert!(matches!(
poll!(wait.wait_on_relay_chain_block(hash)),
Poll::Ready(Ok(()))
));
assert!(matches!(poll!(wait.wait_on_relay_chain_block(hash)), Poll::Ready(Ok(()))));
});
}
@@ -191,10 +177,7 @@ mod tests {
assert!(poll!(&mut future).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block)
.await
.expect("Imports the block");
client.import(BlockOrigin::Own, block).await.expect("Imports the block");
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
@@ -208,10 +191,7 @@ mod tests {
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
assert!(matches!(
block_on(wait.wait_on_relay_chain_block(hash)),
Err(Error::Timeout(_))
));
assert!(matches!(block_on(wait.wait_on_relay_chain_block(hash)), Err(Error::Timeout(_))));
}
#[test]
@@ -227,9 +207,7 @@ mod tests {
);
let mut block_builder = client.init_polkadot_block_builder();
// Push an extrinsic to get a different block hash.
block_builder
.push_polkadot_extrinsic(ext)
.expect("Push extrinsic");
block_builder.push_polkadot_extrinsic(ext).expect("Push extrinsic");
let block2 = block_builder.build().expect("Build second block").block;
let hash2 = block2.hash();
@@ -243,20 +221,14 @@ mod tests {
assert!(poll!(&mut future2).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block2)
.await
.expect("Imports the second block");
client.import(BlockOrigin::Own, block2).await.expect("Imports the second block");
// The import notification of the second block should not make this one finish
assert!(poll!(&mut future).is_pending());
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));
client
.import(BlockOrigin::Own, block)
.await
.expect("Imports the first block");
client.import(BlockOrigin::Own, block).await.expect("Imports the first block");
// Now it should be ready
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
@@ -39,11 +39,7 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub fn new(overseer_handle: OverseerHandle) -> Self {
Self {
recoveries: Default::default(),
candidates: Default::default(),
overseer_handle,
}
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
}
/// Recover the given `pending_candidate`.
@@ -80,14 +76,14 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
"Availability recovery failed",
);
(block_hash, None)
}
},
Err(_) => {
tracing::debug!(
target: crate::LOG_TARGET,
"Availability recovery oneshot channel closed",
);
(block_hash, None)
}
},
}
}
.boxed(),
@@ -106,7 +102,7 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
loop {
if let Some(res) = self.recoveries.next().await {
self.candidates.remove(&res.0);
return res;
return res
} else {
futures::pending!()
}
+39 -45
View File
@@ -149,12 +149,12 @@ where
error = ?e,
"Failed to decode parachain header from pending candidate",
);
return;
}
return
},
};
if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
return;
return
}
let hash = header.hash();
@@ -169,8 +169,8 @@ where
block_hash = ?hash,
"Failed to get block status",
);
return;
}
return
},
}
if self
@@ -185,7 +185,7 @@ where
)
.is_some()
{
return;
return
}
// Wait some random time, with the maximum being the slot duration of the relay chain
@@ -207,8 +207,7 @@ where
/// Handle a finalized block with the given `block_number`.
fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
self.pending_candidates
.retain(|_, pc| pc.block_number > block_number);
self.pending_candidates.retain(|_, pc| pc.block_number > block_number);
}
/// Recover the candidate for the given `block_hash`.
@@ -245,8 +244,8 @@ where
Some(data) => data,
None => {
self.clear_waiting_for_parent(block_hash);
return;
}
return
},
};
let raw_block_data = match sp_maybe_compressed_blob::decompress(
@@ -259,8 +258,8 @@ where
self.clear_waiting_for_parent(block_hash);
return;
}
return
},
};
let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
@@ -274,8 +273,8 @@ where
self.clear_waiting_for_parent(block_hash);
return;
}
return
},
};
let block = block_data.into_block();
@@ -292,11 +291,8 @@ where
"Parent is still being recovered, waiting.",
);
self.waiting_for_parent
.entry(parent)
.or_default()
.push(block);
return;
self.waiting_for_parent.entry(parent).or_default().push(block);
return
} else {
tracing::debug!(
target: "cumulus-consensus",
@@ -307,9 +303,9 @@ where
self.clear_waiting_for_parent(block_hash);
return;
return
}
}
},
Err(error) => {
tracing::debug!(
target: "cumulus-consensus",
@@ -320,8 +316,8 @@ where
self.clear_waiting_for_parent(block_hash);
return;
}
return
},
// Any other status is fine to "ignore/accept"
_ => (),
}
@@ -431,27 +427,25 @@ where
RC: ProvideRuntimeApi<PBlock> + BlockchainEvents<PBlock>,
RC::Api: ParachainHost<PBlock>,
{
relay_chain_client
.import_notification_stream()
.filter_map(move |n| {
let runtime_api = relay_chain_client.runtime_api();
let res = runtime_api
.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
.and_then(|pa| {
runtime_api
.session_index_for_child(&BlockId::hash(n.hash))
.map(|v| pa.map(|pa| (pa, v)))
})
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed fetch pending candidates.",
)
})
.ok()
.flatten();
relay_chain_client.import_notification_stream().filter_map(move |n| {
let runtime_api = relay_chain_client.runtime_api();
let res = runtime_api
.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
.and_then(|pa| {
runtime_api
.session_index_for_child(&BlockId::hash(n.hash))
.map(|v| pa.map(|pa| (pa, v)))
})
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed fetch pending candidates.",
)
})
.ok()
.flatten();
async move { res }
})
async move { res }
})
}
+15 -26
View File
@@ -118,19 +118,17 @@ where
_phantom: PhantomData,
});
relay_chain_full_node
.client
.execute_with(StartPoVRecovery {
para_id,
client: client.clone(),
import_queue,
task_manager,
overseer_handle: relay_chain_full_node
.overseer_handle
.clone()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
_phantom: PhantomData,
})?;
relay_chain_full_node.client.execute_with(StartPoVRecovery {
para_id,
client: client.clone(),
import_queue,
task_manager,
overseer_handle: relay_chain_full_node
.overseer_handle
.clone()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
_phantom: PhantomData,
})?;
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
runtime_api: client.clone(),
@@ -239,9 +237,7 @@ where
self.announce_block,
);
self.task_manager
.spawn_essential_handle()
.spawn("cumulus-consensus", consensus);
self.task_manager.spawn_essential_handle().spawn("cumulus-consensus", consensus);
}
}
@@ -311,9 +307,7 @@ pub fn build_polkadot_full_node(
) -> Result<RFullNode<PClient>, polkadot_service::Error> {
let is_light = matches!(config.role, Role::Light);
if is_light {
Err(polkadot_service::Error::Sub(
"Light client not supported.".into(),
))
Err(polkadot_service::Error::Sub("Light client not supported.".into()))
} else {
let collator_key = CollatorPair::generate().0;
@@ -327,10 +321,7 @@ pub fn build_polkadot_full_node(
polkadot_service::RealOverseerGen,
)?;
Ok(RFullNode {
relay_chain_full_node,
collator_key,
})
Ok(RFullNode { relay_chain_full_node, collator_key })
}
}
@@ -359,9 +350,7 @@ impl<Block: BlockT> ImportQueue<Block> for SharedImportQueue<Block> {
number: NumberFor<Block>,
justifications: Justifications,
) {
self.0
.lock()
.import_justifications(who, hash, number, justifications)
self.0.lock().import_justifications(who, hash, number, justifications)
}
fn poll_actions(&mut self, cx: &mut std::task::Context, link: &mut dyn Link<Block>) {