grandpa: rewrite warp sync proof generation (#8148)

* grandpa: use AuthoritySetChanges to generate warp sync proof

* node: init grandpa warp sync protocol

* grandpa: iterator for AuthoritySetChanges

* grandpa: rewrite warp sync proof generation

* grandpa: remove old code for warp sync generation

* grandpa: fix indentation

* grandpa: fix off by one

* grandpa: use binary search to find start idx when generating warp sync proof

* grandpa: add method to verify warp sync proofs

* grandpa: remove unnecessary code to skip authority set changes

* grandpa: add test for warp sync proof generation and verification

* grandpa: add missing docs

* grandpa: remove trailing comma
This commit is contained in:
André Silva
2021-02-25 08:44:51 +00:00
committed by GitHub
parent 8a0e8ea9a6
commit 94c29ff666
10 changed files with 463 additions and 538 deletions
+7
View File
@@ -7254,18 +7254,25 @@ name = "sc-finality-grandpa-warp-sync"
version = "0.9.0"
dependencies = [
"derive_more",
"finality-grandpa",
"futures 0.3.12",
"log",
"num-traits",
"parity-scale-codec",
"parking_lot 0.11.1",
"prost",
"rand 0.8.3",
"sc-block-builder",
"sc-client-api",
"sc-finality-grandpa",
"sc-network",
"sc-service",
"sp-blockchain",
"sp-consensus",
"sp-finality-grandpa",
"sp-keyring",
"sp-runtime",
"substrate-test-runtime-client",
]
[[package]]
+8 -3
View File
@@ -197,9 +197,14 @@ pub fn new_full_base(
config.network.extra_sets.push(grandpa::grandpa_peers_set_config());
#[cfg(feature = "cli")]
config.network.request_response_protocols.push(sc_finality_grandpa_warp_sync::request_response_config_for_chain(
&config, task_manager.spawn_handle(), backend.clone(),
));
config.network.request_response_protocols.push(
sc_finality_grandpa_warp_sync::request_response_config_for_chain(
&config,
task_manager.spawn_handle(),
backend.clone(),
import_setup.1.shared_authority_set().clone(),
)
);
let (network, network_status_sinks, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -12,16 +12,25 @@ repository = "https://github.com/paritytech/substrate/"
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
sc-network = { version = "0.9.0", path = "../network" }
sc-finality-grandpa = { version = "0.9.0", path = "../finality-grandpa" }
sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
sc-client-api = { version = "3.0.0", path = "../api" }
sc-service = { version = "0.9.0", path = "../service" }
codec = { package = "parity-scale-codec", version = "2.0.0" }
derive_more = "0.99.11"
futures = "0.3.8"
log = "0.4.11"
derive_more = "0.99.11"
codec = { package = "parity-scale-codec", version = "2.0.0" }
prost = "0.7"
num-traits = "0.2.14"
parking_lot = "0.11.1"
prost = "0.7"
sc-client-api = { version = "3.0.0", path = "../api" }
sc-finality-grandpa = { version = "0.9.0", path = "../finality-grandpa" }
sc-network = { version = "0.9.0", path = "../network" }
sc-service = { version = "0.9.0", path = "../service" }
sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
sp-finality-grandpa = { version = "3.0.0", path = "../../primitives/finality-grandpa" }
sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
[dev-dependencies]
finality-grandpa = { version = "0.14.0" }
rand = "0.8"
sc-block-builder = { version = "0.9.0", path = "../block-builder" }
sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" }
sp-keyring = { version = "3.0.0", path = "../../primitives/keyring" }
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
@@ -16,7 +16,7 @@
//! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer.
use codec::Decode;
use codec::{Decode, Encode};
use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig};
use sc_client_api::Backend;
use sp_runtime::traits::NumberFor;
@@ -27,13 +27,18 @@ use sp_runtime::traits::Block as BlockT;
use std::time::Duration;
use std::sync::Arc;
use sc_service::{SpawnTaskHandle, config::{Configuration, Role}};
use sc_finality_grandpa::WarpSyncFragmentCache;
use sc_finality_grandpa::SharedAuthoritySet;
mod proof;
pub use proof::{AuthoritySetChangeProof, WarpSyncProof};
/// Generates the appropriate [`RequestResponseConfig`] for a given chain configuration.
pub fn request_response_config_for_chain<TBlock: BlockT, TBackend: Backend<TBlock> + 'static>(
config: &Configuration,
spawn_handle: SpawnTaskHandle,
backend: Arc<TBackend>,
authority_set: SharedAuthoritySet<TBlock::Hash, NumberFor<TBlock>>,
) -> RequestResponseConfig
where NumberFor<TBlock>: sc_finality_grandpa::BlockNumberOps,
{
@@ -47,6 +52,7 @@ pub fn request_response_config_for_chain<TBlock: BlockT, TBackend: Backend<TBloc
let (handler, request_response_config) = GrandpaWarpSyncRequestHandler::new(
protocol_id.clone(),
backend.clone(),
authority_set,
);
spawn_handle.spawn("grandpa_warp_sync_request_handler", handler.run());
request_response_config
@@ -75,38 +81,40 @@ fn generate_protocol_name(protocol_id: ProtocolId) -> String {
s
}
#[derive(codec::Decode)]
#[derive(Decode)]
struct Request<B: BlockT> {
begin: B::Hash
begin: B::Hash,
}
/// Setting a large fragment limit, allowing client
/// to define it is possible.
const WARP_SYNC_FRAGMENTS_LIMIT: usize = 100;
/// Number of item with justification in warp sync cache.
/// This should be customizable, but setting it to the max number of fragments
/// we return seems like a good idea until then.
const WARP_SYNC_CACHE_SIZE: usize = WARP_SYNC_FRAGMENTS_LIMIT;
/// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct GrandpaWarpSyncRequestHandler<TBackend, TBlock: BlockT> {
backend: Arc<TBackend>,
cache: Arc<parking_lot::RwLock<WarpSyncFragmentCache<TBlock::Header>>>,
authority_set: SharedAuthoritySet<TBlock::Hash, NumberFor<TBlock>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
_phantom: std::marker::PhantomData<TBlock>
_phantom: std::marker::PhantomData<TBlock>,
}
impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TBackend, TBlock> {
/// Create a new [`GrandpaWarpSyncRequestHandler`].
pub fn new(protocol_id: ProtocolId, backend: Arc<TBackend>) -> (Self, RequestResponseConfig) {
pub fn new(
protocol_id: ProtocolId,
backend: Arc<TBackend>,
authority_set: SharedAuthoritySet<TBlock::Hash, NumberFor<TBlock>>,
) -> (Self, RequestResponseConfig) {
let (tx, request_receiver) = mpsc::channel(20);
let mut request_response_config = generate_request_response_config(protocol_id);
request_response_config.inbound_queue = Some(tx);
let cache = Arc::new(parking_lot::RwLock::new(WarpSyncFragmentCache::new(WARP_SYNC_CACHE_SIZE)));
(Self { backend, request_receiver, cache, _phantom: std::marker::PhantomData }, request_response_config)
(
Self {
backend,
request_receiver,
_phantom: std::marker::PhantomData,
authority_set,
},
request_response_config,
)
}
fn handle_request(
@@ -118,13 +126,14 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
{
let request = Request::<TBlock>::decode(&mut &payload[..])?;
let mut cache = self.cache.write();
let response = sc_finality_grandpa::prove_warp_sync(
self.backend.blockchain(), request.begin, Some(WARP_SYNC_FRAGMENTS_LIMIT), Some(&mut cache)
let proof = WarpSyncProof::generate(
self.backend.blockchain(),
request.begin,
&self.authority_set.authority_set_changes(),
)?;
pending_response.send(OutgoingResponse {
result: Ok(response),
result: Ok(proof.encode()),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}
@@ -148,8 +157,8 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
}
}
#[derive(derive_more::Display, derive_more::From)]
enum HandleRequestError {
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum HandleRequestError {
#[display(fmt = "Failed to decode request: {}.", _0)]
DecodeProto(prost::DecodeError),
#[display(fmt = "Failed to encode response: {}.", _0)]
@@ -157,6 +166,10 @@ enum HandleRequestError {
#[display(fmt = "Failed to decode block hash: {}.", _0)]
DecodeScale(codec::Error),
Client(sp_blockchain::Error),
#[from(ignore)]
InvalidRequest(String),
#[from(ignore)]
InvalidProof(String),
#[display(fmt = "Failed to send response.")]
SendResponse,
}
@@ -0,0 +1,298 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use codec::{Decode, Encode};
use sc_finality_grandpa::{
find_scheduled_change, AuthoritySetChanges, BlockNumberOps, GrandpaJustification,
};
use sp_blockchain::Backend as BlockchainBackend;
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor},
};
use crate::HandleRequestError;
/// The maximum number of authority set change proofs to include in a single warp sync proof.
const MAX_CHANGES_PER_WARP_SYNC_PROOF: usize = 256;
/// A proof of an authority set change.
#[derive(Decode, Encode)]
pub struct AuthoritySetChangeProof<Block: BlockT> {
/// The last block that the given authority set finalized. This block should contain a digest
/// signaling an authority set change from which we can fetch the next authority set.
pub header: Block::Header,
/// A justification for the header above which proves its finality. In order to validate it the
/// verifier must be aware of the authorities and set id for which the justification refers to.
pub justification: GrandpaJustification<Block>,
}
/// An accumulated proof of multiple authority set changes.
#[derive(Decode, Encode)]
pub struct WarpSyncProof<Block: BlockT> {
proofs: Vec<AuthoritySetChangeProof<Block>>,
}
impl<Block: BlockT> WarpSyncProof<Block> {
/// Generates a warp sync proof starting at the given block. It will generate authority set
/// change proofs for all changes that happened from `begin` until the current authority set
/// (capped by MAX_CHANGES_PER_WARP_SYNC_PROOF).
pub fn generate<Backend>(
backend: &Backend,
begin: Block::Hash,
set_changes: &AuthoritySetChanges<NumberFor<Block>>,
) -> Result<WarpSyncProof<Block>, HandleRequestError>
where
Backend: BlockchainBackend<Block>,
{
// TODO: cache best response (i.e. the one with lowest begin_number)
let begin_number = backend
.block_number_from_id(&BlockId::Hash(begin))?
.ok_or_else(|| HandleRequestError::InvalidRequest("Missing start block".to_string()))?;
if begin_number > backend.info().finalized_number {
return Err(HandleRequestError::InvalidRequest(
"Start block is not finalized".to_string(),
));
}
let canon_hash = backend.hash(begin_number)?.expect(
"begin number is lower than finalized number; \
all blocks below finalized number must have been imported; \
qed.",
);
if canon_hash != begin {
return Err(HandleRequestError::InvalidRequest(
"Start block is not in the finalized chain".to_string(),
));
}
let mut proofs = Vec::new();
for (_, last_block) in set_changes.iter_from(begin_number) {
if proofs.len() >= MAX_CHANGES_PER_WARP_SYNC_PROOF {
break;
}
let header = backend.header(BlockId::Number(*last_block))?.expect(
"header number comes from previously applied set changes; must exist in db; qed.",
);
// the last block in a set is the one that triggers a change to the next set,
// therefore the block must have a digest that signals the authority set change
if find_scheduled_change::<Block>(&header).is_none() {
// if it doesn't contain a signal for standard change then the set must have changed
// through a forced changed, in which case we stop collecting proofs as the chain of
// trust in authority handoffs was broken.
break;
}
let justification = backend.justification(BlockId::Number(*last_block))?.expect(
"header is last in set and contains standard change signal; \
must have justification; \
qed.",
);
let justification = GrandpaJustification::<Block>::decode(&mut &justification[..])?;
proofs.push(AuthoritySetChangeProof {
header: header.clone(),
justification,
});
}
Ok(WarpSyncProof { proofs })
}
/// Verifies the warp sync proof starting at the given set id and with the given authorities.
/// If the proof is valid the new set id and authorities is returned.
pub fn verify(
&self,
set_id: SetId,
authorities: AuthorityList,
) -> Result<(SetId, AuthorityList), HandleRequestError>
where
NumberFor<Block>: BlockNumberOps,
{
let mut current_set_id = set_id;
let mut current_authorities = authorities;
for proof in &self.proofs {
proof
.justification
.verify(current_set_id, &current_authorities)
.map_err(|err| HandleRequestError::InvalidProof(err.to_string()))?;
let scheduled_change = find_scheduled_change::<Block>(&proof.header).ok_or(
HandleRequestError::InvalidProof(
"Header is missing authority set change digest".to_string(),
),
)?;
current_authorities = scheduled_change.next_authorities;
current_set_id += 1;
}
Ok((current_set_id, current_authorities))
}
}
#[cfg(test)]
mod tests {
use crate::WarpSyncProof;
use codec::Encode;
use rand::prelude::*;
use sc_block_builder::BlockBuilderProvider;
use sc_client_api::Backend;
use sc_finality_grandpa::{AuthoritySetChanges, GrandpaJustification};
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_keyring::Ed25519Keyring;
use sp_runtime::{generic::BlockId, traits::Header as _};
use std::sync::Arc;
use substrate_test_runtime_client::{
ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClientBuilder,
TestClientBuilderExt,
};
#[test]
fn warp_sync_proof_generate_verify() {
let mut rng = rand::rngs::StdRng::from_seed([0; 32]);
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());
let available_authorities = Ed25519Keyring::iter().collect::<Vec<_>>();
let genesis_authorities = vec![(Ed25519Keyring::Alice.public().into(), 1)];
let mut current_authorities = vec![Ed25519Keyring::Alice];
let mut current_set_id = 0;
let mut authority_set_changes = Vec::new();
for n in 1..=100 {
let mut block = client
.new_block(Default::default())
.unwrap()
.build()
.unwrap()
.block;
let mut new_authorities = None;
// we will trigger an authority set change every 10 blocks
if n != 0 && n % 10 == 0 {
// pick next authorities and add digest for the set change
let n_authorities = rng.gen_range(1..available_authorities.len());
let next_authorities = available_authorities
.choose_multiple(&mut rng, n_authorities)
.cloned()
.collect::<Vec<_>>();
new_authorities = Some(next_authorities.clone());
let next_authorities = next_authorities
.iter()
.map(|keyring| (keyring.public().into(), 1))
.collect::<Vec<_>>();
let digest = sp_runtime::generic::DigestItem::Consensus(
sp_finality_grandpa::GRANDPA_ENGINE_ID,
sp_finality_grandpa::ConsensusLog::ScheduledChange(
sp_finality_grandpa::ScheduledChange {
delay: 0u64,
next_authorities,
},
)
.encode(),
);
block.header.digest_mut().logs.push(digest);
}
client.import(BlockOrigin::Own, block).unwrap();
if let Some(new_authorities) = new_authorities {
// generate a justification for this block, finalize it and note the authority set
// change
let (target_hash, target_number) = {
let info = client.info();
(info.best_hash, info.best_number)
};
let mut precommits = Vec::new();
for keyring in &current_authorities {
let precommit = finality_grandpa::Precommit {
target_hash,
target_number,
};
let msg = finality_grandpa::Message::Precommit(precommit.clone());
let encoded = sp_finality_grandpa::localized_payload(42, current_set_id, &msg);
let signature = keyring.sign(&encoded[..]).into();
let precommit = finality_grandpa::SignedPrecommit {
precommit,
signature,
id: keyring.public().into(),
};
precommits.push(precommit);
}
let commit = finality_grandpa::Commit {
target_hash,
target_number,
precommits,
};
let justification = GrandpaJustification::from_commit(&client, 42, commit).unwrap();
client
.finalize_block(BlockId::Hash(target_hash), Some(justification.encode()))
.unwrap();
authority_set_changes.push((current_set_id, n));
current_set_id += 1;
current_authorities = new_authorities;
}
}
let authority_set_changes = AuthoritySetChanges::from(authority_set_changes);
// generate a warp sync proof
let genesis_hash = client.hash(0).unwrap().unwrap();
let warp_sync_proof =
WarpSyncProof::generate(backend.blockchain(), genesis_hash, &authority_set_changes)
.unwrap();
// verifying the proof should yield the last set id and authorities
let (new_set_id, new_authorities) = warp_sync_proof.verify(0, genesis_authorities).unwrap();
let expected_authorities = current_authorities
.iter()
.map(|keyring| (keyring.public().into(), 1))
.collect::<Vec<_>>();
assert_eq!(new_set_id, current_set_id);
assert_eq!(new_authorities, expected_authorities);
}
}
@@ -650,10 +650,17 @@ impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> {
}
}
// Tracks historical authority set changes. We store the block numbers for the first block of each
// authority set, once they have been finalized.
/// Tracks historical authority set changes. We store the block numbers for the last block
/// of each authority set, once they have been finalized. These blocks are guaranteed to
/// have a justification unless they were triggered by a forced change.
#[derive(Debug, Encode, Decode, Clone, PartialEq)]
pub struct AuthoritySetChanges<N>(pub Vec<(u64, N)>);
pub struct AuthoritySetChanges<N>(Vec<(u64, N)>);
impl<N> From<Vec<(u64, N)>> for AuthoritySetChanges<N> {
fn from(changes: Vec<(u64, N)>) -> AuthoritySetChanges<N> {
AuthoritySetChanges(changes)
}
}
impl<N: Ord + Clone> AuthoritySetChanges<N> {
pub(crate) fn empty() -> Self {
@@ -668,6 +675,7 @@ impl<N: Ord + Clone> AuthoritySetChanges<N> {
let idx = self.0
.binary_search_by_key(&block_number, |(_, n)| n.clone())
.unwrap_or_else(|b| b);
if idx < self.0.len() {
let (set_id, block_number) = self.0[idx].clone();
// To make sure we have the right set we need to check that the one before it also exists.
@@ -687,6 +695,19 @@ impl<N: Ord + Clone> AuthoritySetChanges<N> {
None
}
}
/// Returns an iterator over all historical authority set changes starting at the given block
/// number (excluded). The iterator yields a tuple representing the set id and the block number
/// of the last block in that set.
pub fn iter_from(&self, block_number: N) -> impl Iterator<Item = &(u64, N)> {
let idx = self.0.binary_search_by_key(&block_number, |(_, n)| n.clone())
// if there was a change at the given block number then we should start on the next
// index since we want to exclude the current block number
.map(|n| n + 1)
.unwrap_or_else(|b| b);
self.0[idx..].iter()
}
}
#[cfg(test)]
@@ -1627,4 +1648,32 @@ mod tests {
assert_eq!(authority_set_changes.get_set_id(42), Some((3, 81)));
assert_eq!(authority_set_changes.get_set_id(141), None);
}
#[test]
fn iter_from_works() {
let mut authority_set_changes = AuthoritySetChanges::empty();
authority_set_changes.append(1, 41);
authority_set_changes.append(2, 81);
authority_set_changes.append(3, 121);
assert_eq!(
vec![(1, 41), (2, 81), (3, 121)],
authority_set_changes.iter_from(40).cloned().collect::<Vec<_>>(),
);
assert_eq!(
vec![(2, 81), (3, 121)],
authority_set_changes.iter_from(41).cloned().collect::<Vec<_>>(),
);
assert_eq!(
0,
authority_set_changes.iter_from(121).count(),
);
assert_eq!(
0,
authority_set_changes.iter_from(200).count(),
);
}
}
@@ -44,10 +44,10 @@ use parity_scale_codec::{Encode, Decode};
use sp_blockchain::{Backend as BlockchainBackend, Error as ClientError, Result as ClientResult};
use sp_runtime::{
Justification, generic::BlockId,
traits::{NumberFor, Block as BlockT, Header as HeaderT, Zero, One},
traits::{NumberFor, Block as BlockT, Header as HeaderT, One},
};
use sc_client_api::backend::Backend;
use sp_finality_grandpa::{AuthorityId, AuthorityList};
use sp_finality_grandpa::AuthorityId;
use crate::authorities::AuthoritySetChanges;
use crate::justification::GrandpaJustification;
@@ -151,23 +151,6 @@ pub enum FinalityProofError {
Client(sp_blockchain::Error),
}
/// Single fragment of authority set proof.
///
/// Finality for block B is proved by providing:
/// 1) headers of this block;
/// 2) the justification for the block containing a authority set change digest;
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
pub(crate) struct AuthoritySetProofFragment<Header: HeaderT> {
/// The header of the given block.
pub header: Header,
/// Justification of the block F.
pub justification: Vec<u8>,
}
/// Proof of authority set is the ordered set of authority set fragments, where:
/// - last fragment match target block.
type AuthoritySetProof<Header> = Vec<AuthoritySetProofFragment<Header>>;
fn prove_finality<Block, B, J>(
blockchain: &B,
authority_set_changes: AuthoritySetChanges<NumberFor<Block>>,
@@ -242,238 +225,6 @@ where
))
}
/// Prepare authority proof for the best possible block starting at a given trusted block.
///
/// Started block should be in range of bonding duration.
/// We only return proof for finalized blocks (with justification).
///
/// It is assumed that the caller already have a proof-of-finality for the block 'begin'.
pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
blockchain: &B,
begin: Block::Hash,
max_fragment_limit: Option<usize>,
mut cache: Option<&mut WarpSyncFragmentCache<Block::Header>>,
) -> ::sp_blockchain::Result<Vec<u8>> {
let begin = BlockId::Hash(begin);
let begin_number = blockchain.block_number_from_id(&begin)?
.ok_or_else(|| ClientError::Backend("Missing start block".to_string()))?;
let end = BlockId::Hash(blockchain.last_finalized()?);
let end_number = blockchain.block_number_from_id(&end)?
// This error should not happen, we could also panic.
.ok_or_else(|| ClientError::Backend("Missing last finalized block".to_string()))?;
if begin_number > end_number {
return Err(ClientError::Backend("Unfinalized start for authority proof".to_string()));
}
let mut result = Vec::new();
let mut last_apply = None;
let header = blockchain.expect_header(begin)?;
let mut index = *header.number();
// Find previous change in case there is a delay.
// This operation is a costy and only for the delay corner case.
while index > Zero::zero() {
index = index - One::one();
if let Some((fragment, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| &next > header.number()).unwrap_or(false) {
result.push(fragment);
last_apply = Some(apply_block);
} else {
break;
}
}
}
let mut index = *header.number();
while index <= end_number {
if max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false) {
break;
}
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| apply_block < next).unwrap_or(false) {
// Previous delayed will not apply, do not include it.
result.pop();
}
result.push(fragement);
last_apply = Some(apply_block);
}
index = index + One::one();
}
let at_limit = max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false);
// add last finalized block if reached and not already included.
if !at_limit && result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let header = blockchain.expect_header(end)?;
if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? {
result.push(AuthoritySetProofFragment {
header: header.clone(),
justification,
});
} else {
// no justification, don't include it.
}
}
Ok(result.encode())
}
/// Try get a warp sync proof fragment a a given finalized block.
fn get_warp_sync_proof_fragment<Block: BlockT, B: BlockchainBackend<Block>>(
blockchain: &B,
index: NumberFor<Block>,
cache: &mut Option<&mut WarpSyncFragmentCache<Block::Header>>,
) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> {
if let Some(cache) = cache.as_mut() {
if let Some(result) = cache.get_item(index) {
return Ok(result);
}
}
let mut result = None;
let header = blockchain.expect_header(BlockId::number(index))?;
if let Some((block_number, sp_finality_grandpa::ScheduledChange {
next_authorities: _,
delay,
})) = crate::import::find_forced_change::<Block>(&header) {
let dest = block_number + delay;
if let Some(justification) = blockchain.justification(BlockId::Number(index.clone()))? {
result = Some((AuthoritySetProofFragment {
header: header.clone(),
justification,
}, dest));
} else {
return Err(ClientError::Backend("Unjustified block with authority set change".to_string()));
}
}
if let Some(sp_finality_grandpa::ScheduledChange {
next_authorities: _,
delay,
}) = crate::import::find_scheduled_change::<Block>(&header) {
let dest = index + delay;
if let Some(justification) = blockchain.justification(BlockId::Number(index.clone()))? {
result = Some((AuthoritySetProofFragment {
header: header.clone(),
justification,
}, dest));
} else {
return Err(ClientError::Backend("Unjustified block with authority set change".to_string()));
}
}
cache.as_mut().map(|cache| cache.new_item(index, result.clone()));
Ok(result)
}
/// Check GRANDPA authority change sequence to assert finality of a target block.
///
/// Returns the header of the target block.
#[allow(unused)]
pub(crate) fn check_warp_sync_proof<Block: BlockT, J>(
current_set_id: u64,
current_authorities: AuthorityList,
remote_proof: Vec<u8>,
) -> ClientResult<(Block::Header, u64, AuthorityList)>
where
NumberFor<Block>: BlockNumberOps,
J: Decode + ProvableJustification<Block::Header> + BlockJustification<Block::Header>,
{
// decode finality proof
let proof = AuthoritySetProof::<Block::Header>::decode(&mut &remote_proof[..])
.map_err(|_| ClientError::BadJustification("failed to decode authority proof".into()))?;
let last = proof.len() - 1;
let mut result = (current_set_id, current_authorities, NumberFor::<Block>::zero());
for (ix, fragment) in proof.into_iter().enumerate() {
let is_last = ix == last;
result = check_warp_sync_proof_fragment::<Block, J>(
result.0,
&result.1,
&result.2,
is_last,
&fragment,
)?;
if is_last {
return Ok((fragment.header, result.0, result.1))
}
}
// empty proof can't prove anything
return Err(ClientError::BadJustification("empty proof of authority".into()));
}
/// Check finality authority set sequence.
fn check_warp_sync_proof_fragment<Block: BlockT, J>(
current_set_id: u64,
current_authorities: &AuthorityList,
previous_checked_block: &NumberFor<Block>,
is_last: bool,
authorities_proof: &AuthoritySetProofFragment<Block::Header>,
) -> ClientResult<(u64, AuthorityList, NumberFor<Block>)>
where
NumberFor<Block>: BlockNumberOps,
J: Decode + ProvableJustification<Block::Header> + BlockJustification<Block::Header>,
{
let justification: J = Decode::decode(&mut authorities_proof.justification.as_slice())
.map_err(|_| ClientError::JustificationDecode)?;
justification.verify(current_set_id, &current_authorities)?;
// assert justification is for this header
if &justification.number() != authorities_proof.header.number()
|| justification.hash().as_ref() != authorities_proof.header.hash().as_ref() {
return Err(ClientError::Backend("Invalid authority warp proof, justification do not match header".to_string()));
}
if authorities_proof.header.number() <= previous_checked_block {
return Err(ClientError::Backend("Invalid authority warp proof".to_string()));
}
let current_block = authorities_proof.header.number();
let mut at_block = None;
if let Some(sp_finality_grandpa::ScheduledChange {
next_authorities,
delay,
}) = crate::import::find_scheduled_change::<Block>(&authorities_proof.header) {
let dest = *current_block + delay;
at_block = Some((dest, next_authorities));
}
if let Some((block_number, sp_finality_grandpa::ScheduledChange {
next_authorities,
delay,
})) = crate::import::find_forced_change::<Block>(&authorities_proof.header) {
let dest = block_number + delay;
at_block = Some((dest, next_authorities));
}
// Fragment without change only allowed for proof last block.
if at_block.is_none() && !is_last {
return Err(ClientError::Backend("Invalid authority warp proof".to_string()));
}
if let Some((at_block, next_authorities)) = at_block {
Ok((current_set_id + 1, next_authorities, at_block))
} else {
Ok((current_set_id, current_authorities.clone(), current_block.clone()))
}
}
/// Block info extracted from the justification.
pub(crate) trait BlockJustification<Header: HeaderT> {
/// Block number justified.
fn number(&self) -> Header::Number;
/// Block hash justified.
fn hash(&self) -> Header::Hash;
}
/// Check GRANDPA proof-of-finality for the given block.
///
/// Returns the vector of headers that MUST be validated + imported
@@ -483,7 +234,7 @@ pub(crate) trait BlockJustification<Header: HeaderT> {
#[cfg(test)]
fn check_finality_proof<Header: HeaderT, J>(
current_set_id: u64,
current_authorities: AuthorityList,
current_authorities: sp_finality_grandpa::AuthorityList,
remote_proof: Vec<u8>,
) -> ClientResult<FinalityProof<Header>>
where
@@ -529,70 +280,7 @@ where
ClientError::Consensus(sp_consensus::Error::InvalidAuthoritiesSet),
)?;
GrandpaJustification::verify(self, set_id, &authorities)
}
}
impl<Block: BlockT> BlockJustification<Block::Header> for GrandpaJustification<Block> {
fn number(&self) -> NumberFor<Block> {
self.commit.target_number.clone()
}
fn hash(&self) -> Block::Hash {
self.commit.target_hash.clone()
}
}
/// Simple cache for warp sync queries.
pub struct WarpSyncFragmentCache<Header: HeaderT> {
header_has_proof_fragment: std::collections::HashMap<Header::Number, bool>,
cache: linked_hash_map::LinkedHashMap<
Header::Number,
(AuthoritySetProofFragment<Header>, Header::Number),
>,
limit: usize,
}
impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
/// Instantiate a new cache for the warp sync prover.
pub fn new(size: usize) -> Self {
WarpSyncFragmentCache {
header_has_proof_fragment: Default::default(),
cache: Default::default(),
limit: size,
}
}
fn new_item(
&mut self,
at: Header::Number,
item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
) {
self.header_has_proof_fragment.insert(at, item.is_some());
if let Some(item) = item {
if self.cache.len() == self.limit {
self.pop_one();
}
self.cache.insert(at, item);
}
}
fn pop_one(&mut self) {
if let Some((header_number, _)) = self.cache.pop_front() {
self.header_has_proof_fragment.remove(&header_number);
}
}
fn get_item(
&mut self,
block: Header::Number,
) -> Option<Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
match self.header_has_proof_fragment.get(&block) {
Some(true) => Some(self.cache.get_refresh(&block).cloned()),
Some(false) => Some(None),
None => None
}
GrandpaJustification::verify_with_voter_set(self, set_id, &authorities)
}
}
@@ -624,15 +312,6 @@ pub(crate) mod tests {
#[derive(Debug, PartialEq, Encode, Decode)]
pub struct TestBlockJustification(TestJustification, u64, H256);
impl BlockJustification<Header> for TestBlockJustification {
fn number(&self) -> <Header as HeaderT>::Number {
self.1
}
fn hash(&self) -> <Header as HeaderT>::Hash {
self.2.clone()
}
}
impl ProvableJustification<Header> for TestBlockJustification {
fn verify(&self, set_id: u64, authorities: &[(AuthorityId, u64)]) -> ClientResult<()> {
self.0.verify(set_id, authorities)
@@ -826,161 +505,4 @@ pub(crate) mod tests {
}
);
}
#[test]
fn warp_sync_proof_encoding_decoding() {
fn test_blockchain(
nb_blocks: u64,
mut set_change: &[(u64, Vec<u8>)],
mut justifications: &[(u64, Vec<u8>)],
) -> (InMemoryBlockchain<Block>, Vec<H256>) {
let blockchain = InMemoryBlockchain::<Block>::new();
let mut hashes = Vec::<H256>::new();
let mut set_id = 0;
for i in 0..nb_blocks {
let mut set_id_next = set_id;
let mut header = header(i);
set_change.first()
.map(|j| if i == j.0 {
set_change = &set_change[1..];
let next_authorities: Vec<_> = j.1.iter().map(|i| (AuthorityId::from_slice(&[*i; 32]), 1u64)).collect();
set_id_next += 1;
header.digest_mut().logs.push(
sp_runtime::generic::DigestItem::Consensus(
sp_finality_grandpa::GRANDPA_ENGINE_ID,
sp_finality_grandpa::ConsensusLog::ScheduledChange(
sp_finality_grandpa::ScheduledChange { delay: 0u64, next_authorities }
).encode(),
));
});
if let Some(parent) = hashes.last() {
header.set_parent_hash(parent.clone());
}
let header_hash = header.hash();
let justification = justifications.first()
.and_then(|j| if i == j.0 {
justifications = &justifications[1..];
let authority = j.1.iter().map(|j|
(AuthorityId::from_slice(&[*j; 32]), 1u64)
).collect();
let justification = TestBlockJustification(
TestJustification((set_id, authority), vec![i as u8]),
i,
header_hash,
);
Some(justification.encode())
} else {
None
});
hashes.push(header_hash.clone());
set_id = set_id_next;
blockchain.insert(header_hash, header, justification, None, NewBlockState::Final)
.unwrap();
}
(blockchain, hashes)
}
let (blockchain, hashes) = test_blockchain(
7,
vec![(3, vec![9])].as_slice(),
vec![
(1, vec![1, 2, 3]),
(2, vec![1, 2, 3]),
(3, vec![1, 2, 3]),
(4, vec![9]),
(6, vec![9]),
].as_slice(),
);
// proof after set change
let mut cache = WarpSyncFragmentCache::new(5);
let proof_no_cache = prove_warp_sync(&blockchain, hashes[6], None, Some(&mut cache)).unwrap();
let proof = prove_warp_sync(&blockchain, hashes[6], None, Some(&mut cache)).unwrap();
assert_eq!(proof_no_cache, proof);
let initial_authorities: Vec<_> = [1u8, 2, 3].iter().map(|i|
(AuthorityId::from_slice(&[*i; 32]), 1u64)
).collect();
let authorities_next: Vec<_> = [9u8].iter().map(|i|
(AuthorityId::from_slice(&[*i; 32]), 1u64)
).collect();
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).is_err());
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
0,
authorities_next.clone(),
proof.clone(),
).is_err());
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
1,
initial_authorities.clone(),
proof.clone(),
).is_err());
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
1,
authorities_next.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 1);
assert_eq!(current_set, authorities_next);
// proof before set change
let proof = prove_warp_sync(&blockchain, hashes[1], None, None).unwrap();
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 1);
assert_eq!(current_set, authorities_next);
// two changes
let (blockchain, hashes) = test_blockchain(
13,
vec![(3, vec![7]), (8, vec![9])].as_slice(),
vec![
(1, vec![1, 2, 3]),
(2, vec![1, 2, 3]),
(3, vec![1, 2, 3]),
(4, vec![7]),
(6, vec![7]),
(8, vec![7]), // warning, requires a justification on change set
(10, vec![9]),
].as_slice(),
);
// proof before set change
let proof = prove_warp_sync(&blockchain, hashes[1], None, None).unwrap();
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 2);
assert_eq!(current_set, authorities_next);
}
}
@@ -182,9 +182,11 @@ impl<'a, Block: 'a + BlockT> Drop for PendingSetChanges<'a, Block> {
}
}
pub(crate) fn find_scheduled_change<B: BlockT>(header: &B::Header)
-> Option<ScheduledChange<NumberFor<B>>>
{
/// Checks the given header for a consensus digest signalling a **standard** scheduled change and
/// extracts it.
pub fn find_scheduled_change<B: BlockT>(
header: &B::Header,
) -> Option<ScheduledChange<NumberFor<B>>> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
@@ -197,9 +199,11 @@ pub(crate) fn find_scheduled_change<B: BlockT>(header: &B::Header)
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
pub(crate) fn find_forced_change<B: BlockT>(header: &B::Header)
-> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)>
{
/// Checks the given header for a consensus digest signalling a **forced** scheduled change and
/// extracts it.
pub fn find_forced_change<B: BlockT>(
header: &B::Header,
) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
@@ -19,15 +19,16 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use finality_grandpa::{voter_set::VoterSet, Error as GrandpaError};
use parity_scale_codec::{Decode, Encode};
use sp_blockchain::{Error as ClientError, HeaderBackend};
use parity_scale_codec::{Encode, Decode};
use finality_grandpa::voter_set::VoterSet;
use finality_grandpa::{Error as GrandpaError};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{NumberFor, Block as BlockT, Header as HeaderT};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, NumberFor},
};
use crate::{Commit, Error};
use crate::{AuthorityList, Commit, Error};
/// A GRANDPA justification for block finality, it includes a commit message and
/// an ancestry proof including all headers routing all precommit target blocks
@@ -105,12 +106,30 @@ impl<Block: BlockT> GrandpaJustification<Block> {
let msg = "invalid commit target in grandpa justification".to_string();
Err(ClientError::BadJustification(msg))
} else {
justification.verify(set_id, voters).map(|_| justification)
justification
.verify_with_voter_set(set_id, voters)
.map(|_| justification)
}
}
/// Validate the commit and the votes' ancestry proofs.
pub(crate) fn verify(&self, set_id: u64, voters: &VoterSet<AuthorityId>) -> Result<(), ClientError>
pub fn verify(&self, set_id: u64, authorities: &AuthorityList) -> Result<(), ClientError>
where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
{
let voters = VoterSet::new(authorities.iter().cloned()).ok_or(ClientError::Consensus(
sp_consensus::Error::InvalidAuthoritiesSet,
))?;
self.verify_with_voter_set(set_id, &voters)
}
/// Validate the commit and the votes' ancestry proofs.
pub(crate) fn verify_with_voter_set(
&self,
set_id: u64,
voters: &VoterSet<AuthorityId>,
) -> Result<(), ClientError>
where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
{
+2 -3
View File
@@ -121,17 +121,16 @@ mod observer;
mod until_imported;
mod voting_rule;
pub use authorities::{SharedAuthoritySet, AuthoritySet};
pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
pub use finality_proof::{FinalityProof, FinalityProofProvider, FinalityProofError};
pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
pub use import::GrandpaBlockImport;
pub use import::{find_scheduled_change, find_forced_change, GrandpaBlockImport};
pub use justification::GrandpaJustification;
pub use voting_rule::{
BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
VotingRulesBuilder,
};
pub use finality_grandpa::voter::report;
pub use finality_proof::{prove_warp_sync, WarpSyncFragmentCache};
use aux_schema::PersistentData;
use environment::{Environment, VoterSetState};