mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 06:21:11 +00:00
Persist block announcements (#3826)
* Persist block announcements * Renamed sync requests to fork targets * Fixed pruning detection condition
This commit is contained in:
committed by
Gavin Wood
parent
3437d64115
commit
3963bb58ff
@@ -1121,18 +1121,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) {
|
match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) {
|
||||||
sync::OnBlockAnnounce::Request(peer, req) => {
|
|
||||||
self.send_message(peer, GenericMessage::BlockRequest(req));
|
|
||||||
return CustomMessageOutcome::None
|
|
||||||
}
|
|
||||||
sync::OnBlockAnnounce::Nothing => {
|
sync::OnBlockAnnounce::Nothing => {
|
||||||
// try_import is only true when we have all data required to import block
|
// `on_block_announce` returns `OnBlockAnnounce::ImportHeader`
|
||||||
|
// when we have all data required to import the block
|
||||||
// in the BlockAnnounce message. This is only when:
|
// in the BlockAnnounce message. This is only when:
|
||||||
// 1) we're on light client;
|
// 1) we're on light client;
|
||||||
// AND
|
// AND
|
||||||
// - EITHER 2.1) announced block is stale;
|
// 2) parent block is already imported and not pruned.
|
||||||
// - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e.
|
|
||||||
// there are no ascendants of this block scheduled for retrieval)
|
|
||||||
return CustomMessageOutcome::None
|
return CustomMessageOutcome::None
|
||||||
}
|
}
|
||||||
sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.
|
sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.
|
||||||
|
|||||||
@@ -69,9 +69,6 @@ const MAJOR_SYNC_BLOCKS: u8 = 5;
|
|||||||
/// Number of recently announced blocks to track for each peer.
|
/// Number of recently announced blocks to track for each peer.
|
||||||
const ANNOUNCE_HISTORY_SIZE: usize = 64;
|
const ANNOUNCE_HISTORY_SIZE: usize = 64;
|
||||||
|
|
||||||
/// Max number of blocks to download for unknown forks.
|
|
||||||
const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32;
|
|
||||||
|
|
||||||
/// Reputation change when a peer sent us a status message that led to a
|
/// Reputation change when a peer sent us a status message that led to a
|
||||||
/// database read error.
|
/// database read error.
|
||||||
const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16);
|
const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16);
|
||||||
@@ -125,8 +122,8 @@ pub struct ChainSync<B: BlockT> {
|
|||||||
best_importing_number: NumberFor<B>,
|
best_importing_number: NumberFor<B>,
|
||||||
/// Finality proof handler.
|
/// Finality proof handler.
|
||||||
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
|
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
|
||||||
/// Explicit sync requests.
|
/// Fork sync targets.
|
||||||
sync_requests: HashMap<B::Hash, SyncRequest<B>>,
|
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
|
||||||
/// A flag that caches idle state with no pending requests.
|
/// A flag that caches idle state with no pending requests.
|
||||||
is_idle: bool,
|
is_idle: bool,
|
||||||
/// A type to check incoming block announcements.
|
/// A type to check incoming block announcements.
|
||||||
@@ -160,8 +157,9 @@ pub struct PeerInfo<B: BlockT> {
|
|||||||
pub best_number: NumberFor<B>
|
pub best_number: NumberFor<B>
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SyncRequest<B: BlockT> {
|
struct ForkTarget<B: BlockT> {
|
||||||
number: NumberFor<B>,
|
number: NumberFor<B>,
|
||||||
|
parent_hash: Option<B::Hash>,
|
||||||
peers: HashSet<PeerId>,
|
peers: HashSet<PeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -242,13 +240,11 @@ pub enum OnBlockData<B: BlockT> {
|
|||||||
|
|
||||||
/// Result of [`ChainSync::on_block_announce`].
|
/// Result of [`ChainSync::on_block_announce`].
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum OnBlockAnnounce<B: BlockT> {
|
pub enum OnBlockAnnounce {
|
||||||
/// The announcement does not require further handling.
|
/// The announcement does not require further handling.
|
||||||
Nothing,
|
Nothing,
|
||||||
/// The announcement header should be imported.
|
/// The announcement header should be imported.
|
||||||
ImportHeader,
|
ImportHeader,
|
||||||
/// Another block request to the given peer is necessary.
|
|
||||||
Request(PeerId, BlockRequest<B>)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result of [`ChainSync::on_block_justification`].
|
/// Result of [`ChainSync::on_block_justification`].
|
||||||
@@ -307,7 +303,7 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
queue_blocks: Default::default(),
|
queue_blocks: Default::default(),
|
||||||
best_importing_number: Zero::zero(),
|
best_importing_number: Zero::zero(),
|
||||||
request_builder,
|
request_builder,
|
||||||
sync_requests: Default::default(),
|
fork_targets: Default::default(),
|
||||||
is_idle: false,
|
is_idle: false,
|
||||||
block_announce_validator,
|
block_announce_validator,
|
||||||
}
|
}
|
||||||
@@ -462,7 +458,7 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
// The implementation is similar to on_block_announce with unknown parent hash.
|
// The implementation is similar to on_block_announce with unknown parent hash.
|
||||||
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
|
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
|
||||||
if peers.is_empty() {
|
if peers.is_empty() {
|
||||||
if let Some(_) = self.sync_requests.remove(hash) {
|
if let Some(_) = self.fork_targets.remove(hash) {
|
||||||
debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers);
|
debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -494,11 +490,12 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.sync_requests
|
self.fork_targets
|
||||||
.entry(hash.clone())
|
.entry(hash.clone())
|
||||||
.or_insert_with(|| SyncRequest {
|
.or_insert_with(|| ForkTarget {
|
||||||
number,
|
number,
|
||||||
peers: Default::default(),
|
peers: Default::default(),
|
||||||
|
parent_hash: None,
|
||||||
})
|
})
|
||||||
.peers.extend(peers);
|
.peers.extend(peers);
|
||||||
}
|
}
|
||||||
@@ -562,17 +559,30 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
}
|
}
|
||||||
let blocks = &mut self.blocks;
|
let blocks = &mut self.blocks;
|
||||||
let attrs = &self.required_block_attributes;
|
let attrs = &self.required_block_attributes;
|
||||||
let sync_requests = &self.sync_requests;
|
let fork_targets = &self.fork_targets;
|
||||||
let mut have_requests = false;
|
let mut have_requests = false;
|
||||||
let last_finalized = self.client.info().chain.finalized_number;
|
let last_finalized = self.client.info().chain.finalized_number;
|
||||||
let best_queued = self.best_queued_number;
|
let best_queued = self.best_queued_number;
|
||||||
|
let client = &self.client;
|
||||||
|
let queue = &self.queue_blocks;
|
||||||
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
|
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
|
||||||
if !peer.state.is_available() {
|
if !peer.state.is_available() {
|
||||||
trace!(target: "sync", "Peer {} is busy", id);
|
trace!(target: "sync", "Peer {} is busy", id);
|
||||||
return None
|
return None
|
||||||
}
|
}
|
||||||
if let Some((hash, req)) = explicit_sync_request(id, sync_requests, best_queued, last_finalized, attrs) {
|
if let Some((hash, req)) = fork_sync_request(
|
||||||
trace!(target: "sync", "Downloading explicitly requested block {:?} from {}", hash, id);
|
id,
|
||||||
|
fork_targets,
|
||||||
|
best_queued,
|
||||||
|
last_finalized,
|
||||||
|
attrs,
|
||||||
|
|hash| if queue.contains(hash) {
|
||||||
|
BlockStatus::Queued
|
||||||
|
} else {
|
||||||
|
client.block_status(&BlockId::Hash(*hash)).unwrap_or(BlockStatus::Unknown)
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
|
||||||
peer.state = PeerSyncState::DownloadingStale(hash);
|
peer.state = PeerSyncState::DownloadingStale(hash);
|
||||||
have_requests = true;
|
have_requests = true;
|
||||||
Some((id.clone(), req))
|
Some((id.clone(), req))
|
||||||
@@ -665,6 +675,26 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
|
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
|
||||||
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
|
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
|
||||||
} else {
|
} else {
|
||||||
|
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
|
||||||
|
// add it to sync targets if necessary.
|
||||||
|
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}",
|
||||||
|
self.best_queued_hash,
|
||||||
|
self.best_queued_number,
|
||||||
|
peer.best_hash,
|
||||||
|
peer.best_number,
|
||||||
|
peer.common_number
|
||||||
|
);
|
||||||
|
if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number {
|
||||||
|
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
|
||||||
|
self.fork_targets
|
||||||
|
.entry(peer.best_hash.clone())
|
||||||
|
.or_insert_with(|| ForkTarget {
|
||||||
|
number: peer.best_number,
|
||||||
|
parent_hash: None,
|
||||||
|
peers: Default::default(),
|
||||||
|
})
|
||||||
|
.peers.insert(who);
|
||||||
|
}
|
||||||
peer.state = PeerSyncState::Available;
|
peer.state = PeerSyncState::Available;
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
@@ -922,14 +952,14 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
self.best_queued_number = number;
|
self.best_queued_number = number;
|
||||||
self.best_queued_hash = *hash;
|
self.best_queued_hash = *hash;
|
||||||
}
|
}
|
||||||
if let Some(_) = self.sync_requests.remove(&hash) {
|
if let Some(_) = self.fork_targets.remove(&hash) {
|
||||||
trace!(target: "sync", "Completed explicit sync request {:?}", hash);
|
trace!(target: "sync", "Completed fork sync {:?}", hash);
|
||||||
}
|
}
|
||||||
// Update common blocks
|
// Update common blocks
|
||||||
for (n, peer) in self.peers.iter_mut() {
|
for (n, peer) in self.peers.iter_mut() {
|
||||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||||
// Abort search.
|
// Wait for ancestry search to complete first.
|
||||||
peer.state = PeerSyncState::Available;
|
continue;
|
||||||
}
|
}
|
||||||
let new_common_number = if peer.best_number >= number {
|
let new_common_number = if peer.best_number >= number {
|
||||||
number
|
number
|
||||||
@@ -952,12 +982,12 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
|
|
||||||
/// Call when a node announces a new block.
|
/// Call when a node announces a new block.
|
||||||
///
|
///
|
||||||
/// If true is returned, then the caller MUST try to import passed
|
/// If `OnBlockAnnounce::ImportHeader` is returned, then the caller MUST try to import passed
|
||||||
/// header (call `on_block_data`). The network request isn't sent
|
/// header (call `on_block_data`). The network request isn't sent
|
||||||
/// in this case. Both hash and header is passed as an optimization
|
/// in this case. Both hash and header is passed as an optimization
|
||||||
/// to avoid rehashing the header.
|
/// to avoid rehashing the header.
|
||||||
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
|
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
|
||||||
-> OnBlockAnnounce<B>
|
-> OnBlockAnnounce
|
||||||
{
|
{
|
||||||
let header = &announce.header;
|
let header = &announce.header;
|
||||||
let number = *header.number();
|
let number = *header.number();
|
||||||
@@ -1001,6 +1031,9 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
// known block case
|
// known block case
|
||||||
if known || self.is_already_downloading(&hash) {
|
if known || self.is_already_downloading(&hash) {
|
||||||
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
|
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
|
||||||
|
if let Some(target) = self.fork_targets.get_mut(&hash) {
|
||||||
|
target.peers.insert(who);
|
||||||
|
}
|
||||||
return OnBlockAnnounce::Nothing
|
return OnBlockAnnounce::Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1009,79 +1042,42 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
match self.block_announce_validator.validate(&header, assoc_data) {
|
match self.block_announce_validator.validate(&header, assoc_data) {
|
||||||
Ok(Validation::Success) => (),
|
Ok(Validation::Success) => (),
|
||||||
Ok(Validation::Failure) => {
|
Ok(Validation::Failure) => {
|
||||||
debug!(target: "sync", "block announcement validation of block {} from {} failed", hash, who);
|
debug!(target: "sync", "Block announcement validation of block {} from {} failed", hash, who);
|
||||||
return OnBlockAnnounce::Nothing
|
return OnBlockAnnounce::Nothing
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(target: "sync", "block announcement validation errored: {}", e);
|
error!(target: "sync", "Block announcement validation errored: {}", e);
|
||||||
return OnBlockAnnounce::Nothing
|
return OnBlockAnnounce::Nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stale block case
|
|
||||||
let requires_additional_data = !self.role.is_light();
|
|
||||||
if number <= self.best_queued_number {
|
|
||||||
if !(known_parent || self.is_already_downloading(header.parent_hash())) {
|
|
||||||
let block_status = self.client.block_status(&BlockId::Number(*header.number()))
|
|
||||||
.unwrap_or(BlockStatus::Unknown);
|
|
||||||
if block_status == BlockStatus::InChainPruned {
|
|
||||||
trace!(
|
|
||||||
target: "sync",
|
|
||||||
"Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header
|
|
||||||
);
|
|
||||||
return OnBlockAnnounce::Nothing
|
|
||||||
}
|
|
||||||
trace!(
|
|
||||||
target: "sync",
|
|
||||||
"Considering new unknown stale block announced from {}: {} {:?}", who, hash, header
|
|
||||||
);
|
|
||||||
if let Some(request) = self.download_unknown_stale(&who, &hash) {
|
|
||||||
if requires_additional_data {
|
|
||||||
return OnBlockAnnounce::Request(who, request)
|
|
||||||
} else {
|
|
||||||
return OnBlockAnnounce::ImportHeader
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return OnBlockAnnounce::Nothing
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if ancient_parent {
|
|
||||||
trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header);
|
|
||||||
return OnBlockAnnounce::Nothing
|
|
||||||
}
|
|
||||||
if let Some(request) = self.download_stale(&who, &hash) {
|
|
||||||
if requires_additional_data {
|
|
||||||
return OnBlockAnnounce::Request(who, request)
|
|
||||||
} else {
|
|
||||||
return OnBlockAnnounce::ImportHeader
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return OnBlockAnnounce::Nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ancient_parent {
|
if ancient_parent {
|
||||||
trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header);
|
trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header);
|
||||||
return OnBlockAnnounce::Nothing
|
return OnBlockAnnounce::Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header);
|
let requires_additional_data = !self.role.is_light() || !known_parent;
|
||||||
|
if !requires_additional_data {
|
||||||
let (range, request) = match self.select_new_blocks(who.clone()) {
|
trace!(target: "sync", "Importing new header announced from {}: {} {:?}", who, hash, header);
|
||||||
Some((range, request)) => (range, request),
|
return OnBlockAnnounce::ImportHeader
|
||||||
None => return OnBlockAnnounce::Nothing
|
|
||||||
};
|
|
||||||
|
|
||||||
let is_required_data_available = !requires_additional_data
|
|
||||||
&& range.end - range.start == One::one()
|
|
||||||
&& range.start == *header.number();
|
|
||||||
|
|
||||||
if !is_required_data_available {
|
|
||||||
return OnBlockAnnounce::Request(who, request)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OnBlockAnnounce::ImportHeader
|
if number <= self.best_queued_number {
|
||||||
|
trace!(
|
||||||
|
target: "sync",
|
||||||
|
"Added sync target for block announced from {}: {} {:?}", who, hash, header
|
||||||
|
);
|
||||||
|
self.fork_targets
|
||||||
|
.entry(hash.clone())
|
||||||
|
.or_insert_with(|| ForkTarget {
|
||||||
|
number,
|
||||||
|
parent_hash: Some(header.parent_hash().clone()),
|
||||||
|
peers: Default::default(),
|
||||||
|
})
|
||||||
|
.peers.insert(who);
|
||||||
|
}
|
||||||
|
|
||||||
|
OnBlockAnnounce::Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call when a peer has disconnected.
|
/// Call when a peer has disconnected.
|
||||||
@@ -1117,40 +1113,6 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Download old block with known parent.
|
|
||||||
fn download_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option<BlockRequest<B>> {
|
|
||||||
let peer = self.peers.get_mut(who)?;
|
|
||||||
if !peer.state.is_available() {
|
|
||||||
return None
|
|
||||||
}
|
|
||||||
peer.state = PeerSyncState::DownloadingStale(*hash);
|
|
||||||
Some(message::generic::BlockRequest {
|
|
||||||
id: 0,
|
|
||||||
fields: self.required_block_attributes.clone(),
|
|
||||||
from: message::FromBlock::Hash(*hash),
|
|
||||||
to: None,
|
|
||||||
direction: message::Direction::Ascending,
|
|
||||||
max: Some(1),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Download old block with unknown parent.
|
|
||||||
fn download_unknown_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option<BlockRequest<B>> {
|
|
||||||
let peer = self.peers.get_mut(who)?;
|
|
||||||
if !peer.state.is_available() {
|
|
||||||
return None
|
|
||||||
}
|
|
||||||
peer.state = PeerSyncState::DownloadingStale(*hash);
|
|
||||||
Some(message::generic::BlockRequest {
|
|
||||||
id: 0,
|
|
||||||
fields: self.required_block_attributes.clone(),
|
|
||||||
from: message::FromBlock::Hash(*hash),
|
|
||||||
to: None,
|
|
||||||
direction: message::Direction::Descending,
|
|
||||||
max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Select a range of new blocks to download from the given peer.
|
/// Select a range of new blocks to download from the given peer.
|
||||||
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
|
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
|
||||||
// when there are too many blocks in the queue => do not try to download new blocks
|
// when there are too many blocks in the queue => do not try to download new blocks
|
||||||
@@ -1298,28 +1260,35 @@ fn peer_block_request<B: BlockT>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get pending explicit sync request for a peer.
|
/// Get pending fork sync targets for a peer.
|
||||||
fn explicit_sync_request<B: BlockT>(
|
fn fork_sync_request<B: BlockT>(
|
||||||
id: &PeerId,
|
id: &PeerId,
|
||||||
requests: &HashMap<B::Hash, SyncRequest<B>>,
|
targets: &HashMap<B::Hash, ForkTarget<B>>,
|
||||||
best_num: NumberFor<B>,
|
best_num: NumberFor<B>,
|
||||||
finalized: NumberFor<B>,
|
finalized: NumberFor<B>,
|
||||||
attributes: &message::BlockAttributes,
|
attributes: &message::BlockAttributes,
|
||||||
|
check_block: impl Fn(&B::Hash) -> BlockStatus,
|
||||||
) -> Option<(B::Hash, BlockRequest<B>)>
|
) -> Option<(B::Hash, BlockRequest<B>)>
|
||||||
{
|
{
|
||||||
for (hash, r) in requests {
|
for (hash, r) in targets {
|
||||||
if !r.peers.contains(id) {
|
if !r.peers.contains(id) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if r.number <= best_num {
|
if r.number <= best_num {
|
||||||
trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id);
|
trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id);
|
||||||
|
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
|
||||||
|
let mut count = (r.number - finalized).saturated_into::<u32>(); // up to the last finalized block
|
||||||
|
if parent_status != BlockStatus::Unknown {
|
||||||
|
// request only single block
|
||||||
|
count = 1;
|
||||||
|
}
|
||||||
return Some((hash.clone(), message::generic::BlockRequest {
|
return Some((hash.clone(), message::generic::BlockRequest {
|
||||||
id: 0,
|
id: 0,
|
||||||
fields: attributes.clone(),
|
fields: attributes.clone(),
|
||||||
from: message::FromBlock::Hash(hash.clone()),
|
from: message::FromBlock::Hash(hash.clone()),
|
||||||
to: None,
|
to: None,
|
||||||
direction: message::Direction::Descending,
|
direction: message::Direction::Descending,
|
||||||
max: Some((r.number - finalized).saturated_into::<u32>()), // up to the last finalized block
|
max: Some(count),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -704,7 +704,9 @@ pub trait TestNetFactory: Sized {
|
|||||||
fn poll(&mut self) {
|
fn poll(&mut self) {
|
||||||
self.mut_peers(|peers| {
|
self.mut_peers(|peers| {
|
||||||
for peer in peers {
|
for peer in peers {
|
||||||
|
trace!(target: "sync", "-- Polling {}", peer.id());
|
||||||
peer.network.poll().unwrap();
|
peer.network.poll().unwrap();
|
||||||
|
trace!(target: "sync", "-- Polling complete {}", peer.id());
|
||||||
|
|
||||||
// We poll `imported_blocks_stream`.
|
// We poll `imported_blocks_stream`.
|
||||||
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
|
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
|
||||||
|
|||||||
@@ -457,6 +457,17 @@ fn can_sync_small_non_best_forks() {
|
|||||||
}
|
}
|
||||||
Ok(Async::Ready(()))
|
Ok(Async::Ready(()))
|
||||||
})).unwrap();
|
})).unwrap();
|
||||||
|
net.block_until_sync(&mut runtime);
|
||||||
|
|
||||||
|
let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true);
|
||||||
|
net.peer(0).announce_block(another_fork, Vec::new());
|
||||||
|
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
|
||||||
|
net.poll();
|
||||||
|
if net.peer(1).client().header(&BlockId::Hash(another_fork)).unwrap().is_none() {
|
||||||
|
return Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
})).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user