mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-05-29 23:31:12 +00:00
feed/shard disconnects can be handled, and unbounded output to feeds
This commit is contained in:
@@ -49,7 +49,9 @@ pub enum FromShardWebsocket {
|
||||
/// Tell the aggregator that a node has been removed when it disconnects.
|
||||
Remove {
|
||||
local_id: LocalId,
|
||||
}
|
||||
},
|
||||
/// The shard is disconnected.
|
||||
Disconnected
|
||||
}
|
||||
|
||||
/// The aggregator can these messages back to a shard connection.
|
||||
@@ -66,8 +68,10 @@ pub enum ToShardWebsocket {
|
||||
pub enum FromFeedWebsocket {
|
||||
/// When the socket is opened, it'll send this first
|
||||
/// so that we have a way to communicate back to it.
|
||||
/// Unbounded so that slow feeds don't block aggregato
|
||||
/// progress.
|
||||
Initialize {
|
||||
channel: mpsc::Sender<ToFeedWebsocket>,
|
||||
channel: mpsc::UnboundedSender<ToFeedWebsocket>,
|
||||
},
|
||||
/// The feed can subscribe to a chain to receive
|
||||
/// messages relating to it.
|
||||
@@ -81,7 +85,9 @@ pub enum FromFeedWebsocket {
|
||||
/// An explicit ping message.
|
||||
Ping {
|
||||
chain: Box<str>
|
||||
}
|
||||
},
|
||||
/// The feed is disconnected.
|
||||
Disconnected
|
||||
}
|
||||
|
||||
// The frontend sends text based commands; parse them into these messages:
|
||||
@@ -166,6 +172,7 @@ impl Aggregator {
|
||||
// Now, loop and receive messages to handle.
|
||||
while let Some(msg) = rx_from_external.next().await {
|
||||
match msg {
|
||||
// FROM FEED
|
||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { mut channel }) => {
|
||||
feed_channels.insert(feed_conn_id, channel.clone());
|
||||
|
||||
@@ -268,6 +275,16 @@ impl Aggregator {
|
||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality) => {
|
||||
feed_conn_id_finality.remove(&feed_conn_id);
|
||||
},
|
||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Disconnected) => {
|
||||
// The feed has disconnected; clean up references to it:
|
||||
if let Some(chain) = feed_conn_id_to_chain.remove(&feed_conn_id) {
|
||||
chain_to_feed_conn_ids.remove(&chain);
|
||||
}
|
||||
feed_channels.remove(&feed_conn_id);
|
||||
feed_conn_id_finality.remove(&feed_conn_id);
|
||||
},
|
||||
|
||||
// FROM SHARD
|
||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => {
|
||||
shard_channels.insert(shard_conn_id, channel);
|
||||
},
|
||||
@@ -325,6 +342,10 @@ impl Aggregator {
|
||||
|
||||
// TODO: node_state.update_node, then handle returned diffs
|
||||
},
|
||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Disconnected) => {
|
||||
// The shard has disconnected; remove the shard channel, but also
|
||||
// remove any nodes associated with the shard, firing the relevant feed messages.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,8 +91,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
let tx_to_aggregator = shard_aggregator.subscribe_shard();
|
||||
log::info!("Opening /shard_submit connection from {:?}", addr);
|
||||
ws.on_upgrade(move |websocket| async move {
|
||||
let websocket = handle_shard_websocket_connection(websocket, tx_to_aggregator).await;
|
||||
let (mut tx_to_aggregator, websocket) = handle_shard_websocket_connection(websocket, tx_to_aggregator).await;
|
||||
log::info!("Closing /shard_submit connection from {:?}", addr);
|
||||
// Tell the aggregator that this connection has closed, so it can tidy up.
|
||||
let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await;
|
||||
let _ = websocket.close().await;
|
||||
})
|
||||
});
|
||||
@@ -106,8 +108,10 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
let tx_to_aggregator = feed_aggregator.subscribe_feed();
|
||||
log::info!("Opening /feed connection from {:?}", addr);
|
||||
ws.on_upgrade(move |websocket| async move {
|
||||
let websocket = handle_feed_websocket_connection(websocket, tx_to_aggregator).await;
|
||||
let (mut tx_to_aggregator, websocket) = handle_feed_websocket_connection(websocket, tx_to_aggregator).await;
|
||||
log::info!("Closing /feed connection from {:?}", addr);
|
||||
// Tell the aggregator that this connection has closed, so it can tidy up.
|
||||
let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
|
||||
let _ = websocket.close().await;
|
||||
})
|
||||
});
|
||||
@@ -121,7 +125,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
/// This handles messages coming to/from a shard connection
|
||||
async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> ws::WebSocket
|
||||
async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket)
|
||||
where S: futures::Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin
|
||||
{
|
||||
let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::channel(10);
|
||||
@@ -132,7 +136,7 @@ async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut
|
||||
};
|
||||
if let Err(e) = tx_to_aggregator.send(init_msg).await {
|
||||
log::error!("Error sending message to aggregator: {}", e);
|
||||
return websocket;
|
||||
return (tx_to_aggregator, websocket);
|
||||
}
|
||||
|
||||
// Loop, handling new messages from the shard or from the aggregator:
|
||||
@@ -213,14 +217,15 @@ async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut
|
||||
}
|
||||
|
||||
// loop ended; give socket back to parent:
|
||||
websocket
|
||||
(tx_to_aggregator, websocket)
|
||||
}
|
||||
|
||||
/// This handles messages coming from a feed connection
|
||||
async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> ws::WebSocket
|
||||
async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_aggregator: S) -> (S, ws::WebSocket)
|
||||
where S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin
|
||||
{
|
||||
let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::channel(10);
|
||||
// unbounded channel so that slow feeds don't block aggregator progress:
|
||||
let (tx_to_feed_conn, mut rx_from_aggregator) = mpsc::unbounded();
|
||||
|
||||
// Tell the aggregator about this new connection, and give it a way to send messages to us:
|
||||
let init_msg = FromFeedWebsocket::Initialize {
|
||||
@@ -228,7 +233,7 @@ async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut t
|
||||
};
|
||||
if let Err(e) = tx_to_aggregator.send(init_msg).await {
|
||||
log::error!("Error sending message to aggregator: {}", e);
|
||||
return websocket;
|
||||
return (tx_to_aggregator, websocket);
|
||||
}
|
||||
|
||||
// Loop, handling new messages from the shard or from the aggregator:
|
||||
@@ -290,5 +295,5 @@ async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut t
|
||||
}
|
||||
|
||||
// loop ended; give socket back to parent:
|
||||
websocket
|
||||
(tx_to_aggregator, websocket)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user