Expose node's IP address via feed (#491)

* Fix typos

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Extend `feed::LocatedNode` message with optional IP address

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Expose IpAddr from locator task

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Expose CLI flag to handle IP

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Consider Option<String> for IP address

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Add node's IP directly to the Node's details

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust testing and serialize node's ip address

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* frontend: Propagate IP address for deserialization purposes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* core: Clarify the CLI flag documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-08-17 15:35:36 +03:00
committed by GitHub
parent ad21d0cff0
commit 09b44ad00f
14 changed files with 46 additions and 39 deletions
+1 -1
View File
@@ -76,7 +76,7 @@ where
);
}
// Just a little ceremony we need to go to to return the correct response key:
// Just a little ceremony to return the correct response key:
let mut accept_key_buf = [0; 32];
let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf);
+3 -2
View File
@@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! This is the internal represenation of telemetry messages sent from nodes.
//! This is the internal representation of telemetry messages sent from nodes.
//! There is a separate JSON representation of these types, because internally we want to be
//! able to serialize these messages to bincode, and various serde attribtues aren't compatible
//! able to serialize these messages to bincode, and various serde attributes aren't compatible
//! with this, hence this separate internal representation.
use crate::node_types::{Block, BlockHash, BlockNumber, NodeDetails};
@@ -159,6 +159,7 @@ mod tests {
network_id: ArrayString::new(),
startup_time: None,
sysinfo: None,
ip: Some("127.0.0.1".into()),
},
}),
});
+1
View File
@@ -42,6 +42,7 @@ pub struct NodeDetails {
pub target_arch: Option<Box<str>>,
pub target_env: Option<Box<str>>,
pub sysinfo: Option<NodeSysInfo>,
pub ip: Option<Box<str>>,
}
/// Hardware and software information for the node.
@@ -44,6 +44,8 @@ pub struct AggregatorOpts {
/// How many nodes from third party chains are allowed to connect
/// before we prevent connections from them.
pub max_third_party_nodes: usize,
/// Flag to expose the IP addresses of all connected nodes to the feed subscribers.
pub expose_node_ips: bool,
}
struct AggregatorInternal {
@@ -76,9 +78,7 @@ impl Aggregator {
tokio::spawn(Aggregator::handle_messages(
rx_from_external,
tx_to_locator,
opts.max_queue_len,
opts.denylist,
opts.max_third_party_nodes,
opts,
));
// Return a handle to our aggregator:
@@ -95,18 +95,11 @@ impl Aggregator {
async fn handle_messages(
rx_from_external: flume::Receiver<inner_loop::ToAggregator>,
tx_to_aggregator: flume::Sender<(NodeId, IpAddr)>,
max_queue_len: usize,
denylist: Vec<String>,
max_third_party_nodes: usize,
opts: AggregatorOpts,
) {
inner_loop::InnerLoop::new(
tx_to_aggregator,
denylist,
max_queue_len,
max_third_party_nodes,
)
.handle(rx_from_external)
.await;
inner_loop::InnerLoop::new(tx_to_aggregator, opts)
.handle(rx_from_external)
.await;
}
/// Gather metrics from our aggregator loop
@@ -16,8 +16,8 @@
use super::aggregator::ConnId;
use crate::feed_message::{self, FeedMessageSerializer};
use crate::find_location;
use crate::state::{self, NodeId, State};
use crate::{find_location, AggregatorOpts};
use bimap::BiMap;
use common::{
internal_messages::{self, MuteReason, ShardNodeId},
@@ -144,7 +144,7 @@ impl FromStr for FromFeedWebsocket {
}
}
/// The aggregator can these messages back to a feed connection.
/// The aggregator can send these messages back to a feed connection.
#[derive(Clone, Debug)]
pub enum ToFeedWebsocket {
Bytes(bytes::Bytes),
@@ -173,24 +173,23 @@ pub struct InnerLoop {
/// How big can the queue of messages coming in to the aggregator get before messages
/// are prioritised and dropped to try and get back on track.
max_queue_len: usize,
/// Flag to expose the IP addresses of all connected nodes to the feed subscribers.
expose_node_ips: bool,
}
impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
tx_to_locator: flume::Sender<(NodeId, IpAddr)>,
denylist: Vec<String>,
max_queue_len: usize,
max_third_party_nodes: usize,
) -> Self {
pub fn new(tx_to_locator: flume::Sender<(NodeId, IpAddr)>, opts: AggregatorOpts) -> Self {
InnerLoop {
node_state: State::new(denylist, max_third_party_nodes),
node_state: State::new(opts.denylist, opts.max_third_party_nodes),
node_ids: BiMap::new(),
feed_channels: HashMap::new(),
shard_channels: HashMap::new(),
chain_to_feed_conn_ids: MultiMapUnique::new(),
tx_to_locator,
max_queue_len,
max_queue_len: opts.max_queue_len,
expose_node_ips: opts.expose_node_ips,
}
}
@@ -323,9 +322,11 @@ impl InnerLoop {
FromShardWebsocket::Add {
local_id,
ip,
node,
mut node,
genesis_hash,
} => {
// Conditionally modify the node's details to include the IP address.
node.ip = self.expose_node_ips.then_some(ip.to_string().into());
match self.node_state.add_node(genesis_hash, node) {
state::AddNodeResult::ChainOnDenyList => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
@@ -376,7 +377,7 @@ impl InnerLoop {
));
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
// Ask for the grographical location of the node.
// Ask for the geographical location of the node.
let _ = self.tx_to_locator.send((node_id, ip));
}
}
@@ -548,7 +549,7 @@ impl InnerLoop {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
// Actually make a note of the new chain subsciption:
// Actually make a note of the new chain subscription:
let new_genesis_hash = new_chain.genesis_hash();
self.chain_to_feed_conn_ids
.insert(new_genesis_hash, feed_conn_id);
+2 -1
View File
@@ -45,7 +45,7 @@ where
}
pub struct FeedMessageSerializer {
/// Current buffer,
/// Current buffer.
buffer: Vec<u8>,
}
@@ -189,6 +189,7 @@ impl FeedMessageWrite for AddedNode<'_> {
&details.version,
&details.validator,
&details.network_id,
&details.ip,
);
ser.write(&(
+1 -1
View File
@@ -73,7 +73,7 @@ where
}
/// This struct can be used to make location requests, given
/// an IPV4 address.
/// an IPV4 or IPV6 address.
#[derive(Debug, Clone)]
struct Locator {
city: Arc<maxminddb::Reader<&'static [u8]>>,
+4
View File
@@ -82,6 +82,9 @@ struct Opts {
/// How many nodes from third party chains are allowed to connect before we prevent connections from them.
#[structopt(long, default_value = "1000")]
max_third_party_nodes: usize,
/// Flag to expose the IP addresses of all connected nodes to the feed subscribers.
#[structopt(long)]
pub expose_node_ips: bool,
}
fn main() {
@@ -132,6 +135,7 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()>
max_queue_len: aggregator_queue_len,
denylist: opts.denylist,
max_third_party_nodes: opts.max_third_party_nodes,
expose_node_ips: opts.expose_node_ips,
},
)
.await?;
+4 -3
View File
@@ -41,11 +41,11 @@ impl NodeId {
}
}
/// Our state constains node and chain information
/// Our state contains node and chain information
pub struct State {
chains: DenseMap<ChainId, Chain>,
// Find the right chain given various details.
/// Find the right chain given various details.
chains_by_genesis_hash: HashMap<BlockHash, ChainId>,
/// Chain labels that we do not want to allow connecting.
@@ -56,7 +56,7 @@ pub struct State {
max_third_party_nodes: usize,
}
/// Adding a node to a chain leads to this node_idult
/// Adding a node to a chain leads to this result.
pub enum AddNodeResult<'a> {
/// The chain is on the "deny list", so we can't add the node
ChainOnDenyList,
@@ -300,6 +300,7 @@ mod test {
network_id: NetworkId::new(),
startup_time: None,
sysinfo: None,
ip: None,
}
}
+1 -1
View File
@@ -105,7 +105,7 @@ impl Aggregator {
pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = flume::bounded(10);
// Establish a resiliant connection to the core (this retries as needed):
// Establish a resilient connection to the core (this retries as needed):
let (tx_to_telemetry_core, rx_from_telemetry_core) =
create_ws_connection_to_core(telemetry_uri).await;
@@ -248,6 +248,7 @@ pub struct NodeDetails {
pub target_arch: Option<Box<str>>,
pub target_env: Option<Box<str>>,
pub sysinfo: Option<NodeSysInfo>,
pub ip: Option<Box<str>>,
}
impl From<NodeDetails> for node_types::NodeDetails {
@@ -280,6 +281,7 @@ impl From<NodeDetails> for node_types::NodeDetails {
target_arch: details.target_arch,
target_env: details.target_env,
sysinfo: details.sysinfo.map(|sysinfo| sysinfo.into()),
ip: details.ip,
}
}
}
+1 -1
View File
@@ -279,7 +279,7 @@ where
let mut stale_interval = tokio::time::interval(stale_node_timeout / 2);
// Our main select loop atomically receives and handles telemetry messages from the node,
// and periodically checks for stale connections to keep our ndoe state tidy.
// and periodically checks for stale connections to keep our node state tidy.
loop {
tokio::select! {
// We periodically check for stale message IDs and remove nodes associated with
+3 -1
View File
@@ -134,6 +134,7 @@ pub struct NodeDetails {
pub version: String,
pub validator: Option<String>,
pub network_id: Option<String>,
pub ip: Option<String>,
}
impl FeedMessage {
@@ -185,7 +186,7 @@ impl FeedMessage {
3 => {
let (
node_id,
(name, implementation, version, validator, network_id),
(name, implementation, version, validator, network_id, ip),
stats,
io,
hardware,
@@ -205,6 +206,7 @@ impl FeedMessage {
version,
validator,
network_id,
ip,
},
stats,
block_details,
+2 -1
View File
@@ -55,7 +55,8 @@ export type NodeDetails = [
NodeImplementation,
NodeVersion,
Maybe<Address>,
Maybe<NetworkId>
Maybe<NetworkId>,
Maybe<string>
];
export type NodeStats = [PeerCount, TransactionCount];
export type NodeIO = [Array<Bytes>];