Remove the RelaychainClient trait (#2068)

* Remove the `RelaychainClient` trait

It was just some historical trait that isn't really required anymore. Besides that this pr re-exports
types that are being used by the relay chain interface to make its usage easier.

* Fix warning
This commit is contained in:
Bastian Köcher
2023-01-09 09:44:06 +01:00
committed by GitHub
parent 19269c196d
commit 38cdf7c47e
6 changed files with 197 additions and 123 deletions
-1
View File
@@ -1951,7 +1951,6 @@ dependencies = [
"polkadot-node-primitives", "polkadot-node-primitives",
"polkadot-parachain", "polkadot-parachain",
"polkadot-primitives", "polkadot-primitives",
"polkadot-service",
"polkadot-test-client", "polkadot-test-client",
"portpicker", "portpicker",
"sc-cli", "sc-cli",
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. // along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use async_trait::async_trait;
use sc_client_api::{ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
}; };
@@ -29,9 +28,9 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode; use codec::Decode;
use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt}; use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};
use std::{pin::Pin, sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
const LOG_TARGET: &str = "cumulus-consensus"; const LOG_TARGET: &str = "cumulus-consensus";
@@ -42,29 +41,6 @@ const LOG_TARGET: &str = "cumulus-consensus";
const RECOVERY_DELAY: RecoveryDelay = const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
/// A stream that yields head-data for a parachain.
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;
/// Get a stream of new best heads for the given parachain.
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
/// Get a stream of finalized heads for the given parachain.
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
/// Returns the parachain head for the given `para_id` at the given block id.
async fn parachain_head_at(
&self,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>>;
}
/// Follow the finalized head of the given parachain. /// Follow the finalized head of the given parachain.
/// ///
/// For every finalized block of the relay chain, it will get the included parachain header /// For every finalized block of the relay chain, it will get the included parachain header
@@ -73,10 +49,10 @@ async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P
where where
Block: BlockT, Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>, P: Finalizer<Block, B> + UsageProvider<Block>,
R: RelaychainClient, R: RelayChainInterface + Clone,
B: Backend<Block>, B: Backend<Block>,
{ {
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await { let finalized_heads = match finalized_heads(relay_chain, para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream, Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => { Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
@@ -84,6 +60,8 @@ where
}, },
}; };
pin_mut!(finalized_heads);
loop { loop {
let finalized_head = if let Some(h) = finalized_heads.next().await { let finalized_head = if let Some(h) = finalized_heads.next().await {
h h
@@ -152,7 +130,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
+ BlockBackend<Block> + BlockBackend<Block>
+ BlockchainEvents<Block>, + BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>, for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient, R: RelayChainInterface + Clone,
B: Backend<Block>, B: Backend<Block>,
{ {
let follow_new_best = follow_new_best( let follow_new_best = follow_new_best(
@@ -175,7 +153,7 @@ async fn follow_new_best<P, R, Block, B>(
parachain: Arc<P>, parachain: Arc<P>,
relay_chain: R, relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>, mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where ) where
Block: BlockT, Block: BlockT,
P: Finalizer<Block, B> P: Finalizer<Block, B>
@@ -185,10 +163,10 @@ async fn follow_new_best<P, R, Block, B>(
+ BlockBackend<Block> + BlockBackend<Block>
+ BlockchainEvents<Block>, + BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>, for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient, R: RelayChainInterface + Clone,
B: Backend<Block>, B: Backend<Block>,
{ {
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await { let new_best_heads = match new_best_heads(relay_chain, para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(), Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => { Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
@@ -196,9 +174,11 @@ async fn follow_new_best<P, R, Block, B>(
}, },
}; };
pin_mut!(new_best_heads);
let mut imported_blocks = parachain.import_notification_stream().fuse(); let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to // block before the associated parachain block. In this case we need to wait for this block to
// be imported to set it as new best. // be imported to set it as new best.
let mut unset_best_header = None; let mut unset_best_header = None;
@@ -210,7 +190,7 @@ async fn follow_new_best<P, R, Block, B>(
h, h,
&*parachain, &*parachain,
&mut unset_best_header, &mut unset_best_header,
recovery_chan_tx.clone(), recovery_chan_tx.as_mut(),
).await, ).await,
None => { None => {
tracing::debug!( tracing::debug!(
@@ -304,7 +284,7 @@ async fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>, head: Vec<u8>,
parachain: &P, parachain: &P,
unset_best_header: &mut Option<Block::Header>, unset_best_header: &mut Option<Block::Header>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>, mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
) where ) where
Block: BlockT, Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>, P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
@@ -416,50 +396,42 @@ where
} }
} }
#[async_trait] /// Returns a stream that will yield best heads for the given `para_id`.
impl<RCInterface> RelaychainClient for RCInterface async fn new_best_heads(
where relay_chain: impl RelayChainInterface + Clone,
RCInterface: RelayChainInterface + Clone + 'static, para_id: ParaId,
{ ) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
type Error = ClientError; let new_best_notification_stream =
relay_chain.new_best_notification_stream().await?.filter_map(move |n| {
type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();
let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone(); let relay_chain = relay_chain.clone();
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
}) });
.boxed();
Ok(new_best_notification_stream) Ok(new_best_notification_stream)
} }
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> { /// Returns a stream that will yield finalized heads for the given `para_id`.
let relay_chain = self.clone(); async fn finalized_heads(
relay_chain: impl RelayChainInterface + Clone,
let finality_notification_stream = self para_id: ParaId,
.finality_notification_stream() ) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
.await? let finality_notification_stream =
.filter_map(move |n| { relay_chain.finality_notification_stream().await?.filter_map(move |n| {
let relay_chain = relay_chain.clone(); let relay_chain = relay_chain.clone();
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
}) });
.boxed();
Ok(finality_notification_stream)
}
async fn parachain_head_at( Ok(finality_notification_stream)
&self, }
/// Returns head of the parachain at the given relay chain block.
async fn parachain_head_at(
relay_chain: &impl RelayChainInterface,
at: PHash, at: PHash,
para_id: ParaId, para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> { ) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) relay_chain
.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await .await
.map(|s| s.map(|s| s.parent_head.0)) .map(|s| s.map(|s| s.parent_head.0))
}
} }
+123 -21
View File
@@ -19,20 +19,24 @@ use crate::*;
use async_trait::async_trait; use async_trait::async_trait;
use codec::Encode; use codec::Encode;
use cumulus_client_pov_recovery::RecoveryKind; use cumulus_client_pov_recovery::RecoveryKind;
use cumulus_relay_chain_interface::RelayChainResult; use cumulus_primitives_core::{InboundDownwardMessage, InboundHrmpMessage};
use cumulus_relay_chain_interface::{
CommittedCandidateReceipt, OccupiedCoreAssumption, OverseerHandle, PHeader, ParaId,
RelayChainInterface, RelayChainResult, SessionIndex, StorageValue, ValidatorId,
};
use cumulus_test_client::{ use cumulus_test_client::{
runtime::{Block, Header}, runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
}; };
use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt};
use futures_timer::Delay; use futures_timer::Delay;
use polkadot_primitives::v2::Id as ParaId;
use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider}; use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus}; use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::generic::BlockId; use sp_runtime::generic::BlockId;
use std::{ use std::{
collections::{BTreeMap, HashMap},
pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration, time::Duration,
}; };
@@ -42,6 +46,7 @@ struct RelaychainInner {
finalized_heads: Option<mpsc::UnboundedReceiver<Header>>, finalized_heads: Option<mpsc::UnboundedReceiver<Header>>,
new_best_heads_sender: mpsc::UnboundedSender<Header>, new_best_heads_sender: mpsc::UnboundedSender<Header>,
finalized_heads_sender: mpsc::UnboundedSender<Header>, finalized_heads_sender: mpsc::UnboundedSender<Header>,
relay_chain_hash_to_header: HashMap<PHash, Header>,
} }
impl RelaychainInner { impl RelaychainInner {
@@ -54,6 +59,7 @@ impl RelaychainInner {
finalized_heads_sender, finalized_heads_sender,
new_best_heads: Some(new_best_heads), new_best_heads: Some(new_best_heads),
finalized_heads: Some(finalized_heads), finalized_heads: Some(finalized_heads),
relay_chain_hash_to_header: Default::default(),
} }
} }
} }
@@ -70,37 +76,133 @@ impl Relaychain {
} }
#[async_trait] #[async_trait]
impl crate::parachain_consensus::RelaychainClient for Relaychain { impl RelayChainInterface for Relaychain {
type Error = ClientError; async fn validators(&self, _: PHash) -> RelayChainResult<Vec<ValidatorId>> {
unimplemented!("Not needed for test")
}
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>; async fn best_block_hash(&self) -> RelayChainResult<PHash> {
unimplemented!("Not needed for test")
}
async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> { async fn retrieve_dmq_contents(
let stream = self &self,
_: ParaId,
_: PHash,
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
unimplemented!("Not needed for test")
}
async fn persisted_validation_data(
&self,
hash: PHash,
_: ParaId,
_: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(Some(PersistedValidationData {
parent_head: self
.inner .inner
.lock() .lock()
.unwrap() .unwrap()
.new_best_heads .relay_chain_hash_to_header
.take() .get(&hash)
.expect("Should only be called once"); .unwrap()
.encode()
Ok(Box::new(stream.map(|v| v.encode()))) .into(),
..Default::default()
}))
} }
async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> { async fn candidate_pending_availability(
let stream = self &self,
_: PHash,
_: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
unimplemented!("Not needed for test")
}
async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
unimplemented!("Not needed for test")
}
async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
unimplemented!("Not needed for test")
}
async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let inner = self.inner.clone();
Ok(self
.inner .inner
.lock() .lock()
.unwrap() .unwrap()
.finalized_heads .finalized_heads
.take() .take()
.expect("Should only be called once"); .unwrap()
.map(move |h| {
Ok(Box::new(stream.map(|v| v.encode()))) // Let's abuse the "parachain header" directly as relay chain header.
inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone());
h
})
.boxed())
} }
async fn parachain_head_at(&self, _: PHash, _: ParaId) -> RelayChainResult<Option<Vec<u8>>> { async fn is_major_syncing(&self) -> RelayChainResult<bool> {
unimplemented!("Not required for tests") Ok(false)
}
fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
unimplemented!("Not needed for test")
}
async fn get_storage_by_key(
&self,
_: PHash,
_: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
unimplemented!("Not needed for test")
}
async fn prove_read(
&self,
_: PHash,
_: &Vec<Vec<u8>>,
) -> RelayChainResult<sc_client_api::StorageProof> {
unimplemented!("Not needed for test")
}
async fn wait_for_block(&self, _: PHash) -> RelayChainResult<()> {
unimplemented!("Not needed for test")
}
async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let inner = self.inner.clone();
Ok(self
.inner
.lock()
.unwrap()
.new_best_heads
.take()
.unwrap()
.map(move |h| {
// Let's abuse the "parachain header" directly as relay chain header.
inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone());
h
})
.boxed())
} }
} }
@@ -121,7 +223,7 @@ fn build_block<B: InitBlockBuilder>(
let mut block = builder.build().unwrap().block; let mut block = builder.build().unwrap().block;
// Simulate some form of post activity (like a Seal or Other generic things). // Simulate some form of post activity (like a Seal or Other generic things).
// This is mostly used to excercise the `LevelMonitor` correct behavior. // This is mostly used to exercise the `LevelMonitor` correct behavior.
// (in practice we want that header post-hash != pre-hash) // (in practice we want that header post-hash != pre-hash)
block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3])); block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));
-1
View File
@@ -45,7 +45,6 @@ substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch
# Polkadot # Polkadot
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus # Cumulus
+6 -6
View File
@@ -17,18 +17,18 @@
use super::*; use super::*;
use async_trait::async_trait; use async_trait::async_trait;
use cumulus_relay_chain_inprocess_interface::{check_block_in_chain, BlockCheckStatus}; use cumulus_relay_chain_inprocess_interface::{check_block_in_chain, BlockCheckStatus};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use cumulus_relay_chain_interface::{
OverseerHandle, PHeader, ParaId, RelayChainError, RelayChainResult,
};
use cumulus_test_service::runtime::{Block, Hash, Header}; use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt}; use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v2::{ use polkadot_primitives::v2::{
CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt,
Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage, Hash as PHash, HeadData, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, PersistedValidationData, SessionIndex, SigningContext, ValidationCodeHash, ValidatorId,
SigningContext, ValidationCodeHash, ValidatorId,
}; };
use polkadot_service::Handle;
use polkadot_test_client::{ use polkadot_test_client::{
Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend, Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt, InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt,
@@ -174,7 +174,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
Ok(false) Ok(false)
} }
fn overseer_handle(&self) -> RelayChainResult<Handle> { fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
unimplemented!("Not needed for test") unimplemented!("Not needed for test")
} }
@@ -16,14 +16,7 @@
use std::{collections::BTreeMap, pin::Pin, sync::Arc}; use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use cumulus_primitives_core::{ use polkadot_overseer::prometheus::PrometheusError;
relay_chain::{
v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use polkadot_overseer::{prometheus::PrometheusError, Handle as OverseerHandle};
use polkadot_service::SubstrateServiceError; use polkadot_service::SubstrateServiceError;
use sc_client_api::StorageProof; use sc_client_api::StorageProof;
@@ -33,7 +26,16 @@ use async_trait::async_trait;
use jsonrpsee_core::Error as JsonRpcError; use jsonrpsee_core::Error as JsonRpcError;
use parity_scale_codec::Error as CodecError; use parity_scale_codec::Error as CodecError;
use sp_api::ApiError; use sp_api::ApiError;
use sp_state_machine::StorageValue;
pub use cumulus_primitives_core::{
relay_chain::{
v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
pub use polkadot_overseer::Handle as OverseerHandle;
pub use sp_state_machine::StorageValue;
pub type RelayChainResult<T> = Result<T, RelayChainError>; pub type RelayChainResult<T> = Result<T, RelayChainError>;