mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 02:01:02 +00:00
sc-consensus-beefy: restart voter on pallet reset (#14821)
When detecting pallet-beefy consensus reset, just reinitialize the worker and continue without bringing down the task (and possibly the node). Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
@@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
C: Client<B, BE> + BlockBackend<B>,
|
||||
P: PayloadProvider<B>,
|
||||
P: PayloadProvider<B> + Clone,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
|
||||
N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
|
||||
@@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
min_block_delta,
|
||||
prometheus_registry,
|
||||
links,
|
||||
on_demand_justifications_handler,
|
||||
mut on_demand_justifications_handler,
|
||||
} = beefy_params;
|
||||
|
||||
let BeefyNetworkParams {
|
||||
@@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
..
|
||||
} = network_params;
|
||||
|
||||
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
|
||||
// Default votes filter is to discard everything.
|
||||
// Validator is updated later with correct starting round and set id.
|
||||
let (gossip_validator, gossip_report_stream) =
|
||||
communication::gossip::GossipValidator::new(known_peers.clone());
|
||||
let gossip_validator = Arc::new(gossip_validator);
|
||||
let mut gossip_engine = GossipEngine::new(
|
||||
network.clone(),
|
||||
sync.clone(),
|
||||
gossip_protocol_name,
|
||||
gossip_validator.clone(),
|
||||
None,
|
||||
);
|
||||
let metrics = register_metrics(prometheus_registry.clone());
|
||||
|
||||
// The `GossipValidator` adds and removes known peers based on valid votes and network events.
|
||||
let on_demand_justifications = OnDemandJustificationsEngine::new(
|
||||
network.clone(),
|
||||
justifications_protocol_name,
|
||||
known_peers,
|
||||
prometheus_registry.clone(),
|
||||
);
|
||||
|
||||
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
|
||||
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
|
||||
let mut finality_notifications = client.finality_notification_stream().fuse();
|
||||
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
|
||||
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
|
||||
|
||||
// Wait for BEEFY pallet to be active before starting voter.
|
||||
let persisted_state =
|
||||
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
|
||||
.await
|
||||
.and_then(|(beefy_genesis, best_grandpa)| {
|
||||
load_or_init_voter_state(
|
||||
&*backend,
|
||||
&*runtime,
|
||||
beefy_genesis,
|
||||
best_grandpa,
|
||||
min_block_delta,
|
||||
)
|
||||
}) {
|
||||
// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
|
||||
// select recoverable errors.
|
||||
loop {
|
||||
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
|
||||
// Default votes filter is to discard everything.
|
||||
// Validator is updated later with correct starting round and set id.
|
||||
let (gossip_validator, gossip_report_stream) =
|
||||
communication::gossip::GossipValidator::new(known_peers.clone());
|
||||
let gossip_validator = Arc::new(gossip_validator);
|
||||
let mut gossip_engine = GossipEngine::new(
|
||||
network.clone(),
|
||||
sync.clone(),
|
||||
gossip_protocol_name.clone(),
|
||||
gossip_validator.clone(),
|
||||
None,
|
||||
);
|
||||
|
||||
// The `GossipValidator` adds and removes known peers based on valid votes and network
|
||||
// events.
|
||||
let on_demand_justifications = OnDemandJustificationsEngine::new(
|
||||
network.clone(),
|
||||
justifications_protocol_name.clone(),
|
||||
known_peers,
|
||||
prometheus_registry.clone(),
|
||||
);
|
||||
|
||||
// Wait for BEEFY pallet to be active before starting voter.
|
||||
let persisted_state = match wait_for_runtime_pallet(
|
||||
&*runtime,
|
||||
&mut gossip_engine,
|
||||
&mut finality_notifications,
|
||||
)
|
||||
.await
|
||||
.and_then(|(beefy_genesis, best_grandpa)| {
|
||||
load_or_init_voter_state(
|
||||
&*backend,
|
||||
&*runtime,
|
||||
beefy_genesis,
|
||||
best_grandpa,
|
||||
min_block_delta,
|
||||
)
|
||||
}) {
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
|
||||
return
|
||||
},
|
||||
};
|
||||
// Update the gossip validator with the right starting round and set id.
|
||||
if let Err(e) = persisted_state
|
||||
.gossip_filter_config()
|
||||
.map(|f| gossip_validator.update_filter(f))
|
||||
{
|
||||
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
|
||||
// Update the gossip validator with the right starting round and set id.
|
||||
if let Err(e) = persisted_state
|
||||
.gossip_filter_config()
|
||||
.map(|f| gossip_validator.update_filter(f))
|
||||
{
|
||||
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
|
||||
return
|
||||
}
|
||||
|
||||
let worker = worker::BeefyWorker {
|
||||
backend: backend.clone(),
|
||||
payload_provider: payload_provider.clone(),
|
||||
runtime: runtime.clone(),
|
||||
sync: sync.clone(),
|
||||
key_store: key_store.clone().into(),
|
||||
gossip_engine,
|
||||
gossip_validator,
|
||||
gossip_report_stream,
|
||||
on_demand_justifications,
|
||||
links: links.clone(),
|
||||
metrics: metrics.clone(),
|
||||
pending_justifications: BTreeMap::new(),
|
||||
persisted_state,
|
||||
};
|
||||
|
||||
match futures::future::select(
|
||||
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
|
||||
Box::pin(on_demand_justifications_handler.run()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// On `ConsensusReset` error, just reinit and restart voter.
|
||||
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
|
||||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
|
||||
continue
|
||||
},
|
||||
// On other errors, bring down / finish the task.
|
||||
futures::future::Either::Left((worker_err, _)) =>
|
||||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
|
||||
futures::future::Either::Right((odj_handler_err, _)) =>
|
||||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
|
||||
};
|
||||
return
|
||||
}
|
||||
|
||||
let worker = worker::BeefyWorker {
|
||||
backend,
|
||||
payload_provider,
|
||||
runtime,
|
||||
sync,
|
||||
key_store: key_store.into(),
|
||||
gossip_engine,
|
||||
gossip_validator,
|
||||
gossip_report_stream,
|
||||
on_demand_justifications,
|
||||
links,
|
||||
metrics,
|
||||
pending_justifications: BTreeMap::new(),
|
||||
persisted_state,
|
||||
};
|
||||
|
||||
futures::future::select(
|
||||
Box::pin(worker.run(block_import_justif, finality_notifications)),
|
||||
Box::pin(on_demand_justifications_handler.run()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn load_or_init_voter_state<B, BE, R>(
|
||||
|
||||
Reference in New Issue
Block a user