mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 16:51:03 +00:00
Flood protection for large statements. (#2984)
* Flood protection for large statements. * Add test for flood protection. * Doc improvements.
This commit is contained in:
@@ -93,6 +93,9 @@ const VC_THRESHOLD: usize = 2;
|
|||||||
|
|
||||||
const LOG_TARGET: &str = "parachain::statement-distribution";
|
const LOG_TARGET: &str = "parachain::statement-distribution";
|
||||||
|
|
||||||
|
/// Large statements should be rare.
|
||||||
|
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;
|
||||||
|
|
||||||
/// The statement distribution subsystem.
|
/// The statement distribution subsystem.
|
||||||
pub struct StatementDistribution {
|
pub struct StatementDistribution {
|
||||||
/// Pointer to a keystore, which is required for determining this nodes validator index.
|
/// Pointer to a keystore, which is required for determining this nodes validator index.
|
||||||
@@ -194,6 +197,29 @@ struct PeerRelayParentKnowledge {
|
|||||||
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
|
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
|
||||||
/// How many statements we've received for each candidate that we're aware of.
|
/// How many statements we've received for each candidate that we're aware of.
|
||||||
received_message_count: HashMap<CandidateHash, usize>,
|
received_message_count: HashMap<CandidateHash, usize>,
|
||||||
|
|
||||||
|
|
||||||
|
/// How many large statements this peer already sent us.
|
||||||
|
///
|
||||||
|
/// Flood protection for large statements is rather hard and as soon as we get
|
||||||
|
/// https://github.com/paritytech/polkadot/issues/2979 implemented also no longer necessary.
|
||||||
|
/// Reason: We keep messages around until we fetched the payload, but if a node makes up
|
||||||
|
/// statements and never provides the data, we will keep it around for the slot duration. Not
|
||||||
|
/// even signature checking would help, as the sender, if a validator, can just sign arbitrary
|
||||||
|
/// invalid statements and will not face any consequences as long as it won't provide the
|
||||||
|
/// payload.
|
||||||
|
///
|
||||||
|
/// Quick and temporary fix, only accept `MAX_LARGE_STATEMENTS_PER_SENDER` per connected node.
|
||||||
|
///
|
||||||
|
/// Large statements should be rare, if they were not, we would run into problems anyways, as
|
||||||
|
/// we would not be able to distribute them in a timely manner. Therefore
|
||||||
|
/// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also not
|
||||||
|
/// per candidate hash, but in total as candidate hashes can be made up, as illustrated above.
|
||||||
|
///
|
||||||
|
/// An attacker could still try to fill up our memory, by repeatedly disconnecting and
|
||||||
|
/// connecting again with new peer ids, but we assume that the resulting effective bandwidth
|
||||||
|
/// for such an attack would be too low.
|
||||||
|
large_statement_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerRelayParentKnowledge {
|
impl PeerRelayParentKnowledge {
|
||||||
@@ -318,6 +344,15 @@ impl PeerRelayParentKnowledge {
|
|||||||
Ok(self.received_candidates.insert(candidate_hash.clone()))
|
Ok(self.received_candidates.insert(candidate_hash.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Note a received large statement metadata.
|
||||||
|
fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> {
|
||||||
|
if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
|
||||||
|
return Err(COST_APPARENT_FLOOD);
|
||||||
|
}
|
||||||
|
self.large_statement_count += 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// This method does the same checks as `receive` without modifying the internal state.
|
/// This method does the same checks as `receive` without modifying the internal state.
|
||||||
/// Returns an error if the peer should not have sent us this message according to protocol
|
/// Returns an error if the peer should not have sent us this message according to protocol
|
||||||
/// rules for flood protection.
|
/// rules for flood protection.
|
||||||
@@ -458,6 +493,17 @@ impl PeerData {
|
|||||||
.ok_or(COST_UNEXPECTED_STATEMENT)?
|
.ok_or(COST_UNEXPECTED_STATEMENT)?
|
||||||
.check_can_receive(fingerprint, max_message_count)
|
.check_can_receive(fingerprint, max_message_count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Basic flood protection for large statements.
|
||||||
|
fn receive_large_statement(
|
||||||
|
&mut self,
|
||||||
|
relay_parent: &Hash,
|
||||||
|
) -> std::result::Result<(), Rep> {
|
||||||
|
self.view_knowledge
|
||||||
|
.get_mut(relay_parent)
|
||||||
|
.ok_or(COST_UNEXPECTED_STATEMENT)?
|
||||||
|
.receive_large_statement()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A statement stored while a relay chain head is active.
|
// A statement stored while a relay chain head is active.
|
||||||
@@ -1278,6 +1324,20 @@ async fn handle_incoming_message<'a>(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message {
|
||||||
|
if let Err(rep) = peer_data.receive_large_statement(&relay_parent) {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
?peer,
|
||||||
|
?message,
|
||||||
|
?rep,
|
||||||
|
"Unexpected large statement.",
|
||||||
|
);
|
||||||
|
report_peer(ctx, peer, rep).await;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let fingerprint = message.get_fingerprint();
|
let fingerprint = message.get_fingerprint();
|
||||||
let candidate_hash = fingerprint.0.candidate_hash().clone();
|
let candidate_hash = fingerprint.0.candidate_hash().clone();
|
||||||
let handle_incoming_span = active_head.span.child("handle-incoming")
|
let handle_incoming_span = active_head.span.child("handle-incoming")
|
||||||
@@ -3471,6 +3531,176 @@ mod tests {
|
|||||||
executor::block_on(future::join(test_fut, bg));
|
executor::block_on(future::join(test_fut, bg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_cant_flood_with_large_statements() {
|
||||||
|
sp_tracing::try_init_simple();
|
||||||
|
let hash_a = Hash::repeat_byte(1);
|
||||||
|
|
||||||
|
let candidate = {
|
||||||
|
let mut c = CommittedCandidateReceipt::default();
|
||||||
|
c.descriptor.relay_parent = hash_a;
|
||||||
|
c.descriptor.para_id = 1.into();
|
||||||
|
c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3]));
|
||||||
|
c
|
||||||
|
};
|
||||||
|
|
||||||
|
let peer_a = PeerId::random(); // Alice
|
||||||
|
|
||||||
|
let validators = vec![
|
||||||
|
Sr25519Keyring::Alice.pair(),
|
||||||
|
Sr25519Keyring::Bob.pair(),
|
||||||
|
Sr25519Keyring::Charlie.pair(),
|
||||||
|
// other group
|
||||||
|
Sr25519Keyring::Dave.pair(),
|
||||||
|
// We:
|
||||||
|
Sr25519Keyring::Ferdie.pair(),
|
||||||
|
];
|
||||||
|
|
||||||
|
let first_group = vec![0,1,2,4];
|
||||||
|
let session_info = make_session_info(
|
||||||
|
validators,
|
||||||
|
vec![first_group, vec![3]]
|
||||||
|
);
|
||||||
|
|
||||||
|
let session_index = 1;
|
||||||
|
|
||||||
|
let pool = sp_core::testing::TaskExecutor::new();
|
||||||
|
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
|
||||||
|
|
||||||
|
let bg = async move {
|
||||||
|
let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()};
|
||||||
|
s.run(ctx).await.unwrap();
|
||||||
|
};
|
||||||
|
|
||||||
|
let (_, rx_reqs) = mpsc::channel(1);
|
||||||
|
|
||||||
|
let test_fut = async move {
|
||||||
|
handle.send(FromOverseer::Communication {
|
||||||
|
msg: StatementDistributionMessage::StatementFetchingReceiver(rx_reqs)
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
// register our active heads.
|
||||||
|
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
|
activated: vec![ActivatedLeaf {
|
||||||
|
hash: hash_a,
|
||||||
|
number: 1,
|
||||||
|
span: Arc::new(jaeger::Span::Disabled),
|
||||||
|
}].into(),
|
||||||
|
deactivated: vec![].into(),
|
||||||
|
}))).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
handle.recv().await,
|
||||||
|
AllMessages::RuntimeApi(
|
||||||
|
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
|
||||||
|
)
|
||||||
|
if r == hash_a
|
||||||
|
=> {
|
||||||
|
let _ = tx.send(Ok(session_index));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
handle.recv().await,
|
||||||
|
AllMessages::RuntimeApi(
|
||||||
|
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
|
||||||
|
)
|
||||||
|
if r == hash_a && sess_index == session_index
|
||||||
|
=> {
|
||||||
|
let _ = tx.send(Ok(Some(session_info)));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// notify of peers and view
|
||||||
|
handle.send(FromOverseer::Communication {
|
||||||
|
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerConnected(
|
||||||
|
peer_a.clone(),
|
||||||
|
ObservedRole::Full,
|
||||||
|
Some(Sr25519Keyring::Alice.public().into())
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
handle.send(FromOverseer::Communication {
|
||||||
|
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
|
||||||
|
)
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
// receive a seconded statement from peer A.
|
||||||
|
let statement = {
|
||||||
|
let signing_context = SigningContext {
|
||||||
|
parent_hash: hash_a,
|
||||||
|
session_index,
|
||||||
|
};
|
||||||
|
|
||||||
|
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
|
||||||
|
let alice_public = CryptoStore::sr25519_generate_new(
|
||||||
|
&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
|
||||||
|
).await.unwrap();
|
||||||
|
|
||||||
|
SignedFullStatement::sign(
|
||||||
|
&keystore,
|
||||||
|
Statement::Seconded(candidate.clone()),
|
||||||
|
&signing_context,
|
||||||
|
ValidatorIndex(0),
|
||||||
|
&alice_public.into(),
|
||||||
|
).await.ok().flatten().expect("should be signed")
|
||||||
|
};
|
||||||
|
|
||||||
|
let metadata =
|
||||||
|
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()).get_metadata();
|
||||||
|
|
||||||
|
for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER + 1 {
|
||||||
|
handle.send(FromOverseer::Communication {
|
||||||
|
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||||
|
NetworkBridgeEvent::PeerMessage(
|
||||||
|
peer_a.clone(),
|
||||||
|
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should try to fetch the data:
|
||||||
|
assert_matches!(
|
||||||
|
handle.recv().await,
|
||||||
|
AllMessages::NetworkBridge(
|
||||||
|
NetworkBridgeMessage::SendRequests(
|
||||||
|
mut reqs, IfDisconnected::ImmediateError
|
||||||
|
)
|
||||||
|
) => {
|
||||||
|
let reqs = reqs.pop().unwrap();
|
||||||
|
let outgoing = match reqs {
|
||||||
|
Requests::StatementFetching(outgoing) => outgoing,
|
||||||
|
_ => panic!("Unexpected request"),
|
||||||
|
};
|
||||||
|
let req = outgoing.payload;
|
||||||
|
assert_eq!(req.relay_parent, metadata.relay_parent);
|
||||||
|
assert_eq!(req.candidate_hash, metadata.candidate_hash);
|
||||||
|
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
|
||||||
|
// Just drop request - should trigger error.
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Then we should punish peer:
|
||||||
|
assert_matches!(
|
||||||
|
handle.recv().await,
|
||||||
|
AllMessages::NetworkBridge(
|
||||||
|
NetworkBridgeMessage::ReportPeer(p, r)
|
||||||
|
) if p == peer_a && r == COST_APPARENT_FLOOD => {}
|
||||||
|
);
|
||||||
|
|
||||||
|
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||||
|
};
|
||||||
|
|
||||||
|
futures::pin_mut!(test_fut);
|
||||||
|
futures::pin_mut!(bg);
|
||||||
|
|
||||||
|
executor::block_on(future::join(test_fut, bg));
|
||||||
|
}
|
||||||
|
|
||||||
fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
|
fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
|
||||||
|
|
||||||
let validator_groups: Vec<Vec<ValidatorIndex>> = groups
|
let validator_groups: Vec<Vec<ValidatorIndex>> = groups
|
||||||
|
|||||||
Reference in New Issue
Block a user