mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 11:41:02 +00:00
client/beefy: small code improvements (#12414)
* client/beefy: remove bounds on type definitions * client/beefy: remove gossip protocol legacy name * client/beefy: simplify justification request response engine Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
@@ -33,9 +33,6 @@ pub(crate) mod beefy_protocol_name {
|
|||||||
/// BEEFY justifications protocol name suffix.
|
/// BEEFY justifications protocol name suffix.
|
||||||
const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1";
|
const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1";
|
||||||
|
|
||||||
/// Old names for the gossip protocol, used for backward compatibility.
|
|
||||||
pub(super) const LEGACY_NAMES: [&str; 1] = ["/paritytech/beefy/1"];
|
|
||||||
|
|
||||||
/// Name of the votes gossip protocol used by BEEFY.
|
/// Name of the votes gossip protocol used by BEEFY.
|
||||||
///
|
///
|
||||||
/// Must be registered towards the networking in order for BEEFY voter to properly function.
|
/// Must be registered towards the networking in order for BEEFY voter to properly function.
|
||||||
@@ -73,9 +70,7 @@ pub fn beefy_peers_set_config(
|
|||||||
) -> sc_network_common::config::NonDefaultSetConfig {
|
) -> sc_network_common::config::NonDefaultSetConfig {
|
||||||
let mut cfg =
|
let mut cfg =
|
||||||
sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
|
sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
|
||||||
|
|
||||||
cfg.allow_non_reserved(25, 25);
|
cfg.allow_non_reserved(25, 25);
|
||||||
cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect());
|
|
||||||
cfg
|
cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+15
-19
@@ -20,10 +20,7 @@
|
|||||||
|
|
||||||
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
|
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
|
||||||
use codec::Encode;
|
use codec::Encode;
|
||||||
use futures::{
|
use futures::channel::{oneshot, oneshot::Canceled};
|
||||||
channel::{oneshot, oneshot::Canceled},
|
|
||||||
stream::{self, StreamExt},
|
|
||||||
};
|
|
||||||
use log::{debug, error, warn};
|
use log::{debug, error, warn};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use sc_network::{PeerId, ProtocolName};
|
use sc_network::{PeerId, ProtocolName};
|
||||||
@@ -50,8 +47,8 @@ type Response = Result<Vec<u8>, RequestFailure>;
|
|||||||
type ResponseReceiver = oneshot::Receiver<Response>;
|
type ResponseReceiver = oneshot::Receiver<Response>;
|
||||||
|
|
||||||
enum State<B: Block> {
|
enum State<B: Block> {
|
||||||
Idle(stream::Pending<Result<Response, Canceled>>),
|
Idle,
|
||||||
AwaitingResponse(PeerId, NumberFor<B>, stream::Once<ResponseReceiver>),
|
AwaitingResponse(PeerId, NumberFor<B>, ResponseReceiver),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct OnDemandJustificationsEngine<B: Block, R> {
|
pub struct OnDemandJustificationsEngine<B: Block, R> {
|
||||||
@@ -83,7 +80,7 @@ where
|
|||||||
protocol_name,
|
protocol_name,
|
||||||
live_peers,
|
live_peers,
|
||||||
peers_cache: VecDeque::new(),
|
peers_cache: VecDeque::new(),
|
||||||
state: State::Idle(stream::pending()),
|
state: State::Idle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -118,15 +115,14 @@ where
|
|||||||
IfDisconnected::ImmediateError,
|
IfDisconnected::ImmediateError,
|
||||||
);
|
);
|
||||||
|
|
||||||
self.state = State::AwaitingResponse(peer, block, stream::once(rx));
|
self.state = State::AwaitingResponse(peer, block, rx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If no other request is in progress, start new justification request for `block`.
|
/// If no other request is in progress, start new justification request for `block`.
|
||||||
pub fn request(&mut self, block: NumberFor<B>) {
|
pub fn request(&mut self, block: NumberFor<B>) {
|
||||||
// ignore new requests while there's already one pending
|
// ignore new requests while there's already one pending
|
||||||
match &self.state {
|
if matches!(self.state, State::AwaitingResponse(_, _, _)) {
|
||||||
State::AwaitingResponse(_, _, _) => return,
|
return
|
||||||
State::Idle(_) => (),
|
|
||||||
}
|
}
|
||||||
self.reset_peers_cache_for_block(block);
|
self.reset_peers_cache_for_block(block);
|
||||||
|
|
||||||
@@ -148,7 +144,7 @@ where
|
|||||||
"🥩 cancel pending request for justification #{:?}",
|
"🥩 cancel pending request for justification #{:?}",
|
||||||
number
|
number
|
||||||
);
|
);
|
||||||
self.state = State::Idle(stream::pending());
|
self.state = State::Idle;
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -194,19 +190,19 @@ where
|
|||||||
|
|
||||||
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
|
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
|
||||||
let (peer, block, resp) = match &mut self.state {
|
let (peer, block, resp) = match &mut self.state {
|
||||||
State::Idle(pending) => {
|
State::Idle => {
|
||||||
let _ = pending.next().await;
|
futures::pending!();
|
||||||
// This never happens since 'stream::pending' never generates any items.
|
// Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes.
|
||||||
return None
|
return None
|
||||||
},
|
},
|
||||||
State::AwaitingResponse(peer, block, receiver) => {
|
State::AwaitingResponse(peer, block, receiver) => {
|
||||||
let resp = receiver.next().await?;
|
let resp = receiver.await;
|
||||||
(*peer, *block, resp)
|
(*peer, *block, resp)
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
// We received the awaited response. Our 'stream::once()' receiver will never generate any
|
// We received the awaited response. Our 'receiver' will never generate any other response,
|
||||||
// other response, meaning we're done with current state. Move the engine to `State::Idle`.
|
// meaning we're done with current state. Move the engine to `State::Idle`.
|
||||||
self.state = State::Idle(stream::pending());
|
self.state = State::Idle;
|
||||||
|
|
||||||
let block_id = BlockId::number(block);
|
let block_id = BlockId::number(block);
|
||||||
let validator_set = self
|
let validator_set = self
|
||||||
|
|||||||
@@ -153,11 +153,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// BEEFY gadget network parameters.
|
/// BEEFY gadget network parameters.
|
||||||
pub struct BeefyNetworkParams<B, N>
|
pub struct BeefyNetworkParams<B: Block, N> {
|
||||||
where
|
|
||||||
B: Block,
|
|
||||||
N: GossipNetwork<B> + NetworkRequest + SyncOracle + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
/// Network implementing gossip, requests and sync-oracle.
|
/// Network implementing gossip, requests and sync-oracle.
|
||||||
pub network: Arc<N>,
|
pub network: Arc<N>,
|
||||||
/// Chain specific BEEFY gossip protocol name. See
|
/// Chain specific BEEFY gossip protocol name. See
|
||||||
@@ -171,15 +167,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// BEEFY gadget initialization parameters.
|
/// BEEFY gadget initialization parameters.
|
||||||
pub struct BeefyParams<B, BE, C, N, R>
|
pub struct BeefyParams<B: Block, BE, C, N, R> {
|
||||||
where
|
|
||||||
B: Block,
|
|
||||||
BE: Backend<B>,
|
|
||||||
C: Client<B, BE>,
|
|
||||||
R: ProvideRuntimeApi<B>,
|
|
||||||
R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash>,
|
|
||||||
N: GossipNetwork<B> + NetworkRequest + SyncOracle + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
/// BEEFY client
|
/// BEEFY client
|
||||||
pub client: Arc<C>,
|
pub client: Arc<C>,
|
||||||
/// Client Backend
|
/// Client Backend
|
||||||
|
|||||||
Reference in New Issue
Block a user