Remove the legacy requests-answering protocols (#6709)

Co-authored-by: parity-processbot <>
This commit is contained in:
Pierre Krieger
2020-08-20 17:53:37 +02:00
committed by GitHub
parent 74cf119d4a
commit 09662dc873
2 changed files with 18 additions and 323 deletions
+18 -322
View File
@@ -18,7 +18,7 @@
use crate::{
ExHashT,
chain::{Client, FinalityProofProvider},
chain::Client,
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
utils::{interval, LruHashSet},
@@ -31,10 +31,6 @@ use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use sp_core::{
storage::{StorageKey, PrefixedStorageKey, ChildInfo, ChildType},
hexdisplay::HexDisplay
};
use sp_consensus::{
BlockOrigin,
block_validation::BlockAnnounceValidator,
@@ -54,12 +50,11 @@ use prometheus_endpoint::{
};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry};
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use sc_client_api::{ChangesProof, StorageProof};
use wasm_timer::Instant;
mod generic_proto;
@@ -118,8 +113,6 @@ mod rep {
pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
/// Reputation change when a peer sends us a bad transaction.
pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
/// We sent an RPC query to the given node, but it failed.
pub const RPC_FAILED: Rep = Rep::new(-(1 << 12), "Remote call failed");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// We received an unexpected response.
@@ -249,8 +242,6 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// When asked for a proof of finality, we use this struct to build one.
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: GenericProto,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
@@ -388,7 +379,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
local_peer_id: PeerId,
chain: Arc<dyn Client<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
protocol_id: ProtocolId,
peerset_config: sc_peerset::PeersetConfig,
@@ -464,7 +454,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
sync,
important_peers,
transaction_pool,
finality_proof_provider,
peerset_handle: peerset_handle.clone(),
behaviour,
protocol_name_by_engine: HashMap::new(),
@@ -626,27 +615,30 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
GenericMessage::Transactions(m) =>
self.on_transactions(who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadRequest(request) =>
self.on_remote_read_request(who, request),
GenericMessage::RemoteReadResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"),
GenericMessage::RemoteHeaderRequest(request) =>
self.on_remote_header_request(who, request),
GenericMessage::RemoteHeaderResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"),
GenericMessage::RemoteChangesRequest(request) =>
self.on_remote_changes_request(who, request),
GenericMessage::RemoteChangesResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::FinalityProofRequest(request) =>
self.on_finality_proof_request(who, request),
GenericMessage::FinalityProofResponse(response) =>
return self.on_finality_proof_response(who, response),
GenericMessage::RemoteReadChildRequest(request) =>
self.on_remote_read_child_request(who, request),
GenericMessage::FinalityProofResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected FinalityProofResponse"),
GenericMessage::FinalityProofRequest(_) |
GenericMessage::RemoteReadChildRequest(_) |
GenericMessage::RemoteCallRequest(_) |
GenericMessage::RemoteReadRequest(_) |
GenericMessage::RemoteHeaderRequest(_) |
GenericMessage::RemoteChangesRequest(_) => {
debug!(
target: "sub-libp2p",
"Received no longer supported legacy request from {:?}",
who
);
self.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::BAD_PROTOCOL);
},
GenericMessage::Consensus(msg) =>
return if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
CustomMessageOutcome::NotificationsReceived {
@@ -1391,51 +1383,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.on_block_finalized(&hash, *header.number())
}
fn on_remote_call_request(
&mut self,
who: PeerId,
request: message::RemoteCallRequest<B::Hash>,
) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})",
request.id,
who,
request.method,
request.block
);
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["remote-call"]).inc();
}
let proof = match self.context_data.chain.execution_proof(
&BlockId::Hash(request.block),
&request.method,
&request.data,
) {
Ok((_, proof)) => proof,
Err(error) => {
trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
request.id,
who,
request.method,
request.block,
error
);
self.peerset_handle.report_peer(who.clone(), rep::RPC_FAILED);
StorageProof::empty()
}
};
self.send_message(
&who,
None,
GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
id: request.id,
proof,
}),
);
}
/// Request a justification for the given block.
///
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
@@ -1522,257 +1469,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.on_finality_proof_import(request_block, finalization_result)
}
fn on_remote_read_request(
&mut self,
who: PeerId,
request: message::RemoteReadRequest<B::Hash>,
) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["remote-read"]).inc();
}
if request.keys.is_empty() {
debug!(target: "sync", "Invalid remote read request sent by {}", who);
self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
return;
}
let keys_str = || match request.keys.len() {
1 => HexDisplay::from(&request.keys[0]).to_string(),
_ => format!(
"{}..{}",
HexDisplay::from(&request.keys[0]),
HexDisplay::from(&request.keys[request.keys.len() - 1]),
),
};
trace!(target: "sync", "Remote read request {} from {} ({} at {})",
request.id, who, keys_str(), request.block);
let proof = match self.context_data.chain.read_proof(
&BlockId::Hash(request.block),
&mut request.keys.iter().map(AsRef::as_ref)
) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}",
request.id,
who,
keys_str(),
request.block,
error
);
StorageProof::empty()
}
};
self.send_message(
&who,
None,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
}),
);
}
fn on_remote_read_child_request(
&mut self,
who: PeerId,
request: message::RemoteReadChildRequest<B::Hash>,
) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["remote-child"]).inc();
}
if request.keys.is_empty() {
debug!(target: "sync", "Invalid remote child read request sent by {}", who);
self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
return;
}
let keys_str = || match request.keys.len() {
1 => HexDisplay::from(&request.keys[0]).to_string(),
_ => format!(
"{}..{}",
HexDisplay::from(&request.keys[0]),
HexDisplay::from(&request.keys[request.keys.len() - 1]),
),
};
trace!(target: "sync", "Remote read child request {} from {} ({} {} at {})",
request.id, who, HexDisplay::from(&request.storage_key), keys_str(), request.block);
let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key);
let child_info = match ChildType::from_prefixed_key(prefixed_key) {
Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)),
None => Err("Invalid child storage key".into()),
};
let proof = match child_info.and_then(|child_info| self.context_data.chain.read_child_proof(
&BlockId::Hash(request.block),
&child_info,
&mut request.keys.iter().map(AsRef::as_ref),
)) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote read child request {} from {} ({} {} at {}) failed with: {}",
request.id,
who,
HexDisplay::from(&request.storage_key),
keys_str(),
request.block,
error
);
StorageProof::empty()
}
};
self.send_message(
&who,
None,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
}),
);
}
fn on_remote_header_request(
&mut self,
who: PeerId,
request: message::RemoteHeaderRequest<NumberFor<B>>,
) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["remote-header"]).inc();
}
trace!(target: "sync", "Remote header proof request {} from {} ({})",
request.id, who, request.block);
let (header, proof) = match self.context_data.chain.header_proof(&BlockId::Number(request.block)) {
Ok((header, proof)) => (Some(header), proof),
Err(error) => {
trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}",
request.id,
who,
request.block,
error
);
(Default::default(), StorageProof::empty())
}
};
self.send_message(
&who,
None,
GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
id: request.id,
header,
proof,
}),
);
}
fn on_remote_changes_request(
&mut self,
who: PeerId,
request: message::RemoteChangesRequest<B::Hash>,
) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["remote-changes"]).inc();
}
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
request.id,
who,
if let Some(sk) = request.storage_key.as_ref() {
format!("{} : {}", HexDisplay::from(sk), HexDisplay::from(&request.key))
} else {
HexDisplay::from(&request.key).to_string()
},
request.first,
request.last
);
let key = StorageKey(request.key);
let prefixed_key = request.storage_key.as_ref()
.map(|storage_key| PrefixedStorageKey::new_ref(storage_key));
let (first, last, min, max) = (request.first, request.last, request.min, request.max);
let proof = match self.context_data.chain.key_changes_proof(
first,
last,
min,
max,
prefixed_key,
&key,
) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
request.id,
who,
if let Some(sk) = request.storage_key.as_ref() {
format!("{} : {}", HexDisplay::from(sk), HexDisplay::from(&key.0))
} else {
HexDisplay::from(&key.0).to_string()
},
request.first,
request.last,
error
);
ChangesProof::<B::Header> {
max_block: Zero::zero(),
proof: vec![],
roots: BTreeMap::new(),
roots_proof: StorageProof::empty(),
}
}
};
self.send_message(
&who,
None,
GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
id: request.id,
max: proof.max_block,
proof: proof.proof,
roots: proof.roots.into_iter().collect(),
roots_proof: proof.roots_proof,
}),
);
}
fn on_finality_proof_request(
&mut self,
who: PeerId,
request: message::FinalityProofRequest<B::Hash>,
) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["finality-proof"]).inc();
}
trace!(target: "sync", "Finality proof request from {} for {}", who, request.block);
let finality_proof = self.finality_proof_provider.as_ref()
.ok_or_else(|| String::from("Finality provider is not configured"))
.and_then(|provider|
provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string())
);
let finality_proof = match finality_proof {
Ok(finality_proof) => finality_proof,
Err(error) => {
trace!(target: "sync", "Finality proof request from {} for {} failed with: {}",
who,
request.block,
error
);
None
},
};
self.send_message(
&who,
None,
GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
id: 0,
block: request.block,
proof: finality_proof,
}),
);
}
/// Must be called after a [`CustomMessageOutcome::FinalityProofRequest`] has been emitted,
/// to notify of the response having arrived.
pub fn on_finality_proof_response(