Remove HashMap<EngineId, ...> from consensus-gossip (#5553)

This commit is contained in:
Pierre Krieger
2020-04-13 09:27:51 +02:00
committed by GitHub
parent 309fdf3b3f
commit 1e1b066817
2 changed files with 71 additions and 171 deletions
+8 -12
View File
@@ -40,22 +40,18 @@ impl<B: BlockT> Unpin for GossipEngine<B> {}
impl<B: BlockT> GossipEngine<B> { impl<B: BlockT> GossipEngine<B> {
/// Create a new instance. /// Create a new instance.
pub fn new<N: Network<B> + Send + Clone + 'static>( pub fn new<N: Network<B> + Send + Clone + 'static>(
mut network: N, network: N,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, [u8]>>,
validator: Arc<dyn Validator<B>>, validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static { ) -> Self where B: 'static {
let mut state_machine = ConsensusGossip::new();
// We grab the event stream before registering the notifications protocol, otherwise we // We grab the event stream before registering the notifications protocol, otherwise we
// might miss events. // might miss events.
let network_event_stream = network.event_stream(); let network_event_stream = network.event_stream();
network.register_notifications_protocol(engine_id, protocol_name.into()); network.register_notifications_protocol(engine_id, protocol_name.into());
state_machine.register_validator(&mut network, engine_id, validator);
GossipEngine { GossipEngine {
state_machine, state_machine: ConsensusGossip::new(validator, engine_id),
network: Box::new(network), network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
network_event_stream, network_event_stream,
@@ -77,7 +73,7 @@ impl<B: BlockT> GossipEngine<B> {
topic: B::Hash, topic: B::Hash,
message: Vec<u8>, message: Vec<u8>,
) { ) {
self.state_machine.register_message(topic, self.engine_id, message); self.state_machine.register_message(topic, message);
} }
/// Broadcast all messages with given topic. /// Broadcast all messages with given topic.
@@ -89,7 +85,7 @@ impl<B: BlockT> GossipEngine<B> {
pub fn messages_for(&mut self, topic: B::Hash) pub fn messages_for(&mut self, topic: B::Hash)
-> TracingUnboundedReceiver<TopicNotification> -> TracingUnboundedReceiver<TopicNotification>
{ {
self.state_machine.messages_for(self.engine_id, topic) self.state_machine.messages_for(topic)
} }
/// Send all messages with given topic to a peer. /// Send all messages with given topic to a peer.
@@ -99,7 +95,7 @@ impl<B: BlockT> GossipEngine<B> {
topic: B::Hash, topic: B::Hash,
force: bool force: bool
) { ) {
self.state_machine.send_topic(&mut *self.network, who, topic, self.engine_id, force) self.state_machine.send_topic(&mut *self.network, who, topic, force)
} }
/// Multicast a message to all peers. /// Multicast a message to all peers.
@@ -109,14 +105,14 @@ impl<B: BlockT> GossipEngine<B> {
message: Vec<u8>, message: Vec<u8>,
force: bool, force: bool,
) { ) {
self.state_machine.multicast(&mut *self.network, topic, self.engine_id, message, force) self.state_machine.multicast(&mut *self.network, topic, message, force)
} }
/// Send addressed message to the given peers. The message is not kept or multicast /// Send addressed message to the given peers. The message is not kept or multicast
/// later on. /// later on.
pub fn send_message(&mut self, who: Vec<sc_network::PeerId>, data: Vec<u8>) { pub fn send_message(&mut self, who: Vec<sc_network::PeerId>, data: Vec<u8>) {
for who in &who { for who in &who {
self.state_machine.send_message(&mut *self.network, who, self.engine_id, data.clone()); self.state_machine.send_message(&mut *self.network, who, data.clone());
} }
} }
@@ -157,7 +153,7 @@ impl<B: BlockT> Future for GossipEngine<B> {
remote, remote,
messages.into_iter() messages.into_iter()
.filter_map(|(engine, data)| if engine == engine_id { .filter_map(|(engine, data)| if engine == engine_id {
Some((engine, data.to_vec())) Some(data.to_vec())
} else { None }) } else { None })
.collect() .collect()
); );
@@ -42,16 +42,12 @@ mod rep {
pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip"); pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip");
/// Reputation change when a peer sends us a gossip message that we already knew about. /// Reputation change when a peer sends us a gossip message that we already knew about.
pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip"); pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
/// Reputation change when a peer sends us a gossip message for an unknown engine, whatever that
/// means.
pub const UNKNOWN_GOSSIP: Rep = Rep::new(-(1 << 6), "Unknown gossip message engine id");
/// Reputation change when a peer sends a message from a topic it isn't registered on. /// Reputation change when a peer sends a message from a topic it isn't registered on.
pub const UNREGISTERED_TOPIC: Rep = Rep::new(-(1 << 10), "Unregistered gossip message topic"); pub const UNREGISTERED_TOPIC: Rep = Rep::new(-(1 << 10), "Unregistered gossip message topic");
} }
struct PeerConsensus<H> { struct PeerConsensus<H> {
known_messages: HashSet<H>, known_messages: HashSet<H>,
role: ObservedRole,
} }
/// Topic stream message with sender. /// Topic stream message with sender.
@@ -66,7 +62,6 @@ pub struct TopicNotification {
struct MessageEntry<B: BlockT> { struct MessageEntry<B: BlockT> {
message_hash: B::Hash, message_hash: B::Hash,
topic: B::Hash, topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>, message: Vec<u8>,
sender: Option<PeerId>, sender: Option<PeerId>,
} }
@@ -75,7 +70,6 @@ struct MessageEntry<B: BlockT> {
struct NetworkContext<'g, 'p, B: BlockT> { struct NetworkContext<'g, 'p, B: BlockT> {
gossip: &'g mut ConsensusGossip<B>, gossip: &'g mut ConsensusGossip<B>,
network: &'p mut dyn Network<B>, network: &'p mut dyn Network<B>,
engine_id: ConsensusEngineId,
} }
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> { impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
@@ -89,7 +83,6 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
self.gossip.multicast( self.gossip.multicast(
self.network, self.network,
topic, topic,
self.engine_id.clone(),
message, message,
force, force,
); );
@@ -97,40 +90,30 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Send addressed message to a peer. /// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) { fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.network.write_notification(who.clone(), self.engine_id, message); self.network.write_notification(who.clone(), self.gossip.engine_id, message);
} }
/// Send all messages with given topic to a peer. /// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) { fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
self.gossip.send_topic(self.network, who, topic, self.engine_id, force); self.gossip.send_topic(self.network, who, topic, force);
} }
} }
fn propagate<'a, B: BlockT, I>( fn propagate<'a, B: BlockT, I>(
network: &mut dyn Network<B>, network: &mut dyn Network<B>,
engine_id: ConsensusEngineId,
messages: I, messages: I,
intent: MessageIntent, intent: MessageIntent,
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>, peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>, validator: &Arc<dyn Validator<B>>,
) )
// (msg_hash, topic, message) // (msg_hash, topic, message)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, ConsensusEngineId, &'a Vec<u8>)>, where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
{ {
let mut check_fns = HashMap::new(); let mut message_allowed = validator.message_allowed();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, engine_id: ConsensusEngineId, message: &Vec<u8>| {
let check_fn = match check_fns.entry(engine_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(vacant) => match validators.get(&engine_id) {
None => return false, // treat all messages with no validator as not allowed
Some(validator) => vacant.insert(validator.message_allowed()),
}
};
(check_fn)(who, intent, topic, &message)
};
for (id, ref mut peer) in peers.iter_mut() { for (id, ref mut peer) in peers.iter_mut() {
for (message_hash, topic, engine_id, message) in messages.clone() { for (message_hash, topic, message) in messages.clone() {
let intent = match intent { let intent = match intent {
MessageIntent::Broadcast { .. } => MessageIntent::Broadcast { .. } =>
if peer.known_messages.contains(&message_hash) { if peer.known_messages.contains(&message_hash) {
@@ -149,7 +132,7 @@ fn propagate<'a, B: BlockT, I>(
other => other, other => other,
}; };
if !message_allowed(id, intent, &topic, engine_id, &message) { if !message_allowed(id, intent, &topic, &message) {
continue; continue;
} }
@@ -164,45 +147,28 @@ fn propagate<'a, B: BlockT, I>(
/// Consensus network protocol handler. Manages statements and candidate requests. /// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> { pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>, peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<TracingUnboundedSender<TopicNotification>>>, live_message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
messages: Vec<MessageEntry<B>>, messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>, known_messages: LruCache<B::Hash, ()>,
validators: HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>, engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>,
next_broadcast: Instant, next_broadcast: Instant,
} }
impl<B: BlockT> ConsensusGossip<B> { impl<B: BlockT> ConsensusGossip<B> {
/// Create a new instance. /// Create a new instance using the given validator.
pub fn new() -> Self { pub fn new(validator: Arc<dyn Validator<B>>, engine_id: ConsensusEngineId) -> Self {
ConsensusGossip { ConsensusGossip {
peers: HashMap::new(), peers: HashMap::new(),
live_message_sinks: HashMap::new(), live_message_sinks: HashMap::new(),
messages: Default::default(), messages: Default::default(),
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE), known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
validators: Default::default(), engine_id,
validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL, next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
} }
} }
/// Register message validator for a message type.
pub fn register_validator(
&mut self,
network: &mut dyn Network<B>,
engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>
) {
self.register_validator_internal(engine_id, validator.clone());
let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.role.clone())).collect();
for (id, role) in peers {
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
validator.new_peer(&mut context, &id, role);
}
}
fn register_validator_internal(&mut self, engine_id: ConsensusEngineId, validator: Arc<dyn Validator<B>>) {
self.validators.insert(engine_id, validator.clone());
}
/// Handle new connected peer. /// Handle new connected peer.
pub fn new_peer(&mut self, network: &mut dyn Network<B>, who: PeerId, role: ObservedRole) { pub fn new_peer(&mut self, network: &mut dyn Network<B>, who: PeerId, role: ObservedRole) {
// light nodes are not valid targets for consensus gossip messages // light nodes are not valid targets for consensus gossip messages
@@ -213,19 +179,17 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target:"gossip", "Registering {:?} {}", role, who); trace!(target:"gossip", "Registering {:?} {}", role, who);
self.peers.insert(who.clone(), PeerConsensus { self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(), known_messages: HashSet::new(),
role: role.clone(),
}); });
for (engine_id, v) in self.validators.clone() {
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; let validator = self.validator.clone();
v.new_peer(&mut context, &who, role.clone()); let mut context = NetworkContext { gossip: self, network };
} validator.new_peer(&mut context, &who, role.clone());
} }
fn register_message_hashed( fn register_message_hashed(
&mut self, &mut self,
message_hash: B::Hash, message_hash: B::Hash,
topic: B::Hash, topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>, message: Vec<u8>,
sender: Option<PeerId>, sender: Option<PeerId>,
) { ) {
@@ -233,7 +197,6 @@ impl<B: BlockT> ConsensusGossip<B> {
self.messages.push(MessageEntry { self.messages.push(MessageEntry {
message_hash, message_hash,
topic, topic,
engine_id,
message, message,
sender, sender,
}); });
@@ -248,19 +211,17 @@ impl<B: BlockT> ConsensusGossip<B> {
pub fn register_message( pub fn register_message(
&mut self, &mut self,
topic: B::Hash, topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>, message: Vec<u8>,
) { ) {
let message_hash = HashFor::<B>::hash(&message[..]); let message_hash = HashFor::<B>::hash(&message[..]);
self.register_message_hashed(message_hash, topic, engine_id, message, None); self.register_message_hashed(message_hash, topic, message, None);
} }
/// Call when a peer has been disconnected to stop tracking gossip status. /// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, network: &mut dyn Network<B>, who: PeerId) { pub fn peer_disconnected(&mut self, network: &mut dyn Network<B>, who: PeerId) {
for (engine_id, v) in self.validators.clone() { let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; let mut context = NetworkContext { gossip: self, network };
v.peer_disconnected(&mut context, &who); validator.peer_disconnected(&mut context, &who);
}
self.peers.remove(&who); self.peers.remove(&who);
} }
@@ -276,8 +237,8 @@ impl<B: BlockT> ConsensusGossip<B> {
/// Rebroadcast all messages to all peers. /// Rebroadcast all messages to all peers.
fn rebroadcast(&mut self, network: &mut dyn Network<B>) { fn rebroadcast(&mut self, network: &mut dyn Network<B>) {
let messages = self.messages.iter() let messages = self.messages.iter()
.map(|entry| (&entry.message_hash, &entry.topic, entry.engine_id, &entry.message)); .map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); propagate(network, self.engine_id, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validator);
} }
/// Broadcast all messages with given topic. /// Broadcast all messages with given topic.
@@ -285,11 +246,11 @@ impl<B: BlockT> ConsensusGossip<B> {
let messages = self.messages.iter() let messages = self.messages.iter()
.filter_map(|entry| .filter_map(|entry|
if entry.topic == topic { if entry.topic == topic {
Some((&entry.message_hash, &entry.topic, entry.engine_id, &entry.message)) Some((&entry.message_hash, &entry.topic, &entry.message))
} else { None } } else { None }
); );
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(network, messages, intent, &mut self.peers, &self.validators); propagate(network, self.engine_id, messages, intent, &mut self.peers, &self.validator);
} }
/// Prune old or no longer relevant consensus messages. Provide a predicate /// Prune old or no longer relevant consensus messages. Provide a predicate
@@ -302,23 +263,9 @@ impl<B: BlockT> ConsensusGossip<B> {
let known_messages = &mut self.known_messages; let known_messages = &mut self.known_messages;
let before = self.messages.len(); let before = self.messages.len();
let validators = &self.validators;
let mut check_fns = HashMap::new(); let mut message_expired = self.validator.message_expired();
let mut message_expired = move |entry: &MessageEntry<B>| { self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
let engine_id = entry.engine_id;
let check_fn = match check_fns.entry(engine_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(vacant) => match validators.get(&engine_id) {
None => return true, // treat all messages with no validator as expired
Some(validator) => vacant.insert(validator.message_expired()),
}
};
(check_fn)(entry.topic, &entry.message)
};
self.messages.retain(|entry| !message_expired(entry));
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(), before - self.messages.len(),
@@ -332,13 +279,11 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile) /// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash) pub fn messages_for(&mut self, topic: B::Hash)
-> TracingUnboundedReceiver<TopicNotification> -> TracingUnboundedReceiver<TopicNotification>
{ {
let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for"); let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
for entry in self.messages.iter_mut() for entry in self.messages.iter_mut().filter(|e| e.topic == topic) {
.filter(|e| e.topic == topic && e.engine_id == engine_id)
{
tx.unbounded_send(TopicNotification { tx.unbounded_send(TopicNotification {
message: entry.message.clone(), message: entry.message.clone(),
sender: entry.sender.clone(), sender: entry.sender.clone(),
@@ -346,7 +291,7 @@ impl<B: BlockT> ConsensusGossip<B> {
.expect("receiver known to be live; qed"); .expect("receiver known to be live; qed");
} }
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); self.live_message_sinks.entry(topic).or_default().push(tx);
rx rx
} }
@@ -358,13 +303,13 @@ impl<B: BlockT> ConsensusGossip<B> {
&mut self, &mut self,
network: &mut dyn Network<B>, network: &mut dyn Network<B>,
who: PeerId, who: PeerId,
messages: Vec<(ConsensusEngineId, Vec<u8>)>, messages: Vec<Vec<u8>>,
) { ) {
if !messages.is_empty() { if !messages.is_empty() {
trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who); trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who);
} }
for (engine_id, message) in messages { for message in messages {
let message_hash = HashFor::<B>::hash(&message[..]); let message_hash = HashFor::<B>::hash(&message[..]);
if self.known_messages.contains(&message_hash) { if self.known_messages.contains(&message_hash) {
@@ -374,30 +319,23 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
// validate the message // validate the message
let validation = self.validators.get(&engine_id) let validation = {
.cloned() let validator = self.validator.clone();
.map(|v| { let mut context = NetworkContext { gossip: self, network };
let mut context = NetworkContext { gossip: self, network, engine_id }; validator.validate(&mut context, &who, &message)
v.validate(&mut context, &who, &message) };
});
let validation_result = match validation { let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)), ValidationResult::ProcessAndKeep(topic) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)), ValidationResult::ProcessAndDiscard(topic) => Some((topic, false)),
Some(ValidationResult::Discard) => None, ValidationResult::Discard => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
network.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
network.disconnect_peer(who.clone());
continue;
}
}; };
if let Some((topic, keep)) = validation_result { if let Some((topic, keep)) = validation_result {
network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
if let Some(ref mut peer) = self.peers.get_mut(&who) { if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash); peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
trace!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); trace!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| { entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification { if let Err(e) = sink.unbounded_send(TopicNotification {
@@ -413,7 +351,7 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
} }
if keep { if keep {
self.register_message_hashed(message_hash, topic, engine_id, message, Some(who.clone())); self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
} }
} else { } else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
@@ -431,17 +369,12 @@ impl<B: BlockT> ConsensusGossip<B> {
network: &mut dyn Network<B>, network: &mut dyn Network<B>,
who: &PeerId, who: &PeerId,
topic: B::Hash, topic: B::Hash,
engine_id: ConsensusEngineId,
force: bool force: bool
) { ) {
let validator = self.validators.get(&engine_id); let mut message_allowed = self.validator.message_allowed();
let mut message_allowed = match validator {
None => return, // treat all messages with no validator as not allowed
Some(validator) => validator.message_allowed(),
};
if let Some(ref mut peer) = self.peers.get_mut(who) { if let Some(ref mut peer) = self.peers.get_mut(who) {
for entry in self.messages.iter().filter(|m| m.topic == topic && m.engine_id == engine_id) { for entry in self.messages.iter().filter(|m| m.topic == topic) {
let intent = if force { let intent = if force {
MessageIntent::ForcedBroadcast MessageIntent::ForcedBroadcast
} else { } else {
@@ -459,7 +392,7 @@ impl<B: BlockT> ConsensusGossip<B> {
peer.known_messages.insert(entry.message_hash.clone()); peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
network.write_notification(who.clone(), engine_id, entry.message.clone()); network.write_notification(who.clone(), self.engine_id, entry.message.clone());
} }
} }
} }
@@ -469,14 +402,13 @@ impl<B: BlockT> ConsensusGossip<B> {
&mut self, &mut self,
network: &mut dyn Network<B>, network: &mut dyn Network<B>,
topic: B::Hash, topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>, message: Vec<u8>,
force: bool, force: bool,
) { ) {
let message_hash = HashFor::<B>::hash(&message); let message_hash = HashFor::<B>::hash(&message);
self.register_message_hashed(message_hash, topic, engine_id, message.clone(), None); self.register_message_hashed(message_hash, topic, message.clone(), None);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(network, iter::once((&message_hash, &topic, engine_id, &message)), intent, &mut self.peers, &self.validators); propagate(network, self.engine_id, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validator);
} }
/// Send addressed message to a peer. The message is not kept or multicast /// Send addressed message to a peer. The message is not kept or multicast
@@ -485,7 +417,6 @@ impl<B: BlockT> ConsensusGossip<B> {
&mut self, &mut self,
network: &mut dyn Network<B>, network: &mut dyn Network<B>,
who: &PeerId, who: &PeerId,
engine_id: ConsensusEngineId,
message: Vec<u8>, message: Vec<u8>,
) { ) {
let peer = match self.peers.get_mut(who) { let peer = match self.peers.get_mut(who) {
@@ -498,7 +429,7 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.known_messages.insert(message_hash); peer.known_messages.insert(message_hash);
network.write_notification(who.clone(), engine_id, message); network.write_notification(who.clone(), self.engine_id, message);
} }
} }
@@ -518,7 +449,6 @@ mod tests {
$consensus.messages.push(MessageEntry { $consensus.messages.push(MessageEntry {
message_hash: $hash, message_hash: $hash,
topic: $topic, topic: $topic,
engine_id: [0, 0, 0, 0],
message: $m, message: $m,
sender: None, sender: None,
}); });
@@ -562,7 +492,7 @@ mod tests {
let prev_hash = H256::random(); let prev_hash = H256::random();
let best_hash = H256::random(); let best_hash = H256::random();
let mut consensus = ConsensusGossip::<Block>::new(); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
let m1_hash = H256::random(); let m1_hash = H256::random();
let m2_hash = H256::random(); let m2_hash = H256::random();
let m1 = vec![1, 2, 3]; let m1 = vec![1, 2, 3];
@@ -573,13 +503,11 @@ mod tests {
consensus.known_messages.put(m1_hash, ()); consensus.known_messages.put(m1_hash, ());
consensus.known_messages.put(m2_hash, ()); consensus.known_messages.put(m2_hash, ());
let test_engine_id = Default::default();
consensus.register_validator_internal(test_engine_id, Arc::new(AllowAll));
consensus.collect_garbage(); consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 2); assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.known_messages.len(), 2); assert_eq!(consensus.known_messages.len(), 2);
consensus.register_validator_internal(test_engine_id, Arc::new(AllowOne)); consensus.validator = Arc::new(AllowOne);
// m2 is expired // m2 is expired
consensus.collect_garbage(); consensus.collect_garbage();
@@ -591,70 +519,47 @@ mod tests {
#[test] #[test]
fn message_stream_include_those_sent_before_asking_for_stream() { fn message_stream_include_those_sent_before_asking_for_stream() {
let mut consensus = ConsensusGossip::<Block>::new(); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let engine_id = [0, 0, 0, 0];
let message = vec![4, 5, 6]; let message = vec![4, 5, 6];
let topic = HashFor::<Block>::hash(&[1,2,3]); let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(topic, engine_id, message.clone()); consensus.register_message(topic, message.clone());
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); let mut stream = block_on_stream(consensus.messages_for(topic));
assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None })); assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None }));
} }
#[test] #[test]
fn can_keep_multiple_messages_per_topic() { fn can_keep_multiple_messages_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new(); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
let topic = [1; 32].into(); let topic = [1; 32].into();
let msg_a = vec![1, 2, 3]; let msg_a = vec![1, 2, 3];
let msg_b = vec![4, 5, 6]; let msg_b = vec![4, 5, 6];
consensus.register_message(topic, [0, 0, 0, 0], msg_a); consensus.register_message(topic, msg_a);
consensus.register_message(topic, [0, 0, 0, 0], msg_b); consensus.register_message(topic, msg_b);
assert_eq!(consensus.messages.len(), 2); assert_eq!(consensus.messages.len(), 2);
} }
#[test] #[test]
fn can_keep_multiple_subscribers_per_topic() { fn can_keep_multiple_subscribers_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new(); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = vec![4, 5, 6]; let message = vec![4, 5, 6];
let topic = HashFor::<Block>::hash(&[1, 2, 3]); let topic = HashFor::<Block>::hash(&[1, 2, 3]);
consensus.register_message(topic, [0, 0, 0, 0], message.clone()); consensus.register_message(topic, message.clone());
let mut stream1 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); let mut stream1 = block_on_stream(consensus.messages_for(topic));
let mut stream2 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); let mut stream2 = block_on_stream(consensus.messages_for(topic));
assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None })); assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None }));
assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None })); assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None }));
} }
#[test]
fn topics_are_localized_to_engine_id() {
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let topic = [1; 32].into();
let msg_a = vec![1, 2, 3];
let msg_b = vec![4, 5, 6];
consensus.register_message(topic, [0, 0, 0, 0], msg_a);
consensus.register_message(topic, [0, 0, 0, 1], msg_b);
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
assert_eq!(stream.next(), Some(TopicNotification { message: vec![1, 2, 3], sender: None }));
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
#[test] #[test]
fn peer_is_removed_on_disconnect() { fn peer_is_removed_on_disconnect() {
struct TestNetwork; struct TestNetwork;
@@ -690,8 +595,7 @@ mod tests {
} }
} }
let mut consensus = ConsensusGossip::<Block>::new(); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let mut network = TestNetwork; let mut network = TestNetwork;