Sync: Propagate block announcement data (#7903)

* Sync: Propagate block announcement data

This pr adds a feature to the sync protocol to propagate the data that
we received alongside a block announcement. This is done by adding a
cache that caches the last X block announcement data where X is set to
the number of `in_peers` (giving every peer the chance to send us a
different block). This will be required by parachains to ensure that
even peers who are not connected to a collator receive the data
alongside the block announcement to properly validate it and request the
block.

* Review comment

* Bring back the code and add new variant to ensure we don't insert block
announce data when something wasn't checked

* Also use out_peers
This commit is contained in:
Bastian Köcher
2021-01-19 17:01:11 +01:00
committed by GitHub
parent 2e44ffb7a7
commit 450b96c50d
13 changed files with 198 additions and 72 deletions
+49 -11
View File
@@ -51,7 +51,10 @@ use sp_consensus::Error as ConsensusError;
use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
use futures::prelude::*;
use futures::future::BoxFuture;
use sc_network::{NetworkWorker, NetworkService, config::ProtocolId};
use sc_network::{
NetworkWorker, NetworkService, config::{ProtocolId, MultiaddrWithPeerId, NonReservedPeerMode},
Multiaddr,
};
use sc_network::config::{NetworkConfiguration, NonDefaultSetConfig, TransportConfig};
use libp2p::PeerId;
use parking_lot::Mutex;
@@ -228,6 +231,7 @@ pub struct Peer<D> {
network: NetworkWorker<Block, <Block as BlockT>::Hash>,
imported_blocks_stream: Pin<Box<dyn Stream<Item = BlockImportNotification<Block>> + Send>>,
finality_notification_stream: Pin<Box<dyn Stream<Item = FinalityNotification<Block>> + Send>>,
listen_addr: Multiaddr,
}
impl<D> Peer<D> {
@@ -267,7 +271,7 @@ impl<D> Peer<D> {
}
/// Announces an important block on the network.
pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Vec<u8>) {
pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Option<Vec<u8>>) {
self.network.service().announce_block(hash, data);
}
@@ -281,7 +285,7 @@ impl<D> Peer<D> {
where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block
{
let best_hash = self.client.info().best_hash;
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false, true)
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false, true, true)
}
/// Add blocks to the peer -- edit the block before adding. The chain will
@@ -294,6 +298,7 @@ impl<D> Peer<D> {
mut edit_block: F,
headers_only: bool,
inform_sync_about_new_best_block: bool,
announce_block: bool,
) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block {
let full_client = self.client.as_full()
.expect("blocks could only be generated by full clients");
@@ -327,7 +332,9 @@ impl<D> Peer<D> {
};
self.block_import.import_block(import_block, cache).expect("block_import failed");
self.network.service().announce_block(hash, Vec::new());
if announce_block {
self.network.service().announce_block(hash, None);
}
at = hash;
}
@@ -337,7 +344,6 @@ impl<D> Peer<D> {
full_client.header(&BlockId::Hash(at)).ok().flatten().unwrap().number().clone(),
);
}
self.network.service().announce_block(at.clone(), Vec::new());
at
}
@@ -350,13 +356,13 @@ impl<D> Peer<D> {
/// Push blocks to the peer (simplified: with or without a TX)
pub fn push_headers(&mut self, count: usize) -> H256 {
let best_hash = self.client.info().best_hash;
self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true, true)
self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true, true, true)
}
/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
self.generate_tx_blocks_at(at, count, with_tx, false, true)
self.generate_tx_blocks_at(at, count, with_tx, false, true, true)
}
/// Push blocks to the peer (simplified: with or without a TX) starting from
@@ -367,7 +373,18 @@ impl<D> Peer<D> {
count: usize,
with_tx: bool,
) -> H256 {
self.generate_tx_blocks_at(at, count, with_tx, false, false)
self.generate_tx_blocks_at(at, count, with_tx, false, false, true)
}
/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash without announcing the block.
pub fn push_blocks_at_without_announcing(
&mut self,
at: BlockId<Block>,
count: usize,
with_tx: bool,
) -> H256 {
self.generate_tx_blocks_at(at, count, with_tx, false, true, false)
}
/// Push blocks/headers to the peer (simplified: with or without a TX) starting from
@@ -379,6 +396,7 @@ impl<D> Peer<D> {
with_tx: bool,
headers_only: bool,
inform_sync_about_new_best_block: bool,
announce_block: bool,
) -> H256 {
let mut nonce = 0;
if with_tx {
@@ -398,6 +416,7 @@ impl<D> Peer<D> {
},
headers_only,
inform_sync_about_new_best_block,
announce_block,
)
} else {
self.generate_blocks_at(
@@ -407,6 +426,7 @@ impl<D> Peer<D> {
|builder| builder.build().unwrap().block,
headers_only,
inform_sync_about_new_best_block,
announce_block,
)
}
}
@@ -585,6 +605,10 @@ pub struct FullPeerConfig {
pub block_announce_validator: Option<Box<dyn BlockAnnounceValidator<Block> + Send + Sync>>,
/// List of notification protocols that the network must support.
pub notifications_protocols: Vec<Cow<'static, str>>,
/// The indices of the peers the peer should be connected to.
///
/// If `None`, it will be connected to all other peers.
pub connect_to_peers: Option<Vec<usize>>,
}
pub trait TestNetFactory: Sized {
@@ -689,6 +713,15 @@ pub trait TestNetFactory: Sized {
set_config: Default::default()
}
}).collect();
if let Some(connect_to) = config.connect_to_peers {
let addrs = connect_to.iter().map(|v| {
let peer_id = self.peer(*v).network_service().local_peer_id().clone();
let multiaddr = self.peer(*v).listen_addr.clone();
MultiaddrWithPeerId { peer_id, multiaddr }
}).collect();
network_config.default_peers_set.reserved_nodes = addrs;
network_config.default_peers_set.non_reserved_mode = NonReservedPeerMode::Deny;
}
let protocol_id = ProtocolId::from("test-protocol-name");
@@ -715,9 +748,12 @@ pub trait TestNetFactory: Sized {
trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());
self.mut_peers(|peers| {
self.mut_peers(move |peers| {
for peer in peers.iter_mut() {
peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone());
peer.network.add_known_address(
network.service().local_peer_id().clone(),
listen_addr.clone(),
);
}
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
@@ -733,6 +769,7 @@ pub trait TestNetFactory: Sized {
block_import,
verifier,
network,
listen_addr,
});
});
}
@@ -813,6 +850,7 @@ pub trait TestNetFactory: Sized {
imported_blocks_stream,
finality_notification_stream,
network,
listen_addr,
});
});
}
@@ -912,7 +950,7 @@ pub trait TestNetFactory: Sized {
// We poll `imported_blocks_stream`.
while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) {
peer.network.service().announce_block(notification.hash, Vec::new());
peer.network.service().announce_block(notification.hash, None);
}
// We poll `finality_notification_stream`, but we only take the last event.
+62 -7
View File
@@ -436,7 +436,7 @@ fn can_sync_small_non_best_forks() {
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
net.peer(0).announce_block(small_hash, Vec::new());
net.peer(0).announce_block(small_hash, None);
// after announcing, peer 1 downloads the block.
@@ -452,7 +452,7 @@ fn can_sync_small_non_best_forks() {
net.block_until_sync();
let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true);
net.peer(0).announce_block(another_fork, Vec::new());
net.peer(0).announce_block(another_fork, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().header(&BlockId::Hash(another_fork)).unwrap().is_none() {
@@ -500,7 +500,7 @@ fn light_peer_imports_header_from_announce() {
sp_tracing::try_init_simple();
fn import_with_announce(net: &mut TestNet, hash: H256) {
net.peer(0).announce_block(hash, Vec::new());
net.peer(0).announce_block(hash, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
@@ -610,7 +610,7 @@ fn does_not_sync_announced_old_best_block() {
net.peer(0).push_blocks(18, true);
net.peer(1).push_blocks(20, true);
net.peer(0).announce_block(old_hash, Vec::new());
net.peer(0).announce_block(old_hash, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
// poll once to import announcement
net.poll(cx);
@@ -618,7 +618,7 @@ fn does_not_sync_announced_old_best_block() {
}));
assert!(!net.peer(1).is_major_syncing());
net.peer(0).announce_block(old_hash_with_parent, Vec::new());
net.peer(0).announce_block(old_hash_with_parent, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
// poll once to import announcement
net.poll(cx);
@@ -653,8 +653,8 @@ fn imports_stale_once() {
fn import_with_announce(net: &mut TestNet, hash: H256) {
// Announce twice
net.peer(0).announce_block(hash, Vec::new());
net.peer(0).announce_block(hash, Vec::new());
net.peer(0).announce_block(hash, None);
net.peer(0).announce_block(hash, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
@@ -842,3 +842,58 @@ fn sync_to_tip_when_we_sync_together_with_multiple_peers() {
net.block_until_idle();
}
}
/// Ensures that when we receive a block announcement with some data attached, that we propagate
/// this data when reannouncing the block.
#[test]
fn block_announce_data_is_propagated() {
struct TestBlockAnnounceValidator;
impl BlockAnnounceValidator<Block> for TestBlockAnnounceValidator {
fn validate(
&mut self,
_: &Header,
data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>> {
let correct = data.get(0) == Some(&137);
async move {
if correct {
Ok(Validation::Success { is_new_best: true })
} else {
Ok(Validation::Failure { disconnect: false })
}
}.boxed()
}
}
sp_tracing::try_init_simple();
let mut net = TestNet::new(1);
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(TestBlockAnnounceValidator)),
..Default::default()
});
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(TestBlockAnnounceValidator)),
connect_to_peers: Some(vec![1]),
..Default::default()
});
// Wait until peer 1 is connected to both nodes.
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).num_peers() == 2 {
Poll::Ready(())
} else {
Poll::Pending
}
}));
let block_hash = net.peer(0).push_blocks_at_without_announcing(BlockId::Number(0), 1, true);
net.peer(0).announce_block(block_hash, Some(vec![137]));
while !net.peer(1).has_block(&block_hash) || !net.peer(2).has_block(&block_hash) {
net.block_until_idle();
}
}