Unify RelayChainInterface error handling and introduce async (#909)

This commit is contained in:
Sebastian Kunert
2022-01-25 18:27:54 +01:00
committed by GitHub
parent c70156b122
commit a9630551c2
17 changed files with 535 additions and 417 deletions
+69 -59
View File
@@ -38,11 +38,7 @@ use polkadot_primitives::v1::{
};
use codec::{Decode, DecodeAll, Encode};
use futures::{
channel::oneshot,
future::{ready, FutureExt},
Future,
};
use futures::{channel::oneshot, future::FutureExt, Future};
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
@@ -128,7 +124,7 @@ impl BlockAnnounceData {
/// Check the signature of the statement.
///
/// Returns an `Err(_)` if it failed.
fn check_signature<RCInterface>(
async fn check_signature<RCInterface>(
self,
relay_chain_client: &RCInterface,
) -> Result<Validation, BlockAnnounceError>
@@ -138,16 +134,16 @@ impl BlockAnnounceData {
let validator_index = self.statement.unchecked_validator_index();
let runtime_api_block_id = BlockId::Hash(self.relay_parent);
let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id)
{
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let session_index =
match relay_chain_client.session_index_for_child(&runtime_api_block_id).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
// Check that the signer is a legit validator.
let authorities = match relay_chain_client.validators(&runtime_api_block_id) {
let authorities = match relay_chain_client.validators(&runtime_api_block_id).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -222,6 +218,7 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
#[derive(Clone)]
pub struct BlockAnnounceValidator<Block, RCInterface> {
phantom: PhantomData<Block>,
relay_chain_interface: RCInterface,
@@ -247,13 +244,14 @@ where
RCInterface: RelayChainInterface + Clone,
{
/// Get the included block of the given parachain in the relay chain.
fn included_block(
async fn included_block(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Block::Header, BoxedError> {
let validation_data = relay_chain_interface
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
@@ -269,56 +267,59 @@ where
}
/// Get the backed block hash of the given parachain in the relay chain.
fn backed_block_hash(
async fn backed_block_hash(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(block_id, para_id)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
}
/// Handle a block announcement with empty data (no statement) attached to it.
fn handle_empty_block_announce_data(
async fn handle_empty_block_announce_data(
&self,
header: Block::Header,
) -> impl Future<Output = Result<Validation, BoxedError>> {
) -> Result<Validation, BoxedError> {
let relay_chain_interface = self.relay_chain_interface.clone();
let para_id = self.para_id;
async move {
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_best_hash = relay_chain_interface.best_block_hash();
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_best_hash = relay_chain_interface
.best_block_hash()
.await
.map_err(|e| Box::new(e) as Box<_>)?;
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
let best_head =
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?;
let known_best_number = best_head.number();
let backed_block =
|| Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id);
let best_head =
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?;
let known_best_number = best_head.number();
let backed_block = || async {
Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await
};
if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block()? {
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
Ok(Validation::Success { is_new_best: true })
} else if block_number >= known_best_number {
tracing::debug!(
Ok(Validation::Success { is_new_best: true })
} else if block_number >= known_best_number {
tracing::debug!(
target: LOG_TARGET,
"Validation failed because a justification is needed if the block at the top of the chain."
);
Ok(Validation::Failure { disconnect: false })
} else {
Ok(Validation::Success { is_new_best: false })
}
Ok(Validation::Failure { disconnect: false })
} else {
Ok(Validation::Success { is_new_best: false })
}
}
}
@@ -331,32 +332,40 @@ where
fn validate(
&mut self,
header: &Block::Header,
mut data: &[u8],
data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
if self.relay_chain_interface.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed()
}
if data.is_empty() {
return self.handle_empty_block_announce_data(header.clone()).boxed()
}
let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
Ok(r) => r,
Err(err) =>
return async move {
Err(Box::new(BlockAnnounceError(format!(
"Can not decode the `BlockAnnounceData`: {:?}",
err
))) as Box<_>)
}
.boxed(),
};
let relay_chain_interface = self.relay_chain_interface.clone();
let mut data = data.to_vec();
let header = header.clone();
let header_encoded = header.encode();
let block_announce_validator = self.clone();
async move {
let relay_chain_is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| {
tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
})
.unwrap_or(false);
if relay_chain_is_syncing {
return Ok(Validation::Success { is_new_best: false })
}
if data.is_empty() {
return block_announce_validator.handle_empty_block_announce_data(header).await
}
let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
Ok(r) => r,
Err(err) =>
return Err(Box::new(BlockAnnounceError(format!(
"Can not decode the `BlockAnnounceData`: {:?}",
err
))) as Box<_>),
};
if let Err(e) = block_announce_data.validate(header_encoded) {
return Ok(e)
}
@@ -370,6 +379,7 @@ where
block_announce_data
.check_signature(&relay_chain_interface)
.await
.map_err(|e| Box::new(e) as Box<_>)
}
.boxed()
+82 -57
View File
@@ -16,15 +16,15 @@
use super::*;
use async_trait::async_trait;
use cumulus_relay_chain_interface::WaitError;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt};
use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair,
CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage,
CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt,
Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
SigningContext, ValidationCodeHash, ValidatorId,
};
@@ -77,53 +77,60 @@ impl DummyRelayChainInterface {
#[async_trait]
impl RelayChainInterface for DummyRelayChainInterface {
fn validators(
async fn validators(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<Vec<ValidatorId>, sp_api::ApiError> {
) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.data.lock().validators.clone())
}
fn block_status(
async fn block_status(
&self,
block_id: cumulus_primitives_core::relay_chain::BlockId,
) -> Result<sp_blockchain::BlockStatus, sp_blockchain::Error> {
self.relay_backend.blockchain().status(block_id)
) -> RelayChainResult<sp_blockchain::BlockStatus> {
self.relay_backend
.blockchain()
.status(block_id)
.map_err(RelayChainError::BlockchainError)
}
fn best_block_hash(&self) -> PHash {
self.relay_backend.blockchain().info().best_hash
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
Ok(self.relay_backend.blockchain().info().best_hash)
}
fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
fn retrieve_all_inbound_hrmp_channel_contents(
async fn retrieve_dmq_contents(
&self,
_: ParaId,
_: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Some(BTreeMap::new())
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
fn persisted_validation_data(
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Ok(BTreeMap::new())
}
async fn persisted_validation_data(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
_: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, sp_api::ApiError> {
) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
}))
}
fn candidate_pending_availability(
async fn candidate_pending_availability(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, sp_api::ApiError> {
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
if self.data.lock().has_pending_availability {
Ok(Some(CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
@@ -152,60 +159,58 @@ impl RelayChainInterface for DummyRelayChainInterface {
}
}
fn session_index_for_child(
async fn session_index_for_child(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<SessionIndex, sp_api::ApiError> {
) -> RelayChainResult<SessionIndex> {
Ok(0)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
self.relay_client.import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
self.relay_client.finality_notification_stream()
}
fn storage_changes_notification_stream(
async fn import_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
self.relay_client
.storage_changes_notification_stream(filter_keys, child_filter_keys)
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
Ok(Box::pin(
self.relay_client
.import_notification_stream()
.map(|notification| notification.header),
))
}
fn is_major_syncing(&self) -> bool {
false
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
Ok(Box::pin(
self.relay_client
.finality_notification_stream()
.map(|notification| notification.header),
))
}
fn overseer_handle(&self) -> Option<Handle> {
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
Ok(false)
}
fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
unimplemented!("Not needed for test")
}
fn get_storage_by_key(
async fn get_storage_by_key(
&self,
_: &polkadot_service::BlockId,
_: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
) -> RelayChainResult<Option<StorageValue>> {
unimplemented!("Not needed for test")
}
fn prove_read(
async fn prove_read(
&self,
_: &polkadot_service::BlockId,
_: &Vec<Vec<u8>>,
) -> Result<Option<sc_client_api::StorageProof>, Box<dyn sp_state_machine::Error>> {
) -> RelayChainResult<sc_client_api::StorageProof> {
unimplemented!("Not needed for test")
}
async fn wait_for_block(
&self,
hash: PHash,
) -> Result<(), cumulus_relay_chain_interface::WaitError> {
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
let mut listener = match check_block_in_chain(
self.relay_backend.clone(),
self.relay_client.clone(),
@@ -219,16 +224,32 @@ impl RelayChainInterface for DummyRelayChainInterface {
loop {
futures::select! {
_ = timeout => return Err(WaitError::Timeout(hash)),
_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(WaitError::ImportListenerClosed(hash)),
None => return Err(RelayChainError::ImportListenerClosed(hash)),
}
}
}
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let notifications_stream =
self.relay_client
.import_notification_stream()
.filter_map(|notification| async move {
if notification.is_new_best {
Some(notification.header)
} else {
None
}
});
Ok(Box::pin(notifications_stream))
}
}
fn make_validator_and_api(
@@ -274,6 +295,7 @@ async fn make_gossip_message_and_header(
.unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
@@ -442,9 +464,9 @@ fn check_statement_is_correctly_signed() {
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
#[test]
fn check_statement_seconded() {
let (mut validator, api) = make_validator_and_api();
#[tokio::test]
async fn check_statement_seconded() {
let (mut validator, relay_chain_interface) = make_validator_and_api();
let header = default_header();
let relay_parent = H256::from_low_u64_be(1);
@@ -455,7 +477,10 @@ fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());