mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-11 10:41:11 +00:00
fix(shard): Ignore duplicate node adds with same message ID (#514)
* fix(shard): Ignore duplicate node adds with same message ID * adapt the test
This commit is contained in:
@@ -260,6 +260,70 @@ async fn e2e_feed_add_and_remove_node() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
/// If a node is added, a connecting feed should be told about the new chain.
|
||||
/// However, sending a duplicate "system.connected" message from the same node
|
||||
/// should not count as a new node but rather the second message should be ignored.
|
||||
/// If the node is removed, the feed should be told that the chain has gone.
|
||||
#[tokio::test]
|
||||
async fn e2e_feed_ignore_duplicate_nodes() {
|
||||
// Connect server and add shard
|
||||
let mut server = start_server_debug().await;
|
||||
let shard_id = server.add_shard().await.unwrap();
|
||||
|
||||
// Connect a node to the shard:
|
||||
let connect = || server.get_shard(shard_id).unwrap().connect_node();
|
||||
|
||||
let (mut node_tx, _) = connect().await.expect("can connect to shard");
|
||||
|
||||
let system_connected = json!(
|
||||
{
|
||||
"id":1,
|
||||
"ts":"2021-07-12T10:37:47.714666+01:00",
|
||||
"payload": {
|
||||
"authority":true,
|
||||
"chain":"Local Testnet",
|
||||
"config":"",
|
||||
"genesis_hash": ghash(1),
|
||||
"implementation":"Substrate Node",
|
||||
"msg":"system.connected",
|
||||
"name":"Alice",
|
||||
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
|
||||
"startup_time":"1625565542717",
|
||||
"version":"2.0.0-07a1af348-aarch64-macos"
|
||||
},
|
||||
}
|
||||
);
|
||||
// Send a "system connected" message:
|
||||
node_tx.send_json_text(system_connected.clone()).unwrap();
|
||||
// ...twice!
|
||||
node_tx.send_json_text(system_connected.clone()).unwrap();
|
||||
|
||||
// Wait a little for this message to propagate to the core
|
||||
// (so that our feed connects after the core knows and not before).
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
// Connect a feed.
|
||||
let (_feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap();
|
||||
|
||||
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
|
||||
assert!(feed_messages.contains(&FeedMessage::AddedChain {
|
||||
name: "Local Testnet".to_owned(),
|
||||
genesis_hash: ghash(1),
|
||||
node_count: 1,
|
||||
}));
|
||||
|
||||
// Disconnect the node:
|
||||
node_tx.close().await.unwrap();
|
||||
|
||||
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
|
||||
assert!(feed_messages.contains(&FeedMessage::RemovedChain {
|
||||
genesis_hash: ghash(1),
|
||||
}));
|
||||
|
||||
// Tidy up:
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
/// If nodes connect and the chain name changes, feeds will be told about this
|
||||
/// and will keep receiving messages about the renamed chain (despite subscribing
|
||||
/// to it by name).
|
||||
|
||||
@@ -352,7 +352,11 @@ where
|
||||
}
|
||||
|
||||
// Note of the message ID, allowing telemetry for it.
|
||||
allowed_message_ids.insert(message_id, Instant::now());
|
||||
let prev_join_time = allowed_message_ids.insert(message_id, Instant::now());
|
||||
if prev_join_time.is_some() {
|
||||
log::info!("Ignoring duplicate new node with ID {message_id} from {real_addr:?}");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Tell the aggregator loop about the new node.
|
||||
log::info!("Adding node with message ID {message_id} from {real_addr:?}");
|
||||
|
||||
Reference in New Issue
Block a user