mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
Import multiple authority set change blocks (#1808)
* core: implement logic for tracking dag of possible pending changes * core: move pending justifications dag to its own crate * core: remove unnecessary clone bounds on dag * core: request justifications in-order from the dag * core: dag: rename changes variables to node * core: dag: allow finalizing blocks not part of dag * core: dag: track best finalized number * core: dag: add more tests * core: sync: clean up pending justifications dag * core: dag: derive codec decode encode * core: dag: better error support * core: dag: add finalization guarded by predicate * core: grandpa: track multiple authority set changes in dag * core: dag: add pre-order iterator * core: grandpa: request justifications on startup * core: dag: rearrange order of definitions * core: rename util/dag to util/fork_tree * core: fork_tree: add docs * core: fork_tree: add more tests * core: fork_tree: fix issues found in tests * core: grandpa: fix authorities tests * core: grandpa: add docs for is_descendent_of * core: sync: add docs for PendingJustifications * core: sync: add test for justification requests across forks * core: sync: don't resend import or finality notifications in tests * core: grandpa: add test for importing multiple change blocks * core: grandpa: fix logic for checking if a block enacts a change * core: grandpa: fix authorities tests
This commit is contained in:
@@ -68,6 +68,9 @@ pub trait Client<Block: BlockT>: Send + Sync {
|
||||
max: Block::Hash,
|
||||
key: &StorageKey
|
||||
) -> Result<ChangesProof<Block::Header>, Error>;
|
||||
|
||||
/// Returns `true` if the given `block` is a descendent of `base`.
|
||||
fn is_descendent_of(&self, base: &Block::Hash, block: &Block::Hash) -> Result<bool, Error>;
|
||||
}
|
||||
|
||||
impl<B, E, Block, RA> Client<Block> for SubstrateClient<B, E, Block, RA> where
|
||||
@@ -129,4 +132,18 @@ impl<B, E, Block, RA> Client<Block> for SubstrateClient<B, E, Block, RA> where
|
||||
) -> Result<ChangesProof<Block::Header>, Error> {
|
||||
(self as &SubstrateClient<B, E, Block, RA>).key_changes_proof(first, last, min, max, key)
|
||||
}
|
||||
|
||||
fn is_descendent_of(&self, base: &Block::Hash, block: &Block::Hash) -> Result<bool, Error> {
|
||||
if base == block {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let tree_route = ::client::blockchain::tree_route(
|
||||
self.backend().blockchain(),
|
||||
BlockId::Hash(*block),
|
||||
BlockId::Hash(*base),
|
||||
)?;
|
||||
|
||||
Ok(tree_route.common_block().hash == *base)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -874,7 +874,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
|
||||
fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
|
||||
self.sync.block_finalized(&hash, *header.number());
|
||||
self.sync.on_block_finalized(
|
||||
&hash,
|
||||
*header.number(),
|
||||
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
|
||||
);
|
||||
}
|
||||
|
||||
fn on_remote_call_request(
|
||||
|
||||
@@ -14,10 +14,11 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::time::{Duration, Instant};
|
||||
use log::{trace, debug};
|
||||
use log::{debug, trace, warn};
|
||||
use crate::protocol::Context;
|
||||
use fork_tree::ForkTree;
|
||||
use network_libp2p::{Severity, NodeIndex};
|
||||
use client::{BlockStatus, ClientInfo};
|
||||
use consensus::BlockOrigin;
|
||||
@@ -65,9 +66,13 @@ enum PeerSyncState<B: BlockT> {
|
||||
/// Pending justification request for the given block (hash and number).
|
||||
type PendingJustification<B> = (<B as BlockT>::Hash, NumberFor<B>);
|
||||
|
||||
/// Manages pending block justification requests.
|
||||
/// Manages pending block justification requests. Multiple justifications may be
|
||||
/// requested for competing forks, or for the same branch at different
|
||||
/// (increasing) heights. This structure will guarantee that justifications are
|
||||
/// fetched in-order, and that obsolete changes are pruned (when finalizing a
|
||||
/// competing fork).
|
||||
struct PendingJustifications<B: BlockT> {
|
||||
justifications: HashSet<PendingJustification<B>>,
|
||||
justifications: ForkTree<B::Hash, NumberFor<B>, ()>,
|
||||
pending_requests: VecDeque<PendingJustification<B>>,
|
||||
peer_requests: HashMap<NodeIndex, PendingJustification<B>>,
|
||||
previous_requests: HashMap<PendingJustification<B>, Vec<(NodeIndex, Instant)>>,
|
||||
@@ -76,7 +81,7 @@ struct PendingJustifications<B: BlockT> {
|
||||
impl<B: BlockT> PendingJustifications<B> {
|
||||
fn new() -> PendingJustifications<B> {
|
||||
PendingJustifications {
|
||||
justifications: HashSet::new(),
|
||||
justifications: ForkTree::new(),
|
||||
pending_requests: VecDeque::new(),
|
||||
peer_requests: HashMap::new(),
|
||||
previous_requests: HashMap::new(),
|
||||
@@ -183,11 +188,26 @@ impl<B: BlockT> PendingJustifications<B> {
|
||||
}
|
||||
|
||||
/// Queue a justification request (without dispatching it).
|
||||
fn queue_request(&mut self, justification: &PendingJustification<B>) {
|
||||
if !self.justifications.insert(*justification) {
|
||||
return;
|
||||
}
|
||||
self.pending_requests.push_back(*justification);
|
||||
fn queue_request<F>(
|
||||
&mut self,
|
||||
justification: &PendingJustification<B>,
|
||||
is_descendent_of: F,
|
||||
) where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError> {
|
||||
match self.justifications.import(justification.0.clone(), justification.1.clone(), (), &is_descendent_of) {
|
||||
Ok(true) => {
|
||||
// this is a new root so we add it to the current `pending_requests`
|
||||
self.pending_requests.push_back((justification.0, justification.1));
|
||||
},
|
||||
Err(err) => {
|
||||
warn!(target: "sync", "Failed to insert requested justification {:?} {:?} into tree: {:?}",
|
||||
justification.0,
|
||||
justification.1,
|
||||
err,
|
||||
);
|
||||
return;
|
||||
},
|
||||
_ => {},
|
||||
};
|
||||
}
|
||||
|
||||
/// Retry any pending request if a peer disconnected.
|
||||
@@ -202,8 +222,20 @@ impl<B: BlockT> PendingJustifications<B> {
|
||||
fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
|
||||
let request = (hash, number);
|
||||
if success {
|
||||
self.justifications.remove(&request);
|
||||
self.previous_requests.remove(&request);
|
||||
if self.justifications.finalize_root(&request.0).is_none() {
|
||||
warn!(target: "sync", "Imported justification for {:?} {:?} which isn't a root in the tree: {:?}",
|
||||
request.0,
|
||||
request.1,
|
||||
self.justifications.roots().collect::<Vec<_>>(),
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
self.previous_requests.clear();
|
||||
self.peer_requests.clear();
|
||||
self.pending_requests =
|
||||
self.justifications.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect();
|
||||
|
||||
return;
|
||||
}
|
||||
self.pending_requests.push_front(request);
|
||||
@@ -226,6 +258,7 @@ impl<B: BlockT> PendingJustifications<B> {
|
||||
import_queue.import_justification(who.clone(), request.0, request.1, justification);
|
||||
return
|
||||
}
|
||||
|
||||
self.previous_requests
|
||||
.entry(request)
|
||||
.or_insert(Vec::new())
|
||||
@@ -236,11 +269,25 @@ impl<B: BlockT> PendingJustifications<B> {
|
||||
|
||||
/// Removes any pending justification requests for blocks lower than the
|
||||
/// given best finalized.
|
||||
fn collect_garbage(&mut self, best_finalized: NumberFor<B>) {
|
||||
self.justifications.retain(|(_, n)| *n > best_finalized);
|
||||
self.pending_requests.retain(|(_, n)| *n > best_finalized);
|
||||
self.peer_requests.retain(|_, (_, n)| *n > best_finalized);
|
||||
self.previous_requests.retain(|(_, n), _| *n > best_finalized);
|
||||
fn on_block_finalized<F>(
|
||||
&mut self,
|
||||
best_finalized_hash: &B::Hash,
|
||||
best_finalized_number: NumberFor<B>,
|
||||
is_descendent_of: F,
|
||||
) -> Result<(), fork_tree::Error<ClientError>>
|
||||
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
|
||||
{
|
||||
use std::collections::HashSet;
|
||||
|
||||
self.justifications.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?;
|
||||
|
||||
let roots = self.justifications.roots().collect::<HashSet<_>>();
|
||||
|
||||
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
|
||||
self.peer_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
|
||||
self.previous_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -579,7 +626,11 @@ impl<B: BlockT> ChainSync<B> {
|
||||
///
|
||||
/// Queues a new justification request and tries to dispatch all pending requests.
|
||||
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
|
||||
self.justifications.queue_request(&(*hash, number));
|
||||
self.justifications.queue_request(
|
||||
&(*hash, number),
|
||||
|base, block| protocol.client().is_descendent_of(base, block),
|
||||
);
|
||||
|
||||
self.justifications.dispatch(&mut self.peers, protocol);
|
||||
}
|
||||
|
||||
@@ -598,8 +649,14 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
/// Notify about finalization of the given block.
|
||||
pub fn block_finalized(&mut self, _hash: &B::Hash, number: NumberFor<B>) {
|
||||
self.justifications.collect_garbage(number);
|
||||
pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
|
||||
if let Err(err) = self.justifications.on_block_finalized(
|
||||
hash,
|
||||
number,
|
||||
|base, block| protocol.client().is_descendent_of(base, block),
|
||||
) {
|
||||
warn!(target: "sync", "Error cleaning up pending justification requests: {:?}", err);
|
||||
};
|
||||
}
|
||||
|
||||
fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
|
||||
@@ -124,6 +124,8 @@ pub struct Peer<D> {
|
||||
pub import_queue: Box<ImportQueue<Block>>,
|
||||
network_sender: NetworkChan<Block>,
|
||||
pub data: D,
|
||||
best_hash: Mutex<Option<H256>>,
|
||||
finalized_hash: Mutex<Option<H256>>,
|
||||
}
|
||||
|
||||
impl<D> Peer<D> {
|
||||
@@ -143,6 +145,8 @@ impl<D> Peer<D> {
|
||||
network_sender,
|
||||
network_port,
|
||||
data,
|
||||
best_hash: Mutex::new(None),
|
||||
finalized_hash: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
/// Called after blockchain has been populated to updated current state.
|
||||
@@ -222,19 +226,39 @@ impl<D> Peer<D> {
|
||||
/// Send block import notifications.
|
||||
fn send_import_notifications(&self) {
|
||||
let info = self.client.info().expect("In-mem client does not fail");
|
||||
|
||||
let mut best_hash = self.best_hash.lock();
|
||||
match *best_hash {
|
||||
None => {},
|
||||
Some(hash) if hash != info.chain.best_hash => {},
|
||||
_ => return,
|
||||
}
|
||||
|
||||
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::BlockImported(info.chain.best_hash, header));
|
||||
|
||||
*best_hash = Some(info.chain.best_hash);
|
||||
}
|
||||
|
||||
/// Send block finalization notifications.
|
||||
pub fn send_finality_notifications(&self) {
|
||||
let info = self.client.info().expect("In-mem client does not fail");
|
||||
|
||||
let mut finalized_hash = self.finalized_hash.lock();
|
||||
match *finalized_hash {
|
||||
None => {},
|
||||
Some(hash) if hash != info.chain.finalized_hash => {},
|
||||
_ => return,
|
||||
}
|
||||
|
||||
let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap();
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()));
|
||||
|
||||
*finalized_hash = Some(info.chain.finalized_hash);
|
||||
}
|
||||
|
||||
/// Restart sync for a peer.
|
||||
@@ -296,7 +320,7 @@ impl<D> Peer<D> {
|
||||
}
|
||||
|
||||
/// Add blocks to the peer -- edit the block before adding
|
||||
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, edit_block: F)
|
||||
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, edit_block: F) -> H256
|
||||
where F: FnMut(BlockBuilder<Block, PeersClient>) -> Block
|
||||
{
|
||||
let best_hash = self.client.info().unwrap().chain.best_hash;
|
||||
@@ -305,11 +329,12 @@ impl<D> Peer<D> {
|
||||
|
||||
/// Add blocks to the peer -- edit the block before adding. The chain will
|
||||
/// start at the given block iD.
|
||||
pub fn generate_blocks_at<F>(&self, mut at: BlockId<Block>, count: usize, origin: BlockOrigin, mut edit_block: F)
|
||||
pub fn generate_blocks_at<F>(&self, at: BlockId<Block>, count: usize, origin: BlockOrigin, mut edit_block: F) -> H256
|
||||
where F: FnMut(BlockBuilder<Block, PeersClient>) -> Block
|
||||
{
|
||||
let mut at = self.client.header(&at).unwrap().unwrap().hash();
|
||||
for _ in 0..count {
|
||||
let builder = self.client.new_block_at(&at).unwrap();
|
||||
let builder = self.client.new_block_at(&BlockId::Hash(at)).unwrap();
|
||||
let block = edit_block(builder);
|
||||
let hash = block.header.hash();
|
||||
trace!(
|
||||
@@ -319,7 +344,7 @@ impl<D> Peer<D> {
|
||||
block.header.parent_hash
|
||||
);
|
||||
let header = block.header.clone();
|
||||
at = BlockId::Hash(hash);
|
||||
at = hash;
|
||||
|
||||
self.import_queue.import_blocks(
|
||||
origin,
|
||||
@@ -336,17 +361,18 @@ impl<D> Peer<D> {
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
}
|
||||
at
|
||||
}
|
||||
|
||||
/// Push blocks to the peer (simplified: with or without a TX)
|
||||
pub fn push_blocks(&self, count: usize, with_tx: bool) {
|
||||
pub fn push_blocks(&self, count: usize, with_tx: bool) -> H256 {
|
||||
let best_hash = self.client.info().unwrap().chain.best_hash;
|
||||
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx);
|
||||
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
|
||||
}
|
||||
|
||||
/// Push blocks to the peer (simplified: with or without a TX) starting from
|
||||
/// given hash.
|
||||
pub fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) {
|
||||
pub fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
|
||||
let mut nonce = 0;
|
||||
if with_tx {
|
||||
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
|
||||
@@ -360,17 +386,17 @@ impl<D> Peer<D> {
|
||||
builder.push(Extrinsic::Transfer(transfer, signature)).unwrap();
|
||||
nonce = nonce + 1;
|
||||
builder.bake().unwrap()
|
||||
});
|
||||
})
|
||||
} else {
|
||||
self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap());
|
||||
self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_authorities_change_block(&self, new_authorities: Vec<Ed25519AuthorityId>) {
|
||||
pub fn push_authorities_change_block(&self, new_authorities: Vec<Ed25519AuthorityId>) -> H256 {
|
||||
self.generate_blocks(1, BlockOrigin::File, |mut builder| {
|
||||
builder.push(Extrinsic::AuthoritiesChange(new_authorities.clone())).unwrap();
|
||||
builder.bake().unwrap()
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a reference to the client.
|
||||
@@ -571,7 +597,7 @@ pub trait TestNetFactory: Sized {
|
||||
let mut done = 0;
|
||||
|
||||
loop {
|
||||
if done > 10 { break; }
|
||||
if done > 3 { break; }
|
||||
if self.done() {
|
||||
done += 1;
|
||||
} else {
|
||||
|
||||
@@ -80,11 +80,46 @@ fn sync_justifications() {
|
||||
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None);
|
||||
assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), None);
|
||||
|
||||
// we finalize block #10 for peer 0 with a justification
|
||||
// we finalize block #10, #15 and #20 for peer 0 with a justification
|
||||
net.peer(0).client().finalize_block(BlockId::Number(10), Some(Vec::new()), true).unwrap();
|
||||
net.peer(0).client().finalize_block(BlockId::Number(15), Some(Vec::new()), true).unwrap();
|
||||
net.peer(0).client().finalize_block(BlockId::Number(20), Some(Vec::new()), true).unwrap();
|
||||
|
||||
let header = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap();
|
||||
net.peer(1).request_justification(&header.hash().into(), 10);
|
||||
let h1 = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap();
|
||||
let h2 = net.peer(1).client().header(&BlockId::Number(15)).unwrap().unwrap();
|
||||
let h3 = net.peer(1).client().header(&BlockId::Number(20)).unwrap().unwrap();
|
||||
|
||||
// peer 1 should get the justifications from the network
|
||||
net.peer(1).request_justification(&h1.hash().into(), 10);
|
||||
net.peer(1).request_justification(&h2.hash().into(), 15);
|
||||
net.peer(1).request_justification(&h3.hash().into(), 20);
|
||||
|
||||
net.sync();
|
||||
|
||||
for height in (10..21).step_by(5) {
|
||||
assert_eq!(net.peer(0).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new()));
|
||||
assert_eq!(net.peer(1).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new()));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_justifications_across_forks() {
|
||||
let _ = ::env_logger::try_init();
|
||||
let mut net = JustificationTestNet::new(3);
|
||||
// we push 5 blocks
|
||||
net.peer(0).push_blocks(5, false);
|
||||
// and then two forks 5 and 6 blocks long
|
||||
let f1_best = net.peer(0).push_blocks_at(BlockId::Number(5), 5, false);
|
||||
let f2_best = net.peer(0).push_blocks_at(BlockId::Number(5), 6, false);
|
||||
|
||||
// peer 1 will only see the longer fork. but we'll request justifications
|
||||
// for both and finalize the small fork instead.
|
||||
net.sync();
|
||||
|
||||
net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap();
|
||||
|
||||
net.peer(1).request_justification(&f1_best, 10);
|
||||
net.peer(1).request_justification(&f2_best, 11);
|
||||
|
||||
net.sync();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user