mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 23:21:02 +00:00
Match substrate's fmt (#1148)
* Alter gitlab. * Use substrate's rustfmt.toml * cargo +nightly fmt --all * Fix spellcheck. * cargo +nightly fmt --all * format. * Fix spellcheck and fmt * fmt? * Fix spellcheck Co-authored-by: Tomasz Drwięga <tomasz@parity.io>
This commit is contained in:
@@ -20,22 +20,33 @@
|
||||
//! may stay until source/target chain state isn't updated. When a header reaches the
|
||||
//! `ready` sub-queue, it may be submitted to the target chain.
|
||||
|
||||
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader};
|
||||
use crate::sync_types::{
|
||||
HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader,
|
||||
};
|
||||
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use num_traits::{One, Zero};
|
||||
use relay_utils::HeaderId;
|
||||
use std::{
|
||||
collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet},
|
||||
collections::{
|
||||
btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap,
|
||||
HashSet,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
type HeadersQueue<P> =
|
||||
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, QueuedHeader<P>>>;
|
||||
type SyncedChildren<P> =
|
||||
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HashSet<HeaderIdOf<P>>>>;
|
||||
type KnownHeaders<P> =
|
||||
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HeaderStatus>>;
|
||||
type HeadersQueue<P> = BTreeMap<
|
||||
<P as HeadersSyncPipeline>::Number,
|
||||
HashMap<<P as HeadersSyncPipeline>::Hash, QueuedHeader<P>>,
|
||||
>;
|
||||
type SyncedChildren<P> = BTreeMap<
|
||||
<P as HeadersSyncPipeline>::Number,
|
||||
HashMap<<P as HeadersSyncPipeline>::Hash, HashSet<HeaderIdOf<P>>>,
|
||||
>;
|
||||
type KnownHeaders<P> = BTreeMap<
|
||||
<P as HeadersSyncPipeline>::Number,
|
||||
HashMap<<P as HeadersSyncPipeline>::Hash, HeaderStatus>,
|
||||
>;
|
||||
|
||||
/// We're trying to fetch completion data for single header at this interval.
|
||||
const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20);
|
||||
@@ -113,35 +124,31 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
pub fn headers_in_status(&self, status: HeaderStatus) -> usize {
|
||||
match status {
|
||||
HeaderStatus::Unknown | HeaderStatus::Synced => 0,
|
||||
HeaderStatus::MaybeOrphan => self
|
||||
.maybe_orphan
|
||||
.values()
|
||||
.fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Orphan => self.orphan.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::MaybeExtra => self
|
||||
.maybe_extra
|
||||
.values()
|
||||
.fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Extra => self.extra.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Ready => self.ready.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Incomplete => self.incomplete.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Submitted => self.submitted.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::MaybeOrphan =>
|
||||
self.maybe_orphan.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Orphan =>
|
||||
self.orphan.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::MaybeExtra =>
|
||||
self.maybe_extra.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Extra =>
|
||||
self.extra.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Ready =>
|
||||
self.ready.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Incomplete =>
|
||||
self.incomplete.values().fold(0, |total, headers| total + headers.len()),
|
||||
HeaderStatus::Submitted =>
|
||||
self.submitted.values().fold(0, |total, headers| total + headers.len()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns number of headers that are currently in the queue.
|
||||
pub fn total_headers(&self) -> usize {
|
||||
self.maybe_orphan
|
||||
.values()
|
||||
.fold(0, |total, headers| total + headers.len())
|
||||
+ self.orphan.values().fold(0, |total, headers| total + headers.len())
|
||||
+ self
|
||||
.maybe_extra
|
||||
.values()
|
||||
.fold(0, |total, headers| total + headers.len())
|
||||
+ self.extra.values().fold(0, |total, headers| total + headers.len())
|
||||
+ self.ready.values().fold(0, |total, headers| total + headers.len())
|
||||
+ self.incomplete.values().fold(0, |total, headers| total + headers.len())
|
||||
self.maybe_orphan.values().fold(0, |total, headers| total + headers.len()) +
|
||||
self.orphan.values().fold(0, |total, headers| total + headers.len()) +
|
||||
self.maybe_extra.values().fold(0, |total, headers| total + headers.len()) +
|
||||
self.extra.values().fold(0, |total, headers| total + headers.len()) +
|
||||
self.ready.values().fold(0, |total, headers| total + headers.len()) +
|
||||
self.incomplete.values().fold(0, |total, headers| total + headers.len())
|
||||
}
|
||||
|
||||
/// Returns number of best block in the queue.
|
||||
@@ -157,8 +164,16 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
std::cmp::max(
|
||||
self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero),
|
||||
std::cmp::max(
|
||||
self.incomplete.keys().next_back().cloned().unwrap_or_else(Zero::zero),
|
||||
self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero),
|
||||
self.incomplete
|
||||
.keys()
|
||||
.next_back()
|
||||
.cloned()
|
||||
.unwrap_or_else(Zero::zero),
|
||||
self.submitted
|
||||
.keys()
|
||||
.next_back()
|
||||
.cloned()
|
||||
.unwrap_or_else(Zero::zero),
|
||||
),
|
||||
),
|
||||
),
|
||||
@@ -226,7 +241,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
id,
|
||||
status,
|
||||
);
|
||||
return;
|
||||
return
|
||||
}
|
||||
|
||||
if id.0 < self.prune_border {
|
||||
@@ -236,7 +251,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
P::SOURCE_NAME,
|
||||
id,
|
||||
);
|
||||
return;
|
||||
return
|
||||
}
|
||||
|
||||
let parent_id = header.parent_id();
|
||||
@@ -247,20 +262,20 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
HeaderStatus::Unknown | HeaderStatus::MaybeOrphan => {
|
||||
insert_header(&mut self.maybe_orphan, id, header);
|
||||
HeaderStatus::MaybeOrphan
|
||||
}
|
||||
},
|
||||
HeaderStatus::Orphan => {
|
||||
insert_header(&mut self.orphan, id, header);
|
||||
HeaderStatus::Orphan
|
||||
}
|
||||
HeaderStatus::MaybeExtra
|
||||
| HeaderStatus::Extra
|
||||
| HeaderStatus::Ready
|
||||
| HeaderStatus::Incomplete
|
||||
| HeaderStatus::Submitted
|
||||
| HeaderStatus::Synced => {
|
||||
},
|
||||
HeaderStatus::MaybeExtra |
|
||||
HeaderStatus::Extra |
|
||||
HeaderStatus::Ready |
|
||||
HeaderStatus::Incomplete |
|
||||
HeaderStatus::Submitted |
|
||||
HeaderStatus::Synced => {
|
||||
insert_header(&mut self.maybe_extra, id, header);
|
||||
HeaderStatus::MaybeExtra
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
self.known_headers.entry(id.0).or_default().insert(id.1, status);
|
||||
@@ -288,7 +303,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
HeaderStatus::Orphan,
|
||||
id,
|
||||
);
|
||||
return;
|
||||
return
|
||||
}
|
||||
|
||||
move_header_descendants::<P>(
|
||||
@@ -351,8 +366,8 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
return
|
||||
},
|
||||
};
|
||||
|
||||
// do not remove from `incomplete_headers` here, because otherwise we'll miss
|
||||
@@ -414,14 +429,20 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
}
|
||||
|
||||
/// Marks given headers incomplete.
|
||||
pub fn add_incomplete_headers(&mut self, make_header_incomplete: bool, new_incomplete_headers: Vec<HeaderIdOf<P>>) {
|
||||
pub fn add_incomplete_headers(
|
||||
&mut self,
|
||||
make_header_incomplete: bool,
|
||||
new_incomplete_headers: Vec<HeaderIdOf<P>>,
|
||||
) {
|
||||
for new_incomplete_header in new_incomplete_headers {
|
||||
if make_header_incomplete {
|
||||
self.header_synced(&new_incomplete_header);
|
||||
}
|
||||
|
||||
let move_origins = select_synced_children::<P>(&self.synced_children, &new_incomplete_header);
|
||||
let move_origins = move_origins.into_iter().chain(std::iter::once(new_incomplete_header));
|
||||
let move_origins =
|
||||
select_synced_children::<P>(&self.synced_children, &new_incomplete_header);
|
||||
let move_origins =
|
||||
move_origins.into_iter().chain(std::iter::once(new_incomplete_header));
|
||||
for move_origin in move_origins {
|
||||
move_header_descendants::<P>(
|
||||
&mut [&mut self.ready, &mut self.submitted],
|
||||
@@ -450,7 +471,9 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
// are moved from Ready/Submitted to Incomplete queue
|
||||
let new_incomplete_headers = ids
|
||||
.iter()
|
||||
.filter(|id| !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id))
|
||||
.filter(|id| {
|
||||
!self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id)
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
self.add_incomplete_headers(true, new_incomplete_headers);
|
||||
@@ -468,8 +491,10 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
// sub2eth rejects H if H.Parent is incomplete
|
||||
// sub2sub allows 'syncing' headers like that
|
||||
// => let's check if there are some synced children of just completed header
|
||||
let move_origins = select_synced_children::<P>(&self.synced_children, &just_completed_header);
|
||||
let move_origins = move_origins.into_iter().chain(std::iter::once(just_completed_header));
|
||||
let move_origins =
|
||||
select_synced_children::<P>(&self.synced_children, &just_completed_header);
|
||||
let move_origins =
|
||||
move_origins.into_iter().chain(std::iter::once(just_completed_header));
|
||||
for move_origin in move_origins {
|
||||
move_header_descendants::<P>(
|
||||
&mut [&mut self.incomplete],
|
||||
@@ -500,7 +525,8 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
pub fn incomplete_header(&mut self) -> Option<HeaderIdOf<P>> {
|
||||
queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| {
|
||||
let retry = match *last_fetch_time {
|
||||
Some(last_fetch_time) => last_fetch_time.elapsed() > RETRY_FETCH_COMPLETION_INTERVAL,
|
||||
Some(last_fetch_time) =>
|
||||
last_fetch_time.elapsed() > RETRY_FETCH_COMPLETION_INTERVAL,
|
||||
None => true,
|
||||
};
|
||||
|
||||
@@ -521,7 +547,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
/// Prune and never accept headers before this block.
|
||||
pub fn prune(&mut self, prune_border: P::Number) {
|
||||
if prune_border <= self.prune_border {
|
||||
return;
|
||||
return
|
||||
}
|
||||
|
||||
prune_queue(&mut self.maybe_orphan, prune_border);
|
||||
@@ -570,10 +596,10 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
match header {
|
||||
Some(header) => {
|
||||
let parent_id = header.header().parent_id();
|
||||
self.incomplete_headers.contains_key(&parent_id)
|
||||
|| self.completion_data.contains_key(&parent_id)
|
||||
|| self.status(&parent_id) == HeaderStatus::Incomplete
|
||||
}
|
||||
self.incomplete_headers.contains_key(&parent_id) ||
|
||||
self.completion_data.contains_key(&parent_id) ||
|
||||
self.status(&parent_id) == HeaderStatus::Incomplete
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
@@ -603,12 +629,8 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
.expect("header has a given status; given queue has the header; qed");
|
||||
|
||||
// remember ids of all the children of the current header
|
||||
let synced_children_entry = self
|
||||
.synced_children
|
||||
.entry(current.0)
|
||||
.or_default()
|
||||
.entry(current.1)
|
||||
.or_default();
|
||||
let synced_children_entry =
|
||||
self.synced_children.entry(current.0).or_default().entry(current.1).or_default();
|
||||
let all_queues = [
|
||||
&self.maybe_orphan,
|
||||
&self.orphan,
|
||||
@@ -624,7 +646,9 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
.map(|potential_children| {
|
||||
potential_children
|
||||
.values()
|
||||
.filter(|potential_child| potential_child.header().parent_id() == current)
|
||||
.filter(|potential_child| {
|
||||
potential_child.header().parent_id() == current
|
||||
})
|
||||
.map(|child| child.id())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
@@ -661,12 +685,19 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
|
||||
}
|
||||
|
||||
/// Insert header to the queue.
|
||||
fn insert_header<P: HeadersSyncPipeline>(queue: &mut HeadersQueue<P>, id: HeaderIdOf<P>, header: QueuedHeader<P>) {
|
||||
fn insert_header<P: HeadersSyncPipeline>(
|
||||
queue: &mut HeadersQueue<P>,
|
||||
id: HeaderIdOf<P>,
|
||||
header: QueuedHeader<P>,
|
||||
) {
|
||||
queue.entry(id.0).or_default().insert(id.1, header);
|
||||
}
|
||||
|
||||
/// Remove header from the queue.
|
||||
fn remove_header<P: HeadersSyncPipeline>(queue: &mut HeadersQueue<P>, id: &HeaderIdOf<P>) -> Option<QueuedHeader<P>> {
|
||||
fn remove_header<P: HeadersSyncPipeline>(
|
||||
queue: &mut HeadersQueue<P>,
|
||||
id: &HeaderIdOf<P>,
|
||||
) -> Option<QueuedHeader<P>> {
|
||||
let mut headers_at = match queue.entry(id.0) {
|
||||
BTreeMapEntry::Occupied(headers_at) => headers_at,
|
||||
BTreeMapEntry::Vacant(_) => return None,
|
||||
@@ -680,7 +711,10 @@ fn remove_header<P: HeadersSyncPipeline>(queue: &mut HeadersQueue<P>, id: &Heade
|
||||
}
|
||||
|
||||
/// Get header from the queue.
|
||||
fn header<'a, P: HeadersSyncPipeline>(queue: &'a HeadersQueue<P>, id: &HeaderIdOf<P>) -> Option<&'a QueuedHeader<P>> {
|
||||
fn header<'a, P: HeadersSyncPipeline>(
|
||||
queue: &'a HeadersQueue<P>,
|
||||
id: &HeaderIdOf<P>,
|
||||
) -> Option<&'a QueuedHeader<P>> {
|
||||
queue.get(&id.0).and_then(|by_hash| by_hash.get(&id.1))
|
||||
}
|
||||
|
||||
@@ -799,11 +833,7 @@ fn oldest_headers<P: HeadersSyncPipeline>(
|
||||
queue: &HeadersQueue<P>,
|
||||
mut f: impl FnMut(&QueuedHeader<P>) -> bool,
|
||||
) -> Option<Vec<&QueuedHeader<P>>> {
|
||||
let result = queue
|
||||
.values()
|
||||
.flat_map(|h| h.values())
|
||||
.take_while(|h| f(h))
|
||||
.collect::<Vec<_>>();
|
||||
let result = queue.values().flat_map(|h| h.values()).take_while(|h| f(h)).collect::<Vec<_>>();
|
||||
if result.is_empty() {
|
||||
None
|
||||
} else {
|
||||
@@ -817,7 +847,10 @@ fn prune_queue<P: HeadersSyncPipeline>(queue: &mut HeadersQueue<P>, prune_border
|
||||
}
|
||||
|
||||
/// Forget all known headers with number less than given.
|
||||
fn prune_known_headers<P: HeadersSyncPipeline>(known_headers: &mut KnownHeaders<P>, prune_border: P::Number) {
|
||||
fn prune_known_headers<P: HeadersSyncPipeline>(
|
||||
known_headers: &mut KnownHeaders<P>,
|
||||
prune_border: P::Number,
|
||||
) {
|
||||
let new_known_headers = known_headers.split_off(&prune_border);
|
||||
for (pruned_number, pruned_headers) in &*known_headers {
|
||||
for pruned_hash in pruned_headers.keys() {
|
||||
@@ -848,8 +881,8 @@ fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
|
||||
map: &mut LinkedHashMap<Id, T>,
|
||||
filter: impl FnMut(&mut T) -> bool,
|
||||
) -> Option<(Id, &T)> {
|
||||
// TODO (#84): headers that have been just appended to the end of the queue would have to wait until
|
||||
// all previous headers will be retried
|
||||
// TODO (#84): headers that have been just appended to the end of the queue would have to wait
|
||||
// until all previous headers will be retried
|
||||
|
||||
let retry_old_header = map
|
||||
.front()
|
||||
@@ -857,9 +890,10 @@ fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
|
||||
.and_then(|key| map.get_mut(&key).map(filter))
|
||||
.unwrap_or(false);
|
||||
if retry_old_header {
|
||||
let (header_key, header) = map.pop_front().expect("we have checked that front() exists; qed");
|
||||
let (header_key, header) =
|
||||
map.pop_front().expect("we have checked that front() exists; qed");
|
||||
map.insert(header_key, header);
|
||||
return map.back().map(|(id, data)| (id.clone(), data));
|
||||
return map.back().map(|(id, data)| (id.clone(), data))
|
||||
}
|
||||
|
||||
None
|
||||
@@ -868,15 +902,15 @@ fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::sync_loop_tests::{TestHash, TestHeader, TestHeaderId, TestHeadersSyncPipeline, TestNumber};
|
||||
use crate::sync_types::QueuedHeader;
|
||||
use crate::{
|
||||
sync_loop_tests::{
|
||||
TestHash, TestHeader, TestHeaderId, TestHeadersSyncPipeline, TestNumber,
|
||||
},
|
||||
sync_types::QueuedHeader,
|
||||
};
|
||||
|
||||
pub(crate) fn header(number: TestNumber) -> QueuedHeader<TestHeadersSyncPipeline> {
|
||||
QueuedHeader::new(TestHeader {
|
||||
number,
|
||||
hash: hash(number),
|
||||
parent_hash: hash(number - 1),
|
||||
})
|
||||
QueuedHeader::new(TestHeader { number, hash: hash(number), parent_hash: hash(number - 1) })
|
||||
}
|
||||
|
||||
pub(crate) fn hash(number: TestNumber) -> TestHash {
|
||||
@@ -891,34 +925,41 @@ pub(crate) mod tests {
|
||||
fn total_headers_works() {
|
||||
// total headers just sums up number of headers in every queue
|
||||
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
|
||||
queue.maybe_orphan.entry(1).or_default().insert(
|
||||
hash(1),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.maybe_orphan.entry(1).or_default().insert(
|
||||
hash(2),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.maybe_orphan.entry(2).or_default().insert(
|
||||
hash(3),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.orphan.entry(3).or_default().insert(
|
||||
hash(4),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.maybe_extra.entry(4).or_default().insert(
|
||||
hash(5),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.ready.entry(5).or_default().insert(
|
||||
hash(6),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.incomplete.entry(6).or_default().insert(
|
||||
hash(7),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(1)
|
||||
.or_default()
|
||||
.insert(hash(1), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(1)
|
||||
.or_default()
|
||||
.insert(hash(2), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(2)
|
||||
.or_default()
|
||||
.insert(hash(3), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.orphan
|
||||
.entry(3)
|
||||
.or_default()
|
||||
.insert(hash(4), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.maybe_extra
|
||||
.entry(4)
|
||||
.or_default()
|
||||
.insert(hash(5), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.ready
|
||||
.entry(5)
|
||||
.or_default()
|
||||
.insert(hash(6), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.incomplete
|
||||
.entry(6)
|
||||
.or_default()
|
||||
.insert(hash(7), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.total_headers(), 7);
|
||||
}
|
||||
|
||||
@@ -926,48 +967,56 @@ pub(crate) mod tests {
|
||||
fn best_queued_number_works() {
|
||||
// initially there are headers in MaybeOrphan queue only
|
||||
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
|
||||
queue.maybe_orphan.entry(1).or_default().insert(
|
||||
hash(1),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.maybe_orphan.entry(1).or_default().insert(
|
||||
hash(2),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue.maybe_orphan.entry(3).or_default().insert(
|
||||
hash(3),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(1)
|
||||
.or_default()
|
||||
.insert(hash(1), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(1)
|
||||
.or_default()
|
||||
.insert(hash(2), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(3)
|
||||
.or_default()
|
||||
.insert(hash(3), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 3);
|
||||
// and then there's better header in Orphan
|
||||
queue.orphan.entry(10).or_default().insert(
|
||||
hash(10),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.orphan
|
||||
.entry(10)
|
||||
.or_default()
|
||||
.insert(hash(10), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 10);
|
||||
// and then there's better header in MaybeExtra
|
||||
queue.maybe_extra.entry(20).or_default().insert(
|
||||
hash(20),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.maybe_extra
|
||||
.entry(20)
|
||||
.or_default()
|
||||
.insert(hash(20), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 20);
|
||||
// and then there's better header in Ready
|
||||
queue.ready.entry(30).or_default().insert(
|
||||
hash(30),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.ready
|
||||
.entry(30)
|
||||
.or_default()
|
||||
.insert(hash(30), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 30);
|
||||
// and then there's better header in MaybeOrphan again
|
||||
queue.maybe_orphan.entry(40).or_default().insert(
|
||||
hash(40),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(40)
|
||||
.or_default()
|
||||
.insert(hash(40), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 40);
|
||||
// and then there's some header in Incomplete
|
||||
queue.incomplete.entry(50).or_default().insert(
|
||||
hash(50),
|
||||
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
|
||||
);
|
||||
queue
|
||||
.incomplete
|
||||
.entry(50)
|
||||
.or_default()
|
||||
.insert(hash(50), QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()));
|
||||
assert_eq!(queue.best_queued_number(), 50);
|
||||
}
|
||||
|
||||
@@ -977,11 +1026,7 @@ pub(crate) mod tests {
|
||||
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
|
||||
assert_eq!(queue.status(&id(10)), HeaderStatus::Unknown);
|
||||
// and status is read from the KnownHeaders
|
||||
queue
|
||||
.known_headers
|
||||
.entry(10)
|
||||
.or_default()
|
||||
.insert(hash(10), HeaderStatus::Ready);
|
||||
queue.known_headers.entry(10).or_default().insert(hash(10), HeaderStatus::Ready);
|
||||
assert_eq!(queue.status(&id(10)), HeaderStatus::Ready);
|
||||
}
|
||||
|
||||
@@ -990,22 +1035,13 @@ pub(crate) mod tests {
|
||||
// initially we have oldest header #10
|
||||
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
|
||||
queue.maybe_orphan.entry(10).or_default().insert(hash(1), header(100));
|
||||
assert_eq!(
|
||||
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
|
||||
hash(100)
|
||||
);
|
||||
assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(100));
|
||||
// inserting #20 changes nothing
|
||||
queue.maybe_orphan.entry(20).or_default().insert(hash(1), header(101));
|
||||
assert_eq!(
|
||||
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
|
||||
hash(100)
|
||||
);
|
||||
assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(100));
|
||||
// inserting #5 makes it oldest
|
||||
queue.maybe_orphan.entry(5).or_default().insert(hash(1), header(102));
|
||||
assert_eq!(
|
||||
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
|
||||
hash(102)
|
||||
);
|
||||
assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(102));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1091,11 +1127,7 @@ pub(crate) mod tests {
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), header(100));
|
||||
queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(99)
|
||||
@@ -1108,17 +1140,9 @@ pub(crate) mod tests {
|
||||
.or_default()
|
||||
.insert(hash(98), HeaderStatus::MaybeExtra);
|
||||
queue.maybe_extra.entry(98).or_default().insert(hash(98), header(98));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(97)
|
||||
.or_default()
|
||||
.insert(hash(97), HeaderStatus::Extra);
|
||||
queue.known_headers.entry(97).or_default().insert(hash(97), HeaderStatus::Extra);
|
||||
queue.extra.entry(97).or_default().insert(hash(97), header(97));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(96)
|
||||
.or_default()
|
||||
.insert(hash(96), HeaderStatus::Ready);
|
||||
queue.known_headers.entry(96).or_default().insert(hash(96), HeaderStatus::Ready);
|
||||
queue.ready.entry(96).or_default().insert(hash(96), header(96));
|
||||
queue.target_best_header_response(&id(100));
|
||||
|
||||
@@ -1137,31 +1161,19 @@ pub(crate) mod tests {
|
||||
// children of synced headers are stored
|
||||
assert_eq!(
|
||||
vec![id(97)],
|
||||
queue.synced_children[&96][&hash(96)]
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
queue.synced_children[&96][&hash(96)].iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
assert_eq!(
|
||||
vec![id(98)],
|
||||
queue.synced_children[&97][&hash(97)]
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
queue.synced_children[&97][&hash(97)].iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
assert_eq!(
|
||||
vec![id(99)],
|
||||
queue.synced_children[&98][&hash(98)]
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
queue.synced_children[&98][&hash(98)].iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
assert_eq!(
|
||||
vec![id(100)],
|
||||
queue.synced_children[&99][&hash(99)]
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
queue.synced_children[&99][&hash(99)].iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
assert_eq!(0, queue.synced_children[&100][&hash(100)].len());
|
||||
}
|
||||
@@ -1185,11 +1197,7 @@ pub(crate) mod tests {
|
||||
.entry(102)
|
||||
.or_default()
|
||||
.insert(hash(102), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(102)
|
||||
.or_default()
|
||||
.insert(hash(102), header(102));
|
||||
queue.maybe_orphan.entry(102).or_default().insert(hash(102), header(102));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(103)
|
||||
@@ -1221,11 +1229,7 @@ pub(crate) mod tests {
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), header(100));
|
||||
queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(101)
|
||||
@@ -1237,11 +1241,7 @@ pub(crate) mod tests {
|
||||
.entry(102)
|
||||
.or_default()
|
||||
.insert(hash(102), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(102)
|
||||
.or_default()
|
||||
.insert(hash(102), header(102));
|
||||
queue.maybe_orphan.entry(102).or_default().insert(hash(102), header(102));
|
||||
queue.maybe_orphan_response(&id(99), true);
|
||||
|
||||
// then all headers (#100..#103) are moved to the MaybeExtra queue
|
||||
@@ -1266,21 +1266,13 @@ pub(crate) mod tests {
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(100)
|
||||
.or_default()
|
||||
.insert(hash(100), header(100));
|
||||
queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(101)
|
||||
.or_default()
|
||||
.insert(hash(101), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(101)
|
||||
.or_default()
|
||||
.insert(hash(101), header(101));
|
||||
queue.maybe_orphan.entry(101).or_default().insert(hash(101), header(101));
|
||||
queue.maybe_orphan_response(&id(99), false);
|
||||
|
||||
// then all headers (#100..#101) are moved to the Orphan queue
|
||||
@@ -1395,7 +1387,9 @@ pub(crate) mod tests {
|
||||
queue.incomplete_headers.clear();
|
||||
queue.incomplete_headers.insert(
|
||||
id(100),
|
||||
Some(Instant::now() - RETRY_FETCH_COMPLETION_INTERVAL - RETRY_FETCH_COMPLETION_INTERVAL),
|
||||
Some(
|
||||
Instant::now() - RETRY_FETCH_COMPLETION_INTERVAL - RETRY_FETCH_COMPLETION_INTERVAL,
|
||||
),
|
||||
);
|
||||
assert_eq!(queue.incomplete_header(), Some(id(100)));
|
||||
}
|
||||
@@ -1551,11 +1545,7 @@ pub(crate) mod tests {
|
||||
.entry(104)
|
||||
.or_default()
|
||||
.insert(hash(104), HeaderStatus::MaybeOrphan);
|
||||
queue
|
||||
.maybe_orphan
|
||||
.entry(104)
|
||||
.or_default()
|
||||
.insert(hash(104), header(104));
|
||||
queue.maybe_orphan.entry(104).or_default().insert(hash(104), header(104));
|
||||
queue
|
||||
.known_headers
|
||||
.entry(103)
|
||||
@@ -1624,7 +1614,8 @@ pub(crate) mod tests {
|
||||
fn incomplete_headers_are_still_incomplete_after_advance() {
|
||||
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
|
||||
|
||||
// relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete queue
|
||||
// relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete
|
||||
// queue
|
||||
queue.incomplete_headers.insert(id(100), None);
|
||||
queue.incomplete.entry(101).or_default().insert(hash(101), header(101));
|
||||
queue.incomplete.entry(102).or_default().insert(hash(102), header(102));
|
||||
@@ -1656,8 +1647,8 @@ pub(crate) mod tests {
|
||||
.or_default()
|
||||
.insert(hash(104), HeaderStatus::Incomplete);
|
||||
|
||||
// let's say relay#2 completes header#100 and then submits header#101+header#102 and it turns
|
||||
// out that header#102 is also incomplete
|
||||
// let's say relay#2 completes header#100 and then submits header#101+header#102 and it
|
||||
// turns out that header#102 is also incomplete
|
||||
queue.incomplete_headers_response(vec![id(102)].into_iter().collect());
|
||||
|
||||
// then the header#103 and the header#104 must have Incomplete status
|
||||
|
||||
@@ -19,8 +19,10 @@
|
||||
//! to submit to the target chain? The context makes decisions basing on parameters
|
||||
//! passed using `HeadersSyncParams` structure.
|
||||
|
||||
use crate::headers::QueuedHeaders;
|
||||
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader};
|
||||
use crate::{
|
||||
headers::QueuedHeaders,
|
||||
sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader},
|
||||
};
|
||||
use num_traits::{One, Saturating, Zero};
|
||||
|
||||
/// Common sync params.
|
||||
@@ -121,20 +123,21 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
// if we haven't received best header from source node yet, there's nothing we can download
|
||||
let source_best_number = self.source_best_number?;
|
||||
|
||||
// if we haven't received known best header from target node yet, there's nothing we can download
|
||||
// if we haven't received known best header from target node yet, there's nothing we can
|
||||
// download
|
||||
let target_best_header = self.target_best_header.as_ref()?;
|
||||
|
||||
// if there's too many headers in the queue, stop downloading
|
||||
let in_memory_headers = self.headers.total_headers();
|
||||
if in_memory_headers >= self.params.max_future_headers_to_download {
|
||||
return None;
|
||||
return None
|
||||
}
|
||||
|
||||
// if queue is empty and best header on target is > than best header on source,
|
||||
// then we shoud reorganization
|
||||
let best_queued_number = self.headers.best_queued_number();
|
||||
if best_queued_number.is_zero() && source_best_number < target_best_header.0 {
|
||||
return Some(source_best_number);
|
||||
return Some(source_best_number)
|
||||
}
|
||||
|
||||
// we assume that there were no reorganizations if we have already downloaded best header
|
||||
@@ -143,14 +146,14 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
target_best_header.0,
|
||||
);
|
||||
if best_downloaded_number >= source_best_number {
|
||||
return None;
|
||||
return None
|
||||
}
|
||||
|
||||
// download new header
|
||||
Some(best_downloaded_number + One::one())
|
||||
}
|
||||
|
||||
/// Selech orphan header to download.
|
||||
/// Select orphan header to download.
|
||||
pub fn select_orphan_header_to_download(&self) -> Option<&QueuedHeader<P>> {
|
||||
let orphan_header = self.headers.header(HeaderStatus::Orphan)?;
|
||||
|
||||
@@ -159,7 +162,7 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
// => let's avoid fetching duplicate headers
|
||||
let parent_id = orphan_header.parent_id();
|
||||
if self.headers.status(&parent_id) != HeaderStatus::Unknown {
|
||||
return None;
|
||||
return None
|
||||
}
|
||||
|
||||
Some(orphan_header)
|
||||
@@ -169,12 +172,12 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader<P>>> {
|
||||
// maybe we have paused new headers submit?
|
||||
if self.pause_submit {
|
||||
return None;
|
||||
return None
|
||||
}
|
||||
|
||||
// if we operate in backup mode, we only submit headers when sync has stalled
|
||||
if self.params.target_tx_mode == TargetTransactionMode::Backup && !stalled {
|
||||
return None;
|
||||
return None
|
||||
}
|
||||
|
||||
let headers_in_submit_status = self.headers.headers_in_status(HeaderStatus::Submitted);
|
||||
@@ -187,15 +190,17 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
let mut total_headers = 0;
|
||||
self.headers.headers(HeaderStatus::Ready, |header| {
|
||||
if total_headers == headers_to_submit_count {
|
||||
return false;
|
||||
return false
|
||||
}
|
||||
if total_headers == self.params.max_headers_in_single_submit {
|
||||
return false;
|
||||
return false
|
||||
}
|
||||
|
||||
let encoded_size = P::estimate_size(header);
|
||||
if total_headers != 0 && total_size + encoded_size > self.params.max_headers_size_in_single_submit {
|
||||
return false;
|
||||
if total_headers != 0 &&
|
||||
total_size + encoded_size > self.params.max_headers_size_in_single_submit
|
||||
{
|
||||
return false
|
||||
}
|
||||
|
||||
total_size += encoded_size;
|
||||
@@ -228,15 +233,14 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
|
||||
// early return if it is still the same
|
||||
if self.target_best_header == Some(best_header) {
|
||||
return false;
|
||||
return false
|
||||
}
|
||||
|
||||
// remember that this header is now known to the Substrate runtime
|
||||
self.headers.target_best_header_response(&best_header);
|
||||
|
||||
// prune ancient headers
|
||||
self.headers
|
||||
.prune(best_header.0.saturating_sub(self.params.prune_depth.into()));
|
||||
self.headers.prune(best_header.0.saturating_sub(self.params.prune_depth.into()));
|
||||
|
||||
// finally remember the best header itself
|
||||
self.target_best_header = Some(best_header);
|
||||
@@ -281,9 +285,11 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
use crate::headers::tests::{header, id};
|
||||
use crate::sync_loop_tests::{TestHash, TestHeadersSyncPipeline, TestNumber};
|
||||
use crate::sync_types::HeaderStatus;
|
||||
use crate::{
|
||||
headers::tests::{header, id},
|
||||
sync_loop_tests::{TestHash, TestHeadersSyncPipeline, TestNumber},
|
||||
sync_types::HeaderStatus,
|
||||
};
|
||||
use relay_utils::HeaderId;
|
||||
|
||||
fn side_hash(number: TestNumber) -> TestHash {
|
||||
|
||||
@@ -16,9 +16,11 @@
|
||||
|
||||
//! Entrypoint for running headers synchronization loop.
|
||||
|
||||
use crate::sync::{HeadersSync, HeadersSyncParams};
|
||||
use crate::sync_loop_metrics::SyncLoopMetrics;
|
||||
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};
|
||||
use crate::{
|
||||
sync::{HeadersSync, HeadersSyncParams},
|
||||
sync_loop_metrics::SyncLoopMetrics,
|
||||
sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{future::FutureExt, stream::StreamExt};
|
||||
@@ -66,8 +68,10 @@ pub trait SourceClient<P: HeadersSyncPipeline>: RelayClient {
|
||||
async fn header_by_number(&self, number: P::Number) -> Result<P::Header, Self::Error>;
|
||||
|
||||
/// Get completion data by header hash.
|
||||
async fn header_completion(&self, id: HeaderIdOf<P>)
|
||||
-> Result<(HeaderIdOf<P>, Option<P::Completion>), Self::Error>;
|
||||
async fn header_completion(
|
||||
&self,
|
||||
id: HeaderIdOf<P>,
|
||||
) -> Result<(HeaderIdOf<P>, Option<P::Completion>), Self::Error>;
|
||||
|
||||
/// Get extra data by header hash.
|
||||
async fn header_extra(
|
||||
@@ -84,20 +88,32 @@ pub trait TargetClient<P: HeadersSyncPipeline>: RelayClient {
|
||||
async fn best_header_id(&self) -> Result<HeaderIdOf<P>, Self::Error>;
|
||||
|
||||
/// Returns true if header is known to the target node.
|
||||
async fn is_known_header(&self, id: HeaderIdOf<P>) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
|
||||
async fn is_known_header(
|
||||
&self,
|
||||
id: HeaderIdOf<P>,
|
||||
) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
|
||||
|
||||
/// Submit headers.
|
||||
async fn submit_headers(&self, headers: Vec<QueuedHeader<P>>) -> SubmittedHeaders<HeaderIdOf<P>, Self::Error>;
|
||||
async fn submit_headers(
|
||||
&self,
|
||||
headers: Vec<QueuedHeader<P>>,
|
||||
) -> SubmittedHeaders<HeaderIdOf<P>, Self::Error>;
|
||||
|
||||
/// Returns ID of headers that require to be 'completed' before children can be submitted.
|
||||
async fn incomplete_headers_ids(&self) -> Result<HashSet<HeaderIdOf<P>>, Self::Error>;
|
||||
|
||||
/// Submit completion data for header.
|
||||
async fn complete_header(&self, id: HeaderIdOf<P>, completion: P::Completion)
|
||||
-> Result<HeaderIdOf<P>, Self::Error>;
|
||||
async fn complete_header(
|
||||
&self,
|
||||
id: HeaderIdOf<P>,
|
||||
completion: P::Completion,
|
||||
) -> Result<HeaderIdOf<P>, Self::Error>;
|
||||
|
||||
/// Returns true if header requires extra data to be submitted.
|
||||
async fn requires_extra(&self, header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
|
||||
async fn requires_extra(
|
||||
&self,
|
||||
header: QueuedHeader<P>,
|
||||
) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
|
||||
}
|
||||
|
||||
/// Synchronization maintain procedure.
|
||||
@@ -110,7 +126,8 @@ pub trait SyncMaintain<P: HeadersSyncPipeline>: 'static + Clone + Send + Sync {
|
||||
|
||||
impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {}
|
||||
|
||||
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
|
||||
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs
|
||||
/// sync loop.
|
||||
pub fn metrics_prefix<P: HeadersSyncPipeline>() -> String {
|
||||
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
|
||||
}
|
||||
@@ -480,7 +497,8 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
id,
|
||||
);
|
||||
|
||||
target_complete_header_future.set(target_client.complete_header(id, completion.clone()).fuse());
|
||||
target_complete_header_future
|
||||
.set(target_client.complete_header(id, completion.clone()).fuse());
|
||||
} else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) {
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
@@ -501,8 +519,8 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
);
|
||||
|
||||
target_existence_status_future.set(target_client.is_known_header(parent_id).fuse());
|
||||
} else if let Some(headers) =
|
||||
sync.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT)
|
||||
} else if let Some(headers) = sync
|
||||
.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT)
|
||||
{
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
@@ -580,7 +598,7 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
P::SOURCE_NAME,
|
||||
P::TARGET_NAME,
|
||||
);
|
||||
return Ok(());
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
log::debug!(
|
||||
@@ -616,15 +634,14 @@ fn print_sync_progress<P: HeadersSyncPipeline>(
|
||||
let now_time = Instant::now();
|
||||
let (now_best_header, now_target_header) = eth_sync.status();
|
||||
|
||||
let need_update = now_time - prev_time > Duration::from_secs(10)
|
||||
|| match (prev_best_header, now_best_header) {
|
||||
(Some(prev_best_header), Some(now_best_header)) => {
|
||||
now_best_header.0.saturating_sub(prev_best_header) > 10.into()
|
||||
}
|
||||
let need_update = now_time - prev_time > Duration::from_secs(10) ||
|
||||
match (prev_best_header, now_best_header) {
|
||||
(Some(prev_best_header), Some(now_best_header)) =>
|
||||
now_best_header.0.saturating_sub(prev_best_header) > 10.into(),
|
||||
_ => false,
|
||||
};
|
||||
if !need_update {
|
||||
return (prev_time, prev_best_header, prev_target_header);
|
||||
return (prev_time, prev_best_header, prev_target_header)
|
||||
}
|
||||
|
||||
log::info!(
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
|
||||
//! Metrics for headers synchronization relay loop.
|
||||
|
||||
use crate::sync::HeadersSync;
|
||||
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
|
||||
use crate::{
|
||||
sync::HeadersSync,
|
||||
sync_types::{HeaderStatus, HeadersSyncPipeline},
|
||||
};
|
||||
|
||||
use num_traits::Zero;
|
||||
use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64};
|
||||
@@ -78,7 +80,8 @@ impl SyncLoopMetrics {
|
||||
pub fn update<P: HeadersSyncPipeline>(&self, sync: &HeadersSync<P>) {
|
||||
let headers = sync.headers();
|
||||
let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero);
|
||||
let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero);
|
||||
let target_best_number =
|
||||
sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero);
|
||||
|
||||
self.update_best_block_at_source(source_best_number);
|
||||
self.update_best_block_at_target(target_best_number);
|
||||
|
||||
@@ -16,16 +16,18 @@
|
||||
|
||||
#![cfg(test)]
|
||||
|
||||
use crate::sync_loop::{run, SourceClient, TargetClient};
|
||||
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders};
|
||||
use crate::{
|
||||
sync_loop::{run, SourceClient, TargetClient},
|
||||
sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use backoff::backoff::Backoff;
|
||||
use futures::{future::FutureExt, stream::StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
use relay_utils::{
|
||||
metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId,
|
||||
MaybeConnectionError,
|
||||
metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient,
|
||||
retry_backoff, HeaderId, MaybeConnectionError,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -166,7 +168,10 @@ impl SourceClient<TestHeadersSyncPipeline> for Source {
|
||||
data.header_by_number.get(&number).cloned().ok_or(TestError(false))
|
||||
}
|
||||
|
||||
async fn header_completion(&self, id: TestHeaderId) -> Result<(TestHeaderId, Option<TestCompletion>), TestError> {
|
||||
async fn header_completion(
|
||||
&self,
|
||||
id: TestHeaderId,
|
||||
) -> Result<(TestHeaderId, Option<TestCompletion>), TestError> {
|
||||
let mut data = self.data.lock();
|
||||
(self.on_method_call)(SourceMethod::HeaderCompletion(id), &mut *data);
|
||||
if data.provides_completion {
|
||||
@@ -264,7 +269,10 @@ impl TargetClient<TestHeadersSyncPipeline> for Target {
|
||||
.unwrap_or(Ok((id, false)))
|
||||
}
|
||||
|
||||
async fn submit_headers(&self, headers: Vec<TestQueuedHeader>) -> SubmittedHeaders<TestHeaderId, TestError> {
|
||||
async fn submit_headers(
|
||||
&self,
|
||||
headers: Vec<TestQueuedHeader>,
|
||||
) -> SubmittedHeaders<TestHeaderId, TestError> {
|
||||
let mut data = self.data.lock();
|
||||
(self.on_method_call)(TargetMethod::SubmitHeaders(headers.clone()), &mut *data);
|
||||
data.submitted_headers
|
||||
@@ -287,14 +295,21 @@ impl TargetClient<TestHeadersSyncPipeline> for Target {
|
||||
}
|
||||
}
|
||||
|
||||
async fn complete_header(&self, id: TestHeaderId, completion: TestCompletion) -> Result<TestHeaderId, TestError> {
|
||||
async fn complete_header(
|
||||
&self,
|
||||
id: TestHeaderId,
|
||||
completion: TestCompletion,
|
||||
) -> Result<TestHeaderId, TestError> {
|
||||
let mut data = self.data.lock();
|
||||
(self.on_method_call)(TargetMethod::CompleteHeader(id, completion), &mut *data);
|
||||
data.completed_headers.insert(id.1, completion);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn requires_extra(&self, header: TestQueuedHeader) -> Result<(TestHeaderId, bool), TestError> {
|
||||
async fn requires_extra(
|
||||
&self,
|
||||
header: TestQueuedHeader,
|
||||
) -> Result<(TestHeaderId, bool), TestError> {
|
||||
let mut data = self.data.lock();
|
||||
(self.on_method_call)(TargetMethod::RequiresExtra(header.clone()), &mut *data);
|
||||
if data.requires_extra {
|
||||
@@ -321,11 +336,7 @@ fn test_header(number: TestNumber) -> TestHeader {
|
||||
TestHeader {
|
||||
hash: id.1,
|
||||
number: id.0,
|
||||
parent_hash: if number == 0 {
|
||||
TestHash::default()
|
||||
} else {
|
||||
test_id(number - 1).1
|
||||
},
|
||||
parent_hash: if number == 0 { TestHash::default() } else { test_id(number - 1).1 },
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,18 +478,15 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
|
||||
let target_requires_extra = params.target_requires_extra;
|
||||
let target_requires_completion = params.target_requires_completion;
|
||||
let stop_at = params.stop_at;
|
||||
let source = Source::new(
|
||||
params.best_source_header.id(),
|
||||
params.headers_on_source,
|
||||
move |method, _| {
|
||||
let source =
|
||||
Source::new(params.best_source_header.id(), params.headers_on_source, move |method, _| {
|
||||
if !target_requires_extra {
|
||||
source_reject_extra(&method);
|
||||
}
|
||||
if !target_requires_completion {
|
||||
source_reject_completion(&method);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
let target = Target::new(
|
||||
params.best_target_header.id(),
|
||||
params.headers_on_target.into_iter().map(|header| header.id()).collect(),
|
||||
|
||||
@@ -50,7 +50,14 @@ pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync {
|
||||
const TARGET_NAME: &'static str;
|
||||
|
||||
/// Headers we're syncing are identified by this hash.
|
||||
type Hash: Eq + Clone + Copy + Send + Sync + std::fmt::Debug + std::fmt::Display + std::hash::Hash;
|
||||
type Hash: Eq
|
||||
+ Clone
|
||||
+ Copy
|
||||
+ Send
|
||||
+ Sync
|
||||
+ std::fmt::Debug
|
||||
+ std::fmt::Display
|
||||
+ std::hash::Hash;
|
||||
/// Headers we're syncing are identified by this number.
|
||||
type Number: relay_utils::BlockNumberBase;
|
||||
/// Type of header that we're syncing.
|
||||
@@ -77,7 +84,8 @@ pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync {
|
||||
}
|
||||
|
||||
/// A HeaderId for `HeaderSyncPipeline`.
|
||||
pub type HeaderIdOf<P> = HeaderId<<P as HeadersSyncPipeline>::Hash, <P as HeadersSyncPipeline>::Number>;
|
||||
pub type HeaderIdOf<P> =
|
||||
HeaderId<<P as HeadersSyncPipeline>::Hash, <P as HeadersSyncPipeline>::Number>;
|
||||
|
||||
/// Header that we're receiving from source node.
|
||||
pub trait SourceHeader<Hash, Number>: Clone + std::fmt::Debug + PartialEq + Send + Sync {
|
||||
@@ -153,8 +161,8 @@ impl<P: HeadersSyncPipeline> QueuedHeader<P> {
|
||||
pub struct SubmittedHeaders<Id, Error> {
|
||||
/// IDs of headers that have been submitted to target node.
|
||||
pub submitted: Vec<Id>,
|
||||
/// IDs of incomplete headers. These headers were submitted (so this id is also in `submitted` vec),
|
||||
/// but all descendants are not.
|
||||
/// IDs of incomplete headers. These headers were submitted (so this id is also in `submitted`
|
||||
/// vec), but all descendants are not.
|
||||
pub incomplete: Vec<Id>,
|
||||
/// IDs of ignored headers that we have decided not to submit (they are either rejected by
|
||||
/// target node immediately, or their descendants of incomplete headers).
|
||||
@@ -180,10 +188,6 @@ impl<Id: std::fmt::Debug, Error> std::fmt::Display for SubmittedHeaders<Id, Erro
|
||||
let incomplete = format_ids(self.incomplete.iter());
|
||||
let rejected = format_ids(self.rejected.iter());
|
||||
|
||||
write!(
|
||||
f,
|
||||
"Submitted: {}, Incomplete: {}, Rejected: {}",
|
||||
submitted, incomplete, rejected
|
||||
)
|
||||
write!(f, "Submitted: {}, Incomplete: {}, Rejected: {}", submitted, incomplete, rejected)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user