mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 23:31:07 +00:00
overseer: add a test to ensure all subsystem receive msgs (#1590)
* overseer: add a test to ensure all subsystem receive msgs * lol
This commit is contained in:
Generated
+1
@@ -4895,6 +4895,7 @@ dependencies = [
|
||||
"futures-timer 3.0.2",
|
||||
"kv-log-macro",
|
||||
"log 0.4.11",
|
||||
"polkadot-node-network-protocol",
|
||||
"polkadot-node-primitives",
|
||||
"polkadot-node-subsystem",
|
||||
"polkadot-primitives",
|
||||
|
||||
@@ -17,6 +17,7 @@ async-trait = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
polkadot-node-network-protocol = { path = "../network/protocol" }
|
||||
futures = { version = "0.3.5", features = ["thread-pool"] }
|
||||
futures-timer = "3.0.2"
|
||||
femme = "2.0.1"
|
||||
|
||||
@@ -717,6 +717,10 @@ where
|
||||
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
|
||||
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
|
||||
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
}
|
||||
@@ -733,7 +737,7 @@ where
|
||||
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
|
||||
if let Some(ref mut s) = self.availability_store_subsystem.instance {
|
||||
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
}
|
||||
|
||||
@@ -817,7 +821,7 @@ where
|
||||
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
|
||||
let mut update = ActiveLeavesUpdate::default();
|
||||
|
||||
if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) {
|
||||
if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) {
|
||||
update.deactivated.push(parent.0);
|
||||
}
|
||||
|
||||
@@ -879,6 +883,10 @@ where
|
||||
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
|
||||
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.provisioner_subsystem.instance {
|
||||
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
|
||||
}
|
||||
@@ -1021,10 +1029,15 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic;
|
||||
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
|
||||
|
||||
use polkadot_primitives::v1::{BlockData, PoV};
|
||||
use polkadot_subsystem::DummySubsystem;
|
||||
use polkadot_subsystem::messages::RuntimeApiRequest;
|
||||
|
||||
use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};
|
||||
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -1492,4 +1505,206 @@ mod tests {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CounterSubsystem {
|
||||
stop_signals_received: Arc<atomic::AtomicUsize>,
|
||||
signals_received: Arc<atomic::AtomicUsize>,
|
||||
msgs_received: Arc<atomic::AtomicUsize>,
|
||||
}
|
||||
|
||||
impl CounterSubsystem {
|
||||
fn new(
|
||||
stop_signals_received: Arc<atomic::AtomicUsize>,
|
||||
signals_received: Arc<atomic::AtomicUsize>,
|
||||
msgs_received: Arc<atomic::AtomicUsize>,
|
||||
) -> Self {
|
||||
Self {
|
||||
stop_signals_received,
|
||||
signals_received,
|
||||
msgs_received,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, M> Subsystem<C> for CounterSubsystem
|
||||
where
|
||||
C: SubsystemContext<Message=M>,
|
||||
M: Send,
|
||||
{
|
||||
fn start(self, mut ctx: C) -> SpawnedSubsystem {
|
||||
SpawnedSubsystem {
|
||||
name: "counter-subsystem",
|
||||
future: Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
|
||||
self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
break;
|
||||
},
|
||||
Ok(Some(FromOverseer::Signal(_))) => {
|
||||
self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
continue;
|
||||
},
|
||||
Ok(Some(FromOverseer::Communication { .. })) => {
|
||||
self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
continue;
|
||||
},
|
||||
Err(_) => (),
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn test_candidate_validation_msg() -> CandidateValidationMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) });
|
||||
CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender)
|
||||
}
|
||||
|
||||
fn test_candidate_backing_msg() -> CandidateBackingMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
CandidateBackingMessage::GetBackedCandidates(Default::default(), sender)
|
||||
}
|
||||
|
||||
fn test_candidate_selection_msg() -> CandidateSelectionMessage {
|
||||
CandidateSelectionMessage::default()
|
||||
}
|
||||
|
||||
fn test_chain_api_msg() -> ChainApiMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
ChainApiMessage::FinalizedBlockNumber(sender)
|
||||
}
|
||||
|
||||
fn test_collator_protocol_msg() -> CollatorProtocolMessage {
|
||||
CollatorProtocolMessage::CollateOn(Default::default())
|
||||
}
|
||||
|
||||
fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
|
||||
NetworkBridgeEvent::PeerDisconnected(PeerId::random())
|
||||
}
|
||||
|
||||
fn test_statement_distribution_msg() -> StatementDistributionMessage {
|
||||
StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
|
||||
}
|
||||
|
||||
fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
|
||||
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
|
||||
}
|
||||
|
||||
fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
|
||||
BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
|
||||
}
|
||||
|
||||
fn test_provisioner_msg() -> ProvisionerMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
ProvisionerMessage::RequestInherentData(Default::default(), sender)
|
||||
}
|
||||
|
||||
fn test_pov_distribution_msg() -> PoVDistributionMessage {
|
||||
PoVDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
|
||||
}
|
||||
|
||||
fn test_runtime_api_msg() -> RuntimeApiMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
RuntimeApiMessage::Request(Default::default(), RuntimeApiRequest::Validators(sender))
|
||||
}
|
||||
|
||||
fn test_availability_store_msg() -> AvailabilityStoreMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
AvailabilityStoreMessage::QueryAvailableData(Default::default(), sender)
|
||||
}
|
||||
|
||||
fn test_network_bridge_msg() -> NetworkBridgeMessage {
|
||||
NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, ""))
|
||||
}
|
||||
|
||||
// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
|
||||
#[test]
|
||||
fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0));
|
||||
let signals_received = Arc::new(atomic::AtomicUsize::new(0));
|
||||
let msgs_received = Arc::new(atomic::AtomicUsize::new(0));
|
||||
|
||||
let subsystem = CounterSubsystem::new(
|
||||
stop_signals_received.clone(),
|
||||
signals_received.clone(),
|
||||
msgs_received.clone(),
|
||||
);
|
||||
|
||||
let all_subsystems = AllSubsystems {
|
||||
candidate_validation: subsystem.clone(),
|
||||
candidate_backing: subsystem.clone(),
|
||||
candidate_selection: subsystem.clone(),
|
||||
collator_protocol: subsystem.clone(),
|
||||
statement_distribution: subsystem.clone(),
|
||||
availability_distribution: subsystem.clone(),
|
||||
bitfield_signing: subsystem.clone(),
|
||||
bitfield_distribution: subsystem.clone(),
|
||||
provisioner: subsystem.clone(),
|
||||
pov_distribution: subsystem.clone(),
|
||||
runtime_api: subsystem.clone(),
|
||||
availability_store: subsystem.clone(),
|
||||
network_bridge: subsystem.clone(),
|
||||
chain_api: subsystem.clone(),
|
||||
};
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
// send a signal to each subsystem
|
||||
handler.block_imported(BlockInfo {
|
||||
hash: Default::default(),
|
||||
parent_hash: Default::default(),
|
||||
number: Default::default(),
|
||||
}).await.unwrap();
|
||||
|
||||
// send a msg to each subsystem
|
||||
// except for BitfieldSigning as the message is not instantiable
|
||||
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap();
|
||||
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap();
|
||||
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap();
|
||||
|
||||
// send a stop signal to each subsystems
|
||||
handler.stop().await.unwrap();
|
||||
|
||||
select! {
|
||||
res = overseer_fut => {
|
||||
const NUM_SUBSYSTEMS: usize = 14;
|
||||
|
||||
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
|
||||
// x2 because of broadcast_signal on startup
|
||||
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
|
||||
// -1 for BitfieldSigning
|
||||
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);
|
||||
|
||||
assert!(res.is_ok());
|
||||
},
|
||||
complete => (),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user