relay_loop().await from main relay function (#829)

This commit is contained in:
Svyatoslav Nikolsky
2021-03-16 13:31:53 +03:00
committed by Bastian Köcher
parent 4cd6d128ea
commit c95b1eb970
16 changed files with 163 additions and 166 deletions
@@ -98,7 +98,7 @@ where
>, >,
P::Header: SourceHeader<C::BlockNumber>, P::Header: SourceHeader<C::BlockNumber>,
{ {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>>>>; type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>> + Send>>;
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> { async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because // we **CAN** continue to relay finality proofs if source node is out of sync, because
@@ -48,9 +48,7 @@ pub struct EthereumDeployContractParams {
} }
/// Deploy Bridge contract on Ethereum chain. /// Deploy Bridge contract on Ethereum chain.
pub fn run(params: EthereumDeployContractParams) { pub async fn run(params: EthereumDeployContractParams) {
let mut local_pool = futures::executor::LocalPool::new();
let EthereumDeployContractParams { let EthereumDeployContractParams {
eth_params, eth_params,
eth_sign, eth_sign,
@@ -61,7 +59,7 @@ pub fn run(params: EthereumDeployContractParams) {
eth_contract_code, eth_contract_code,
} = params; } = params;
let result = local_pool.run_until(async move { let result = async move {
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?; let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?;
@@ -91,7 +89,7 @@ pub fn run(params: EthereumDeployContractParams) {
initial_set_id, initial_set_id,
initial_set, initial_set,
).await ).await
}); }.await;
if let Err(error) = result { if let Err(error) = result {
log::error!(target: "bridge", "{}", error); log::error!(target: "bridge", "{}", error);
@@ -282,19 +282,39 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
} }
/// Relay exchange transaction proof(s) to Substrate node. /// Relay exchange transaction proof(s) to Substrate node.
pub fn run(params: EthereumExchangeParams) { pub async fn run(params: EthereumExchangeParams) {
match params.mode { match params.mode {
ExchangeRelayMode::Single(eth_tx_hash) => run_single_transaction_relay(params, eth_tx_hash), ExchangeRelayMode::Single(eth_tx_hash) => {
ExchangeRelayMode::Auto(eth_start_with_block_number) => { let result = run_single_transaction_relay(params, eth_tx_hash).await;
run_auto_transactions_relay_loop(params, eth_start_with_block_number) match result {
Ok(_) => log::info!(
target: "bridge",
"Ethereum transaction {} proof has been successfully submitted to Substrate node",
eth_tx_hash,
),
Err(err) => log::error!(
target: "bridge",
"Error submitting Ethereum transaction {} proof to Substrate node: {}",
eth_tx_hash,
err,
),
}
} }
}; ExchangeRelayMode::Auto(eth_start_with_block_number) => {
let result = run_auto_transactions_relay_loop(params, eth_start_with_block_number).await;
if let Err(err) = result {
log::error!(
target: "bridge",
"Error auto-relaying Ethereum transactions proofs to Substrate node: {}",
err,
);
}
}
}
} }
/// Run single transaction proof relay and stop. /// Run single transaction proof relay and stop.
fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) { async fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) -> Result<(), String> {
let mut local_pool = futures::executor::LocalPool::new();
let EthereumExchangeParams { let EthereumExchangeParams {
eth_params, eth_params,
sub_params, sub_params,
@@ -303,43 +323,25 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25
.. ..
} = params; } = params;
let result = local_pool.run_until(async move { let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?;
let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let sub_client = SubstrateClient::<Rialto>::new(sub_params)
let sub_client = SubstrateClient::<Rialto>::new(sub_params) .await
.await .map_err(RpcError::Substrate)?;
.map_err(RpcError::Substrate)?;
let source = EthereumTransactionsSource { client: eth_client }; let source = EthereumTransactionsSource { client: eth_client };
let target = SubstrateTransactionsTarget { let target = SubstrateTransactionsTarget {
client: sub_client, client: sub_client,
sign_params: sub_sign, sign_params: sub_sign,
bridge_instance: instance, bridge_instance: instance,
}; };
relay_single_transaction_proof(&source, &target, eth_tx_hash).await relay_single_transaction_proof(&source, &target, eth_tx_hash).await
});
match result {
Ok(_) => {
log::info!(
target: "bridge",
"Ethereum transaction {} proof has been successfully submitted to Substrate node",
eth_tx_hash,
);
}
Err(err) => {
log::error!(
target: "bridge",
"Error submitting Ethereum transaction {} proof to Substrate node: {}",
eth_tx_hash,
err,
);
}
}
} }
/// Run auto-relay loop. async fn run_auto_transactions_relay_loop(
fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_with_block_number: Option<u64>) { params: EthereumExchangeParams,
eth_start_with_block_number: Option<u64>,
) -> Result<(), String> {
let EthereumExchangeParams { let EthereumExchangeParams {
eth_params, eth_params,
sub_params, sub_params,
@@ -349,46 +351,41 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi
.. ..
} = params; } = params;
let do_run_loop = move || -> Result<(), String> { let eth_client = EthereumClient::new(eth_params)
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params)) .await
.map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params)) let sub_client = SubstrateClient::<Rialto>::new(sub_params)
.map_err(|err| format!("Error starting Substrate client: {:?}", err))?; .await
.map_err(|err| format!("Error starting Substrate client: {:?}", err))?;
let eth_start_with_block_number = match eth_start_with_block_number { let eth_start_with_block_number = match eth_start_with_block_number {
Some(eth_start_with_block_number) => eth_start_with_block_number, Some(eth_start_with_block_number) => eth_start_with_block_number,
None => { None => {
async_std::task::block_on(sub_client.best_ethereum_finalized_block()) sub_client
.map_err(|err| { .best_ethereum_finalized_block()
format!( .await
"Error retrieving best finalized Ethereum block from Substrate node: {:?}", .map_err(|err| {
err format!(
) "Error retrieving best finalized Ethereum block from Substrate node: {:?}",
})? err
.0 )
} })?
}; .0
}
run_loop(
InMemoryStorage::new(eth_start_with_block_number),
EthereumTransactionsSource { client: eth_client },
SubstrateTransactionsTarget {
client: sub_client,
sign_params: sub_sign,
bridge_instance: instance,
},
metrics_params,
futures::future::pending(),
);
Ok(())
}; };
if let Err(err) = do_run_loop() { run_loop(
log::error!( InMemoryStorage::new(eth_start_with_block_number),
target: "bridge", EthereumTransactionsSource { client: eth_client },
"Error auto-relaying Ethereum transactions proofs to Substrate node: {}", SubstrateTransactionsTarget {
err, client: sub_client,
); sign_params: sub_sign,
} bridge_instance: instance,
},
metrics_params,
futures::future::pending(),
)
.await;
Ok(())
} }
@@ -42,9 +42,7 @@ pub struct EthereumExchangeSubmitParams {
} }
/// Submit single Ethereum -> Substrate exchange transaction. /// Submit single Ethereum -> Substrate exchange transaction.
pub fn run(params: EthereumExchangeSubmitParams) { pub async fn run(params: EthereumExchangeSubmitParams) {
let mut local_pool = futures::executor::LocalPool::new();
let EthereumExchangeSubmitParams { let EthereumExchangeSubmitParams {
eth_params, eth_params,
eth_sign, eth_sign,
@@ -53,7 +51,7 @@ pub fn run(params: EthereumExchangeSubmitParams) {
sub_recipient, sub_recipient,
} = params; } = params;
let result: Result<_, String> = local_pool.run_until(async move { let result: Result<_, String> = async move {
let eth_client = EthereumClient::new(eth_params) let eth_client = EthereumClient::new(eth_params)
.await .await
.map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?; .map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?;
@@ -94,7 +92,8 @@ pub fn run(params: EthereumExchangeSubmitParams) {
.map_err(|err| format!("error submitting transaction: {:?}", err))?; .map_err(|err| format!("error submitting transaction: {:?}", err))?;
Ok(eth_tx_unsigned) Ok(eth_tx_unsigned)
}); }
.await;
match result { match result {
Ok(eth_tx_unsigned) => { Ok(eth_tx_unsigned) => {
@@ -248,7 +248,7 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
} }
/// Run Ethereum headers synchronization. /// Run Ethereum headers synchronization.
pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
let EthereumSyncParams { let EthereumSyncParams {
eth_params, eth_params,
sub_params, sub_params,
@@ -278,7 +278,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
sync_params, sync_params,
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); )
.await;
Ok(()) Ok(())
} }
+12 -16
View File
@@ -50,6 +50,10 @@ fn main() {
let yaml = clap::load_yaml!("cli.yml"); let yaml = clap::load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches(); let matches = clap::App::from_yaml(yaml).get_matches();
async_std::task::block_on(run_command(&matches));
}
async fn run_command(matches: &clap::ArgMatches<'_>) {
match matches.subcommand() { match matches.subcommand() {
("eth-to-sub", Some(eth_to_sub_matches)) => { ("eth-to-sub", Some(eth_to_sub_matches)) => {
log::info!(target: "bridge", "Starting ETH ➡ SUB relay."); log::info!(target: "bridge", "Starting ETH ➡ SUB relay.");
@@ -60,6 +64,7 @@ fn main() {
return; return;
} }
}) })
.await
.is_err() .is_err()
{ {
log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync."); log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync.");
@@ -74,6 +79,7 @@ fn main() {
return; return;
} }
}) })
.await
.is_err() .is_err()
{ {
log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync."); log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync.");
@@ -87,7 +93,8 @@ fn main() {
log::error!(target: "bridge", "Error during contract deployment: {}", err); log::error!(target: "bridge", "Error during contract deployment: {}", err);
return; return;
} }
}); })
.await;
} }
("eth-submit-exchange-tx", Some(eth_exchange_submit_matches)) => { ("eth-submit-exchange-tx", Some(eth_exchange_submit_matches)) => {
log::info!(target: "bridge", "Submitting ETH ➡ SUB exchange transaction."); log::info!(target: "bridge", "Submitting ETH ➡ SUB exchange transaction.");
@@ -97,7 +104,8 @@ fn main() {
log::error!(target: "bridge", "Error submitting Eethereum exchange transaction: {}", err); log::error!(target: "bridge", "Error submitting Eethereum exchange transaction: {}", err);
return; return;
} }
}); })
.await;
} }
("eth-exchange-sub", Some(eth_exchange_matches)) => { ("eth-exchange-sub", Some(eth_exchange_matches)) => {
log::info!(target: "bridge", "Starting ETH ➡ SUB exchange transactions relay."); log::info!(target: "bridge", "Starting ETH ➡ SUB exchange transactions relay.");
@@ -107,7 +115,8 @@ fn main() {
log::error!(target: "bridge", "Error relaying Ethereum transactions proofs: {}", err); log::error!(target: "bridge", "Error relaying Ethereum transactions proofs: {}", err);
return; return;
} }
}); })
.await;
} }
("", _) => { ("", _) => {
log::error!(target: "bridge", "No subcommand specified"); log::error!(target: "bridge", "No subcommand specified");
@@ -398,16 +407,3 @@ fn parse_hex_argument(matches: &clap::ArgMatches, arg: &str) -> Result<Option<Ve
None => Ok(None), None => Ok(None),
} }
} }
#[cfg(test)]
mod tests {
// Details: https://github.com/paritytech/parity-bridges-common/issues/118
#[test]
fn async_std_sleep_works() {
let mut local_pool = futures::executor::LocalPool::new();
local_pool.run_until(async move {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
});
}
}
@@ -163,7 +163,7 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
} }
/// Run Substrate headers synchronization. /// Run Substrate headers synchronization.
pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
let SubstrateSyncParams { let SubstrateSyncParams {
sub_params, sub_params,
eth_params, eth_params,
@@ -188,7 +188,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
sync_params, sync_params,
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); )
.await;
Ok(()) Ok(())
} }
@@ -79,7 +79,7 @@ impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorag
} }
/// Run proofs synchronization. /// Run proofs synchronization.
pub fn run<P: TransactionProofPipeline>( pub async fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>, storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
@@ -119,7 +119,8 @@ pub fn run<P: TransactionProofPipeline>(
exit_signal.clone(), exit_signal.clone(),
) )
}, },
); )
.await;
} }
/// Run proofs synchronization. /// Run proofs synchronization.
@@ -91,7 +91,7 @@ pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
} }
/// Run finality proofs synchronization loop. /// Run finality proofs synchronization loop.
pub fn run<P: FinalitySyncPipeline>( pub async fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams, sync_params: FinalitySyncParams,
@@ -132,7 +132,8 @@ pub fn run<P: FinalitySyncPipeline>(
exit_signal.clone(), exit_signal.clone(),
) )
}, },
); )
.await;
} }
/// Unjustified headers container. Ordered by header number. /// Unjustified headers container. Ordered by header number.
@@ -112,7 +112,7 @@ impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {}
/// Run headers synchronization. /// Run headers synchronization.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>( pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
source_tick: Duration, source_tick: Duration,
target_client: TC, target_client: TC,
@@ -159,7 +159,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
exit_signal.clone(), exit_signal.clone(),
) )
}, },
); )
.await;
} }
/// Run headers synchronization. /// Run headers synchronization.
@@ -206,7 +206,7 @@ pub struct ClientsState<P: MessageLane> {
} }
/// Run message lane service loop. /// Run message lane service loop.
pub fn run<P: MessageLane>( pub async fn run<P: MessageLane>(
params: Params, params: Params,
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
@@ -251,7 +251,8 @@ pub fn run<P: MessageLane>(
exit_signal.clone(), exit_signal.clone(),
) )
}, },
); )
.await;
} }
/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received.
+36 -40
View File
@@ -37,7 +37,7 @@ pub trait Client: Clone + Send + Sync {
/// This function represents an outer loop, which in turn calls provided `loop_run` function to do /// This function represents an outer loop, which in turn calls provided `loop_run` function to do
/// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source, /// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `loop_run` again. /// target or both) and calls `loop_run` again.
pub fn run<SC: Client, TC: Client, R, F>( pub async fn run<SC: Client, TC: Client, R, F>(
reconnect_delay: Duration, reconnect_delay: Duration,
mut source_client: SC, mut source_client: SC,
mut target_client: TC, mut target_client: TC,
@@ -46,50 +46,46 @@ pub fn run<SC: Client, TC: Client, R, F>(
R: Fn(SC, TC) -> F, R: Fn(SC, TC) -> F,
F: Future<Output = Result<(), FailedClient>>, F: Future<Output = Result<(), FailedClient>>,
{ {
let mut local_pool = futures::executor::LocalPool::new(); loop {
let result = loop_run(source_client.clone(), target_client.clone()).await;
local_pool.run_until(async move { match result {
loop { Ok(()) => break,
let result = loop_run(source_client.clone(), target_client.clone()).await; Err(failed_client) => loop {
async_std::task::sleep(reconnect_delay).await;
match result { if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
Ok(()) => break, match source_client.reconnect().await {
Err(failed_client) => loop { Ok(()) => (),
async_std::task::sleep(reconnect_delay).await; Err(error) => {
if failed_client == FailedClient::Both || failed_client == FailedClient::Source { log::warn!(
match source_client.reconnect().await { target: "bridge",
Ok(()) => (), "Failed to reconnect to source client. Going to retry in {}s: {:?}",
Err(error) => { reconnect_delay.as_secs(),
log::warn!( error,
target: "bridge", );
"Failed to reconnect to source client. Going to retry in {}s: {:?}", continue;
reconnect_delay.as_secs(),
error,
);
continue;
}
} }
} }
if failed_client == FailedClient::Both || failed_client == FailedClient::Target { }
match target_client.reconnect().await { if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
Ok(()) => (), match target_client.reconnect().await {
Err(error) => { Ok(()) => (),
log::warn!( Err(error) => {
target: "bridge", log::warn!(
"Failed to reconnect to target client. Going to retry in {}s: {:?}", target: "bridge",
reconnect_delay.as_secs(), "Failed to reconnect to target client. Going to retry in {}s: {:?}",
error, reconnect_delay.as_secs(),
); error,
continue; );
} continue;
} }
} }
}
break; break;
}, },
}
log::debug!(target: "bridge", "Restarting relay loop");
} }
});
log::debug!(target: "bridge", "Restarting relay loop");
}
} }
@@ -126,5 +126,6 @@ pub async fn run<SourceChain, TargetChain, P>(
}, },
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); )
.await;
} }
@@ -125,7 +125,7 @@ type MillauSourceClient = SubstrateMessagesSource<Millau, MillauMessagesToRialto
type RialtoTargetClient = SubstrateMessagesTarget<Rialto, MillauMessagesToRialto>; type RialtoTargetClient = SubstrateMessagesTarget<Rialto, MillauMessagesToRialto>;
/// Run Millau-to-Rialto messages sync. /// Run Millau-to-Rialto messages sync.
pub fn run( pub async fn run(
millau_client: MillauClient, millau_client: MillauClient,
millau_sign: MillauSigningParams, millau_sign: MillauSigningParams,
rialto_client: RialtoClient, rialto_client: RialtoClient,
@@ -185,5 +185,6 @@ pub fn run(
RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE), RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE),
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); )
.await;
} }
@@ -198,7 +198,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
rialto_sign, rialto_sign,
lane.into(), lane.into(),
prometheus_params.into(), prometheus_params.into(),
); )
.await;
} }
cli::RelayMessages::RialtoToMillau { cli::RelayMessages::RialtoToMillau {
rialto, rialto,
@@ -220,7 +221,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
millau_sign, millau_sign,
lane.into(), lane.into(),
prometheus_params.into(), prometheus_params.into(),
); )
.await;
} }
} }
Ok(()) Ok(())
@@ -125,7 +125,7 @@ type RialtoSourceClient = SubstrateMessagesSource<Rialto, RialtoMessagesToMillau
type MillauTargetClient = SubstrateMessagesTarget<Millau, RialtoMessagesToMillau>; type MillauTargetClient = SubstrateMessagesTarget<Millau, RialtoMessagesToMillau>;
/// Run Rialto-to-Millau messages sync. /// Run Rialto-to-Millau messages sync.
pub fn run( pub async fn run(
rialto_client: RialtoClient, rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams, rialto_sign: RialtoSigningParams,
millau_client: MillauClient, millau_client: MillauClient,
@@ -184,5 +184,6 @@ pub fn run(
MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE), MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE),
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); )
.await;
} }