mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 22:11:06 +00:00
ValidationNetwork expose more functionality (#301)
* Expose `collator_id_to_peer_id` * `ValidationNetwork` expose `checked_statements` * Style nit
This commit is contained in:
@@ -216,6 +216,11 @@ impl CollatorPool {
|
|||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
|
self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert the given `CollatorId` to a `PeerId`.
|
||||||
|
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
|
||||||
|
self.collators.get(collator_id).map(|ids| &ids.1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -460,6 +460,11 @@ impl PolkadotProtocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert the given `CollatorId` to a `PeerId`.
|
||||||
|
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
|
||||||
|
self.collators.collator_id_to_peer_id(collator_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Specialization<Block> for PolkadotProtocol {
|
impl Specialization<Block> for PolkadotProtocol {
|
||||||
|
|||||||
@@ -28,8 +28,8 @@ use polkadot_validation::{
|
|||||||
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
|
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
|
||||||
};
|
};
|
||||||
use polkadot_primitives::{Block, Hash};
|
use polkadot_primitives::{Block, Hash};
|
||||||
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost,
|
use polkadot_primitives::parachain::{
|
||||||
ValidatorIndex, Collation, PoVBlock,
|
Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
|
||||||
};
|
};
|
||||||
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
|
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
|
||||||
|
|
||||||
@@ -51,6 +51,23 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
|
|||||||
BlakeTwo256::hash(&v[..])
|
BlakeTwo256::hash(&v[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a `Stream` of checked statements.
|
||||||
|
///
|
||||||
|
/// The returned stream will not terminate, so it is required to make sure that the stream is
|
||||||
|
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
|
||||||
|
/// infinitely.
|
||||||
|
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
|
||||||
|
impl Stream<Item=SignedStatement, Error=()> {
|
||||||
|
// spin up a task in the background that processes all incoming statements
|
||||||
|
// validation has been done already by the gossip validator.
|
||||||
|
// this will block internally until the gossip messages stream is obtained.
|
||||||
|
network.gossip_messages_for(topic)
|
||||||
|
.filter_map(|msg| match msg.0 {
|
||||||
|
GossipMessage::Statement(s) => Some(s.signed_statement),
|
||||||
|
_ => None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Table routing implementation.
|
/// Table routing implementation.
|
||||||
pub struct Router<P, E, N: NetworkService, T> {
|
pub struct Router<P, E, N: NetworkService, T> {
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
@@ -76,21 +93,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a future of checked messages. These should be imported into the router
|
/// Return a `Stream` of checked messages. These should be imported into the router
|
||||||
/// with `import_statement`.
|
/// with `import_statement`.
|
||||||
///
|
///
|
||||||
/// The returned stream will not terminate, so it is required to make sure that the stream is
|
/// The returned stream will not terminate, so it is required to make sure that the stream is
|
||||||
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
|
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
|
||||||
/// infinitely.
|
/// infinitely.
|
||||||
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
|
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
|
||||||
// spin up a task in the background that processes all incoming statements
|
checked_statements(&**self.network(), self.attestation_topic)
|
||||||
// validation has been done already by the gossip validator.
|
|
||||||
// this will block internally until the gossip messages stream is obtained.
|
|
||||||
self.network().gossip_messages_for(self.attestation_topic)
|
|
||||||
.filter_map(|msg| match msg.0 {
|
|
||||||
GossipMessage::Statement(s) => Some(s.signed_statement),
|
|
||||||
_ => None
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parent_hash(&self) -> Hash {
|
fn parent_hash(&self) -> Hash {
|
||||||
@@ -107,7 +117,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
|
|||||||
Router {
|
Router {
|
||||||
table: self.table.clone(),
|
table: self.table.clone(),
|
||||||
fetcher: self.fetcher.clone(),
|
fetcher: self.fetcher.clone(),
|
||||||
attestation_topic: self.attestation_topic.clone(),
|
attestation_topic: self.attestation_topic,
|
||||||
deferred_statements: self.deferred_statements.clone(),
|
deferred_statements: self.deferred_statements.clone(),
|
||||||
message_validator: self.message_validator.clone(),
|
message_validator: self.message_validator.clone(),
|
||||||
}
|
}
|
||||||
@@ -177,7 +187,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
|
|||||||
let table = self.table.clone();
|
let table = self.table.clone();
|
||||||
let network = self.network().clone();
|
let network = self.network().clone();
|
||||||
let knowledge = self.fetcher.knowledge().clone();
|
let knowledge = self.fetcher.knowledge().clone();
|
||||||
let attestation_topic = self.attestation_topic.clone();
|
let attestation_topic = self.attestation_topic;
|
||||||
let parent_hash = self.parent_hash();
|
let parent_hash = self.parent_hash();
|
||||||
|
|
||||||
producer.prime(self.fetcher.api().clone())
|
producer.prime(self.fetcher.api().clone())
|
||||||
@@ -232,7 +242,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
|
|||||||
|
|
||||||
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
|
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let parent_hash = self.parent_hash().clone();
|
let parent_hash = self.parent_hash();
|
||||||
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
|
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,9 @@ use substrate_network::{PeerId, Context as NetContext};
|
|||||||
use substrate_network::consensus_gossip::{
|
use substrate_network::consensus_gossip::{
|
||||||
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
|
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
|
||||||
};
|
};
|
||||||
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
|
use polkadot_validation::{
|
||||||
|
Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement,
|
||||||
|
};
|
||||||
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
|
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
|
||||||
use polkadot_primitives::parachain::{
|
use polkadot_primitives::parachain::{
|
||||||
Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId,
|
Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId,
|
||||||
@@ -286,6 +288,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
|
|||||||
|
|
||||||
rx
|
rx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert the given `CollatorId` to a `PeerId`.
|
||||||
|
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
|
||||||
|
impl Future<Item=Option<PeerId>, Error=()> + Send
|
||||||
|
{
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.network.with_spec(move |spec, _| {
|
||||||
|
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
|
||||||
|
});
|
||||||
|
recv.map_err(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a `Stream` of checked statements for the given `relay_parent`.
|
||||||
|
///
|
||||||
|
/// The returned stream will not terminate, so it is required to make sure that the stream is
|
||||||
|
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
|
||||||
|
/// infinitely.
|
||||||
|
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement, Error=()> {
|
||||||
|
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A long-lived network which can create parachain statement routing processes on demand.
|
/// A long-lived network which can create parachain statement routing processes on demand.
|
||||||
@@ -305,7 +327,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
|
|||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
authorities: &[ValidatorId],
|
authorities: &[ValidatorId],
|
||||||
) -> Self::BuildTableRouter {
|
) -> Self::BuildTableRouter {
|
||||||
let parent_hash = table.consensus_parent_hash().clone();
|
let parent_hash = *table.consensus_parent_hash();
|
||||||
let local_session_key = table.session_key();
|
let local_session_key = table.session_key();
|
||||||
|
|
||||||
let build_fetcher = self.instantiate_session(SessionParams {
|
let build_fetcher = self.instantiate_session(SessionParams {
|
||||||
@@ -343,7 +365,7 @@ pub struct NetworkDown;
|
|||||||
|
|
||||||
/// A future that resolves when a collation is received.
|
/// A future that resolves when a collation is received.
|
||||||
pub struct AwaitingCollation {
|
pub struct AwaitingCollation {
|
||||||
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
|
outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
|
||||||
inner: Option<::futures::sync::oneshot::Receiver<Collation>>
|
inner: Option<::futures::sync::oneshot::Receiver<Collation>>
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -576,7 +598,7 @@ impl LiveValidationSessions {
|
|||||||
&mut self,
|
&mut self,
|
||||||
params: SessionParams,
|
params: SessionParams,
|
||||||
) -> (ValidationSession, Option<ValidatorId>) {
|
) -> (ValidationSession, Option<ValidatorId>) {
|
||||||
let parent_hash = params.parent_hash.clone();
|
let parent_hash = params.parent_hash;
|
||||||
|
|
||||||
let key = params.local_session_key.clone();
|
let key = params.local_session_key.clone();
|
||||||
let recent = &mut self.recent;
|
let recent = &mut self.recent;
|
||||||
@@ -703,7 +725,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
|
|||||||
impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
|
impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
|
||||||
/// Get the parent hash.
|
/// Get the parent hash.
|
||||||
pub(crate) fn parent_hash(&self) -> Hash {
|
pub(crate) fn parent_hash(&self) -> Hash {
|
||||||
self.parent_hash.clone()
|
self.parent_hash
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the shared knowledge.
|
/// Get the shared knowledge.
|
||||||
@@ -738,7 +760,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
|
|||||||
network: self.network.clone(),
|
network: self.network.clone(),
|
||||||
api: self.api.clone(),
|
api: self.api.clone(),
|
||||||
task_executor: self.task_executor.clone(),
|
task_executor: self.task_executor.clone(),
|
||||||
parent_hash: self.parent_hash.clone(),
|
parent_hash: self.parent_hash,
|
||||||
knowledge: self.knowledge.clone(),
|
knowledge: self.knowledge.clone(),
|
||||||
exit: self.exit.clone(),
|
exit: self.exit.clone(),
|
||||||
message_validator: self.message_validator.clone(),
|
message_validator: self.message_validator.clone(),
|
||||||
@@ -754,7 +776,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
|
|||||||
{
|
{
|
||||||
/// Fetch PoV block for the given candidate receipt.
|
/// Fetch PoV block for the given candidate receipt.
|
||||||
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
|
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
|
||||||
let parachain = candidate.parachain_index.clone();
|
let parachain = candidate.parachain_index;
|
||||||
let parent_hash = self.parent_hash;
|
let parent_hash = self.parent_hash;
|
||||||
|
|
||||||
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
|
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
|
||||||
|
|||||||
Reference in New Issue
Block a user