Fixed block proagation after import and import notifications (#119)

* Fixed block proagation after import and import notifications

* Removed cargo check
This commit is contained in:
Arkadiy Paronyan
2018-04-11 19:25:41 +02:00
committed by Gav Wood
parent d978425f05
commit 54d6970efc
7 changed files with 41 additions and 19 deletions
-1
View File
@@ -12,6 +12,5 @@ matrix:
- rust: stable - rust: stable
script: script:
- cargo check --all
- cargo test --all - cargo test --all
- ./publish-wasm.sh - ./publish-wasm.sh
+1 -1
View File
@@ -234,7 +234,7 @@ impl Service {
thread_network.start_network(); thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created"); let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| { let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(&notification.header); thread_network.on_block_imported(notification.hash, &notification.header);
Ok(()) Ok(())
}); });
if let Err(e) = core.run(events) { if let Err(e) = core.run(events) {
+2 -2
View File
@@ -315,7 +315,7 @@ impl<B, E> Client<B, E> where
let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
let hash: block::HeaderHash = header.blake2_256().into(); let hash: block::HeaderHash = header.blake2_256().into();
trace!("Imported {}, (#{}), best={}", hash, header.number, is_new_best); trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin);
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
transaction.set_storage(overlay.drain())?; transaction.set_storage(overlay.drain())?;
self.backend.commit_operation(transaction)?; self.backend.commit_operation(transaction)?;
@@ -407,7 +407,7 @@ impl<B, E> bft::BlockImport for Client<B, E>
justification, justification,
}; };
let _ = self.import_block(BlockOrigin::Genesis, justified_header, Some(block.transactions)); let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(block.transactions));
} }
} }
+31 -9
View File
@@ -23,6 +23,7 @@ use futures::sync::oneshot;
use serde_json; use serde_json;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::{Hash, blake2_256}; use primitives::{Hash, blake2_256};
use runtime_support::Hashable;
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use message::{self, Message}; use message::{self, Message};
@@ -82,6 +83,8 @@ struct Peer {
request_timestamp: Option<time::Instant>, request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer. /// Holds a set of transactions known to this peer.
known_transactions: HashSet<TransactionHash>, known_transactions: HashSet<TransactionHash>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<HeaderHash>,
/// Request counter, /// Request counter,
next_request_id: message::RequestId, next_request_id: message::RequestId,
} }
@@ -185,17 +188,17 @@ impl Protocol {
} }
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) { pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
let mut peers = self.peers.write(); match &mut message {
if let Some(ref mut peer) = peers.get_mut(&peer_id) { &mut Message::BlockRequest(ref mut r) => {
match &mut message { let mut peers = self.peers.write();
&mut Message::BlockRequest(ref mut r) => { if let Some(ref mut peer) = peers.get_mut(&peer_id) {
r.id = peer.next_request_id; r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1; peer.next_request_id = peer.next_request_id + 1;
peer.block_request = Some(r.clone()); peer.block_request = Some(r.clone());
peer.request_timestamp = Some(time::Instant::now()); peer.request_timestamp = Some(time::Instant::now());
}, }
_ => (), },
} _ => (),
} }
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
if let Err(e) = io.send(peer_id, data) { if let Err(e) = io.send(peer_id, data) {
@@ -410,6 +413,7 @@ impl Protocol {
block_request: None, block_request: None,
request_timestamp: None, request_timestamp: None,
known_transactions: HashSet::new(), known_transactions: HashSet::new(),
known_blocks: HashSet::new(),
next_request_id: 0, next_request_id: 0,
}; };
peers.insert(peer_id.clone(), peer); peers.insert(peer_id.clone(), peer);
@@ -484,11 +488,29 @@ impl Protocol {
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) {
let header = announce.header; let header = announce.header;
self.sync.write().on_block_announce(io, self, peer_id, &header); let hash: HeaderHash = header.blake2_256().into();
{
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
peer.known_blocks.insert(hash.clone());
}
}
self.sync.write().on_block_announce(io, self, peer_id, hash, &header);
} }
pub fn on_block_imported(&self, header: &Header) { pub fn on_block_imported(&self, io: &mut SyncIo, hash: HeaderHash, header: &Header) {
self.sync.write().update_chain_info(&header); self.sync.write().update_chain_info(&header);
// send out block announcements
let mut peers = self.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
if peer.known_blocks.insert(hash.clone()) {
trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
self.send_message(io, *peer_id, Message::BlockAnnounce(message::BlockAnnounce {
header: header.clone()
}));
}
}
} }
pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> { pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
+5 -3
View File
@@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind}; NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService}; use network_devp2p::{NetworkService};
use primitives::block::{TransactionHash, Header}; use primitives::block::{TransactionHash, Header, HeaderHash};
use primitives::Hash; use primitives::Hash;
use core_io::{TimerToken}; use core_io::{TimerToken};
use io::NetSyncIo; use io::NetSyncIo;
@@ -154,8 +154,10 @@ impl Service {
} }
/// Called when a new block is imported by the client. /// Called when a new block is imported by the client.
pub fn on_block_imported(&self, header: &Header) { pub fn on_block_imported(&self, hash: HeaderHash, header: &Header) {
self.handler.protocol.on_block_imported(header) self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header)
});
} }
/// Called when new transactons are imported by the client. /// Called when new transactons are imported by the client.
+1 -2
View File
@@ -307,8 +307,7 @@ impl ChainSync {
self.block_imported(&hash, best_header.number) self.block_imported(&hash, best_header.number)
} }
pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, header: &Header) { pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, hash: HeaderHash, header: &Header) {
let hash = header_hash(&header);
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if header.number > peer.best_number { if header.number > peer.best_number {
peer.best_number = header.number; peer.best_number = header.number;
+1 -1
View File
@@ -114,7 +114,7 @@ impl Peer {
// Update the sync state to the latest chain state. // Update the sync state to the latest chain state.
let info = self.client.info().expect("In-mem client does not fail"); let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.sync.on_block_imported(&header); self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
} }
/// Called on connection to other indicated peer. /// Called on connection to other indicated peer.