Allow multiple SystemConnects to be handled from a single node in the shard

This commit is contained in:
James Wilson
2021-06-21 16:27:42 +01:00
parent 06d131bf3f
commit 19ef458e5b
6 changed files with 122 additions and 57 deletions
+4
View File
@@ -69,4 +69,8 @@ impl <Details> AssignId<Details> where Details: Eq + Hash + Clone {
pub fn clear(&mut self) {
*self = AssignId::new()
}
pub fn iter(&self) -> impl Iterator<Item = (Id, &Details)> {
self.from_id.iter().map(|(id, details)| (*id, details))
}
}
+4
View File
@@ -26,6 +26,10 @@ pub enum FromShardAggregator {
local_id: LocalId,
payload: Payload,
},
/// Inform the core that a node has been removed
RemoveNode {
local_id: LocalId
}
}
/// Message sent form the backend core to the shard
+31 -9
View File
@@ -29,22 +29,27 @@ enum ToAggregator {
/// messages from it will be ignored.
#[derive(Clone,Debug)]
pub enum FromWebsocket {
/// Tell the aggregator about a new node.
Add {
message_id: node::NodeMessageId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
/// Fire this when the connection is established.
Initialize {
/// When a message is sent back up this channel, we terminate
/// the websocket connection and force the node to reconnect
/// so that it sends its system info again incase the telemetry
/// core has restarted.
close_connection: mpsc::Sender<()>
},
/// Tell the aggregator about a new node.
Add {
message_id: node::NodeMessageId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
},
/// Update/pass through details about a node.
Update {
message_id: node::NodeMessageId,
payload: node::Payload
}
},
/// Make a note when the node disconnects.
Disconnected
}
pub type FromAggregator = internal_messages::FromShardAggregator;
@@ -139,10 +144,13 @@ impl Aggregator {
connected_to_telemetry_core = false;
log::info!("Disconnected from telemetry core");
},
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, close_connection }) => {
// Keep the close_connection channel incase we need it:
ToAggregator::FromWebsocket(_conn_id, FromWebsocket::Initialize { close_connection }) => {
// We boot all connections on a reconnect-to-core to force new systemconnected
// messages to be sent. We could boot on muting, but need to be careful not to boot
// connections where we mute one set of messages it sends and not others.
close_connections.push(close_connection);
},
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node }) => {
// Don't bother doing anything else if we're disconnected, since we'll force the
// ndoe to reconnect anyway when the backend does:
if !connected_to_telemetry_core { continue }
@@ -178,6 +186,20 @@ impl Aggregator {
payload
}).await;
},
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
// Find all of the local IDs corresponding to the disconnected connection ID and
// remove them, telling Telemetry Core about them too. This could be more efficient,
// but the mapping isn't currently cached and it's not a super frequent op.
let local_ids_disconnected: Vec<_> = to_local_id.iter()
.filter(|(_, &(conn_id, _))| disconnected_conn_id == conn_id)
.map(|(local_id, _)| local_id)
.collect();
for local_id in local_ids_disconnected {
to_local_id.remove_by_id(local_id);
let _ = tx_to_telemetry_core.send(FromShardAggregator::RemoveNode { local_id }).await;
}
},
ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute { local_id }) => {
// Ignore incoming messages if we're not connected to the backend:
if !connected_to_telemetry_core { continue }
+36 -36
View File
@@ -89,8 +89,13 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
let tx_to_aggregator = aggregator.subscribe_node();
log::info!("Opening /submit connection from {:?}", addr);
ws.on_upgrade(move |websocket| async move {
handle_websocket_connection(websocket, tx_to_aggregator, addr).await;
let (mut tx_to_aggregator, websocket) = handle_websocket_connection(websocket, tx_to_aggregator, addr).await;
log::info!("Closing /submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
// Note: IF we want to close with a status code and reason, we need to construct
// a ws::Message using `ws::Message::close_with`, rather than using this method:
let _ = websocket.close().await;
})
});
@@ -101,53 +106,38 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
}
/// This takes care of handling messages from an established socket connection.
async fn handle_websocket_connection<S>(websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<SocketAddr>)
async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option<SocketAddr>) -> (S, ws::WebSocket)
where S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin
{
let mut websocket = websocket.fuse();
// This could be a oneshot channel, but it's useful to be able to clone
// messages, and we can't clone oneshot channel senders.
let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0);
// First, we wait until we receive a SystemConnected message.
// Until this turns up, we ignore other messages. We could buffer
// a few quite easily if we liked.
while let Some(msg) = websocket.next().await {
let node_message = match deserialize_ws_message(msg) {
Ok(Some(msg)) => msg,
Ok(None) => continue,
Err(e) => { log::error!("{}", e); break }
};
let message_id = node_message.id();
let payload = node_message.into_payload();
if let node::Payload::SystemConnected(info) = payload {
let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id,
ip: addr.map(|a| a.ip()),
node: info.node,
close_connection: close_connection_tx,
}).await;
break;
}
// Tell the aggregator about this new connection, and give it a way to close this connection:
let init_msg = FromWebsocket::Initialize {
close_connection: close_connection_tx
};
if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e);
return (tx_to_aggregator, websocket);
}
// Now, the node has been added, so we forward messages along as updates.
// We keep an eye on the close_connection channel; if that resolves, then
// end this loop and let the connection close gracefully.
// Now we've "initialized", wait for messages from the node. Messages will
// either be `SystemConnected` type messages that inform us that a new set
// of messages with some message ID will be sent (a node could have more
// than one of these), or updates linked to a specific message_id.
loop {
futures::select_biased! {
tokio::select! {
// The close channel has fired, so end the loop:
_ = close_connection_rx.next() => {
log::info!("connection to {:?} being closed by aggregator", addr);
break
},
// A message was received; handle it:
msg = websocket.next() => {
let msg = match msg {
Some(msg) => msg,
None => break
None => { log::warn!("Websocket connection from {:?} closed", addr); break }
};
let node_message = match deserialize_ws_message(msg) {
@@ -159,7 +149,19 @@ async fn handle_websocket_connection<S>(websocket: ws::WebSocket, mut tx_to_aggr
let message_id = node_message.id();
let payload = node_message.into_payload();
if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
// Until the aggregator receives an `Add` message, which we can create once
// we see one of these SystemConnected ones, it will ignore messages with
// the corresponding message_id.
if let node::Payload::SystemConnected(info) = payload {
let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id,
ip: addr.map(|a| a.ip()),
node: info.node,
}).await;
}
// Anything that's not an "Add" is an Update. The aggregator will ignore
// updates against a message_id that hasn't first been Added, above.
else if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
log::error!("Failed to send node message to aggregator: {}", e);
continue;
}
@@ -167,10 +169,8 @@ async fn handle_websocket_connection<S>(websocket: ws::WebSocket, mut tx_to_aggr
}
}
// loops ended; attempt to close the connection gracefully.
// Note: IF we want to close with a status code and reason, we need to construct
// a ws::Message using `ws::Message::close_with`, rather than using this method:
let _ = websocket.close().await;
// Return what we need to close the connection gracefully:
(tx_to_aggregator, websocket)
}
/// Deserialize an incoming websocket message, returning an error if something
+44 -12
View File
@@ -38,6 +38,10 @@ pub enum FromShardWebsocket {
Update {
local_id: LocalId,
payload: node::Payload
},
/// Tell the aggregator that a node has been removed when it disconnects.
Remove {
local_id: LocalId,
}
}
@@ -138,44 +142,72 @@ impl Aggregator {
// any more, this task will gracefully end.
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, denylist: Vec<String>) {
let mut nodes_state = State::new();
let mut node_state = State::new();
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
// that uniquely identifies nodes in our node state.
let mut to_global_id = AssignId::new();
let mut to_global_node_id = AssignId::new();
// Temporary: if we drop channels to shards, they will be booted:
let mut to_shards = vec![];
// Keep track of channels to communicate with feeds and shards:
let mut feed_channels = HashMap::new();
let mut shard_channels = HashMap::new();
// What chains have aour feeds subscribed to (one at a time at the mo):
let mut feed_conn_id_to_chain: HashMap<ConnId, Box<str>> = HashMap::new();
let mut chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>> = HashMap::new();
let mut feed_conn_id_finality: HashSet<ConnId> = HashSet::new();
// Now, loop and receive messages to handle.
while let Some(msg) = rx_from_external.next().await {
match msg {
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { channel }) => {
feed_channels.insert(feed_conn_id, channel);
// TODO: `feed::AddedChain` message to tell feed about current chains.
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => {
// TODO: Return with feed::Pong(chain) feed message.
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => {
// Unsubscribe from previous chain if subscribed to one:
if let Some(feed_ids) = chain_to_feed_conn_ids.get_mut(&chain) {
feed_ids.remove(&feed_conn_id);
}
// Subscribe to the new chain:
feed_conn_id_to_chain.insert(feed_conn_id, chain.clone());
chain_to_feed_conn_ids.entry(chain).or_default().insert(feed_conn_id);
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality { chain }) => {
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality { chain: _ }) => {
feed_conn_id_finality.insert(feed_conn_id);
// TODO: Do we care about the chain here?
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality { chain }) => {
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality { chain: _ }) => {
feed_conn_id_finality.remove(&feed_conn_id);
// TODO: Do we care about the chain here?
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => {
to_shards.push(channel);
shard_channels.insert(shard_conn_id, channel);
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node }) => {
let global_id = to_global_id.assign_id((shard_conn_id, local_id));
let global_node_id = to_global_node_id.assign_id((shard_conn_id, local_id));
// TODO: node_state.add_node. Every feed should know about node count changes.
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Remove { local_id }) => {
println!("Removed node! {:?}", local_id);
if let Some(id) = to_global_node_id.remove_by_details(&(shard_conn_id, local_id)) {
// TODO: node_state.remove_node, Every feed should know about node count changes.
}
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
let global_id = match to_global_id.get_id(&(shard_conn_id, local_id)) {
let global_node_id = match to_global_node_id.get_id(&(shard_conn_id, local_id)) {
Some(id) => id,
None => continue
};
// TODO: node_state.update_node, then handle returned diffs
},
}
}
+3
View File
@@ -199,6 +199,9 @@ async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut
internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
FromShardWebsocket::Update { local_id, payload }
},
internal_messages::FromShardAggregator::RemoveNode { local_id } => {
FromShardWebsocket::Remove { local_id }
},
};
if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e);