diff --git a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs index 9184ab7465..05d1c23bb7 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs @@ -64,6 +64,25 @@ pub struct ReconnectingWsClient { to_worker_channel: TokioSender, } +/// Format url and force addition of a port +fn url_to_string_with_port(url: Url) -> Option { + // This is already validated on CLI side, just defensive here + if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() { + tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host."); + return None + } + + // Either we have a user-supplied port or use the default for 'ws' or 'wss' here + Some(format!( + "{}://{}:{}{}{}", + url.scheme(), + url.host_str()?, + url.port_or_known_default()?, + url.path(), + url.query().map(|query| format!("?{}", query)).unwrap_or_default() + )) +} + impl ReconnectingWsClient { /// Create a new websocket client frontend. pub async fn new(urls: Vec, task_manager: &mut TaskManager) -> RelayChainResult { @@ -144,7 +163,7 @@ impl ReconnectingWsClient { /// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. struct ReconnectingWebsocketWorker { - ws_urls: Vec, + ws_urls: Vec, /// Communication channel with the RPC client client_receiver: TokioReceiver, @@ -176,7 +195,7 @@ fn distribute_header(header: RelayHeader, senders: &mut Vec> /// and reconnections. #[derive(Debug)] struct ClientManager { - urls: Vec, + urls: Vec, active_client: Arc, active_index: usize, } @@ -189,7 +208,7 @@ struct RelayChainSubscriptions { /// Try to find a new RPC server to connect to. async fn connect_next_available_rpc_server( - urls: &Vec, + urls: &Vec, starting_position: usize, ) -> Result<(usize, Arc), ()> { tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server."); @@ -198,18 +217,19 @@ async fn connect_next_available_rpc_server( tracing::info!( target: LOG_TARGET, index, - ?url, + url, "Trying to connect to next external relaychain node.", ); - if let Ok(ws_client) = WsClientBuilder::default().build(url).await { - return Ok((index, Arc::new(ws_client))) + match WsClientBuilder::default().build(&url).await { + Ok(ws_client) => return Ok((index, Arc::new(ws_client))), + Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."), }; } Err(()) } impl ClientManager { - pub async fn new(urls: Vec) -> Result { + pub async fn new(urls: Vec) -> Result { if urls.is_empty() { return Err(()) } @@ -325,6 +345,8 @@ impl ReconnectingWebsocketWorker { async fn new( urls: Vec, ) -> (ReconnectingWebsocketWorker, TokioSender) { + let urls = urls.into_iter().filter_map(url_to_string_with_port).collect(); + let (tx, rx) = tokio_channel(100); let worker = ReconnectingWebsocketWorker { ws_urls: urls, @@ -518,3 +540,36 @@ impl ReconnectingWebsocketWorker { } } } + +#[cfg(test)] +mod test { + use super::url_to_string_with_port; + use url::Url; + + #[test] + fn url_to_string_works() { + let url = Url::parse("wss://something/path").unwrap(); + assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url)); + + let url = Url::parse("ws://something/path").unwrap(); + assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url)); + + let url = Url::parse("wss://something:100/path").unwrap(); + assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url)); + + let url = Url::parse("wss://something:100/path").unwrap(); + assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url)); + + let url = Url::parse("wss://something/path?query=yes").unwrap(); + assert_eq!( + Some("wss://something:443/path?query=yes".to_string()), + url_to_string_with_port(url) + ); + + let url = Url::parse("wss://something:9090/path?query=yes").unwrap(); + assert_eq!( + Some("wss://something:9090/path?query=yes".to_string()), + url_to_string_with_port(url) + ); + } +}