mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-01 02:11:01 +00:00
client/finality-grandpa/src/until_imported: Refactor BlockGlobalMessage (#5390)
* client/finality-grandpa/src/until_imported: Refactor BlockGlobalMessage `BlockGlobalMessage` owns an `inner` which contains (1) a count for the amount of outstanding blocks to be waited on and (2) the message itself. Given that both is already wrapped in an `Arc` there is no need to keep track of the outstanding blocks, given that it simply corresponds to the amount of strong reference counts on the `Arc` itself. This commit removes the atomic counter within `inner` and piggy backs on the `Arc` reference counter instead. * client/finality-grandpa/src/until_imported: Remove useless match * client/finality-grandpa/src/until_imported.rs: Remove unused var Co-Authored-By: André Silva <andre.beat@gmail.com> * client/finality-grandpa/src/until_imported: Address comment suggestion Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
@@ -40,7 +40,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
|||||||
|
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use sp_finality_grandpa::AuthorityId;
|
use sp_finality_grandpa::AuthorityId;
|
||||||
@@ -307,8 +307,12 @@ pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester,
|
|||||||
///
|
///
|
||||||
/// This is used for compact commits and catch up messages which have already
|
/// This is used for compact commits and catch up messages which have already
|
||||||
/// been checked for structural soundness (e.g. valid signatures).
|
/// been checked for structural soundness (e.g. valid signatures).
|
||||||
|
///
|
||||||
|
/// We use the `Arc`'s reference count to implicitly count the number of outstanding blocks that we
|
||||||
|
/// are waiting on for the same message (i.e. other `BlockGlobalMessage` instances with the same
|
||||||
|
/// `inner`).
|
||||||
pub(crate) struct BlockGlobalMessage<Block: BlockT> {
|
pub(crate) struct BlockGlobalMessage<Block: BlockT> {
|
||||||
inner: Arc<(AtomicUsize, Mutex<Option<CommunicationIn<Block>>>)>,
|
inner: Arc<Mutex<Option<CommunicationIn<Block>>>>,
|
||||||
target_number: NumberFor<Block>,
|
target_number: NumberFor<Block>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -416,7 +420,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
|
|||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
let locked_global = Arc::new((AtomicUsize::new(unknown_count), Mutex::new(Some(input))));
|
let locked_global = Arc::new(Mutex::new(Some(input)));
|
||||||
|
|
||||||
// schedule waits for all unknown messages.
|
// schedule waits for all unknown messages.
|
||||||
// when the last one of these has `wait_completed` called on it,
|
// when the last one of these has `wait_completed` called on it,
|
||||||
@@ -438,30 +442,20 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
|
|||||||
|
|
||||||
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
|
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
|
||||||
if self.target_number != canon_number {
|
if self.target_number != canon_number {
|
||||||
// if we return without deducting the counter, then none of the other
|
// Delete the inner message so it won't ever be forwarded. Future calls to
|
||||||
// handles can return the commit message.
|
// `wait_completed` on the same `inner` will ignore it.
|
||||||
|
*self.inner.lock() = None;
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut last_count = self.inner.0.load(Ordering::Acquire);
|
match Arc::try_unwrap(self.inner) {
|
||||||
|
// This is the last reference and thus the last outstanding block to be awaited. `inner`
|
||||||
// CAS loop to ensure that we always have a last reader.
|
// is either `Some(_)` or `None`. The latter implies that a previous `wait_completed`
|
||||||
loop {
|
// call witnessed a block number mismatch (see above).
|
||||||
if last_count == 1 { // we are the last one left.
|
Ok(inner) => Mutex::into_inner(inner),
|
||||||
return self.inner.1.lock().take();
|
// There are still other strong references to this `Arc`, thus the message is blocked on
|
||||||
}
|
// other blocks to be imported.
|
||||||
|
Err(_) => None,
|
||||||
let prev_value = self.inner.0.compare_and_swap(
|
|
||||||
last_count,
|
|
||||||
last_count - 1,
|
|
||||||
Ordering::SeqCst,
|
|
||||||
);
|
|
||||||
|
|
||||||
if prev_value == last_count {
|
|
||||||
return None;
|
|
||||||
} else {
|
|
||||||
last_count = prev_value;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -941,4 +935,67 @@ mod tests {
|
|||||||
|
|
||||||
futures::executor::block_on(test);
|
futures::executor::block_on(test);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> {
|
||||||
|
let header = make_header(5);
|
||||||
|
|
||||||
|
let unknown_catch_up = finality_grandpa::CatchUp {
|
||||||
|
round_number: 1,
|
||||||
|
precommits: vec![],
|
||||||
|
prevotes: vec![],
|
||||||
|
base_hash: header.hash(),
|
||||||
|
base_number: *header.number(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let catch_up = voter::CommunicationIn::CatchUp(
|
||||||
|
unknown_catch_up.clone(),
|
||||||
|
voter::Callback::Blank,
|
||||||
|
);
|
||||||
|
|
||||||
|
Arc::new(Mutex::new(Some(catch_up)))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_global_message_wait_completed_return_when_all_awaited() {
|
||||||
|
let msg_inner = test_catch_up();
|
||||||
|
|
||||||
|
let waiting_block_1 = BlockGlobalMessage::<Block> {
|
||||||
|
inner: msg_inner.clone(),
|
||||||
|
target_number: 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
let waiting_block_2 = BlockGlobalMessage::<Block> {
|
||||||
|
inner: msg_inner,
|
||||||
|
target_number: 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
// waiting_block_2 is still waiting for block 2, thus this should return `None`.
|
||||||
|
assert!(waiting_block_1.wait_completed(1).is_none());
|
||||||
|
|
||||||
|
// Message only depended on block 1 and 2. Both have been imported, thus this should yield
|
||||||
|
// the message.
|
||||||
|
assert!(waiting_block_2.wait_completed(2).is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_global_message_wait_completed_return_none_on_block_number_missmatch() {
|
||||||
|
let msg_inner = test_catch_up();
|
||||||
|
|
||||||
|
let waiting_block_1 = BlockGlobalMessage::<Block> {
|
||||||
|
inner: msg_inner.clone(),
|
||||||
|
target_number: 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
let waiting_block_2 = BlockGlobalMessage::<Block> {
|
||||||
|
inner: msg_inner,
|
||||||
|
target_number: 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Calling wait_completed with wrong block number should yield None.
|
||||||
|
assert!(waiting_block_1.wait_completed(1234).is_none());
|
||||||
|
|
||||||
|
// All blocks, that the message depended on, have been imported. Still, given the above
|
||||||
|
// block number mismatch this should return None.
|
||||||
|
assert!(waiting_block_2.wait_completed(2).is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user