Replace the telemetry Mutex with a channel (#3266)

* Replace the telemetry Mutex with a channel

* Don't return Err() if channel is full

* Change polling pattern

* Add more issue numbers
This commit is contained in:
Pierre Krieger
2019-08-01 09:43:37 +02:00
committed by Gavin Wood
parent 0aa3bbf202
commit f8db199b97
3 changed files with 100 additions and 37 deletions
+69 -37
View File
@@ -58,12 +58,12 @@
//! ```
//!
use futures::{prelude::*, task::AtomicWaker};
use futures::{prelude::*, channel::mpsc};
use libp2p::{Multiaddr, wasm_ext};
use log::warn;
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::{pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, time::{Duration, Instant}};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::{Duration, Instant}};
pub use slog_scope::with_logger;
pub use slog;
@@ -112,31 +112,32 @@ pub const CONSENSUS_INFO: &str = "1";
/// Telemetry object. Implements `Future` and must be polled regularly.
/// Contains an `Arc` and can be cloned and pass around. Only one clone needs to be polled
/// regularly.
/// regularly and should be polled regularly.
/// Dropping all the clones unregisters the telemetry.
#[derive(Clone)]
pub struct Telemetry {
inner: Arc<TelemetryInner>,
inner: Arc<Mutex<TelemetryInner>>,
/// Slog guard so that we don't get deregistered.
_guard: Arc<slog_scope::GlobalLoggerGuard>,
}
// Implementation notes: considering that logging can happen at any moment, we only have two
// options: locking a mutex (which we currently do), or using a channel (which we should do).
// At the moment, `slog` doesn't provide any easy way to serialize records in order to send them
// over a channel, but ideally that's what should be done.
/// Shared between `Telemetry` and `TelemetryDrain`.
/// Behind the `Mutex` in `Telemetry`.
///
/// Note that ideally we wouldn't have to make the `Telemetry` clonable, as that would remove the
/// need for a `Mutex`. However there is currently a weird hack in place in `substrate-service`
/// where we extract the telemetry registration so that it continues running during the shutdown
/// process.
struct TelemetryInner {
/// Worker for the telemetry.
worker: Mutex<worker::TelemetryWorker>,
/// Waker to wake up when we add a log entry to the worker.
polling_waker: AtomicWaker,
worker: worker::TelemetryWorker,
/// Receives log entries for them to be dispatched to the worker.
receiver: mpsc::Receiver<slog_async::AsyncRecord>,
}
/// Implements `slog::Drain`.
struct TelemetryDrain {
inner: std::panic::AssertUnwindSafe<Weak<TelemetryInner>>,
/// Sends log entries.
sender: std::panic::AssertUnwindSafe<mpsc::Sender<slog_async::AsyncRecord>>,
}
/// Initializes the telemetry. See the crate root documentation for more information.
@@ -153,19 +154,18 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
}
}
let inner = Arc::new(TelemetryInner {
worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)),
polling_waker: AtomicWaker::new(),
});
let (sender, receiver) = mpsc::channel(16);
let guard = {
let logger = TelemetryDrain { inner: std::panic::AssertUnwindSafe(Arc::downgrade(&inner)) };
let logger = TelemetryDrain { sender: std::panic::AssertUnwindSafe(sender) };
let root = slog::Logger::root(slog::Drain::fuse(logger), slog::o!());
slog_scope::set_global_logger(root)
};
Telemetry {
inner,
inner: Arc::new(Mutex::new(TelemetryInner {
worker: worker::TelemetryWorker::new(endpoints, config.wasm_external_transport),
receiver,
})),
_guard: Arc::new(guard),
}
}
@@ -184,12 +184,42 @@ impl Stream for Telemetry {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let before = Instant::now();
// Because the `Telemetry` is clonable, we need to put the actual fields behind a `Mutex`.
// However, the user is only ever supposed to poll from one instance of `Telemetry`, while
// the other instances are used only for RAII purposes.
// We assume that the user is following this advice and therefore that the `Mutex` is only
// ever locked once at a time.
let mut inner = match self.inner.try_lock() {
Some(l) => l,
None => {
warn!(
target: "telemetry",
"The telemetry seems to be polled multiple times simultaneously"
);
// Returning `Pending` here means that we may never get polled again, but this is
// ok because we're in a situation where something else is actually currently doing
// the polling.
return Poll::Pending;
}
};
let mut has_connected = false;
while let Poll::Ready(event) = self.inner.worker.lock().poll(cx) {
// Right now we only have one possible event. This line is here in order to not
// forget to handle any possible new event type.
let worker::TelemetryWorkerEvent::Connected = event;
has_connected = true;
// The polling pattern is: poll the worker so that it processes its queue, then add one
// message from the receiver (if possible), then poll the worker again, and so on.
loop {
while let Poll::Ready(event) = inner.worker.poll(cx) {
// Right now we only have one possible event. This line is here in order to not
// forget to handle any possible new event type.
let worker::TelemetryWorkerEvent::Connected = event;
has_connected = true;
}
if let Poll::Ready(Some(log_entry)) = Stream::poll_next(Pin::new(&mut inner.receiver), cx) {
log_entry.as_record_values(|rec, val| { let _ = inner.worker.log(rec, val); });
} else {
break;
}
}
if before.elapsed() > Duration::from_millis(200) {
@@ -199,7 +229,6 @@ impl Stream for Telemetry {
if has_connected {
Poll::Ready(Some(TelemetryEvent::Connected))
} else {
self.inner.polling_waker.register(cx.waker());
Poll::Pending
}
}
@@ -210,17 +239,20 @@ impl slog::Drain for TelemetryDrain {
type Err = ();
fn log(&self, record: &slog::Record, values: &slog::OwnedKVList) -> Result<Self::Ok, Self::Err> {
if let Some(inner) = self.inner.0.upgrade() {
let before = Instant::now();
let result = inner.worker.lock().log(record, values);
inner.polling_waker.wake();
if before.elapsed() > Duration::from_millis(50) {
warn!(target: "telemetry", "Writing a telemetry log took more than 50ms");
}
result
} else {
Ok(())
let before = Instant::now();
let serialized = slog_async::AsyncRecord::from(record, values);
// Note: interestingly, `try_send` requires a `&mut` because it modifies some internal value, while `clone()`
// is lock-free.
if let Err(err) = self.sender.clone().try_send(serialized) {
warn!(target: "telemetry", "Ignored telemetry message because of error on channel: {:?}", err);
}
if before.elapsed() > Duration::from_millis(50) {
warn!(target: "telemetry", "Writing a telemetry log took more than 50ms");
}
Ok(())
}
}