dyn overseer channel capacity (#5454)

* allow runtime adjustment of signal channel size

Closes #5436
This commit is contained in:
Bernhard Schuster
2022-05-05 17:22:17 +02:00
committed by GitHub
parent 2e1a3441f9
commit abf882c591
8 changed files with 104 additions and 11 deletions
@@ -150,6 +150,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
/// Specify the the initialization function for a subsystem
@@ -171,6 +174,10 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
@@ -207,6 +214,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
@@ -254,6 +264,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
@@ -272,6 +285,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
@@ -359,13 +375,12 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}
impl ::std::default::Default for #connector {
fn default() -> Self {
/// Create a new connector with non-default event channel capacity.
pub fn with_event_capacity(event_channel_capacity: usize) -> Self {
let (events_tx, events_rx) = #support_crate ::metered::channel::<
#event
>(SIGNAL_CHANNEL_CAPACITY);
>(event_channel_capacity);
Self {
handle: events_tx,
@@ -373,6 +388,12 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
}
}
}
impl ::std::default::Default for #connector {
fn default() -> Self {
Self::with_event_capacity(SIGNAL_CHANNEL_CAPACITY)
}
}
});
ts.extend(quote!{
@@ -385,6 +406,11 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: #baggage_passthrough_state_generics,
)*
spawner: InitStateSpawner,
// user provided runtime overrides,
// if `None`, the `overlord(message_capacity=123,..)` is used
// or the default value.
channel_capacity: Option<usize>,
signal_capacity: Option<usize>,
}
});
@@ -406,6 +432,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#field_name: Missing::<#field_type>::default(),
)*
spawner: Missing::<S>::default(),
channel_capacity: None,
signal_capacity: None,
}
}
}
@@ -419,18 +448,48 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#spawner_where_clause
{
/// The `spawner` to use for spawning tasks.
pub fn spawner(self, spawner: S) -> #builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
pub fn spawner(self, spawner: S) -> #builder<
Init<S>,
#( #subsystem_passthrough_state_generics, )*
#( #baggage_passthrough_state_generics, )*
>
{
#builder {
#(
#field_name: self. #field_name,
)*
spawner: Init::<S>::Value(spawner),
channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
});
// message and signal channel capacity
ts.extend(quote! {
impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
#builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
where
#spawner_where_clause,
{
/// Set the interconnecting signal channel capacity.
pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
{
self.signal_capacity = Some(capacity);
self
}
/// Set the interconnecting message channel capacities.
pub fn message_channel_capacity(mut self, capacity: usize) -> Self
{
self.channel_capacity = Some(capacity);
self
}
}
});
ts.extend(quote! {
/// Type used to represent a builder where all fields are initialized and the overseer could be constructed.
pub type #initialized_builder<#initialized_builder_generics> = #builder<Init<S>, #( Init<#field_type>, )*>;
@@ -446,7 +505,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Complete the construction and create the overseer type.
pub fn build(self)
-> ::std::result::Result<(#overseer_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty> {
let connector = #connector ::default();
let connector = #connector ::with_event_capacity(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);
self.build_with_connector(connector)
}
@@ -470,7 +531,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
=
#support_crate ::metered::channel::<
MessagePacket< #consumes >
>(CHANNEL_CAPACITY);
>(
self.channel_capacity.unwrap_or(CHANNEL_CAPACITY)
);
)*
#(
@@ -510,7 +573,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select(
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);
// Generate subsystem name based on overseer field name.
let subsystem_string = String::from(stringify!(#subsystem_name));