Request based PoV distribution (#2640)

* Indentation fix.

* Prepare request-response for PoV fetching.

* Drop old PoV distribution.

* WIP: Fetch PoV directly from backing.

* Backing compiles.

* Runtime access and connection management for PoV distribution.

* Get rid of seemingly dead code.

* Implement PoV fetching.

Backing does not yet use it.

* Don't send `ConnectToValidators` for empty list.

* Even better - no need to check over and over again.

* PoV fetching implemented.

+ Typechecks
+ Should work

Missing:

- Guide
- Tests
- Do fallback fetching in case fetching from seconding validator fails.

* Check PoV hash upon reception.

* Implement retry of PoV fetching in backing.

* Avoid pointless validation spawning.

* Add jaeger span to pov requesting.

* Add back tracing.

* Review remarks.

* Whitespace.

* Whitespace again.

* Cleanup + fix tests.

* Log to log target in overseer.

* Fix more tests.

* Don't fail if group cannot be found.

* Simple test for PoV fetcher.

* Handle missing group membership better.

* Add test for retry functionality.

* Fix flaky test.

* Spaces again.

* Guide updates.

* Spaces.
This commit is contained in:
Robert Klotzner
2021-03-28 17:11:38 +02:00
committed by GitHub
parent 27b6d83974
commit c6f07d8f31
35 changed files with 1382 additions and 3184 deletions
@@ -17,20 +17,26 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_primitives::v1::{CompressedPoVError, SessionIndex};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
/// Errors of this subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error("Response channel to obtain QueryChunk failed")]
#[error("Response channel to obtain chunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain available data failed")]
QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
@@ -53,24 +59,43 @@ pub enum Error {
/// Sending response failed.
#[error("Sending a request's response failed.")]
SendResponse,
}
/// Error that we should handle gracefully by logging it.
#[derive(Debug)]
pub enum NonFatalError {
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
#[error("Utility request failed")]
UtilRequest(UtilError),
/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),
/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Decompressing PoV failed.
#[error("PoV could not be decompressed")]
PoVDecompression(CompressedPoVError),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
/// Fetching PoV failed as the received PoV did not match the expected hash.
#[error("Fetched PoV does not match expected hash")]
UnexpectedPoV,
#[error("Remote responded with `NoSuchPoV`")]
NoSuchPoV,
/// No validator with the index could be found in current session.
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -87,9 +112,20 @@ pub(crate) async fn recv_runtime<V>(
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> std::result::Result<V, NonFatalError> {
r.map_err(NonFatalError::UtilRequest)?
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
.map_err(NonFatalError::RuntimeRequestCanceled)?
.map_err(NonFatalError::RuntimeRequest)
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) {
if let Err(error) = result {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
}
@@ -26,15 +26,23 @@ use polkadot_subsystem::{
/// Error and [`Result`] type for this subsystem.
mod error;
pub use error::Error;
use error::Result;
use error::{Result, log_error};
/// Runtime requests.
mod runtime;
use runtime::Runtime;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
use requester::Requester;
/// Handling requests for PoVs during backing.
mod pov_requester;
use pov_requester::PoVRequester;
/// Responding to erasure chunk requests:
mod responder;
use responder::answer_request_log;
use responder::{answer_chunk_request_log, answer_pov_request_log};
/// Cache for session information.
mod session_cache;
@@ -52,6 +60,8 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this nodes validator index.
keystore: SyncCryptoStorePtr,
/// Easy and efficient runtime access for this subsystem.
runtime: Runtime,
/// Prometheus metrics.
metrics: Metrics,
}
@@ -74,17 +84,20 @@ where
}
impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
let runtime = Runtime::new(keystore.clone());
Self { keystore, runtime, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> Result<()>
async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();
let mut pov_requester = PoVRequester::new();
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
@@ -107,14 +120,14 @@ impl AvailabilityDistributionSubsystem {
};
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Update the relay chain heads we are fetching our pieces for:
if let Some(e) = requester
.get_mut()
.update_fetching_heads(&mut ctx, update)
.await?
{
tracing::debug!(target: LOG_TARGET, "Error processing ActiveLeavesUpdate: {:?}", e);
}
log_error(
pov_requester.update_connected_validators(&mut ctx, &mut self.runtime, &update).await,
"PoVRequester::update_connected_validators"
);
log_error(
requester.get_mut().update_fetching_heads(&mut ctx, update).await,
"Error in Requester::update_fetching_heads"
);
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -123,7 +136,34 @@ impl AvailabilityDistributionSubsystem {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => {
answer_request_log(&mut ctx, req, &self.metrics).await
answer_chunk_request_log(&mut ctx, req, &self.metrics).await
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
} => {
answer_pov_request_log(&mut ctx, req, &self.metrics).await
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
},
} => {
log_error(
pov_requester.fetch_pov(
&mut ctx,
&mut self.runtime,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
).await,
"PoVRequester::fetch_pov"
);
}
}
}
@@ -24,7 +24,7 @@ pub const SUCCEEDED: &'static str = "succeeded";
/// Label for fail counters.
pub const FAILED: &'static str = "failed";
/// Label for chunks that could not be served, because they were not available.
/// Label for chunks/PoVs that could not be served, because they were not available.
pub const NOT_FOUND: &'static str = "not-found";
/// Availability Distribution metrics.
@@ -47,6 +47,12 @@ struct MetricsInner {
/// to a chunk request. This includes `NoSuchChunk` responses.
served_chunks: CounterVec<U64>,
/// Number of PoVs served.
///
/// Note: Right now, `Succeeded` gets incremented whenever we were able to successfully respond
/// to a PoV request. This includes `NoSuchPoV` responses.
served_povs: CounterVec<U64>,
/// Number of times our first set of validators did not provide the needed chunk and we had to
/// query further validators.
retries: Counter<U64>,
@@ -66,12 +72,19 @@ impl Metrics {
}
/// Increment counter on served chunks.
pub fn on_served(&self, label: &'static str) {
pub fn on_served_chunk(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.served_chunks.with_label_values(&[label]).inc()
}
}
/// Increment the served-PoVs counter for the given outcome `label`.
///
/// Does nothing if no metrics are registered (inner value is `None`).
pub fn on_served_pov(&self, label: &'static str) {
    if let Some(inner) = self.0.as_ref() {
        inner.served_povs.with_label_values(&[label]).inc();
    }
}
/// Increment retry counter.
pub fn on_retry(&self) {
if let Some(metrics) = &self.0 {
@@ -103,6 +116,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
served_povs: prometheus::register(
CounterVec::new(
Opts::new(
"parachain_served_povs_total",
"Total number of povs served by this backer.",
),
&["success"]
)?,
registry,
)?,
retries: prometheus::register(
Counter::new(
"parachain_fetch_retries_total",
@@ -0,0 +1,333 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! PoV requester takes care of requesting PoVs from validators of a backing group.
use futures::{FutureExt, channel::{mpsc, oneshot}, future::BoxFuture};
use lru::LruCache;
use polkadot_subsystem::jaeger;
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet,
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}}
};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, Hash, PoV, SessionIndex, ValidatorIndex
};
use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
};
use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}};
/// Number of sessions we want to keep in the LRU.
///
/// Used as the capacity of `PoVRequester::connected_validators`.
const NUM_SESSIONS: usize = 2;
/// Requester for PoVs from validators of a backing group.
///
/// Besides issuing fetch requests, it keeps track of the sessions for which a connection
/// request to our validator group has already been made.
pub struct PoVRequester {
/// We only ever care about being connected to validators of at most two sessions.
///
/// So we keep an LRU for managing connection requests of size 2.
/// Cache will contain `None` if we are not a validator in that session.
///
/// The receiver is never read from; it is only kept around so the connection request stays
/// alive (see `connect_to_relevant_validators`).
connected_validators: LruCache<SessionIndex, Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>,
}
impl PoVRequester {
/// Create a new requester for PoVs.
pub fn new() -> Self {
Self {
connected_validators: LruCache::new(NUM_SESSIONS),
}
}
/// Make sure we are connected to the right set of validators.
///
/// On every `ActiveLeavesUpdate`, we check whether we are connected properly to our current
/// validator group.
pub async fn update_connected_validators<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut Runtime,
update: &ActiveLeavesUpdate,
) -> super::Result<()>
where
Context: SubsystemContext,
{
let activated = update.activated.iter().map(|ActivatedLeaf { hash: h, .. }| h);
let activated_sessions =
get_activated_sessions(ctx, runtime, activated).await?;
for (parent, session_index) in activated_sessions {
if self.connected_validators.contains(&session_index) {
continue
}
let rx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
self.connected_validators.put(session_index, rx);
}
Ok(())
}
/// Start background worker for taking care of fetching the requested `PoV` from the network.
pub async fn fetch_pov<Context>(
&self,
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash: Hash,
tx: oneshot::Sender<PoV>
) -> super::Result<()>
where
Context: SubsystemContext,
{
let info = &runtime.get_session_info(ctx, parent).await?.session_info;
let authority_id = info.discovery_keys.get(from_validator.0 as usize)
.ok_or(Error::InvalidValidatorIndex)?
.clone();
let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id),
PoVFetchingRequest {
candidate_hash,
},
);
let full_req = Requests::PoVFetching(req);
ctx.send_message(
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
vec![full_req],
// We are supposed to be connected to validators of our group via `PeerSet`,
// but at session boundaries that is kind of racy, in case a connection takes
// longer to get established, so we try to connect in any case.
IfDisconnected::TryConnect
)
)).await;
let span = jaeger::Span::new(candidate_hash, "fetch-pov")
.with_validator_index(from_validator);
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
.await
.map_err(|e| Error::SpawnTask(e))
}
}
/// Background job: wait for the PoV response and forward the PoV to the requester.
///
/// Any error of the actual fetch (`do_fetch_pov`) is only logged, never propagated.
async fn fetch_pov_job(
    pov_hash: Hash,
    pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
    span: jaeger::Span,
    tx: oneshot::Sender<PoV>,
) {
    let outcome = do_fetch_pov(pov_hash, pending_response, span, tx).await;
    log_error(outcome, "fetch_pov_job")
}
/// Do the actual work of waiting for the response.
async fn do_fetch_pov(
pov_hash: Hash,
pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
)
-> super::Result<()>
{
let response = pending_response.await.map_err(Error::FetchPoV)?;
let pov = match response {
PoVFetchingResponse::PoV(compressed) => {
compressed.decompress().map_err(Error::PoVDecompression)?
}
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::NoSuchPoV)
}
};
if pov.hash() == pov_hash {
tx.send(pov).map_err(|_| Error::SendResponse)
} else {
Err(Error::UnexpectedPoV)
}
}
/// Get the session indices for the given relay chain parents.
///
/// Yields one `(relay parent, session index)` pair per entry of `new_heads`, in the same
/// order. The first failing lookup aborts the whole operation with its error.
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
    -> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
