mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 05:11:09 +00:00
Collator for the "adder" (formerly basic-add) parachain and various small fixes (#438)
* update basic_add wasm * wasm feature and collator feature * move test parachains around a little * fix wasm build for basic_add * move basic_add to adder, introduce README * minimal basic_add collator * ensure collator messages are sent in the right order * more logging * route consensus statements to all peers * minor bugfixes for parachains * genesis builder accounts for parachain heads * fix parachains tests * targets for txpool * tweak runtime + collator * fix version in adder-collator * consistency for overflowing * adjust comment * fix stable test run * remove dummy registration test * final grumbles
This commit is contained in:
committed by
GitHub
parent
503bcb5686
commit
bd3890da7e
@@ -176,6 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
|
||||
}
|
||||
}
|
||||
ConsensusMessage::ChainSpecific(msg, _) => {
|
||||
debug!(target: "consensus", "Processing consensus statement for live consensus");
|
||||
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
|
||||
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
|
||||
self.table_router.import_statement(statement);
|
||||
|
||||
@@ -111,10 +111,36 @@ struct BlockDataRequest {
|
||||
sender: oneshot::Sender<BlockData>,
|
||||
}
|
||||
|
||||
// ensures collator-protocol messages are sent in correct order.
|
||||
// session key must be sent before collator role.
|
||||
enum CollatorState {
|
||||
Fresh,
|
||||
RolePending(Role),
|
||||
Primed,
|
||||
}
|
||||
|
||||
impl CollatorState {
|
||||
fn send_key<F: FnMut(Message)>(&mut self, key: SessionKey, mut f: F) {
|
||||
f(Message::SessionKey(key));
|
||||
if let CollatorState::RolePending(role) = ::std::mem::replace(self, CollatorState::Primed) {
|
||||
f(Message::CollatorRole(role));
|
||||
}
|
||||
}
|
||||
|
||||
fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
|
||||
if let CollatorState::Primed = *self {
|
||||
f(Message::CollatorRole(role));
|
||||
} else {
|
||||
*self = CollatorState::RolePending(role);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PeerInfo {
|
||||
collating_for: Option<(AccountId, ParaId)>,
|
||||
validator_key: Option<SessionKey>,
|
||||
claimed_validator: bool,
|
||||
collator_state: CollatorState,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -281,8 +307,8 @@ impl PolkadotProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a statement to a validator.
|
||||
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
|
||||
/// Gossip a consensus statement.
|
||||
fn gossip_statement(&mut self, ctx: &mut Context<Block>, parent_hash: Hash, statement: SignedStatement) {
|
||||
// TODO: something more targeted than gossip.
|
||||
let raw = Message::Statement(parent_hash, statement).encode();
|
||||
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
|
||||
@@ -309,14 +335,14 @@ impl PolkadotProtocol {
|
||||
let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key));
|
||||
|
||||
if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) {
|
||||
for (id, _) in self.peers.iter()
|
||||
for (id, peer_data) in self.peers.iter_mut()
|
||||
.filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some())
|
||||
{
|
||||
send_polkadot_message(
|
||||
peer_data.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message(
|
||||
ctx,
|
||||
*id,
|
||||
Message::SessionKey(consensus.local_session_key)
|
||||
);
|
||||
msg
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -452,12 +478,15 @@ impl PolkadotProtocol {
|
||||
}
|
||||
};
|
||||
|
||||
debug!(target: "p_net", "New collator role {:?} from {}", role, who);
|
||||
|
||||
match info.validator_key {
|
||||
None => ctx.report_peer(
|
||||
who,
|
||||
Severity::Bad("Sent collator role without registering first as validator"),
|
||||
),
|
||||
Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) {
|
||||
debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent);
|
||||
send_polkadot_message(
|
||||
ctx,
|
||||
who,
|
||||
@@ -481,38 +510,41 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
}
|
||||
};
|
||||
|
||||
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
|
||||
let send_key = validator || local_status.collating_for.is_some();
|
||||
|
||||
let mut peer_info = PeerInfo {
|
||||
collating_for: local_status.collating_for,
|
||||
validator_key: None,
|
||||
claimed_validator: validator,
|
||||
collator_state: CollatorState::Fresh,
|
||||
};
|
||||
|
||||
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
|
||||
if self.collator_peer_id(acc_id.clone()).is_some() {
|
||||
if self.collator_peer(acc_id.clone()).is_some() {
|
||||
ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason"));
|
||||
return
|
||||
}
|
||||
|
||||
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
|
||||
send_polkadot_message(
|
||||
|
||||
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
|
||||
ctx,
|
||||
who,
|
||||
Message::CollatorRole(collator_role),
|
||||
);
|
||||
msg,
|
||||
));
|
||||
}
|
||||
|
||||
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
|
||||
let send_key = validator || local_status.collating_for.is_some();
|
||||
|
||||
self.peers.insert(who, PeerInfo {
|
||||
collating_for: local_status.collating_for,
|
||||
validator_key: None,
|
||||
claimed_validator: validator,
|
||||
});
|
||||
|
||||
self.consensus_gossip.new_peer(ctx, who, status.roles);
|
||||
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
|
||||
send_polkadot_message(
|
||||
peer_info.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message(
|
||||
ctx,
|
||||
who,
|
||||
Message::SessionKey(consensus.local_session_key)
|
||||
);
|
||||
msg,
|
||||
));
|
||||
}
|
||||
|
||||
self.peers.insert(who, peer_info);
|
||||
self.consensus_gossip.new_peer(ctx, who, status.roles);
|
||||
self.dispatch_pending_requests(ctx);
|
||||
}
|
||||
|
||||
@@ -520,14 +552,14 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
if let Some(info) = self.peers.remove(&who) {
|
||||
if let Some((acc_id, _)) = info.collating_for {
|
||||
let new_primary = self.collators.on_disconnect(acc_id)
|
||||
.and_then(|new_primary| self.collator_peer_id(new_primary));
|
||||
.and_then(|new_primary| self.collator_peer(new_primary));
|
||||
|
||||
if let Some(new_primary) = new_primary {
|
||||
send_polkadot_message(
|
||||
if let Some((new_primary, primary_info)) = new_primary {
|
||||
primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message(
|
||||
ctx,
|
||||
new_primary,
|
||||
Message::CollatorRole(Role::Primary),
|
||||
)
|
||||
msg,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -592,12 +624,12 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
for collator_action in self.collators.maintain_peers() {
|
||||
match collator_action {
|
||||
Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
|
||||
Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) {
|
||||
send_polkadot_message(
|
||||
Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) {
|
||||
info.collator_state.set_role(role, |msg| send_polkadot_message(
|
||||
ctx,
|
||||
collator,
|
||||
Message::CollatorRole(role),
|
||||
)
|
||||
msg,
|
||||
))
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -622,6 +654,7 @@ impl PolkadotProtocol {
|
||||
Some((ref acc_id, ref para_id)) => {
|
||||
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
|
||||
if structurally_valid && collation.receipt.check_signature().is_ok() {
|
||||
debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
|
||||
self.collators.on_collation(acc_id.clone(), relay_parent, collation)
|
||||
} else {
|
||||
ctx.report_peer(from, Severity::Bad("Sent malformed collation"))
|
||||
@@ -633,27 +666,28 @@ impl PolkadotProtocol {
|
||||
|
||||
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
|
||||
self.collators.await_collation(relay_parent, para_id, tx);
|
||||
rx
|
||||
}
|
||||
|
||||
// get connected peer with given account ID for collation.
|
||||
fn collator_peer_id(&self, account_id: AccountId) -> Option<NodeIndex> {
|
||||
fn collator_peer(&mut self, account_id: AccountId) -> Option<(NodeIndex, &mut PeerInfo)> {
|
||||
let check_info = |info: &PeerInfo| info
|
||||
.collating_for
|
||||
.as_ref()
|
||||
.map_or(false, |&(ref acc_id, _)| acc_id == &account_id);
|
||||
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|&(_, info)| check_info(info))
|
||||
.map(|(who, _)| *who)
|
||||
.iter_mut()
|
||||
.filter(|&(_, ref info)| check_info(&**info))
|
||||
.map(|(who, info)| (*who, info))
|
||||
.next()
|
||||
}
|
||||
|
||||
// disconnect a collator by account-id.
|
||||
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
|
||||
if let Some(who) = self.collator_peer_id(account_id) {
|
||||
fn disconnect_bad_collator(&mut self, ctx: &mut Context<Block>, account_id: AccountId) {
|
||||
if let Some((who, _)) = self.collator_peer(account_id) {
|
||||
ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved"))
|
||||
}
|
||||
}
|
||||
@@ -668,13 +702,19 @@ impl PolkadotProtocol {
|
||||
targets: HashSet<SessionKey>,
|
||||
collation: Collation,
|
||||
) {
|
||||
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
|
||||
relay_parent, collation.receipt.parachain_index);
|
||||
|
||||
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
|
||||
match self.validators.get(&primary) {
|
||||
Some(who) => send_polkadot_message(
|
||||
ctx,
|
||||
*who,
|
||||
Message::Collation(relay_parent, cloned_collation),
|
||||
),
|
||||
Some(who) => {
|
||||
debug!(target: "p_net", "Sending local collation to {:?}", primary);
|
||||
send_polkadot_message(
|
||||
ctx,
|
||||
*who,
|
||||
Message::Collation(relay_parent, cloned_collation),
|
||||
)
|
||||
},
|
||||
None =>
|
||||
warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
use polkadot_api::{PolkadotApi, LocalPolkadotApi};
|
||||
use polkadot_consensus::{SharedTable, TableRouter, SignedStatement, GenericStatement, StatementProducer};
|
||||
use polkadot_primitives::{Hash, BlockId, SessionKey};
|
||||
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt, Id as ParaId};
|
||||
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
|
||||
|
||||
use futures::prelude::*;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
@@ -89,14 +89,16 @@ impl<P: PolkadotApi> Clone for Router<P> {
|
||||
impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
|
||||
/// Import a statement whose signature has been checked already.
|
||||
pub(crate) fn import_statement(&self, statement: SignedStatement) {
|
||||
trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);
|
||||
|
||||
// defer any statements for which we haven't imported the candidate yet
|
||||
let (c_hash, parachain_index) = {
|
||||
let c_hash = {
|
||||
let candidate_data = match statement.statement {
|
||||
GenericStatement::Candidate(ref c) => Some((c.hash(), c.parachain_index)),
|
||||
GenericStatement::Candidate(ref c) => Some(c.hash()),
|
||||
GenericStatement::Valid(ref hash)
|
||||
| GenericStatement::Invalid(ref hash)
|
||||
| GenericStatement::Available(ref hash)
|
||||
=> self.table.with_candidate(hash, |c| c.map(|c| (*hash, c.parachain_index))),
|
||||
=> self.table.with_candidate(hash, |c| c.map(|_| *hash)),
|
||||
};
|
||||
match candidate_data {
|
||||
Some(x) => x,
|
||||
@@ -115,6 +117,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
|
||||
};
|
||||
|
||||
// prepend the candidate statement.
|
||||
debug!(target: "consensus", "Importing statements about candidate {:?}", c_hash);
|
||||
statements.insert(0, statement);
|
||||
let producers: Vec<_> = self.table.import_remote_statements(
|
||||
self,
|
||||
@@ -122,17 +125,16 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
|
||||
);
|
||||
// dispatch future work as necessary.
|
||||
for (producer, statement) in producers.into_iter().zip(statements) {
|
||||
let producer = match producer {
|
||||
Some(p) => p,
|
||||
None => continue, // statement redundant
|
||||
};
|
||||
|
||||
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
|
||||
self.dispatch_work(c_hash, producer, parachain_index);
|
||||
|
||||
if let Some(producer) = producer {
|
||||
trace!(target: "consensus", "driving statement work to completion");
|
||||
self.dispatch_work(c_hash, producer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>, parachain: ParaId) where
|
||||
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) where
|
||||
D: Future<Item=BlockData,Error=()> + Send + 'static,
|
||||
E: Future<Item=Extrinsic,Error=()> + Send + 'static,
|
||||
{
|
||||
@@ -160,13 +162,13 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
|
||||
|
||||
// propagate the statements
|
||||
if let Some(validity) = produced.validity {
|
||||
let signed = table.sign_and_import(validity.clone());
|
||||
route_statement(&*network, &*table, parachain, parent_hash, signed);
|
||||
let signed = table.sign_and_import(validity.clone()).0;
|
||||
network.with_spec(|spec, ctx| spec.gossip_statement(ctx, parent_hash, signed));
|
||||
}
|
||||
|
||||
if let Some(availability) = produced.availability {
|
||||
let signed = table.sign_and_import(availability);
|
||||
route_statement(&*network, &*table, parachain, parent_hash, signed);
|
||||
let signed = table.sign_and_import(availability).0;
|
||||
network.with_spec(|spec, ctx| spec.gossip_statement(ctx, parent_hash, signed));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -182,11 +184,15 @@ impl<P: LocalPolkadotApi + Send> TableRouter for Router<P> {
|
||||
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
|
||||
// give to network to make available.
|
||||
let hash = receipt.hash();
|
||||
let para_id = receipt.parachain_index;
|
||||
let signed = self.table.sign_and_import(GenericStatement::Candidate(receipt));
|
||||
let (candidate, availability) = self.table.sign_and_import(GenericStatement::Candidate(receipt));
|
||||
|
||||
self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic));
|
||||
route_statement(&*self.network, &*self.table, para_id, self.parent_hash, signed);
|
||||
self.network.with_spec(|spec, ctx| {
|
||||
spec.gossip_statement(ctx, self.parent_hash, candidate);
|
||||
if let Some(availability) = availability {
|
||||
spec.gossip_statement(ctx, self.parent_hash, availability);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
|
||||
@@ -217,32 +223,6 @@ impl Future for BlockDataReceiver {
|
||||
}
|
||||
}
|
||||
|
||||
// get statement to relevant validators.
|
||||
fn route_statement(network: &NetworkService, table: &SharedTable, para_id: ParaId, parent_hash: Hash, statement: SignedStatement) {
|
||||
let broadcast = |i: &mut Iterator<Item=&SessionKey>| {
|
||||
let local_key = table.session_key();
|
||||
network.with_spec(|spec, ctx| {
|
||||
for val in i.filter(|&x| x != &local_key) {
|
||||
spec.send_statement(ctx, *val, parent_hash, statement.clone());
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
let g_info = table
|
||||
.group_info()
|
||||
.get(¶_id)
|
||||
.expect("statements only produced about groups which exist");
|
||||
|
||||
match statement.statement {
|
||||
GenericStatement::Candidate(_) =>
|
||||
broadcast(&mut g_info.validity_guarantors.iter().chain(g_info.availability_guarantors.iter())),
|
||||
GenericStatement::Valid(_) | GenericStatement::Invalid(_) =>
|
||||
broadcast(&mut g_info.validity_guarantors.iter()),
|
||||
GenericStatement::Available(_) =>
|
||||
broadcast(&mut g_info.availability_guarantors.iter()),
|
||||
}
|
||||
}
|
||||
|
||||
// A unique trace for valid statements issued by a validator.
|
||||
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
|
||||
enum StatementTrace {
|
||||
|
||||
Reference in New Issue
Block a user