Batch gossip messages (#4055)

This commit is contained in:
Arkadiy Paronyan
2019-11-08 21:34:45 +01:00
committed by Gavin Wood
parent c3f6e5bd40
commit 2cd3b2bce1
3 changed files with 113 additions and 77 deletions
+43 -16
View File
@@ -69,12 +69,14 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 4;
pub(crate) const CURRENT_VERSION: u32 = 5;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Maximum allowed entries in `ConsensusBatch`
const MAX_CONSENSUS_MESSAGES: usize = 256;
/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
/// and disconnect to free connection slot.
@@ -298,7 +300,7 @@ pub trait Context<B: BlockT> {
fn disconnect_peer(&mut self, who: PeerId);
/// Send a consensus message to a peer.
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage);
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>);
/// Send a chain-specific message to a peer.
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>);
@@ -330,13 +332,33 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
self.behaviour.disconnect_peer(&who)
}
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::Consensus(consensus)
)
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
let mut batch = Vec::new();
let len = messages.len();
for (index, message) in messages.into_iter().enumerate() {
batch.reserve(MAX_CONSENSUS_MESSAGES);
batch.push(message);
if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())),
)
}
}
} else {
// Backwards compatibility
for message in messages {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::Consensus(message)
)
}
}
}
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
@@ -598,13 +620,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteReadChildRequest(request) =>
self.on_remote_read_child_request(who, request),
GenericMessage::Consensus(msg) => {
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
msg,
);
}
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
vec![msg],
);
}
GenericMessage::ConsensusBatch(messages) => {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
messages,
);
}
GenericMessage::ChainSpecific(msg) => self.specialization.on_message(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
@@ -171,10 +171,10 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.protocol.send_consensus(who.clone(), ConsensusMessage {
self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
engine_id: self.engine_id,
data: message,
});
}]);
}
/// Send all messages with given topic to a peer.
@@ -190,7 +190,7 @@ fn propagate<'a, B: BlockT, I>(
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
)
where I: IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
{
let mut check_fns = HashMap::new();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
@@ -206,8 +206,9 @@ fn propagate<'a, B: BlockT, I>(
(check_fn)(who, intent, topic, &message.data)
};
for (message_hash, topic, message) in messages {
for (id, ref mut peer) in peers.iter_mut() {
for (id, ref mut peer) in peers.iter_mut() {
let mut batch = Vec::new();
for (message_hash, topic, message) in messages.clone() {
let previous_attempts = peer.filtered_messages
.get(&message_hash)
.cloned()
@@ -245,8 +246,9 @@ fn propagate<'a, B: BlockT, I>(
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
protocol.send_consensus(id.clone(), message.clone());
batch.push(message.clone())
}
protocol.send_consensus(id.clone(), batch);
}
}
@@ -477,65 +479,68 @@ impl<B: BlockT> ConsensusGossip<B> {
&mut self,
protocol: &mut dyn Context<B>,
who: PeerId,
message: ConsensusMessage,
messages: Vec<ConsensusMessage>,
) {
let message_hash = HashFor::<B>::hash(&message.data[..]);
trace!(target:"gossip", "Received {} messages from peer {}", messages.len(), who);
for message in messages {
let message_hash = HashFor::<B>::hash(&message.data[..]);
if self.known_messages.contains_key(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
return;
}
let engine_id = message.engine_id;
// validate the message
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
v.validate(&mut context, &who, &message.data)
});
let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
return;
if self.known_messages.contains_key(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
continue;
}
};
if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification {
message: message.data.clone(),
sender: Some(who.clone())
}) {
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
}
!sink.is_closed()
});
if entry.get().is_empty() {
entry.remove_entry();
}
let engine_id = message.engine_id;
// validate the message
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
v.validate(&mut context, &who, &message.data)
});
let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
protocol.disconnect_peer(who.clone());
continue;
}
if keep {
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
};
if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification {
message: message.data.clone(),
sender: Some(who.clone())
}) {
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
}
!sink.is_closed()
});
if entry.get().is_empty() {
entry.remove_entry();
}
}
if keep {
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
}
} else {
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
}
}
@@ -555,6 +560,7 @@ impl<B: BlockT> ConsensusGossip<B> {
};
if let Some(ref mut peer) = self.peers.get_mut(who) {
let mut batch = Vec::new();
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
@@ -585,11 +591,12 @@ impl<B: BlockT> ConsensusGossip<B> {
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
protocol.send_consensus(who.clone(), ConsensusMessage {
batch.push(ConsensusMessage {
engine_id: engine_id.clone(),
data: entry.message.data.clone(),
});
}
protocol.send_consensus(who.clone(), batch);
}
}
@@ -626,8 +633,7 @@ impl<B: BlockT> ConsensusGossip<B> {
peer.filtered_messages.remove(&message_hash);
peer.known_messages.insert(message_hash);
protocol.send_consensus(who.clone(), message.clone());
protocol.send_consensus(who.clone(), vec![message.clone()]);
}
}
@@ -812,7 +818,7 @@ mod tests {
impl<B: BlockT> Context<B> for DummyNetworkContext {
fn report_peer(&mut self, _who: PeerId, _reputation: i32) {}
fn disconnect_peer(&mut self, _who: PeerId) {}
fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {}
fn send_consensus(&mut self, _who: PeerId, _consensus: Vec<ConsensusMessage>) {}
fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {}
}
@@ -217,6 +217,8 @@ pub mod generic {
FinalityProofRequest(FinalityProofRequest<Hash>),
/// Finality proof reponse.
FinalityProofResponse(FinalityProofResponse<Hash>),
/// Batch of consensus protocol messages.
ConsensusBatch(Vec<ConsensusMessage>),
/// Chain-specific message.
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
@@ -243,6 +245,7 @@ pub mod generic {
Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest",
Message::FinalityProofRequest(_) => "FinalityProofRequest",
Message::FinalityProofResponse(_) => "FinalityProofResponse",
Message::ConsensusBatch(_) => "ConsensusBatch",
Message::ChainSpecific(_) => "ChainSpecific",
}
}