Simplify trait bounds in network to prepare for collator-rpc (#12082)

* Hack towards PoC

* Abstract away runtime requirement

* blockchainevents

* Remove bitswap

* Remove unused sync more

* Remove unused features in network

* Re-enable bitswap change

* Remove `Chain` trait bound

* Reimplement blockchain-rpc-events

* Move network to cumulus

* Make AuthorityDiscovery async

* Remove `ProofProvider` requirement from network behaviour

* Extract bitswap

* Adjustments after merge

* Remove HeaderMetadata trait from network

* Introduce NetworkHeaderBackend

* Add comments

* Improve comments

* Move NetworkHeaderBackend to new module

* Improve naming, remove redundand send + sync

* Clean up generics

* Fix CI

* Improve comment and readability

* Remove NetworkHeaderBackend

* Fix Cargo.lock

Co-authored-by: Sebastian Kunert <skunert@Sebastians-MacBook-Pro.fritz.box>
This commit is contained in:
Sebastian Kunert
2022-08-31 14:55:46 +02:00
committed by GitHub
parent 30951822ba
commit 800bc5cd8c
14 changed files with 165 additions and 155 deletions
+11 -60
View File
@@ -39,7 +39,6 @@ use libp2p::{
};
use log::debug;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::import_queue::{IncomingBlock, Origin};
use sc_network_common::{
config::ProtocolId,
@@ -47,7 +46,7 @@ use sc_network_common::{
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
};
use sc_peerset::PeersetHandle;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
@@ -69,13 +68,7 @@ pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, R
pub struct Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
/// All the substrate-specific protocols.
substrate: Protocol<B, Client>,
@@ -85,7 +78,7 @@ where
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Bitswap server for blockchain data.
bitswap: Toggle<Bitswap<B, Client>>,
bitswap: Toggle<Bitswap<B>>,
/// Generic request-response protocols.
request_responses: request_responses::RequestResponsesBehaviour,
@@ -208,13 +201,7 @@ pub enum BehaviourOut<B: BlockT> {
impl<B, Client> Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
/// Builds a new `Behaviour`.
pub fn new(
@@ -225,7 +212,7 @@ where
block_request_protocol_config: ProtocolConfig,
state_request_protocol_config: ProtocolConfig,
warp_sync_protocol_config: Option<ProtocolConfig>,
bitswap: Option<Bitswap<B, Client>>,
bitswap: Option<Bitswap<B>>,
light_client_request_protocol_config: ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<ProtocolConfig>,
@@ -352,13 +339,7 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
impl<B, Client> NetworkBehaviourEventProcess<void::Void> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
@@ -368,13 +349,7 @@ where
impl<B, Client> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
match event {
@@ -483,13 +458,7 @@ where
impl<B, Client> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: request_responses::Event) {
match event {
@@ -515,13 +484,7 @@ where
impl<B, Client> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
let peer_info::PeerInfoEvent::Identified {
@@ -548,13 +511,7 @@ where
impl<B, Client> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
@@ -592,13 +549,7 @@ where
impl<B, Client> Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn poll(
&mut self,
+84 -8
View File
@@ -178,24 +178,99 @@ impl Prefix {
}
}
/// Bitswap trait
pub trait BitswapT<B: BlockT> {
/// Get single indexed transaction by content hash.
///
/// Note that this will only fetch transactions
/// that are indexed by the runtime with `storage_index_transaction`.
fn indexed_transaction(
&self,
hash: <B as BlockT>::Hash,
) -> sp_blockchain::Result<Option<Vec<u8>>>;
/// Queue of blocks ready to be sent out on `poll()`
fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)>;
}
/// Network behaviour that handles sending and receiving IPFS blocks.
pub struct Bitswap<B, Client> {
struct BitswapInternal<B, Client> {
client: Arc<Client>,
ready_blocks: VecDeque<(PeerId, BitswapMessage)>,
_block: PhantomData<B>,
}
impl<B, Client> Bitswap<B, Client> {
impl<B, Client> BitswapInternal<B, Client> {
/// Create a new instance of the bitswap protocol handler.
pub fn new(client: Arc<Client>) -> Self {
Self { client, ready_blocks: Default::default(), _block: PhantomData::default() }
}
}
impl<B, Client> NetworkBehaviour for Bitswap<B, Client>
impl<Block, Client> BitswapT<Block> for BitswapInternal<Block, Client>
where
Block: BlockT,
Client: BlockBackend<Block>,
{
fn indexed_transaction(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<Vec<u8>>> {
self.client.indexed_transaction(&hash)
}
fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> {
&mut self.ready_blocks
}
}
/// Wrapper for bitswap trait object implement NetworkBehaviour
pub struct Bitswap<Block: BlockT> {
inner: Box<dyn BitswapT<Block> + Sync + Send>,
}
impl<B: BlockT> Bitswap<B> {
/// Create new Bitswap wrapper
pub fn from_client<Client: BlockBackend<B> + Send + Sync + 'static>(
client: Arc<Client>,
) -> Self {
let inner = Box::new(BitswapInternal::new(client)) as Box<_>;
Self { inner }
}
}
impl<Block: BlockT> BitswapT<Block> for Bitswap<Block> {
fn indexed_transaction(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<Vec<u8>>> {
self.inner.indexed_transaction(hash)
}
fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> {
self.inner.ready_blocks()
}
}
impl<Block: BlockT, T> BitswapT<Block> for Box<T>
where
T: BitswapT<Block>,
{
fn indexed_transaction(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<Vec<u8>>> {
T::indexed_transaction(self, hash)
}
fn ready_blocks(&mut self) -> &mut VecDeque<(PeerId, BitswapMessage)> {
T::ready_blocks(self)
}
}
impl<B> NetworkBehaviour for Bitswap<B>
where
B: BlockT,
Client: BlockBackend<B> + Send + Sync + 'static,
{
type ConnectionHandler = OneShotHandler<BitswapConfig, BitswapMessage, HandlerEvent>;
type OutEvent = void::Void;
@@ -214,10 +289,11 @@ where
HandlerEvent::Request(msg) => msg,
};
trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
if self.ready_blocks.len() > MAX_RESPONSE_QUEUE {
if self.ready_blocks().len() > MAX_RESPONSE_QUEUE {
debug!(target: LOG_TARGET, "Ignored request: queue is full");
return
}
let mut response = BitswapMessage {
wantlist: None,
blocks: Default::default(),
@@ -253,7 +329,7 @@ where
}
let mut hash = B::Hash::default();
hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
let transaction = match self.client.indexed_transaction(&hash) {
let transaction = match self.indexed_transaction(hash) {
Ok(ex) => ex,
Err(e) => {
error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
@@ -292,7 +368,7 @@ where
}
}
trace!(target: LOG_TARGET, "Response: {:?}", response);
self.ready_blocks.push_back((peer, response));
self.ready_blocks().push_back((peer, response));
}
fn poll(
@@ -300,7 +376,7 @@ where
_ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some((peer_id, message)) = self.ready_blocks.pop_front() {
if let Some((peer_id, message)) = self.ready_blocks().pop_front() {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
+4 -1
View File
@@ -31,7 +31,7 @@ pub use sc_network_common::{
pub use libp2p::{build_multiaddr, core::PublicKey, identity};
use crate::ExHashT;
use crate::{bitswap::Bitswap, ExHashT};
use core::{fmt, iter};
use futures::future;
@@ -80,6 +80,9 @@ where
/// Client that contains the blockchain.
pub chain: Arc<Client>,
/// Bitswap block request protocol implementation.
pub bitswap: Option<Bitswap<B>>,
/// Pool of transactions.
///
/// The network worker will fetch transactions from this object in order to propagate them on
+4 -17
View File
@@ -40,7 +40,7 @@ use message::{
};
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_client_api::HeaderBackend;
use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin};
use sc_network_common::{
config::ProtocolId,
@@ -56,7 +56,6 @@ use sc_network_common::{
},
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_blockchain::HeaderMetadata;
use sp_consensus::BlockOrigin;
use sp_runtime::{
generic::BlockId,
@@ -262,13 +261,7 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
impl<B, Client> Protocol<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
/// Create a new instance.
pub fn new(
@@ -373,7 +366,7 @@ where
let block_announces_protocol = {
let genesis_hash =
chain.block_hash(0u32.into()).ok().flatten().expect("Genesis block exists; qed");
chain.hash(0u32.into()).ok().flatten().expect("Genesis block exists; qed");
if let Some(fork_id) = fork_id {
format!("/{}/{}/block-announces/1", hex::encode(genesis_hash), fork_id)
} else {
@@ -1318,13 +1311,7 @@ pub enum CustomMessageOutcome<B: BlockT> {
impl<B, Client> NetworkBehaviour for Protocol<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type OutEvent = CustomMessageOutcome<B>;
+9 -49
View File
@@ -29,7 +29,6 @@
use crate::{
behaviour::{self, Behaviour, BehaviourOut},
bitswap::Bitswap,
config::{Params, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
@@ -59,7 +58,6 @@ use libp2p::{
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
use sc_network_common::{
config::MultiaddrWithPeerId,
@@ -75,7 +73,7 @@ use sc_network_common::{
};
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{
borrow::Cow,
@@ -137,13 +135,7 @@ impl<B, H, Client> NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: sp_blockchain::HeaderBackend<B> + 'static,
{
/// Creates the network service.
///
@@ -220,7 +212,7 @@ where
params.protocol_id.clone(),
params
.chain
.block_hash(0u32.into())
.hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
@@ -374,7 +366,6 @@ where
};
let behaviour = {
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(params.chain));
let result = Behaviour::new(
protocol,
user_agent,
@@ -383,7 +374,7 @@ where
params.block_request_protocol_config,
params.state_request_protocol_config,
params.warp_sync_protocol_config,
bitswap,
params.bitswap,
params.light_client_request_protocol_config,
params.network_config.request_response_protocols,
peerset_handle.clone(),
@@ -1297,13 +1288,7 @@ pub struct NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
@@ -1336,13 +1321,7 @@ impl<B, H, Client> Future for NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
type Output = ();
@@ -1375,7 +1354,6 @@ where
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
};
match msg {
ServiceToWorkerMsg::AnnounceBlock(hash, data) => this
.network_service
@@ -1988,13 +1966,7 @@ impl<B, H, Client> Unpin for NetworkWorker<B, H, Client>
where
B: BlockT + 'static,
H: ExHashT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
}
@@ -2002,13 +1974,7 @@ where
struct NetworkLink<'a, B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
protocol: &'a mut Swarm<Behaviour<B, Client>>,
}
@@ -2016,13 +1982,7 @@ where
impl<'a, B, Client> Link<B> for NetworkLink<'a, B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
Client: HeaderBackend<B> + 'static,
{
fn blocks_processed(
&mut self,
@@ -146,6 +146,7 @@ fn build_test_full_node(
import_queue,
chain_sync: Box::new(chain_sync),
metrics_registry: None,
bitswap: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,