Upgrade to libp2p 0.45.1 (#11682)

* Upgrade to libp2p 0.45.1

* Limit max_negotiating_inbound_streams to 512

* Upgrade prost-build to 0.10

* Set max_negotiating_inbound_streams to 2048

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* Fix authority discovery protobuf

* Fix comments in authority-discovery schema

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* Add a comment about transport initialization

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Roman
2022-06-17 17:47:14 +04:00
committed by GitHub
parent 0108d216d2
commit 1988c4ac1d
17 changed files with 277 additions and 158 deletions
+27 -9
View File
@@ -42,7 +42,10 @@ use log::{error, warn};
use parking_lot::Mutex;
use serde::Serialize;
use std::{
collections::HashMap,
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
sync::{atomic, Arc},
};
@@ -147,7 +150,6 @@ pub struct TelemetryWorker {
register_receiver: mpsc::UnboundedReceiver<Register>,
register_sender: mpsc::UnboundedSender<Register>,
id_counter: Arc<atomic::AtomicU64>,
transport: WsTrans,
}
impl TelemetryWorker {
@@ -155,7 +157,11 @@ impl TelemetryWorker {
///
/// Only one is needed per process.
pub fn new(buffer_size: usize) -> Result<Self> {
let transport = initialize_transport()?;
// Let's try to initialize a transport to get an early return.
// Later transport will be initialized multiple times in
// `::process_register`, so it's a convenient way to get an
// error as early as possible.
let _transport = initialize_transport()?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();
@@ -165,7 +171,6 @@ impl TelemetryWorker {
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
transport,
})
}
@@ -200,7 +205,6 @@ impl TelemetryWorker {
&mut node_pool,
&mut node_map,
&mut pending_connection_notifications,
self.transport.clone(),
).await,
}
}
@@ -211,7 +215,6 @@ impl TelemetryWorker {
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
transport: WsTrans,
) {
let input = input.expect("the stream is never closed; qed");
@@ -248,9 +251,24 @@ impl TelemetryWorker {
);
node_map.entry(id).or_default().push((verbosity, addr.clone()));
let node = node_pool.entry(addr.clone()).or_insert_with(|| {
Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
});
let node = match node_pool.entry(addr.clone()) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let transport = initialize_transport();
let transport = match transport {
Ok(t) => t,
Err(err) => {
log::error!(
target: "telemetry",
"Could not initialise transport: {}",
err,
);
continue
},
};
entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
},
};
node.connection_messages.extend(connection_message.clone());
+13 -11
View File
@@ -110,7 +110,6 @@ impl<TTrans: Transport> Node<TTrans> {
impl<TTrans: Transport, TSinkErr> Node<TTrans>
where
TTrans: Clone + Unpin,
TTrans::Dial: Unpin,
TTrans::Output:
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
@@ -137,7 +136,7 @@ pub(crate) enum Infallible {}
impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
where
TTrans: Clone + Unpin,
TTrans: Unpin,
TTrans::Dial: Unpin,
TTrans::Output:
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
@@ -228,15 +227,18 @@ where
socket = NodeSocket::wait_reconnect();
},
},
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
Ok(d) => {
log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
socket = NodeSocket::Dialing(d);
},
Err(err) => {
log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
socket = NodeSocket::wait_reconnect();
},
NodeSocket::ReconnectNow => {
let addr = self.addr.clone();
match self.transport.dial(addr) {
Ok(d) => {
log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
socket = NodeSocket::Dialing(d);
},
Err(err) => {
log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
socket = NodeSocket::wait_reconnect();
},
}
},
NodeSocket::WaitingReconnect(mut s) => {
if Future::poll(Pin::new(&mut s), cx).is_ready() {