Add some State tests, and use genesis_hash, not label, where possible

This commit is contained in:
James Wilson
2021-06-28 11:20:48 +01:00
parent 8a0eb14aca
commit c5ca84ee9a
4 changed files with 188 additions and 70 deletions
+2 -1
View File
@@ -137,8 +137,9 @@ mod test {
#[test]
fn default_renames_instantly() {
let mut a: MostSeen<&str> = MostSeen::default();
a.insert(&"Hello");
let res = a.insert(&"Hello");
assert_eq!(*a.best(), "Hello");
assert!(res.has_changed());
}
#[test]
+50 -37
View File
@@ -4,6 +4,7 @@ use common::{
LocalId,
MuteReason
},
types::BlockHash,
node,
util::now
};
@@ -88,7 +89,7 @@ pub enum FromFeedWebsocket {
NoMoreFinality,
/// An explicit ping message.
Ping {
chain: Box<str>
value: Box<str>
},
/// The feed is disconnected.
Disconnected
@@ -98,13 +99,13 @@ pub enum FromFeedWebsocket {
impl FromStr for FromFeedWebsocket {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (cmd, chain) = match s.find(':') {
let (cmd, value) = match s.find(':') {
Some(idx) => (&s[..idx], s[idx+1..].into()),
None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`"))
};
match cmd {
"ping" => Ok(FromFeedWebsocket::Ping { chain }),
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }),
"ping" => Ok(FromFeedWebsocket::Ping { value }),
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain: value }),
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd))
@@ -136,9 +137,11 @@ pub struct InnerLoop {
shard_channels: HashMap<ConnId, mpsc::Sender<ToShardWebsocket>>,
/// Which chain is a feed subscribed to?
feed_conn_id_to_chain: HashMap<ConnId, Box<str>>,
/// Feed Connection ID -> Chain Genesis Hash
feed_conn_id_to_chain: HashMap<ConnId, BlockHash>,
/// Which feeds are subscribed to a given chain (needs to stay in sync with above)?
chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>>,
/// Chain Genesis Hash -> Feed Connection IDs
chain_to_feed_conn_ids: HashMap<BlockHash, HashSet<ConnId>>,
/// These feeds want finality info, too.
feed_conn_id_finality: HashSet<ConnId>,
@@ -197,18 +200,19 @@ impl InnerLoop {
&loc.city
));
let chain_label = self.node_state
let chain_genesis_hash = self.node_state
.get_node_chain(node_id)
.map(|chain| chain.label().to_owned());
.map(|chain| *chain.genesis_hash());
if let Some(chain_label) = chain_label {
self.finalize_and_broadcast_to_chain_feeds(&chain_label, feed_message_serializer).await;
if let Some(chain_genesis_hash) = chain_genesis_hash {
self.finalize_and_broadcast_to_chain_feeds(&chain_genesis_hash, feed_message_serializer).await;
}
}
}
/// Handle messages coming from shards.
async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) {
log::debug!("Message from shard ({}): {:?}", shard_conn_id, msg);
match msg {
FromShardWebsocket::Initialize { channel } => {
self.shard_channels.insert(shard_conn_id, channel);
@@ -246,7 +250,7 @@ impl InnerLoop {
// Tell chain subscribers about the node we've just added:
let mut feed_messages_for_chain = FeedMessageSerializer::new();
feed_messages_for_chain.push(feed_message::AddedNode(node_id, &details.node));
self.finalize_and_broadcast_to_chain_feeds(&old_chain_label, feed_messages_for_chain).await;
self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_messages_for_chain).await;
// Tell everybody about the new node count and potential rename:
let mut feed_messages_for_all = FeedMessageSerializer::new();
@@ -334,6 +338,7 @@ impl InnerLoop {
/// Handle messages coming from feeds.
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg);
match msg {
FromFeedWebsocket::Initialize { mut channel } => {
self.feed_channels.insert(feed_conn_id, channel.clone());
@@ -353,7 +358,7 @@ impl InnerLoop {
let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
},
FromFeedWebsocket::Ping { chain } => {
FromFeedWebsocket::Ping { value } => {
let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) {
Some(chan) => chan,
None => return
@@ -361,7 +366,7 @@ impl InnerLoop {
// Pong!
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Pong(&chain));
feed_serializer.push(feed_message::Pong(&value));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
@@ -373,9 +378,9 @@ impl InnerLoop {
};
// Unsubscribe from previous chain if subscribed to one:
let old_chain_label = self.feed_conn_id_to_chain.remove(&feed_conn_id);
if let Some(old_chain_label) = &old_chain_label {
if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_chain_label) {
let old_genesis_hash = self.feed_conn_id_to_chain.remove(&feed_conn_id);
if let Some(old_genesis_hash) = &old_genesis_hash {
if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_genesis_hash) {
map.remove(&feed_conn_id);
}
}
@@ -383,29 +388,34 @@ impl InnerLoop {
// Untoggle request for finality feeds:
self.feed_conn_id_finality.remove(&feed_conn_id);
// Get the chain we're subscribing to, ignoring the rest if it doesn't exist.
let chain = match self.node_state.get_chain_by_label(&chain) {
// Get old chain if there was one:
let node_state = &self.node_state;
let old_chain = old_genesis_hash
.and_then(|hash| node_state.get_chain_by_genesis_hash(&hash));
// Get new chain, ignoring the rest if it doesn't exist.
let new_chain = match self.node_state.get_chain_by_label(&chain) {
Some(chain) => chain,
None => return
};
// Send messages to the feed about the new chain:
// Send messages to the feed about this subscription:
let mut feed_serializer = FeedMessageSerializer::new();
if let Some(old_chain_label) = old_chain_label {
feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label));
if let Some(old_chain) = old_chain {
feed_serializer.push(feed_message::UnsubscribedFrom(old_chain.label()));
}
feed_serializer.push(feed_message::SubscribedTo(chain.label()));
feed_serializer.push(feed_message::SubscribedTo(new_chain.label()));
feed_serializer.push(feed_message::TimeSync(now()));
feed_serializer.push(feed_message::BestBlock (
chain.best_block().height,
chain.timestamp(),
chain.average_block_time()
new_chain.best_block().height,
new_chain.timestamp(),
new_chain.average_block_time()
));
feed_serializer.push(feed_message::BestFinalized (
chain.finalized_block().height,
chain.finalized_block().hash
new_chain.finalized_block().height,
new_chain.finalized_block().hash
));
for (idx, (node_id, node)) in chain.iter_nodes().enumerate() {
for (idx, (node_id, node)) in new_chain.iter_nodes().enumerate() {
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
@@ -428,8 +438,9 @@ impl InnerLoop {
}
// Actually make a note of the new chain subsciption:
self.feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into());
self.chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id);
let new_genesis_hash = *new_chain.genesis_hash();
self.feed_conn_id_to_chain.insert(feed_conn_id, new_genesis_hash);
self.chain_to_feed_conn_ids.entry(new_genesis_hash).or_default().insert(feed_conn_id);
},
FromFeedWebsocket::SendFinality => {
self.feed_conn_id_finality.insert(feed_conn_id);
@@ -452,11 +463,10 @@ impl InnerLoop {
async fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator<Item=NodeId>) {
// Group by chain to simplify the handling of feed messages:
let mut node_ids_per_chain: HashMap<String,Vec<NodeId>> = HashMap::new();
let mut node_ids_per_chain: HashMap<BlockHash,Vec<NodeId>> = HashMap::new();
for node_id in node_ids.into_iter() {
if let Some(chain) = self.node_state.get_node_chain(node_id) {
let chain_label = chain.label().to_owned();
node_ids_per_chain.entry(chain_label).or_default().push(node_id);
node_ids_per_chain.entry(*chain.genesis_hash()).or_default().push(node_id);
}
}
@@ -519,15 +529,18 @@ impl InnerLoop {
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to feeds for the chain.
async fn finalize_and_broadcast_to_chain_feeds(&mut self, chain: &str, serializer: FeedMessageSerializer) {
async fn finalize_and_broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_feeds(chain, ToFeedWebsocket::Bytes(bytes)).await;
self.broadcast_to_chain_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await;
}
}
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) {
async fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) {
println!("BROADCAST TO CHAIN FEEDS, {}, \n\n{:?}\n\n{:?}\n\n", genesis_hash, self.chain_to_feed_conn_ids, self.feed_conn_id_to_chain);
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
for &feed_id in feeds {
// How much faster would it be if we processed these in parallel?
// Is it practical to do so given lifetimes and such?
+1
View File
@@ -252,6 +252,7 @@ async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut t
// pre-serialized bytes that we send as binary):
match msg {
ToFeedWebsocket::Bytes(bytes) => {
log::debug!("Message to feed: {}", std::str::from_utf8(&bytes).unwrap_or("INVALID UTF8"));
let _ = websocket.send(ws::Message::binary(bytes)).await;
}
}
+135 -32
View File
@@ -94,6 +94,13 @@ impl State {
.map(move |(_,chain)| StateChain { state: self, chain })
}
pub fn get_chain_by_genesis_hash(&self, genesis_hash: &BlockHash) -> Option<StateChain<'_>> {
self.chains_by_genesis_hash
.get(genesis_hash)
.and_then(|&chain_id| self.chains.get(chain_id))
.map(|chain| StateChain { state: self, chain })
}
pub fn get_chain_by_label(&self, label: &str) -> Option<StateChain<'_>> {
self.chains_by_label
.get(label)
@@ -107,6 +114,9 @@ impl State {
}
// Get the chain ID, creating a new empty chain if one doesn't exist.
// If we create a chain here, we are expecting that it will allow at
// least this node to be added, because we don't currently try and clean it up
// if the add fails.
let chain_id = match self.chains_by_genesis_hash.get(&genesis_hash) {
Some(id) => *id,
None => {
@@ -124,22 +134,25 @@ impl State {
// to add it until we know whether the chain will accept it, but we want
// an ID to give to the chain.
let node_id = self.nodes.next_id();
let chain_label = node_details.chain.clone();
let node_chain_label = node_details.chain.clone();
let old_chain_label = chain.label().into();
match chain.add_node(node_id, &chain_label) {
match chain.add_node(node_id, &node_chain_label) {
chain::AddNodeResult::Overquota => {
AddNodeResult::ChainOverQuota
},
chain::AddNodeResult::Added { chain_renamed } => {
let chain = &*chain;
// Actually add the node if the chain accepts it:
// Actually add the node, and a reference to its chain,
// if the chain adds it successfully:
self.nodes.add(Node::new(node_details));
self.chains_by_node.insert(node_id, chain_id);
// Update the label we use to reference the chain if
// it changes (it'll always change first time a node's added):
if chain_renamed {
self.chains_by_label.remove(&chain_label);
self.chains_by_label.remove(&old_chain_label);
self.chains_by_label.insert(chain.label().to_string().into_boxed_str(), chain_id);
}
@@ -147,7 +160,7 @@ impl State {
AddNodeResult::NodeAddedToChain(NodeAddedToChain {
id: node_id,
node: node,
old_chain_label: chain_label,
old_chain_label: old_chain_label,
new_chain_label: chain.label(),
chain_node_count: chain.node_count(),
has_chain_label_changed: chain_renamed
@@ -216,38 +229,13 @@ impl State {
fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
self.nodes.get_mut(node_id)
}
// /// Add a new node to our state.
// pub fn add_node(&mut self, id: GlobalId, genesis_hash: BlockHash, node: &NodeDetails) -> AddNodeResult {
// if self.denylist.contains(&*node.chain) {
// return AddNodeResult::ChainOnDenyList;
// }
// let chain_id = self.chains.get_or_create(genesis_hash, &node.chain);
// return Ok(())
// }
// /// Remove a node from our state.
// pub fn remove_node(&mut self, id: GlobalId) -> RemoveNodeResult {
// }
// /// Update a node with new data. This needs breaking down into parts so
// /// that we can emit a useful result in each case to inform the aggregator
// /// what messages it needs to emit.
// pub fn update_node(&mut self, id: GlobalId, payload: Payload) {
// }
// fn get_or_create_chain(genesis_hash: BlockHash, chain: &str) -> ChainId {
// }
}
/// When we ask for a chain, we get this struct back. This ensures that we have
/// a consistent public interface, and don't expose methods on [`Chain`] that
/// aren't really intended for use outside of [`State`] methods.
/// aren't really intended for use outside of [`State`] methods. Any modification
/// of a chain needs to go through [`State`].
pub struct StateChain<'a> {
state: &'a State,
chain: &'a Chain
@@ -257,6 +245,9 @@ impl <'a> StateChain<'a> {
pub fn label(&self) -> &'a str {
self.chain.label()
}
pub fn genesis_hash(&self) -> &'a BlockHash {
self.chain.genesis_hash()
}
pub fn node_count(&self) -> usize {
self.chain.node_count()
}
@@ -278,4 +269,116 @@ impl <'a> StateChain<'a> {
Some((id, state.nodes.get(id)?))
})
}
}
#[cfg(test)]
mod test {
use super::*;
fn node(name: &str, chain: &str) -> NodeDetails {
NodeDetails {
chain: chain.into(),
name: name.into(),
implementation: "Bar".into(),
version: "0.1".into(),
validator: None,
network_id: None,
startup_time: None
}
}
#[test]
fn adding_a_node_returns_expected_response() {
let mut state = State::new(None);
let chain1_genesis = BlockHash::from_low_u64_be(1);
let add_result = state.add_node(
chain1_genesis,
node("A", "Chain One")
);
let add_node_result = match add_result {
AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"),
AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"),
AddNodeResult::NodeAddedToChain(details) => details
};
assert_eq!(add_node_result.id, 0);
assert_eq!(&*add_node_result.old_chain_label, "");
assert_eq!(&*add_node_result.new_chain_label, "Chain One");
assert_eq!(add_node_result.chain_node_count, 1);
assert_eq!(add_node_result.has_chain_label_changed, true);
let add_result = state.add_node(
chain1_genesis,
node("A", "Chain One")
);
let add_node_result = match add_result {
AddNodeResult::ChainOnDenyList => panic!("Chain not on deny list"),
AddNodeResult::ChainOverQuota => panic!("Chain not Overquota"),
AddNodeResult::NodeAddedToChain(details) => details
};
assert_eq!(add_node_result.id, 1);
assert_eq!(&*add_node_result.old_chain_label, "Chain One");
assert_eq!(&*add_node_result.new_chain_label, "Chain One");
assert_eq!(add_node_result.chain_node_count, 2);
assert_eq!(add_node_result.has_chain_label_changed, false);
}
#[test]
fn adding_and_removing_nodes_updates_chain_label_mapping() {
let mut state = State::new(None);
let chain1_genesis = BlockHash::from_low_u64_be(1);
state.add_node(chain1_genesis, node("A", "Chain One")); // 0
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
state.add_node(chain1_genesis, node("B", "Chain Two")); // 1
// Chain name hasn't changed yet; "Chain One" as common as "Chain Two"..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
state.add_node(chain1_genesis, node("B", "Chain Two")); // 2
// Chain name has changed; "Chain Two" the winner now..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain Two");
assert!(state.get_chain_by_label("Chain One").is_none());
assert!(state.get_chain_by_label("Chain Two").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
state.remove_node(1).expect("Removal OK (id: 1)");
state.remove_node(2).expect("Removal OK (id: 2");
// Removed both "Chain Two" nodes; dominant name now "Chain One" again..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_label("Chain Two").is_none());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
}
#[test]
fn chain_removed_when_last_node_is() {
let mut state = State::new(None);
let chain1_genesis = BlockHash::from_low_u64_be(1);
state.add_node(chain1_genesis, node("A", "Chain One")); // 0
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
assert_eq!(state.iter_chains().count(), 1);
state.remove_node(0);
assert!(state.get_chain_by_label("Chain One").is_none());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_none());
assert_eq!(state.iter_chains().count(), 0);
}
}