mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 16:07:57 +00:00
grandpa: consistent argument ordering in rebroadcast vs. network (#1647)
* grandpa: consistent argument ordering in rebroadcast vs. network * use a round type
This commit is contained in:
committed by
Gav Wood
parent
1ea56905cf
commit
2155e44e13
@@ -33,24 +33,29 @@ fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8>
|
||||
(message, round, set_id).encode()
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
|
||||
struct Round(u64);
|
||||
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
|
||||
struct SetId(u64);
|
||||
|
||||
enum Broadcast<Block: BlockT> {
|
||||
// set_id, round, encoded commit.
|
||||
Commit(u64, u64, Vec<u8>),
|
||||
// set_id, round, encoded signed message.
|
||||
Message(u64, u64, Vec<u8>),
|
||||
// set_id, round, announcement of block hash that should be downloaded
|
||||
Announcement(u64, u64, Block::Hash),
|
||||
// set_id, round being dropped.
|
||||
DropRound(u64, u64),
|
||||
// round, set id, encoded commit.
|
||||
Commit(Round, SetId, Vec<u8>),
|
||||
// round, set id, encoded signed message.
|
||||
Message(Round, SetId, Vec<u8>),
|
||||
// round, set id, announcement of block hash that should be downloaded
|
||||
Announcement(Round, SetId, Block::Hash),
|
||||
// round, set id being dropped.
|
||||
DropRound(Round, SetId),
|
||||
}
|
||||
|
||||
impl<Block: BlockT> Broadcast<Block> {
|
||||
fn set_id(&self) -> u64 {
|
||||
fn set_id(&self) -> SetId {
|
||||
match *self {
|
||||
Broadcast::Commit(s, _, _) => s,
|
||||
Broadcast::Message(s, _, _) => s,
|
||||
Broadcast::Announcement(s, _, _) => s,
|
||||
Broadcast::DropRound(s, _) => s,
|
||||
Broadcast::Commit(_, s, _) => s,
|
||||
Broadcast::Message(_, s, _) => s,
|
||||
Broadcast::Announcement(_, s, _) => s,
|
||||
Broadcast::DropRound(_, s) => s,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -66,9 +71,9 @@ pub(crate) fn rebroadcasting_network<B: BlockT, N: Network<B>>(network: N) -> (B
|
||||
(
|
||||
BroadcastWorker {
|
||||
interval: Interval::new_interval(REBROADCAST_PERIOD),
|
||||
set_id: 0, // will be overwritten on first item to broadcast.
|
||||
set_id: SetId(0), // will be overwritten on first item to broadcast.
|
||||
last_commit: None,
|
||||
round_messages: (0, Vec::new()),
|
||||
round_messages: (Round(0), Vec::new()),
|
||||
announcements: HashMap::new(),
|
||||
network: network.clone(),
|
||||
incoming_broadcast: rx,
|
||||
@@ -85,10 +90,10 @@ pub(crate) fn rebroadcasting_network<B: BlockT, N: Network<B>>(network: N) -> (B
|
||||
#[must_use = "network rebroadcast future must be driven to completion"]
|
||||
pub(crate) struct BroadcastWorker<B: BlockT, N: Network<B>> {
|
||||
interval: Interval,
|
||||
set_id: u64,
|
||||
last_commit: Option<(u64, Vec<u8>)>,
|
||||
round_messages: (u64, Vec<Vec<u8>>),
|
||||
announcements: HashMap<B::Hash, u64>,
|
||||
set_id: SetId,
|
||||
last_commit: Option<(Round, Vec<u8>)>,
|
||||
round_messages: (Round, Vec<Vec<u8>>),
|
||||
announcements: HashMap<B::Hash, Round>,
|
||||
network: N,
|
||||
incoming_broadcast: mpsc::UnboundedReceiver<Broadcast<B>>,
|
||||
}
|
||||
@@ -115,17 +120,18 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> {
|
||||
}
|
||||
|
||||
if rebroadcast {
|
||||
if let Some((c_round, ref c_commit)) = self.last_commit {
|
||||
self.network.send_commit(c_round, self.set_id, c_commit.clone());
|
||||
let SetId(set_id) = self.set_id;
|
||||
if let Some((Round(c_round), ref c_commit)) = self.last_commit {
|
||||
self.network.send_commit(c_round, set_id, c_commit.clone());
|
||||
}
|
||||
|
||||
let round = self.round_messages.0;
|
||||
let Round(round) = self.round_messages.0;
|
||||
for message in self.round_messages.1.iter().cloned() {
|
||||
self.network.send_message(round, self.set_id, message);
|
||||
self.network.send_message(round, set_id, message);
|
||||
}
|
||||
|
||||
for (&announce_hash, &round) in &self.announcements {
|
||||
self.network.announce(round, self.set_id, announce_hash);
|
||||
for (&announce_hash, &Round(round)) in &self.announcements {
|
||||
self.network.announce(round, set_id, announce_hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -139,22 +145,24 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> {
|
||||
if item.set_id() > self.set_id {
|
||||
self.set_id = item.set_id();
|
||||
self.last_commit = None;
|
||||
self.round_messages = (0, Vec::new());
|
||||
self.round_messages = (Round(0), Vec::new());
|
||||
self.announcements.clear();
|
||||
}
|
||||
|
||||
match item {
|
||||
Broadcast::Commit(set_id, round, commit) => {
|
||||
Broadcast::Commit(round, set_id, commit) => {
|
||||
if self.set_id == set_id {
|
||||
if round >= self.last_commit.as_ref().map_or(0, |&(r, _)| r) {
|
||||
if round >= self.last_commit.as_ref()
|
||||
.map_or(Round(0), |&(r, _)| r)
|
||||
{
|
||||
self.last_commit = Some((round, commit.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
// always send out to network.
|
||||
self.network.send_commit(round, self.set_id, commit);
|
||||
self.network.send_commit(round.0, self.set_id.0, commit);
|
||||
}
|
||||
Broadcast::Message(set_id, round, message) => {
|
||||
Broadcast::Message(round, set_id, message) => {
|
||||
if self.set_id == set_id {
|
||||
if round > self.round_messages.0 {
|
||||
self.round_messages = (round, vec![message.clone()]);
|
||||
@@ -166,20 +174,20 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> {
|
||||
}
|
||||
|
||||
// always send out to network.
|
||||
self.network.send_message(round, set_id, message);
|
||||
self.network.send_message(round.0, set_id.0, message);
|
||||
}
|
||||
Broadcast::Announcement(set_id, round, hash) => {
|
||||
Broadcast::Announcement(round, set_id, hash) => {
|
||||
if self.set_id == set_id {
|
||||
self.announcements.insert(hash, round);
|
||||
}
|
||||
|
||||
// always send out.
|
||||
self.network.announce(round, set_id, hash);
|
||||
self.network.announce(round.0, set_id.0, hash);
|
||||
}
|
||||
Broadcast::DropRound(set_id, round) => {
|
||||
Broadcast::DropRound(round, set_id) => {
|
||||
// stop making announcements for any dead rounds.
|
||||
self.announcements.retain(|_, &mut r| r > round);
|
||||
self.network.drop_messages(round, set_id);
|
||||
self.network.drop_messages(round.0, set_id.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -196,11 +204,11 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> {
|
||||
}
|
||||
|
||||
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
|
||||
let _ = self.relay.unbounded_send(Broadcast::Message(set_id, round, message));
|
||||
let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message));
|
||||
}
|
||||
|
||||
fn drop_messages(&self, round: u64, set_id: u64) {
|
||||
let _ = self.relay.unbounded_send(Broadcast::DropRound(set_id, round));
|
||||
let _ = self.relay.unbounded_send(Broadcast::DropRound(Round(round), SetId(set_id)));
|
||||
}
|
||||
|
||||
fn commit_messages(&self, set_id: u64) -> Self::In {
|
||||
@@ -208,11 +216,13 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> {
|
||||
}
|
||||
|
||||
fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>) {
|
||||
let _ = self.relay.unbounded_send(Broadcast::Commit(round, set_id, message));
|
||||
let _ = self.relay.unbounded_send(Broadcast::Commit(Round(round), SetId(set_id), message));
|
||||
}
|
||||
|
||||
fn announce(&self, round: u64, set_id: u64, block: B::Hash) {
|
||||
let _ = self.relay.unbounded_send(Broadcast::Announcement(round, set_id, block));
|
||||
let _ = self.relay.unbounded_send(
|
||||
Broadcast::Announcement(Round(round), SetId(set_id), block)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user