where
    Context: SubsystemContext,
{
    // Lookups are async and need exclusive access to `ctx`/`runtime`, so they run one by one.
    let mut resolved = Vec::new();
    for head in new_heads {
        let session_index = runtime.get_session_index(ctx, *head).await?;
        resolved.push((*head, session_index));
    }
    Ok(resolved.into_iter())
}
/// Connect to validators of our validator group.
///
/// Sends a `ConnectToValidators` request on the validation peer set to the network bridge
/// and returns the receiving end of the notification channel. The caller has to keep the
/// receiver around for as long as the connections are wanted.
///
/// Returns `Ok(None)` without sending anything if we are not part of a validator group in
/// the given session.
async fn connect_to_relevant_validators<Context>(
    ctx: &mut Context,
    runtime: &mut Runtime,
    parent: Hash,
    session: SessionIndex
)
    -> super::Result<Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>
where
    Context: SubsystemContext,
{
    let validator_ids = match determine_relevant_validators(ctx, runtime, parent, session).await? {
        Some(ids) => ids,
        None => return Ok(None),
    };
    // We don't actually care about `PeerId`s, just keeping receiver so we stay connected:
    let (tx, rx) = mpsc::channel(0);
    let connect = NetworkBridgeMessage::ConnectToValidators {
        validator_ids,
        peer_set: PeerSet::Validation,
        connected: tx,
    };
    ctx.send_message(AllMessages::NetworkBridge(connect)).await;
    Ok(Some(rx))
}
/// Get the validators in our validator group.
///
/// Return: `None` if not a validator.
async fn determine_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
session: SessionIndex,
)
-> super::Result<Option<Vec<AuthorityDiscoveryId>>>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx, parent, session).await?;
if let ValidatorInfo {
our_index: Some(our_index),
our_group: Some(our_group)
} = &info.validator_info {
let indeces = info.session_info.validator_groups.get(our_group.0 as usize)
.expect("Our group got retrieved from that session info, it must exist. qed.")
.clone();
Ok(Some(
indeces.into_iter()
.filter(|i| *i != *our_index)
.map(|i| info.session_info.discovery_keys[i.0 as usize].clone())
.collect()
))
} else {
Ok(None)
}
}
// Unit tests for `PoVRequester::fetch_pov`, run against a mocked overseer.
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use futures::{executor, future};
use parity_scale_codec::Encode;
use sp_core::testing::TaskExecutor;
use polkadot_primitives::v1::{BlockData, CandidateHash, CompressedPoV, Hash, ValidatorIndex};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::messages::{AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest};
use super::*;
use crate::LOG_TARGET;
use crate::tests::mock::{make_session_info, make_ferdie_keystore};
// The expected hash (`Hash::default()`) is not derived from the PoV, so `test_run`
// takes its "hash mismatch" branch and expects the result channel to be canceled.
#[test]
fn rejects_invalid_pov() {
sp_tracing::try_init_simple();
let pov = PoV {
block_data: BlockData(vec![1,2,3,4,5,6]),
};
test_run(Hash::default(), pov);
}
// The expected hash is the PoV's own hash, so the PoV must be delivered to the requester.
#[test]
fn accepts_valid_pov() {
sp_tracing::try_init_simple();
let pov = PoV {
block_data: BlockData(vec![1,2,3,4,5,6]),
};
test_run(pov.hash(), pov);
}
// Drive `fetch_pov` with `pov_hash` as the expected hash while a mocked overseer answers
// the network request with `pov`. Asserts delivery of the PoV if the hashes match and a
// canceled channel otherwise.
fn test_run(pov_hash: Hash, pov: PoV) {
let requester = PoVRequester::new();
let pool = TaskExecutor::new();
let (mut context, mut virtual_overseer) =
test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
let keystore = make_ferdie_keystore();
let mut runtime = crate::runtime::Runtime::new(keystore);
let (tx, rx) = oneshot::channel();
// Code under test: issue the fetch request.
let testee = async {
requester.fetch_pov(
&mut context,
&mut runtime,
Hash::default(),
ValidatorIndex(0),
CandidateHash::default(),
pov_hash,
tx,
).await.expect("Should succeed");
};
// Mocked overseer side: answer runtime queries and the outgoing PoV request.
let tester = async move {
loop {
match virtual_overseer.recv().await {
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionIndexForChild(tx)
)
) => {
tx.send(Ok(0)).unwrap();
}
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionInfo(_, tx)
)
) => {
tx.send(Ok(Some(make_session_info()))).unwrap();
}
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(mut reqs, _)) => {
let req = assert_matches!(
reqs.pop(),
Some(Requests::PoVFetching(outgoing)) => {outgoing}
);
// Respond with the encoded, compressed PoV and stop serving messages.
req.pending_response.send(Ok(PoVFetchingResponse::PoV(
CompressedPoV::compress(&pov).unwrap()).encode()
)).unwrap();
break
},
msg => tracing::debug!(target: LOG_TARGET, msg = ?msg, "Received msg"),
}
}
// On a hash mismatch nothing is sent on `tx`, so the receiver observes cancellation.
if pov.hash() == pov_hash {
assert_eq!(rx.await, Ok(pov));
} else {
assert_eq!(rx.await, Err(oneshot::Canceled));
}
};
futures::pin_mut!(testee);
futures::pin_mut!(tester);
executor::block_on(future::join(testee, tester));
}
}
@@ -138,7 +138,7 @@ impl FetchTaskConfig {
let live_in = vec![leaf].into_iter().collect();
// Don't run tasks for our backing group:
if session_info.our_group == core.group_responsible {
if session_info.our_group == Some(core.group_responsible) {
return FetchTaskConfig {
live_in,
prepared_running: None,
@@ -39,7 +39,7 @@ use polkadot_subsystem::{
};
use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
use crate::error::NonFatalError;
use crate::error::Error;
/// A task fetching a particular chunk.
mod fetch_task;
@@ -96,7 +96,7 @@ impl Requester {
&mut self,
ctx: &mut Context,
update: ActiveLeavesUpdate,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
@@ -111,9 +111,9 @@ impl Requester {
} = update;
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
let err = self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.stop_requesting_chunks(deactivated.into_iter());
Ok(err)
Ok(())
}
/// Start requesting chunks for newly imported heads.
@@ -121,25 +121,20 @@ impl Requester {
&mut self,
ctx: &mut Context,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = match query_occupied_cores(ctx, leaf).await {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
};
let cores = query_occupied_cores(ctx, leaf).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
return Ok(Some(err));
}
self.add_cores(ctx, leaf, cores).await?;
}
Ok(None)
Ok(())
}
/// Stop requesting chunks for obsolete heads.
@@ -164,7 +159,7 @@ impl Requester {
ctx: &mut Context,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
@@ -179,7 +174,7 @@ impl Requester {
let tx = self.tx.clone();
let metrics = self.metrics.clone();
let task_cfg = match self
let task_cfg = self
.session_cache
.with_session_info(
ctx,
@@ -189,11 +184,7 @@ impl Requester {
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
.await
{
Err(err) => return Ok(Some(err)),
Ok(task_cfg) => task_cfg,
};
.await?;
if let Some(task_cfg) = task_cfg {
e.insert(FetchTask::start(task_cfg, ctx).await?);
@@ -202,7 +193,7 @@ impl Requester {
}
}
}
Ok(None)
Ok(())
}
}
@@ -237,7 +228,7 @@ impl Stream for Requester {
async fn query_occupied_cores<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<OccupiedCore>, NonFatalError>
) -> Result<Vec<OccupiedCore>, Error>
where
Context: SubsystemContext,
{
@@ -19,7 +19,7 @@
use futures::channel::oneshot;
use polkadot_node_network_protocol::request_response::{request::IncomingRequest, v1};
use polkadot_primitives::v1::{CandidateHash, ErasureChunk, ValidatorIndex};
use polkadot_primitives::v1::{AvailableData, CandidateHash, CompressedPoV, ErasureChunk, ValidatorIndex};
use polkadot_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
SubsystemContext, jaeger,
@@ -28,10 +28,36 @@ use polkadot_subsystem::{
use crate::error::{Error, Result};
use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}};
/// Variant of `answer_request` that does Prometheus metric and logging on errors.
/// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
///
/// The outcome is recorded in the served-PoVs counter: `SUCCEEDED` if the PoV was served,
/// `NOT_FOUND` if it was not available, `FAILED` on error. Errors are logged as warnings
/// and otherwise dropped.
pub async fn answer_pov_request_log<Context>(
    ctx: &mut Context,
    req: IncomingRequest<v1::PoVFetchingRequest>,
    metrics: &Metrics,
)
where
    Context: SubsystemContext,
{
    let label = match answer_pov_request(ctx, req).await {
        Ok(true) => SUCCEEDED,
        Ok(false) => NOT_FOUND,
        Err(err) => {
            tracing::warn!(
                target: LOG_TARGET,
                err= ?err,
                "Serving PoV failed with error"
            );
            FAILED
        }
    };
    metrics.on_served_pov(label);
}
/// Variant of `answer_chunk_request` that does Prometheus metric and logging on errors.
///
/// Any errors of `answer_request` will simply be logged.
pub async fn answer_request_log<Context>(
pub async fn answer_chunk_request_log<Context>(
ctx: &mut Context,
req: IncomingRequest<v1::ChunkFetchingRequest>,
metrics: &Metrics,
@@ -39,33 +65,71 @@ pub async fn answer_request_log<Context>(
where
Context: SubsystemContext,
{
let res = answer_request(ctx, req).await;
let res = answer_chunk_request(ctx, req).await;
match res {
Ok(result) =>
metrics.on_served(if result {SUCCEEDED} else {NOT_FOUND}),
metrics.on_served_chunk(if result {SUCCEEDED} else {NOT_FOUND}),
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
err= ?err,
"Serving chunk failed with error"
);
metrics.on_served(FAILED);
metrics.on_served_chunk(FAILED);
}
}
}
/// Answer an incoming PoV fetch request by querying the av store.
///
/// Returns: Ok(true) if chunk was found and served.
pub async fn answer_pov_request<Context>(
ctx: &mut Context,
req: IncomingRequest<v1::PoVFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
{
let _span = jaeger::Span::new(req.payload.candidate_hash, "answer-pov-request");
let av_data = query_available_data(ctx, req.payload.candidate_hash).await?;
let result = av_data.is_some();
let response = match av_data {
None => v1::PoVFetchingResponse::NoSuchPoV,
Some(av_data) => {
let pov = match CompressedPoV::compress(&av_data.pov) {
Ok(pov) => pov,
Err(error) => {
tracing::error!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
// this should really not happen, let this request time out:
return Err(Error::PoVDecompression(error))
}
};
v1::PoVFetchingResponse::PoV(pov)
}
};
req.send_response(response).map_err(|_| Error::SendResponse)?;
Ok(result)
}
/// Answer an incoming chunk request by querying the av store.
///
/// Returns: Ok(true) if chunk was found and served.
pub async fn answer_request<Context>(
pub async fn answer_chunk_request<Context>(
ctx: &mut Context,
req: IncomingRequest<v1::ChunkFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
{
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-request")
.with_stage(jaeger::Stage::AvailabilityDistribution);
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request");
let _child_span = span.child("answer-chunk-request")
.with_chunk_index(req.payload.index.0);
@@ -119,3 +183,21 @@ where
Error::QueryChunkResponseChannel(e)
})
}
/// Query PoV from the availability store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_available_data<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> Result<Option<AvailableData>>
where
Context: SubsystemContext,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx),
))
.await;
rx.await.map_err(|e| Error::QueryAvailableDataResponseChannel(e))
}
@@ -0,0 +1,197 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Convenient interface to the runtime.
use lru::LruCache;
use sp_application_crypto::AppKey;
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child_ctx, request_session_info_ctx,
};
use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_subsystem::SubsystemContext;
use super::{
error::recv_runtime,
Error,
};
/// Caching of session info as needed by availability distribution.
///
/// It should be ensured that a cached session stays live in the cache as long as we might need it.
pub struct Runtime {
/// Get the session index for a given relay parent.
///
/// We query this up to 100 times per block, so caching it here without roundtrips over the
/// overseer seems sensible.
session_index_cache: LruCache<Hash, SessionIndex>,
/// Look up cached sessions by `SessionIndex`.
///
/// Entries are created on demand by `get_session_info_by_index`.
session_info_cache: LruCache<SessionIndex, ExtendedSessionInfo>,
/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
keystore: SyncCryptoStorePtr,
}
/// `SessionInfo` with additional useful data for validator nodes.
pub struct ExtendedSessionInfo {
/// Actual session info as fetched from the runtime.
pub session_info: SessionInfo,
/// Contains useful information about ourselves, in case this node is a validator.
///
/// Both fields of the contained `ValidatorInfo` are `None` if this node is not a validator
/// in the session.
pub validator_info: ValidatorInfo,
}
/// Information about ourselves, in case we are an `Authority`.
///
/// This data is derived from the `SessionInfo` and our key as found in the keystore.
pub struct ValidatorInfo {
/// The index this very validator has in `SessionInfo` vectors, if any.
pub our_index: Option<ValidatorIndex>,
/// The group we belong to, if any.
///
/// Can be `None` even if `our_index` is set, namely when our index is not listed in any
/// validator group of the session.
pub our_group: Option<GroupIndex>,
}
impl Runtime {
/// Create a new `Runtime` for convenient runtime fetches.
pub fn new(keystore: SyncCryptoStorePtr) -> Self {
Self {
// 5 relatively conservative, 1 to 2 should suffice:
session_index_cache: LruCache::new(5),
// We need to cache the current and the last session the most:
session_info_cache: LruCache::new(2),
keystore,
}
}
/// Retrieve the current session index.
pub async fn get_session_index<Context>(
&mut self,
ctx: &mut Context,
parent: Hash,
) -> Result<SessionIndex, Error>
where
Context: SubsystemContext,
{
match self.session_index_cache.get(&parent) {
Some(index) => Ok(*index),
None => {
let index =
recv_runtime(request_session_index_for_child_ctx(parent, ctx).await)
.await?;
self.session_index_cache.put(parent, index);
Ok(index)
}
}
}
/// Get `ExtendedSessionInfo` by relay parent hash.
pub async fn get_session_info<'a, Context>(
&'a mut self,
ctx: &mut Context,
parent: Hash,
) -> Result<&'a ExtendedSessionInfo, Error>
where
Context: SubsystemContext,
{
let session_index = self.get_session_index(ctx, parent).await?;
self.get_session_info_by_index(ctx, parent, session_index).await
}
/// Get `ExtendedSessionInfo` by session index.
///
/// `request_session_info_ctx` still requires the parent to be passed in, so we take the parent
/// in addition to the `SessionIndex`.
pub async fn get_session_info_by_index<'a, Context>(
&'a mut self,
ctx: &mut Context,
parent: Hash,
session_index: SessionIndex,
) -> Result<&'a ExtendedSessionInfo, Error>
where
Context: SubsystemContext,
{
if !self.session_info_cache.contains(&session_index) {
let session_info =
recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
let validator_info = self.get_validator_info(&session_info).await?;
let full_info = ExtendedSessionInfo {
session_info,
validator_info,
};
self.session_info_cache.put(session_index, full_info);
}
Ok(
self.session_info_cache.get(&session_index)
.expect("We just put the value there. qed.")
)
}
/// Build `ValidatorInfo` for the current session.
///
///
/// Returns: `None` if not a validator.
async fn get_validator_info(
&self,
session_info: &SessionInfo,
) -> Result<ValidatorInfo, Error>
{
if let Some(our_index) = self.get_our_index(&session_info.validators).await {
// Get our group index:
let our_group = session_info.validator_groups
.iter()
.enumerate()
.find_map(|(i, g)| {
g.iter().find_map(|v| {
if *v == our_index {
Some(GroupIndex(i as u32))
} else {
None
}
})
}
);
let info = ValidatorInfo {
our_index: Some(our_index),
our_group,
};
return Ok(info)
}
return Ok(ValidatorInfo { our_index: None, our_group: None })
}
/// Get our `ValidatorIndex`.
///
/// Returns: None if we are not a validator.
async fn get_our_index(&self, validators: &[ValidatorId]) -> Option<ValidatorIndex> {
for (i, v) in validators.iter().enumerate() {
if CryptoStore::has_keys(&*self.keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
.await
{
return Some(ValidatorIndex(i as u32));
}
}
None
}
}
@@ -33,8 +33,7 @@ use polkadot_primitives::v1::{
use polkadot_subsystem::SubsystemContext;
use super::{
error::{recv_runtime, NonFatalError},
Error,
error::{recv_runtime, Error},
LOG_TARGET,
};
@@ -82,7 +81,9 @@ pub struct SessionInfo {
/// Remember to which group we belong, so we won't start fetching chunks for candidates with
/// our group being responsible. (We should have that chunk already.)
pub our_group: GroupIndex,
///
/// `None`, if we are not in fact part of any group.
pub our_group: Option<GroupIndex>,
}
/// Report of bad validators.
@@ -122,7 +123,7 @@ impl SessionCache {
ctx: &mut Context,
parent: Hash,
with_info: F,
) -> Result<Option<R>, NonFatalError>
) -> Result<Option<R>, Error>
where
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
@@ -219,7 +220,7 @@ impl SessionCache {
ctx: &mut Context,
parent: Hash,
session_index: SessionIndex,
) -> Result<Option<SessionInfo>, NonFatalError>
) -> Result<Option<SessionInfo>, Error>
where
Context: SubsystemContext,
{
@@ -230,7 +231,7 @@ impl SessionCache {
..
} = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
.await?
.ok_or(NonFatalError::NoSuchSession(session_index))?;
.ok_or(Error::NoSuchSession(session_index))?;
if let Some(our_index) = self.get_our_index(validators).await {
// Get our group index:
@@ -245,8 +246,8 @@ impl SessionCache {
None
}
})
})
.expect("Every validator should be in a validator group. qed.");
}
);
// Shuffle validators in groups:
let mut rng = thread_rng();
@@ -274,9 +275,9 @@ impl SessionCache {
session_index,
our_group,
};
return Ok(Some(info));
return Ok(Some(info))
}
return Ok(None);
return Ok(None)
}
/// Get our `ValidatorIndex`.
@@ -19,14 +19,29 @@
use std::sync::Arc;
use sc_keystore::LocalKeystore;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::AppKey;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_primitives::v1::{AvailableData, BlockData, CandidateCommitments, CandidateDescriptor,
CandidateHash, CommittedCandidateReceipt, ErasureChunk, GroupIndex, Hash, HeadData, Id
as ParaId, OccupiedCore, PersistedValidationData, PoV, SessionInfo,
ValidatorIndex
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, CandidateHash,
CommittedCandidateReceipt, ErasureChunk, GroupIndex, Hash, HeadData, Id as ParaId,
OccupiedCore, PersistedValidationData, PoV, SessionInfo, ValidatorId, ValidatorIndex
};
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
/// Create an in-memory mock keystore holding a `ValidatorId` key generated from the
/// `Ferdie` keyring seed.
///
/// Panics if the key cannot be inserted into the keystore.
pub fn make_ferdie_keystore() -> SyncCryptoStorePtr {
    let store: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
    let ferdie_seed = Sr25519Keyring::Ferdie.to_seed();
    SyncCryptoStore::sr25519_generate_new(&*store, ValidatorId::ID, Some(&ferdie_seed))
        .expect("Insert key into keystore");
    store
}
/// Create dummy session info with two validator groups.
pub fn make_session_info() -> SessionInfo {
@@ -23,10 +23,7 @@ use smallvec::smallvec;
use futures::{FutureExt, channel::oneshot, SinkExt, channel::mpsc, StreamExt};
use futures_timer::Delay;
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use sp_keystore::SyncCryptoStorePtr;
use sp_core::{traits::SpawnNamed, testing::TaskExecutor};
use sc_network as network;
use sc_network::IfDisconnected;
@@ -39,7 +36,7 @@ use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, Activ
}
};
use polkadot_primitives::v1::{CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id
as ParaId, ScheduledCore, SessionInfo, ValidatorId,
as ParaId, ScheduledCore, SessionInfo,
ValidatorIndex
};
use polkadot_node_network_protocol::{jaeger,
@@ -48,7 +45,7 @@ use polkadot_node_network_protocol::{jaeger,
use polkadot_subsystem_testhelpers as test_helpers;
use test_helpers::SingleItemSink;
use super::mock::{make_session_info, OccupiedCoreBuilder, };
use super::mock::{make_session_info, OccupiedCoreBuilder, make_ferdie_keystore};
use crate::LOG_TARGET;
pub struct TestHarness {
@@ -83,17 +80,10 @@ impl Default for TestState {
let chain_ids = vec![chain_a, chain_b];
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let keystore = make_ferdie_keystore();
let session_info = make_session_info();
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.expect("Insert key into keystore");
let (cores, chunks) = {
let mut cores = HashMap::new();
let mut chunks = HashMap::new();
@@ -163,6 +153,9 @@ impl TestState {
/// This will simply advance through the simulated chain and examines whether the subsystem
/// behaves as expected: It will succeed if all valid chunks of other backing groups get stored
/// and no other.
///
/// We try to be as agnostic about details as possible, how the subsystem achieves those goals
/// should not be a matter to this test suite.
async fn run_inner(self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
// We skip genesis here (in reality ActiveLeavesUpdate can also skip a block:
let updates = {
@@ -258,15 +251,12 @@ impl TestState {
}
}
_ => {
panic!("Unexpected message received: {:?}", msg);
}
}
}
}
}
async fn overseer_signal(
mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
msg: impl Into<OverseerSignal>,