sc-consensus-beefy: add peer reputation cost/benefit changes (#13881)

* add cost/benefit to gossip messages
* report BEEFY gossip peer reputation changes
* drop WorkerParams helper struct
* add reputation costs to tests
* add peer reputation cost/benefit to on-demand-requests protocol
* include amount of signatures checked in invalid proof reputation cost

Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2023-04-12 14:09:50 +03:00
committed by GitHub
parent 84c9e2f63a
commit 4253ecbc62
12 changed files with 430 additions and 225 deletions
@@ -18,7 +18,7 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use sc_network::PeerId;
use sc_network::{PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_core::hashing::twox_64;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};
@@ -26,10 +26,14 @@ use sp_runtime::traits::{Block, Hash, Header, NumberFor};
use codec::{Decode, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use wasm_timer::Instant;
use crate::{
communication::peers::KnownPeers,
communication::{
benefit, cost,
peers::{KnownPeers, PeerReport},
},
justification::{
proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
},
@@ -47,6 +51,27 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
#[cfg(test)]
const REBROADCAST_AFTER: Duration = Duration::from_secs(5);
#[derive(Debug, PartialEq)]
pub(super) enum Action<H> {
// repropagate under given topic, to the given peers, applying cost/benefit to originator.
Keep(H, ReputationChange),
// discard, applying cost/benefit to originator.
Discard(ReputationChange),
}
/// An outcome of examining a message.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Consider {
/// Accept the message.
Accept,
/// Message is too early. Reject.
RejectPast,
/// Message is from the future. Reject.
RejectFuture,
/// Message cannot be evaluated. Reject.
RejectOutOfScope,
}
/// BEEFY gossip message type that gets encoded and sent on the network.
#[derive(Debug, Encode, Decode)]
pub(crate) enum GossipMessage<B: Block> {
@@ -135,26 +160,47 @@ impl<B: Block> Filter<B> {
}
}
/// Return true if `max(session_start, best_beefy) <= round <= best_grandpa`,
/// Accept if `max(session_start, best_beefy) <= round <= best_grandpa`,
/// and vote `set_id` matches session set id.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_vote_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
fn consider_vote(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> Consider {
self.inner
.as_ref()
.map(|f| set_id == f.validator_set.id() && round >= f.start && round <= f.end)
.unwrap_or(false)
.map(|f|
// only from current set and only [filter.start, filter.end]
if set_id < f.validator_set.id() {
Consider::RejectPast
} else if set_id > f.validator_set.id() {
Consider::RejectFuture
} else if round < f.start {
Consider::RejectPast
} else if round > f.end {
Consider::RejectFuture
} else {
Consider::Accept
})
.unwrap_or(Consider::RejectOutOfScope)
}
/// Return true if `round` is >= than `max(session_start, best_beefy)`,
/// and proof `set_id` matches session set id.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_finality_proof_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
fn consider_finality_proof(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> Consider {
self.inner
.as_ref()
.map(|f| set_id == f.validator_set.id() && round >= f.start)
.unwrap_or(false)
.map(|f|
// only from current set and only >= filter.start
if round < f.start || set_id < f.validator_set.id() {
Consider::RejectPast
} else if set_id > f.validator_set.id() {
Consider::RejectFuture
} else {
Consider::Accept
}
)
.unwrap_or(Consider::RejectOutOfScope)
}
/// Add new _known_ `hash` to the round's known votes.
@@ -189,20 +235,26 @@ where
gossip_filter: RwLock<Filter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
report_sender: TracingUnboundedSender<PeerReport>,
}
impl<B> GossipValidator<B>
where
B: Block,
{
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
pub(crate) fn new(
known_peers: Arc<Mutex<KnownPeers<B>>>,
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000);
let val = GossipValidator {
votes_topic: votes_topic::<B>(),
justifs_topic: proofs_topic::<B>(),
gossip_filter: RwLock::new(Filter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
report_sender: tx,
};
(val, rx)
}
/// Update gossip validator filter.
@@ -213,12 +265,16 @@ where
self.gossip_filter.write().update(filter);
}
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
}
fn validate_vote(
&self,
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
sender: &PeerId,
data: &[u8],
) -> ValidationResult<B::Hash> {
) -> Action<B::Hash> {
let msg_hash = twox_64(data);
let round = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
@@ -230,25 +286,37 @@ where
{
let filter = self.gossip_filter.read();
if !filter.is_vote_accepted(round, set_id) {
return ValidationResult::Discard
match filter.consider_vote(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
Consider::Accept => {},
}
if filter.is_known_vote(round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.votes_topic)
return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE)
}
// ensure authority is part of the set.
if !filter
.validator_set()
.map(|set| set.validators().contains(&vote.id))
.unwrap_or(false)
{
debug!(target: LOG_TARGET, "Message from voter not in validator set: {}", vote.id);
return Action::Discard(cost::UNKNOWN_VOTER)
}
}
if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) {
self.gossip_filter.write().add_known_vote(round, msg_hash);
ValidationResult::ProcessAndKeep(self.votes_topic)
Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE)
} else {
// TODO: report peer
debug!(
target: LOG_TARGET,
"🥩 Bad signature on message: {:?}, from: {:?}", vote, sender
);
ValidationResult::Discard
Action::Discard(cost::BAD_SIGNATURE)
}
}
@@ -256,31 +324,38 @@ where
&self,
proof: BeefyVersionedFinalityProof<B>,
sender: &PeerId,
) -> ValidationResult<B::Hash> {
) -> Action<B::Hash> {
let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
self.known_peers.lock().note_vote_for(*sender, round);
let guard = self.gossip_filter.read();
// Verify general usefulness of the justifications.
if !guard.is_finality_proof_accepted(round, set_id) {
return ValidationResult::Discard
// Verify general usefulness of the justification.
match guard.consider_finality_proof(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
Consider::Accept => {},
}
// Verify justification signatures.
guard
.validator_set()
.map(|validator_set| {
if let Ok(()) = verify_with_validator_set::<B>(round, validator_set, &proof) {
ValidationResult::ProcessAndKeep(self.justifs_topic)
} else {
// TODO: report peer
if let Err((_, signatures_checked)) =
verify_with_validator_set::<B>(round, validator_set, &proof)
{
debug!(
target: LOG_TARGET,
"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
);
ValidationResult::Discard
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Action::Discard(cost)
} else {
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
}
})
.unwrap_or(ValidationResult::Discard)
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
}
}
@@ -294,15 +369,32 @@ where
fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
context: &mut dyn ValidatorContext<B>,
sender: &PeerId,
mut data: &[u8],
) -> ValidationResult<B::Hash> {
match GossipMessage::<B>::decode(&mut data) {
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, data),
let raw = data;
let action = match GossipMessage::<B>::decode(&mut data) {
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw),
Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender),
Err(e) => {
debug!(target: LOG_TARGET, "Error decoding message: {}", e);
let bytes = raw.len().min(i32::MAX as usize) as i32;
let cost = ReputationChange::new(
bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
"BEEFY: Bad packet",
);
Action::Discard(cost)
},
};
match action {
Action::Keep(topic, cb) => {
self.report(*sender, cb);
context.broadcast_message(topic, data.to_vec(), false);
ValidationResult::ProcessAndKeep(topic)
},
Action::Discard(cb) => {
self.report(*sender, cb);
ValidationResult::Discard
},
}
@@ -314,13 +406,13 @@ where
Ok(GossipMessage::Vote(msg)) => {
let round = msg.commitment.block_number;
let set_id = msg.commitment.validator_set_id;
let expired = !filter.is_vote_accepted(round, set_id);
let expired = filter.consider_vote(round, set_id) != Consider::Accept;
trace!(target: LOG_TARGET, "🥩 Vote for round #{} expired: {}", round, expired);
expired
},
Ok(GossipMessage::FinalityProof(proof)) => {
let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
let expired = !filter.is_finality_proof_accepted(round, set_id);
let expired = filter.consider_finality_proof(round, set_id) != Consider::Accept;
trace!(
target: LOG_TARGET,
"🥩 Finality proof for round #{} expired: {}",
@@ -358,13 +450,13 @@ where
Ok(GossipMessage::Vote(msg)) => {
let round = msg.commitment.block_number;
let set_id = msg.commitment.validator_set_id;
let allowed = filter.is_vote_accepted(round, set_id);
let allowed = filter.consider_vote(round, set_id) == Consider::Accept;
trace!(target: LOG_TARGET, "🥩 Vote for round #{} allowed: {}", round, allowed);
allowed
},
Ok(GossipMessage::FinalityProof(proof)) => {
let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
let allowed = filter.is_finality_proof_accepted(round, set_id);
let allowed = filter.consider_finality_proof(round, set_id) == Consider::Accept;
trace!(
target: LOG_TARGET,
"🥩 Finality proof for round #{} allowed: {}",
@@ -409,15 +501,16 @@ pub(crate) mod tests {
assert_eq!(filter.live_votes.len(), 3);
assert!(filter.inner.is_none());
assert!(!filter.is_vote_accepted(1, 1));
assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope);
filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set });
assert_eq!(filter.live_votes.len(), 1);
assert!(filter.live_votes.contains_key(&3));
assert!(!filter.is_vote_accepted(2, 1));
assert!(filter.is_vote_accepted(3, 1));
assert!(filter.is_vote_accepted(4, 1));
assert!(!filter.is_vote_accepted(4, 2));
assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast);
assert_eq!(filter.consider_vote(3, 1), Consider::Accept);
assert_eq!(filter.consider_vote(4, 1), Consider::Accept);
assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture);
assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture);
let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap();
filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set });
@@ -430,9 +523,7 @@ pub(crate) mod tests {
todo!()
}
fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {
todo!()
}
fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {}
fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
todo!()
@@ -485,18 +576,39 @@ pub(crate) mod tests {
fn should_validate_messages() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let (gv, mut report_stream) =
GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let sender = PeerId::random();
let mut context = TestContext;
// reject message, decoding error
let bad_encoding = b"0000000000".as_slice();
let expected_cost = ReputationChange::new(
(bad_encoding.len() as i32).saturating_mul(cost::PER_UNDECODABLE_BYTE),
"BEEFY: Bad packet",
);
let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost };
let res = gv.validate(&mut context, &sender, bad_encoding);
assert!(matches!(res, ValidationResult::Discard));
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// verify votes validation
let vote = dummy_vote(3);
let gossip_vote = GossipMessage::<Block>::Vote(vote.clone());
let encoded = GossipMessage::<Block>::Vote(vote.clone()).encode();
// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
// filter not initialized
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
// nothing in cache first time
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(
gv.gossip_filter
.read()
@@ -507,43 +619,74 @@ pub(crate) mod tests {
);
// second time we should hit the cache
let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// next we should quickly reject if the round is not live
gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
// reject vote, voter not in validator set
let mut bad_vote = vote.clone();
bad_vote.id = Keyring::Bob.public();
let bad_vote = GossipMessage::<Block>::Vote(bad_vote).encode();
let res = gv.validate(&mut context, &sender, &bad_vote);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::UNKNOWN_VOTER;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// reject if the round is not GRANDPA finalized
gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set });
let number = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
assert!(!gv.gossip_filter.read().is_vote_accepted(number, set_id));
let res = gv.validate(&mut context, &sender, &vote.encode());
assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectFuture);
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// reject if the round is not live anymore
gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
let number = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectPast);
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// now verify proofs validation
// reject old proof
let proof = dummy_proof(5, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// accept next proof with good set_id
let proof = dummy_proof(7, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// accept future proof with good set_id
let proof = dummy_proof(20, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// reject proof, wrong set_id
// reject proof, future set_id
let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
let proof = dummy_proof(20, &bad_validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// reject proof, bad signatures (Bob instead of Alice)
let bad_validator_set =
@@ -552,13 +695,16 @@ pub(crate) mod tests {
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::INVALID_PROOF;
expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
}
#[test]
fn messages_allowed_and_expired() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
@@ -635,7 +781,7 @@ pub(crate) mod tests {
fn messages_rebroadcast() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
@@ -73,6 +73,39 @@ pub fn beefy_peers_set_config(
cfg
}
// cost scalars for reporting peers.
mod cost {
use sc_network::ReputationChange as Rep;
// Message that's for an outdated round.
pub(super) const OUTDATED_MESSAGE: Rep = Rep::new(-50, "BEEFY: Past message");
// Message that's from the future relative to our current set-id.
pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-100, "BEEFY: Future message");
// Vote message containing bad signature.
pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
// Message received with vote from voter not in validator set.
pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
// A message received that cannot be evaluated relative to our current state.
pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
// Message containing invalid proof.
pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
// Reputation cost per signature checked for invalid proof.
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
// Reputation cost per byte for un-decodable message.
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
// On-demand request was refused by peer.
pub(super) const REFUSAL_RESPONSE: Rep = Rep::new(-100, "BEEFY: Proof request refused");
// On-demand request for a proof that can't be found in the backend.
pub(super) const UNKOWN_PROOF_REQUEST: Rep = Rep::new(-150, "BEEFY: Unknown proof request");
}
// benefit scalars for reporting peers.
mod benefit {
use sc_network::ReputationChange as Rep;
pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
}
#[cfg(test)]
mod tests {
use super::*;
@@ -18,13 +18,17 @@
//! Logic for keeping track of BEEFY peers.
// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.
use sc_network::PeerId;
use sc_network::{PeerId, ReputationChange};
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};
/// Report specifying a reputation change for a given peer.
#[derive(Debug, PartialEq)]
pub(crate) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}
struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
@@ -32,9 +32,12 @@ use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};
use crate::{
communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
communication::{
cost,
request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
},
},
metric_inc,
metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
@@ -69,17 +72,20 @@ impl<B: Block> IncomingRequest<B> {
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
pub fn try_from_raw<F>(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
reputation_changes_on_err: F,
) -> Result<Self, Error>
where
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
reputation_changes: reputation_changes_on_err(payload.len()),
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
@@ -111,11 +117,11 @@ impl IncomingRequestReceiver {
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
};
Ok(req)
}
@@ -159,26 +165,20 @@ where
// Sends back justification response if justification found in client backend.
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.
let maybe_encoded_proof = if let Some(hash) =
self.client.block_hash(request.payload.begin).map_err(Error::Client)?
{
self.client
.justifications(hash)
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
// No BEEFY justification present.
.ok_or(())
} else {
Err(())
};
let mut reputation_changes = vec![];
let maybe_encoded_proof = self
.client
.block_hash(request.payload.begin)
.ok()
.flatten()
.and_then(|hash| self.client.justifications(hash).ok().flatten())
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
.ok_or_else(|| reputation_changes.push(cost::UNKOWN_PROOF_REQUEST));
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
reputation_changes,
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
@@ -188,7 +188,17 @@ where
pub async fn run(mut self) {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
while let Ok(request) = self
.request_receiver
.recv(|bytes| {
let bytes = bytes.min(i32::MAX as usize) as i32;
vec![ReputationChange::new(
bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
"BEEFY: Bad request payload",
)]
})
.await
{
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
@@ -199,8 +209,8 @@ where
)
},
Err(e) => {
// peer reputation changes already applied in `self.handle_request()`
metric_inc!(self, beefy_failed_justification_responses);
// TODO (issue #12293): apply reputation changes here based on error type.
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
@@ -30,7 +30,7 @@ use codec::{Decode, Encode, Error as CodecError};
use sc_network::{config::RequestResponseConfig, PeerId};
use sp_runtime::traits::{Block, NumberFor};
use crate::communication::beefy_protocol_name::justifications_protocol_name;
use crate::communication::{beefy_protocol_name::justifications_protocol_name, peers::PeerReport};
use incoming_requests_handler::IncomingRequestReceiver;
// 10 seems reasonable, considering justifs are explicitly requested only
@@ -76,7 +76,7 @@ pub struct JustificationRequest<B: Block> {
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),
@@ -99,5 +99,8 @@ pub enum Error {
SendResponse,
#[error("Received invalid response.")]
InvalidResponse,
InvalidResponse(PeerReport),
#[error("Internal error while getting response.")]
ResponseError,
}
@@ -31,7 +31,11 @@ use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};
use crate::{
communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
communication::{
benefit, cost,
peers::PeerReport,
request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
metric_inc,
metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
@@ -54,6 +58,16 @@ enum State<B: Block> {
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}
/// Possible engine responses.
pub(crate) enum ResponseInfo<B: Block> {
/// No peer response available yet.
Pending,
/// Valid justification provided alongside peer reputation changes.
ValidProof(BeefyVersionedFinalityProof<B>, PeerReport),
/// No justification yet, only peer reputation changes.
PeerReport(PeerReport),
}
pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
@@ -84,12 +98,10 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
}
fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
self.peers_cache = self.live_peers.lock().further_than(block);
}
fn try_next_peer(&mut self) -> Option<PeerId> {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
@@ -159,24 +171,19 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
fn process_response(
&mut self,
peer: PeerId,
peer: &PeerId,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_hang_up);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
req_info.block,
peer,
e
"🥩 on-demand sc-network channel sender closed, err: {:?}", e
);
Error::InvalidResponse
Error::ResponseError
})?
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_error);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
@@ -184,7 +191,18 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
peer,
e
);
Error::InvalidResponse
match e {
RequestFailure::Refused => {
metric_inc!(self, beefy_on_demand_justification_peer_refused);
let peer_report =
PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
Error::InvalidResponse(peer_report)
},
_ => {
metric_inc!(self, beefy_on_demand_justification_peer_error);
Error::ResponseError
},
}
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(
@@ -192,23 +210,26 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
.map_err(|(err, signatures_checked)| {
metric_inc!(self, beefy_on_demand_justification_invalid_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
req_info.block, peer, err
);
Error::InvalidResponse
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
})
})
}
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
pub(crate) async fn next(&mut self) -> ResponseInfo<B> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::future::pending::<()>().await;
return None
return ResponseInfo::Pending
},
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
@@ -220,8 +241,8 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
self.state = State::Idle;
let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
match self.process_response(&peer, &req_info, resp) {
Err(err) => {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, req_info);
@@ -231,15 +252,22 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
"🥩 ran out of peers to request justif #{:?} from", block
);
}
})
.map(|proof| {
// Report peer based on error type.
if let Error::InvalidResponse(peer_report) = err {
ResponseInfo::PeerReport(peer_report)
} else {
ResponseInfo::Pending
}
},
Ok(proof) => {
metric_inc!(self, beefy_on_demand_justification_good_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
);
proof
})
.ok()
let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
ResponseInfo::ValidProof(proof, peer_report)
},
}
}
}
@@ -109,6 +109,7 @@ where
.ok_or_else(|| ImportError("Unknown validator set".to_string()))?;
decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
.map_err(|(err, _)| err)
}
}
@@ -42,9 +42,9 @@ pub(crate) fn decode_and_verify_finality_proof<Block: BlockT>(
encoded: &[u8],
target_number: NumberFor<Block>,
validator_set: &ValidatorSet<AuthorityId>,
) -> Result<BeefyVersionedFinalityProof<Block>, ConsensusError> {
) -> Result<BeefyVersionedFinalityProof<Block>, (ConsensusError, u32)> {
let proof = <BeefyVersionedFinalityProof<Block>>::decode(&mut &*encoded)
.map_err(|_| ConsensusError::InvalidJustification)?;
.map_err(|_| (ConsensusError::InvalidJustification, 0))?;
verify_with_validator_set::<Block>(target_number, validator_set, &proof).map(|_| proof)
}
@@ -53,14 +53,15 @@ pub(crate) fn verify_with_validator_set<Block: BlockT>(
target_number: NumberFor<Block>,
validator_set: &ValidatorSet<AuthorityId>,
proof: &BeefyVersionedFinalityProof<Block>,
) -> Result<(), ConsensusError> {
) -> Result<(), (ConsensusError, u32)> {
let mut signatures_checked = 0u32;
match proof {
VersionedFinalityProof::V1(signed_commitment) => {
if signed_commitment.signatures.len() != validator_set.len() ||
signed_commitment.commitment.validator_set_id != validator_set.id() ||
signed_commitment.commitment.block_number != target_number
{
return Err(ConsensusError::InvalidJustification)
return Err((ConsensusError::InvalidJustification, 0))
}
// Arrangement of signatures in the commitment should be in the same order
@@ -73,14 +74,17 @@ pub(crate) fn verify_with_validator_set<Block: BlockT>(
.filter(|(id, signature)| {
signature
.as_ref()
.map(|sig| BeefyKeystore::verify(id, sig, &message[..]))
.map(|sig| {
signatures_checked += 1;
BeefyKeystore::verify(id, sig, &message[..])
})
.unwrap_or(false)
})
.count();
if valid_signatures >= crate::round::threshold(validator_set.len()) {
Ok(())
} else {
Err(ConsensusError::InvalidJustification)
Err((ConsensusError::InvalidJustification, signatures_checked))
}
},
}
@@ -127,16 +131,16 @@ pub(crate) mod tests {
// wrong block number -> should fail verification
let good_proof = proof.clone().into();
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &good_proof) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
Err((ConsensusError::InvalidJustification, 0)) => (),
e => assert!(false, "Got unexpected {:?}", e),
};
// wrong validator set id -> should fail verification
let good_proof = proof.clone().into();
let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
match verify_with_validator_set::<Block>(block_num, &other, &good_proof) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
Err((ConsensusError::InvalidJustification, 0)) => (),
e => assert!(false, "Got unexpected {:?}", e),
};
// wrong signatures length -> should fail verification
@@ -147,8 +151,8 @@ pub(crate) mod tests {
};
bad_signed_commitment.signatures.pop().flatten().unwrap();
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
Err((ConsensusError::InvalidJustification, 0)) => (),
e => assert!(false, "Got unexpected {:?}", e),
};
// not enough signatures -> should fail verification
@@ -158,9 +162,9 @@ pub(crate) mod tests {
};
// remove a signature (but same length)
*bad_signed_commitment.signatures.first_mut().unwrap() = None;
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
match verify_with_validator_set::<Block>(block_num, &validator_set, &bad_proof.into()) {
Err((ConsensusError::InvalidJustification, 2)) => (),
e => assert!(false, "Got unexpected {:?}", e),
};
// not enough _correct_ signatures -> should fail verification
@@ -171,9 +175,9 @@ pub(crate) mod tests {
// change a signature to a different key
*bad_signed_commitment.signatures.first_mut().unwrap() =
Some(Keyring::Dave.sign(&bad_signed_commitment.commitment.encode()));
match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
Err(ConsensusError::InvalidJustification) => (),
_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
match verify_with_validator_set::<Block>(block_num, &validator_set, &bad_proof.into()) {
Err((ConsensusError::InvalidJustification, 3)) => (),
e => assert!(false, "Got unexpected {:?}", e),
};
}
+12 -7
View File
@@ -52,7 +52,11 @@ use sp_consensus_beefy::{
use sp_keystore::KeystorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block, Zero};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
};
mod aux_schema;
mod error;
@@ -249,9 +253,10 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let gossip_validator =
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
@@ -295,7 +300,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
return
}
let worker_params = worker::WorkerParams {
let worker = worker::BeefyWorker {
backend,
payload_provider,
runtime,
@@ -303,14 +308,14 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
key_store: key_store.into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
pending_justifications: BTreeMap::new(),
persisted_state,
};
let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
futures::future::join(
worker.run(block_import_justif, finality_notifications),
on_demand_justifications_handler.run(),
@@ -228,8 +228,8 @@ impl PrometheusRegister for OnDemandIncomingRequestsMetrics {
pub struct OnDemandOutgoingRequestsMetrics {
/// Number of times there was no good peer to request justification from
pub beefy_on_demand_justification_no_peer_to_request_from: Counter<U64>,
/// Number of on-demand justification peer hang up
pub beefy_on_demand_justification_peer_hang_up: Counter<U64>,
/// Number of on-demand justification peer refused valid requests
pub beefy_on_demand_justification_peer_refused: Counter<U64>,
/// Number of on-demand justification peer error
pub beefy_on_demand_justification_peer_error: Counter<U64>,
/// Number of on-demand justification invalid proof
@@ -249,10 +249,10 @@ impl PrometheusRegister for OnDemandOutgoingRequestsMetrics {
)?,
registry,
)?,
beefy_on_demand_justification_peer_hang_up: register(
beefy_on_demand_justification_peer_refused: register(
Counter::new(
"substrate_beefy_on_demand_justification_peer_hang_up",
"Number of on-demand justification peer hang up",
"beefy_on_demand_justification_peer_refused",
"Number of on-demand justification peer refused valid requests",
)?,
registry,
)?,
@@ -24,6 +24,7 @@ use crate::{
communication::{
gossip::{
proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage,
GossipValidator,
},
request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
},
@@ -357,8 +358,8 @@ async fn voter_init_setup(
) -> sp_blockchain::Result<PersistedState<Block>> {
let backend = net.peer(0).client().as_backend();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let (gossip_validator, _) = GossipValidator::new(known_peers);
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
@@ -1262,8 +1263,8 @@ async fn gossipped_finality_proofs() {
let charlie = &net.peers[2];
let known_peers = Arc::new(Mutex::new(KnownPeers::<Block>::new()));
// Charlie will run just the gossip engine and not the full voter.
let charlie_gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let (gossip_validator, _) = GossipValidator::new(known_peers);
let charlie_gossip_validator = Arc::new(gossip_validator);
charlie_gossip_validator.update_filter(GossipFilterCfg::<Block> {
start: 1,
end: 10,
+44 -74
View File
@@ -19,7 +19,8 @@
use crate::{
communication::{
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
peers::PeerReport,
request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
},
error::Error,
justification::BeefyVersionedFinalityProof,
@@ -34,7 +35,7 @@ use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
use sc_network_gossip::GossipEngine;
use sc_utils::notification::NotificationReceiver;
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
@@ -255,20 +256,6 @@ impl<B: Block> VoterOracle<B> {
}
}
pub(crate) struct WorkerParams<B: Block, BE, P, R, S> {
pub backend: Arc<BE>,
pub payload_provider: P,
pub runtime: Arc<R>,
pub sync: Arc<S>,
pub key_store: BeefyKeystore,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
pub links: BeefyVoterLinks<B>,
pub metrics: Option<VoterMetrics>,
pub persisted_state: PersistedState<B>,
}
#[derive(Debug, Decode, Encode, PartialEq)]
pub(crate) struct PersistedState<B: Block> {
/// Best block we voted on.
@@ -311,28 +298,29 @@ impl<B: Block> PersistedState<B> {
/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
// utilities
backend: Arc<BE>,
payload_provider: P,
runtime: Arc<RuntimeApi>,
sync: Arc<S>,
key_store: BeefyKeystore,
pub backend: Arc<BE>,
pub payload_provider: P,
pub runtime: Arc<RuntimeApi>,
pub sync: Arc<S>,
pub key_store: BeefyKeystore,
// communication
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
on_demand_justifications: OnDemandJustificationsEngine<B>,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
// channels
/// Links between the block importer, the background voter and the RPC layer.
links: BeefyVoterLinks<B>,
pub links: BeefyVoterLinks<B>,
// voter state
/// BEEFY client metrics.
metrics: Option<VoterMetrics>,
pub metrics: Option<VoterMetrics>,
/// Buffer holding justifications for future processing.
pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
/// Persisted voter state.
persisted_state: PersistedState<B>,
pub persisted_state: PersistedState<B>,
}
impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
@@ -344,43 +332,6 @@ where
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
/// Return a new BEEFY worker instance.
///
/// Note that a BEEFY worker is only fully functional if a corresponding
/// BEEFY pallet has been deployed on-chain.
///
/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
pub(crate) fn new(worker_params: WorkerParams<B, BE, P, R, S>) -> Self {
let WorkerParams {
backend,
payload_provider,
runtime,
key_store,
sync,
gossip_engine,
gossip_validator,
on_demand_justifications,
links,
metrics,
persisted_state,
} = worker_params;
BeefyWorker {
backend,
payload_provider,
runtime,
sync,
key_store,
gossip_engine,
gossip_validator,
on_demand_justifications,
links,
metrics,
pending_justifications: BTreeMap::new(),
persisted_state,
}
}
fn best_grandpa_block(&self) -> NumberFor<B> {
*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
}
@@ -849,7 +800,12 @@ where
// Act on changed 'state'.
self.process_new_state();
// Mutable reference used to drive the gossip engine.
let mut gossip_engine = &mut self.gossip_engine;
// Use temp val and report after async section,
// to avoid having to Mutex-wrap `gossip_engine`.
let mut gossip_report: Option<PeerReport> = None;
// Wait for, and handle external events.
// The branches below only change 'state', actual voting happens afterwards,
// based on the new resulting 'state'.
@@ -870,11 +826,16 @@ where
return;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
justif = self.on_demand_justifications.next().fuse() => {
if let Some(justif) = justif {
if let Err(err) = self.triage_incoming_justif(justif) {
debug!(target: LOG_TARGET, "🥩 {}", err);
}
response_info = self.on_demand_justifications.next().fuse() => {
match response_info {
ResponseInfo::ValidProof(justif, peer_report) => {
if let Err(err) = self.triage_incoming_justif(justif) {
debug!(target: LOG_TARGET, "🥩 {}", err);
}
gossip_report = Some(peer_report);
},
ResponseInfo::PeerReport(peer_report) => gossip_report = Some(peer_report),
ResponseInfo::Pending => (),
}
},
justif = block_import_justif.next() => {
@@ -918,6 +879,13 @@ where
return;
}
},
// Process peer reports.
report = self.gossip_report_stream.next() => {
gossip_report = report;
},
}
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
self.gossip_engine.report(who, cost_benefit);
}
}
}
@@ -1122,7 +1090,8 @@ pub(crate) mod tests {
let network = peer.network_service().clone();
let sync = peer.sync_service().clone();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator = Arc::new(GossipValidator::new(known_peers.clone()));
let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
@@ -1152,7 +1121,7 @@ pub(crate) mod tests {
)
.unwrap();
let payload_provider = MmrRootProvider::new(api.clone());
let worker_params = crate::worker::WorkerParams {
BeefyWorker {
backend,
payload_provider,
runtime: api,
@@ -1160,12 +1129,13 @@ pub(crate) mod tests {
links,
gossip_engine,
gossip_validator,
gossip_report_stream,
metrics,
sync: Arc::new(sync),
on_demand_justifications,
pending_justifications: BTreeMap::new(),
persisted_state,
};
BeefyWorker::<_, _, _, _, _>::new(worker_params)
}
}
#[test]