mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 18:41:05 +00:00
Do not run unneeded subsystems on collator and its alongside node (#3061)
Currently, collators and their alongside nodes spin up a full-scale overseer running a bunch of subsystems that are not needed if the node is not a validator. That was considered to be harmless; however, we've got problems with unused subsystems getting stalled for a reason not currently known, resulting in the overseer exiting and bringing down the whole node. This PR aims to only run needed subsystems on such nodes, replacing the rest with `DummySubsystem`. It also enables collator-optimized availability recovery subsystem implementation. Partially solves #1730.
This commit is contained in:
@@ -31,7 +31,10 @@ pub mod overseer;
|
||||
pub mod workers;
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
pub use self::overseer::{OverseerGen, OverseerGenArgs, RealOverseerGen};
|
||||
pub use self::overseer::{
|
||||
CollatorOverseerGen, ExtendedOverseerGenArgs, OverseerGen, OverseerGenArgs,
|
||||
ValidatorOverseerGen,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -775,8 +778,6 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
|
||||
let keystore = basics.keystore_container.local_keystore();
|
||||
let auth_or_collator = role.is_authority() || is_parachain_node.is_collator();
|
||||
// We only need to enable the pvf checker when this is a validator.
|
||||
let pvf_checker_enabled = role.is_authority();
|
||||
|
||||
let select_chain = if auth_or_collator {
|
||||
let metrics =
|
||||
@@ -867,10 +868,6 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
|
||||
let req_protocol_names = ReqProtocolNames::new(&genesis_hash, config.chain_spec.fork_id());
|
||||
|
||||
let (pov_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (chunk_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (collation_req_v1_receiver, cfg) =
|
||||
IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
@@ -880,12 +877,9 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
let (available_data_req_receiver, cfg) =
|
||||
IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (statement_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
let (pov_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (candidate_req_v2_receiver, cfg) =
|
||||
IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (dispute_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
let (chunk_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
|
||||
let grandpa_hard_forks = if config.chain_spec.is_kusama() {
|
||||
@@ -900,6 +894,69 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
grandpa_hard_forks,
|
||||
));
|
||||
|
||||
let ext_overseer_args = if is_parachain_node.is_running_alongside_parachain_node() {
|
||||
None
|
||||
} else {
|
||||
let parachains_db = open_database(&config.database)?;
|
||||
let candidate_validation_config = if role.is_authority() {
|
||||
let (prep_worker_path, exec_worker_path) = workers::determine_workers_paths(
|
||||
workers_path,
|
||||
workers_names,
|
||||
node_version.clone(),
|
||||
)?;
|
||||
log::info!("🚀 Using prepare-worker binary at: {:?}", prep_worker_path);
|
||||
log::info!("🚀 Using execute-worker binary at: {:?}", exec_worker_path);
|
||||
|
||||
Some(CandidateValidationConfig {
|
||||
artifacts_cache_path: config
|
||||
.database
|
||||
.path()
|
||||
.ok_or(Error::DatabasePathRequired)?
|
||||
.join("pvf-artifacts"),
|
||||
node_version,
|
||||
secure_validator_mode,
|
||||
prep_worker_path,
|
||||
exec_worker_path,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let (statement_req_receiver, cfg) =
|
||||
IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (candidate_req_v2_receiver, cfg) =
|
||||
IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let (dispute_req_receiver, cfg) = IncomingRequest::get_config_receiver(&req_protocol_names);
|
||||
net_config.add_request_response_protocol(cfg);
|
||||
let approval_voting_config = ApprovalVotingConfig {
|
||||
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
|
||||
slot_duration_millis: slot_duration.as_millis() as u64,
|
||||
};
|
||||
let dispute_coordinator_config = DisputeCoordinatorConfig {
|
||||
col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data,
|
||||
};
|
||||
let chain_selection_config = ChainSelectionConfig {
|
||||
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
|
||||
stagnant_check_interval: Default::default(),
|
||||
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
|
||||
};
|
||||
Some(ExtendedOverseerGenArgs {
|
||||
keystore,
|
||||
parachains_db,
|
||||
candidate_validation_config,
|
||||
availability_config: AVAILABILITY_CONFIG,
|
||||
pov_req_receiver,
|
||||
chunk_req_receiver,
|
||||
statement_req_receiver,
|
||||
candidate_req_v2_receiver,
|
||||
approval_voting_config,
|
||||
dispute_req_receiver,
|
||||
dispute_coordinator_config,
|
||||
chain_selection_config,
|
||||
})
|
||||
};
|
||||
|
||||
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
|
||||
service::build_network(service::BuildNetworkParams {
|
||||
config: &config,
|
||||
@@ -936,44 +993,6 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
);
|
||||
}
|
||||
|
||||
let parachains_db = open_database(&config.database)?;
|
||||
|
||||
let approval_voting_config = ApprovalVotingConfig {
|
||||
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
|
||||
slot_duration_millis: slot_duration.as_millis() as u64,
|
||||
};
|
||||
|
||||
let candidate_validation_config = if role.is_authority() {
|
||||
let (prep_worker_path, exec_worker_path) =
|
||||
workers::determine_workers_paths(workers_path, workers_names, node_version.clone())?;
|
||||
log::info!("🚀 Using prepare-worker binary at: {:?}", prep_worker_path);
|
||||
log::info!("🚀 Using execute-worker binary at: {:?}", exec_worker_path);
|
||||
|
||||
Some(CandidateValidationConfig {
|
||||
artifacts_cache_path: config
|
||||
.database
|
||||
.path()
|
||||
.ok_or(Error::DatabasePathRequired)?
|
||||
.join("pvf-artifacts"),
|
||||
node_version,
|
||||
secure_validator_mode,
|
||||
prep_worker_path,
|
||||
exec_worker_path,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let chain_selection_config = ChainSelectionConfig {
|
||||
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
|
||||
stagnant_check_interval: Default::default(),
|
||||
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
|
||||
};
|
||||
|
||||
let dispute_coordinator_config = DisputeCoordinatorConfig {
|
||||
col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data,
|
||||
};
|
||||
|
||||
let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
|
||||
config,
|
||||
backend: backend.clone(),
|
||||
@@ -1067,29 +1086,16 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
.generate::<service::SpawnTaskHandle, FullClient>(
|
||||
overseer_connector,
|
||||
OverseerGenArgs {
|
||||
keystore,
|
||||
runtime_client: overseer_client.clone(),
|
||||
parachains_db,
|
||||
network_service: network.clone(),
|
||||
sync_service: sync_service.clone(),
|
||||
authority_discovery_service,
|
||||
pov_req_receiver,
|
||||
chunk_req_receiver,
|
||||
collation_req_v1_receiver,
|
||||
collation_req_v2_receiver,
|
||||
available_data_req_receiver,
|
||||
statement_req_receiver,
|
||||
candidate_req_v2_receiver,
|
||||
dispute_req_receiver,
|
||||
registry: prometheus_registry.as_ref(),
|
||||
spawner,
|
||||
is_parachain_node,
|
||||
approval_voting_config,
|
||||
availability_config: AVAILABILITY_CONFIG,
|
||||
candidate_validation_config,
|
||||
chain_selection_config,
|
||||
dispute_coordinator_config,
|
||||
pvf_checker_enabled,
|
||||
overseer_message_channel_capacity_override,
|
||||
req_protocol_names,
|
||||
peerset_protocol_names,
|
||||
@@ -1098,6 +1104,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
),
|
||||
notification_services,
|
||||
},
|
||||
ext_overseer_args,
|
||||
)
|
||||
.map_err(|e| {
|
||||
gum::error!("Failed to init overseer: {}", e);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsParachainNode, Registry};
|
||||
use polkadot_node_subsystem_types::DefaultSubsystemClient;
|
||||
use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
|
||||
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
|
||||
@@ -32,13 +33,10 @@ use polkadot_node_network_protocol::{
|
||||
},
|
||||
};
|
||||
#[cfg(any(feature = "malus", test))]
|
||||
pub use polkadot_overseer::{
|
||||
dummy::{dummy_overseer_builder, DummySubsystem},
|
||||
HeadSupportsParachains,
|
||||
};
|
||||
pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
|
||||
use polkadot_overseer::{
|
||||
metrics::Metrics as OverseerMetrics, InitializedOverseerBuilder, MetricsTrait, Overseer,
|
||||
OverseerConnector, OverseerHandle, SpawnGlue,
|
||||
metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
|
||||
SpawnGlue,
|
||||
};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
@@ -86,22 +84,14 @@ where
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
/// The keystore to use for i.e. validator keys.
|
||||
pub keystore: Arc<LocalKeystore>,
|
||||
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
|
||||
pub runtime_client: Arc<RuntimeClient>,
|
||||
/// The underlying key value store for the parachains.
|
||||
pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
|
||||
/// Underlying network service implementation.
|
||||
pub network_service: Arc<sc_network::NetworkService<Block, Hash>>,
|
||||
/// Underlying syncing service implementation.
|
||||
pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
|
||||
/// Underlying authority discovery service.
|
||||
pub authority_discovery_service: AuthorityDiscoveryService,
|
||||
/// POV request receiver.
|
||||
pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
|
||||
/// Erasure chunks request receiver.
|
||||
pub chunk_req_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
|
||||
/// Collations request receiver for network protocol v1.
|
||||
pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
|
||||
/// Collations request receiver for network protocol v2.
|
||||
@@ -109,30 +99,12 @@ where
|
||||
/// Receiver for available data requests.
|
||||
pub available_data_req_receiver:
|
||||
IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||
/// Receiver for incoming large statement requests.
|
||||
pub statement_req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
|
||||
/// Receiver for incoming candidate requests.
|
||||
pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
|
||||
/// Receiver for incoming disputes.
|
||||
pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
|
||||
/// Prometheus registry, commonly used for production systems, less so for test.
|
||||
pub registry: Option<&'a Registry>,
|
||||
/// Task spawner to be used throughout the overseer and the APIs it provides.
|
||||
pub spawner: Spawner,
|
||||
/// Determines the behavior of the collator.
|
||||
pub is_parachain_node: IsParachainNode,
|
||||
/// Configuration for the approval voting subsystem.
|
||||
pub approval_voting_config: ApprovalVotingConfig,
|
||||
/// Configuration for the availability store subsystem.
|
||||
pub availability_config: AvailabilityConfig,
|
||||
/// Configuration for the candidate validation subsystem.
|
||||
pub candidate_validation_config: Option<CandidateValidationConfig>,
|
||||
/// Configuration for the chain selection subsystem.
|
||||
pub chain_selection_config: ChainSelectionConfig,
|
||||
/// Configuration for the dispute coordinator subsystem.
|
||||
pub dispute_coordinator_config: DisputeCoordinatorConfig,
|
||||
/// Enable PVF pre-checking
|
||||
pub pvf_checker_enabled: bool,
|
||||
/// Overseer channel capacity override.
|
||||
pub overseer_message_channel_capacity_override: Option<usize>,
|
||||
/// Request-response protocol names source.
|
||||
@@ -145,39 +117,66 @@ where
|
||||
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
|
||||
}
|
||||
|
||||
/// Obtain a prepared `OverseerBuilder`, that is initialized
|
||||
/// with all default values.
|
||||
pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
|
||||
pub struct ExtendedOverseerGenArgs {
|
||||
/// The keystore to use for i.e. validator keys.
|
||||
pub keystore: Arc<LocalKeystore>,
|
||||
/// The underlying key value store for the parachains.
|
||||
pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
|
||||
/// Configuration for the candidate validation subsystem.
|
||||
pub candidate_validation_config: Option<CandidateValidationConfig>,
|
||||
/// Configuration for the availability store subsystem.
|
||||
pub availability_config: AvailabilityConfig,
|
||||
/// POV request receiver.
|
||||
pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
|
||||
/// Erasure chunks request receiver.
|
||||
pub chunk_req_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
|
||||
/// Receiver for incoming large statement requests.
|
||||
pub statement_req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
|
||||
/// Receiver for incoming candidate requests.
|
||||
pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
|
||||
/// Configuration for the approval voting subsystem.
|
||||
pub approval_voting_config: ApprovalVotingConfig,
|
||||
/// Receiver for incoming disputes.
|
||||
pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
|
||||
/// Configuration for the dispute coordinator subsystem.
|
||||
pub dispute_coordinator_config: DisputeCoordinatorConfig,
|
||||
/// Configuration for the chain selection subsystem.
|
||||
pub chain_selection_config: ChainSelectionConfig,
|
||||
}
|
||||
|
||||
/// Obtain a prepared validator `Overseer`, that is initialized with all default values.
|
||||
pub fn validator_overseer_builder<Spawner, RuntimeClient>(
|
||||
OverseerGenArgs {
|
||||
keystore,
|
||||
runtime_client,
|
||||
parachains_db,
|
||||
network_service,
|
||||
sync_service,
|
||||
authority_discovery_service,
|
||||
pov_req_receiver,
|
||||
chunk_req_receiver,
|
||||
collation_req_v1_receiver,
|
||||
collation_req_v2_receiver,
|
||||
collation_req_v1_receiver: _,
|
||||
collation_req_v2_receiver: _,
|
||||
available_data_req_receiver,
|
||||
statement_req_receiver,
|
||||
candidate_req_v2_receiver,
|
||||
dispute_req_receiver,
|
||||
registry,
|
||||
spawner,
|
||||
is_parachain_node,
|
||||
approval_voting_config,
|
||||
availability_config,
|
||||
candidate_validation_config,
|
||||
chain_selection_config,
|
||||
dispute_coordinator_config,
|
||||
pvf_checker_enabled,
|
||||
overseer_message_channel_capacity_override,
|
||||
req_protocol_names,
|
||||
peerset_protocol_names,
|
||||
offchain_transaction_pool_factory,
|
||||
notification_services,
|
||||
}: OverseerGenArgs<Spawner, RuntimeClient>,
|
||||
ExtendedOverseerGenArgs {
|
||||
keystore,
|
||||
parachains_db,
|
||||
candidate_validation_config,
|
||||
availability_config,
|
||||
pov_req_receiver,
|
||||
chunk_req_receiver,
|
||||
statement_req_receiver,
|
||||
candidate_req_v2_receiver,
|
||||
approval_voting_config,
|
||||
dispute_req_receiver,
|
||||
dispute_coordinator_config,
|
||||
chain_selection_config,
|
||||
}: ExtendedOverseerGenArgs,
|
||||
) -> Result<
|
||||
InitializedOverseerBuilder<
|
||||
SpawnGlue<Spawner>,
|
||||
@@ -280,23 +279,15 @@ where
|
||||
Metrics::register(registry)?, // candidate-validation metrics
|
||||
Metrics::register(registry)?, // validation host metrics
|
||||
))
|
||||
.pvf_checker(PvfCheckerSubsystem::new(
|
||||
pvf_checker_enabled,
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
|
||||
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
|
||||
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
|
||||
.collator_protocol({
|
||||
let side = match is_parachain_node {
|
||||
IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
|
||||
peer_id: network_service.local_peer_id(),
|
||||
collator_pair,
|
||||
request_receiver_v1: collation_req_v1_receiver,
|
||||
request_receiver_v2: collation_req_v2_receiver,
|
||||
metrics: Metrics::register(registry)?,
|
||||
},
|
||||
IsParachainNode::FullNode => ProtocolSide::None,
|
||||
IsParachainNode::Collator(_) | IsParachainNode::FullNode =>
|
||||
return Err(Error::Overseer(SubsystemError::Context(
|
||||
"build validator overseer for parachain node".to_owned(),
|
||||
))),
|
||||
IsParachainNode::No => ProtocolSide::Validator {
|
||||
keystore: keystore.clone(),
|
||||
eviction_policy: Default::default(),
|
||||
@@ -352,23 +343,173 @@ where
|
||||
.metrics(metrics)
|
||||
.spawner(spawner);
|
||||
|
||||
if let Some(capacity) = overseer_message_channel_capacity_override {
|
||||
Ok(builder.message_channel_capacity(capacity))
|
||||
let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
|
||||
builder.message_channel_capacity(capacity)
|
||||
} else {
|
||||
Ok(builder)
|
||||
}
|
||||
builder
|
||||
};
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Obtain a prepared collator `Overseer`, that is initialized with all default values.
|
||||
pub fn collator_overseer_builder<Spawner, RuntimeClient>(
|
||||
OverseerGenArgs {
|
||||
runtime_client,
|
||||
network_service,
|
||||
sync_service,
|
||||
authority_discovery_service,
|
||||
collation_req_v1_receiver,
|
||||
collation_req_v2_receiver,
|
||||
available_data_req_receiver,
|
||||
registry,
|
||||
spawner,
|
||||
is_parachain_node,
|
||||
overseer_message_channel_capacity_override,
|
||||
req_protocol_names,
|
||||
peerset_protocol_names,
|
||||
offchain_transaction_pool_factory,
|
||||
notification_services,
|
||||
}: OverseerGenArgs<Spawner, RuntimeClient>,
|
||||
) -> Result<
|
||||
InitializedOverseerBuilder<
|
||||
SpawnGlue<Spawner>,
|
||||
Arc<DefaultSubsystemClient<RuntimeClient>>,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
AvailabilityRecoverySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
RuntimeApiSubsystem<DefaultSubsystemClient<RuntimeClient>>,
|
||||
DummySubsystem,
|
||||
NetworkBridgeRxSubsystem<
|
||||
Arc<sc_network::NetworkService<Block, Hash>>,
|
||||
AuthorityDiscoveryService,
|
||||
>,
|
||||
NetworkBridgeTxSubsystem<
|
||||
Arc<sc_network::NetworkService<Block, Hash>>,
|
||||
AuthorityDiscoveryService,
|
||||
>,
|
||||
ChainApiSubsystem<RuntimeClient>,
|
||||
CollationGenerationSubsystem,
|
||||
CollatorProtocolSubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
ProspectiveParachainsSubsystem,
|
||||
>,
|
||||
Error,
|
||||
>
|
||||
where
|
||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
use polkadot_node_subsystem_util::metrics::Metrics;
|
||||
|
||||
let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
|
||||
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let spawner = SpawnGlue(spawner);
|
||||
|
||||
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
|
||||
|
||||
let runtime_api_client = Arc::new(DefaultSubsystemClient::new(
|
||||
runtime_client.clone(),
|
||||
offchain_transaction_pool_factory,
|
||||
));
|
||||
|
||||
let builder = Overseer::builder()
|
||||
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
|
||||
network_service.clone(),
|
||||
authority_discovery_service.clone(),
|
||||
network_bridge_metrics.clone(),
|
||||
req_protocol_names,
|
||||
peerset_protocol_names.clone(),
|
||||
notification_sinks.clone(),
|
||||
))
|
||||
.network_bridge_rx(NetworkBridgeRxSubsystem::new(
|
||||
network_service.clone(),
|
||||
authority_discovery_service.clone(),
|
||||
Box::new(sync_service.clone()),
|
||||
network_bridge_metrics,
|
||||
peerset_protocol_names,
|
||||
notification_services,
|
||||
notification_sinks,
|
||||
))
|
||||
.availability_distribution(DummySubsystem)
|
||||
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
|
||||
available_data_req_receiver,
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.availability_store(DummySubsystem)
|
||||
.bitfield_distribution(DummySubsystem)
|
||||
.bitfield_signing(DummySubsystem)
|
||||
.candidate_backing(DummySubsystem)
|
||||
.candidate_validation(DummySubsystem)
|
||||
.pvf_checker(DummySubsystem)
|
||||
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
|
||||
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
|
||||
.collator_protocol({
|
||||
let side = match is_parachain_node {
|
||||
IsParachainNode::No =>
|
||||
return Err(Error::Overseer(SubsystemError::Context(
|
||||
"build parachain node overseer for validator".to_owned(),
|
||||
))),
|
||||
IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
|
||||
peer_id: network_service.local_peer_id(),
|
||||
collator_pair,
|
||||
request_receiver_v1: collation_req_v1_receiver,
|
||||
request_receiver_v2: collation_req_v2_receiver,
|
||||
metrics: Metrics::register(registry)?,
|
||||
},
|
||||
IsParachainNode::FullNode => ProtocolSide::None,
|
||||
};
|
||||
CollatorProtocolSubsystem::new(side)
|
||||
})
|
||||
.provisioner(DummySubsystem)
|
||||
.runtime_api(RuntimeApiSubsystem::new(
|
||||
runtime_api_client.clone(),
|
||||
Metrics::register(registry)?,
|
||||
spawner.clone(),
|
||||
))
|
||||
.statement_distribution(DummySubsystem)
|
||||
.approval_distribution(DummySubsystem)
|
||||
.approval_voting(DummySubsystem)
|
||||
.gossip_support(DummySubsystem)
|
||||
.dispute_coordinator(DummySubsystem)
|
||||
.dispute_distribution(DummySubsystem)
|
||||
.chain_selection(DummySubsystem)
|
||||
.prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
|
||||
.activation_external_listeners(Default::default())
|
||||
.span_per_active_leaf(Default::default())
|
||||
.active_leaves(Default::default())
|
||||
.supports_parachains(runtime_api_client)
|
||||
.metrics(metrics)
|
||||
.spawner(spawner);
|
||||
|
||||
let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
|
||||
builder.message_channel_capacity(capacity)
|
||||
} else {
|
||||
builder
|
||||
};
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Trait for the `fn` generating the overseer.
|
||||
///
|
||||
/// Default behavior is to create an unmodified overseer, as `RealOverseerGen`
|
||||
/// would do.
|
||||
pub trait OverseerGen {
|
||||
/// Overwrite the full generation of the overseer, including the subsystems.
|
||||
fn generate<Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<Spawner, RuntimeClient>,
|
||||
ext_args: Option<ExtendedOverseerGenArgs>,
|
||||
) -> Result<
|
||||
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
|
||||
Error,
|
||||
@@ -376,24 +517,22 @@ pub trait OverseerGen {
|
||||
where
|
||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
let gen = RealOverseerGen;
|
||||
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, connector, args)
|
||||
}
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin;
|
||||
|
||||
// It would be nice to make `create_subsystems` part of this trait,
|
||||
// but the amount of generic arguments that would be required as
|
||||
// as consequence make this rather annoying to implement and use.
|
||||
}
|
||||
|
||||
/// The regular set of subsystems.
|
||||
pub struct RealOverseerGen;
|
||||
pub struct ValidatorOverseerGen;
|
||||
|
||||
impl OverseerGen for RealOverseerGen {
|
||||
impl OverseerGen for ValidatorOverseerGen {
|
||||
fn generate<Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<Spawner, RuntimeClient>,
|
||||
ext_args: Option<ExtendedOverseerGenArgs>,
|
||||
) -> Result<
|
||||
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
|
||||
Error,
|
||||
@@ -403,7 +542,35 @@ impl OverseerGen for RealOverseerGen {
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
prepared_overseer_builder(args)?
|
||||
let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
|
||||
"create validator overseer as mandatory extended arguments were not provided"
|
||||
.to_owned(),
|
||||
)))?;
|
||||
validator_overseer_builder(args, ext_args)?
|
||||
.build_with_connector(connector)
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Reduced set of subsystems, to use in collator and collator's full node.
|
||||
pub struct CollatorOverseerGen;
|
||||
|
||||
impl OverseerGen for CollatorOverseerGen {
|
||||
fn generate<Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<Spawner, RuntimeClient>,
|
||||
_ext_args: Option<ExtendedOverseerGenArgs>,
|
||||
) -> Result<
|
||||
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
|
||||
Error,
|
||||
>
|
||||
where
|
||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
collator_overseer_builder(args)?
|
||||
.build_with_connector(connector)
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user