Beefy on-demand justifications as a custom RequestResponse protocol (#12124)

* client/beefy: create communication module and move gossip there

* client/beefy: move beefy_protocol_name module to communication

* client/beefy: move notification module under communication

* client/beefy: add incoming request_response protocol handler

* client/beefy: keep track of connected peers and their progress

* client/beefy: add logic for generating Justif requests

* client/beefy: cancel outdated on-demand justification requests

* try Andre's suggestion for JustificationEngine

* justif engine add justifs validation

* client/beefy: impl OnDemandJustificationsEngine async next()

* move beefy proto name test

* client/beefy: initialize OnDemandJustificationsEngine

* client/tests: allow for custom req-resp protocols

* client/beefy: on-demand-justif: implement simple peer selection strategy

* client/beefy: fix voter initialization

Fix corner case where voter gets a single burst of finality
notifications just when it starts.

The notification stream was consumed by "wait_for_pallet" logic,
then main loop would subscribe to finality notifications, but by that
time some notifications might've been lost.

Fix this by subscribing the main loop to notifications before waiting
for pallet to become available. Share the same stream with the main loop
so that notifications for blocks before pallet available are ignored,
while _all_ notifications after pallet available are processed.

Add regression test for this.

Signed-off-by: acatangiu <adrian@parity.io>

* client/beefy: make sure justif requests are always out for mandatory blocks

* client/beefy: add test for on-demand justifications sync

* client/beefy: tweak main loop event processing order

* client/beefy: run on-demand-justif-handler under same async task as voter

* client/beefy: add test for known-peers

* client/beefy: reorg request-response module

* client/beefy: add issue references for future work todos

* client/beefy: consolidate on-demand-justifications engine state machine

Signed-off-by: acatangiu <adrian@parity.io>

* client/beefy: fix for polkadot companion

* client/beefy: implement review suggestions

* cargo fmt and clippy

* fix merge damage

* fix rust-doc

* fix merge damage

* fix merge damage

* client/beefy: add test for justif proto name

Signed-off-by: acatangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2022-10-03 16:00:57 +03:00
committed by GitHub
parent bb9d2fa75a
commit 2a27545afe
14 changed files with 1208 additions and 219 deletions
@@ -0,0 +1,464 @@
// This file is part of Substrate.
// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use sc_network::PeerId;
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_core::hashing::twox_64;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};
use codec::{Decode, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use wasm_timer::Instant;
use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore};
use beefy_primitives::{
crypto::{Public, Signature},
VoteMessage,
};
// Timeout for rebroadcasting messages.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
where
B: Block,
{
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}
/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];
struct KnownVotes<B: Block> {
last_done: Option<NumberFor<B>>,
live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
}
impl<B: Block> KnownVotes<B> {
pub fn new() -> Self {
Self { last_done: None, live: BTreeMap::new() }
}
/// Create new round votes set if not already present.
fn insert(&mut self, round: NumberFor<B>) {
self.live.entry(round).or_default();
}
/// Remove `round` and older from live set, update `last_done` accordingly.
fn conclude(&mut self, round: NumberFor<B>) {
self.live.retain(|&number, _| number > round);
self.last_done = self.last_done.max(Some(round));
}
/// Return true if `round` is newer than previously concluded rounds.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_live(&self, round: &NumberFor<B>) -> bool {
Some(*round) >= self.last_done
}
/// Add new _known_ `hash` to the round's known votes.
fn add_known(&mut self, round: &NumberFor<B>, hash: MessageHash) {
self.live.get_mut(round).map(|known| known.insert(hash));
}
/// Check if `hash` is already part of round's known votes.
fn is_known(&self, round: &NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false)
}
}
/// BEEFY gossip validator
///
/// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds.
///
/// Allows messages for 'rounds >= last concluded' to flow, everything else gets
/// rejected/expired.
///
///All messaging is handled in a single BEEFY global topic.
pub(crate) struct GossipValidator<B>
where
B: Block,
{
topic: B::Hash,
known_votes: RwLock<KnownVotes<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
}
impl<B> GossipValidator<B>
where
B: Block,
{
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
known_votes: RwLock::new(KnownVotes::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
}
/// Note a voting round.
///
/// Noting round will start a live `round`.
pub(crate) fn note_round(&self, round: NumberFor<B>) {
debug!(target: "beefy", "🥩 About to note gossip round #{}", round);
self.known_votes.write().insert(round);
}
/// Conclude a voting round.
///
/// This can be called once round is complete so we stop gossiping for it.
pub(crate) fn conclude_round(&self, round: NumberFor<B>) {
debug!(target: "beefy", "🥩 About to drop gossip round #{}", round);
self.known_votes.write().conclude(round);
}
}
impl<B> Validator<B> for GossipValidator<B>
where
B: Block,
{
fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
sender: &PeerId,
mut data: &[u8],
) -> ValidationResult<B::Hash> {
if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;
// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();
if !known_votes.is_live(&round) {
return ValidationResult::Discard
}
if known_votes.is_known(&round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic)
}
}
if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
self.known_votes.write().add_known(&round, msg_hash);
self.known_peers.lock().note_vote_for(*sender, round);
return ValidationResult::ProcessAndKeep(self.topic)
} else {
// TODO: report peer
debug!(target: "beefy", "🥩 Bad signature on message: {:?}, from: {:?}", msg, sender);
}
}
ValidationResult::Discard
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let known_votes = self.known_votes.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};
let round = msg.commitment.block_number;
let expired = !known_votes.is_live(&round);
trace!(target: "beefy", "🥩 Message for round #{} expired: {}", round, expired);
expired
})
}
fn message_allowed<'a>(
&'a self,
) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
let do_rebroadcast = {
let now = Instant::now();
let mut next_rebroadcast = self.next_rebroadcast.lock();
if now >= *next_rebroadcast {
*next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
false
}
};
let known_votes = self.known_votes.read();
Box::new(move |_who, intent, _topic, mut data| {
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
}
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return false,
};
let round = msg.commitment.block_number;
let allowed = known_votes.is_live(&round);
trace!(target: "beefy", "🥩 Message for round #{} allowed: {}", round, allowed);
allowed
})
}
}
#[cfg(test)]
mod tests {
use sc_keystore::LocalKeystore;
use sc_network_test::Block;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use crate::keystore::{tests::Keyring, BeefyKeystore};
use beefy_primitives::{
crypto::Signature, known_payload_ids, Commitment, MmrRootHash, Payload, VoteMessage,
KEY_TYPE,
};
use super::*;
#[test]
fn known_votes_insert_remove() {
let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(1);
kv.insert(2);
assert_eq!(kv.live.len(), 2);
let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(2);
kv.insert(3);
assert!(kv.last_done.is_none());
kv.conclude(2);
assert_eq!(kv.live.len(), 1);
assert!(!kv.live.contains_key(&2));
assert_eq!(kv.last_done, Some(2));
kv.conclude(1);
assert_eq!(kv.last_done, Some(2));
kv.conclude(3);
assert_eq!(kv.last_done, Some(3));
assert!(kv.live.is_empty());
}
#[test]
fn note_and_drop_round_works() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(1u64);
assert!(gv.known_votes.read().is_live(&1u64));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 4);
gv.conclude_round(7u64);
let votes = gv.known_votes.read();
// rounds 1 and 3 are outdated, don't gossip anymore
assert!(!votes.is_live(&1u64));
assert!(!votes.is_live(&3u64));
// latest concluded round is still gossiped
assert!(votes.is_live(&7u64));
// round 10 is alive and in-progress
assert!(votes.is_live(&10u64));
}
#[test]
fn note_same_round_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 3);
// note round #7 again -> should not change anything
gv.note_round(7u64);
let votes = gv.known_votes.read();
assert_eq!(votes.live.len(), 3);
assert!(votes.is_live(&3u64));
assert!(votes.is_live(&7u64));
assert!(votes.is_live(&10u64));
}
struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
todo!()
}
fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {
todo!()
}
fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
todo!()
}
fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) {
todo!()
}
}
fn sign_commitment<BN: Encode>(who: &Keyring, commitment: &Commitment<BN>) -> Signature {
let store: SyncCryptoStorePtr = std::sync::Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&who.to_seed())).unwrap();
let beefy_keystore: BeefyKeystore = Some(store).into();
beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap()
}
fn dummy_vote(block_number: u64) -> VoteMessage<u64, Public, Signature> {
let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, MmrRootHash::default().encode());
let commitment = Commitment { payload, block_number, validator_set_id: 0 };
let signature = sign_commitment(&Keyring::Alice, &commitment);
VoteMessage { commitment, id: Keyring::Alice.public(), signature }
}
#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let mut context = TestContext;
let vote = dummy_vote(3);
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
Some(1)
);
// second time we should hit the cache
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
// next we should quickly reject if the round is not live
gv.conclude_round(7_u64);
assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number));
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::Discard));
}
#[test]
fn messages_allowed_and_expired() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let topic = Default::default();
let intent = MessageIntent::Broadcast;
// note round 2 and 3, then conclude 2
gv.note_round(2u64);
gv.note_round(3u64);
gv.conclude_round(2u64);
let mut allowed = gv.message_allowed();
let mut expired = gv.message_expired();
// check bad vote format
assert!(!allowed(&sender, intent, &topic, &mut [0u8; 16]));
assert!(expired(topic, &mut [0u8; 16]));
// inactive round 1 -> expired
let vote = dummy_vote(1);
let mut encoded_vote = vote.encode();
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(expired(topic, &mut encoded_vote));
// active round 2 -> !expired - concluded but still gossiped
let vote = dummy_vote(2);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
// in progress round 3 -> !expired
let vote = dummy_vote(3);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
// unseen round 4 -> !expired
let vote = dummy_vote(3);
let mut encoded_vote = vote.encode();
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
assert!(!expired(topic, &mut encoded_vote));
}
#[test]
fn messages_rebroadcast() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = sc_network::PeerId::random();
let topic = Default::default();
let vote = dummy_vote(1);
let mut encoded_vote = vote.encode();
// re-broadcasting only allowed at `REBROADCAST_AFTER` intervals
let intent = MessageIntent::PeriodicRebroadcast;
let mut allowed = gv.message_allowed();
// rebroadcast not allowed so soon after GossipValidator creation
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
// hack the inner deadline to be `now`
*gv.next_rebroadcast.lock() = Instant::now();
// still not allowed on old `allowed` closure result
assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
// renew closure result
let mut allowed = gv.message_allowed();
// rebroadcast should be allowed now
assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
}
}
@@ -0,0 +1,118 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Communication streams for the BEEFY networking protocols.
pub mod notification;
pub mod request_response;
pub(crate) mod gossip;
pub(crate) mod peers;
pub(crate) mod beefy_protocol_name {
use array_bytes::bytes2hex;
use sc_network::ProtocolName;
/// BEEFY votes gossip protocol name suffix.
const GOSSIP_NAME: &str = "/beefy/1";
/// BEEFY justifications protocol name suffix.
const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1";
/// Old names for the gossip protocol, used for backward compatibility.
pub(super) const LEGACY_NAMES: [&str; 1] = ["/paritytech/beefy/1"];
/// Name of the votes gossip protocol used by BEEFY.
///
/// Must be registered towards the networking in order for BEEFY voter to properly function.
pub fn gossip_protocol_name<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> ProtocolName {
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, GOSSIP_NAME).into()
} else {
format!("/{}{}", bytes2hex("", genesis_hash), GOSSIP_NAME).into()
}
}
/// Name of the BEEFY justifications request-response protocol.
pub fn justifications_protocol_name<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> ProtocolName {
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, JUSTIFICATIONS_NAME).into()
} else {
format!("/{}{}", bytes2hex("", genesis_hash), JUSTIFICATIONS_NAME).into()
}
}
}
/// Returns the configuration value to put in
/// [`sc_network::config::NetworkConfiguration::extra_sets`].
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut cfg =
sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
cfg.allow_non_reserved(25, 25);
cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect());
cfg
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256;
#[test]
fn beefy_protocols_names() {
use beefy_protocol_name::{gossip_protocol_name, justifications_protocol_name};
// Create protocol name using random genesis hash.
let genesis_hash = H256::random();
let genesis_hex = array_bytes::bytes2hex("", genesis_hash.as_ref());
let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex);
let justif_proto_name = justifications_protocol_name(&genesis_hash, None);
assert_eq!(justif_proto_name.to_string(), expected_justif_name);
// Create protocol name using hardcoded genesis hash. Verify exact representation.
let genesis_hash = [
50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123,
94, 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147,
];
let genesis_hex = "32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93";
let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex);
let justif_proto_name = justifications_protocol_name(&genesis_hash, None);
assert_eq!(justif_proto_name.to_string(), expected_justif_name);
}
}
@@ -0,0 +1,55 @@
// This file is part of Substrate.
// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr};
use sp_runtime::traits::Block as BlockT;
use crate::justification::BeefyVersionedFinalityProof;
/// The sending half of the notifications channel(s) used to send
/// notifications about best BEEFY block from the gadget side.
pub type BeefyBestBlockSender<Block> = NotificationSender<<Block as BlockT>::Hash>;
/// The receiving half of a notifications channel used to receive
/// notifications about best BEEFY blocks determined on the gadget side.
pub type BeefyBestBlockStream<Block> =
NotificationStream<<Block as BlockT>::Hash, BeefyBestBlockTracingKey>;
/// The sending half of the notifications channel(s) used to send notifications
/// about versioned finality proof generated at the end of a BEEFY round.
pub type BeefyVersionedFinalityProofSender<Block> =
NotificationSender<BeefyVersionedFinalityProof<Block>>;
/// The receiving half of a notifications channel used to receive notifications
/// about versioned finality proof generated at the end of a BEEFY round.
pub type BeefyVersionedFinalityProofStream<Block> =
NotificationStream<BeefyVersionedFinalityProof<Block>, BeefyVersionedFinalityProofTracingKey>;
/// Provides tracing key for BEEFY best block stream.
#[derive(Clone)]
pub struct BeefyBestBlockTracingKey;
impl TracingKeyStr for BeefyBestBlockTracingKey {
const TRACING_KEY: &'static str = "mpsc_beefy_best_block_notification_stream";
}
/// Provides tracing key for BEEFY versioned finality proof stream.
#[derive(Clone)]
pub struct BeefyVersionedFinalityProofTracingKey;
impl TracingKeyStr for BeefyVersionedFinalityProofTracingKey {
const TRACING_KEY: &'static str = "mpsc_beefy_versioned_finality_proof_notification_stream";
}
@@ -0,0 +1,131 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Logic for keeping track of BEEFY peers.
// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.
use sc_network::PeerId;
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};
struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
impl<B: Block> Default for PeerData<B> {
fn default() -> Self {
PeerData { last_voted_on: Zero::zero() }
}
}
/// Keep a simple map of connected peers
/// and the most recent voting round they participated in.
pub struct KnownPeers<B: Block> {
live: HashMap<PeerId, PeerData<B>>,
}
impl<B: Block> KnownPeers<B> {
pub fn new() -> Self {
Self { live: HashMap::new() }
}
/// Add new connected `peer`.
pub fn add_new(&mut self, peer: PeerId) {
self.live.entry(peer).or_default();
}
/// Note vote round number for `peer`.
pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor<B>) {
let data = self.live.entry(peer).or_default();
data.last_voted_on = round.max(data.last_voted_on);
}
/// Remove connected `peer`.
pub fn remove(&mut self, peer: &PeerId) {
self.live.remove(peer);
}
/// Return _filtered and cloned_ list of peers that have voted on `block` or higher.
pub fn at_least_at_block(&self, block: NumberFor<B>) -> VecDeque<PeerId> {
self.live
.iter()
.filter_map(|(k, v)| (v.last_voted_on >= block).then_some(k))
.cloned()
.collect()
}
/// Answer whether `peer` is part of `KnownPeers` set.
pub fn contains(&self, peer: &PeerId) -> bool {
self.live.contains_key(peer)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_track_known_peers_progress() {
let (alice, bob, charlie) = (PeerId::random(), PeerId::random(), PeerId::random());
let mut peers = KnownPeers::<sc_network_test::Block>::new();
assert!(peers.live.is_empty());
// Alice and Bob new connected peers.
peers.add_new(alice);
peers.add_new(bob);
// 'Tracked' Bob seen voting for 5.
peers.note_vote_for(bob, 5);
// Previously unseen Charlie now seen voting for 10.
peers.note_vote_for(charlie, 10);
assert_eq!(peers.live.len(), 3);
assert!(peers.contains(&alice));
assert!(peers.contains(&bob));
assert!(peers.contains(&charlie));
// Get peers at block >= 5
let at_5 = peers.at_least_at_block(5);
// Should be Bob and Charlie
assert_eq!(at_5.len(), 2);
assert!(at_5.contains(&bob));
assert!(at_5.contains(&charlie));
// 'Tracked' Alice seen voting for 10.
peers.note_vote_for(alice, 10);
// Get peers at block >= 9
let at_9 = peers.at_least_at_block(9);
// Should be Charlie and Alice
assert_eq!(at_9.len(), 2);
assert!(at_9.contains(&charlie));
assert!(at_9.contains(&alice));
// Remove Alice
peers.remove(&alice);
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
// Get peers at block >= 9
let at_9 = peers.at_least_at_block(9);
// Now should be just Charlie
assert_eq!(at_9.len(), 1);
assert!(at_9.contains(&charlie));
}
}
@@ -0,0 +1,193 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer.
use beefy_primitives::BEEFY_ENGINE_ID;
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{config as netconfig, config::RequestResponseConfig, PeerId, ReputationChange};
use sc_network_common::protocol::ProtocolName;
use sp_runtime::{generic::BlockId, traits::Block};
use std::{marker::PhantomData, sync::Arc};
use crate::communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
};
/// A request coming in, including a sender for sending responses.
#[derive(Debug)]
pub(crate) struct IncomingRequest<B: Block> {
/// `PeerId` of sending peer.
pub peer: PeerId,
/// The sent request.
pub payload: JustificationRequest<B>,
/// Sender for sending response back.
pub pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
}
impl<B: Block> IncomingRequest<B> {
/// Create new `IncomingRequest`.
pub fn new(
peer: PeerId,
payload: JustificationRequest<B>,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self { peer, payload, pending_response }
}
/// Try building from raw network request.
///
/// This function will fail if the request cannot be decoded and will apply passed in
/// reputation changes in that case.
///
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
return Err(Error::DecodingErrorNoReputationChange(peer, err))
}
return Err(Error::DecodingError(peer, err))
},
};
Ok(Self::new(peer, payload, pending_response))
}
}
/// Receiver for incoming BEEFY justifications requests.
///
/// Takes care of decoding and handling of invalid encoded requests.
pub(crate) struct IncomingRequestReceiver {
raw: mpsc::Receiver<netconfig::IncomingRequest>,
}
impl IncomingRequestReceiver {
pub fn new(inner: mpsc::Receiver<netconfig::IncomingRequest>) -> Self {
Self { raw: inner }
}
/// Try to receive the next incoming request.
///
/// Any received request will be decoded, on decoding errors the provided reputation changes
/// will be applied and an error will be reported.
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
};
Ok(req)
}
}
/// Handler for incoming BEEFY justifications requests from a remote peer.
pub struct BeefyJustifsRequestHandler<B, Client> {
pub(crate) request_receiver: IncomingRequestReceiver,
pub(crate) justif_protocol_name: ProtocolName,
pub(crate) client: Arc<Client>,
pub(crate) _block: PhantomData<B>,
}
impl<B, Client> BeefyJustifsRequestHandler<B, Client>
where
B: Block,
Client: BlockBackend<B> + Send + Sync,
{
/// Create a new [`BeefyJustifsRequestHandler`].
pub fn new<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
client: Arc<Client>,
) -> (Self, RequestResponseConfig) {
let (request_receiver, config) =
on_demand_justifications_protocol_config(genesis_hash, fork_id);
let justif_protocol_name = config.name.clone();
(Self { request_receiver, justif_protocol_name, client, _block: PhantomData }, config)
}
/// Network request-response protocol name used by this handler.
pub fn protocol_name(&self) -> ProtocolName {
self.justif_protocol_name.clone()
}
// Sends back justification response if justification found in client backend.
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.
let maybe_encoded_proof = self
.client
.justifications(&BlockId::Number(request.payload.begin))
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
// No BEEFY justification present.
.ok_or(());
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
}
/// Run [`BeefyJustifsRequestHandler`].
pub async fn run(mut self) {
trace!(target: "beefy::sync", "🥩 Running BeefyJustifsRequestHandler");
while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
debug!(
target: "beefy::sync",
"🥩 Handled BEEFY justification request from {:?}.", peer
)
},
Err(e) => {
// TODO (issue #12293): apply reputation changes here based on error type.
debug!(
target: "beefy::sync",
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
)
},
}
}
}
}
@@ -0,0 +1,101 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Request/response protocol for syncing BEEFY justifications.
mod incoming_requests_handler;
pub(crate) mod outgoing_requests_engine;
pub use incoming_requests_handler::BeefyJustifsRequestHandler;
use futures::channel::mpsc;
use std::time::Duration;
use codec::{Decode, Encode, Error as CodecError};
use sc_network::{config::RequestResponseConfig, PeerId};
use sp_runtime::traits::{Block, NumberFor};
use crate::communication::beefy_protocol_name::justifications_protocol_name;
use incoming_requests_handler::IncomingRequestReceiver;
// 10 seems reasonable, considering justifs are explicitly requested only
// for mandatory blocks, by nodes that are syncing/catching-up.
const JUSTIF_CHANNEL_SIZE: usize = 10;
const MAX_RESPONSE_SIZE: u64 = 1024 * 1024;
const JUSTIF_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// Get the configuration for the BEEFY justifications Request/response protocol.
///
/// Returns a receiver for messages received on this protocol and the requested
/// `ProtocolConfig`.
///
/// Consider using [`BeefyJustifsRequestHandler`] instead of this low-level function.
pub(crate) fn on_demand_justifications_protocol_config<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> (IncomingRequestReceiver, RequestResponseConfig) {
let name = justifications_protocol_name(genesis_hash, fork_id);
let fallback_names = vec![];
let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE);
let rx = IncomingRequestReceiver::new(rx);
let cfg = RequestResponseConfig {
name,
fallback_names,
max_request_size: 32,
max_response_size: MAX_RESPONSE_SIZE,
// We are connected to all validators:
request_timeout: JUSTIF_REQUEST_TIMEOUT,
inbound_queue: Some(tx),
};
(rx, cfg)
}
/// BEEFY justification request.
#[derive(Debug, Clone, Encode, Decode)]
pub struct JustificationRequest<B: Block> {
/// Start collecting proofs from this block.
pub begin: NumberFor<B>,
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),
#[error(transparent)]
RuntimeApi(#[from] sp_api::ApiError),
/// Decoding failed, we were able to change the peer's reputation accordingly.
#[error("Decoding request failed for peer {0}.")]
DecodingError(PeerId, #[source] CodecError),
/// Decoding failed, but sending reputation change failed.
#[error("Decoding request failed for peer {0}, and changing reputation failed.")]
DecodingErrorNoReputationChange(PeerId, #[source] CodecError),
/// Incoming request stream exhausted. Should only happen on shutdown.
#[error("Incoming request channel got closed.")]
RequestChannelExhausted,
#[error("Failed to send response.")]
SendResponse,
#[error("Received invalid response.")]
InvalidResponse,
}
@@ -0,0 +1,245 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Generating request logic for request/response protocol for syncing BEEFY justifications.
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
use codec::Encode;
use futures::{
channel::{oneshot, oneshot::Canceled},
stream::{self, StreamExt},
};
use log::{debug, error, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
};
use sp_api::ProvideRuntimeApi;
use sp_runtime::{
generic::BlockId,
traits::{Block, NumberFor},
};
use std::{collections::VecDeque, result::Result, sync::Arc};
use crate::{
communication::request_response::{Error, JustificationRequest},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
KnownPeers,
};
/// Response type received from network.
type Response = Result<Vec<u8>, RequestFailure>;
/// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>;
enum State<B: Block> {
Idle(stream::Pending<Result<Response, Canceled>>),
AwaitingResponse(PeerId, NumberFor<B>, stream::Once<ResponseReceiver>),
}
pub struct OnDemandJustificationsEngine<B: Block, R> {
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
peers_cache: VecDeque<PeerId>,
state: State<B>,
}
impl<B, R> OnDemandJustificationsEngine<B, R>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
) -> Self {
Self {
network,
runtime,
protocol_name,
live_peers,
peers_cache: VecDeque::new(),
state: State::Idle(stream::pending()),
}
}
fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
self.peers_cache = self.live_peers.lock().at_least_at_block(block);
}
fn try_next_peer(&mut self) -> Option<PeerId> {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
return Some(peer)
}
}
None
}
fn request_from_peer(&mut self, peer: PeerId, block: NumberFor<B>) {
debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer);
let payload = JustificationRequest::<B> { begin: block }.encode();
let (tx, rx) = oneshot::channel();
self.network.start_request(
peer,
self.protocol_name.clone(),
payload,
tx,
IfDisconnected::ImmediateError,
);
self.state = State::AwaitingResponse(peer, block, stream::once(rx));
}
/// If no other request is in progress, start new justification request for `block`.
pub fn request(&mut self, block: NumberFor<B>) {
// ignore new requests while there's already one pending
match &self.state {
State::AwaitingResponse(_, _, _) => return,
State::Idle(_) => (),
}
self.reset_peers_cache_for_block(block);
// Start the requests engine - each unsuccessful received response will automatically
// trigger a new request to the next peer in the `peers_cache` until there are none left.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
} else {
debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
}
}
/// Cancel any pending request for block numbers smaller or equal to `block`.
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state {
State::AwaitingResponse(_, number, _) if *number <= block => {
debug!(
target: "beefy::sync",
"🥩 cancel pending request for justification #{:?}",
number
);
self.state = State::Idle(stream::pending());
},
_ => (),
}
}
fn process_response(
&mut self,
peer: PeerId,
block: NumberFor<B>,
validator_set: &ValidatorSet<AuthorityId>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
block, peer, e
);
Error::InvalidResponse
})?
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
block, peer, e
);
Error::InvalidResponse
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(&encoded[..], block, &validator_set).map_err(
|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
block, peer, e
);
Error::InvalidResponse
},
)
})
}
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, block, resp) = match &mut self.state {
State::Idle(pending) => {
let _ = pending.next().await;
// This never happens since 'stream::pending' never generates any items.
return None
},
State::AwaitingResponse(peer, block, receiver) => {
let resp = receiver.next().await?;
(*peer, *block, resp)
},
};
// We received the awaited response. Our 'stream::once()' receiver will never generate any
// other response, meaning we're done with current state. Move the engine to `State::Idle`.
self.state = State::Idle(stream::pending());
let block_id = BlockId::number(block);
let validator_set = self
.runtime
.runtime_api()
.validator_set(&block_id)
.map_err(|e| {
error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e);
e
})
.ok()?
.or_else(|| {
error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block);
None
})?;
self.process_response(peer, block, &validator_set, resp)
.map_err(|_| {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
} else {
warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
}
})
.map(|proof| {
debug!(
target: "beefy::sync",
"🥩 received valid on-demand justif #{:?} from {:?}",
block, peer
);
proof
})
.ok()
}
}