core/finality-grandpa: Request block sync from network after import timeout (#3800)

* core/finality-grandpa: Pass Grandpa msg sender up to UntilImported

* core/finality-grandpa: Track senders to maybe later request blocks

* core/finality-grandpa: Make BlockStatus pub only within crate

* core/finality-grandpa: Abstract NetworkBridge with BlockSyncRequester

* core/finality-grandpa: Pass BlockSyncRequester to UntilImported

* core/finality-grandpa: Track block number of pending within UntilImported

* core/finality-grandpa: Request block sync on long wait

* core/finality-grandpa: Adjust unit tests to previous changes

* core/finality-grandpa: Fix line length

* core/finality-grandpa: Add comment explaining in & out vote combination

* core/finality-grandpa: Log after, not before, timeout expired

The UntilImported component should log whenever waiting for a specific
block to be imported surpassed a defined constant timeout. Without this
patch the code would log whenever the current time was below the
timeout.

* core/finality-grandpa: Collect senders as HashSet for deduplication

* Revert "core/finality-grandpa: Track senders to maybe later request blocks"

This reverts commit 61ac9dd715612d5fdbf7b8f00b84e450f282ade0.

* Revert "core/finality-grandpa: Pass Grandpa msg sender up to UntilImported"

This reverts commit afdc9646a6c314f99a9d19242f1878f85980e70d.

* core/network/sync: Ask for block from all peers if none provided

When requesting an explicit fork sync, try to sync from all known peers,
when no specific peers were provided.

* core/network/sync: Request specific fork sync from peers ahead or on par

When making an explicit fork sync request without specifying any peers,
make sure to only request it from the locally known peers that are
either ahead or on a par compared to the block number we are looking
for.

* grandpa: fix tests

* grandpa: fix warnings

* grandpa: add test for block sync request on until_imported

* grandpa: rename Environment field inner to client

* grandpa: fix minor nits

* grandpa: minor nits in until_imported

* grandpa: copy docs for set_sync_fork_request

* grandpa: remove stale TODO on UntilImported
This commit is contained in:
Max Inden
2019-10-24 17:01:14 +02:00
committed by André Silva
parent 002057dcc5
commit 743a34bc1d
7 changed files with 258 additions and 66 deletions
@@ -39,7 +39,7 @@ use network::{consensus_gossip as network_gossip, NetworkService};
use network_gossip::ConsensusMessage;
use codec::{Encode, Decode};
use primitives::Pair;
use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use tokio_executor::Executor;
@@ -129,6 +129,14 @@ pub trait Network<Block: BlockT>: Clone + Send + 'static {
/// Inform peers that a block with given hash should be downloaded.
fn announce(&self, block: Block::Hash, associated_data: Vec<u8>);
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
/// Create a unique topic for a round and set-id combo.
@@ -216,6 +224,10 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
self.announce_block(block, associated_data)
}
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
NetworkService::set_sync_fork_request(self, peers, hash, number)
}
}
/// A stream used by NetworkBridge in its implementation of Network. Given a oneshot that eventually returns a channel
@@ -468,6 +480,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
format!("Failed to receive on unbounded receiver for round {}", round.0)
));
// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = incoming.select(out_rx);
(incoming, outgoing)
@@ -514,6 +529,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
(incoming, outgoing)
}
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
pub(crate) fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
self.service.set_sync_fork_request(peers, hash, number)
}
}
fn incoming_global<B: BlockT, N: Network<B>>(
@@ -25,6 +25,7 @@ use tokio::runtime::current_thread;
use std::sync::Arc;
use keyring::Ed25519Keyring;
use codec::Encode;
use sr_primitives::traits::NumberFor;
use crate::environment::SharedVoterSetState;
use super::gossip::{self, GossipValidator};
@@ -91,6 +92,9 @@ impl super::Network<Block> for TestNetwork {
fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
}
/// Notify the sync service to try syncing the given chain.
fn set_sync_fork_request(&self, _peers: Vec<network::PeerId>, _hash: Hash, _number: NumberFor<Block>) {}
}
impl network_gossip::ValidatorContext<Block> for TestNetwork {
@@ -370,7 +370,7 @@ impl<Block: BlockT> SharedVoterSetState<Block> {
/// The environment we run GRANDPA in.
pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> {
pub(crate) inner: Arc<Client<B, E, Block, RA>>,
pub(crate) client: Arc<Client<B, E, Block, RA>>,
pub(crate) select_chain: SC,
pub(crate) voters: Arc<VoterSet<AuthorityId>>,
pub(crate) config: Config,
@@ -413,7 +413,7 @@ where
NumberFor<Block>: BlockNumberOps,
{
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
ancestry(&self.inner, base, block)
ancestry(&self.client, base, block)
}
fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
@@ -434,7 +434,7 @@ where
match self.select_chain.finality_target(block, None) {
Ok(Some(best_hash)) => {
let base_header = self.inner.header(&BlockId::Hash(block)).ok()?
let base_header = self.client.header(&BlockId::Hash(block)).ok()?
.expect("Header known to exist after `best_containing` call; qed");
if let Some(limit) = limit {
@@ -449,7 +449,7 @@ where
}
}
let best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
let best_header = self.client.header(&BlockId::Hash(best_hash)).ok()?
.expect("Header known to exist after `best_containing` call; qed");
// check if our vote is currently being limited due to a pending change
@@ -473,7 +473,7 @@ where
break;
}
target_header = self.inner.header(&BlockId::Hash(*target_header.parent_hash())).ok()?
target_header = self.client.header(&BlockId::Hash(*target_header.parent_hash())).ok()?
.expect("Header known to exist after `best_containing` call; qed");
}
@@ -492,7 +492,7 @@ where
// authority set limit filter, which can be considered a
// mandatory/implicit voting rule.
self.voting_rule
.restrict_vote(&*self.inner, &base_header, &best_header, target_header)
.restrict_vote(&*self.client, &base_header, &best_header, target_header)
.or(Some((target_header.hash(), *target_header.number())))
},
Ok(None) => {
@@ -601,8 +601,9 @@ where
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
self.inner.import_notification_stream(),
self.inner.clone(),
self.client.import_notification_stream(),
self.network.clone(),
self.client.clone(),
incoming,
"round",
).map_err(Into::into));
@@ -650,7 +651,7 @@ where
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
@@ -691,7 +692,7 @@ where
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
@@ -742,7 +743,7 @@ where
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
@@ -800,7 +801,7 @@ where
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
@@ -816,7 +817,7 @@ where
commit: Commit<Block>,
) -> Result<(), Self::Error> {
finalize_block(
&*self.inner,
&*self.client,
&self.authority_set,
&self.consensus_changes,
Some(self.config.justification_period.into()),
+33 -12
View File
@@ -22,11 +22,11 @@
//!
//! # Usage
//!
//! First, create a block-import wrapper with the `block_import` function.
//! The GRANDPA worker needs to be linked together with this block import object,
//! so a `LinkHalf` is returned as well. All blocks imported (from network or consensus or otherwise)
//! must pass through this wrapper, otherwise consensus is likely to break in
//! unexpected ways.
//! First, create a block-import wrapper with the `block_import` function. The
//! GRANDPA worker needs to be linked together with this block import object, so
//! a `LinkHalf` is returned as well. All blocks imported (from network or
//! consensus or otherwise) must pass through this wrapper, otherwise consensus
//! is likely to break in unexpected ways.
//!
//! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`.
//! This requires a `Network` implementation. The returned future should be
@@ -242,7 +242,7 @@ impl From<ClientError> for Error {
}
/// Something which can determine if a block is known.
pub trait BlockStatus<Block: BlockT> {
pub(crate) trait BlockStatus<Block: BlockT> {
/// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block
/// is definitely known and has been imported.
/// If an unexpected error occurs, return that.
@@ -261,6 +261,26 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
}
}
/// Something that one can ask to do a block sync request.
pub(crate) trait BlockSyncRequester<Block: BlockT> {
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
impl<Block, N> BlockSyncRequester<Block> for NetworkBridge<Block, N> where
Block: BlockT,
N: communication::Network<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
}
}
/// A new authority set along with the canonical block it changed at.
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
@@ -429,6 +449,7 @@ fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
// block commit and catch up messages until relevant blocks are imported.
let global_in = UntilGlobalMessageBlocksImported::new(
client.import_notification_stream(),
network.clone(),
client.clone(),
global_in,
"global",
@@ -617,7 +638,7 @@ where
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
inner: client,
client,
select_chain,
voting_rule,
voters: Arc::new(voters),
@@ -656,7 +677,7 @@ where
"authority_id" => authority_id.to_string(),
);
let chain_info = self.env.inner.info();
let chain_info = self.env.client.info();
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"number" => ?chain_info.chain.finalized_number,
"hash" => ?chain_info.chain.finalized_hash,
@@ -680,7 +701,7 @@ where
let global_comms = global_communication(
self.env.set_id,
&self.env.voters,
&self.env.inner,
&self.env.client,
&self.env.network,
&self.env.config.keystore,
);
@@ -728,7 +749,7 @@ where
(new.canon_hash, new.canon_number),
);
aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?;
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
@@ -737,7 +758,7 @@ where
set_id: new.set_id,
voter_set_state: self.env.voter_set_state.clone(),
// Fields below are simply transferred and not updated.
inner: self.env.inner.clone(),
client: self.env.client.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
@@ -757,7 +778,7 @@ where
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?;
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
+1 -1
View File
@@ -1556,7 +1556,7 @@ fn grandpa_environment_respects_voting_rules() {
authority_set: authority_set.clone(),
config: config.clone(),
consensus_changes: consensus_changes.clone(),
inner: link.client.clone(),
client: link.client.clone(),
select_chain: link.select_chain.clone(),
set_id: authority_set.set_id(),
voter_set_state: set_state.clone(),
@@ -20,7 +20,13 @@
//!
//! This is used for votes and commit messages currently.
use super::{BlockStatus, CommunicationIn, Error, SignedMessage};
use super::{
BlockStatus as BlockStatusT,
BlockSyncRequester as BlockSyncRequesterT,
CommunicationIn,
Error,
SignedMessage,
};
use log::{debug, warn};
use client::{BlockImportNotification, ImportNotifications};
@@ -54,8 +60,8 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
wait: Wait,
ready: Ready,
) -> Result<(), Error> where
S: BlockStatus<Block>,
Wait: FnMut(Block::Hash, Self),
S: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked);
/// called when the wait has completed. The canonical number is passed through
@@ -64,23 +70,31 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
}
/// Buffering imported messages until blocks with given hashes are imported.
pub(crate) struct UntilImported<Block: BlockT, Status, I, M: BlockUntilImported<Block>> {
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
import_notifications: Fuse<Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>>,
status_check: Status,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
check_pending: Interval,
pending: HashMap<Block::Hash, (Instant, Vec<M>)>,
/// Mapping block hashes to their block number, the point in time it was
/// first encountered (Instant) and a list of GRANDPA messages referencing
/// the block hash.
pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>,
identifier: &'static str,
}
impl<Block: BlockT, Status, I: Stream, M> UntilImported<Block, Status, I, M>
where Status: BlockStatus<Block>, M: BlockUntilImported<Block>
impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
BlockStatus: BlockStatusT<Block>,
M: BlockUntilImported<Block>,
I: Stream,
{
/// Create a new `UntilImported` wrapper.
pub(crate) fn new(
import_notifications: ImportNotifications<Block>,
status_check: Status,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
stream: I,
identifier: &'static str,
) -> Self {
@@ -98,6 +112,7 @@ impl<Block: BlockT, Status, I: Stream, M> UntilImported<Block, Status, I, M>
let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat();
Box::new(stream) as Box<dyn Stream<Item = _, Error = _> + Send>
}.fuse(),
block_sync_requester,
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
@@ -108,8 +123,10 @@ impl<Block: BlockT, Status, I: Stream, M> UntilImported<Block, Status, I, M>
}
}
impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M> where
Status: BlockStatus<Block>,
impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStatus, BSyncRequester, I, M> where
Block: BlockT,
BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item=M::Blocked,Error=Error>,
M: BlockUntilImported<Block>,
{
@@ -128,10 +145,10 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
M::schedule_wait(
input,
&self.status_check,
|target_hash, wait| pending
|target_hash, target_number, wait| pending
.entry(target_hash)
.or_insert_with(|| (Instant::now(), Vec::new()))
.1
.or_insert_with(|| (target_number, Instant::now(), Vec::new()))
.2
.push(wait),
|ready_item| ready.push_back(ready_item),
)?;
@@ -146,7 +163,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(notification))) => {
// new block imported. queue up all messages tied to that hash.
if let Some((_, messages)) = self.pending.remove(&notification.hash) {
if let Some((_, _, messages)) = self.pending.remove(&notification.hash) {
let canon_number = notification.header.number().clone();
let ready_messages = messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
@@ -165,28 +182,38 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
if update_interval {
let mut known_keys = Vec::new();
for (&block_hash, &mut (ref mut last_log, ref v)) in &mut self.pending {
for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in &mut self.pending {
if let Some(number) = self.status_check.block_number(block_hash)? {
known_keys.push((block_hash, number));
} else {
let next_log = *last_log + LOG_PENDING_INTERVAL;
if Instant::now() <= next_log {
if Instant::now() >= next_log {
debug!(
target: "afg",
"Waiting to import block {} before {} {} messages can be imported. \
Requesting network sync service to retrieve block from. \
Possible fork?",
block_hash,
v.len(),
self.identifier,
);
// NOTE: when sending an empty vec of peers the
// underlying should make a best effort to sync the
// block from any peers it knows about.
self.block_sync_requester.set_sync_fork_request(
vec![],
block_hash,
block_number,
);
*last_log = next_log;
}
}
}
for (known_hash, canon_number) in known_keys {
if let Some((_, pending_messages)) = self.pending.remove(&known_hash) {
if let Some((_, _, pending_messages)) = self.pending.remove(&known_hash) {
let ready_messages = pending_messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
@@ -220,14 +247,14 @@ fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId)
impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> {
type Blocked = Self;
fn schedule_wait<S, Wait, Ready>(
fn schedule_wait<BlockStatus, Wait, Ready>(
msg: Self::Blocked,
status_check: &S,
status_check: &BlockStatus,
mut wait: Wait,
mut ready: Ready,
) -> Result<(), Error> where
S: BlockStatus<Block>,
Wait: FnMut(Block::Hash, Self),
BlockStatus: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked),
{
let (&target_hash, target_number) = msg.target();
@@ -239,7 +266,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> {
ready(msg);
}
} else {
wait(target_hash, msg)
wait(target_hash, target_number, msg)
}
Ok(())
@@ -259,7 +286,13 @@ impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> {
/// Helper type definition for the stream which waits until vote targets for
/// signed messages are imported.
pub(crate) type UntilVoteTargetImported<Block, Status, I> = UntilImported<Block, Status, I, SignedMessage<Block>>;
pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported<
Block,
BlockStatus,
BlockSyncRequester,
I,
SignedMessage<Block>,
>;
/// This blocks a global message import, i.e. a commit or catch up messages,
/// until all blocks referenced in its votes are known.
@@ -274,14 +307,14 @@ pub(crate) struct BlockGlobalMessage<Block: BlockT> {
impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
type Blocked = CommunicationIn<Block>;
fn schedule_wait<S, Wait, Ready>(
fn schedule_wait<BlockStatus, Wait, Ready>(
input: Self::Blocked,
status_check: &S,
status_check: &BlockStatus,
mut wait: Wait,
mut ready: Ready,
) -> Result<(), Error> where
S: BlockStatus<Block>,
Wait: FnMut(Block::Hash, Self),
BlockStatus: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked),
{
use std::collections::hash_map::Entry;
@@ -383,7 +416,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
// if this is taking a long time.
for (hash, is_known) in checked_hashes {
if let KnownOrUnknown::Unknown(target_number) = is_known {
wait(hash, BlockGlobalMessage {
wait(hash, target_number, BlockGlobalMessage {
inner: locked_global.clone(),
target_number,
})
@@ -425,9 +458,10 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
/// A stream which gates off incoming global messages, i.e. commit and catch up
/// messages, until all referenced block hashes have been imported.
pub(crate) type UntilGlobalMessageBlocksImported<Block, Status, I> = UntilImported<
pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported<
Block,
Status,
BlockStatus,
BlockSyncRequester,
I,
BlockGlobalMessage<Block>,
>;
@@ -485,12 +519,31 @@ mod tests {
inner: Arc<Mutex<HashMap<Hash, u64>>>,
}
impl BlockStatus<Block> for TestBlockStatus {
impl BlockStatusT<Block> for TestBlockStatus {
fn block_number(&self, hash: Hash) -> Result<Option<u64>, Error> {
Ok(self.inner.lock().get(&hash).map(|x| x.clone()))
}
}
#[derive(Clone)]
struct TestBlockSyncRequester {
requests: Arc<Mutex<Vec<(Hash, NumberFor<Block>)>>>,
}
impl Default for TestBlockSyncRequester {
fn default() -> Self {
TestBlockSyncRequester {
requests: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl BlockSyncRequesterT<Block> for TestBlockSyncRequester {
fn set_sync_fork_request(&self, _peers: Vec<network::PeerId>, hash: Hash, number: NumberFor<Block>) {
self.requests.lock().push((hash, number));
}
}
fn make_header(number: u64) -> Header {
Header::new(
number,
@@ -535,6 +588,7 @@ mod tests {
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
TestBlockSyncRequester::default(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
"global",
@@ -561,6 +615,7 @@ mod tests {
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
TestBlockSyncRequester::default(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
"global",
@@ -806,4 +861,80 @@ mod tests {
unapply_catch_up(unknown_catch_up()),
);
}
#[test]
fn request_block_sync_for_needed_blocks() {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let block_sync_requester = TestBlockSyncRequester::default();
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
block_sync_requester.clone(),
block_status,
global_rx.map_err(|_| panic!("should never error")),
"global",
);
let h1 = make_header(5);
let h2 = make_header(6);
let h3 = make_header(7);
// we create a commit message, with precommits for blocks 6 and 7 which
// we haven't imported.
let unknown_commit = CompactCommit::<Block> {
target_hash: h1.hash(),
target_number: 5,
precommits: vec![
Precommit {
target_hash: h2.hash(),
target_number: 6,
},
Precommit {
target_hash: h3.hash(),
target_number: 7,
},
],
auth_data: Vec::new(), // not used
};
let unknown_commit = || voter::CommunicationIn::Commit(
0,
unknown_commit.clone(),
voter::Callback::Blank,
);
// we send the commit message and spawn the until_imported stream
global_tx.unbounded_send(unknown_commit()).unwrap();
let mut runtime = Runtime::new().unwrap();
runtime.spawn(until_imported.into_future().map(|_| ()).map_err(|_| ()));
// assert that we will make sync requests
let assert = futures::future::poll_fn::<(), (), _>(|| {
let block_sync_requests = block_sync_requester.requests.lock();
// we request blocks targeted by the precommits that aren't imported
if block_sync_requests.contains(&(h2.hash(), *h2.number())) &&
block_sync_requests.contains(&(h3.hash(), *h3.number()))
{
return Ok(Async::Ready(()));
}
Ok(Async::NotReady)
});
// the `until_imported` stream doesn't request the blocks immediately,
// but it should request them after a small timeout
let timeout = Delay::new(Instant::now() + Duration::from_secs(60));
let test = assert.select2(timeout).map(|res| match res {
Either::A(_) => {},
Either::B(_) => panic!("timed out waiting for block sync request"),
}).map_err(|_| ());
runtime.block_on(test).unwrap();
}
}
+17 -7
View File
@@ -456,14 +456,24 @@ impl<B: BlockT> ChainSync<B> {
/// Request syncing for the given block from given set of peers.
// The implementation is similar to on_block_announce with unknown parent hash.
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
pub fn set_sync_fork_request(&mut self, mut peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
if peers.is_empty() {
if let Some(_) = self.fork_targets.remove(hash) {
debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers);
}
return;
debug!(
target: "sync",
"Explicit sync request for block {:?} with no peers specified. \
Syncing from all connected peers {:?} instead.",
hash, peers,
);
peers = self.peers.iter()
// Only request blocks from peers who are ahead or on a par.
.filter(|(_, peer)| peer.best_number >= number)
.map(|(id, _)| id.clone())
.collect();
} else {
debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers);
}
debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers);
if self.is_known(&hash) {
debug!(target: "sync", "Refusing to sync known hash {:?}", hash);
return;
@@ -1074,7 +1084,7 @@ impl<B: BlockT> ChainSync<B> {
parent_hash: Some(header.parent_hash().clone()),
peers: Default::default(),
})
.peers.insert(who);
.peers.insert(who);
}
OnBlockAnnounce::Nothing