mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 14:47:55 +00:00
Adds start and stop work heartbeats. (#1188)
* Adds start and stop work heartbeats. * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Fix code after suggested changes * Finalizing stops work on earlier lower blocks. * Fix func parameter and flaky test Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Generated
+1
@@ -3308,6 +3308,7 @@ dependencies = [
|
||||
"futures-timer 3.0.2",
|
||||
"kv-log-macro",
|
||||
"log 0.4.8",
|
||||
"polkadot-primitives",
|
||||
"streamunordered",
|
||||
]
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ futures = "0.3.5"
|
||||
log = "0.4.8"
|
||||
futures-timer = "3.0.2"
|
||||
streamunordered = "0.5.1"
|
||||
polkadot-primitives = { path = "../primitives" }
|
||||
|
||||
[dev-dependencies]
|
||||
futures = { version = "0.3.5", features = ["thread-pool"] }
|
||||
|
||||
@@ -111,6 +111,7 @@ fn main() {
|
||||
});
|
||||
|
||||
let (overseer, _handler) = Overseer::new(
|
||||
vec![],
|
||||
Box::new(Subsystem2),
|
||||
Box::new(Subsystem1),
|
||||
spawner,
|
||||
|
||||
+330
-13
@@ -58,6 +58,7 @@ use std::fmt::Debug;
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::{
|
||||
@@ -70,6 +71,8 @@ use futures::{
|
||||
use futures_timer::Delay;
|
||||
use streamunordered::{StreamYield, StreamUnordered};
|
||||
|
||||
use polkadot_primitives::{BlockNumber, Hash};
|
||||
|
||||
/// An error type that describes faults that may happen
|
||||
///
|
||||
/// These are:
|
||||
@@ -136,10 +139,25 @@ enum ToOverseer {
|
||||
},
|
||||
}
|
||||
|
||||
/// An event telling the `Overseer` on the particular block
|
||||
/// that has been imported or finalized.
|
||||
///
|
||||
/// This structure exists solely for the purposes of decoupling
|
||||
/// `Overseer` code from the client code and the necessity to call
|
||||
/// `HeaderBackend::block_number_from_id()`.
|
||||
pub struct BlockInfo {
|
||||
/// hash of the block.
|
||||
pub hash: Hash,
|
||||
/// hash of the parent block.
|
||||
pub parent_hash: Hash,
|
||||
/// block's number.
|
||||
pub number: BlockNumber,
|
||||
}
|
||||
|
||||
/// Some event from outer world.
|
||||
enum Event {
|
||||
BlockImport,
|
||||
BlockFinalized,
|
||||
BlockImported(BlockInfo),
|
||||
BlockFinalized(BlockInfo),
|
||||
MsgToSubsystem(AllMessages),
|
||||
Stop,
|
||||
}
|
||||
@@ -160,8 +178,8 @@ pub struct OverseerHandler {
|
||||
|
||||
impl OverseerHandler {
|
||||
/// Inform the `Overseer` that that some block was imported.
|
||||
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
|
||||
self.events_tx.send(Event::BlockImport).await?;
|
||||
pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
|
||||
self.events_tx.send(Event::BlockImported(block)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -174,8 +192,8 @@ impl OverseerHandler {
|
||||
}
|
||||
|
||||
/// Inform the `Overseer` that that some block was finalized.
|
||||
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
|
||||
self.events_tx.send(Event::BlockFinalized).await?;
|
||||
pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
|
||||
self.events_tx.send(Event::BlockFinalized(block)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -222,12 +240,12 @@ pub struct SubsystemContext<M: Debug>{
|
||||
///
|
||||
/// [`Overseer`]: struct.Overseer.html
|
||||
/// [`Subsystem`]: trait.Subsystem.html
|
||||
#[derive(Debug)]
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub enum OverseerSignal {
|
||||
/// `Subsystem` should start working.
|
||||
StartWork,
|
||||
StartWork(Hash),
|
||||
/// `Subsystem` should stop working.
|
||||
StopWork,
|
||||
StopWork(Hash),
|
||||
/// Conclude the work of the `Overseer` and all `Subsystem`s.
|
||||
Conclude,
|
||||
}
|
||||
@@ -366,6 +384,14 @@ pub struct Overseer<S: Spawn> {
|
||||
|
||||
/// Events that are sent to the overseer from the outside world
|
||||
events_rx: mpsc::Receiver<Event>,
|
||||
|
||||
/// A set of leaves that `Overseer` starts working with.
|
||||
///
|
||||
/// Drained at the beginning of `run` and never used again.
|
||||
leaves: Vec<(Hash, BlockNumber)>,
|
||||
|
||||
/// The set of the "active leaves".
|
||||
active_leaves: HashSet<(Hash, BlockNumber)>,
|
||||
}
|
||||
|
||||
impl<S> Overseer<S>
|
||||
@@ -452,6 +478,7 @@ where
|
||||
/// # fn main() { executor::block_on(async move {
|
||||
/// let spawner = executor::ThreadPool::new().unwrap();
|
||||
/// let (overseer, _handler) = Overseer::new(
|
||||
/// vec![],
|
||||
/// Box::new(ValidationSubsystem),
|
||||
/// Box::new(CandidateBackingSubsystem),
|
||||
/// spawner,
|
||||
@@ -471,6 +498,7 @@ where
|
||||
/// # }); }
|
||||
/// ```
|
||||
pub fn new(
|
||||
leaves: impl IntoIterator<Item = BlockInfo>,
|
||||
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
|
||||
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
|
||||
mut s: S,
|
||||
@@ -498,6 +526,13 @@ where
|
||||
candidate_backing,
|
||||
)?;
|
||||
|
||||
let active_leaves = HashSet::new();
|
||||
|
||||
let leaves = leaves
|
||||
.into_iter()
|
||||
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
|
||||
.collect();
|
||||
|
||||
let this = Self {
|
||||
validation_subsystem,
|
||||
candidate_backing_subsystem,
|
||||
@@ -505,6 +540,8 @@ where
|
||||
running_subsystems,
|
||||
running_subsystems_rx,
|
||||
events_rx,
|
||||
leaves,
|
||||
active_leaves,
|
||||
};
|
||||
|
||||
Ok((this, handler))
|
||||
@@ -537,6 +574,13 @@ where
|
||||
|
||||
/// Run the `Overseer`.
|
||||
pub async fn run(mut self) -> SubsystemResult<()> {
|
||||
let leaves = std::mem::take(&mut self.leaves);
|
||||
|
||||
for leaf in leaves.into_iter() {
|
||||
self.broadcast_signal(OverseerSignal::StartWork(leaf.0)).await?;
|
||||
self.active_leaves.insert(leaf);
|
||||
}
|
||||
|
||||
loop {
|
||||
while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
|
||||
match msg {
|
||||
@@ -547,7 +591,12 @@ where
|
||||
self.stop().await;
|
||||
return Ok(());
|
||||
}
|
||||
_ => ()
|
||||
Event::BlockImported(block) => {
|
||||
self.block_imported(block).await?;
|
||||
}
|
||||
Event::BlockFinalized(block) => {
|
||||
self.block_finalized(block).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -576,6 +625,50 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
|
||||
if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) {
|
||||
self.broadcast_signal(OverseerSignal::StopWork(parent.0)).await?;
|
||||
}
|
||||
|
||||
if !self.active_leaves.contains(&(block.hash, block.number)) {
|
||||
self.broadcast_signal(OverseerSignal::StartWork(block.hash)).await?;
|
||||
self.active_leaves.insert((block.hash, block.number));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
|
||||
let mut stop_these = Vec::new();
|
||||
|
||||
self.active_leaves.retain(|(h, n)| {
|
||||
if *n <= block.number {
|
||||
stop_these.push(*h);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
for hash in stop_these.into_iter() {
|
||||
self.broadcast_signal(OverseerSignal::StopWork(hash)).await?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
|
||||
if let Some(ref mut s) = self.validation_subsystem.instance {
|
||||
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
|
||||
}
|
||||
|
||||
if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
|
||||
s.tx.send(FromOverseer::Signal(signal)).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn route_message(&mut self, msg: AllMessages) {
|
||||
match msg {
|
||||
AllMessages::Validation(msg) => {
|
||||
@@ -591,7 +684,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
|
||||
self.s.spawn(j).map_err(|_| SubsystemError)
|
||||
}
|
||||
@@ -642,7 +734,7 @@ mod tests {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
@@ -668,7 +760,7 @@ mod tests {
|
||||
continue;
|
||||
}
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
|
||||
break;
|
||||
}
|
||||
Ok(Some(_)) => {
|
||||
@@ -703,6 +795,7 @@ mod tests {
|
||||
let (s2_tx, mut s2_rx) = mpsc::channel(64);
|
||||
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
vec![],
|
||||
Box::new(TestSubsystem1(s1_tx)),
|
||||
Box::new(TestSubsystem2(s2_tx)),
|
||||
spawner,
|
||||
@@ -752,6 +845,7 @@ mod tests {
|
||||
executor::block_on(async move {
|
||||
let (s1_tx, _) = mpsc::channel(64);
|
||||
let (overseer, _handle) = Overseer::new(
|
||||
vec![],
|
||||
Box::new(TestSubsystem1(s1_tx)),
|
||||
Box::new(TestSubsystem4),
|
||||
spawner,
|
||||
@@ -765,4 +859,227 @@ mod tests {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
|
||||
|
||||
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem5 {
|
||||
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
|
||||
let mut sender = self.0.clone();
|
||||
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
|
||||
|
||||
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem6 {
|
||||
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
|
||||
let mut sender = self.0.clone();
|
||||
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that starting with a defined set of leaves and receiving
|
||||
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
|
||||
#[test]
|
||||
fn overseer_start_stop_works() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
let second_block_hash = [2; 32].into();
|
||||
let third_block_hash = [3; 32].into();
|
||||
|
||||
let first_block = BlockInfo {
|
||||
hash: first_block_hash,
|
||||
parent_hash: [0; 32].into(),
|
||||
number: 1,
|
||||
};
|
||||
let second_block = BlockInfo {
|
||||
hash: second_block_hash,
|
||||
parent_hash: first_block_hash,
|
||||
number: 2,
|
||||
};
|
||||
let third_block = BlockInfo {
|
||||
hash: third_block_hash,
|
||||
parent_hash: second_block_hash,
|
||||
number: 3,
|
||||
};
|
||||
|
||||
let (tx_5, mut rx_5) = mpsc::channel(64);
|
||||
let (tx_6, mut rx_6) = mpsc::channel(64);
|
||||
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
vec![first_block],
|
||||
Box::new(TestSubsystem5(tx_5)),
|
||||
Box::new(TestSubsystem6(tx_6)),
|
||||
spawner,
|
||||
).unwrap();
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
handler.block_imported(second_block).await.unwrap();
|
||||
handler.block_imported(third_block).await.unwrap();
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::StartWork(first_block_hash),
|
||||
OverseerSignal::StopWork(first_block_hash),
|
||||
OverseerSignal::StartWork(second_block_hash),
|
||||
OverseerSignal::StopWork(second_block_hash),
|
||||
OverseerSignal::StartWork(third_block_hash),
|
||||
];
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = overseer_fut => {
|
||||
assert!(res.is_ok());
|
||||
break;
|
||||
},
|
||||
res = rx_5.next() => {
|
||||
if let Some(res) = res {
|
||||
ss5_results.push(res);
|
||||
}
|
||||
}
|
||||
res = rx_6.next() => {
|
||||
if let Some(res) = res {
|
||||
ss6_results.push(res);
|
||||
}
|
||||
}
|
||||
complete => break,
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() &&
|
||||
ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(ss5_results, expected_heartbeats);
|
||||
assert_eq!(ss6_results, expected_heartbeats);
|
||||
});
|
||||
}
|
||||
|
||||
// Tests that starting with a defined set of leaves and receiving
|
||||
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
|
||||
#[test]
|
||||
fn overseer_finalize_works() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
let second_block_hash = [2; 32].into();
|
||||
let third_block_hash = [3; 32].into();
|
||||
|
||||
let first_block = BlockInfo {
|
||||
hash: first_block_hash,
|
||||
parent_hash: [0; 32].into(),
|
||||
number: 1,
|
||||
};
|
||||
let second_block = BlockInfo {
|
||||
hash: second_block_hash,
|
||||
parent_hash: [42; 32].into(),
|
||||
number: 2,
|
||||
};
|
||||
let third_block = BlockInfo {
|
||||
hash: third_block_hash,
|
||||
parent_hash: second_block_hash,
|
||||
number: 3,
|
||||
};
|
||||
|
||||
let (tx_5, mut rx_5) = mpsc::channel(64);
|
||||
let (tx_6, mut rx_6) = mpsc::channel(64);
|
||||
|
||||
// start with two forks of different height.
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
vec![first_block, second_block],
|
||||
Box::new(TestSubsystem5(tx_5)),
|
||||
Box::new(TestSubsystem6(tx_6)),
|
||||
spawner,
|
||||
).unwrap();
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
// this should stop work on both forks we started with earlier.
|
||||
handler.block_finalized(third_block).await.unwrap();
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::StartWork(first_block_hash),
|
||||
OverseerSignal::StartWork(second_block_hash),
|
||||
OverseerSignal::StopWork(first_block_hash),
|
||||
OverseerSignal::StopWork(second_block_hash),
|
||||
];
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = overseer_fut => {
|
||||
assert!(res.is_ok());
|
||||
break;
|
||||
},
|
||||
res = rx_5.next() => {
|
||||
if let Some(res) = res {
|
||||
ss5_results.push(res);
|
||||
}
|
||||
}
|
||||
res = rx_6.next() => {
|
||||
if let Some(res) = res {
|
||||
ss6_results.push(res);
|
||||
}
|
||||
}
|
||||
complete => break,
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() &&
|
||||
ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(ss5_results.len(), expected_heartbeats.len());
|
||||
assert_eq!(ss6_results.len(), expected_heartbeats.len());
|
||||
|
||||
// Notifications on finality for multiple blocks at once
|
||||
// may be received in different orders.
|
||||
for expected in expected_heartbeats {
|
||||
assert!(ss5_results.contains(&expected));
|
||||
assert!(ss6_results.contains(&expected));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user