Split the send_request method in multiple requests (#2696)

This commit is contained in:
Pierre Krieger
2019-05-28 17:13:54 +02:00
committed by Gavin Wood
parent a706d994cb
commit f329908887
2 changed files with 204 additions and 49 deletions
+107 -48
View File
@@ -26,7 +26,7 @@ use client::error::Error as ClientError;
use client::light::fetcher::{FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof,
RemoteReadChildRequest, RemoteBodyRequest};
use crate::message;
use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId};
use network_libp2p::PeerId;
use crate::config::Roles;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
@@ -46,8 +46,55 @@ pub trait OnDemandNetwork<B: BlockT> {
/// Disconnect from the given peer. Used in case of misbehaviour.
fn disconnect_peer(&mut self, who: &PeerId);
/// Send a request to a peer.
fn send_request(&mut self, who: &PeerId, message: message::Message<B>);
/// Send to `who` a request for a header.
fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <<B as BlockT>::Header as HeaderT>::Number);
/// Send to `who` a read request.
fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: <B as BlockT>::Hash, key: Vec<u8>);
/// Send to `who` a child read request.
fn send_read_child_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
storage_key: Vec<u8>,
key: Vec<u8>
);
/// Send to `who` a call request.
fn send_call_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
method: String,
data: Vec<u8>
);
/// Send to `who` a changes request.
fn send_changes_request(
&mut self,
who: &PeerId,
id: RequestId,
first: <B as BlockT>::Hash,
last: <B as BlockT>::Hash,
min: <B as BlockT>::Hash,
max: <B as BlockT>::Hash,
key: Vec<u8>
);
/// Send to `who` a body request.
fn send_body_request(
&mut self,
who: &PeerId,
id: RequestId,
fields: BlockAttributes,
from: FromBlock<<B as BlockT>::Hash, <<B as BlockT>::Header as HeaderT>::Number>,
to: Option<B::Hash>,
direction: Direction,
max: Option<u32>
);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
@@ -495,7 +542,7 @@ impl<B: BlockT> OnDemandCore<B> where
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
network.send_request(&peer, request.message());
request.send_to(&mut network, &peer);
self.active_peers.insert(peer, request);
}
@@ -515,53 +562,57 @@ impl<Block: BlockT> Request<Block> {
}
}
fn message(&self) -> message::Message<Block> {
fn send_to(&self, out: &mut impl OnDemandNetwork<Block>, peer: &PeerId) {
match self.data {
RequestData::RemoteHeader(ref data, _) =>
message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
id: self.id,
block: data.block,
}),
out.send_header_request(
peer,
self.id,
data.block,
),
RequestData::RemoteRead(ref data, _) =>
message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
id: self.id,
block: data.block,
key: data.key.clone(),
}),
out.send_read_request(
peer,
self.id,
data.block,
data.key.clone(),
),
RequestData::RemoteReadChild(ref data, _) =>
message::generic::Message::RemoteReadChildRequest(
message::RemoteReadChildRequest {
id: self.id,
block: data.block,
storage_key: data.storage_key.clone(),
key: data.key.clone(),
}),
out.send_read_child_request(
peer,
self.id,
data.block,
data.storage_key.clone(),
data.key.clone(),
),
RequestData::RemoteCall(ref data, _) =>
message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
id: self.id,
block: data.block,
method: data.method.clone(),
data: data.call_data.clone(),
}),
out.send_call_request(
peer,
self.id,
data.block,
data.method.clone(),
data.call_data.clone(),
),
RequestData::RemoteChanges(ref data, _) =>
message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
id: self.id,
first: data.first_block.1.clone(),
last: data.last_block.1.clone(),
min: data.tries_roots.1.clone(),
max: data.max_block.1.clone(),
key: data.key.clone(),
}),
RequestData::RemoteBody(ref data, _) => {
message::generic::Message::BlockRequest(message::BlockRequest::<Block> {
id: self.id,
fields: message::BlockAttributes::BODY,
from: message::FromBlock::Hash(data.header.hash()),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
})
}
out.send_changes_request(
peer,
self.id,
data.first_block.1.clone(),
data.last_block.1.clone(),
data.tries_roots.1.clone(),
data.max_block.1.clone(),
data.key.clone(),
),
RequestData::RemoteBody(ref data, _) =>
out.send_body_request(
peer,
self.id,
message::BlockAttributes::BODY,
message::FromBlock::Hash(data.header.hash()),
None,
message::Direction::Ascending,
Some(1)
),
}
}
}
@@ -586,13 +637,13 @@ pub mod tests {
use std::sync::Arc;
use std::time::Instant;
use futures::{Future, sync::oneshot};
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use runtime_primitives::traits::{Block as BlockT, NumberFor, Header as HeaderT};
use client::{error::{Error as ClientError, Result as ClientResult}};
use client::light::fetcher::{FetchChecker, RemoteHeaderRequest,
ChangesProof, RemoteCallRequest, RemoteReadRequest,
RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest};
use crate::config::Roles;
use crate::message;
use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId};
use network_libp2p::PeerId;
use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData};
use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header};
@@ -700,7 +751,15 @@ pub mod tests {
fn disconnect_peer(&mut self, who: &PeerId) {
self.disconnected_peers.insert(who.clone());
}
fn send_request(&mut self, _: &PeerId, _: message::Message<B>) {}
fn send_header_request(&mut self, _: &PeerId, _: RequestId, _: <<B as BlockT>::Header as HeaderT>::Number) {}
fn send_read_request(&mut self, _: &PeerId, _: RequestId, _: <B as BlockT>::Hash, _: Vec<u8>) {}
fn send_read_child_request(&mut self, _: &PeerId, _: RequestId, _: <B as BlockT>::Hash, _: Vec<u8>,
_: Vec<u8>) {}
fn send_call_request(&mut self, _: &PeerId, _: RequestId, _: <B as BlockT>::Hash, _: String, _: Vec<u8>) {}
fn send_changes_request(&mut self, _: &PeerId, _: RequestId, _: <B as BlockT>::Hash, _: <B as BlockT>::Hash,
_: <B as BlockT>::Hash, _: <B as BlockT>::Hash, _: Vec<u8>) {}
fn send_body_request(&mut self, _: &PeerId, _: RequestId, _: BlockAttributes, _: FromBlock<<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number>, _: Option<B::Hash>, _: Direction, _: Option<u32>) {}
}
fn assert_disconnected_peer(dummy: &DummyNetwork) {
+97 -1
View File
@@ -28,6 +28,7 @@ use crate::message::{
self, BlockRequest as BlockRequestMessage,
FinalityProofRequest as FinalityProofRequestMessage, Message,
};
use crate::message::{BlockAttributes, Direction, FromBlock, RequestId};
use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
@@ -169,7 +170,102 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork<B> for &'a mut &'b mut dyn NetworkOut<B>
NetworkOut::disconnect_peer(**self, who.clone())
}
fn send_request(&mut self, who: &PeerId, message: Message<B>) {
fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <<B as BlockT>::Header as HeaderT>::Number) {
let message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
id,
block,
});
NetworkOut::send_message(**self, who.clone(), message)
}
fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: <B as BlockT>::Hash, key: Vec<u8>) {
let message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
id,
block,
key,
});
NetworkOut::send_message(**self, who.clone(), message)
}
fn send_read_child_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
storage_key: Vec<u8>,
key: Vec<u8>
) {
let message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest {
id,
block,
storage_key,
key,
});
NetworkOut::send_message(**self, who.clone(), message)
}
fn send_call_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
method: String,
data: Vec<u8>
) {
let message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
id,
block,
method,
data,
});
NetworkOut::send_message(**self, who.clone(), message)
}
fn send_changes_request(
&mut self,
who: &PeerId,
id: RequestId,
first: <B as BlockT>::Hash,
last: <B as BlockT>::Hash,
min: <B as BlockT>::Hash,
max: <B as BlockT>::Hash,
key: Vec<u8>
) {
let message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
id,
first,
last,
min,
max,
key,
});
NetworkOut::send_message(**self, who.clone(), message)
}
fn send_body_request(
&mut self,
who: &PeerId,
id: RequestId,
fields: BlockAttributes,
from: FromBlock<<B as BlockT>::Hash, <<B as BlockT>::Header as HeaderT>::Number>,
to: Option<<B as BlockT>::Hash>,
direction: Direction,
max: Option<u32>
) {
let message = message::generic::Message::BlockRequest(message::BlockRequest::<B> {
id,
fields,
from,
to,
direction,
max,
});
NetworkOut::send_message(**self, who.clone(), message)
}
}