Wait for relay chain block import before validatiing a block announcement (#227)

* Start with something

* Whatever

* Update

* MOARE

* Make cumulus-network compile and tests work

* Update more and fixes

* More stuff

* More fixes

* Make collator build

* Make test almost work

* Remove contracts runtime

* More test work

* Make service compile

* Fix test-service

* Fix test client

* More fixes

* Fix collator test

* Fix network tests (again)

* Make everything compile, finally

* Fix tests

* Write test that should fail

* Add `WaitOnRelayChainBlock`

* Update git versions

* Make it all work

* Update logging

* Switch to provided method for pushing an extrinsic

* Try to debug CI

* Aaaa

* Only use Debug

* Updates

* Use native execution to hopefully make CI happy...
This commit is contained in:
Bastian Köcher
2020-11-23 00:21:02 +01:00
committed by GitHub
parent 9ed50e83c4
commit 63efcc49c3
17 changed files with 1310 additions and 683 deletions
+219 -175
View File
@@ -22,7 +22,9 @@
#[cfg(test)]
mod tests;
mod wait_on_relay_chain_block;
use sc_client_api::{Backend, BlockchainEvents};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as ClientError, HeaderBackend};
use sp_consensus::{
@@ -32,7 +34,7 @@ use sp_consensus::{
use sp_core::traits::SpawnNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
traits::{Block as BlockT, HashFor, Header as HeaderT},
};
use polkadot_node_primitives::{SignedFullStatement, Statement};
@@ -54,6 +56,10 @@ use log::{trace, warn};
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use wait_on_relay_chain_block::WaitOnRelayChainBlock;
type BlockAnnounceError = Box<dyn std::error::Error + Send>;
/// Parachain specific block announce validator.
///
/// This block announce validator is required if the parachain is running
@@ -81,260 +87,298 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc};
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
pub struct BlockAnnounceValidator<B, P> {
phantom: PhantomData<B>,
polkadot_client: Arc<P>,
pub struct BlockAnnounceValidator<Block, P, B, BCE> {
phantom: PhantomData<Block>,
relay_chain_client: Arc<P>,
relay_chain_backend: Arc<B>,
para_id: ParaId,
polkadot_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
wait_on_relay_chain_block: WaitOnRelayChainBlock<B, BCE>,
}
impl<B, P> BlockAnnounceValidator<B, P> {
impl<Block, P, B, BCE> BlockAnnounceValidator<Block, P, B, BCE> {
/// Create a new [`BlockAnnounceValidator`].
pub fn new(
polkadot_client: Arc<P>,
relay_chain_client: Arc<P>,
para_id: ParaId,
polkadot_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
relay_chain_blockchain_events: Arc<BCE>,
) -> Self {
Self {
phantom: Default::default(),
polkadot_client,
relay_chain_client,
para_id,
polkadot_sync_oracle,
relay_chain_sync_oracle,
relay_chain_backend: relay_chain_backend.clone(),
wait_on_relay_chain_block: WaitOnRelayChainBlock::new(
relay_chain_backend,
relay_chain_blockchain_events,
),
}
}
}
impl<B: BlockT, P> BlockAnnounceValidatorT<B> for BlockAnnounceValidator<B, P>
impl<Block: BlockT, P, B, BCE> BlockAnnounceValidator<Block, P, B, BCE>
where
P: ProvideRuntimeApi<PBlock> + HeaderBackend<PBlock> + 'static,
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
/// Handle a block announcement with empty data (no statement) attached to it.
fn handle_empty_block_announce_data(
&self,
header: Block::Header,
) -> impl Future<Output = Result<Validation, BlockAnnounceError>> {
let relay_chain_client = self.relay_chain_client.clone();
let relay_chain_backend = self.relay_chain_backend.clone();
let para_id = self.para_id;
async move {
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_info = relay_chain_backend.blockchain().info();
let runtime_api_block_id = BlockId::Hash(relay_chain_info.best_hash);
let block_number = header.number();
let local_validation_data = relay_chain_client
.runtime_api()
.persisted_validation_data(
&runtime_api_block_id,
para_id,
OccupiedCoreAssumption::TimedOut,
)
.map_err(|e| Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(ClientError::Msg(
"Could not find parachain head in relay chain".into(),
)) as Box<_>
})?;
let parent_head = Block::Header::decode(&mut &local_validation_data.parent_head.0[..])
.map_err(|e| {
Box::new(ClientError::Msg(format!(
"Failed to decode parachain head: {:?}",
e
))) as Box<_>
})?;
let known_best_number = parent_head.number();
if block_number >= known_best_number {
trace!(
target: "cumulus-network",
"validation failed because a justification is needed if the block at the top of the chain."
);
Ok(Validation::Failure)
} else {
Ok(Validation::Success { is_new_best: false })
}
}
}
}
impl<Block: BlockT, P, B, BCE> BlockAnnounceValidatorT<Block>
for BlockAnnounceValidator<Block, P, B, BCE>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
BCE: BlockchainEvents<PBlock> + 'static + Send + Sync,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
fn validate(
&mut self,
header: &B::Header,
header: &Block::Header,
mut data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>>
{
if self.polkadot_sync_oracle.is_major_syncing() {
) -> Pin<Box<dyn Future<Output = Result<Validation, BlockAnnounceError>> + Send>> {
if self.relay_chain_sync_oracle.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed();
}
let runtime_api = self.polkadot_client.runtime_api();
let polkadot_info = self.polkadot_client.info();
if data.is_empty() {
let polkadot_client = self.polkadot_client.clone();
let header = header.clone();
let para_id = self.para_id;
return async move {
// Check if block is equal or higher than best (this requires a justification)
let runtime_api_block_id = BlockId::Hash(polkadot_info.best_hash);
let block_number = header.number();
let local_validation_data = polkadot_client
.runtime_api()
.persisted_validation_data(
&runtime_api_block_id,
para_id,
OccupiedCoreAssumption::TimedOut,
)
.map_err(|e| Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(ClientError::Msg(
"Could not find parachain head in relay chain".into(),
)) as Box<_>
})?;
let parent_head = B::Header::decode(&mut &local_validation_data.parent_head.0[..])
.map_err(|e| {
Box::new(ClientError::Msg(format!(
"Failed to decode parachain head: {:?}",
e
))) as Box<_>
})?;
let known_best_number = parent_head.number();
if block_number >= known_best_number {
trace!(
target: "cumulus-network",
"validation failed because a justification is needed if the block at the top of the chain."
);
Ok(Validation::Failure)
} else {
Ok(Validation::Success { is_new_best: false })
}
}
.boxed();
return self
.handle_empty_block_announce_data(header.clone())
.boxed();
}
let signed_stmt = match SignedFullStatement::decode(&mut data) {
Ok(r) => r,
Err(_) => return ready(Err(Box::new(ClientError::BadJustification(
Err(_) => return ready(Err(Box::new(ClientError::Msg(
"cannot decode block announcement justification, must be a `SignedFullStatement`"
.to_string(),
.into(),
)) as Box<_>))
.boxed(),
};
// Check statement is a candidate statement.
let candidate_receipt = match signed_stmt.payload() {
Statement::Seconded(ref candidate_receipt) => candidate_receipt,
_ => {
return ready(Err(Box::new(ClientError::BadJustification(
"block announcement justification must be a `Statement::Seconded`".to_string(),
)) as Box<_>))
.boxed()
}
};
let relay_chain_client = self.relay_chain_client.clone();
let header_encoded = header.encode();
let wait_on_relay_chain_block = self.wait_on_relay_chain_block.clone();
// Check that the relay chain parent of the block is the relay chain head
let best_number = polkadot_info.best_number;
let validator_index = signed_stmt.validator_index();
let relay_parent = &candidate_receipt.descriptor.relay_parent;
async move {
// Check statement is a candidate statement.
let candidate_receipt = match signed_stmt.payload() {
Statement::Seconded(ref candidate_receipt) => candidate_receipt,
_ => {
return Err(Box::new(ClientError::Msg(
"block announcement justification must be a `Statement::Seconded`".into(),
)) as Box<_>)
}
};
match self.polkadot_client.number(*relay_parent) {
Err(err) => {
return ready(Err(Box::new(ClientError::Backend(format!(
"could not find block number for {}: {}",
relay_parent, err,
))) as Box<_>))
.boxed();
// Check the header in the candidate_receipt match header given header.
if header_encoded != candidate_receipt.commitments.head_data.0 {
return Err(Box::new(ClientError::Msg(
"block announcement header does not match the one justified".into(),
)) as Box<_>);
}
Ok(Some(x)) if x == best_number => {}
Ok(None) => {
return ready(Err(
Box::new(ClientError::UnknownBlock(relay_parent.to_string())) as Box<_>,
))
.boxed();
}
Ok(Some(_)) => {
trace!(
target: "cumulus-network",
"validation failed because the relay chain parent ({}) is not the relay chain \
head ({})",
relay_parent,
best_number,
);
return ready(Ok(Validation::Failure)).boxed();
let relay_parent = &candidate_receipt.descriptor.relay_parent;
wait_on_relay_chain_block
.wait_on_relay_chain_block(*relay_parent)
.await
.map_err(|e| Box::new(ClientError::Msg(e.to_string())) as Box<_>)?;
let runtime_api = relay_chain_client.runtime_api();
let validator_index = signed_stmt.validator_index();
let runtime_api_block_id = BlockId::Hash(*relay_parent);
let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>);
}
};
let signing_context = SigningContext {
parent_hash: *relay_parent,
session_index,
};
// Check that the signer is a legit validator.
let authorities = match runtime_api.validators(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>);
}
};
let signer = match authorities.get(validator_index as usize) {
Some(r) => r,
None => {
return Err(Box::new(ClientError::Msg(
"block accouncement justification signer is a validator index out of bound"
.to_string(),
)) as Box<_>);
}
};
// Check statement is correctly signed.
if signed_stmt
.check_signature(&signing_context, &signer)
.is_err()
{
return Err(Box::new(ClientError::Msg(
"block announcement justification signature is invalid".to_string(),
)) as Box<_>);
}
Ok(Validation::Success { is_new_best: true })
}
let runtime_api_block_id = BlockId::Hash(*relay_parent);
let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return ready(Err(Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>)).boxed()
}
};
let signing_context = SigningContext {
parent_hash: *relay_parent,
session_index,
};
// Check that the signer is a legit validator.
let authorities = match runtime_api.validators(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return ready(Err(Box::new(ClientError::Msg(format!("{:?}", e))) as Box<_>)).boxed()
}
};
let signer = match authorities.get(validator_index as usize) {
Some(r) => r,
None => {
return ready(Err(Box::new(ClientError::BadJustification(
"block accouncement justification signer is a validator index out of bound"
.to_string(),
)) as Box<_>))
.boxed()
}
};
// Check statement is correctly signed.
if signed_stmt
.check_signature(&signing_context, &signer)
.is_err()
{
return ready(Err(Box::new(ClientError::BadJustification(
"block announced justification signature is invalid".to_string(),
)) as Box<_>))
.boxed();
}
// Check the header in the candidate_receipt match header given header.
if header.encode() != candidate_receipt.commitments.head_data.0 {
return ready(Err(Box::new(ClientError::BadJustification(
"block announced header does not match the one justified".to_string(),
)) as Box<_>))
.boxed();
}
ready(Ok(Validation::Success { is_new_best: true })).boxed()
.boxed()
}
}
/// Build a block announce validator instance.
///
/// Returns a boxed [`BlockAnnounceValidator`].
pub fn build_block_announce_validator<B: BlockT>(
polkadot_client: polkadot_service::Client,
pub fn build_block_announce_validator<Block: BlockT, B>(
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
polkadot_sync_oracle: Box<dyn SyncOracle + Send>,
) -> Box<dyn BlockAnnounceValidatorT<B> + Send> {
BlockAnnounceValidatorBuilder::new(polkadot_client, para_id, polkadot_sync_oracle).build()
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Box<dyn BlockAnnounceValidatorT<Block> + Send>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
BlockAnnounceValidatorBuilder::new(
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend,
)
.build()
}
/// Block announce validator builder.
///
/// Builds a [`BlockAnnounceValidator`] for a parachain. As this requires
/// a concrete Polkadot client instance, the builder takes a [`polkadot_service::Client`]
/// a concrete relay chain client instance, the builder takes a [`polkadot_service::Client`]
/// that wraps this concrete instanace. By using [`polkadot_service::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct BlockAnnounceValidatorBuilder<B> {
phantom: PhantomData<B>,
polkadot_client: polkadot_service::Client,
struct BlockAnnounceValidatorBuilder<Block, B> {
phantom: PhantomData<Block>,
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
polkadot_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
}
impl<B: BlockT> BlockAnnounceValidatorBuilder<B> {
impl<Block: BlockT, B> BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
/// Create a new instance of the builder.
fn new(
polkadot_client: polkadot_service::Client,
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
polkadot_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Self {
Self {
polkadot_client,
relay_chain_client,
para_id,
polkadot_sync_oracle,
relay_chain_sync_oracle,
relay_chain_backend,
phantom: PhantomData,
}
}
/// Build the block announce validator.
fn build(self) -> Box<dyn BlockAnnounceValidatorT<B> + Send> {
self.polkadot_client.clone().execute_with(self)
fn build(self) -> Box<dyn BlockAnnounceValidatorT<Block> + Send> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<B: BlockT> polkadot_service::ExecuteWithClient for BlockAnnounceValidatorBuilder<B> {
type Output = Box<dyn BlockAnnounceValidatorT<B> + Send>;
impl<Block: BlockT, B> polkadot_service::ExecuteWithClient
for BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
type Output = Box<dyn BlockAnnounceValidatorT<Block> + Send>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend:
sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_service::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(BlockAnnounceValidator::new(
client,
client.clone(),
self.para_id,
self.polkadot_sync_oracle,
self.relay_chain_sync_oracle,
self.relay_chain_backend,
client,
))
}
}
+118 -136
View File
@@ -15,28 +15,38 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use cumulus_test_runtime::{Block, Header};
use futures::executor::block_on;
use cumulus_test_service::runtime::{Block, Header};
use futures::{executor::block_on, poll, task::Poll};
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, Block as PBlock, BlockNumber, CandidateCommitments, CandidateDescriptor,
CandidateEvent, CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash as PHash,
HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage,
OccupiedCoreAssumption, ParachainHost, PersistedValidationData, SessionIndex, SigningContext,
ValidationCode, ValidationData, ValidationOutputs, ValidatorId, ValidatorIndex,
HeadData, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
ParachainHost, PersistedValidationData, SessionIndex, SigningContext, ValidationCode,
ValidationData, ValidationOutputs, ValidatorId, ValidatorIndex,
};
use polkadot_test_client::{
Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_blockchain::{Error as ClientError, HeaderBackend};
use sp_consensus::block_validation::BlockAnnounceValidator as _;
use sp_consensus::{block_validation::BlockAnnounceValidator as _, BlockOrigin};
use sp_core::H256;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
traits::{NumberFor, Zero},
RuntimeAppPublic,
};
use sp_runtime::RuntimeAppPublic;
use std::collections::BTreeMap;
fn check_error(error: crate::BlockAnnounceError, check_error: impl Fn(&ClientError) -> bool) {
let error = *error
.downcast::<ClientError>()
.expect("Downcasts error to `ClientError`");
if !check_error(&error) {
panic!("Invalid error: {:?}", error);
}
}
#[derive(Clone)]
struct DummyCollatorNetwork;
@@ -50,7 +60,10 @@ impl SyncOracle for DummyCollatorNetwork {
}
}
fn make_validator_and_api() -> (BlockAnnounceValidator<Block, TestApi>, Arc<TestApi>) {
fn make_validator_and_api() -> (
BlockAnnounceValidator<Block, TestApi, PBackend, PClient>,
Arc<TestApi>,
) {
let api = Arc::new(TestApi::new());
(
@@ -58,6 +71,8 @@ fn make_validator_and_api() -> (BlockAnnounceValidator<Block, TestApi>, Arc<Test
api.clone(),
ParaId::from(56),
Box::new(DummyCollatorNetwork),
api.relay_backend.clone(),
api.relay_client.clone(),
),
api,
)
@@ -73,7 +88,22 @@ fn default_header() -> Header {
}
}
fn make_gossip_message_and_header(
/// Same as [`make_gossip_message_and_header`], but using the genesis header as relay parent.
async fn make_gossip_message_and_header_using_genesis(
api: Arc<TestApi>,
validator_index: u32,
) -> (SignedFullStatement, Header) {
let relay_parent = api
.relay_client
.hash(0)
.ok()
.flatten()
.expect("Genesis hash exists");
make_gossip_message_and_header(api, relay_parent, validator_index).await
}
async fn make_gossip_message_and_header(
api: Arc<TestApi>,
relay_parent: H256,
validator_index: u32,
@@ -106,13 +136,14 @@ fn make_gossip_message_and_header(
},
};
let statement = Statement::Seconded(candidate_receipt);
let signed = block_on(SignedFullStatement::sign(
let signed = SignedFullStatement::sign(
&keystore,
statement,
&signing_context,
validator_index,
&alice_public.into(),
))
)
.await
.expect("Signing statement");
(signed, header)
@@ -158,65 +189,19 @@ fn check_statement_is_encoded_correctly() {
.err()
.expect("Should fail on invalid encoded statement");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::BadJustification(x) if x.contains("must be a `SignedFullStatement`")
));
}
#[test]
fn check_relay_parent_is_head() {
let (mut validator, api) = make_validator_and_api();
let relay_chain_leaf = H256::zero();
let (gossip_message, header) = make_gossip_message_and_header(api, relay_chain_leaf, 0);
let data = gossip_message.encode();
let res = block_on(validator.validate(&header, data.as_slice()));
assert_eq!(
res.unwrap(),
Validation::Failure,
"validation fails if the relay chain parent is not the relay chain head",
);
}
#[test]
fn check_relay_parent_actually_exists() {
let (mut validator, api) = make_validator_and_api();
let relay_parent = H256::from_low_u64_be(42);
let (signed_statement, header) = make_gossip_message_and_header(api, relay_parent, 0);
let data = signed_statement.encode();
let res = block_on(validator.validate(&header, &data))
.err()
.expect("Should fail on unknown relay parent");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::UnknownBlock(_)
));
}
#[test]
fn check_relay_parent_fails_if_cannot_retrieve_number() {
let (mut validator, api) = make_validator_and_api();
let relay_parent = H256::from_low_u64_be(0xdead);
let (signed_statement, header) = make_gossip_message_and_header(api, relay_parent, 0);
let data = signed_statement.encode();
let res = block_on(validator.validate(&header, &data))
.err()
.expect("Should fail when the relay chain number could not be retrieved");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::Backend(_)
));
check_error(res, |error| {
matches!(
error,
ClientError::Msg(x) if x.contains("must be a `SignedFullStatement`")
)
});
}
#[test]
fn check_signer_is_legit_validator() {
let (mut validator, api) = make_validator_and_api();
let relay_parent = H256::from_low_u64_be(1);
let (signed_statement, header) = make_gossip_message_and_header(api, relay_parent, 1);
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 1));
let data = signed_statement.encode();
let res = block_on(validator.validate(&header, &data))
@@ -225,16 +210,15 @@ fn check_signer_is_legit_validator() {
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::BadJustification(x) if x.contains("signer is a validator")
ClientError::Msg(x) if x.contains("signer is a validator")
));
}
#[test]
fn check_statement_is_correctly_signed() {
let (mut validator, api) = make_validator_and_api();
let relay_parent = H256::from_low_u64_be(1);
let (signed_statement, header) = make_gossip_message_and_header(api, relay_parent, 0);
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 0));
let mut data = signed_statement.encode();
@@ -246,10 +230,12 @@ fn check_statement_is_correctly_signed() {
.err()
.expect("Validation should fail if the statement is not signed correctly");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::BadJustification(x) if x.contains("signature is invalid")
));
check_error(res, |error| {
matches!(
error,
ClientError::Msg(x) if x.contains("signature is invalid")
)
});
}
#[test]
@@ -290,18 +276,20 @@ fn check_statement_seconded() {
.err()
.expect("validation should fail if not seconded statement");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::BadJustification(x) if x.contains("must be a `Statement::Seconded`")
));
check_error(res, |error| {
matches!(
error,
ClientError::Msg(x) if x.contains("must be a `Statement::Seconded`")
)
});
}
#[test]
fn check_header_match_candidate_receipt_header() {
let (mut validator, api) = make_validator_and_api();
let relay_parent = H256::from_low_u64_be(1);
let (signed_statement, mut header) = make_gossip_message_and_header(api, relay_parent, 0);
let (signed_statement, mut header) =
block_on(make_gossip_message_and_header_using_genesis(api, 0));
let data = signed_statement.encode();
header.number = 300;
@@ -309,10 +297,49 @@ fn check_header_match_candidate_receipt_header() {
.err()
.expect("validation should fail if the header in doesn't match");
assert!(matches!(
*res.downcast::<ClientError>().unwrap(),
ClientError::BadJustification(x) if x.contains("header does not match")
));
check_error(res, |error| {
matches!(
error,
ClientError::Msg(x) if x.contains("header does not match")
)
});
}
/// Test that ensures that we postpone the block announce verification until
/// a relay chain block is imported. This is important for when we receive a
/// block announcement before we have imported the associated relay chain block
/// which can happen on slow nodes or nodes with a slow network connection.
#[test]
fn relay_parent_not_imported_when_block_announce_is_processed() {
block_on(async move {
let (mut validator, api) = make_validator_and_api();
let mut client = api.relay_client.clone();
let block = client
.init_polkadot_block_builder()
.build()
.expect("Build new block")
.block;
let (signed_statement, header) = make_gossip_message_and_header(api, block.hash(), 0).await;
let data = signed_statement.encode();
let mut validation = validator.validate(&header, &data);
// The relay chain block is not available yet, so the first poll should return
// that the future is still pending.
assert!(poll!(&mut validation).is_pending());
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
assert!(matches!(
poll!(validation),
Poll::Ready(Ok(Validation::Success { is_new_best: true }))
));
});
}
#[derive(Default)]
@@ -322,14 +349,21 @@ struct ApiData {
struct TestApi {
data: Arc<ApiData>,
relay_client: Arc<PClient>,
relay_backend: Arc<PBackend>,
}
impl TestApi {
fn new() -> Self {
let builder = TestClientBuilder::new();
let relay_backend = builder.backend();
Self {
data: Arc::new(ApiData {
validators: vec![Sr25519Keyring::Alice.public().into()],
}),
relay_client: Arc::new(builder.build()),
relay_backend,
}
}
}
@@ -416,55 +450,3 @@ sp_api::mock_impl_runtime_apis! {
}
}
}
/// Blockchain database header backend. Does not perform any validation.
impl HeaderBackend<PBlock> for TestApi {
fn header(
&self,
_id: BlockId<PBlock>,
) -> std::result::Result<Option<PHeader>, sp_blockchain::Error> {
Ok(None)
}
fn info(&self) -> sc_client_api::blockchain::Info<PBlock> {
let best_hash = H256::from_low_u64_be(1);
sc_client_api::blockchain::Info {
best_hash,
best_number: 1,
finalized_hash: Default::default(),
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
number_leaves: Default::default(),
}
}
fn status(
&self,
_id: BlockId<PBlock>,
) -> std::result::Result<sc_client_api::blockchain::BlockStatus, sp_blockchain::Error> {
Ok(sc_client_api::blockchain::BlockStatus::Unknown)
}
fn number(
&self,
hash: PHash,
) -> std::result::Result<Option<NumberFor<PBlock>>, sp_blockchain::Error> {
if hash == H256::zero() {
Ok(Some(0))
} else if hash == H256::from_low_u64_be(1) {
Ok(Some(1))
} else if hash == H256::from_low_u64_be(0xdead) {
Err(sp_blockchain::Error::Backend("dead".to_string()))
} else {
Ok(None)
}
}
fn hash(
&self,
_number: NumberFor<PBlock>,
) -> std::result::Result<Option<PHash>, sp_blockchain::Error> {
Ok(None)
}
}
+264
View File
@@ -0,0 +1,264 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Provides the [`WaitOnRelayChainBlock`] type.
use futures::{future::ready, Future, FutureExt, StreamExt};
use polkadot_primitives::v1::{Block as PBlock, Hash as PHash};
use sc_client_api::{
blockchain::{self, BlockStatus, HeaderBackend},
Backend, BlockchainEvents,
};
use sp_runtime::{generic::BlockId, traits::HashFor};
use std::{sync::Arc, time::Duration};
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Custom error type used by [`WaitOnRelayChainBlock`].
#[derive(Debug, derive_more::Display)]
pub enum Error {
#[display(
fmt = "Timeout while waiting for relay-chain block `{}` to be imported.",
_0
)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
_0
)]
ImportListenerClosed(PHash),
#[display(
fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
_0,
_1
)]
BlockchainError(PHash, blockchain::Error),
}
/// A helper to wait for a given relay chain block in an async way.
///
/// The caller needs to pass the hash of a block it waits for and the function will return when the
/// block is available or an error occurred.
///
/// The waiting for the block is implemented as follows:
///
/// 1. Get a read lock on the import lock from the backend.
///
/// 2. Check if the block is already imported. If yes, return from the function.
///
/// 3. If the block isn't imported yet, add an import notification listener.
///
/// 4. Poll the import notification listener until the block is imported or the timeout is fired.
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
pub struct WaitOnRelayChainBlock<B, BCE> {
block_chain_events: Arc<BCE>,
backend: Arc<B>,
}
impl<B, BCE> Clone for WaitOnRelayChainBlock<B, BCE> {
fn clone(&self) -> Self {
Self {
backend: self.backend.clone(),
block_chain_events: self.block_chain_events.clone(),
}
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE> {
/// Creates a new instance of `Self`.
pub fn new(backend: Arc<B>, block_chain_events: Arc<BCE>) -> Self {
Self {
backend,
block_chain_events,
}
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE>
where
B: Backend<PBlock>,
BCE: BlockchainEvents<PBlock>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
pub fn wait_on_relay_chain_block(
&self,
hash: PHash,
) -> impl Future<Output = Result<(), Error>> {
let _lock = self.backend.get_import_lock().read();
match self.backend.blockchain().status(BlockId::Hash(hash)) {
Ok(BlockStatus::InChain) => {
return ready(Ok(())).boxed();
}
Err(err) => return ready(Err(Error::BlockchainError(hash, err))).boxed(),
_ => {}
}
let mut listener = self.block_chain_events.import_notification_stream();
// Now it is safe to drop the lock, even when the block is now imported, it should show
// up in our registered listener.
drop(_lock);
let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
async move {
loop {
futures::select! {
_ = timeout => return Err(Error::Timeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(Error::ImportListenerClosed(hash)),
}
}
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_test_client::{
construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
DefaultTestClientBuilderExt, ExecutionStrategy, FullBackend, InitPolkadotBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use sp_consensus::BlockOrigin;
use sp_runtime::traits::Block as BlockT;
use futures::{executor::block_on, poll, task::Poll};
fn build_client_backend_and_block() -> (Arc<Client>, Arc<FullBackend>, PBlock) {
let builder =
TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible);
let backend = builder.backend();
let client = Arc::new(builder.build());
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
(client, backend, block)
}
#[test]
fn returns_directly_for_available_block() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
let wait = WaitOnRelayChainBlock::new(backend, client);
block_on(async move {
// Should be ready on the first poll
assert!(matches!(
poll!(wait.wait_on_relay_chain_block(hash)),
Poll::Ready(Ok(()))
));
});
}
#[test]
fn resolve_after_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
#[test]
fn wait_for_block_time_out_when_block_is_not_imported() {
let (client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
assert!(matches!(
block_on(wait.wait_on_relay_chain_block(hash)),
Err(Error::Timeout(_))
));
}
#[test]
fn do_not_resolve_after_different_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let ext = construct_transfer_extrinsic(
&*client,
sp_keyring::Sr25519Keyring::Alice,
sp_keyring::Sr25519Keyring::Bob,
1000,
);
let mut block_builder = client.init_polkadot_block_builder();
// Push an extrinsic to get a different block hash.
block_builder
.push_polkadot_extrinsic(ext)
.expect("Push extrinsic");
let block2 = block_builder.build().expect("Build second block").block;
let hash2 = block2.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
let mut future2 = wait.wait_on_relay_chain_block(hash2);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
assert!(poll!(&mut future2).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block2)
.expect("Imports the second block");
// The import notification of the second block should not make this one finish
assert!(poll!(&mut future).is_pending());
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));
client
.import(BlockOrigin::Own, block)
.expect("Imports the first block");
// Now it should be ready
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
}