mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 02:21:03 +00:00
Make sure we send the validator key to collators on status (#968)
Before the validator only send the keys if it was updated and thus the collators would "never" be informed about the key of the validator.
This commit is contained in:
@@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Operations that a handle to an underlying network service should provide.
|
/// Operations that a handle to an underlying network service should provide.
|
||||||
trait NetworkServiceOps: Send + Sync {
|
pub trait NetworkServiceOps: Send + Sync {
|
||||||
/// Report the peer as having a particular positive or negative value.
|
/// Report the peer as having a particular positive or negative value.
|
||||||
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
|
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
|
||||||
|
|
||||||
@@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// An async handle to the network service.
|
/// An async handle to the network service.
|
||||||
#[derive(Clone)]
|
pub struct Service<N = PolkadotNetworkService> {
|
||||||
pub struct Service {
|
|
||||||
sender: mpsc::Sender<ServiceToWorkerMsg>,
|
sender: mpsc::Sender<ServiceToWorkerMsg>,
|
||||||
network_service: Arc<dyn NetworkServiceOps>,
|
network_service: Arc<N>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N> Clone for Service<N> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
network_service: self.network_service.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers the protocol.
|
/// Registers the protocol.
|
||||||
@@ -209,7 +217,7 @@ pub fn start<C, Api, SP>(
|
|||||||
chain_context: C,
|
chain_context: C,
|
||||||
api: Arc<Api>,
|
api: Arc<Api>,
|
||||||
executor: SP,
|
executor: SP,
|
||||||
) -> Result<Service, futures::task::SpawnError> where
|
) -> Result<Service<PolkadotNetworkService>, futures::task::SpawnError> where
|
||||||
C: ChainContext + 'static,
|
C: ChainContext + 'static,
|
||||||
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||||
@@ -292,14 +300,14 @@ pub fn start<C, Api, SP>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The Polkadot protocol status message.
|
/// The Polkadot protocol status message.
|
||||||
#[derive(Debug, Encode, Decode)]
|
#[derive(Debug, Encode, Decode, PartialEq)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
version: u32, // protocol version.
|
version: u32, // protocol version.
|
||||||
collating_for: Option<(CollatorId, ParaId)>,
|
collating_for: Option<(CollatorId, ParaId)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Polkadot-specific messages from peer to peer.
|
/// Polkadot-specific messages from peer to peer.
|
||||||
#[derive(Debug, Encode, Decode)]
|
#[derive(Debug, Encode, Decode, PartialEq)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
/// Exchange status with a peer. This should be the first message sent.
|
/// Exchange status with a peer. This should be the first message sent.
|
||||||
#[codec(index = "0")]
|
#[codec(index = "0")]
|
||||||
@@ -451,6 +459,11 @@ impl RecentValidatorIds {
|
|||||||
fn as_slice(&self) -> &[ValidatorId] {
|
fn as_slice(&self) -> &[ValidatorId] {
|
||||||
&*self.inner
|
&*self.inner
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the last inserted session key.
|
||||||
|
fn latest(&self) -> Option<&ValidatorId> {
|
||||||
|
self.inner.last()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ProtocolHandler {
|
struct ProtocolHandler {
|
||||||
@@ -582,7 +595,19 @@ impl ProtocolHandler {
|
|||||||
let role = self.collators
|
let role = self.collators
|
||||||
.on_new_collator(collator_id, para_id, remote.clone());
|
.on_new_collator(collator_id, para_id, remote.clone());
|
||||||
let service = &self.service;
|
let service = &self.service;
|
||||||
|
let send_key = peer.should_send_key();
|
||||||
|
|
||||||
if let Some(c_state) = peer.collator_state_mut() {
|
if let Some(c_state) = peer.collator_state_mut() {
|
||||||
|
if send_key {
|
||||||
|
if let Some(key) = self.local_keys.latest() {
|
||||||
|
c_state.send_key(key.clone(), |msg| service.write_notification(
|
||||||
|
remote.clone(),
|
||||||
|
POLKADOT_ENGINE_ID,
|
||||||
|
msg.encode(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c_state.set_role(role, |msg| service.write_notification(
|
c_state.set_role(role, |msg| service.write_notification(
|
||||||
remote.clone(),
|
remote.clone(),
|
||||||
POLKADOT_ENGINE_ID,
|
POLKADOT_ENGINE_ID,
|
||||||
@@ -1323,7 +1348,7 @@ struct RouterInner {
|
|||||||
sender: mpsc::Sender<ServiceToWorkerMsg>,
|
sender: mpsc::Sender<ServiceToWorkerMsg>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl<N: NetworkServiceOps> Service<N> {
|
||||||
/// Register an availablility-store that the network can query.
|
/// Register an availablility-store that the network can query.
|
||||||
pub fn register_availability_store(&self, store: av_store::Store) {
|
pub fn register_availability_store(&self, store: av_store::Store) {
|
||||||
let _ = self.sender.clone()
|
let _ = self.sender.clone()
|
||||||
@@ -1373,7 +1398,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ParachainNetwork for Service {
|
impl<N> ParachainNetwork for Service<N> {
|
||||||
type Error = mpsc::SendError;
|
type Error = mpsc::SendError;
|
||||||
type TableRouter = Router;
|
type TableRouter = Router;
|
||||||
type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>;
|
type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>;
|
||||||
@@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Collators for Service {
|
impl<N> Collators for Service<N> {
|
||||||
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
|
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
|
||||||
type Collation = Pin<Box<dyn Future<Output = Result<Collation, Self::Error>> + Send>>;
|
type Collation = Pin<Box<dyn Future<Output = Result<Collation, Self::Error>> + Send>>;
|
||||||
|
|
||||||
@@ -1425,7 +1450,7 @@ impl Collators for Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl av_store::ErasureNetworking for Service {
|
impl<N> av_store::ErasureNetworking for Service<N> {
|
||||||
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
|
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
|
||||||
|
|
||||||
fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
|
fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ use futures::executor::LocalPool;
|
|||||||
use futures::task::LocalSpawnExt;
|
use futures::task::LocalSpawnExt;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct MockNetworkOps {
|
pub struct MockNetworkOps {
|
||||||
recorded: Mutex<Recorded>,
|
recorded: Mutex<Recorded>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl super::Service {
|
impl super::Service<MockNetworkOps> {
|
||||||
async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
|
async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
|
||||||
self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
|
self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
|
||||||
}
|
}
|
||||||
@@ -222,7 +222,7 @@ impl super::Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn test_setup(config: Config) -> (
|
fn test_setup(config: Config) -> (
|
||||||
Service,
|
Service<MockNetworkOps>,
|
||||||
MockGossip,
|
MockGossip,
|
||||||
LocalPool,
|
LocalPool,
|
||||||
impl Future<Output = ()> + 'static,
|
impl Future<Output = ()> + 'static,
|
||||||
@@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() {
|
|||||||
/// is handled. This helper functions checks multiple times that the given instance is dropped. Even
|
/// is handled. This helper functions checks multiple times that the given instance is dropped. Even
|
||||||
/// if the first round fails, the second one should be successful as the consensus instance drop
|
/// if the first round fails, the second one should be successful as the consensus instance drop
|
||||||
/// should be already handled this time.
|
/// should be already handled this time.
|
||||||
fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) {
|
fn wait_for_instance_drop(service: &mut Service<MockNetworkOps>, pool: &mut LocalPool, instance: Hash) {
|
||||||
let mut try_counter = 0;
|
let mut try_counter = 0;
|
||||||
let max_tries = 3;
|
let max_tries = 3;
|
||||||
|
|
||||||
@@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() {
|
|||||||
})));
|
})));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn validator_peer_cleaned_up() {
|
fn validator_peer_cleaned_up() {
|
||||||
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||||
@@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() {
|
|||||||
|
|
||||||
pool.run_until(test_work).unwrap();
|
pool.run_until(test_work).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn validator_sends_key_to_collator_on_status() {
|
||||||
|
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||||
|
|
||||||
|
let peer = PeerId::random();
|
||||||
|
let peer_clone = peer.clone();
|
||||||
|
let validator_key = Sr25519Keyring::Alice.pair();
|
||||||
|
let validator_id = ValidatorId::from(validator_key.public());
|
||||||
|
let validator_id_clone = validator_id.clone();
|
||||||
|
let collator_id = CollatorId::from(Sr25519Keyring::Bob.public());
|
||||||
|
let para_id = ParaId::from(100);
|
||||||
|
let mut service_clone = service.clone();
|
||||||
|
|
||||||
|
pool.spawner().spawn_local(worker_task).unwrap();
|
||||||
|
pool.run_until(async move {
|
||||||
|
service_clone.synchronize(move |proto| { proto.local_keys.insert(validator_id_clone); }).await;
|
||||||
|
service_clone.connect_peer(peer_clone.clone(), Roles::AUTHORITY).await;
|
||||||
|
service_clone.peer_message(peer_clone.clone(), Message::Status(Status {
|
||||||
|
version: VERSION,
|
||||||
|
collating_for: Some((collator_id, para_id)),
|
||||||
|
})).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let expected_msg = Message::ValidatorId(validator_id.clone());
|
||||||
|
assert!(service.network_service.recorded.lock().notifications.iter().any(|(p, notification)| {
|
||||||
|
peer == *p && *notification == expected_msg
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use std::time::Duration;
|
|||||||
use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance};
|
use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance};
|
||||||
#[cfg(feature = "full-node")]
|
#[cfg(feature = "full-node")]
|
||||||
use polkadot_network::{legacy::gossip::Known, protocol as network_protocol};
|
use polkadot_network::{legacy::gossip::Known, protocol as network_protocol};
|
||||||
use service::{error::{Error as ServiceError}, ServiceBuilder};
|
use service::{error::Error as ServiceError, ServiceBuilder};
|
||||||
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
|
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
|
||||||
use inherents::InherentDataProviders;
|
use inherents::InherentDataProviders;
|
||||||
use sc_executor::native_executor_instance;
|
use sc_executor::native_executor_instance;
|
||||||
@@ -103,11 +103,9 @@ where
|
|||||||
<Self as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
|
<Self as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
|
||||||
{}
|
{}
|
||||||
|
|
||||||
pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static
|
pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static {}
|
||||||
{}
|
|
||||||
|
|
||||||
impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static
|
impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static {}
|
||||||
{}
|
|
||||||
|
|
||||||
/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
|
/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
|
||||||
pub trait IsKusama {
|
pub trait IsKusama {
|
||||||
|
|||||||
Reference in New Issue
Block a user