Light GRANDPA import handler (#1669)

* GrandpaLightBlockImport

* extract authorities in AuraVerifier

* post-merge fix

* restore authorities cache

* license

* new finality proof draft

* generalized PendingJustifications

* finality proof messages

* fixed compilation

* pass verifier to import_finality_proof

* do not fetch remote proof from light import directly

* FinalityProofProvider

* fixed authorities cache test

* restored finality proof tests

* finality_proof docs

* use DB backend in test client

* justification_is_fetched_by_light_client_when_consensus_data_changes

* restore justification_is_fetched_by_light_client_when_consensus_data_changes

* some more tests

* added authorities-related TODO

* removed unneeded clear_finality_proof_requests field

* truncated some long lines

* more granular light import tests

* only provide finality proof if it is generated by the requested set

* post-merge fix

* finality_proof_is_none_if_first_justification_is_generated_by_unknown_set

* make light+grandpa test rely on finality proofs (instead of simple justifications)

* empty_finality_proof_is_returned_to_light_client_when_authority_set_is_different

* missing trait method impl

* fixed proof-of-finality docs

* one more doc fix

* fix docs

* initialize authorities cache (post-merge fix)

* fixed cache initialization (post-merge fix)

* post-fix merge: fix light + GRANDPA tests (bad way)

* proper fix of empty_finality_proof_is_returned_to_light_client_when_authority_set_is_different

* fixed easy grumbles

* import finality proofs in BlockImportWorker thread

* allow import of finality proofs for non-requested blocks

* limit number of fragments in finality proof

* GRANDPA post-merge fix

* BABE: pos-merge fix
This commit is contained in:
Svyatoslav Nikolsky
2019-05-13 12:36:52 +03:00
committed by Gavin Wood
parent 258f0835e4
commit 22586113ea
36 changed files with 3320 additions and 803 deletions
+89 -284
View File
@@ -32,18 +32,16 @@
use std::cmp::max;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use log::{debug, trace, info, warn};
use log::{debug, trace, warn, info};
use crate::protocol::Context;
use fork_tree::ForkTree;
use network_libp2p::PeerId;
use client::{BlockStatus, ClientInfo};
use consensus::{BlockOrigin, import_queue::IncomingBlock};
use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}};
use client::error::Error as ClientError;
use crate::blocks::BlockCollection;
use runtime_primitives::Justification;
use crate::extra_requests::ExtraRequestsAggregator;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, CheckedSub};
use runtime_primitives::generic::BlockId;
use runtime_primitives::{Justification, generic::BlockId};
use crate::message;
use crate::config::Roles;
use std::collections::HashSet;
@@ -54,8 +52,6 @@ const MAX_BLOCKS_TO_REQUEST: usize = 128;
const MAX_IMPORTING_BLOCKS: usize = 2048;
// Number of blocks in the queue that prevents ancestry search.
const MAJOR_SYNC_BLOCKS: usize = 5;
// Time to wait before trying to get a justification from the same peer.
const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10);
// Number of recently announced blocks to track for each peer.
const ANNOUNCE_HISTORY_SIZE: usize = 64;
// Max number of blocks to download for unknown forks.
@@ -68,7 +64,7 @@ const ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE: i32 = -(1 << 9);
const GENESIS_MISMATCH_REPUTATION_CHANGE: i32 = i32::min_value() + 1;
#[derive(Debug)]
struct PeerSync<B: BlockT> {
pub(crate) struct PeerSync<B: BlockT> {
pub common_number: NumberFor<B>,
pub best_hash: B::Hash,
pub best_number: NumberFor<B>,
@@ -86,7 +82,7 @@ pub(crate) struct PeerInfo<B: BlockT> {
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum AncestorSearchState<B: BlockT> {
pub(crate) enum AncestorSearchState<B: BlockT> {
/// Use exponential backoff to find an ancestor, then switch to binary search.
/// We keep track of the exponent.
ExponentialBackoff(NumberFor<B>),
@@ -96,270 +92,13 @@ enum AncestorSearchState<B: BlockT> {
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum PeerSyncState<B: BlockT> {
pub(crate) enum PeerSyncState<B: BlockT> {
AncestorSearch(NumberFor<B>, AncestorSearchState<B>),
Available,
DownloadingNew(NumberFor<B>),
DownloadingStale(B::Hash),
DownloadingJustification(B::Hash),
}
/// Pending justification request for the given block (hash and number).
type PendingJustification<B> = (<B as BlockT>::Hash, NumberFor<B>);
/// Manages pending block justification requests. Multiple justifications may be
/// requested for competing forks, or for the same branch at different
/// (increasing) heights. This structure will guarantee that justifications are
/// fetched in-order, and that obsolete changes are pruned (when finalizing a
/// competing fork).
struct PendingJustifications<B: BlockT> {
justifications: ForkTree<B::Hash, NumberFor<B>, ()>,
pending_requests: VecDeque<PendingJustification<B>>,
peer_requests: HashMap<PeerId, PendingJustification<B>>,
previous_requests: HashMap<PendingJustification<B>, Vec<(PeerId, Instant)>>,
importing_requests: HashSet<PendingJustification<B>>,
}
impl<B: BlockT> PendingJustifications<B> {
fn new() -> PendingJustifications<B> {
PendingJustifications {
justifications: ForkTree::new(),
pending_requests: VecDeque::new(),
peer_requests: HashMap::new(),
previous_requests: HashMap::new(),
importing_requests: HashSet::new(),
}
}
/// Dispatches all possible pending requests to the given peers. Peers are
/// filtered according to the current known best block (i.e. we won't send a
/// justification request for block #10 to a peer at block #2), and we also
/// throttle requests to the same peer if a previous justification request
/// yielded no results.
fn dispatch(&mut self, peers: &mut HashMap<PeerId, PeerSync<B>>, protocol: &mut Context<B>) {
if self.pending_requests.is_empty() {
return;
}
let initial_pending_requests = self.pending_requests.len();
// clean up previous failed requests so we can retry again
for (_, requests) in self.previous_requests.iter_mut() {
requests.retain(|(_, instant)| instant.elapsed() < JUSTIFICATION_RETRY_WAIT);
}
let mut available_peers = peers.iter().filter_map(|(peer, sync)| {
// don't request to any peers that already have pending requests or are unavailable
if sync.state != PeerSyncState::Available || self.peer_requests.contains_key(&peer) {
None
} else {
Some((peer.clone(), sync.best_number))
}
}).collect::<VecDeque<_>>();
let mut last_peer = available_peers.back().map(|p| p.0.clone());
let mut unhandled_requests = VecDeque::new();
loop {
let (peer, peer_best_number) = match available_peers.pop_front() {
Some(p) => p,
_ => break,
};
// only ask peers that have synced past the block number that we're
// asking the justification for and to whom we haven't already made
// the same request recently
let peer_eligible = {
let request = match self.pending_requests.front() {
Some(r) => r.clone(),
_ => break,
};
peer_best_number >= request.1 &&
!self.previous_requests
.get(&request)
.map(|requests| requests.iter().any(|i| i.0 == peer))
.unwrap_or(false)
};
if !peer_eligible {
available_peers.push_back((peer.clone(), peer_best_number));
// we tried all peers and none can answer this request
if Some(peer) == last_peer {
last_peer = available_peers.back().map(|p| p.0.clone());
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
unhandled_requests.push_back(request);
}
continue;
}
last_peer = available_peers.back().map(|p| p.0.clone());
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
self.peer_requests.insert(peer.clone(), request);
peers.get_mut(&peer)
.expect("peer was is taken from available_peers; available_peers is a subset of peers; qed")
.state = PeerSyncState::DownloadingJustification(request.0);
trace!(target: "sync", "Requesting justification for block #{} from {}", request.0, peer);
let request = message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Hash(request.0),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
protocol.send_block_request(peer, request);
}
self.pending_requests.append(&mut unhandled_requests);
trace!(target: "sync", "Dispatched {} justification requests ({} pending)",
initial_pending_requests - self.pending_requests.len(),
self.pending_requests.len(),
);
}
/// Queue a justification request (without dispatching it).
fn queue_request<F>(
&mut self,
justification: &PendingJustification<B>,
is_descendent_of: F,
) where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError> {
match self.justifications.import(justification.0.clone(), justification.1.clone(), (), &is_descendent_of) {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((justification.0, justification.1));
},
Err(err) => {
warn!(target: "sync", "Failed to insert requested justification {:?} {:?} into tree: {:?}",
justification.0,
justification.1,
err,
);
return;
},
_ => {},
};
}
/// Retry any pending request if a peer disconnected.
fn peer_disconnected(&mut self, who: PeerId) {
if let Some(request) = self.peer_requests.remove(&who) {
self.pending_requests.push_front(request);
}
}
/// Process the import of a justification.
/// Queues a retry in case the import failed.
fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let request = (hash, number);
if !self.importing_requests.remove(&request) {
debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.",
request.0,
request.1,
);
return;
};
if success {
if self.justifications.finalize_root(&request.0).is_none() {
warn!(target: "sync", "Imported justification for {:?} {:?} which isn't a root in the tree: {:?}",
request.0,
request.1,
self.justifications.roots().collect::<Vec<_>>(),
);
return;
};
self.previous_requests.clear();
self.peer_requests.clear();
self.pending_requests =
self.justifications.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect();
return;
}
self.pending_requests.push_front(request);
}
/// Processes the response for the request previously sent to the given
/// peer. Queues a retry in case the given justification
/// was `None`.
///
/// Returns `Some` if this produces a justification that must be imported in the import queue.
#[must_use]
fn on_response(
&mut self,
who: PeerId,
justification: Option<Justification>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
// we assume that the request maps to the given response, this is
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.peer_requests.remove(&who) {
if let Some(justification) = justification {
self.importing_requests.insert(request);
return Some((who, request.0, request.1, justification))
}
self.previous_requests
.entry(request)
.or_insert(Vec::new())
.push((who, Instant::now()));
self.pending_requests.push_front(request);
}
None
}
/// Removes any pending justification requests for blocks lower than the
/// given best finalized.
fn on_block_finalized<F>(
&mut self,
best_finalized_hash: &B::Hash,
best_finalized_number: NumberFor<B>,
is_descendent_of: F,
) -> Result<(), fork_tree::Error<ClientError>>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
{
if self.importing_requests.contains(&(*best_finalized_hash, best_finalized_number)) {
// we imported this justification ourselves, so we should get back a response
// from the import queue through `justification_import_result`
return Ok(());
}
self.justifications.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?;
let roots = self.justifications.roots().collect::<HashSet<_>>();
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.peer_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.previous_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
Ok(())
}
/// Clear all data.
fn clear(&mut self) {
self.justifications = ForkTree::new();
self.pending_requests.clear();
self.peer_requests.clear();
self.previous_requests.clear();
}
DownloadingFinalityProof(B::Hash),
}
/// Relay chain sync strategy.
@@ -370,7 +109,7 @@ pub struct ChainSync<B: BlockT> {
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
required_block_attributes: message::BlockAttributes,
justifications: PendingJustifications<B>,
extra_requests: ExtraRequestsAggregator<B>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
}
@@ -428,7 +167,7 @@ impl<B: BlockT> ChainSync<B> {
blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
justifications: PendingJustifications::new(),
extra_requests: ExtraRequestsAggregator::new(),
required_block_attributes,
queue_blocks: Default::default(),
best_importing_number: Zero::zero(),
@@ -664,7 +403,7 @@ impl<B: BlockT> ChainSync<B> {
vec![]
}
},
PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) => Vec::new(),
PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) | PeerSyncState::DownloadingFinalityProof(..) => Vec::new(),
}
} else {
Vec::new()
@@ -722,7 +461,7 @@ impl<B: BlockT> ChainSync<B> {
return None;
}
return self.justifications.on_response(
return self.extra_requests.justifications().on_response(
who,
response.justification,
);
@@ -744,6 +483,42 @@ impl<B: BlockT> ChainSync<B> {
None
}
/// Handle new finality proof data.
pub(crate) fn on_block_finality_proof_data(
&mut self,
protocol: &mut Context<B>,
who: PeerId,
response: message::FinalityProofResponse<B::Hash>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Vec<u8>)> {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
peer.state = PeerSyncState::Available;
// we only request one finality proof at a time
if hash != response.block {
info!(
"Invalid block finality proof provided: requested: {:?} got: {:?}",
hash,
response.block,
);
protocol.report_peer(who.clone(), i32::min_value());
protocol.disconnect_peer(who);
return None;
}
return self.extra_requests.finality_proofs().on_response(
who,
response.proof,
);
}
}
self.maintain_sync(protocol);
None
}
/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the import queue, with or without
/// errors.
pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
@@ -761,13 +536,13 @@ impl<B: BlockT> ChainSync<B> {
for peer in peers {
self.download_new(protocol, peer);
}
self.justifications.dispatch(&mut self.peers, protocol);
self.extra_requests.dispatch(&mut self.peers, protocol);
}
/// Called periodically to perform any time-based actions. Must be called at a regular
/// interval.
pub fn tick(&mut self, protocol: &mut Context<B>) {
self.justifications.dispatch(&mut self.peers, protocol);
self.extra_requests.dispatch(&mut self.peers, protocol);
}
/// Request a justification for the given block.
@@ -775,23 +550,53 @@ impl<B: BlockT> ChainSync<B> {
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
self.justifications.queue_request(
&(*hash, number),
self.extra_requests.justifications().queue_request(
(*hash, number),
|base, block| protocol.client().is_descendent_of(base, block),
);
self.justifications.dispatch(&mut self.peers, protocol);
self.extra_requests.justifications().dispatch(&mut self.peers, protocol);
}
/// Clears all pending justification requests.
pub fn clear_justification_requests(&mut self) {
self.justifications.clear();
self.extra_requests.justifications().clear();
}
/// Call this when a justification has been processed by the import queue, with or without
/// errors.
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
self.justifications.justification_import_result(hash, number, success);
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
if !self.extra_requests.justifications().on_import_result((hash, number), finalization_result) {
debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.",
hash,
number,
);
}
}
/// Request a finality proof for the given block.
///
/// Queues a new finality proof request and tries to dispatch all pending requests.
pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
self.extra_requests.finality_proofs().queue_request(
(*hash, number),
|base, block| protocol.client().is_descendent_of(base, block),
);
self.extra_requests.finality_proofs().dispatch(&mut self.peers, protocol);
}
pub fn finality_proof_import_result(
&mut self,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
self.extra_requests.finality_proofs().on_import_result(request_block, finalization_result);
}
pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
self.extra_requests.finality_proofs().essence().0 = Some(request_builder);
}
/// Notify about successful import of the given block.
@@ -801,12 +606,12 @@ impl<B: BlockT> ChainSync<B> {
/// Notify about finalization of the given block.
pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut Context<B>) {
if let Err(err) = self.justifications.on_block_finalized(
if let Err(err) = self.extra_requests.on_block_finalized(
hash,
number,
|base, block| protocol.client().is_descendent_of(base, block),
&|base, block| protocol.client().is_descendent_of(base, block),
) {
warn!(target: "sync", "Error cleaning up pending justification requests: {:?}", err);
warn!(target: "sync", "Error cleaning up pending extra data requests: {:?}", err);
};
}
@@ -916,7 +721,7 @@ impl<B: BlockT> ChainSync<B> {
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: PeerId) {
self.blocks.clear_peer_download(&who);
self.peers.remove(&who);
self.justifications.peer_disconnected(who);
self.extra_requests.peer_disconnected(who);
self.maintain_sync(protocol);
}