Files
pezkuwi-subxt/substrate/client/network-gossip/src/state_machine.rs
T
Gavin Wood fd5f9292f5 FRAME: Create TransactionExtension as a replacement for SignedExtension (#2280)
Closes #2160

First part of [Extrinsic
Horizon](https://github.com/paritytech/polkadot-sdk/issues/2415)

Introduces a new trait `TransactionExtension` to replace
`SignedExtension`. Introduce the idea of transactions which obey the
runtime's extensions and have according Extension data (né Extra data)
yet do not have hard-coded signatures.

Deprecate the terminology of "Unsigned" when used for
transactions/extrinsics owing to there now being "proper" unsigned
transactions which obey the extension framework and "old-style" unsigned
which do not. Instead we have __*General*__ for the former and
__*Bare*__ for the latter. (Ultimately, the latter will be phased out as
a type of transaction, and Bare will only be used for Inherents.)

Types of extrinsic are now therefore:
- Bare (no hardcoded signature, no Extra data; used to be known as
"Unsigned")
- Bare transactions (deprecated): Gossiped, validated with
`ValidateUnsigned` (deprecated) and the `_bare_compat` bits of
`TransactionExtension` (deprecated).
  - Inherents: Not gossiped, validated with `ProvideInherent`.
- Extended (Extra data): Gossiped, validated via `TransactionExtension`.
  - Signed transactions (with a hardcoded signature).
  - General transactions (without a hardcoded signature).

`TransactionExtension` differs from `SignedExtension` because:
- A signature on the underlying transaction may validly not be present.
- It may alter the origin during validation.
- `pre_dispatch` is renamed to `prepare` and need not contain the checks
present in `validate`.
- `validate` and `prepare` is passed an `Origin` rather than a
`AccountId`.
- `validate` may pass arbitrary information into `prepare` via a new
user-specifiable type `Val`.
- `AdditionalSigned`/`additional_signed` is renamed to
`Implicit`/`implicit`. It is encoded *for the entire transaction* and
passed in to each extension as a new argument to `validate`. This
facilitates the ability of extensions to acts as underlying crypto.

There is a new `DispatchTransaction` trait which contains only default
function impls and is impl'ed for any `TransactionExtension` impler. It
provides several utility functions which reduce some of the tedium from
using `TransactionExtension` (indeed, none of its regular functions
should now need to be called directly).

Three transaction version discriminator ("versions") are now
permissible:
- 0b000000100: Bare (used to be called "Unsigned"): contains Signature
or Extra (extension data). After bare transactions are no longer
supported, this will strictly identify an Inherents only.
- 0b100000100: Old-school "Signed" Transaction: contains Signature and
Extra (extension data).
- 0b010000100: New-school "General" Transaction: contains Extra
(extension data), but no Signature.

For the New-school General Transaction, it becomes trivial for authors
to publish extensions to the mechanism for authorizing an Origin, e.g.
through new kinds of key-signing schemes, ZK proofs, pallet state,
mutations over pre-authenticated origins or any combination of the
above.

## Code Migration

### NOW: Getting it to build

Wrap your `SignedExtension`s in `AsTransactionExtension`. This should be
accompanied by renaming your aggregate type in line with the new
terminology. E.g. Before:

```rust
/// The SignedExtension to the basic transaction logic.
pub type SignedExtra = (
	/* snip */
	MySpecialSignedExtension,
);
/// Unchecked extrinsic type as expected by this runtime.
pub type UncheckedExtrinsic =
	generic::UncheckedExtrinsic<Address, RuntimeCall, Signature, SignedExtra>;
```

After:

```rust
/// The extension to the basic transaction logic.
pub type TxExtension = (
	/* snip */
	AsTransactionExtension<MySpecialSignedExtension>,
);
/// Unchecked extrinsic type as expected by this runtime.
pub type UncheckedExtrinsic =
	generic::UncheckedExtrinsic<Address, RuntimeCall, Signature, TxExtension>;
```

You'll also need to alter any transaction building logic to add a
`.into()` to make the conversion happen. E.g. Before:

```rust
fn construct_extrinsic(
		/* snip */
) -> UncheckedExtrinsic {
	let extra: SignedExtra = (
		/* snip */
		MySpecialSignedExtension::new(/* snip */),
	);
	let payload = SignedPayload::new(call.clone(), extra.clone()).unwrap();
	let signature = payload.using_encoded(|e| sender.sign(e));
	UncheckedExtrinsic::new_signed(
		/* snip */
		Signature::Sr25519(signature),
		extra,
	)
}
```

After:

```rust
fn construct_extrinsic(
		/* snip */
) -> UncheckedExtrinsic {
	let tx_ext: TxExtension = (
		/* snip */
		MySpecialSignedExtension::new(/* snip */).into(),
	);
	let payload = SignedPayload::new(call.clone(), tx_ext.clone()).unwrap();
	let signature = payload.using_encoded(|e| sender.sign(e));
	UncheckedExtrinsic::new_signed(
		/* snip */
		Signature::Sr25519(signature),
		tx_ext,
	)
}
```

### SOON: Migrating to `TransactionExtension`

Most `SignedExtension`s can be trivially converted to become a
`TransactionExtension`. There are a few things to know.

- Instead of a single trait like `SignedExtension`, you should now
implement two traits individually: `TransactionExtensionBase` and
`TransactionExtension`.
- Weights are now a thing and must be provided via the new function `fn
weight`.

#### `TransactionExtensionBase`

This trait takes care of anything which is not dependent on types
specific to your runtime, most notably `Call`.

- `AdditionalSigned`/`additional_signed` is renamed to
`Implicit`/`implicit`.
- Weight must be returned by implementing the `weight` function. If your
extension is associated with a pallet, you'll probably want to do this
via the pallet's existing benchmarking infrastructure.

#### `TransactionExtension`

Generally:
- `pre_dispatch` is now `prepare` and you *should not reexecute the
`validate` functionality in there*!
- You don't get an account ID any more; you get an origin instead. If
you need to presume an account ID, then you can use the trait function
`AsSystemOriginSigner::as_system_origin_signer`.
- You get an additional ticket, similar to `Pre`, called `Val`. This
defines data which is passed from `validate` into `prepare`. This is
important since you should not be duplicating logic from `validate` to
`prepare`, you need a way of passing your working from the former into
the latter. This is it.
- This trait takes two type parameters: `Call` and `Context`. `Call` is
the runtime call type which used to be an associated type; you can just
move it to become a type parameter for your trait impl. `Context` is not
currently used and you can safely implement over it as an unbounded
type.
- There's no `AccountId` associated type any more. Just remove it.

Regarding `validate`:
- You get three new parameters in `validate`; all can be ignored when
migrating from `SignedExtension`.
- `validate` returns a tuple on success; the second item in the tuple is
the new ticket type `Self::Val` which gets passed in to `prepare`. If
you use any information extracted during `validate` (off-chain and
on-chain, non-mutating) in `prepare` (on-chain, mutating) then you can
pass it through with this. For the tuple's last item, just return the
`origin` argument.

Regarding `prepare`:
- This is renamed from `pre_dispatch`, but there is one change:
- FUNCTIONALITY TO VALIDATE THE TRANSACTION NEED NOT BE DUPLICATED FROM
`validate`!!
- (This is different to `SignedExtension` which was required to run the
same checks in `pre_dispatch` as in `validate`.)

Regarding `post_dispatch`:
- Since there are no unsigned transactions handled by
`TransactionExtension`, `Pre` is always defined, so the first parameter
is `Self::Pre` rather than `Option<Self::Pre>`.

If you make use of `SignedExtension::validate_unsigned` or
`SignedExtension::pre_dispatch_unsigned`, then:
- Just use the regular versions of these functions instead.
- Have your logic execute in the case that the `origin` is `None`.
- Ensure your transaction creation logic creates a General Transaction
rather than a Bare Transaction; this means having to include all
`TransactionExtension`s' data.
- `ValidateUnsigned` can still be used (for now) if you need to be able
to construct transactions which contain none of the extension data,
however these will be phased out in stage 2 of the Transactions Horizon,
so you should consider moving to an extension-centric design.

## TODO

- [x] Introduce `CheckSignature` impl of `TransactionExtension` to
ensure it's possible to have crypto be done wholly in a
`TransactionExtension`.
- [x] Deprecate `SignedExtension` and move all uses in codebase to
`TransactionExtension`.
  - [x] `ChargeTransactionPayment`
  - [x] `DummyExtension`
  - [x] `ChargeAssetTxPayment` (asset-tx-payment)
  - [x] `ChargeAssetTxPayment` (asset-conversion-tx-payment)
  - [x] `CheckWeight`
  - [x] `CheckTxVersion`
  - [x] `CheckSpecVersion`
  - [x] `CheckNonce`
  - [x] `CheckNonZeroSender`
  - [x] `CheckMortality`
  - [x] `CheckGenesis`
  - [x] `CheckOnlySudoAccount`
  - [x] `WatchDummy`
  - [x] `PrevalidateAttests`
  - [x] `GenericSignedExtension`
  - [x] `SignedExtension` (chain-polkadot-bulletin)
  - [x] `RefundSignedExtensionAdapter`
- [x] Implement `fn weight` across the board.
- [ ] Go through all pre-existing extensions which assume an account
signer and explicitly handle the possibility of another kind of origin.
- [x] `CheckNonce` should probably succeed in the case of a non-account
origin.
- [x] `CheckNonZeroSender` should succeed in the case of a non-account
origin.
- [x] `ChargeTransactionPayment` and family should fail in the case of a
non-account origin.
  - [ ] 
- [x] Fix any broken tests.

---------

Signed-off-by: georgepisaltu <george.pisaltu@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Nikhil Gupta <17176722+gupnik@users.noreply.github.com>
Co-authored-by: georgepisaltu <52418509+georgepisaltu@users.noreply.github.com>
Co-authored-by: Chevdor <chevdor@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Maciej <maciej.zyszkiewicz@parity.io>
Co-authored-by: Javier Viola <javier@parity.io>
Co-authored-by: Marcin S. <marcin@realemail.net>
Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
Co-authored-by: Javier Bullrich <javier@bullrich.dev>
Co-authored-by: Koute <koute@users.noreply.github.com>
Co-authored-by: Adrian Catangiu <adrian@parity.io>
Co-authored-by: Vladimir Istyufeev <vladimir@parity.io>
Co-authored-by: Ross Bulat <ross@parity.io>
Co-authored-by: Gonçalo Pestana <g6pestana@gmail.com>
Co-authored-by: Liam Aharon <liam.aharon@hotmail.com>
Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
Co-authored-by: ordian <write@reusable.software>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Alexander Samusev <41779041+alvicsam@users.noreply.github.com>
Co-authored-by: Julian Eager <eagr@tutanota.com>
Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Co-authored-by: Davide Galassi <davxy@datawok.net>
Co-authored-by: Dónal Murray <donal.murray@parity.io>
Co-authored-by: yjh <yjh465402634@gmail.com>
Co-authored-by: Tom Mi <tommi@niemi.lol>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Will | Paradox | ParaNodes.io <79228812+paradox-tt@users.noreply.github.com>
Co-authored-by: Bastian Köcher <info@kchr.de>
Co-authored-by: Joshy Orndorff <JoshOrndorff@users.noreply.github.com>
Co-authored-by: Joshy Orndorff <git-user-email.h0ly5@simplelogin.com>
Co-authored-by: PG Herveou <pgherveou@gmail.com>
Co-authored-by: Alexander Theißen <alex.theissen@me.com>
Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-authored-by: Juan Girini <juangirini@gmail.com>
Co-authored-by: bader y <ibnbassem@gmail.com>
Co-authored-by: James Wilson <james@jsdw.me>
Co-authored-by: joe petrowski <25483142+joepetrowski@users.noreply.github.com>
Co-authored-by: asynchronous rob <rphmeier@gmail.com>
Co-authored-by: Parth <desaiparth08@gmail.com>
Co-authored-by: Andrew Jones <ascjones@gmail.com>
Co-authored-by: Jonathan Udd <jonathan@dwellir.com>
Co-authored-by: Serban Iorga <serban@parity.io>
Co-authored-by: Egor_P <egor@parity.io>
Co-authored-by: Branislav Kontur <bkontur@gmail.com>
Co-authored-by: Evgeny Snitko <evgeny@parity.io>
Co-authored-by: Just van Stam <vstam1@users.noreply.github.com>
Co-authored-by: Francisco Aguirre <franciscoaguirreperez@gmail.com>
Co-authored-by: gupnik <nikhilgupta.iitk@gmail.com>
Co-authored-by: dzmitry-lahoda <dzmitry@lahoda.pro>
Co-authored-by: zhiqiangxu <652732310@qq.com>
Co-authored-by: Nazar Mokrynskyi <nazar@mokrynskyi.com>
Co-authored-by: Anwesh <anweshknayak@gmail.com>
Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Sam Johnson <sam@durosoft.com>
Co-authored-by: kianenigma <kian@parity.io>
Co-authored-by: Jegor Sidorenko <5252494+jsidorenko@users.noreply.github.com>
Co-authored-by: Muharem <ismailov.m.h@gmail.com>
Co-authored-by: joepetrowski <joe@parity.io>
Co-authored-by: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Co-authored-by: Gabriel Facco de Arruda <arrudagates@gmail.com>
Co-authored-by: Squirrel <gilescope@gmail.com>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Co-authored-by: georgepisaltu <george.pisaltu@parity.io>
Co-authored-by: command-bot <>
2024-03-04 19:12:43 +00:00

943 lines
26 KiB
Rust

// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
use ahash::AHashSet;
use libp2p::PeerId;
use schnellru::{ByLength, LruMap};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{types::ProtocolName, NotificationService};
use sc_network_common::role::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashingFor};
use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
// the current main gossip user (GRANDPA). Currently there are ~800 validators on Kusama, as such,
// each GRANDPA round should generate ~1600 messages, and we currently keep track of the last 2
// completed rounds and the current live one. That makes it so that at any point we will be holding
// ~4800 live messages.
//
// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then
// this cache should take about 256 KB of memory.
const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
mod rep {
use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us a gossip message that we didn't know about.
pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
/// Reputation change when a peer sends us a gossip message that we already knew about.
pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
}
struct PeerConsensus<H> {
known_messages: AHashSet<H>,
}
/// Topic stream message with sender.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicNotification {
/// Message data.
pub message: Vec<u8>,
/// Sender if available.
pub sender: Option<PeerId>,
}
struct MessageEntry<B: BlockT> {
message_hash: B::Hash,
topic: B::Hash,
message: Vec<u8>,
sender: Option<PeerId>,
}
/// Local implementation of `ValidatorContext`.
struct NetworkContext<'g, 'p, B: BlockT> {
gossip: &'g mut ConsensusGossip<B>,
notification_service: &'p mut Box<dyn NotificationService>,
}
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
self.gossip.broadcast_topic(self.notification_service, topic, force);
}
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
self.gossip.multicast(self.notification_service, topic, message, force);
}
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.notification_service.send_sync_notification(who, message);
}
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
self.gossip.send_topic(self.notification_service, who, topic, force);
}
}
fn propagate<'a, B: BlockT, I>(
notification_service: &mut Box<dyn NotificationService>,
protocol: ProtocolName,
messages: I,
intent: MessageIntent,
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validator: &Arc<dyn Validator<B>>,
)
// (msg_hash, topic, message)
where
I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
{
let mut message_allowed = validator.message_allowed();
for (id, ref mut peer) in peers.iter_mut() {
for (message_hash, topic, message) in messages.clone() {
let intent = match intent {
MessageIntent::Broadcast { .. } =>
if peer.known_messages.contains(message_hash) {
continue
} else {
MessageIntent::Broadcast
},
MessageIntent::PeriodicRebroadcast => {
if peer.known_messages.contains(message_hash) {
MessageIntent::PeriodicRebroadcast
} else {
// peer doesn't know message, so the logic should treat it as an
// initial broadcast.
MessageIntent::Broadcast
}
},
other => other,
};
if !message_allowed(id, intent, topic, message) {
continue
}
peer.known_messages.insert(*message_hash);
tracing::trace!(
target: "gossip",
to = %id,
%protocol,
?message,
"Propagating message",
);
notification_service.send_sync_notification(id, message.clone());
}
}
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
messages: Vec<MessageEntry<B>>,
known_messages: LruMap<B::Hash, ()>,
protocol: ProtocolName,
validator: Arc<dyn Validator<B>>,
next_broadcast: Instant,
metrics: Option<Metrics>,
}
impl<B: BlockT> ConsensusGossip<B> {
/// Create a new instance using the given validator.
pub fn new(
validator: Arc<dyn Validator<B>>,
protocol: ProtocolName,
metrics_registry: Option<&Registry>,
) -> Self {
let metrics = match metrics_registry.map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
tracing::debug!(target: "gossip", "Failed to register metrics: {:?}", e);
None
},
None => None,
};
ConsensusGossip {
peers: HashMap::new(),
messages: Default::default(),
known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
protocol,
validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
metrics,
}
}
/// Handle new connected peer.
pub fn new_peer(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
who: PeerId,
role: ObservedRole,
) {
tracing::trace!(
target:"gossip",
%who,
protocol = %self.protocol,
?role,
"Registering peer",
);
self.peers.insert(who, PeerConsensus { known_messages: Default::default() });
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, notification_service };
validator.new_peer(&mut context, &who, role);
}
fn register_message_hashed(
&mut self,
message_hash: B::Hash,
topic: B::Hash,
message: Vec<u8>,
sender: Option<PeerId>,
) {
if self.known_messages.insert(message_hash, ()) {
self.messages.push(MessageEntry { message_hash, topic, message, sender });
if let Some(ref metrics) = self.metrics {
metrics.registered_messages.inc();
}
}
}
/// Registers a message without propagating it to any peers. The message
/// becomes available to new peers or when the service is asked to gossip
/// the message's topic. No validation is performed on the message, if the
/// message is already expired it should be dropped on the next garbage
/// collection.
pub fn register_message(&mut self, topic: B::Hash, message: Vec<u8>) {
let message_hash = HashingFor::<B>::hash(&message[..]);
self.register_message_hashed(message_hash, topic, message, None);
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
who: PeerId,
) {
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, notification_service };
validator.peer_disconnected(&mut context, &who);
self.peers.remove(&who);
}
/// Perform periodic maintenance
pub fn tick(&mut self, notification_service: &mut Box<dyn NotificationService>) {
self.collect_garbage();
if Instant::now() >= self.next_broadcast {
self.rebroadcast(notification_service);
self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
}
}
/// Rebroadcast all messages to all peers.
fn rebroadcast(&mut self, notification_service: &mut Box<dyn NotificationService>) {
let messages = self
.messages
.iter()
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
propagate(
notification_service,
self.protocol.clone(),
messages,
MessageIntent::PeriodicRebroadcast,
&mut self.peers,
&self.validator,
);
}
/// Broadcast all messages with given topic.
pub fn broadcast_topic(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
topic: B::Hash,
force: bool,
) {
let messages = self.messages.iter().filter_map(|entry| {
if entry.topic == topic {
Some((&entry.message_hash, &entry.topic, &entry.message))
} else {
None
}
});
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(
notification_service,
self.protocol.clone(),
messages,
intent,
&mut self.peers,
&self.validator,
);
}
/// Prune old or no longer relevant consensus messages. Provide a predicate
/// for pruning, which returns `false` when the items with a given topic should be pruned.
pub fn collect_garbage(&mut self) {
let known_messages = &mut self.known_messages;
let before = self.messages.len();
let mut message_expired = self.validator.message_expired();
self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
let expired_messages = before - self.messages.len();
if let Some(ref metrics) = self.metrics {
metrics.expired_messages.inc_by(expired_messages as u64)
}
tracing::trace!(
target: "gossip",
protocol = %self.protocol,
"Cleaned up {} stale messages, {} left ({} known)",
expired_messages,
self.messages.len(),
known_messages.len(),
);
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| known_messages.get(h).is_some());
}
}
/// Get valid messages received in the past for a topic (might have expired meanwhile).
pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
self.messages
.iter()
.filter(move |e| e.topic == topic)
.map(|entry| TopicNotification { message: entry.message.clone(), sender: entry.sender })
}
/// Register incoming messages and return the ones that are new and valid (according to a gossip
/// validator) and should thus be forwarded to the upper layers.
pub fn on_incoming(
&mut self,
network: &mut dyn Network<B>,
notification_service: &mut Box<dyn NotificationService>,
who: PeerId,
messages: Vec<Vec<u8>>,
) -> Vec<(B::Hash, TopicNotification)> {
let mut to_forward = vec![];
if !messages.is_empty() {
tracing::trace!(
target: "gossip",
messages_num = %messages.len(),
%who,
protocol = %self.protocol,
"Received messages from peer",
);
}
for message in messages {
let message_hash = HashingFor::<B>::hash(&message[..]);
if self.known_messages.get(&message_hash).is_some() {
tracing::trace!(
target: "gossip",
%who,
protocol = %self.protocol,
"Ignored already known message",
);
// If the peer already send us the message once, let's report them.
if self
.peers
.get_mut(&who)
.map_or(false, |p| !p.known_messages.insert(message_hash))
{
network.report_peer(who, rep::DUPLICATE_GOSSIP);
}
continue
}
// validate the message
let validation = {
let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, notification_service };
validator.validate(&mut context, &who, &message)
};
let (topic, keep) = match validation {
ValidationResult::ProcessAndKeep(topic) => (topic, true),
ValidationResult::ProcessAndDiscard(topic) => (topic, false),
ValidationResult::Discard => {
tracing::trace!(
target: "gossip",
%who,
protocol = %self.protocol,
"Discard message from peer",
);
continue
},
};
let peer = match self.peers.get_mut(&who) {
Some(peer) => peer,
None => {
tracing::error!(
target: "gossip",
%who,
protocol = %self.protocol,
"Got message from unregistered peer",
);
continue
},
};
network.report_peer(who, rep::GOSSIP_SUCCESS);
peer.known_messages.insert(message_hash);
to_forward
.push((topic, TopicNotification { message: message.clone(), sender: Some(who) }));
if keep {
self.register_message_hashed(message_hash, topic, message, Some(who));
}
}
to_forward
}
/// Send all messages with given topic to a peer.
pub fn send_topic(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
who: &PeerId,
topic: B::Hash,
force: bool,
) {
let mut message_allowed = self.validator.message_allowed();
if let Some(ref mut peer) = self.peers.get_mut(who) {
for entry in self.messages.iter().filter(|m| m.topic == topic) {
let intent =
if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
if !force && peer.known_messages.contains(&entry.message_hash) {
continue
}
if !message_allowed(who, intent, &entry.topic, &entry.message) {
continue
}
peer.known_messages.insert(entry.message_hash);
tracing::trace!(
target: "gossip",
to = %who,
protocol = %self.protocol,
?entry.message,
"Sending topic message",
);
notification_service.send_sync_notification(who, entry.message.clone());
}
}
}
/// Multicast a message to all peers.
pub fn multicast(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
topic: B::Hash,
message: Vec<u8>,
force: bool,
) {
let message_hash = HashingFor::<B>::hash(&message);
self.register_message_hashed(message_hash, topic, message.clone(), None);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(
notification_service,
self.protocol.clone(),
iter::once((&message_hash, &topic, &message)),
intent,
&mut self.peers,
&self.validator,
);
}
/// Send addressed message to a peer. The message is not kept or multicast
/// later on.
pub fn send_message(
&mut self,
notification_service: &mut Box<dyn NotificationService>,
who: &PeerId,
message: Vec<u8>,
) {
let peer = match self.peers.get_mut(who) {
None => return,
Some(peer) => peer,
};
let message_hash = HashingFor::<B>::hash(&message);
tracing::trace!(
target: "gossip",
to = %who,
protocol = %self.protocol,
?message,
"Sending direct message",
);
peer.known_messages.insert(message_hash);
notification_service.send_sync_notification(who, message)
}
}
struct Metrics {
registered_messages: Counter<U64>,
expired_messages: Counter<U64>,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
registered_messages: register(
Counter::new(
"substrate_network_gossip_registered_messages_total",
"Number of registered messages by the gossip service.",
)?,
registry,
)?,
expired_messages: register(
Counter::new(
"substrate_network_gossip_expired_messages_total",
"Number of expired messages by the gossip service.",
)?,
registry,
)?,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::multiaddr::Multiaddr;
use futures::prelude::*;
use sc_network::{
config::MultiaddrWithPeerId, event::Event, service::traits::NotificationEvent, MessageSink,
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSenderError, NotificationSenderT as NotificationSender, ReputationChange,
};
use sp_runtime::{
generic::UncheckedExtrinsic,
testing::{Block as RawBlock, MockCallU64, H256},
traits::NumberFor,
};
use std::{
collections::HashSet,
pin::Pin,
sync::{Arc, Mutex},
};
type Block = RawBlock<UncheckedExtrinsic<u64, MockCallU64, (), ()>>;
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.insert($hash, ()) {
$consensus.messages.push(MessageEntry {
message_hash: $hash,
topic: $topic,
message: $m,
sender: None,
});
}
};
}
struct AllowAll;
impl Validator<Block> for AllowAll {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::default())
}
}
struct DiscardAll;
impl Validator<Block> for DiscardAll {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::Discard
}
}
#[derive(Clone, Default)]
struct NoOpNetwork {
inner: Arc<Mutex<NoOpNetworkInner>>,
}
#[derive(Clone, Default)]
struct NoOpNetworkInner {
peer_reports: Vec<(PeerId, ReputationChange)>,
}
impl NetworkPeers for NoOpNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
unimplemented!();
}
fn set_authorized_only(&self, _reserved_only: bool) {
unimplemented!();
}
fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
unimplemented!();
}
fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
self.inner.lock().unwrap().peer_reports.push((peer_id, cost_benefit));
}
fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
unimplemented!()
}
fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
unimplemented!();
}
fn accept_unreserved_peers(&self) {
unimplemented!();
}
fn deny_unreserved_peers(&self) {
unimplemented!();
}
fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
unimplemented!();
}
fn remove_reserved_peer(&self, _peer_id: PeerId) {
unimplemented!();
}
fn set_reserved_peers(
&self,
_protocol: ProtocolName,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn add_peers_to_reserved_set(
&self,
_protocol: ProtocolName,
_peers: HashSet<Multiaddr>,
) -> Result<(), String> {
unimplemented!();
}
fn remove_peers_from_reserved_set(
&self,
_protocol: ProtocolName,
_peers: Vec<PeerId>,
) -> Result<(), String> {
unimplemented!();
}
fn sync_num_connected(&self) -> usize {
unimplemented!();
}
fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
None
}
}
impl NetworkEventStream for NoOpNetwork {
fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
unimplemented!();
}
}
impl NetworkNotification for NoOpNetwork {
fn write_notification(&self, _target: PeerId, _protocol: ProtocolName, _message: Vec<u8>) {
unimplemented!();
}
fn notification_sender(
&self,
_target: PeerId,
_protocol: ProtocolName,
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}
fn set_notification_handshake(&self, _protocol: ProtocolName, _handshake: Vec<u8>) {
unimplemented!();
}
}
impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
unimplemented!();
}
fn new_best_block_imported(
&self,
_hash: <Block as BlockT>::Hash,
_number: NumberFor<Block>,
) {
unimplemented!();
}
}
#[derive(Debug, Default)]
struct NoOpNotificationService {}
#[async_trait::async_trait]
impl NotificationService for NoOpNotificationService {
/// Instruct `Notifications` to open a new substream for `peer`.
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}
/// Instruct `Notifications` to close substream for `peer`.
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}
/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec<u8>) {
unimplemented!();
}
/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
async fn send_async_notification(
&self,
_peer: &PeerId,
_notification: Vec<u8>,
) -> Result<(), sc_network::error::Error> {
unimplemented!();
}
/// Set handshake for the notification protocol replacing the old handshake.
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}
fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}
/// Get next event from the `Notifications` event stream.
async fn next_event(&mut self) -> Option<NotificationEvent> {
None
}
fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
unimplemented!();
}
fn protocol(&self) -> &ProtocolName {
unimplemented!();
}
fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
unimplemented!();
}
}
#[test]
fn collects_garbage() {
struct AllowOne;
impl Validator<Block> for AllowOne {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
data: &[u8],
) -> ValidationResult<H256> {
if data[0] == 1 {
ValidationResult::ProcessAndKeep(H256::default())
} else {
ValidationResult::Discard
}
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
Box::new(move |_topic, data| data[0] != 1)
}
}
let prev_hash = H256::random();
let best_hash = H256::random();
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let m1_hash = H256::random();
let m2_hash = H256::random();
let m1 = vec![1, 2, 3];
let m2 = vec![4, 5, 6];
push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2);
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());
consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.known_messages.len(), 2);
consensus.validator = Arc::new(AllowOne);
// m2 is expired
consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 1);
// known messages are only pruned based on size.
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.get(&m2_hash).is_some());
}
#[test]
fn message_stream_include_those_sent_before_asking() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
// Register message.
let message = vec![4, 5, 6];
let topic = HashingFor::<Block>::hash(&[1, 2, 3]);
consensus.register_message(topic, message.clone());
assert_eq!(
consensus.messages_for(topic).next(),
Some(TopicNotification { message, sender: None }),
);
}
#[test]
fn can_keep_multiple_messages_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let topic = [1; 32].into();
let msg_a = vec![1, 2, 3];
let msg_b = vec![4, 5, 6];
consensus.register_message(topic, msg_a);
consensus.register_message(topic, msg_b);
assert_eq!(consensus.messages.len(), 2);
}
#[test]
fn peer_is_removed_on_disconnect() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let mut notification_service: Box<dyn NotificationService> =
Box::new(NoOpNotificationService::default());
let peer_id = PeerId::random();
consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
assert!(consensus.peers.contains_key(&peer_id));
consensus.peer_disconnected(&mut notification_service, peer_id);
assert!(!consensus.peers.contains_key(&peer_id));
}
#[test]
fn on_incoming_ignores_discarded_messages() {
let mut notification_service: Box<dyn NotificationService> =
Box::new(NoOpNotificationService::default());
let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
.on_incoming(
&mut NoOpNetwork::default(),
&mut notification_service,
PeerId::random(),
vec![vec![1, 2, 3]],
);
assert!(
to_forward.is_empty(),
"Expected `on_incoming` to ignore discarded message but got {:?}",
to_forward,
);
}
#[test]
fn on_incoming_ignores_unregistered_peer() {
let mut network = NoOpNetwork::default();
let mut notification_service: Box<dyn NotificationService> =
Box::new(NoOpNotificationService::default());
let remote = PeerId::random();
let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
.on_incoming(
&mut network,
&mut notification_service,
// Unregistered peer.
remote,
vec![vec![1, 2, 3]],
);
assert!(
to_forward.is_empty(),
"Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
to_forward,
);
}
// Two peers can send us the same gossip message. We should not report the second peer
// sending the gossip message as long as its the first time the peer send us this message.
#[test]
fn do_not_report_peer_for_first_time_duplicate_gossip_message() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let mut network = NoOpNetwork::default();
let mut notification_service: Box<dyn NotificationService> =
Box::new(NoOpNotificationService::default());
let peer_id = PeerId::random();
consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
assert!(consensus.peers.contains_key(&peer_id));
let peer_id2 = PeerId::random();
consensus.new_peer(&mut notification_service, peer_id2, ObservedRole::Full);
assert!(consensus.peers.contains_key(&peer_id2));
let message = vec![vec![1, 2, 3]];
consensus.on_incoming(&mut network, &mut notification_service, peer_id, message.clone());
consensus.on_incoming(&mut network, &mut notification_service, peer_id2, message.clone());
assert_eq!(
vec![(peer_id, rep::GOSSIP_SUCCESS)],
network.inner.lock().unwrap().peer_reports
);
}
}