add ActiveLeavesUpdate, remove StartWork, StopWork (#1458)

* add ActiveLeavesUpdate, remove StartWork, StopWork

* replace StartWork, StopWork in subsystem crate tests

* mechanically update OverseerSignal in other modules

* convert overseer to take advantage of new multi-hash update abilities

Note: this does not yet convert the tests; some of the tests now freeze:

test tests::overseer_start_stop_works ... test tests::overseer_start_stop_works has been running for over 60 seconds
test tests::overseer_finalize_works ... test tests::overseer_finalize_works has been running for over 60 seconds

* fix broken overseer tests

* manually impl PartialEq for ActiveLeavesUpdate, rm trait Equivalent

This cleans up the code a bit and makes it easier in the future to
do the right thing when comparing ALUs.

* use target in all network bridge logging

* reduce spamming of  and
This commit is contained in:
Peter Goodspeed-Niklaus
2020-07-27 10:39:52 +02:00
committed by GitHub
parent 1cb92aa83e
commit 106bd929ce
11 changed files with 189 additions and 140 deletions
@@ -28,7 +28,7 @@ use node_primitives::{ProtocolId, View};
use log::{trace, warn};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use sc_network::ReputationChange;
@@ -157,24 +157,27 @@ impl BitfieldDistribution {
warn!(target: "bitd", "Failed to handle incomming network messages: {:?}", e);
}
}
FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => {
trace!(target: "bitd", "Start {:?}", relay_parent);
// query basic system parameters once
let (validator_set, signing_context) =
query_basics(&mut ctx, relay_parent).await?;
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
for relay_parent in activated {
trace!(target: "bitd", "Start {:?}", relay_parent);
// query basic system parameters once
let (validator_set, signing_context) =
query_basics(&mut ctx, relay_parent).await?;
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
trace!(target: "bitd", "Stop {:?}", relay_parent);
// defer the cleanup to the view change
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
}
for relay_parent in deactivated {
trace!(target: "bitd", "Stop {:?}", relay_parent);
// defer the cleanup to the view change
}
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
trace!(target: "bitd", "Conclude");
+2 -1
View File
@@ -17,6 +17,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
parking_lot = "0.10.0"
assert_matches = "1.3.0"
parking_lot = "0.10.0"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+28 -42
View File
@@ -28,7 +28,7 @@ use sc_network::{
use sp_runtime::ConsensusEngineId;
use polkadot_subsystem::{
FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages};
@@ -57,6 +57,9 @@ const UNKNOWN_PROTO_COST: ReputationChange
const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");
// network bridge log target
const TARGET: &'static str = "network_bridge";
/// Messages received on the network.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage {
@@ -203,8 +206,7 @@ enum Action {
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
ReportPeer(PeerId, ReputationChange),
StartWork(Hash),
StopWork(Hash),
ActiveLeaves(ActiveLeavesUpdate),
PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
@@ -217,10 +219,8 @@ fn action_from_overseer_message(
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
) -> Action {
match res {
Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))
=> Action::StartWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))
=> Action::StopWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves)))
=> Action::ActiveLeaves(active_leaves),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer)
@@ -230,7 +230,7 @@ fn action_from_overseer_message(
=> Action::SendMessage(peers, protocol, message),
},
Err(e) => {
log::warn!("Shutting down Network Bridge due to error {:?}", e);
log::warn!(target: TARGET, "Shutting down Network Bridge due to error {:?}", e);
Action::Abort
}
}
@@ -239,7 +239,7 @@ fn action_from_overseer_message(
fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
match event {
None => {
log::info!("Shutting down Network Bridge: underlying event stream concluded");
log::info!(target: TARGET, "Shutting down Network Bridge: underlying event stream concluded");
Some(Action::Abort)
}
Some(NetworkEvent::Dht(_)) => None,
@@ -392,37 +392,23 @@ async fn run_network<N: Network>(
Action::ReportPeer(peer, rep) => {
net.report_peer(peer, rep).await?;
}
Action::StartWork(relay_parent) => {
live_heads.push(relay_parent);
if let Some(view_update)
= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
{
if let Err(e) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
Action::StopWork(relay_parent) => {
live_heads.retain(|h| h != &relay_parent);
if let Some(view_update)
= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
{
if let Err(e) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h));
if let Some(view_update)
= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
{
if let Err(e) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
}
Action::PeerConnected(peer, role) => {
match peers.entry(peer.clone()) {
HEntry::Occupied(_) => continue,
@@ -450,7 +436,7 @@ async fn run_network<N: Network>(
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
}
@@ -510,7 +496,7 @@ async fn run_network<N: Network>(
let send_messages = ctx.send_messages(outgoing_messages);
if let Err(e) = send_messages.await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
},
@@ -670,7 +656,7 @@ mod tests {
let hash_a = Hash::from([1; 32]);
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await;
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))).await;
let actions = network_handle.next_network_actions(2).await;
let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode();
@@ -21,7 +21,7 @@
use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
use polkadot_subsystem::{
OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem,
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem,
};
use polkadot_subsystem::messages::{
PoVDistributionMessage, NetworkBridgeEvent, ReputationChange as Rep, PeerId,
@@ -107,23 +107,24 @@ async fn handle_signal(
) -> SubsystemResult<bool> {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::StartWork(relay_parent) => {
let (vals_tx, vals_rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(vals_tx),
))).await?;
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
for relay_parent in activated {
let (vals_tx, vals_rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(vals_tx),
))).await?;
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: vals_rx.await?.len(),
});
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: vals_rx.await?.len(),
});
}
Ok(false)
}
OverseerSignal::StopWork(relay_parent) => {
state.relay_parent_state.remove(&relay_parent);
for relay_parent in deactivated {
state.relay_parent_state.remove(&relay_parent);
}
Ok(false)
}
@@ -21,7 +21,7 @@
use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
FromOverseer, OverseerSignal,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage,
@@ -840,30 +840,29 @@ async fn run(
loop {
let message = ctx.recv().await?;
match message {
FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => {
let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel();
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
for relay_parent in activated {
let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel();
let val_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)),
);
let session_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)),
);
let val_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)),
);
let session_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)),
);
ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message))
).await?;
ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message))
).await?;
(val_rx.await?, session_rx.await?.session_index)
};
(val_rx.await?, session_rx.await?.session_index)
};
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index));
}
FromOverseer::Signal(OverseerSignal::StopWork(_relay_parent)) => {
// do nothing - we will handle this when our view changes.
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index));
}
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {