mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-07-05 03:57:24 +00:00
Authentication of PeerIds in authority discovery records (#10317)
* Consolidating test and production code * Signing/verifying authority discovery records with PeerId Unsigned records cannot be rejected yet, they just produce a warning in the log. * Upgrading to libp2p 0.40 * libp2p::identity and sp_core::crypto Ed25519 are compatible * Rejecting authority records unsigned by peer id can be configured * Fixes based on review comments * No command-line argument needed * info was still too much spam in the logs * Added tests for both strict and loose validation * Fixing based on review comments * Pierre preferred a signing method * Ooops, I need to slow down * Update bin/node/cli/src/service.rs * Reexport libp2p crypto used in sc-network * Added proto3 compatibility tests. And import noise. Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -19,7 +19,7 @@
|
||||
use crate::{
|
||||
error::{Error, Result},
|
||||
interval::ExpIncInterval,
|
||||
ServicetoWorkerMsg,
|
||||
ServicetoWorkerMsg, WorkerConfig,
|
||||
};
|
||||
|
||||
use std::{
|
||||
@@ -57,7 +57,10 @@ use sp_runtime::{generic::BlockId, traits::Block as BlockT};
|
||||
mod addr_cache;
|
||||
/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
|
||||
mod schema {
|
||||
include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs"));
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs"));
|
||||
}
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
@@ -111,6 +114,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
|
||||
client: Arc<Client>,
|
||||
|
||||
network: Arc<Network>,
|
||||
|
||||
/// Channel we receive Dht events on.
|
||||
dht_event_rx: DhtEventStream,
|
||||
|
||||
@@ -124,6 +128,8 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
|
||||
latest_published_keys: HashSet<CryptoTypePublicPair>,
|
||||
/// Same value as in the configuration.
|
||||
publish_non_global_ips: bool,
|
||||
/// Same value as in the configuration.
|
||||
strict_record_validation: bool,
|
||||
|
||||
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
|
||||
query_interval: ExpIncInterval,
|
||||
@@ -131,7 +137,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
|
||||
/// Queue of throttled lookups pending to be passed to the network.
|
||||
pending_lookups: Vec<AuthorityId>,
|
||||
/// Set of in-flight lookups.
|
||||
in_flight_lookups: HashMap<libp2p::kad::record::Key, AuthorityId>,
|
||||
in_flight_lookups: HashMap<sc_network::KademliaKey, AuthorityId>,
|
||||
|
||||
addr_cache: addr_cache::AddrCache,
|
||||
|
||||
@@ -158,7 +164,7 @@ where
|
||||
dht_event_rx: DhtEventStream,
|
||||
role: Role,
|
||||
prometheus_registry: Option<prometheus_endpoint::Registry>,
|
||||
config: crate::WorkerConfig,
|
||||
config: WorkerConfig,
|
||||
) -> Self {
|
||||
// When a node starts up publishing and querying might fail due to various reasons, for
|
||||
// example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather
|
||||
@@ -197,6 +203,7 @@ where
|
||||
publish_if_changed_interval,
|
||||
latest_published_keys: HashSet::new(),
|
||||
publish_non_global_ips: config.publish_non_global_ips,
|
||||
strict_record_validation: config.strict_record_validation,
|
||||
query_interval,
|
||||
pending_lookups: Vec::new(),
|
||||
in_flight_lookups: HashMap::new(),
|
||||
@@ -313,7 +320,7 @@ where
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let addresses = self.addresses_to_publish().map(|a| a.to_vec()).collect::<Vec<_>>();
|
||||
let addresses = serialize_addresses(self.addresses_to_publish());
|
||||
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.publish.inc();
|
||||
@@ -322,32 +329,21 @@ where
|
||||
.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
|
||||
}
|
||||
|
||||
let mut serialized_addresses = vec![];
|
||||
schema::AuthorityAddresses { addresses }
|
||||
.encode(&mut serialized_addresses)
|
||||
.map_err(Error::EncodingProto)?;
|
||||
let serialized_record = serialize_authority_record(addresses)?;
|
||||
let peer_signature = sign_record_with_peer_id(&serialized_record, self.network.as_ref())?;
|
||||
|
||||
let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
|
||||
let signatures = key_store
|
||||
.sign_with_all(
|
||||
key_types::AUTHORITY_DISCOVERY,
|
||||
keys_vec.clone(),
|
||||
serialized_addresses.as_slice(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| Error::Signing)?;
|
||||
|
||||
for (sign_result, key) in signatures.into_iter().zip(keys_vec.iter()) {
|
||||
let mut signed_addresses = vec![];
|
||||
let kv_pairs = sign_record_with_authority_ids(
|
||||
serialized_record,
|
||||
Some(peer_signature),
|
||||
key_store.as_ref(),
|
||||
keys_vec,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Verify that all signatures exist for all provided keys.
|
||||
let signature =
|
||||
sign_result.ok().flatten().ok_or_else(|| Error::MissingSignature(key.clone()))?;
|
||||
schema::SignedAuthorityAddresses { addresses: serialized_addresses.clone(), signature }
|
||||
.encode(&mut signed_addresses)
|
||||
.map_err(Error::EncodingProto)?;
|
||||
|
||||
self.network.put_value(hash_authority_id(key.1.as_ref()), signed_addresses);
|
||||
for (key, value) in kv_pairs.into_iter() {
|
||||
self.network.put_value(key, value);
|
||||
}
|
||||
|
||||
self.latest_published_keys = keys;
|
||||
@@ -471,18 +467,11 @@ where
|
||||
|
||||
fn handle_dht_value_found_event(
|
||||
&mut self,
|
||||
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
|
||||
values: Vec<(sc_network::KademliaKey, Vec<u8>)>,
|
||||
) -> Result<()> {
|
||||
// Ensure `values` is not empty and all its keys equal.
|
||||
let remote_key = values
|
||||
.iter()
|
||||
.fold(Ok(None), |acc, (key, _)| match acc {
|
||||
Ok(None) => Ok(Some(key.clone())),
|
||||
Ok(Some(ref prev_key)) if prev_key != key =>
|
||||
Err(Error::ReceivingDhtValueFoundEventWithDifferentKeys),
|
||||
x @ Ok(_) => x,
|
||||
Err(e) => Err(e),
|
||||
})?
|
||||
let remote_key = single(values.iter().map(|(key, _)| key.clone()))
|
||||
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)?
|
||||
.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
|
||||
|
||||
let authority_id: AuthorityId = self
|
||||
@@ -495,18 +484,18 @@ where
|
||||
let remote_addresses: Vec<Multiaddr> = values
|
||||
.into_iter()
|
||||
.map(|(_k, v)| {
|
||||
let schema::SignedAuthorityAddresses { signature, addresses } =
|
||||
schema::SignedAuthorityAddresses::decode(v.as_slice())
|
||||
let schema::SignedAuthorityRecord { record, auth_signature, peer_signature } =
|
||||
schema::SignedAuthorityRecord::decode(v.as_slice())
|
||||
.map_err(Error::DecodingProto)?;
|
||||
|
||||
let signature = AuthoritySignature::decode(&mut &signature[..])
|
||||
let auth_signature = AuthoritySignature::decode(&mut &auth_signature[..])
|
||||
.map_err(Error::EncodingDecodingScale)?;
|
||||
|
||||
if !AuthorityPair::verify(&signature, &addresses, &authority_id) {
|
||||
if !AuthorityPair::verify(&auth_signature, &record, &authority_id) {
|
||||
return Err(Error::VerifyingDhtPayload)
|
||||
}
|
||||
|
||||
let addresses = schema::AuthorityAddresses::decode(addresses.as_slice())
|
||||
let addresses: Vec<Multiaddr> = schema::AuthorityRecord::decode(record.as_slice())
|
||||
.map(|a| a.addresses)
|
||||
.map_err(Error::DecodingProto)?
|
||||
.into_iter()
|
||||
@@ -514,32 +503,49 @@ where
|
||||
.collect::<std::result::Result<_, _>>()
|
||||
.map_err(Error::ParsingMultiaddress)?;
|
||||
|
||||
let get_peer_id = |a: &Multiaddr| match a.iter().last() {
|
||||
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
|
||||
let addresses: Vec<Multiaddr> = addresses
|
||||
.into_iter()
|
||||
.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
|
||||
.collect();
|
||||
|
||||
let remote_peer_id = single(addresses.iter().map(get_peer_id))
|
||||
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
|
||||
.flatten()
|
||||
.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them
|
||||
|
||||
// At this point we know all the valid multiaddresses from the record, know that
|
||||
// each of them belong to the same PeerId, we just need to check if the record is
|
||||
// properly signed by the owner of the PeerId
|
||||
|
||||
if let Some(peer_signature) = peer_signature {
|
||||
let public_key =
|
||||
sc_network::PublicKey::from_protobuf_encoding(&peer_signature.public_key)
|
||||
.map_err(|e| Error::ParsingLibp2pIdentity(e))?;
|
||||
let signature =
|
||||
sc_network::Signature { public_key, bytes: peer_signature.signature };
|
||||
|
||||
if !signature.verify(record, &remote_peer_id) {
|
||||
return Err(Error::VerifyingDhtPayload)
|
||||
}
|
||||
} else if self.strict_record_validation {
|
||||
return Err(Error::MissingPeerIdSignature)
|
||||
} else {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Received unsigned authority discovery record from {}", authority_id
|
||||
);
|
||||
}
|
||||
Ok(addresses)
|
||||
})
|
||||
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
|
||||
.into_iter()
|
||||
.flatten()
|
||||
// Ignore [`Multiaddr`]s without [`PeerId`] and own addresses.
|
||||
.filter(|addr| {
|
||||
addr.iter().any(|protocol| {
|
||||
// Parse to PeerId first as Multihashes of old and new PeerId
|
||||
// representation don't equal.
|
||||
//
|
||||
// See https://github.com/libp2p/rust-libp2p/issues/555 for
|
||||
// details.
|
||||
if let multiaddr::Protocol::P2p(hash) = protocol {
|
||||
let peer_id = match PeerId::from_multihash(hash) {
|
||||
Ok(peer_id) => peer_id,
|
||||
Err(_) => return false, // Discard address.
|
||||
};
|
||||
|
||||
// Discard if equal to local peer id, keep if it differs.
|
||||
return !(peer_id == local_peer_id)
|
||||
}
|
||||
|
||||
false // `protocol` is not a [`Protocol::P2p`], let's keep looking.
|
||||
})
|
||||
})
|
||||
.take(MAX_ADDRESSES_PER_AUTHORITY)
|
||||
.collect();
|
||||
|
||||
@@ -588,16 +594,37 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub trait NetworkSigner {
|
||||
/// Sign a message in the name of `self.local_peer_id()`
|
||||
fn sign_with_local_identity(
|
||||
&self,
|
||||
msg: impl AsRef<[u8]>,
|
||||
) -> std::result::Result<sc_network::Signature, sc_network::SigningError>;
|
||||
}
|
||||
|
||||
/// NetworkProvider provides [`Worker`] with all necessary hooks into the
|
||||
/// underlying Substrate networking. Using this trait abstraction instead of
|
||||
/// [`sc_network::NetworkService`] directly is necessary to unit test [`Worker`].
|
||||
#[async_trait]
|
||||
pub trait NetworkProvider: NetworkStateInfo {
|
||||
pub trait NetworkProvider: NetworkStateInfo + NetworkSigner {
|
||||
/// Start putting a value in the Dht.
|
||||
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>);
|
||||
fn put_value(&self, key: sc_network::KademliaKey, value: Vec<u8>);
|
||||
|
||||
/// Start getting a value from the Dht.
|
||||
fn get_value(&self, key: &libp2p::kad::record::Key);
|
||||
fn get_value(&self, key: &sc_network::KademliaKey);
|
||||
}
|
||||
|
||||
impl<B, H> NetworkSigner for sc_network::NetworkService<B, H>
|
||||
where
|
||||
B: BlockT + 'static,
|
||||
H: ExHashT,
|
||||
{
|
||||
fn sign_with_local_identity(
|
||||
&self,
|
||||
msg: impl AsRef<[u8]>,
|
||||
) -> std::result::Result<sc_network::Signature, sc_network::SigningError> {
|
||||
self.sign_with_local_identity(msg)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -606,16 +633,87 @@ where
|
||||
B: BlockT + 'static,
|
||||
H: ExHashT,
|
||||
{
|
||||
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
|
||||
fn put_value(&self, key: sc_network::KademliaKey, value: Vec<u8>) {
|
||||
self.put_value(key, value)
|
||||
}
|
||||
fn get_value(&self, key: &libp2p::kad::record::Key) {
|
||||
fn get_value(&self, key: &sc_network::KademliaKey) {
|
||||
self.get_value(key)
|
||||
}
|
||||
}
|
||||
|
||||
fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
|
||||
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
|
||||
fn hash_authority_id(id: &[u8]) -> sc_network::KademliaKey {
|
||||
sc_network::KademliaKey::new(&libp2p::multihash::Sha2_256::digest(id))
|
||||
}
|
||||
|
||||
// Makes sure all values are the same and returns it
|
||||
//
|
||||
// Returns Err(_) if not all values are equal. Returns Ok(None) if there are
|
||||
// no values.
|
||||
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
|
||||
where
|
||||
T: PartialEq<T>,
|
||||
{
|
||||
values.into_iter().try_fold(None, |acc, item| match acc {
|
||||
None => Ok(Some(item)),
|
||||
Some(ref prev) if *prev != item => Err(()),
|
||||
Some(x) => Ok(Some(x)),
|
||||
})
|
||||
}
|
||||
|
||||
fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
|
||||
addresses.map(|a| a.to_vec()).collect()
|
||||
}
|
||||
|
||||
fn serialize_authority_record(addresses: Vec<Vec<u8>>) -> Result<Vec<u8>> {
|
||||
let mut serialized_record = vec![];
|
||||
schema::AuthorityRecord { addresses }
|
||||
.encode(&mut serialized_record)
|
||||
.map_err(Error::EncodingProto)?;
|
||||
Ok(serialized_record)
|
||||
}
|
||||
|
||||
fn sign_record_with_peer_id(
|
||||
serialized_record: &[u8],
|
||||
network: &impl NetworkSigner,
|
||||
) -> Result<schema::PeerSignature> {
|
||||
let signature = network
|
||||
.sign_with_local_identity(serialized_record)
|
||||
.map_err(|_| Error::Signing)?;
|
||||
let public_key = signature.public_key.to_protobuf_encoding();
|
||||
let signature = signature.bytes;
|
||||
Ok(schema::PeerSignature { signature, public_key })
|
||||
}
|
||||
|
||||
async fn sign_record_with_authority_ids(
|
||||
serialized_record: Vec<u8>,
|
||||
peer_signature: Option<schema::PeerSignature>,
|
||||
key_store: &dyn CryptoStore,
|
||||
keys: Vec<CryptoTypePublicPair>,
|
||||
) -> Result<Vec<(sc_network::KademliaKey, Vec<u8>)>> {
|
||||
let signatures = key_store
|
||||
.sign_with_all(key_types::AUTHORITY_DISCOVERY, keys.clone(), &serialized_record)
|
||||
.await
|
||||
.map_err(|_| Error::Signing)?;
|
||||
|
||||
let mut result = vec![];
|
||||
for (sign_result, key) in signatures.into_iter().zip(keys.iter()) {
|
||||
let mut signed_record = vec![];
|
||||
|
||||
// Verify that all signatures exist for all provided keys.
|
||||
let auth_signature =
|
||||
sign_result.ok().flatten().ok_or_else(|| Error::MissingSignature(key.clone()))?;
|
||||
schema::SignedAuthorityRecord {
|
||||
record: serialized_record.clone(),
|
||||
auth_signature,
|
||||
peer_signature: peer_signature.clone(),
|
||||
}
|
||||
.encode(&mut signed_record)
|
||||
.map_err(Error::EncodingProto)?;
|
||||
|
||||
result.push((hash_authority_id(key.1.as_ref()), signed_record));
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Prometheus metrics for a [`Worker`].
|
||||
|
||||
Reference in New Issue
Block a user