Add stale branches heads to finality notifications (#10639)

* Add stale branches heads to finality notifications

Warning. Previous implementation was sending a notification for
each block between the previous (explicitly) finalized block and
the new finalized one (with an hardcoded limit of 256).

Now finality notification is sent only for the new finalized head and it
contains the hash of the new finalized head, new finalized head header,
a list of all the implicitly finalized blocks and a list of stale
branches heads (i.e. the branches heads that are not part of the
canonical chain anymore).

* Add implicitly finalized blocks list to `ChainEvent::Finalized` message

The list contains all the blocks between the previously finalized block
up to the parent of the currently finalized one, sorted by block number.

`Finalized` messages handler, part of the `MaintainedTransactionPool`
implementation for `BasicPool`, still propagate full set of finalized
blocks to the txpool by iterating over implicitly finalized blocks list.

* Rust fmt

* Greedy evaluation of `stale_heads` during finalization

* Fix outdated assumption in a comment

* Removed a test optimization that is no more relevant

The loop was there to prevent sending to
`peer.network.on_block_finalized` the full list of finalized blocks.

Now only the finalized heads are received.

* Last finalized block lookup not required anymore

* Tests for block finality notifications payloads

* Document a bit tricky condition to avoid duplicate finalization notifications

* More idiomatic way to skip an iterator entry

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Cargo fmt iteration

* Typo fix

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix potential failure when a finalized orphan block is imported

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Davide Galassi
2022-01-27 18:21:58 +01:00
committed by GitHub
parent 498c3a5ecc
commit 7f3bb8d0da
10 changed files with 303 additions and 111 deletions
+105 -57
View File
@@ -30,8 +30,8 @@ use rand::Rng;
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, Finalizer, ImportSummary,
LockImportRun, NewBlockState, StorageProvider,
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
@@ -274,7 +274,7 @@ where
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
notify_finalized: None,
};
let r = f(&mut op)?;
@@ -622,18 +622,6 @@ where
None
},
};
// Ensure parent chain is finalized to maintain invariant that
// finality is called sequentially. This will also send finality
// notifications for top 250 newly finalized blocks.
if finalized && parent_exists {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}
operation.op.update_cache(new_cache);
storage_changes
@@ -641,6 +629,18 @@ where
None => None,
};
// Ensure parent chain is finalized to maintain invariant that finality is called
// sequentially.
if finalized && parent_exists {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}
let is_new_best = !gap_block &&
(finalized ||
match fork_choice {
@@ -683,11 +683,36 @@ where
operation.op.insert_aux(aux)?;
// we only notify when we are already synced to the tip of the chain
// We only notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
if finalized {
operation.notify_finalized.push(hash);
let mut summary = match operation.notify_finalized.take() {
Some(summary) => summary,
None => FinalizeSummary { finalized: Vec::new(), stale_heads: Vec::new() },
};
summary.finalized.push(hash);
if parent_exists {
// Add to the stale list all heads that are branching from parent besides our
// current `head`.
for head in self
.backend
.blockchain()
.leaves()?
.into_iter()
.filter(|h| *h != parent_hash)
{
let route_from_parent = sp_blockchain::tree_route(
self.backend.blockchain(),
parent_hash,
head,
)?;
if route_from_parent.retracted().is_empty() {
summary.stale_heads.push(head);
}
}
}
operation.notify_finalized = Some(summary);
}
operation.notify_imported = Some(ImportSummary {
@@ -831,58 +856,82 @@ where
operation.op.mark_finalized(BlockId::Hash(block), justification)?;
if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
// we'll send notifications spuriously in that case.
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
for finalized in &enacted[start..] {
operation.notify_finalized.push(finalized.hash);
let finalized =
route_from_finalized.enacted().iter().map(|elem| elem.hash).collect::<Vec<_>>();
let last_finalized_number = self
.backend
.blockchain()
.number(last_finalized)?
.expect("Finalized block expected to be onchain; qed");
let mut stale_heads = Vec::new();
for head in self.backend.blockchain().leaves()? {
let route_from_finalized =
sp_blockchain::tree_route(self.backend.blockchain(), block, head)?;
let retracted = route_from_finalized.retracted();
let pivot = route_from_finalized.common_block();
// It is not guaranteed that `backend.blockchain().leaves()` doesn't return
// heads that were in a stale state before this finalization and thus already
// included in previous notifications. We want to skip such heads.
// Given the "route" from the currently finalized block to the head under
// analysis, the condition for it to be added to the new stale heads list is:
// `!retracted.is_empty() && last_finalized_number <= pivot.number`
// 1. "route" has some "retractions".
// 2. previously finalized block number is not greater than the "route" pivot:
// - if `last_finalized_number <= pivot.number` then this is a new stale head;
// - else the stale head was already included by some previous finalization.
if !retracted.is_empty() && last_finalized_number <= pivot.number {
stale_heads.push(head);
}
}
operation.notify_finalized = Some(FinalizeSummary { finalized, stale_heads });
}
Ok(())
}
fn notify_finalized(&self, notify_finalized: Vec<Block::Hash>) -> sp_blockchain::Result<()> {
fn notify_finalized(
&self,
notify_finalized: Option<FinalizeSummary<Block>>,
) -> sp_blockchain::Result<()> {
let mut sinks = self.finality_notification_sinks.lock();
if notify_finalized.is_empty() {
// cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());
let mut notify_finalized = match notify_finalized {
Some(notify_finalized) => notify_finalized,
None => {
// Cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());
return Ok(())
},
};
return Ok(())
}
let last = notify_finalized.finalized.pop().expect(
"At least one finalized block shall exist within a valid finalization summary; qed",
);
// We assume the list is sorted and only want to inform the
// telemetry once about the finalized block.
if let Some(last) = notify_finalized.last() {
let header = self.header(&BlockId::Hash(*last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
let header = self.header(&BlockId::Hash(last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);
}
telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);
for finalized_hash in notify_finalized {
let header = self.header(&BlockId::Hash(finalized_hash))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
let notification = FinalityNotification {
hash: last,
header,
tree_route: Arc::new(notify_finalized.finalized),
stale_heads: Arc::new(notify_finalized.stale_heads),
};
let notification = FinalityNotification { header, hash: finalized_hash };
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
Ok(())
}
@@ -901,7 +950,6 @@ where
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
return Ok(())
},
};
+3 -22
View File
@@ -34,10 +34,10 @@ mod client;
mod metrics;
mod task_manager;
use std::{collections::HashMap, io, net::SocketAddr, pin::Pin, task::Poll};
use std::{collections::HashMap, io, net::SocketAddr, pin::Pin};
use codec::{Decode, Encode};
use futures::{stream, Future, FutureExt, Stream, StreamExt};
use futures::{Future, FutureExt, StreamExt};
use log::{debug, error, warn};
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
@@ -152,26 +152,7 @@ async fn build_network_future<
let starting_block = client.info().best_number;
// Stream of finalized blocks reported by the client.
let mut finality_notification_stream = {
let mut finality_notification_stream = client.finality_notification_stream().fuse();
// We tweak the `Stream` in order to merge together multiple items if they happen to be
// ready. This way, we only get the latest finalized block.
stream::poll_fn(move |cx| {
let mut last = None;
while let Poll::Ready(Some(item)) =
Pin::new(&mut finality_notification_stream).poll_next(cx)
{
last = Some(item);
}
if let Some(last) = last {
Poll::Ready(Some(last))
} else {
Poll::Pending
}
})
.fuse()
};
let mut finality_notification_stream = client.finality_notification_stream().fuse();
loop {
futures::select! {