fixed on-demand parachains relay case: if better relay header is delivered, then we must select para header that may be proved using this relay header (#1419)

This commit is contained in:
Svyatoslav Nikolsky
2022-05-27 16:49:50 +03:00
committed by Bastian Köcher
parent 188f16beb0
commit 0a3f8ace26
3 changed files with 246 additions and 314 deletions
@@ -37,14 +37,14 @@ use num_traits::Zero;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient}; use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient};
use relay_substrate_client::{ use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
TransactionSignScheme, TransactionSignScheme,
}; };
use relay_utils::{ use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId,
}; };
use sp_runtime::traits::Header as HeaderT; use sp_runtime::traits::Header as HeaderT;
use std::{cmp::Ordering, collections::BTreeMap}; use std::fmt::Debug;
/// On-demand Substrate <-> Substrate parachain finality relay. /// On-demand Substrate <-> Substrate parachain finality relay.
/// ///
@@ -142,9 +142,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
let target_transactions_mortality = target_transaction_params.mortality; let target_transactions_mortality = target_transaction_params.mortality;
let mut relay_state = RelayState::Idle; let mut relay_state = RelayState::Idle;
let mut headers_map_cache = BTreeMap::new();
let mut required_parachain_header_number = Zero::zero(); let mut required_parachain_header_number = Zero::zero();
let required_para_header_number_ref = Arc::new(Mutex::new(required_parachain_header_number)); let required_para_header_number_ref = Arc::new(Mutex::new(None));
let mut restart_relay = true; let mut restart_relay = true;
let parachains_relay_task = futures::future::Fuse::terminated(); let parachains_relay_task = futures::future::Fuse::terminated();
@@ -191,7 +190,10 @@ async fn background_task<P: SubstrateParachainsPipeline>(
// the workflow of the on-demand parachains relay is: // the workflow of the on-demand parachains relay is:
// //
// 1) message relay (or any other dependent relay) sees new message at parachain header // 1) message relay (or any other dependent relay) sees new message at parachain header
// `PH`; 2) it sees that the target chain does not know `PH`; // `PH`;
//
// 2) it sees that the target chain does not know `PH`;
//
// 3) it asks on-demand parachains relay to relay `PH` to the target chain; // 3) it asks on-demand parachains relay to relay `PH` to the target chain;
// //
// Phase#1: relaying relay chain header // Phase#1: relaying relay chain header
@@ -204,21 +206,21 @@ async fn background_task<P: SubstrateParachainsPipeline>(
// Phase#2: relaying parachain header // Phase#2: relaying parachain header
// //
// 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the // 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the
// `PH'.number()`. 8) parachains finality relay sees that the parachain head has been // `PH'.number()`.
// updated and relays `PH'` to the target chain. // 8) parachains finality relay sees that the parachain head has been
// updated and relays `PH'` to the target chain.
// select headers to relay // select headers to relay
let relay_data = read_relay_data( let relay_data = read_relay_data(
&parachains_source, &parachains_source,
&parachains_target, &parachains_target,
required_parachain_header_number, required_parachain_header_number,
&mut headers_map_cache,
) )
.await; .await;
match relay_data { match relay_data {
Ok(mut relay_data) => { Ok(relay_data) => {
let prev_relay_state = relay_state; let prev_relay_state = relay_state;
relay_state = select_headers_to_relay(&mut relay_data, relay_state); relay_state = select_headers_to_relay(&relay_data, relay_state);
log::trace!( log::trace!(
target: "bridge", target: "bridge",
"Selected new relay state in {}: {:?} using old state {:?} and data {:?}", "Selected new relay state in {}: {:?} using old state {:?} and data {:?}",
@@ -244,13 +246,13 @@ async fn background_task<P: SubstrateParachainsPipeline>(
// requirements // requirements
match relay_state { match relay_state {
RelayState::Idle => (), RelayState::Idle => (),
RelayState::RelayingRelayHeader(required_relay_header, _) => { RelayState::RelayingRelayHeader(required_relay_header) => {
on_demand_source_relay_to_target_headers on_demand_source_relay_to_target_headers
.require_more_headers(required_relay_header) .require_more_headers(required_relay_header)
.await; .await;
}, },
RelayState::RelayingParaHeader(required_para_header) => { RelayState::RelayingParaHeader(required_para_header) => {
*required_para_header_number_ref.lock().await = required_para_header; *required_para_header_number_ref.lock().await = Some(required_para_header);
}, },
} }
@@ -300,55 +302,44 @@ fn on_demand_parachains_relay_name<SourceChain: Chain, TargetChain: Chain>() ->
/// On-demand relay state. /// On-demand relay state.
#[derive(Clone, Copy, Debug, PartialEq)] #[derive(Clone, Copy, Debug, PartialEq)]
enum RelayState<SourceParaBlock, SourceRelayBlock> { enum RelayState<ParaHash, ParaNumber, RelayNumber> {
/// On-demand relay is not doing anything. /// On-demand relay is not doing anything.
Idle, Idle,
/// Relaying given relay header to relay given parachain header later. /// Relaying given relay header to relay given parachain header later.
RelayingRelayHeader(SourceRelayBlock, SourceParaBlock), RelayingRelayHeader(RelayNumber),
/// Relaying given parachain header. /// Relaying given parachain header.
RelayingParaHeader(SourceParaBlock), RelayingParaHeader(HeaderId<ParaHash, ParaNumber>),
} }
/// Data gathered from source and target clients, used by on-demand relay. /// Data gathered from source and target clients, used by on-demand relay.
#[derive(Debug)] #[derive(Debug)]
struct RelayData<'a, SourceParaBlock, SourceRelayBlock> { struct RelayData<ParaHash, ParaNumber, RelayNumber> {
/// Parachain header number that is required at the target chain. /// Parachain header number that is required at the target chain.
pub required_para_header: SourceParaBlock, pub required_para_header: ParaNumber,
/// Parachain header number, known to the target chain. /// Parachain header number, known to the target chain.
pub para_header_at_target: SourceParaBlock, pub para_header_at_target: ParaNumber,
/// Parachain header number, known to the source (relay) chain. /// Parachain header id, known to the source (relay) chain.
pub para_header_at_source: Option<SourceParaBlock>, pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
/// Parachain header, that is available at the source relay chain at `relay_header_at_target`
/// block.
pub para_header_at_relay_header_at_target: Option<HeaderId<ParaHash, ParaNumber>>,
/// Relay header number at the source chain. /// Relay header number at the source chain.
pub relay_header_at_source: SourceRelayBlock, pub relay_header_at_source: RelayNumber,
/// Relay header number at the target chain. /// Relay header number at the target chain.
pub relay_header_at_target: SourceRelayBlock, pub relay_header_at_target: RelayNumber,
/// Map of relay to para header block numbers for recent relay headers.
///
/// Even if we have been trying to relay relay header #100 to relay parachain header #50
/// afterwards, it may happen that the relay header #200 may be relayed instead - either
/// by us (e.g. if GRANDPA justification is generated for #200, or if we are only syncing
/// mandatory headers), or by other relayer. Then, instead of parachain header #50 we may
/// relay parachain header #70.
///
/// This cache is especially important, given that we assume that the nodes we're connected
/// to are not necessarily archive nodes. Then, if current relay chain block is #210 and #200
/// has been delivered to the target chain, we have more chances to generate storage proof
/// at relay block #200 than on relay block #100, which is most likely has pruned state
/// already.
pub headers_map_cache: &'a mut BTreeMap<SourceRelayBlock, SourceParaBlock>,
} }
/// Read required data from source and target clients. /// Read required data from source and target clients.
async fn read_relay_data<'a, P: SubstrateParachainsPipeline>( async fn read_relay_data<P: SubstrateParachainsPipeline>(
source: &ParachainsSource<P>, source: &ParachainsSource<P>,
target: &ParachainsTarget<P>, target: &ParachainsTarget<P>,
required_header_number: BlockNumberOf<P::SourceParachain>, required_header_number: BlockNumberOf<P::SourceParachain>,
headers_map_cache: &'a mut BTreeMap<
BlockNumberOf<P::SourceRelayChain>,
BlockNumberOf<P::SourceParachain>,
>,
) -> Result< ) -> Result<
RelayData<'a, BlockNumberOf<P::SourceParachain>, BlockNumberOf<P::SourceRelayChain>>, RelayData<
HashOf<P::SourceParachain>,
BlockNumberOf<P::SourceParachain>,
BlockNumberOf<P::SourceRelayChain>,
>,
FailedClient, FailedClient,
> >
where where
@@ -398,7 +389,7 @@ where
) )
.await .await
.map_err(map_source_err)? .map_err(map_source_err)?
.map(|h| *h.number()); .map(|h| HeaderId(*h.number(), h.hash()));
let relay_header_at_source = best_finalized_relay_block_id.0; let relay_header_at_source = best_finalized_relay_block_id.0;
let relay_header_at_target = let relay_header_at_target =
@@ -408,68 +399,52 @@ where
P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD, P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD,
) )
.await .await
.map_err(map_target_err)? .map_err(map_target_err)?;
.0;
let para_header_at_relay_header_at_target = source
.on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
.await
.map_err(map_source_err)?
.map(|h| HeaderId(*h.number(), h.hash()));
Ok(RelayData { Ok(RelayData {
required_para_header: required_header_number, required_para_header: required_header_number,
para_header_at_target, para_header_at_target,
para_header_at_source, para_header_at_source,
relay_header_at_source, relay_header_at_source,
relay_header_at_target, relay_header_at_target: relay_header_at_target.0,
headers_map_cache, para_header_at_relay_header_at_target,
}) })
} }
// This number is bigger than the session length of any well-known Substrate-based relay
// chain. We expect that the underlying on-demand relay will submit at least 1 header per
// session.
const MAX_HEADERS_MAP_CACHE_ENTRIES: usize = 4096;
/// Select relay and parachain headers that need to be relayed. /// Select relay and parachain headers that need to be relayed.
fn select_headers_to_relay<'a, SourceParaBlock, SourceRelayBlock>( fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
data: &mut RelayData<'a, SourceParaBlock, SourceRelayBlock>, data: &RelayData<ParaHash, ParaNumber, RelayNumber>,
mut state: RelayState<SourceParaBlock, SourceRelayBlock>, mut state: RelayState<ParaHash, ParaNumber, RelayNumber>,
) -> RelayState<SourceParaBlock, SourceRelayBlock> ) -> RelayState<ParaHash, ParaNumber, RelayNumber>
where where
RelayData<'a, SourceParaBlock, SourceRelayBlock>: std::fmt::Debug, // TODO: remove ParaHash: Clone,
SourceParaBlock: Copy + PartialOrd, ParaNumber: Copy + PartialOrd,
SourceRelayBlock: Copy + Ord, RelayNumber: Copy + Debug + Ord,
{ {
// despite of our current state, we want to update the headers map cache
if let Some(para_header_at_source) = data.para_header_at_source {
data.headers_map_cache
.insert(data.relay_header_at_source, para_header_at_source);
if data.headers_map_cache.len() > MAX_HEADERS_MAP_CACHE_ENTRIES {
let first_key = *data.headers_map_cache.keys().next().expect("map is not empty; qed");
data.headers_map_cache.remove(&first_key);
}
}
// this switch is responsible for processing `RelayingRelayHeader` state // this switch is responsible for processing `RelayingRelayHeader` state
match state { match state {
RelayState::Idle | RelayState::RelayingParaHeader(_) => (), RelayState::Idle | RelayState::RelayingParaHeader(_) => (),
RelayState::RelayingRelayHeader(relay_header_number, para_header_number) => { RelayState::RelayingRelayHeader(relay_header_number) => {
match data.relay_header_at_target.cmp(&relay_header_number) { if data.relay_header_at_target < relay_header_number {
Ordering::Less => { // required relay header hasn't yet been relayed
// relay header hasn't yet been relayed return RelayState::RelayingRelayHeader(relay_header_number)
return RelayState::RelayingRelayHeader(relay_header_number, para_header_number) }
},
Ordering::Equal => { // we may switch to `RelayingParaHeader` if parachain head is available
// relay header has been realyed and we may continue with parachain header if let Some(para_header_at_relay_header_at_target) =
state = RelayState::RelayingParaHeader(para_header_number); data.para_header_at_relay_header_at_target.clone()
}, {
Ordering::Greater => { state = RelayState::RelayingParaHeader(para_header_at_relay_header_at_target);
// relay header descendant has been relayed and we may need to change parachain } else {
// header that we want to relay // otherwise, we'd need to restart (this may happen only if parachain has been
let next_para_header_number = data // deregistered)
.headers_map_cache state = RelayState::Idle;
.range(..=data.relay_header_at_target)
.next_back()
.map(|(_, next_para_header_number)| *next_para_header_number)
.unwrap_or_else(|| para_header_number);
state = RelayState::RelayingParaHeader(next_para_header_number);
},
} }
}, },
} }
@@ -477,11 +452,11 @@ where
// this switch is responsible for processing `RelayingParaHeader` state // this switch is responsible for processing `RelayingParaHeader` state
match state { match state {
RelayState::Idle => (), RelayState::Idle => (),
RelayState::RelayingRelayHeader(_, _) => unreachable!("processed by previous match; qed"), RelayState::RelayingRelayHeader(_) => unreachable!("processed by previous match; qed"),
RelayState::RelayingParaHeader(para_header_number) => { RelayState::RelayingParaHeader(para_header_id) => {
if data.para_header_at_target < para_header_number { if data.para_header_at_target < para_header_id.0 {
// parachain header hasn't yet been relayed // parachain header hasn't yet been relayed
return RelayState::RelayingParaHeader(para_header_number) return RelayState::RelayingParaHeader(para_header_id)
} }
}, },
} }
@@ -491,8 +466,14 @@ where
return RelayState::Idle return RelayState::Idle
} }
// if we haven't read para head from the source, we can't yet do anyhting
let para_header_at_source = match data.para_header_at_source {
Some(ref para_header_at_source) => para_header_at_source.clone(),
None => return RelayState::Idle,
};
// if required header is not available even at the source chain, let's wait // if required header is not available even at the source chain, let's wait
if Some(data.required_para_header) > data.para_header_at_source { if data.required_para_header > para_header_at_source.0 {
return RelayState::Idle return RelayState::Idle
} }
@@ -501,14 +482,11 @@ where
// we need relay chain header first // we need relay chain header first
if data.relay_header_at_target < data.relay_header_at_source { if data.relay_header_at_target < data.relay_header_at_source {
return RelayState::RelayingRelayHeader( return RelayState::RelayingRelayHeader(data.relay_header_at_source)
data.relay_header_at_source,
data.required_para_header,
)
} }
// if all relay headers synced, we may start directly with parachain header // if all relay headers synced, we may start directly with parachain header
RelayState::RelayingParaHeader(data.required_para_header) RelayState::RelayingParaHeader(para_header_at_source)
} }
#[cfg(test)] #[cfg(test)]
@@ -519,17 +497,17 @@ mod tests {
fn relay_waits_for_relay_header_to_be_delivered() { fn relay_waits_for_relay_header_to_be_delivered() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 100, required_para_header: 90,
para_header_at_target: 50, para_header_at_target: 50,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 700,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
}, },
RelayState::RelayingRelayHeader(750, 100), RelayState::RelayingRelayHeader(750),
), ),
RelayState::RelayingRelayHeader(750, 100), RelayState::RelayingRelayHeader(750),
); );
} }
@@ -537,53 +515,17 @@ mod tests {
fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() { fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 100, required_para_header: 90,
para_header_at_target: 50, para_header_at_target: 50,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 750, relay_header_at_target: 750,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
}, },
RelayState::RelayingRelayHeader(750, 100), RelayState::RelayingRelayHeader(750),
), ),
RelayState::RelayingParaHeader(100), RelayState::RelayingParaHeader(HeaderId(100, 100)),
);
}
#[test]
fn relay_selects_same_para_header_after_better_relay_header_is_delivered_1() {
assert_eq!(
select_headers_to_relay(
&mut RelayData {
required_para_header: 100,
para_header_at_target: 50,
para_header_at_source: Some(110),
relay_header_at_source: 800,
relay_header_at_target: 780,
headers_map_cache: &mut vec![(700, 90), (750, 100)].into_iter().collect(),
},
RelayState::RelayingRelayHeader(750, 100),
),
RelayState::RelayingParaHeader(100),
);
}
#[test]
fn relay_selects_same_para_header_after_better_relay_header_is_delivered_2() {
assert_eq!(
select_headers_to_relay(
&mut RelayData {
required_para_header: 100,
para_header_at_target: 50,
para_header_at_source: Some(110),
relay_header_at_source: 800,
relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(),
},
RelayState::RelayingRelayHeader(750, 100),
),
RelayState::RelayingParaHeader(100),
); );
} }
@@ -591,37 +533,34 @@ mod tests {
fn relay_selects_better_para_header_after_better_relay_header_is_delivered() { fn relay_selects_better_para_header_after_better_relay_header_is_delivered() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 100, required_para_header: 90,
para_header_at_target: 50, para_header_at_target: 50,
para_header_at_source: Some(120), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 780, relay_header_at_target: 780,
headers_map_cache: &mut vec![(700, 90), (750, 100), (780, 110), (790, 120)] para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
.into_iter()
.collect(),
}, },
RelayState::RelayingRelayHeader(750, 100), RelayState::RelayingRelayHeader(750),
), ),
RelayState::RelayingParaHeader(110), RelayState::RelayingParaHeader(HeaderId(105, 105)),
); );
} }
#[test] #[test]
fn relay_waits_for_para_header_to_be_delivered() { fn relay_waits_for_para_header_to_be_delivered() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 100, required_para_header: 90,
para_header_at_target: 50, para_header_at_target: 50,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
}, },
RelayState::RelayingParaHeader(100), RelayState::RelayingParaHeader(HeaderId(105, 105)),
), ),
RelayState::RelayingParaHeader(100), RelayState::RelayingParaHeader(HeaderId(105, 105)),
); );
} }
@@ -629,13 +568,13 @@ mod tests {
fn relay_stays_idle_if_required_para_header_is_already_delivered() { fn relay_stays_idle_if_required_para_header_is_already_delivered() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 100, required_para_header: 90,
para_header_at_target: 100, para_header_at_target: 105,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
}, },
RelayState::Idle, RelayState::Idle,
), ),
@@ -647,13 +586,13 @@ mod tests {
fn relay_waits_for_required_para_header_to_appear_at_source_1() { fn relay_waits_for_required_para_header_to_appear_at_source_1() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 110, required_para_header: 120,
para_header_at_target: 100, para_header_at_target: 105,
para_header_at_source: None, para_header_at_source: None,
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
}, },
RelayState::Idle, RelayState::Idle,
), ),
@@ -665,13 +604,13 @@ mod tests {
fn relay_waits_for_required_para_header_to_appear_at_source_2() { fn relay_waits_for_required_para_header_to_appear_at_source_2() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 110, required_para_header: 120,
para_header_at_target: 100, para_header_at_target: 105,
para_header_at_source: Some(100), para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
}, },
RelayState::Idle, RelayState::Idle,
), ),
@@ -683,17 +622,17 @@ mod tests {
fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() { fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 110, required_para_header: 120,
para_header_at_target: 100, para_header_at_target: 105,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 700, relay_header_at_target: 780,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
}, },
RelayState::Idle, RelayState::Idle,
), ),
RelayState::RelayingRelayHeader(800, 110), RelayState::RelayingRelayHeader(800),
); );
} }
@@ -701,97 +640,35 @@ mod tests {
fn relay_starts_relaying_para_header_when_new_para_header_is_requested() { fn relay_starts_relaying_para_header_when_new_para_header_is_requested() {
assert_eq!( assert_eq!(
select_headers_to_relay( select_headers_to_relay(
&mut RelayData { &RelayData {
required_para_header: 110, required_para_header: 120,
para_header_at_target: 100, para_header_at_target: 105,
para_header_at_source: Some(110), para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800, relay_header_at_source: 800,
relay_header_at_target: 800, relay_header_at_target: 800,
headers_map_cache: &mut BTreeMap::new(), para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
}, },
RelayState::Idle, RelayState::Idle,
), ),
RelayState::RelayingParaHeader(110), RelayState::RelayingParaHeader(HeaderId(125, 125)),
); );
} }
#[test] #[test]
fn headers_map_cache_is_updated() { fn relay_goes_idle_when_parachain_is_deregistered() {
let mut headers_map_cache = BTreeMap::new(); assert_eq!(
select_headers_to_relay::<i32, _, _>(
// when parachain header is known, map is updated &RelayData {
select_headers_to_relay( required_para_header: 120,
&mut RelayData { para_header_at_target: 105,
required_para_header: 0, para_header_at_source: None,
para_header_at_target: 50, relay_header_at_source: 800,
para_header_at_source: Some(110), relay_header_at_target: 800,
relay_header_at_source: 800, para_header_at_relay_header_at_target: None,
relay_header_at_target: 700,
headers_map_cache: &mut headers_map_cache,
},
RelayState::RelayingRelayHeader(750, 100),
);
assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
// when parachain header is not known, map is NOT updated
select_headers_to_relay(
&mut RelayData {
required_para_header: 0,
para_header_at_target: 50,
para_header_at_source: None,
relay_header_at_source: 800,
relay_header_at_target: 700,
headers_map_cache: &mut headers_map_cache,
},
RelayState::RelayingRelayHeader(750, 100),
);
assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
// map auto-deduplicates equal entries
select_headers_to_relay(
&mut RelayData {
required_para_header: 0,
para_header_at_target: 50,
para_header_at_source: Some(110),
relay_header_at_source: 800,
relay_header_at_target: 700,
headers_map_cache: &mut headers_map_cache,
},
RelayState::RelayingRelayHeader(750, 100),
);
assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
// nothing is pruned if number of map entries is < MAX_HEADERS_MAP_CACHE_ENTRIES
for i in 1..MAX_HEADERS_MAP_CACHE_ENTRIES {
select_headers_to_relay(
&mut RelayData {
required_para_header: 0,
para_header_at_target: 50,
para_header_at_source: Some(110 + i),
relay_header_at_source: 800 + i,
relay_header_at_target: 700,
headers_map_cache: &mut headers_map_cache,
}, },
RelayState::RelayingRelayHeader(750, 100), RelayState::RelayingRelayHeader(800),
); ),
assert_eq!(headers_map_cache.len(), i + 1); RelayState::Idle,
}
// when we add next entry, the oldest one is pruned
assert!(headers_map_cache.contains_key(&800));
assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES);
select_headers_to_relay(
&mut RelayData {
required_para_header: 0,
para_header_at_target: 50,
para_header_at_source: Some(110 + MAX_HEADERS_MAP_CACHE_ENTRIES),
relay_header_at_source: 800 + MAX_HEADERS_MAP_CACHE_ENTRIES,
relay_header_at_target: 700,
headers_map_cache: &mut headers_map_cache,
},
RelayState::RelayingRelayHeader(750, 100),
); );
assert!(!headers_map_cache.contains_key(&800));
assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES);
} }
} }
@@ -16,39 +16,38 @@
//! Parachain heads source. //! Parachain heads source.
use crate::{ use crate::parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline};
finality::source::RequiredHeaderNumberRef,
parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline},
};
use async_std::sync::{Arc, Mutex}; use async_std::sync::{Arc, Mutex};
use async_trait::async_trait; use async_trait::async_trait;
use bp_parachains::parachain_head_storage_key_at_source; use bp_parachains::parachain_head_storage_key_at_source;
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
use codec::Decode; use codec::Decode;
use parachains_relay::parachains_loop::SourceClient; use parachains_relay::parachains_loop::{ParaHashAtSource, SourceClient};
use relay_substrate_client::{ use relay_substrate_client::{
Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
}; };
use relay_utils::relay_loop::Client as RelayClient; use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT; use sp_runtime::traits::Header as HeaderT;
/// Shared updatable reference to the maximal parachain header id that we want to sync from the
/// source.
pub type RequiredHeaderIdRef<C> = Arc<Mutex<Option<HeaderIdOf<C>>>>;
/// Substrate client as parachain heads source. /// Substrate client as parachain heads source.
#[derive(Clone)] #[derive(Clone)]
pub struct ParachainsSource<P: SubstrateParachainsPipeline> { pub struct ParachainsSource<P: SubstrateParachainsPipeline> {
client: Client<P::SourceRelayChain>, client: Client<P::SourceRelayChain>,
maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceParachain>>, maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
previous_parachain_head: Arc<Mutex<Option<ParaHash>>>,
} }
impl<P: SubstrateParachainsPipeline> ParachainsSource<P> { impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
/// Creates new parachains source client. /// Creates new parachains source client.
pub fn new( pub fn new(
client: Client<P::SourceRelayChain>, client: Client<P::SourceRelayChain>,
maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceParachain>>, maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
) -> Self { ) -> Self {
let previous_parachain_head = Arc::new(Mutex::new(None)); ParachainsSource { client, maximal_header_id }
ParachainsSource { client, maximal_header_number, previous_parachain_head }
} }
/// Returns reference to the underlying RPC client. /// Returns reference to the underlying RPC client.
@@ -102,7 +101,7 @@ where
&self, &self,
at_block: HeaderIdOf<P::SourceRelayChain>, at_block: HeaderIdOf<P::SourceRelayChain>,
para_id: ParaId, para_id: ParaId,
) -> Result<Option<ParaHash>, Self::Error> { ) -> Result<ParaHashAtSource, Self::Error> {
// we don't need to support many parachains now // we don't need to support many parachains now
if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID { if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID {
return Err(SubstrateError::Custom(format!( return Err(SubstrateError::Custom(format!(
@@ -112,29 +111,33 @@ where
))) )))
} }
let parachain_head = match self.on_chain_parachain_header(at_block, para_id).await? { Ok(match self.on_chain_parachain_header(at_block, para_id).await? {
Some(parachain_header) => { Some(parachain_header) => {
let mut parachain_head = Some(parachain_header.hash()); let mut parachain_head = ParaHashAtSource::Some(parachain_header.hash());
// never return head that is larger than requested. This way we'll never sync // never return head that is larger than requested. This way we'll never sync
// headers past `maximal_header_number` // headers past `maximal_header_id`
if let Some(ref maximal_header_number) = self.maximal_header_number { if let Some(ref maximal_header_id) = self.maximal_header_id {
let maximal_header_number = *maximal_header_number.lock().await; let maximal_header_id = *maximal_header_id.lock().await;
if *parachain_header.number() > maximal_header_number { match maximal_header_id {
let previous_parachain_head = *self.previous_parachain_head.lock().await; Some(maximal_header_id)
if let Some(previous_parachain_head) = previous_parachain_head { if *parachain_header.number() > maximal_header_id.0 =>
parachain_head = Some(previous_parachain_head); {
} // we don't want this header yet => let's report previously requested
// header
parachain_head = ParaHashAtSource::Some(maximal_header_id.1);
},
Some(_) => (),
None => {
// on-demand relay has not yet asked us to sync anything let's do that
parachain_head = ParaHashAtSource::Unavailable;
},
} }
} }
parachain_head parachain_head
}, },
None => None, None => ParaHashAtSource::None,
}; })
*self.previous_parachain_head.lock().await = parachain_head;
Ok(parachain_head)
} }
async fn prove_parachain_heads( async fn prove_parachain_heads(
@@ -52,6 +52,23 @@ pub enum ParachainSyncStrategy {
All, All,
} }
/// Parachain head hash, available at the source (relay) chain.
#[derive(Clone, Copy, Debug)]
pub enum ParaHashAtSource {
/// There's no parachain head at the source chain.
///
/// Normally it means that the parachain is not registered there.
None,
/// Parachain head with given hash is available at the source chain.
Some(ParaHash),
/// The source client refuses to report parachain head hash at this moment.
///
/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
/// This variant must be treated as "we don't want to update parachain head value at the
/// target chain at this moment".
Unavailable,
}
/// Source client used in parachain heads synchronization loop. /// Source client used in parachain heads synchronization loop.
#[async_trait] #[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient { pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
@@ -63,7 +80,7 @@ pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
&self, &self,
at_block: HeaderIdOf<P::SourceChain>, at_block: HeaderIdOf<P::SourceChain>,
para_id: ParaId, para_id: ParaId,
) -> Result<Option<ParaHash>, Self::Error>; ) -> Result<ParaHashAtSource, Self::Error>;
/// Get parachain heads proof. /// Get parachain heads proof.
async fn prove_parachain_heads( async fn prove_parachain_heads(
@@ -291,7 +308,7 @@ where
/// Given heads at source and target clients, returns set of heads that are out of sync. /// Given heads at source and target clients, returns set of heads that are out of sync.
fn select_parachains_to_update<P: ParachainsPipeline>( fn select_parachains_to_update<P: ParachainsPipeline>(
heads_at_source: BTreeMap<ParaId, Option<ParaHash>>, heads_at_source: BTreeMap<ParaId, ParaHashAtSource>,
heads_at_target: BTreeMap<ParaId, Option<BestParaHeadHash>>, heads_at_target: BTreeMap<ParaId, Option<BestParaHeadHash>>,
best_finalized_relay_block: HeaderIdOf<P::SourceChain>, best_finalized_relay_block: HeaderIdOf<P::SourceChain>,
) -> Vec<ParaId> ) -> Vec<ParaId>
@@ -317,7 +334,12 @@ where
.zip(heads_at_target.into_iter()) .zip(heads_at_target.into_iter())
.filter(|((para, head_at_source), (_, head_at_target))| { .filter(|((para, head_at_source), (_, head_at_target))| {
let needs_update = match (head_at_source, head_at_target) { let needs_update = match (head_at_source, head_at_target) {
(Some(head_at_source), Some(head_at_target)) (ParaHashAtSource::Unavailable, _) => {
// source client has politely asked us not to update current parachain head
// at the target chain
false
},
(ParaHashAtSource::Some(head_at_source), Some(head_at_target))
if head_at_target.at_relay_block_number < best_finalized_relay_block.0 && if head_at_target.at_relay_block_number < best_finalized_relay_block.0 &&
head_at_target.head_hash != *head_at_source => head_at_target.head_hash != *head_at_source =>
{ {
@@ -325,22 +347,22 @@ where
// client // client
true true
}, },
(Some(_), Some(_)) => { (ParaHashAtSource::Some(_), Some(_)) => {
// this is normal case when relay has recently updated heads, when parachain is // this is normal case when relay has recently updated heads, when parachain is
// not progressing or when our source client is // not progressing or when our source client is
false false
}, },
(Some(_), None) => { (ParaHashAtSource::Some(_), None) => {
// parachain is not yet known to the target client. This is true when parachain // parachain is not yet known to the target client. This is true when parachain
// or bridge has been just onboarded/started // or bridge has been just onboarded/started
true true
}, },
(None, Some(_)) => { (ParaHashAtSource::None, Some(_)) => {
// parachain/parathread has been offboarded removed from the system. It needs to // parachain/parathread has been offboarded removed from the system. It needs to
// be propageted to the target client // be propageted to the target client
true true
}, },
(None, None) => { (ParaHashAtSource::None, None) => {
// all's good - parachain is unknown to both clients // all's good - parachain is unknown to both clients
false false
}, },
@@ -378,7 +400,7 @@ async fn read_heads_at_source<P: ParachainsPipeline>(
source_client: &impl SourceClient<P>, source_client: &impl SourceClient<P>,
at_relay_block: &HeaderIdOf<P::SourceChain>, at_relay_block: &HeaderIdOf<P::SourceChain>,
parachains: &[ParaId], parachains: &[ParaId],
) -> Result<BTreeMap<ParaId, Option<ParaHash>>, FailedClient> { ) -> Result<BTreeMap<ParaId, ParaHashAtSource>, FailedClient> {
let mut para_head_hashes = BTreeMap::new(); let mut para_head_hashes = BTreeMap::new();
for para in parachains { for para in parachains {
let para_head = source_client.parachain_head(*at_relay_block, *para).await; let para_head = source_client.parachain_head(*at_relay_block, *para).await;
@@ -554,7 +576,7 @@ mod tests {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct TestClientData { struct TestClientData {
source_sync_status: Result<bool, TestError>, source_sync_status: Result<bool, TestError>,
source_heads: BTreeMap<u32, Result<ParaHash, TestError>>, source_heads: BTreeMap<u32, Result<ParaHashAtSource, TestError>>,
source_proofs: BTreeMap<u32, Result<Vec<u8>, TestError>>, source_proofs: BTreeMap<u32, Result<Vec<u8>, TestError>>,
target_best_block: Result<HeaderIdOf<TestChain>, TestError>, target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
@@ -569,7 +591,9 @@ mod tests {
pub fn minimal() -> Self { pub fn minimal() -> Self {
TestClientData { TestClientData {
source_sync_status: Ok(true), source_sync_status: Ok(true),
source_heads: vec![(PARA_ID, Ok(PARA_0_HASH))].into_iter().collect(), source_heads: vec![(PARA_ID, Ok(ParaHashAtSource::Some(PARA_0_HASH)))]
.into_iter()
.collect(),
source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(), source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(),
target_best_block: Ok(HeaderId(0, Default::default())), target_best_block: Ok(HeaderId(0, Default::default())),
@@ -615,8 +639,11 @@ mod tests {
&self, &self,
_at_block: HeaderIdOf<TestChain>, _at_block: HeaderIdOf<TestChain>,
para_id: ParaId, para_id: ParaId,
) -> Result<Option<ParaHash>, TestError> { ) -> Result<ParaHashAtSource, TestError> {
self.data.lock().await.source_heads.get(&para_id.0).cloned().transpose() match self.data.lock().await.source_heads.get(&para_id.0).cloned() {
Some(result) => result,
None => Ok(ParaHashAtSource::None),
}
} }
async fn prove_parachain_heads( async fn prove_parachain_heads(
@@ -923,7 +950,7 @@ mod tests {
fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), None)].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(),
vec![(ParaId(PARA_ID), None)].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(),
HeaderId(10, Default::default()), HeaderId(10, Default::default()),
), ),
@@ -935,7 +962,9 @@ mod tests {
fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() { fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
.into_iter()
.collect(),
vec![( vec![(
ParaId(PARA_ID), ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH }) Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH })
@@ -952,7 +981,9 @@ mod tests {
fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() { fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
.into_iter()
.collect(),
vec![( vec![(
ParaId(PARA_ID), ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
@@ -969,7 +1000,7 @@ mod tests {
fn parachain_is_updated_after_offboarding() { fn parachain_is_updated_after_offboarding() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), None)].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(),
vec![( vec![(
ParaId(PARA_ID), ParaId(PARA_ID),
Some(BestParaHeadHash { Some(BestParaHeadHash {
@@ -989,7 +1020,9 @@ mod tests {
fn parachain_is_updated_after_onboarding() { fn parachain_is_updated_after_onboarding() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
.into_iter()
.collect(),
vec![(ParaId(PARA_ID), None)].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(),
HeaderId(10, Default::default()), HeaderId(10, Default::default()),
), ),
@@ -1001,7 +1034,9 @@ mod tests {
fn parachain_is_updated_if_newer_head_is_known() { fn parachain_is_updated_if_newer_head_is_known() {
assert_eq!( assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>( select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), Some(PARA_1_HASH))].into_iter().collect(), vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_1_HASH))]
.into_iter()
.collect(),
vec![( vec![(
ParaId(PARA_ID), ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
@@ -1014,6 +1049,23 @@ mod tests {
); );
} }
#[test]
fn parachain_is_not_updated_if_source_head_is_unavailable() {
assert_eq!(
select_parachains_to_update::<TestParachainsPipeline>(
vec![(ParaId(PARA_ID), ParaHashAtSource::Unavailable)].into_iter().collect(),
vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
)]
.into_iter()
.collect(),
HeaderId(10, Default::default()),
),
vec![],
);
}
#[test] #[test]
fn is_update_required_works() { fn is_update_required_works() {
let mut sync_params = ParachainSyncParams { let mut sync_params = ParachainSyncParams {