Grandpa warp sync request-response protocol (#7711)

* Made a start

* So the proof between authority set is phragmen one, this is crazy big,
or is there some signing of the result : that is the storage key, damn?

* ok getting from header digest seems doable.

* for testing

* get set id from storage directly (should use runtime to handler change).

* move test to init

* correct auth key

* fix iteration

* Correct proof content

* actually update block number.

* actually check last justif against its header

* justification relation to new authorities through header hash check is
needed here. This assumes the hash from header is calculated.

* Few changes

* Connected up cheme's branch

* Clean up

* Move things around a bit so that adding the grandpa warp sync request response protocol happens in the node code

* Nits

* Changes to comments

* Cheme changes

* Remove todos and test compile.

* Rename _authority_ related proof function to _warp_sync_ .

* Update client/grandpa-warp-sync/src/lib.rs

quick fix

* Put the warp sync request response protocol behind a feature flag because we dont' need it on a light client.

* Update client/grandpa-warp-sync/src/lib.rs

Quick fix

* Update Cargo.lock

* Adding test, comment on limitation related to 'delay', this could
be implemented but with a cost.

* Set between a delay override last fragment.

* Check for pending authority set change at start.

* adjust index

* custom cache is not a good idea.

* Use a simple cache instead.

* restore broken indentation

* Address crate rename

* Merge conflict badly resolved, sorry

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Ashley
2021-01-21 18:14:07 +01:00
committed by GitHub
parent cd0ad4805d
commit 87cc216774
12 changed files with 770 additions and 33 deletions
@@ -52,7 +52,7 @@ use parity_scale_codec::{Encode, Decode};
use finality_grandpa::BlockNumberOps;
use sp_runtime::{
Justification, generic::BlockId,
traits::{NumberFor, Block as BlockT, Header as HeaderT, One},
traits::{NumberFor, Block as BlockT, Header as HeaderT, Zero, One},
};
use sp_core::storage::StorageKey;
use sc_telemetry::{telemetry, CONSENSUS_INFO};
@@ -247,6 +247,23 @@ pub struct FinalityProofFragment<Header: HeaderT> {
/// - all other fragments provide justifications for GRANDPA authorities set changes within requested range.
type FinalityProof<Header> = Vec<FinalityProofFragment<Header>>;
/// Single fragment of authority set proof.
///
/// Finality for block B is proved by providing:
/// 1) headers of this block;
/// 2) the justification for the block containing a authority set change digest;
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
pub(crate) struct AuthoritySetProofFragment<Header: HeaderT> {
/// The header of the given block.
pub header: Header,
/// Justification of the block F.
pub justification: Vec<u8>,
}
/// Proof of authority set is the ordered set of authority set fragments, where:
/// - last fragment match target block.
type AuthoritySetProof<Header> = Vec<AuthoritySetProofFragment<Header>>;
/// Finality proof request data.
#[derive(Debug, Encode, Decode)]
enum FinalityProofRequest<H: Encode + Decode> {
@@ -425,6 +442,133 @@ pub(crate) fn prove_finality<Block: BlockT, B: BlockchainBackend<Block>, J>(
}
}
/// Prepare authority proof for the best possible block starting at a given trusted block.
///
/// Started block should be in range of bonding duration.
/// We only return proof for finalized blocks (with justification).
///
/// It is assumed that the caller already have a proof-of-finality for the block 'begin'.
pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
blockchain: &B,
begin: Block::Hash,
max_fragment_limit: Option<usize>,
mut cache: Option<&mut WarpSyncFragmentCache<Block::Header>>,
) -> ::sp_blockchain::Result<Vec<u8>> {
let begin = BlockId::Hash(begin);
let begin_number = blockchain.block_number_from_id(&begin)?
.ok_or_else(|| ClientError::Backend("Missing start block".to_string()))?;
let end = BlockId::Hash(blockchain.last_finalized()?);
let end_number = blockchain.block_number_from_id(&end)?
// This error should not happen, we could also panic.
.ok_or_else(|| ClientError::Backend("Missing last finalized block".to_string()))?;
if begin_number > end_number {
return Err(ClientError::Backend("Unfinalized start for authority proof".to_string()));
}
let mut result = Vec::new();
let mut last_apply = None;
let header = blockchain.expect_header(begin)?;
let mut index = *header.number();
// Find previous change in case there is a delay.
// This operation is a costy and only for the delay corner case.
while index > Zero::zero() {
index = index - One::one();
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| &next > header.number()).unwrap_or(false) {
result.push(fragement);
last_apply = Some(apply_block);
} else {
break;
}
}
}
let mut index = *header.number();
while index <= end_number {
if max_fragment_limit.map(|limit| result.len() <= limit).unwrap_or(false) {
break;
}
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| apply_block < next).unwrap_or(false) {
// Previous delayed will not apply, do not include it.
result.pop();
}
result.push(fragement);
last_apply = Some(apply_block);
}
index = index + One::one();
}
if result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let header = blockchain.expect_header(end)?;
if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? {
result.push(AuthoritySetProofFragment {
header: header.clone(),
justification,
});
} else {
// no justification, don't include it.
}
}
Ok(result.encode())
}
/// Try get a warp sync proof fragment a a given finalized block.
fn get_warp_sync_proof_fragment<Block: BlockT, B: BlockchainBackend<Block>>(
blockchain: &B,
index: NumberFor<Block>,
cache: &mut Option<&mut WarpSyncFragmentCache<Block::Header>>,
) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> {
if let Some(cache) = cache.as_mut() {
if let Some(result) = cache.get_item(index) {
return Ok(result.clone());
}
}
let mut result = None;
let header = blockchain.expect_header(BlockId::number(index))?;
if let Some((block_number, sp_finality_grandpa::ScheduledChange {
next_authorities: _,
delay,
})) = crate::import::find_forced_change::<Block>(&header) {
let dest = block_number + delay;
if let Some(justification) = blockchain.justification(BlockId::Number(index.clone()))? {
result = Some((AuthoritySetProofFragment {
header: header.clone(),
justification,
}, dest));
} else {
return Err(ClientError::Backend("Unjustified block with authority set change".to_string()));
}
}
if let Some(sp_finality_grandpa::ScheduledChange {
next_authorities: _,
delay,
}) = crate::import::find_scheduled_change::<Block>(&header) {
let dest = index + delay;
if let Some(justification) = blockchain.justification(BlockId::Number(index.clone()))? {
result = Some((AuthoritySetProofFragment {
header: header.clone(),
justification,
}, dest));
} else {
return Err(ClientError::Backend("Unjustified block with authority set change".to_string()));
}
}
cache.as_mut().map(|cache| cache.new_item(index, result.clone()));
Ok(result)
}
/// Check GRANDPA proof-of-finality for the given block.
///
/// Returns the vector of headers that MUST be validated + imported
@@ -483,6 +627,98 @@ pub(crate) fn check_finality_proof<Block: BlockT, B, J>(
Ok(effects)
}
/// Check GRANDPA authority change sequence to assert finality of a target block.
///
/// Returns the header of the target block.
pub(crate) fn check_warp_sync_proof<Block: BlockT, J>(
current_set_id: u64,
current_authorities: AuthorityList,
remote_proof: Vec<u8>,
) -> ClientResult<(Block::Header, u64, AuthorityList)>
where
NumberFor<Block>: BlockNumberOps,
J: Decode + ProvableJustification<Block::Header> + BlockJustification<Block::Header>,
{
// decode finality proof
let proof = AuthoritySetProof::<Block::Header>::decode(&mut &remote_proof[..])
.map_err(|_| ClientError::BadJustification("failed to decode authority proof".into()))?;
let last = proof.len() - 1;
let mut result = (current_set_id, current_authorities, NumberFor::<Block>::zero());
for (ix, fragment) in proof.into_iter().enumerate() {
let is_last = ix == last;
result = check_warp_sync_proof_fragment::<Block, J>(
result.0,
&result.1,
&result.2,
is_last,
&fragment,
)?;
if is_last {
return Ok((fragment.header, result.0, result.1))
}
}
// empty proof can't prove anything
return Err(ClientError::BadJustification("empty proof of authority".into()));
}
/// Check finality authority set sequence.
fn check_warp_sync_proof_fragment<Block: BlockT, J>(
current_set_id: u64,
current_authorities: &AuthorityList,
previous_checked_block: &NumberFor<Block>,
is_last: bool,
authorities_proof: &AuthoritySetProofFragment<Block::Header>,
) -> ClientResult<(u64, AuthorityList, NumberFor<Block>)>
where
NumberFor<Block>: BlockNumberOps,
J: Decode + ProvableJustification<Block::Header> + BlockJustification<Block::Header>,
{
let justification: J = Decode::decode(&mut authorities_proof.justification.as_slice())
.map_err(|_| ClientError::JustificationDecode)?;
justification.verify(current_set_id, &current_authorities)?;
// assert justification is for this header
if &justification.number() != authorities_proof.header.number()
|| justification.hash().as_ref() != authorities_proof.header.hash().as_ref() {
return Err(ClientError::Backend("Invalid authority warp proof, justification do not match header".to_string()));
}
if authorities_proof.header.number() <= previous_checked_block {
return Err(ClientError::Backend("Invalid authority warp proof".to_string()));
}
let current_block = authorities_proof.header.number();
let mut at_block = None;
if let Some(sp_finality_grandpa::ScheduledChange {
next_authorities,
delay,
}) = crate::import::find_scheduled_change::<Block>(&authorities_proof.header) {
let dest = *current_block + delay;
at_block = Some((dest, next_authorities));
}
if let Some((block_number, sp_finality_grandpa::ScheduledChange {
next_authorities,
delay,
})) = crate::import::find_forced_change::<Block>(&authorities_proof.header) {
let dest = block_number + delay;
at_block = Some((dest, next_authorities));
}
// Fragment without change only allowed for proof last block.
if at_block.is_none() && !is_last {
return Err(ClientError::Backend("Invalid authority warp proof".to_string()));
}
if let Some((at_block, next_authorities)) = at_block {
Ok((current_set_id + 1, next_authorities, at_block))
} else {
Ok((current_set_id, current_authorities.clone(), current_block.clone()))
}
}
/// Check finality proof for the single block.
fn check_finality_proof_fragment<Block: BlockT, B, J>(
blockchain: &B,
@@ -551,6 +787,15 @@ impl<Header: HeaderT> AuthoritiesOrEffects<Header> {
}
}
/// Block info extracted from the justification.
pub(crate) trait BlockJustification<Header: HeaderT> {
/// Block number justified.
fn number(&self) -> Header::Number;
/// Block hash justified.
fn hash(&self) -> Header::Hash;
}
/// Justification used to prove block finality.
pub(crate) trait ProvableJustification<Header: HeaderT>: Encode + Decode {
/// Verify justification with respect to authorities set and authorities set id.
@@ -582,6 +827,68 @@ impl<Block: BlockT> ProvableJustification<Block::Header> for GrandpaJustificatio
}
}
impl<Block: BlockT> BlockJustification<Block::Header> for GrandpaJustification<Block> {
fn number(&self) -> NumberFor<Block> {
self.commit.target_number.clone()
}
fn hash(&self) -> Block::Hash {
self.commit.target_hash.clone()
}
}
/// Simple cache for warp sync queries.
pub struct WarpSyncFragmentCache<Header: HeaderT> {
cache: linked_hash_map::LinkedHashMap<
Header::Number,
Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
>,
headers_with_justification: usize,
limit: usize,
}
impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
/// Instantiate a new cache for the warp sync prover.
pub fn new(size: usize) -> Self {
WarpSyncFragmentCache {
cache: Default::default(),
headers_with_justification: 0,
limit: size,
}
}
fn new_item(
&mut self,
at: Header::Number,
item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
) {
if self.cache.len() == self.limit {
self.pop_one();
}
if item.is_some() {
// we do not check previous value as cached value is always supposed to
// be queried before calling 'new_item'.
self.headers_with_justification += 1;
}
self.cache.insert(at, item);
}
fn pop_one(&mut self) {
while let Some(v) = self.cache.pop_front() {
if v.1.is_some() {
self.headers_with_justification -= 1;
break;
}
}
}
fn get_item(
&mut self,
block: Header::Number,
) -> Option<&mut Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
self.cache.get_refresh(&block)
}
}
#[cfg(test)]
pub(crate) mod tests {
use substrate_test_runtime_client::runtime::{Block, Header, H256};
@@ -635,6 +942,24 @@ pub(crate) mod tests {
}
}
#[derive(Debug, PartialEq, Encode, Decode)]
pub struct TestBlockJustification(TestJustification, u64, H256);
impl BlockJustification<Header> for TestBlockJustification {
fn number(&self) -> <Header as HeaderT>::Number {
self.1
}
fn hash(&self) -> <Header as HeaderT>::Hash {
self.2.clone()
}
}
impl ProvableJustification<Header> for TestBlockJustification {
fn verify(&self, set_id: u64, authorities: &[(AuthorityId, u64)]) -> ClientResult<()> {
self.0.verify(set_id, authorities)
}
}
fn header(number: u64) -> Header {
let parent_hash = match number {
0 => Default::default(),
@@ -1027,4 +1352,161 @@ pub(crate) mod tests {
).unwrap();
assert!(proof_of_4.is_none());
}
#[test]
fn warp_sync_proof_encoding_decoding() {
fn test_blockchain(
nb_blocks: u64,
mut set_change: &[(u64, Vec<u8>)],
mut justifications: &[(u64, Vec<u8>)],
) -> (InMemoryBlockchain<Block>, Vec<H256>) {
let blockchain = InMemoryBlockchain::<Block>::new();
let mut hashes = Vec::<H256>::new();
let mut set_id = 0;
for i in 0..nb_blocks {
let mut set_id_next = set_id;
let mut header = header(i);
set_change.first()
.map(|j| if i == j.0 {
set_change = &set_change[1..];
let next_authorities: Vec<_> = j.1.iter().map(|i| (AuthorityId::from_slice(&[*i; 32]), 1u64)).collect();
set_id_next += 1;
header.digest_mut().logs.push(
sp_runtime::generic::DigestItem::Consensus(
sp_finality_grandpa::GRANDPA_ENGINE_ID,
sp_finality_grandpa::ConsensusLog::ScheduledChange(
sp_finality_grandpa::ScheduledChange { delay: 0u64, next_authorities }
).encode(),
));
});
if let Some(parent) = hashes.last() {
header.set_parent_hash(parent.clone());
}
let header_hash = header.hash();
let justification = justifications.first()
.and_then(|j| if i == j.0 {
justifications = &justifications[1..];
let authority = j.1.iter().map(|j|
(AuthorityId::from_slice(&[*j; 32]), 1u64)
).collect();
let justification = TestBlockJustification(
TestJustification((set_id, authority), vec![i as u8]),
i,
header_hash,
);
Some(justification.encode())
} else {
None
});
hashes.push(header_hash.clone());
set_id = set_id_next;
blockchain.insert(header_hash, header, justification, None, NewBlockState::Final)
.unwrap();
}
(blockchain, hashes)
}
let (blockchain, hashes) = test_blockchain(
7,
vec![(3, vec![9])].as_slice(),
vec![
(1, vec![1, 2, 3]),
(2, vec![1, 2, 3]),
(3, vec![1, 2, 3]),
(4, vec![9]),
(6, vec![9]),
].as_slice(),
);
// proof after set change
let mut cache = WarpSyncFragmentCache::new(5);
let proof_no_cache = prove_warp_sync(&blockchain, hashes[6], None, Some(&mut cache)).unwrap();
let proof = prove_warp_sync(&blockchain, hashes[6], None, Some(&mut cache)).unwrap();
assert_eq!(proof_no_cache, proof);
let initial_authorities: Vec<_> = [1u8, 2, 3].iter().map(|i|
(AuthorityId::from_slice(&[*i; 32]), 1u64)
).collect();
let authorities_next: Vec<_> = [9u8].iter().map(|i|
(AuthorityId::from_slice(&[*i; 32]), 1u64)
).collect();
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).is_err());
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
0,
authorities_next.clone(),
proof.clone(),
).is_err());
assert!(check_warp_sync_proof::<Block, TestBlockJustification>(
1,
initial_authorities.clone(),
proof.clone(),
).is_err());
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
1,
authorities_next.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 1);
assert_eq!(current_set, authorities_next);
// proof before set change
let proof = prove_warp_sync(&blockchain, hashes[1], None, None).unwrap();
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 1);
assert_eq!(current_set, authorities_next);
// two changes
let (blockchain, hashes) = test_blockchain(
13,
vec![(3, vec![7]), (8, vec![9])].as_slice(),
vec![
(1, vec![1, 2, 3]),
(2, vec![1, 2, 3]),
(3, vec![1, 2, 3]),
(4, vec![7]),
(6, vec![7]),
(8, vec![7]), // warning, requires a justification on change set
(10, vec![9]),
].as_slice(),
);
// proof before set change
let proof = prove_warp_sync(&blockchain, hashes[1], None, None).unwrap();
let (
_header,
current_set_id,
current_set,
) = check_warp_sync_proof::<Block, TestBlockJustification>(
0,
initial_authorities.clone(),
proof.clone(),
).unwrap();
assert_eq!(current_set_id, 2);
assert_eq!(current_set, authorities_next);
}
}
@@ -182,7 +182,7 @@ impl<'a, Block: 'a + BlockT> Drop for PendingSetChanges<'a, Block> {
}
}
fn find_scheduled_change<B: BlockT>(header: &B::Header)
pub(crate) fn find_scheduled_change<B: BlockT>(header: &B::Header)
-> Option<ScheduledChange<NumberFor<B>>>
{
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
@@ -197,7 +197,7 @@ fn find_scheduled_change<B: BlockT>(header: &B::Header)
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
fn find_forced_change<B: BlockT>(header: &B::Header)
pub(crate) fn find_forced_change<B: BlockT>(header: &B::Header)
-> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)>
{
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
@@ -130,6 +130,7 @@ pub use voting_rule::{
BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRulesBuilder
};
pub use finality_grandpa::voter::report;
pub use finality_proof::{prove_warp_sync, WarpSyncFragmentCache};
use aux_schema::PersistentData;
use environment::{Environment, VoterSetState};