Unify RelayChainInterface error handling and introduce async (#909)

This commit is contained in:
Sebastian Kunert
2022-01-25 18:27:54 +01:00
committed by GitHub
parent ced952c1ce
commit dacd0aed5e
17 changed files with 535 additions and 417 deletions
+3
View File
@@ -1895,14 +1895,17 @@ dependencies = [
"async-trait",
"cumulus-primitives-core",
"derive_more",
"futures 0.3.19",
"parking_lot 0.11.2",
"polkadot-overseer",
"sc-client-api",
"sc-service",
"sp-api",
"sp-blockchain",
"sp-core",
"sp-runtime",
"sp-state-machine",
"thiserror",
]
[[package]]
+1 -1
View File
@@ -25,7 +25,7 @@ cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
futures = { version = "0.3.8", features = ["compat"] }
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
tracing = "0.1.25"
async-trait = "0.1.42"
async-trait = "0.1.52"
dyn-clone = "1.0.4"
[dev-dependencies]
@@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use cumulus_relay_chain_interface::RelayChainInterface;
use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
@@ -29,11 +30,14 @@ use sp_runtime::{
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
use futures::{future, select, FutureExt, Stream, StreamExt};
use futures::{select, FutureExt, Stream, StreamExt};
use std::{pin::Pin, sync::Arc};
const LOG_TARGET: &str = "cumulus-consensus";
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
@@ -42,17 +46,17 @@ pub trait RelaychainClient: Clone + 'static {
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;
/// Get a stream of new best heads for the given parachain.
fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
/// Get a stream of finalized heads for the given parachain.
fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
/// Returns the parachain head for the given `para_id` at the given block id.
fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>>;
) -> RelayChainResult<Option<Vec<u8>>>;
}
/// Follow the finalized head of the given parachain.
@@ -66,7 +70,13 @@ where
R: RelaychainClient,
B: Backend<Block>,
{
let mut finalized_heads = relay_chain.finalized_heads(para_id);
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};
loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
@@ -165,7 +175,14 @@ async fn follow_new_best<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse();
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};
let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
@@ -368,6 +385,7 @@ where
}
}
#[async_trait]
impl<RCInterface> RelaychainClient for RCInterface
where
RCInterface: RelayChainInterface + Clone + 'static,
@@ -376,39 +394,53 @@ where
type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();
self.import_notification_stream()
let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
future::ready(if n.is_new_best {
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
} else {
None
})
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(new_best_notification_stream)
}
fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();
self.finality_notification_stream()
let finality_notification_stream = self
.finality_notification_stream()
.await?
.filter_map(move |n| {
future::ready(
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
)
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(finality_notification_stream)
}
fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
.map_err(Into::into)
}
}
+13 -6
View File
@@ -16,7 +16,9 @@
use crate::*;
use async_trait::async_trait;
use codec::Encode;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
@@ -26,7 +28,7 @@ use futures_timer::Delay;
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
use sc_client_api::UsageProvider;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_runtime::generic::BlockId;
use std::{
@@ -66,12 +68,13 @@ impl Relaychain {
}
}
#[async_trait]
impl crate::parachain_consensus::RelaychainClient for Relaychain {
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
@@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");
Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}
fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
@@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");
Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}
fn parachain_head_at(&self, _: &BlockId<PBlock>, _: ParaId) -> ClientResult<Option<Vec<u8>>> {
async fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
@@ -176,7 +176,7 @@ where
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
// TODO: Fix this.
Duration::from_millis(500),
// Set the block limit to 50% of the maximum PoV size.
//
+69 -59
View File
@@ -38,11 +38,7 @@ use polkadot_primitives::v1::{
};
use codec::{Decode, DecodeAll, Encode};
use futures::{
channel::oneshot,
future::{ready, FutureExt},
Future,
};
use futures::{channel::oneshot, future::FutureExt, Future};
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
@@ -128,7 +124,7 @@ impl BlockAnnounceData {
/// Check the signature of the statement.
///
/// Returns an `Err(_)` if it failed.
fn check_signature<RCInterface>(
async fn check_signature<RCInterface>(
self,
relay_chain_client: &RCInterface,
) -> Result<Validation, BlockAnnounceError>
@@ -138,16 +134,16 @@ impl BlockAnnounceData {
let validator_index = self.statement.unchecked_validator_index();
let runtime_api_block_id = BlockId::Hash(self.relay_parent);
let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id)
{
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let session_index =
match relay_chain_client.session_index_for_child(&runtime_api_block_id).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
// Check that the signer is a legit validator.
let authorities = match relay_chain_client.validators(&runtime_api_block_id) {
let authorities = match relay_chain_client.validators(&runtime_api_block_id).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -222,6 +218,7 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
#[derive(Clone)]
pub struct BlockAnnounceValidator<Block, RCInterface> {
phantom: PhantomData<Block>,
relay_chain_interface: RCInterface,
@@ -247,13 +244,14 @@ where
RCInterface: RelayChainInterface + Clone,
{
/// Get the included block of the given parachain in the relay chain.
fn included_block(
async fn included_block(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Block::Header, BoxedError> {
let validation_data = relay_chain_interface
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
@@ -269,56 +267,59 @@ where
}
/// Get the backed block hash of the given parachain in the relay chain.
fn backed_block_hash(
async fn backed_block_hash(
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(block_id, para_id)
.await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
}
/// Handle a block announcement with empty data (no statement) attached to it.
fn handle_empty_block_announce_data(
async fn handle_empty_block_announce_data(
&self,
header: Block::Header,
) -> impl Future<Output = Result<Validation, BoxedError>> {
) -> Result<Validation, BoxedError> {
let relay_chain_interface = self.relay_chain_interface.clone();
let para_id = self.para_id;
async move {
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_best_hash = relay_chain_interface.best_block_hash();
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_best_hash = relay_chain_interface
.best_block_hash()
.await
.map_err(|e| Box::new(e) as Box<_>)?;
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
let best_head =
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?;
let known_best_number = best_head.number();
let backed_block =
|| Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id);
let best_head =
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?;
let known_best_number = best_head.number();
let backed_block = || async {
Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await
};
if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block()? {
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
Ok(Validation::Success { is_new_best: true })
} else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
Ok(Validation::Success { is_new_best: true })
} else if block_number >= known_best_number {
tracing::debug!(
Ok(Validation::Success { is_new_best: true })
} else if block_number >= known_best_number {
tracing::debug!(
target: LOG_TARGET,
"Validation failed because a justification is needed if the block at the top of the chain."
);
Ok(Validation::Failure { disconnect: false })
} else {
Ok(Validation::Success { is_new_best: false })
}
Ok(Validation::Failure { disconnect: false })
} else {
Ok(Validation::Success { is_new_best: false })
}
}
}
@@ -331,32 +332,40 @@ where
fn validate(
&mut self,
header: &Block::Header,
mut data: &[u8],
data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
if self.relay_chain_interface.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed()
}
if data.is_empty() {
return self.handle_empty_block_announce_data(header.clone()).boxed()
}
let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
Ok(r) => r,
Err(err) =>
return async move {
Err(Box::new(BlockAnnounceError(format!(
"Can not decode the `BlockAnnounceData`: {:?}",
err
))) as Box<_>)
}
.boxed(),
};
let relay_chain_interface = self.relay_chain_interface.clone();
let mut data = data.to_vec();
let header = header.clone();
let header_encoded = header.encode();
let block_announce_validator = self.clone();
async move {
let relay_chain_is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| {
tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
})
.unwrap_or(false);
if relay_chain_is_syncing {
return Ok(Validation::Success { is_new_best: false })
}
if data.is_empty() {
return block_announce_validator.handle_empty_block_announce_data(header).await
}
let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
Ok(r) => r,
Err(err) =>
return Err(Box::new(BlockAnnounceError(format!(
"Can not decode the `BlockAnnounceData`: {:?}",
err
))) as Box<_>),
};
if let Err(e) = block_announce_data.validate(header_encoded) {
return Ok(e)
}
@@ -370,6 +379,7 @@ where
block_announce_data
.check_signature(&relay_chain_interface)
.await
.map_err(|e| Box::new(e) as Box<_>)
}
.boxed()
+82 -57
View File
@@ -16,15 +16,15 @@
use super::*;
use async_trait::async_trait;
use cumulus_relay_chain_interface::WaitError;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt};
use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair,
CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage,
CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt,
Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
SigningContext, ValidationCodeHash, ValidatorId,
};
@@ -77,53 +77,60 @@ impl DummyRelayChainInterface {
#[async_trait]
impl RelayChainInterface for DummyRelayChainInterface {
fn validators(
async fn validators(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<Vec<ValidatorId>, sp_api::ApiError> {
) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.data.lock().validators.clone())
}
fn block_status(
async fn block_status(
&self,
block_id: cumulus_primitives_core::relay_chain::BlockId,
) -> Result<sp_blockchain::BlockStatus, sp_blockchain::Error> {
self.relay_backend.blockchain().status(block_id)
) -> RelayChainResult<sp_blockchain::BlockStatus> {
self.relay_backend
.blockchain()
.status(block_id)
.map_err(RelayChainError::BlockchainError)
}
fn best_block_hash(&self) -> PHash {
self.relay_backend.blockchain().info().best_hash
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
Ok(self.relay_backend.blockchain().info().best_hash)
}
fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
fn retrieve_all_inbound_hrmp_channel_contents(
async fn retrieve_dmq_contents(
&self,
_: ParaId,
_: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Some(BTreeMap::new())
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
fn persisted_validation_data(
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Ok(BTreeMap::new())
}
async fn persisted_validation_data(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
_: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, sp_api::ApiError> {
) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
}))
}
fn candidate_pending_availability(
async fn candidate_pending_availability(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, sp_api::ApiError> {
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
if self.data.lock().has_pending_availability {
Ok(Some(CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
@@ -152,60 +159,58 @@ impl RelayChainInterface for DummyRelayChainInterface {
}
}
fn session_index_for_child(
async fn session_index_for_child(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<SessionIndex, sp_api::ApiError> {
) -> RelayChainResult<SessionIndex> {
Ok(0)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
self.relay_client.import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
self.relay_client.finality_notification_stream()
}
fn storage_changes_notification_stream(
async fn import_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
self.relay_client
.storage_changes_notification_stream(filter_keys, child_filter_keys)
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
Ok(Box::pin(
self.relay_client
.import_notification_stream()
.map(|notification| notification.header),
))
}
fn is_major_syncing(&self) -> bool {
false
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
Ok(Box::pin(
self.relay_client
.finality_notification_stream()
.map(|notification| notification.header),
))
}
fn overseer_handle(&self) -> Option<Handle> {
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
Ok(false)
}
fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
unimplemented!("Not needed for test")
}
fn get_storage_by_key(
async fn get_storage_by_key(
&self,
_: &polkadot_service::BlockId,
_: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
) -> RelayChainResult<Option<StorageValue>> {
unimplemented!("Not needed for test")
}
fn prove_read(
async fn prove_read(
&self,
_: &polkadot_service::BlockId,
_: &Vec<Vec<u8>>,
) -> Result<Option<sc_client_api::StorageProof>, Box<dyn sp_state_machine::Error>> {
) -> RelayChainResult<sc_client_api::StorageProof> {
unimplemented!("Not needed for test")
}
async fn wait_for_block(
&self,
hash: PHash,
) -> Result<(), cumulus_relay_chain_interface::WaitError> {
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
let mut listener = match check_block_in_chain(
self.relay_backend.clone(),
self.relay_client.clone(),
@@ -219,16 +224,32 @@ impl RelayChainInterface for DummyRelayChainInterface {
loop {
futures::select! {
_ = timeout => return Err(WaitError::Timeout(hash)),
_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(WaitError::ImportListenerClosed(hash)),
None => return Err(RelayChainError::ImportListenerClosed(hash)),
}
}
}
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let notifications_stream =
self.relay_client
.import_notification_stream()
.filter_map(|notification| async move {
if notification.is_new_best {
Some(notification.header)
} else {
None
}
});
Ok(Box::pin(notifications_stream))
}
}
fn make_validator_and_api(
@@ -274,6 +295,7 @@ async fn make_gossip_message_and_header(
.unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
@@ -442,9 +464,9 @@ fn check_statement_is_correctly_signed() {
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
#[test]
fn check_statement_seconded() {
let (mut validator, api) = make_validator_and_api();
#[tokio::test]
async fn check_statement_seconded() {
let (mut validator, relay_chain_interface) = make_validator_and_api();
let header = default_header();
let relay_parent = H256::from_low_u64_be(1);
@@ -455,7 +477,10 @@ fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.await
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());
+44 -24
View File
@@ -56,7 +56,7 @@ use polkadot_primitives::v1::{
};
use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::RelayChainInterface;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
@@ -381,7 +381,14 @@ where
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates =
pending_candidates(self.relay_chain_interface.clone(), self.para_id).fuse();
match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};
futures::pin_mut!(pending_candidates);
loop {
@@ -435,28 +442,41 @@ where
}
/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
fn pending_candidates(
relay_chain_client: impl RelayChainInterface,
async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)> {
relay_chain_client.import_notification_stream().filter_map(move |n| {
let res = relay_chain_client
.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
.and_then(|pa| {
relay_chain_client
.session_index_for_child(&BlockId::hash(n.hash))
.map(|v| pa.map(|pa| (pa, v)))
})
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed fetch pending candidates.",
)
})
.ok()
.flatten();
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;
async move { res }
})
let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
async move {
let block_id = BlockId::hash(n.hash());
let pending_availability_result = client_for_closure
.candidate_pending_availability(&block_id, para_id)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch pending candidates.",
)
});
let session_index_result =
client_for_closure.session_index_for_child(&block_id).await.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch session index.",
)
});
if let Ok(Some(candidate)) = pending_availability_result {
session_index_result.map(|session_index| (candidate, session_index)).ok()
} else {
None
}
}
});
Ok(filtered_stream)
}
@@ -15,7 +15,10 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.1"
parking_lot = "0.11.1"
derive_more = "0.99.2"
async-trait = "0.1.52"
thiserror = "1.0.30"
+97 -91
View File
@@ -14,136 +14,140 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use polkadot_overseer::Handle as OverseerHandle;
use sc_client_api::{blockchain::BlockStatus, StorageProof};
use futures::Stream;
use async_trait::async_trait;
use sp_api::ApiError;
use sp_state_machine::StorageValue;
use async_trait::async_trait;
pub type RelayChainResult<T> = Result<T, RelayChainError>;
#[derive(Debug, derive_more::Display)]
pub enum WaitError {
#[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
_0
)]
#[derive(thiserror::Error, Debug)]
pub enum RelayChainError {
#[error("Error occured while calling relay chain runtime: {0:?}")]
ApiError(#[from] ApiError),
#[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
WaitTimeout(PHash),
#[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
ImportListenerClosed(PHash),
#[display(
fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
_0,
_1
)]
BlockchainError(PHash, sp_blockchain::Error),
#[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")]
WaitBlockchainError(PHash, sp_blockchain::Error),
#[error("Blockchain returned an error: {0:?}")]
BlockchainError(#[from] sp_blockchain::Error),
#[error("State machine error occured: {0:?}")]
StateMachineError(Box<dyn sp_state_machine::Error>),
#[error("Unspecified error occured: {0:?}")]
GenericError(String),
}
/// Trait that provides all necessary methods for interaction between collator and relay chain.
#[async_trait]
pub trait RelayChainInterface: Send + Sync {
/// Fetch a storage item by key.
fn get_storage_by_key(
async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error>;
) -> RelayChainResult<Option<StorageValue>>;
/// Fetch a vector of current validators.
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError>;
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>>;
/// Get the status of a given block.
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error>;
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus>;
/// Get the hash of the current best block.
fn best_block_hash(&self) -> PHash;
async fn best_block_hash(&self) -> RelayChainResult<PHash>;
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>>;
) -> RelayChainResult<Vec<InboundDownwardMessage>>;
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents(
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
/// Yields the persisted validation data for the given `ParaId` along with an assumption that
/// should be used if the para currently occupies a core.
///
/// Returns `None` if either the para is not registered or the assumption is `Freed`
/// and the para already occupies a core.
fn persisted_validation_data(
async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
_: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError>;
) -> RelayChainResult<Option<PersistedValidationData>>;
/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
/// assigned to occupied cores in `availability_cores` and `None` otherwise.
fn candidate_pending_availability(
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError>;
) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
/// Returns the session index expected at a child of the block.
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError>;
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex>;
/// Get a stream of import block notifications.
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock>;
async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
/// Get a stream of new best block notifications.
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
/// Wait for a block with a given hash in the relay chain.
///
/// This method returns immediately on error or if the block is already
/// reported to be in chain. Otherwise, it waits for the block to arrive.
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError>;
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
/// Get a stream of finality notifications.
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock>;
/// Get a stream of storage change notifications.
fn storage_changes_notification_stream(
async fn finality_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>>;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
/// Whether the synchronization service is undergoing major sync.
/// Returns true if so.
fn is_major_syncing(&self) -> bool;
async fn is_major_syncing(&self) -> RelayChainResult<bool>;
/// Get a handle to the overseer.
fn overseer_handle(&self) -> Option<OverseerHandle>;
fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>>;
/// Generate a storage read proof.
fn prove_read(
async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>>;
) -> RelayChainResult<StorageProof>;
}
#[async_trait]
@@ -151,98 +155,100 @@ impl<T> RelayChainInterface for Arc<T>
where
T: RelayChainInterface + ?Sized,
{
fn retrieve_dmq_contents(
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>> {
(**self).retrieve_dmq_contents(para_id, relay_parent)
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
(**self).retrieve_dmq_contents(para_id, relay_parent).await
}
fn retrieve_all_inbound_hrmp_channel_contents(
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
}
fn persisted_validation_data(
async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError> {
(**self).persisted_validation_data(block_id, para_id, occupied_core_assumption)
) -> RelayChainResult<Option<PersistedValidationData>> {
(**self)
.persisted_validation_data(block_id, para_id, occupied_core_assumption)
.await
}
fn candidate_pending_availability(
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
(**self).candidate_pending_availability(block_id, para_id)
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
(**self).candidate_pending_availability(block_id, para_id).await
}
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
(**self).session_index_for_child(block_id)
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
(**self).session_index_for_child(block_id).await
}
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
(**self).validators(block_id)
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
(**self).validators(block_id).await
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
(**self).import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
(**self).finality_notification_stream()
}
fn storage_changes_notification_stream(
async fn import_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
(**self).storage_changes_notification_stream(filter_keys, child_filter_keys)
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
(**self).import_notification_stream().await
}
fn best_block_hash(&self) -> PHash {
(**self).best_block_hash()
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
(**self).finality_notification_stream().await
}
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
(**self).block_status(block_id)
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
(**self).best_block_hash().await
}
fn is_major_syncing(&self) -> bool {
(**self).is_major_syncing()
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
(**self).block_status(block_id).await
}
fn overseer_handle(&self) -> Option<OverseerHandle> {
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
(**self).is_major_syncing().await
}
fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>> {
(**self).overseer_handle()
}
fn get_storage_by_key(
async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
(**self).get_storage_by_key(block_id, key)
) -> RelayChainResult<Option<StorageValue>> {
(**self).get_storage_by_key(block_id, key).await
}
fn prove_read(
async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
(**self).prove_read(block_id, relevant_keys)
) -> RelayChainResult<StorageProof> {
(**self).prove_read(block_id, relevant_keys).await
}
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
(**self).wait_for_block(hash).await
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
(**self).new_best_notification_stream().await
}
}
+85 -116
View File
@@ -14,19 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::{sync::Arc, time::Duration};
use std::{pin::Pin, sync::Arc, time::Duration};
use async_trait::async_trait;
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
v2::ParachainHost,
Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
Block as PBlock, BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use cumulus_relay_chain_interface::{RelayChainInterface, WaitError};
use futures::{FutureExt, StreamExt};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use polkadot_client::{ClientHandle, ExecuteWithClient, FullBackend};
use polkadot_service::{
@@ -37,12 +37,11 @@ use sc_client_api::{
StorageProof, UsageProvider,
};
use sc_telemetry::TelemetryWorkerHandle;
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_api::ProvideRuntimeApi;
use sp_consensus::SyncOracle;
use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair};
use sp_state_machine::{Backend as StateBackend, StorageValue};
const LOG_TARGET: &str = "relay-chain-local";
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
@@ -88,158 +87,117 @@ where
+ Send,
Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
{
fn retrieve_dmq_contents(
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>> {
self.full_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
})
.ok()
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
Ok(self.full_client.runtime_api().dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)?)
}
fn retrieve_all_inbound_hrmp_channel_contents(
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.full_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
})
.ok()
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Ok(self.full_client.runtime_api().inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)?)
}
fn persisted_validation_data(
async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError> {
self.full_client.runtime_api().persisted_validation_data(
) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(self.full_client.runtime_api().persisted_validation_data(
block_id,
para_id,
occupied_core_assumption,
)
)?)
}
fn candidate_pending_availability(
async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
self.full_client.runtime_api().candidate_pending_availability(block_id, para_id)
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
Ok(self
.full_client
.runtime_api()
.candidate_pending_availability(block_id, para_id)?)
}
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
self.full_client.runtime_api().session_index_for_child(block_id)
async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
Ok(self.full_client.runtime_api().session_index_for_child(block_id)?)
}
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
self.full_client.runtime_api().validators(block_id)
async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.full_client.runtime_api().validators(block_id)?)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
self.full_client.import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
self.full_client.finality_notification_stream()
}
fn storage_changes_notification_stream(
async fn import_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
self.full_client
.storage_changes_notification_stream(filter_keys, child_filter_keys)
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let notification_stream = self
.full_client
.import_notification_stream()
.map(|notification| notification.header);
Ok(Box::pin(notification_stream))
}
fn best_block_hash(&self) -> PHash {
self.backend.blockchain().info().best_hash
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let notification_stream = self
.full_client
.finality_notification_stream()
.map(|notification| notification.header);
Ok(Box::pin(notification_stream))
}
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
self.backend.blockchain().status(block_id)
async fn best_block_hash(&self) -> RelayChainResult<PHash> {
Ok(self.backend.blockchain().info().best_hash)
}
fn is_major_syncing(&self) -> bool {
async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
Ok(self.backend.blockchain().status(block_id)?)
}
async fn is_major_syncing(&self) -> RelayChainResult<bool> {
let mut network = self.sync_oracle.lock();
network.is_major_syncing()
Ok(network.is_major_syncing())
}
fn overseer_handle(&self) -> Option<Handle> {
self.overseer_handle.clone()
fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
Ok(self.overseer_handle.clone())
}
fn get_storage_by_key(
async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
) -> RelayChainResult<Option<StorageValue>> {
let state = self.backend.state_at(*block_id)?;
state.storage(key).map_err(sp_blockchain::Error::Storage)
state.storage(key).map_err(RelayChainError::GenericError)
}
fn prove_read(
async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
let state_backend = self
.backend
.state_at(*block_id)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Cannot obtain the state of the relay chain.",
);
})
.ok();
) -> RelayChainResult<StorageProof> {
let state_backend = self.backend.state_at(*block_id)?;
match state_backend {
Some(state) => sp_state_machine::prove_read(state, relevant_keys)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Failed to collect required relay chain state storage proof.",
);
e
})
.map(Some),
None => Ok(None),
}
sp_state_machine::prove_read(state_backend, relevant_keys)
.map_err(RelayChainError::StateMachineError)
}
/// Wait for a given relay chain block in an async way.
@@ -259,7 +217,7 @@ where
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
let mut listener =
match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
BlockCheckStatus::InChain => return Ok(()),
@@ -270,16 +228,28 @@ where
loop {
futures::select! {
_ = timeout => return Err(WaitError::Timeout(hash)),
_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(WaitError::ImportListenerClosed(hash)),
None => return Err(RelayChainError::ImportListenerClosed(hash)),
}
}
}
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let notifications_stream =
self.full_client
.import_notification_stream()
.filter_map(|notification| async move {
notification.is_new_best.then(|| notification.header)
});
Ok(Box::pin(notifications_stream))
}
}
pub enum BlockCheckStatus {
@@ -294,16 +264,15 @@ pub fn check_block_in_chain<Client>(
backend: Arc<FullBackend>,
client: Arc<Client>,
hash: PHash,
) -> Result<BlockCheckStatus, WaitError>
) -> RelayChainResult<BlockCheckStatus>
where
Client: BlockchainEvents<PBlock>,
{
let _lock = backend.get_import_lock().read();
let block_id = BlockId::Hash(hash);
match backend.blockchain().status(block_id) {
Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain),
Err(err) => return Err(WaitError::BlockchainError(hash, err)),
match backend.blockchain().status(block_id)? {
BlockStatus::InChain => return Ok(BlockCheckStatus::InChain),
_ => {},
}
@@ -495,7 +464,7 @@ mod tests {
assert!(matches!(
block_on(relay_chain_interface.wait_for_block(hash)),
Err(WaitError::Timeout(_))
Err(RelayChainError::WaitTimeout(_))
));
}
+13 -9
View File
@@ -107,10 +107,13 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
let overseer_handle = relay_chain_interface
.overseer_handle()
.map_err(|e| sc_service::Error::Application(Box::new(e)))?
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
relay_chain_interface
.overseer_handle()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
overseer_handle.clone(),
// We want that collators wait at maximum the relay chain slot duration before starting
// to recover blocks.
cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
@@ -128,9 +131,7 @@ where
runtime_api: client.clone(),
block_status,
announce_block,
overseer_handle: relay_chain_interface
.overseer_handle()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
overseer_handle,
spawner,
para_id,
key: collator_key,
@@ -192,10 +193,13 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
let overseer_handle = relay_chain_interface
.overseer_handle()
.map_err(|e| sc_service::Error::Application(Box::new(e)))?
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
relay_chain_interface
.overseer_handle()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
overseer_handle,
// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
// in maximum 5 minutes before starting to recover blocks. Collators should already start
// the recovery way before full nodes try to recover a certain block and then share the
+1 -1
View File
@@ -605,7 +605,7 @@ pub mod pallet {
#[pallet::genesis_build]
impl<T: Config> GenesisBuild<T> for GenesisConfig {
fn build(&self) {
//TODO: Remove after https://github.com/paritytech/cumulus/issues/479
// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
sp_io::storage::set(b":c", &[]);
}
}
@@ -436,14 +436,15 @@ pub async fn start_parachain_node(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
let relay_chain_interface = relay_chain_interface.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
).await;
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
+15 -11
View File
@@ -738,14 +738,15 @@ pub async fn start_rococo_parachain_node(
>(BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let relay_chain_interface = relay_chain_interface.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
).await;
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
@@ -875,14 +876,15 @@ where
block_import: client.clone(),
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
let relay_chain_interface = relay_chain_interface.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
).await;
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
@@ -1157,14 +1159,15 @@ where
proposer_factory,
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
let parachain_inherent =
let relay_chain_for_aura = relay_chain_for_aura.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_for_aura,
&validation_data,
id,
);
async move {
).await;
let time =
sp_timestamp::InherentDataProvider::from_system_time();
@@ -1216,14 +1219,15 @@ where
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
let parachain_inherent =
let relay_chain_interface = relay_chain_interface.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
).await;
let parachain_inherent =
parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
@@ -29,7 +29,7 @@ const LOG_TARGET: &str = "parachain-inherent";
/// Collect the relevant relay chain state in form of a proof for putting it into the validation
/// data inherent.
fn collect_relay_storage_proof(
async fn collect_relay_storage_proof(
relay_chain_interface: &impl RelayChainInterface,
para_id: ParaId,
relay_parent: PHash,
@@ -42,6 +42,7 @@ fn collect_relay_storage_proof(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_ingress_channel_index(para_id),
)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
@@ -70,6 +71,7 @@ fn collect_relay_storage_proof(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_egress_channel_index(para_id),
)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
@@ -108,26 +110,57 @@ fn collect_relay_storage_proof(
relay_well_known_keys::hrmp_channels(HrmpChannelId { sender: para_id, recipient })
}));
relay_chain_interface.prove_read(&relay_parent_block_id, &relevant_keys).ok()?
relay_chain_interface
.prove_read(&relay_parent_block_id, &relevant_keys)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent_block_id,
error = ?e,
"Cannot obtain read proof from relay chain.",
);
})
.ok()
}
impl ParachainInherentData {
/// Create the [`ParachainInherentData`] at the given `relay_parent`.
///
/// Returns `None` if the creation failed.
pub fn create_at(
pub async fn create_at(
relay_parent: PHash,
relay_chain_interface: &impl RelayChainInterface,
validation_data: &PersistedValidationData,
para_id: ParaId,
) -> Option<ParachainInherentData> {
let relay_chain_state =
collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent)?;
collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent).await?;
let downward_messages =
relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent)?;
let downward_messages = relay_chain_interface
.retrieve_dmq_contents(para_id, relay_parent)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
})
.ok()?;
let horizontal_messages = relay_chain_interface
.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)?;
.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
})
.ok()?;
Some(ParachainInherentData {
downward_messages,
+6 -5
View File
@@ -31,9 +31,9 @@ use cumulus_client_service::{
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_local::RelayChainLocal;
use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
use parking_lot::Mutex;
use frame_system_rpc_runtime_api::AccountNonceApi;
use parking_lot::Mutex;
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, PersistedValidationData};
use polkadot_service::ProvideRuntimeApi;
use sc_client_api::execution_extensions::ExecutionStrategies;
@@ -288,15 +288,16 @@ where
para_id,
proposer_factory,
move |_, (relay_parent, validation_data)| {
let parachain_inherent =
let relay_chain_interface = relay_chain_interface_for_closure.clone();
async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface_for_closure,
&relay_chain_interface,
&validation_data,
para_id,
);
).await;
async move {
let time = sp_timestamp::InherentDataProvider::from_system_time();
let parachain_inherent = parachain_inherent.ok_or_else(|| {