mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 10:21:05 +00:00
Set reserved nodes with offchain worker. (#6996)
* add offchain worker api to set reserved nodes. * new offchain api to get node public key. * node public key from converter * refactor set reserved nodes ocw api. * new ndoe authorization pallet * remove unnecessary clone and more. * more * tests for node authorization pallet * remove dependency * fix build * more tests. * refactor * Update primitives/core/src/offchain/testing.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Update frame/node-authorization/src/lib.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Update frame/node-authorization/src/lib.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Update frame/node-authorization/src/lib.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * format code * expose NetworkService * remove NetworkStateInfo in offchain * replace NodePublicKey with PeerId. * set max length of peer id. * clear more * use BTreeSet for set of peers. * decode opaque peer id. * extract NetworkProvider for client offchain. * use OpaquePeerId in node authorization pallet. * fix test * better documentation * fix test * doc * more fix * Update primitives/core/src/offchain/mod.rs Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Update client/offchain/src/api.rs Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * derive serialize and deserialize Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@@ -20,7 +20,7 @@
|
||||
//!
|
||||
//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
|
||||
//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in
|
||||
//! order fo the network to advance.
|
||||
//! order for the network to advance.
|
||||
//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
|
||||
//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
|
||||
//!
|
||||
@@ -605,6 +605,22 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
|
||||
&self.local_peer_id
|
||||
}
|
||||
|
||||
/// Set authorized peers.
|
||||
///
|
||||
/// Need a better solution to manage authorized peers, but now just use reserved peers for
|
||||
/// prototyping.
|
||||
pub fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
|
||||
self.peerset.set_reserved_peers(peers)
|
||||
}
|
||||
|
||||
/// Set authorized_only flag.
|
||||
///
|
||||
/// Need a better solution to decide authorized_only, but now just use reserved_only flag for
|
||||
/// prototyping.
|
||||
pub fn set_authorized_only(&self, reserved_only: bool) {
|
||||
self.peerset.set_reserved_only(reserved_only)
|
||||
}
|
||||
|
||||
/// Appends a notification to the buffer of pending outgoing notifications with the given peer.
|
||||
/// Has no effect if the notifications channel with this protocol name is not open.
|
||||
///
|
||||
|
||||
@@ -19,16 +19,18 @@ use std::{
|
||||
sync::Arc,
|
||||
convert::TryFrom,
|
||||
thread::sleep,
|
||||
collections::HashSet,
|
||||
};
|
||||
|
||||
use sp_core::offchain::OffchainStorage;
|
||||
use crate::NetworkProvider;
|
||||
use futures::Future;
|
||||
use log::error;
|
||||
use sc_network::{PeerId, Multiaddr, NetworkStateInfo};
|
||||
use sc_network::{PeerId, Multiaddr};
|
||||
use codec::{Encode, Decode};
|
||||
use sp_core::OpaquePeerId;
|
||||
use sp_core::offchain::{
|
||||
Externalities as OffchainExt, HttpRequestId, Timestamp, HttpRequestStatus, HttpError,
|
||||
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
|
||||
OffchainStorage, OpaqueNetworkState, OpaqueMultiaddr, StorageKind,
|
||||
};
|
||||
pub use sp_offchain::STORAGE_PREFIX;
|
||||
pub use http::SharedClient;
|
||||
@@ -49,8 +51,8 @@ mod timestamp;
|
||||
pub(crate) struct Api<Storage> {
|
||||
/// Offchain Workers database.
|
||||
db: Storage,
|
||||
/// A NetworkState provider.
|
||||
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
|
||||
/// A provider for substrate networking.
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
/// Is this node a potential validator?
|
||||
is_validator: bool,
|
||||
/// Everything HTTP-related is handled by a different struct.
|
||||
@@ -73,10 +75,10 @@ impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
|
||||
}
|
||||
|
||||
fn network_state(&self) -> Result<OpaqueNetworkState, ()> {
|
||||
let external_addresses = self.network_state.external_addresses();
|
||||
let external_addresses = self.network_provider.external_addresses();
|
||||
|
||||
let state = NetworkState::new(
|
||||
self.network_state.local_peer_id(),
|
||||
self.network_provider.local_peer_id(),
|
||||
external_addresses,
|
||||
);
|
||||
Ok(OpaqueNetworkState::from(state))
|
||||
@@ -180,6 +182,15 @@ impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
|
||||
) -> Result<usize, HttpError> {
|
||||
self.http.response_read_body(request_id, buffer, deadline)
|
||||
}
|
||||
|
||||
fn set_authorized_nodes(&mut self, nodes: Vec<OpaquePeerId>, authorized_only: bool) {
|
||||
let peer_ids: HashSet<PeerId> = nodes.into_iter()
|
||||
.filter_map(|node| PeerId::from_bytes(node.0).ok())
|
||||
.collect();
|
||||
|
||||
self.network_provider.set_authorized_peers(peer_ids);
|
||||
self.network_provider.set_authorized_only(authorized_only);
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about the local node's network state.
|
||||
@@ -256,10 +267,10 @@ pub(crate) struct AsyncApi {
|
||||
}
|
||||
|
||||
impl AsyncApi {
|
||||
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
|
||||
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
|
||||
pub fn new<S: OffchainStorage>(
|
||||
db: S,
|
||||
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
is_validator: bool,
|
||||
shared_client: SharedClient,
|
||||
) -> (Api<S>, Self) {
|
||||
@@ -267,7 +278,7 @@ impl AsyncApi {
|
||||
|
||||
let api = Api {
|
||||
db,
|
||||
network_state,
|
||||
network_provider,
|
||||
is_validator,
|
||||
http: http_api,
|
||||
};
|
||||
@@ -292,11 +303,21 @@ mod tests {
|
||||
use super::*;
|
||||
use std::{convert::{TryFrom, TryInto}, time::SystemTime};
|
||||
use sc_client_db::offchain::LocalStorage;
|
||||
use sc_network::PeerId;
|
||||
use sc_network::{NetworkStateInfo, PeerId};
|
||||
|
||||
struct MockNetworkStateInfo();
|
||||
struct TestNetwork();
|
||||
|
||||
impl NetworkStateInfo for MockNetworkStateInfo {
|
||||
impl NetworkProvider for TestNetwork {
|
||||
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_authorized_only(&self, _reserved_only: bool) {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkStateInfo for TestNetwork {
|
||||
fn external_addresses(&self) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
@@ -309,10 +330,9 @@ mod tests {
|
||||
fn offchain_api() -> (Api<LocalStorage>, AsyncApi) {
|
||||
let _ = env_logger::try_init();
|
||||
let db = LocalStorage::new_test();
|
||||
let mock = Arc::new(MockNetworkStateInfo());
|
||||
let mock = Arc::new(TestNetwork());
|
||||
let shared_client = SharedClient::new();
|
||||
|
||||
|
||||
AsyncApi::new(
|
||||
db,
|
||||
mock,
|
||||
|
||||
@@ -33,14 +33,17 @@
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::{fmt, marker::PhantomData, sync::Arc};
|
||||
use std::{
|
||||
fmt, marker::PhantomData, sync::Arc,
|
||||
collections::HashSet,
|
||||
};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use threadpool::ThreadPool;
|
||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
use futures::future::Future;
|
||||
use log::{debug, warn};
|
||||
use sc_network::NetworkStateInfo;
|
||||
use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId};
|
||||
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
|
||||
use sp_runtime::{generic::BlockId, traits::{self, Header}};
|
||||
use futures::{prelude::*, future::ready};
|
||||
@@ -50,6 +53,30 @@ use api::SharedClient;
|
||||
|
||||
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
|
||||
|
||||
/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
|
||||
/// underlying Substrate networking.
|
||||
pub trait NetworkProvider: NetworkStateInfo {
|
||||
/// Set the authorized peers.
|
||||
fn set_authorized_peers(&self, peers: HashSet<PeerId>);
|
||||
|
||||
/// Set the authorized only flag.
|
||||
fn set_authorized_only(&self, reserved_only: bool);
|
||||
}
|
||||
|
||||
impl<B, H> NetworkProvider for NetworkService<B, H>
|
||||
where
|
||||
B: traits::Block + 'static,
|
||||
H: ExHashT,
|
||||
{
|
||||
fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
|
||||
self.set_authorized_peers(peers)
|
||||
}
|
||||
|
||||
fn set_authorized_only(&self, reserved_only: bool) {
|
||||
self.set_authorized_only(reserved_only)
|
||||
}
|
||||
}
|
||||
|
||||
/// An offchain workers manager.
|
||||
pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
|
||||
client: Arc<Client>,
|
||||
@@ -98,7 +125,7 @@ impl<Client, Storage, Block> OffchainWorkers<
|
||||
pub fn on_block_imported(
|
||||
&self,
|
||||
header: &Block::Header,
|
||||
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
is_validator: bool,
|
||||
) -> impl Future<Output = ()> {
|
||||
let runtime = self.client.runtime_api();
|
||||
@@ -122,7 +149,7 @@ impl<Client, Storage, Block> OffchainWorkers<
|
||||
if version > 0 {
|
||||
let (api, runner) = api::AsyncApi::new(
|
||||
self.db.clone(),
|
||||
network_state.clone(),
|
||||
network_provider,
|
||||
is_validator,
|
||||
self.shared_client.clone(),
|
||||
);
|
||||
@@ -173,7 +200,7 @@ pub async fn notification_future<Client, Storage, Block, Spawner>(
|
||||
client: Arc<Client>,
|
||||
offchain: Arc<OffchainWorkers<Client, Storage, Block>>,
|
||||
spawner: Spawner,
|
||||
network_state_info: Arc<dyn NetworkStateInfo + Send + Sync>,
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
)
|
||||
where
|
||||
Block: traits::Block,
|
||||
@@ -188,7 +215,7 @@ pub async fn notification_future<Client, Storage, Block, Spawner>(
|
||||
"offchain-on-block",
|
||||
offchain.on_block_imported(
|
||||
&n.header,
|
||||
network_state_info.clone(),
|
||||
network_provider.clone(),
|
||||
is_validator,
|
||||
).boxed(),
|
||||
);
|
||||
@@ -213,9 +240,9 @@ mod tests {
|
||||
use sc_transaction_pool::{BasicPool, FullChainApi};
|
||||
use sp_transaction_pool::{TransactionPool, InPoolTransaction};
|
||||
|
||||
struct MockNetworkStateInfo();
|
||||
struct TestNetwork();
|
||||
|
||||
impl NetworkStateInfo for MockNetworkStateInfo {
|
||||
impl NetworkStateInfo for TestNetwork {
|
||||
fn external_addresses(&self) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
@@ -225,6 +252,16 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkProvider for TestNetwork {
|
||||
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_authorized_only(&self, _reserved_only: bool) {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
struct TestPool(
|
||||
Arc<BasicPool<FullChainApi<TestClient, Block>, Block>>
|
||||
);
|
||||
@@ -255,12 +292,14 @@ mod tests {
|
||||
client.clone(),
|
||||
));
|
||||
let db = sc_client_db::offchain::LocalStorage::new_test();
|
||||
let network_state = Arc::new(MockNetworkStateInfo());
|
||||
let network = Arc::new(TestNetwork());
|
||||
let header = client.header(&BlockId::number(0)).unwrap().unwrap();
|
||||
|
||||
// when
|
||||
let offchain = OffchainWorkers::new(client, db);
|
||||
futures::executor::block_on(offchain.on_block_imported(&header, network_state, false));
|
||||
futures::executor::block_on(
|
||||
offchain.on_block_imported(&header, network, false)
|
||||
);
|
||||
|
||||
// then
|
||||
assert_eq!(pool.0.status().ready, 1);
|
||||
|
||||
@@ -45,6 +45,7 @@ const FORGET_AFTER: Duration = Duration::from_secs(3600);
|
||||
enum Action {
|
||||
AddReservedPeer(PeerId),
|
||||
RemoveReservedPeer(PeerId),
|
||||
SetReservedPeers(HashSet<PeerId>),
|
||||
SetReservedOnly(bool),
|
||||
ReportPeer(PeerId, ReputationChange),
|
||||
SetPriorityGroup(String, HashSet<PeerId>),
|
||||
@@ -102,6 +103,11 @@ impl PeersetHandle {
|
||||
pub fn set_reserved_only(&self, reserved: bool) {
|
||||
let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved));
|
||||
}
|
||||
|
||||
/// Set reserved peers to the new set.
|
||||
pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
|
||||
let _ = self.tx.unbounded_send(Action::SetReservedPeers(peer_ids));
|
||||
}
|
||||
|
||||
/// Reports an adjustment to the reputation of the given peer.
|
||||
pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
|
||||
@@ -246,6 +252,10 @@ impl Peerset {
|
||||
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
|
||||
self.on_remove_from_priority_group(RESERVED_NODES, peer_id);
|
||||
}
|
||||
|
||||
fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
|
||||
self.on_set_priority_group(RESERVED_NODES, peer_ids);
|
||||
}
|
||||
|
||||
fn on_set_reserved_only(&mut self, reserved_only: bool) {
|
||||
self.reserved_only = reserved_only;
|
||||
@@ -655,6 +665,8 @@ impl Stream for Peerset {
|
||||
self.on_add_reserved_peer(peer_id),
|
||||
Action::RemoveReservedPeer(peer_id) =>
|
||||
self.on_remove_reserved_peer(peer_id),
|
||||
Action::SetReservedPeers(peer_ids) =>
|
||||
self.on_set_reserved_peers(peer_ids),
|
||||
Action::SetReservedOnly(reserved) =>
|
||||
self.on_set_reserved_only(reserved),
|
||||
Action::ReportPeer(peer_id, score_diff) =>
|
||||
|
||||
@@ -431,7 +431,7 @@ pub fn build_offchain_workers<TBl, TBackend, TCl>(
|
||||
client.clone(),
|
||||
offchain,
|
||||
Clone::clone(&spawn_handle),
|
||||
network.clone()
|
||||
network.clone(),
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user