Parachain multiplexing (#295)

* Handle continuation frames

* Parachain multiplexing MVP

* Better multiplexing

* Rename NodeMultiplex -> ConnMultiplex

* Cleaner `handle_message`
This commit is contained in:
Maciej Hirsz
2020-12-16 21:24:02 +01:00
committed by GitHub
parent 4ef655c5f3
commit 81cd70cf7d
5 changed files with 137 additions and 68 deletions
+8 -12
View File
@@ -6,7 +6,7 @@ use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap; use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState}; use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState};
use crate::types::{NodeDetails, NodeId}; use crate::types::{ConnId, NodeDetails, NodeId};
pub struct Aggregator { pub struct Aggregator {
labels: HashMap<Label, ChainId>, labels: HashMap<Label, ChainId>,
@@ -106,8 +106,11 @@ impl Actor for Aggregator {
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct AddNode { pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails, pub node: NodeDetails,
pub network_id: Option<Label>, /// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>, pub rec: Recipient<Initialize>,
} }
@@ -173,21 +176,14 @@ impl Handler<AddNode> for Aggregator {
type Result = (); type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, network_id, rec } = msg; let AddNode { node, conn_id, rec } = msg;
let cid = self.lazy_chain(&node.chain, &network_id, ctx); let cid = self.lazy_chain(&node.chain, &None, ctx);
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed"); let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
if let Some(network_id) = network_id {
// Attach network id to the chain if it was not done yet
if chain.network_id.is_none() {
chain.network_id = Some(network_id.clone());
self.networks.insert(network_id, cid);
}
}
chain.addr.do_send(chain::AddNode { chain.addr.do_send(chain::AddNode {
node, node,
conn_id,
rec, rec,
}); });
} }
+10 -4
View File
@@ -9,7 +9,7 @@ use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now}; use crate::util::{DenseMap, NumStats, now};
use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber}; use crate::types::{ConnId, NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
@@ -194,7 +194,11 @@ impl Actor for Chain {
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct AddNode { pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails, pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>, pub rec: Recipient<Initialize>,
} }
@@ -248,11 +252,13 @@ impl Handler<AddNode> for Chain {
type Result = (); type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) { fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
self.increment_label_count(&msg.node.chain); let AddNode { node, conn_id, rec } = msg;
self.increment_label_count(&node.chain);
let nid = self.nodes.add(Node::new(msg.node)); let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();
if let Err(_) = msg.rec.do_send(Initialize(nid, ctx.address())) { if let Err(_) = rec.do_send(Initialize { nid, conn_id, chain }) {
self.nodes.remove(nid); self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) { } else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node)); self.serializer.push(feed::AddedNode(nid, node));
+116 -51
View File
@@ -1,35 +1,60 @@
use std::collections::BTreeMap;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::mem;
use bytes::Bytes; use bytes::{Bytes, BytesMut};
use actix::prelude::*; use actix::prelude::*;
use actix_web_actors::ws; use actix_web_actors::ws;
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode}; use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode}; use crate::chain::{Chain, UpdateNode, RemoveNode};
use crate::node::NodeId; use crate::node::NodeId;
use crate::node::message::{NodeMessage, Details, SystemConnected}; use crate::node::message::{NodeMessage, Details, SystemConnected};
use crate::util::LocateRequest; use crate::util::LocateRequest;
use crate::types::ConnId;
/// How often heartbeat pings are sent /// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Continuation buffer limit, 10mb
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
pub struct NodeConnector { pub struct NodeConnector {
/// Id of the node this connector is responsible for handling /// Multiplexing connections by id
nid: NodeId, multiplex: BTreeMap<ConnId, ConnMultiplex>,
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
hb: Instant, hb: Instant,
/// Aggregator actor address /// Aggregator actor address
aggregator: Addr<Aggregator>, aggregator: Addr<Aggregator>,
/// Chain actor address
chain: Option<Addr<Chain>>,
/// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>,
/// IP address of the node this connector is responsible for /// IP address of the node this connector is responsible for
ip: Option<Ipv4Addr>, ip: Option<Ipv4Addr>,
/// Actix address of location services /// Actix address of location services
locator: Recipient<LocateRequest>, locator: Recipient<LocateRequest>,
/// Buffer for constructing continuation messages
contbuf: BytesMut,
}
enum ConnMultiplex {
Connected {
/// Id of the node this multiplex connector is responsible for handling
nid: NodeId,
/// Chain address to which this multiplex connector is delegating messages
chain: Addr<Chain>,
},
Waiting {
/// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>,
}
}
impl Default for ConnMultiplex {
fn default() -> Self {
ConnMultiplex::Waiting {
backlog: Vec::new(),
}
}
} }
impl Actor for NodeConnector { impl Actor for NodeConnector {
@@ -40,8 +65,10 @@ impl Actor for NodeConnector {
} }
fn stopped(&mut self, _: &mut Self::Context) { fn stopped(&mut self, _: &mut Self::Context) {
if let Some(chain) = self.chain.as_ref() { for mx in self.multiplex.values() {
chain.do_send(RemoveNode(self.nid)); if let ConnMultiplex::Connected { chain, nid } = mx {
chain.do_send(RemoveNode(*nid));
}
} }
} }
} }
@@ -49,14 +76,12 @@ impl Actor for NodeConnector {
impl NodeConnector { impl NodeConnector {
pub fn new(aggregator: Addr<Aggregator>, locator: Recipient<LocateRequest>, ip: Option<Ipv4Addr>) -> Self { pub fn new(aggregator: Addr<Aggregator>, locator: Recipient<LocateRequest>, ip: Option<Ipv4Addr>) -> Self {
Self { Self {
// Garbage id, will be replaced by the Initialize message multiplex: BTreeMap::new(),
nid: !0,
hb: Instant::now(), hb: Instant::now(),
aggregator, aggregator,
chain: None,
backlog: Vec::new(),
ip, ip,
locator, locator,
contbuf: BytesMut::new(),
} }
} }
@@ -71,58 +96,88 @@ impl NodeConnector {
} }
fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) { fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) {
if let Some(chain) = self.chain.as_ref() { let conn_id = msg.id.unwrap_or(0);
chain.do_send(UpdateNode {
nid: self.nid,
msg,
raw: Some(data)
});
return; match self.multiplex.entry(conn_id).or_default() {
} ConnMultiplex::Connected { nid, chain } => {
chain.do_send(UpdateNode {
if let Details::SystemConnected(connected) = msg.details { nid: *nid,
let SystemConnected { network_id: _, mut node } = connected; msg,
let rec = ctx.address().recipient(); raw: Some(data),
});
// FIXME: mergin chains by network_id is not the way to do it.
// This will at least force all CC3 nodes to be aggregated with
// the rest.
let network_id = None; // network_id.map(Into::into);
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
_ => (),
} }
ConnMultiplex::Waiting { backlog } => {
if let Details::SystemConnected(connected) = msg.details {
let SystemConnected { network_id: _, mut node } = connected;
let rec = ctx.address().recipient();
self.aggregator.do_send(AddNode { rec, network_id, node }); // FIXME: Use genesis hash instead of names to avoid this mess
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
_ => (),
}
self.aggregator.do_send(AddNode { rec, conn_id, node });
} else {
if backlog.len() >= 10 {
backlog.remove(0);
}
backlog.push(msg);
}
}
}
}
fn start_frame(&mut self, bytes: &[u8]) {
if !self.contbuf.is_empty() {
log::error!("Unused continuation buffer");
self.contbuf.clear();
}
self.continue_frame(bytes);
}
fn continue_frame(&mut self, bytes: &[u8]) {
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
self.contbuf.extend_from_slice(&bytes);
} else { } else {
if self.backlog.len() >= 10 { log::error!("Continuation buffer overflow");
self.backlog.remove(0); self.contbuf = BytesMut::new();
}
self.backlog.push(msg);
} }
} }
fn finish_frame(&mut self) -> Bytes {
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
}
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct Initialize(pub NodeId, pub Addr<Chain>); pub struct Initialize {
pub nid: NodeId,
pub conn_id: ConnId,
pub chain: Addr<Chain>,
}
impl Handler<Initialize> for NodeConnector { impl Handler<Initialize> for NodeConnector {
type Result = (); type Result = ();
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) { fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
let Initialize(nid, chain) = msg; let Initialize { nid, conn_id, chain } = msg;
let backlog = std::mem::replace(&mut self.backlog, Vec::new());
for msg in backlog { let mx = self.multiplex.entry(conn_id).or_default();
chain.do_send(UpdateNode { nid, msg, raw: None });
}
self.nid = nid; if let ConnMultiplex::Waiting { backlog } = mx {
self.chain = Some(chain.clone()); for msg in backlog.drain(..) {
chain.do_send(UpdateNode { nid, msg, raw: None });
}
*mx = ConnMultiplex::Connected {
nid,
chain: chain.clone(),
};
};
// Acquire the node's physical location // Acquire the node's physical location
if let Some(ip) = self.ip { if let Some(ip) = self.ip {
@@ -148,9 +203,19 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
return; return;
} }
Ok(ws::Message::Nop) => return, Ok(ws::Message::Nop) => return,
Ok(ws::Message::Continuation(_)) => { Ok(ws::Message::Continuation(cont)) => match cont {
log::error!("Continuation not supported"); Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
return; self.start_frame(&bytes);
return;
}
Item::Continue(bytes) => {
self.continue_frame(&bytes);
return;
}
Item::Last(bytes) => {
self.continue_frame(&bytes);
self.finish_frame()
}
} }
Err(error) => { Err(error) => {
log::error!("{:?}", error); log::error!("{:?}", error);
+2 -1
View File
@@ -3,12 +3,13 @@ use chrono::{DateTime, Utc};
use serde::Deserialize; use serde::Deserialize;
use serde::de::IgnoredAny; use serde::de::IgnoredAny;
use crate::node::NodeDetails; use crate::node::NodeDetails;
use crate::types::{Block, BlockNumber, BlockHash}; use crate::types::{Block, BlockNumber, BlockHash, ConnId};
#[derive(Deserialize, Debug, Message)] #[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct NodeMessage { pub struct NodeMessage {
pub ts: DateTime<Utc>, pub ts: DateTime<Utc>,
pub id: Option<ConnId>,
#[serde(flatten)] #[serde(flatten)]
pub details: Details, pub details: Details,
} }
+1
View File
@@ -4,6 +4,7 @@ use serde::Deserialize;
use crate::util::{MeanList, now}; use crate::util::{MeanList, now};
pub type NodeId = usize; pub type NodeId = usize;
pub type ConnId = u64;
pub type BlockNumber = u64; pub type BlockNumber = u64;
pub type Timestamp = u64; pub type Timestamp = u64;
pub type Address = Box<str>; pub type Address = Box<str>;