From 1c6867c6ed44dc2e17157f684bd1a40418d5c3d9 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Fri, 29 Jul 2022 18:47:21 +0300 Subject: [PATCH] Lean BEEFY to Full BEEFY - don't skip (older) mandatory blocks and import justifications (#11821) * client/beefy: don't accept vote for older rounds * client/beefy: clean up and reorg the worker struct * client/beefy: first step towards Full BEEFY The first step from Lean->Full BEEFY is to have the worker enforce uninterrupted line of BEEFY finalized mandatory blocks. There is one mandatory block per session (the first block in the session). As such, votes processing and votes generation now enforces that all mandatory blocks are finalized in strict monotonically increasing sequence and no block 'N' will be worked on if there is any GRANDPA finalized but BEEFY non-final mandatory block 'M', where 'M < N'. Implementation details: - Introduced 'VoterOracle' to separate the voting decisions logic, and track new/pending sessions. - New sessions get queued up with the worker operating either: 1. up-to-date - all mandatory blocks leading up to current GRANDPA finalized: queue has ONE element, the 'current session' where `mandatory_done == true`, 2. lagging behind GRANDPA: queue has [1, N] elements, where all `mandatory_done == false`. In this state, everytime a session gets its mandatory block BEEFY finalized, the session is popped off the queue, eventually getting to operating mode `1. up-to-date`. - Votes get triaged and those that fall withing the `VoterOracle` allowed window get processed, the others get dropped if stale, or buffered for later processing (when they reach the window). - Worker general code was also updated to fall in one of two roles: 1. react to external events and change internal 'state', 2. generate events/votes based on internal 'state'. Signed-off-by: acatangiu * client/beefy: sketch idea for block import and sync Signed-off-by: acatangiu * client/beefy: add BEEFY block import * client/beefy: process justifications from block import * client/beefy: add TODOs for sync protocol * client/beefy: add more docs and comments * client/beefy-rpc: fix RPC error * client/beefy: verify justification validity on block import * client/beefy: more tests * client/beefy: small fixes - first handle and note the self vote before gossiping it, - don't shortcircuit on err when processing pending votes. * client/beefy: remove invalid justifications at block import * todo: beefy block import tests * RFC: ideas for multiple justifications per block * Revert "RFC: ideas for multiple justifications per block" This reverts commit 8256fb07d3124db69daf252720b3c0208202624d. * client/beefy: append justif to backend on block import * client/beefy: groundwork for block import test * client/beefy: groundwork2 for block import test * client/beefy: groundwork3 for block import test * client/beefy: add block import test * client/beefy: add required trait bounds to block import builder * remove client from beefy block import, backend gets the job done Signed-off-by: acatangiu --- substrate/Cargo.lock | 2 + substrate/client/beefy/Cargo.toml | 4 +- substrate/client/beefy/rpc/src/lib.rs | 5 +- .../client/beefy/rpc/src/notification.rs | 2 +- substrate/client/beefy/src/error.rs | 4 + substrate/client/beefy/src/import.rs | 205 ++++ substrate/client/beefy/src/justification.rs | 177 ++++ substrate/client/beefy/src/lib.rs | 101 +- substrate/client/beefy/src/metrics.rs | 10 +- substrate/client/beefy/src/notification.rs | 6 +- substrate/client/beefy/src/round.rs | 82 +- substrate/client/beefy/src/tests.rs | 199 +++- substrate/client/beefy/src/worker.rs | 936 ++++++++++++------ substrate/client/network/test/src/lib.rs | 15 +- substrate/primitives/beefy/src/commitment.rs | 6 + substrate/primitives/runtime/src/lib.rs | 5 + 16 files changed, 1346 insertions(+), 413 deletions(-) create mode 100644 substrate/client/beefy/src/import.rs create mode 100644 substrate/client/beefy/src/justification.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 926d6d4cdf..34bf34c62c 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -440,6 +440,7 @@ dependencies = [ name = "beefy-gadget" version = "4.0.0-dev" dependencies = [ + "async-trait", "beefy-primitives", "fnv", "futures", @@ -448,6 +449,7 @@ dependencies = [ "log", "parity-scale-codec", "parking_lot 0.12.0", + "sc-block-builder", "sc-chain-spec", "sc-client-api", "sc-consensus", diff --git a/substrate/client/beefy/Cargo.toml b/substrate/client/beefy/Cargo.toml index b910f7ce30..e219420959 100644 --- a/substrate/client/beefy/Cargo.toml +++ b/substrate/client/beefy/Cargo.toml @@ -9,6 +9,7 @@ description = "BEEFY Client gadget for substrate" homepage = "https://substrate.io" [dependencies] +async-trait = "0.1.50" codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } fnv = "1.0.6" futures = "0.3" @@ -22,6 +23,7 @@ beefy-primitives = { version = "4.0.0-dev", path = "../../primitives/beefy" } prometheus = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-finality-grandpa = { version = "0.10.0-dev", path = "../../client/finality-grandpa" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } @@ -42,7 +44,7 @@ serde = "1.0.136" strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.1.0" tokio = "1.17.0" -sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } +sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-network-test = { version = "0.8.0", path = "../network/test" } sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } sp-keyring = { version = "6.0.0", path = "../../primitives/keyring" } diff --git a/substrate/client/beefy/rpc/src/lib.rs b/substrate/client/beefy/rpc/src/lib.rs index 2c3ffda056..13bca08d37 100644 --- a/substrate/client/beefy/rpc/src/lib.rs +++ b/substrate/client/beefy/rpc/src/lib.rs @@ -164,8 +164,9 @@ where mod tests { use super::*; - use beefy_gadget::notification::{ - BeefyBestBlockStream, BeefySignedCommitment, BeefySignedCommitmentSender, + use beefy_gadget::{ + justification::BeefySignedCommitment, + notification::{BeefyBestBlockStream, BeefySignedCommitmentSender}, }; use beefy_primitives::{known_payload_ids, Payload}; use codec::{Decode, Encode}; diff --git a/substrate/client/beefy/rpc/src/notification.rs b/substrate/client/beefy/rpc/src/notification.rs index 2f58c7c6bb..cdda667782 100644 --- a/substrate/client/beefy/rpc/src/notification.rs +++ b/substrate/client/beefy/rpc/src/notification.rs @@ -29,7 +29,7 @@ pub struct EncodedSignedCommitment(sp_core::Bytes); impl EncodedSignedCommitment { pub fn new( - signed_commitment: beefy_gadget::notification::BeefySignedCommitment, + signed_commitment: beefy_gadget::justification::BeefySignedCommitment, ) -> Self where Block: BlockT, diff --git a/substrate/client/beefy/src/error.rs b/substrate/client/beefy/src/error.rs index eacadeb761..dd5fd649d5 100644 --- a/substrate/client/beefy/src/error.rs +++ b/substrate/client/beefy/src/error.rs @@ -24,8 +24,12 @@ use std::fmt::Debug; #[derive(Debug, thiserror::Error, PartialEq)] pub enum Error { + #[error("Backend: {0}")] + Backend(String), #[error("Keystore error: {0}")] Keystore(String), #[error("Signature error: {0}")] Signature(String), + #[error("Session uninitialized")] + UninitSession, } diff --git a/substrate/client/beefy/src/import.rs b/substrate/client/beefy/src/import.rs new file mode 100644 index 0000000000..7caeb49db5 --- /dev/null +++ b/substrate/client/beefy/src/import.rs @@ -0,0 +1,205 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use beefy_primitives::{crypto::Signature, BeefyApi, VersionedFinalityProof, BEEFY_ENGINE_ID}; +use codec::Encode; +use log::error; +use std::{collections::HashMap, sync::Arc}; + +use sp_api::{ProvideRuntimeApi, TransactionFor}; +use sp_blockchain::{well_known_cache_keys, HeaderBackend}; +use sp_consensus::Error as ConsensusError; +use sp_runtime::{ + generic::BlockId, + traits::{Block as BlockT, Header as HeaderT, NumberFor}, + EncodedJustification, +}; + +use sc_client_api::backend::Backend; +use sc_consensus::{BlockCheckParams, BlockImport, BlockImportParams, ImportResult}; + +use crate::{ + justification::decode_and_verify_commitment, notification::BeefySignedCommitmentSender, +}; + +/// A block-import handler for BEEFY. +/// +/// This scans each imported block for BEEFY justifications and verifies them. +/// Wraps a `inner: BlockImport` and ultimately defers to it. +/// +/// When using BEEFY, the block import worker should be using this block import object. +pub struct BeefyBlockImport { + backend: Arc, + runtime: Arc, + inner: I, + justification_sender: BeefySignedCommitmentSender, +} + +impl Clone for BeefyBlockImport { + fn clone(&self) -> Self { + BeefyBlockImport { + backend: self.backend.clone(), + runtime: self.runtime.clone(), + inner: self.inner.clone(), + justification_sender: self.justification_sender.clone(), + } + } +} + +impl BeefyBlockImport { + /// Create a new BeefyBlockImport. + pub fn new( + backend: Arc, + runtime: Arc, + inner: I, + justification_sender: BeefySignedCommitmentSender, + ) -> BeefyBlockImport { + BeefyBlockImport { backend, runtime, inner, justification_sender } + } +} + +impl BeefyBlockImport +where + Block: BlockT, + BE: Backend, + Runtime: ProvideRuntimeApi, + Runtime::Api: BeefyApi + Send + Sync, +{ + fn decode_and_verify( + &self, + encoded: &EncodedJustification, + number: NumberFor, + hash: ::Hash, + ) -> Result, Signature>, ConsensusError> { + let block_id = BlockId::hash(hash); + let validator_set = self + .runtime + .runtime_api() + .validator_set(&block_id) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))? + .ok_or_else(|| ConsensusError::ClientImport("Unknown validator set".to_string()))?; + + decode_and_verify_commitment::(&encoded[..], number, &validator_set) + } + + /// Import BEEFY justification: Send it to worker for processing and also append it to backend. + /// + /// This function assumes: + /// - `justification` is verified and valid, + /// - the block referred by `justification` has been imported _and_ finalized. + fn import_beefy_justification_unchecked( + &self, + number: NumberFor, + justification: VersionedFinalityProof, Signature>, + ) { + // Append the justification to the block in the backend. + if let Err(e) = self.backend.append_justification( + BlockId::Number(number), + (BEEFY_ENGINE_ID, justification.encode()), + ) { + error!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, justification); + } + // Send the justification to the BEEFY voter for processing. + match justification { + // TODO #11838: Should not unpack, these channels should also use + // `VersionedFinalityProof`. + VersionedFinalityProof::V1(signed_commitment) => self + .justification_sender + .notify(|| Ok::<_, ()>(signed_commitment)) + .expect("forwards closure result; the closure always returns Ok; qed."), + }; + } +} + +#[async_trait::async_trait] +impl BlockImport for BeefyBlockImport +where + Block: BlockT, + BE: Backend, + I: BlockImport< + Block, + Error = ConsensusError, + Transaction = sp_api::TransactionFor, + > + Send + + Sync, + Runtime: ProvideRuntimeApi + Send + Sync, + Runtime::Api: BeefyApi, +{ + type Error = ConsensusError; + type Transaction = TransactionFor; + + async fn import_block( + &mut self, + mut block: BlockImportParams, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let number = *block.header.number(); + + let beefy_proof = block + .justifications + .as_mut() + .and_then(|just| { + let decoded = just + .get(BEEFY_ENGINE_ID) + .map(|encoded| self.decode_and_verify(encoded, number, hash)); + // Remove BEEFY justification from the list before giving to `inner`; + // we will append it to backend ourselves at the end if all goes well. + just.remove(BEEFY_ENGINE_ID); + decoded + }) + .transpose() + .unwrap_or(None); + + // Run inner block import. + let inner_import_result = self.inner.import_block(block, new_cache).await?; + + match (beefy_proof, &inner_import_result) { + (Some(proof), ImportResult::Imported(_)) => { + let status = self.backend.blockchain().info(); + if number <= status.finalized_number && + Some(hash) == + self.backend + .blockchain() + .hash(number) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))? + { + // The proof is valid and the block is imported and final, we can import. + self.import_beefy_justification_unchecked(number, proof); + } else { + error!( + target: "beefy", + "🥩 Cannot import justification: {:?} for, not yet final, block number {:?}", + proof, + number, + ); + } + }, + _ => (), + } + + Ok(inner_import_result) + } + + async fn check_block( + &mut self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await + } +} diff --git a/substrate/client/beefy/src/justification.rs b/substrate/client/beefy/src/justification.rs new file mode 100644 index 0000000000..2a5191daec --- /dev/null +++ b/substrate/client/beefy/src/justification.rs @@ -0,0 +1,177 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::keystore::BeefyKeystore; +use beefy_primitives::{ + crypto::{AuthorityId, Signature}, + ValidatorSet, VersionedFinalityProof, +}; +use codec::{Decode, Encode}; +use sp_consensus::Error as ConsensusError; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +/// A commitment with matching BEEFY authorities' signatures. +pub type BeefySignedCommitment = + beefy_primitives::SignedCommitment, beefy_primitives::crypto::Signature>; + +/// Decode and verify a Beefy SignedCommitment. +pub(crate) fn decode_and_verify_commitment( + encoded: &[u8], + target_number: NumberFor, + validator_set: &ValidatorSet, +) -> Result, Signature>, ConsensusError> { + let proof = , Signature>>::decode(&mut &*encoded) + .map_err(|_| ConsensusError::InvalidJustification)?; + verify_with_validator_set::(target_number, validator_set, &proof).map(|_| proof) +} + +/// Verify the Beefy finality proof against the validator set at the block it was generated. +fn verify_with_validator_set( + target_number: NumberFor, + validator_set: &ValidatorSet, + proof: &VersionedFinalityProof, Signature>, +) -> Result<(), ConsensusError> { + match proof { + VersionedFinalityProof::V1(signed_commitment) => { + if signed_commitment.signatures.len() != validator_set.len() || + signed_commitment.commitment.validator_set_id != validator_set.id() || + signed_commitment.commitment.block_number != target_number + { + return Err(ConsensusError::InvalidJustification) + } + + // Arrangement of signatures in the commitment should be in the same order + // as validators for that set. + let message = signed_commitment.commitment.encode(); + let valid_signatures = validator_set + .validators() + .into_iter() + .zip(signed_commitment.signatures.iter()) + .filter(|(id, signature)| { + signature + .as_ref() + .map(|sig| BeefyKeystore::verify(id, sig, &message[..])) + .unwrap_or(false) + }) + .count(); + if valid_signatures >= crate::round::threshold(validator_set.len()) { + Ok(()) + } else { + Err(ConsensusError::InvalidJustification) + } + }, + } +} + +#[cfg(test)] +pub(crate) mod tests { + use beefy_primitives::{known_payload_ids, Commitment, Payload, SignedCommitment}; + use substrate_test_runtime_client::runtime::Block; + + use super::*; + use crate::{keystore::tests::Keyring, tests::make_beefy_ids}; + + pub(crate) fn new_signed_commitment( + block_num: NumberFor, + validator_set: &ValidatorSet, + keys: &[Keyring], + ) -> BeefySignedCommitment { + let commitment = Commitment { + payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]), + block_number: block_num, + validator_set_id: validator_set.id(), + }; + let message = commitment.encode(); + let signatures = keys.iter().map(|key| Some(key.sign(&message))).collect(); + SignedCommitment { commitment, signatures } + } + + #[test] + fn should_verify_with_validator_set() { + let keys = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + // build valid justification + let block_num = 42; + let proof = new_signed_commitment(block_num, &validator_set, keys); + + let good_proof = proof.clone().into(); + // should verify successfully + verify_with_validator_set::(block_num, &validator_set, &good_proof).unwrap(); + + // wrong block number -> should fail verification + let good_proof = proof.clone().into(); + match verify_with_validator_set::(block_num + 1, &validator_set, &good_proof) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // wrong validator set id -> should fail verification + let good_proof = proof.clone().into(); + let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + match verify_with_validator_set::(block_num, &other, &good_proof) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // wrong signatures length -> should fail verification + let mut bad_proof = proof.clone(); + // change length of signatures + bad_proof.signatures.pop().flatten().unwrap(); + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // not enough signatures -> should fail verification + let mut bad_proof = proof.clone(); + // remove a signature (but same length) + *bad_proof.signatures.first_mut().unwrap() = None; + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // not enough _correct_ signatures -> should fail verification + let mut bad_proof = proof.clone(); + // change a signature to a different key + *bad_proof.signatures.first_mut().unwrap() = + Some(Keyring::Dave.sign(&proof.commitment.encode())); + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + } + + #[test] + fn should_decode_and_verify_commitment() { + let keys = &[Keyring::Alice, Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let block_num = 1; + + // build valid justification + let proof = new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + + // should successfully decode and verify + let verified = + decode_and_verify_commitment::(&encoded, block_num, &validator_set).unwrap(); + assert_eq!(verified, versioned_proof); + } +} diff --git a/substrate/client/beefy/src/lib.rs b/substrate/client/beefy/src/lib.rs index c025ec5686..81c72dec8c 100644 --- a/substrate/client/beefy/src/lib.rs +++ b/substrate/client/beefy/src/lib.rs @@ -16,23 +16,18 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::sync::Arc; - +use beefy_primitives::{BeefyApi, MmrRootHash}; use prometheus::Registry; - use sc_client_api::{Backend, BlockchainEvents, Finalizer}; +use sc_consensus::BlockImport; use sc_network_gossip::Network as GossipNetwork; - use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; +use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_keystore::SyncCryptoStorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::Block; - -use beefy_primitives::{BeefyApi, MmrRootHash}; - -use crate::notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}; +use std::sync::Arc; mod error; mod gossip; @@ -41,11 +36,21 @@ mod metrics; mod round; mod worker; +pub mod import; +pub mod justification; pub mod notification; #[cfg(test)] mod tests; +use crate::{ + import::BeefyBlockImport, + notification::{ + BeefyBestBlockSender, BeefyBestBlockStream, BeefySignedCommitmentSender, + BeefySignedCommitmentStream, + }, +}; + pub use beefy_protocol_name::standard_name as protocol_standard_name; pub(crate) mod beefy_protocol_name { @@ -110,6 +115,68 @@ where // empty } +/// Links between the block importer, the background voter and the RPC layer, +/// to be used by the voter. +#[derive(Clone)] +pub struct BeefyVoterLinks { + // BlockImport -> Voter links + /// Stream of BEEFY signed commitments from block import to voter. + pub from_block_import_justif_stream: BeefySignedCommitmentStream, + + // Voter -> RPC links + /// Sends BEEFY signed commitments from voter to RPC. + pub to_rpc_justif_sender: BeefySignedCommitmentSender, + /// Sends BEEFY best block hashes from voter to RPC. + pub to_rpc_best_block_sender: BeefyBestBlockSender, +} + +/// Links used by the BEEFY RPC layer, from the BEEFY background voter. +#[derive(Clone)] +pub struct BeefyRPCLinks { + /// Stream of signed commitments coming from the voter. + pub from_voter_justif_stream: BeefySignedCommitmentStream, + /// Stream of BEEFY best block hashes coming from the voter. + pub from_voter_best_beefy_stream: BeefyBestBlockStream, +} + +/// Make block importer and link half necessary to tie the background voter to it. +pub fn beefy_block_import_and_links( + wrapped_block_import: I, + backend: Arc, + runtime: Arc, +) -> (BeefyBlockImport, BeefyVoterLinks, BeefyRPCLinks) +where + B: Block, + BE: Backend, + I: BlockImport> + + Send + + Sync, + RuntimeApi: ProvideRuntimeApi + Send + Sync, + RuntimeApi::Api: BeefyApi, +{ + // Voter -> RPC links + let (to_rpc_justif_sender, from_voter_justif_stream) = + notification::BeefySignedCommitmentStream::::channel(); + let (to_rpc_best_block_sender, from_voter_best_beefy_stream) = + notification::BeefyBestBlockStream::::channel(); + + // BlockImport -> Voter links + let (to_voter_justif_sender, from_block_import_justif_stream) = + notification::BeefySignedCommitmentStream::::channel(); + + // BlockImport + let import = + BeefyBlockImport::new(backend, runtime, wrapped_block_import, to_voter_justif_sender); + let voter_links = BeefyVoterLinks { + from_block_import_justif_stream, + to_rpc_justif_sender, + to_rpc_best_block_sender, + }; + let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream }; + + (import, voter_links, rpc_links) +} + /// BEEFY gadget initialization parameters. pub struct BeefyParams where @@ -130,16 +197,14 @@ where pub key_store: Option, /// Gossip network pub network: N, - /// BEEFY signed commitment sender - pub signed_commitment_sender: BeefySignedCommitmentSender, - /// BEEFY best block sender - pub beefy_best_block_sender: BeefyBestBlockSender, /// Minimal delta between blocks, BEEFY should vote for pub min_block_delta: u32, /// Prometheus metric registry pub prometheus_registry: Option, /// Chain specific GRANDPA protocol name. See [`beefy_protocol_name::standard_name`]. pub protocol_name: std::borrow::Cow<'static, str>, + /// Links between the block importer, the background voter and the RPC layer. + pub links: BeefyVoterLinks, } /// Start the BEEFY gadget. @@ -160,11 +225,10 @@ where runtime, key_store, network, - signed_commitment_sender, - beefy_best_block_sender, min_block_delta, prometheus_registry, protocol_name, + links, } = beefy_params; let sync_oracle = network.clone(); @@ -194,14 +258,13 @@ where client, backend, runtime, + sync_oracle, key_store: key_store.into(), - signed_commitment_sender, - beefy_best_block_sender, gossip_engine, gossip_validator, - min_block_delta, + links, metrics, - sync_oracle, + min_block_delta, }; let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); diff --git a/substrate/client/beefy/src/metrics.rs b/substrate/client/beefy/src/metrics.rs index a6d29dbf88..71e34e24c4 100644 --- a/substrate/client/beefy/src/metrics.rs +++ b/substrate/client/beefy/src/metrics.rs @@ -32,8 +32,8 @@ pub(crate) struct Metrics { pub beefy_best_block: Gauge, /// Next block BEEFY should vote on pub beefy_should_vote_on: Gauge, - /// Number of sessions without a signed commitment - pub beefy_skipped_sessions: Counter, + /// Number of sessions with lagging signed commitment on mandatory block + pub beefy_lagging_sessions: Counter, } impl Metrics { @@ -65,10 +65,10 @@ impl Metrics { Gauge::new("substrate_beefy_should_vote_on", "Next block, BEEFY should vote on")?, registry, )?, - beefy_skipped_sessions: register( + beefy_lagging_sessions: register( Counter::new( - "substrate_beefy_skipped_sessions", - "Number of sessions without a signed commitment", + "substrate_beefy_lagging_sessions", + "Number of sessions with lagging signed commitment on mandatory block", )?, registry, )?, diff --git a/substrate/client/beefy/src/notification.rs b/substrate/client/beefy/src/notification.rs index 7c18d809f6..9479891714 100644 --- a/substrate/client/beefy/src/notification.rs +++ b/substrate/client/beefy/src/notification.rs @@ -17,11 +17,9 @@ // along with this program. If not, see . use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sp_runtime::traits::Block as BlockT; -/// A commitment with matching BEEFY authorities' signatures. -pub type BeefySignedCommitment = - beefy_primitives::SignedCommitment, beefy_primitives::crypto::Signature>; +use crate::justification::BeefySignedCommitment; /// The sending half of the notifications channel(s) used to send /// notifications about best BEEFY block from the gadget side. diff --git a/substrate/client/beefy/src/round.rs b/substrate/client/beefy/src/round.rs index fecb9557df..ebd85c8dea 100644 --- a/substrate/client/beefy/src/round.rs +++ b/substrate/client/beefy/src/round.rs @@ -59,7 +59,8 @@ impl RoundTracker { } } -fn threshold(authorities: usize) -> usize { +/// Minimum size of `authorities` subset that produced valid signatures for a block to finalize. +pub fn threshold(authorities: usize) -> usize { let faulty = authorities.saturating_sub(1) / 3; authorities - faulty } @@ -70,9 +71,10 @@ fn threshold(authorities: usize) -> usize { /// Does not do any validation on votes or signatures, layers above need to handle that (gossip). pub(crate) struct Rounds { rounds: BTreeMap<(Payload, NumberFor), RoundTracker>, - best_done: Option>, session_start: NumberFor, validator_set: ValidatorSet, + mandatory_done: bool, + best_done: Option>, } impl Rounds @@ -81,15 +83,15 @@ where B: Block, { pub(crate) fn new(session_start: NumberFor, validator_set: ValidatorSet) -> Self { - Rounds { rounds: BTreeMap::new(), best_done: None, session_start, validator_set } + Rounds { + rounds: BTreeMap::new(), + session_start, + validator_set, + mandatory_done: false, + best_done: None, + } } -} -impl Rounds -where - P: Ord + Hash + Clone, - B: Block, -{ pub(crate) fn validator_set_id(&self) -> ValidatorSetId { self.validator_set.id() } @@ -98,8 +100,12 @@ where self.validator_set.validators() } - pub(crate) fn session_start(&self) -> &NumberFor { - &self.session_start + pub(crate) fn session_start(&self) -> NumberFor { + self.session_start + } + + pub(crate) fn mandatory_done(&self) -> bool { + self.mandatory_done } pub(crate) fn should_self_vote(&self, round: &(P, NumberFor)) -> bool { @@ -113,12 +119,9 @@ where vote: (Public, Signature), self_vote: bool, ) -> bool { - if Some(round.1.clone()) <= self.best_done { - debug!( - target: "beefy", - "🥩 received vote for old stale round {:?}, ignoring", - round.1 - ); + let num = round.1; + if num < self.session_start || Some(num) <= self.best_done { + debug!(target: "beefy", "🥩 received vote for old stale round {:?}, ignoring", num); false } else if !self.validators().iter().any(|id| vote.0 == *id) { debug!( @@ -147,6 +150,7 @@ where // remove this and older (now stale) rounds let signatures = self.rounds.remove(round)?.votes; self.rounds.retain(|&(_, number), _| number > round.1); + self.mandatory_done = self.mandatory_done || round.1 == self.session_start; self.best_done = self.best_done.max(Some(round.1)); debug!(target: "beefy", "🥩 Concluded round #{}", round.1); @@ -160,6 +164,11 @@ where None } } + + #[cfg(test)] + pub(crate) fn test_set_mandatory_done(&mut self, done: bool) { + self.mandatory_done = done; + } } #[cfg(test)] @@ -226,7 +235,7 @@ mod tests { let rounds = Rounds::::new(session_start, validators); assert_eq!(42, rounds.validator_set_id()); - assert_eq!(1, *rounds.session_start()); + assert_eq!(1, rounds.session_start()); assert_eq!( &vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], rounds.validators() @@ -307,6 +316,43 @@ mod tests { )); } + #[test] + fn old_rounds_not_accepted() { + sp_tracing::try_init_simple(); + + let validators = ValidatorSet::::new( + vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + 42, + ) + .unwrap(); + let alice = (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")); + + let session_start = 10u64.into(); + let mut rounds = Rounds::::new(session_start, validators); + + let mut vote = (H256::from_low_u64_le(1), 9); + // add vote for previous session, should fail + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + // no votes present + assert!(rounds.rounds.is_empty()); + + // simulate 11 was concluded + rounds.best_done = Some(11); + // add votes for current session, but already concluded rounds, should fail + vote.1 = 10; + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + vote.1 = 11; + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + // no votes present + assert!(rounds.rounds.is_empty()); + + // add good vote + vote.1 = 12; + assert!(rounds.add_vote(&vote, alice, true)); + // good vote present + assert_eq!(rounds.rounds.len(), 1); + } + #[test] fn multiple_rounds() { sp_tracing::try_init_simple(); diff --git a/substrate/client/beefy/src/tests.rs b/substrate/client/beefy/src/tests.rs index 78e697a6ad..9c8f443dd1 100644 --- a/substrate/client/beefy/src/tests.rs +++ b/substrate/client/beefy/src/tests.rs @@ -21,12 +21,15 @@ use futures::{future, stream::FuturesUnordered, Future, StreamExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use std::{sync::Arc, task::Poll}; +use std::{collections::HashMap, sync::Arc, task::Poll}; use tokio::{runtime::Runtime, time::Duration}; use sc_chain_spec::{ChainSpec, GenericChainSpec}; use sc_client_api::HeaderBackend; -use sc_consensus::BoxJustificationImport; +use sc_consensus::{ + BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportResult, + ImportedAux, +}; use sc_keystore::LocalKeystore; use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, @@ -35,7 +38,8 @@ use sc_network_test::{ use sc_utils::notification::NotificationReceiver; use beefy_primitives::{ - crypto::AuthorityId, BeefyApi, ConsensusLog, MmrRootHash, ValidatorSet, BEEFY_ENGINE_ID, + crypto::{AuthorityId, Signature}, + BeefyApi, ConsensusLog, MmrRootHash, ValidatorSet, VersionedFinalityProof, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType, }; use sp_mmr_primitives::{ @@ -47,19 +51,32 @@ use sp_consensus::BlockOrigin; use sp_core::H256; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::{ - codec::Encode, generic::BlockId, traits::Header as HeaderT, BuildStorage, DigestItem, Storage, + codec::Encode, + generic::BlockId, + traits::{Header as HeaderT, NumberFor}, + BuildStorage, DigestItem, Justifications, Storage, }; use substrate_test_runtime_client::{runtime::Header, ClientExt}; -use crate::{beefy_protocol_name, keystore::tests::Keyring as BeefyKeyring, notification::*}; +use crate::{ + beefy_block_import_and_links, beefy_protocol_name, justification::*, + keystore::tests::Keyring as BeefyKeyring, BeefyRPCLinks, BeefyVoterLinks, +}; pub(crate) const BEEFY_PROTOCOL_NAME: &'static str = "/beefy/1"; const GOOD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0xbf); const BAD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0x42); +type BeefyBlockImport = crate::BeefyBlockImport< + Block, + substrate_test_runtime_client::Backend, + two_validators::TestApi, + BlockImportAdapter>, +>; + pub(crate) type BeefyValidatorSet = ValidatorSet; -pub(crate) type BeefyPeer = Peer; +pub(crate) type BeefyPeer = Peer; #[derive(Debug, Serialize, Deserialize)] struct Genesis(std::collections::BTreeMap); @@ -97,17 +114,10 @@ fn beefy_protocol_name() { assert_eq!(proto_name.to_string(), expected); } -// TODO: compiler warns us about unused `signed_commitment_stream`, will use in later tests -#[allow(dead_code)] -#[derive(Clone)] -pub(crate) struct BeefyLinkHalf { - pub signed_commitment_stream: BeefySignedCommitmentStream, - pub beefy_best_block_stream: BeefyBestBlockStream, -} - #[derive(Default)] pub(crate) struct PeerData { - pub(crate) beefy_link_half: Mutex>, + pub(crate) beefy_rpc_links: Mutex>>, + pub(crate) beefy_voter_links: Mutex>>, } #[derive(Default)] @@ -163,7 +173,7 @@ impl BeefyTestNet { impl TestNetFactory for BeefyTestNet { type Verifier = PassThroughVerifier; - type BlockImport = PeersClient; + type BlockImport = BeefyBlockImport; type PeerData = PeerData; fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier { @@ -178,7 +188,17 @@ impl TestNetFactory for BeefyTestNet { Option>, Self::PeerData, ) { - (client.as_block_import(), None, PeerData::default()) + let inner = BlockImportAdapter::new(client.clone()); + let (block_import, voter_links, rpc_links) = beefy_block_import_and_links( + inner, + client.as_backend(), + Arc::new(two_validators::TestApi {}), + ); + let peer_data = PeerData { + beefy_rpc_links: Mutex::new(Some(rpc_links)), + beefy_voter_links: Mutex::new(Some(voter_links)), + }; + (BlockImportAdapter::new(block_import), None, peer_data) } fn peer(&mut self, i: usize) -> &mut BeefyPeer { @@ -333,12 +353,12 @@ where let keystore = create_beefy_keystore(*key); - let (signed_commitment_sender, signed_commitment_stream) = - BeefySignedCommitmentStream::::channel(); - let (beefy_best_block_sender, beefy_best_block_stream) = - BeefyBestBlockStream::::channel(); - let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; - *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let (_, _, peer_data) = net.make_block_import(peer.client().clone()); + let PeerData { beefy_rpc_links, beefy_voter_links } = peer_data; + + let beefy_voter_links = beefy_voter_links.lock().take(); + *peer.data.beefy_rpc_links.lock() = beefy_rpc_links.lock().take(); + *peer.data.beefy_voter_links.lock() = beefy_voter_links.clone(); let beefy_params = crate::BeefyParams { client: peer.client().as_client(), @@ -346,8 +366,7 @@ where runtime: api.clone(), key_store: Some(keystore), network: peer.network_service().clone(), - signed_commitment_sender, - beefy_best_block_sender, + links: beefy_voter_links.unwrap(), min_block_delta, prometheus_registry: None, protocol_name: BEEFY_PROTOCOL_NAME.into(), @@ -382,11 +401,11 @@ pub(crate) fn get_beefy_streams( let mut best_block_streams = Vec::new(); let mut signed_commitment_streams = Vec::new(); for peer_id in 0..peers.len() { - let beefy_link_half = - net.peer(peer_id).data.beefy_link_half.lock().as_ref().unwrap().clone(); - let BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream } = beefy_link_half; - best_block_streams.push(beefy_best_block_stream.subscribe()); - signed_commitment_streams.push(signed_commitment_stream.subscribe()); + let beefy_rpc_links = net.peer(peer_id).data.beefy_rpc_links.lock().clone().unwrap(); + let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } = + beefy_rpc_links; + best_block_streams.push(from_voter_best_beefy_stream.subscribe()); + signed_commitment_streams.push(from_voter_justif_stream.subscribe()); } (best_block_streams, signed_commitment_streams) } @@ -670,3 +689,123 @@ fn correct_beefy_payload() { wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[11]); wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[11]); } + +#[test] +fn beefy_importing_blocks() { + use futures::{executor::block_on, future::poll_fn, task::Poll}; + use sc_block_builder::BlockBuilderProvider; + use sc_client_api::BlockBackend; + + sp_tracing::try_init_simple(); + + let mut net = BeefyTestNet::new(2, 0); + + let client = net.peer(0).client().clone(); + let (mut block_import, _, peer_data) = net.make_block_import(client.clone()); + let PeerData { beefy_rpc_links: _, beefy_voter_links } = peer_data; + let justif_stream = beefy_voter_links.lock().take().unwrap().from_block_import_justif_stream; + + let params = |block: Block, justifications: Option| { + let mut import = BlockImportParams::new(BlockOrigin::File, block.header); + import.justifications = justifications; + import.body = Some(block.extrinsics); + import.finalized = true; + import.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import + }; + + let full_client = client.as_client(); + let parent_id = BlockId::Number(0); + let block_id = BlockId::Number(1); + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + + // Import without justifications. + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block.clone(), None), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { is_new_best: true, ..Default::default() }), + ); + assert_eq!( + block_on(block_import.import_block(params(block, None), HashMap::new())).unwrap(), + ImportResult::AlreadyInChain + ); + // Verify no justifications present: + { + // none in backend, + assert!(full_client.justifications(&block_id).unwrap().is_none()); + // and none sent to BEEFY worker. + block_on(poll_fn(move |cx| { + assert_eq!(justif_recv.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + } + + // Import with valid justification. + let parent_id = BlockId::Number(1); + let block_num = 2; + let keys = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let proof = crate::justification::tests::new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + let justif = Some(Justifications::from((BEEFY_ENGINE_ID, encoded))); + + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block, justif), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { + bad_justification: false, + is_new_best: true, + ..Default::default() + }), + ); + // Verify justification successfully imported: + { + // available in backend, + assert!(full_client.justifications(&BlockId::Number(block_num)).unwrap().is_some()); + // and also sent to BEEFY worker. + block_on(poll_fn(move |cx| { + match justif_recv.poll_next_unpin(cx) { + Poll::Ready(Some(_justification)) => (), + v => panic!("unexpected value: {:?}", v), + } + Poll::Ready(()) + })); + } + + // Import with invalid justification (incorrect validator set). + let parent_id = BlockId::Number(2); + let block_num = 3; + let keys = &[BeefyKeyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + let proof = crate::justification::tests::new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + let justif = Some(Justifications::from((BEEFY_ENGINE_ID, encoded))); + + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block, justif), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { + // Still `false` because we don't want to fail import on bad BEEFY justifications. + bad_justification: false, + is_new_best: true, + ..Default::default() + }), + ); + // Verify bad justifications was not imported: + { + // none in backend, + assert!(full_client.justifications(&block_id).unwrap().is_none()); + // and none sent to BEEFY worker. + block_on(poll_fn(move |cx| { + assert_eq!(justif_recv.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + } +} diff --git a/substrate/client/beefy/src/worker.rs b/substrate/client/beefy/src/worker.rs index dccf7ba750..3bff0822eb 100644 --- a/substrate/client/beefy/src/worker.rs +++ b/substrate/client/beefy/src/worker.rs @@ -17,11 +17,10 @@ // along with this program. If not, see . use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, marker::PhantomData, sync::Arc, - time::Duration, }; use codec::{Codec, Decode, Encode}; @@ -48,57 +47,174 @@ use beefy_primitives::{ }; use crate::{ - error, + error::Error, gossip::{topic, GossipValidator}, + justification::BeefySignedCommitment, keystore::BeefyKeystore, metric_inc, metric_set, metrics::Metrics, - notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, round::Rounds, - Client, + BeefyVoterLinks, Client, }; +enum RoundAction { + Drop, + Process, + Enqueue, +} + +/// Responsible for the voting strategy. +/// It chooses which incoming votes to accept and which votes to generate. +struct VoterOracle { + /// Queue of known sessions. Keeps track of voting rounds (block numbers) within each session. + /// + /// There are three voter states coresponding to three queue states: + /// 1. voter uninitialized: queue empty, + /// 2. up-to-date - all mandatory blocks leading up to current GRANDPA finalized: + /// queue has ONE element, the 'current session' where `mandatory_done == true`, + /// 3. lagging behind GRANDPA: queue has [1, N] elements, where all `mandatory_done == false`. + /// In this state, everytime a session gets its mandatory block BEEFY finalized, it's + /// popped off the queue, eventually getting to state `2. up-to-date`. + sessions: VecDeque>, + /// Min delta in block numbers between two blocks, BEEFY should vote on. + min_block_delta: u32, +} + +impl VoterOracle { + pub fn new(min_block_delta: u32) -> Self { + Self { + sessions: VecDeque::new(), + // Always target at least one block better than current best beefy. + min_block_delta: min_block_delta.max(1), + } + } + + /// Return mutable reference to rounds pertaining to first session in the queue. + /// Voting will always happen at the head of the queue. + pub fn rounds_mut(&mut self) -> Option<&mut Rounds> { + self.sessions.front_mut() + } + + /// Add new observed session to the Oracle. + pub fn add_session(&mut self, rounds: Rounds) { + self.sessions.push_back(rounds); + self.try_prune(); + } + + /// Prune the queue to keep the Oracle in one of the expected three states. + /// + /// Call this function on each BEEFY finality, + /// or at the very least on each BEEFY mandatory block finality. + pub fn try_prune(&mut self) { + if self.sessions.len() > 1 { + // when there's multiple sessions, only keep the `!mandatory_done()` ones. + self.sessions.retain(|s| !s.mandatory_done()) + } + } + + /// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept. + pub fn accepted_interval( + &self, + best_grandpa: NumberFor, + ) -> Result<(NumberFor, NumberFor), Error> { + let rounds = self.sessions.front().ok_or(Error::UninitSession)?; + + if rounds.mandatory_done() { + // There's only one session active and its mandatory is done. + // Accept any GRANDPA finalized vote. + Ok((rounds.session_start(), best_grandpa.into())) + } else { + // There's at least one session with mandatory not done. + // Only accept votes for the mandatory block in the front of queue. + Ok((rounds.session_start(), rounds.session_start())) + } + } + + /// Utility function to quickly decide what to do for each round. + pub fn triage_round( + &self, + round: NumberFor, + best_grandpa: NumberFor, + ) -> Result { + let (start, end) = self.accepted_interval(best_grandpa)?; + if start <= round && round <= end { + Ok(RoundAction::Process) + } else if round > end { + Ok(RoundAction::Enqueue) + } else { + Ok(RoundAction::Drop) + } + } + + /// Return `Some(number)` if we should be voting on block `number`, + /// return `None` if there is no block we should vote on. + pub fn voting_target( + &self, + best_beefy: Option>, + best_grandpa: NumberFor, + ) -> Option> { + let rounds = if let Some(r) = self.sessions.front() { + r + } else { + debug!(target: "beefy", "🥩 No voting round started"); + return None + }; + + // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. + let target = + vote_target(best_grandpa, best_beefy, rounds.session_start(), self.min_block_delta); + trace!( + target: "beefy", + "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", + best_beefy, + best_grandpa, + target + ); + target + } +} + pub(crate) struct WorkerParams { pub client: Arc, pub backend: Arc, pub runtime: Arc, + pub sync_oracle: SO, pub key_store: BeefyKeystore, - pub signed_commitment_sender: BeefySignedCommitmentSender, - pub beefy_best_block_sender: BeefyBestBlockSender, pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, - pub min_block_delta: u32, + pub links: BeefyVoterLinks, pub metrics: Option, - pub sync_oracle: SO, + pub min_block_delta: u32, } /// A BEEFY worker plays the BEEFY protocol pub(crate) struct BeefyWorker { + // utilities client: Arc, backend: Arc, runtime: Arc, + sync_oracle: SO, key_store: BeefyKeystore, - signed_commitment_sender: BeefySignedCommitmentSender, gossip_engine: GossipEngine, gossip_validator: Arc>, - /// Min delta in block numbers between two blocks, BEEFY should vote on - min_block_delta: u32, + + // channels + /// Links between the block importer, the background voter and the RPC layer. + links: BeefyVoterLinks, + + // voter state + /// BEEFY client metrics. metrics: Option, - rounds: Option>, - /// Buffer holding votes for blocks that the client hasn't seen finality for. - pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, - /// Best block we received a GRANDPA notification for + /// Best block we received a GRANDPA finality for. best_grandpa_block_header: ::Header, - /// Best block a BEEFY voting round has been concluded for + /// Best block a BEEFY voting round has been concluded for. best_beefy_block: Option>, - /// Used to keep RPC worker up to date on latest/best beefy - beefy_best_block_sender: BeefyBestBlockSender, - /// Validator set id for the last signed commitment - last_signed_id: u64, - /// Handle to the sync oracle - sync_oracle: SO, - // keep rustc happy - _backend: PhantomData, + /// Buffer holding votes for future processing. + pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, + /// Buffer holding justifications for future processing. + pending_justifications: BTreeMap, Vec>>, + /// Chooses which incoming votes to accept and which votes to generate. + voting_oracle: VoterOracle, } impl BeefyWorker @@ -122,13 +238,12 @@ where backend, runtime, key_store, - signed_commitment_sender, - beefy_best_block_sender, + sync_oracle, gossip_engine, gossip_validator, - min_block_delta, + links, metrics, - sync_oracle, + min_block_delta, } = worker_params; let last_finalized_header = client @@ -139,53 +254,29 @@ where client: client.clone(), backend, runtime, + sync_oracle, key_store, - signed_commitment_sender, gossip_engine, gossip_validator, - // always target at least one block better than current best beefy - min_block_delta: min_block_delta.max(1), + links, metrics, - rounds: None, - pending_votes: BTreeMap::new(), best_grandpa_block_header: last_finalized_header, best_beefy_block: None, - last_signed_id: 0, - beefy_best_block_sender, - sync_oracle, - _backend: PhantomData, + pending_votes: BTreeMap::new(), + pending_justifications: BTreeMap::new(), + voting_oracle: VoterOracle::new(min_block_delta), } } - /// Return `Some(number)` if we should be voting on block `number` now, - /// return `None` if there is no block we should vote on now. - fn current_vote_target(&self) -> Option> { - let rounds = if let Some(r) = &self.rounds { - r - } else { - debug!(target: "beefy", "🥩 No voting round started"); - return None - }; - - let best_finalized = *self.best_grandpa_block_header.number(); - // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. - let target = vote_target( - best_finalized, - self.best_beefy_block, - *rounds.session_start(), - self.min_block_delta, - ); - trace!( - target: "beefy", - "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", - self.best_beefy_block, - best_finalized, - target - ); - if let Some(target) = &target { - metric_set!(self, beefy_should_vote_on, target); - } - target + /// Simple wrapper that gets MMR root from header digests or from client state. + fn get_mmr_root_digest(&self, header: &B::Header) -> Option { + find_mmr_root_digest::(header).or_else(|| { + self.runtime + .runtime_api() + .mmr_root(&BlockId::hash(header.hash())) + .ok() + .and_then(|r| r.ok()) + }) } /// Verify `active` validator set for `block` against the key store @@ -200,7 +291,7 @@ where &self, block: &NumberFor, active: &ValidatorSet, - ) -> Result<(), error::Error> { + ) -> Result<(), Error> { let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); let public_keys = self.key_store.public_keys()?; @@ -209,150 +300,121 @@ where if store.intersection(&active).count() == 0 { let msg = "no authority public key found in store".to_string(); debug!(target: "beefy", "🥩 for block {:?} {}", block, msg); - Err(error::Error::Keystore(msg)) + Err(Error::Keystore(msg)) } else { Ok(()) } } - /// Set best BEEFY block to `block_num`. - /// - /// Also sends/updates the best BEEFY block hash to the RPC worker. - fn set_best_beefy_block(&mut self, block_num: NumberFor) { - if Some(block_num) > self.best_beefy_block { - // Try to get block hash ourselves. - let block_hash = match self.client.hash(block_num) { - Ok(h) => h, - Err(e) => { - error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", - block_num, e); - None - }, - }; - // Update RPC worker with new best BEEFY block hash. - block_hash.map(|hash| { - self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed.") - }); - // Set new best BEEFY block number. - self.best_beefy_block = Some(block_num); - metric_set!(self, beefy_best_block, block_num); - } else { - debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); - } - } - /// Handle session changes by starting new voting round for mandatory blocks. fn init_session_at( &mut self, - active: ValidatorSet, + validator_set: ValidatorSet, new_session_start: NumberFor, ) { - debug!(target: "beefy", "🥩 New active validator set: {:?}", active); - metric_set!(self, beefy_validator_set_id, active.id()); - // BEEFY should produce a signed commitment for each session - if active.id() != self.last_signed_id + 1 && - active.id() != GENESIS_AUTHORITY_SET_ID && - self.last_signed_id != 0 - { - debug!( - target: "beefy", "🥩 Detected skipped session: active-id {:?}, last-signed-id {:?}", - active.id(), - self.last_signed_id, - ); - metric_inc!(self, beefy_skipped_sessions); + debug!(target: "beefy", "🥩 New active validator set: {:?}", validator_set); + metric_set!(self, beefy_validator_set_id, validator_set.id()); + + // BEEFY should produce the mandatory block of each session. + if let Some(active_session) = self.voting_oracle.rounds_mut() { + if !active_session.mandatory_done() { + debug!( + target: "beefy", "🥩 New session {} while active session {} is still lagging.", + validator_set.id(), + active_session.validator_set_id(), + ); + metric_inc!(self, beefy_lagging_sessions); + } } if log_enabled!(target: "beefy", log::Level::Debug) { // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(&new_session_start, &active); + let _ = self.verify_validator_set(&new_session_start, &validator_set); } - let id = active.id(); - self.rounds = Some(Rounds::new(new_session_start, active)); + let id = validator_set.id(); + self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set)); info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, new_session_start); } fn handle_finality_notification(&mut self, notification: &FinalityNotification) { debug!(target: "beefy", "🥩 Finality notification: {:?}", notification); - let number = *notification.header.number(); + let header = ¬ification.header; - // On start-up ignore old finality notifications that we're not interested in. - if number <= *self.best_grandpa_block_header.number() { - debug!(target: "beefy", "🥩 Got unexpected finality for old block #{:?}", number); - return - } + if *header.number() > *self.best_grandpa_block_header.number() { + // update best GRANDPA finalized block we have seen + self.best_grandpa_block_header = header.clone(); - // update best GRANDPA finalized block we have seen - self.best_grandpa_block_header = notification.header.clone(); - - self.handle_finality(¬ification.header); - } - - fn handle_finality(&mut self, header: &B::Header) { - // Check for and handle potential new session. - if let Some(new_validator_set) = find_authorities_change::(header) { - self.init_session_at(new_validator_set, *header.number()); - } - - // Handle any pending votes for now finalized blocks. - self.check_pending_votes(); - - // Vote if there's now a new vote target. - if let Some(target_number) = self.current_vote_target() { - self.do_vote(target_number); - } - } - - // Handles all buffered votes for now finalized blocks. - fn check_pending_votes(&mut self) { - let not_finalized = self.best_grandpa_block_header.number().saturating_add(1u32.into()); - let still_pending = self.pending_votes.split_off(¬_finalized); - let votes_to_handle = std::mem::replace(&mut self.pending_votes, still_pending); - for (num, votes) in votes_to_handle.into_iter() { - if Some(num) > self.best_beefy_block { - debug!(target: "beefy", "🥩 Handling buffered votes for now GRANDPA finalized block: {:?}.", num); - for v in votes.into_iter() { - self.handle_vote( - (v.commitment.payload, v.commitment.block_number), - (v.id, v.signature), - false, - ); - } - } else { - debug!(target: "beefy", "🥩 Dropping outdated buffered votes for now BEEFY finalized block: {:?}.", num); + // Check for and enqueue potential new session. + if let Some(new_validator_set) = find_authorities_change::(header) { + self.init_session_at(new_validator_set, *header.number()); + // TODO: when adding SYNC protocol, fire up a request for justification for this + // mandatory block here. } } } + /// Based on [VoterOracle] this vote is either processed here or enqueued for later. + fn triage_incoming_vote( + &mut self, + vote: VoteMessage, AuthorityId, Signature>, + ) -> Result<(), Error> { + let block_num = vote.commitment.block_number; + let best_grandpa = *self.best_grandpa_block_header.number(); + match self.voting_oracle.triage_round(block_num, best_grandpa)? { + RoundAction::Process => self.handle_vote( + (vote.commitment.payload, vote.commitment.block_number), + (vote.id, vote.signature), + false, + )?, + RoundAction::Enqueue => { + debug!(target: "beefy", "🥩 Buffer vote for round: {:?}.", block_num); + self.pending_votes.entry(block_num).or_default().push(vote) + }, + RoundAction::Drop => (), + }; + Ok(()) + } + + /// Based on [VoterOracle] this justification is either processed here or enqueued for later. + /// + /// Expects `justification` to be valid. + fn triage_incoming_justif( + &mut self, + justification: BeefySignedCommitment, + ) -> Result<(), Error> { + let block_num = justification.commitment.block_number; + let best_grandpa = *self.best_grandpa_block_header.number(); + match self.voting_oracle.triage_round(block_num, best_grandpa)? { + RoundAction::Process => self.finalize(justification), + RoundAction::Enqueue => { + debug!(target: "beefy", "🥩 Buffer justification for round: {:?}.", block_num); + self.pending_justifications.entry(block_num).or_default().push(justification) + }, + RoundAction::Drop => (), + }; + Ok(()) + } + fn handle_vote( &mut self, round: (Payload, NumberFor), vote: (AuthorityId, Signature), self_vote: bool, - ) { + ) -> Result<(), Error> { self.gossip_validator.note_round(round.1); - let rounds = if let Some(rounds) = self.rounds.as_mut() { - rounds - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't handle vote {:?}", vote); - return - }; + let rounds = self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?; if rounds.add_vote(&round, vote, self_vote) { if let Some(signatures) = rounds.try_conclude(&round) { self.gossip_validator.conclude_round(round.1); - // id is stored for skipped session metric calculation - self.last_signed_id = rounds.validator_set_id(); - let block_num = round.1; let commitment = Commitment { payload: round.0, block_number: block_num, - validator_set_id: self.last_signed_id, + validator_set_id: rounds.validator_set_id(), }; let signed_commitment = SignedCommitment { commitment, signatures }; @@ -370,24 +432,115 @@ where ) { debug!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, signed_commitment); } - self.signed_commitment_sender - .notify(|| Ok::<_, ()>(signed_commitment)) - .expect("forwards closure result; the closure always returns Ok; qed."); - self.set_best_beefy_block(block_num); + // We created the `signed_commitment` and know to be valid. + self.finalize(signed_commitment); + } + } + Ok(()) + } - // Vote if there's now a new vote target. - if let Some(target_number) = self.current_vote_target() { - self.do_vote(target_number); + /// Provide BEEFY finality for block based on `signed_commitment`: + /// 1. Prune irrelevant past sessions from the oracle, + /// 2. Set BEEFY best block, + /// 3. Send best block hash and `signed_commitment` to RPC worker. + /// + /// Expects `signed commitment` to be valid. + fn finalize(&mut self, signed_commitment: BeefySignedCommitment) { + // Prune any now "finalized" sessions from queue. + self.voting_oracle.try_prune(); + + let block_num = signed_commitment.commitment.block_number; + if Some(block_num) > self.best_beefy_block { + // Set new best BEEFY block number. + self.best_beefy_block = Some(block_num); + metric_set!(self, beefy_best_block, block_num); + + self.client.hash(block_num).ok().flatten().map(|hash| { + self.links + .to_rpc_best_block_sender + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed.") + }); + + self.links + .to_rpc_justif_sender + .notify(|| Ok::<_, ()>(signed_commitment)) + .expect("forwards closure result; the closure always returns Ok; qed."); + } else { + debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); + } + } + + /// Handle previously buffered justifications and votes that now land in the voting interval. + fn try_pending_justif_and_votes(&mut self) -> Result<(), Error> { + let best_grandpa = *self.best_grandpa_block_header.number(); + let _ph = PhantomData::::default(); + + fn to_process_for( + pending: &mut BTreeMap, Vec>, + (start, end): (NumberFor, NumberFor), + _: PhantomData, + ) -> BTreeMap, Vec> { + // These are still pending. + let still_pending = pending.split_off(&end.saturating_add(1u32.into())); + // These can be processed. + let to_handle = pending.split_off(&start); + // The rest can be dropped. + *pending = still_pending; + // Return ones to process. + to_handle + } + + // Process pending justifications. + let interval = self.voting_oracle.accepted_interval(best_grandpa)?; + if !self.pending_justifications.is_empty() { + let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph); + for (num, justifications) in justifs_to_handle.into_iter() { + debug!(target: "beefy", "🥩 Handle buffered justifications for: {:?}.", num); + for justif in justifications.into_iter() { + self.finalize(justif); } } } + + // Process pending votes. + let interval = self.voting_oracle.accepted_interval(best_grandpa)?; + if !self.pending_votes.is_empty() { + let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph); + for (num, votes) in votes_to_handle.into_iter() { + debug!(target: "beefy", "🥩 Handle buffered votes for: {:?}.", num); + for v in votes.into_iter() { + if let Err(err) = self.handle_vote( + (v.commitment.payload, v.commitment.block_number), + (v.id, v.signature), + false, + ) { + error!(target: "beefy", "🥩 Error handling buffered vote: {}", err); + }; + } + } + } + Ok(()) + } + + /// Decide if should vote, then vote.. or don't.. + fn try_to_vote(&mut self) -> Result<(), Error> { + // Vote if there's now a new vote target. + if let Some(target) = self + .voting_oracle + .voting_target(self.best_beefy_block, *self.best_grandpa_block_header.number()) + { + metric_set!(self, beefy_should_vote_on, target); + self.do_vote(target)?; + } + Ok(()) } /// Create and gossip Signed Commitment for block number `target_number`. /// /// Also handle this self vote by calling `self.handle_vote()` for it. - fn do_vote(&mut self, target_number: NumberFor) { + fn do_vote(&mut self, target_number: NumberFor) -> Result<(), Error> { debug!(target: "beefy", "🥩 Try voting on {}", target_number); // Most of the time we get here, `target` is actually `best_grandpa`, @@ -395,18 +548,13 @@ where let target_header = if target_number == *self.best_grandpa_block_header.number() { self.best_grandpa_block_header.clone() } else { - match self.client.expect_header(BlockId::Number(target_number)) { - Ok(h) => h, - Err(err) => { - debug!( - target: "beefy", - "🥩 Could not get header for block #{:?} (error: {:?}), skipping vote..", - target_number, - err - ); - return - }, - } + self.client.expect_header(BlockId::Number(target_number)).map_err(|err| { + let err_msg = format!( + "Couldn't get header for block #{:?} (error: {:?}), skipping vote..", + target_number, err + ); + Error::Backend(err_msg) + })? }; let target_hash = target_header.hash(); @@ -414,26 +562,23 @@ where hash } else { warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", target_hash); - return + return Ok(()) }; let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); - let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { - if !rounds.should_self_vote(&(payload.clone(), target_number)) { - debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); - return - } - (rounds.validators(), rounds.validator_set_id()) - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash); - return - }; + let rounds = self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?; + if !rounds.should_self_vote(&(payload.clone(), target_number)) { + debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); + return Ok(()) + } + let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { debug!(target: "beefy", "🥩 Local authority id: {:?}", id); id } else { debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", target_hash); - return + return Ok(()) }; let commitment = Commitment { payload, block_number: target_number, validator_set_id }; @@ -443,7 +588,7 @@ where Ok(sig) => sig, Err(err) => { warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); - return + return Ok(()) }, }; @@ -462,13 +607,17 @@ where debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); - self.handle_vote( + if let Err(err) = self.handle_vote( (message.commitment.payload, message.commitment.block_number), (message.id, message.signature), true, - ); + ) { + error!(target: "beefy", "🥩 Error handling self vote: {}", err); + } self.gossip_engine.gossip_message(topic::(), encoded_message, false); + + Ok(()) } /// Wait for BEEFY runtime pallet to be available. @@ -494,6 +643,9 @@ where // Once we'll implement 'initial sync' (catch-up), the worker will be able to // start voting right away. self.handle_finality_notification(¬if); + if let Err(err) = self.try_to_vote() { + debug!(target: "beefy", "🥩 {}", err); + } break } else { trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); @@ -529,15 +681,14 @@ where }) .fuse(), ); + let mut block_import_justif = self.links.from_block_import_justif_stream.subscribe().fuse(); loop { - while self.sync_oracle.is_major_syncing() { - debug!(target: "beefy", "Waiting for major sync to complete..."); - futures_timer::Delay::new(Duration::from_secs(5)).await; - } - let mut gossip_engine = &mut self.gossip_engine; - futures::select! { + // Wait for, and handle external events. + // The branches below only change 'state', actual voting happen afterwards, + // based on the new resulting 'state'. + futures::select_biased! { notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); @@ -545,24 +696,24 @@ where return; } }, + // TODO: when adding SYNC protocol, join the on-demand justifications stream to + // this one, and handle them both here. + justif = block_import_justif.next() => { + if let Some(justif) = justif { + // Block import justifications have already been verified to be valid + // by `BeefyBlockImport`. + if let Err(err) = self.triage_incoming_justif(justif) { + debug!(target: "beefy", "🥩 {}", err); + } + } else { + return; + } + }, vote = votes.next() => { if let Some(vote) = vote { - let block_num = vote.commitment.block_number; - if block_num > *self.best_grandpa_block_header.number() { - // Only handle votes for blocks we _know_ have been finalized. - // Buffer vote to be handled later. - debug!( - target: "beefy", - "🥩 Buffering vote for not (yet) finalized block: {:?}.", - block_num - ); - self.pending_votes.entry(block_num).or_default().push(vote); - } else { - self.handle_vote( - (vote.commitment.payload, vote.commitment.block_number), - (vote.id, vote.signature), - false - ); + // Votes have already been verified to be valid by the gossip validator. + if let Err(err) = self.triage_incoming_vote(vote) { + debug!(target: "beefy", "🥩 {}", err); } } else { return; @@ -573,18 +724,20 @@ where return; } } - } - } - /// Simple wrapper that gets MMR root from header digests or from client state. - fn get_mmr_root_digest(&self, header: &B::Header) -> Option { - find_mmr_root_digest::(header).or_else(|| { - self.runtime - .runtime_api() - .mmr_root(&BlockId::hash(header.hash())) - .ok() - .and_then(|r| r.ok()) - }) + // Don't bother acting on 'state' changes during major sync. + if !self.sync_oracle.is_major_syncing() { + // Handle pending justifications and/or votes for now GRANDPA finalized blocks. + if let Err(err) = self.try_pending_justif_and_votes() { + debug!(target: "beefy", "🥩 {}", err); + } + + // There were external events, 'state' is changed, author a vote if needed/possible. + if let Err(err) = self.try_to_vote() { + debug!(target: "beefy", "🥩 {}", err); + } + } + } } } @@ -684,11 +837,11 @@ pub(crate) mod tests { create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi, BeefyPeer, BeefyTestNet, BEEFY_PROTOCOL_NAME, }, + BeefyRPCLinks, }; use futures::{executor::block_on, future::poll_fn, task::Poll}; - use crate::tests::BeefyLinkHalf; use sc_client_api::HeaderBackend; use sc_network::NetworkService; use sc_network_test::{PeersFullClient, TestNetFactory}; @@ -705,12 +858,21 @@ pub(crate) mod tests { ) -> BeefyWorker>> { let keystore = create_beefy_keystore(*key); - let (signed_commitment_sender, signed_commitment_stream) = + let (to_rpc_justif_sender, from_voter_justif_stream) = BeefySignedCommitmentStream::::channel(); - let (beefy_best_block_sender, beefy_best_block_stream) = + let (to_rpc_best_block_sender, from_voter_best_beefy_stream) = BeefyBestBlockStream::::channel(); - let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; - *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let (_, from_block_import_justif_stream) = BeefySignedCommitmentStream::::channel(); + + let beefy_rpc_links = + BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream }; + *peer.data.beefy_rpc_links.lock() = Some(beefy_rpc_links); + + let links = BeefyVoterLinks { + from_block_import_justif_stream, + to_rpc_justif_sender, + to_rpc_best_block_sender, + }; let api = Arc::new(TestApi {}); let network = peer.network_service().clone(); @@ -723,8 +885,7 @@ pub(crate) mod tests { backend: peer.client().as_backend(), runtime: api, key_store: Some(keystore).into(), - signed_commitment_sender, - beefy_best_block_sender, + links, gossip_engine, gossip_validator, min_block_delta, @@ -826,6 +987,107 @@ pub(crate) mod tests { assert_eq!(Some(1072), t); } + #[test] + fn should_vote_target() { + let mut oracle = VoterOracle::::new(1); + + // rounds not initialized -> should vote: `None` + assert_eq!(oracle.voting_target(None, 1), None); + + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + oracle.add_session(Rounds::new(1, validator_set.clone())); + + // under min delta + oracle.min_block_delta = 4; + assert_eq!(oracle.voting_target(Some(1), 1), None); + assert_eq!(oracle.voting_target(Some(2), 5), None); + + // vote on min delta + assert_eq!(oracle.voting_target(Some(4), 9), Some(8)); + oracle.min_block_delta = 8; + assert_eq!(oracle.voting_target(Some(10), 18), Some(18)); + + // vote on power of two + oracle.min_block_delta = 1; + assert_eq!(oracle.voting_target(Some(1000), 1008), Some(1004)); + assert_eq!(oracle.voting_target(Some(1000), 1016), Some(1008)); + + // nothing new to vote on + assert_eq!(oracle.voting_target(Some(1000), 1000), None); + + // vote on mandatory + oracle.sessions.clear(); + oracle.add_session(Rounds::new(1000, validator_set.clone())); + assert_eq!(oracle.voting_target(None, 1008), Some(1000)); + oracle.sessions.clear(); + oracle.add_session(Rounds::new(1001, validator_set.clone())); + assert_eq!(oracle.voting_target(Some(1000), 1008), Some(1001)); + } + + #[test] + fn test_oracle_accepted_interval() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + let mut oracle = VoterOracle::::new(1); + + // rounds not initialized -> should accept votes: `None` + assert!(oracle.accepted_interval(1).is_err()); + + let session_one = 1; + oracle.add_session(Rounds::new(session_one, validator_set.clone())); + // mandatory not done, only accept mandatory + for i in 0..15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + } + + // add more sessions, nothing changes + let session_two = 11; + let session_three = 21; + oracle.add_session(Rounds::new(session_two, validator_set.clone())); + oracle.add_session(Rounds::new(session_three, validator_set.clone())); + // mandatory not done, should accept mandatory for session_one + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + } + + // simulate finish mandatory for session one, prune oracle + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + oracle.try_prune(); + // session_one pruned, should accept mandatory for session_two + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_two, session_two))); + } + + // simulate finish mandatory for session two, prune oracle + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + oracle.try_prune(); + // session_two pruned, should accept mandatory for session_three + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, session_three))); + } + + // simulate finish mandatory for session three + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + // verify all other blocks in this session are now open to voting + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + } + // pruning does nothing in this case + oracle.try_prune(); + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + } + + // adding new session automatically prunes "finalized" previous session + let session_four = 31; + oracle.add_session(Rounds::new(session_four, validator_set.clone())); + assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four); + assert_eq!(oracle.accepted_interval(session_four + 10), Ok((session_four, session_four))); + } + #[test] fn extract_authorities_change_digest() { let mut header = Header::new( @@ -876,69 +1138,6 @@ pub(crate) mod tests { assert_eq!(extracted, Some(mmr_root_hash)); } - #[test] - fn should_vote_target() { - let keys = &[Keyring::Alice]; - let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - - // rounds not initialized -> should vote: `None` - assert_eq!(worker.current_vote_target(), None); - - let set_up = |worker: &mut BeefyWorker< - Block, - Backend, - PeersFullClient, - TestApi, - Arc>, - >, - best_grandpa: u64, - best_beefy: Option, - session_start: u64, - min_delta: u32| { - let grandpa_header = Header::new( - best_grandpa, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ); - worker.best_grandpa_block_header = grandpa_header; - worker.best_beefy_block = best_beefy; - worker.min_block_delta = min_delta; - worker.rounds = Some(Rounds::new(session_start, validator_set.clone())); - }; - - // under min delta - set_up(&mut worker, 1, Some(1), 1, 4); - assert_eq!(worker.current_vote_target(), None); - set_up(&mut worker, 5, Some(2), 1, 4); - assert_eq!(worker.current_vote_target(), None); - - // vote on min delta - set_up(&mut worker, 9, Some(4), 1, 4); - assert_eq!(worker.current_vote_target(), Some(8)); - set_up(&mut worker, 18, Some(10), 1, 8); - assert_eq!(worker.current_vote_target(), Some(18)); - - // vote on power of two - set_up(&mut worker, 1008, Some(1000), 1, 1); - assert_eq!(worker.current_vote_target(), Some(1004)); - set_up(&mut worker, 1016, Some(1000), 1, 2); - assert_eq!(worker.current_vote_target(), Some(1008)); - - // nothing new to vote on - set_up(&mut worker, 1000, Some(1000), 1, 1); - assert_eq!(worker.current_vote_target(), None); - - // vote on mandatory - set_up(&mut worker, 1008, None, 1000, 8); - assert_eq!(worker.current_vote_target(), Some(1000)); - set_up(&mut worker, 1008, Some(1000), 1001, 8); - assert_eq!(worker.current_vote_target(), Some(1001)); - } - #[test] fn keystore_vs_validator_set() { let keys = &[Keyring::Alice]; @@ -953,39 +1152,57 @@ pub(crate) mod tests { let keys = &[Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let err_msg = "no authority public key found in store".to_string(); - let expected = Err(error::Error::Keystore(err_msg)); + let expected = Err(Error::Keystore(err_msg)); assert_eq!(worker.verify_validator_set(&1, &validator_set), expected); // worker has no keystore worker.key_store = None.into(); - let expected_err = Err(error::Error::Keystore("no Keystore".into())); + let expected_err = Err(Error::Keystore("no Keystore".into())); assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err); } #[test] - fn setting_best_beefy_block() { + fn test_finalize() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1, 0); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let (mut best_block_streams, mut signed_commitments) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + let mut signed_commitments = signed_commitments.drain(..).next().unwrap(); - // no 'best beefy block' + let create_signed_commitment = |block_num: NumberFor| { + let commitment = Commitment { + payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]), + block_number: block_num, + validator_set_id: validator_set.id(), + }; + SignedCommitment { commitment, signatures: vec![None] } + }; + + // no 'best beefy block' or signed commitments assert_eq!(worker.best_beefy_block, None); block_on(poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + assert_eq!(signed_commitments.poll_next_unpin(cx), Poll::Pending); Poll::Ready(()) })); // unknown hash for block #1 - let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let (mut best_block_streams, mut signed_commitments) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); - worker.set_best_beefy_block(1); + let mut signed_commitments = signed_commitments.drain(..).next().unwrap(); + let justif = create_signed_commitment(1); + worker.finalize(justif.clone()); assert_eq!(worker.best_beefy_block, Some(1)); block_on(poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + match signed_commitments.poll_next_unpin(cx) { + // expect justification + Poll::Ready(Some(received)) => assert_eq!(received, justif), + v => panic!("unexpected value: {:?}", v), + } Poll::Ready(()) })); @@ -994,7 +1211,8 @@ pub(crate) mod tests { let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); net.generate_blocks(2, 10, &validator_set, false); - worker.set_best_beefy_block(2); + let justif = create_signed_commitment(2); + worker.finalize(justif); assert_eq!(worker.best_beefy_block, Some(2)); block_on(poll_fn(move |cx| { match best_block_stream.poll_next_unpin(cx) { @@ -1010,20 +1228,17 @@ pub(crate) mod tests { } #[test] - fn setting_initial_session() { + fn should_init_session() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1, 0); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - assert!(worker.rounds.is_none()); + assert!(worker.voting_oracle.sessions.is_empty()); - // verify setting the correct validator sets and boundary for genesis session worker.init_session_at(validator_set.clone(), 1); - - let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.session_start(), &1); - // in genesis case both current and prev validator sets are the same + let worker_rounds = worker.voting_oracle.rounds_mut().unwrap(); + assert_eq!(worker_rounds.session_start(), 1); assert_eq!(worker_rounds.validators(), validator_set.validators()); assert_eq!(worker_rounds.validator_set_id(), validator_set.id()); @@ -1031,12 +1246,79 @@ pub(crate) mod tests { let keys = &[Keyring::Bob]; let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); - // verify setting the correct validator sets and boundary for non-genesis session worker.init_session_at(new_validator_set.clone(), 11); + // Since mandatory is not done for old rounds, we still get those. + let rounds = worker.voting_oracle.rounds_mut().unwrap(); + assert_eq!(rounds.validator_set_id(), validator_set.id()); + // Let's finalize mandatory. + rounds.test_set_mandatory_done(true); + worker.voting_oracle.try_prune(); + // Now we should get the next round. + let rounds = worker.voting_oracle.rounds_mut().unwrap(); + // Expect new values. + assert_eq!(rounds.session_start(), 11); + assert_eq!(rounds.validators(), new_validator_set.validators()); + assert_eq!(rounds.validator_set_id(), new_validator_set.id()); + } - let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.session_start(), &11); - assert_eq!(worker_rounds.validators(), new_validator_set.validators()); - assert_eq!(worker_rounds.validator_set_id(), new_validator_set.id()); + #[test] + fn should_triage_votes_and_process_later() { + let keys = &[Keyring::Alice, Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + fn new_vote( + block_number: NumberFor, + ) -> VoteMessage, AuthorityId, Signature> { + let commitment = Commitment { + payload: Payload::new(*b"BF", vec![]), + block_number, + validator_set_id: 0, + }; + VoteMessage { + commitment, + id: Keyring::Alice.public(), + signature: Keyring::Alice.sign(b"I am committed"), + } + } + + // best grandpa is 20 + let best_grandpa_header = Header::new( + 20u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + worker.voting_oracle.add_session(Rounds::new(10, validator_set.clone())); + worker.best_grandpa_block_header = best_grandpa_header; + + // triage votes for blocks 10..13 + worker.triage_incoming_vote(new_vote(10)).unwrap(); + worker.triage_incoming_vote(new_vote(11)).unwrap(); + worker.triage_incoming_vote(new_vote(12)).unwrap(); + // triage votes for blocks 20..23 + worker.triage_incoming_vote(new_vote(20)).unwrap(); + worker.triage_incoming_vote(new_vote(21)).unwrap(); + worker.triage_incoming_vote(new_vote(22)).unwrap(); + + // vote for 10 should have been handled, while the rest buffered for later processing + let mut votes = worker.pending_votes.values(); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 11); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 12); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 20); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); + assert!(votes.next().is_none()); + + // simulate mandatory done, and retry buffered votes + worker.voting_oracle.rounds_mut().unwrap().test_set_mandatory_done(true); + worker.try_pending_justif_and_votes().unwrap(); + // all blocks <= grandpa finalized should have been handled, rest still buffered + let mut votes = worker.pending_votes.values(); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); } } diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index d7c83810d5..c0e1e9f0e9 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -25,6 +25,7 @@ mod sync; use std::{ borrow::Cow, collections::HashMap, + marker::PhantomData, pin::Pin, sync::Arc, task::{Context as FutureContext, Poll}, @@ -567,25 +568,27 @@ impl BlockImportAdapterFull for T where /// This is required as the `TestNetFactory` trait does not distinguish between /// full and light nodes. #[derive(Clone)] -pub struct BlockImportAdapter { +pub struct BlockImportAdapter { inner: I, + _phantom: PhantomData, } -impl BlockImportAdapter { +impl BlockImportAdapter { /// Create a new instance of `Self::Full`. pub fn new(inner: I) -> Self { - Self { inner } + Self { inner, _phantom: PhantomData } } } #[async_trait::async_trait] -impl BlockImport for BlockImportAdapter +impl BlockImport for BlockImportAdapter where I: BlockImport + Send + Sync, I::Transaction: Send, + Transaction: Send + 'static, { type Error = ConsensusError; - type Transaction = (); + type Transaction = Transaction; async fn check_block( &mut self, @@ -596,7 +599,7 @@ where async fn import_block( &mut self, - block: BlockImportParams, + block: BlockImportParams, cache: HashMap>, ) -> Result { self.inner.import_block(block.clear_storage_changes_and_mutate(), cache).await diff --git a/substrate/primitives/beefy/src/commitment.rs b/substrate/primitives/beefy/src/commitment.rs index ed392139de..ddf58474e7 100644 --- a/substrate/primitives/beefy/src/commitment.rs +++ b/substrate/primitives/beefy/src/commitment.rs @@ -293,6 +293,12 @@ pub enum VersionedFinalityProof { V1(SignedCommitment), } +impl From> for VersionedFinalityProof { + fn from(commitment: SignedCommitment) -> Self { + VersionedFinalityProof::V1(commitment) + } +} + #[cfg(test)] mod tests { diff --git a/substrate/primitives/runtime/src/lib.rs b/substrate/primitives/runtime/src/lib.rs index b41c605d8a..bf77c08b76 100644 --- a/substrate/primitives/runtime/src/lib.rs +++ b/substrate/primitives/runtime/src/lib.rs @@ -151,6 +151,11 @@ impl Justifications { self.iter().find(|j| j.0 == engine_id).map(|j| &j.1) } + /// Remove the encoded justification for the given consensus engine, if it exists. + pub fn remove(&mut self, engine_id: ConsensusEngineId) { + self.0.retain(|j| j.0 != engine_id) + } + /// Return a copy of the encoded justification for the given consensus /// engine, if it exists. pub fn into_justification(self, engine_id: ConsensusEngineId) -> Option {