Move BEEFY code to consensus (#13484)

* Move beefy primitives to consensus dir
* Move beefy gadget to client consensus folder
* Rename beefy crates
This commit is contained in:
Davide Galassi
2023-02-28 15:56:22 +01:00
committed by GitHub
parent 1eb0cd31b9
commit 1ef9c473e7
47 changed files with 260 additions and 262 deletions
@@ -0,0 +1,106 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Schema for BEEFY state persisted in the aux-db.
use crate::{worker::PersistedState, LOG_TARGET};
use codec::{Decode, Encode};
use log::{info, trace};
use sc_client_api::{backend::AuxStore, Backend};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_runtime::traits::Block as BlockT;
const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";
const CURRENT_VERSION: u32 = 2;
pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
}
/// Write voter state.
pub(crate) fn write_voter_state<B: BlockT, BE: AuxStore>(
backend: &BE,
state: &PersistedState<B>,
) -> ClientResult<()> {
trace!(target: LOG_TARGET, "🥩 persisting {:?}", state);
AuxStore::insert_aux(backend, &[(WORKER_STATE_KEY, state.encode().as_slice())], &[])
}
fn load_decode<BE: AuxStore, T: Decode>(backend: &BE, key: &[u8]) -> ClientResult<Option<T>> {
match backend.get_aux(key)? {
None => Ok(None),
Some(t) => T::decode(&mut &t[..])
.map_err(|e| ClientError::Backend(format!("BEEFY DB is corrupted: {}", e)))
.map(Some),
}
}
/// Load or initialize persistent data from backend.
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
where
B: BlockT,
BE: Backend<B>,
{
let version: Option<u32> = load_decode(backend, VERSION_KEY)?;
match version {
None => (),
Some(1) => (), // version 1 is totally obsolete and should be simply ignored
Some(2) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
other =>
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
}
// No persistent state found in DB.
Ok(None)
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::tests::BeefyTestNet;
use sc_network_test::TestNetFactory;
// also used in tests.rs
pub fn verify_persisted_version<B: BlockT, BE: Backend<B>>(backend: &BE) -> bool {
let version: u32 = load_decode(backend, VERSION_KEY).unwrap().unwrap();
version == CURRENT_VERSION
}
#[tokio::test]
async fn should_load_persistent_sanity_checks() {
let mut net = BeefyTestNet::new(1);
let backend = net.peer(0).client().as_backend();
// version not available in db -> None
assert_eq!(load_persistent(&*backend).unwrap(), None);
// populate version in db
write_current_version(&*backend).unwrap();
// verify correct version is retrieved
assert_eq!(load_decode(&*backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION));
// version is available in db but state isn't -> None
assert_eq!(load_persistent(&*backend).unwrap(), None);
// full `PersistedState` load is tested in `tests.rs`.
}
}
@@ -0,0 +1,474 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use sc_network::PeerId;
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_core::hashing::twox_64;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};
use codec::{Decode, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use wasm_timer::Instant;
use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET};
use sp_consensus_beefy::{
crypto::{Public, Signature},
VoteMessage,
};
// Timeout for rebroadcasting messages.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
where
B: Block,
{
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}
/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];
struct KnownVotes<B: Block> {
last_done: Option<NumberFor<B>>,
live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
}
impl<B: Block> KnownVotes<B> {
pub fn new() -> Self {
Self { last_done: None, live: BTreeMap::new() }
}
/// Create new round votes set if not already present.
fn insert(&mut self, round: NumberFor<B>) {
self.live.entry(round).or_default();
}
/// Remove `round` and older from live set, update `last_done` accordingly.
fn conclude(&mut self, round: NumberFor<B>) {
self.live.retain(|&number, _| number > round);
self.last_done = self.last_done.max(Some(round));
}
/// Return true if `round` is newer than previously concluded rounds.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_live(&self, round: &NumberFor<B>) -> bool {
Some(*round) >= self.last_done
}
/// Add new _known_ `hash` to the round's known votes.
fn add_known(&mut self, round: &NumberFor<B>, hash: MessageHash) {
self.live.get_mut(round).map(|known| known.insert(hash));
}
/// Check if `hash` is already part of round's known votes.
fn is_known(&self, round: &NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false)
}
}
/// BEEFY gossip validator
///
/// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds.
///
/// Allows messages for 'rounds >= last concluded' to flow, everything else gets
/// rejected/expired.
///
///All messaging is handled in a single BEEFY global topic.
pub(crate) struct GossipValidator<B>
where
B: Block,
{
topic: B::Hash,
known_votes: RwLock<KnownVotes<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
}
impl<B> GossipValidator<B>
where
B: Block,
{
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
known_votes: RwLock::new(KnownVotes::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
}
/// Note a voting round.
///
/// Noting round will track gossiped votes for `round`.
pub(crate) fn note_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round);
self.known_votes.write().insert(round);
}
/// Conclude a voting round.
///
/// This can be called once round is complete so we stop gossiping for it.
pub(crate) fn conclude_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round);
self.known_votes.write().conclude(round);
}
}
impl<B> Validator<B> for GossipValidator<B>
where
B: Block,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
}
fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
sender: &PeerId,
mut data: &[u8],
) -> ValidationResult<B::Hash> {
if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;
// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();
if !known_votes.is_live(&round) {
return ValidationResult::Discard
}
if known_votes.is_known(&round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic)
}
}
if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
self.known_votes.write().add_known(&round, msg_hash);
self.known_peers.lock().note_vote_for(*sender, round);
return ValidationResult::ProcessAndKeep(self.topic)
} else {
// TODO: report peer
debug!(
target: LOG_TARGET,
"🥩 Bad signature on message: {:?}, from: {:?}", msg, sender
);
}
}
ValidationResult::Discard
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let known_votes = self.known_votes.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};
let round = msg.commitment.block_number;
let expired = !known_votes.is_live(&round);
trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired);
expired
})
}
fn message_allowed<'a>(
&'a self,
) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
let do_rebroadcast = {
let now = Instant::now();
let mut next_rebroadcast = self.next_rebroadcast.lock();
if now >= *next_rebroadcast {
*next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
false
}
};
let known_votes = self.known_votes.read();
Box::new(move |_who, intent, _topic, mut data| {
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
}
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return false,
};
let round = msg.commitment.block_number;
let allowed = known_votes.is_live(&round);
trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed);
allowed
})
}
}
#[cfg(test)]
mod tests {
use sc_keystore::LocalKeystore;
use sc_network_test::Block;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use crate::keystore::BeefyKeystore;
use sp_consensus_beefy::{
crypto::Signature, known_payloads, Commitment, Keyring, MmrRootHash, Payload, VoteMessage,
KEY_TYPE,
};
use super::*;
#[test]
fn known_votes_insert_remove() {
let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(1);
kv.insert(2);
assert_eq!(kv.live.len(), 2);
let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(2);
kv.insert(3);
assert!(kv.last_done.is_none());
kv.conclude(2);
assert_eq!(kv.live.len(), 1);
assert!(!kv.live.contains_key(&2));
assert_eq!(kv.last_done, Some(2));
kv.conclude(1);
assert_eq!(kv.last_done, Some(2));
kv.conclude(3);
assert_eq!(kv.last_done, Some(3));
assert!(kv.live.is_empty());
}
#[test]
fn note_and_drop_round_works() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(1u64);
assert!(gv.known_votes.read().is_live(&1u64));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 4);
gv.conclude_round(7u64);
let votes = gv.known_votes.read();
// rounds 1 and 3 are outdated, don't gossip anymore
assert!(!votes.is_live(&1u64));
assert!(!votes.is_live(&3u64));
// latest concluded round is still gossiped
assert!(votes.is_live(&7u64));
// round 10 is alive and in-progress
assert!(votes.is_live(&10u64));
}
#[test]
fn note_same_round_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 3);
// note round #7 again -> should not change anything
gv.note_round(7u64);
let votes = gv.known_votes.read();
assert_eq!(votes.live.len(), 3);
assert!(votes.is_live(&3u64));
assert!(votes.is_live(&7u64));
assert!(votes.is_live(&10u64));
}
struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
todo!()
}
fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {
todo!()
}
fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
todo!()
}
fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) {
todo!()
}
}
fn sign_commitment<BN: Encode>(who: &Keyring, commitment: &Commitment<BN>) -> Signature {
let store: SyncCryptoStorePtr = std::sync::Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&who.to_seed())).unwrap();
let beefy_keystore: BeefyKeystore = Some(store).into();
beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap()
}
fn dummy_vote(block_number: u64) -> VoteMessage<u64, Public, Signature> {
let payload = Payload::from_single_entry(
known_payloads::MMR_ROOT_ID,
MmrRootHash::default().encode(),
);
let commitment = Commitment { payload, block_number, validator_set_id: 0 };
let signature = sign_commitment(&Keyring::Alice, &commitment);
VoteMessage { commitment, id: Keyring::Alice.public(), signature }
}
#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let mut context = TestContext;
let vote = dummy_vote(3);
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
Some(1)
);
// second time we should hit the cache
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
// next we should quickly reject if the round is not live
gv.conclude_round(7_u64);
assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number));
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::Discard));
}
#[test]
fn messages_allowed_and_expired() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let topic = Default::default();
let intent = MessageIntent::Broadcast;
// note round 2 and 3, then conclude 2
gv.note_round(2u64);
gv.note_round(3u64);
gv.conclude_round(2u64);
let mut allowed = gv.message_allowed();
let mut expired = gv.message_expired();
// check bad vote format
assert!(!allowed(&sender, intent, &topic, &mut [0u8; 16]));
assert!(expired(topic, &mut [0u8; 16]));
// inactive round 1 -> expired
let vote = dummy_vote(1);
let mut encoded_vote = vote.encode();
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(expired(topic, &mut encoded_vote));
// active round 2 -> !expired - concluded but still gossiped
let vote = dummy_vote(2);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
// in progress round 3 -> !expired
let vote = dummy_vote(3);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
// unseen round 4 -> !expired
let vote = dummy_vote(3);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
}
#[test]
fn messages_rebroadcast() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let topic = Default::default();
let vote = dummy_vote(1);
let mut encoded_vote = vote.encode();
// re-broadcasting only allowed at `REBROADCAST_AFTER` intervals
let intent = MessageIntent::PeriodicRebroadcast;
let mut allowed = gv.message_allowed();
// rebroadcast not allowed so soon after GossipValidator creation
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
// hack the inner deadline to be `now`
*gv.next_rebroadcast.lock() = Instant::now();
// still not allowed on old `allowed` closure result
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
// renew closure result
let mut allowed = gv.message_allowed();
// rebroadcast should be allowed now
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
}
}
@@ -0,0 +1,113 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Communication streams for the BEEFY networking protocols.
pub mod notification;
pub mod request_response;
pub(crate) mod gossip;
pub(crate) mod peers;
pub(crate) mod beefy_protocol_name {
use array_bytes::bytes2hex;
use sc_network::ProtocolName;
/// BEEFY votes gossip protocol name suffix.
const GOSSIP_NAME: &str = "/beefy/1";
/// BEEFY justifications protocol name suffix.
const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1";
/// Name of the votes gossip protocol used by BEEFY.
///
/// Must be registered towards the networking in order for BEEFY voter to properly function.
pub fn gossip_protocol_name<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> ProtocolName {
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, GOSSIP_NAME).into()
} else {
format!("/{}{}", bytes2hex("", genesis_hash), GOSSIP_NAME).into()
}
}
/// Name of the BEEFY justifications request-response protocol.
pub fn justifications_protocol_name<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> ProtocolName {
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, JUSTIFICATIONS_NAME).into()
} else {
format!("/{}{}", bytes2hex("", genesis_hash), JUSTIFICATIONS_NAME).into()
}
}
}
/// Returns the configuration value to put in
/// [`sc_network::config::NetworkConfiguration::extra_sets`].
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut cfg =
sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
cfg.allow_non_reserved(25, 25);
cfg
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256;
#[test]
fn beefy_protocols_names() {
use beefy_protocol_name::{gossip_protocol_name, justifications_protocol_name};
// Create protocol name using random genesis hash.
let genesis_hash = H256::random();
let genesis_hex = array_bytes::bytes2hex("", genesis_hash.as_ref());
let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex);
let justif_proto_name = justifications_protocol_name(&genesis_hash, None);
assert_eq!(justif_proto_name.to_string(), expected_justif_name);
// Create protocol name using hardcoded genesis hash. Verify exact representation.
let genesis_hash = [
50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123,
94, 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147,
];
let genesis_hex = "32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93";
let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex);
let justif_proto_name = justifications_protocol_name(&genesis_hash, None);
assert_eq!(justif_proto_name.to_string(), expected_justif_name);
}
}
@@ -0,0 +1,55 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr};
use sp_runtime::traits::Block as BlockT;
use crate::justification::BeefyVersionedFinalityProof;
/// The sending half of the notifications channel(s) used to send
/// notifications about best BEEFY block from the gadget side.
pub type BeefyBestBlockSender<Block> = NotificationSender<<Block as BlockT>::Hash>;
/// The receiving half of a notifications channel used to receive
/// notifications about best BEEFY blocks determined on the gadget side.
pub type BeefyBestBlockStream<Block> =
NotificationStream<<Block as BlockT>::Hash, BeefyBestBlockTracingKey>;
/// The sending half of the notifications channel(s) used to send notifications
/// about versioned finality proof generated at the end of a BEEFY round.
pub type BeefyVersionedFinalityProofSender<Block> =
NotificationSender<BeefyVersionedFinalityProof<Block>>;
/// The receiving half of a notifications channel used to receive notifications
/// about versioned finality proof generated at the end of a BEEFY round.
pub type BeefyVersionedFinalityProofStream<Block> =
NotificationStream<BeefyVersionedFinalityProof<Block>, BeefyVersionedFinalityProofTracingKey>;
/// Provides tracing key for BEEFY best block stream.
#[derive(Clone)]
pub struct BeefyBestBlockTracingKey;
impl TracingKeyStr for BeefyBestBlockTracingKey {
const TRACING_KEY: &'static str = "mpsc_beefy_best_block_notification_stream";
}
/// Provides tracing key for BEEFY versioned finality proof stream.
#[derive(Clone)]
pub struct BeefyVersionedFinalityProofTracingKey;
impl TracingKeyStr for BeefyVersionedFinalityProofTracingKey {
const TRACING_KEY: &'static str = "mpsc_beefy_versioned_finality_proof_notification_stream";
}
@@ -0,0 +1,123 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Logic for keeping track of BEEFY peers.
// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.
use sc_network::PeerId;
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};
struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
impl<B: Block> Default for PeerData<B> {
fn default() -> Self {
PeerData { last_voted_on: Zero::zero() }
}
}
/// Keep a simple map of connected peers
/// and the most recent voting round they participated in.
pub struct KnownPeers<B: Block> {
live: HashMap<PeerId, PeerData<B>>,
}
impl<B: Block> KnownPeers<B> {
pub fn new() -> Self {
Self { live: HashMap::new() }
}
/// Note vote round number for `peer`.
pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor<B>) {
let data = self.live.entry(peer).or_default();
data.last_voted_on = round.max(data.last_voted_on);
}
/// Remove connected `peer`.
pub fn remove(&mut self, peer: &PeerId) {
self.live.remove(peer);
}
/// Return _filtered and cloned_ list of peers that have voted on higher than `block`.
pub fn further_than(&self, block: NumberFor<B>) -> VecDeque<PeerId> {
self.live
.iter()
.filter_map(|(k, v)| (v.last_voted_on > block).then_some(k))
.cloned()
.collect()
}
/// Answer whether `peer` is part of `KnownPeers` set.
pub fn contains(&self, peer: &PeerId) -> bool {
self.live.contains_key(peer)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_track_known_peers_progress() {
let (alice, bob, charlie) = (PeerId::random(), PeerId::random(), PeerId::random());
let mut peers = KnownPeers::<sc_network_test::Block>::new();
assert!(peers.live.is_empty());
// 'Tracked' Bob seen voting for 5.
peers.note_vote_for(bob, 5);
// Previously unseen Charlie now seen voting for 10.
peers.note_vote_for(charlie, 10);
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
assert!(peers.contains(&bob));
assert!(peers.contains(&charlie));
// Get peers at block > 4
let further_than_4 = peers.further_than(4);
// Should be Bob and Charlie
assert_eq!(further_than_4.len(), 2);
assert!(further_than_4.contains(&bob));
assert!(further_than_4.contains(&charlie));
// 'Tracked' Alice seen voting for 10.
peers.note_vote_for(alice, 10);
// Get peers at block > 9
let further_than_9 = peers.further_than(9);
// Should be Charlie and Alice
assert_eq!(further_than_9.len(), 2);
assert!(further_than_9.contains(&charlie));
assert!(further_than_9.contains(&alice));
// Remove Alice
peers.remove(&alice);
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
// Get peers at block >= 9
let further_than_9 = peers.further_than(9);
// Now should be just Charlie
assert_eq!(further_than_9.len(), 1);
assert!(further_than_9.contains(&charlie));
}
}
@@ -0,0 +1,210 @@
// Copyright Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer.
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{config as netconfig, config::RequestResponseConfig, PeerId, ReputationChange};
use sc_network_common::protocol::ProtocolName;
use sp_consensus_beefy::BEEFY_ENGINE_ID;
use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};
use crate::{
communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
},
metric_inc,
metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
};
/// A request coming in, including a sender for sending responses.
#[derive(Debug)]
pub(crate) struct IncomingRequest<B: Block> {
/// `PeerId` of sending peer.
pub peer: PeerId,
/// The sent request.
pub payload: JustificationRequest<B>,
/// Sender for sending response back.
pub pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
}
impl<B: Block> IncomingRequest<B> {
/// Create new `IncomingRequest`.
pub fn new(
peer: PeerId,
payload: JustificationRequest<B>,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self { peer, payload, pending_response }
}
/// Try building from raw network request.
///
/// This function will fail if the request cannot be decoded and will apply passed in
/// reputation changes in that case.
///
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
return Err(Error::DecodingErrorNoReputationChange(peer, err))
}
return Err(Error::DecodingError(peer, err))
},
};
Ok(Self::new(peer, payload, pending_response))
}
}
/// Receiver for incoming BEEFY justifications requests.
///
/// Takes care of decoding and handling of invalid encoded requests.
pub(crate) struct IncomingRequestReceiver {
raw: mpsc::Receiver<netconfig::IncomingRequest>,
}
impl IncomingRequestReceiver {
pub fn new(inner: mpsc::Receiver<netconfig::IncomingRequest>) -> Self {
Self { raw: inner }
}
/// Try to receive the next incoming request.
///
/// Any received request will be decoded, on decoding errors the provided reputation changes
/// will be applied and an error will be reported.
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
};
Ok(req)
}
}
/// Handler for incoming BEEFY justifications requests from a remote peer.
pub struct BeefyJustifsRequestHandler<B, Client> {
pub(crate) request_receiver: IncomingRequestReceiver,
pub(crate) justif_protocol_name: ProtocolName,
pub(crate) client: Arc<Client>,
pub(crate) metrics: Option<OnDemandIncomingRequestsMetrics>,
pub(crate) _block: PhantomData<B>,
}
impl<B, Client> BeefyJustifsRequestHandler<B, Client>
where
B: Block,
Client: BlockBackend<B> + Send + Sync,
{
/// Create a new [`BeefyJustifsRequestHandler`].
pub fn new<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
client: Arc<Client>,
prometheus_registry: Option<prometheus::Registry>,
) -> (Self, RequestResponseConfig) {
let (request_receiver, config) =
on_demand_justifications_protocol_config(genesis_hash, fork_id);
let justif_protocol_name = config.name.clone();
let metrics = register_metrics(prometheus_registry);
(
Self { request_receiver, justif_protocol_name, client, metrics, _block: PhantomData },
config,
)
}
/// Network request-response protocol name used by this handler.
pub fn protocol_name(&self) -> ProtocolName {
self.justif_protocol_name.clone()
}
// Sends back justification response if justification found in client backend.
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.
let maybe_encoded_proof = if let Some(hash) =
self.client.block_hash(request.payload.begin).map_err(Error::Client)?
{
self.client
.justifications(hash)
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
// No BEEFY justification present.
.ok_or(())
} else {
Err(())
};
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
}
/// Run [`BeefyJustifsRequestHandler`].
pub async fn run(mut self) {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
metric_inc!(self, beefy_successful_justification_responses);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 Handled BEEFY justification request from {:?}.", peer
)
},
Err(e) => {
metric_inc!(self, beefy_failed_justification_responses);
// TODO (issue #12293): apply reputation changes here based on error type.
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
)
},
}
}
}
}
@@ -0,0 +1,103 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Request/response protocol for syncing BEEFY justifications.
mod incoming_requests_handler;
pub(crate) mod outgoing_requests_engine;
pub use incoming_requests_handler::BeefyJustifsRequestHandler;
use futures::channel::mpsc;
use std::time::Duration;
use codec::{Decode, Encode, Error as CodecError};
use sc_network::{config::RequestResponseConfig, PeerId};
use sp_runtime::traits::{Block, NumberFor};
use crate::communication::beefy_protocol_name::justifications_protocol_name;
use incoming_requests_handler::IncomingRequestReceiver;
// 10 seems reasonable, considering justifs are explicitly requested only
// for mandatory blocks, by nodes that are syncing/catching-up.
const JUSTIF_CHANNEL_SIZE: usize = 10;
const MAX_RESPONSE_SIZE: u64 = 1024 * 1024;
const JUSTIF_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
const BEEFY_SYNC_LOG_TARGET: &str = "beefy::sync";
/// Get the configuration for the BEEFY justifications Request/response protocol.
///
/// Returns a receiver for messages received on this protocol and the requested
/// `ProtocolConfig`.
///
/// Consider using [`BeefyJustifsRequestHandler`] instead of this low-level function.
pub(crate) fn on_demand_justifications_protocol_config<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> (IncomingRequestReceiver, RequestResponseConfig) {
let name = justifications_protocol_name(genesis_hash, fork_id);
let fallback_names = vec![];
let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE);
let rx = IncomingRequestReceiver::new(rx);
let cfg = RequestResponseConfig {
name,
fallback_names,
max_request_size: 32,
max_response_size: MAX_RESPONSE_SIZE,
// We are connected to all validators:
request_timeout: JUSTIF_REQUEST_TIMEOUT,
inbound_queue: Some(tx),
};
(rx, cfg)
}
/// BEEFY justification request.
#[derive(Debug, Clone, Encode, Decode)]
pub struct JustificationRequest<B: Block> {
/// Start collecting proofs from this block.
pub begin: NumberFor<B>,
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),
#[error(transparent)]
RuntimeApi(#[from] sp_api::ApiError),
/// Decoding failed, we were able to change the peer's reputation accordingly.
#[error("Decoding request failed for peer {0}.")]
DecodingError(PeerId, #[source] CodecError),
/// Decoding failed, but sending reputation change failed.
#[error("Decoding request failed for peer {0}, and changing reputation failed.")]
DecodingErrorNoReputationChange(PeerId, #[source] CodecError),
/// Incoming request stream exhausted. Should only happen on shutdown.
#[error("Incoming request channel got closed.")]
RequestChannelExhausted,
#[error("Failed to send response.")]
SendResponse,
#[error("Received invalid response.")]
InvalidResponse,
}
@@ -0,0 +1,246 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Generating request logic for request/response protocol for syncing BEEFY justifications.
use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
};
use sp_consensus_beefy::{crypto::AuthorityId, ValidatorSet};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};
use crate::{
communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
metric_inc,
metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
KnownPeers,
};
/// Response type received from network.
type Response = Result<Vec<u8>, RequestFailure>;
/// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>;
#[derive(Clone, Debug)]
struct RequestInfo<B: Block> {
block: NumberFor<B>,
active_set: ValidatorSet<AuthorityId>,
}
enum State<B: Block> {
Idle,
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}
pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
peers_cache: VecDeque<PeerId>,
state: State<B>,
metrics: Option<OnDemandOutgoingRequestsMetrics>,
}
impl<B: Block> OnDemandJustificationsEngine<B> {
pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
prometheus_registry: Option<prometheus::Registry>,
) -> Self {
let metrics = register_metrics(prometheus_registry);
Self {
network,
protocol_name,
live_peers,
peers_cache: VecDeque::new(),
state: State::Idle,
metrics,
}
}
fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
self.peers_cache = self.live_peers.lock().further_than(block);
}
fn try_next_peer(&mut self) -> Option<PeerId> {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
return Some(peer)
}
}
None
}
fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 requesting justif #{:?} from peer {:?}", req_info.block, peer,
);
let payload = JustificationRequest::<B> { begin: req_info.block }.encode();
let (tx, rx) = oneshot::channel();
self.network.start_request(
peer,
self.protocol_name.clone(),
payload,
tx,
IfDisconnected::ImmediateError,
);
self.state = State::AwaitingResponse(peer, req_info, rx);
}
/// Start new justification request for `block`, if no other request is in progress.
///
/// `active_set` will be used to verify validity of potential responses.
pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
// ignore new requests while there's already one pending
if matches!(self.state, State::AwaitingResponse(_, _, _)) {
return
}
self.reset_peers_cache_for_block(block);
// Start the requests engine - each unsuccessful received response will automatically
// trigger a new request to the next peer in the `peers_cache` until there are none left.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, RequestInfo { block, active_set });
} else {
metric_inc!(self, beefy_on_demand_justification_no_peer_to_request_from);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 no good peers to request justif #{:?} from", block
);
}
}
/// Cancel any pending request for block numbers smaller or equal to `block`.
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state {
State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 cancel pending request for justification #{:?}", req_info.block
);
self.state = State::Idle;
},
_ => (),
}
}
fn process_response(
&mut self,
peer: PeerId,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_hang_up);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
req_info.block,
peer,
e
);
Error::InvalidResponse
})?
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_error);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
req_info.block,
peer,
e
);
Error::InvalidResponse
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_invalid_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
})
}
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::pending!();
return None
},
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
(*peer, req_info.clone(), resp)
},
};
// We received the awaited response. Our 'receiver' will never generate any other response,
// meaning we're done with current state. Move the engine to `State::Idle`.
self.state = State::Idle;
let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, req_info);
} else {
warn!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 ran out of peers to request justif #{:?} from", block
);
}
})
.map(|proof| {
metric_inc!(self, beefy_on_demand_justification_good_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
);
proof
})
.ok()
}
}
@@ -0,0 +1,51 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! BEEFY gadget specific errors
//!
//! Used for BEEFY gadget interal error handling only
use std::fmt::Debug;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Backend: {0}")]
Backend(String),
#[error("Keystore error: {0}")]
Keystore(String),
#[error("Runtime api error: {0}")]
RuntimeApi(sp_api::ApiError),
#[error("Signature error: {0}")]
Signature(String),
#[error("Session uninitialized")]
UninitSession,
}
#[cfg(test)]
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Error::Backend(s1), Error::Backend(s2)) => s1 == s2,
(Error::Keystore(s1), Error::Keystore(s2)) => s1 == s2,
(Error::RuntimeApi(_), Error::RuntimeApi(_)) => true,
(Error::Signature(s1), Error::Signature(s2)) => s1 == s2,
(Error::UninitSession, Error::UninitSession) => true,
_ => false,
}
}
}
@@ -0,0 +1,189 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use log::debug;
use sp_consensus_beefy::{BeefyApi, BEEFY_ENGINE_ID};
use std::{collections::HashMap, sync::Arc};
use sp_api::{ProvideRuntimeApi, TransactionFor};
use sp_blockchain::well_known_cache_keys;
use sp_consensus::Error as ConsensusError;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
EncodedJustification,
};
use sc_client_api::backend::Backend;
use sc_consensus::{BlockCheckParams, BlockImport, BlockImportParams, ImportResult};
use crate::{
communication::notification::BeefyVersionedFinalityProofSender,
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
metric_inc,
metrics::BlockImportMetrics,
LOG_TARGET,
};
/// A block-import handler for BEEFY.
///
/// This scans each imported block for BEEFY justifications and verifies them.
/// Wraps a `inner: BlockImport` and ultimately defers to it.
///
/// When using BEEFY, the block import worker should be using this block import object.
pub struct BeefyBlockImport<Block: BlockT, Backend, RuntimeApi, I> {
backend: Arc<Backend>,
runtime: Arc<RuntimeApi>,
inner: I,
justification_sender: BeefyVersionedFinalityProofSender<Block>,
metrics: Option<BlockImportMetrics>,
}
impl<Block: BlockT, BE, Runtime, I: Clone> Clone for BeefyBlockImport<Block, BE, Runtime, I> {
fn clone(&self) -> Self {
BeefyBlockImport {
backend: self.backend.clone(),
runtime: self.runtime.clone(),
inner: self.inner.clone(),
justification_sender: self.justification_sender.clone(),
metrics: self.metrics.clone(),
}
}
}
impl<Block: BlockT, BE, Runtime, I> BeefyBlockImport<Block, BE, Runtime, I> {
/// Create a new BeefyBlockImport.
pub fn new(
backend: Arc<BE>,
runtime: Arc<Runtime>,
inner: I,
justification_sender: BeefyVersionedFinalityProofSender<Block>,
metrics: Option<BlockImportMetrics>,
) -> BeefyBlockImport<Block, BE, Runtime, I> {
BeefyBlockImport { backend, runtime, inner, justification_sender, metrics }
}
}
impl<Block, BE, Runtime, I> BeefyBlockImport<Block, BE, Runtime, I>
where
Block: BlockT,
BE: Backend<Block>,
Runtime: ProvideRuntimeApi<Block>,
Runtime::Api: BeefyApi<Block> + Send,
{
fn decode_and_verify(
&self,
encoded: &EncodedJustification,
number: NumberFor<Block>,
hash: <Block as BlockT>::Hash,
) -> Result<BeefyVersionedFinalityProof<Block>, ConsensusError> {
use ConsensusError::ClientImport as ImportError;
let beefy_genesis = self
.runtime
.runtime_api()
.beefy_genesis(hash)
.map_err(|e| ImportError(e.to_string()))?
.ok_or_else(|| ImportError("Unknown BEEFY genesis".to_string()))?;
if number < beefy_genesis {
return Err(ImportError("BEEFY genesis is set for future block".to_string()))
}
let validator_set = self
.runtime
.runtime_api()
.validator_set(hash)
.map_err(|e| ImportError(e.to_string()))?
.ok_or_else(|| ImportError("Unknown validator set".to_string()))?;
decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
}
}
#[async_trait::async_trait]
impl<Block, BE, Runtime, I> BlockImport<Block> for BeefyBlockImport<Block, BE, Runtime, I>
where
Block: BlockT,
BE: Backend<Block>,
I: BlockImport<
Block,
Error = ConsensusError,
Transaction = sp_api::TransactionFor<Runtime, Block>,
> + Send
+ Sync,
Runtime: ProvideRuntimeApi<Block> + Send + Sync,
Runtime::Api: BeefyApi<Block>,
{
type Error = ConsensusError;
type Transaction = TransactionFor<Runtime, Block>;
async fn import_block(
&mut self,
mut block: BlockImportParams<Block, Self::Transaction>,
new_cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
let hash = block.post_hash();
let number = *block.header.number();
let beefy_encoded = block.justifications.as_mut().and_then(|just| {
let encoded = just.get(BEEFY_ENGINE_ID).cloned();
// Remove BEEFY justification from the list before giving to `inner`; we send it to the
// voter (beefy-gadget) and it will append it to the backend after block is finalized.
just.remove(BEEFY_ENGINE_ID);
encoded
});
// Run inner block import.
let inner_import_result = self.inner.import_block(block, new_cache).await?;
match (beefy_encoded, &inner_import_result) {
(Some(encoded), ImportResult::Imported(_)) => {
match self.decode_and_verify(&encoded, number, hash) {
Ok(proof) => {
// The proof is valid and the block is imported and final, we can import.
debug!(
target: LOG_TARGET,
"🥩 import justif {:?} for block number {:?}.", proof, number
);
// Send the justification to the BEEFY voter for processing.
self.justification_sender
.notify(|| Ok::<_, ()>(proof))
.expect("the closure always returns Ok; qed.");
metric_inc!(self, beefy_good_justification_imports);
},
Err(err) => {
debug!(
target: LOG_TARGET,
"🥩 error importing BEEFY justification for block {:?}: {:?}",
number,
err,
);
metric_inc!(self, beefy_bad_justification_imports);
},
}
},
_ => (),
}
Ok(inner_import_result)
}
async fn check_block(
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await
}
}
@@ -0,0 +1,188 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::keystore::BeefyKeystore;
use codec::{Decode, Encode};
use sp_consensus::Error as ConsensusError;
use sp_consensus_beefy::{
crypto::{AuthorityId, Signature},
ValidatorSet, VersionedFinalityProof,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
/// A finality proof with matching BEEFY authorities' signatures.
pub type BeefyVersionedFinalityProof<Block> =
sp_consensus_beefy::VersionedFinalityProof<NumberFor<Block>, Signature>;
/// Decode and verify a Beefy FinalityProof.
pub(crate) fn decode_and_verify_finality_proof<Block: BlockT>(
encoded: &[u8],
target_number: NumberFor<Block>,
validator_set: &ValidatorSet<AuthorityId>,
) -> Result<BeefyVersionedFinalityProof<Block>, ConsensusError> {
let proof = <BeefyVersionedFinalityProof<Block>>::decode(&mut &*encoded)
.map_err(|_| ConsensusError::InvalidJustification)?;
verify_with_validator_set::<Block>(target_number, validator_set, &proof).map(|_| proof)
}
/// Verify the Beefy finality proof against the validator set at the block it was generated.
fn verify_with_validator_set<Block: BlockT>(
target_number: NumberFor<Block>,
validator_set: &ValidatorSet<AuthorityId>,
proof: &BeefyVersionedFinalityProof<Block>,
) -> Result<(), ConsensusError> {
match proof {
VersionedFinalityProof::V1(signed_commitment) => {
if signed_commitment.signatures.len() != validator_set.len() ||
signed_commitment.commitment.validator_set_id != validator_set.id() ||
signed_commitment.commitment.block_number != target_number
{
return Err(ConsensusError::InvalidJustification)
}
// Arrangement of signatures in the commitment should be in the same order
// as validators for that set.
let message = signed_commitment.commitment.encode();
let valid_signatures = validator_set
.validators()
.into_iter()
.zip(signed_commitment.signatures.iter())
.filter(|(id, signature)| {
signature
.as_ref()
.map(|sig| BeefyKeystore::verify(id, sig, &message[..]))
.unwrap_or(false)
})
.count();
if valid_signatures >= crate::round::threshold(validator_set.len()) {
Ok(())
} else {
Err(ConsensusError::InvalidJustification)
}
},
}
}
#[cfg(test)]
pub(crate) mod tests {
use sp_consensus_beefy::{
known_payloads, Commitment, Keyring, Payload, SignedCommitment, VersionedFinalityProof,
};
use substrate_test_runtime_client::runtime::Block;
use super::*;
use crate::tests::make_beefy_ids;
pub(crate) fn new_finality_proof(
block_num: NumberFor<Block>,
validator_set: &ValidatorSet<AuthorityId>,
keys: &[Keyring],
) -> BeefyVersionedFinalityProof<Block> {
let commitment = Commitment {
payload: Payload::from_single_entry(known_payloads::MMR_ROOT_ID, vec![]),
block_number: block_num,
validator_set_id: validator_set.id(),
};
let message = commitment.encode();
let signatures = keys.iter().map(|key| Some(key.sign(&message))).collect();
VersionedFinalityProof::V1(SignedCommitment { commitment, signatures })
}
#[test]
fn should_verify_with_validator_set() {
let keys = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie];
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
// build valid justification
let block_num = 42;
let proof = new_finality_proof(block_num, &validator_set, keys);
let good_proof = proof.clone().into();
// should verify successfully
verify_with_validator_set::<Block>(block_num, &validator_set, &good_proof).unwrap();
// wrong block number -> should fail verification
let good_proof = proof.clone().into();
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &good_proof) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
};
// wrong validator set id -> should fail verification
let good_proof = proof.clone().into();
let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
match verify_with_validator_set::<Block>(block_num, &other, &good_proof) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
};
// wrong signatures length -> should fail verification
let mut bad_proof = proof.clone();
// change length of signatures
let bad_signed_commitment = match bad_proof {
VersionedFinalityProof::V1(ref mut sc) => sc,
};
bad_signed_commitment.signatures.pop().flatten().unwrap();
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
};
// not enough signatures -> should fail verification
let mut bad_proof = proof.clone();
let bad_signed_commitment = match bad_proof {
VersionedFinalityProof::V1(ref mut sc) => sc,
};
// remove a signature (but same length)
*bad_signed_commitment.signatures.first_mut().unwrap() = None;
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
};
// not enough _correct_ signatures -> should fail verification
let mut bad_proof = proof.clone();
let bad_signed_commitment = match bad_proof {
VersionedFinalityProof::V1(ref mut sc) => sc,
};
// change a signature to a different key
*bad_signed_commitment.signatures.first_mut().unwrap() =
Some(Keyring::Dave.sign(&bad_signed_commitment.commitment.encode()));
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
};
}
#[test]
fn should_decode_and_verify_finality_proof() {
let keys = &[Keyring::Alice, Keyring::Bob];
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
let block_num = 1;
// build valid justification
let proof = new_finality_proof(block_num, &validator_set, keys);
let versioned_proof: BeefyVersionedFinalityProof<Block> = proof.into();
let encoded = versioned_proof.encode();
// should successfully decode and verify
let verified =
decode_and_verify_finality_proof::<Block>(&encoded, block_num, &validator_set).unwrap();
assert_eq!(verified, versioned_proof);
}
}
@@ -0,0 +1,332 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sp_application_crypto::RuntimeAppPublic;
use sp_core::keccak_256;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use log::warn;
use sp_consensus_beefy::{
crypto::{Public, Signature},
BeefyAuthorityId, KEY_TYPE,
};
use crate::{error, LOG_TARGET};
/// Hasher used for BEEFY signatures.
pub(crate) type BeefySignatureHasher = sp_runtime::traits::Keccak256;
/// A BEEFY specific keystore implemented as a `Newtype`. This is basically a
/// wrapper around [`sp_keystore::SyncCryptoStore`] and allows to customize
/// common cryptographic functionality.
pub(crate) struct BeefyKeystore(Option<SyncCryptoStorePtr>);
impl BeefyKeystore {
/// Check if the keystore contains a private key for one of the public keys
/// contained in `keys`. A public key with a matching private key is known
/// as a local authority id.
///
/// Return the public key for which we also do have a private key. If no
/// matching private key is found, `None` will be returned.
pub fn authority_id(&self, keys: &[Public]) -> Option<Public> {
let store = self.0.clone()?;
// we do check for multiple private keys as a key store sanity check.
let public: Vec<Public> = keys
.iter()
.filter(|k| SyncCryptoStore::has_keys(&*store, &[(k.to_raw_vec(), KEY_TYPE)]))
.cloned()
.collect();
if public.len() > 1 {
warn!(
target: LOG_TARGET,
"🥩 Multiple private keys found for: {:?} ({})",
public,
public.len()
);
}
public.get(0).cloned()
}
/// Sign `message` with the `public` key.
///
/// Note that `message` usually will be pre-hashed before being signed.
///
/// Return the message signature or an error in case of failure.
pub fn sign(&self, public: &Public, message: &[u8]) -> Result<Signature, error::Error> {
let store = self.0.clone().ok_or_else(|| error::Error::Keystore("no Keystore".into()))?;
let msg = keccak_256(message);
let public = public.as_ref();
let sig = SyncCryptoStore::ecdsa_sign_prehashed(&*store, KEY_TYPE, public, &msg)
.map_err(|e| error::Error::Keystore(e.to_string()))?
.ok_or_else(|| error::Error::Signature("ecdsa_sign_prehashed() failed".to_string()))?;
// check that `sig` has the expected result type
let sig = sig.clone().try_into().map_err(|_| {
error::Error::Signature(format!("invalid signature {:?} for key {:?}", sig, public))
})?;
Ok(sig)
}
/// Returns a vector of [`sp_consensus_beefy::crypto::Public`] keys which are currently
/// supported (i.e. found in the keystore).
pub fn public_keys(&self) -> Result<Vec<Public>, error::Error> {
let store = self.0.clone().ok_or_else(|| error::Error::Keystore("no Keystore".into()))?;
let pk: Vec<Public> = SyncCryptoStore::ecdsa_public_keys(&*store, KEY_TYPE)
.drain(..)
.map(Public::from)
.collect();
Ok(pk)
}
/// Use the `public` key to verify that `sig` is a valid signature for `message`.
///
/// Return `true` if the signature is authentic, `false` otherwise.
pub fn verify(public: &Public, sig: &Signature, message: &[u8]) -> bool {
BeefyAuthorityId::<BeefySignatureHasher>::verify(public, sig, message)
}
}
impl From<Option<SyncCryptoStorePtr>> for BeefyKeystore {
fn from(store: Option<SyncCryptoStorePtr>) -> BeefyKeystore {
BeefyKeystore(store)
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use sc_keystore::LocalKeystore;
use sp_core::{ecdsa, Pair};
use sp_consensus_beefy::{crypto, Keyring};
use super::*;
use crate::error::Error;
fn keystore() -> SyncCryptoStorePtr {
Arc::new(LocalKeystore::in_memory())
}
#[test]
fn verify_should_work() {
let msg = keccak_256(b"I am Alice!");
let sig = Keyring::Alice.sign(b"I am Alice!");
assert!(ecdsa::Pair::verify_prehashed(
&sig.clone().into(),
&msg,
&Keyring::Alice.public().into(),
));
// different public key -> fail
assert!(!ecdsa::Pair::verify_prehashed(
&sig.clone().into(),
&msg,
&Keyring::Bob.public().into(),
));
let msg = keccak_256(b"I am not Alice!");
// different msg -> fail
assert!(
!ecdsa::Pair::verify_prehashed(&sig.into(), &msg, &Keyring::Alice.public().into(),)
);
}
#[test]
fn pair_works() {
let want = crypto::Pair::from_string("//Alice", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Alice.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Bob", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Bob.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Charlie", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Charlie.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Dave", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Dave.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Eve", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Eve.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Ferdie", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Ferdie.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//One", None).expect("Pair failed").to_raw_vec();
let got = Keyring::One.pair().to_raw_vec();
assert_eq!(want, got);
let want = crypto::Pair::from_string("//Two", None).expect("Pair failed").to_raw_vec();
let got = Keyring::Two.pair().to_raw_vec();
assert_eq!(want, got);
}
#[test]
fn authority_id_works() {
let store = keystore();
let alice: crypto::Public =
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&Keyring::Alice.to_seed()))
.ok()
.unwrap()
.into();
let bob = Keyring::Bob.public();
let charlie = Keyring::Charlie.public();
let store: BeefyKeystore = Some(store).into();
let mut keys = vec![bob, charlie];
let id = store.authority_id(keys.as_slice());
assert!(id.is_none());
keys.push(alice.clone());
let id = store.authority_id(keys.as_slice()).unwrap();
assert_eq!(id, alice);
}
#[test]
fn sign_works() {
let store = keystore();
let alice: crypto::Public =
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&Keyring::Alice.to_seed()))
.ok()
.unwrap()
.into();
let store: BeefyKeystore = Some(store).into();
let msg = b"are you involved or commited?";
let sig1 = store.sign(&alice, msg).unwrap();
let sig2 = Keyring::Alice.sign(msg);
assert_eq!(sig1, sig2);
}
#[test]
fn sign_error() {
let store = keystore();
let _ =
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&Keyring::Bob.to_seed()))
.ok()
.unwrap();
let store: BeefyKeystore = Some(store).into();
let alice = Keyring::Alice.public();
let msg = b"are you involved or commited?";
let sig = store.sign(&alice, msg).err().unwrap();
let err = Error::Signature("ecdsa_sign_prehashed() failed".to_string());
assert_eq!(sig, err);
}
#[test]
fn sign_no_keystore() {
let store: BeefyKeystore = None.into();
let alice = Keyring::Alice.public();
let msg = b"are you involved or commited";
let sig = store.sign(&alice, msg).err().unwrap();
let err = Error::Keystore("no Keystore".to_string());
assert_eq!(sig, err);
}
#[test]
fn verify_works() {
let store = keystore();
let alice: crypto::Public =
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&Keyring::Alice.to_seed()))
.ok()
.unwrap()
.into();
let store: BeefyKeystore = Some(store).into();
// `msg` and `sig` match
let msg = b"are you involved or commited?";
let sig = store.sign(&alice, msg).unwrap();
assert!(BeefyKeystore::verify(&alice, &sig, msg));
// `msg and `sig` don't match
let msg = b"you are just involved";
assert!(!BeefyKeystore::verify(&alice, &sig, msg));
}
// Note that we use keys with and without a seed for this test.
#[test]
fn public_keys_works() {
const TEST_TYPE: sp_application_crypto::KeyTypeId =
sp_application_crypto::KeyTypeId(*b"test");
let store = keystore();
let add_key = |key_type, seed: Option<&str>| {
SyncCryptoStore::ecdsa_generate_new(&*store, key_type, seed).unwrap()
};
// test keys
let _ = add_key(TEST_TYPE, Some(Keyring::Alice.to_seed().as_str()));
let _ = add_key(TEST_TYPE, Some(Keyring::Bob.to_seed().as_str()));
let _ = add_key(TEST_TYPE, None);
let _ = add_key(TEST_TYPE, None);
// BEEFY keys
let _ = add_key(KEY_TYPE, Some(Keyring::Dave.to_seed().as_str()));
let _ = add_key(KEY_TYPE, Some(Keyring::Eve.to_seed().as_str()));
let key1: crypto::Public = add_key(KEY_TYPE, None).into();
let key2: crypto::Public = add_key(KEY_TYPE, None).into();
let store: BeefyKeystore = Some(store).into();
let keys = store.public_keys().ok().unwrap();
assert!(keys.len() == 4);
assert!(keys.contains(&Keyring::Dave.public()));
assert!(keys.contains(&Keyring::Eve.public()));
assert!(keys.contains(&key1));
assert!(keys.contains(&key2));
}
}
+493
View File
@@ -0,0 +1,493 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
communication::{
notification::{
BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
BeefyVersionedFinalityProofStream,
},
peers::KnownPeers,
request_response::{
outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
},
},
import::BeefyBlockImport,
metrics::register_metrics,
round::Rounds,
worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{error, info};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::ProtocolName;
use sc_network_common::service::NetworkRequest;
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
};
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_consensus_beefy::{
crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
GENESIS_AUTHORITY_SET_ID,
};
use sp_keystore::SyncCryptoStorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block, Zero};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
mod aux_schema;
mod error;
mod keystore;
mod metrics;
mod round;
mod worker;
pub mod communication;
pub mod import;
pub mod justification;
pub use communication::beefy_protocol_name::{
gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "beefy";
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
/// issue is <https://github.com/rust-lang/rust/issues/41517>.
pub trait Client<B, BE>:
BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
where
B: Block,
BE: Backend<B>,
{
// empty
}
impl<B, BE, T> Client<B, BE> for T
where
B: Block,
BE: Backend<B>,
T: BlockchainEvents<B>
+ HeaderBackend<B>
+ Finalizer<B, BE>
+ ProvideRuntimeApi<B>
+ Send
+ Sync,
{
// empty
}
/// Links between the block importer, the background voter and the RPC layer,
/// to be used by the voter.
#[derive(Clone)]
pub struct BeefyVoterLinks<B: Block> {
// BlockImport -> Voter links
/// Stream of BEEFY signed commitments from block import to voter.
pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B>,
// Voter -> RPC links
/// Sends BEEFY signed commitments from voter to RPC.
pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B>,
/// Sends BEEFY best block hashes from voter to RPC.
pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
}
/// Links used by the BEEFY RPC layer, from the BEEFY background voter.
#[derive(Clone)]
pub struct BeefyRPCLinks<B: Block> {
/// Stream of signed commitments coming from the voter.
pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B>,
/// Stream of BEEFY best block hashes coming from the voter.
pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
}
/// Make block importer and link half necessary to tie the background voter to it.
pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I>(
wrapped_block_import: I,
backend: Arc<BE>,
runtime: Arc<RuntimeApi>,
prometheus_registry: Option<Registry>,
) -> (BeefyBlockImport<B, BE, RuntimeApi, I>, BeefyVoterLinks<B>, BeefyRPCLinks<B>)
where
B: Block,
BE: Backend<B>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<RuntimeApi, B>>
+ Send
+ Sync,
RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
RuntimeApi::Api: BeefyApi<B>,
{
// Voter -> RPC links
let (to_rpc_justif_sender, from_voter_justif_stream) =
BeefyVersionedFinalityProofStream::<B>::channel();
let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
BeefyBestBlockStream::<B>::channel();
// BlockImport -> Voter links
let (to_voter_justif_sender, from_block_import_justif_stream) =
BeefyVersionedFinalityProofStream::<B>::channel();
let metrics = register_metrics(prometheus_registry);
// BlockImport
let import = BeefyBlockImport::new(
backend,
runtime,
wrapped_block_import,
to_voter_justif_sender,
metrics,
);
let voter_links = BeefyVoterLinks {
from_block_import_justif_stream,
to_rpc_justif_sender,
to_rpc_best_block_sender,
};
let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream };
(import, voter_links, rpc_links)
}
/// BEEFY gadget network parameters.
pub struct BeefyNetworkParams<B: Block, N> {
/// Network implementing gossip, requests and sync-oracle.
pub network: Arc<N>,
/// Chain specific BEEFY gossip protocol name. See
/// [`communication::beefy_protocol_name::gossip_protocol_name`].
pub gossip_protocol_name: ProtocolName,
/// Chain specific BEEFY on-demand justifications protocol name. See
/// [`communication::beefy_protocol_name::justifications_protocol_name`].
pub justifications_protocol_name: ProtocolName,
pub _phantom: PhantomData<B>,
}
/// BEEFY gadget initialization parameters.
pub struct BeefyParams<B: Block, BE, C, N, P, R> {
/// BEEFY client
pub client: Arc<C>,
/// Client Backend
pub backend: Arc<BE>,
/// BEEFY Payload provider
pub payload_provider: P,
/// Runtime Api Provider
pub runtime: Arc<R>,
/// Local key store
pub key_store: Option<SyncCryptoStorePtr>,
/// BEEFY voter network params
pub network_params: BeefyNetworkParams<B, N>,
/// Minimal delta between blocks, BEEFY should vote for
pub min_block_delta: u32,
/// Prometheus metric registry
pub prometheus_registry: Option<Registry>,
/// Links between the block importer, the background voter and the RPC layer.
pub links: BeefyVoterLinks<B>,
/// Handler for incoming BEEFY justifications requests from a remote peer.
pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
}
/// Start the BEEFY gadget.
///
/// This is a thin shim around running and awaiting a BEEFY worker.
pub async fn start_beefy_gadget<B, BE, C, N, P, R>(beefy_params: BeefyParams<B, BE, C, N, P, R>)
where
B: Block,
BE: Backend<B>,
C: Client<B, BE> + BlockBackend<B>,
P: PayloadProvider<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: GossipNetwork<B> + NetworkRequest + SyncOracle + Send + Sync + 'static,
{
let BeefyParams {
client,
backend,
payload_provider,
runtime,
key_store,
network_params,
min_block_delta,
prometheus_registry,
links,
on_demand_justifications_handler,
} = beefy_params;
let BeefyNetworkParams { network, gossip_protocol_name, justifications_protocol_name, .. } =
network_params;
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
network.clone(),
gossip_protocol_name,
gossip_validator.clone(),
None,
);
let metrics = register_metrics(prometheus_registry.clone());
// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name,
known_peers,
prometheus_registry.clone(),
);
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
.await
.and_then(|best_grandpa| {
load_or_init_voter_state(&*backend, &*runtime, best_grandpa, min_block_delta)
}) {
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};
let worker_params = worker::WorkerParams {
backend,
payload_provider,
runtime,
network,
key_store: key_store.into(),
gossip_engine,
gossip_validator,
on_demand_justifications,
links,
metrics,
persisted_state,
};
let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
futures::future::join(
worker.run(block_import_justif, finality_notifications),
on_demand_justifications_handler.run(),
)
.await;
}
fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
// Initialize voter state from AUX DB or from pallet genesis.
if let Some(mut state) = crate::aux_schema::load_persistent(backend)? {
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa);
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Ok(state)
} else {
initialize_voter_state(backend, runtime, best_grandpa, min_block_delta)
}
}
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
// - latest BEEFY finalized block, or if none found on the way,
// - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
let beefy_genesis = runtime
.runtime_api()
.beefy_genesis(best_grandpa.hash())
.ok()
.flatten()
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
if let Some(true) = blockchain
.justifications(header.hash())
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: LOG_TARGET,
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
let best_beefy = *header.number();
// If no session boundaries detected so far, just initialize new rounds here.
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, header.hash())?;
let mut rounds = Rounds::new(best_beefy, active_set);
// Mark the round as already finalized.
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state =
PersistedState::checked_new(best_grandpa, best_beefy, sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
break state
}
if *header.number() == beefy_genesis {
// We've reached BEEFY genesis, initialize voter here.
let genesis_set =
expect_validator_set(runtime, header.hash()).and_then(genesis_set_sanity_check)?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
beefy_genesis,
genesis_set,
);
sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
break PersistedState::checked_new(best_grandpa, Zero::zero(), sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
}
if let Some(active) = worker::find_authorities_change::<B>(&header) {
info!(
target: LOG_TARGET,
"🥩 Marking block {:?} as BEEFY Mandatory.",
*header.number()
);
sessions.push_front(Rounds::new(*header.number(), active));
}
// Check if state is still available if we move up the chain.
let parent_hash = *header.parent_hash();
runtime.runtime_api().validator_set(parent_hash).ok().flatten().ok_or_else(|| {
let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
error!(target: LOG_TARGET, "🥩 {}", msg);
ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
})?;
// Move up the chain.
header = blockchain.expect_header(parent_hash)?;
};
aux_schema::write_current_version(backend)?;
aux_schema::write_voter_state(backend, &state)?;
Ok(state)
}
/// Wait for BEEFY runtime pallet to be available, return active validator set.
/// Should be called only once during worker initialization.
async fn wait_for_runtime_pallet<B, R>(
runtime: &R,
mut gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
) -> ClientResult<<B as Block>::Header>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
loop {
futures::select! {
notif = finality.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = notif.header.hash();
if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
if *notif.header.number() >= start {
// Beefy pallet available, return header for best grandpa at the time.
info!(
target: LOG_TARGET,
"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
notif.header.number(), start
);
return Ok(notif.header)
}
}
},
_ = gossip_engine => {
break
}
}
}
let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into();
error!(target: LOG_TARGET, "{}", err_msg);
Err(ClientError::Backend(err_msg))
}
fn genesis_set_sanity_check(
active: ValidatorSet<AuthorityId>,
) -> ClientResult<ValidatorSet<AuthorityId>> {
if active.id() == GENESIS_AUTHORITY_SET_ID {
Ok(active)
} else {
error!(target: LOG_TARGET, "🥩 Unexpected ID for genesis validator set {:?}.", active);
Err(ClientError::Backend("BEEFY Genesis sanity check failed.".into()))
}
}
fn expect_validator_set<B, R>(
runtime: &R,
at_hash: B::Hash,
) -> ClientResult<ValidatorSet<AuthorityId>>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
runtime
.runtime_api()
.validator_set(at_hash)
.ok()
.flatten()
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))
}
@@ -0,0 +1,349 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! BEEFY Prometheus metrics definition
use log::debug;
use prometheus::{register, Counter, Gauge, PrometheusError, Registry, U64};
/// Helper trait for registering BEEFY metrics to Prometheus registry.
pub(crate) trait PrometheusRegister<T: Sized = Self>: Sized {
const DESCRIPTION: &'static str;
fn register(registry: &Registry) -> Result<Self, PrometheusError>;
}
/// BEEFY voting-related metrics exposed through Prometheus
#[derive(Clone, Debug)]
pub struct VoterMetrics {
/// Current active validator set id
pub beefy_validator_set_id: Gauge<U64>,
/// Total number of votes sent by this node
pub beefy_votes_sent: Counter<U64>,
/// Best block finalized by BEEFY
pub beefy_best_block: Gauge<U64>,
/// Best block BEEFY voted on
pub beefy_best_voted: Gauge<U64>,
/// Next block BEEFY should vote on
pub beefy_should_vote_on: Gauge<U64>,
/// Number of sessions with lagging signed commitment on mandatory block
pub beefy_lagging_sessions: Counter<U64>,
/// Number of times no Authority public key found in store
pub beefy_no_authority_found_in_store: Counter<U64>,
/// Number of currently buffered votes
pub beefy_buffered_votes: Gauge<U64>,
/// Number of votes dropped due to full buffers
pub beefy_buffered_votes_dropped: Counter<U64>,
/// Number of good votes successfully handled
pub beefy_good_votes_processed: Counter<U64>,
/// Number of equivocation votes received
pub beefy_equivocation_votes: Counter<U64>,
/// Number of invalid votes received
pub beefy_invalid_votes: Counter<U64>,
/// Number of valid but stale votes received
pub beefy_stale_votes: Counter<U64>,
/// Number of currently buffered justifications
pub beefy_buffered_justifications: Gauge<U64>,
/// Number of valid but stale justifications received
pub beefy_stale_justifications: Counter<U64>,
/// Number of valid justifications successfully imported
pub beefy_imported_justifications: Counter<U64>,
/// Number of justifications dropped due to full buffers
pub beefy_buffered_justifications_dropped: Counter<U64>,
/// Trying to set Best Beefy block to old block
pub beefy_best_block_set_last_failure: Gauge<U64>,
}
impl PrometheusRegister for VoterMetrics {
const DESCRIPTION: &'static str = "voter";
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
beefy_validator_set_id: register(
Gauge::new(
"substrate_beefy_validator_set_id",
"Current BEEFY active validator set id.",
)?,
registry,
)?,
beefy_votes_sent: register(
Counter::new("substrate_beefy_votes_sent", "Number of votes sent by this node")?,
registry,
)?,
beefy_best_block: register(
Gauge::new("substrate_beefy_best_block", "Best block finalized by BEEFY")?,
registry,
)?,
beefy_best_voted: register(
Gauge::new("substrate_beefy_best_voted", "Best block voted on by BEEFY")?,
registry,
)?,
beefy_should_vote_on: register(
Gauge::new("substrate_beefy_should_vote_on", "Next block, BEEFY should vote on")?,
registry,
)?,
beefy_lagging_sessions: register(
Counter::new(
"substrate_beefy_lagging_sessions",
"Number of sessions with lagging signed commitment on mandatory block",
)?,
registry,
)?,
beefy_no_authority_found_in_store: register(
Counter::new(
"substrate_beefy_no_authority_found_in_store",
"Number of times no Authority public key found in store",
)?,
registry,
)?,
beefy_buffered_votes: register(
Gauge::new("substrate_beefy_buffered_votes", "Number of currently buffered votes")?,
registry,
)?,
beefy_buffered_votes_dropped: register(
Counter::new(
"substrate_beefy_buffered_votes_dropped",
"Number of votes dropped due to full buffers",
)?,
registry,
)?,
beefy_good_votes_processed: register(
Counter::new(
"substrate_beefy_successful_handled_votes",
"Number of good votes successfully handled",
)?,
registry,
)?,
beefy_equivocation_votes: register(
Counter::new(
"substrate_beefy_stale_votes",
"Number of equivocation votes received",
)?,
registry,
)?,
beefy_invalid_votes: register(
Counter::new("substrate_beefy_stale_votes", "Number of invalid votes received")?,
registry,
)?,
beefy_stale_votes: register(
Counter::new(
"substrate_beefy_stale_votes",
"Number of valid but stale votes received",
)?,
registry,
)?,
beefy_buffered_justifications: register(
Gauge::new(
"substrate_beefy_buffered_justifications",
"Number of currently buffered justifications",
)?,
registry,
)?,
beefy_stale_justifications: register(
Counter::new(
"substrate_beefy_stale_justifications",
"Number of valid but stale justifications received",
)?,
registry,
)?,
beefy_imported_justifications: register(
Counter::new(
"substrate_beefy_imported_justifications",
"Number of valid justifications successfully imported",
)?,
registry,
)?,
beefy_buffered_justifications_dropped: register(
Counter::new(
"substrate_beefy_buffered_justifications_dropped",
"Number of justifications dropped due to full buffers",
)?,
registry,
)?,
beefy_best_block_set_last_failure: register(
Gauge::new(
"substrate_beefy_best_block_to_old_block",
"Trying to set Best Beefy block to old block",
)?,
registry,
)?,
})
}
}
/// BEEFY block-import-related metrics exposed through Prometheus
#[derive(Clone, Debug)]
pub struct BlockImportMetrics {
/// Number of Good Justification imports
pub beefy_good_justification_imports: Counter<U64>,
/// Number of Bad Justification imports
pub beefy_bad_justification_imports: Counter<U64>,
}
impl PrometheusRegister for BlockImportMetrics {
const DESCRIPTION: &'static str = "block-import";
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
beefy_good_justification_imports: register(
Counter::new(
"substrate_beefy_good_justification_imports",
"Number of Good Justification imports",
)?,
registry,
)?,
beefy_bad_justification_imports: register(
Counter::new(
"substrate_beefy_bad_justification_imports",
"Number of Bad Justification imports",
)?,
registry,
)?,
})
}
}
/// BEEFY on-demand-justifications-related metrics exposed through Prometheus
#[derive(Clone, Debug)]
pub struct OnDemandIncomingRequestsMetrics {
/// Number of Successful Justification responses
pub beefy_successful_justification_responses: Counter<U64>,
/// Number of Failed Justification responses
pub beefy_failed_justification_responses: Counter<U64>,
}
impl PrometheusRegister for OnDemandIncomingRequestsMetrics {
const DESCRIPTION: &'static str = "on-demand incoming justification requests";
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
beefy_successful_justification_responses: register(
Counter::new(
"substrate_beefy_successful_justification_responses",
"Number of Successful Justification responses",
)?,
registry,
)?,
beefy_failed_justification_responses: register(
Counter::new(
"substrate_beefy_failed_justification_responses",
"Number of Failed Justification responses",
)?,
registry,
)?,
})
}
}
/// BEEFY on-demand-justifications-related metrics exposed through Prometheus
#[derive(Clone, Debug)]
pub struct OnDemandOutgoingRequestsMetrics {
/// Number of times there was no good peer to request justification from
pub beefy_on_demand_justification_no_peer_to_request_from: Counter<U64>,
/// Number of on-demand justification peer hang up
pub beefy_on_demand_justification_peer_hang_up: Counter<U64>,
/// Number of on-demand justification peer error
pub beefy_on_demand_justification_peer_error: Counter<U64>,
/// Number of on-demand justification invalid proof
pub beefy_on_demand_justification_invalid_proof: Counter<U64>,
/// Number of on-demand justification good proof
pub beefy_on_demand_justification_good_proof: Counter<U64>,
}
impl PrometheusRegister for OnDemandOutgoingRequestsMetrics {
const DESCRIPTION: &'static str = "on-demand outgoing justification requests";
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
beefy_on_demand_justification_no_peer_to_request_from: register(
Counter::new(
"substrate_beefy_on_demand_justification_no_peer_to_request_from",
"Number of times there was no good peer to request justification from",
)?,
registry,
)?,
beefy_on_demand_justification_peer_hang_up: register(
Counter::new(
"substrate_beefy_on_demand_justification_peer_hang_up",
"Number of on-demand justification peer hang up",
)?,
registry,
)?,
beefy_on_demand_justification_peer_error: register(
Counter::new(
"substrate_beefy_on_demand_justification_peer_error",
"Number of on-demand justification peer error",
)?,
registry,
)?,
beefy_on_demand_justification_invalid_proof: register(
Counter::new(
"substrate_beefy_on_demand_justification_invalid_proof",
"Number of on-demand justification invalid proof",
)?,
registry,
)?,
beefy_on_demand_justification_good_proof: register(
Counter::new(
"substrate_beefy_on_demand_justification_good_proof",
"Number of on-demand justification good proof",
)?,
registry,
)?,
})
}
}
pub(crate) fn register_metrics<T: PrometheusRegister>(
prometheus_registry: Option<prometheus::Registry>,
) -> Option<T> {
prometheus_registry.as_ref().map(T::register).and_then(|result| match result {
Ok(metrics) => {
debug!(target: "beefy", "🥩 Registered {} metrics", T::DESCRIPTION);
Some(metrics)
},
Err(err) => {
debug!(target: "beefy", "🥩 Failed to register {} metrics: {:?}", T::DESCRIPTION, err);
None
},
})
}
// Note: we use the `format` macro to convert an expr into a `u64`. This will fail,
// if expr does not derive `Display`.
#[macro_export]
macro_rules! metric_set {
($self:ident, $m:ident, $v:expr) => {{
let val: u64 = format!("{}", $v).parse().unwrap();
if let Some(metrics) = $self.metrics.as_ref() {
metrics.$m.set(val);
}
}};
}
#[macro_export]
macro_rules! metric_inc {
($self:ident, $m:ident) => {{
if let Some(metrics) = $self.metrics.as_ref() {
metrics.$m.inc();
}
}};
}
#[macro_export]
macro_rules! metric_get {
($self:ident, $m:ident) => {{
$self.metrics.as_ref().map(|metrics| metrics.$m.clone())
}};
}
@@ -0,0 +1,495 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::LOG_TARGET;
use codec::{Decode, Encode};
use log::debug;
use sp_consensus_beefy::{
crypto::{AuthorityId, Public, Signature},
Commitment, EquivocationProof, SignedCommitment, ValidatorSet, ValidatorSetId, VoteMessage,
};
use sp_runtime::traits::{Block, NumberFor};
use std::collections::BTreeMap;
/// Tracks for each round which validators have voted/signed and
/// whether the local `self` validator has voted/signed.
///
/// Does not do any validation on votes or signatures, layers above need to handle that (gossip).
#[derive(Debug, Decode, Default, Encode, PartialEq)]
pub(crate) struct RoundTracker {
votes: BTreeMap<Public, Signature>,
}
impl RoundTracker {
fn add_vote(&mut self, vote: (Public, Signature)) -> bool {
if self.votes.contains_key(&vote.0) {
return false
}
self.votes.insert(vote.0, vote.1);
true
}
fn is_done(&self, threshold: usize) -> bool {
self.votes.len() >= threshold
}
}
/// Minimum size of `authorities` subset that produced valid signatures for a block to finalize.
pub fn threshold(authorities: usize) -> usize {
let faulty = authorities.saturating_sub(1) / 3;
authorities - faulty
}
#[derive(Debug, PartialEq)]
pub enum VoteImportResult<B: Block> {
Ok,
RoundConcluded(SignedCommitment<NumberFor<B>, Signature>),
Equivocation(EquivocationProof<NumberFor<B>, Public, Signature>),
Invalid,
Stale,
}
/// Keeps track of all voting rounds (block numbers) within a session.
/// Only round numbers > `best_done` are of interest, all others are considered stale.
///
/// Does not do any validation on votes or signatures, layers above need to handle that (gossip).
#[derive(Debug, Decode, Encode, PartialEq)]
pub(crate) struct Rounds<B: Block> {
rounds: BTreeMap<Commitment<NumberFor<B>>, RoundTracker>,
previous_votes: BTreeMap<(Public, NumberFor<B>), VoteMessage<NumberFor<B>, Public, Signature>>,
session_start: NumberFor<B>,
validator_set: ValidatorSet<Public>,
mandatory_done: bool,
best_done: Option<NumberFor<B>>,
}
impl<B> Rounds<B>
where
B: Block,
{
pub(crate) fn new(session_start: NumberFor<B>, validator_set: ValidatorSet<Public>) -> Self {
Rounds {
rounds: BTreeMap::new(),
previous_votes: BTreeMap::new(),
session_start,
validator_set,
mandatory_done: false,
best_done: None,
}
}
pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
&self.validator_set
}
pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
self.validator_set.id()
}
pub(crate) fn validators(&self) -> &[Public] {
self.validator_set.validators()
}
pub(crate) fn session_start(&self) -> NumberFor<B> {
self.session_start
}
pub(crate) fn mandatory_done(&self) -> bool {
self.mandatory_done
}
pub(crate) fn add_vote(
&mut self,
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
) -> VoteImportResult<B> {
let num = vote.commitment.block_number;
let vote_key = (vote.id.clone(), num);
if num < self.session_start || Some(num) <= self.best_done {
debug!(target: LOG_TARGET, "🥩 received vote for old stale round {:?}, ignoring", num);
return VoteImportResult::Stale
} else if vote.commitment.validator_set_id != self.validator_set_id() {
debug!(
target: LOG_TARGET,
"🥩 expected set_id {:?}, ignoring vote {:?}.",
self.validator_set_id(),
vote,
);
return VoteImportResult::Invalid
} else if !self.validators().iter().any(|id| &vote.id == id) {
debug!(
target: LOG_TARGET,
"🥩 received vote {:?} from validator that is not in the validator set, ignoring",
vote
);
return VoteImportResult::Invalid
}
if let Some(previous_vote) = self.previous_votes.get(&vote_key) {
// is the same public key voting for a different payload?
if previous_vote.commitment.payload != vote.commitment.payload {
debug!(
target: LOG_TARGET,
"🥩 detected equivocated vote: 1st: {:?}, 2nd: {:?}", previous_vote, vote
);
return VoteImportResult::Equivocation(EquivocationProof {
first: previous_vote.clone(),
second: vote,
})
}
} else {
// this is the first vote sent by `id` for `num`, all good
self.previous_votes.insert(vote_key, vote.clone());
}
// add valid vote
let round = self.rounds.entry(vote.commitment.clone()).or_default();
if round.add_vote((vote.id, vote.signature)) &&
round.is_done(threshold(self.validator_set.len()))
{
if let Some(round) = self.rounds.remove_entry(&vote.commitment) {
return VoteImportResult::RoundConcluded(self.signed_commitment(round))
}
}
VoteImportResult::Ok
}
fn signed_commitment(
&mut self,
round: (Commitment<NumberFor<B>>, RoundTracker),
) -> SignedCommitment<NumberFor<B>, Signature> {
let votes = round.1.votes;
let signatures = self
.validators()
.iter()
.map(|authority_id| votes.get(authority_id).cloned())
.collect();
SignedCommitment { commitment: round.0, signatures }
}
pub(crate) fn conclude(&mut self, round_num: NumberFor<B>) {
// Remove this and older (now stale) rounds.
self.rounds.retain(|commitment, _| commitment.block_number > round_num);
self.previous_votes.retain(|&(_, number), _| number > round_num);
self.mandatory_done = self.mandatory_done || round_num == self.session_start;
self.best_done = self.best_done.max(Some(round_num));
debug!(target: LOG_TARGET, "🥩 Concluded round #{}", round_num);
}
}
#[cfg(test)]
mod tests {
use sc_network_test::Block;
use sp_consensus_beefy::{
crypto::Public, known_payloads::MMR_ROOT_ID, Commitment, EquivocationProof, Keyring,
Payload, SignedCommitment, ValidatorSet, VoteMessage,
};
use super::{threshold, Block as BlockT, RoundTracker, Rounds};
use crate::round::VoteImportResult;
impl<B> Rounds<B>
where
B: BlockT,
{
pub(crate) fn test_set_mandatory_done(&mut self, done: bool) {
self.mandatory_done = done;
}
}
#[test]
fn round_tracker() {
let mut rt = RoundTracker::default();
let bob_vote = (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed"));
let threshold = 2;
// adding new vote allowed
assert!(rt.add_vote(bob_vote.clone()));
// adding existing vote not allowed
assert!(!rt.add_vote(bob_vote));
// vote is not done
assert!(!rt.is_done(threshold));
let alice_vote = (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed"));
// adding new vote (self vote this time) allowed
assert!(rt.add_vote(alice_vote));
// vote is now done
assert!(rt.is_done(threshold));
}
#[test]
fn vote_threshold() {
assert_eq!(threshold(1), 1);
assert_eq!(threshold(2), 2);
assert_eq!(threshold(3), 3);
assert_eq!(threshold(4), 3);
assert_eq!(threshold(100), 67);
assert_eq!(threshold(300), 201);
}
#[test]
fn new_rounds() {
sp_tracing::try_init_simple();
let validators = ValidatorSet::<Public>::new(
vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
42,
)
.unwrap();
let session_start = 1u64.into();
let rounds = Rounds::<Block>::new(session_start, validators);
assert_eq!(42, rounds.validator_set_id());
assert_eq!(1, rounds.session_start());
assert_eq!(
&vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
rounds.validators()
);
}
#[test]
fn add_and_conclude_votes() {
sp_tracing::try_init_simple();
let validators = ValidatorSet::<Public>::new(
vec![
Keyring::Alice.public(),
Keyring::Bob.public(),
Keyring::Charlie.public(),
Keyring::Eve.public(),
],
Default::default(),
)
.unwrap();
let validator_set_id = validators.id();
let session_start = 1u64.into();
let mut rounds = Rounds::<Block>::new(session_start, validators);
let payload = Payload::from_single_entry(MMR_ROOT_ID, vec![]);
let block_number = 1;
let commitment = Commitment { block_number, payload, validator_set_id };
let mut vote = VoteMessage {
id: Keyring::Alice.public(),
commitment: commitment.clone(),
signature: Keyring::Alice.sign(b"I am committed"),
};
// add 1st good vote
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Ok);
// double voting (same vote), ok, no effect
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Ok);
vote.id = Keyring::Dave.public();
vote.signature = Keyring::Dave.sign(b"I am committed");
// invalid vote (Dave is not a validator)
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Invalid);
vote.id = Keyring::Bob.public();
vote.signature = Keyring::Bob.sign(b"I am committed");
// add 2nd good vote
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Ok);
vote.id = Keyring::Charlie.public();
vote.signature = Keyring::Charlie.sign(b"I am committed");
// add 3rd good vote -> round concluded -> signatures present
assert_eq!(
rounds.add_vote(vote.clone()),
VoteImportResult::RoundConcluded(SignedCommitment {
commitment,
signatures: vec![
Some(Keyring::Alice.sign(b"I am committed")),
Some(Keyring::Bob.sign(b"I am committed")),
Some(Keyring::Charlie.sign(b"I am committed")),
None,
]
})
);
rounds.conclude(block_number);
vote.id = Keyring::Eve.public();
vote.signature = Keyring::Eve.sign(b"I am committed");
// Eve is a validator, but round was concluded, adding vote disallowed
assert_eq!(rounds.add_vote(vote), VoteImportResult::Stale);
}
#[test]
fn old_rounds_not_accepted() {
sp_tracing::try_init_simple();
let validators = ValidatorSet::<Public>::new(
vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
42,
)
.unwrap();
let validator_set_id = validators.id();
// active rounds starts at block 10
let session_start = 10u64.into();
let mut rounds = Rounds::<Block>::new(session_start, validators);
// vote on round 9
let block_number = 9;
let payload = Payload::from_single_entry(MMR_ROOT_ID, vec![]);
let commitment = Commitment { block_number, payload, validator_set_id };
let mut vote = VoteMessage {
id: Keyring::Alice.public(),
commitment,
signature: Keyring::Alice.sign(b"I am committed"),
};
// add vote for previous session, should fail
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Stale);
// no votes present
assert!(rounds.rounds.is_empty());
// simulate 11 was concluded
rounds.best_done = Some(11);
// add votes for current session, but already concluded rounds, should fail
vote.commitment.block_number = 10;
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Stale);
vote.commitment.block_number = 11;
assert_eq!(rounds.add_vote(vote.clone()), VoteImportResult::Stale);
// no votes present
assert!(rounds.rounds.is_empty());
// add vote for active round 12
vote.commitment.block_number = 12;
assert_eq!(rounds.add_vote(vote), VoteImportResult::Ok);
// good vote present
assert_eq!(rounds.rounds.len(), 1);
}
#[test]
fn multiple_rounds() {
sp_tracing::try_init_simple();
let validators = ValidatorSet::<Public>::new(
vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
Default::default(),
)
.unwrap();
let validator_set_id = validators.id();
let session_start = 1u64.into();
let mut rounds = Rounds::<Block>::new(session_start, validators);
let payload = Payload::from_single_entry(MMR_ROOT_ID, vec![]);
let commitment = Commitment { block_number: 1, payload, validator_set_id };
let mut alice_vote = VoteMessage {
id: Keyring::Alice.public(),
commitment: commitment.clone(),
signature: Keyring::Alice.sign(b"I am committed"),
};
let mut bob_vote = VoteMessage {
id: Keyring::Bob.public(),
commitment: commitment.clone(),
signature: Keyring::Bob.sign(b"I am committed"),
};
let mut charlie_vote = VoteMessage {
id: Keyring::Charlie.public(),
commitment,
signature: Keyring::Charlie.sign(b"I am committed"),
};
let expected_signatures = vec![
Some(Keyring::Alice.sign(b"I am committed")),
Some(Keyring::Bob.sign(b"I am committed")),
Some(Keyring::Charlie.sign(b"I am committed")),
];
// round 1 - only 2 out of 3 vote
assert_eq!(rounds.add_vote(alice_vote.clone()), VoteImportResult::Ok);
assert_eq!(rounds.add_vote(charlie_vote.clone()), VoteImportResult::Ok);
// should be 1 active round
assert_eq!(1, rounds.rounds.len());
// round 2 - only Charlie votes
charlie_vote.commitment.block_number = 2;
assert_eq!(rounds.add_vote(charlie_vote.clone()), VoteImportResult::Ok);
// should be 2 active rounds
assert_eq!(2, rounds.rounds.len());
// round 3 - all validators vote -> round is concluded
alice_vote.commitment.block_number = 3;
bob_vote.commitment.block_number = 3;
charlie_vote.commitment.block_number = 3;
assert_eq!(rounds.add_vote(alice_vote.clone()), VoteImportResult::Ok);
assert_eq!(rounds.add_vote(bob_vote.clone()), VoteImportResult::Ok);
assert_eq!(
rounds.add_vote(charlie_vote.clone()),
VoteImportResult::RoundConcluded(SignedCommitment {
commitment: charlie_vote.commitment,
signatures: expected_signatures
})
);
// should be only 2 active since this one auto-concluded
assert_eq!(2, rounds.rounds.len());
// conclude round 2
rounds.conclude(2);
// should be no more active rounds since 2 was officially concluded and round "1" is stale
assert!(rounds.rounds.is_empty());
// conclude round 3
rounds.conclude(3);
assert!(rounds.previous_votes.is_empty());
}
#[test]
fn should_provide_equivocation_proof() {
sp_tracing::try_init_simple();
let validators = ValidatorSet::<Public>::new(
vec![Keyring::Alice.public(), Keyring::Bob.public()],
Default::default(),
)
.unwrap();
let validator_set_id = validators.id();
let session_start = 1u64.into();
let mut rounds = Rounds::<Block>::new(session_start, validators);
let payload1 = Payload::from_single_entry(MMR_ROOT_ID, vec![1, 1, 1, 1]);
let payload2 = Payload::from_single_entry(MMR_ROOT_ID, vec![2, 2, 2, 2]);
let commitment1 = Commitment { block_number: 1, payload: payload1, validator_set_id };
let commitment2 = Commitment { block_number: 1, payload: payload2, validator_set_id };
let alice_vote1 = VoteMessage {
id: Keyring::Alice.public(),
commitment: commitment1,
signature: Keyring::Alice.sign(b"I am committed"),
};
let mut alice_vote2 = alice_vote1.clone();
alice_vote2.commitment = commitment2;
let expected_result = VoteImportResult::Equivocation(EquivocationProof {
first: alice_vote1.clone(),
second: alice_vote2.clone(),
});
// vote on one payload - ok
assert_eq!(rounds.add_vote(alice_vote1), VoteImportResult::Ok);
// vote on _another_ commitment/payload -> expected equivocation proof
assert_eq!(rounds.add_vote(alice_vote2), expected_result);
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff