grandpa: always create and send justification if there are any subscribers (#6935)

* grandpa: use bytes type for justification rpc notification

* grandpa: always create justification if there are rpc subscribers

* grandpa: wording

* grandpa: replace notify_justification macro with function

* grandpa: prefer Option<&T> over &Option<T>
This commit is contained in:
André Silva
2020-08-24 14:29:17 +01:00
committed by GitHub
parent 4462f7150d
commit e05055c91c
7 changed files with 74 additions and 44 deletions
@@ -8,9 +8,10 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0" license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies] [dependencies]
sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" }
sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" }
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" } sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" }
sp-core = { version = "2.0.0-rc6", path = "../../../primitives/core" }
sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] } finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2.0" jsonrpc-core = "14.2.0"
jsonrpc-core-client = "14.2.0" jsonrpc-core-client = "14.2.0"
@@ -406,7 +406,7 @@ mod tests {
// Notify with a header and justification // Notify with a header and justification
let justification = create_justification(); let justification = create_justification();
let _ = justification_sender.notify(justification.clone()).unwrap(); justification_sender.notify(|| Ok(justification.clone())).unwrap();
// Inspect what we received // Inspect what we received
let recv = receiver.take(1).wait().flatten().collect::<Vec<_>>(); let recv = receiver.take(1).wait().flatten().collect::<Vec<_>>();
@@ -418,7 +418,7 @@ mod tests {
let recv_sub_id: String = let recv_sub_id: String =
serde_json::from_value(json_map["subscription"].take()).unwrap(); serde_json::from_value(json_map["subscription"].take()).unwrap();
let recv_justification: Vec<u8> = let recv_justification: sp_core::Bytes =
serde_json::from_value(json_map["result"].take()).unwrap(); serde_json::from_value(json_map["result"].take()).unwrap();
let recv_justification: GrandpaJustification<Block> = let recv_justification: GrandpaJustification<Block> =
Decode::decode(&mut &recv_justification[..]).unwrap(); Decode::decode(&mut &recv_justification[..]).unwrap();
@@ -23,10 +23,10 @@ use sc_finality_grandpa::GrandpaJustification;
/// An encoded justification proving that the given header has been finalized /// An encoded justification proving that the given header has been finalized
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct JustificationNotification(Vec<u8>); pub struct JustificationNotification(sp_core::Bytes);
impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationNotification { impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationNotification {
fn from(notification: GrandpaJustification<Block>) -> Self { fn from(notification: GrandpaJustification<Block>) -> Self {
JustificationNotification(notification.encode()) JustificationNotification(notification.encode().into())
} }
} }
@@ -645,7 +645,8 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
client: &Arc<Client>, client: &Arc<Client>,
base: Block::Hash, base: Block::Hash,
block: Block::Hash, block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError> where ) -> Result<Vec<Block::Hash>, GrandpaError>
where
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>, Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
{ {
if base == block { return Err(GrandpaError::NotDescendent) } if base == block { return Err(GrandpaError::NotDescendent) }
@@ -671,15 +672,14 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
} }
impl<B, Block: BlockT, C, N, SC, VR> impl<B, Block: BlockT, C, N, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, Block, C, N, SC, VR>
for Environment<B, Block, C, N, SC, VR>
where where
Block: 'static, Block: 'static,
B: Backend<Block>, B: Backend<Block>,
C: crate::ClientForGrandpa<Block, B> + 'static, C: crate::ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>, C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
N: NetworkT<Block> + 'static + Send + Sync, N: NetworkT<Block> + 'static + Send + Sync,
SC: SelectChain<Block> + 'static, SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C>, VR: VotingRule<Block, C>,
NumberFor<Block>: BlockNumberOps, NumberFor<Block>: BlockNumberOps,
@@ -1023,7 +1023,7 @@ where
number, number,
(round, commit).into(), (round, commit).into(),
false, false,
&self.justification_sender, self.justification_sender.as_ref(),
) )
} }
@@ -1088,9 +1088,10 @@ pub(crate) fn finalize_block<BE, Block, Client>(
number: NumberFor<Block>, number: NumberFor<Block>,
justification_or_commit: JustificationOrCommit<Block>, justification_or_commit: JustificationOrCommit<Block>,
initial_sync: bool, initial_sync: bool,
justification_sender: &Option<GrandpaJustificationSender<Block>>, justification_sender: Option<&GrandpaJustificationSender<Block>>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>> where ) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
Block: BlockT, where
Block: BlockT,
BE: Backend<Block>, BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>, Client: crate::ClientForGrandpa<Block, BE>,
{ {
@@ -1154,6 +1155,18 @@ pub(crate) fn finalize_block<BE, Block, Client>(
} }
} }
// send a justification notification if a sender exists and in case of error log it.
fn notify_justification<Block: BlockT>(
justification_sender: Option<&GrandpaJustificationSender<Block>>,
justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
) {
if let Some(sender) = justification_sender {
if let Err(err) = sender.notify(justification) {
warn!(target: "afg", "Error creating justification for subscriber: {:?}", err);
}
}
}
// NOTE: this code assumes that honest voters will never vote past a // NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where // transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize // we have a transition with `effective_block = N`, but we finalize
@@ -1161,7 +1174,10 @@ pub(crate) fn finalize_block<BE, Block, Client>(
// justifications for transition blocks which will be requested by // justifications for transition blocks which will be requested by
// syncing clients. // syncing clients.
let justification = match justification_or_commit { let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification), JustificationOrCommit::Justification(justification) => {
notify_justification(justification_sender, || Ok(justification.clone()));
Some(justification.encode())
},
JustificationOrCommit::Commit((round_number, commit)) => { JustificationOrCommit::Commit((round_number, commit)) => {
let mut justification_required = let mut justification_required =
// justification is always required when block that enacts new authorities // justification is always required when block that enacts new authorities
@@ -1181,29 +1197,31 @@ pub(crate) fn finalize_block<BE, Block, Client>(
} }
} }
if justification_required { // NOTE: the code below is a bit more verbose because we
let justification = GrandpaJustification::from_commit( // really want to avoid creating a justification if it isn't
&client, // needed (e.g. if there's no subscribers), and also to avoid
round_number, // creating it twice. depending on the vote tree for the round,
commit, // creating a justification might require multiple fetches of
)?; // headers from the database.
let justification = || GrandpaJustification::from_commit(
&client,
round_number,
commit,
);
Some(justification) if justification_required {
let justification = justification()?;
notify_justification(justification_sender, || Ok(justification.clone()));
Some(justification.encode())
} else { } else {
notify_justification(justification_sender, justification);
None None
} }
}, },
}; };
// Notify any registered listeners in case we have a justification
if let Some(sender) = justification_sender {
if let Some(ref justification) = justification {
let _ = sender.notify(justification.clone());
}
}
let justification = justification.map(|j| j.encode());
debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
// ideally some handle to a synchronization oracle would be used // ideally some handle to a synchronization oracle would be used
@@ -619,7 +619,6 @@ where
Client: crate::ClientForGrandpa<Block, BE>, Client: crate::ClientForGrandpa<Block, BE>,
NumberFor<Block>: finality_grandpa::BlockNumberOps, NumberFor<Block>: finality_grandpa::BlockNumberOps,
{ {
/// Import a block justification and finalize the block. /// Import a block justification and finalize the block.
/// ///
/// If `enacts_change` is set to true, then finalizing this block *must* /// If `enacts_change` is set to true, then finalizing this block *must*
@@ -653,7 +652,7 @@ where
number, number,
justification.into(), justification.into(),
initial_sync, initial_sync,
&Some(self.justification_sender.clone()), Some(&self.justification_sender),
); );
match result { match result {
@@ -20,9 +20,10 @@ use std::sync::Arc;
use parking_lot::Mutex; use parking_lot::Mutex;
use sp_runtime::traits::Block as BlockT; use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use crate::justification::GrandpaJustification; use crate::justification::GrandpaJustification;
use crate::Error;
// Stream of justifications returned when subscribing. // Stream of justifications returned when subscribing.
type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>; type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>;
@@ -54,10 +55,22 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> {
/// Send out a notification to all subscribers that a new justification /// Send out a notification to all subscribers that a new justification
/// is available for a block. /// is available for a block.
pub fn notify(&self, notification: GrandpaJustification<Block>) -> Result<(), ()> { pub fn notify(
self.subscribers.lock().retain(|n| { &self,
!n.is_closed() && n.unbounded_send(notification.clone()).is_ok() justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
}); ) -> Result<(), Error> {
let mut subscribers = self.subscribers.lock();
// do an initial prune on closed subscriptions
subscribers.retain(|n| !n.is_closed());
// if there's no subscribers we avoid creating
// the justification which is a costly operation
if !subscribers.is_empty() {
let justification = justification()?;
subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok());
}
Ok(()) Ok(())
} }
} }
@@ -74,11 +74,10 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
last_finalized_number: NumberFor<Block>, last_finalized_number: NumberFor<Block>,
commits: S, commits: S,
note_round: F, note_round: F,
) -> impl Future<Output=Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> where ) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
NumberFor<Block>: BlockNumberOps, NumberFor<Block>: BlockNumberOps,
S: Stream< S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>,
>,
F: Fn(u64), F: Fn(u64),
BE: Backend<Block>, BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>, Client: crate::ClientForGrandpa<Block, BE>,
@@ -130,7 +129,7 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
finalized_number, finalized_number,
(round, commit).into(), (round, commit).into(),
false, false,
&justification_sender, justification_sender.as_ref(),
) { ) {
Ok(_) => {}, Ok(_) => {},
Err(e) => return future::err(e), Err(e) => return future::err(e),