Always pass port to jsonrpsee WebSocket client (#2339)

* Always pass port to jsonrpsee

* Remove useless host check

* Do not silently drop
This commit is contained in:
Sebastian Kunert
2023-03-20 15:05:06 +01:00
committed by GitHub
parent 45f6fefc82
commit ccfe5fe08c
@@ -64,6 +64,25 @@ pub struct ReconnectingWsClient {
to_worker_channel: TokioSender<RpcDispatcherMessage>, to_worker_channel: TokioSender<RpcDispatcherMessage>,
} }
/// Format url and force addition of a port
fn url_to_string_with_port(url: Url) -> Option<String> {
// This is already validated on CLI side, just defensive here
if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
return None
}
// Either we have a user-supplied port or use the default for 'ws' or 'wss' here
Some(format!(
"{}://{}:{}{}{}",
url.scheme(),
url.host_str()?,
url.port_or_known_default()?,
url.path(),
url.query().map(|query| format!("?{}", query)).unwrap_or_default()
))
}
impl ReconnectingWsClient { impl ReconnectingWsClient {
/// Create a new websocket client frontend. /// Create a new websocket client frontend.
pub async fn new(urls: Vec<Url>, task_manager: &mut TaskManager) -> RelayChainResult<Self> { pub async fn new(urls: Vec<Url>, task_manager: &mut TaskManager) -> RelayChainResult<Self> {
@@ -144,7 +163,7 @@ impl ReconnectingWsClient {
/// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. /// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners.
struct ReconnectingWebsocketWorker { struct ReconnectingWebsocketWorker {
ws_urls: Vec<Url>, ws_urls: Vec<String>,
/// Communication channel with the RPC client /// Communication channel with the RPC client
client_receiver: TokioReceiver<RpcDispatcherMessage>, client_receiver: TokioReceiver<RpcDispatcherMessage>,
@@ -176,7 +195,7 @@ fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>
/// and reconnections. /// and reconnections.
#[derive(Debug)] #[derive(Debug)]
struct ClientManager { struct ClientManager {
urls: Vec<Url>, urls: Vec<String>,
active_client: Arc<JsonRpcClient>, active_client: Arc<JsonRpcClient>,
active_index: usize, active_index: usize,
} }
@@ -189,7 +208,7 @@ struct RelayChainSubscriptions {
/// Try to find a new RPC server to connect to. /// Try to find a new RPC server to connect to.
async fn connect_next_available_rpc_server( async fn connect_next_available_rpc_server(
urls: &Vec<Url>, urls: &Vec<String>,
starting_position: usize, starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> { ) -> Result<(usize, Arc<JsonRpcClient>), ()> {
tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server."); tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
@@ -198,18 +217,19 @@ async fn connect_next_available_rpc_server(
tracing::info!( tracing::info!(
target: LOG_TARGET, target: LOG_TARGET,
index, index,
?url, url,
"Trying to connect to next external relaychain node.", "Trying to connect to next external relaychain node.",
); );
if let Ok(ws_client) = WsClientBuilder::default().build(url).await { match WsClientBuilder::default().build(&url).await {
return Ok((index, Arc::new(ws_client))) Ok(ws_client) => return Ok((index, Arc::new(ws_client))),
Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
}; };
} }
Err(()) Err(())
} }
impl ClientManager { impl ClientManager {
pub async fn new(urls: Vec<Url>) -> Result<Self, ()> { pub async fn new(urls: Vec<String>) -> Result<Self, ()> {
if urls.is_empty() { if urls.is_empty() {
return Err(()) return Err(())
} }
@@ -325,6 +345,8 @@ impl ReconnectingWebsocketWorker {
async fn new( async fn new(
urls: Vec<Url>, urls: Vec<Url>,
) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) { ) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();
let (tx, rx) = tokio_channel(100); let (tx, rx) = tokio_channel(100);
let worker = ReconnectingWebsocketWorker { let worker = ReconnectingWebsocketWorker {
ws_urls: urls, ws_urls: urls,
@@ -518,3 +540,36 @@ impl ReconnectingWebsocketWorker {
} }
} }
} }
#[cfg(test)]
mod test {
use super::url_to_string_with_port;
use url::Url;
#[test]
fn url_to_string_works() {
let url = Url::parse("wss://something/path").unwrap();
assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url));
let url = Url::parse("ws://something/path").unwrap();
assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url));
let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));
let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));
let url = Url::parse("wss://something/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:443/path?query=yes".to_string()),
url_to_string_with_port(url)
);
let url = Url::parse("wss://something:9090/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:9090/path?query=yes".to_string()),
url_to_string_with_port(url)
);
}
}