mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-23 15:31:08 +00:00
Hook RPC extrinsic import into propagation (#158)
* call `on_new_transactions` when we import * fix trace * pass correct bytes to network * clean up * cull before repropagating; repropagate on timer * add a little tracing
This commit is contained in:
committed by
Gav Wood
parent
413ebf3f19
commit
f997a3bdf1
Generated
+3
@@ -1297,16 +1297,19 @@ dependencies = [
|
|||||||
"hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"polkadot-executor 0.1.0",
|
"polkadot-executor 0.1.0",
|
||||||
"polkadot-primitives 0.1.0",
|
"polkadot-primitives 0.1.0",
|
||||||
"polkadot-runtime 0.1.0",
|
"polkadot-runtime 0.1.0",
|
||||||
"polkadot-service 0.1.0",
|
"polkadot-service 0.1.0",
|
||||||
|
"polkadot-transaction-pool 0.1.0",
|
||||||
"regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"substrate-client 0.1.0",
|
"substrate-client 0.1.0",
|
||||||
"substrate-codec 0.1.0",
|
"substrate-codec 0.1.0",
|
||||||
"substrate-executor 0.1.0",
|
"substrate-executor 0.1.0",
|
||||||
"substrate-network 0.1.0",
|
"substrate-network 0.1.0",
|
||||||
"substrate-primitives 0.1.0",
|
"substrate-primitives 0.1.0",
|
||||||
|
"substrate-rpc 0.1.0",
|
||||||
"substrate-rpc-servers 0.1.0",
|
"substrate-rpc-servers 0.1.0",
|
||||||
"substrate-runtime-support 0.1.0",
|
"substrate-runtime-support 0.1.0",
|
||||||
"substrate-state-machine 0.1.0",
|
"substrate-state-machine 0.1.0",
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ tokio-core = "0.1.12"
|
|||||||
futures = "0.1.17"
|
futures = "0.1.17"
|
||||||
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
|
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
|
||||||
fdlimit = "0.1"
|
fdlimit = "0.1"
|
||||||
|
parking_lot = "0.4"
|
||||||
substrate-client = { path = "../../substrate/client" }
|
substrate-client = { path = "../../substrate/client" }
|
||||||
substrate-network = { path = "../../substrate/network" }
|
substrate-network = { path = "../../substrate/network" }
|
||||||
substrate-codec = { path = "../../substrate/codec" }
|
substrate-codec = { path = "../../substrate/codec" }
|
||||||
@@ -29,8 +30,10 @@ substrate-runtime-support = { path = "../../substrate/runtime-support" }
|
|||||||
substrate-state-machine = { path = "../../substrate/state-machine" }
|
substrate-state-machine = { path = "../../substrate/state-machine" }
|
||||||
substrate-executor = { path = "../../substrate/executor" }
|
substrate-executor = { path = "../../substrate/executor" }
|
||||||
substrate-primitives = { path = "../../substrate/primitives" }
|
substrate-primitives = { path = "../../substrate/primitives" }
|
||||||
|
substrate-rpc = { path = "../../substrate/rpc" }
|
||||||
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
|
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
|
||||||
polkadot-primitives = { path = "../primitives" }
|
polkadot-primitives = { path = "../primitives" }
|
||||||
polkadot-executor = { path = "../executor" }
|
polkadot-executor = { path = "../executor" }
|
||||||
polkadot-runtime = { path = "../runtime" }
|
polkadot-runtime = { path = "../runtime" }
|
||||||
polkadot-service = { path = "../service" }
|
polkadot-service = { path = "../service" }
|
||||||
|
polkadot-transaction-pool = { path = "../transaction-pool" }
|
||||||
|
|||||||
@@ -30,17 +30,21 @@ extern crate ctrlc;
|
|||||||
extern crate fdlimit;
|
extern crate fdlimit;
|
||||||
extern crate ed25519;
|
extern crate ed25519;
|
||||||
extern crate triehash;
|
extern crate triehash;
|
||||||
|
extern crate parking_lot;
|
||||||
|
|
||||||
extern crate substrate_codec as codec;
|
extern crate substrate_codec as codec;
|
||||||
extern crate substrate_state_machine as state_machine;
|
extern crate substrate_state_machine as state_machine;
|
||||||
extern crate substrate_client as client;
|
extern crate substrate_client as client;
|
||||||
extern crate substrate_primitives as primitives;
|
extern crate substrate_primitives as primitives;
|
||||||
extern crate substrate_network as network;
|
extern crate substrate_network as network;
|
||||||
|
extern crate substrate_rpc;
|
||||||
extern crate substrate_rpc_servers as rpc;
|
extern crate substrate_rpc_servers as rpc;
|
||||||
extern crate substrate_runtime_support as runtime_support;
|
extern crate substrate_runtime_support as runtime_support;
|
||||||
extern crate polkadot_primitives;
|
extern crate polkadot_primitives;
|
||||||
extern crate polkadot_executor;
|
extern crate polkadot_executor;
|
||||||
extern crate polkadot_runtime;
|
extern crate polkadot_runtime;
|
||||||
extern crate polkadot_service as service;
|
extern crate polkadot_service as service;
|
||||||
|
extern crate polkadot_transaction_pool as txpool;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate lazy_static;
|
extern crate lazy_static;
|
||||||
@@ -57,10 +61,39 @@ mod informant;
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
use futures::{Sink, Future, Stream};
|
use futures::{Sink, Future, Stream};
|
||||||
use tokio_core::reactor;
|
use tokio_core::reactor;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use service::ChainSpec;
|
use service::ChainSpec;
|
||||||
|
use primitives::block::Extrinsic;
|
||||||
|
|
||||||
|
struct RpcTransactionPool {
|
||||||
|
inner: Arc<Mutex<txpool::TransactionPool>>,
|
||||||
|
network: Arc<network::Service>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
|
||||||
|
fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
|
||||||
|
use primitives::hexdisplay::HexDisplay;
|
||||||
|
use polkadot_runtime::UncheckedExtrinsic;
|
||||||
|
use codec::Slicable;
|
||||||
|
|
||||||
|
info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
|
||||||
|
let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
|
||||||
|
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
|
||||||
|
|
||||||
|
info!("Correctly formatted: {:?}", decoded);
|
||||||
|
|
||||||
|
self.inner.lock().import(decoded)
|
||||||
|
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;
|
||||||
|
|
||||||
|
self.network.trigger_repropagate();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse command line arguments and start the node.
|
/// Parse command line arguments and start the node.
|
||||||
///
|
///
|
||||||
@@ -172,7 +205,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
|
|
||||||
let handler = || {
|
let handler = || {
|
||||||
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
|
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
|
||||||
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
|
let pool = RpcTransactionPool {
|
||||||
|
inner: service.transaction_pool(),
|
||||||
|
network: service.network(),
|
||||||
|
};
|
||||||
|
rpc::rpc_handler(service.client(), chain, pool)
|
||||||
};
|
};
|
||||||
(
|
(
|
||||||
start_server(http_address, |address| rpc::start_http(address, handler())),
|
start_server(http_address, |address| rpc::start_http(address, handler())),
|
||||||
|
|||||||
@@ -17,7 +17,6 @@
|
|||||||
extern crate ed25519;
|
extern crate ed25519;
|
||||||
extern crate ethereum_types;
|
extern crate ethereum_types;
|
||||||
extern crate substrate_codec as codec;
|
extern crate substrate_codec as codec;
|
||||||
extern crate substrate_rpc;
|
|
||||||
extern crate substrate_primitives as substrate_primitives;
|
extern crate substrate_primitives as substrate_primitives;
|
||||||
extern crate substrate_runtime_primitives as substrate_runtime_primitives;
|
extern crate substrate_runtime_primitives as substrate_runtime_primitives;
|
||||||
extern crate polkadot_runtime as runtime;
|
extern crate polkadot_runtime as runtime;
|
||||||
@@ -35,10 +34,8 @@ use std::collections::HashMap;
|
|||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use codec::Slicable;
|
|
||||||
use polkadot_api::PolkadotApi;
|
use polkadot_api::PolkadotApi;
|
||||||
use primitives::{AccountId, Timestamp};
|
use primitives::{AccountId, Timestamp};
|
||||||
use substrate_primitives::block::Extrinsic;
|
|
||||||
use runtime::{Block, UncheckedExtrinsic, TimestampCall, Call};
|
use runtime::{Block, UncheckedExtrinsic, TimestampCall, Call};
|
||||||
use substrate_runtime_primitives::traits::Checkable;
|
use substrate_runtime_primitives::traits::Checkable;
|
||||||
use transaction_pool::{Pool, Readiness};
|
use transaction_pool::{Pool, Readiness};
|
||||||
@@ -380,19 +377,6 @@ impl TransactionPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl substrate_rpc::author::AsyncAuthorApi for TransactionPool {
|
|
||||||
fn submit_extrinsic(&mut self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
|
|
||||||
use substrate_primitives::hexdisplay::HexDisplay;
|
|
||||||
info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
|
|
||||||
let xt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
|
|
||||||
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
|
|
||||||
info!("Correctly formatted: {:?}", xt);
|
|
||||||
self.import(xt)
|
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -434,7 +434,7 @@ impl Protocol {
|
|||||||
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
|
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len());
|
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), peer_id);
|
||||||
let mut peers = self.peers.write();
|
let mut peers = self.peers.write();
|
||||||
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
|
||||||
for t in transactions {
|
for t in transactions {
|
||||||
@@ -445,12 +445,17 @@ impl Protocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Called when peer sends us new transactions
|
/// Called when we propagate ready transactions to peers.
|
||||||
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
|
pub fn propagate_transactions(&self, io: &mut SyncIo) {
|
||||||
|
debug!(target: "sync", "Propagating transactions");
|
||||||
|
|
||||||
// Accept transactions only when fully synced
|
// Accept transactions only when fully synced
|
||||||
if self.sync.read().status().state != SyncState::Idle {
|
if self.sync.read().status().state != SyncState::Idle {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let transactions = self.transaction_pool.transactions();
|
||||||
|
|
||||||
let mut peers = self.peers.write();
|
let mut peers = self.peers.write();
|
||||||
for (peer_id, ref mut peer) in peers.iter_mut() {
|
for (peer_id, ref mut peer) in peers.iter_mut() {
|
||||||
let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
|
let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::collections::{BTreeMap};
|
use std::collections::{BTreeMap};
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::time::Duration;
|
||||||
use futures::sync::{oneshot, mpsc};
|
use futures::sync::{oneshot, mpsc};
|
||||||
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
|
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
|
||||||
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
|
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
|
||||||
@@ -41,6 +42,12 @@ pub type StatementStream = mpsc::UnboundedReceiver<Statement>;
|
|||||||
/// Type that represents bft messages stream.
|
/// Type that represents bft messages stream.
|
||||||
pub type BftMessageStream = mpsc::UnboundedReceiver<LocalizedBftMessage>;
|
pub type BftMessageStream = mpsc::UnboundedReceiver<LocalizedBftMessage>;
|
||||||
|
|
||||||
|
const TICK_TOKEN: TimerToken = 0;
|
||||||
|
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||||
|
|
||||||
|
const PROPAGATE_TOKEN: TimerToken = 1;
|
||||||
|
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||||
|
|
||||||
bitflags! {
|
bitflags! {
|
||||||
/// Node roles bitmask.
|
/// Node roles bitmask.
|
||||||
pub struct Role: u32 {
|
pub struct Role: u32 {
|
||||||
@@ -162,9 +169,9 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Called when new transactons are imported by the client.
|
/// Called when new transactons are imported by the client.
|
||||||
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
|
pub fn trigger_repropagate(&self) {
|
||||||
self.network.with_context(DOT_PROTOCOL_ID, |context| {
|
self.network.with_context(DOT_PROTOCOL_ID, |context| {
|
||||||
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
|
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -268,7 +275,11 @@ impl ConsensusService for Service {
|
|||||||
|
|
||||||
impl NetworkProtocolHandler for ProtocolHandler {
|
impl NetworkProtocolHandler for ProtocolHandler {
|
||||||
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
|
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
|
||||||
io.register_timer(0, ::std::time::Duration::from_millis(1000)).expect("Error registering sync timer");
|
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
|
||||||
|
.expect("Error registering sync timer");
|
||||||
|
|
||||||
|
io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT)
|
||||||
|
.expect("Error registering transaction propagation timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
|
fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
|
||||||
@@ -283,8 +294,12 @@ impl NetworkProtocolHandler for ProtocolHandler {
|
|||||||
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
|
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
|
||||||
self.protocol.tick(&mut NetSyncIo::new(io));
|
match timer {
|
||||||
|
TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)),
|
||||||
|
PROPAGATE_TOKEN => self.protocol.propagate_transactions(&mut NetSyncIo::new(io)),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,8 +16,6 @@
|
|||||||
|
|
||||||
//! Substrate block-author/full-node API.
|
//! Substrate block-author/full-node API.
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use primitives::block::Extrinsic;
|
use primitives::block::Extrinsic;
|
||||||
|
|
||||||
pub mod error;
|
pub mod error;
|
||||||
@@ -35,15 +33,3 @@ build_rpc_trait! {
|
|||||||
fn submit_extrinsic(&self, Extrinsic) -> Result<()>;
|
fn submit_extrinsic(&self, Extrinsic) -> Result<()>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Variant of the AuthorApi that doesn't need to be Sync + Send + 'static.
|
|
||||||
pub trait AsyncAuthorApi: Send + 'static {
|
|
||||||
/// Submit extrinsic for inclusion in block.
|
|
||||||
fn submit_extrinsic(&mut self, Extrinsic) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncAuthorApi> AuthorApi for Arc<Mutex<T>> {
|
|
||||||
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
|
|
||||||
self.as_ref().lock().submit_extrinsic(xt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -14,20 +14,24 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use primitives::block;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use super::error::*;
|
use super::error::*;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use primitives::block;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct DummyTxPool {
|
struct DummyTxPool {
|
||||||
submitted: Vec<block::Extrinsic>,
|
submitted: Vec<block::Extrinsic>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncAuthorApi for DummyTxPool {
|
impl AuthorApi for Arc<Mutex<DummyTxPool>> {
|
||||||
/// Submit extrinsic for inclusion in block.
|
/// Submit extrinsic for inclusion in block.
|
||||||
fn submit_extrinsic(&mut self, xt: Extrinsic) -> Result<()> {
|
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
|
||||||
if self.submitted.len() < 1 {
|
let mut s = self.lock();
|
||||||
self.submitted.push(xt);
|
if s.submitted.len() < 1 {
|
||||||
|
s.submitted.push(xt);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(ErrorKind::PoolError.into())
|
Err(ErrorKind::PoolError.into())
|
||||||
|
|||||||
Reference in New Issue
Block a user