mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 16:51:02 +00:00
Subscribe to chains by genesis hash (#395)
* Handle subscription by hash in the frontend * Forward-ported backend changes * Fix unit tests * Remove unused `chains_by_label` * fmt * Updated but failing E2E tests * subscribe by genesis hash in tests * fmt * Copy `BlockHash` instead of returning a ref * Pin chains by genesisHash Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
@@ -94,7 +94,7 @@ pub enum FromFeedWebsocket {
|
||||
},
|
||||
/// The feed can subscribe to a chain to receive
|
||||
/// messages relating to it.
|
||||
Subscribe { chain: Box<str> },
|
||||
Subscribe { chain: BlockHash },
|
||||
/// The feed wants finality info for the chain, too.
|
||||
SendFinality,
|
||||
/// The feed doesn't want any more finality info for the chain.
|
||||
@@ -136,12 +136,16 @@ impl FromStr for FromFeedWebsocket {
|
||||
type Err = anyhow::Error;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let (cmd, value) = match s.find(':') {
|
||||
Some(idx) => (&s[..idx], s[idx + 1..].into()),
|
||||
Some(idx) => (&s[..idx], &s[idx + 1..]),
|
||||
None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`")),
|
||||
};
|
||||
match cmd {
|
||||
"ping" => Ok(FromFeedWebsocket::Ping { value }),
|
||||
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain: value }),
|
||||
"ping" => Ok(FromFeedWebsocket::Ping {
|
||||
value: value.into(),
|
||||
}),
|
||||
"subscribe" => Ok(FromFeedWebsocket::Subscribe {
|
||||
chain: value.parse()?,
|
||||
}),
|
||||
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
|
||||
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
|
||||
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)),
|
||||
@@ -306,7 +310,7 @@ impl InnerLoop {
|
||||
let chain_genesis_hash = self
|
||||
.node_state
|
||||
.get_chain_by_node_id(node_id)
|
||||
.map(|chain| *chain.genesis_hash());
|
||||
.map(|chain| chain.genesis_hash());
|
||||
|
||||
if let Some(chain_genesis_hash) = chain_genesis_hash {
|
||||
self.finalize_and_broadcast_to_chain_feeds(
|
||||
@@ -353,7 +357,6 @@ impl InnerLoop {
|
||||
self.node_ids.insert(node_id, (shard_conn_id, local_id));
|
||||
|
||||
// Don't hold onto details too long because we want &mut self later:
|
||||
let old_chain_label = details.old_chain_label.to_owned();
|
||||
let new_chain_label = details.new_chain_label.to_owned();
|
||||
let chain_node_count = details.chain_node_count;
|
||||
let has_chain_label_changed = details.has_chain_label_changed;
|
||||
@@ -371,11 +374,13 @@ impl InnerLoop {
|
||||
// Tell everybody about the new node count and potential rename:
|
||||
let mut feed_messages_for_all = FeedMessageSerializer::new();
|
||||
if has_chain_label_changed {
|
||||
feed_messages_for_all
|
||||
.push(feed_message::RemovedChain(&old_chain_label));
|
||||
feed_messages_for_all.push(feed_message::RemovedChain(genesis_hash));
|
||||
}
|
||||
feed_messages_for_all
|
||||
.push(feed_message::AddedChain(&new_chain_label, chain_node_count));
|
||||
feed_messages_for_all.push(feed_message::AddedChain(
|
||||
&new_chain_label,
|
||||
genesis_hash,
|
||||
chain_node_count,
|
||||
));
|
||||
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
|
||||
|
||||
// Ask for the grographical location of the node.
|
||||
@@ -419,7 +424,7 @@ impl InnerLoop {
|
||||
.update_node(node_id, payload, &mut feed_message_serializer);
|
||||
|
||||
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
|
||||
let genesis_hash = *chain.genesis_hash();
|
||||
let genesis_hash = chain.genesis_hash();
|
||||
if broadcast_finality {
|
||||
self.finalize_and_broadcast_to_chain_finality_feeds(
|
||||
&genesis_hash,
|
||||
@@ -456,10 +461,13 @@ impl InnerLoop {
|
||||
|
||||
// Tell the new feed subscription some basic things to get it going:
|
||||
let mut feed_serializer = FeedMessageSerializer::new();
|
||||
feed_serializer.push(feed_message::Version(31));
|
||||
feed_serializer.push(feed_message::Version(32));
|
||||
for chain in self.node_state.iter_chains() {
|
||||
feed_serializer
|
||||
.push(feed_message::AddedChain(chain.label(), chain.node_count()));
|
||||
feed_serializer.push(feed_message::AddedChain(
|
||||
chain.label(),
|
||||
chain.genesis_hash(),
|
||||
chain.node_count(),
|
||||
));
|
||||
}
|
||||
|
||||
// Send this to the channel that subscribed:
|
||||
@@ -498,7 +506,7 @@ impl InnerLoop {
|
||||
old_genesis_hash.and_then(|hash| node_state.get_chain_by_genesis_hash(&hash));
|
||||
|
||||
// Get new chain, ignoring the rest if it doesn't exist.
|
||||
let new_chain = match self.node_state.get_chain_by_label(&chain) {
|
||||
let new_chain = match self.node_state.get_chain_by_genesis_hash(&chain) {
|
||||
Some(chain) => chain,
|
||||
None => return,
|
||||
};
|
||||
@@ -506,9 +514,9 @@ impl InnerLoop {
|
||||
// Send messages to the feed about this subscription:
|
||||
let mut feed_serializer = FeedMessageSerializer::new();
|
||||
if let Some(old_chain) = old_chain {
|
||||
feed_serializer.push(feed_message::UnsubscribedFrom(old_chain.label()));
|
||||
feed_serializer.push(feed_message::UnsubscribedFrom(old_chain.genesis_hash()));
|
||||
}
|
||||
feed_serializer.push(feed_message::SubscribedTo(new_chain.label()));
|
||||
feed_serializer.push(feed_message::SubscribedTo(new_chain.genesis_hash()));
|
||||
feed_serializer.push(feed_message::TimeSync(time::now()));
|
||||
feed_serializer.push(feed_message::BestBlock(
|
||||
new_chain.best_block().height,
|
||||
@@ -559,7 +567,7 @@ impl InnerLoop {
|
||||
}
|
||||
|
||||
// Actually make a note of the new chain subsciption:
|
||||
let new_genesis_hash = *new_chain.genesis_hash();
|
||||
let new_genesis_hash = new_chain.genesis_hash();
|
||||
self.chain_to_feed_conn_ids
|
||||
.insert(new_genesis_hash, feed_conn_id);
|
||||
}
|
||||
@@ -585,7 +593,7 @@ impl InnerLoop {
|
||||
for node_id in node_ids.into_iter() {
|
||||
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
|
||||
node_ids_per_chain
|
||||
.entry(*chain.genesis_hash())
|
||||
.entry(chain.genesis_hash())
|
||||
.or_default()
|
||||
.push(node_id);
|
||||
}
|
||||
@@ -629,13 +637,16 @@ impl InnerLoop {
|
||||
|
||||
// The chain has been removed (no nodes left in it, or it was renamed):
|
||||
if removed_details.chain_node_count == 0 || removed_details.has_chain_label_changed {
|
||||
feed_for_all.push(feed_message::RemovedChain(&removed_details.old_chain_label));
|
||||
feed_for_all.push(feed_message::RemovedChain(
|
||||
removed_details.chain_genesis_hash,
|
||||
));
|
||||
}
|
||||
|
||||
// If the chain still exists, tell everybody about the new label or updated node count:
|
||||
if removed_details.chain_node_count != 0 {
|
||||
feed_for_all.push(feed_message::AddedChain(
|
||||
&removed_details.new_chain_label,
|
||||
removed_details.chain_genesis_hash,
|
||||
removed_details.chain_node_count,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -115,9 +115,9 @@ actions! {
|
||||
9: Hardware<'_>,
|
||||
10: TimeSync,
|
||||
11: AddedChain<'_>,
|
||||
12: RemovedChain<'_>,
|
||||
13: SubscribedTo<'_>,
|
||||
14: UnsubscribedFrom<'_>,
|
||||
12: RemovedChain,
|
||||
13: SubscribedTo,
|
||||
14: UnsubscribedFrom,
|
||||
15: Pong<'_>,
|
||||
16: AfgFinalized,
|
||||
17: AfgReceivedPrevote,
|
||||
@@ -163,16 +163,16 @@ pub struct Hardware<'a>(pub FeedNodeId, pub &'a NodeHardware);
|
||||
pub struct TimeSync(pub u64);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AddedChain<'a>(pub &'a str, pub usize);
|
||||
pub struct AddedChain<'a>(pub &'a str, pub BlockHash, pub usize);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct RemovedChain<'a>(pub &'a str);
|
||||
pub struct RemovedChain(pub BlockHash);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct SubscribedTo<'a>(pub &'a str);
|
||||
pub struct SubscribedTo(pub BlockHash);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct UnsubscribedFrom<'a>(pub &'a str);
|
||||
pub struct UnsubscribedFrom(pub BlockHash);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Pong<'a>(pub &'a str);
|
||||
|
||||
@@ -369,8 +369,8 @@ impl Chain {
|
||||
pub fn finalized_block(&self) -> &Block {
|
||||
&self.finalized
|
||||
}
|
||||
pub fn genesis_hash(&self) -> &BlockHash {
|
||||
&self.genesis_hash
|
||||
pub fn genesis_hash(&self) -> BlockHash {
|
||||
self.genesis_hash
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,6 @@ pub struct State {
|
||||
|
||||
// Find the right chain given various details.
|
||||
chains_by_genesis_hash: HashMap<BlockHash, ChainId>,
|
||||
chains_by_label: HashMap<Box<str>, ChainId>,
|
||||
|
||||
/// Chain labels that we do not want to allow connecting.
|
||||
denylist: HashSet<String>,
|
||||
@@ -96,6 +95,8 @@ pub struct RemovedNode {
|
||||
pub has_chain_label_changed: bool,
|
||||
/// The old label of the chain.
|
||||
pub old_chain_label: Box<str>,
|
||||
/// Genesis hash of the chain to be updated.
|
||||
pub chain_genesis_hash: BlockHash,
|
||||
/// The new label of the chain.
|
||||
pub new_chain_label: Box<str>,
|
||||
}
|
||||
@@ -105,7 +106,6 @@ impl State {
|
||||
State {
|
||||
chains: DenseMap::new(),
|
||||
chains_by_genesis_hash: HashMap::new(),
|
||||
chains_by_label: HashMap::new(),
|
||||
denylist: denylist.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
@@ -127,13 +127,6 @@ impl State {
|
||||
.map(|chain| StateChain { chain })
|
||||
}
|
||||
|
||||
pub fn get_chain_by_label(&self, label: &str) -> Option<StateChain<'_>> {
|
||||
self.chains_by_label
|
||||
.get(label)
|
||||
.and_then(|&chain_id| self.chains.get(chain_id))
|
||||
.map(|chain| StateChain { chain })
|
||||
}
|
||||
|
||||
pub fn add_node(
|
||||
&mut self,
|
||||
genesis_hash: BlockHash,
|
||||
@@ -169,17 +162,10 @@ impl State {
|
||||
chain::AddNodeResult::Added { id, chain_renamed } => {
|
||||
let chain = &*chain;
|
||||
|
||||
// Update the label we use to reference the chain if
|
||||
// it changes (it'll always change first time a node's added):
|
||||
if chain_renamed {
|
||||
self.chains_by_label.remove(&old_chain_label);
|
||||
self.chains_by_label.insert(chain.label().into(), chain_id);
|
||||
}
|
||||
|
||||
AddNodeResult::NodeAddedToChain(NodeAddedToChain {
|
||||
id: NodeId(chain_id, id),
|
||||
node: chain.get_node(id).expect("node added above"),
|
||||
old_chain_label: old_chain_label,
|
||||
old_chain_label,
|
||||
new_chain_label: chain.label(),
|
||||
chain_node_count: chain.node_count(),
|
||||
has_chain_label_changed: chain_renamed,
|
||||
@@ -199,26 +185,20 @@ impl State {
|
||||
// Get updated chain details.
|
||||
let new_chain_label: Box<str> = chain.label().into();
|
||||
let chain_node_count = chain.node_count();
|
||||
let chain_genesis_hash = chain.genesis_hash();
|
||||
|
||||
// Is the chain empty? Remove if so and clean up indexes to it
|
||||
if chain_node_count == 0 {
|
||||
let genesis_hash = *chain.genesis_hash();
|
||||
self.chains_by_label.remove(&old_chain_label);
|
||||
let genesis_hash = chain.genesis_hash();
|
||||
self.chains_by_genesis_hash.remove(&genesis_hash);
|
||||
self.chains.remove(chain_id);
|
||||
}
|
||||
|
||||
// Make sure chains always referenced by their most common label:
|
||||
if remove_result.chain_renamed {
|
||||
self.chains_by_label.remove(&old_chain_label);
|
||||
self.chains_by_label
|
||||
.insert(new_chain_label.clone(), chain_id);
|
||||
}
|
||||
|
||||
Some(RemovedNode {
|
||||
old_chain_label,
|
||||
new_chain_label,
|
||||
chain_node_count: chain_node_count,
|
||||
chain_node_count,
|
||||
chain_genesis_hash,
|
||||
has_chain_label_changed: remove_result.chain_renamed,
|
||||
})
|
||||
}
|
||||
@@ -268,7 +248,7 @@ impl<'a> StateChain<'a> {
|
||||
pub fn label(&self) -> &'a str {
|
||||
self.chain.label()
|
||||
}
|
||||
pub fn genesis_hash(&self) -> &'a BlockHash {
|
||||
pub fn genesis_hash(&self) -> BlockHash {
|
||||
self.chain.genesis_hash()
|
||||
}
|
||||
pub fn node_count(&self) -> usize {
|
||||
@@ -358,7 +338,6 @@ mod test {
|
||||
.label(),
|
||||
"Chain One"
|
||||
);
|
||||
assert!(state.get_chain_by_label("Chain One").is_some());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
|
||||
|
||||
let node_id1 = state
|
||||
@@ -373,7 +352,6 @@ mod test {
|
||||
.label(),
|
||||
"Chain One"
|
||||
);
|
||||
assert!(state.get_chain_by_label("Chain One").is_some());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
|
||||
|
||||
let node_id2 = state
|
||||
@@ -388,8 +366,6 @@ mod test {
|
||||
.label(),
|
||||
"Chain Two"
|
||||
);
|
||||
assert!(state.get_chain_by_label("Chain One").is_none());
|
||||
assert!(state.get_chain_by_label("Chain Two").is_some());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
|
||||
|
||||
state.remove_node(node_id1).expect("Removal OK (id: 1)");
|
||||
@@ -403,8 +379,6 @@ mod test {
|
||||
.label(),
|
||||
"Chain One"
|
||||
);
|
||||
assert!(state.get_chain_by_label("Chain One").is_some());
|
||||
assert!(state.get_chain_by_label("Chain Two").is_none());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
|
||||
}
|
||||
|
||||
@@ -417,13 +391,11 @@ mod test {
|
||||
.add_node(chain1_genesis, node("A", "Chain One")) // 0
|
||||
.unwrap_id();
|
||||
|
||||
assert!(state.get_chain_by_label("Chain One").is_some());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
|
||||
assert_eq!(state.iter_chains().count(), 1);
|
||||
|
||||
state.remove_node(node_id);
|
||||
|
||||
assert!(state.get_chain_by_label("Chain One").is_none());
|
||||
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_none());
|
||||
assert_eq!(state.iter_chains().count(), 0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user