sc-consensus-beefy: reuse instead of recreate GossipEngine (#1262)

"sc-consensus-beefy: restart voter on pallet reset #14821" introduced
a mechanism to reinitialize the BEEFY worker on certain errors; but
re-creating the GossipEngine doesn't play well with
"Rework the event system of sc-network #14197".

So this PR slightly changes the re-initialization logic to reuse the original
GossipEngine and not recreate it.

Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2023-08-29 18:47:05 +03:00
committed by GitHub
parent cd10c46146
commit 562557a948
2 changed files with 87 additions and 62 deletions
+52 -31
View File
@@ -313,6 +313,16 @@ impl<B: Block> PersistedState<B> {
}
}
/// Helper object holding BEEFY worker communication/gossip components.
///
/// These are created once, but will be reused if worker is restarted/reinitialized.
pub(crate) struct BeefyComms<B: Block> {
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
}
/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
// utilities
@@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
pub sync: Arc<S>,
pub key_store: BeefyKeystore,
// communication
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
// communication (created once, but returned and reused if worker is restarted/reinitialized)
pub comms: BeefyComms<B>,
// channels
/// Links between the block importer, the background voter and the RPC layer.
@@ -475,7 +482,7 @@ where
if let Err(e) = self
.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))
.map(|filter| self.comms.gossip_validator.update_filter(filter))
{
error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
}
@@ -495,7 +502,11 @@ where
if let Some(finality_proof) = self.handle_vote(vote)? {
let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
let encoded_proof = gossip_proof.encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms.gossip_engine.gossip_message(
proofs_topic::<B>(),
encoded_proof,
true,
);
},
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
@@ -603,7 +614,7 @@ where
metric_set!(self, beefy_best_block, block_num);
self.on_demand_justifications.cancel_requests_older_than(block_num);
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
if let Err(e) = self
.backend
@@ -632,7 +643,7 @@ where
// Update gossip validator votes filter.
self.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))?;
.map(|filter| self.comms.gossip_validator.update_filter(filter))?;
Ok(())
}
@@ -752,12 +763,14 @@ where
err
})? {
let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms
.gossip_engine
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
} else {
metric_inc!(self, beefy_votes_sent);
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
}
// Persist state after vote to avoid double voting in case of voter restarts.
@@ -783,7 +796,7 @@ where
// make sure there's also an on-demand justification request out for it.
if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block, active);
self.comms.on_demand_justifications.request(block, active);
}
}
}
@@ -796,7 +809,7 @@ where
mut self,
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
) -> Error {
) -> (Error, BeefyComms<B>) {
info!(
target: LOG_TARGET,
"🥩 run BEEFY worker, best grandpa: #{:?}.",
@@ -804,7 +817,8 @@ where
);
let mut votes = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(votes_topic::<B>())
.filter_map(|notification| async move {
let vote = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -816,7 +830,8 @@ where
.fuse(),
);
let mut gossip_proofs = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(proofs_topic::<B>())
.filter_map(|notification| async move {
let proof = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -828,12 +843,12 @@ where
.fuse(),
);
loop {
let error = loop {
// Act on changed 'state'.
self.process_new_state();
// Mutable reference used to drive the gossip engine.
let mut gossip_engine = &mut self.gossip_engine;
let mut gossip_engine = &mut self.comms.gossip_engine;
// Use temp val and report after async section,
// to avoid having to Mutex-wrap `gossip_engine`.
let mut gossip_report: Option<PeerReport> = None;
@@ -847,18 +862,18 @@ where
notification = finality_notifications.next() => {
if let Some(notif) = notification {
if let Err(err) = self.handle_finality_notification(&notif) {
return err;
break err;
}
} else {
return Error::FinalityStreamTerminated;
break Error::FinalityStreamTerminated;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
return Error::GossipEngineTerminated;
break Error::GossipEngineTerminated;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
response_info = self.on_demand_justifications.next().fuse() => {
response_info = self.comms.on_demand_justifications.next().fuse() => {
match response_info {
ResponseInfo::ValidProof(justif, peer_report) => {
if let Err(err) = self.triage_incoming_justif(justif) {
@@ -878,7 +893,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::BlockImportStreamTerminated;
break Error::BlockImportStreamTerminated;
}
},
justif = gossip_proofs.next() => {
@@ -888,7 +903,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::FinalityProofGossipStreamTerminated;
break Error::FinalityProofGossipStreamTerminated;
}
},
// Finally process incoming votes.
@@ -899,18 +914,21 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::VotesGossipStreamTerminated;
break Error::VotesGossipStreamTerminated;
}
},
// Process peer reports.
report = self.gossip_report_stream.next() => {
report = self.comms.gossip_report_stream.next() => {
gossip_report = report;
},
}
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
self.gossip_engine.report(who, cost_benefit);
self.comms.gossip_engine.report(who, cost_benefit);
}
}
};
// return error _and_ `comms` that can be reused
(error, self.comms)
}
/// Report the given equivocation to the BEEFY runtime module. This method
@@ -1146,18 +1164,21 @@ pub(crate) mod tests {
)
.unwrap();
let payload_provider = MmrRootProvider::new(api.clone());
let comms = BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};
BeefyWorker {
backend,
payload_provider,
runtime: api,
key_store: Some(keystore).into(),
links,
gossip_engine,
gossip_validator,
gossip_report_stream,
comms,
metrics,
sync: Arc::new(sync),
on_demand_justifications,
pending_justifications: BTreeMap::new(),
persisted_state,
}