mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
Switch RPCs to stable futures (#3287)
This commit is contained in:
committed by
Gavin Wood
parent
aa86185648
commit
c792dd358d
@@ -14,11 +14,12 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::{prelude::*, sync::oneshot};
|
||||
use futures::prelude::*;
|
||||
use futures03::{channel::oneshot, compat::Compat};
|
||||
|
||||
/// Wraps around `oneshot::Receiver` and adjusts the error type to produce an internal error if the
|
||||
/// sender gets dropped.
|
||||
pub struct Receiver<T>(pub oneshot::Receiver<T>);
|
||||
pub struct Receiver<T>(pub Compat<oneshot::Receiver<T>>);
|
||||
|
||||
impl<T> Future for Receiver<T> {
|
||||
type Item = T;
|
||||
|
||||
@@ -23,7 +23,7 @@ pub mod helpers;
|
||||
mod tests;
|
||||
|
||||
use crate::helpers::Receiver;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use futures03::{channel::{mpsc, oneshot}, compat::Compat};
|
||||
use jsonrpc_derive::rpc;
|
||||
use network;
|
||||
use sr_primitives::traits::{self, Header as HeaderT};
|
||||
@@ -124,18 +124,18 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
|
||||
fn system_health(&self) -> Receiver<Health> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.send_back.unbounded_send(Request::Health(tx));
|
||||
Receiver(rx)
|
||||
Receiver(Compat::new(rx))
|
||||
}
|
||||
|
||||
fn system_peers(&self) -> Receiver<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.send_back.unbounded_send(Request::Peers(tx));
|
||||
Receiver(rx)
|
||||
Receiver(Compat::new(rx))
|
||||
}
|
||||
|
||||
fn system_network_state(&self) -> Receiver<network::NetworkState> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.send_back.unbounded_send(Request::NetworkState(tx));
|
||||
Receiver(rx)
|
||||
Receiver(Compat::new(rx))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ use network::{self, PeerId};
|
||||
use network::config::Roles;
|
||||
use test_client::runtime::Block;
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{prelude::*, sync::mpsc};
|
||||
use futures03::{prelude::*, channel::mpsc};
|
||||
use std::thread;
|
||||
|
||||
struct Status {
|
||||
@@ -46,7 +46,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
|
||||
let should_have_peers = !status.is_dev;
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
thread::spawn(move || {
|
||||
tokio::run(rx.for_each(move |request| {
|
||||
futures03::executor::block_on(rx.for_each(move |request| {
|
||||
match request {
|
||||
Request::Health(sender) => {
|
||||
let _ = sender.send(Health {
|
||||
@@ -82,7 +82,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
future::ready(())
|
||||
}))
|
||||
});
|
||||
System::new(SystemInfo {
|
||||
|
||||
@@ -32,7 +32,8 @@ use sr_primitives::{
|
||||
use crate::config::Configuration;
|
||||
use primitives::{Blake2Hasher, H256, Pair};
|
||||
use rpc::{self, apis::system::SystemInfo};
|
||||
use futures::{prelude::*, future::Executor, sync::mpsc};
|
||||
use futures::{prelude::*, future::Executor};
|
||||
use futures03::channel::mpsc;
|
||||
|
||||
// Type aliases.
|
||||
// These exist mainly to avoid typing `<F as Factory>::Foo` all over the code.
|
||||
|
||||
@@ -407,7 +407,7 @@ impl<Components: components::Components> Service<Components> {
|
||||
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
|
||||
|
||||
// RPC
|
||||
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
|
||||
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
|
||||
let gen_handler = || {
|
||||
let system_info = rpc::apis::system::SystemInfo {
|
||||
chain_name: config.chain_spec.name().into(),
|
||||
@@ -635,9 +635,13 @@ fn build_network_future<
|
||||
mut network: network::NetworkWorker<ComponentBlock<Components>, S, H>,
|
||||
client: Arc<ComponentClient<Components>>,
|
||||
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<ComponentBlock<Components>>, NetworkState)>>>>,
|
||||
mut rpc_rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>,
|
||||
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>,
|
||||
should_have_peers: bool,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
// Compatibility shim while we're transitionning to stable Futures.
|
||||
// See https://github.com/paritytech/substrate/issues/3099
|
||||
let mut rpc_rx = futures03::compat::Compat::new(rpc_rx.map(|v| Ok::<_, ()>(v)));
|
||||
|
||||
// Interval at which we send status updates on the status stream.
|
||||
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
|
||||
let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL);
|
||||
|
||||
Reference in New Issue
Block a user