Switch to new light client protocol (#5472)

* Switch to the new protocol

* Oops, forgot to remove light_dispatch.rs

* Fix tests

* Address review
This commit is contained in:
Pierre Krieger
2020-04-01 19:44:42 +02:00
committed by GitHub
parent c4aa597516
commit a8aedfa16f
7 changed files with 482 additions and 1683 deletions
+48 -251
View File
@@ -38,21 +38,20 @@ use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, ConsensusMessage};
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use crate::chain::{Client, FinalityProofProvider};
use sc_client_api::{FetchChecker, ChangesProof, StorageProof};
use sc_client_api::{ChangesProof, StorageProof};
use crate::error;
use util::LruHashSet;
use wasm_timer::Instant;
@@ -74,7 +73,6 @@ pub mod block_requests;
pub mod message;
pub mod event;
pub mod light_client_handler;
pub mod light_dispatch;
pub mod sync;
pub use block_requests::BlockRequests;
@@ -201,9 +199,9 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Interval at which we call `propagate_extrinsics`.
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
config: ProtocolConfig,
/// Handler for light client requests.
light_dispatch: LightDispatch<B>,
genesis_hash: B::Hash,
sync: ChainSync<B>,
context_data: ContextData<B, H>,
@@ -276,132 +274,6 @@ pub struct PeerInfo<B: BlockT> {
pub best_number: <B::Header as HeaderT>::Number,
}
struct LightDispatchIn<'a> {
behaviour: &'a mut GenericProto,
peerset: sc_peerset::PeersetHandle,
}
impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
fn report_peer(&mut self, who: &PeerId, reputation: sc_peerset::ReputationChange) {
self.peerset.report_peer(who.clone(), reputation)
}
fn disconnect_peer(&mut self, who: &PeerId) {
self.behaviour.disconnect_peer(who)
}
fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <<B as BlockT>::Header as HeaderT>::Number) {
let message: Message<B> = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
id,
block,
});
self.behaviour.send_packet(who, message.encode())
}
fn send_read_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
keys: Vec<Vec<u8>>,
) {
let message: Message<B> = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
id,
block,
keys,
});
self.behaviour.send_packet(who, message.encode())
}
fn send_read_child_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
storage_key: Vec<u8>,
child_info: Vec<u8>,
child_type: u32,
keys: Vec<Vec<u8>>,
) {
let message: Message<B> = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest {
id,
block,
storage_key,
child_info,
child_type,
keys,
});
self.behaviour.send_packet(who, message.encode())
}
fn send_call_request(
&mut self,
who: &PeerId,
id: RequestId,
block: <B as BlockT>::Hash,
method: String,
data: Vec<u8>
) {
let message: Message<B> = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
id,
block,
method,
data,
});
self.behaviour.send_packet(who, message.encode())
}
fn send_changes_request(
&mut self,
who: &PeerId,
id: RequestId,
first: <B as BlockT>::Hash,
last: <B as BlockT>::Hash,
min: <B as BlockT>::Hash,
max: <B as BlockT>::Hash,
storage_key: Option<Vec<u8>>,
key: Vec<u8>,
) {
let message: Message<B> = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
id,
first,
last,
min,
max,
storage_key,
key,
});
self.behaviour.send_packet(who, message.encode())
}
fn send_body_request(
&mut self,
who: &PeerId,
id: RequestId,
fields: BlockAttributes,
from: FromBlock<<B as BlockT>::Hash, <<B as BlockT>::Header as HeaderT>::Number>,
to: Option<<B as BlockT>::Hash>,
direction: Direction,
max: Option<u32>
) {
let message: Message<B> = message::generic::Message::BlockRequest(message::BlockRequest::<B> {
id,
fields,
from,
to,
direction,
max,
});
self.behaviour.send_packet(who, message.encode())
}
}
/// Data necessary to create a context.
struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
@@ -444,7 +316,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn new(
config: ProtocolConfig,
chain: Arc<dyn Client<B>>,
checker: Arc<dyn FetchChecker<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
@@ -500,13 +371,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
stats: HashMap::new(),
chain,
},
light_dispatch: LightDispatch::new(checker),
genesis_hash: info.genesis_hash,
sync,
handshaking_peers: HashMap::new(),
@@ -609,20 +480,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.num_sync_requests()
}
/// Starts a new data demand request.
///
/// The parameter contains a `Sender` where the result, once received, must be sent.
pub(crate) fn add_light_client_request(&mut self, rq: RequestData<B>) {
self.light_dispatch.add_request(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, rq);
}
fn is_light_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
self.light_dispatch.is_light_response(&who, response_id)
}
fn handle_response(
&mut self,
who: PeerId,
@@ -682,15 +539,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
GenericMessage::Status(s) => return self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
if self.is_light_response(&who, r.id) {
self.on_remote_body_response(who, r);
} else {
if let Some(request) = self.handle_response(who.clone(), &r) {
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
}
if let Some(request) = self.handle_response(who.clone(), &r) {
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
}
},
GenericMessage::BlockAnnounce(announce) => {
@@ -701,20 +553,20 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
GenericMessage::Transactions(m) =>
self.on_extrinsics(who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request),
GenericMessage::RemoteCallResponse(response) =>
self.on_remote_call_response(who, response),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadRequest(request) =>
self.on_remote_read_request(who, request),
GenericMessage::RemoteReadResponse(response) =>
self.on_remote_read_response(who, response),
GenericMessage::RemoteReadResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"),
GenericMessage::RemoteHeaderRequest(request) =>
self.on_remote_header_request(who, request),
GenericMessage::RemoteHeaderResponse(response) =>
self.on_remote_header_response(who, response),
GenericMessage::RemoteHeaderResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"),
GenericMessage::RemoteChangesRequest(request) =>
self.on_remote_changes_request(who, request),
GenericMessage::RemoteChangesResponse(response) =>
self.on_remote_changes_response(who, response),
GenericMessage::RemoteChangesResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::FinalityProofRequest(request) =>
self.on_finality_proof_request(who, request),
GenericMessage::FinalityProofResponse(response) =>
@@ -805,10 +657,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
if let Some(_peer_data) = removed {
self.sync.peer_disconnected(peer.clone());
self.light_dispatch.on_disconnect(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, &peer);
// Notify all the notification protocols as closed.
CustomMessageOutcome::NotificationStreamClosed {
@@ -989,10 +837,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self) {
self.maintain_peers();
self.light_dispatch.maintain_peers(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
});
self.report_metrics()
}
@@ -1140,10 +984,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.light_dispatch.on_connect(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), status.roles, status.best_number);
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
@@ -1408,13 +1249,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
announce: BlockAnnounce<B::Header>,
) -> CustomMessageOutcome<B> {
let hash = announce.header.hash();
let number = *announce.header.number();
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
}
self.light_dispatch.update_best_number(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), *announce.header.number());
let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) {
message::BlockState::Best => true,
@@ -1429,7 +1268,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// 1) we're on light client;
// AND
// 2) parent block is already imported and not pruned.
return CustomMessageOutcome::None
if is_their_best {
return CustomMessageOutcome::PeerNewBest(who, number);
} else {
return CustomMessageOutcome::None;
}
}
sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.
}
@@ -1454,15 +1297,28 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
);
match blocks_to_import {
Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Import(origin, blocks)) => {
if is_their_best {
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number));
}
CustomMessageOutcome::BlockImport(origin, blocks)
},
Ok(sync::OnBlockData::Request(peer, req)) => {
self.send_request(&peer, GenericMessage::BlockRequest(req));
CustomMessageOutcome::None
if is_their_best {
CustomMessageOutcome::PeerNewBest(who, number)
} else {
CustomMessageOutcome::None
}
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
if is_their_best {
CustomMessageOutcome::PeerNewBest(who, number)
} else {
CustomMessageOutcome::None
}
}
}
}
@@ -1597,18 +1453,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.on_finality_proof_import(request_block, finalization_result)
}
fn on_remote_call_response(
&mut self,
who: PeerId,
response: message::RemoteCallResponse
) {
trace!(target: "sync", "Remote call response {} from {}", response.id, who);
self.light_dispatch.on_remote_call_response(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who, response);
}
fn on_remote_read_request(
&mut self,
who: PeerId,
@@ -1723,18 +1567,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
}
fn on_remote_read_response(
&mut self,
who: PeerId,
response: message::RemoteReadResponse
) {
trace!(target: "sync", "Remote read response {} from {}", response.id, who);
self.light_dispatch.on_remote_read_response(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who, response);
}
fn on_remote_header_request(
&mut self,
who: PeerId,
@@ -1765,18 +1597,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
}
fn on_remote_header_response(
&mut self,
who: PeerId,
response: message::RemoteHeaderResponse<B::Header>,
) {
trace!(target: "sync", "Remote header proof response {} from {}", response.id, who);
self.light_dispatch.on_remote_header_response(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who, response);
}
fn on_remote_changes_request(
&mut self,
who: PeerId,
@@ -1838,22 +1658,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
}
fn on_remote_changes_response(
&mut self,
who: PeerId,
response: message::RemoteChangesResponse<NumberFor<B>, B::Hash>,
) {
trace!(target: "sync", "Remote changes proof response {} from {} (max={})",
response.id,
who,
response.max
);
self.light_dispatch.on_remote_changes_response(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who, response);
}
fn on_finality_proof_request(
&mut self,
who: PeerId,
@@ -1905,17 +1709,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
fn on_remote_body_response(
&mut self,
peer: PeerId,
response: message::BlockResponse<B>
) {
self.light_dispatch.on_remote_body_response(LightDispatchIn {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, peer, response);
}
fn format_stats(&self) -> String {
let mut out = String::new();
for (id, stats) in &self.context_data.stats {
@@ -1987,6 +1780,8 @@ pub enum CustomMessageOutcome<B: BlockT> {
NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> },
/// Peer has a reported a new head of chain.
PeerNewBest(PeerId, NumberFor<B>),
None,
}
@@ -2067,6 +1862,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
Self::OutEvent
>
> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
}
@@ -2221,7 +2020,6 @@ impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
#[cfg(test)]
mod tests {
use crate::PeerId;
use crate::protocol::light_dispatch::AlwaysBadChecker;
use crate::config::{EmptyTransactionPool, Roles};
use super::{CustomMessageOutcome, Protocol, ProtocolConfig};
@@ -2240,7 +2038,6 @@ mod tests {
max_parallel_downloads: 10,
},
client.clone(),
Arc::new(AlwaysBadChecker),
Arc::new(EmptyTransactionPool),
None,
None,