client/beefy: fix on-demand justifications sync for old blocks (#12767)

* client/beefy: fix on-demand justif sync for old blocks

When receiving BEEFY justifications for old blocks the state might
be pruned for them, in which case justification verification fails
because BEEFY validator set cannot be retrieved from runtime state.

Fix this by having the voter give the validator set to the
`OnDemandJustificationsEngine` as request information. On receiving
a BEEFY justification for requested block, the provided validator
set will be used to validate the justification.

Signed-off-by: acatangiu <adrian@parity.io>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* impl review suggestions

* client/beefy: fail initialization if state unavailable

* beefy: remove spammy log

Signed-off-by: acatangiu <adrian@parity.io>
Co-authored-by: parity-processbot <>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Adrian Catangiu
2022-11-28 13:38:24 +02:00
committed by GitHub
parent 0c934a9352
commit 2d4126d239
4 changed files with 96 additions and 103 deletions
@@ -18,21 +18,17 @@
//! Generating request logic for request/response protocol for syncing BEEFY justifications. //! Generating request logic for request/response protocol for syncing BEEFY justifications.
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet}; use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
use codec::Encode; use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled}; use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, error, warn}; use log::{debug, warn};
use parking_lot::Mutex; use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName}; use sc_network::{PeerId, ProtocolName};
use sc_network_common::{ use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure}, request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest, service::NetworkRequest,
}; };
use sp_api::ProvideRuntimeApi; use sp_runtime::traits::{Block, NumberFor};
use sp_runtime::{
generic::BlockId,
traits::{Block, NumberFor},
};
use std::{collections::VecDeque, result::Result, sync::Arc}; use std::{collections::VecDeque, result::Result, sync::Arc};
use crate::{ use crate::{
@@ -46,14 +42,19 @@ type Response = Result<Vec<u8>, RequestFailure>;
/// Used to receive a response from the network. /// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>; type ResponseReceiver = oneshot::Receiver<Response>;
enum State<B: Block> { #[derive(Clone, Debug)]
Idle, struct RequestInfo<B: Block> {
AwaitingResponse(PeerId, NumberFor<B>, ResponseReceiver), block: NumberFor<B>,
active_set: ValidatorSet<AuthorityId>,
} }
pub struct OnDemandJustificationsEngine<B: Block, R> { enum State<B: Block> {
Idle,
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}
pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>, network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName, protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>, live_peers: Arc<Mutex<KnownPeers<B>>>,
@@ -62,21 +63,14 @@ pub struct OnDemandJustificationsEngine<B: Block, R> {
state: State<B>, state: State<B>,
} }
impl<B, R> OnDemandJustificationsEngine<B, R> impl<B: Block> OnDemandJustificationsEngine<B> {
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
pub fn new( pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>, network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName, protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>, live_peers: Arc<Mutex<KnownPeers<B>>>,
) -> Self { ) -> Self {
Self { Self {
network, network,
runtime,
protocol_name, protocol_name,
live_peers, live_peers,
peers_cache: VecDeque::new(), peers_cache: VecDeque::new(),
@@ -100,10 +94,15 @@ where
None None
} }
fn request_from_peer(&mut self, peer: PeerId, block: NumberFor<B>) { fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer); debug!(
target: "beefy::sync",
"🥩 requesting justif #{:?} from peer {:?}",
req_info.block,
peer,
);
let payload = JustificationRequest::<B> { begin: block }.encode(); let payload = JustificationRequest::<B> { begin: req_info.block }.encode();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -115,11 +114,13 @@ where
IfDisconnected::ImmediateError, IfDisconnected::ImmediateError,
); );
self.state = State::AwaitingResponse(peer, block, rx); self.state = State::AwaitingResponse(peer, req_info, rx);
} }
/// If no other request is in progress, start new justification request for `block`. /// Start new justification request for `block`, if no other request is in progress.
pub fn request(&mut self, block: NumberFor<B>) { ///
/// `active_set` will be used to verify validity of potential responses.
pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
// ignore new requests while there's already one pending // ignore new requests while there's already one pending
if matches!(self.state, State::AwaitingResponse(_, _, _)) { if matches!(self.state, State::AwaitingResponse(_, _, _)) {
return return
@@ -129,7 +130,7 @@ where
// Start the requests engine - each unsuccessful received response will automatically // Start the requests engine - each unsuccessful received response will automatically
// trigger a new request to the next peer in the `peers_cache` until there are none left. // trigger a new request to the next peer in the `peers_cache` until there are none left.
if let Some(peer) = self.try_next_peer() { if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block); self.request_from_peer(peer, RequestInfo { block, active_set });
} else { } else {
debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block); debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
} }
@@ -138,11 +139,10 @@ where
/// Cancel any pending request for block numbers smaller or equal to `block`. /// Cancel any pending request for block numbers smaller or equal to `block`.
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) { pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state { match &self.state {
State::AwaitingResponse(_, number, _) if *number <= block => { State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
debug!( debug!(
target: "beefy::sync", target: "beefy::sync", "🥩 cancel pending request for justification #{:?}",
"🥩 cancel pending request for justification #{:?}", req_info.block
number
); );
self.state = State::Idle; self.state = State::Idle;
}, },
@@ -153,8 +153,7 @@ where
fn process_response( fn process_response(
&mut self, &mut self,
peer: PeerId, peer: PeerId,
block: NumberFor<B>, req_info: &RequestInfo<B>,
validator_set: &ValidatorSet<AuthorityId>,
response: Result<Response, Canceled>, response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> { ) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response response
@@ -162,7 +161,7 @@ where
debug!( debug!(
target: "beefy::sync", target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}", "🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
block, peer, e req_info.block, peer, e
); );
Error::InvalidResponse Error::InvalidResponse
})? })?
@@ -170,60 +169,49 @@ where
debug!( debug!(
target: "beefy::sync", target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}", "🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
block, peer, e req_info.block, peer, e
); );
Error::InvalidResponse Error::InvalidResponse
}) })
.and_then(|encoded| { .and_then(|encoded| {
decode_and_verify_finality_proof::<B>(&encoded[..], block, &validator_set).map_err( decode_and_verify_finality_proof::<B>(
|e| { &encoded[..],
debug!( req_info.block,
target: "beefy::sync", &req_info.active_set,
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
block, peer, e
);
Error::InvalidResponse
},
) )
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
}) })
} }
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> { pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, block, resp) = match &mut self.state { let (peer, req_info, resp) = match &mut self.state {
State::Idle => { State::Idle => {
futures::pending!(); futures::pending!();
// Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes. // Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes.
return None return None
}, },
State::AwaitingResponse(peer, block, receiver) => { State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await; let resp = receiver.await;
(*peer, *block, resp) (*peer, req_info.clone(), resp)
}, },
}; };
// We received the awaited response. Our 'receiver' will never generate any other response, // We received the awaited response. Our 'receiver' will never generate any other response,
// meaning we're done with current state. Move the engine to `State::Idle`. // meaning we're done with current state. Move the engine to `State::Idle`.
self.state = State::Idle; self.state = State::Idle;
let block_id = BlockId::number(block); let block = req_info.block;
let validator_set = self self.process_response(peer, &req_info, resp)
.runtime
.runtime_api()
.validator_set(&block_id)
.map_err(|e| {
error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e);
e
})
.ok()?
.or_else(|| {
error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block);
None
})?;
self.process_response(peer, block, &validator_set, resp)
.map_err(|_| { .map_err(|_| {
// No valid justification received, try next peer in our set. // No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() { if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block); self.request_from_peer(peer, req_info);
} else { } else {
warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block); warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
} }
+16 -13
View File
@@ -244,7 +244,6 @@ where
// The `GossipValidator` adds and removes known peers based on valid votes and network events. // The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new( let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(), network.clone(),
runtime.clone(),
justifications_protocol_name, justifications_protocol_name,
known_peers, known_peers,
); );
@@ -295,7 +294,7 @@ where
persisted_state, persisted_state,
}; };
let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);
futures::future::join( futures::future::join(
worker.run(block_import_justif, finality_notifications), worker.run(block_import_justif, finality_notifications),
@@ -377,17 +376,8 @@ where
break state break state
} }
// Check if we should move up the chain. if *header.number() == One::one() {
let parent_hash = *header.parent_hash(); // We've reached chain genesis, initialize voter here.
if *header.number() == One::one() ||
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.is_none()
{
// We've reached pallet genesis, initialize voter here.
let genesis_num = *header.number(); let genesis_num = *header.number();
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash())) let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
.and_then(genesis_set_sanity_check)?; .and_then(genesis_set_sanity_check)?;
@@ -408,6 +398,19 @@ where
sessions.push_front(Rounds::new(*header.number(), active)); sessions.push_front(Rounds::new(*header.number(), active));
} }
// Check if state is still available if we move up the chain.
let parent_hash = *header.parent_hash();
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.ok_or_else(|| {
let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
error!(target: "beefy", "🥩 {}", msg);
ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
})?;
// Move up the chain. // Move up the chain.
header = blockchain.expect_header(BlockId::Hash(parent_hash))?; header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
}; };
+4
View File
@@ -89,6 +89,10 @@ where
} }
} }
pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
&self.validator_set
}
pub(crate) fn validator_set_id(&self) -> ValidatorSetId { pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
self.validator_set.id() self.validator_set.id()
} }
+24 -26
View File
@@ -31,8 +31,8 @@ use crate::{
}; };
use beefy_primitives::{ use beefy_primitives::{
crypto::{AuthorityId, Signature}, crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment, Commitment, ConsensusLog, Payload, PayloadProvider, SignedCommitment, ValidatorSet,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
}; };
use codec::{Codec, Decode, Encode}; use codec::{Codec, Decode, Encode};
use futures::{stream::Fuse, FutureExt, StreamExt}; use futures::{stream::Fuse, FutureExt, StreamExt};
@@ -41,10 +41,9 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, Header
use sc_network_common::service::{NetworkEventStream, NetworkRequest}; use sc_network_common::service::{NetworkEventStream, NetworkRequest};
use sc_network_gossip::GossipEngine; use sc_network_gossip::GossipEngine;
use sc_utils::notification::NotificationReceiver; use sc_utils::notification::NotificationReceiver;
use sp_api::{BlockId, ProvideRuntimeApi}; use sp_api::BlockId;
use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle; use sp_consensus::SyncOracle;
use sp_mmr_primitives::MmrApi;
use sp_runtime::{ use sp_runtime::{
generic::OpaqueDigestItemId, generic::OpaqueDigestItemId,
traits::{Block, Header, NumberFor, Zero}, traits::{Block, Header, NumberFor, Zero},
@@ -166,13 +165,13 @@ impl<B: Block> VoterOracle<B> {
Ok(()) Ok(())
} }
/// Return current pending mandatory block, if any. /// Return current pending mandatory block, if any, plus its active validator set.
pub fn mandatory_pending(&self) -> Option<NumberFor<B>> { pub fn mandatory_pending(&self) -> Option<(NumberFor<B>, ValidatorSet<AuthorityId>)> {
self.sessions.front().and_then(|round| { self.sessions.front().and_then(|round| {
if round.mandatory_done() { if round.mandatory_done() {
None None
} else { } else {
Some(round.session_start()) Some((round.session_start(), round.validator_set().clone()))
} }
}) })
} }
@@ -239,14 +238,14 @@ impl<B: Block> VoterOracle<B> {
} }
} }
pub(crate) struct WorkerParams<B: Block, BE, P, R, N> { pub(crate) struct WorkerParams<B: Block, BE, P, N> {
pub backend: Arc<BE>, pub backend: Arc<BE>,
pub payload_provider: P, pub payload_provider: P,
pub network: N, pub network: N,
pub key_store: BeefyKeystore, pub key_store: BeefyKeystore,
pub gossip_engine: GossipEngine<B>, pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>, pub gossip_validator: Arc<GossipValidator<B>>,
pub on_demand_justifications: OnDemandJustificationsEngine<B, R>, pub on_demand_justifications: OnDemandJustificationsEngine<B>,
pub links: BeefyVoterLinks<B>, pub links: BeefyVoterLinks<B>,
pub metrics: Option<Metrics>, pub metrics: Option<Metrics>,
pub persisted_state: PersistedState<B>, pub persisted_state: PersistedState<B>,
@@ -287,7 +286,7 @@ impl<B: Block> PersistedState<B> {
} }
/// A BEEFY worker plays the BEEFY protocol /// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> { pub(crate) struct BeefyWorker<B: Block, BE, P, N> {
// utilities // utilities
backend: Arc<BE>, backend: Arc<BE>,
payload_provider: P, payload_provider: P,
@@ -297,7 +296,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
// communication // communication
gossip_engine: GossipEngine<B>, gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>, gossip_validator: Arc<GossipValidator<B>>,
on_demand_justifications: OnDemandJustificationsEngine<B, R>, on_demand_justifications: OnDemandJustificationsEngine<B>,
// channels // channels
/// Links between the block importer, the background voter and the RPC layer. /// Links between the block importer, the background voter and the RPC layer.
@@ -314,13 +313,11 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
persisted_state: PersistedState<B>, persisted_state: PersistedState<B>,
} }
impl<B, BE, P, R, N> BeefyWorker<B, BE, P, R, N> impl<B, BE, P, N> BeefyWorker<B, BE, P, N>
where where
B: Block + Codec, B: Block + Codec,
BE: Backend<B>, BE: Backend<B>,
P: PayloadProvider<B>, P: PayloadProvider<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: NetworkEventStream + NetworkRequest + SyncOracle + Send + Sync + Clone + 'static, N: NetworkEventStream + NetworkRequest + SyncOracle + Send + Sync + Clone + 'static,
{ {
/// Return a new BEEFY worker instance. /// Return a new BEEFY worker instance.
@@ -329,7 +326,7 @@ where
/// BEEFY pallet has been deployed on-chain. /// BEEFY pallet has been deployed on-chain.
/// ///
/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
pub(crate) fn new(worker_params: WorkerParams<B, BE, P, R, N>) -> Self { pub(crate) fn new(worker_params: WorkerParams<B, BE, P, N>) -> Self {
let WorkerParams { let WorkerParams {
backend, backend,
payload_provider, payload_provider,
@@ -551,10 +548,15 @@ where
// New state is persisted after finalization. // New state is persisted after finalization.
self.finalize(finality_proof)?; self.finalize(finality_proof)?;
} else { } else {
if self_vote || self.voting_oracle().mandatory_pending() == Some(round.1) { let mandatory_round = self
// Persist state after handling self vote to avoid double voting in case .voting_oracle()
// of voter restarts. .mandatory_pending()
// Also persist state after handling mandatory block vote. .map(|p| p.0 == round.1)
.unwrap_or(false);
// Persist state after handling self vote to avoid double voting in case
// of voter restarts.
// Also persist state after handling mandatory block vote.
if self_vote || mandatory_round {
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
.map_err(|e| Error::Backend(e.to_string()))?; .map_err(|e| Error::Backend(e.to_string()))?;
} }
@@ -784,12 +786,10 @@ where
} }
// If the current target is a mandatory block, // If the current target is a mandatory block,
// make sure there's also an on-demand justification request out for it. // make sure there's also an on-demand justification request out for it.
if let Some(block) = self.voting_oracle().mandatory_pending() { if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one. // This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block); self.on_demand_justifications.request(block, active);
} }
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
} }
} }
@@ -993,7 +993,6 @@ pub(crate) mod tests {
Block, Block,
Backend, Backend,
MmrRootProvider<Block, TestApi>, MmrRootProvider<Block, TestApi>,
TestApi,
Arc<NetworkService<Block, H256>>, Arc<NetworkService<Block, H256>>,
> { > {
let keystore = create_beefy_keystore(*key); let keystore = create_beefy_keystore(*key);
@@ -1024,7 +1023,6 @@ pub(crate) mod tests {
GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None); GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None);
let on_demand_justifications = OnDemandJustificationsEngine::new( let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(), network.clone(),
api.clone(),
"/beefy/justifs/1".into(), "/beefy/justifs/1".into(),
known_peers, known_peers,
); );
@@ -1050,7 +1048,7 @@ pub(crate) mod tests {
on_demand_justifications, on_demand_justifications,
persisted_state, persisted_state,
}; };
BeefyWorker::<_, _, _, _, _>::new(worker_params) BeefyWorker::<_, _, _, _>::new(worker_params)
} }
#[test] #[test]