From 19ef458e5be8e84c60fb474bbf515f53ce2fc7a7 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 21 Jun 2021 16:27:42 +0100 Subject: [PATCH] Allow multiple SystemConnects to be handled from a single node in the shard --- backend/common/src/assign_id.rs | 4 ++ backend/common/src/internal_messages.rs | 4 ++ backend/shard/src/aggregator.rs | 40 ++++++++++---- backend/shard/src/main.rs | 72 ++++++++++++------------- backend/telemetry/src/aggregator.rs | 56 ++++++++++++++----- backend/telemetry/src/main.rs | 3 ++ 6 files changed, 122 insertions(+), 57 deletions(-) diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index ed60d78..a85db06 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -69,4 +69,8 @@ impl
AssignId
where Details: Eq + Hash + Clone { pub fn clear(&mut self) { *self = AssignId::new() } + + pub fn iter(&self) -> impl Iterator { + self.from_id.iter().map(|(id, details)| (*id, details)) + } } \ No newline at end of file diff --git a/backend/common/src/internal_messages.rs b/backend/common/src/internal_messages.rs index 1aebab0..1024b65 100644 --- a/backend/common/src/internal_messages.rs +++ b/backend/common/src/internal_messages.rs @@ -26,6 +26,10 @@ pub enum FromShardAggregator { local_id: LocalId, payload: Payload, }, + /// Inform the core that a node has been removed + RemoveNode { + local_id: LocalId + } } /// Message sent form the backend core to the shard diff --git a/backend/shard/src/aggregator.rs b/backend/shard/src/aggregator.rs index 5c80a87..e54234b 100644 --- a/backend/shard/src/aggregator.rs +++ b/backend/shard/src/aggregator.rs @@ -29,22 +29,27 @@ enum ToAggregator { /// messages from it will be ignored. #[derive(Clone,Debug)] pub enum FromWebsocket { - /// Tell the aggregator about a new node. - Add { - message_id: node::NodeMessageId, - ip: Option, - node: common::types::NodeDetails, + /// Fire this when the connection is established. + Initialize { /// When a message is sent back up this channel, we terminate /// the websocket connection and force the node to reconnect /// so that it sends its system info again incase the telemetry /// core has restarted. close_connection: mpsc::Sender<()> }, + /// Tell the aggregator about a new node. + Add { + message_id: node::NodeMessageId, + ip: Option, + node: common::types::NodeDetails, + }, /// Update/pass through details about a node. Update { message_id: node::NodeMessageId, payload: node::Payload - } + }, + /// Make a note when the node disconnects. 
+ Disconnected } pub type FromAggregator = internal_messages::FromShardAggregator; @@ -139,10 +144,13 @@ impl Aggregator { connected_to_telemetry_core = false; log::info!("Disconnected from telemetry core"); }, - ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, close_connection }) => { - // Keep the close_connection channel incase we need it: + ToAggregator::FromWebsocket(_conn_id, FromWebsocket::Initialize { close_connection }) => { + // We boot all connections on a reconnect-to-core to force new systemconnected + // messages to be sent. We could boot on muting, but need to be careful not to boot + // connections where we mute one set of messages it sends and not others. close_connections.push(close_connection); - + }, + ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node }) => { // Don't bother doing anything else if we're disconnected, since we'll force the // ndoe to reconnect anyway when the backend does: if !connected_to_telemetry_core { continue } @@ -178,6 +186,20 @@ impl Aggregator { payload }).await; }, + ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { + // Find all of the local IDs corresponding to the disconnected connection ID and + // remove them, telling Telemetry Core about them too. This could be more efficient, + // but the mapping isn't currently cached and it's not a super frequent op. 
+ let local_ids_disconnected: Vec<_> = to_local_id.iter() + .filter(|(_, &(conn_id, _))| disconnected_conn_id == conn_id) + .map(|(local_id, _)| local_id) + .collect(); + + for local_id in local_ids_disconnected { + to_local_id.remove_by_id(local_id); + let _ = tx_to_telemetry_core.send(FromShardAggregator::RemoveNode { local_id }).await; + } + }, ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { local_id }) => { // Ignore incoming messages if we're not connected to the backend: if !connected_to_telemetry_core { continue } diff --git a/backend/shard/src/main.rs b/backend/shard/src/main.rs index 1f7c374..b1ae7b7 100644 --- a/backend/shard/src/main.rs +++ b/backend/shard/src/main.rs @@ -89,8 +89,13 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { let tx_to_aggregator = aggregator.subscribe_node(); log::info!("Opening /submit connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { - handle_websocket_connection(websocket, tx_to_aggregator, addr).await; + let (mut tx_to_aggregator, websocket) = handle_websocket_connection(websocket, tx_to_aggregator, addr).await; log::info!("Closing /submit connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; + // Note: IF we want to close with a status code and reason, we need to construct + // a ws::Message using `ws::Message::close_with`, rather than using this method: + let _ = websocket.close().await; }) }); @@ -101,53 +106,38 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This takes care of handling messages from an established socket connection. 
-async fn handle_websocket_connection(websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option) +async fn handle_websocket_connection(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option) -> (S, ws::WebSocket) where S: futures::Sink + Unpin { - let mut websocket = websocket.fuse(); - // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0); - // First, we wait until we receive a SystemConnected message. - // Until this turns up, we ignore other messages. We could buffer - // a few quite easily if we liked. - while let Some(msg) = websocket.next().await { - let node_message = match deserialize_ws_message(msg) { - Ok(Some(msg)) => msg, - Ok(None) => continue, - Err(e) => { log::error!("{}", e); break } - }; - - let message_id = node_message.id(); - let payload = node_message.into_payload(); - - if let node::Payload::SystemConnected(info) = payload { - let _ = tx_to_aggregator.send(FromWebsocket::Add { - message_id, - ip: addr.map(|a| a.ip()), - node: info.node, - close_connection: close_connection_tx, - }).await; - break; - } + // Tell the aggregator about this new connection, and give it a way to close this connection: + let init_msg = FromWebsocket::Initialize { + close_connection: close_connection_tx + }; + if let Err(e) = tx_to_aggregator.send(init_msg).await { + log::error!("Error sending message to aggregator: {}", e); + return (tx_to_aggregator, websocket); } - // Now, the node has been added, so we forward messages along as updates. - // We keep an eye on the close_connection channel; if that resolves, then - // end this loop and let the connection close gracefully. + // Now we've "initialized", wait for messages from the node. 
Messages will + // either be `SystemConnected` type messages that inform us that a new set + // of messages with some message ID will be sent (a node could have more + // than one of these), or updates linked to a specific message_id. loop { - futures::select_biased! { + tokio::select! { // The close channel has fired, so end the loop: _ = close_connection_rx.next() => { + log::info!("connection to {:?} being closed by aggregator", addr); break }, // A message was received; handle it: msg = websocket.next() => { let msg = match msg { Some(msg) => msg, - None => break + None => { log::warn!("Websocket connection from {:?} closed", addr); break } }; let node_message = match deserialize_ws_message(msg) { @@ -159,7 +149,19 @@ async fn handle_websocket_connection(websocket: ws::WebSocket, mut tx_to_aggr let message_id = node_message.id(); let payload = node_message.into_payload(); - if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { + // Until the aggregator receives an `Add` message, which we can create once + // we see one of these SystemConnected ones, it will ignore messages with + // the corresponding message_id. + if let node::Payload::SystemConnected(info) = payload { + let _ = tx_to_aggregator.send(FromWebsocket::Add { + message_id, + ip: addr.map(|a| a.ip()), + node: info.node, + }).await; + } + // Anything that's not an "Add" is an Update. The aggregator will ignore + // updates against a message_id that hasn't first been Added, above. + else if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { log::error!("Failed to send node message to aggregator: {}", e); continue; } @@ -167,10 +169,8 @@ async fn handle_websocket_connection(websocket: ws::WebSocket, mut tx_to_aggr } } - // loops ended; attempt to close the connection gracefully. 
- // Note: IF we want to close with a status code and reason, we need to construct - // a ws::Message using `ws::Message::close_with`, rather than using this method: - let _ = websocket.close().await; + // Return what we need to close the connection gracefully: + (tx_to_aggregator, websocket) } /// Deserialize an incoming websocket message, returning an error if something diff --git a/backend/telemetry/src/aggregator.rs b/backend/telemetry/src/aggregator.rs index 7f7c22c..c750bc4 100644 --- a/backend/telemetry/src/aggregator.rs +++ b/backend/telemetry/src/aggregator.rs @@ -38,6 +38,10 @@ pub enum FromShardWebsocket { Update { local_id: LocalId, payload: node::Payload + }, + /// Tell the aggregator that a node has been removed when it disconnects. + Remove { + local_id: LocalId, } } @@ -138,44 +142,72 @@ impl Aggregator { // any more, this task will gracefully end. async fn handle_messages(mut rx_from_external: mpsc::Receiver, denylist: Vec) { - let mut nodes_state = State::new(); + let mut node_state = State::new(); // Maintain mappings from the shard connection ID and local ID of messages to a global ID // that uniquely identifies nodes in our node state. - let mut to_global_id = AssignId::new(); + let mut to_global_node_id = AssignId::new(); - // Temporary: if we drop channels to shards, they will be booted: - let mut to_shards = vec![]; + // Keep track of channels to communicate with feeds and shards: + let mut feed_channels = HashMap::new(); + let mut shard_channels = HashMap::new(); + + // Which chains our feeds have subscribed to (one at a time at the moment): + let mut feed_conn_id_to_chain: HashMap> = HashMap::new(); + let mut chain_to_feed_conn_ids: HashMap, HashSet> = HashMap::new(); + let mut feed_conn_id_finality: HashSet = HashSet::new(); // Now, loop and receive messages to handle. 
while let Some(msg) = rx_from_external.next().await { match msg { ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { channel }) => { + feed_channels.insert(feed_conn_id, channel); + // TODO: `feed::AddedChain` message to tell feed about current chains. }, ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => { - + // TODO: Return with feed::Pong(chain) feed message. }, ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => { + // Unsubscribe from previous chain if subscribed to one (NOTE(review): this looks up the new `chain`, not the feed's previous entry in `feed_conn_id_to_chain` - confirm): + if let Some(feed_ids) = chain_to_feed_conn_ids.get_mut(&chain) { + feed_ids.remove(&feed_conn_id); + } + + // Subscribe to the new chain: + feed_conn_id_to_chain.insert(feed_conn_id, chain.clone()); + chain_to_feed_conn_ids.entry(chain).or_default().insert(feed_conn_id); }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality { chain }) => { - + ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality { chain: _ }) => { + feed_conn_id_finality.insert(feed_conn_id); + // TODO: Do we care about the chain here? }, - ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality { chain }) => { - + ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality { chain: _ }) => { + feed_conn_id_finality.remove(&feed_conn_id); + // TODO: Do we care about the chain here? }, ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => { - to_shards.push(channel); + shard_channels.insert(shard_conn_id, channel); }, ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node }) => { - let global_id = to_global_id.assign_id((shard_conn_id, local_id)); + let global_node_id = to_global_node_id.assign_id((shard_conn_id, local_id)); + + // TODO: node_state.add_node. Every feed should know about node count changes. 
+ }, + ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Remove { local_id }) => { + println!("Removed node! {:?}", local_id); + if let Some(id) = to_global_node_id.remove_by_details(&(shard_conn_id, local_id)) { + // TODO: node_state.remove_node, Every feed should know about node count changes. + } }, ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => { - let global_id = match to_global_id.get_id(&(shard_conn_id, local_id)) { + let global_node_id = match to_global_node_id.get_id(&(shard_conn_id, local_id)) { Some(id) => id, None => continue }; + + // TODO: node_state.update_node, then handle returned diffs }, } } diff --git a/backend/telemetry/src/main.rs b/backend/telemetry/src/main.rs index de8f0f5..cefa094 100644 --- a/backend/telemetry/src/main.rs +++ b/backend/telemetry/src/main.rs @@ -199,6 +199,9 @@ async fn handle_shard_websocket_connection(mut websocket: ws::WebSocket, mut internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => { FromShardWebsocket::Update { local_id, payload } }, + internal_messages::FromShardAggregator::RemoveNode { local_id } => { + FromShardWebsocket::Remove { local_id } + }, }; if let Err(e) = tx_to_aggregator.send(aggregator_msg).await { log::error!("Failed to send message to aggregator; closing shard: {}", e);