Add block announce validator. (#3346)

* Add `BlockAnnounceValidator` trait.

* Add associated data to block announcement.

* Make tests compile.

* Move validator into `sync.rs`.

* Smaller changes.

* Update core/network/src/protocol.rs

Co-Authored-By: Kian Paimani <5588131+kianenigma@users.noreply.github.com>

* Update core/network/src/protocol.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update core/network/src/test/sync.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Formatting.

* Remove assoc. data from `BlockImportNotification`.

* Use `Option<Vec<u8>>` for associated data.

* Update core/network/src/protocol/sync.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix type error.
This commit is contained in:
Toralf Wittner
2019-09-24 10:48:21 +02:00
committed by Bastian Köcher
parent 4888c253a3
commit af0d71d389
16 changed files with 207 additions and 89 deletions
+2 -2
View File
@@ -16,10 +16,10 @@
//! Blockchain access trait
use client::{self, Client as SubstrateClient, ClientInfo, BlockStatus, CallExecutor};
use client::{self, Client as SubstrateClient, ClientInfo, CallExecutor};
use client::error::Error;
use client::light::fetcher::ChangesProof;
use consensus::{BlockImport, Error as ConsensusError};
use consensus::{BlockImport, BlockStatus, Error as ConsensusError};
use sr_primitives::traits::{Block as BlockT, Header as HeaderT};
use sr_primitives::generic::{BlockId};
use sr_primitives::Justification;
+4 -1
View File
@@ -26,7 +26,7 @@ use crate::chain::{Client, FinalityProofProvider};
use crate::on_demand_layer::OnDemand;
use crate::service::{ExHashT, TransactionPool};
use bitflags::bitflags;
use consensus::import_queue::ImportQueue;
use consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sr_primitives::traits::{Block as BlockT};
use std::sync::Arc;
use libp2p::identity::{Keypair, secp256k1, ed25519};
@@ -80,6 +80,9 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
/// Customization of the network. Use this to plug additional networking capabilities.
pub specialization: S,
/// Type to check incoming block announcements.
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
}
bitflags! {
+34 -26
View File
@@ -23,14 +23,17 @@ use libp2p::core::{ConnectedPoint, nodes::Substream, muxing::StreamMuxerBox};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use primitives::storage::StorageKey;
use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin};
use consensus::{
BlockOrigin,
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use sr_primitives::{generic::BlockId, ConsensusEngineId, Justification};
use sr_primitives::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero,
CheckedSub, SaturatedConversion
};
use consensus::import_queue::{BlockImportResult, BlockImportError};
use message::{BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::{Message as GenericMessage, ConsensusMessage};
use event::Event;
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
@@ -366,9 +369,16 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
protocol_id: ProtocolId,
peerset_config: peerset::PeersetConfig,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
) -> error::Result<(Protocol<B, S, H>, peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(config.roles, chain.clone(), &info, finality_proof_request_builder);
let sync = ChainSync::new(
config.roles,
chain.clone(),
&info,
finality_proof_request_builder,
block_announce_validator,
);
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let behaviour = LegacyProto::new(protocol_id, versions, peerset);
@@ -376,7 +386,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let protocol = Protocol {
tick_timeout: Box::new(futures_timer::Interval::new(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
propagate_timeout: Box::new(futures_timer::Interval::new(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
config: config,
config,
context_data: ContextData {
peers: HashMap::new(),
chain,
@@ -384,7 +394,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
light_dispatch: LightDispatch::new(checker),
genesis_hash: info.chain.genesis_hash,
sync,
specialization: specialization,
specialization,
consensus_gossip: ConsensusGossip::new(),
handshaking_peers: HashMap::new(),
transaction_pool,
@@ -1010,7 +1020,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced.
pub fn announce_block(&mut self, hash: B::Hash) {
pub fn announce_block(&mut self, hash: B::Hash, data: Vec<u8>) {
let header = match self.context_data.chain.header(&BlockId::Hash(hash)) {
Ok(Some(header)) => header,
Ok(None) => {
@@ -1030,10 +1040,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let is_best = self.context_data.chain.info().chain.best_hash == hash;
debug!(target: "sync", "Reannouncing block {:?}", hash);
self.send_announcement(&header, is_best, true)
self.send_announcement(&header, data, is_best, true)
}
fn send_announcement(&mut self, header: &B::Header, is_best: bool, force: bool) {
fn send_announcement(&mut self, header: &B::Header, data: Vec<u8>, is_best: bool, force: bool) {
let hash = header.hash();
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
@@ -1050,7 +1060,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
} else {
None
}
},
data: if peer.info.protocol_version >= 4 {
Some(data.clone())
} else {
None
},
});
self.behaviour.send_packet(who, message)
@@ -1074,29 +1089,22 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.send_message(who, GenericMessage::Status(status))
}
fn on_block_announce(
&mut self,
who: PeerId,
announce: message::BlockAnnounce<B::Header>
) -> CustomMessageOutcome<B> {
let header = announce.header;
let hash = header.hash();
{
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
}
fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> {
let hash = announce.header.hash();
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
}
self.light_dispatch.update_best_number(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), *header.number());
}, who.clone(), *announce.header.number());
let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) {
message::BlockState::Best => true,
message::BlockState::Normal => false,
};
match self.sync.on_block_announce(who.clone(), hash, &header, is_their_best) {
match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) {
sync::OnBlockAnnounce::Request(peer, req) => {
self.send_message(peer, GenericMessage::BlockRequest(req));
return CustomMessageOutcome::None
@@ -1131,7 +1139,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
blocks: vec![
message::generic::BlockData {
hash: hash,
header: Some(header),
header: Some(announce.header),
body: None,
receipt: None,
message_queue: None,
@@ -1156,7 +1164,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Call this when a block has been imported in the import queue and we should announce it on
/// the network.
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, is_best: bool) {
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, data: Vec<u8>, is_best: bool) {
if is_best {
self.sync.update_chain_info(header);
}
@@ -1172,7 +1180,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
// send out block announcements
self.send_announcement(&header, is_best, false);
self.send_announcement(header, data, is_best, false);
}
/// Call this when a block has been finalized. The sync layer may have some additional
@@ -273,6 +273,8 @@ pub mod generic {
pub header: H,
/// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common.
pub state: Option<BlockState>,
/// Data associated with this block announcement, e.g. a candidate message.
pub data: Option<Vec<u8>>,
}
// Custom Encode/Decode impl to maintain backwards compatibility with v3.
@@ -284,6 +286,9 @@ pub mod generic {
if let Some(state) = &self.state {
state.encode_to(dest);
}
if let Some(data) = &self.data {
data.encode_to(dest)
}
}
}
@@ -291,9 +296,11 @@ pub mod generic {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
let header = H::decode(input)?;
let state = BlockState::decode(input).ok();
let data = Vec::decode(input).ok();
Ok(BlockAnnounce {
header,
state,
data,
})
}
}
+28 -5
View File
@@ -28,11 +28,15 @@
//!
use blocks::BlockCollection;
use client::{BlockStatus, ClientInfo, error::Error as ClientError};
use consensus::{BlockOrigin, import_queue::{IncomingBlock, BlockImportResult, BlockImportError}};
use client::{ClientInfo, error::Error as ClientError};
use consensus::{BlockOrigin, BlockStatus,
block_validation::{BlockAnnounceValidator, Validation},
import_queue::{IncomingBlock, BlockImportResult, BlockImportError}
};
use crate::{
config::{Roles, BoxFinalityProofRequestBuilder},
message::{self, generic::FinalityProofRequest, BlockAttributes, BlockRequest, BlockResponse, FinalityProofResponse},
message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
FinalityProofResponse},
protocol
};
use either::Either;
@@ -122,6 +126,8 @@ pub struct ChainSync<B: BlockT> {
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
/// A flag that caches idle state with no pending requests.
is_idle: bool,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
}
/// All the data we have about a Peer that we are trying to sync with
@@ -271,7 +277,8 @@ impl<B: BlockT> ChainSync<B> {
role: Roles,
client: Arc<dyn crate::chain::Client<B>>,
info: &ClientInfo<B>,
request_builder: Option<BoxFinalityProofRequestBuilder<B>>
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
) -> Self {
let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;
@@ -293,6 +300,7 @@ impl<B: BlockT> ChainSync<B> {
best_importing_number: Zero::zero(),
request_builder,
is_idle: false,
block_announce_validator,
}
}
@@ -885,9 +893,10 @@ impl<B: BlockT> ChainSync<B> {
/// header (call `on_block_data`). The network request isn't sent
/// in this case. Both hash and header is passed as an optimization
/// to avoid rehashing the header.
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header, is_best: bool)
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
-> OnBlockAnnounce<B>
{
let header = &announce.header;
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
if number.is_zero() {
@@ -932,6 +941,20 @@ impl<B: BlockT> ChainSync<B> {
return OnBlockAnnounce::Nothing
}
// Let external validator check the block announcement.
let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
match self.block_announce_validator.validate(&header, assoc_data) {
Ok(Validation::Success) => (),
Ok(Validation::Failure) => {
debug!(target: "sync", "block announcement validation of block {} from {} failed", hash, who);
return OnBlockAnnounce::Nothing
}
Err(e) => {
error!(target: "sync", "block announcement validation errored: {}", e);
return OnBlockAnnounce::Nothing
}
}
// stale block case
let requires_additional_data = !self.role.is_light();
if number <= self.best_queued_number {
+9 -10
View File
@@ -113,9 +113,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(
params: Params<B, S, H>,
) -> Result<NetworkWorker<B, S, H>, Error> {
pub fn new(params: Params<B, S, H>) -> Result<NetworkWorker<B, S, H>, Error> {
let (to_worker, from_worker) = mpsc::unbounded();
if let Some(ref path) = params.network_config.net_config_path {
@@ -178,6 +176,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
params.finality_proof_request_builder,
params.protocol_id,
peerset_config,
params.block_announce_validator
)?;
// Build the swarm.
@@ -297,8 +296,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
}
/// You must call this when a new block is imported by the client.
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, is_best: bool) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header, is_best);
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, data: Vec<u8>, is_best: bool) {
self.network_service.user_protocol_mut().on_block_imported(hash, &header, data, is_best);
}
/// You must call this when a new block is finalized by the client.
@@ -394,8 +393,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced. This function forces such an announcement.
pub fn announce_block(&self, hash: B::Hash) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash));
pub fn announce_block(&self, hash: B::Hash, data: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash, data));
}
/// Send a consensus message through the gossip
@@ -580,7 +579,7 @@ impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H>
enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
PropagateExtrinsics,
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash),
AnnounceBlock(B::Hash, Vec<u8>),
ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
ExecuteWithGossip(Box<dyn FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send>),
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
@@ -653,8 +652,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
}
ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient),
ServerToWorkerMsg::AnnounceBlock(hash) =>
self.network_service.user_protocol_mut().announce_block(hash),
ServerToWorkerMsg::AnnounceBlock(hash, data) =>
self.network_service.user_protocol_mut().announce_block(hash, data),
ServerToWorkerMsg::RequestJustification(hash, number) =>
self.network_service.user_protocol_mut().request_justification(&hash, number),
ServerToWorkerMsg::PropagateExtrinsics =>
+8 -5
View File
@@ -37,6 +37,7 @@ use client::{
use client::block_builder::BlockBuilder;
use client::backend::{AuxStore, Backend, Finalizer};
use crate::config::Roles;
use consensus::block_validation::DefaultBlockAnnounceValidator;
use consensus::import_queue::BasicQueue;
use consensus::import_queue::{
BoxBlockImport, BoxJustificationImport, Verifier, BoxFinalityProofImport,
@@ -254,8 +255,8 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
}
/// Announces an important block on the network.
pub fn announce_block(&self, hash: <Block as BlockT>::Hash) {
self.network.service().announce_block(hash);
pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Vec<u8>) {
self.network.service().announce_block(hash, data);
}
/// Add blocks to the peer -- edit the block before adding
@@ -302,11 +303,11 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
Default::default()
};
self.block_import.import_block(import_block, cache).expect("block_import failed");
self.network.on_block_imported(hash, header, true);
self.network.on_block_imported(hash, header, Vec::new(), true);
at = hash;
}
self.network.service().announce_block(at.clone());
self.network.service().announce_block(at.clone(), Vec::new());
at
}
@@ -555,6 +556,7 @@ pub trait TestNetFactory: Sized {
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
specialization: self::SpecializationFactory::create(),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
}).unwrap();
self.mut_peers(|peers| {
@@ -628,6 +630,7 @@ pub trait TestNetFactory: Sized {
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
specialization: self::SpecializationFactory::create(),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
}).unwrap();
self.mut_peers(|peers| {
@@ -690,7 +693,7 @@ pub trait TestNetFactory: Sized {
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
peer.network.on_block_imported(notification.hash, notification.header, true);
peer.network.on_block_imported(notification.hash, notification.header, Vec::new(), true);
}
// We poll `finality_notification_stream`, but we only take the last event.
+2 -2
View File
@@ -444,7 +444,7 @@ fn can_sync_small_non_best_forks() {
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
net.peer(0).announce_block(small_hash);
net.peer(0).announce_block(small_hash, Vec::new());
// after announcing, peer 1 downloads the block.
@@ -499,7 +499,7 @@ fn light_peer_imports_header_from_announce() {
let mut runtime = current_thread::Runtime::new().unwrap();
fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) {
net.peer(0).announce_block(hash);
net.peer(0).announce_block(hash, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();