mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 16:57:58 +00:00
Collator protocol followup (#1741)
* Metrics * Dont punish late collations * Fix metrics * Update node/network/collator-protocol/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Change on_request arg to Result Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
@@ -29,6 +29,7 @@ use polkadot_subsystem::{
|
||||
AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||
NetworkBridgeMessage,
|
||||
},
|
||||
metrics::{self, prometheus},
|
||||
};
|
||||
use polkadot_node_network_protocol::{
|
||||
v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId,
|
||||
@@ -38,6 +39,54 @@ use polkadot_node_subsystem_util::{
|
||||
request_validator_groups_ctx,
|
||||
};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(super) struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
fn on_advertisment_made(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.advertisments_made.inc();
|
||||
}
|
||||
}
|
||||
|
||||
fn on_collation_sent(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.collations_sent.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
advertisments_made: prometheus::Counter<prometheus::U64>,
|
||||
collations_sent: prometheus::Counter<prometheus::U64>,
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry)
|
||||
-> std::result::Result<Self, prometheus::PrometheusError>
|
||||
{
|
||||
let metrics = MetricsInner {
|
||||
advertisments_made: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"parachain_advertisments_made_total",
|
||||
"A number of advertisments sent to validators.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
collations_sent: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"parachain_collations_sent_total",
|
||||
"A number of collations sent to validators.",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct State {
|
||||
/// Our id.
|
||||
@@ -69,6 +118,9 @@ struct State {
|
||||
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
|
||||
/// go out of scope with their respective deactivated leafs.
|
||||
known_validators: HashMap<PeerId, ValidatorId>,
|
||||
|
||||
/// Metrics.
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
/// Distribute a collation.
|
||||
@@ -287,6 +339,8 @@ where
|
||||
)
|
||||
)).await?;
|
||||
|
||||
state.metrics.on_advertisment_made();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -367,6 +421,7 @@ where
|
||||
/// Issue a response to a previously requested collation.
|
||||
async fn send_collation<Context>(
|
||||
ctx: &mut Context,
|
||||
state: &mut State,
|
||||
request_id: RequestId,
|
||||
origin: PeerId,
|
||||
receipt: CandidateReceipt,
|
||||
@@ -388,6 +443,8 @@ where
|
||||
)
|
||||
)).await?;
|
||||
|
||||
state.metrics.on_collation_sent();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -421,7 +478,7 @@ where
|
||||
Some(our_para_id) => {
|
||||
if our_para_id == para_id {
|
||||
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
|
||||
send_collation(ctx, request_id, origin, collation.0, collation.1).await?;
|
||||
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
@@ -555,14 +612,21 @@ async fn handle_our_view_change(
|
||||
}
|
||||
|
||||
/// The collator protocol collator side main loop.
|
||||
pub(crate) async fn run<Context>(mut ctx: Context, our_id: CollatorId) -> Result<()>
|
||||
pub(crate) async fn run<Context>(
|
||||
mut ctx: Context,
|
||||
our_id: CollatorId,
|
||||
metrics: Metrics,
|
||||
) -> Result<()>
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
use FromOverseer::*;
|
||||
use OverseerSignal::*;
|
||||
|
||||
let mut state = State::default();
|
||||
let mut state = State {
|
||||
metrics,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
state.our_id = our_id;
|
||||
|
||||
@@ -597,7 +661,7 @@ mod tests {
|
||||
};
|
||||
use polkadot_subsystem::ActiveLeavesUpdate;
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_subsystem_testhelpers::{self as test_helpers};
|
||||
use polkadot_subsystem_testhelpers as test_helpers;
|
||||
use polkadot_node_network_protocol::ObservedRole;
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -719,7 +783,7 @@ mod tests {
|
||||
|
||||
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = run(context, collator_id);
|
||||
let subsystem = run(context, collator_id, Metrics::default());
|
||||
|
||||
let test_fut = test(TestHarness { virtual_overseer });
|
||||
|
||||
|
||||
@@ -53,13 +53,15 @@ enum Error {
|
||||
RuntimeApi(RuntimeApiError),
|
||||
#[from]
|
||||
UtilError(util::Error),
|
||||
#[from]
|
||||
Prometheus(prometheus::PrometheusError),
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
enum ProtocolSide {
|
||||
Validator,
|
||||
Collator(CollatorId),
|
||||
Validator(validator_side::Metrics),
|
||||
Collator(CollatorId, collator_side::Metrics),
|
||||
}
|
||||
|
||||
/// The collator protocol subsystem.
|
||||
@@ -71,10 +73,12 @@ impl CollatorProtocolSubsystem {
|
||||
/// Start the collator protocol.
|
||||
/// If `id` is `Some` this is a collator side of the protocol.
|
||||
/// If `id` is `None` this is a validator side of the protocol.
|
||||
pub fn new(id: Option<CollatorId>) -> Self {
|
||||
/// Caller must provide a registry for prometheus metrics.
|
||||
pub fn new(id: Option<CollatorId>, registry: Option<&prometheus::Registry>) -> Self {
|
||||
use metrics::Metrics;
|
||||
let protocol_side = match id {
|
||||
Some(id) => ProtocolSide::Collator(id),
|
||||
None => ProtocolSide::Validator,
|
||||
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)),
|
||||
None => ProtocolSide::Validator(validator_side::Metrics::register(registry)),
|
||||
};
|
||||
|
||||
Self {
|
||||
@@ -87,28 +91,26 @@ impl CollatorProtocolSubsystem {
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
{
|
||||
match self.protocol_side {
|
||||
ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await,
|
||||
ProtocolSide::Collator(id) => collator_side::run(ctx, id).await,
|
||||
ProtocolSide::Validator(metrics) => validator_side::run(
|
||||
ctx,
|
||||
REQUEST_TIMEOUT,
|
||||
metrics,
|
||||
).await,
|
||||
ProtocolSide::Collator(id, metrics) => collator_side::run(
|
||||
ctx,
|
||||
id,
|
||||
metrics,
|
||||
).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Collator protocol metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics;
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(_registry: &prometheus::Registry)
|
||||
-> std::result::Result<Self, prometheus::PrometheusError> {
|
||||
Ok(Metrics)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Context> Subsystem<Context> for CollatorProtocolSubsystem
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
|
||||
{
|
||||
type Metrics = Metrics;
|
||||
// The actual `Metrics` type depends on whether we're on the collator or validator side.
|
||||
type Metrics = ();
|
||||
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -34,6 +34,7 @@ use polkadot_subsystem::{
|
||||
messages::{
|
||||
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
|
||||
},
|
||||
metrics::{self, prometheus},
|
||||
};
|
||||
use polkadot_node_network_protocol::{
|
||||
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId,
|
||||
@@ -48,6 +49,46 @@ const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed
|
||||
const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem");
|
||||
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(super) struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
match succeeded {
|
||||
Ok(()) => metrics.collation_requests.with_label_values(&["succeeded"]).inc(),
|
||||
Err(()) => metrics.collation_requests.with_label_values(&["failed"]).inc(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
collation_requests: prometheus::CounterVec<prometheus::U64>,
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry)
|
||||
-> std::result::Result<Self, prometheus::PrometheusError>
|
||||
{
|
||||
let metrics = MetricsInner {
|
||||
collation_requests: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_collation_requests_total",
|
||||
"Number of collations requested from Collators.",
|
||||
),
|
||||
&["succeeded", "failed"],
|
||||
)?,
|
||||
registry,
|
||||
)?
|
||||
};
|
||||
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum CollationRequestResult {
|
||||
Received(RequestId),
|
||||
@@ -134,6 +175,14 @@ struct State {
|
||||
|
||||
/// Possessed collations.
|
||||
collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>,
|
||||
|
||||
/// Leaves have recently moved out of scope.
|
||||
/// These are looked into when we receive previously requested collations that we
|
||||
/// are no longer interested in.
|
||||
recently_removed_heads: HashSet<Hash>,
|
||||
|
||||
/// Metrics.
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
|
||||
@@ -291,6 +340,7 @@ where
|
||||
let _ = per_request.received.send(());
|
||||
if let Some(collator_id) = state.known_collators.get(&origin) {
|
||||
let _ = per_request.result.send((receipt.clone(), pov.clone()));
|
||||
state.metrics.on_request(Ok(()));
|
||||
|
||||
state.collations
|
||||
.entry((relay_parent, para_id))
|
||||
@@ -300,11 +350,11 @@ where
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/1694
|
||||
// This is tricky. If our chain has moved on, we have already canceled
|
||||
// the relevant request and removed it from the map; so and we are not expecting
|
||||
// this reply although technically it is not a malicious behaviur.
|
||||
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
|
||||
// If this collation is not just a delayed one that we were expecting,
|
||||
// but our view has moved on, in that case modify peer's reputation.
|
||||
if !state.recently_removed_heads.contains(&relay_parent) {
|
||||
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -481,7 +531,11 @@ async fn handle_our_view_change(
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Update the set of recently removed chain heads.
|
||||
state.recently_removed_heads.clear();
|
||||
|
||||
for removed in removed.into_iter() {
|
||||
state.recently_removed_heads.insert(removed.clone());
|
||||
remove_relay_parent(state, removed).await?;
|
||||
}
|
||||
|
||||
@@ -497,6 +551,8 @@ async fn request_timed_out<Context>(
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
state.metrics.on_request(Err(()));
|
||||
|
||||
// We have to go backwards in the map, again.
|
||||
if let Some(key) = find_val_in_map(&state.requested_collations, &id) {
|
||||
if let Some(_) = state.requested_collations.remove(&key) {
|
||||
@@ -595,7 +651,11 @@ where
|
||||
}
|
||||
|
||||
/// The main run loop.
|
||||
pub(crate) async fn run<Context>(mut ctx: Context, request_timeout: Duration) -> Result<()>
|
||||
pub(crate) async fn run<Context>(
|
||||
mut ctx: Context,
|
||||
request_timeout: Duration,
|
||||
metrics: Metrics,
|
||||
) -> Result<()>
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
@@ -604,6 +664,7 @@ where
|
||||
|
||||
let mut state = State {
|
||||
request_timeout,
|
||||
metrics,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -707,7 +768,7 @@ mod tests {
|
||||
|
||||
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = run(context, Duration::from_millis(50));
|
||||
let subsystem = run(context, Duration::from_millis(50), Metrics::default());
|
||||
|
||||
let test_fut = test(TestHarness { virtual_overseer });
|
||||
|
||||
|
||||
Reference in New Issue
Block a user