Produce block always on updated transaction pool state (#5227)

* make sure return ready iterator once state is updated

* update sc_basic_authorship tests

* update node tests

* fix manual seal

* actually fix service test

* add tests

* Update client/basic-authorship/src/basic_authorship.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* helper function

* review suggestions

* warning and continue

* add debug log

* use futures::chennel::oneshot

* use declaration bound

* no option for updated_at

* no allocation

* ready_at / ready

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Nikolay Volf
2020-03-17 08:24:04 -07:00
committed by GitHub
parent bbf5bc6acf
commit db86094b03
12 changed files with 257 additions and 25 deletions
+2
View File
@@ -5642,6 +5642,7 @@ name = "sc-basic-authorship"
version = "0.8.0-alpha.3"
dependencies = [
"futures 0.3.4",
"futures-timer 3.0.2",
"log 0.4.8",
"parity-scale-codec",
"parking_lot 0.10.0",
@@ -5970,6 +5971,7 @@ dependencies = [
name = "sc-consensus-manual-seal"
version = "0.8.0-alpha.3"
dependencies = [
"assert_matches",
"derive_more",
"env_logger 0.7.1",
"futures 0.3.4",
+30 -1
View File
@@ -397,6 +397,7 @@ mod tests {
use sc_service::AbstractService;
use crate::service::{new_full, new_light};
use sp_runtime::traits::IdentifyAccount;
use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
type AccountPublic = <Signature as Verify>::Signer;
@@ -414,7 +415,21 @@ mod tests {
let dummy_runtime = ::tokio::runtime::Runtime::new().unwrap();
let block_factory = |service: &<Factory as service::ServiceFactory>::FullService| {
let block_id = BlockId::number(service.client().chain_info().best_number);
let parent_header = service.client().header(&block_id).unwrap().unwrap();
let parent_header = service.client().best_header(&block_id)
.expect("db error")
.expect("best block should exist");
futures::executor::block_on(
service.transaction_pool().maintain(
ChainEvent::NewBlock {
is_new_best: true,
id: block_id.clone(),
retracted: vec![],
header: parent_header,
},
)
);
let consensus_net = ConsensusNetwork::new(service.network(), service.client().clone());
let proposer_factory = consensus::ProposerFactory {
client: service.client().clone(),
@@ -464,6 +479,8 @@ mod tests {
}
#[test]
// It is "ignored", but the node-cli ignored tests are running on the CI.
// This can be run locally with `cargo test --release -p node-cli test_sync -- --ignored`.
#[ignore]
fn test_sync() {
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
@@ -504,6 +521,18 @@ mod tests {
let parent_header = service.client().header(&parent_id).unwrap().unwrap();
let parent_hash = parent_header.hash();
let parent_number = *parent_header.number();
futures::executor::block_on(
service.transaction_pool().maintain(
ChainEvent::NewBlock {
is_new_best: true,
id: parent_id.clone(),
retracted: vec![],
header: parent_header.clone(),
},
)
);
let mut proposer_factory = sc_basic_authorship::ProposerFactory::new(
service.client(),
service.transaction_pool()
@@ -23,6 +23,7 @@ sc-telemetry = { version = "2.0.0-alpha.2", path = "../telemetry" }
sp-transaction-pool = { version = "2.0.0-alpha.2", path = "../../primitives/transaction-pool" }
sc-block-builder = { version = "0.8.0-alpha.2", path = "../block-builder" }
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
futures-timer = "3.0.1"
[dev-dependencies]
sc-transaction-pool = { version = "2.0.0-alpha.2", path = "../../client/transaction-pool" }
@@ -33,7 +33,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
use sc_telemetry::{telemetry, CONSENSUS_INFO};
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
use sp_api::{ProvideRuntimeApi, ApiExt};
use futures::prelude::*;
use futures::{executor, future, future::Either};
use sp_blockchain::{HeaderBackend, ApplyExtrinsicFailed};
use std::marker::PhantomData;
@@ -210,7 +210,18 @@ impl<A, B, Block, C> ProposerInner<B, Block, C, A>
let mut is_first = true;
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
let pending_iterator = self.transaction_pool.ready();
let pending_iterator = match executor::block_on(future::select(
self.transaction_pool.ready_at(self.parent_number),
futures_timer::Delay::new((deadline - (self.now)()) / 8),
)) {
Either::Left((iterator, _)) => iterator,
Either::Right(_) => {
log::warn!(
"Timeout fired waiting for transaction pool to be ready. Proceeding to block production anyway.",
);
self.transaction_pool.ready()
}
};
debug!("Attempting to push transactions from the pool.");
debug!("Pool status: {:?}", self.transaction_pool.status());
@@ -304,10 +315,12 @@ mod tests {
prelude::*,
runtime::{Extrinsic, Transfer},
};
use sp_transaction_pool::{ChainEvent, MaintainedTransactionPool};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sp_api::Core;
use backend::Backend;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::NumberFor;
fn extrinsic(nonce: u64) -> Extrinsic {
Transfer {
@@ -318,6 +331,17 @@ mod tests {
}.into_signed_tx()
}
fn chain_event<B: BlockT>(block_number: u64, header: B::Header) -> ChainEvent<B>
where NumberFor<B>: From<u64>
{
ChainEvent::NewBlock {
id: BlockId::Number(block_number.into()),
retracted: vec![],
is_new_best: true,
header: header,
}
}
#[test]
fn should_cease_building_block_when_deadline_is_reached() {
// given
@@ -330,16 +354,27 @@ mod tests {
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)])
).unwrap();
futures::executor::block_on(
txpool.maintain(chain_event(
0,
client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
))
);
let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
let cell = Mutex::new(time::Instant::now());
let cell = Mutex::new((false, time::Instant::now()));
let mut proposer = proposer_factory.init_with_now(
&client.header(&BlockId::number(0)).unwrap().unwrap(),
Box::new(move || {
let mut value = cell.lock();
let old = *value;
if !value.0 {
value.0 = true;
return value.1;
}
let old = value.1;
let new = old + time::Duration::from_secs(2);
*value = new;
*value = (true, new);
old
})
);
@@ -371,6 +406,13 @@ mod tests {
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0)]),
).unwrap();
futures::executor::block_on(
txpool.maintain(chain_event(
0,
client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
))
);
let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
let mut proposer = proposer_factory.init_with_now(
@@ -459,15 +501,26 @@ mod tests {
block
};
futures::executor::block_on(
txpool.maintain(chain_event(
0,
client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
))
);
// let's create one block and import it
let block = propose_block(&client, 0, 2, 7);
client.import(BlockOrigin::Own, block).unwrap();
// now let's make sure that we can still make some progress
futures::executor::block_on(
txpool.maintain(chain_event(
1,
client.header(&BlockId::Number(1)).expect("header get error").expect("there should be header")
))
);
// This is most likely incorrect, and caused by #5139
let tx_remaining = 0;
let block = propose_block(&client, 1, 2, tx_remaining);
// now let's make sure that we can still make some progress
let block = propose_block(&client, 1, 2, 5);
client.import(BlockOrigin::Own, block).unwrap();
}
}
@@ -17,6 +17,7 @@ jsonrpc-derive = "14.0.5"
log = "0.4.8"
parking_lot = "0.10.0"
serde = { version = "1.0", features=["derive"] }
assert_matches = "1.3.0"
sc-client = { path = "../../../client" , version = "0.8.0-alpha.2"}
sc-client-api = { path = "../../../client/api" , version = "2.0.0-alpha.2"}
@@ -224,7 +224,7 @@ mod tests {
txpool::Options,
};
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use sp_transaction_pool::TransactionPool;
use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool};
use sp_runtime::generic::BlockId;
use sp_blockchain::HeaderBackend;
use sp_consensus::ImportedAux;
@@ -432,14 +432,24 @@ mod tests {
assert!(backend.blockchain().header(BlockId::Number(0)).unwrap().is_some());
assert!(pool.submit_one(&BlockId::Number(1), uxt(Alice, 1)).await.is_ok());
pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
id: BlockId::Number(1),
header: backend.blockchain().header(BlockId::Number(1)).expect("db error").expect("imported above"),
is_new_best: true,
retracted: vec![],
}).await;
let (tx1, rx1) = futures::channel::oneshot::channel();
assert!(sink.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash.clone()),
parent_hash: Some(created_block.hash),
sender: Some(tx1),
create_empty: false,
finalize: false,
}).await.is_ok());
assert!(rx1.await.unwrap().is_ok());
assert_matches::assert_matches!(
rx1.await.expect("should be no error receiving"),
Ok(_)
);
assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some());
pool_api.increment_nonce(Alice.into());
+5 -1
View File
@@ -482,7 +482,11 @@ pub fn sync<G, E, Fb, F, Lb, L, B, ExF, U>(
let first_user_data = &network.full_nodes[0].2;
let best_block = BlockId::number(first_service.get().client().chain_info().best_number);
let extrinsic = extrinsic_factory(&first_service.get(), first_user_data);
futures::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap();
futures::executor::block_on(
first_service.get().transaction_pool().submit_one(&best_block, extrinsic)
).expect("failed to submit extrinsic");
network.run_until_all_full(
|_index, service| service.get().transaction_pool().ready().count() == 1,
|_index, _service| true,
@@ -545,7 +545,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Get an iterator for ready transactions ordered by priority
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> {
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> + Send {
self.pool.read().ready()
}
+77 -6
View File
@@ -31,12 +31,12 @@ pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{Future, FutureExt, future::ready};
use futures::{Future, FutureExt, future::ready, channel::oneshot};
use parking_lot::Mutex;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic},
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero},
};
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor,
@@ -44,6 +44,12 @@ use sp_transaction_pool::{
};
use wasm_timer::Instant;
type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;
type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output=ReadyIteratorFor<PoolApi>> + Send>>;
/// Basic implementation of transaction pool that can be customized by providing PoolApi.
pub struct BasicPool<PoolApi, Block>
where
@@ -54,6 +60,48 @@ pub struct BasicPool<PoolApi, Block>
api: Arc<PoolApi>,
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
}
struct ReadyPoll<T, Block: BlockT> {
updated_at: NumberFor<Block>,
pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
}
impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
fn default() -> Self {
Self {
updated_at: NumberFor::<Block>::zero(),
pollers: Default::default(),
}
}
}
impl<T, Block: BlockT> ReadyPoll<T, Block> {
fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
self.updated_at = number;
let mut idx = 0;
while idx < self.pollers.len() {
if self.pollers[idx].0 <= number {
let poller_sender = self.pollers.swap_remove(idx);
log::debug!(target: "txpool", "Sending ready signal at block {}", number);
let _ = poller_sender.1.send(iterator_factory());
} else {
idx += 1;
}
}
}
fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
let (sender, receiver) = oneshot::channel();
self.pollers.push((number, sender));
receiver
}
fn updated_at(&self) -> NumberFor<Block> {
self.updated_at
}
}
#[cfg(not(target_os = "unknown"))]
@@ -128,6 +176,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
RevalidationType::Full => RevalidationStrategy::Always,
}
)),
ready_poll: Default::default(),
},
background_task,
)
@@ -196,10 +245,6 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
self.pool.validated_pool().status()
}
fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
Box::new(self.pool.validated_pool().ready())
}
fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
self.pool.validated_pool().import_notification_stream()
}
@@ -215,6 +260,27 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
self.pool.validated_pool().ready_by_hash(hash)
}
fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
if self.ready_poll.lock().updated_at() >= at {
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
return Box::pin(futures::future::ready(iterator));
}
Box::pin(
self.ready_poll
.lock()
.add(at)
.map(|received| received.unwrap_or_else(|e| {
log::warn!("Error receiving pending set: {:?}", e);
Box::new(vec![].into_iter())
}))
)
}
fn ready(&self) -> ReadyIteratorFor<PoolApi> {
Box::new(self.pool.validated_pool().ready())
}
}
#[cfg_attr(test, derive(Debug))]
@@ -329,6 +395,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
let revalidation_strategy = self.revalidation_strategy.clone();
let retracted = retracted.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
async move {
// We don't query block if we won't prune anything
@@ -348,6 +415,10 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
}
}
let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the handler of "all blocks notification".
ready_poll.lock().trigger(block_number, move || Box::new(extra_pool.validated_pool().ready()));
if next_action.resubmit {
let mut resubmit_transactions = Vec::new();
@@ -28,6 +28,7 @@ use substrate_test_runtime_client::{
};
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use crate::revalidation::BACKGROUND_REVALIDATION_INTERVAL;
use futures::task::Poll;
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
@@ -600,5 +601,56 @@ fn fork_aware_finalization() {
assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1.clone())));
assert_eq!(stream.next(), None);
}
}
#[test]
fn ready_set_should_not_resolve_before_block_update() {
let (pool, _guard) = maintained_pool();
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
assert!(pool.ready_at(1).now_or_never().is_none());
}
#[test]
fn ready_set_should_resolve_after_block_update() {
let (pool, _guard) = maintained_pool();
pool.api.push_block(1, vec![]);
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
block_on(pool.maintain(block_event(1)));
assert!(pool.ready_at(1).now_or_never().is_some());
}
#[test]
fn ready_set_should_eventually_resolve_when_block_update_arrives() {
let (pool, _guard) = maintained_pool();
pool.api.push_block(1, vec![]);
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
let noop_waker = futures::task::noop_waker();
let mut context = futures::task::Context::from_waker(&noop_waker);
let mut ready_set_future = pool.ready_at(1);
if let Poll::Ready(_) = ready_set_future.poll_unpin(&mut context) {
panic!("Ready set should not be ready before block update!");
}
block_on(pool.maintain(block_event(1)));
match ready_set_future.poll_unpin(&mut context) {
Poll::Pending => {
panic!("Ready set should become ready after block update!");
},
Poll::Ready(iterator) => {
let data = iterator.collect::<Vec<_>>();
assert_eq!(data.len(), 1);
}
}
}
@@ -127,6 +127,8 @@ pub enum Error {
/// Incomplete block import pipeline.
#[display(fmt = "Incomplete block import pipeline.")]
IncompletePipeline,
#[display(fmt = "Transaction pool not ready for block production.")]
TransactionPoolNotReady,
/// A convenience variant for String
#[display(fmt = "{}", _0)]
Msg(String),
@@ -29,7 +29,7 @@ use futures::{
use serde::{Deserialize, Serialize};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Member},
traits::{Block as BlockT, Member, NumberFor},
transaction_validity::{
TransactionLongevity, TransactionPriority, TransactionTag,
},
@@ -210,8 +210,15 @@ pub trait TransactionPool: Send + Sync {
) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error>;
// *** Block production / Networking
/// Get an iterator for ready transactions ordered by priority
fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>>;
/// Get an iterator for ready transactions ordered by priority.
///
/// Guarantees to return only when transaction pool got updated at `at` block.
/// Guarantees to return immediately when `None` is passed.
fn ready_at(&self, at: NumberFor<Self::Block>)
-> Pin<Box<dyn Future<Output=Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>> + Send>> + Send>>;
/// Get an iterator for ready transactions ordered by priority.
fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>> + Send>;
// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.