GRANDPA finality proof draft (#1268)

* grandpa finality proof

* prove GrandpaApi::grandpa_authorities using parent block + some docs

* create justification when consensus data is changed

* generate justifications periodically

* test for ConsensusChanges
This commit is contained in:
Svyatoslav Nikolsky
2019-01-11 21:25:03 +03:00
committed by Gav Wood
parent 677b79765b
commit 616716cb4b
18 changed files with 786 additions and 113 deletions
+175 -37
View File
@@ -92,7 +92,7 @@ use codec::{Encode, Decode};
use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities};
use runtime_primitives::traits::{
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
DigestItemFor, DigestItem,
DigestItemFor, DigestItem, As, Zero,
};
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
@@ -116,6 +116,7 @@ pub use fg_primitives::ScheduledChange;
mod authorities;
mod communication;
mod finality_proof;
mod until_imported;
#[cfg(feature="service-integration")]
@@ -123,11 +124,14 @@ mod service_integration;
#[cfg(feature="service-integration")]
pub use service_integration::{LinkHalfForService, BlockImportForService};
pub use finality_proof::{prove_finality, check_finality_proof};
#[cfg(test)]
mod tests;
const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round";
const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes";
/// round-number, round-state
type LastCompleted<H, N> = (u64, RoundState<H, N>);
@@ -165,6 +169,10 @@ pub type CompactCommit<Block> = grandpa::CompactCommit<
pub struct Config {
/// The expected duration for a message to be gossiped across the network.
pub gossip_duration: Duration,
/// Justification generation period (in blocks). GRANDPA will try to generate justifications
/// at least every justification_period blocks. There are some other events which might cause
/// justification generation.
pub justification_period: u64,
/// The local signing key.
pub local_key: Option<Arc<ed25519::Pair>>,
/// Some local identifier of the voter.
@@ -304,12 +312,65 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
}
}
/// Consensus-related data changes tracker.
#[derive(Debug, Encode, Decode)]
struct ConsensusChanges<H, N> {
pending_changes: Vec<(N, H)>,
}
impl<H: Copy + PartialEq, N: Copy + Ord> ConsensusChanges<H, N> {
/// Create empty consensus changes.
pub fn empty() -> Self {
ConsensusChanges { pending_changes: Vec::new(), }
}
/// Note unfinalized change of consensus-related data.
pub fn note_change(&mut self, at: (N, H)) {
let idx = self.pending_changes
.binary_search_by_key(&at.0, |change| change.0)
.unwrap_or_else(|i| i);
self.pending_changes.insert(idx, at);
}
/// Finalize all pending consensus changes that are finalized by given block.
/// Returns true if there any changes were finalized.
pub fn finalize<F: Fn(N) -> ::client::error::Result<Option<H>>>(
&mut self,
block: (N, H),
canonical_at_height: F,
) -> ::client::error::Result<(bool, bool)> {
let (split_idx, has_finalized_changes) = self.pending_changes.iter()
.enumerate()
.take_while(|(_, &(at_height, _))| at_height <= block.0)
.fold((None, Ok(false)), |(_, has_finalized_changes), (idx, ref at)|
(
Some(idx),
has_finalized_changes
.and_then(|has_finalized_changes| if has_finalized_changes {
Ok(has_finalized_changes)
} else {
canonical_at_height(at.0).map(|can_hash| Some(at.1) == can_hash)
}),
));
let altered_changes = split_idx.is_some();
if let Some(split_idx) = split_idx {
self.pending_changes = self.pending_changes.split_off(split_idx + 1);
}
has_finalized_changes.map(|has_finalized_changes| (altered_changes, has_finalized_changes))
}
}
/// Thread-safe consensus changes tracker reference.
type SharedConsensusChanges<H, N> = Arc<parking_lot::Mutex<ConsensusChanges<H, N>>>;
/// The environment we run GRANDPA in.
struct Environment<B, E, Block: BlockT, N: Network, RA> {
inner: Arc<Client<B, E, Block, RA>>,
voters: Arc<HashMap<Ed25519AuthorityId, u64>>,
config: Config,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
network: N,
set_id: u64,
}
@@ -515,7 +576,15 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, round: u64, commit: Commit<Block>) -> Result<(), Self::Error> {
finalize_block(&*self.inner, &self.authority_set, hash, number, (round, commit).into())
finalize_block(
&*self.inner,
&self.authority_set,
&self.consensus_changes,
Some(As::sa(self.config.justification_period)),
hash,
number,
(round, commit).into(),
)
}
fn round_commit_timer(&self) -> Self::Timer {
@@ -615,21 +684,24 @@ impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
voters: &HashMap<Ed25519AuthorityId, u64>,
) -> Result<GrandpaJustification<Block>, ClientError> where
NumberFor<Block>: grandpa::BlockNumberOps,
{
GrandpaJustification::<Block>::decode(&mut &*encoded).ok_or_else(|| {
let msg = "failed to decode grandpa justification".to_string();
ClientErrorKind::BadJustification(msg).into()
}).and_then(|just| just.verify(set_id, voters).map(|_| just))
}
/// Validate the commit and the votes' ancestry proofs.
fn verify(&self, set_id: u64, voters: &HashMap<Ed25519AuthorityId, u64>) -> Result<(), ClientError>
where
NumberFor<Block>: grandpa::BlockNumberOps,
{
use grandpa::Chain;
let justification = match GrandpaJustification::decode(&mut &*encoded) {
Some(justification) => justification,
_ => {
let msg = "failed to decode grandpa justification".to_string();
return Err(ClientErrorKind::BadJustification(msg).into());
}
};
let ancestry_chain = AncestryChain::<Block>::new(&justification.votes_ancestries);
let ancestry_chain = AncestryChain::<Block>::new(&self.votes_ancestries);
match grandpa::validate_commit(
&justification.commit,
&self.commit,
voters,
None,
&ancestry_chain,
@@ -642,23 +714,23 @@ impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
}
let mut visited_hashes = HashSet::new();
for signed in justification.commit.precommits.iter() {
for signed in self.commit.precommits.iter() {
if let Err(_) = communication::check_message_sig::<Block>(
&grandpa::Message::Precommit(signed.precommit.clone()),
&signed.id,
&signed.signature,
justification.round,
self.round,
set_id,
) {
return Err(ClientErrorKind::BadJustification(
"invalid signature for precommit in grandpa justification".to_string()).into());
}
if justification.commit.target_hash == signed.precommit.target_hash {
if self.commit.target_hash == signed.precommit.target_hash {
continue;
}
match ancestry_chain.ancestry(justification.commit.target_hash, signed.precommit.target_hash) {
match ancestry_chain.ancestry(self.commit.target_hash, signed.precommit.target_hash) {
Ok(route) => {
// ancestry starts from parent hash but the precommit target hash has been visited
visited_hashes.insert(signed.precommit.target_hash);
@@ -673,7 +745,7 @@ impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
}
}
let ancestry_hashes = justification.votes_ancestries
let ancestry_hashes = self.votes_ancestries
.iter()
.map(|h: &Block::Header| h.hash())
.collect();
@@ -683,7 +755,7 @@ impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
"invalid precommit ancestries in grandpa justification with unused headers".to_string()).into());
}
Ok(justification)
Ok(())
}
}
@@ -710,6 +782,8 @@ impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationOrCommit<
fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
client: &Client<B, E, Block, RA>,
authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
consensus_changes: &SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
justification_period: Option<NumberFor<Block>>,
hash: Block::Hash,
number: NumberFor<Block>,
justification_or_commit: JustificationOrCommit<Block>,
@@ -720,6 +794,7 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
{
// lock must be held through writing to DB to avoid race
let mut authority_set = authority_set.inner().write();
let mut consensus_changes = consensus_changes.lock();
let status = authority_set.apply_changes(number, |canon_number| {
canonical_at_height(client, (hash, number), canon_number)
})?;
@@ -756,6 +831,20 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
}
}
// check if this is this is the first finalization of some consensus changes
let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes
.finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?;
if alters_consensus_changes {
let encoded = consensus_changes.encode();
let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]);
if let Err(e) = write_result {
warn!(target: "finality", "Failed to write updated consensus changes to disk. Bailing.");
warn!(target: "finality", "Node is in a potentially inconsistent state.");
return Err(e.into());
}
}
// NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize
@@ -764,8 +853,26 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
// syncing clients.
let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification.encode()),
JustificationOrCommit::Commit((round_number, commit)) =>
if status.new_set_block.is_some() {
JustificationOrCommit::Commit((round_number, commit)) => {
let mut justification_required =
// justification is always required when block that enacts new authorities
// set is finalized
status.new_set_block.is_some() ||
// justification is required when consensus changes are finalized
finalizes_consensus_changes;
// justification is required every N blocks to be able to prove blocks
// finalization to remote nodes
if !justification_required {
if let Some(justification_period) = justification_period {
let last_finalized_number = client.info()?.chain.finalized_number;
justification_required = (!last_finalized_number.is_zero() ||
number - last_finalized_number == justification_period) &&
(last_finalized_number / justification_period != number / justification_period);
}
}
if justification_required {
let justification = GrandpaJustification::from_commit(
client,
round_number,
@@ -775,7 +882,8 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
Some(justification.encode())
} else {
None
},
}
},
};
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
@@ -822,6 +930,7 @@ pub struct GrandpaBlockImport<B, E, Block: BlockT<Hash=H256>, RA, PRA> {
inner: Arc<Client<B, E, Block, RA>>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
authority_set_change: mpsc::UnboundedSender<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
api: Arc<PRA>,
}
@@ -909,6 +1018,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
// we don't want to finalize on `inner.import_block`
let justification = block.justification.take();
let enacts_consensus_change = new_authorities.is_some();
let import_result = self.inner.import_block(block, new_authorities).map_err(|e| {
if let Some((old_set, mut authorities)) = just_in_case {
debug!(target: "afg", "Restoring old set after block import error: {:?}", e);
@@ -918,22 +1028,17 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
});
let import_result = match import_result {
Ok(ImportResult::Queued) => ImportResult::Queued,
Ok(r) => return Ok(r),
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
Ok(ImportResult::Queued) => ImportResult::Queued,
Ok(r) => return Ok(r),
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
};
let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| {
canonical_at_height(&self.inner, (hash, number), canon_number)
});
}).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?;
match enacts_change {
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
Ok(enacted) => {
if !enacted {
return Ok(import_result);
}
}
if !enacts_change && !enacts_consensus_change {
return Ok(import_result);
}
match justification {
@@ -952,6 +1057,8 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
let result = finalize_block(
&*self.inner,
&self.authority_set,
&self.consensus_changes,
None,
hash,
number,
justification.into(),
@@ -959,11 +1066,14 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
match result {
Ok(_) => {
unreachable!("returns Ok when no authority set change should be enacted; \
verified previously that finalizing the current block enacts a change; \
qed;");
assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;");
},
Err(ExitOrError::AuthoritiesChanged(new)) => {
assert!(
enacts_change,
"returns AuthoritiesChanged when authority set change should be enacted; qed;"
);
debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number);
if let Err(e) = self.authority_set_change.unbounded_send(new) {
return Err(ConsensusErrorKind::ClientImport(e.to_string()).into());
@@ -981,8 +1091,20 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
}
},
None => {
trace!(target: "finality", "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.", number);
}
if enacts_change {
trace!(
target: "finality",
"Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
number,
);
}
// we have imported block with consensus data changes, but without justification
// => remember to create justification when next block will be finalized
if enacts_consensus_change {
self.consensus_changes.lock().note_change((number, hash));
}
},
}
Ok(import_result)
@@ -1060,6 +1182,7 @@ pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
client: Arc<Client<B, E, Block, RA>>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
authority_set_change: mpsc::UnboundedReceiver<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
}
struct AncestryChain<Block: BlockT> {
@@ -1142,6 +1265,15 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
.into(),
};
let consensus_changes = Backend::get_aux(&**client.backend(), CONSENSUS_CHANGES_KEY)?;
let consensus_changes = Arc::new(parking_lot::Mutex::new(match consensus_changes {
Some(raw) => ConsensusChanges::decode(&mut &raw[..])
.ok_or_else(|| ::client::error::ErrorKind::Backend(
format!("GRANDPA consensus changes kept in invalid format")
))?,
None => ConsensusChanges::empty(),
}));
let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded();
Ok((
@@ -1149,12 +1281,14 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
inner: client.clone(),
authority_set: authority_set.clone(),
authority_set_change: authority_set_change_tx,
consensus_changes: consensus_changes.clone(),
api
},
LinkHalf {
client,
authority_set,
authority_set_change: authority_set_change_rx,
consensus_changes,
},
))
}
@@ -1231,6 +1365,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
client,
authority_set,
authority_set_change,
consensus_changes,
} = link;
let chain_info = client.info()?;
@@ -1253,6 +1388,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
network: network.clone(),
set_id: authority_set.set_id(),
authority_set: authority_set.clone(),
consensus_changes: consensus_changes.clone(),
});
let initial_state = (initial_environment, last_round_number, last_state, authority_set_change.into_future());
@@ -1291,6 +1427,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
let config = config.clone();
let network = network.clone();
let authority_set = authority_set.clone();
let consensus_changes = consensus_changes.clone();
let trigger_authority_set_change = |new: NewAuthoritySet<_, _>, authority_set_change| {
let env = Arc::new(Environment {
@@ -1300,6 +1437,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
set_id: new.set_id,
network,
authority_set,
consensus_changes,
});
// start the new authority set using the block where the