|
|
|
@@ -26,7 +26,6 @@ use lru::LruCache;
|
|
|
|
|
use libp2p::PeerId;
|
|
|
|
|
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
|
|
|
|
|
use sp_runtime::ConsensusEngineId;
|
|
|
|
|
pub use sc_network::message::generic::{Message, ConsensusMessage};
|
|
|
|
|
use sc_network::config::Roles;
|
|
|
|
|
use wasm_timer::Instant;
|
|
|
|
|
|
|
|
|
@@ -67,7 +66,8 @@ pub struct TopicNotification {
|
|
|
|
|
struct MessageEntry<B: BlockT> {
|
|
|
|
|
message_hash: B::Hash,
|
|
|
|
|
topic: B::Hash,
|
|
|
|
|
message: ConsensusMessage,
|
|
|
|
|
engine_id: ConsensusEngineId,
|
|
|
|
|
message: Vec<u8>,
|
|
|
|
|
sender: Option<PeerId>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -89,7 +89,8 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
|
|
|
|
|
self.gossip.multicast(
|
|
|
|
|
self.network,
|
|
|
|
|
topic,
|
|
|
|
|
ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
|
|
|
|
|
self.engine_id.clone(),
|
|
|
|
|
message,
|
|
|
|
|
force,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
@@ -113,11 +114,10 @@ fn propagate<'a, B: BlockT, I>(
|
|
|
|
|
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
|
|
|
|
|
)
|
|
|
|
|
// (msg_hash, topic, message)
|
|
|
|
|
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>,
|
|
|
|
|
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, ConsensusEngineId, &'a Vec<u8>)>,
|
|
|
|
|
{
|
|
|
|
|
let mut check_fns = HashMap::new();
|
|
|
|
|
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
|
|
|
|
|
let engine_id = message.engine_id;
|
|
|
|
|
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, engine_id: ConsensusEngineId, message: &Vec<u8>| {
|
|
|
|
|
let check_fn = match check_fns.entry(engine_id) {
|
|
|
|
|
Entry::Occupied(entry) => entry.into_mut(),
|
|
|
|
|
Entry::Vacant(vacant) => match validators.get(&engine_id) {
|
|
|
|
@@ -126,11 +126,11 @@ fn propagate<'a, B: BlockT, I>(
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
(check_fn)(who, intent, topic, &message.data)
|
|
|
|
|
(check_fn)(who, intent, topic, &message)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for (id, ref mut peer) in peers.iter_mut() {
|
|
|
|
|
for (message_hash, topic, message) in messages.clone() {
|
|
|
|
|
for (message_hash, topic, engine_id, message) in messages.clone() {
|
|
|
|
|
let intent = match intent {
|
|
|
|
|
MessageIntent::Broadcast { .. } =>
|
|
|
|
|
if peer.known_messages.contains(&message_hash) {
|
|
|
|
@@ -149,14 +149,14 @@ fn propagate<'a, B: BlockT, I>(
|
|
|
|
|
other => other,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if !message_allowed(id, intent, &topic, &message) {
|
|
|
|
|
if !message_allowed(id, intent, &topic, engine_id, &message) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
peer.known_messages.insert(message_hash.clone());
|
|
|
|
|
|
|
|
|
|
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
|
|
|
|
|
network.write_notification(id.clone(), message.engine_id, message.data.clone());
|
|
|
|
|
network.write_notification(id.clone(), engine_id, message.clone());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -225,13 +225,15 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
&mut self,
|
|
|
|
|
message_hash: B::Hash,
|
|
|
|
|
topic: B::Hash,
|
|
|
|
|
message: ConsensusMessage,
|
|
|
|
|
engine_id: ConsensusEngineId,
|
|
|
|
|
message: Vec<u8>,
|
|
|
|
|
sender: Option<PeerId>,
|
|
|
|
|
) {
|
|
|
|
|
if self.known_messages.put(message_hash.clone(), ()).is_none() {
|
|
|
|
|
self.messages.push(MessageEntry {
|
|
|
|
|
message_hash,
|
|
|
|
|
topic,
|
|
|
|
|
engine_id,
|
|
|
|
|
message,
|
|
|
|
|
sender,
|
|
|
|
|
});
|
|
|
|
@@ -246,10 +248,11 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
pub fn register_message(
|
|
|
|
|
&mut self,
|
|
|
|
|
topic: B::Hash,
|
|
|
|
|
message: ConsensusMessage,
|
|
|
|
|
engine_id: ConsensusEngineId,
|
|
|
|
|
message: Vec<u8>,
|
|
|
|
|
) {
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message.data[..]);
|
|
|
|
|
self.register_message_hashed(message_hash, topic, message, None);
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message[..]);
|
|
|
|
|
self.register_message_hashed(message_hash, topic, engine_id, message, None);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Call when a peer has been disconnected to stop tracking gossip status.
|
|
|
|
@@ -273,7 +276,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
/// Rebroadcast all messages to all peers.
|
|
|
|
|
fn rebroadcast(&mut self, network: &mut dyn Network<B>) {
|
|
|
|
|
let messages = self.messages.iter()
|
|
|
|
|
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
|
|
|
|
|
.map(|entry| (&entry.message_hash, &entry.topic, entry.engine_id, &entry.message));
|
|
|
|
|
propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -281,7 +284,9 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
pub fn broadcast_topic(&mut self, network: &mut dyn Network<B>, topic: B::Hash, force: bool) {
|
|
|
|
|
let messages = self.messages.iter()
|
|
|
|
|
.filter_map(|entry|
|
|
|
|
|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
|
|
|
|
|
if entry.topic == topic {
|
|
|
|
|
Some((&entry.message_hash, &entry.topic, entry.engine_id, &entry.message))
|
|
|
|
|
} else { None }
|
|
|
|
|
);
|
|
|
|
|
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
|
|
|
|
|
propagate(network, messages, intent, &mut self.peers, &self.validators);
|
|
|
|
@@ -301,7 +306,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
|
|
|
|
|
let mut check_fns = HashMap::new();
|
|
|
|
|
let mut message_expired = move |entry: &MessageEntry<B>| {
|
|
|
|
|
let engine_id = entry.message.engine_id;
|
|
|
|
|
let engine_id = entry.engine_id;
|
|
|
|
|
let check_fn = match check_fns.entry(engine_id) {
|
|
|
|
|
Entry::Occupied(entry) => entry.into_mut(),
|
|
|
|
|
Entry::Vacant(vacant) => match validators.get(&engine_id) {
|
|
|
|
@@ -310,7 +315,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
(check_fn)(entry.topic, &entry.message.data)
|
|
|
|
|
(check_fn)(entry.topic, &entry.message)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.messages.retain(|entry| !message_expired(entry));
|
|
|
|
@@ -332,10 +337,10 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
{
|
|
|
|
|
let (tx, rx) = mpsc::unbounded();
|
|
|
|
|
for entry in self.messages.iter_mut()
|
|
|
|
|
.filter(|e| e.topic == topic && e.message.engine_id == engine_id)
|
|
|
|
|
.filter(|e| e.topic == topic && e.engine_id == engine_id)
|
|
|
|
|
{
|
|
|
|
|
tx.unbounded_send(TopicNotification {
|
|
|
|
|
message: entry.message.data.clone(),
|
|
|
|
|
message: entry.message.clone(),
|
|
|
|
|
sender: entry.sender.clone(),
|
|
|
|
|
})
|
|
|
|
|
.expect("receiver known to be live; qed");
|
|
|
|
@@ -346,22 +351,22 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
rx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle an incoming ConsensusMessage for topic by who via protocol. Discard message if topic
|
|
|
|
|
/// already known, the message is old, its source peers isn't a registered peer or the connection
|
|
|
|
|
/// to them is broken. Return `Some(topic, message)` if it was added to the internal queue, `None`
|
|
|
|
|
/// Handle an incoming message for topic by who via protocol. Discard message if topic already
|
|
|
|
|
/// known, the message is old, its source peers isn't a registered peer or the connection to
|
|
|
|
|
/// them is broken. Return `Some(topic, message)` if it was added to the internal queue, `None`
|
|
|
|
|
/// in all other cases.
|
|
|
|
|
pub fn on_incoming(
|
|
|
|
|
&mut self,
|
|
|
|
|
network: &mut dyn Network<B>,
|
|
|
|
|
who: PeerId,
|
|
|
|
|
messages: Vec<ConsensusMessage>,
|
|
|
|
|
messages: Vec<(ConsensusEngineId, Vec<u8>)>,
|
|
|
|
|
) {
|
|
|
|
|
if !messages.is_empty() {
|
|
|
|
|
trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for message in messages {
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message.data[..]);
|
|
|
|
|
for (engine_id, message) in messages {
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message[..]);
|
|
|
|
|
|
|
|
|
|
if self.known_messages.contains(&message_hash) {
|
|
|
|
|
trace!(target:"gossip", "Ignored already known message from {}", who);
|
|
|
|
@@ -369,13 +374,12 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let engine_id = message.engine_id;
|
|
|
|
|
// validate the message
|
|
|
|
|
let validation = self.validators.get(&engine_id)
|
|
|
|
|
.cloned()
|
|
|
|
|
.map(|v| {
|
|
|
|
|
let mut context = NetworkContext { gossip: self, network, engine_id };
|
|
|
|
|
v.validate(&mut context, &who, &message.data)
|
|
|
|
|
v.validate(&mut context, &who, &message)
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let validation_result = match validation {
|
|
|
|
@@ -398,7 +402,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
|
|
|
|
|
entry.get_mut().retain(|sink| {
|
|
|
|
|
if let Err(e) = sink.unbounded_send(TopicNotification {
|
|
|
|
|
message: message.data.clone(),
|
|
|
|
|
message: message.clone(),
|
|
|
|
|
sender: Some(who.clone())
|
|
|
|
|
}) {
|
|
|
|
|
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
|
|
|
|
@@ -410,7 +414,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if keep {
|
|
|
|
|
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
|
|
|
|
|
self.register_message_hashed(message_hash, topic, engine_id, message, Some(who.clone()));
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
|
|
|
|
@@ -438,7 +442,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if let Some(ref mut peer) = self.peers.get_mut(who) {
|
|
|
|
|
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
|
|
|
|
|
for entry in self.messages.iter().filter(|m| m.topic == topic && m.engine_id == engine_id) {
|
|
|
|
|
let intent = if force {
|
|
|
|
|
MessageIntent::ForcedBroadcast
|
|
|
|
|
} else {
|
|
|
|
@@ -449,14 +453,14 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !message_allowed(who, intent, &entry.topic, &entry.message.data) {
|
|
|
|
|
if !message_allowed(who, intent, &entry.topic, &entry.message) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
peer.known_messages.insert(entry.message_hash.clone());
|
|
|
|
|
|
|
|
|
|
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
|
|
|
|
|
network.write_notification(who.clone(), engine_id, entry.message.data.clone());
|
|
|
|
|
network.write_notification(who.clone(), engine_id, entry.message.clone());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -466,13 +470,14 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
&mut self,
|
|
|
|
|
network: &mut dyn Network<B>,
|
|
|
|
|
topic: B::Hash,
|
|
|
|
|
message: ConsensusMessage,
|
|
|
|
|
engine_id: ConsensusEngineId,
|
|
|
|
|
message: Vec<u8>,
|
|
|
|
|
force: bool,
|
|
|
|
|
) {
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message.data);
|
|
|
|
|
self.register_message_hashed(message_hash, topic, message.clone(), None);
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message);
|
|
|
|
|
self.register_message_hashed(message_hash, topic, engine_id, message.clone(), None);
|
|
|
|
|
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
|
|
|
|
|
propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
|
|
|
|
|
propagate(network, iter::once((&message_hash, &topic, engine_id, &message)), intent, &mut self.peers, &self.validators);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send addressed message to a peer. The message is not kept or multicast
|
|
|
|
@@ -481,19 +486,20 @@ impl<B: BlockT> ConsensusGossip<B> {
|
|
|
|
|
&mut self,
|
|
|
|
|
network: &mut dyn Network<B>,
|
|
|
|
|
who: &PeerId,
|
|
|
|
|
message: ConsensusMessage,
|
|
|
|
|
engine_id: ConsensusEngineId,
|
|
|
|
|
message: Vec<u8>,
|
|
|
|
|
) {
|
|
|
|
|
let peer = match self.peers.get_mut(who) {
|
|
|
|
|
None => return,
|
|
|
|
|
Some(peer) => peer,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message.data);
|
|
|
|
|
let message_hash = HashFor::<B>::hash(&message);
|
|
|
|
|
|
|
|
|
|
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
|
|
|
|
|
|
|
|
|
|
peer.known_messages.insert(message_hash);
|
|
|
|
|
network.write_notification(who.clone(), message.engine_id, message.data);
|
|
|
|
|
network.write_notification(who.clone(), engine_id, message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -513,7 +519,8 @@ mod tests {
|
|
|
|
|
$consensus.messages.push(MessageEntry {
|
|
|
|
|
message_hash: $hash,
|
|
|
|
|
topic: $topic,
|
|
|
|
|
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
|
|
|
|
|
engine_id: [0, 0, 0, 0],
|
|
|
|
|
message: $m,
|
|
|
|
|
sender: None,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@@ -588,13 +595,14 @@ mod tests {
|
|
|
|
|
let mut consensus = ConsensusGossip::<Block>::new();
|
|
|
|
|
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
|
|
|
|
|
|
|
|
|
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
|
|
|
|
let engine_id = [0, 0, 0, 0];
|
|
|
|
|
let message = vec![4, 5, 6];
|
|
|
|
|
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
|
|
|
|
|
|
|
|
|
consensus.register_message(topic, message.clone());
|
|
|
|
|
consensus.register_message(topic, engine_id, message.clone());
|
|
|
|
|
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
|
|
|
|
|
|
|
|
|
|
assert_eq!(stream.next(), Some(TopicNotification { message: message.data, sender: None }));
|
|
|
|
|
assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None }));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
@@ -602,11 +610,11 @@ mod tests {
|
|
|
|
|
let mut consensus = ConsensusGossip::<Block>::new();
|
|
|
|
|
|
|
|
|
|
let topic = [1; 32].into();
|
|
|
|
|
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
|
|
|
|
|
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
|
|
|
|
let msg_a = vec![1, 2, 3];
|
|
|
|
|
let msg_b = vec![4, 5, 6];
|
|
|
|
|
|
|
|
|
|
consensus.register_message(topic, msg_a);
|
|
|
|
|
consensus.register_message(topic, msg_b);
|
|
|
|
|
consensus.register_message(topic, [0, 0, 0, 0], msg_a);
|
|
|
|
|
consensus.register_message(topic, [0, 0, 0, 0], msg_b);
|
|
|
|
|
|
|
|
|
|
assert_eq!(consensus.messages.len(), 2);
|
|
|
|
|
}
|
|
|
|
@@ -616,17 +624,16 @@ mod tests {
|
|
|
|
|
let mut consensus = ConsensusGossip::<Block>::new();
|
|
|
|
|
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
|
|
|
|
|
|
|
|
|
let data = vec![4, 5, 6];
|
|
|
|
|
let message = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] };
|
|
|
|
|
let message = vec![4, 5, 6];
|
|
|
|
|
let topic = HashFor::<Block>::hash(&[1, 2, 3]);
|
|
|
|
|
|
|
|
|
|
consensus.register_message(topic, message.clone());
|
|
|
|
|
consensus.register_message(topic, [0, 0, 0, 0], message.clone());
|
|
|
|
|
|
|
|
|
|
let mut stream1 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
|
|
|
|
|
let mut stream2 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
|
|
|
|
|
|
|
|
|
|
assert_eq!(stream1.next(), Some(TopicNotification { message: data.clone(), sender: None }));
|
|
|
|
|
assert_eq!(stream2.next(), Some(TopicNotification { message: data, sender: None }));
|
|
|
|
|
assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None }));
|
|
|
|
|
assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None }));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
@@ -635,11 +642,11 @@ mod tests {
|
|
|
|
|
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
|
|
|
|
|
|
|
|
|
let topic = [1; 32].into();
|
|
|
|
|
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
|
|
|
|
|
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] };
|
|
|
|
|
let msg_a = vec![1, 2, 3];
|
|
|
|
|
let msg_b = vec![4, 5, 6];
|
|
|
|
|
|
|
|
|
|
consensus.register_message(topic, msg_a);
|
|
|
|
|
consensus.register_message(topic, msg_b);
|
|
|
|
|
consensus.register_message(topic, [0, 0, 0, 0], msg_a);
|
|
|
|
|
consensus.register_message(topic, [0, 0, 0, 1], msg_b);
|
|
|
|
|
|
|
|
|
|
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
|
|
|
|
|
|
|
|
|
|