Import headers from BlockAnnounce message on light nodes (#2731)

* import headers from announce message on light

* lines width

* added comments
This commit is contained in:
Svyatoslav Nikolsky
2019-05-31 10:28:09 +03:00
committed by Gavin Wood
parent 1d6696a0ec
commit 0d9fad431b
4 changed files with 262 additions and 96 deletions
+47 -3
View File
@@ -521,8 +521,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
},
GenericMessage::BlockAnnounce(announce) => {
self.on_block_announce(network_out, who.clone(), announce);
let outcome = self.on_block_announce(network_out, who.clone(), announce);
self.update_peer_info(&who);
return outcome;
},
GenericMessage::Transactions(m) =>
self.on_extrinsics(network_out, transaction_pool, who, m),
@@ -1019,7 +1020,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
mut network_out: &mut dyn NetworkOut<B>,
who: PeerId,
announce: message::BlockAnnounce<B::Header>
) {
) -> CustomMessageOutcome<B> {
let header = announce.header;
let hash = header.hash();
{
@@ -1028,12 +1029,55 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number());
self.sync.on_block_announce(
let try_import = self.sync.on_block_announce(
&mut ProtocolContext::new(&mut self.context_data, network_out),
who.clone(),
hash,
&header,
);
// try_import is only true when we have all data required to import block
// in the BlockAnnounce message. This is only when:
// 1) we're on light client;
// AND
// - EITHER 2.1) announced block is stale;
// - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e.
// there are no ascendants of this block scheduled for retrieval)
if !try_import {
return CustomMessageOutcome::None;
}
// to import header from announced block let's construct response to request that normally would have
// been sent over network (but it is not in our case)
let blocks_to_import = self.sync.on_block_data(
&mut ProtocolContext::new(&mut self.context_data, network_out),
who.clone(),
message::generic::BlockRequest {
id: 0,
fields: BlockAttributes::HEADER,
from: message::FromBlock::Hash(hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
},
message::generic::BlockResponse {
id: 0,
blocks: vec![
message::generic::BlockData {
hash: hash,
header: Some(header),
body: None,
receipt: None,
message_queue: None,
justification: None,
},
],
},
);
match blocks_to_import {
Some((origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks),
None => CustomMessageOutcome::None,
}
}
/// Call this when a block has been imported in the import queue and we should announce it on
+180 -91
View File
@@ -31,6 +31,7 @@
//!
use std::cmp::max;
use std::ops::Range;
use std::collections::{HashMap, VecDeque};
use log::{debug, trace, warn, info};
use crate::protocol::PeerInfo as ProtocolPeerInfo;
@@ -135,6 +136,7 @@ pub struct ChainSync<B: BlockT> {
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
role: Roles,
required_block_attributes: message::BlockAttributes,
extra_requests: ExtraRequestsAggregator<B>,
queue_blocks: HashSet<B::Hash>,
@@ -195,6 +197,7 @@ impl<B: BlockT> ChainSync<B> {
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
extra_requests: ExtraRequestsAggregator::new(),
role,
required_block_attributes,
queue_blocks: Default::default(),
best_importing_number: Zero::zero(),
@@ -665,12 +668,21 @@ impl<B: BlockT> ChainSync<B> {
// Abort search.
peer.state = PeerSyncState::Available;
}
trace!(target: "sync", "Updating peer {} info, ours={}, common={}, their best={}", n, number, peer.common_number, peer.best_number);
if peer.best_number >= number {
peer.common_number = number;
let new_common_number = if peer.best_number >= number {
number
} else {
peer.common_number = peer.best_number;
}
peer.best_number
};
trace!(
target: "sync",
"Updating peer {} info, ours={}, common={}->{}, their best={}",
n,
number,
peer.common_number,
new_common_number,
peer.best_number,
);
peer.common_number = new_common_number;
}
}
@@ -681,12 +693,22 @@ impl<B: BlockT> ChainSync<B> {
}
/// Call when a node announces a new block.
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: PeerId, hash: B::Hash, header: &B::Header) {
///
/// If true is returned, then the caller MUST try to import passed header (call `on_block_data).
/// The network request isn't sent in this case.
#[must_use]
pub(crate) fn on_block_announce(
&mut self,
protocol: &mut Context<B>,
who: PeerId,
hash: B::Hash,
header: &B::Header,
) -> bool {
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
if number.is_zero() {
warn!(target: "sync", "Ignored invalid block announcement from {}: {}", who, hash);
return;
return false;
}
let parent_status = block_status(&*protocol.client(), &self.queue_blocks, header.parent_hash().clone()).ok()
.unwrap_or(BlockStatus::Unknown);
@@ -705,7 +727,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_hash = hash;
}
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return;
return false;
}
if header.parent_hash() == &self.best_queued_hash || known_parent {
peer.common_number = number - One::one();
@@ -713,39 +735,77 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = number
}
} else {
return;
return false;
}
if !(known || self.is_already_downloading(&hash)) {
let stale = number <= self.best_queued_number;
if stale {
if !(known_parent || self.is_already_downloading(header.parent_hash())) {
if protocol.client().block_status(&BlockId::Number(*header.number()))
.unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned
{
trace!(target: "sync", "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header);
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
return false;
}
// stale block case
let requires_additional_data = !self.role.is_light();
let stale = number <= self.best_queued_number;
if stale {
if !(known_parent || self.is_already_downloading(header.parent_hash())) {
if protocol.client().block_status(&BlockId::Number(*header.number()))
.unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned
{
trace!(target: "sync", "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header);
return false;
}
trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header);
let request = self.download_unknown_stale(&who, &hash);
match request {
Some(request) => if requires_additional_data {
protocol.send_block_request(who, request);
return false;
} else {
trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header);
self.download_unknown_stale(protocol, who, &hash);
}
} else {
if ancient_parent {
trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header);
} else {
self.download_stale(protocol, who, &hash);
}
return true;
},
None => return false,
}
} else {
if ancient_parent {
trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header);
} else {
trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header);
self.download_new(protocol, who);
trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header);
return false;
}
let request = self.download_stale(&who, &hash);
match request {
Some(request) => if requires_additional_data {
protocol.send_block_request(who, request);
return false;
} else {
return true;
},
None => return false,
}
}
} else {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
}
if ancient_parent {
trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header);
return false;
}
trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header);
let (range, request) = match self.select_new_blocks(who.clone()) {
Some((range, request)) => (range, request),
None => return false,
};
let is_required_data_available =
!requires_additional_data &&
range.end - range.start == One::one() &&
range.start == *header.number();
if !is_required_data_available {
protocol.send_block_request(who, request);
return false;
}
true
}
fn is_already_downloading(&self, hash: &B::Hash) -> bool {
@@ -788,76 +848,105 @@ impl<B: BlockT> ChainSync<B> {
}
// Download old block with known parent.
fn download_stale(&mut self, protocol: &mut Context<B>, who: PeerId, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::Available => {
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_block_request(who, request);
},
_ => (),
}
fn download_stale(
&mut self,
who: &PeerId,
hash: &B::Hash,
) -> Option<message::BlockRequest<B>> {
let peer = self.peers.get_mut(who)?;
match peer.state {
PeerSyncState::Available => {
peer.state = PeerSyncState::DownloadingStale(*hash);
Some(message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
})
},
_ => None,
}
}
// Download old block with unknown parent.
fn download_unknown_stale(&mut self, protocol: &mut Context<B>, who: PeerId, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::Available => {
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Descending,
max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_block_request(who, request);
},
_ => (),
}
fn download_unknown_stale(
&mut self,
who: &PeerId,
hash: &B::Hash,
) -> Option<message::BlockRequest<B>> {
let peer = self.peers.get_mut(who)?;
match peer.state {
PeerSyncState::Available => {
peer.state = PeerSyncState::DownloadingStale(*hash);
Some(message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Descending,
max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN),
})
},
_ => None,
}
}
// Issue a request for a peer to download new blocks, if any are available
// Issue a request for a peer to download new blocks, if any are available.
fn download_new(&mut self, protocol: &mut Context<B>, who: PeerId) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
// when there are too many blocks in the queue => do not try to download new blocks
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return;
}
match peer.state {
PeerSyncState::Available => {
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, peer.common_number, peer.best_number);
if let Some(range) = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
if let Some((_, request)) = self.select_new_blocks(who.clone()) {
protocol.send_block_request(who, request);
}
}
// Select a range of NEW blocks to download from peer.
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, message::BlockRequest<B>)> {
// when there are too many blocks in the queue => do not try to download new blocks
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return None;
}
let peer = self.peers.get_mut(&who)?;
match peer.state {
PeerSyncState::Available => {
trace!(
target: "sync",
"Considering new block download from {}, common block is {}, best is {:?}",
who,
peer.common_number,
peer.best_number,
);
let range = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number);
match range {
Some(range) => {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Number(range.start),
to: None,
direction: message::Direction::Ascending,
max: Some((range.end - range.start).saturated_into::<u32>()),
};
let from = message::FromBlock::Number(range.start);
let max = Some((range.end - range.start).saturated_into::<u32>());
peer.state = PeerSyncState::DownloadingNew(range.start);
protocol.send_block_request(who, request);
} else {
Some((
range,
message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from,
to: None,
direction: message::Direction::Ascending,
max,
},
))
},
None => {
trace!(target: "sync", "Nothing to request");
}
},
_ => trace!(target: "sync", "Peer {} is busy", who),
}
None
},
}
},
_ => {
trace!(target: "sync", "Peer {} is busy", who);
None
},
}
}
+2 -2
View File
@@ -521,7 +521,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Synchronize with import queue.
#[cfg(any(test, feature = "test-helpers"))]
fn import_queue_sync(&self) {
pub fn import_queue_sync(&self) {
self.import_queue.synchronize();
let _ = self.net_proto_channel.wait_sync();
}
@@ -675,7 +675,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
pub fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
let mut nonce = 0;
if with_tx {
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
+33
View File
@@ -452,3 +452,36 @@ fn can_not_sync_from_light_peer() {
// check that light #1 has disconnected from #2
assert_eq!(net.peer(1).protocol_status().num_peers, 1);
}
#[test]
fn light_peer_imports_header_from_announce() {
let _ = ::env_logger::try_init();
fn import_with_announce(net: &mut TestNet, hash: H256) {
let header = net.peer(0).client().header(&BlockId::Hash(hash)).unwrap().unwrap();
net.peer(1).receive_message(
&net.peer(0).peer_id,
message::generic::Message::BlockAnnounce(message::generic::BlockAnnounce {
header,
}),
);
net.peer(1).import_queue_sync();
assert!(net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some());
}
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
net.add_light_peer(&Default::default());
// let them connect to each other
net.sync();
// check that NEW block is imported from announce message
let new_hash = net.peer(0).push_blocks(1, false);
import_with_announce(&mut net, new_hash);
// check that KNOWN STALE block is imported from announce message
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, known_stale_hash);
}