transaction-pool: drop unpropagable txs if local node cant author blocks (#8048)

* transaction-pool: drop unpropagable txs if local node cant author blocks

* fix test compilation

* transaction-pool: remove unnecessary static bound on CanAuthor

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* rpc-api: add translation for PoolError::Unactionable

* transaction-pool: add test for rejecting unactionable transactions

* basic-authorship: fix doc test

* transaction-pool: fix benchmark compilation

* transaction-pool: rename CanAuthor to IsValidator

* transaction-pool: nit in error message

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
André Silva
2021-02-04 19:18:44 +00:00
committed by GitHub
parent 8e36d87ca8
commit 54def5f3d3
19 changed files with 157 additions and 56 deletions
@@ -52,6 +52,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_handle(),
client.clone(),
+1
View File
@@ -74,6 +74,7 @@ impl core::Benchmark for PoolBenchmark {
let executor = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
true.into(),
None,
executor,
context.client.clone(),
+1
View File
@@ -68,6 +68,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_handle(),
client.clone(),
@@ -408,6 +408,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
@@ -466,6 +467,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
@@ -506,6 +508,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
@@ -573,6 +576,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
@@ -34,6 +34,7 @@
//! # let spawner = sp_core::testing::TaskExecutor::new();
//! # let txpool = BasicPool::new_full(
//! # Default::default(),
//! # true.into(),
//! # None,
//! # spawner.clone(),
//! # client.clone(),
@@ -293,7 +293,7 @@ mod tests {
let inherent_data_providers = InherentDataProviders::new();
let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), api(), None, RevalidationType::Full, spawner.clone(),
Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(),
));
let env = ProposerFactory::new(
spawner.clone(),
@@ -364,7 +364,7 @@ mod tests {
let inherent_data_providers = InherentDataProviders::new();
let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), api(), None, RevalidationType::Full, spawner.clone(),
Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(),
));
let env = ProposerFactory::new(
spawner.clone(),
@@ -439,7 +439,7 @@ mod tests {
let pool_api = api();
let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), pool_api.clone(), None, RevalidationType::Full, spawner.clone(),
Options::default(), true.into(), pool_api.clone(), None, RevalidationType::Full, spawner.clone(),
));
let env = ProposerFactory::new(
spawner.clone(),
+1
View File
@@ -295,6 +295,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = TestPool(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
@@ -99,6 +99,9 @@ const POOL_CYCLE_DETECTED: i64 = POOL_INVALID_TX + 5;
const POOL_IMMEDIATELY_DROPPED: i64 = POOL_INVALID_TX + 6;
/// The key type crypto is not known.
const UNSUPPORTED_KEY_TYPE: i64 = POOL_INVALID_TX + 7;
/// The transaction was not included to the pool since it is unactionable,
/// it is not propagable and the local node does not author blocks.
const POOL_UNACTIONABLE: i64 = POOL_INVALID_TX + 8;
impl From<Error> for rpc::Error {
fn from(e: Error) -> Self {
@@ -158,6 +161,14 @@ impl From<Error> for rpc::Error {
message: "Immediately Dropped".into(),
data: Some("The transaction couldn't enter the pool because of the limit".into()),
},
Error::Pool(PoolError::Unactionable) => rpc::Error {
code: rpc::ErrorCode::ServerError(POOL_UNACTIONABLE),
message: "Unactionable".into(),
data: Some(
"The transaction is unactionable since it is not propagable and \
the local node does not author blocks".into(),
),
},
Error::UnsupportedKeyType => rpc::Error {
code: rpc::ErrorCode::ServerError(UNSUPPORTED_KEY_TYPE),
message: "Unknown key type crypto" .into(),
+1
View File
@@ -65,6 +65,7 @@ impl Default for TestSetup {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
+1
View File
@@ -599,6 +599,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
@@ -164,13 +164,19 @@ fn benchmark_main(c: &mut Criterion) {
c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50);
bench_configured(
Pool::new(Default::default(), true.into(), TestApi::new_dependant().into()),
50,
);
});
});
c.bench_function("random 100 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100);
bench_configured(
Pool::new(Default::default(), true.into(), TestApi::default().into()),
100,
);
});
});
}
@@ -39,6 +39,6 @@ pub mod watcher;
pub use self::base_pool::Transaction;
pub use self::pool::{
Pool, Options, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash,
BlockHash, NumberFor, TransactionFor, ValidatedTransaction,
BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, IsValidator, NumberFor, Options,
Pool, TransactionFor, ValidatedTransaction,
};
@@ -36,7 +36,7 @@ use wasm_timer::Instant;
use futures::channel::mpsc::Receiver;
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::ValidatedTransaction;
pub use crate::validated_pool::{IsValidator, ValidatedTransaction};
/// Modification notification event stream type;
pub type EventStream<H> = Receiver<H>;
@@ -150,9 +150,9 @@ where
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: Arc<B>) -> Self {
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Pool {
validated_pool: Arc::new(ValidatedPool::new(options, api)),
validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)),
}
}
@@ -497,43 +497,58 @@ mod tests {
) -> Self::ValidationFuture {
let hash = self.hash_and_length(&uxt).0;
let block_number = self.block_id_to_number(at).unwrap().unwrap();
let nonce = uxt.transfer().nonce;
// This is used to control the test flow.
if nonce > 0 {
let opt = self.delay.lock().take();
if let Some(delay) = opt {
if delay.recv().is_err() {
println!("Error waiting for delay!");
let res = match uxt {
Extrinsic::Transfer { transfer, .. } => {
let nonce = transfer.nonce;
// This is used to control the test flow.
if nonce > 0 {
let opt = self.delay.lock().take();
if let Some(delay) = opt {
if delay.recv().is_err() {
println!("Error waiting for delay!");
}
}
}
}
}
if self.invalidate.lock().contains(&hash) {
return futures::future::ready(Ok(InvalidTransaction::Custom(0).into()));
}
if self.invalidate.lock().contains(&hash) {
InvalidTransaction::Custom(0).into()
} else if nonce < block_number {
InvalidTransaction::Stale.into()
} else {
let mut transaction = ValidTransaction {
priority: 4,
requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
longevity: 3,
propagate: true,
};
futures::future::ready(if nonce < block_number {
Ok(InvalidTransaction::Stale.into())
} else {
let mut transaction = ValidTransaction {
priority: 4,
requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
longevity: 3,
propagate: true,
};
if self.clear_requirements.lock().contains(&hash) {
transaction.requires.clear();
}
if self.clear_requirements.lock().contains(&hash) {
transaction.requires.clear();
}
if self.add_requirements.lock().contains(&hash) {
transaction.requires.push(vec![128]);
}
if self.add_requirements.lock().contains(&hash) {
transaction.requires.push(vec![128]);
}
Ok(transaction)
}
},
Extrinsic::IncludeData(_) => {
Ok(ValidTransaction {
priority: 9001,
requires: vec![],
provides: vec![vec![42]],
longevity: 9001,
propagate: false,
})
},
_ => unimplemented!(),
};
Ok(Ok(transaction))
})
futures::future::ready(Ok(res))
}
/// Returns a block number given the block id.
@@ -579,7 +594,7 @@ mod tests {
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default().into())
Pool::new(Default::default(), true.into(), TestApi::default().into())
}
#[test]
@@ -620,6 +635,26 @@ mod tests {
assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
}
#[test]
fn should_reject_unactionable_transactions() {
// given
let pool = Pool::new(
Default::default(),
// the node does not author blocks
false.into(),
TestApi::default().into(),
);
// after validation `IncludeData` will be set to non-propagable
let uxt = Extrinsic::IncludeData(vec![42]);
// when
let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
// then
assert_matches!(res.unwrap_err(), error::Error::Unactionable);
}
#[test]
fn should_notify_about_pool_events() {
let (stream, hash0, hash1) = {
@@ -722,11 +757,14 @@ mod tests {
count: 100,
total_bytes: 200,
};
let pool = Pool::new(Options {
let options = Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
};
let pool = Pool::new(options, true.into(), TestApi::default().into());
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -757,11 +795,14 @@ mod tests {
count: 100,
total_bytes: 10,
};
let pool = Pool::new(Options {
let options = Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
};
let pool = Pool::new(options, true.into(), TestApi::default().into());
// when
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
@@ -939,11 +980,13 @@ mod tests {
count: 1,
total_bytes: 1000,
};
let pool = Pool::new(Options {
let options = Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
};
let pool = Pool::new(options, true.into(), TestApi::default().into());
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -977,7 +1020,7 @@ mod tests {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), api.into()));
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.into()));
// when
let xt = uxt(Transfer {
@@ -90,9 +90,25 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<
<B as ChainApi>::Error,
>;
/// A closure that returns true if the local node is a validator that can author blocks.
pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);
impl From<bool> for IsValidator {
fn from(is_validator: bool) -> Self {
IsValidator(Box::new(move || is_validator))
}
}
impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
IsValidator(is_validator)
}
}
/// Pool that deals with validated transactions.
pub struct ValidatedPool<B: ChainApi> {
api: Arc<B>,
is_validator: IsValidator,
options: Options,
listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
pool: RwLock<base::BasePool<
@@ -116,9 +132,10 @@ where
impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: Arc<B>) -> Self {
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
is_validator,
options,
listener: Default::default(),
api,
@@ -183,6 +200,10 @@ impl<B: ChainApi> ValidatedPool<B> {
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
if !tx.propagate && !(self.is_validator.0)() {
return Err(error::Error::Unactionable.into());
}
let imported = self.pool.write().import(tx)?;
if let base::Imported::Ready { ref hash, .. } = imported {
+6 -4
View File
@@ -163,7 +163,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn new_test(
pool_api: Arc<PoolApi>,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>, intervalier::BackSignalControl) {
let pool = Arc::new(sc_transaction_graph::Pool::new(Default::default(), pool_api.clone()));
let pool = Arc::new(sc_transaction_graph::Pool::new(Default::default(), true.into(), pool_api.clone()));
let (revalidation_queue, background_task, notifier) =
revalidation::RevalidationQueue::new_test(pool_api.clone(), pool.clone());
(
@@ -184,12 +184,13 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
/// revalidation type.
pub fn with_revalidation_type(
options: sc_transaction_graph::Options,
is_validator: txpool::IsValidator,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
spawner: impl SpawnNamed,
) -> Self {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, pool_api.clone()));
let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type {
RevalidationType::Light => (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
RevalidationType::Full => {
@@ -346,7 +347,7 @@ where
) -> Self {
let pool_api = Arc::new(LightChainApi::new(client, fetcher));
Self::with_revalidation_type(
options, pool_api, prometheus, RevalidationType::Light, spawner,
options, false.into(), pool_api, prometheus, RevalidationType::Light, spawner,
)
}
}
@@ -364,13 +365,14 @@ where
/// Create new basic transaction pool for a full node with the provided api.
pub fn new_full(
options: sc_transaction_graph::Options,
is_validator: txpool::IsValidator,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
client: Arc<Client>,
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool = Arc::new(Self::with_revalidation_type(
options, pool_api, prometheus, RevalidationType::Full, spawner
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
));
// make transaction pool available for off-chain runtime calls.
@@ -370,7 +370,7 @@ mod tests {
fn setup() -> (Arc<TestApi>, Pool<TestApi>) {
let test_api = Arc::new(TestApi::empty());
let pool = Pool::new(Default::default(), test_api.clone());
let pool = Pool::new(Default::default(), true.into(), test_api.clone());
(test_api, pool)
}
@@ -37,7 +37,7 @@ use sc_block_builder::BlockBuilderProvider;
use sp_consensus::BlockOrigin;
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into())
}
fn maintained_pool() -> (
@@ -161,7 +161,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
api.set_valid_modifier(Box::new(|v: &mut ValidTransaction| {
v.provides.push(vec![155]);
}));
let pool = Pool::new(Default::default(), api.clone());
let pool = Pool::new(Default::default(), true.into(), api.clone());
let xt = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.validated_pool().status().ready, 1);
@@ -60,6 +60,9 @@ pub enum Error {
#[error("Transaction couldn't enter the pool because of the limit")]
ImmediatelyDropped,
#[error("Transaction cannot be propagated and the local node does not author blocks")]
Unactionable,
#[from(ignore)]
#[error("{0}")]
InvalidBlockId(String),
@@ -301,6 +301,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
@@ -340,6 +341,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
@@ -363,6 +365,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
@@ -395,6 +398,7 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),