diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index f77b50dac6..9ce75c4749 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -26,7 +26,7 @@ use client::error::Error as ClientError; use client::light::fetcher::{FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof, RemoteReadChildRequest, RemoteBodyRequest}; -use crate::message; +use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; use network_libp2p::PeerId; use crate::config::Roles; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; @@ -46,8 +46,55 @@ pub trait OnDemandNetwork { /// Disconnect from the given peer. Used in case of misbehaviour. fn disconnect_peer(&mut self, who: &PeerId); - /// Send a request to a peer. - fn send_request(&mut self, who: &PeerId, message: message::Message); + /// Send to `who` a request for a header. + fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number); + + /// Send to `who` a read request. + fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: ::Hash, key: Vec); + + /// Send to `who` a child read request. + fn send_read_child_request( + &mut self, + who: &PeerId, + id: RequestId, + block: ::Hash, + storage_key: Vec, + key: Vec + ); + + /// Send to `who` a call request. + fn send_call_request( + &mut self, + who: &PeerId, + id: RequestId, + block: ::Hash, + method: String, + data: Vec + ); + + /// Send to `who` a changes request. + fn send_changes_request( + &mut self, + who: &PeerId, + id: RequestId, + first: ::Hash, + last: ::Hash, + min: ::Hash, + max: ::Hash, + key: Vec + ); + + /// Send to `who` a body request. + fn send_body_request( + &mut self, + who: &PeerId, + id: RequestId, + fields: BlockAttributes, + from: FromBlock<::Hash, <::Header as HeaderT>::Number>, + to: Option, + direction: Direction, + max: Option + ); } /// On-demand requests service. Dispatches requests to appropriate peers. @@ -495,7 +542,7 @@ impl OnDemandCore where let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - network.send_request(&peer, request.message()); + request.send_to(&mut network, &peer); self.active_peers.insert(peer, request); } @@ -515,53 +562,57 @@ impl Request { } } - fn message(&self) -> message::Message { + fn send_to(&self, out: &mut impl OnDemandNetwork, peer: &PeerId) { match self.data { RequestData::RemoteHeader(ref data, _) => - message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { - id: self.id, - block: data.block, - }), + out.send_header_request( + peer, + self.id, + data.block, + ), RequestData::RemoteRead(ref data, _) => - message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { - id: self.id, - block: data.block, - key: data.key.clone(), - }), + out.send_read_request( + peer, + self.id, + data.block, + data.key.clone(), + ), RequestData::RemoteReadChild(ref data, _) => - message::generic::Message::RemoteReadChildRequest( - message::RemoteReadChildRequest { - id: self.id, - block: data.block, - storage_key: data.storage_key.clone(), - key: data.key.clone(), - }), + out.send_read_child_request( + peer, + self.id, + data.block, + data.storage_key.clone(), + data.key.clone(), + ), RequestData::RemoteCall(ref data, _) => - message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { - id: self.id, - block: data.block, - method: data.method.clone(), - data: data.call_data.clone(), - }), + out.send_call_request( + peer, + self.id, + data.block, + data.method.clone(), + data.call_data.clone(), + ), RequestData::RemoteChanges(ref data, _) => - message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { - id: self.id, - first: data.first_block.1.clone(), - last: data.last_block.1.clone(), - min: data.tries_roots.1.clone(), - max: data.max_block.1.clone(), - key: data.key.clone(), - }), - RequestData::RemoteBody(ref data, _) => { - message::generic::Message::BlockRequest(message::BlockRequest:: { - id: self.id, - fields: message::BlockAttributes::BODY, - from: message::FromBlock::Hash(data.header.hash()), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }) - } + out.send_changes_request( + peer, + self.id, + data.first_block.1.clone(), + data.last_block.1.clone(), + data.tries_roots.1.clone(), + data.max_block.1.clone(), + data.key.clone(), + ), + RequestData::RemoteBody(ref data, _) => + out.send_body_request( + peer, + self.id, + message::BlockAttributes::BODY, + message::FromBlock::Hash(data.header.hash()), + None, + message::Direction::Ascending, + Some(1) + ), } } } @@ -586,13 +637,13 @@ pub mod tests { use std::sync::Arc; use std::time::Instant; use futures::{Future, sync::oneshot}; - use runtime_primitives::traits::{Block as BlockT, NumberFor}; + use runtime_primitives::traits::{Block as BlockT, NumberFor, Header as HeaderT}; use client::{error::{Error as ClientError, Result as ClientResult}}; use client::light::fetcher::{FetchChecker, RemoteHeaderRequest, ChangesProof, RemoteCallRequest, RemoteReadRequest, RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest}; use crate::config::Roles; - use crate::message; + use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; use network_libp2p::PeerId; use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData}; use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header}; @@ -700,7 +751,15 @@ pub mod tests { fn disconnect_peer(&mut self, who: &PeerId) { self.disconnected_peers.insert(who.clone()); } - fn send_request(&mut self, _: &PeerId, _: message::Message) {} + fn send_header_request(&mut self, _: &PeerId, _: RequestId, _: <::Header as HeaderT>::Number) {} + fn send_read_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: Vec) {} + fn send_read_child_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: Vec, + _: Vec) {} + fn send_call_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: String, _: Vec) {} + fn send_changes_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: ::Hash, + _: ::Hash, _: ::Hash, _: Vec) {} + fn send_body_request(&mut self, _: &PeerId, _: RequestId, _: BlockAttributes, _: FromBlock<::Hash, + <::Header as HeaderT>::Number>, _: Option, _: Direction, _: Option) {} } fn assert_disconnected_peer(dummy: &DummyNetwork) { diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 2fafc9e756..d0b14bada7 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -28,6 +28,7 @@ use crate::message::{ self, BlockRequest as BlockRequestMessage, FinalityProofRequest as FinalityProofRequestMessage, Message, }; +use crate::message::{BlockAttributes, Direction, FromBlock, RequestId}; use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; @@ -169,7 +170,102 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut NetworkOut::disconnect_peer(**self, who.clone()) } - fn send_request(&mut self, who: &PeerId, message: Message) { + fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { + let message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { + id, + block, + }); + + NetworkOut::send_message(**self, who.clone(), message) + } + + fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: ::Hash, key: Vec) { + let message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { + id, + block, + key, + }); + + NetworkOut::send_message(**self, who.clone(), message) + } + + fn send_read_child_request( + &mut self, + who: &PeerId, + id: RequestId, + block: ::Hash, + storage_key: Vec, + key: Vec + ) { + let message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { + id, + block, + storage_key, + key, + }); + + NetworkOut::send_message(**self, who.clone(), message) + } + + fn send_call_request( + &mut self, + who: &PeerId, + id: RequestId, + block: ::Hash, + method: String, + data: Vec + ) { + let message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { + id, + block, + method, + data, + }); + + NetworkOut::send_message(**self, who.clone(), message) + } + + fn send_changes_request( + &mut self, + who: &PeerId, + id: RequestId, + first: ::Hash, + last: ::Hash, + min: ::Hash, + max: ::Hash, + key: Vec + ) { + let message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { + id, + first, + last, + min, + max, + key, + }); + + NetworkOut::send_message(**self, who.clone(), message) + } + + fn send_body_request( + &mut self, + who: &PeerId, + id: RequestId, + fields: BlockAttributes, + from: FromBlock<::Hash, <::Header as HeaderT>::Number>, + to: Option<::Hash>, + direction: Direction, + max: Option + ) { + let message = message::generic::Message::BlockRequest(message::BlockRequest:: { + id, + fields, + from, + to, + direction, + max, + }); + NetworkOut::send_message(**self, who.clone(), message) } }