fix(event loop, client): better async

This commit is contained in:
Nova
2023-01-14 12:38:05 -05:00
parent 49810e8fd1
commit 6ca93ea24c
7 changed files with 171 additions and 133 deletions

View File

@@ -1,6 +1,6 @@
use super::{eventloop::EventLoop, scenegraph::Scenegraph}; use super::scenegraph::Scenegraph;
use crate::{ use crate::{
core::registry::Registry, core::registry::OwnedRegistry,
nodes::{ nodes::{
data, drawable, fields, hmd, input, items, data, drawable, fields, hmd, input, items,
root::Root, root::Root,
@@ -9,35 +9,26 @@ use crate::{
Node, Node,
}, },
}; };
use color_eyre::{ use color_eyre::eyre::{eyre, Result};
eyre::{eyre, Result},
Report,
};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use stardust_xr::messenger::{self, MessageSenderHandle}; use stardust_xr::messenger::{self, MessageSenderHandle};
use std::{ use std::{fs, iter::FromIterator, path::PathBuf, sync::Arc};
fs, use tokio::{net::UnixStream, task::JoinHandle};
iter::FromIterator, use tracing::info;
path::PathBuf,
sync::{Arc, Weak},
};
use tokio::{net::UnixStream, sync::Notify, task::JoinHandle};
use tracing::{info, warn};
lazy_static! { lazy_static! {
pub static ref CLIENTS: Registry<Client> = Registry::new(); pub static ref CLIENTS: OwnedRegistry<Client> = OwnedRegistry::new();
pub static ref INTERNAL_CLIENT: Arc<Client> = CLIENTS.add(Client { pub static ref INTERNAL_CLIENT: Arc<Client> = CLIENTS.add(Client {
event_loop: Weak::new(),
index: 0,
pid: None, pid: None,
// env: None, // env: None,
exe: None, exe: None,
stop_notifier: Default::default(), dispatch_join_handle: OnceCell::new(),
join_handle: OnceCell::new(), flush_join_handle: OnceCell::new(),
disconnect_status: OnceCell::new(),
message_sender_handle: None, message_sender_handle: None,
scenegraph: Default::default(), scenegraph: Default::default(),
@@ -61,13 +52,12 @@ pub fn startup_settings(env: &FxHashMap<String, String>) -> Option<StartupSettin
} }
pub struct Client { pub struct Client {
event_loop: Weak<EventLoop>,
index: usize,
pid: Option<i32>, pid: Option<i32>,
// env: Option<FxHashMap<String, String>>, // env: Option<FxHashMap<String, String>>,
exe: Option<PathBuf>, exe: Option<PathBuf>,
stop_notifier: Arc<Notify>, dispatch_join_handle: OnceCell<JoinHandle<Result<()>>>,
join_handle: OnceCell<JoinHandle<Result<()>>>, flush_join_handle: OnceCell<JoinHandle<Result<()>>>,
disconnect_status: OnceCell<Result<()>>,
pub message_sender_handle: Option<MessageSenderHandle>, pub message_sender_handle: Option<MessageSenderHandle>,
pub scenegraph: Arc<Scenegraph>, pub scenegraph: Arc<Scenegraph>,
@@ -76,16 +66,11 @@ pub struct Client {
pub startup_settings: Option<StartupSettings>, pub startup_settings: Option<StartupSettings>,
} }
impl Client { impl Client {
pub fn from_connection( pub fn from_connection(connection: UnixStream) -> Arc<Self> {
index: usize,
event_loop: &Arc<EventLoop>,
connection: UnixStream,
) -> Arc<Self> {
let pid = connection.peer_cred().ok().and_then(|c| c.pid()); let pid = connection.peer_cred().ok().and_then(|c| c.pid());
let env = pid.and_then(|pid| get_env(pid).ok()); let env = pid.and_then(|pid| get_env(pid).ok());
let exe = pid.and_then(|pid| fs::read_link(format!("/proc/{}/exe", pid)).ok()); let exe = pid.and_then(|pid| fs::read_link(format!("/proc/{}/exe", pid)).ok());
info!( info!(
index = index,
pid, pid,
exe = exe exe = exe
.as_ref() .as_ref()
@@ -98,13 +83,13 @@ impl Client {
let startup_settings = env.as_ref().and_then(|env| startup_settings(env)); let startup_settings = env.as_ref().and_then(|env| startup_settings(env));
let client = CLIENTS.add(Client { let client = CLIENTS.add(Client {
event_loop: Arc::downgrade(event_loop),
index,
pid, pid,
// env, // env,
exe, exe: exe.clone(),
stop_notifier: Default::default(),
join_handle: OnceCell::new(), dispatch_join_handle: OnceCell::new(),
flush_join_handle: OnceCell::new(),
disconnect_status: OnceCell::new(),
message_sender_handle: Some(messenger_tx.handle()), message_sender_handle: Some(messenger_tx.handle()),
scenegraph: scenegraph.clone(), scenegraph: scenegraph.clone(),
@@ -123,32 +108,57 @@ impl Client {
input::create_interface(&client); input::create_interface(&client);
startup::create_interface(&client); startup::create_interface(&client);
let _ = client.join_handle.set(tokio::spawn({ let pid_printable = pid
let client = client.clone(); .map(|pid| pid.to_string())
async move { .unwrap_or_else(|| "??".to_string());
let dispatch_loop = async move { let exe_printable = exe
loop { .and_then(|exe| {
messenger_rx.dispatch(&*scenegraph).await? exe.file_name()
.and_then(|exe| exe.to_str())
.map(|exe| exe.to_string())
})
.unwrap_or_else(|| "??".to_string());
let _ = client.dispatch_join_handle.get_or_try_init(|| {
tokio::task::Builder::new()
.name(&format!(
"client dispatch pid={} exe={}",
&pid_printable, &exe_printable,
))
.spawn({
let client = client.clone();
async move {
loop {
match messenger_rx.dispatch(&*scenegraph).await {
Err(e) => {
client.disconnect(Err(e.into()));
}
_ => (),
}
}
} }
}; })
let flush_loop = async { });
loop { let _ = client.flush_join_handle.get_or_try_init(|| {
messenger_tx.flush().await? tokio::task::Builder::new()
.name(&format!(
"client flush pid={} exe={}",
&pid_printable, &exe_printable,
))
.spawn({
let client = client.clone();
async move {
loop {
match messenger_tx.flush().await {
Err(e) => {
client.disconnect(Err(e.into()));
}
_ => (),
}
}
} }
}; })
});
let result: Result<(), Report> = tokio::select! {
_ = client.stop_notifier.notified() => Ok(()),
e = dispatch_loop => e,
e = flush_loop => e,
};
if let Err(e) = &result {
warn!(error = e.root_cause(), "Client disconnected with error!");
}
client.disconnect().await;
result
}
}));
client client
} }
@@ -159,24 +169,30 @@ impl Client {
.ok_or_else(|| eyre!("{} not found", name)) .ok_or_else(|| eyre!("{} not found", name))
} }
pub async fn disconnect(&self) { pub fn disconnect(&self, reason: Result<()>) {
self.stop_notifier.notify_one(); let _ = self.disconnect_status.set(reason);
if let Some(event_loop) = self.event_loop.upgrade() { if let Some(dispatch_join_handle) = self.dispatch_join_handle.get() {
event_loop.clients.lock().await.remove(self.index); dispatch_join_handle.abort();
} }
if let Some(flush_join_handle) = self.flush_join_handle.get() {
flush_join_handle.abort();
}
CLIENTS.remove(self);
} }
} }
impl Drop for Client { impl Drop for Client {
fn drop(&mut self) { fn drop(&mut self) {
self.stop_notifier.notify_one();
CLIENTS.remove(self);
info!( info!(
index = self.index,
pid = self.pid, pid = self.pid,
exe = self exe = self
.exe .exe
.as_ref() .as_ref()
.and_then(|exe| exe.to_str().map(|s| s.to_string())), .and_then(|exe| exe.to_str().map(|s| s.to_string())),
disconnect_status = match self.disconnect_status.take() {
Some(Ok(_)) => "Graceful disconnect".to_string(),
Some(Err(e)) => format!("Error: {}", e.root_cause()),
None => "Unknown".to_string(),
},
"Client disconnected" "Client disconnected"
); );
} }

View File

@@ -1,62 +1,49 @@
use super::client::Client; use super::client::Client;
use color_eyre::eyre::Result; use color_eyre::eyre::Result;
use slab::Slab; use once_cell::sync::OnceCell;
use stardust_xr::server; use stardust_xr::server;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
pub static FRAME: AtomicU64 = AtomicU64::new(0); pub static FRAME: AtomicU64 = AtomicU64::new(0);
pub struct EventLoop { pub struct EventLoop {
pub socket_path: PathBuf, pub socket_path: PathBuf,
stop_notifier: Arc<Notify>, join_handle: OnceCell<JoinHandle<()>>,
pub clients: Mutex<Slab<Arc<Client>>>,
} }
impl EventLoop { impl EventLoop {
pub fn new() -> Result<(Arc<Self>, JoinHandle<Result<()>>)> { pub fn new() -> Result<Arc<Self>> {
let socket_path = server::get_free_socket_path() let socket_path = server::get_free_socket_path()
.ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?; .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?;
let socket = UnixListener::bind(socket_path.clone())?; let socket = UnixListener::bind(socket_path.clone())?;
let event_loop = Arc::new(EventLoop { let event_loop = Arc::new(EventLoop {
socket_path, socket_path,
stop_notifier: Default::default(), join_handle: OnceCell::new(),
clients: Mutex::new(Slab::new()),
}); });
let event_loop_join_handle = tokio::spawn({ let join_handle = tokio::task::Builder::new()
let event_loop = event_loop.clone(); .name("event loop")
async move { EventLoop::event_loop(socket, event_loop).await } .spawn(async move {
}); loop {
let Ok((socket, _)) = socket.accept().await else { continue };
Client::from_connection(socket);
}
})?;
let _ = event_loop.join_handle.set(join_handle);
Ok((event_loop, event_loop_join_handle)) Ok(event_loop)
}
async fn event_loop(socket: UnixListener, event_loop: Arc<EventLoop>) -> Result<()> {
let event_loop_async = async {
loop {
let (socket, _) = socket.accept().await?;
let mut clients = event_loop.clients.lock().await;
let vacant_client = clients.vacant_entry();
let idx = vacant_client.key();
vacant_client.insert(Client::from_connection(idx, &event_loop, socket));
}
};
tokio::select! {
_ = event_loop.stop_notifier.notified() => Ok(()),
e = event_loop_async => e,
}
} }
} }
impl Drop for EventLoop { impl Drop for EventLoop {
fn drop(&mut self) { fn drop(&mut self) {
self.stop_notifier.notify_one(); if let Some(join_handle) = self.join_handle.take() {
join_handle.abort();
}
} }
} }

View File

@@ -42,3 +42,35 @@ impl<T: Send + Sync + ?Sized> Registry<T> {
self.0.lock().clear(); self.0.lock().clear();
} }
} }
pub struct OwnedRegistry<T: Send + Sync + ?Sized>(Lazy<Mutex<FxHashMap<usize, Arc<T>>>>);
impl<T: Send + Sync + ?Sized> OwnedRegistry<T> {
pub const fn new() -> Self {
OwnedRegistry(Lazy::new(|| Mutex::new(FxHashMap::default())))
}
pub fn add(&self, t: T) -> Arc<T>
where
T: Sized,
{
let t_arc = Arc::new(t);
self.add_raw(t_arc.clone());
t_arc
}
pub fn add_raw(&self, t: Arc<T>) {
self.0
.lock()
.insert(Arc::as_ptr(&t) as *const () as usize, t);
}
pub fn get_vec(&self) -> Vec<Arc<T>> {
self.0.lock().values().cloned().collect::<Vec<_>>()
}
pub fn remove(&self, t: &T) {
self.0
.lock()
.remove(&(ptr::addr_of!(*t) as *const () as usize));
}
pub fn clear(&self) {
self.0.lock().clear();
}
}

View File

@@ -42,9 +42,8 @@ struct CliArgs {
} }
fn main() -> Result<()> { fn main() -> Result<()> {
if !cfg!(feature = "profile") { #[cfg(not(feature = "profile"))]
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
}
#[cfg(feature = "profile")] #[cfg(feature = "profile")]
console_subscriber::init(); console_subscriber::init();
let project_dirs = ProjectDirs::from("", "", "stardust").unwrap(); let project_dirs = ProjectDirs::from("", "", "stardust").unwrap();
@@ -163,8 +162,7 @@ async fn event_loop(
let _ = handle_sender.send(Handle::current()); let _ = handle_sender.send(Handle::current());
// console_subscriber::init(); // console_subscriber::init();
let (event_loop, event_loop_join_handle) = let event_loop = EventLoop::new().expect("Couldn't create server socket");
EventLoop::new().expect("Couldn't create server socket");
info!("Init event loop"); info!("Init event loop");
info!( info!(
"Stardust socket created at {}", "Stardust socket created at {}",
@@ -175,7 +173,6 @@ async fn event_loop(
biased; biased;
_ = tokio::signal::ctrl_c() => Ok(()), _ = tokio::signal::ctrl_c() => Ok(()),
_ = stop_rx => Ok(()), _ = stop_rx => Ok(()),
e = event_loop_join_handle => e?,
}; };
info!("Cleanly shut down event loop"); info!("Cleanly shut down event loop");

View File

@@ -192,23 +192,25 @@ impl InputHandler {
let handler = Arc::downgrade(&distance_link.handler); let handler = Arc::downgrade(&distance_link.handler);
if let Ok(data) = node.execute_remote_method("input", data) { if let Ok(data) = node.execute_remote_method("input", data) {
tokio::spawn(async move { let _ = tokio::task::Builder::new()
if let Ok(data) = data.await { .name("input capture")
if frame == FRAME.load(Ordering::Relaxed) { .spawn(async move {
let capture = flexbuffers::Reader::get_root(data.as_slice()) if let Ok(data) = data.await {
.and_then(|data| data.get_bool()) if frame == FRAME.load(Ordering::Relaxed) {
.unwrap_or(false); let capture = flexbuffers::Reader::get_root(data.as_slice())
.and_then(|data| data.get_bool())
.unwrap_or(false);
if let Some(method) = method.upgrade() { if let Some(method) = method.upgrade() {
if let Some(handler) = handler.upgrade() { if let Some(handler) = handler.upgrade() {
if capture { if capture {
method.captures.add_raw(&handler); method.captures.add_raw(&handler);
}
} }
} }
} }
} }
} });
});
} }
} }
} }

View File

@@ -123,29 +123,31 @@ impl Wayland {
let dh1 = display.lock().handle(); let dh1 = display.lock().handle();
let mut dh2 = dh1.clone(); let mut dh2 = dh1.clone();
Ok(tokio::task::spawn(async move { Ok(tokio::task::Builder::new()
let _socket = socket; // Keep the socket alive .name("wayland loop")
loop { .spawn(async move {
tokio::select! { let _socket = socket; // Keep the socket alive
e = global_destroy_queue.recv() => { // New global to destroy loop {
dh1.remove_global::<WaylandState>(e.unwrap()); tokio::select! {
} e = global_destroy_queue.recv() => { // New global to destroy
acc = listen_async.accept() => { // New client connected dh1.remove_global::<WaylandState>(e.unwrap());
let (stream, _) = acc?; }
let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?; acc = listen_async.accept() => { // New client connected
let (stream, _) = acc?;
let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?;
state.lock().new_client(client.id(), &dh2); state.lock().new_client(client.id(), &dh2);
} }
e = dispatch_poll_listener.readable() => { // Dispatch e = dispatch_poll_listener.readable() => { // Dispatch
let mut guard = e?; let mut guard = e?;
let mut display = display.lock(); let mut display = display.lock();
display.dispatch_clients(&mut *state.lock())?; display.dispatch_clients(&mut *state.lock())?;
display.flush_clients()?; display.flush_clients()?;
guard.clear_ready(); guard.clear_ready();
}
} }
} }
} })?)
}))
} }
pub fn frame(&mut self, sk: &StereoKitDraw) { pub fn frame(&mut self, sk: &StereoKitDraw) {

View File

@@ -148,7 +148,9 @@ impl SeatDataInner {
impl Drop for SeatDataInner { impl Drop for SeatDataInner {
fn drop(&mut self) { fn drop(&mut self) {
let id = self.global_id.take().unwrap(); let id = self.global_id.take().unwrap();
tokio::spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await }); let _ = tokio::task::Builder::new()
.name("global destroy queue garbage collection")
.spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await });
} }
} }