mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
Remove blocking operations in SyncOracle implementation (#1852)
* remove blocking operations in SyncOracle implementation * docs * docs
This commit is contained in:
committed by
Gav Wood
parent
733ce7d616
commit
077ed00951
@@ -31,6 +31,7 @@ use crate::config::{ProtocolConfig, Roles};
|
||||
use rustc_hex::ToHex;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::{cmp, num::NonZeroUsize, thread, time};
|
||||
use log::{trace, debug, warn};
|
||||
use crate::chain::Client;
|
||||
@@ -200,10 +201,6 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
|
||||
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
|
||||
/// Incoming gossip consensus message.
|
||||
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
|
||||
/// Is protocol currently major-syncing?
|
||||
IsMajorSyncing(Sender<bool>),
|
||||
/// Is protocol currently offline?
|
||||
IsOffline(Sender<bool>),
|
||||
/// Return a list of peers currently known to protocol.
|
||||
Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>),
|
||||
/// Let protocol know a peer is currenlty clogged.
|
||||
@@ -237,6 +234,8 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
|
||||
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
/// Create a new instance.
|
||||
pub fn new(
|
||||
is_offline: Arc<AtomicBool>,
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
network_chan: NetworkChan<B>,
|
||||
config: ProtocolConfig,
|
||||
chain: Arc<Client<B>>,
|
||||
@@ -247,7 +246,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
) -> error::Result<Sender<ProtocolMsg<B, S>>> {
|
||||
let (sender, port) = channel::unbounded();
|
||||
let info = chain.info()?;
|
||||
let sync = ChainSync::new(config.roles, &info, import_queue);
|
||||
let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue);
|
||||
let _ = thread::Builder::new()
|
||||
.name("Protocol".into())
|
||||
.spawn(move || {
|
||||
@@ -330,14 +329,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => {
|
||||
self.gossip_consensus_message(topic, engine_id, message)
|
||||
}
|
||||
ProtocolMsg::IsMajorSyncing(sender) => {
|
||||
let is_syncing = self.sync.status().is_major_syncing();
|
||||
let _ = sender.send(is_syncing);
|
||||
}
|
||||
ProtocolMsg::IsOffline(sender) => {
|
||||
let is_offline = self.sync.status().is_offline();
|
||||
let _ = sender.send(is_offline);
|
||||
}
|
||||
ProtocolMsg::MaintainSync => {
|
||||
let mut context =
|
||||
ProtocolContext::new(&mut self.context_data, &self.network_chan);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::{io, thread};
|
||||
use log::{warn, debug, error, trace, info};
|
||||
use futures::{Async, Future, Stream, stream, sync::oneshot};
|
||||
@@ -117,6 +118,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
|
||||
|
||||
/// Substrate network service. Handles network IO and manages connectivity.
|
||||
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
|
||||
// Are we connected to any peer?
|
||||
is_offline: Arc<AtomicBool>,
|
||||
// Are we actively catching up with the chain?
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
/// Network service
|
||||
network: Arc<Mutex<NetworkService<Message<B>>>>,
|
||||
/// Protocol sender
|
||||
@@ -135,7 +140,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
import_queue: Box<ImportQueue<B>>,
|
||||
) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> {
|
||||
let (network_chan, network_port) = network_channel(protocol_id);
|
||||
// Start in off-line mode, since we're not connected to any nodes yet.
|
||||
let is_offline = Arc::new(AtomicBool::new(true));
|
||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||
let protocol_sender = Protocol::new(
|
||||
is_offline.clone(),
|
||||
is_major_syncing.clone(),
|
||||
network_chan.clone(),
|
||||
params.config,
|
||||
params.chain,
|
||||
@@ -154,6 +164,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
)?;
|
||||
|
||||
let service = Arc::new(Service {
|
||||
is_offline,
|
||||
is_major_syncing,
|
||||
network,
|
||||
protocol_sender: protocol_sender.clone(),
|
||||
bg_thread: Some(thread),
|
||||
@@ -244,22 +256,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
|
||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ::consensus::SyncOracle for Service<B, S> {
|
||||
fn is_major_syncing(&self) -> bool {
|
||||
let (sender, port) = channel::unbounded();
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::IsMajorSyncing(sender));
|
||||
port.recv().expect("1. Protocol keeps handling messages until all senders are dropped,
|
||||
or the ProtocolMsg::Stop message is received,
|
||||
2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.")
|
||||
self.is_major_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
fn is_offline(&self) -> bool {
|
||||
let (sender, port) = channel::unbounded();
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::IsOffline(sender));
|
||||
port.recv().expect("1. Protocol keeps handling messages until all senders are dropped,
|
||||
or the ProtocolMsg::Stop message is received,
|
||||
2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.")
|
||||
self.is_offline.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberF
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use crate::message::{self, generic::Message as GenericMessage};
|
||||
use crate::config::Roles;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
// Maximum blocks to request in a single packet.
|
||||
@@ -46,6 +47,7 @@ const ANNOUNCE_HISTORY_SIZE: usize = 64;
|
||||
// TODO: this should take finality into account. See https://github.com/paritytech/substrate/issues/1606
|
||||
const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PeerSync<B: BlockT> {
|
||||
pub common_number: NumberFor<B>,
|
||||
pub best_hash: B::Hash,
|
||||
@@ -302,6 +304,8 @@ pub struct ChainSync<B: BlockT> {
|
||||
justifications: PendingJustifications<B>,
|
||||
import_queue: Box<ImportQueue<B>>,
|
||||
is_stopping: AtomicBool,
|
||||
is_offline: Arc<AtomicBool>,
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
/// Reported sync state.
|
||||
@@ -342,7 +346,13 @@ impl<B: BlockT> Status<B> {
|
||||
|
||||
impl<B: BlockT> ChainSync<B> {
|
||||
/// Create a new instance.
|
||||
pub(crate) fn new(role: Roles, info: &ClientInfo<B>, import_queue: Box<ImportQueue<B>>) -> Self {
|
||||
pub(crate) fn new(
|
||||
is_offline: Arc<AtomicBool>,
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
role: Roles,
|
||||
info: &ClientInfo<B>,
|
||||
import_queue: Box<ImportQueue<B>>
|
||||
) -> Self {
|
||||
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
|
||||
if role.intersects(Roles::FULL | Roles::AUTHORITY) {
|
||||
required_block_attributes |= message::BlockAttributes::BODY;
|
||||
@@ -358,6 +368,8 @@ impl<B: BlockT> ChainSync<B> {
|
||||
required_block_attributes,
|
||||
import_queue,
|
||||
is_stopping: Default::default(),
|
||||
is_offline,
|
||||
is_major_syncing,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -370,13 +382,17 @@ impl<B: BlockT> ChainSync<B> {
|
||||
self.import_queue.clone()
|
||||
}
|
||||
|
||||
fn state(&self, best_seen: &Option<NumberFor<B>>) -> SyncState {
|
||||
match best_seen {
|
||||
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading,
|
||||
_ => SyncState::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns sync status.
|
||||
pub(crate) fn status(&self) -> Status<B> {
|
||||
let best_seen = self.best_seen_block();
|
||||
let state = match &best_seen {
|
||||
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading,
|
||||
_ => SyncState::Idle,
|
||||
};
|
||||
let state = self.state(&best_seen);
|
||||
Status {
|
||||
state: state,
|
||||
best_seen_block: best_seen,
|
||||
@@ -386,6 +402,13 @@ impl<B: BlockT> ChainSync<B> {
|
||||
|
||||
/// Handle new connected peer.
|
||||
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
|
||||
// Initialize some variables to determine if
|
||||
// is_offline or is_major_syncing should be updated
|
||||
// after processing this new peer.
|
||||
let previous_len = self.peers.len();
|
||||
let previous_best_seen = self.best_seen_block();
|
||||
let previous_state = self.state(&previous_best_seen);
|
||||
|
||||
if let Some(info) = protocol.peer_info(who) {
|
||||
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
|
||||
(Err(e), _) => {
|
||||
@@ -450,6 +473,22 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let current_best_seen = self.best_seen_block();
|
||||
let current_state = self.state(¤t_best_seen);
|
||||
let current_len = self.peers.len();
|
||||
if previous_len == 0 && current_len > 0 {
|
||||
// We were offline, and now we're connected to at least one peer.
|
||||
self.is_offline.store(false, Ordering::Relaxed);
|
||||
}
|
||||
if previous_len < current_len {
|
||||
// We added a peer, let's see if major_syncing should be updated.
|
||||
match (previous_state, current_state) {
|
||||
(SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed),
|
||||
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle new block data.
|
||||
@@ -660,10 +699,19 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
|
||||
let best_seen = self.best_seen_block();
|
||||
let previous_state = self.state(&best_seen);
|
||||
if number > self.best_queued_number {
|
||||
self.best_queued_number = number;
|
||||
self.best_queued_hash = *hash;
|
||||
}
|
||||
let current_state = self.state(&best_seen);
|
||||
// If the latest queued block changed our state, update is_major_syncing.
|
||||
match (previous_state, current_state) {
|
||||
(SyncState::Idle, SyncState::Downloading) => self.is_major_syncing.store(true, Ordering::Relaxed),
|
||||
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
|
||||
_ => {},
|
||||
}
|
||||
// Update common blocks
|
||||
for (n, peer) in self.peers.iter_mut() {
|
||||
if let PeerSyncState::AncestorSearch(_) = peer.state {
|
||||
@@ -744,8 +792,21 @@ impl<B: BlockT> ChainSync<B> {
|
||||
|
||||
/// Handle disconnected peer.
|
||||
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
|
||||
let previous_best_seen = self.best_seen_block();
|
||||
let previous_state = self.state(&previous_best_seen);
|
||||
self.blocks.clear_peer_download(who);
|
||||
self.peers.remove(&who);
|
||||
if self.peers.len() == 0 {
|
||||
// We're not connected to any peer anymore.
|
||||
self.is_offline.store(true, Ordering::Relaxed);
|
||||
}
|
||||
let current_best_seen = self.best_seen_block();
|
||||
let current_state = self.state(¤t_best_seen);
|
||||
// We removed a peer, let's see if this put us in idle state and is_major_syncing should be updated.
|
||||
match (previous_state, current_state) {
|
||||
(SyncState::Downloading, SyncState::Idle) => self.is_major_syncing.store(false, Ordering::Relaxed),
|
||||
_ => {},
|
||||
}
|
||||
self.justifications.peer_disconnected(who);
|
||||
self.maintain_sync(protocol);
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ mod sync;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -44,7 +45,7 @@ use network_libp2p::{NodeIndex, ProtocolId};
|
||||
use parity_codec::Encode;
|
||||
use parking_lot::Mutex;
|
||||
use primitives::{H256, Ed25519AuthorityId};
|
||||
use crate::protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus};
|
||||
use crate::protocol::{Context, Protocol, ProtocolMsg};
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor};
|
||||
use runtime_primitives::Justification;
|
||||
@@ -117,6 +118,8 @@ impl NetworkSpecialization<Block> for DummySpecialization {
|
||||
pub type PeersClient = client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>;
|
||||
|
||||
pub struct Peer<D> {
|
||||
pub is_offline: Arc<AtomicBool>,
|
||||
pub is_major_syncing: Arc<AtomicBool>,
|
||||
client: Arc<PeersClient>,
|
||||
pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
|
||||
|
||||
@@ -130,6 +133,8 @@ pub struct Peer<D> {
|
||||
|
||||
impl<D> Peer<D> {
|
||||
fn new(
|
||||
is_offline: Arc<AtomicBool>,
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
client: Arc<PeersClient>,
|
||||
import_queue: Box<ImportQueue<Block>>,
|
||||
protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
|
||||
@@ -139,6 +144,8 @@ impl<D> Peer<D> {
|
||||
) -> Self {
|
||||
let network_port = Mutex::new(network_port);
|
||||
Peer {
|
||||
is_offline,
|
||||
is_major_syncing,
|
||||
client,
|
||||
protocol_sender,
|
||||
import_queue,
|
||||
@@ -179,6 +186,16 @@ impl<D> Peer<D> {
|
||||
.send(ProtocolMsg::BlockImported(hash, header.clone()));
|
||||
}
|
||||
|
||||
// SyncOracle: are we connected to any peer?
|
||||
fn is_offline(&self) -> bool {
|
||||
self.is_offline.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
// SyncOracle: are we in the process of catching-up with the chain?
|
||||
fn is_major_syncing(&self) -> bool {
|
||||
self.is_major_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Called on connection to other indicated peer.
|
||||
fn on_connect(&self, other: NodeIndex) {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new()));
|
||||
@@ -266,12 +283,6 @@ impl<D> Peer<D> {
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::Abort);
|
||||
}
|
||||
|
||||
pub fn status(&self) -> ProtocolStatus<Block> {
|
||||
let (sender, port) = channel::unbounded();
|
||||
let _ = self.protocol_sender.send(ProtocolMsg::Status(sender));
|
||||
port.recv().unwrap()
|
||||
}
|
||||
|
||||
/// Push a message into the gossip network and relay to peers.
|
||||
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
|
||||
pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) {
|
||||
@@ -467,7 +478,11 @@ pub trait TestNetFactory: Sized {
|
||||
|
||||
let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import));
|
||||
let specialization = DummySpecialization {};
|
||||
let is_offline = Arc::new(AtomicBool::new(true));
|
||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||
let protocol_sender = Protocol::new(
|
||||
is_offline.clone(),
|
||||
is_major_syncing.clone(),
|
||||
network_sender.clone(),
|
||||
config.clone(),
|
||||
client.clone(),
|
||||
@@ -478,6 +493,8 @@ pub trait TestNetFactory: Sized {
|
||||
).unwrap();
|
||||
|
||||
let peer = Arc::new(Peer::new(
|
||||
is_offline,
|
||||
is_major_syncing,
|
||||
client,
|
||||
import_queue,
|
||||
protocol_sender,
|
||||
|
||||
@@ -19,10 +19,78 @@ use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
|
||||
use crate::config::Roles;
|
||||
use consensus::BlockOrigin;
|
||||
use network_libp2p::NodeIndex;
|
||||
use crate::sync::SyncState;
|
||||
use std::collections::HashSet;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn sync_cycle_from_offline_to_syncing_to_offline() {
|
||||
let _ = ::env_logger::try_init();
|
||||
let mut net = TestNet::new(3);
|
||||
for peer in 0..3 {
|
||||
// Offline, and not major syncing.
|
||||
assert!(net.peer(peer).is_offline());
|
||||
assert!(!net.peer(peer).is_major_syncing());
|
||||
}
|
||||
|
||||
// Generate blocks.
|
||||
net.peer(2).push_blocks(100, false);
|
||||
net.start();
|
||||
net.route_fast();
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
net.route_fast();
|
||||
for peer in 0..3 {
|
||||
// Online
|
||||
assert!(!net.peer(peer).is_offline());
|
||||
if peer < 2 {
|
||||
// Major syncing.
|
||||
assert!(net.peer(peer).is_major_syncing());
|
||||
}
|
||||
}
|
||||
net.sync();
|
||||
for peer in 0..3 {
|
||||
// All done syncing.
|
||||
assert!(!net.peer(peer).is_major_syncing());
|
||||
}
|
||||
|
||||
// Now disconnect them all.
|
||||
for peer in 0..3 {
|
||||
for other in 0..3 {
|
||||
if other != peer {
|
||||
net.peer(peer).on_disconnect(other);
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
assert!(net.peer(peer).is_offline());
|
||||
assert!(!net.peer(peer).is_major_syncing());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn syncing_node_not_major_syncing_when_disconnected() {
|
||||
let _ = ::env_logger::try_init();
|
||||
let mut net = TestNet::new(3);
|
||||
|
||||
// Generate blocks.
|
||||
net.peer(2).push_blocks(100, false);
|
||||
net.start();
|
||||
net.route_fast();
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
net.route_fast();
|
||||
|
||||
// Peer 1 is major-syncing.
|
||||
assert!(net.peer(1).is_major_syncing());
|
||||
|
||||
// Disconnect peer 1 form everyone else.
|
||||
net.peer(1).on_disconnect(0);
|
||||
net.peer(1).on_disconnect(2);
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Peer 1 is not major-syncing.
|
||||
assert!(!net.peer(1).is_major_syncing());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_from_two_peers_works() {
|
||||
let _ = ::env_logger::try_init();
|
||||
@@ -32,8 +100,7 @@ fn sync_from_two_peers_works() {
|
||||
net.sync();
|
||||
assert!(net.peer(0).client.backend().as_in_memory().blockchain()
|
||||
.equals_to(net.peer(1).client.backend().as_in_memory().blockchain()));
|
||||
let status = net.peer(0).status();
|
||||
assert_eq!(status.sync.state, SyncState::Idle);
|
||||
assert!(!net.peer(0).is_major_syncing());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user