Switch to new RPC interface (#131)

* Move EthereumRpc implementation to Eth client

* Move SubstrateRpc implementation to SubstrateClient

* Update deploy_contract to use new RPC interface

* Fix some types in the Substrate client

* Swap out method bodies in Eth sync loop

* Swap out method bodies in Substrate sync loop

* Remove Client from SourceClient trait return types

* Remove Client from TargetClient trait return types

* Remove client from Source select! arms

* Remove client from Target select! arms

* Add missing mutable refs in Substrate client

* Use mutable references in Source/Target Client traits

* Try and use mutable references in Source/Client trait implementations

* Handle errors more gracefully

* Remove unused imports

* Remove dead_code and unused_variables lints

* Remove usage of `jsonrpsee::RawClient`

By using a `jsonrpsee::Client` we are able to remove all the shared
mutable references required when interacting with the RPC server. This
is convenient as trying to sharing mutable references in code that uses
async/await is a bit of a pain.

However, using a `Client` instead of a `RawClient` is not yet supported
by the `jsonrpsee::rpc_api` macro, so a fork must be used for the moment.

* Clean up dead code and warnings

* Clean up higher level RPCs

Some of the RPCs that were "high level" didn't necessarily belong
as part of the trait, so they were removed.

* Use positional parameters for RPCs

Both Substrate and Ethereum's RPCs use positional (array) parameters,
so in order to be compatible with both we need to make sure that
our API is defined with positional paramters in mind.

* Rename argument for eth_getBlockByNumber

* Remove some unecessary Ok-wraps

* Process client requests synchonously

Before the refactoring the sync loop would wait until a client finished
handling a request before issuing another one. This behaviour was
inadvertently changed during the refactoring leading to race conditions.
This commit makes sure that the previous behaviour is respected.

* Reduce the errors that are considered a connection error

* Only decode bridge contract once

* Set genesis_config at RPC client startup

* Fetch genesis hash in SubstrateRpcClient::new()

* Move Decode error into SubstrateNodeError

* Suppress warnings caused by `rpc_api!`

* Implement From RpcError for String

* Handle Substrate client initalization errors more gracefully

* Remove match in favour of ?

Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>
This commit is contained in:
Hernando Castano
2020-06-23 16:55:51 -04:00
committed by Bastian Köcher
parent ba3b8537a5
commit ea45fa8da7
11 changed files with 856 additions and 1248 deletions
+127 -116
View File
@@ -43,11 +43,6 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// reconnection again.
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
/// Type alias for all SourceClient futures.
pub type OwnedSourceFutureOutput<Client, P, T> = (Client, Result<T, <Client as SourceClient<P>>::Error>);
/// Type alias for all TargetClient futures.
pub type OwnedTargetFutureOutput<Client, P, T> = (Client, Result<T, <Client as TargetClient<P>>::Error>);
/// Source client trait.
#[async_trait]
pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
@@ -55,26 +50,26 @@ pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
type Error: std::fmt::Debug + MaybeConnectionError;
/// Get best block number.
async fn best_block_number(self) -> OwnedSourceFutureOutput<Self, P, P::Number>;
async fn best_block_number(&self) -> Result<P::Number, Self::Error>;
/// Get header by hash.
async fn header_by_hash(self, hash: P::Hash) -> OwnedSourceFutureOutput<Self, P, P::Header>;
async fn header_by_hash(&self, hash: P::Hash) -> Result<P::Header, Self::Error>;
/// Get canonical header by number.
async fn header_by_number(self, number: P::Number) -> OwnedSourceFutureOutput<Self, P, P::Header>;
async fn header_by_number(&self, number: P::Number) -> Result<P::Header, Self::Error>;
/// Get completion data by header hash.
async fn header_completion(
self,
&self,
id: HeaderId<P::Hash, P::Number>,
) -> OwnedSourceFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, Option<P::Completion>)>;
) -> Result<(HeaderId<P::Hash, P::Number>, Option<P::Completion>), Self::Error>;
/// Get extra data by header hash.
async fn header_extra(
self,
&self,
id: HeaderId<P::Hash, P::Number>,
header: QueuedHeader<P>,
) -> OwnedSourceFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, P::Extra)>;
) -> Result<(HeaderId<P::Hash, P::Number>, P::Extra), Self::Error>;
}
/// Target client trait.
@@ -84,35 +79,35 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
type Error: std::fmt::Debug + MaybeConnectionError;
/// Returns ID of best header known to the target node.
async fn best_header_id(self) -> OwnedTargetFutureOutput<Self, P, HeaderId<P::Hash, P::Number>>;
async fn best_header_id(&self) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;
/// Returns true if header is known to the target node.
async fn is_known_header(
self,
&self,
id: HeaderId<P::Hash, P::Number>,
) -> OwnedTargetFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, bool)>;
) -> Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>;
/// Submit headers.
async fn submit_headers(
self,
&self,
headers: Vec<QueuedHeader<P>>,
) -> OwnedTargetFutureOutput<Self, P, Vec<HeaderId<P::Hash, P::Number>>>;
) -> Result<Vec<HeaderId<P::Hash, P::Number>>, Self::Error>;
/// Returns ID of headers that require to be 'completed' before children can be submitted.
async fn incomplete_headers_ids(self) -> OwnedTargetFutureOutput<Self, P, HashSet<HeaderId<P::Hash, P::Number>>>;
async fn incomplete_headers_ids(&self) -> Result<HashSet<HeaderId<P::Hash, P::Number>>, Self::Error>;
/// Submit completion data for header.
async fn complete_header(
self,
&self,
id: HeaderId<P::Hash, P::Number>,
completion: P::Completion,
) -> OwnedTargetFutureOutput<Self, P, HeaderId<P::Hash, P::Number>>;
) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;
/// Returns true if header requires extra data to be submitted.
async fn requires_extra(
self,
&self,
header: QueuedHeader<P>,
) -> OwnedTargetFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, bool)>;
) -> Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>;
}
/// Run headers synchronization.
@@ -131,7 +126,7 @@ pub fn run<P: HeadersSyncPipeline>(
let mut stall_countdown = None;
let mut last_update_time = Instant::now();
let mut source_maybe_client = None;
let mut source_client_is_online = false;
let mut source_best_block_number_required = false;
let source_best_block_number_future = source_client.best_block_number().fuse();
let source_new_header_future = futures::future::Fuse::terminated();
@@ -141,7 +136,7 @@ pub fn run<P: HeadersSyncPipeline>(
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick).fuse();
let mut target_maybe_client = None;
let mut target_client_is_online = false;
let mut target_best_block_required = false;
let mut target_incomplete_headers_required = true;
let target_best_block_future = target_client.best_header_id().fuse();
@@ -173,77 +168,65 @@ pub fn run<P: HeadersSyncPipeline>(
loop {
futures::select! {
(source_client, source_best_block_number) = source_best_block_number_future => {
source_best_block_number = source_best_block_number_future => {
source_best_block_number_required = false;
process_future_result(
&mut source_maybe_client,
source_client,
source_client_is_online = process_future_result(
source_best_block_number,
|source_best_block_number| sync.source_best_header_number_response(source_best_block_number),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
);
},
(source_client, source_new_header) = source_new_header_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_new_header = source_new_header_future => {
source_client_is_online = process_future_result(
source_new_header,
|source_new_header| sync.headers_mut().header_response(source_new_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
);
},
(source_client, source_orphan_header) = source_orphan_header_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_orphan_header = source_orphan_header_future => {
source_client_is_online = process_future_result(
source_orphan_header,
|source_orphan_header| sync.headers_mut().header_response(source_orphan_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
);
},
(source_client, source_extra) = source_extra_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_extra = source_extra_future => {
source_client_is_online = process_future_result(
source_extra,
|(header, extra)| sync.headers_mut().extra_response(&header, extra),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
);
},
(source_client, source_completion) = source_completion_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_completion = source_completion_future => {
source_client_is_online = process_future_result(
source_completion,
|(header, completion)| sync.headers_mut().completion_response(&header, completion),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
);
},
source_client = source_go_offline_future => {
source_maybe_client = Some(source_client);
source_client_is_online = true;
},
_ = source_tick_stream.next() => {
if sync.is_almost_synced() {
source_best_block_number_required = true;
}
},
(target_client, target_best_block) = target_best_block_future => {
target_best_block = target_best_block_future => {
target_best_block_required = false;
process_future_result(
&mut target_maybe_client,
target_client,
target_client_is_online = process_future_result(
target_best_block,
|target_best_block| {
let head_updated = sync.target_best_header_response(target_best_block);
@@ -279,73 +262,63 @@ pub fn run<P: HeadersSyncPipeline>(
}
},
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
);
},
(target_client, incomplete_headers_ids) = target_incomplete_headers_future => {
incomplete_headers_ids = target_incomplete_headers_future => {
target_incomplete_headers_required = false;
process_future_result(
&mut target_maybe_client,
target_client,
target_client_is_online = process_future_result(
incomplete_headers_ids,
|incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
);
},
(target_client, target_existence_status) = target_existence_status_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_existence_status = target_existence_status_future => {
target_client_is_online = process_future_result(
target_existence_status,
|(target_header, target_existence_status)| sync
.headers_mut()
.maybe_orphan_response(&target_header, target_existence_status),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
(target_client, target_submit_header_result) = target_submit_header_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_submit_header_result = target_submit_header_future => {
target_client_is_online = process_future_result(
target_submit_header_result,
|submitted_headers| sync.headers_mut().headers_submitted(submitted_headers),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);
},
(target_client, target_complete_header_result) = target_complete_header_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_complete_header_result = target_complete_header_future => {
target_client_is_online = process_future_result(
target_complete_header_result,
|completed_header| sync.headers_mut().header_completed(&completed_header),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error completing headers at {}", P::TARGET_NAME),
);
},
(target_client, target_extra_check_result) = target_extra_check_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_extra_check_result = target_extra_check_future => {
target_client_is_online = process_future_result(
target_extra_check_result,
|(header, extra_check_result)| sync
.headers_mut()
.maybe_extra_response(&header, extra_check_result),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
);
},
target_client = target_go_offline_future => {
target_maybe_client = Some(target_client);
target_client_is_online = true;
},
_ = target_tick_stream.next() => {
target_best_block_required = true;
@@ -356,15 +329,35 @@ pub fn run<P: HeadersSyncPipeline>(
// print progress
progress_context = print_sync_progress(progress_context, &sync);
// if target client is available: wait, or call required target methods
if let Some(target_client) = target_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely;
// 2) get incomplete headers - it stops us from submitting new blocks + we call it rarely;
// 3) complete headers - it stops us from submitting new blocks;
// 4) check if we need extra data from source - it stops us from downloading/submitting new blocks;
// 5) check existence - it stops us from submitting new blocks;
// 6) submit header
// If the target client is accepting requests we update the requests that
// we want it to run
if target_client_is_online {
// NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same
// nonce from the client.
target_client_is_online = false;
// The following is how we prioritize requests:
//
// 1. Get best block
// - Stops us from downloading or submitting new blocks
// - Only called rarely
//
// 2. Get incomplete headers
// - Stops us from submitting new blocks
// - Only called rarely
//
// 3. Get complete headers
// - Stops us from submitting new blocks
//
// 4. Check if we need extra data from source
// - Stops us from downloading or submitting new blocks
//
// 5. Check existence of header
// - Stops us from submitting new blocks
//
// 6. Submit header
if target_best_block_required {
log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME);
@@ -424,18 +417,35 @@ pub fn run<P: HeadersSyncPipeline>(
stall_countdown = Some(Instant::now());
}
} else {
target_maybe_client = Some(target_client);
target_client_is_online = true;
}
}
// if source client is available: wait, or call required source methods
if let Some(source_client) = source_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading new blocks + we call it rarely;
// 2) download completion data - it stops us from submitting new blocks;
// 3) download extra data - it stops us from submitting new blocks;
// 4) download missing headers - it stops us from downloading/submitting new blocks;
// 5) downloading new headers
// If the source client is accepting requests we update the requests that
// we want it to run
if source_client_is_online {
// NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same
// nonce from the client.
source_client_is_online = false;
// The following is how we prioritize requests:
//
// 1. Get best block
// - Stops us from downloading or submitting new blocks
// - Only called rarely
//
// 2. Download completion data
// - Stops us from submitting new blocks
//
// 3. Download extra data
// - Stops us from submitting new blocks
//
// 4. Download missing headers
// - Stops us from downloading or submitting new blocks
//
// 5. Downloading new headers
if source_best_block_number_required {
log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME);
@@ -488,55 +498,56 @@ pub fn run<P: HeadersSyncPipeline>(
source_new_header_future.set(source_client.header_by_number(id).fuse());
} else {
source_maybe_client = Some(source_client);
source_client_is_online = true;
}
}
}
});
}
/// Future that resolves into given value after given timeout.
async fn delay<T>(timeout: Duration, retval: T) -> T {
async_std::task::sleep(timeout).await;
retval
}
/// Stream that emits item every `timeout_ms` milliseconds.
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
delay(timeout, ()).await;
async_std::task::sleep(timeout).await;
Some(((), ()))
})
}
/// Process result of the future that may have been caused by connection failure.
fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>(
maybe_client: &mut Option<TClient>,
client: TClient,
/// Process result of the future from a client.
///
/// Returns whether or not the client we're interacting with is online. In this context
/// what online means is that the client is currently not handling any other requests
/// that we've previously sent.
fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce(TClient) -> TGoOfflineFuture,
go_offline: impl FnOnce() -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) where
) -> bool
where
TError: std::fmt::Debug + MaybeConnectionError,
TGoOfflineFuture: FutureExt,
{
let mut client_is_online = false;
match result {
Ok(result) => {
*maybe_client = Some(client);
on_success(result);
client_is_online = true
}
Err(error) => {
if error.is_connection_error() {
go_offline_future.set(go_offline(client).fuse());
go_offline_future.set(go_offline().fuse());
} else {
*maybe_client = Some(client);
client_is_online = true
}
log::error!(target: "bridge", "{}: {:?}", error_pattern(), error);
}
}
client_is_online
}
/// Print synchronization progress.