Collation protocol: stricter validators (#2810)

* guide: declare one para as a collator

* add ParaId to Declare messages and clean up

* fix build

* fix the testerinos

* begin adding keystore to collator-protocol

* remove request_x_ctx

* add core_for_group

* add bump_rotation

* add some more helpers to subsystem-util

* change signing_key API to take ref

* determine current and next para assignments

* disconnect collators who are not on current or next para

* add collator peer count metric

* notes for later

* some fixes

* add data & keystore to test state

* add a test utility for answering runtime API requests

* fix existing collator tests

* add new tests

* remove sc_keystore

* update cargo lock

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Robert Habermeier
2021-04-03 21:48:58 +02:00
committed by GitHub
parent 94b0ccc8f1
commit 11b8e4c821
22 changed files with 1064 additions and 334 deletions
+1
View File
@@ -5594,6 +5594,7 @@ dependencies = [
"polkadot-primitives",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-runtime",
"thiserror",
"tracing",
+10 -10
View File
@@ -32,8 +32,8 @@ use polkadot_node_subsystem::{
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::{
request_availability_cores_ctx, request_persisted_validation_data_ctx,
request_validators_ctx, request_validation_code_ctx,
request_availability_cores, request_persisted_validation_data,
request_validators, request_validation_code,
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
@@ -198,8 +198,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
let _relay_parent_timer = metrics.time_new_activations_relay_parent();
let (availability_cores, validators) = join!(
request_availability_cores_ctx(relay_parent, ctx).await?,
request_validators_ctx(relay_parent, ctx).await?,
request_availability_cores(relay_parent, ctx.sender()).await,
request_validators(relay_parent, ctx.sender()).await,
);
let availability_cores = availability_cores??;
@@ -248,13 +248,13 @@ async fn handle_new_activations<Context: SubsystemContext>(
// within the subtask loop, because we have only a single mutable handle to the
// context, so the work can't really be distributed
let validation_data = match request_persisted_validation_data_ctx(
let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
ctx,
ctx.sender(),
)
.await?
.await
.await??
{
Some(v) => v,
@@ -271,13 +271,13 @@ async fn handle_new_activations<Context: SubsystemContext>(
}
};
let validation_code = match request_validation_code_ctx(
let validation_code = match request_validation_code(
relay_parent,
scheduled_core.para_id,
assumption,
ctx,
ctx.sender(),
)
.await?
.await
.await??
{
Some(v) => v,
@@ -294,7 +294,7 @@ impl CandidateSelectionJob {
).await {
Ok(response) => response,
Err(err) => {
tracing::warn!(
tracing::debug!(
target: LOG_TARGET,
err = ?err,
"failed to get collation from collator protocol subsystem",
@@ -109,13 +109,9 @@ impl From<SubsystemError> for Error {
/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: std::result::Result<
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
r.await
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
@@ -32,7 +32,7 @@ use futures::{
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem_util::request_availability_cores_ctx;
use polkadot_node_subsystem_util::request_availability_cores;
use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
@@ -235,7 +235,7 @@ async fn query_occupied_cores<Context>(
where
Context: SubsystemContext,
{
let cores = recv_runtime(request_availability_cores_ctx(relay_parent, ctx).await).await?;
let cores = recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await?;
Ok(cores
.into_iter()
@@ -23,7 +23,7 @@ use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child_ctx, request_session_info_ctx,
request_session_index_for_child, request_session_info,
};
use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_subsystem::SubsystemContext;
@@ -93,7 +93,7 @@ impl Runtime {
Some(index) => Ok(*index),
None => {
let index =
recv_runtime(request_session_index_for_child_ctx(parent, ctx).await)
recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
.await?;
self.session_index_cache.put(parent, index);
Ok(index)
@@ -117,7 +117,7 @@ impl Runtime {
/// Get `ExtendedSessionInfo` by session index.
///
/// `request_session_info_ctx` still requires the parent to be passed in, so we take the parent
/// `request_session_info` still requires the parent to be passed in, so we take the parent
/// in addition to the `SessionIndex`.
pub async fn get_session_info_by_index<'a, Context>(
&'a mut self,
@@ -130,7 +130,7 @@ impl Runtime {
{
if !self.session_info_cache.contains(&session_index) {
let session_info =
recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
let validator_info = self.get_validator_info(&session_info).await?;
@@ -24,7 +24,7 @@ use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child_ctx, request_session_info_ctx,
request_session_index_for_child, request_session_info,
};
use polkadot_primitives::v1::SessionInfo as GlobalSessionInfo;
use polkadot_primitives::v1::{
@@ -132,7 +132,7 @@ impl SessionCache {
Some(index) => *index,
None => {
let index =
recv_runtime(request_session_index_for_child_ctx(parent, ctx).await)
recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
.await?;
self.session_index_cache.put(parent, index);
index
@@ -210,7 +210,7 @@ impl SessionCache {
/// Query needed information from runtime.
///
/// We need to pass in the relay parent for our call to `request_session_info_ctx`. We should
/// We need to pass in the relay parent for our call to `request_session_info`. We should
/// actually don't need that: I suppose it is used for internal caching based on relay parents,
/// which we don't use here. It should not do any harm though.
///
@@ -229,7 +229,7 @@ impl SessionCache {
discovery_keys,
mut validator_groups,
..
} = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
@@ -111,7 +111,7 @@ fn check_fetch_retry() {
})
.collect();
state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch));
for (_, v) in state.chunks.iter_mut() {
// This should still succeed as cores are still pending availability on next block.
@@ -49,7 +49,7 @@ use polkadot_node_network_protocol::{
request::RequestError,
},
};
use polkadot_node_subsystem_util::request_session_info_ctx;
use polkadot_node_subsystem_util::request_session_info;
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
@@ -697,11 +697,11 @@ async fn handle_recover(
}
let _span = span.child("not-cached");
let session_info = request_session_info_ctx(
let session_info = request_session_info(
state.live_block.1,
session_index,
ctx,
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
ctx.sender(),
).await.await.map_err(error::Error::CanceledSessionInfo)??;
let _span = span.child("session-info-ctx-received");
match session_info {
+2
View File
@@ -1794,6 +1794,7 @@ mod tests {
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
Default::default(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
@@ -2064,6 +2065,7 @@ mod tests {
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
Default::default(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
@@ -11,8 +11,9 @@ futures-timer = "3"
thiserror = "1.0.23"
tracing = "0.1.25"
sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
@@ -37,9 +37,9 @@ use polkadot_node_network_protocol::{
};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
request_availability_cores_ctx,
request_validators,
request_validator_groups,
request_availability_cores,
metrics::{self, prometheus},
};
use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV};
@@ -380,7 +380,7 @@ async fn determine_core(
para_id: ParaId,
relay_parent: Hash,
) -> Result<Option<(CoreIndex, usize)>> {
let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
let cores = request_availability_cores(relay_parent, ctx.sender()).await.await??;
for (idx, core) in cores.iter().enumerate() {
if let CoreState::Scheduled(occupied) = core {
@@ -403,7 +403,7 @@ async fn determine_our_validators(
cores: usize,
relay_parent: Hash,
) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> {
let groups = request_validator_groups_ctx(relay_parent, ctx).await?;
let groups = request_validator_groups(relay_parent, ctx.sender()).await;
let groups = groups.await??;
@@ -413,7 +413,7 @@ async fn determine_our_validators(
let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len();
let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
let validators = request_validators(relay_parent, ctx.sender()).await.await??;
let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let next_validators = next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
@@ -430,17 +430,20 @@ async fn declare(
) {
let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id);
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
state.collator_pair.sign(&declare_signature_payload),
);
if let Some(para_id) = state.collating_on {
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
para_id,
state.collator_pair.sign(&declare_signature_payload),
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
}
}
/// Issue a connection request to a set of validators and
@@ -476,11 +479,6 @@ async fn advertise_collation(
relay_parent: Hash,
peer: PeerId,
) {
let collating_on = match state.collating_on {
Some(collating_on) => collating_on,
None => return,
};
let should_advertise = state.our_validators_groups
.get(&relay_parent)
.map(|g| g.should_advertise_to(&peer))
@@ -518,7 +516,6 @@ async fn advertise_collation(
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
);
ctx.send_message(AllMessages::NetworkBridge(
@@ -705,14 +702,14 @@ async fn handle_incoming_peer_message(
use protocol_v1::CollatorProtocolMessage::*;
match msg {
Declare(_, _) => {
Declare(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
}
AdvertiseCollation(_, _) => {
AdvertiseCollation(_) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
@@ -772,6 +769,12 @@ async fn handle_validator_connected(
validator_id: ValidatorId,
relay_parent: Hash,
) {
tracing::trace!(
target: LOG_TARGET,
?validator_id,
"Connected to requested validator"
);
let not_declared = state.declared_at.insert(peer_id.clone());
if not_declared {
@@ -1382,7 +1385,11 @@ mod tests {
}
/// Check that the next received message is a `Declare` message.
async fn expect_declare_msg(virtual_overseer: &mut VirtualOverseer, test_state: &TestState, peer: &PeerId) {
async fn expect_declare_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
@@ -1394,12 +1401,17 @@ mod tests {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(collator_id, signature) => {
protocol_v1::CollatorProtocolMessage::Declare(
collator_id,
para_id,
signature,
) => {
assert!(signature.verify(
&*protocol_v1::declare_signature_payload(&test_state.local_peer_id),
&collator_id),
);
assert_eq!(collator_id, test_state.collator_pair.public());
assert_eq!(para_id, test_state.para_id);
}
);
}
@@ -1409,7 +1421,6 @@ mod tests {
/// Check that the next received message is a collation advertisment message.
async fn expect_advertise_collation_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
expected_relay_parent: Hash,
) {
@@ -1426,10 +1437,8 @@ mod tests {
wire_message,
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
) => {
assert_eq!(relay_parent, expected_relay_parent);
assert_eq!(collating_on, test_state.para_id);
}
);
}
@@ -1478,7 +1487,7 @@ mod tests {
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
@@ -1556,7 +1565,7 @@ mod tests {
)
).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
});
}
@@ -1619,13 +1628,13 @@ mod tests {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
// The other validator announces that it changed its view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// After changing the view we should receive the advertisement
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
})
}
@@ -1669,10 +1678,10 @@ mod tests {
connected.try_send((validator_id2, peer2.clone())).unwrap();
send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, old_relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
})
}
@@ -1698,7 +1707,7 @@ mod tests {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Disconnect and reconnect directly
disconnect_peer(&mut virtual_overseer, peer.clone()).await;
@@ -25,6 +25,8 @@ use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep};
use polkadot_node_subsystem_util::{self as util, metrics::prometheus};
use polkadot_primitives::v1::CollatorPair;
@@ -57,18 +59,33 @@ type Result<T> = std::result::Result<T, Error>;
/// A collator eviction policy - how fast to evict collators which are inactive.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy(pub Duration);
pub struct CollatorEvictionPolicy {
/// How fast to evict collators who are inactive.
pub inactive_collator: Duration,
/// How fast to evict peers which don't declare their para.
pub undeclared: Duration,
}
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy(Duration::from_secs(24))
CollatorEvictionPolicy {
inactive_collator: Duration::from_secs(24),
undeclared: Duration::from_secs(1),
}
}
}
/// What side of the collator protocol is being engaged
pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(CollatorEvictionPolicy, validator_side::Metrics),
Validator {
/// The keystore holding validator keys.
keystore: SyncCryptoStorePtr,
/// An eviction policy for inactive peers or validators.
eviction_policy: CollatorEvictionPolicy,
/// Prometheus metrics for validators.
metrics: validator_side::Metrics,
},
/// Collators operate on a parachain.
Collator(PeerId, CollatorPair, collator_side::Metrics),
}
@@ -95,9 +112,10 @@ impl CollatorProtocolSubsystem {
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator(policy, metrics) => validator_side::run(
ProtocolSide::Validator { keystore, eviction_policy, metrics } => validator_side::run(
ctx,
policy,
keystore,
eviction_policy,
metrics,
).await,
ProtocolSide::Collator(local_peer_id, collator_pair, metrics) => collator_side::run(
File diff suppressed because it is too large Load Diff
@@ -104,7 +104,7 @@ async fn determine_relevant_authorities(
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
) -> Result<Vec<AuthorityDiscoveryId>, util::Error> {
let authorities = util::request_authorities_ctx(relay_parent, ctx).await?.await??;
let authorities = util::request_authorities(relay_parent, ctx.sender()).await.await??;
Ok(authorities)
}
@@ -135,7 +135,7 @@ impl State {
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error> {
for leaf in leaves {
let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??;
let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let maybe_new_session = match self.last_session_index {
Some(i) if i <= current_index => None,
_ => Some((current_index, leaf)),
+2 -2
View File
@@ -335,11 +335,11 @@ pub mod v1 {
/// Declare the intent to advertise collations under a collator ID, attaching a
/// signature of the `PeerId` of the node using the given collator ID key.
#[codec(index = 0)]
Declare(CollatorId, CollatorSignature),
Declare(CollatorId, ParaId, CollatorSignature),
/// Advertise a collation to a validator. Can only be sent once the peer has
/// declared that they are a collator with given ID.
#[codec(index = 1)]
AdvertiseCollation(Hash, ParaId),
AdvertiseCollation(Hash),
/// A collation sent to a validator was seconded.
#[codec(index = 4)]
CollationSeconded(SignedFullStatement),
+5 -1
View File
@@ -533,7 +533,11 @@ where
collator_pair,
Metrics::register(registry)?,
),
IsCollator::No => ProtocolSide::Validator(Default::default(),Metrics::register(registry)?),
IsCollator::No => ProtocolSide::Validator {
keystore: keystore.clone(),
eviction_policy: Default::default(),
metrics: Metrics::register(registry)?,
},
};
CollatorProtocolSubsystem::new(
side,
+31 -96
View File
@@ -39,13 +39,13 @@ use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo,
AuthorityDiscoveryId,
AuthorityDiscoveryId, GroupIndex,
};
use sp_core::{traits::SpawnNamed, Public};
use sp_application_crypto::AppKey;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
use std::{
collections::{HashMap, hash_map::Entry}, convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, task::{Poll, Context},
collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context},
time::Duration, fmt, sync::Arc,
};
use streamunordered::{StreamUnordered, StreamYield};
@@ -179,98 +179,37 @@ specialize_requests! {
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}
/// Request some data from the `RuntimeApi` via a SubsystemContext.
async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
parent: Hash,
ctx: &mut Context,
request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
where
RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
Context: SubsystemContext,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
).await;
Ok(rx)
}
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
macro_rules! specialize_requests_ctx {
// expand return type name for documentation purposes
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
specialize_requests_ctx!{
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
}
};
// create a single specialized request function
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime via a `SubsystemContext`"]
pub async fn $func_name<Context: SubsystemContext>(
parent: Hash,
$(
$param_name: $param_ty,
)*
ctx: &mut Context,
) -> Result<RuntimeApiReceiver<$return_ty>, Error> {
request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// recursive decompose
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests_ctx!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests_ctx!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
}
specialize_requests_ctx! {
fn request_authorities_ctx() -> Vec<AuthorityDiscoveryId>; Authorities;
fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
fn request_persisted_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info_ctx(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}
/// From the given set of validators, find the first key we can sign with, if any.
pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePtr) -> Option<ValidatorId> {
for v in validators.iter() {
if CryptoStore::has_keys(&*keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
return Some(v.clone());
pub async fn signing_key(validators: &[ValidatorId], keystore: &SyncCryptoStorePtr)
-> Option<ValidatorId>
{
signing_key_and_index(validators, keystore).await.map(|(k, _)| k)
}
/// From the given set of validators, find the first key we can sign with, if any, and return it
/// along with the validator index.
pub async fn signing_key_and_index(validators: &[ValidatorId], keystore: &SyncCryptoStorePtr)
-> Option<(ValidatorId, ValidatorIndex)>
{
for (i, v) in validators.iter().enumerate() {
if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
return Some((v.clone(), ValidatorIndex(i as _)));
}
}
None
}
/// Find the validator group the given validator index belongs to.
pub fn find_validator_group(groups: &[Vec<ValidatorIndex>], index: ValidatorIndex)
-> Option<GroupIndex>
{
groups.iter().enumerate().find_map(|(i, g)| if g.contains(&index) {
Some(GroupIndex(i as _))
} else {
None
})
}
/// Chooses a random subset of sqrt(v.len()), but at least `min` elements.
pub fn choose_random_sqrt_subset<T>(mut v: Vec<T>, min: usize) -> Vec<T> {
use rand::seq::SliceRandom as _;
@@ -299,7 +238,7 @@ impl Validator {
pub async fn new(
parent: Hash,
keystore: SyncCryptoStorePtr,
sender: &mut JobSender<impl SubsystemSender>,
sender: &mut impl SubsystemSender,
) -> Result<Self, Error> {
// Note: request_validators and request_session_index_for_child do not and cannot
// run concurrently: they both have a mutable handle to the same sender.
@@ -327,13 +266,9 @@ impl Validator {
signing_context: SigningContext,
keystore: SyncCryptoStorePtr,
) -> Result<Self, Error> {
let key = signing_key(validators, keystore).await.ok_or(Error::NotAValidator)?;
let index = validators
.iter()
.enumerate()
.find(|(_, k)| k == &&key)
.map(|(idx, _)| ValidatorIndex(idx as u32))
.expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed");
let (key, index) = signing_key_and_index(validators, &keystore)
.await
.ok_or(Error::NotAValidator)?;
Ok(Validator {
signing_context,
@@ -46,7 +46,7 @@ pub async fn connect_to_validators<Context: SubsystemContext>(
validators: Vec<ValidatorId>,
peer_set: PeerSet,
) -> Result<ConnectionRequest, Error> {
let current_index = crate::request_session_index_for_child_ctx(relay_parent, ctx).await?.await??;
let current_index = crate::request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
connect_to_validators_in_session(
ctx,
relay_parent,
@@ -64,11 +64,11 @@ pub async fn connect_to_validators_in_session<Context: SubsystemContext>(
peer_set: PeerSet,
session_index: SessionIndex,
) -> Result<ConnectionRequest, Error> {
let session_info = crate::request_session_info_ctx(
let session_info = crate::request_session_info(
relay_parent,
session_index,
ctx,
).await?.await??;
ctx.sender(),
).await.await??;
let (session_validators, discovery_keys) = match session_info {
Some(info) => (info.validators, info.discovery_keys),
+53
View File
@@ -613,9 +613,42 @@ impl GroupRotationInfo {
let blocks_since_start = self.now.saturating_sub(self.session_start_block);
let rotations = blocks_since_start / self.group_rotation_frequency;
// g = c + r mod cores
let idx = (core_index.0 as usize + rotations as usize) % cores;
GroupIndex(idx as u32)
}
/// Returns the index of the group assigned to the given core. This does no checking or
/// whether the group index is in-bounds.
///
/// `core_index` should be less than `cores`, which is capped at u32::max().
pub fn core_for_group(&self, group_index: GroupIndex, cores: usize) -> CoreIndex {
if self.group_rotation_frequency == 0 { return CoreIndex(group_index.0) }
if cores == 0 { return CoreIndex(0) }
let cores = sp_std::cmp::min(cores, u32::max_value() as usize);
let blocks_since_start = self.now.saturating_sub(self.session_start_block);
let rotations = blocks_since_start / self.group_rotation_frequency;
let rotations = rotations % cores as u32;
// g = c + r mod cores
// c = g - r mod cores
// x = x + cores mod cores
// c = (g + cores) - r mod cores
let idx = (group_index.0 as usize + cores - rotations as usize) % cores;
CoreIndex(idx as u32)
}
/// Create a new `GroupRotationInfo` with one further rotation applied.
pub fn bump_rotation(&self) -> Self {
GroupRotationInfo {
session_start_block: self.session_start_block,
group_rotation_frequency: self.group_rotation_frequency,
now: self.next_rotation_at(),
}
}
}
impl<N: Saturating + BaseArithmetic + Copy> GroupRotationInfo<N> {
@@ -1107,6 +1140,26 @@ mod tests {
assert_eq!(info.last_rotation_at(), 15);
}
#[test]
fn group_for_core_is_core_for_group() {
for cores in 1..=256 {
for rotations in 0..(cores * 2) {
let info = GroupRotationInfo {
session_start_block: 0u32,
now: rotations,
group_rotation_frequency: 1,
};
for core in 0..cores {
let group = info.group_for_core(CoreIndex(core), cores as usize);
assert_eq!(info.core_for_group(group, cores as usize).0, core);
}
}
}
}
#[test]
fn collator_signature_payload_is_valid() {
// if this fails, collator signature verification code has to be updated.
@@ -55,8 +55,6 @@ As with most other subsystems, we track the active leaves set by following `Acti
For the purposes of actually distributing a collation, we need to be connected to the validators who are interested in collations on that `ParaId` at this point in time. We assume that there is a discovery API for connecting to a set of validators.
> TODO: design & expose the discovery API not just for connecting to such peers but also to determine which of our current peers are validators.
As seen in the [Scheduler Module][SCH] of the runtime, validator groups are fixed for an entire session and their rotations across cores are predictable. Collators will want to do these things when attempting to distribute collations at a given relay-parent:
* Determine which core the para collated-on is assigned to.
* Determine the group on that core and the next group on that core.
@@ -100,7 +98,7 @@ digraph G {
}
```
When peers connect to us, they can `Declare` that they represent a collator with given public key. Once they've declared that, and we checked their signature, they can begin to send advertisements of collations. The peers should not send us any advertisements for collations that are on a relay-parent outside of our view.
When peers connect to us, they can `Declare` that they represent a collator with given public key and intend to collate on a specific para ID. Once they've declared that, and we checked their signature, they can begin to send advertisements of collations. The peers should not send us any advertisements for collations that are on a relay-parent outside of our view or for a para outside of the one they've declared.
The protocol tracks advertisements received and the source of the advertisement. The advertisement source is the `PeerId` of the peer who sent the message. We accept one advertisement per collator per source per relay-parent.
@@ -102,16 +102,12 @@ enum StatementDistributionV1Message {
```rust
enum CollatorProtocolV1Message {
/// Declare the intent to advertise collations under a collator ID, attaching a
/// Declare the intent to advertise collations under a collator ID and `Para`, attaching a
/// signature of the `PeerId` of the node using the given collator ID key.
Declare(CollatorId, CollatorSignature),
Declare(CollatorId, ParaId, CollatorSignature),
/// Advertise a collation to a validator. Can only be sent once the peer has
/// declared that they are a collator with given ID.
AdvertiseCollation(Hash, ParaId),
/// Request the advertised collation at that relay-parent.
RequestCollation(RequestId, Hash, ParaId),
/// A requested collation.
Collation(RequestId, CandidateReceipt, CompressedPoV),
AdvertiseCollation(Hash),
/// A collation sent to a validator was seconded.
CollationSeconded(SignedFullStatement),
}