diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index a9e27ca..5248321 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -260,6 +260,70 @@ async fn e2e_feed_add_and_remove_node() { server.shutdown().await; } +/// If a node is added, a connecting feed should be told about the new chain. +/// However, sending a duplicate "system.connected" message from the same node +/// should not count as a new node but rather the second message should be ignored. +/// If the node is removed, the feed should be told that the chain has gone. +#[tokio::test] +async fn e2e_feed_ignore_duplicate_nodes() { + // Connect server and add shard + let mut server = start_server_debug().await; + let shard_id = server.add_shard().await.unwrap(); + + // Connect a node to the shard: + let connect = || server.get_shard(shard_id).unwrap().connect_node(); + + let (mut node_tx, _) = connect().await.expect("can connect to shard"); + + let system_connected = json!( + { + "id":1, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash": ghash(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + ); + // Send a "system connected" message: + node_tx.send_json_text(system_connected.clone()).unwrap(); + // ...twice! + node_tx.send_json_text(system_connected.clone()).unwrap(); + + // Wait a little for this message to propagate to the core + // (so that our feed connects after the core knows and not before). + tokio::time::sleep(Duration::from_millis(500)).await; + + // Connect a feed. + let (_feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains(&FeedMessage::AddedChain { + name: "Local Testnet".to_owned(), + genesis_hash: ghash(1), + node_count: 1, + })); + + // Disconnect the node: + node_tx.close().await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains(&FeedMessage::RemovedChain { + genesis_hash: ghash(1), + })); + + // Tidy up: + server.shutdown().await; +} + /// If nodes connect and the chain name changes, feeds will be told about this /// and will keep receiving messages about the renamed chain (despite subscribing /// to it by name). diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 45aa104..3b30728 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -352,7 +352,11 @@ where } // Note of the message ID, allowing telemetry for it. - allowed_message_ids.insert(message_id, Instant::now()); + let prev_join_time = allowed_message_ids.insert(message_id, Instant::now()); + if prev_join_time.is_some() { + log::info!("Ignoring duplicate new node with ID {message_id} from {real_addr:?}"); + continue; + } // Tell the aggregator loop about the new node. log::info!("Adding node with message ID {message_id} from {real_addr:?}");