mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 16:47:57 +00:00
Change on-the-wire protocol names to include genesis hash & fork id (#11938)
* Rename transactions protocol to include genesis hash
* Add protocol name generation to sc_network::utils
* Use utils functions for transactions protocol name generation
* Extract protocol name generation into public module
* Use sc_network::protocol_name::standard_protocol_name() for BEEFY and GRANDPA
* minor: add missing newline at EOF
* Change block-announces protocol name to include genesis_hash & fork_id
* Change protocol names to include genesis hash and fork id
Protocols changed:
- sync
- state
- light
- sync/warp
* Revert "Use sc_network::protocol_name::standard_protocol_name() for BEEFY and GRANDPA"
This reverts commit cd60a95a3face397e1b67f4bc95dd0f2b581bfae.
* Get rid of `protocol_name` module
This commit is contained in:
Generated
+2
@@ -8435,6 +8435,7 @@ name = "sc-network-light"
|
||||
version = "0.10.0-dev"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"hex",
|
||||
"libp2p",
|
||||
"log",
|
||||
"parity-scale-codec",
|
||||
@@ -8455,6 +8456,7 @@ version = "0.10.0-dev"
|
||||
dependencies = [
|
||||
"fork-tree",
|
||||
"futures",
|
||||
"hex",
|
||||
"libp2p",
|
||||
"log",
|
||||
"lru",
|
||||
|
||||
@@ -29,6 +29,9 @@ pub struct ProtocolConfig {
|
||||
/// Name of the protocol on the wire. Should be something like `/foo/bar`.
|
||||
pub name: Cow<'static, str>,
|
||||
|
||||
/// Fallback on the wire protocol names to support.
|
||||
pub fallback_names: Vec<Cow<'static, str>>,
|
||||
|
||||
/// Maximum allowed size, in bytes, of a request.
|
||||
///
|
||||
/// Any request larger than this value will be declined as a way to avoid allocating too
|
||||
|
||||
@@ -21,6 +21,7 @@ codec = { package = "parity-scale-codec", version = "3.0.0", features = [
|
||||
"derive",
|
||||
] }
|
||||
futures = "0.3.21"
|
||||
hex = "0.4.0"
|
||||
libp2p = "0.46.1"
|
||||
log = "0.4.16"
|
||||
prost = "0.10"
|
||||
|
||||
@@ -25,16 +25,31 @@ use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
/// Generate the light client protocol name from chain specific protocol identifier.
|
||||
fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
/// Generate the light client protocol name from the genesis hash and fork id.
|
||||
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
|
||||
if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/light/2", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/light/2", hex::encode(genesis_hash))
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the legacy light client protocol name from chain specific protocol identifier.
|
||||
fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
format!("/{}/light/2", protocol_id.as_ref())
|
||||
}
|
||||
|
||||
/// Generates a [`ProtocolConfig`] for the light client request protocol, refusing incoming
|
||||
/// requests.
|
||||
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
||||
pub fn generate_protocol_config<Hash: AsRef<[u8]>>(
|
||||
protocol_id: &ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<&str>,
|
||||
) -> ProtocolConfig {
|
||||
ProtocolConfig {
|
||||
name: generate_protocol_name(protocol_id).into(),
|
||||
name: generate_protocol_name(genesis_hash, fork_id).into(),
|
||||
fallback_names: std::iter::once(generate_legacy_protocol_name(protocol_id).into())
|
||||
.collect(),
|
||||
max_request_size: 1 * 1024 * 1024,
|
||||
max_response_size: 16 * 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(15),
|
||||
|
||||
@@ -28,7 +28,7 @@ use futures::{channel::mpsc, prelude::*};
|
||||
use libp2p::PeerId;
|
||||
use log::{debug, trace};
|
||||
use prost::Message;
|
||||
use sc_client_api::{ProofProvider, StorageProof};
|
||||
use sc_client_api::{BlockBackend, ProofProvider, StorageProof};
|
||||
use sc_network_common::{
|
||||
config::ProtocolId,
|
||||
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
|
||||
@@ -54,15 +54,27 @@ pub struct LightClientRequestHandler<B, Client> {
|
||||
impl<B, Client> LightClientRequestHandler<B, Client>
|
||||
where
|
||||
B: Block,
|
||||
Client: ProofProvider<B> + Send + Sync + 'static,
|
||||
Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a new [`LightClientRequestHandler`].
|
||||
pub fn new(protocol_id: &ProtocolId, client: Arc<Client>) -> (Self, ProtocolConfig) {
|
||||
pub fn new(
|
||||
protocol_id: &ProtocolId,
|
||||
fork_id: Option<&str>,
|
||||
client: Arc<Client>,
|
||||
) -> (Self, ProtocolConfig) {
|
||||
// For now due to lack of data on light client request handling in production systems, this
|
||||
// value is chosen to match the block request limit.
|
||||
let (tx, request_receiver) = mpsc::channel(20);
|
||||
|
||||
let mut protocol_config = super::generate_protocol_config(protocol_id);
|
||||
let mut protocol_config = super::generate_protocol_config(
|
||||
protocol_id,
|
||||
client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
fork_id,
|
||||
);
|
||||
protocol_config.inbound_queue = Some(tx);
|
||||
|
||||
(Self { client, request_receiver, _block: PhantomData::default() }, protocol_config)
|
||||
|
||||
@@ -87,9 +87,13 @@ where
|
||||
/// the network.
|
||||
pub transaction_pool: Arc<dyn TransactionPool<H, B>>,
|
||||
|
||||
/// Name of the protocol to use on the wire. Should be different for each chain.
|
||||
/// Legacy name of the protocol to use on the wire. Should be different for each chain.
|
||||
pub protocol_id: ProtocolId,
|
||||
|
||||
/// Fork ID to distinguish protocols of different hard forks. Part of the standard protocol
|
||||
/// name on the wire.
|
||||
pub fork_id: Option<String>,
|
||||
|
||||
/// Import queue to use.
|
||||
///
|
||||
/// The import queue is the component that verifies that blocks received from other nodes are
|
||||
|
||||
@@ -276,6 +276,7 @@ where
|
||||
roles: Roles,
|
||||
chain: Arc<Client>,
|
||||
protocol_id: ProtocolId,
|
||||
fork_id: &Option<String>,
|
||||
network_config: &config::NetworkConfiguration,
|
||||
notifications_protocols_handshakes: Vec<Vec<u8>>,
|
||||
metrics_registry: Option<&Registry>,
|
||||
@@ -371,8 +372,17 @@ where
|
||||
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets })
|
||||
};
|
||||
|
||||
let block_announces_protocol: Cow<'static, str> =
|
||||
format!("/{}/block-announces/1", protocol_id.as_ref()).into();
|
||||
let block_announces_protocol = {
|
||||
let genesis_hash =
|
||||
chain.block_hash(0u32.into()).ok().flatten().expect("Genesis block exists; qed");
|
||||
if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/block-announces/1", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/block-announces/1", hex::encode(genesis_hash))
|
||||
}
|
||||
};
|
||||
|
||||
let legacy_ba_protocol_name = format!("/{}/block-announces/1", protocol_id.as_ref());
|
||||
|
||||
let behaviour = {
|
||||
let best_number = info.best_number;
|
||||
@@ -384,8 +394,8 @@ where
|
||||
.encode();
|
||||
|
||||
let sync_protocol_config = notifications::ProtocolConfig {
|
||||
name: block_announces_protocol,
|
||||
fallback_names: Vec::new(),
|
||||
name: block_announces_protocol.into(),
|
||||
fallback_names: iter::once(legacy_ba_protocol_name.into()).collect(),
|
||||
handshake: block_announces_handshake,
|
||||
max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE,
|
||||
};
|
||||
|
||||
@@ -220,7 +220,9 @@ impl RequestResponsesBehaviour {
|
||||
max_request_size: protocol.max_request_size,
|
||||
max_response_size: protocol.max_response_size,
|
||||
},
|
||||
iter::once((protocol.name.as_bytes().to_vec(), protocol_support)),
|
||||
iter::once(protocol.name.as_bytes().to_vec())
|
||||
.chain(protocol.fallback_names.iter().map(|name| name.as_bytes().to_vec()))
|
||||
.zip(iter::repeat(protocol_support)),
|
||||
cfg,
|
||||
);
|
||||
|
||||
@@ -1027,6 +1029,7 @@ mod tests {
|
||||
|
||||
let protocol_config = ProtocolConfig {
|
||||
name: From::from(protocol_name),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(30),
|
||||
@@ -1127,6 +1130,7 @@ mod tests {
|
||||
|
||||
let protocol_config = ProtocolConfig {
|
||||
name: From::from(protocol_name),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 8, // <-- important for the test
|
||||
request_timeout: Duration::from_secs(30),
|
||||
@@ -1223,6 +1227,7 @@ mod tests {
|
||||
let protocol_configs = vec![
|
||||
ProtocolConfig {
|
||||
name: From::from(protocol_name_1),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(30),
|
||||
@@ -1230,6 +1235,7 @@ mod tests {
|
||||
},
|
||||
ProtocolConfig {
|
||||
name: From::from(protocol_name_2),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(30),
|
||||
@@ -1247,6 +1253,7 @@ mod tests {
|
||||
let protocol_configs = vec![
|
||||
ProtocolConfig {
|
||||
name: From::from(protocol_name_1),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(30),
|
||||
@@ -1254,6 +1261,7 @@ mod tests {
|
||||
},
|
||||
ProtocolConfig {
|
||||
name: From::from(protocol_name_2),
|
||||
fallback_names: Vec::new(),
|
||||
max_request_size: 1024,
|
||||
max_response_size: 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(30),
|
||||
|
||||
@@ -224,8 +224,16 @@ where
|
||||
fs::create_dir_all(path)?;
|
||||
}
|
||||
|
||||
let transactions_handler_proto =
|
||||
transactions::TransactionsHandlerPrototype::new(params.protocol_id.clone());
|
||||
let transactions_handler_proto = transactions::TransactionsHandlerPrototype::new(
|
||||
params.protocol_id.clone(),
|
||||
params
|
||||
.chain
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
params.fork_id.clone(),
|
||||
);
|
||||
params
|
||||
.network_config
|
||||
.extra_sets
|
||||
@@ -243,6 +251,7 @@ where
|
||||
From::from(¶ms.role),
|
||||
params.chain.clone(),
|
||||
params.protocol_id.clone(),
|
||||
¶ms.fork_id,
|
||||
¶ms.network_config,
|
||||
iter::once(Vec::new())
|
||||
.chain(
|
||||
|
||||
@@ -92,21 +92,25 @@ fn build_test_full_node(
|
||||
|
||||
let protocol_id = ProtocolId::from("/test-protocol-name");
|
||||
|
||||
let fork_id = Some(String::from("test-fork-id"));
|
||||
|
||||
let block_request_protocol_config = {
|
||||
let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone(), 50);
|
||||
let (handler, protocol_config) =
|
||||
BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
|
||||
async_std::task::spawn(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
|
||||
let state_request_protocol_config = {
|
||||
let (handler, protocol_config) = StateRequestHandler::new(&protocol_id, client.clone(), 50);
|
||||
let (handler, protocol_config) =
|
||||
StateRequestHandler::new(&protocol_id, None, client.clone(), 50);
|
||||
async_std::task::spawn(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
|
||||
let light_client_request_protocol_config = {
|
||||
let (handler, protocol_config) =
|
||||
LightClientRequestHandler::new(&protocol_id, client.clone());
|
||||
LightClientRequestHandler::new(&protocol_id, None, client.clone());
|
||||
async_std::task::spawn(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
@@ -134,6 +138,7 @@ fn build_test_full_node(
|
||||
chain: client.clone(),
|
||||
transaction_pool: Arc::new(config::EmptyTransactionPool),
|
||||
protocol_id,
|
||||
fork_id,
|
||||
import_queue,
|
||||
chain_sync: Box::new(chain_sync),
|
||||
metrics_registry: None,
|
||||
|
||||
@@ -127,19 +127,34 @@ impl<H: ExHashT> Future for PendingTransaction<H> {
|
||||
/// Prototype for a [`TransactionsHandler`].
|
||||
pub struct TransactionsHandlerPrototype {
|
||||
protocol_name: Cow<'static, str>,
|
||||
fallback_protocol_names: Vec<Cow<'static, str>>,
|
||||
}
|
||||
|
||||
impl TransactionsHandlerPrototype {
|
||||
/// Create a new instance.
|
||||
pub fn new(protocol_id: ProtocolId) -> Self {
|
||||
Self { protocol_name: format!("/{}/transactions/1", protocol_id.as_ref()).into() }
|
||||
pub fn new<Hash: AsRef<[u8]>>(
|
||||
protocol_id: ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<String>,
|
||||
) -> Self {
|
||||
let protocol_name = if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/transactions/1", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/transactions/1", hex::encode(genesis_hash))
|
||||
};
|
||||
let legacy_protocol_name = format!("/{}/transactions/1", protocol_id.as_ref());
|
||||
|
||||
Self {
|
||||
protocol_name: protocol_name.into(),
|
||||
fallback_protocol_names: iter::once(legacy_protocol_name.into()).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configuration of the set to put in the network configuration.
|
||||
pub fn set_config(&self) -> config::NonDefaultSetConfig {
|
||||
config::NonDefaultSetConfig {
|
||||
notifications_protocol: self.protocol_name.clone(),
|
||||
fallback_names: Vec::new(),
|
||||
fallback_names: self.fallback_protocol_names.clone(),
|
||||
max_notification_size: MAX_TRANSACTIONS_SIZE,
|
||||
set_config: config::SetConfig {
|
||||
in_peers: 0,
|
||||
|
||||
@@ -21,6 +21,7 @@ codec = { package = "parity-scale-codec", version = "3.0.0", features = [
|
||||
"derive",
|
||||
] }
|
||||
futures = "0.3.21"
|
||||
hex = "0.4.0"
|
||||
libp2p = "0.46.1"
|
||||
log = "0.4.17"
|
||||
lru = "0.7.5"
|
||||
|
||||
@@ -62,9 +62,15 @@ mod rep {
|
||||
}
|
||||
|
||||
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
|
||||
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
||||
pub fn generate_protocol_config<Hash: AsRef<[u8]>>(
|
||||
protocol_id: &ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<&str>,
|
||||
) -> ProtocolConfig {
|
||||
ProtocolConfig {
|
||||
name: generate_protocol_name(protocol_id).into(),
|
||||
name: generate_protocol_name(genesis_hash, fork_id).into(),
|
||||
fallback_names: std::iter::once(generate_legacy_protocol_name(protocol_id).into())
|
||||
.collect(),
|
||||
max_request_size: 1024 * 1024,
|
||||
max_response_size: 16 * 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(20),
|
||||
@@ -72,8 +78,17 @@ pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the block protocol name from chain specific protocol identifier.
|
||||
fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
/// Generate the block protocol name from the genesis hash and fork id.
|
||||
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
|
||||
if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/sync/2", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/sync/2", hex::encode(genesis_hash))
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the legacy block protocol name from chain specific protocol identifier.
|
||||
fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
format!("/{}/sync/2", protocol_id.as_ref())
|
||||
}
|
||||
|
||||
@@ -129,6 +144,7 @@ where
|
||||
/// Create a new [`BlockRequestHandler`].
|
||||
pub fn new(
|
||||
protocol_id: &ProtocolId,
|
||||
fork_id: Option<&str>,
|
||||
client: Arc<Client>,
|
||||
num_peer_hint: usize,
|
||||
) -> (Self, ProtocolConfig) {
|
||||
@@ -136,7 +152,15 @@ where
|
||||
// number of peers.
|
||||
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
|
||||
|
||||
let mut protocol_config = generate_protocol_config(protocol_id);
|
||||
let mut protocol_config = generate_protocol_config(
|
||||
protocol_id,
|
||||
client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
fork_id,
|
||||
);
|
||||
protocol_config.inbound_queue = Some(tx);
|
||||
|
||||
let seen_requests = LruCache::new(num_peer_hint * 2);
|
||||
|
||||
@@ -27,7 +27,7 @@ use libp2p::PeerId;
|
||||
use log::{debug, trace};
|
||||
use lru::LruCache;
|
||||
use prost::Message;
|
||||
use sc_client_api::ProofProvider;
|
||||
use sc_client_api::{BlockBackend, ProofProvider};
|
||||
use sc_network_common::{
|
||||
config::ProtocolId,
|
||||
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
|
||||
@@ -50,10 +50,16 @@ mod rep {
|
||||
pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
|
||||
}
|
||||
|
||||
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
|
||||
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
||||
/// Generates a [`ProtocolConfig`] for the state request protocol, refusing incoming requests.
|
||||
pub fn generate_protocol_config<Hash: AsRef<[u8]>>(
|
||||
protocol_id: &ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<&str>,
|
||||
) -> ProtocolConfig {
|
||||
ProtocolConfig {
|
||||
name: generate_protocol_name(protocol_id).into(),
|
||||
name: generate_protocol_name(genesis_hash, fork_id).into(),
|
||||
fallback_names: std::iter::once(generate_legacy_protocol_name(protocol_id).into())
|
||||
.collect(),
|
||||
max_request_size: 1024 * 1024,
|
||||
max_response_size: 16 * 1024 * 1024,
|
||||
request_timeout: Duration::from_secs(40),
|
||||
@@ -61,8 +67,17 @@ pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the state protocol name from chain specific protocol identifier.
|
||||
fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
/// Generate the state protocol name from the genesis hash and fork id.
|
||||
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
|
||||
if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/state/2", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/state/2", hex::encode(genesis_hash))
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the legacy state protocol name from chain specific protocol identifier.
|
||||
fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
|
||||
format!("/{}/state/2", protocol_id.as_ref())
|
||||
}
|
||||
|
||||
@@ -104,11 +119,12 @@ pub struct StateRequestHandler<B: BlockT, Client> {
|
||||
impl<B, Client> StateRequestHandler<B, Client>
|
||||
where
|
||||
B: BlockT,
|
||||
Client: ProofProvider<B> + Send + Sync + 'static,
|
||||
Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a new [`StateRequestHandler`].
|
||||
pub fn new(
|
||||
protocol_id: &ProtocolId,
|
||||
fork_id: Option<&str>,
|
||||
client: Arc<Client>,
|
||||
num_peer_hint: usize,
|
||||
) -> (Self, ProtocolConfig) {
|
||||
@@ -116,7 +132,15 @@ where
|
||||
// number of peers.
|
||||
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
|
||||
|
||||
let mut protocol_config = generate_protocol_config(protocol_id);
|
||||
let mut protocol_config = generate_protocol_config(
|
||||
protocol_id,
|
||||
client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
fork_id,
|
||||
);
|
||||
protocol_config.inbound_queue = Some(tx);
|
||||
|
||||
let seen_requests = LruCache::new(num_peer_hint * 2);
|
||||
|
||||
@@ -36,9 +36,15 @@ const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024;
|
||||
|
||||
/// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing
|
||||
/// incoming requests.
|
||||
pub fn generate_request_response_config(protocol_id: ProtocolId) -> RequestResponseConfig {
|
||||
pub fn generate_request_response_config<Hash: AsRef<[u8]>>(
|
||||
protocol_id: ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<&str>,
|
||||
) -> RequestResponseConfig {
|
||||
RequestResponseConfig {
|
||||
name: generate_protocol_name(protocol_id).into(),
|
||||
name: generate_protocol_name(genesis_hash, fork_id).into(),
|
||||
fallback_names: std::iter::once(generate_legacy_protocol_name(protocol_id).into())
|
||||
.collect(),
|
||||
max_request_size: 32,
|
||||
max_response_size: MAX_RESPONSE_SIZE,
|
||||
request_timeout: Duration::from_secs(10),
|
||||
@@ -46,8 +52,17 @@ pub fn generate_request_response_config(protocol_id: ProtocolId) -> RequestRespo
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the grandpa warp sync protocol name from chain specific protocol identifier.
|
||||
fn generate_protocol_name(protocol_id: ProtocolId) -> String {
|
||||
/// Generate the grandpa warp sync protocol name from the genesi hash and fork id.
|
||||
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
|
||||
if let Some(fork_id) = fork_id {
|
||||
format!("/{}/{}/sync/warp", hex::encode(genesis_hash), fork_id)
|
||||
} else {
|
||||
format!("/{}/sync/warp", hex::encode(genesis_hash))
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate the legacy grandpa warp sync protocol name from chain specific protocol identifier.
|
||||
fn generate_legacy_protocol_name(protocol_id: ProtocolId) -> String {
|
||||
format!("/{}/sync/warp", protocol_id.as_ref())
|
||||
}
|
||||
|
||||
@@ -59,13 +74,16 @@ pub struct RequestHandler<TBlock: BlockT> {
|
||||
|
||||
impl<TBlock: BlockT> RequestHandler<TBlock> {
|
||||
/// Create a new [`RequestHandler`].
|
||||
pub fn new(
|
||||
pub fn new<Hash: AsRef<[u8]>>(
|
||||
protocol_id: ProtocolId,
|
||||
genesis_hash: Hash,
|
||||
fork_id: Option<&str>,
|
||||
backend: Arc<dyn WarpSyncProvider<TBlock>>,
|
||||
) -> (Self, RequestResponseConfig) {
|
||||
let (tx, request_receiver) = mpsc::channel(20);
|
||||
|
||||
let mut request_response_config = generate_request_response_config(protocol_id);
|
||||
let mut request_response_config =
|
||||
generate_request_response_config(protocol_id, genesis_hash, fork_id);
|
||||
request_response_config.inbound_queue = Some(tx);
|
||||
|
||||
(Self { backend, request_receiver }, request_response_config)
|
||||
|
||||
@@ -808,23 +808,25 @@ where
|
||||
|
||||
let protocol_id = ProtocolId::from("test-protocol-name");
|
||||
|
||||
let fork_id = Some(String::from("test-fork-id"));
|
||||
|
||||
let block_request_protocol_config = {
|
||||
let (handler, protocol_config) =
|
||||
BlockRequestHandler::new(&protocol_id, client.clone(), 50);
|
||||
BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
|
||||
self.spawn_task(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
|
||||
let state_request_protocol_config = {
|
||||
let (handler, protocol_config) =
|
||||
StateRequestHandler::new(&protocol_id, client.clone(), 50);
|
||||
StateRequestHandler::new(&protocol_id, None, client.clone(), 50);
|
||||
self.spawn_task(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
|
||||
let light_client_request_protocol_config = {
|
||||
let (handler, protocol_config) =
|
||||
LightClientRequestHandler::new(&protocol_id, client.clone());
|
||||
LightClientRequestHandler::new(&protocol_id, None, client.clone());
|
||||
self.spawn_task(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
@@ -832,8 +834,16 @@ where
|
||||
let warp_sync = Arc::new(TestWarpSyncProvider(client.clone()));
|
||||
|
||||
let warp_protocol_config = {
|
||||
let (handler, protocol_config) =
|
||||
warp_request_handler::RequestHandler::new(protocol_id.clone(), warp_sync.clone());
|
||||
let (handler, protocol_config) = warp_request_handler::RequestHandler::new(
|
||||
protocol_id.clone(),
|
||||
client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
None,
|
||||
warp_sync.clone(),
|
||||
);
|
||||
self.spawn_task(handler.run().boxed());
|
||||
protocol_config
|
||||
};
|
||||
@@ -867,6 +877,7 @@ where
|
||||
chain: client.clone(),
|
||||
transaction_pool: Arc::new(EmptyTransactionPool),
|
||||
protocol_id,
|
||||
fork_id,
|
||||
import_queue,
|
||||
chain_sync: Box::new(chain_sync),
|
||||
metrics_registry: None,
|
||||
|
||||
@@ -741,6 +741,7 @@ where
|
||||
// Allow both outgoing and incoming requests.
|
||||
let (handler, protocol_config) = BlockRequestHandler::new(
|
||||
&protocol_id,
|
||||
config.chain_spec.fork_id(),
|
||||
client.clone(),
|
||||
config.network.default_peers_set.in_peers as usize +
|
||||
config.network.default_peers_set.out_peers as usize,
|
||||
@@ -753,6 +754,7 @@ where
|
||||
// Allow both outgoing and incoming requests.
|
||||
let (handler, protocol_config) = StateRequestHandler::new(
|
||||
&protocol_id,
|
||||
config.chain_spec.fork_id(),
|
||||
client.clone(),
|
||||
config.network.default_peers_set_num_full as usize,
|
||||
);
|
||||
@@ -763,8 +765,16 @@ where
|
||||
let (warp_sync_provider, warp_sync_protocol_config) = warp_sync
|
||||
.map(|provider| {
|
||||
// Allow both outgoing and incoming requests.
|
||||
let (handler, protocol_config) =
|
||||
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
|
||||
let (handler, protocol_config) = WarpSyncRequestHandler::new(
|
||||
protocol_id.clone(),
|
||||
client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed"),
|
||||
config.chain_spec.fork_id(),
|
||||
provider.clone(),
|
||||
);
|
||||
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
|
||||
(Some(provider), Some(protocol_config))
|
||||
})
|
||||
@@ -772,8 +782,11 @@ where
|
||||
|
||||
let light_client_request_protocol_config = {
|
||||
// Allow both outgoing and incoming requests.
|
||||
let (handler, protocol_config) =
|
||||
LightClientRequestHandler::new(&protocol_id, client.clone());
|
||||
let (handler, protocol_config) = LightClientRequestHandler::new(
|
||||
&protocol_id,
|
||||
config.chain_spec.fork_id(),
|
||||
client.clone(),
|
||||
);
|
||||
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
|
||||
protocol_config
|
||||
};
|
||||
@@ -808,6 +821,7 @@ where
|
||||
chain: client.clone(),
|
||||
transaction_pool: transaction_pool_adapter as _,
|
||||
protocol_id,
|
||||
fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
|
||||
import_queue: Box::new(import_queue),
|
||||
chain_sync: Box::new(chain_sync),
|
||||
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
|
||||
|
||||
Reference in New Issue
Block a user