Integrate ChainApi with all messages (#1533)

* propagate chain_api subsystem through various locations

* add ChainApi to AllMessages

Co-authored-by: Peter Goodspeed-Niklaus <peter.r.goodspeedniklaus@gmail.com>
This commit is contained in:
Andronik Ordian
2020-08-04 14:32:42 +02:00
committed by GitHub
parent 5a4bca765e
commit 07a9d2de3f
5 changed files with 43 additions and 8 deletions
+3 -3
View File
@@ -142,9 +142,9 @@ mod tests {
Header { Header {
parent_hash: Hash::zero(), parent_hash: Hash::zero(),
number: 100500, number: 100500,
state_root: Hash::zero(), state_root: Hash::zero(),
extrinsics_root: Hash::zero(), extrinsics_root: Hash::zero(),
digest: Default::default(), digest: Default::default(),
} }
} }
@@ -154,6 +154,7 @@ fn main() {
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
let (overseer, _handler) = Overseer::new( let (overseer, _handler) = Overseer::new(
vec![], vec![],
+35 -4
View File
@@ -76,7 +76,7 @@ use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_subsystem::messages::{ use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage, CandidateValidationMessage, CandidateBackingMessage,
CandidateSelectionMessage, StatementDistributionMessage, CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
@@ -360,6 +360,8 @@ pub struct Overseer<S: SpawnNamed> {
/// A network bridge subsystem. /// A network bridge subsystem.
network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>, network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
/// A Chain API subsystem
chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
/// Spawner to spawn tasks to. /// Spawner to spawn tasks to.
s: S, s: S,
@@ -393,7 +395,7 @@ pub struct Overseer<S: SpawnNamed> {
/// ///
/// [`Subsystem`]: trait.Subsystem.html /// [`Subsystem`]: trait.Subsystem.html
/// [`DummySubsystem`]: struct.DummySubsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html
pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB> { pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
/// A candidate validation subsystem. /// A candidate validation subsystem.
pub candidate_validation: CV, pub candidate_validation: CV,
/// A candidate backing subsystem. /// A candidate backing subsystem.
@@ -418,6 +420,8 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB> {
pub availability_store: AS, pub availability_store: AS,
/// A network bridge subsystem. /// A network bridge subsystem.
pub network_bridge: NB, pub network_bridge: NB,
/// A Chain API subsystem.
pub chain_api: CA,
} }
impl<S> Overseer<S> impl<S> Overseer<S>
@@ -499,6 +503,7 @@ where
/// runtime_api: DummySubsystem, /// runtime_api: DummySubsystem,
/// availability_store: DummySubsystem, /// availability_store: DummySubsystem,
/// network_bridge: DummySubsystem, /// network_bridge: DummySubsystem,
/// chain_api: DummySubsystem,
/// }; /// };
/// let (overseer, _handler) = Overseer::new( /// let (overseer, _handler) = Overseer::new(
/// vec![], /// vec![],
@@ -519,9 +524,9 @@ where
/// # /// #
/// # }); } /// # }); }
/// ``` /// ```
pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB>( pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB>, all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
mut s: S, mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> ) -> SubsystemResult<(Self, OverseerHandler)>
where where
@@ -537,6 +542,7 @@ where
RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send, RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send,
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send, AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send, NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
{ {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -631,6 +637,13 @@ where
all_subsystems.network_bridge, all_subsystems.network_bridge,
)?; )?;
let chain_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.chain_api,
)?;
let active_leaves = HashSet::new(); let active_leaves = HashSet::new();
let leaves = leaves let leaves = leaves
@@ -651,6 +664,7 @@ where
runtime_api_subsystem, runtime_api_subsystem,
availability_store_subsystem, availability_store_subsystem,
network_bridge_subsystem, network_bridge_subsystem,
chain_api_subsystem,
s, s,
running_subsystems, running_subsystems,
running_subsystems_rx, running_subsystems_rx,
@@ -708,6 +722,10 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
} }
if let Some(ref mut s) = self.chain_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
loop { loop {
@@ -855,6 +873,10 @@ where
} }
if let Some(ref mut s) = self.network_bridge_subsystem.instance { if let Some(ref mut s) = self.network_bridge_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.chain_api_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal)).await?; s.tx.send(FromOverseer::Signal(signal)).await?;
} }
@@ -923,6 +945,11 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await; let _ = s.tx.send(FromOverseer::Communication { msg }).await;
} }
} }
AllMessages::ChainApi(msg) => {
if let Some(ref mut s) = self.chain_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
} }
} }
@@ -1084,6 +1111,7 @@ mod tests {
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![], vec![],
@@ -1147,6 +1175,7 @@ mod tests {
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
let (overseer, _handle) = Overseer::new( let (overseer, _handle) = Overseer::new(
vec![], vec![],
@@ -1263,6 +1292,7 @@ mod tests {
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![first_block], vec![first_block],
@@ -1364,6 +1394,7 @@ mod tests {
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
// start with two forks of different height. // start with two forks of different height.
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
+1
View File
@@ -287,6 +287,7 @@ fn real_overseer<S: SpawnNamed>(
runtime_api: DummySubsystem, runtime_api: DummySubsystem,
availability_store: DummySubsystem, availability_store: DummySubsystem,
network_bridge: DummySubsystem, network_bridge: DummySubsystem,
chain_api: DummySubsystem,
}; };
Overseer::new( Overseer::new(
leaves, leaves,
+3 -1
View File
@@ -310,7 +310,7 @@ pub enum ChainApiMessage {
hash: Hash, hash: Hash,
/// The number of ancestors to request. /// The number of ancestors to request.
k: usize, k: usize,
/// The response channel. /// The response channel.
response_channel: ChainApiResponseChannel<Vec<Hash>>, response_channel: ChainApiResponseChannel<Vec<Hash>>,
}, },
} }
@@ -488,6 +488,8 @@ pub enum AllMessages {
CandidateBacking(CandidateBackingMessage), CandidateBacking(CandidateBackingMessage),
/// Message for the candidate selection subsystem. /// Message for the candidate selection subsystem.
CandidateSelection(CandidateSelectionMessage), CandidateSelection(CandidateSelectionMessage),
/// Message for the Chain API subsystem.
ChainApi(ChainApiMessage),
/// Message for the statement distribution subsystem. /// Message for the statement distribution subsystem.
StatementDistribution(StatementDistributionMessage), StatementDistribution(StatementDistributionMessage),
/// Message for the availability distribution subsystem. /// Message for the availability distribution subsystem.