From 54d6970efc6bfb9cb5223add2af460394e893cc8 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 11 Apr 2018 19:25:41 +0200 Subject: [PATCH] Fixed block proagation after import and import notifications (#119) * Fixed block proagation after import and import notifications * Removed cargo check --- substrate/.travis.yml | 1 - substrate/polkadot/service/src/lib.rs | 2 +- substrate/substrate/client/src/client.rs | 4 +-- substrate/substrate/network/src/protocol.rs | 40 ++++++++++++++++----- substrate/substrate/network/src/service.rs | 8 +++-- substrate/substrate/network/src/sync.rs | 3 +- substrate/substrate/network/src/test/mod.rs | 2 +- 7 files changed, 41 insertions(+), 19 deletions(-) diff --git a/substrate/.travis.yml b/substrate/.travis.yml index 33a20d3d62..4165a6fb10 100644 --- a/substrate/.travis.yml +++ b/substrate/.travis.yml @@ -12,6 +12,5 @@ matrix: - rust: stable script: - - cargo check --all - cargo test --all - ./publish-wasm.sh diff --git a/substrate/polkadot/service/src/lib.rs b/substrate/polkadot/service/src/lib.rs index 20bbe3530a..da5a59c5aa 100644 --- a/substrate/polkadot/service/src/lib.rs +++ b/substrate/polkadot/service/src/lib.rs @@ -234,7 +234,7 @@ impl Service { thread_network.start_network(); let mut core = Core::new().expect("tokio::Core could not be created"); let events = thread_client.import_notification_stream().for_each(|notification| { - thread_network.on_block_imported(¬ification.header); + thread_network.on_block_imported(notification.hash, ¬ification.header); Ok(()) }); if let Err(e) = core.run(events) { diff --git a/substrate/substrate/client/src/client.rs b/substrate/substrate/client/src/client.rs index 14a69154a4..a84446f671 100644 --- a/substrate/substrate/client/src/client.rs +++ b/substrate/substrate/client/src/client.rs @@ -315,7 +315,7 @@ impl Client where let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; let hash: block::HeaderHash = header.blake2_256().into(); - trace!("Imported {}, (#{}), best={}", hash, header.number, is_new_best); + trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin); transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; transaction.set_storage(overlay.drain())?; self.backend.commit_operation(transaction)?; @@ -407,7 +407,7 @@ impl bft::BlockImport for Client justification, }; - let _ = self.import_block(BlockOrigin::Genesis, justified_header, Some(block.transactions)); + let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(block.transactions)); } } diff --git a/substrate/substrate/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs index c7e438045f..fe022491a8 100644 --- a/substrate/substrate/network/src/protocol.rs +++ b/substrate/substrate/network/src/protocol.rs @@ -23,6 +23,7 @@ use futures::sync::oneshot; use serde_json; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::{Hash, blake2_256}; +use runtime_support::Hashable; use network::{PeerId, NodeId}; use message::{self, Message}; @@ -82,6 +83,8 @@ struct Peer { request_timestamp: Option, /// Holds a set of transactions known to this peer. known_transactions: HashSet, + /// Holds a set of blocks known to this peer. + known_blocks: HashSet, /// Request counter, next_request_id: message::RequestId, } @@ -185,17 +188,17 @@ impl Protocol { } pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { - match &mut message { - &mut Message::BlockRequest(ref mut r) => { + match &mut message { + &mut Message::BlockRequest(ref mut r) => { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&peer_id) { r.id = peer.next_request_id; peer.next_request_id = peer.next_request_id + 1; peer.block_request = Some(r.clone()); peer.request_timestamp = Some(time::Instant::now()); - }, - _ => (), - } + } + }, + _ => (), } let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); if let Err(e) = io.send(peer_id, data) { @@ -410,6 +413,7 @@ impl Protocol { block_request: None, request_timestamp: None, known_transactions: HashSet::new(), + known_blocks: HashSet::new(), next_request_id: 0, }; peers.insert(peer_id.clone(), peer); @@ -484,11 +488,29 @@ impl Protocol { pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { let header = announce.header; - self.sync.write().on_block_announce(io, self, peer_id, &header); + let hash: HeaderHash = header.blake2_256().into(); + { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&peer_id) { + peer.known_blocks.insert(hash.clone()); + } + } + self.sync.write().on_block_announce(io, self, peer_id, hash, &header); } - pub fn on_block_imported(&self, header: &Header) { + pub fn on_block_imported(&self, io: &mut SyncIo, hash: HeaderHash, header: &Header) { self.sync.write().update_chain_info(&header); + // send out block announcements + let mut peers = self.peers.write(); + + for (peer_id, ref mut peer) in peers.iter_mut() { + if peer.known_blocks.insert(hash.clone()) { + trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id); + self.send_message(io, *peer_id, Message::BlockAnnounce(message::BlockAnnounce { + header: header.clone() + })); + } + } } pub fn transactions_stats(&self) -> BTreeMap { diff --git a/substrate/substrate/network/src/service.rs b/substrate/substrate/network/src/service.rs index 9115691c26..25238385c5 100644 --- a/substrate/substrate/network/src/service.rs +++ b/substrate/substrate/network/src/service.rs @@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc}; use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, NetworkConfiguration , NonReservedPeerMode, ErrorKind}; use network_devp2p::{NetworkService}; -use primitives::block::{TransactionHash, Header}; +use primitives::block::{TransactionHash, Header, HeaderHash}; use primitives::Hash; use core_io::{TimerToken}; use io::NetSyncIo; @@ -154,8 +154,10 @@ impl Service { } /// Called when a new block is imported by the client. - pub fn on_block_imported(&self, header: &Header) { - self.handler.protocol.on_block_imported(header) + pub fn on_block_imported(&self, hash: HeaderHash, header: &Header) { + self.network.with_context(DOT_PROTOCOL_ID, |context| { + self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header) + }); } /// Called when new transactons are imported by the client. diff --git a/substrate/substrate/network/src/sync.rs b/substrate/substrate/network/src/sync.rs index 60640664df..37d579771d 100644 --- a/substrate/substrate/network/src/sync.rs +++ b/substrate/substrate/network/src/sync.rs @@ -307,8 +307,7 @@ impl ChainSync { self.block_imported(&hash, best_header.number) } - pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, header: &Header) { - let hash = header_hash(&header); + pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, hash: HeaderHash, header: &Header) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { if header.number > peer.best_number { peer.best_number = header.number; diff --git a/substrate/substrate/network/src/test/mod.rs b/substrate/substrate/network/src/test/mod.rs index 52bb5d57ff..8c40164d80 100644 --- a/substrate/substrate/network/src/test/mod.rs +++ b/substrate/substrate/network/src/test/mod.rs @@ -114,7 +114,7 @@ impl Peer { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.sync.on_block_imported(&header); + self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); } /// Called on connection to other indicated peer.