>(client: Arc, mut handle: Handle) {
let mut finality = client.finality_notification_stream();
let mut imports = client.import_notification_stream();
loop {
select! {
f = finality.next() => {
match f {
Some(block) => {
handle.block_finalized(block.into()).await;
}
None => break,
}
},
i = imports.next() => {
match i {
Some(block) => {
handle.block_imported(block.into()).await;
}
None => break,
}
},
complete => break,
}
}
}
/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned from this function is connected to
/// the returned [`Overseer`].
///
/// ```text
/// +------------------------------------+
/// | Overseer |
/// +------------------------------------+
/// / | | \
/// ................. subsystems...................................
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// . | | | | | | | | .
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// ...............................................................
/// |
/// probably `spawn`
/// a `job`
/// |
/// V
/// +-----------+
/// | |
/// +-----------+
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The [`Subsystem`]s may be any type as long as they implement the expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use pezkuwi_primitives::Hash;
/// # use pezkuwi_overseer::{
/// # self as overseer,
/// # OverseerSignal,
/// # SubsystemSender as _,
/// # AllMessages,
/// # HeadSupportsTeyrchains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
/// # FromOrchestra,
/// # SpawnedSubsystem,
/// # },
/// # };
/// # use pezkuwi_node_subsystem_types::messages::{
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl overseer::Subsystem for ValidationSubsystem
/// where
/// Ctx: overseer::SubsystemContext<
/// Message=CandidateValidationMessage,
/// AllMessages=AllMessages,
/// Signal=OverseerSignal,
/// Error=SubsystemError,
/// >,
/// {
/// fn start(
/// self,
/// mut ctx: Ctx,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem {
/// name: "validation-subsystem",
/// future: Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }),
/// }
/// }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsTeyrchains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsTeyrchains for AlwaysSupportsTeyrchains {
/// async fn head_supports_teyrchains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = pezsp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsTeyrchains, None)
/// .unwrap()
/// .replace_candidate_validation(|_| ValidationSubsystem)
/// .build()
/// .unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
/// _ = overseer_fut => (),
/// _ = timer => (),
/// }
/// #
/// # });
/// # }
/// ```
#[orchestra(
gen=AllMessages,
event=Event,
signal=OverseerSignal,
error=SubsystemError,
message_capacity=2048,
)]
pub struct Overseer {
// Each `#[subsystem(...)]` attribute below configures one subsystem slot. As written
// in this file an attribute may carry: an optional leading message type, a
// `sends: [...]` list of message types, and the optional arguments `blocking`,
// `message_capacity: N` and `can_receive_priority_messages`.
// NOTE(review): the exact meaning of each argument is defined by the `orchestra`
// proc-macro — confirm against its documentation before changing any of them.
#[subsystem(CandidateValidationMessage, sends: [
ChainApiMessage,
RuntimeApiMessage,
])]
candidate_validation: CandidateValidation,
// No leading message type is given for this slot.
#[subsystem(sends: [
CandidateValidationMessage,
RuntimeApiMessage,
])]
pvf_checker: PvfChecker,
#[subsystem(CandidateBackingMessage, sends: [
CandidateValidationMessage,
CollatorProtocolMessage,
ChainApiMessage,
AvailabilityDistributionMessage,
AvailabilityStoreMessage,
StatementDistributionMessage,
ProvisionerMessage,
RuntimeApiMessage,
ProspectiveTeyrchainsMessage,
])]
candidate_backing: CandidateBacking,
#[subsystem(StatementDistributionMessage, sends: [
NetworkBridgeTxMessage,
CandidateBackingMessage,
RuntimeApiMessage,
ProspectiveTeyrchainsMessage,
ChainApiMessage,
], can_receive_priority_messages)]
statement_distribution: StatementDistribution,
#[subsystem(AvailabilityDistributionMessage, sends: [
AvailabilityStoreMessage,
ChainApiMessage,
RuntimeApiMessage,
NetworkBridgeTxMessage,
])]
availability_distribution: AvailabilityDistribution,
#[subsystem(AvailabilityRecoveryMessage, sends: [
NetworkBridgeTxMessage,
RuntimeApiMessage,
AvailabilityStoreMessage,
])]
availability_recovery: AvailabilityRecovery,
// No leading message type is given for this slot.
#[subsystem(blocking, sends: [
AvailabilityStoreMessage,
RuntimeApiMessage,
BitfieldDistributionMessage,
])]
bitfield_signing: BitfieldSigning,
#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
RuntimeApiMessage,
NetworkBridgeTxMessage,
ProvisionerMessage,
], can_receive_priority_messages)]
bitfield_distribution: BitfieldDistribution,
#[subsystem(ProvisionerMessage, sends: [
RuntimeApiMessage,
CandidateBackingMessage,
DisputeCoordinatorMessage,
ProspectiveTeyrchainsMessage,
ChainApiMessage,
])]
provisioner: Provisioner,
#[subsystem(blocking, RuntimeApiMessage, sends: [])]
runtime_api: RuntimeApi,
#[subsystem(blocking, AvailabilityStoreMessage, sends: [
ChainApiMessage,
RuntimeApiMessage,
])]
availability_store: AvailabilityStore,
#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
BitfieldDistributionMessage,
StatementDistributionMessage,
ApprovalVotingParallelMessage,
GossipSupportMessage,
DisputeDistributionMessage,
CollationGenerationMessage,
CollatorProtocolMessage,
])]
network_bridge_rx: NetworkBridgeRx,
#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
network_bridge_tx: NetworkBridgeTx,
#[subsystem(blocking, ChainApiMessage, sends: [])]
chain_api: ChainApi,
#[subsystem(CollationGenerationMessage, sends: [
RuntimeApiMessage,
CollatorProtocolMessage,
])]
collation_generation: CollationGeneration,
#[subsystem(CollatorProtocolMessage, sends: [
NetworkBridgeTxMessage,
RuntimeApiMessage,
CandidateBackingMessage,
ChainApiMessage,
ProspectiveTeyrchainsMessage,
])]
collator_protocol: CollatorProtocol,
#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
NetworkBridgeTxMessage,
ApprovalVotingMessage,
RuntimeApiMessage,
], can_receive_priority_messages)]
approval_distribution: ApprovalDistribution,
#[subsystem(blocking, ApprovalVotingMessage, sends: [
ApprovalDistributionMessage,
AvailabilityRecoveryMessage,
CandidateValidationMessage,
ChainApiMessage,
ChainSelectionMessage,
DisputeCoordinatorMessage,
RuntimeApiMessage,
])]
approval_voting: ApprovalVoting,
// Note that this slot lists its own message type (`ApprovalVotingParallelMessage`)
// among the types it sends.
#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
AvailabilityRecoveryMessage,
CandidateValidationMessage,
ChainApiMessage,
ChainSelectionMessage,
DisputeCoordinatorMessage,
RuntimeApiMessage,
NetworkBridgeTxMessage,
ApprovalVotingParallelMessage,
], can_receive_priority_messages)]
approval_voting_parallel: ApprovalVotingParallel,
#[subsystem(GossipSupportMessage, sends: [
NetworkBridgeTxMessage,
NetworkBridgeRxMessage, // TODO
RuntimeApiMessage,
ChainSelectionMessage,
ChainApiMessage,
], can_receive_priority_messages)]
gossip_support: GossipSupport,
#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
RuntimeApiMessage,
ChainApiMessage,
DisputeDistributionMessage,
CandidateValidationMessage,
AvailabilityStoreMessage,
AvailabilityRecoveryMessage,
ChainSelectionMessage,
ApprovalVotingParallelMessage,
], can_receive_priority_messages)]
dispute_coordinator: DisputeCoordinator,
#[subsystem(DisputeDistributionMessage, sends: [
RuntimeApiMessage,
DisputeCoordinatorMessage,
NetworkBridgeTxMessage,
])]
dispute_distribution: DisputeDistribution,
#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
chain_selection: ChainSelection,
#[subsystem(ProspectiveTeyrchainsMessage, sends: [
RuntimeApiMessage,
ChainApiMessage,
])]
prospective_teyrchains: ProspectiveTeyrchains,
// The fields below carry no `#[subsystem]` attribute: they are plain state owned by
// the overseer itself.
/// External listeners waiting for a hash to be in the active-leaves set.
pub activation_external_listeners: HashMap>>>,
/// The set of the "active leaves".
pub active_leaves: HashMap,
/// An implementation for checking whether a header supports teyrchain consensus.
pub supports_teyrchains: SupportsTeyrchains,
/// Various Prometheus metrics.
pub metrics: OverseerMetrics,
}
/// Spawn the metrics metronome task.
///
/// Collects the name and meters of every subsystem slot that currently holds an
/// instance, then spawns a task named `"metrics-metronome"` (group `"overseer"`) on the
/// overseer's spawner. The task is driven by a `Metronome` built from a 950 ms
/// duration; on each tick it records a memory-stats snapshot (where the allocator
/// supports it) and a channel-metrics snapshot into `metronome_metrics`.
///
/// # Errors
///
/// The body shown here never constructs an error and always returns `Ok(())`.
pub fn spawn_metronome_metrics(
    overseer: &mut Overseer,
    metronome_metrics: OverseerMetrics,
) -> Result<(), SubsystemError>
where
    S: Spawner,
    SupportsTeyrchains: HeadSupportsTeyrchains,
{
    // Mapper that extracts `(name, meters)` from a subsystem slot, or `None` when the
    // slot has no instance.
    struct ExtractNameAndMeters;

    impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem> for ExtractNameAndMeters {
        type Output = Option<(&'static str, SubsystemMeters)>;

        fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem) -> Self::Output {
            subsystem
                .instance
                .as_ref()
                .map(|instance| (instance.name, instance.meters.clone()))
        }
    }

    // Taken once, up front: the meters are cloned out of the overseer here so the
    // spawned task owns them and does not borrow `overseer`.
    let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);

    // Memory statistics are only attempted on Linux or with the jemalloc feature; on
    // every other configuration the collector is a no-op.
    #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
    let collect_memory_stats: Box =
        match memory_stats::MemoryAllocationTracker::new() {
            Ok(memory_stats) => {
                Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
                    Ok(memory_stats_snapshot) => {
                        gum::trace!(
                            target: LOG_TARGET,
                            "memory_stats: {:?}",
                            &memory_stats_snapshot
                        );
                        metrics.memory_stats_snapshot(memory_stats_snapshot);
                    },
                    Err(e) => {
                        gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e)
                    },
                })
            },
            Err(_) => {
                gum::debug!(
                    target: LOG_TARGET,
                    "Memory allocation tracking is not supported by the allocator.",
                );
                Box::new(|_| {})
            },
        };
    #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
    let collect_memory_stats: Box = Box::new(|_| {});

    let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
        collect_memory_stats(&metronome_metrics);

        // We combine the amount of messages from subsystems to the overseer
        // as well as the amount of messages from external sources to the overseer
        // into one `to_overseer` value.
        //
        // The meters are only read, so iterate by reference: `flatten` over
        // `&Option<_>` skips empty slots without cloning every entry on each tick.
        metronome_metrics.channel_metrics_snapshot(
            subsystem_meters
                .iter()
                .flatten()
                .map(|(name, meters)| (*name, meters.read())),
        );

        futures::future::ready(())
    });

    overseer
        .spawner()
        .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));

    Ok(())
}
impl Overseer
where
SupportsTeyrchains: HeadSupportsTeyrchains,
S: Spawner,
{
/// Shut the `Overseer` down.
///
/// Consumes the overseer and calls `wait_terminate` with
/// [`OverseerSignal::Conclude`] and a one-second `Duration`. Whatever
/// `wait_terminate` reports is deliberately discarded.
async fn stop(mut self) {
    let grace_period = Duration::from_secs(1_u64);
    let _ = self.wait_terminate(OverseerSignal::Conclude, grace_period).await;
}
/// Drive the `Overseer` until it exits.
///
/// Delegates to `run_inner`. An error result is logged at error level and then
/// dropped, so this future always resolves to `()`.
pub async fn run(self) {
    match self.run_inner().await {
        Ok(()) => {},
        Err(err) => {
            gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
        },
    }
}
/// The overseer's main loop.
///
/// Spawns the metrics metronome, then repeatedly `select!`s over three sources:
/// `events_rx`, `to_orchestra_rx` and `running_subsystems`. The loop ends when a
/// `Stop` event arrives (returns `Ok(())`), when any running subsystem finishes
/// (returns that subsystem's result), or when one of the `?`-propagated calls fails.
async fn run_inner(mut self) -> SubsystemResult<()> {
// Clone the metrics handle first: `spawn_metronome_metrics` needs `&mut self` and an
// owned `OverseerMetrics` at the same time.
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;
loop {
select! {
// Events delivered through `events_rx`.
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin, priority } => {
// Route according to the requested priority; either way the relay is
// counted in the metrics afterwards.
match priority {
PriorityLevel::Normal => {
self.route_message(msg.into(), origin).await?;
},
PriorityLevel::High => {
self.route_message_with_priority::(msg.into(), origin).await?;
},
}
self.metrics.on_message_relayed();
}
Event::Stop => {
// `stop` takes `self` by value, so the loop cannot continue afterwards.
self.stop().await;
return Ok(());
}
Event::BlockImported(block) => {
self.block_imported(block).await?;
}
Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
}
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
}
},
// Spawn requests delivered through `to_orchestra_rx`.
msg = self.to_orchestra_rx.select_next_some() => {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
// Any subsystem finishing is treated as unexpected: log it, stop the overseer and
// hand the subsystem's own result back to the caller.
res = self.running_subsystems.select_next_some() => {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res;
},
}
}
}
/// Handle a newly imported block.
///
/// Registers `block` in `active_leaves`; if its hash is already tracked this is a
/// no-op returning `Ok(())`. Otherwise the head is activated, its parent is
/// deactivated if the parent was itself an active leaf, dead external listeners are
/// pruned, and an `ActiveLeaves` signal is broadcast when the update is non-empty.
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
    match self.active_leaves.entry(block.hash) {
        // Already tracked: this import has been handled before.
        hash_map::Entry::Occupied(known) => {
            debug_assert_eq!(*known.get(), block.number);
            return Ok(());
        },
        hash_map::Entry::Vacant(slot) => {
            slot.insert(block.number);
        },
    }

    // Only heads that pass `on_head_activated` are announced as activated.
    let activation = self.on_head_activated(&block.hash, Some(block.parent_hash)).await;
    let mut update = if activation.is_some() {
        ActiveLeavesUpdate::start_work(ActivatedLeaf {
            hash: block.hash,
            number: block.number,
            unpin_handle: block.unpin_handle,
        })
    } else {
        ActiveLeavesUpdate::default()
    };

    // The parent stops being a leaf once a child has been imported on top of it.
    let parent = block.parent_hash;
    if let Some(parent_number) = self.active_leaves.remove(&parent) {
        debug_assert_eq!(block.number.saturating_sub(1), parent_number);
        update.deactivated.push(parent);
        self.on_head_deactivated(&parent);
    }

    self.clean_up_external_listeners();

    if update.is_empty() {
        return Ok(());
    }
    self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
    Ok(())
}
/// Handle a finalized block.
///
/// Drops every active leaf whose number is at or below the finalized number (except
/// the finalized block itself), broadcasts a `BlockFinalized` signal, and then an
/// `ActiveLeaves` signal if any leaf was deactivated.
async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
    let mut update = ActiveLeavesUpdate::default();

    // A leaf survives if it is above the finalized height, or if it is the finalized
    // block itself; every other leaf is orphaned and gets pruned.
    self.active_leaves.retain(|hash, number| {
        let keep = *number > block.number || *hash == block.hash;
        if !keep {
            update.deactivated.push(*hash);
        }
        keep
    });

    for orphan in &update.deactivated {
        self.on_head_deactivated(orphan);
    }

    let finalized = OverseerSignal::BlockFinalized(block.hash, block.number);
    self.broadcast_signal(finalized).await?;

    // If no leaves were deactivated there is no update to send. Our peers will be
    // informed about our finalized block the next time we activate or deactivate a
    // leaf.
    if update.is_empty() {
        return Ok(());
    }
    self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
    Ok(())
}
/// Handles a header activation.
///
/// Returns `None` when the header's state doesn't support the teyrchains API.
/// Otherwise records the activation in the metrics, answers every external listener
/// queued for `hash` with `Ok(())`, and returns `Some(())`.
async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option) -> Option<()> {
    let supported = self.supports_teyrchains.head_supports_teyrchains(hash).await;
    if !supported {
        return None;
    }

    self.metrics.on_head_activated();

    if let Some(waiting) = self.activation_external_listeners.remove(hash) {
        gum::trace!(
            target: LOG_TARGET,
            relay_parent = ?hash,
            "Leaf got activated, notifying external listeners"
        );
        waiting.into_iter().for_each(|listener| {
            // A listener that is no longer interested is not an error.
            let _ = listener.send(Ok(()));
        });
    }

    Some(())
}
/// Bookkeeping for a deactivated head: records the deactivation in the metrics and
/// discards any external listeners still queued for `hash`.
fn on_head_deactivated(&mut self, hash: &Hash) {
    self.metrics.on_head_deactivated();
    // Listeners queued for this hash (if any) are dropped without being answered.
    drop(self.activation_external_listeners.remove(hash));
}
/// Prune external listeners for which `is_canceled()` returns true, then remove every
/// map entry that is left without any listener.
fn clean_up_external_listeners(&mut self) {
    self.activation_external_listeners.retain(|_hash, senders| {
        senders.retain(|sender| !sender.is_canceled());
        // Keep the entry only while someone is still waiting on it.
        !senders.is_empty()
    });
}
/// Serve a request that originates outside the overseer.
///
/// For `WaitForActivation`: if `hash` is already present in `active_leaves`, the
/// response channel is answered with `Ok(())` right away; otherwise the sender is
/// queued in `activation_external_listeners` under `hash`.
fn handle_external_request(&mut self, request: ExternalRequest) {
    match request {
        ExternalRequest::WaitForActivation { hash, response_channel } => {
            // Only membership matters here, so ask the map directly instead of
            // fetching the value and discarding it.
            if self.active_leaves.contains_key(&hash) {
                gum::trace!(
                    target: LOG_TARGET,
                    relay_parent = ?hash,
                    "Leaf was already ready - answering `WaitForActivation`"
                );
                // it's fine if the listener is no longer interested
                let _ = response_channel.send(Ok(()));
            } else {
                gum::trace!(
                    target: LOG_TARGET,
                    relay_parent = ?hash,
                    "Leaf not yet ready - queuing `WaitForActivation` sender"
                );
                self.activation_external_listeners
                    .entry(hash)
                    .or_default()
                    .push(response_channel);
            }
        },
    }
}
/// Spawn the job future `j` on the overseer's spawner.
///
/// `task_name` and `subsystem_name` are forwarded unchanged to `spawner.spawn`.
fn spawn_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn(task_name, subsystem_name, j);
}
/// Spawn the job future `j` through the spawner's `spawn_blocking` entry point.
///
/// `task_name` and `subsystem_name` are forwarded unchanged to
/// `spawner.spawn_blocking`.
fn spawn_blocking_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn_blocking(task_name, subsystem_name, j);
}
}