mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
Fix light sync deadlock (blocks import queue) (#260)
* fix light sync deadlock * some more docs
This commit is contained in:
committed by
Arkadiy Paronyan
parent
c6119cbbdf
commit
f775b38828
@@ -25,7 +25,7 @@ use runtime_primitives::bft::Justification;
|
||||
/// Local client abstraction for the network.
|
||||
pub trait Client<Block: BlockT>: Send + Sync {
|
||||
/// Import a new block. Parent is supposed to be existing in the blockchain.
|
||||
fn import(&self, is_best: bool, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error>;
|
||||
fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error>;
|
||||
|
||||
/// Get blockchain info.
|
||||
fn info(&self) -> Result<ClientInfo<Block>, Error>;
|
||||
@@ -54,10 +54,9 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
|
||||
E: CallExecutor<Block> + Send + Sync + 'static,
|
||||
Block: BlockT,
|
||||
{
|
||||
fn import(&self, is_best: bool, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error> {
|
||||
fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error> {
|
||||
// TODO: defer justification check.
|
||||
let justified_header = self.check_justification(header, justification.into())?;
|
||||
let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
|
||||
(self as &SubstrateClient<B, E, Block>).import_block(origin, justified_header, body)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,12 +16,14 @@
|
||||
|
||||
//! Polkadot service possible errors.
|
||||
|
||||
use std::io::Error as IoError;
|
||||
use network_libp2p::Error as NetworkError;
|
||||
use client;
|
||||
|
||||
error_chain! {
|
||||
foreign_links {
|
||||
Network(NetworkError) #[doc = "Devp2p error."];
|
||||
Io(IoError) #[doc = "IO error."];
|
||||
}
|
||||
|
||||
links {
|
||||
|
||||
@@ -0,0 +1,566 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
//! Blocks import queue.
|
||||
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use parking_lot::{Condvar, Mutex, RwLock};
|
||||
|
||||
use client::{BlockOrigin, BlockStatus, ImportResult};
|
||||
use network_libp2p::PeerId;
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
|
||||
|
||||
use blocks::BlockData;
|
||||
use chain::Client;
|
||||
use error::{ErrorKind, Error};
|
||||
use protocol::Context;
|
||||
use service::ExecuteInContext;
|
||||
use sync::ChainSync;
|
||||
|
||||
/// Blocks import queue API.
|
||||
pub trait ImportQueue<B: BlockT>: Send + Sync {
|
||||
/// Clear the queue when sync is restarting.
|
||||
fn clear(&self);
|
||||
/// Get queue status.
|
||||
fn status(&self) -> ImportQueueStatus<B>;
|
||||
/// Is block with given hash is currently in the queue.
|
||||
fn is_importing(&self, hash: &B::Hash) -> bool;
|
||||
/// Import bunch of blocks.
|
||||
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>));
|
||||
}
|
||||
|
||||
/// Import queue status. It isn't completely accurate.
|
||||
pub struct ImportQueueStatus<B: BlockT> {
|
||||
/// Number of blocks that are currently in the queue.
|
||||
pub importing_count: usize,
|
||||
/// The number of the best block that was ever in the queue since start/last failure.
|
||||
pub best_importing_number: <<B as BlockT>::Header as HeaderT>::Number,
|
||||
}
|
||||
|
||||
/// Blocks import queue that is importing blocks in the separate thread.
|
||||
pub struct AsyncImportQueue<B: BlockT> {
|
||||
handle: Mutex<Option<::std::thread::JoinHandle<()>>>,
|
||||
data: Arc<AsyncImportQueueData<B>>,
|
||||
}
|
||||
|
||||
/// Locks order: queue, queue_blocks, best_importing_number
|
||||
struct AsyncImportQueueData<B: BlockT> {
|
||||
signal: Condvar,
|
||||
queue: Mutex<VecDeque<(BlockOrigin, Vec<BlockData<B>>)>>,
|
||||
queue_blocks: RwLock<HashSet<B::Hash>>,
|
||||
best_importing_number: RwLock<<<B as BlockT>::Header as HeaderT>::Number>,
|
||||
is_stopping: AtomicBool,
|
||||
}
|
||||
|
||||
impl<B: BlockT> AsyncImportQueue<B> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
handle: Mutex::new(None),
|
||||
data: Arc::new(AsyncImportQueueData::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start<E: 'static + ExecuteInContext<B>>(&self, sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>) -> Result<(), Error> {
|
||||
debug_assert!(self.handle.lock().is_none());
|
||||
|
||||
let qdata = self.data.clone();
|
||||
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
|
||||
import_thread(sync, service, chain, qdata)
|
||||
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT> AsyncImportQueueData<B> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
signal: Default::default(),
|
||||
queue: Mutex::new(VecDeque::new()),
|
||||
queue_blocks: RwLock::new(HashSet::new()),
|
||||
best_importing_number: RwLock::new(Zero::zero()),
|
||||
is_stopping: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
|
||||
fn clear(&self) {
|
||||
let mut queue = self.data.queue.lock();
|
||||
let mut queue_blocks = self.data.queue_blocks.write();
|
||||
let mut best_importing_number = self.data.best_importing_number.write();
|
||||
queue_blocks.clear();
|
||||
queue.clear();
|
||||
*best_importing_number = Zero::zero();
|
||||
}
|
||||
|
||||
fn status(&self) -> ImportQueueStatus<B> {
|
||||
ImportQueueStatus {
|
||||
importing_count: self.data.queue_blocks.read().len(),
|
||||
best_importing_number: *self.data.best_importing_number.read(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_importing(&self, hash: &B::Hash) -> bool {
|
||||
self.data.queue_blocks.read().contains(hash)
|
||||
}
|
||||
|
||||
fn import_blocks(&self, _sync: &mut ChainSync<B>, _protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
|
||||
trace!(target:"sync", "Scheduling {} blocks for import", blocks.1.len());
|
||||
|
||||
let mut queue = self.data.queue.lock();
|
||||
let mut queue_blocks = self.data.queue_blocks.write();
|
||||
let mut best_importing_number = self.data.best_importing_number.write();
|
||||
let new_best_importing_number = blocks.1.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero());
|
||||
queue_blocks.extend(blocks.1.iter().map(|b| b.block.hash.clone()));
|
||||
if new_best_importing_number > *best_importing_number {
|
||||
*best_importing_number = new_best_importing_number;
|
||||
}
|
||||
queue.push_back(blocks);
|
||||
self.data.signal.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT> Drop for AsyncImportQueue<B> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(handle) = self.handle.lock().take() {
|
||||
self.data.is_stopping.store(true, Ordering::SeqCst);
|
||||
let _ = handle.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks import thread.
|
||||
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>, qdata: Arc<AsyncImportQueueData<B>>) {
|
||||
trace!(target: "sync", "Starting import thread");
|
||||
|
||||
loop {
|
||||
let new_blocks = {
|
||||
let mut queue_lock = qdata.queue.lock();
|
||||
if queue_lock.is_empty() {
|
||||
qdata.signal.wait(&mut queue_lock);
|
||||
}
|
||||
|
||||
match queue_lock.pop_front() {
|
||||
Some(new_blocks) => new_blocks,
|
||||
None => break,
|
||||
}
|
||||
};
|
||||
if qdata.is_stopping.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
||||
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
|
||||
(Some(sync), Some(service), Some(chain)) => {
|
||||
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect();
|
||||
if !import_many_blocks(&mut SyncLink::Indirect(&sync, &*chain, &*service), Some(&*qdata), new_blocks) {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut queue_blocks = qdata.queue_blocks.write();
|
||||
for blocks_hash in blocks_hashes {
|
||||
queue_blocks.remove(&blocks_hash);
|
||||
}
|
||||
},
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Stopping import thread");
|
||||
}
|
||||
|
||||
/// ChainSync link trait.
|
||||
trait SyncLinkApi<B: BlockT> {
|
||||
/// Get chain reference.
|
||||
fn chain(&self) -> &Client<B>;
|
||||
/// Block imported.
|
||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>);
|
||||
/// Maintain sync.
|
||||
fn maintain_sync(&mut self);
|
||||
/// Disconnect from peer.
|
||||
fn disconnect(&mut self, peer_id: PeerId);
|
||||
/// Disconnect from peer and restart sync.
|
||||
fn disconnect_and_restart(&mut self, peer_id: PeerId);
|
||||
/// Restart sync.
|
||||
fn restart(&mut self);
|
||||
}
|
||||
|
||||
/// Link with the ChainSync service.
|
||||
enum SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
|
||||
/// Indirect link (through service).
|
||||
Indirect(&'a RwLock<ChainSync<B>>, &'a Client<B>, &'a E),
|
||||
/// Direct references are given.
|
||||
#[cfg(test)]
|
||||
Direct(&'a mut ChainSync<B>, &'a mut Context<B>),
|
||||
}
|
||||
|
||||
/// Block import successful result.
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + PartialEq> {
|
||||
/// Block is not imported.
|
||||
NotImported(H, N),
|
||||
/// Imported known block.
|
||||
ImportedKnown(H, N),
|
||||
/// Imported unknown block.
|
||||
ImportedUnknown(H, N),
|
||||
}
|
||||
|
||||
/// Block import error.
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum BlockImportError {
|
||||
/// Disconnect from peer and continue import of next bunch of blocks.
|
||||
Disconnect(PeerId),
|
||||
/// Disconnect from peer and restart sync.
|
||||
DisconnectAndRestart(PeerId),
|
||||
/// Restart sync.
|
||||
Restart,
|
||||
}
|
||||
|
||||
/// Import a bunch of blocks.
|
||||
fn import_many_blocks<'a, B: BlockT>(
|
||||
link: &mut SyncLinkApi<B>,
|
||||
qdata: Option<&AsyncImportQueueData<B>>,
|
||||
blocks: (BlockOrigin, Vec<BlockData<B>>)
|
||||
) -> bool
|
||||
{
|
||||
let (blocks_origin, blocks) = blocks;
|
||||
let count = blocks.len();
|
||||
let mut imported = 0;
|
||||
|
||||
// Blocks in the response/drain should be in ascending order.
|
||||
for block in blocks {
|
||||
let import_result = import_single_block(link.chain(), blocks_origin.clone(), block);
|
||||
let is_import_failed = import_result.is_err();
|
||||
imported += process_import_result(link, import_result);
|
||||
if is_import_failed {
|
||||
qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero());
|
||||
return true;
|
||||
}
|
||||
|
||||
if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Imported {} of {}", imported, count);
|
||||
link.maintain_sync();
|
||||
true
|
||||
}
|
||||
|
||||
/// Single block import function.
|
||||
fn import_single_block<B: BlockT>(
|
||||
chain: &Client<B>,
|
||||
block_origin: BlockOrigin,
|
||||
block: BlockData<B>
|
||||
) -> Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
|
||||
{
|
||||
let origin = block.origin;
|
||||
let block = block.block;
|
||||
match (block.header, block.justification) {
|
||||
(Some(header), Some(justification)) => {
|
||||
let number = header.number().clone();
|
||||
let hash = header.hash();
|
||||
let parent = header.parent_hash().clone();
|
||||
|
||||
// check whether the block is known before importing.
|
||||
match chain.block_status(&BlockId::Hash(hash)) {
|
||||
Ok(BlockStatus::InChain) => return Ok(BlockImportResult::NotImported(hash, number)),
|
||||
Ok(_) => {},
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
|
||||
return Err(BlockImportError::Restart);
|
||||
}
|
||||
}
|
||||
|
||||
let result = chain.import(
|
||||
block_origin,
|
||||
header,
|
||||
justification,
|
||||
block.body,
|
||||
);
|
||||
match result {
|
||||
Ok(ImportResult::AlreadyInChain) => {
|
||||
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
|
||||
Ok(BlockImportResult::ImportedKnown(hash, number))
|
||||
},
|
||||
Ok(ImportResult::AlreadyQueued) => {
|
||||
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
|
||||
Ok(BlockImportResult::ImportedKnown(hash, number))
|
||||
},
|
||||
Ok(ImportResult::Queued) => {
|
||||
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
|
||||
Ok(BlockImportResult::ImportedUnknown(hash, number))
|
||||
},
|
||||
Ok(ImportResult::UnknownParent) => {
|
||||
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
|
||||
Err(BlockImportError::Restart)
|
||||
},
|
||||
Ok(ImportResult::KnownBad) => {
|
||||
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
|
||||
Err(BlockImportError::DisconnectAndRestart(origin)) //TODO: use persistent ID
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
|
||||
Err(BlockImportError::Restart)
|
||||
}
|
||||
}
|
||||
},
|
||||
(None, _) => {
|
||||
debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin);
|
||||
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
|
||||
},
|
||||
(_, None) => {
|
||||
debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin);
|
||||
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process single block import result.
|
||||
fn process_import_result<'a, B: BlockT>(
|
||||
link: &mut SyncLinkApi<B>,
|
||||
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
|
||||
) -> usize
|
||||
{
|
||||
match result {
|
||||
Ok(BlockImportResult::NotImported(_, _)) => 0,
|
||||
Ok(BlockImportResult::ImportedKnown(hash, number)) => {
|
||||
link.block_imported(&hash, number);
|
||||
0
|
||||
},
|
||||
Ok(BlockImportResult::ImportedUnknown(hash, number)) => {
|
||||
link.block_imported(&hash, number);
|
||||
1
|
||||
},
|
||||
Err(BlockImportError::Disconnect(peer_id)) => {
|
||||
link.disconnect(peer_id);
|
||||
0
|
||||
},
|
||||
Err(BlockImportError::DisconnectAndRestart(peer_id)) => {
|
||||
link.disconnect_and_restart(peer_id);
|
||||
0
|
||||
},
|
||||
Err(BlockImportError::Restart) => {
|
||||
link.restart();
|
||||
0
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> {
|
||||
/// Execute closure with locked ChainSync.
|
||||
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) {
|
||||
match *self {
|
||||
#[cfg(test)]
|
||||
SyncLink::Direct(ref mut sync, ref mut protocol) =>
|
||||
closure(*sync, *protocol),
|
||||
SyncLink::Indirect(ref sync, _, ref service) =>
|
||||
service.execute_in_context(move |protocol| {
|
||||
let mut sync = sync.write();
|
||||
closure(&mut *sync, protocol)
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> {
|
||||
fn chain(&self) -> &Client<B> {
|
||||
match *self {
|
||||
#[cfg(test)]
|
||||
SyncLink::Direct(_, ref protocol) => protocol.client(),
|
||||
SyncLink::Indirect(_, ref chain, _) => *chain,
|
||||
}
|
||||
}
|
||||
|
||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
self.with_sync(|sync, _| sync.block_imported(&hash, number))
|
||||
}
|
||||
|
||||
fn maintain_sync(&mut self) {
|
||||
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
|
||||
}
|
||||
|
||||
fn disconnect(&mut self, peer_id: PeerId) {
|
||||
self.with_sync(|_, protocol| protocol.disconnect_peer(peer_id))
|
||||
}
|
||||
|
||||
fn disconnect_and_restart(&mut self, peer_id: PeerId) {
|
||||
self.with_sync(|sync, protocol| {
|
||||
protocol.disconnect_peer(peer_id);
|
||||
sync.restart(protocol);
|
||||
})
|
||||
}
|
||||
|
||||
fn restart(&mut self) {
|
||||
self.with_sync(|sync, protocol| sync.restart(protocol))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use client;
|
||||
use message;
|
||||
use test_client::{self, TestClient};
|
||||
use test_client::runtime::{Block, Hash};
|
||||
use super::*;
|
||||
|
||||
/// Blocks import queue that is importing blocks in the same thread.
|
||||
pub struct SyncImportQueue;
|
||||
struct DummyExecuteInContext;
|
||||
|
||||
impl<B: 'static + BlockT> ExecuteInContext<B> for DummyExecuteInContext {
|
||||
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, _closure: F) { }
|
||||
}
|
||||
|
||||
impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
|
||||
fn clear(&self) { }
|
||||
|
||||
fn status(&self) -> ImportQueueStatus<B> {
|
||||
ImportQueueStatus {
|
||||
importing_count: 0,
|
||||
best_importing_number: Zero::zero(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_importing(&self, _hash: &B::Hash) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
|
||||
import_many_blocks(&mut SyncLink::Direct::<_, DummyExecuteInContext>(sync, protocol), None, blocks);
|
||||
}
|
||||
}
|
||||
|
||||
struct TestLink {
|
||||
chain: Arc<Client<Block>>,
|
||||
imported: usize,
|
||||
maintains: usize,
|
||||
disconnects: usize,
|
||||
restarts: usize,
|
||||
}
|
||||
|
||||
impl TestLink {
|
||||
fn new() -> TestLink {
|
||||
TestLink {
|
||||
chain: Arc::new(test_client::new()),
|
||||
imported: 0,
|
||||
maintains: 0,
|
||||
disconnects: 0,
|
||||
restarts: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn total(&self) -> usize {
|
||||
self.imported + self.maintains + self.disconnects + self.restarts
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncLinkApi<Block> for TestLink {
|
||||
fn chain(&self) -> &Client<Block> { &*self.chain }
|
||||
fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; }
|
||||
fn maintain_sync(&mut self) { self.maintains += 1; }
|
||||
fn disconnect(&mut self, _peer_id: PeerId) { self.disconnects += 1; }
|
||||
fn disconnect_and_restart(&mut self, _peer_id: PeerId) { self.disconnects += 1; self.restarts += 1; }
|
||||
fn restart(&mut self) { self.restarts += 1; }
|
||||
}
|
||||
|
||||
fn prepare_good_block() -> (client::Client<test_client::Backend, test_client::Executor, Block>, Hash, u64, BlockData<Block>) {
|
||||
let client = test_client::new();
|
||||
let block = client.new_block().unwrap().bake().unwrap();
|
||||
client.justify_and_import(BlockOrigin::File, block).unwrap();
|
||||
|
||||
let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1);
|
||||
let block = message::BlockData::<Block> {
|
||||
hash: client.block_hash(1).unwrap().unwrap(),
|
||||
header: client.header(&BlockId::Number(1)).unwrap(),
|
||||
body: None,
|
||||
receipt: None,
|
||||
message_queue: None,
|
||||
justification: client.justification(&BlockId::Number(1)).unwrap(),
|
||||
};
|
||||
|
||||
(client, hash, number, BlockData { block, origin: 0 })
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_single_good_block_works() {
|
||||
let (_, hash, number, block) = prepare_good_block();
|
||||
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Ok(BlockImportResult::ImportedUnknown(hash, number)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_single_good_known_block_is_ignored() {
|
||||
let (client, hash, number, block) = prepare_good_block();
|
||||
assert_eq!(import_single_block(&client, BlockOrigin::File, block), Ok(BlockImportResult::NotImported(hash, number)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_single_good_block_without_header_fails() {
|
||||
let (_, _, _, mut block) = prepare_good_block();
|
||||
block.block.header = None;
|
||||
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_single_good_block_without_justification_fails() {
|
||||
let (_, _, _, mut block) = prepare_good_block();
|
||||
block.block.justification = None;
|
||||
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_import_result_works() {
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::NotImported(Default::default(), 0))), 0);
|
||||
assert_eq!(link.total(), 0);
|
||||
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 0);
|
||||
assert_eq!(link.total(), 1);
|
||||
assert_eq!(link.imported, 1);
|
||||
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1);
|
||||
assert_eq!(link.total(), 1);
|
||||
assert_eq!(link.imported, 1);
|
||||
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Disconnect(0))), 0);
|
||||
assert_eq!(link.total(), 1);
|
||||
assert_eq!(link.disconnects, 1);
|
||||
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::DisconnectAndRestart(0))), 0);
|
||||
assert_eq!(link.total(), 2);
|
||||
assert_eq!(link.disconnects, 1);
|
||||
assert_eq!(link.restarts, 1);
|
||||
|
||||
let mut link = TestLink::new();
|
||||
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Restart)), 0);
|
||||
assert_eq!(link.total(), 1);
|
||||
assert_eq!(link.restarts, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_many_blocks_stops_when_stopping() {
|
||||
let (_, _, _, block) = prepare_good_block();
|
||||
let qdata = AsyncImportQueueData::new();
|
||||
qdata.is_stopping.store(true, Ordering::SeqCst);
|
||||
assert!(!import_many_blocks(&mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block])));
|
||||
}
|
||||
}
|
||||
@@ -46,6 +46,7 @@ mod config;
|
||||
mod chain;
|
||||
mod blocks;
|
||||
mod on_demand;
|
||||
mod import_queue;
|
||||
pub mod consensus_gossip;
|
||||
pub mod error;
|
||||
pub mod message;
|
||||
|
||||
@@ -506,7 +506,7 @@ pub mod generic {
|
||||
3 => Some(Message::BlockAnnounce(Decode::decode(input)?)),
|
||||
4 => Some(Message::Transactions(Decode::decode(input)?)),
|
||||
5 => Some(Message::BftMessage(Decode::decode(input)?)),
|
||||
6 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
|
||||
6 => Some(Message::RemoteCallRequest(Decode::decode(input)?)),
|
||||
7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
|
||||
255 => Some(Message::ChainSpecific(Decode::decode(input)?)),
|
||||
_ => None,
|
||||
|
||||
@@ -29,6 +29,7 @@ use message::generic::Message as GenericMessage;
|
||||
use specialization::Specialization;
|
||||
use sync::{ChainSync, Status as SyncStatus, SyncState};
|
||||
use service::{Roles, TransactionPool};
|
||||
use import_queue::ImportQueue;
|
||||
use config::ProtocolConfig;
|
||||
use chain::Client;
|
||||
use on_demand::OnDemandService;
|
||||
@@ -51,7 +52,7 @@ pub struct Protocol<B: BlockT, S: Specialization<B>> {
|
||||
config: ProtocolConfig,
|
||||
on_demand: Option<Arc<OnDemandService<B>>>,
|
||||
genesis_hash: B::Hash,
|
||||
sync: RwLock<ChainSync<B>>,
|
||||
sync: Arc<RwLock<ChainSync<B>>>,
|
||||
specialization: RwLock<S>,
|
||||
context_data: ContextData<B>,
|
||||
// Connected peers pending Status message.
|
||||
@@ -196,12 +197,13 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
|
||||
pub fn new(
|
||||
config: ProtocolConfig,
|
||||
chain: Arc<Client<B>>,
|
||||
import_queue: Arc<ImportQueue<B>>,
|
||||
on_demand: Option<Arc<OnDemandService<B>>>,
|
||||
transaction_pool: Arc<TransactionPool<B>>,
|
||||
specialization: S,
|
||||
) -> error::Result<Self> {
|
||||
let info = chain.info()?;
|
||||
let sync = ChainSync::new(config.roles, &info);
|
||||
let sync = ChainSync::new(config.roles, &info, import_queue);
|
||||
let protocol = Protocol {
|
||||
config: config,
|
||||
context_data: ContextData {
|
||||
@@ -210,7 +212,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
|
||||
},
|
||||
on_demand,
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
sync: RwLock::new(sync),
|
||||
sync: Arc::new(RwLock::new(sync)),
|
||||
specialization: RwLock::new(specialization),
|
||||
handshaking_peers: RwLock::new(HashMap::new()),
|
||||
transaction_pool: transaction_pool,
|
||||
@@ -222,6 +224,10 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
|
||||
&self.context_data
|
||||
}
|
||||
|
||||
pub(crate) fn sync(&self) -> &Arc<RwLock<ChainSync<B>>> {
|
||||
&self.sync
|
||||
}
|
||||
|
||||
/// Returns protocol status
|
||||
pub fn status(&self) -> ProtocolStatus<B> {
|
||||
let sync = self.sync.read();
|
||||
|
||||
@@ -31,6 +31,7 @@ use chain::Client;
|
||||
use message::LocalizedBftMessage;
|
||||
use specialization::Specialization;
|
||||
use on_demand::OnDemandService;
|
||||
use import_queue::AsyncImportQueue;
|
||||
use runtime_primitives::traits::{Block as BlockT};
|
||||
|
||||
/// Type that represents fetch completion future.
|
||||
@@ -147,21 +148,30 @@ pub struct Service<B: BlockT + 'static, S: Specialization<B>> {
|
||||
impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn new(params: Params<B, S>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S>>, Error> {
|
||||
let chain = params.chain.clone();
|
||||
let service = NetworkService::new(params.network_config.clone(), None)?;
|
||||
let import_queue = Arc::new(AsyncImportQueue::new());
|
||||
let sync = Arc::new(Service {
|
||||
network: service,
|
||||
protocol_id,
|
||||
handler: Arc::new(ProtocolHandler {
|
||||
protocol: Protocol::new(
|
||||
params.config,
|
||||
params.chain.clone(),
|
||||
params.chain,
|
||||
import_queue.clone(),
|
||||
params.on_demand,
|
||||
params.transaction_pool,
|
||||
params.specialization
|
||||
params.specialization,
|
||||
)?,
|
||||
}),
|
||||
});
|
||||
|
||||
import_queue.start(
|
||||
Arc::downgrade(sync.handler.protocol.sync()),
|
||||
Arc::downgrade(&sync),
|
||||
Arc::downgrade(&chain)
|
||||
)?;
|
||||
|
||||
Ok(sync)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,17 +15,22 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use protocol::Context;
|
||||
use network_libp2p::PeerId;
|
||||
use client::{ImportResult, BlockStatus, ClientInfo};
|
||||
use client::{BlockStatus, BlockOrigin, ClientInfo};
|
||||
use client::error::Error as ClientError;
|
||||
use blocks::{self, BlockCollection};
|
||||
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use message::{self, generic::Message as GenericMessage};
|
||||
use service::Roles;
|
||||
use import_queue::ImportQueue;
|
||||
|
||||
// Maximum blocks to request in a single packet.
|
||||
const MAX_BLOCKS_TO_REQUEST: usize = 128;
|
||||
// Maximum blocks to store in the import queue.
|
||||
const MAX_IMPORING_BLOCKS: usize = 2048;
|
||||
|
||||
struct PeerSync<B: BlockT> {
|
||||
pub common_hash: B::Hash,
|
||||
@@ -51,6 +56,7 @@ pub struct ChainSync<B: BlockT> {
|
||||
best_queued_number: NumberFor<B>,
|
||||
best_queued_hash: B::Hash,
|
||||
required_block_attributes: message::BlockAttributes,
|
||||
import_queue: Arc<ImportQueue<B>>,
|
||||
}
|
||||
|
||||
/// Reported sync state.
|
||||
@@ -73,7 +79,7 @@ pub struct Status<B: BlockT> {
|
||||
|
||||
impl<B: BlockT> ChainSync<B> {
|
||||
/// Create a new instance.
|
||||
pub(crate) fn new(role: Roles, info: &ClientInfo<B>) -> Self {
|
||||
pub(crate) fn new(role: Roles, info: &ClientInfo<B>, import_queue: Arc<ImportQueue<B>>) -> Self {
|
||||
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
|
||||
if role.intersects(Roles::FULL) {
|
||||
required_block_attributes |= message::BlockAttributes::BODY;
|
||||
@@ -86,6 +92,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
|
||||
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
|
||||
required_block_attributes,
|
||||
import_queue,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +116,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
/// Handle new connected peer.
|
||||
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
|
||||
if let Some(info) = protocol.peer_info(peer_id) {
|
||||
match (protocol.client().block_status(&BlockId::Hash(info.best_hash)), info.best_number) {
|
||||
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
|
||||
(Err(e), _) => {
|
||||
debug!(target:"sync", "Error reading blockchain: {:?}", e);
|
||||
protocol.disconnect_peer(peer_id);
|
||||
@@ -160,8 +167,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, peer_id: PeerId, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
|
||||
let count = response.blocks.len();
|
||||
let mut imported: usize = 0;
|
||||
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
match peer.state {
|
||||
PeerSyncState::DownloadingNew(start_block) => {
|
||||
@@ -225,87 +230,20 @@ impl<B: BlockT> ChainSync<B> {
|
||||
};
|
||||
|
||||
let best_seen = self.best_seen_block();
|
||||
// Blocks in the response/drain should be in ascending order.
|
||||
for block in new_blocks {
|
||||
let origin = block.origin;
|
||||
let block = block.block;
|
||||
match (block.header, block.justification) {
|
||||
(Some(header), Some(justification)) => {
|
||||
let number = header.number().clone();
|
||||
let hash = header.hash();
|
||||
let parent = header.parent_hash().clone();
|
||||
let is_best = best_seen.as_ref().map_or(false, |n| number >= *n);
|
||||
|
||||
// check whether the block is known before importing.
|
||||
match protocol.client().block_status(&BlockId::Hash(hash)) {
|
||||
Ok(BlockStatus::InChain) => continue,
|
||||
Ok(_) => {},
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
|
||||
self.restart(protocol);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let result = protocol.client().import(
|
||||
is_best,
|
||||
header,
|
||||
justification,
|
||||
block.body,
|
||||
);
|
||||
match result {
|
||||
Ok(ImportResult::AlreadyInChain) => {
|
||||
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
},
|
||||
Ok(ImportResult::AlreadyQueued) => {
|
||||
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
},
|
||||
Ok(ImportResult::Queued) => {
|
||||
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
imported = imported + 1;
|
||||
},
|
||||
Ok(ImportResult::UnknownParent) => {
|
||||
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
|
||||
self.restart(protocol);
|
||||
return;
|
||||
},
|
||||
Ok(ImportResult::KnownBad) => {
|
||||
protocol.disable_peer(origin, &format!("Peer gave us a bad block {}: {:?}", number, hash)); //TODO: use persistent ID
|
||||
self.restart(protocol);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
|
||||
self.restart(protocol);
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
(None, _) => {
|
||||
protocol.disable_peer(origin, &format!("Header {} was not provided by peer ", block.hash)); //TODO: use persistent ID
|
||||
return;
|
||||
},
|
||||
(_, None) => {
|
||||
protocol.disable_peer(origin, &format!("Justification set for block {} was not provided by peer", block.hash)); //TODO: use persistent ID
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!(target: "sync", "Imported {} of {}", imported, count);
|
||||
self.maintain_sync(protocol);
|
||||
let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n));
|
||||
let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
|
||||
let import_queue = self.import_queue.clone();
|
||||
import_queue.import_blocks(self, protocol, (origin, new_blocks))
|
||||
}
|
||||
|
||||
fn maintain_sync(&mut self, protocol: &mut Context<B>) {
|
||||
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
|
||||
let peers: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
|
||||
for peer in peers {
|
||||
self.download_new(protocol, peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
if number > self.best_queued_number {
|
||||
self.best_queued_number = number;
|
||||
self.best_queued_hash = *hash;
|
||||
@@ -359,7 +297,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
|
||||
fn is_known_or_already_downloading(&self, protocol: &mut Context<B>, hash: &B::Hash) -> bool {
|
||||
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|
||||
|| protocol.client().block_status(&BlockId::Hash(*hash)).ok().map_or(false, |s| s != BlockStatus::Unknown)
|
||||
|| block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
|
||||
}
|
||||
|
||||
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
|
||||
@@ -369,6 +307,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
pub(crate) fn restart(&mut self, protocol: &mut Context<B>) {
|
||||
self.import_queue.clear();
|
||||
self.blocks.clear();
|
||||
let ids: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
|
||||
for id in ids {
|
||||
@@ -416,10 +355,18 @@ impl<B: BlockT> ChainSync<B> {
|
||||
// Issue a request for a peer to download new blocks, if any are available
|
||||
fn download_new(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, peer.common_number, peer.best_number);
|
||||
let import_status = self.import_queue.status();
|
||||
// when there are too many blocks in the queue => do not try to download new blocks
|
||||
if import_status.importing_count > MAX_IMPORING_BLOCKS {
|
||||
return;
|
||||
}
|
||||
// we should not download already queued blocks
|
||||
let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number);
|
||||
|
||||
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, common_number, peer.best_number);
|
||||
match peer.state {
|
||||
PeerSyncState::Available => {
|
||||
if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
|
||||
if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) {
|
||||
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end);
|
||||
let request = message::generic::BlockRequest {
|
||||
id: 0,
|
||||
@@ -453,3 +400,16 @@ impl<B: BlockT> ChainSync<B> {
|
||||
protocol.send_message(peer_id, GenericMessage::BlockRequest(request));
|
||||
}
|
||||
}
|
||||
|
||||
/// Get block status, taking into account import queue.
|
||||
fn block_status<B: BlockT>(
|
||||
chain: &::chain::Client<B>,
|
||||
queue: &ImportQueue<B>,
|
||||
hash: B::Hash) -> Result<BlockStatus, ClientError>
|
||||
{
|
||||
if queue.is_importing(&hash) {
|
||||
return Ok(BlockStatus::Queued);
|
||||
}
|
||||
|
||||
chain.block_status(&BlockId::Hash(hash))
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ use service::TransactionPool;
|
||||
use network_libp2p::{PeerId, SessionInfo, Error as NetworkError};
|
||||
use keyring::Keyring;
|
||||
use codec::Encode;
|
||||
use import_queue::tests::SyncImportQueue;
|
||||
use test_client::{self, TestClient};
|
||||
use test_client::runtime::{Block, Hash, Transfer, Extrinsic};
|
||||
use specialization::Specialization;
|
||||
@@ -247,7 +248,8 @@ impl TestNet {
|
||||
pub fn add_peer(&mut self, config: &ProtocolConfig) {
|
||||
let client = Arc::new(test_client::new());
|
||||
let tx_pool = Arc::new(EmptyTransactionPool);
|
||||
let sync = Protocol::new(config.clone(), client.clone(), None, tx_pool, DummySpecialization).unwrap();
|
||||
let import_queue = Arc::new(SyncImportQueue);
|
||||
let sync = Protocol::new(config.clone(), client.clone(), import_queue, None, tx_pool, DummySpecialization).unwrap();
|
||||
self.peers.push(Arc::new(Peer {
|
||||
sync: sync,
|
||||
client: client,
|
||||
|
||||
Reference in New Issue
Block a user