mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 20:57:59 +00:00
Make transactions and block announces use notifications substre… (#5360)
* Make transactions and block announces use notifications * Add documentation
This commit is contained in:
@@ -39,7 +39,7 @@ use sp_runtime::traits::{
|
||||
};
|
||||
use sp_arithmetic::traits::SaturatedConversion;
|
||||
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
|
||||
use message::generic::Message as GenericMessage;
|
||||
use message::generic::{Message as GenericMessage, ConsensusMessage};
|
||||
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
|
||||
use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64};
|
||||
use sync::{ChainSync, SyncState};
|
||||
@@ -221,8 +221,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
|
||||
behaviour: GenericProto,
|
||||
/// For each legacy gossiping engine ID, the corresponding new protocol name.
|
||||
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
|
||||
/// For each protocol name, the legacy gossiping engine ID.
|
||||
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
|
||||
/// For each protocol name, the legacy equivalent.
|
||||
legacy_equiv_by_name: HashMap<Cow<'static, [u8]>, Fallback>,
|
||||
/// Name of the protocol used for transactions.
|
||||
transactions_protocol: Cow<'static, [u8]>,
|
||||
/// Name of the protocol used for block announces.
|
||||
block_announces_protocol: Cow<'static, [u8]>,
|
||||
/// Prometheus metrics.
|
||||
metrics: Option<Metrics>,
|
||||
/// The `PeerId`'s of all boot nodes.
|
||||
@@ -424,6 +428,17 @@ impl Default for ProtocolConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Fallback mechanism to use to send a notification if no substream is open.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum Fallback {
|
||||
/// Use a `Message::Consensus` with the given engine ID.
|
||||
Consensus(ConsensusEngineId),
|
||||
/// The message is the bytes encoding of a `Transactions<E>` (which is itself defined as a `Vec<E>`).
|
||||
Transactions,
|
||||
/// The message is the bytes encoding of a `BlockAnnounce<H>`.
|
||||
BlockAnnounce,
|
||||
}
|
||||
|
||||
impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
/// Create a new instance.
|
||||
pub fn new(
|
||||
@@ -460,7 +475,27 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
|
||||
let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
|
||||
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
|
||||
let behaviour = GenericProto::new(protocol_id, versions, peerset);
|
||||
let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset);
|
||||
|
||||
let mut legacy_equiv_by_name = HashMap::new();
|
||||
|
||||
let transactions_protocol: Cow<'static, [u8]> = Cow::from({
|
||||
let mut proto = b"/".to_vec();
|
||||
proto.extend(protocol_id.as_bytes());
|
||||
proto.extend(b"/transactions/1");
|
||||
proto
|
||||
});
|
||||
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
|
||||
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);
|
||||
|
||||
let block_announces_protocol: Cow<'static, [u8]> = Cow::from({
|
||||
let mut proto = b"/".to_vec();
|
||||
proto.extend(protocol_id.as_bytes());
|
||||
proto.extend(b"/block-announces/1");
|
||||
proto
|
||||
});
|
||||
behaviour.register_notif_protocol(block_announces_protocol.clone(), Vec::new());
|
||||
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);
|
||||
|
||||
let protocol = Protocol {
|
||||
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
|
||||
@@ -481,7 +516,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
peerset_handle: peerset_handle.clone(),
|
||||
behaviour,
|
||||
protocol_name_by_engine: HashMap::new(),
|
||||
protocol_engine_by_name: HashMap::new(),
|
||||
legacy_equiv_by_name,
|
||||
transactions_protocol,
|
||||
block_announces_protocol,
|
||||
metrics: if let Some(r) = metrics_registry {
|
||||
Some(Metrics::register(r)?)
|
||||
} else {
|
||||
@@ -731,12 +768,18 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
);
|
||||
}
|
||||
|
||||
fn send_message(&mut self, who: &PeerId, message: Message<B>) {
|
||||
fn send_message(
|
||||
&mut self,
|
||||
who: &PeerId,
|
||||
message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
|
||||
legacy: Message<B>,
|
||||
) {
|
||||
send_message::<B>(
|
||||
&mut self.behaviour,
|
||||
&mut self.context_data.stats,
|
||||
who,
|
||||
message,
|
||||
legacy,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -793,11 +836,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_block_request(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
request: message::BlockRequest<B>
|
||||
) {
|
||||
fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
|
||||
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
|
||||
request.id,
|
||||
peer,
|
||||
@@ -874,7 +913,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
blocks: blocks,
|
||||
};
|
||||
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
|
||||
self.send_message(&peer, GenericMessage::BlockResponse(response))
|
||||
self.send_message(&peer, None, GenericMessage::BlockResponse(response))
|
||||
}
|
||||
|
||||
/// Adjusts the reputation of a node.
|
||||
@@ -1132,10 +1171,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
&mut self,
|
||||
target: PeerId,
|
||||
engine_id: ConsensusEngineId,
|
||||
message: impl Into<Vec<u8>>
|
||||
message: impl Into<Vec<u8>>,
|
||||
) {
|
||||
if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
|
||||
self.behaviour.write_notification(&target, engine_id, protocol_name.clone(), message);
|
||||
let message = message.into();
|
||||
let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
|
||||
engine_id,
|
||||
data: message.clone(),
|
||||
}).encode();
|
||||
self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback);
|
||||
} else {
|
||||
error!(
|
||||
target: "sub-libp2p",
|
||||
@@ -1158,8 +1202,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
|
||||
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
|
||||
} else {
|
||||
self.behaviour.register_notif_protocol(protocol_name.clone(), engine_id, Vec::new());
|
||||
self.protocol_engine_by_name.insert(protocol_name, engine_id);
|
||||
self.behaviour.register_notif_protocol(protocol_name.clone(), Vec::new());
|
||||
self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
|
||||
}
|
||||
|
||||
// Registering a protocol while we already have open connections isn't great, but for now
|
||||
@@ -1229,7 +1273,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
fn do_propagate_extrinsics(
|
||||
&mut self,
|
||||
extrinsics: &[(H, B::Extrinsic)],
|
||||
) -> HashMap<H, Vec<String>> {
|
||||
) -> HashMap<H, Vec<String>> {
|
||||
let mut propagated_to = HashMap::new();
|
||||
for (who, peer) in self.context_data.peers.iter_mut() {
|
||||
// never send extrinsics to the light node
|
||||
@@ -1251,10 +1295,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
.push(who.to_base58());
|
||||
}
|
||||
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
|
||||
let encoded = to_send.encode();
|
||||
send_message::<B> (
|
||||
&mut self.behaviour,
|
||||
&mut self.context_data.stats,
|
||||
&who,
|
||||
Some((self.transactions_protocol.clone(), encoded)),
|
||||
GenericMessage::Transactions(to_send)
|
||||
)
|
||||
}
|
||||
@@ -1309,7 +1355,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
|
||||
let inserted = peer.known_blocks.insert(hash);
|
||||
if inserted || force {
|
||||
let message: Message<B> = GenericMessage::BlockAnnounce(message::BlockAnnounce {
|
||||
let message = message::BlockAnnounce {
|
||||
header: header.clone(),
|
||||
state: if peer.info.protocol_version >= 4 {
|
||||
if is_best {
|
||||
@@ -1325,13 +1371,16 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
} else {
|
||||
None
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
let encoded = message.encode();
|
||||
|
||||
send_message::<B> (
|
||||
&mut self.behaviour,
|
||||
&mut self.context_data.stats,
|
||||
&who,
|
||||
message,
|
||||
Some((self.block_announces_protocol.clone(), encoded)),
|
||||
Message::<B>::BlockAnnounce(message),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1350,10 +1399,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
|
||||
};
|
||||
|
||||
self.send_message(&who, GenericMessage::Status(status))
|
||||
self.send_message(&who, None, GenericMessage::Status(status))
|
||||
}
|
||||
|
||||
fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> {
|
||||
fn on_block_announce(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
announce: BlockAnnounce<B::Header>,
|
||||
) -> CustomMessageOutcome<B> {
|
||||
let hash = announce.header.hash();
|
||||
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
|
||||
peer.known_blocks.insert(hash.clone());
|
||||
@@ -1468,6 +1521,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
|
||||
id: request.id,
|
||||
proof,
|
||||
@@ -1598,6 +1652,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
};
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
|
||||
id: request.id,
|
||||
proof,
|
||||
@@ -1662,6 +1717,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
};
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
|
||||
id: request.id,
|
||||
proof,
|
||||
@@ -1702,6 +1758,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
};
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
|
||||
id: request.id,
|
||||
header,
|
||||
@@ -1772,6 +1829,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
};
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
|
||||
id: request.id,
|
||||
max: proof.max_block,
|
||||
@@ -1822,6 +1880,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
};
|
||||
self.send_message(
|
||||
&who,
|
||||
None,
|
||||
GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
|
||||
id: 0,
|
||||
block: request.block,
|
||||
@@ -1951,20 +2010,25 @@ fn send_request<B: BlockT, H: ExHashT>(
|
||||
peer.block_request = Some((Instant::now(), r.clone()));
|
||||
}
|
||||
}
|
||||
send_message::<B>(behaviour, stats, who, message)
|
||||
send_message::<B>(behaviour, stats, who, None, message)
|
||||
}
|
||||
|
||||
fn send_message<B: BlockT>(
|
||||
behaviour: &mut GenericProto,
|
||||
stats: &mut HashMap<&'static str, PacketStats>,
|
||||
who: &PeerId,
|
||||
message: Message<B>,
|
||||
message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
|
||||
legacy_message: Message<B>,
|
||||
) {
|
||||
let encoded = message.encode();
|
||||
let mut stats = stats.entry(message.id()).or_default();
|
||||
let encoded = legacy_message.encode();
|
||||
let mut stats = stats.entry(legacy_message.id()).or_default();
|
||||
stats.bytes_out += encoded.len() as u64;
|
||||
stats.count_out += 1;
|
||||
behaviour.send_packet(who, encoded);
|
||||
if let Some((proto, msg)) = message {
|
||||
behaviour.write_notification(who, proto, msg, encoded);
|
||||
} else {
|
||||
behaviour.send_packet(who, encoded);
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
@@ -2061,8 +2125,39 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
|
||||
self.on_peer_disconnected(peer_id.clone())
|
||||
},
|
||||
GenericProtoOut::CustomMessage { peer_id, message } =>
|
||||
GenericProtoOut::LegacyMessage { peer_id, message } =>
|
||||
self.on_custom_message(peer_id, message),
|
||||
GenericProtoOut::Notification { peer_id, protocol_name, message } =>
|
||||
match self.legacy_equiv_by_name.get(&protocol_name) {
|
||||
Some(Fallback::Consensus(engine_id)) => {
|
||||
CustomMessageOutcome::NotificationsReceived {
|
||||
remote: peer_id,
|
||||
messages: vec![(*engine_id, message.freeze())],
|
||||
}
|
||||
}
|
||||
Some(Fallback::Transactions) => {
|
||||
if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
|
||||
self.on_extrinsics(peer_id, m);
|
||||
} else {
|
||||
warn!(target: "sub-libp2p", "Failed to decode transactions list");
|
||||
}
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
Some(Fallback::BlockAnnounce) => {
|
||||
if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
|
||||
let outcome = self.on_block_announce(peer_id.clone(), announce);
|
||||
self.update_peer_info(&peer_id);
|
||||
outcome
|
||||
} else {
|
||||
warn!(target: "sub-libp2p", "Failed to decode block announce");
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
}
|
||||
None => {
|
||||
error!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
}
|
||||
GenericProtoOut::Clogged { peer_id, messages } => {
|
||||
debug!(target: "sync", "{} clogging messages:", messages.len());
|
||||
for msg in messages.into_iter().take(5) {
|
||||
|
||||
Reference in New Issue
Block a user