mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 05:51:02 +00:00
Rename on_demand to light_dispatch and various minor changes (#3315)
* Rename on_demand to light_server * Small docs improvement * Rename on_block_announce to update_best_number * More minor documentation * Light server -> light dispatch * is_light_rq_response -> is_light_response
This commit is contained in:
committed by
Gavin Wood
parent
87e72d9327
commit
b5b1c2a4d8
@@ -16,7 +16,7 @@
|
||||
|
||||
//! On-demand requests service.
|
||||
|
||||
use crate::protocol::on_demand::RequestData;
|
||||
use crate::protocol::light_dispatch::RequestData;
|
||||
use std::sync::Arc;
|
||||
use futures::{prelude::*, sync::mpsc, sync::oneshot};
|
||||
use futures03::compat::{Compat01As03, Future01CompatExt as _};
|
||||
|
||||
@@ -34,7 +34,7 @@ use message::{BlockAttributes, Direction, FromBlock, Message, RequestId};
|
||||
use message::generic::{Message as GenericMessage, ConsensusMessage};
|
||||
use event::Event;
|
||||
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
|
||||
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
|
||||
use specialization::NetworkSpecialization;
|
||||
use sync::{ChainSync, SyncState};
|
||||
use crate::service::{TransactionPool, ExHashT};
|
||||
@@ -53,7 +53,7 @@ mod util;
|
||||
pub mod consensus_gossip;
|
||||
pub mod message;
|
||||
pub mod event;
|
||||
pub mod on_demand;
|
||||
pub mod light_dispatch;
|
||||
pub mod specialization;
|
||||
pub mod sync;
|
||||
|
||||
@@ -96,8 +96,8 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||
/// Interval at which we call `propagate_extrinsics`.
|
||||
propagate_timeout: Box<dyn Stream<Item = (), Error = ()> + Send>,
|
||||
config: ProtocolConfig,
|
||||
/// Handler for on-demand requests.
|
||||
on_demand_core: OnDemandCore<B>,
|
||||
/// Handler for light client requests.
|
||||
light_dispatch: LightDispatch<B>,
|
||||
genesis_hash: B::Hash,
|
||||
sync: ChainSync<B>,
|
||||
specialization: S,
|
||||
@@ -149,12 +149,12 @@ pub struct PeerInfo<B: BlockT> {
|
||||
pub best_number: <B::Header as HeaderT>::Number,
|
||||
}
|
||||
|
||||
struct OnDemandIn<'a, B: BlockT> {
|
||||
struct LightDispatchIn<'a, B: BlockT> {
|
||||
behaviour: &'a mut CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
peerset: peerset::PeersetHandle,
|
||||
}
|
||||
|
||||
impl<'a, B: BlockT> OnDemandNetwork<B> for OnDemandIn<'a, B> {
|
||||
impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
|
||||
fn report_peer(&mut self, who: &PeerId, reputation: i32) {
|
||||
self.peerset.report_peer(who.clone(), reputation)
|
||||
}
|
||||
@@ -373,7 +373,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
peers: HashMap::new(),
|
||||
chain,
|
||||
},
|
||||
on_demand_core: OnDemandCore::new(checker),
|
||||
light_dispatch: LightDispatch::new(checker),
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
sync,
|
||||
specialization: specialization,
|
||||
@@ -445,15 +445,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
/// Starts a new data demand request.
|
||||
///
|
||||
/// The parameter contains a `Sender` where the result, once received, must be sent.
|
||||
pub(crate) fn add_on_demand_request(&mut self, rq: RequestData<B>) {
|
||||
self.on_demand_core.add_request(OnDemandIn {
|
||||
pub(crate) fn add_light_client_request(&mut self, rq: RequestData<B>) {
|
||||
self.light_dispatch.add_request(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, rq);
|
||||
}
|
||||
|
||||
fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
|
||||
self.on_demand_core.is_on_demand_response(&who, response_id)
|
||||
fn is_light_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
|
||||
self.light_dispatch.is_light_response(&who, response_id)
|
||||
}
|
||||
|
||||
fn handle_response(
|
||||
@@ -506,7 +506,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
|
||||
GenericMessage::BlockResponse(r) => {
|
||||
// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
|
||||
if self.is_on_demand_response(&who, r.id) {
|
||||
if self.is_light_response(&who, r.id) {
|
||||
self.on_remote_body_response(who, r);
|
||||
} else {
|
||||
if let Some(request) = self.handle_response(who.clone(), &r) {
|
||||
@@ -629,7 +629,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
self.sync.peer_disconnected(peer.clone());
|
||||
self.specialization.on_disconnect(&mut context, peer.clone());
|
||||
self.on_demand_core.on_disconnect(OnDemandIn {
|
||||
self.light_dispatch.on_disconnect(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, peer);
|
||||
@@ -793,7 +793,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)
|
||||
);
|
||||
self.maintain_peers();
|
||||
self.on_demand_core.maintain_peers(OnDemandIn {
|
||||
self.light_dispatch.maintain_peers(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
});
|
||||
@@ -914,7 +914,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
};
|
||||
|
||||
let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
|
||||
self.on_demand_core.on_connect(OnDemandIn {
|
||||
self.light_dispatch.on_connect(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who.clone(), status.roles, status.best_number);
|
||||
@@ -1053,7 +1053,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
peer.known_blocks.insert(hash.clone());
|
||||
}
|
||||
}
|
||||
self.on_demand_core.on_block_announce(OnDemandIn {
|
||||
self.light_dispatch.update_best_number(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who.clone(), *header.number());
|
||||
@@ -1253,7 +1253,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
response: message::RemoteCallResponse
|
||||
) {
|
||||
trace!(target: "sync", "Remote call response {} from {}", response.id, who);
|
||||
self.on_demand_core.on_remote_call_response(OnDemandIn {
|
||||
self.light_dispatch.on_remote_call_response(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who, response);
|
||||
@@ -1294,7 +1294,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
response: message::RemoteReadResponse
|
||||
) {
|
||||
trace!(target: "sync", "Remote read response {} from {}", response.id, who);
|
||||
self.on_demand_core.on_remote_read_response(OnDemandIn {
|
||||
self.light_dispatch.on_remote_read_response(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who, response);
|
||||
@@ -1335,7 +1335,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
response: message::RemoteHeaderResponse<B::Header>,
|
||||
) {
|
||||
trace!(target: "sync", "Remote header proof response {} from {}", response.id, who);
|
||||
self.on_demand_core.on_remote_header_response(OnDemandIn {
|
||||
self.light_dispatch.on_remote_header_response(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who, response);
|
||||
@@ -1401,7 +1401,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
who,
|
||||
response.max
|
||||
);
|
||||
self.on_demand_core.on_remote_changes_response(OnDemandIn {
|
||||
self.light_dispatch.on_remote_changes_response(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, who, response);
|
||||
@@ -1462,7 +1462,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
peer: PeerId,
|
||||
response: message::BlockResponse<B>
|
||||
) {
|
||||
self.on_demand_core.on_remote_body_response(OnDemandIn {
|
||||
self.light_dispatch.on_remote_body_response(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, peer, response);
|
||||
|
||||
+175
-147
@@ -14,7 +14,10 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! On-demand requests service.
|
||||
//! Light client requests service.
|
||||
//!
|
||||
//! Handles requests for data coming from our local light client and that must be answered by
|
||||
//! nodes on the network.
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::Arc;
|
||||
@@ -38,8 +41,8 @@ const RETRY_COUNT: usize = 1;
|
||||
/// Reputation change for a peer when a request timed out.
|
||||
const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8);
|
||||
|
||||
/// Trait used by the `OnDemandCore` service to communicate messages back to the network.
|
||||
pub trait OnDemandNetwork<B: BlockT> {
|
||||
/// Trait used by the `LightDispatch` service to communicate messages back to the network.
|
||||
pub trait LightDispatchNetwork<B: BlockT> {
|
||||
/// Adjusts the reputation of the given peer.
|
||||
fn report_peer(&mut self, who: &PeerId, reputation_change: i32);
|
||||
|
||||
@@ -97,19 +100,30 @@ pub trait OnDemandNetwork<B: BlockT> {
|
||||
);
|
||||
}
|
||||
|
||||
/// On-demand requests service. Dispatches requests to appropriate peers.
|
||||
pub struct OnDemandCore<B: BlockT> {
|
||||
/// Light client requests service. Dispatches requests to appropriate peers.
|
||||
pub struct LightDispatch<B: BlockT> {
|
||||
/// Verifies that responses are correct. Passed at initialization.
|
||||
checker: Arc<dyn FetchChecker<B>>,
|
||||
/// Numeric ID to assign to the next outgoing request. Used to assign responses to their
|
||||
/// corresponding request.
|
||||
next_request_id: u64,
|
||||
/// Requests that we have yet to send out on the network.
|
||||
pending_requests: VecDeque<Request<B>>,
|
||||
/// List of nodes to which we have sent a request and that are yet to answer.
|
||||
active_peers: LinkedHashMap<PeerId, Request<B>>,
|
||||
/// List of nodes that we know of that aren't doing anything and that are available for new
|
||||
/// requests.
|
||||
idle_peers: VecDeque<PeerId>,
|
||||
/// Best known block for each node in `active_peers` and `idle_peers`.
|
||||
best_blocks: HashMap<PeerId, NumberFor<B>>,
|
||||
}
|
||||
|
||||
struct Request<Block: BlockT> {
|
||||
id: u64,
|
||||
/// When the request got created or sent out to the network.
|
||||
timestamp: Instant,
|
||||
/// Number of remaining attempts to fulfill this request. If it reaches 0, we interrupt the
|
||||
/// attempt.
|
||||
retry_count: usize,
|
||||
data: RequestData<Block>,
|
||||
}
|
||||
@@ -196,12 +210,12 @@ impl<Block: BlockT> FetchChecker<Block> for AlwaysBadChecker {
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT> OnDemandCore<B> where
|
||||
impl<B: BlockT> LightDispatch<B> where
|
||||
B::Header: HeaderT,
|
||||
{
|
||||
/// Creates new on-demand requests processer.
|
||||
/// Creates new light client requests processer.
|
||||
pub fn new(checker: Arc<dyn FetchChecker<B>>) -> Self {
|
||||
OnDemandCore {
|
||||
LightDispatch {
|
||||
checker,
|
||||
next_request_id: 0,
|
||||
pending_requests: VecDeque::new(),
|
||||
@@ -212,7 +226,7 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
}
|
||||
|
||||
/// Inserts a new request in the list of requests to execute.
|
||||
pub(crate) fn add_request(&mut self, network: impl OnDemandNetwork<B>, data: RequestData<B>) {
|
||||
pub(crate) fn add_request(&mut self, network: impl LightDispatchNetwork<B>, data: RequestData<B>) {
|
||||
self.insert(RETRY_COUNT, data);
|
||||
self.dispatch(network);
|
||||
}
|
||||
@@ -234,7 +248,7 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
fn accept_response(
|
||||
&mut self,
|
||||
rtype: &str,
|
||||
mut network: impl OnDemandNetwork<B>,
|
||||
mut network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
request_id: u64,
|
||||
try_accept: impl FnOnce(Request<B>, &Arc<dyn FetchChecker<B>>) -> Accept<B>
|
||||
@@ -284,9 +298,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
self.dispatch(network);
|
||||
}
|
||||
|
||||
/// Call this when we connect to a node on the network.
|
||||
pub fn on_connect(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
role: Roles,
|
||||
best_number: NumberFor<B>
|
||||
@@ -301,17 +316,20 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
self.dispatch(network);
|
||||
}
|
||||
|
||||
pub fn on_block_announce(&mut self, network: impl OnDemandNetwork<B>, peer: PeerId, best_number: NumberFor<B>) {
|
||||
/// Sets the best seen block for the given node.
|
||||
pub fn update_best_number(&mut self, network: impl LightDispatchNetwork<B>, peer: PeerId, best_number: NumberFor<B>) {
|
||||
self.best_blocks.insert(peer, best_number);
|
||||
self.dispatch(network);
|
||||
}
|
||||
|
||||
pub fn on_disconnect(&mut self, network: impl OnDemandNetwork<B>, peer: PeerId) {
|
||||
/// Call this when we disconnect from a node.
|
||||
pub fn on_disconnect(&mut self, network: impl LightDispatchNetwork<B>, peer: PeerId) {
|
||||
self.remove_peer(peer);
|
||||
self.dispatch(network);
|
||||
}
|
||||
|
||||
pub fn maintain_peers(&mut self, mut network: impl OnDemandNetwork<B>) {
|
||||
/// Must be called periodically in order to perform maintenance.
|
||||
pub fn maintain_peers(&mut self, mut network: impl LightDispatchNetwork<B>) {
|
||||
let now = Instant::now();
|
||||
|
||||
loop {
|
||||
@@ -329,9 +347,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
self.dispatch(network);
|
||||
}
|
||||
|
||||
/// Handles a remote header response message from on the network.
|
||||
pub fn on_remote_header_response(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
response: message::RemoteHeaderResponse<B::Header>
|
||||
) {
|
||||
@@ -352,9 +371,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
})
|
||||
}
|
||||
|
||||
/// Handles a remote read response message from on the network.
|
||||
pub fn on_remote_read_response(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
response: message::RemoteReadResponse
|
||||
) {
|
||||
@@ -387,9 +407,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
})
|
||||
}
|
||||
|
||||
/// Handles a remote call response message from on the network.
|
||||
pub fn on_remote_call_response(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
response: message::RemoteCallResponse
|
||||
) {
|
||||
@@ -406,9 +427,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
})
|
||||
}
|
||||
|
||||
/// Handles a remote changes response message from on the network.
|
||||
pub fn on_remote_changes_response(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
response: message::RemoteChangesResponse<NumberFor<B>, B::Hash>
|
||||
) {
|
||||
@@ -431,9 +453,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
})
|
||||
}
|
||||
|
||||
/// Handles a remote body response message from on the network.
|
||||
pub fn on_remote_body_response(
|
||||
&mut self,
|
||||
network: impl OnDemandNetwork<B>,
|
||||
network: impl LightDispatchNetwork<B>,
|
||||
peer: PeerId,
|
||||
response: message::BlockResponse<B>
|
||||
) {
|
||||
@@ -466,7 +489,7 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
|
||||
pub fn is_light_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
|
||||
self.active_peers.get(&peer).map_or(false, |r| r.id == request_id)
|
||||
}
|
||||
|
||||
@@ -483,7 +506,10 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_peer(&mut self, peer: PeerId) {
|
||||
/// Removes a peer from the list of known peers.
|
||||
///
|
||||
/// Puts back the active request that this node was performing into `pending_requests`.
|
||||
fn remove_peer(&mut self, peer: PeerId) {
|
||||
self.best_blocks.remove(&peer);
|
||||
|
||||
if let Some(request) = self.active_peers.remove(&peer) {
|
||||
@@ -497,7 +523,7 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
}
|
||||
|
||||
/// Dispatches pending requests.
|
||||
fn dispatch(&mut self, mut network: impl OnDemandNetwork<B>) {
|
||||
fn dispatch(&mut self, mut network: impl LightDispatchNetwork<B>) {
|
||||
let mut last_peer = self.idle_peers.back().cloned();
|
||||
let mut unhandled_requests = VecDeque::new();
|
||||
|
||||
@@ -551,6 +577,8 @@ impl<B: BlockT> OnDemandCore<B> where
|
||||
}
|
||||
|
||||
impl<Block: BlockT> Request<Block> {
|
||||
/// Returns the block that the remote needs to have in order to be able to fulfill
|
||||
/// this request.
|
||||
fn required_block(&self) -> NumberFor<Block> {
|
||||
match self.data {
|
||||
RequestData::RemoteHeader(ref data, _) => data.block,
|
||||
@@ -562,7 +590,7 @@ impl<Block: BlockT> Request<Block> {
|
||||
}
|
||||
}
|
||||
|
||||
fn send_to(&self, out: &mut impl OnDemandNetwork<Block>, peer: &PeerId) {
|
||||
fn send_to(&self, out: &mut impl LightDispatchNetwork<Block>, peer: &PeerId) {
|
||||
match self.data {
|
||||
RequestData::RemoteHeader(ref data, _) =>
|
||||
out.send_header_request(
|
||||
@@ -645,7 +673,7 @@ pub mod tests {
|
||||
use crate::config::Roles;
|
||||
use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId};
|
||||
use libp2p::PeerId;
|
||||
use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData};
|
||||
use super::{REQUEST_TIMEOUT, LightDispatch, LightDispatchNetwork, RequestData};
|
||||
use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header};
|
||||
|
||||
struct DummyFetchChecker { ok: bool }
|
||||
@@ -711,21 +739,21 @@ pub mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn dummy(ok: bool) -> OnDemandCore<Block> {
|
||||
OnDemandCore::new(Arc::new(DummyFetchChecker { ok }))
|
||||
fn dummy(ok: bool) -> LightDispatch<Block> {
|
||||
LightDispatch::new(Arc::new(DummyFetchChecker { ok }))
|
||||
}
|
||||
|
||||
fn total_peers(on_demand: &OnDemandCore<Block>) -> usize {
|
||||
on_demand.idle_peers.len() + on_demand.active_peers.len()
|
||||
fn total_peers(light_dispatch: &LightDispatch<Block>) -> usize {
|
||||
light_dispatch.idle_peers.len() + light_dispatch.active_peers.len()
|
||||
}
|
||||
|
||||
fn receive_call_response(
|
||||
network_interface: impl OnDemandNetwork<Block>,
|
||||
on_demand: &mut OnDemandCore<Block>,
|
||||
network_interface: impl LightDispatchNetwork<Block>,
|
||||
light_dispatch: &mut LightDispatch<Block>,
|
||||
peer: PeerId,
|
||||
id: message::RequestId
|
||||
) {
|
||||
on_demand.on_remote_call_response(network_interface, peer, message::RemoteCallResponse {
|
||||
light_dispatch.on_remote_call_response(network_interface, peer, message::RemoteCallResponse {
|
||||
id: id,
|
||||
proof: vec![vec![2]],
|
||||
});
|
||||
@@ -746,7 +774,7 @@ pub mod tests {
|
||||
disconnected_peers: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
impl<'a, B: BlockT> OnDemandNetwork<B> for &'a mut DummyNetwork {
|
||||
impl<'a, B: BlockT> LightDispatchNetwork<B> for &'a mut DummyNetwork {
|
||||
fn report_peer(&mut self, _: &PeerId, _: i32) {}
|
||||
fn disconnect_peer(&mut self, who: &PeerId) {
|
||||
self.disconnected_peers.insert(who.clone());
|
||||
@@ -769,16 +797,16 @@ pub mod tests {
|
||||
#[test]
|
||||
fn knows_about_peers_roles() {
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let peer0 = PeerId::random();
|
||||
let peer1 = PeerId::random();
|
||||
let peer2 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000);
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000);
|
||||
on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000);
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(on_demand.best_blocks.get(&peer1), Some(&2000));
|
||||
assert_eq!(on_demand.best_blocks.get(&peer2), Some(&3000));
|
||||
light_dispatch.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000);
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(light_dispatch.best_blocks.get(&peer1), Some(&2000));
|
||||
assert_eq!(light_dispatch.best_blocks.get(&peer2), Some(&3000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -786,69 +814,69 @@ pub mod tests {
|
||||
let peer0 = PeerId::random();
|
||||
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let mut on_demand = dummy(true);
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100);
|
||||
assert_eq!(1, total_peers(&on_demand));
|
||||
assert!(!on_demand.best_blocks.is_empty());
|
||||
let mut light_dispatch = dummy(true);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100);
|
||||
assert_eq!(1, total_peers(&light_dispatch));
|
||||
assert!(!light_dispatch.best_blocks.is_empty());
|
||||
|
||||
on_demand.on_disconnect(&mut network_interface, peer0);
|
||||
assert_eq!(0, total_peers(&on_demand));
|
||||
assert!(on_demand.best_blocks.is_empty());
|
||||
light_dispatch.on_disconnect(&mut network_interface, peer0);
|
||||
assert_eq!(0, total_peers(&light_dispatch));
|
||||
assert!(light_dispatch.best_blocks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnects_from_timeouted_peer() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
let peer1 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000);
|
||||
assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert!(on_demand.active_peers.is_empty());
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000);
|
||||
assert_eq!(vec![peer0.clone(), peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert!(light_dispatch.active_peers.is_empty());
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
call_data: vec![],
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
assert_eq!(vec![peer1.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(vec![peer0.clone()], on_demand.active_peers.keys().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(vec![peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(vec![peer0.clone()], light_dispatch.active_peers.keys().cloned().collect::<Vec<_>>());
|
||||
|
||||
on_demand.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT;
|
||||
on_demand.maintain_peers(&mut network_interface);
|
||||
assert!(on_demand.idle_peers.is_empty());
|
||||
assert_eq!(vec![peer1.clone()], on_demand.active_peers.keys().cloned().collect::<Vec<_>>());
|
||||
light_dispatch.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT;
|
||||
light_dispatch.maintain_peers(&mut network_interface);
|
||||
assert!(light_dispatch.idle_peers.is_empty());
|
||||
assert_eq!(vec![peer1.clone()], light_dispatch.active_peers.keys().cloned().collect::<Vec<_>>());
|
||||
assert_disconnected_peer(&network_interface);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnects_from_peer_on_response_with_wrong_id() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let peer0 = PeerId::random();
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
call_data: vec![],
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
receive_call_response(&mut network_interface, &mut on_demand, peer0, 1);
|
||||
receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 1);
|
||||
assert_disconnected_peer(&network_interface);
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnects_from_peer_on_incorrect_response() {
|
||||
let mut on_demand = dummy(false);
|
||||
let mut light_dispatch = dummy(false);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
@@ -856,31 +884,31 @@ pub mod tests {
|
||||
retry_count: Some(1),
|
||||
}, oneshot::channel().0));
|
||||
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0);
|
||||
assert_disconnected_peer(&network_interface);
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnects_from_peer_on_unexpected_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
receive_call_response(&mut network_interface, &mut on_demand, peer0, 0);
|
||||
receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 0);
|
||||
assert_disconnected_peer(&network_interface);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnects_from_peer_on_wrong_response_type() {
|
||||
let mut on_demand = dummy(false);
|
||||
let mut light_dispatch = dummy(false);
|
||||
let peer0 = PeerId::random();
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
@@ -888,26 +916,26 @@ pub mod tests {
|
||||
retry_count: Some(1),
|
||||
}, oneshot::channel().0));
|
||||
|
||||
on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse {
|
||||
light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse {
|
||||
id: 0,
|
||||
proof: vec![vec![2]],
|
||||
});
|
||||
assert_disconnected_peer(&network_interface);
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receives_remote_failure_after_retry_count_failures() {
|
||||
let retry_count = 2;
|
||||
let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::<Vec<_>>();
|
||||
let mut on_demand = dummy(false);
|
||||
let mut light_dispatch = dummy(false);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
for i in 0..retry_count+1 {
|
||||
on_demand.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000);
|
||||
}
|
||||
|
||||
let (tx, mut response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
@@ -917,7 +945,7 @@ pub mod tests {
|
||||
|
||||
for i in 0..retry_count {
|
||||
assert!(response.try_recv().unwrap().is_none());
|
||||
receive_call_response(&mut network_interface, &mut on_demand, peer_ids[i].clone(), i as u64);
|
||||
receive_call_response(&mut network_interface, &mut light_dispatch, peer_ids[i].clone(), i as u64);
|
||||
}
|
||||
|
||||
assert!(response.try_recv().unwrap().unwrap().is_err());
|
||||
@@ -925,13 +953,13 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn receives_remote_call_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
let (tx, response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest {
|
||||
block: Default::default(),
|
||||
header: dummy_header(),
|
||||
method: "test".into(),
|
||||
@@ -939,26 +967,26 @@ pub mod tests {
|
||||
retry_count: None,
|
||||
}, tx));
|
||||
|
||||
receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0);
|
||||
receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0);
|
||||
assert_eq!(response.wait().unwrap().unwrap(), vec![42]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receives_remote_read_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
let (tx, response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest {
|
||||
header: dummy_header(),
|
||||
block: Default::default(),
|
||||
key: b":key".to_vec(),
|
||||
retry_count: None,
|
||||
}, tx));
|
||||
|
||||
on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse {
|
||||
light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse {
|
||||
id: 0,
|
||||
proof: vec![vec![2]],
|
||||
});
|
||||
@@ -967,13 +995,13 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn receives_remote_read_child_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
let (tx, response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest {
|
||||
header: dummy_header(),
|
||||
block: Default::default(),
|
||||
storage_key: b":child_storage:sub".to_vec(),
|
||||
@@ -981,7 +1009,7 @@ pub mod tests {
|
||||
retry_count: None,
|
||||
}, tx));
|
||||
|
||||
on_demand.on_remote_read_response(&mut network_interface,
|
||||
light_dispatch.on_remote_read_response(&mut network_interface,
|
||||
peer0.clone(), message::RemoteReadResponse {
|
||||
id: 0,
|
||||
proof: vec![vec![2]],
|
||||
@@ -991,19 +1019,19 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn receives_remote_header_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
let (tx, response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 1,
|
||||
retry_count: None,
|
||||
}, tx));
|
||||
|
||||
on_demand.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse {
|
||||
light_dispatch.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse {
|
||||
id: 0,
|
||||
header: Some(Header {
|
||||
parent_hash: Default::default(),
|
||||
@@ -1022,13 +1050,13 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn receives_remote_changes_response() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer0 = PeerId::random();
|
||||
on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000);
|
||||
|
||||
let (tx, response) = oneshot::channel();
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest {
|
||||
changes_trie_config: changes_trie_config(),
|
||||
first_block: (1, Default::default()),
|
||||
last_block: (100, Default::default()),
|
||||
@@ -1038,7 +1066,7 @@ pub mod tests {
|
||||
retry_count: None,
|
||||
}, tx));
|
||||
|
||||
on_demand.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse {
|
||||
light_dispatch.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse {
|
||||
id: 0,
|
||||
max: 1000,
|
||||
proof: vec![vec![2]],
|
||||
@@ -1050,52 +1078,52 @@ pub mod tests {
|
||||
|
||||
#[test]
|
||||
fn does_not_sends_request_to_peer_who_has_no_required_block() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer1 = PeerId::random();
|
||||
let peer2 = PeerId::random();
|
||||
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100);
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 200,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
|
||||
on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150);
|
||||
light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150);
|
||||
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(on_demand.pending_requests.len(), 3);
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 3);
|
||||
|
||||
on_demand.on_block_announce(&mut network_interface, peer1.clone(), 250);
|
||||
light_dispatch.update_best_number(&mut network_interface, peer1.clone(), 250);
|
||||
|
||||
assert_eq!(vec![peer2.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(on_demand.pending_requests.len(), 2);
|
||||
assert_eq!(vec![peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 2);
|
||||
|
||||
on_demand.on_block_announce(&mut network_interface, peer2.clone(), 250);
|
||||
light_dispatch.update_best_number(&mut network_interface, peer2.clone(), 250);
|
||||
|
||||
assert!(!on_demand.idle_peers.iter().any(|_| true));
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert!(!light_dispatch.idle_peers.iter().any(|_| true));
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
|
||||
on_demand.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse {
|
||||
light_dispatch.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse {
|
||||
id: 0,
|
||||
header: Some(dummy_header()),
|
||||
proof: vec![],
|
||||
});
|
||||
|
||||
assert!(!on_demand.idle_peers.iter().any(|_| true));
|
||||
assert_eq!(on_demand.pending_requests.len(), 0);
|
||||
assert!(!light_dispatch.idle_peers.iter().any(|_| true));
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1103,70 +1131,70 @@ pub mod tests {
|
||||
// this test is a regression for a bug where the dispatch function would
|
||||
// loop forever after dispatching a request to the last peer, since the
|
||||
// last peer was not updated
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer1 = PeerId::random();
|
||||
let peer2 = PeerId::random();
|
||||
let peer3 = PeerId::random();
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200);
|
||||
on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200);
|
||||
on_demand.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200);
|
||||
light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200);
|
||||
light_dispatch.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250);
|
||||
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tries_to_send_all_pending_requests() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer1 = PeerId::random();
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 300,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
|
||||
assert!(on_demand.idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
|
||||
assert_eq!(on_demand.pending_requests.len(), 1);
|
||||
assert!(light_dispatch.idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
|
||||
assert_eq!(light_dispatch.pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_body_with_one_block_body_should_succeed() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer1 = PeerId::random();
|
||||
|
||||
let header = dummy_header();
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest {
|
||||
header: header.clone(),
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
|
||||
assert!(on_demand.pending_requests.is_empty());
|
||||
assert_eq!(on_demand.active_peers.len(), 1);
|
||||
assert!(light_dispatch.pending_requests.is_empty());
|
||||
assert_eq!(light_dispatch.active_peers.len(), 1);
|
||||
|
||||
let block = message::BlockData::<Block> {
|
||||
hash: primitives::H256::random(),
|
||||
@@ -1182,28 +1210,28 @@ pub mod tests {
|
||||
blocks: vec![block],
|
||||
};
|
||||
|
||||
on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response);
|
||||
light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response);
|
||||
|
||||
assert!(on_demand.active_peers.is_empty());
|
||||
assert_eq!(on_demand.idle_peers.len(), 1);
|
||||
assert!(light_dispatch.active_peers.is_empty());
|
||||
assert_eq!(light_dispatch.idle_peers.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_body_with_three_bodies_should_fail() {
|
||||
let mut on_demand = dummy(true);
|
||||
let mut light_dispatch = dummy(true);
|
||||
let mut network_interface = DummyNetwork::default();
|
||||
let peer1 = PeerId::random();
|
||||
|
||||
let header = dummy_header();
|
||||
on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250);
|
||||
|
||||
on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest {
|
||||
light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest {
|
||||
header: header.clone(),
|
||||
retry_count: None,
|
||||
}, oneshot::channel().0));
|
||||
|
||||
assert!(on_demand.pending_requests.is_empty());
|
||||
assert_eq!(on_demand.active_peers.len(), 1);
|
||||
assert!(light_dispatch.pending_requests.is_empty());
|
||||
assert_eq!(light_dispatch.active_peers.len(), 1);
|
||||
|
||||
let response = {
|
||||
let blocks: Vec<_> = (0..3).map(|_| message::BlockData::<Block> {
|
||||
@@ -1221,8 +1249,8 @@ pub mod tests {
|
||||
}
|
||||
};
|
||||
|
||||
on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response);
|
||||
assert!(on_demand.active_peers.is_empty());
|
||||
assert!(on_demand.idle_peers.is_empty(), "peer should be disconnected after bad response");
|
||||
light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response);
|
||||
assert!(light_dispatch.active_peers.is_empty());
|
||||
assert!(light_dispatch.idle_peers.is_empty(), "peer should be disconnected after bad response");
|
||||
}
|
||||
}
|
||||
@@ -47,7 +47,7 @@ use crate::config::{Params, TransportConfig};
|
||||
use crate::error::Error;
|
||||
use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo};
|
||||
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}};
|
||||
use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}};
|
||||
use crate::protocol::specialization::NetworkSpecialization;
|
||||
use crate::protocol::sync::SyncState;
|
||||
|
||||
@@ -241,7 +241,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
|
||||
service,
|
||||
import_queue: params.import_queue,
|
||||
from_worker,
|
||||
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
|
||||
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -585,8 +585,8 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
|
||||
import_queue: Box<dyn ImportQueue<B>>,
|
||||
/// Messages from the `NetworkService` and that must be processed.
|
||||
from_worker: mpsc::UnboundedReceiver<ServerToWorkerMsg<B, S>>,
|
||||
/// Receiver for queries from the on-demand that must be processed.
|
||||
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
|
||||
/// Receiver for queries from the light client that must be processed.
|
||||
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
|
||||
@@ -602,10 +602,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
||||
std::task::Poll::Pending::<Result<(), ()>>
|
||||
}).compat().poll();
|
||||
|
||||
// Check for new incoming on-demand requests.
|
||||
if let Some(on_demand_in) = self.on_demand_in.as_mut() {
|
||||
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
|
||||
self.network_service.user_protocol_mut().add_on_demand_request(rq);
|
||||
// Check for new incoming light client requests.
|
||||
if let Some(light_client_rqs) = self.light_client_rqs.as_mut() {
|
||||
while let Ok(Async::Ready(Some(rq))) = light_client_rqs.poll() {
|
||||
self.network_service.user_protocol_mut().add_light_client_request(rq);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user