mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 04:01:10 +00:00
make it easier to dbg stalls (#3351)
* make it easier to dbg * revert channel sizes * BAnon
This commit is contained in:
@@ -401,7 +401,10 @@ impl From<FinalityNotification<Block>> for BlockInfo {
|
||||
enum Event {
|
||||
BlockImported(BlockInfo),
|
||||
BlockFinalized(BlockInfo),
|
||||
MsgToSubsystem(AllMessages),
|
||||
MsgToSubsystem {
|
||||
msg: AllMessages,
|
||||
origin: &'static str,
|
||||
},
|
||||
ExternalRequest(ExternalRequest),
|
||||
Stop,
|
||||
}
|
||||
@@ -452,8 +455,16 @@ impl OverseerHandler {
|
||||
}
|
||||
|
||||
/// Send some message to one of the `Subsystem`s.
|
||||
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
|
||||
self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
|
||||
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
|
||||
self.send_and_log_error(Event::MsgToSubsystem {
|
||||
msg: msg.into(),
|
||||
origin,
|
||||
}).await
|
||||
}
|
||||
|
||||
/// Same as `send_msg`, but with no origin. Used for tests.
|
||||
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
|
||||
self.send_msg(msg, "").await
|
||||
}
|
||||
|
||||
/// Inform the `Overseer` that some block was finalized.
|
||||
@@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender {
|
||||
#[async_trait::async_trait]
|
||||
impl SubsystemSender for OverseerSubsystemSender {
|
||||
async fn send_message(&mut self, msg: AllMessages) {
|
||||
self.channels.send_and_log_error(self.signals_received.load(), msg).await;
|
||||
let needed_signals = self.signals_received.load();
|
||||
self.channels.send_and_log_error(needed_signals, msg).await;
|
||||
}
|
||||
|
||||
async fn send_messages<T>(&mut self, msgs: T)
|
||||
@@ -891,12 +903,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
|
||||
loop {
|
||||
// If we have a message pending an overseer signal, we only poll for signals
|
||||
// in the meantime.
|
||||
let signals_received = self.signals_received.load();
|
||||
if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
|
||||
if needs_signals_received <= self.signals_received.load() {
|
||||
if needs_signals_received <= signals_received {
|
||||
return Ok(FromOverseer::Communication { msg });
|
||||
} else {
|
||||
self.pending_incoming = Some((needs_signals_received, msg));
|
||||
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
subsystem = std::any::type_name::<M>(),
|
||||
diff = needs_signals_received - signals_received,
|
||||
"waiting for a signal",
|
||||
);
|
||||
// wait for next signal.
|
||||
let signal = self.signals.next().await
|
||||
.ok_or(SubsystemError::Context(
|
||||
@@ -911,7 +929,6 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
|
||||
|
||||
let mut await_message = self.messages.next();
|
||||
let mut await_signal = self.signals.next();
|
||||
let signals_received = self.signals_received.load();
|
||||
let pending_incoming = &mut self.pending_incoming;
|
||||
|
||||
// Otherwise, wait for the next signal or incoming message.
|
||||
@@ -989,7 +1006,7 @@ impl<M> OverseenSubsystem<M> {
|
||||
/// Send a message to the wrapped subsystem.
|
||||
///
|
||||
/// If the inner `instance` is `None`, nothing is happening.
|
||||
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
|
||||
async fn send_message(&mut self, msg: M, origin: &'static str) -> SubsystemResult<()> {
|
||||
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
if let Some(ref mut instance) = self.instance {
|
||||
@@ -999,7 +1016,12 @@ impl<M> OverseenSubsystem<M> {
|
||||
}).timeout(MESSAGE_TIMEOUT).await
|
||||
{
|
||||
None => {
|
||||
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
%origin,
|
||||
"Subsystem {} appears unresponsive.",
|
||||
instance.name,
|
||||
);
|
||||
Err(SubsystemError::SubsystemStalled(instance.name))
|
||||
}
|
||||
Some(res) => res.map_err(Into::into),
|
||||
@@ -1016,9 +1038,15 @@ impl<M> OverseenSubsystem<M> {
|
||||
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
if let Some(ref mut instance) = self.instance {
|
||||
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
|
||||
match instance.tx_signal.send(signal.clone()).timeout(SIGNAL_TIMEOUT).await {
|
||||
None => {
|
||||
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
?signal,
|
||||
received = instance.signals_received,
|
||||
"Subsystem {} appears unresponsive.",
|
||||
instance.name,
|
||||
);
|
||||
Err(SubsystemError::SubsystemStalled(instance.name))
|
||||
}
|
||||
Some(res) => {
|
||||
@@ -1903,8 +1931,8 @@ where
|
||||
select! {
|
||||
msg = self.events_rx.select_next_some() => {
|
||||
match msg {
|
||||
Event::MsgToSubsystem(msg) => {
|
||||
self.route_message(msg.into()).await?;
|
||||
Event::MsgToSubsystem { msg, origin } => {
|
||||
self.route_message(msg.into(), origin).await?;
|
||||
}
|
||||
Event::Stop => {
|
||||
self.stop().await;
|
||||
@@ -2028,59 +2056,63 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
|
||||
async fn route_message(
|
||||
&mut self,
|
||||
msg: AllMessages,
|
||||
origin: &'static str,
|
||||
) -> SubsystemResult<()> {
|
||||
self.metrics.on_message_relayed();
|
||||
match msg {
|
||||
AllMessages::CandidateValidation(msg) => {
|
||||
self.subsystems.candidate_validation.send_message(msg).await?;
|
||||
self.subsystems.candidate_validation.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::CandidateBacking(msg) => {
|
||||
self.subsystems.candidate_backing.send_message(msg).await?;
|
||||
self.subsystems.candidate_backing.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::StatementDistribution(msg) => {
|
||||
self.subsystems.statement_distribution.send_message(msg).await?;
|
||||
self.subsystems.statement_distribution.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::AvailabilityDistribution(msg) => {
|
||||
self.subsystems.availability_distribution.send_message(msg).await?;
|
||||
self.subsystems.availability_distribution.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::AvailabilityRecovery(msg) => {
|
||||
self.subsystems.availability_recovery.send_message(msg).await?;
|
||||
self.subsystems.availability_recovery.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::BitfieldDistribution(msg) => {
|
||||
self.subsystems.bitfield_distribution.send_message(msg).await?;
|
||||
self.subsystems.bitfield_distribution.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::BitfieldSigning(msg) => {
|
||||
self.subsystems.bitfield_signing.send_message(msg).await?;
|
||||
self.subsystems.bitfield_signing.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::Provisioner(msg) => {
|
||||
self.subsystems.provisioner.send_message(msg).await?;
|
||||
self.subsystems.provisioner.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::RuntimeApi(msg) => {
|
||||
self.subsystems.runtime_api.send_message(msg).await?;
|
||||
self.subsystems.runtime_api.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::AvailabilityStore(msg) => {
|
||||
self.subsystems.availability_store.send_message(msg).await?;
|
||||
self.subsystems.availability_store.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::NetworkBridge(msg) => {
|
||||
self.subsystems.network_bridge.send_message(msg).await?;
|
||||
self.subsystems.network_bridge.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::ChainApi(msg) => {
|
||||
self.subsystems.chain_api.send_message(msg).await?;
|
||||
self.subsystems.chain_api.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::CollationGeneration(msg) => {
|
||||
self.subsystems.collation_generation.send_message(msg).await?;
|
||||
self.subsystems.collation_generation.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::CollatorProtocol(msg) => {
|
||||
self.subsystems.collator_protocol.send_message(msg).await?;
|
||||
self.subsystems.collator_protocol.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::ApprovalDistribution(msg) => {
|
||||
self.subsystems.approval_distribution.send_message(msg).await?;
|
||||
self.subsystems.approval_distribution.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::ApprovalVoting(msg) => {
|
||||
self.subsystems.approval_voting.send_message(msg).await?;
|
||||
self.subsystems.approval_voting.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::GossipSupport(msg) => {
|
||||
self.subsystems.gossip_support.send_message(msg).await?;
|
||||
self.subsystems.gossip_support.send_message(msg, origin).await?;
|
||||
},
|
||||
AllMessages::DisputeCoordinator(_) => {}
|
||||
AllMessages::DisputeParticipation(_) => {}
|
||||
|
||||
@@ -228,7 +228,7 @@ fn overseer_metrics_work() {
|
||||
|
||||
handler.block_imported(second_block).await;
|
||||
handler.block_imported(third_block).await;
|
||||
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.stop().await;
|
||||
|
||||
select! {
|
||||
@@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
|
||||
// send a msg to each subsystem
|
||||
// except for BitfieldSigning and GossipSupport as the messages are not instantiable
|
||||
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
|
||||
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
|
||||
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
|
||||
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
|
||||
handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
|
||||
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
|
||||
// handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
|
||||
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
|
||||
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
|
||||
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
|
||||
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
|
||||
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
|
||||
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;
|
||||
handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
|
||||
handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
|
||||
// handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
|
||||
// handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
|
||||
|
||||
// Wait until all subsystems have received. Otherwise the messages might race against
|
||||
// the conclude signal.
|
||||
|
||||
Reference in New Issue
Block a user