mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 04:41:03 +00:00
Overseer: subsystems communicate directly (#2227)
* overseer: pass messages directly between subsystems * test that message is held on to * Update node/overseer/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> * give every subsystem an unbounded sender too * remove metered_channel::name 1. we don't provide good names 2. these names are never used anywhere * unused mut * remove unnecessary &mut * subsystem unbounded_send * remove unused MaybeTimer We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly. * remove comment * split up senders and receivers * update metrics * fix tests * fix test subsystem context * fix flaky test * fix docs * doc * use select_biased to favor signals * Update node/subsystem/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
c6f07d8f31
commit
5952e790fa
Generated
+1
-7
@@ -4236,12 +4236,6 @@ dependencies = [
|
|||||||
"parking_lot 0.11.1",
|
"parking_lot 0.11.1",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "oorandom"
|
|
||||||
version = "11.1.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "opaque-debug"
|
name = "opaque-debug"
|
||||||
version = "0.2.3"
|
version = "0.2.3"
|
||||||
@@ -5886,12 +5880,12 @@ dependencies = [
|
|||||||
name = "polkadot-overseer"
|
name = "polkadot-overseer"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"assert_matches",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"femme",
|
"femme",
|
||||||
"futures 0.3.13",
|
"futures 0.3.13",
|
||||||
"futures-timer 3.0.2",
|
"futures-timer 3.0.2",
|
||||||
"kv-log-macro",
|
"kv-log-macro",
|
||||||
"oorandom",
|
|
||||||
"polkadot-node-network-protocol",
|
"polkadot-node-network-protocol",
|
||||||
"polkadot-node-primitives",
|
"polkadot-node-primitives",
|
||||||
"polkadot-node-subsystem",
|
"polkadot-node-subsystem",
|
||||||
|
|||||||
@@ -25,10 +25,9 @@ use super::Meter;
|
|||||||
|
|
||||||
|
|
||||||
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
|
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
|
||||||
pub fn channel<T>(capacity: usize, name: &'static str) -> (MeteredSender<T>, MeteredReceiver<T>) {
|
pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
|
||||||
let (tx, rx) = mpsc::channel(capacity);
|
let (tx, rx) = mpsc::channel(capacity);
|
||||||
let mut shared_meter = Meter::default();
|
let shared_meter = Meter::default();
|
||||||
shared_meter.name = name;
|
|
||||||
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
|
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
|
||||||
let rx = MeteredReceiver { meter: shared_meter, inner: rx };
|
let rx = MeteredReceiver { meter: shared_meter, inner: rx };
|
||||||
(tx, rx)
|
(tx, rx)
|
||||||
|
|||||||
@@ -30,8 +30,6 @@ pub use self::unbounded::*;
|
|||||||
/// A peek into the inner state of a meter.
|
/// A peek into the inner state of a meter.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct Meter {
|
pub struct Meter {
|
||||||
/// Name of the receiver and sender pair.
|
|
||||||
name: &'static str,
|
|
||||||
// Number of sends on this channel.
|
// Number of sends on this channel.
|
||||||
sent: Arc<AtomicUsize>,
|
sent: Arc<AtomicUsize>,
|
||||||
// Number of receives on this channel.
|
// Number of receives on this channel.
|
||||||
@@ -60,11 +58,6 @@ impl Meter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Obtain the name of the channel `Sender` and `Receiver` pair.
|
|
||||||
pub fn name(&self) -> &'static str {
|
|
||||||
self.name
|
|
||||||
}
|
|
||||||
|
|
||||||
fn note_sent(&self) {
|
fn note_sent(&self) {
|
||||||
self.sent.fetch_add(1, Ordering::Relaxed);
|
self.sent.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@@ -92,7 +85,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn try_send_try_next() {
|
fn try_send_try_next() {
|
||||||
block_on(async move {
|
block_on(async move {
|
||||||
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
|
let (mut tx, mut rx) = channel::<Msg>(5);
|
||||||
let msg = Msg::default();
|
let msg = Msg::default();
|
||||||
assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 });
|
assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 });
|
||||||
tx.try_send(msg).unwrap();
|
tx.try_send(msg).unwrap();
|
||||||
@@ -116,7 +109,7 @@ mod tests {
|
|||||||
fn with_tasks() {
|
fn with_tasks() {
|
||||||
let (ready, go) = futures::channel::oneshot::channel();
|
let (ready, go) = futures::channel::oneshot::channel();
|
||||||
|
|
||||||
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
|
let (mut tx, mut rx) = channel::<Msg>(5);
|
||||||
block_on(async move {
|
block_on(async move {
|
||||||
futures::join!(
|
futures::join!(
|
||||||
async move {
|
async move {
|
||||||
@@ -149,7 +142,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn stream_and_sink() {
|
fn stream_and_sink() {
|
||||||
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
|
let (mut tx, mut rx) = channel::<Msg>(5);
|
||||||
|
|
||||||
block_on(async move {
|
block_on(async move {
|
||||||
futures::join!(
|
futures::join!(
|
||||||
@@ -175,8 +168,8 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn failed_send_does_not_inc_sent() {
|
fn failed_send_does_not_inc_sent() {
|
||||||
let (mut bounded, _) = channel::<Msg>(5, "pluto");
|
let (mut bounded, _) = channel::<Msg>(5);
|
||||||
let (mut unbounded, _) = unbounded::<Msg>("pluto");
|
let (mut unbounded, _) = unbounded::<Msg>();
|
||||||
|
|
||||||
block_on(async move {
|
block_on(async move {
|
||||||
assert!(bounded.send(Msg::default()).await.is_err());
|
assert!(bounded.send(Msg::default()).await.is_err());
|
||||||
|
|||||||
@@ -25,10 +25,9 @@ use super::Meter;
|
|||||||
|
|
||||||
|
|
||||||
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
|
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
|
||||||
pub fn unbounded<T>(name: &'static str) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
|
pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::unbounded();
|
||||||
let mut shared_meter = Meter::default();
|
let shared_meter = Meter::default();
|
||||||
shared_meter.name = name;
|
|
||||||
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
|
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
|
||||||
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
|
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
|
||||||
(tx, rx)
|
(tx, rx)
|
||||||
@@ -147,7 +146,7 @@ impl<T> UnboundedMeteredSender<T> {
|
|||||||
|
|
||||||
|
|
||||||
/// Attempt to send message or fail immediately.
|
/// Attempt to send message or fail immediately.
|
||||||
pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
|
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
|
||||||
self.meter.note_sent();
|
self.meter.note_sent();
|
||||||
self.inner.unbounded_send(msg).map_err(|e| {
|
self.inner.unbounded_send(msg).map_err(|e| {
|
||||||
self.meter.retract_sent();
|
self.meter.retract_sent();
|
||||||
|
|||||||
@@ -734,7 +734,7 @@ mod tests {
|
|||||||
TestAuthorityDiscovery,
|
TestAuthorityDiscovery,
|
||||||
) {
|
) {
|
||||||
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
|
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
|
||||||
let (action_tx, action_rx) = metered::unbounded("test_action");
|
let (action_tx, action_rx) = metered::unbounded();
|
||||||
|
|
||||||
(
|
(
|
||||||
TestNetwork {
|
TestNetwork {
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ async-trait = "0.1.42"
|
|||||||
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
|
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
futures = "0.3.12"
|
futures = "0.3.12"
|
||||||
futures-timer = "3.0.2"
|
futures-timer = "3.0.2"
|
||||||
oorandom = "11.1.3"
|
|
||||||
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
|
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
|
||||||
polkadot-node-subsystem-util = { path = "../subsystem-util" }
|
polkadot-node-subsystem-util = { path = "../subsystem-util" }
|
||||||
polkadot-primitives = { path = "../../primitives" }
|
polkadot-primitives = { path = "../../primitives" }
|
||||||
@@ -20,6 +19,6 @@ tracing = "0.1.25"
|
|||||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
polkadot-node-network-protocol = { path = "../network/protocol" }
|
polkadot-node-network-protocol = { path = "../network/protocol" }
|
||||||
futures = { version = "0.3.12", features = ["thread-pool"] }
|
futures = { version = "0.3.12", features = ["thread-pool"] }
|
||||||
futures-timer = "3.0.2"
|
|
||||||
femme = "2.1.1"
|
femme = "2.1.1"
|
||||||
kv-log-macro = "1.0.7"
|
kv-log-macro = "1.0.7"
|
||||||
|
assert_matches = "1.4.0"
|
||||||
|
|||||||
+1250
-635
File diff suppressed because it is too large
Load Diff
@@ -21,7 +21,7 @@
|
|||||||
use polkadot_node_subsystem::messages::AllMessages;
|
use polkadot_node_subsystem::messages::AllMessages;
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem,
|
FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem,
|
||||||
SpawnedSubsystem, OverseerSignal,
|
SpawnedSubsystem, OverseerSignal, SubsystemSender,
|
||||||
};
|
};
|
||||||
use polkadot_node_subsystem_util::TimeoutExt;
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
|
|
||||||
@@ -156,9 +156,41 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
|
|||||||
(SingleItemSink(inner.clone()), SingleItemStream(inner))
|
(SingleItemSink(inner.clone()), SingleItemStream(inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A test subsystem sender.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TestSubsystemSender {
|
||||||
|
tx: mpsc::UnboundedSender<AllMessages>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl SubsystemSender for TestSubsystemSender {
|
||||||
|
async fn send_message(&mut self, msg: AllMessages) {
|
||||||
|
self.tx
|
||||||
|
.send(msg)
|
||||||
|
.await
|
||||||
|
.expect("test overseer no longer live");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_messages<T>(&mut self, msgs: T)
|
||||||
|
where
|
||||||
|
T: IntoIterator<Item = AllMessages> + Send,
|
||||||
|
T::IntoIter: Send,
|
||||||
|
{
|
||||||
|
let mut iter = stream::iter(msgs.into_iter().map(Ok));
|
||||||
|
self.tx
|
||||||
|
.send_all(&mut iter)
|
||||||
|
.await
|
||||||
|
.expect("test overseer no longer live");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_unbounded_message(&mut self, msg: AllMessages) {
|
||||||
|
self.tx.unbounded_send(msg).expect("test overseer no longer live");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A test subsystem context.
|
/// A test subsystem context.
|
||||||
pub struct TestSubsystemContext<M, S> {
|
pub struct TestSubsystemContext<M, S> {
|
||||||
tx: mpsc::UnboundedSender<AllMessages>,
|
tx: TestSubsystemSender,
|
||||||
rx: SingleItemStream<FromOverseer<M>>,
|
rx: SingleItemStream<FromOverseer<M>>,
|
||||||
spawn: S,
|
spawn: S,
|
||||||
}
|
}
|
||||||
@@ -168,6 +200,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
|
|||||||
for TestSubsystemContext<M, S>
|
for TestSubsystemContext<M, S>
|
||||||
{
|
{
|
||||||
type Message = M;
|
type Message = M;
|
||||||
|
type Sender = TestSubsystemSender;
|
||||||
|
|
||||||
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
|
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
|
||||||
match poll!(self.rx.next()) {
|
match poll!(self.rx.next()) {
|
||||||
@@ -198,23 +231,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_message(&mut self, msg: AllMessages) {
|
fn sender(&mut self) -> &mut TestSubsystemSender {
|
||||||
self.tx
|
&mut self.tx
|
||||||
.send(msg)
|
|
||||||
.await
|
|
||||||
.expect("test overseer no longer live");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_messages<T>(&mut self, msgs: T)
|
|
||||||
where
|
|
||||||
T: IntoIterator<Item = AllMessages> + Send,
|
|
||||||
T::IntoIter: Send,
|
|
||||||
{
|
|
||||||
let mut iter = stream::iter(msgs.into_iter().map(Ok));
|
|
||||||
self.tx
|
|
||||||
.send_all(&mut iter)
|
|
||||||
.await
|
|
||||||
.expect("test overseer no longer live");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -260,7 +278,7 @@ pub fn make_subsystem_context<M, S>(
|
|||||||
|
|
||||||
(
|
(
|
||||||
TestSubsystemContext {
|
TestSubsystemContext {
|
||||||
tx: all_messages_tx,
|
tx: TestSubsystemSender { tx: all_messages_tx },
|
||||||
rx: overseer_rx,
|
rx: overseer_rx,
|
||||||
spawn,
|
spawn,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -210,6 +210,27 @@ pub struct SpawnedSubsystem {
|
|||||||
/// [`SubsystemError`]: struct.SubsystemError.html
|
/// [`SubsystemError`]: struct.SubsystemError.html
|
||||||
pub type SubsystemResult<T> = Result<T, SubsystemError>;
|
pub type SubsystemResult<T> = Result<T, SubsystemError>;
|
||||||
|
|
||||||
|
/// A sender used by subsystems to communicate with other subsystems.
|
||||||
|
///
|
||||||
|
/// Each clone of this type may add more capacity to the bounded buffer, so clones should
|
||||||
|
/// be used sparingly.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait SubsystemSender: Send + Clone + 'static {
|
||||||
|
/// Send a direct message to some other `Subsystem`, routed based on message type.
|
||||||
|
async fn send_message(&mut self, msg: AllMessages);
|
||||||
|
|
||||||
|
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
|
||||||
|
async fn send_messages<T>(&mut self, msgs: T)
|
||||||
|
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
|
||||||
|
|
||||||
|
/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
|
||||||
|
/// type.
|
||||||
|
///
|
||||||
|
/// This function should be used only when there is some other bounding factor on the messages
|
||||||
|
/// sent with it. Otherwise, it risks a memory leak.
|
||||||
|
fn send_unbounded_message(&mut self, msg: AllMessages);
|
||||||
|
}
|
||||||
|
|
||||||
/// A context type that is given to the [`Subsystem`] upon spawning.
|
/// A context type that is given to the [`Subsystem`] upon spawning.
|
||||||
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
|
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
|
||||||
/// or spawn jobs.
|
/// or spawn jobs.
|
||||||
@@ -217,11 +238,14 @@ pub type SubsystemResult<T> = Result<T, SubsystemError>;
|
|||||||
/// [`Overseer`]: struct.Overseer.html
|
/// [`Overseer`]: struct.Overseer.html
|
||||||
/// [`SubsystemJob`]: trait.SubsystemJob.html
|
/// [`SubsystemJob`]: trait.SubsystemJob.html
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait SubsystemContext: Send + 'static {
|
pub trait SubsystemContext: Send + Sized + 'static {
|
||||||
/// The message type of this context. Subsystems launched with this context will expect
|
/// The message type of this context. Subsystems launched with this context will expect
|
||||||
/// to receive messages of this type.
|
/// to receive messages of this type.
|
||||||
type Message: Send;
|
type Message: Send;
|
||||||
|
|
||||||
|
/// The message sender type of this context. Clones of the sender should be used sparingly.
|
||||||
|
type Sender: SubsystemSender;
|
||||||
|
|
||||||
/// Try to asynchronously receive a message.
|
/// Try to asynchronously receive a message.
|
||||||
///
|
///
|
||||||
/// This has to be used with caution, if you loop over this without
|
/// This has to be used with caution, if you loop over this without
|
||||||
@@ -241,12 +265,34 @@ pub trait SubsystemContext: Send + 'static {
|
|||||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||||
) -> SubsystemResult<()>;
|
) -> SubsystemResult<()>;
|
||||||
|
|
||||||
|
/// Get a mutable reference to the sender.
|
||||||
|
fn sender(&mut self) -> &mut Self::Sender;
|
||||||
|
|
||||||
/// Send a direct message to some other `Subsystem`, routed based on message type.
|
/// Send a direct message to some other `Subsystem`, routed based on message type.
|
||||||
async fn send_message(&mut self, msg: AllMessages);
|
async fn send_message(&mut self, msg: AllMessages) {
|
||||||
|
self.sender().send_message(msg).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
|
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
|
||||||
async fn send_messages<T>(&mut self, msgs: T)
|
async fn send_messages<T>(&mut self, msgs: T)
|
||||||
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
|
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
|
||||||
|
{
|
||||||
|
self.sender().send_messages(msgs).await
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
|
||||||
|
/// type.
|
||||||
|
///
|
||||||
|
/// This function should be used only when there is some other bounding factor on the messages
|
||||||
|
/// sent with it. Otherwise, it risks a memory leak.
|
||||||
|
///
|
||||||
|
/// Generally, for this method to be used, these conditions should be met:
|
||||||
|
/// * There is a communication cycle between subsystems
|
||||||
|
/// * One of the parts of the cycle has a clear bound on the number of messages produced.
|
||||||
|
fn send_unbounded_message(&mut self, msg: AllMessages) {
|
||||||
|
self.sender().send_unbounded_message(msg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
|
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
|
||||||
|
|||||||
Reference in New Issue
Block a user