mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 07:01:05 +00:00
Explicit sync API for downloading important, possibly orphaned, forks (#3633)
* Explicit sync API * Keep sync requests * Don't request the finalized block we already have * Dropping requests & docs * Renamed a function
This commit is contained in:
committed by
Robert Habermeier
parent
5d82f453e8
commit
3242d7f2b6
@@ -1236,6 +1236,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
self.sync.request_justification(&hash, number)
|
||||
}
|
||||
|
||||
/// Request syncing for the given block from given set of peers.
|
||||
/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
|
||||
/// requests.
|
||||
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
|
||||
self.sync.set_sync_fork_request(peers, hash, number)
|
||||
}
|
||||
|
||||
/// A batch of blocks have been processed, with or without errors.
|
||||
/// Call this when a batch of blocks have been processed by the importqueue, with or without
|
||||
/// errors.
|
||||
|
||||
@@ -123,7 +123,10 @@ pub struct ChainSync<B: BlockT> {
|
||||
queue_blocks: HashSet<B::Hash>,
|
||||
/// The best block number that we are currently importing.
|
||||
best_importing_number: NumberFor<B>,
|
||||
/// Finality proof handler.
|
||||
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
|
||||
/// Explicit sync requests.
|
||||
sync_requests: HashMap<B::Hash, SyncRequest<B>>,
|
||||
/// A flag that caches idle state with no pending requests.
|
||||
is_idle: bool,
|
||||
/// A type to check incoming block announcements.
|
||||
@@ -157,6 +160,11 @@ pub struct PeerInfo<B: BlockT> {
|
||||
pub best_number: NumberFor<B>
|
||||
}
|
||||
|
||||
struct SyncRequest<B: BlockT> {
|
||||
number: NumberFor<B>,
|
||||
peers: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
/// The state of syncing between a Peer and ourselves.
|
||||
///
|
||||
/// Generally two categories, "busy" or `Available`. If busy, the enum
|
||||
@@ -299,6 +307,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
queue_blocks: Default::default(),
|
||||
best_importing_number: Zero::zero(),
|
||||
request_builder,
|
||||
sync_requests: Default::default(),
|
||||
is_idle: false,
|
||||
block_announce_validator,
|
||||
}
|
||||
@@ -449,6 +458,51 @@ impl<B: BlockT> ChainSync<B> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Request syncing for the given block from given set of peers.
|
||||
// The implementation is similar to on_block_announce with unknown parent hash.
|
||||
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
|
||||
if peers.is_empty() {
|
||||
if let Some(_) = self.sync_requests.remove(hash) {
|
||||
debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers);
|
||||
}
|
||||
return;
|
||||
}
|
||||
debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers);
|
||||
if self.is_known(&hash) {
|
||||
debug!(target: "sync", "Refusing to sync known hash {:?}", hash);
|
||||
return;
|
||||
}
|
||||
|
||||
let block_status = self.client.block_status(&BlockId::Number(number - One::one()))
|
||||
.unwrap_or(BlockStatus::Unknown);
|
||||
if block_status == BlockStatus::InChainPruned {
|
||||
trace!(target: "sync", "Refusing to sync ancient block {:?}", hash);
|
||||
return;
|
||||
}
|
||||
|
||||
self.is_idle = false;
|
||||
for peer_id in &peers {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||
continue;
|
||||
}
|
||||
|
||||
if number > peer.best_number {
|
||||
peer.best_number = number;
|
||||
peer.best_hash = hash.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.sync_requests
|
||||
.entry(hash.clone())
|
||||
.or_insert_with(|| SyncRequest {
|
||||
number,
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers.extend(peers);
|
||||
}
|
||||
|
||||
/// Get an iterator over all scheduled justification requests.
|
||||
pub fn justification_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ {
|
||||
let peers = &mut self.peers;
|
||||
@@ -508,13 +562,21 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
let blocks = &mut self.blocks;
|
||||
let attrs = &self.required_block_attributes;
|
||||
let sync_requests = &self.sync_requests;
|
||||
let mut have_requests = false;
|
||||
let last_finalized = self.client.info().chain.finalized_number;
|
||||
let best_queued = self.best_queued_number;
|
||||
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
|
||||
if !peer.state.is_available() {
|
||||
trace!(target: "sync", "Peer {} is busy", id);
|
||||
return None
|
||||
}
|
||||
if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) {
|
||||
if let Some((hash, req)) = explicit_sync_request(id, sync_requests, best_queued, last_finalized, attrs) {
|
||||
trace!(target: "sync", "Downloading explicitly requested block {:?} from {}", hash, id);
|
||||
peer.state = PeerSyncState::DownloadingStale(hash);
|
||||
have_requests = true;
|
||||
Some((id.clone(), req))
|
||||
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) {
|
||||
peer.state = PeerSyncState::DownloadingNew(range.start);
|
||||
trace!(target: "sync", "New block request for {}", id);
|
||||
have_requests = true;
|
||||
@@ -860,6 +922,9 @@ impl<B: BlockT> ChainSync<B> {
|
||||
self.best_queued_number = number;
|
||||
self.best_queued_hash = *hash;
|
||||
}
|
||||
if let Some(_) = self.sync_requests.remove(&hash) {
|
||||
trace!(target: "sync", "Completed explicit sync request {:?}", hash);
|
||||
}
|
||||
// Update common blocks
|
||||
for (n, peer) in self.peers.iter_mut() {
|
||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||
@@ -1232,3 +1297,32 @@ fn peer_block_request<B: BlockT>(
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Get pending explicit sync request for a peer.
|
||||
fn explicit_sync_request<B: BlockT>(
|
||||
id: &PeerId,
|
||||
requests: &HashMap<B::Hash, SyncRequest<B>>,
|
||||
best_num: NumberFor<B>,
|
||||
finalized: NumberFor<B>,
|
||||
attributes: &message::BlockAttributes,
|
||||
) -> Option<(B::Hash, BlockRequest<B>)>
|
||||
{
|
||||
for (hash, r) in requests {
|
||||
if !r.peers.contains(id) {
|
||||
continue
|
||||
}
|
||||
if r.number <= best_num {
|
||||
trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id);
|
||||
return Some((hash.clone(), message::generic::BlockRequest {
|
||||
id: 0,
|
||||
fields: attributes.clone(),
|
||||
from: message::FromBlock::Hash(hash.clone()),
|
||||
to: None,
|
||||
direction: message::Direction::Descending,
|
||||
max: Some((r.number - finalized).saturated_into::<u32>()), // up to the last finalized block
|
||||
}))
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
|
||||
@@ -496,6 +496,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Configure an explicit fork sync request.
|
||||
/// Note that this function should not be used for recent blocks.
|
||||
/// Sync should be able to download all the recent forks normally.
|
||||
/// `set_sync_fork_request` should only be used if external code detects that there's
|
||||
/// a stale fork missing.
|
||||
/// Passing empty `peers` set effectively removes the sync request.
|
||||
pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
|
||||
let _ = self
|
||||
.to_worker
|
||||
.unbounded_send(ServerToWorkerMsg::SyncFork(peers, hash, number));
|
||||
}
|
||||
|
||||
/// Modify a peerset priority group.
|
||||
pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
|
||||
let peers = peers.into_iter().map(|p| {
|
||||
@@ -586,6 +598,7 @@ enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
GetValue(record::Key),
|
||||
PutValue(record::Key, Vec<u8>),
|
||||
AddKnownAddress(PeerId, Multiaddr),
|
||||
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
|
||||
}
|
||||
|
||||
/// Main network worker. Must be polled in order for the network to advance.
|
||||
@@ -664,6 +677,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
|
||||
self.network_service.put_value(key, value),
|
||||
ServerToWorkerMsg::AddKnownAddress(peer_id, addr) =>
|
||||
self.network_service.add_known_address(peer_id, addr),
|
||||
ServerToWorkerMsg::SyncFork(peer_ids, hash, number) =>
|
||||
self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -229,6 +229,11 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> {
|
||||
}
|
||||
|
||||
impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
|
||||
/// Get this peer ID.
|
||||
pub fn id(&self) -> PeerId {
|
||||
self.network.service().local_peer_id()
|
||||
}
|
||||
|
||||
/// Returns true if we're major syncing.
|
||||
pub fn is_major_syncing(&self) -> bool {
|
||||
self.network.service().is_major_syncing()
|
||||
@@ -259,6 +264,11 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
|
||||
self.network.service().announce_block(hash, data);
|
||||
}
|
||||
|
||||
/// Request explicit fork sync.
|
||||
pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: <Block as BlockT>::Hash, number: NumberFor<Block>) {
|
||||
self.network.service().set_sync_fork_request(peers, hash, number);
|
||||
}
|
||||
|
||||
/// Add blocks to the peer -- edit the block before adding
|
||||
pub fn generate_blocks<F>(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256
|
||||
where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block
|
||||
|
||||
@@ -526,3 +526,59 @@ fn light_peer_imports_header_from_announce() {
|
||||
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
|
||||
import_with_announce(&mut net, &mut runtime, known_stale_hash);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_sync_explicit_forks() {
|
||||
let _ = ::env_logger::try_init();
|
||||
let mut runtime = current_thread::Runtime::new().unwrap();
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer(0).push_blocks(30, false);
|
||||
net.peer(1).push_blocks(30, false);
|
||||
|
||||
// small fork + reorg on peer 1.
|
||||
net.peer(0).push_blocks_at(BlockId::Number(30), 2, true);
|
||||
let small_hash = net.peer(0).client().info().chain.best_hash;
|
||||
let small_number = net.peer(0).client().info().chain.best_number;
|
||||
net.peer(0).push_blocks_at(BlockId::Number(30), 10, false);
|
||||
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
|
||||
|
||||
// peer 1 only ever had the long fork.
|
||||
net.peer(1).push_blocks(10, false);
|
||||
assert_eq!(net.peer(1).client().info().chain.best_number, 40);
|
||||
|
||||
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
|
||||
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
|
||||
|
||||
// poll until the two nodes connect, otherwise announcing the block will not work
|
||||
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
|
||||
net.poll();
|
||||
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
})).unwrap();
|
||||
|
||||
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
|
||||
|
||||
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
|
||||
|
||||
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
|
||||
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
|
||||
|
||||
// request explicit sync
|
||||
let first_peer_id = net.peer(0).id();
|
||||
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
|
||||
|
||||
// peer 1 downloads the block.
|
||||
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
|
||||
net.poll();
|
||||
|
||||
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
|
||||
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
Ok(Async::Ready(()))
|
||||
})).unwrap();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user