Send block status with announcement (#3607)

* Send block status with announcement

* Fixed tests

* Whitespace

Co-Authored-By: Gavin Wood <gavin@parity.io>

* Additional comment

* Update comment

Co-Authored-By: André Silva <andre.beat@gmail.com>
This commit is contained in:
Arkadiy Paronyan
2019-09-17 11:19:46 +02:00
committed by Bastian Köcher
parent 6679c8b051
commit 84d0c790f3
6 changed files with 85 additions and 30 deletions
+37 -19
View File
@@ -64,9 +64,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 3;
pub(crate) const CURRENT_VERSION: u32 = 4;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 2;
pub(crate) const MIN_VERSION: u32 = 3;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
@@ -1028,14 +1028,33 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
return;
}
let is_best = self.context_data.chain.info().chain.best_hash == hash;
debug!(target: "sync", "Reannouncing block {:?}", hash);
self.send_announcement(&header, is_best, true)
}
fn send_announcement(&mut self, header: &B::Header, is_best: bool, force: bool) {
let hash = header.hash();
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who);
peer.known_blocks.insert(hash);
self.behaviour.send_packet(who, message.clone())
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
let inserted = peer.known_blocks.insert(hash);
if inserted || force {
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
}
} else {
None
}
});
self.behaviour.send_packet(who, message)
}
}
}
@@ -1072,7 +1091,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peerset: self.peerset_handle.clone(),
}, who.clone(), *header.number());
match self.sync.on_block_announce(who.clone(), hash, &header) {
let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) {
message::BlockState::Best => true,
message::BlockState::Normal => false,
};
match self.sync.on_block_announce(who.clone(), hash, &header, is_their_best) {
sync::OnBlockAnnounce::Request(peer, req) => {
self.send_message(peer, GenericMessage::BlockRequest(req));
return CustomMessageOutcome::None
@@ -1132,8 +1156,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Call this when a block has been imported in the import queue and we should announce it on
/// the network.
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
self.sync.update_chain_info(header);
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, is_best: bool) {
if is_best {
self.sync.update_chain_info(header);
}
self.specialization.on_block_imported(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
hash.clone(),
@@ -1146,15 +1172,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
// send out block announcements
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
if peer.known_blocks.insert(hash.clone()) {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
self.behaviour.send_packet(who, message.clone())
}
}
self.send_announcement(&header, is_best, false);
}
/// Call this when a block has been finalized. The sync layer may have some additional
+37 -2
View File
@@ -107,6 +107,15 @@ pub enum Direction {
Descending = 1,
}
/// Block state in the chain.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)]
pub enum BlockState {
/// Block is not part of the best chain.
Normal,
/// Latest best block.
Best,
}
/// Remote call response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct RemoteCallResponse {
@@ -127,12 +136,13 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use codec::{Encode, Decode};
use codec::{Encode, Decode, Input, Output};
use sr_primitives::Justification;
use crate::config::Roles;
use super::{
RemoteReadResponse, Transactions, Direction,
RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId,
BlockState,
};
/// Consensus is mostly opaque to us
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -257,10 +267,35 @@ pub mod generic {
}
/// Announce a new complete relay chain block on the network.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
/// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common.
pub state: Option<BlockState>,
}
// Custom Encode/Decode impl to maintain backwards compatibility with v3.
// This assumes that the packet contains nothing but the announcement message.
// TODO: Get rid of it once protocol v4 is common.
impl<H: Encode> Encode for BlockAnnounce<H> {
fn encode_to<T: Output>(&self, dest: &mut T) {
self.header.encode_to(dest);
if let Some(state) = &self.state {
state.encode_to(dest);
}
}
}
impl<H: Decode> Decode for BlockAnnounce<H> {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
let header = H::decode(input)?;
let state = BlockState::decode(input).ok();
Ok(BlockAnnounce {
header,
state,
})
}
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
+6 -4
View File
@@ -885,7 +885,9 @@ impl<B: BlockT> ChainSync<B> {
/// header (call `on_block_data`). The network request isn't sent
/// in this case. Both hash and header is passed as an optimization
/// to avoid rehashing the header.
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header) -> OnBlockAnnounce<B> {
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header, is_best: bool)
-> OnBlockAnnounce<B>
{
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
if number.is_zero() {
@@ -907,7 +909,7 @@ impl<B: BlockT> ChainSync<B> {
peer.recently_announced.pop_front();
}
peer.recently_announced.push_back(hash.clone());
if number > peer.best_number {
if is_best && number > peer.best_number {
// update their best block
peer.best_number = number;
peer.best_hash = hash;
@@ -915,9 +917,9 @@ impl<B: BlockT> ChainSync<B> {
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return OnBlockAnnounce::Nothing
}
// We assume that the announced block is the latest they have seen, and so our common number
// If the announced block is the best they have seen, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if known {
if known && is_best {
peer.common_number = number
} else if header.parent_hash() == &self.best_queued_hash || known_parent {
peer.common_number = number - One::one();
+2 -2
View File
@@ -297,8 +297,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
}
/// You must call this when a new block is imported by the client.
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header);
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, is_best: bool) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header, is_best);
}
/// You must call this when a new block is finalized by the client.
+2 -2
View File
@@ -300,7 +300,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
Default::default()
};
self.block_import.import_block(import_block, cache).expect("block_import failed");
self.network.on_block_imported(hash, header);
self.network.on_block_imported(hash, header, true);
at = hash;
}
@@ -662,7 +662,7 @@ pub trait TestNetFactory: Sized {
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
peer.network.on_block_imported(notification.hash, notification.header);
peer.network.on_block_imported(notification.hash, notification.header, true);
}
// We poll `finality_notification_stream`, but we only take the last event.
+1 -1
View File
@@ -682,7 +682,7 @@ fn build_network_future<
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() {
network.on_block_imported(notification.hash, notification.header);
network.on_block_imported(notification.hash, notification.header, notification.is_new_best);
}
// We poll `finality_notification_stream`, but we only take the last event.