Merge remote-tracking branch 'origin/master' into update-scheduler

This commit is contained in:
Bryan Chen
2020-07-01 20:37:47 +12:00
28 changed files with 2090 additions and 584 deletions
+395 -216
View File
File diff suppressed because it is too large Load Diff
+3 -1
View File
@@ -42,10 +42,12 @@ members = [
"service",
"validation",
"node/messages",
"node/network/bridge",
"node/overseer",
"node/primitives",
"node/service",
"node/subsystem",
"node/test-helpers/subsystem",
"parachain/test-parachains",
"parachain/test-parachains/adder",
+2 -2
View File
@@ -46,8 +46,8 @@ async fn start_inner(chain_spec: String, log_level: String) -> Result<Client, Bo
info!("👤 Role: {}", config.display_role());
// Create the service. This is the most heavy initialization step.
let service = service::kusama_new_light(config)
let (task_manager, rpc_handlers) = service::kusama_new_light(config)
.map_err(|e| format!("{:?}", e))?;
Ok(browser_utils::start_client(service))
Ok(browser_utils::start_client(task_manager, rpc_handlers))
}
+47 -50
View File
@@ -19,8 +19,7 @@ use log::info;
use service::{IdentifyVariant, self};
#[cfg(feature = "service-rewr")]
use service_new::{IdentifyVariant, self as service};
use sc_executor::NativeExecutionDispatch;
use sc_cli::{SubstrateCli, Result};
use sc_cli::{SubstrateCli, Result, RuntimeVersion, Role};
use crate::cli::{Cli, Subcommand};
fn get_exec_name() -> Option<String> {
@@ -75,6 +74,16 @@ impl SubstrateCli for Cli {
path => Box::new(service::PolkadotChainSpec::from_json_file(std::path::PathBuf::from(path))?),
})
}
fn native_runtime_version(spec: &Box<dyn service::ChainSpec>) -> &'static RuntimeVersion {
if spec.is_kusama() {
&service::kusama_runtime::VERSION
} else if spec.is_westend() {
&service::westend_runtime::VERSION
} else {
&service::polkadot_runtime::VERSION
}
}
}
/// Parses polkadot specific CLI arguments and run the service.
@@ -116,56 +125,44 @@ pub fn run() -> Result<()> {
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
runtime.run_node(
|config| {
service::kusama_new_light(config)
},
|config| {
service::kusama_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::KusamaExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::kusama_new_light(config)
.map(|(components, _)| components),
_ => service::kusama_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
} else if chain_spec.is_westend() {
runtime.run_node(
|config| {
service::westend_new_light(config)
},
|config| {
service::westend_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::WestendExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::westend_new_light(config)
.map(|(components, _)| components),
_ => service::westend_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
} else {
runtime.run_node(
|config| {
service::polkadot_new_light(config)
},
|config| {
service::polkadot_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::PolkadotExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::polkadot_new_light(config)
.map(|(components, _)| components),
_ => service::polkadot_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
}
},
Some(Subcommand::Base(subcommand)) => {
+2 -2
View File
@@ -28,14 +28,14 @@ mod command;
#[cfg(not(feature = "service-rewr"))]
pub use service::{
AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
#[cfg(feature = "service-rewr")]
pub use service_new::{
self as service,
AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
+12 -12
View File
@@ -63,7 +63,7 @@ use polkadot_primitives::{
}
};
use polkadot_cli::{
ProvideRuntimeApi, AbstractService, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, ParachainHost, IdentifyVariant,
service::{self, Role}
};
pub use polkadot_cli::service::Configuration;
@@ -81,6 +81,7 @@ use polkadot_service_new::{
self as polkadot_service,
Error as ServiceError, FullNodeHandles, PolkadotClient,
};
use sc_service::SpawnTaskHandle;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -236,8 +237,8 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
#[cfg(not(feature = "service-rewr"))]
fn build_collator_service<SP, P, C, R, Extrinsic>(
spawner: SP,
fn build_collator_service<P, C, R, Extrinsic>(
spawner: SpawnTaskHandle,
handles: FullNodeHandles,
client: Arc<C>,
para_id: ParaId,
@@ -265,7 +266,6 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Extrinsic: service::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
{
let polkadot_network = handles.polkadot_network
.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
@@ -278,7 +278,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
let parachain_context = match build_parachain_context.build(
client.clone(),
spawner,
spawner.clone(),
polkadot_network.clone(),
) {
Ok(ctx) => ctx,
@@ -359,7 +359,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
let future = silenced.map(drop);
tokio::spawn(future);
spawner.spawn("collation-work", future);
}
}.boxed();
@@ -386,7 +386,7 @@ where
}
if config.chain_spec.is_kusama() {
let (service, client, handlers) = service::kusama_new_full(
let (task_manager, client, handlers) = service::kusama_new_full(
config,
Some((key.public(), para_id)),
None,
@@ -394,7 +394,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handlers,
@@ -404,7 +404,7 @@ where
build_parachain_context
)?.await;
} else if config.chain_spec.is_westend() {
let (service, client, handlers) = service::westend_new_full(
let (task_manager, client, handlers) = service::westend_new_full(
config,
Some((key.public(), para_id)),
None,
@@ -412,7 +412,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handlers,
@@ -422,7 +422,7 @@ where
build_parachain_context
)?.await;
} else {
let (service, client, handles) = service::polkadot_new_full(
let (task_manager, client, handles) = service::polkadot_new_full(
config,
Some((key.public(), para_id)),
None,
@@ -430,7 +430,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handles,
+4 -4
View File
@@ -60,7 +60,7 @@ fn import_single_good_block_works() {
let mut expected_aux = ImportedAux::default();
expected_aux.is_new_best = true;
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number as u32 && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
@@ -70,7 +70,7 @@ fn import_single_good_block_works() {
#[test]
fn import_single_good_known_block_is_ignored() {
let (mut client, _hash, number, _, block) = prepare_good_block();
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number as u32 => {}
_ => panic!()
}
@@ -80,7 +80,7 @@ fn import_single_good_known_block_is_ignored() {
fn import_single_good_block_without_header_fails() {
let (_, _, _, peer_id, mut block) = prepare_good_block();
block.header = None;
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
}
@@ -91,7 +91,7 @@ fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let verifier = PassThroughVerifier::new(true);
let queue = BasicQueue::new(
verifier,
+3 -3
View File
@@ -600,7 +600,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
@@ -677,7 +677,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
@@ -804,7 +804,7 @@ impl TestNetFactory for TestNet {
fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &())
-> Self::Verifier
{
PassThroughVerifier(false)
PassThroughVerifier::new(false)
}
fn peer(&mut self, i: usize) -> &mut Peer<()> {
+22
View File
@@ -0,0 +1,22 @@
[package]
name = "polkadot-network-bridge"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
parity-scale-codec = "1.3.0"
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
+904
View File
@@ -0,0 +1,904 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use sc_network::{
ObservedRole, ReputationChange, PeerId,
Event as NetworkEvent,
};
use sp_runtime::ConsensusEngineId;
use polkadot_subsystem::{
FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages};
use node_primitives::{ProtocolId, View};
use polkadot_primitives::{Block, Hash};
use std::collections::btree_map::{BTreeMap, Entry as BEntry};
use std::collections::hash_map::{HashMap, Entry as HEntry};
use std::pin::Pin;
use std::sync::Arc;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;
/// The engine ID of the polkadot network protocol.
pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
/// The protocol name.
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";
const MALFORMED_MESSAGE_COST: ReputationChange
= ReputationChange::new(-500, "Malformed Network-bridge message");
const UNKNOWN_PROTO_COST: ReputationChange
= ReputationChange::new(-50, "Message sent to unknown protocol");
const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");
/// Messages received on the network.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage {
/// A message from a peer on a specific protocol.
#[codec(index = "1")]
ProtocolMessage(ProtocolId, Vec<u8>),
/// A view update from a peer.
#[codec(index = "2")]
ViewUpdate(View),
}
/// Information about the notifications protocol. Should be used during network configuration
/// or shortly after startup to register the protocol with the network service.
pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) {
(POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into())
}
/// An action to be carried out by the network.
#[derive(PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, ReputationChange),
/// Write a notification to a given peer.
WriteNotification(PeerId, Vec<u8>),
}
/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID).
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
/// Get access to an underlying sink for all network actions.
fn action_sink<'a>(&'a mut self) -> Pin<
Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>
>;
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange)
-> BoxFuture<SubsystemResult<()>>
{
async move {
self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await
}.boxed()
}
/// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic.
fn write_notification(&mut self, who: PeerId, message: Vec<u8>)
-> BoxFuture<SubsystemResult<()>>
{
async move {
self.action_sink().send(NetworkAction::WriteNotification(who, message)).await
}.boxed()
}
}
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
use futures::task::{Poll, Context};
// wrapper around a NetworkService to make it act like a sink.
struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
type Error = SubsystemError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer(
peer,
cost_benefit,
),
NetworkAction::WriteNotification(peer, message) => self.0.write_notification(
peer,
POLKADOT_ENGINE_ID,
message,
),
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
}
Box::pin(ActionSink(&**self))
}
}
/// The network bridge subsystem.
pub struct NetworkBridge<N>(N);
impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self {
NetworkBridge(net_service)
}
}
impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
where
Net: Network,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed())
}
}
struct PeerData {
/// Latest view sent by the peer.
view: View,
/// The role of the peer.
role: ObservedRole,
}
#[derive(Debug)]
enum Action {
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
ReportPeer(PeerId, ReputationChange),
StartWork(Hash),
StopWork(Hash),
PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
PeerMessages(PeerId, Vec<WireMessage>),
Abort,
}
fn action_from_overseer_message(
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
) -> Action {
match res {
Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))
=> Action::StartWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))
=> Action::StopWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer)
=> Action::RegisterEventProducer(protocol_id, message_producer),
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::SendMessage(peers, protocol, message)
=> Action::SendMessage(peers, protocol, message),
},
Err(e) => {
log::warn!("Shutting down Network Bridge due to error {:?}", e);
Action::Abort
}
}
}
fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
match event {
None => {
log::info!("Shutting down Network Bridge: underlying event stream concluded");
Some(Action::Abort)
}
Some(NetworkEvent::Dht(_)) => None,
Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => {
if engine_id == POLKADOT_ENGINE_ID {
Some(Action::PeerConnected(remote, role))
} else {
None
}
}
Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => {
if engine_id == POLKADOT_ENGINE_ID {
Some(Action::PeerDisconnected(remote))
} else {
None
}
}
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v: Result<Vec<_>, _> = messages.iter()
.filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID)
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();
match v {
Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)),
Ok(v) => if v.is_empty() {
None
} else {
Some(Action::PeerMessages(remote, v))
}
}
}
}
}
fn construct_view(live_heads: &[Hash]) -> View {
View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect())
}
async fn dispatch_update_to_all(
update: NetworkBridgeEvent,
event_producers: impl IntoIterator<Item=&fn(NetworkBridgeEvent) -> AllMessages>,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> polkadot_subsystem::SubsystemResult<()> {
// collect messages here to avoid the borrow lasting across await boundary.
let messages: Vec<_> = event_producers.into_iter()
.map(|producer| producer(update.clone()))
.collect();
ctx.send_messages(messages).await
}
async fn update_view(
peers: &HashMap<PeerId, PeerData>,
live_heads: &[Hash],
net: &mut impl Network,
local_view: &mut View,
) -> SubsystemResult<Option<NetworkBridgeEvent>> {
let new_view = construct_view(live_heads);
if *local_view == new_view { return Ok(None) }
*local_view = new_view.clone();
let message = WireMessage::ViewUpdate(new_view.clone()).encode();
let notifications = peers.keys().cloned()
.map(move |peer| Ok(NetworkAction::WriteNotification(peer, message.clone())));
net.action_sink().send_all(&mut stream::iter(notifications)).await?;
Ok(Some(NetworkBridgeEvent::OurViewChange(local_view.clone())))
}
async fn run_network<N: Network>(
mut net: N,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> {
let mut event_stream = net.event_stream().fuse();
// Most recent heads are at the back.
let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS);
let mut local_view = View(Vec::new());
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut event_producers = BTreeMap::new();
loop {
let action = {
let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse();
futures::pin_mut!(subsystem_next);
let action = futures::select! {
subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)),
net_event = net_event_next => action_from_network_message(net_event),
};
match action {
Some(a) => a,
None => continue,
}
};
match action {
Action::RegisterEventProducer(protocol_id, event_producer) => {
// insert only if none present.
if let BEntry::Vacant(entry) = event_producers.entry(protocol_id) {
let event_producer = entry.insert(event_producer);
// send the event producer information on all connected peers.
let mut messages = Vec::with_capacity(peers.len() * 2);
for (peer, data) in &peers {
messages.push(event_producer(
NetworkBridgeEvent::PeerConnected(peer.clone(), data.role.clone())
));
messages.push(event_producer(
NetworkBridgeEvent::PeerViewChange(peer.clone(), data.view.clone())
));
}
ctx.send_messages(messages).await?;
}
}
Action::SendMessage(peers, protocol, message) => {
let mut message_producer = stream::iter({
let n_peers = peers.len();
let mut message = Some(
WireMessage::ProtocolMessage(protocol, message).encode()
);
peers.iter().cloned().enumerate().map(move |(i, peer)| {
// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
let message = if i == n_peers - 1 {
message.take()
.expect("Only taken in last iteration of loop, never afterwards; qed")
} else {
message.as_ref()
.expect("Only taken in last iteration of loop, we are not there yet; qed")
.clone()
};
Ok(NetworkAction::WriteNotification(peer, message))
})
});
net.action_sink().send_all(&mut message_producer).await?;
}
Action::ReportPeer(peer, rep) => {
net.report_peer(peer, rep).await?;
}
Action::StartWork(relay_parent) => {
live_heads.push(relay_parent);
if let Some(view_update)
= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
{
if let Err(e) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
Action::StopWork(relay_parent) => {
live_heads.retain(|h| h != &relay_parent);
if let Some(view_update)
= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
{
if let Err(e) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
Action::PeerConnected(peer, role) => {
match peers.entry(peer.clone()) {
HEntry::Occupied(_) => continue,
HEntry::Vacant(vacant) => {
vacant.insert(PeerData {
view: View(Vec::new()),
role: role.clone(),
});
if let Err(e) = dispatch_update_to_all(
NetworkBridgeEvent::PeerConnected(peer, role),
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
}
Action::PeerDisconnected(peer) => {
if peers.remove(&peer).is_some() {
if let Err(e) = dispatch_update_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
},
Action::PeerMessages(peer, messages) => {
let peer_data = match peers.get_mut(&peer) {
None => continue,
Some(d) => d,
};
let mut outgoing_messages = Vec::with_capacity(messages.len());
for message in messages {
match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.0.len() > MAX_VIEW_HEADS {
net.report_peer(
peer.clone(),
MALFORMED_VIEW_COST,
).await?;
continue
}
if new_view == peer_data.view { continue }
peer_data.view = new_view;
let update = NetworkBridgeEvent::PeerViewChange(
peer.clone(),
peer_data.view.clone(),
);
outgoing_messages.extend(
event_producers.values().map(|producer| producer(update.clone()))
);
}
WireMessage::ProtocolMessage(protocol, message) => {
let message = match event_producers.get(&protocol) {
Some(producer) => Some(producer(
NetworkBridgeEvent::PeerMessage(peer.clone(), message)
)),
None => {
net.report_peer(
peer.clone(),
UNKNOWN_PROTO_COST,
).await?;
None
}
};
if let Some(message) = message {
outgoing_messages.push(message);
}
}
}
}
let send_messages = ctx.send_messages(outgoing_messages);
if let Err(e) = send_messages.await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
},
Action::Abort => return Ok(()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::channel::mpsc;
use futures::executor::{self, ThreadPool};
use std::sync::Arc;
use parking_lot::Mutex;
use assert_matches::assert_matches;
use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
use subsystem_test::{SingleItemSink, SingleItemStream};
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct TestNetwork {
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
action_tx: mpsc::UnboundedSender<NetworkAction>,
}
// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct TestNetworkHandle {
action_rx: mpsc::UnboundedReceiver<NetworkAction>,
net_tx: SingleItemSink<NetworkEvent>,
}
fn new_test_network() -> (
TestNetwork,
TestNetworkHandle,
) {
let (net_tx, net_rx) = subsystem_test::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();
(
TestNetwork {
net_events: Arc::new(Mutex::new(Some(net_rx))),
action_tx,
},
TestNetworkHandle {
action_rx,
net_tx,
},
)
}
impl Network for TestNetwork {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
self.net_events.lock()
.take()
.expect("Subsystem made more than one call to `event_stream`")
.boxed()
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
}
}
impl TestNetworkHandle {
// Get the next network action.
async fn next_network_action(&mut self) -> NetworkAction {
self.action_rx.next().await.expect("subsystem concluded early")
}
// Wait for the next N network actions.
async fn next_network_actions(&mut self, n: usize) -> Vec<NetworkAction> {
let mut v = Vec::with_capacity(n);
for _ in 0..n {
v.push(self.next_network_action().await);
}
v
}
async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
engine_id: POLKADOT_ENGINE_ID,
role,
}).await;
}
async fn disconnect_peer(&mut self, peer: PeerId) {
self.send_network_event(NetworkEvent::NotificationStreamClosed {
remote: peer,
engine_id: POLKADOT_ENGINE_ID,
}).await;
}
async fn peer_message(&mut self, peer: PeerId, message: Vec<u8>) {
self.send_network_event(NetworkEvent::NotificationsReceived {
remote: peer,
messages: vec![(POLKADOT_ENGINE_ID, message.into())],
}).await;
}
async fn send_network_event(&mut self, event: NetworkEvent) {
self.net_tx.send(event).await.expect("subsystem concluded early");
}
}
// network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so
// we need to use this to prevent fragile reliance on peer ordering.
fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool {
actions.iter().find(|&x| x == action).is_some()
}
struct TestHarness {
network_handle: TestNetworkHandle,
virtual_overseer: subsystem_test::TestSubsystemContextHandle<NetworkBridgeMessage>,
}
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = ThreadPool::new().unwrap();
let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
let network_bridge = run_network(
network,
context,
)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());
let test_fut = test(TestHarness {
network_handle,
virtual_overseer,
});
futures::pin_mut!(test_fut);
futures::pin_mut!(network_bridge);
executor::block_on(future::select(test_fut, network_bridge));
}
#[test]
fn sends_view_updates_to_peers() {
test_harness(|test_harness| async move {
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
let peer_a = PeerId::random();
let peer_b = PeerId::random();
network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await;
network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await;
let hash_a = Hash::from([1; 32]);
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await;
let actions = network_handle.next_network_actions(2).await;
let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode();
assert!(network_actions_contains(
&actions,
&NetworkAction::WriteNotification(peer_a, wire_message.clone()),
));
assert!(network_actions_contains(
&actions,
&NetworkAction::WriteNotification(peer_b, wire_message.clone()),
));
});
}
#[test]
fn peer_view_updates_sent_via_overseer() {
test_harness(|test_harness| async move {
let TestHarness {
mut network_handle,
mut virtual_overseer,
} = test_harness;
let peer = PeerId::random();
let proto_statement = *b"abcd";
let proto_bitfield = *b"wxyz";
network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;
virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
proto_statement,
|event| AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(event)
)
),
}).await;
virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
proto_bitfield,
|event| AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(event)
)
),
}).await;
let view = View(vec![Hash::from([1u8; 32])]);
// bridge will inform about all previously-connected peers.
{
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
)
) if p == peer
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) if p == peer && v == View(Default::default())
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
)
) if p == peer
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) if p == peer && v == View(Default::default())
);
}
network_handle.peer_message(
peer.clone(),
WireMessage::ViewUpdate(view.clone()).encode(),
).await;
// statement distribution message comes first because handlers are ordered by
// protocol ID.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) => {
assert_eq!(p, peer);
assert_eq!(v, view);
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) => {
assert_eq!(p, peer);
assert_eq!(v, view);
}
);
});
}
#[test]
fn peer_messages_sent_via_overseer() {
test_harness(|test_harness| async move {
let TestHarness {
mut network_handle,
mut virtual_overseer,
} = test_harness;
let peer = PeerId::random();
let proto_statement = *b"abcd";
let proto_bitfield = *b"wxyz";
network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;
virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
proto_statement,
|event| AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(event)
)
),
}).await;
virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
proto_bitfield,
|event| AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(event)
)
),
}).await;
// bridge will inform about all previously-connected peers.
{
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
)
) if p == peer
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) if p == peer && v == View(Default::default())
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
)
) if p == peer
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(p, v)
)
) if p == peer && v == View(Default::default())
);
}
let payload = vec![1, 2, 3];
network_handle.peer_message(
peer.clone(),
WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(),
).await;
network_handle.disconnect_peer(peer.clone()).await;
// statement distribution message comes first because handlers are ordered by
// protocol ID, and then a disconnection event comes - indicating that the message
// was only sent to the correct protocol.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(p, m)
)
) => {
assert_eq!(p, peer);
assert_eq!(m, payload);
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerDisconnected(p)
)
) => {
assert_eq!(p, peer);
}
);
});
}
}
+2 -1
View File
@@ -11,7 +11,8 @@ futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
messages = { package = "polkadot-node-messages", path = "../messages" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
async-trait = "0.1"
[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
@@ -28,16 +28,17 @@ use futures_timer::Delay;
use kv_log_macro as log;
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};
use polkadot_overseer::Overseer;
use messages::{
AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer};
use polkadot_subsystem::messages::{
AllMessages, CandidateBackingMessage, CandidateValidationMessage
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateBackingMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
@@ -56,7 +57,7 @@ impl Subsystem1 {
Delay::new(Duration::from_secs(1)).await;
let (tx, _) = oneshot::channel();
ctx.send_msg(AllMessages::CandidateValidation(
ctx.send_message(AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
@@ -70,8 +71,10 @@ impl Subsystem1 {
}
}
impl Subsystem<CandidateBackingMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
@@ -81,7 +84,7 @@ impl Subsystem<CandidateBackingMessage> for Subsystem1 {
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
@@ -105,8 +108,10 @@ impl Subsystem2 {
}
}
impl Subsystem<CandidateValidationMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
@@ -124,8 +129,8 @@ fn main() {
let (overseer, _handler) = Overseer::new(
vec![],
Box::new(Subsystem2),
Box::new(Subsystem1),
Subsystem2,
Subsystem1,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
+90 -114
View File
@@ -65,8 +65,8 @@ use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnError, SpawnExt},
stream::{self, FuturesUnordered},
task::{Spawn, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
@@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
pub use messages::{
OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
FromOverseer,
use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage, AllMessages
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem,
};
/// An error type that describes faults that may happen
///
/// These are:
/// * Channels being closed
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From<oneshot::Canceled> for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From<SpawnError> for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`].
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
///
/// [`Overseer`]: struct.Overseer.html
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
@@ -278,7 +242,7 @@ impl Debug for ToOverseer {
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M: Debug> {
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
}
@@ -289,17 +253,17 @@ struct SubsystemInstance<M: Debug> {
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
pub struct SubsystemContext<M: Debug>{
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer>,
}
impl<M: Debug> SubsystemContext<M> {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
#[async_trait::async_trait]
impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
@@ -307,13 +271,11 @@ impl<M: Debug> SubsystemContext<M> {
}
}
/// Receive a message.
pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}
/// Spawn a child task on the executor.
pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ToOverseer::SpawnJob {
s,
@@ -323,33 +285,25 @@ impl<M: Debug> SubsystemContext<M> {
rx.await?
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
Ok(())
}
fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
Self {
rx,
tx,
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
self.tx.send_all(&mut msgs).await?;
Ok(())
}
}
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<M: Debug> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
}
/// A subsystem compatible with the overseer - one which can be run in the context of the
/// overseer.
pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>> + Send>;
/// A subsystem that we oversee.
///
@@ -359,8 +313,7 @@ pub trait Subsystem<M: Debug> {
///
/// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)]
struct OverseenSubsystem<M: Debug> {
subsystem: Box<dyn Subsystem<M> + Send>,
struct OverseenSubsystem<M> {
instance: Option<SubsystemInstance<M>>,
}
@@ -441,16 +394,20 @@ where
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_overseer::{
/// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # use polkadot_overseer::Overseer;
/// # use polkadot_subsystem::{
/// # Subsystem, SpawnedSubsystem, SubsystemContext,
/// # messages::{CandidateValidationMessage, CandidateBackingMessage},
/// # };
///
/// struct ValidationSubsystem;
/// impl Subsystem<CandidateValidationMessage> for ValidationSubsystem {
///
/// impl<C> Subsystem<C> for ValidationSubsystem
/// where C: SubsystemContext<Message=CandidateValidationMessage>
/// {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateValidationMessage>,
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
@@ -461,10 +418,12 @@ where
/// }
///
/// struct CandidateBackingSubsystem;
/// impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
/// impl<C> Subsystem<C> for CandidateBackingSubsystem
/// where C: SubsystemContext<Message=CandidateBackingMessage>
/// {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateBackingMessage>,
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
@@ -478,8 +437,8 @@ where
/// let spawner = executor::ThreadPool::new().unwrap();
/// let (overseer, _handler) = Overseer::new(
/// vec![],
/// Box::new(ValidationSubsystem),
/// Box::new(CandidateBackingSubsystem),
/// ValidationSubsystem,
/// CandidateBackingSubsystem,
/// spawner,
/// ).unwrap();
///
@@ -498,8 +457,8 @@ where
/// ```
pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>,
validation: Box<dyn Subsystem<CandidateValidationMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingMessage> + Send>,
validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -680,6 +639,12 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
_ => {
// TODO: temporary catch-all until all subsystems are integrated with overseer.
// The overseer is not complete until this is an exhaustive match with all
// messages targeting an included subsystem.
// https://github.com/paritytech/polkadot/issues/1317
}
}
}
@@ -688,15 +653,15 @@ where
}
}
fn spawn<S: Spawn, M: Debug>(
fn spawn<S: Spawn, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
let f = s.start(ctx);
let handle = spawner.spawn_with_handle(f.0)?;
@@ -709,7 +674,6 @@ fn spawn<S: Spawn, M: Debug>(
});
Ok(OverseenSubsystem {
subsystem: s,
instance,
})
}
@@ -723,9 +687,11 @@ mod tests {
struct TestSubsystem1(mpsc::Sender<usize>);
impl Subsystem<CandidateValidationMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
impl<C> Subsystem<C> for TestSubsystem1
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0;
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
loop {
@@ -746,14 +712,18 @@ mod tests {
struct TestSubsystem2(mpsc::Sender<usize>);
impl Subsystem<CandidateBackingMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem2
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let _sender = sender;
let mut c: usize = 0;
loop {
if c < 10 {
let (tx, _) = oneshot::channel();
ctx.send_msg(
ctx.send_message(
AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
@@ -786,8 +756,10 @@ mod tests {
struct TestSubsystem4;
impl Subsystem<CandidateBackingMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem4
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, mut _ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
@@ -805,8 +777,8 @@ mod tests {
let (overseer, mut handler) = Overseer::new(
vec![],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem2(s2_tx)),
TestSubsystem1(s1_tx),
TestSubsystem2(s2_tx),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
@@ -855,8 +827,8 @@ mod tests {
let (s1_tx, _) = mpsc::channel(64);
let (overseer, _handle) = Overseer::new(
vec![],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem4),
TestSubsystem1(s1_tx),
TestSubsystem4,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
@@ -871,8 +843,10 @@ mod tests {
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateValidationMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem5
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
@@ -895,8 +869,10 @@ mod tests {
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateBackingMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem6
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
@@ -949,8 +925,8 @@ mod tests {
let (overseer, mut handler) = Overseer::new(
vec![first_block],
Box::new(TestSubsystem5(tx_5)),
Box::new(TestSubsystem6(tx_6)),
TestSubsystem5(tx_5),
TestSubsystem6(tx_6),
spawner,
).unwrap();
@@ -1034,8 +1010,8 @@ mod tests {
// start with two forks of different height.
let (overseer, mut handler) = Overseer::new(
vec![first_block, second_block],
Box::new(TestSubsystem5(tx_5)),
Box::new(TestSubsystem6(tx_6)),
TestSubsystem5(tx_5),
TestSubsystem6(tx_6),
spawner,
).unwrap();
+1
View File
@@ -10,3 +10,4 @@ polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] }
runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
async-trait = "0.1"
+10
View File
@@ -64,6 +64,7 @@ impl EncodeAs<CompactStatement> for Statement {
pub type SignedFullStatement = Signed<Statement, CompactStatement>;
/// A misbehaviour report.
#[derive(Debug)]
pub enum MisbehaviorReport {
/// These validator nodes disagree on this candidate's validity, please figure it out
///
@@ -79,3 +80,12 @@ pub enum MisbehaviorReport {
/// This peer has seconded more than one parachain candidate for this relay parent head
DoubleVote(CandidateReceipt, SignedFullStatement, SignedFullStatement),
}
/// A unique identifier for a network protocol.
pub type ProtocolId = [u8; 4];
/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads.
///
/// Up to `N` (5?) chain heads.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct View(pub Vec<Hash>);
+1
View File
@@ -15,6 +15,7 @@ hex-literal = "0.2.1"
polkadot-primitives = { path = "../../primitives" }
polkadot-runtime = { path = "../../runtime/polkadot" }
polkadot-overseer = { path = "../overseer" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
kusama-runtime = { path = "../../runtime/kusama" }
westend-runtime = { path = "../../runtime/westend" }
polkadot-network = { path = "../../network", optional = true }
+49 -65
View File
@@ -29,15 +29,15 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_executor::native_executor_instance;
use log::info;
use sp_blockchain::HeaderBackend;
use polkadot_overseer::{
self as overseer,
BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem,
CandidateValidationMessage, CandidateBackingMessage,
use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler};
use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{CandidateValidationMessage, CandidateBackingMessage},
};
pub use service::{
AbstractService, Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor,
Configuration, ChainSpec, ServiceBuilderCommand,
Configuration, ChainSpec, ServiceBuilderCommand, ServiceComponents, TaskManager,
};
pub use service::config::{DatabaseConfig, PrometheusConfig};
pub use sc_executor::NativeExecutionDispatch;
@@ -269,8 +269,10 @@ macro_rules! new_full_start {
struct CandidateValidationSubsystem;
impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for CandidateValidationSubsystem
where C: SubsystemContext<Message = CandidateValidationMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
@@ -279,8 +281,10 @@ impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
struct CandidateBackingSubsystem;
impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for CandidateBackingSubsystem
where C: SubsystemContext<Message = CandidateBackingMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
@@ -291,8 +295,8 @@ fn real_overseer<S: futures::task::Spawn>(
leaves: impl IntoIterator<Item = BlockInfo>,
s: S,
) -> Result<(Overseer<S>, OverseerHandler), ServiceError> {
let validation = Box::new(CandidateValidationSubsystem);
let candidate_backing = Box::new(CandidateBackingSubsystem);
let validation = CandidateValidationSubsystem;
let candidate_backing = CandidateBackingSubsystem;
Overseer::new(leaves, validation, candidate_backing, s)
.map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e)))
}
@@ -321,7 +325,10 @@ macro_rules! new_full {
let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) =
new_full_start!($config, $runtime, $dispatch);
let service = builder
let ServiceComponents {
client, network, select_chain, keystore, transaction_pool, prometheus_registry,
task_manager, telemetry_on_connect_sinks, ..
} = builder
.with_finality_proof_provider(|client, backend| {
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
@@ -334,11 +341,9 @@ macro_rules! new_full {
let shared_voter_state = rpc_setup.take()
.expect("The SharedVoterState is present for Full Services or setup failed before. qed");
let client = service.client();
let overseer_client = service.client();
let spawner = service.spawn_task_handle();
let leaves: Vec<_> = service.select_chain().ok_or(ServiceError::SelectChainRequired)?
let overseer_client = client.clone();
let spawner = task_manager.spawn_handle();
let leaves: Vec<_> = select_chain.clone().ok_or(ServiceError::SelectChainRequired)?
.leaves()
.unwrap_or_else(|_| vec![])
.into_iter()
@@ -356,7 +361,7 @@ macro_rules! new_full {
let (overseer, handler) = real_overseer(leaves, spawner)?;
service.spawn_essential_task_handle().spawn("overseer", Box::pin(async move {
task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move {
use futures::{pin_mut, select, FutureExt};
let forward = overseer::forward_events(overseer_client, handler);
@@ -377,24 +382,24 @@ macro_rules! new_full {
}));
if role.is_authority() {
let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
let select_chain = select_chain.ok_or(ServiceError::SelectChainRequired)?;
let can_author_with =
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
// TODO: custom proposer (https://github.com/paritytech/polkadot/issues/1248)
let proposer = sc_basic_authorship::ProposerFactory::new(
client.clone(),
service.transaction_pool(),
transaction_pool,
None,
);
let babe_config = babe::BabeParams {
keystore: service.keystore(),
keystore: keystore.clone(),
client: client.clone(),
select_chain,
block_import,
env: proposer,
sync_oracle: service.network(),
sync_oracle: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
force_authoring,
babe_link,
@@ -402,13 +407,13 @@ macro_rules! new_full {
};
let babe = babe::start_babe(babe_config)?;
service.spawn_essential_task_handle().spawn_blocking("babe", babe);
task_manager.spawn_essential_handle().spawn_blocking("babe", babe);
}
// if the node isn't actively participating in consensus then it doesn't
// need a keystore, regardless of which protocol we use below.
let keystore = if is_authority {
Some(service.keystore() as BareCryptoStorePtr)
Some(keystore.clone() as BareCryptoStorePtr)
} else {
None
};
@@ -454,15 +459,15 @@ macro_rules! new_full {
let grandpa_config = grandpa::GrandpaParams {
config,
link: link_half,
network: service.network(),
network: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
telemetry_on_connect: Some(telemetry_on_connect_sinks.on_connect_stream()),
voting_rule,
prometheus_registry: service.prometheus_registry(),
prometheus_registry: prometheus_registry,
shared_voter_state,
};
service.spawn_essential_task_handle().spawn_blocking(
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
grandpa::run_grandpa_voter(grandpa_config)?
);
@@ -470,11 +475,11 @@ macro_rules! new_full {
grandpa::setup_disabled_grandpa(
client.clone(),
&inherent_data_providers,
service.network(),
network.clone(),
)?;
}
(service, client)
(task_manager, client)
}}
}
@@ -566,6 +571,7 @@ macro_rules! new_light {
Ok(polkadot_rpc::create_light(light_deps))
})?
.build_light()
.map(|ServiceComponents { task_manager, .. }| task_manager)
}}
}
@@ -595,7 +601,7 @@ pub fn polkadot_new_full(
grandpa_pause: Option<(u32, u32)>,
)
-> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -604,7 +610,7 @@ pub fn polkadot_new_full(
FullNodeHandles,
), ServiceError>
{
let (service, client) = new_full!(
let (components, client) = new_full!(
config,
collating_for,
authority_discovery_enabled,
@@ -613,7 +619,7 @@ pub fn polkadot_new_full(
PolkadotExecutor,
);
Ok((service, client, FullNodeHandles))
Ok((components, client, FullNodeHandles))
}
/// Create a new Kusama service for a full node.
@@ -626,7 +632,7 @@ pub fn kusama_new_full(
_slot_duration: u64,
grandpa_pause: Option<(u32, u32)>,
) -> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -636,7 +642,7 @@ pub fn kusama_new_full(
FullNodeHandles,
), ServiceError>
{
let (service, client) = new_full!(
let (components, client) = new_full!(
config,
collating_for,
authority_discovery_enabled,
@@ -645,7 +651,7 @@ pub fn kusama_new_full(
KusamaExecutor,
);
Ok((service, client, FullNodeHandles))
Ok((components, client, FullNodeHandles))
}
/// Create a new Kusama service for a full node.
@@ -659,7 +665,7 @@ pub fn westend_new_full(
grandpa_pause: Option<(u32, u32)>,
)
-> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -668,7 +674,7 @@ pub fn westend_new_full(
FullNodeHandles,
), ServiceError>
{
let (service, client) = new_full!(
let (components, client) = new_full!(
config,
collating_for,
authority_discovery_enabled,
@@ -677,45 +683,23 @@ pub fn westend_new_full(
WestendExecutor,
);
Ok((service, client, FullNodeHandles))
Ok((components, client, FullNodeHandles))
}
/// Create a new Polkadot service for a light client.
pub fn polkadot_new_light(mut config: Configuration) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = polkadot_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, PolkadotExecutor>,
>, ServiceError>
pub fn polkadot_new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
{
new_light!(config, polkadot_runtime::RuntimeApi, PolkadotExecutor)
}
/// Create a new Kusama service for a light client.
pub fn kusama_new_light(mut config: Configuration) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = kusama_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, KusamaExecutor>,
>, ServiceError>
pub fn kusama_new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
{
new_light!(config, kusama_runtime::RuntimeApi, KusamaExecutor)
}
/// Create a new Westend service for a light client.
pub fn westend_new_light(mut config: Configuration, ) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = westend_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, KusamaExecutor>
>,
ServiceError>
pub fn westend_new_light(mut config: Configuration, ) -> Result<TaskManager, ServiceError>
{
new_light!(config, westend_runtime::RuntimeApi, KusamaExecutor)
}
@@ -1,9 +1,9 @@
[package]
name = "polkadot-node-messages"
name = "polkadot-node-subsystem"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Message types used by Subsystems"
description = "Subsystem traits and message definitions"
[dependencies]
polkadot-primitives = { path = "../../primitives" }
@@ -11,3 +11,4 @@ polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-primitives = { path = "../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.5"
async-trait = "0.1"
+150
View File
@@ -0,0 +1,150 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Subsystem trait definitions and message types.
//!
//! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components
//! that communicate via message-passing. They are coordinated by an overseer, provided by a
//! separate crate.
use std::pin::Pin;
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use polkadot_primitives::Hash;
use async_trait::async_trait;
use crate::messages::AllMessages;
pub mod messages;
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
pub enum OverseerSignal {
/// `Subsystem` should start working on block-based work, given by the relay-chain block hash.
StartWork(Hash),
/// `Subsystem` should stop working on block-based work specified by the relay-chain block hash.
StopWork(Hash),
/// Conclude the work of the `Overseer` and all `Subsystem`s.
Conclude,
}
/// A message type that a subsystem receives from an overseer.
/// It wraps signals from an overseer and messages that are circulating
/// between subsystems.
///
/// It is generic over over the message type `M` that a particular `Subsystem` may use.
#[derive(Debug)]
pub enum FromOverseer<M> {
/// Signal from the `Overseer`.
Signal(OverseerSignal),
/// Some other `Subsystem`'s message.
Communication {
msg: M,
},
}
/// An error type that describes faults that may happen
///
/// These are:
/// * Channels being closed
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From<oneshot::Canceled> for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From<futures::task::SpawnError> for SubsystemError {
fn from(_: futures::task::SpawnError) -> Self {
Self
}
}
impl From<std::convert::Infallible> for SubsystemError {
fn from(e: std::convert::Infallible) -> Self {
match e {}
}
}
/// An asynchronous subsystem task..
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or spawn jobs.
///
/// [`Overseer`]: struct.Overseer.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[async_trait]
pub trait SubsystemContext: Send + 'static {
/// The message type of this context. Subsystems launched with this context will expect
/// to receive messages of this type.
type Message: Send;
/// Try to asynchronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()>;
/// Receive a message.
async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;
/// Spawn a child task on the executor.
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
}
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<C: SubsystemContext> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(self, ctx: C) -> SpawnedSubsystem;
}
@@ -24,27 +24,16 @@
use futures::channel::{mpsc, oneshot};
use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId};
use sc_network::{ObservedRole, ReputationChange, PeerId};
use polkadot_primitives::{BlockNumber, Hash, Signature};
use polkadot_primitives::parachain::{
AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId,
SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex,
};
use polkadot_node_primitives::{
MisbehaviorReport, SignedFullStatement,
MisbehaviorReport, SignedFullStatement, View, ProtocolId,
};
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
pub enum OverseerSignal {
/// `Subsystem` should start working on block-based work, given by the relay-chain block hash.
StartWork(Hash),
/// `Subsystem` should stop working on block-based work specified by the relay-chain block hash.
StopWork(Hash),
/// Conclude the work of the `Overseer` and all `Subsystem`s.
Conclude,
}
/// A notification of a new backed candidate.
#[derive(Debug)]
pub struct NewBackedCandidate(pub BackedCandidate);
@@ -90,12 +79,8 @@ pub enum CandidateValidationMessage {
),
}
/// Chain heads.
///
/// Up to `N` (5?) chain heads.
pub struct View(pub Vec<Hash>);
/// Events from network.
#[derive(Debug, Clone)]
pub enum NetworkBridgeEvent {
/// A peer has connected.
PeerConnected(PeerId, ObservedRole),
@@ -114,7 +99,8 @@ pub enum NetworkBridgeEvent {
}
/// Messages received by the network bridge subsystem.
pub enum NetworkBridgeSubsystemMessage {
#[derive(Debug)]
pub enum NetworkBridgeMessage {
/// Register an event producer on startup.
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
@@ -126,6 +112,7 @@ pub enum NetworkBridgeSubsystemMessage {
}
/// Availability Distribution Message.
#[derive(Debug)]
pub enum AvailabilityDistributionMessage {
/// Distribute an availability chunk to other validators.
DistributeChunk(Hash, ErasureChunk),
@@ -138,6 +125,7 @@ pub enum AvailabilityDistributionMessage {
}
/// Bitfield distribution message.
#[derive(Debug)]
pub enum BitfieldDistributionMessage {
/// Distribute a bitfield via gossip to other validators.
DistributeBitfield(Hash, SignedAvailabilityBitfield),
@@ -147,6 +135,7 @@ pub enum BitfieldDistributionMessage {
}
/// Availability store subsystem message.
#[derive(Debug)]
pub enum AvailabilityStoreMessage {
/// Query a `PoVBlock` from the AV store.
QueryPoV(Hash, oneshot::Sender<Option<PoVBlock>>),
@@ -159,6 +148,7 @@ pub enum AvailabilityStoreMessage {
}
/// A request to the Runtime API subsystem.
#[derive(Debug)]
pub enum RuntimeApiRequest {
/// Get the current validator set.
Validators(oneshot::Sender<Vec<ValidatorId>>),
@@ -171,19 +161,24 @@ pub enum RuntimeApiRequest {
}
/// A message to the Runtime API subsystem.
#[derive(Debug)]
pub enum RuntimeApiMessage {
/// Make a request of the runtime API against the post-state of the given relay-parent.
Request(Hash, RuntimeApiRequest),
}
/// Statement distribution message.
#[derive(Debug)]
pub enum StatementDistributionMessage {
/// We have originated a signed statement in the context of
/// given relay-parent hash and it should be distributed to other validators.
Share(Hash, SignedFullStatement),
/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}
/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
#[derive(Debug)]
pub enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield),
@@ -198,6 +193,7 @@ pub enum ProvisionableData {
/// Message to the Provisioner.
///
/// In all cases, the Hash is that of the relay parent.
#[derive(Debug)]
pub enum ProvisionerMessage {
/// This message allows potential block authors to be kept updated with all new authorship data
/// as it becomes available.
@@ -213,20 +209,18 @@ pub enum AllMessages {
CandidateValidation(CandidateValidationMessage),
/// Message for the candidate backing subsystem.
CandidateBacking(CandidateBackingMessage),
}
/// A message type that a subsystem receives from an overseer.
/// It wraps signals from an overseer and messages that are circulating
/// between subsystems.
///
/// It is generic over over the message type `M` that a particular `Subsystem` may use.
#[derive(Debug)]
pub enum FromOverseer<M: std::fmt::Debug> {
/// Signal from the `Overseer`.
Signal(OverseerSignal),
/// Some other `Subsystem`'s message.
Communication {
msg: M,
},
/// Message for the candidate selection subsystem.
CandidateSelection(CandidateSelectionMessage),
/// Message for the statement distribution subsystem.
StatementDistribution(StatementDistributionMessage),
/// Message for the availability distribution subsystem.
AvailabilityDistribution(AvailabilityDistributionMessage),
/// Message for the bitfield distribution subsystem.
BitfieldDistribution(BitfieldDistributionMessage),
/// Message for the Provisioner subsystem.
Provisioner(ProvisionerMessage),
/// Message for the Runtime API subsystem.
RuntimeApi(RuntimeApiMessage),
/// Message for the availability store subsystem.
AvailabilityStore(AvailabilityStoreMessage),
}
@@ -0,0 +1,12 @@
[package]
name = "polkadot-subsystem-test-helpers"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Helpers for testing subsystems"
[dependencies]
futures = "0.3.5"
async-trait = "0.1"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
parking_lot = "0.10.0"
@@ -0,0 +1,229 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Utilities for testing subsystems.
use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use polkadot_subsystem::messages::AllMessages;
use futures::prelude::*;
use futures::channel::mpsc;
use futures::task::{Spawn, SpawnExt};
use futures::poll;
use parking_lot::Mutex;
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
enum SinkState<T> {
Empty {
read_waker: Option<Waker>,
},
Item {
item: T,
ready_waker: Option<Waker>,
flush_waker: Option<Waker>,
},
}
/// The sink half of a single-item sink that does not resolve until the item has been read.
pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
/// The stream half of a single-item sink.
pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
impl<T> Sink<T> for SingleItemSink<T> {
type Error = Infallible;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut ready_waker, .. } => {
*ready_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn start_send(
self: Pin<&mut Self>,
item: T,
) -> Result<(), Infallible> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { ref mut read_waker } => {
if let Some(waker) = read_waker.take() {
waker.wake();
}
}
_ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
}
*state = SinkState::Item {
item,
ready_waker: None,
flush_waker: None,
};
Ok(())
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut flush_waker, .. } => {
*flush_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
self.poll_flush(cx)
}
}
impl<T> Stream for SingleItemStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut state = self.0.lock();
let read_waker = Some(cx.waker().clone());
match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
SinkState::Empty { .. } => Poll::Pending,
SinkState::Item { item, ready_waker, flush_waker } => {
if let Some(waker) = ready_waker {
waker.wake();
}
if let Some(waker) = flush_waker {
waker.wake();
}
Poll::Ready(Some(item))
}
}
}
}
/// Create a single-item Sink/Stream pair.
///
/// The sink's send methods resolve at the point which the stream reads the item,
/// not when the item is buffered.
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
(
SingleItemSink(inner.clone()),
SingleItemStream(inner),
)
}
/// A test subsystem context.
pub struct TestSubsystemContext<M, S> {
tx: mpsc::UnboundedSender<AllMessages>,
rx: SingleItemStream<FromOverseer<M>>,
spawn: S,
}
#[async_trait::async_trait]
impl<M: Send + 'static, S: Spawn + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
Poll::Pending => Ok(None),
}
}
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
self.spawn.spawn(s).map_err(Into::into)
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(msg).await.expect("test overseer no longer live");
Ok(())
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut iter = stream::iter(msgs.into_iter().map(Ok));
self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
Ok(())
}
}
/// A handle for interacting with the subsystem context.
pub struct TestSubsystemContextHandle<M> {
tx: SingleItemSink<FromOverseer<M>>,
rx: mpsc::UnboundedReceiver<AllMessages>,
}
impl<M> TestSubsystemContextHandle<M> {
/// Send a message or signal to the subsystem. This resolves at the point in time where the
/// subsystem has _read_ the message.
pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
}
/// Receive the next message from the subsystem.
pub async fn recv(&mut self) -> AllMessages {
self.rx.next().await.expect("Test subsystem no longer live")
}
}
/// Make a test subsystem context.
pub fn make_subsystem_context<M, S>(spawn: S)
-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
{
let (overseer_tx, overseer_rx) = single_item_sink();
let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
(
TestSubsystemContext {
tx: all_messages_tx,
rx: overseer_rx,
spawn,
},
TestSubsystemContextHandle {
tx: overseer_tx,
rx: all_messages_rx
},
)
}
@@ -0,0 +1,23 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Inclusion Inherent primitives define types and constants which can be imported
//! without needing to import the entire inherent module.
use inherents::InherentIdentifier;
/// Unique identifier for the Inclusion Inherent
pub const INHERENT_IDENTIFIER: InherentIdentifier = *b"inclusn0";
+1
View File
@@ -23,6 +23,7 @@
use runtime_primitives::{generic, MultiSignature};
pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT, Verify, IdentifyAccount};
pub mod inclusion_inherent;
pub mod parachain;
pub use parity_scale_codec::Compact;
+6
View File
@@ -722,6 +722,12 @@ pub type SignedAvailabilityBitfield = Signed<AvailabilityBitfield>;
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
pub struct SignedAvailabilityBitfields(pub Vec<SignedAvailabilityBitfield>);
impl From<Vec<SignedAvailabilityBitfield>> for SignedAvailabilityBitfields {
fn from(fields: Vec<SignedAvailabilityBitfield>) -> SignedAvailabilityBitfields {
SignedAvailabilityBitfields(fields)
}
}
/// A backed (or backable, depending on context) candidate.
// TODO: yes, this is roughly the same as AttestedCandidate.
// After https://github.com/paritytech/polkadot/issues/1250
+2 -1
View File
@@ -886,10 +886,11 @@ impl proxy::Trait for Runtime {
pub struct CustomOnRuntimeUpgrade;
impl frame_support::traits::OnRuntimeUpgrade for CustomOnRuntimeUpgrade {
fn on_runtime_upgrade() -> frame_support::weights::Weight {
treasury::Module::<Runtime>::migrate_retract_tip_for_tip_new();
if scheduler::Module::<Runtime>::migrate_v1_to_t2() {
<Runtime as system::Trait>::MaximumBlockWeight::get()
} else {
<Runtime as system::Trait>::DbWeight::get().reads(1)
<Runtime as system::Trait>::DbWeight::get().reads(1) + 500_000_000
}
}
}
@@ -23,18 +23,23 @@
use sp_std::prelude::*;
use primitives::{
inclusion_inherent,
parachain::{BackedCandidate, SignedAvailabilityBitfields},
};
use frame_support::{
decl_storage, decl_module, decl_error, ensure,
decl_error, decl_module, decl_storage, ensure,
dispatch::DispatchResult,
weights::{DispatchClass, Weight},
traits::Get,
};
use system::ensure_none;
use crate::{inclusion, scheduler::{self, FreedReason}};
use crate::{
inclusion,
scheduler::{self, FreedReason},
};
use inherents::{InherentIdentifier, InherentData, MakeFatalError, ProvideInherent};
pub trait Trait: inclusion::Trait + scheduler::Trait { }
pub trait Trait: inclusion::Trait + scheduler::Trait {}
decl_storage! {
trait Store for Module<T: Trait> as ParaInclusionInherent {
@@ -118,3 +123,23 @@ decl_module! {
}
}
}
impl<T: Trait> ProvideInherent for Module<T> {
type Call = Call<T>;
type Error = MakeFatalError<()>;
const INHERENT_IDENTIFIER: InherentIdentifier = inclusion_inherent::INHERENT_IDENTIFIER;
fn create_inherent(data: &InherentData) -> Option<Self::Call> {
data.get_data(&Self::INHERENT_IDENTIFIER)
.expect("inclusion inherent data failed to decode")
.map(|(signed_bitfields, backed_candidates): (SignedAvailabilityBitfields, Vec<BackedCandidate<T::Hash>>)| {
// Sanity check: session changes can invalidate an inherent, and we _really_ don't want that to happen.
// See github.com/paritytech/polkadot/issues/1327
if Self::inclusion(system::RawOrigin::None.into(), signed_bitfields.clone(), backed_candidates.clone()).is_ok() {
Call::inclusion(signed_bitfields, backed_candidates)
} else {
Call::inclusion(Vec::new().into(), Vec::new())
}
})
}
}
+43 -61
View File
@@ -30,9 +30,9 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_executor::native_executor_instance;
use log::info;
pub use service::{
AbstractService, Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, RpcHandlers,
TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor,
Configuration, ChainSpec, ServiceBuilderCommand,
Configuration, ChainSpec, ServiceBuilderCommand, ServiceComponents, TaskManager,
};
pub use service::config::{DatabaseConfig, PrometheusConfig};
pub use sc_executor::NativeExecutionDispatch;
@@ -298,7 +298,10 @@ macro_rules! new_full {
let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) =
new_full_start!($config, $runtime, $dispatch);
let service = builder
let ServiceComponents {
client, network, select_chain, keystore, transaction_pool, prometheus_registry,
task_manager, telemetry_on_connect_sinks, ..
} = builder
.with_finality_proof_provider(|client, backend| {
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
@@ -311,16 +314,10 @@ macro_rules! new_full {
let shared_voter_state = rpc_setup.take()
.expect("The SharedVoterState is present for Full Services or setup failed before. qed");
let client = service.client();
let known_oracle = client.clone();
let mut handles = FullNodeHandles::default();
let select_chain = if let Some(select_chain) = service.select_chain() {
select_chain
} else {
info!("The node cannot start as an authority because it can't select chain.");
return Ok((service, client, handles));
};
let select_chain = select_chain.ok_or(ServiceError::SelectChainRequired)?;
let gossip_validator_select_chain = select_chain.clone();
let is_known = move |block_hash: &Hash| {
@@ -343,13 +340,13 @@ macro_rules! new_full {
};
let polkadot_network_service = network_protocol::start(
service.network(),
network.clone(),
network_protocol::Config {
collating_for: $collating_for,
},
(is_known, client.clone()),
client.clone(),
service.spawn_task_handle(),
task_manager.spawn_handle(),
).map_err(|e| format!("Could not spawn network worker: {:?}", e))?;
let authority_handles = if is_collator || role.is_authority() {
@@ -380,14 +377,14 @@ macro_rules! new_full {
client: client.clone(),
network: polkadot_network_service.clone(),
collators: polkadot_network_service.clone(),
spawner: service.spawn_task_handle(),
spawner: task_manager.spawn_handle(),
availability_store: availability_store.clone(),
select_chain: select_chain.clone(),
keystore: service.keystore(),
keystore: keystore.clone(),
max_block_data_size,
}.build();
service.spawn_essential_task_handle().spawn("validation-service", Box::pin(validation_service));
task_manager.spawn_essential_handle().spawn("validation-service", Box::pin(validation_service));
handles.validation_service_handle = Some(validation_service_handle.clone());
@@ -403,30 +400,29 @@ macro_rules! new_full {
let proposer = consensus::ProposerFactory::new(
client.clone(),
service.transaction_pool(),
transaction_pool,
validation_service_handle,
slot_duration,
service.prometheus_registry().as_ref(),
prometheus_registry.as_ref(),
);
let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
let can_author_with =
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
let block_import = availability_store.block_import(
block_import,
client.clone(),
service.spawn_task_handle(),
service.keystore(),
task_manager.spawn_handle(),
keystore.clone(),
)?;
let babe_config = babe::BabeParams {
keystore: service.keystore(),
keystore: keystore.clone(),
client: client.clone(),
select_chain,
block_import,
env: proposer,
sync_oracle: service.network(),
sync_oracle: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
force_authoring,
babe_link,
@@ -434,7 +430,7 @@ macro_rules! new_full {
};
let babe = babe::start_babe(babe_config)?;
service.spawn_essential_task_handle().spawn_blocking("babe", babe);
task_manager.spawn_essential_handle().spawn_blocking("babe", babe);
}
if matches!(role, Role::Authority{..} | Role::Sentry{..}) {
@@ -443,7 +439,7 @@ macro_rules! new_full {
Role::Authority { ref sentry_nodes } => (
sentry_nodes.clone(),
authority_discovery::Role::Authority (
service.keystore(),
keystore.clone(),
),
),
Role::Sentry {..} => (
@@ -453,29 +449,28 @@ macro_rules! new_full {
_ => unreachable!("Due to outer matches! constraint; qed."),
};
let network = service.network();
let network_event_stream = network.event_stream("authority-discovery");
let dht_event_stream = network_event_stream.filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
_ => None,
}}).boxed();
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
service.client(),
network,
client.clone(),
network.clone(),
sentries,
dht_event_stream,
authority_discovery_role,
service.prometheus_registry(),
prometheus_registry.clone(),
);
service.spawn_task_handle().spawn("authority-discovery", authority_discovery);
task_manager.spawn_handle().spawn("authority-discovery", authority_discovery);
}
}
// if the node isn't actively participating in consensus then it doesn't
// need a keystore, regardless of which protocol we use below.
let keystore = if is_authority {
Some(service.keystore() as BareCryptoStorePtr)
Some(keystore as BareCryptoStorePtr)
} else {
None
};
@@ -521,15 +516,15 @@ macro_rules! new_full {
let grandpa_config = grandpa::GrandpaParams {
config,
link: link_half,
network: service.network(),
network: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
telemetry_on_connect: Some(telemetry_on_connect_sinks.on_connect_stream()),
voting_rule,
prometheus_registry: service.prometheus_registry(),
prometheus_registry: prometheus_registry.clone(),
shared_voter_state,
};
service.spawn_essential_task_handle().spawn_blocking(
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
grandpa::run_grandpa_voter(grandpa_config)?
);
@@ -537,12 +532,12 @@ macro_rules! new_full {
grandpa::setup_disabled_grandpa(
client.clone(),
&inherent_data_providers,
service.network(),
network.clone(),
)?;
}
handles.polkadot_network = Some(polkadot_network_service);
(service, client, handles)
(task_manager, client, handles)
}}
}
@@ -632,6 +627,9 @@ macro_rules! new_light {
Ok(polkadot_rpc::create_light(light_deps))
})?
.build_light()
.map(|ServiceComponents { task_manager, rpc_handlers, .. }| {
(task_manager, rpc_handlers)
})
}}
}
@@ -661,7 +659,7 @@ pub fn polkadot_new_full(
grandpa_pause: Option<(u32, u32)>,
)
-> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -694,7 +692,7 @@ pub fn kusama_new_full(
slot_duration: u64,
grandpa_pause: Option<(u32, u32)>,
) -> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -729,7 +727,7 @@ pub fn westend_new_full(
grandpa_pause: Option<(u32, u32)>,
)
-> Result<(
impl AbstractService,
TaskManager,
Arc<impl PolkadotClient<
Block,
TFullBackend<Block>,
@@ -765,40 +763,24 @@ pub struct FullNodeHandles {
/// Create a new Polkadot service for a light client.
pub fn polkadot_new_light(mut config: Configuration) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = polkadot_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, PolkadotExecutor>,
>, ServiceError>
(TaskManager, Arc<RpcHandlers>), ServiceError
>
{
new_light!(config, polkadot_runtime::RuntimeApi, PolkadotExecutor)
}
/// Create a new Kusama service for a light client.
pub fn kusama_new_light(mut config: Configuration) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = kusama_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, KusamaExecutor>,
>, ServiceError>
(TaskManager, Arc<RpcHandlers>), ServiceError
>
{
new_light!(config, kusama_runtime::RuntimeApi, KusamaExecutor)
}
/// Create a new Westend service for a light client.
pub fn westend_new_light(mut config: Configuration, ) -> Result<
impl AbstractService<
Block = Block,
RuntimeApi = westend_runtime::RuntimeApi,
Backend = TLightBackend<Block>,
SelectChain = LongestChain<TLightBackend<Block>, Block>,
CallExecutor = TLightCallExecutor<Block, KusamaExecutor>
>,
ServiceError>
(TaskManager, Arc<RpcHandlers>), ServiceError
>
{
new_light!(config, westend_runtime::RuntimeApi, KusamaExecutor)
}