mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 15:21:08 +00:00
Do not send messages twice in bitfield distribution (#2005)
* Do not send messages twice in bitfield distribution This removes a bug which resulted in sending bitfield messages multiple times by not checking if we already relayed them. Besides that it also adds an optimization to not relay a message to a peer that send us this message. * Review comments * Break some lines
This commit is contained in:
Generated
-2
@@ -4746,11 +4746,9 @@ dependencies = [
|
||||
"polkadot-node-subsystem-test-helpers",
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-primitives",
|
||||
"sc-keystore",
|
||||
"sp-application-crypto",
|
||||
"sp-core",
|
||||
"sp-keystore",
|
||||
"tempfile",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
@@ -20,9 +20,7 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
maplit = "1.0.2"
|
||||
log = "0.4.11"
|
||||
env_logger = "0.8.2"
|
||||
assert_matches = "1.4.0"
|
||||
tempfile = "3.1.0"
|
||||
|
||||
@@ -121,11 +121,8 @@ impl PerRelayParentData {
|
||||
peer: &PeerId,
|
||||
validator: &ValidatorId,
|
||||
) -> bool {
|
||||
if let Some(set) = self.message_sent_to_peer.get(peer) {
|
||||
!set.contains(validator)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
self.message_sent_to_peer.get(peer).map(|v| !v.contains(validator)).unwrap_or(true)
|
||||
&& self.message_received_from_peer.get(peer).map(|v| !v.contains(validator)).unwrap_or(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,21 +318,24 @@ where
|
||||
))
|
||||
.await;
|
||||
|
||||
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
|
||||
|
||||
// pass on the bitfield distribution to all interested peers
|
||||
let interested_peers = peer_views
|
||||
.iter()
|
||||
.filter_map(|(peer, view)| {
|
||||
// check interest in the peer in this message's relay parent
|
||||
if view.contains(&message.relay_parent) {
|
||||
let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator);
|
||||
// track the message as sent for this peer
|
||||
message_sent_to_peer
|
||||
job_data.message_sent_to_peer
|
||||
.entry(peer.clone())
|
||||
.or_default()
|
||||
.insert(validator.clone());
|
||||
|
||||
Some(peer.clone())
|
||||
if message_needed {
|
||||
Some(peer.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -529,11 +529,7 @@ async fn handle_peer_view_change<Context>(
|
||||
where
|
||||
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
|
||||
{
|
||||
let current = state.peer_views.entry(origin.clone()).or_default();
|
||||
|
||||
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
|
||||
|
||||
*current = view;
|
||||
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();
|
||||
|
||||
// Send all messages we've seen before and the peer is now interested
|
||||
// in to that peer.
|
||||
@@ -585,8 +581,7 @@ where
|
||||
return;
|
||||
};
|
||||
|
||||
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
|
||||
message_sent_to_peer
|
||||
job_data.message_sent_to_peer
|
||||
.entry(dest.clone())
|
||||
.or_default()
|
||||
.insert(validator.clone());
|
||||
@@ -755,7 +750,7 @@ mod test {
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
|
||||
use sp_application_crypto::AppKey;
|
||||
use sc_keystore::LocalKeystore;
|
||||
use sp_keystore::testing::KeyStore;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use assert_matches::assert_matches;
|
||||
@@ -767,12 +762,6 @@ mod test {
|
||||
];
|
||||
}
|
||||
|
||||
macro_rules! peers {
|
||||
( $( $peer:expr ),* $(,)? ) => [
|
||||
vec![ $( $peer.clone() ),* ]
|
||||
];
|
||||
}
|
||||
|
||||
macro_rules! launch {
|
||||
($fut:expr) => {
|
||||
$fut
|
||||
@@ -816,7 +805,6 @@ mod test {
|
||||
fn state_with_view(
|
||||
view: View,
|
||||
relay_parent: Hash,
|
||||
keystore_path: &tempfile::TempDir,
|
||||
) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) {
|
||||
let mut state = ProtocolState::default();
|
||||
|
||||
@@ -825,8 +813,7 @@ mod test {
|
||||
parent_hash: relay_parent.clone(),
|
||||
};
|
||||
|
||||
let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
|
||||
.expect("Creates keystore"));
|
||||
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
|
||||
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
|
||||
.expect("generating sr25519 key not to fail");
|
||||
|
||||
@@ -865,18 +852,20 @@ mod test {
|
||||
};
|
||||
|
||||
// another validator not part of the validatorset
|
||||
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
|
||||
let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
|
||||
.expect("Creates keystore"));
|
||||
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
|
||||
let malicious = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
|
||||
.expect("Malicious key created");
|
||||
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
|
||||
.expect("Malicious key created");
|
||||
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed =
|
||||
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &malicious.into()))
|
||||
.expect("should be signed");
|
||||
let signed = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
0,
|
||||
&malicious.into(),
|
||||
)).expect("should be signed");
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash_a.clone(),
|
||||
@@ -929,17 +918,19 @@ mod test {
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
|
||||
// validator 0 key pair
|
||||
let (mut state, signing_context, keystore, validator) =
|
||||
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
|
||||
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
|
||||
|
||||
state.peer_views.insert(peer_b.clone(), view![hash_a]);
|
||||
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed =
|
||||
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 42, &validator))
|
||||
.expect("should be signed");
|
||||
let signed = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
42,
|
||||
&validator,
|
||||
)).expect("should be signed");
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash_a.clone(),
|
||||
@@ -985,16 +976,18 @@ mod test {
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
|
||||
// validator 0 key pair
|
||||
let (mut state, signing_context, keystore, validator) =
|
||||
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
|
||||
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
|
||||
|
||||
// create a signed message by validator 0
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed_bitfield =
|
||||
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
|
||||
.expect("should be signed");
|
||||
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
0,
|
||||
&validator,
|
||||
)).expect("should be signed");
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash_a.clone(),
|
||||
@@ -1085,6 +1078,101 @@ mod test {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn do_not_relay_message_twice() {
|
||||
let _ = env_logger::builder()
|
||||
.filter(None, log::LevelFilter::Trace)
|
||||
.is_test(true)
|
||||
.try_init();
|
||||
|
||||
let hash = Hash::random();
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
// validator 0 key pair
|
||||
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash.clone());
|
||||
|
||||
// create a signed message by validator 0
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
0,
|
||||
&validator,
|
||||
)).expect("should be signed");
|
||||
|
||||
state.peer_views.insert(peer_b.clone(), view![hash]);
|
||||
state.peer_views.insert(peer_a.clone(), view![hash]);
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash.clone(),
|
||||
signed_availability: signed_bitfield.clone(),
|
||||
};
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (mut ctx, mut handle) =
|
||||
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
|
||||
|
||||
executor::block_on(async move {
|
||||
relay_message(
|
||||
&mut ctx,
|
||||
state.per_relay_parent.get_mut(&hash).unwrap(),
|
||||
&mut state.peer_views,
|
||||
validator.clone(),
|
||||
msg.clone(),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::Bitfield(h, signed)
|
||||
)) => {
|
||||
assert_eq!(h, hash);
|
||||
assert_eq!(signed, signed_bitfield)
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
|
||||
) => {
|
||||
assert_eq!(2, peers.len());
|
||||
assert!(peers.contains(&peer_a));
|
||||
assert!(peers.contains(&peer_b));
|
||||
assert_eq!(send_msg, msg.clone().into_validation_protocol());
|
||||
}
|
||||
);
|
||||
|
||||
// Relaying the message a second time shouldn't work.
|
||||
relay_message(
|
||||
&mut ctx,
|
||||
state.per_relay_parent.get_mut(&hash).unwrap(),
|
||||
&mut state.peer_views,
|
||||
validator.clone(),
|
||||
msg.clone(),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::Bitfield(h, signed)
|
||||
)) => {
|
||||
assert_eq!(h, hash);
|
||||
assert_eq!(signed, signed_bitfield)
|
||||
}
|
||||
);
|
||||
|
||||
// There shouldn't be any other message
|
||||
assert!(handle.recv().timeout(Duration::from_millis(10)).await.is_none());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn changing_view() {
|
||||
let _ = env_logger::builder()
|
||||
@@ -1099,16 +1187,18 @@ mod test {
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
|
||||
// validator 0 key pair
|
||||
let (mut state, signing_context, keystore, validator) =
|
||||
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
|
||||
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
|
||||
|
||||
// create a signed message by validator 0
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed_bitfield =
|
||||
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
|
||||
.expect("should be signed");
|
||||
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
0,
|
||||
&validator,
|
||||
)).expect("should be signed");
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash_a.clone(),
|
||||
@@ -1160,17 +1250,6 @@ mod test {
|
||||
}
|
||||
);
|
||||
|
||||
// gossip to the network
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage (
|
||||
peers, out_msg,
|
||||
)) => {
|
||||
assert_eq!(peers, peers![peer_b]);
|
||||
assert_eq!(out_msg, msg.clone().into_validation_protocol());
|
||||
}
|
||||
);
|
||||
|
||||
// reputation change for peer B
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
@@ -1253,4 +1332,88 @@ mod test {
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn do_not_send_message_back_to_origin() {
|
||||
let _ = env_logger::builder()
|
||||
.filter(None, log::LevelFilter::Trace)
|
||||
.is_test(true)
|
||||
.try_init();
|
||||
|
||||
let hash: Hash = [0; 32].into();
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
assert_ne!(peer_a, peer_b);
|
||||
|
||||
// validator 0 key pair
|
||||
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash);
|
||||
|
||||
// create a signed message by validator 0
|
||||
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
|
||||
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
|
||||
&keystore,
|
||||
payload,
|
||||
&signing_context,
|
||||
0,
|
||||
&validator,
|
||||
)).expect("should be signed");
|
||||
|
||||
state.peer_views.insert(peer_b.clone(), view![hash]);
|
||||
state.peer_views.insert(peer_a.clone(), view![hash]);
|
||||
|
||||
let msg = BitfieldGossipMessage {
|
||||
relay_parent: hash.clone(),
|
||||
signed_availability: signed_bitfield.clone(),
|
||||
};
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (mut ctx, mut handle) =
|
||||
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
|
||||
|
||||
executor::block_on(async move {
|
||||
// send a first message
|
||||
launch!(handle_network_msg(
|
||||
&mut ctx,
|
||||
&mut state,
|
||||
&Default::default(),
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_b.clone(),
|
||||
msg.clone().into_network_message(),
|
||||
),
|
||||
));
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::Bitfield(hash, signed)
|
||||
)) => {
|
||||
assert_eq!(hash, hash);
|
||||
assert_eq!(signed, signed_bitfield)
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
|
||||
) => {
|
||||
assert_eq!(1, peers.len());
|
||||
assert!(peers.contains(&peer_a));
|
||||
assert_eq!(send_msg, msg.clone().into_validation_protocol());
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(peer, rep)
|
||||
) => {
|
||||
assert_eq!(peer, peer_b);
|
||||
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +166,15 @@ impl<M> NetworkBridgeEvent<M> {
|
||||
pub struct View(pub Vec<Hash>);
|
||||
|
||||
impl View {
|
||||
/// Replace `self` with `new`.
|
||||
///
|
||||
/// Returns an iterator that will yield all elements of `new` that were not part of `self`.
|
||||
pub fn replace_difference(&mut self, new: View) -> impl Iterator<Item = &Hash> {
|
||||
let old = std::mem::replace(self, new);
|
||||
|
||||
self.0.iter().filter(move |h| !old.contains(h))
|
||||
}
|
||||
|
||||
/// Returns an iterator of the hashes present in `Self` but not in `other`.
|
||||
pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a {
|
||||
self.0.iter().filter(move |h| !other.contains(h))
|
||||
|
||||
Reference in New Issue
Block a user