From 6ca93ea24c32127436444bd43284e3c55b1d1cbb Mon Sep 17 00:00:00 2001 From: Nova Date: Sat, 14 Jan 2023 12:38:05 -0500 Subject: [PATCH] fix(event loop, client): better async --- src/core/client.rs | 144 +++++++++++++++++++++++------------------ src/core/eventloop.rs | 47 +++++--------- src/core/registry.rs | 32 +++++++++ src/main.rs | 9 +-- src/nodes/input/mod.rs | 26 ++++---- src/wayland/mod.rs | 42 ++++++------ src/wayland/seat.rs | 4 +- 7 files changed, 171 insertions(+), 133 deletions(-) diff --git a/src/core/client.rs b/src/core/client.rs index 20b156b..df4f64d 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -1,6 +1,6 @@ -use super::{eventloop::EventLoop, scenegraph::Scenegraph}; +use super::scenegraph::Scenegraph; use crate::{ - core::registry::Registry, + core::registry::OwnedRegistry, nodes::{ data, drawable, fields, hmd, input, items, root::Root, @@ -9,35 +9,26 @@ use crate::{ Node, }, }; -use color_eyre::{ - eyre::{eyre, Result}, - Report, -}; +use color_eyre::eyre::{eyre, Result}; use lazy_static::lazy_static; use once_cell::sync::OnceCell; use parking_lot::Mutex; use rustc_hash::FxHashMap; use stardust_xr::messenger::{self, MessageSenderHandle}; -use std::{ - fs, - iter::FromIterator, - path::PathBuf, - sync::{Arc, Weak}, -}; -use tokio::{net::UnixStream, sync::Notify, task::JoinHandle}; -use tracing::{info, warn}; +use std::{fs, iter::FromIterator, path::PathBuf, sync::Arc}; +use tokio::{net::UnixStream, task::JoinHandle}; +use tracing::info; lazy_static! { - pub static ref CLIENTS: Registry = Registry::new(); + pub static ref CLIENTS: OwnedRegistry = OwnedRegistry::new(); pub static ref INTERNAL_CLIENT: Arc = CLIENTS.add(Client { - event_loop: Weak::new(), - index: 0, pid: None, // env: None, exe: None, - stop_notifier: Default::default(), - join_handle: OnceCell::new(), + dispatch_join_handle: OnceCell::new(), + flush_join_handle: OnceCell::new(), + disconnect_status: OnceCell::new(), message_sender_handle: None, scenegraph: Default::default(), @@ -61,13 +52,12 @@ pub fn startup_settings(env: &FxHashMap) -> Option, - index: usize, pid: Option, // env: Option>, exe: Option, - stop_notifier: Arc, - join_handle: OnceCell>>, + dispatch_join_handle: OnceCell>>, + flush_join_handle: OnceCell>>, + disconnect_status: OnceCell>, pub message_sender_handle: Option, pub scenegraph: Arc, @@ -76,16 +66,11 @@ pub struct Client { pub startup_settings: Option, } impl Client { - pub fn from_connection( - index: usize, - event_loop: &Arc, - connection: UnixStream, - ) -> Arc { + pub fn from_connection(connection: UnixStream) -> Arc { let pid = connection.peer_cred().ok().and_then(|c| c.pid()); let env = pid.and_then(|pid| get_env(pid).ok()); let exe = pid.and_then(|pid| fs::read_link(format!("/proc/{}/exe", pid)).ok()); info!( - index = index, pid, exe = exe .as_ref() @@ -98,13 +83,13 @@ impl Client { let startup_settings = env.as_ref().and_then(|env| startup_settings(env)); let client = CLIENTS.add(Client { - event_loop: Arc::downgrade(event_loop), - index, pid, // env, - exe, - stop_notifier: Default::default(), - join_handle: OnceCell::new(), + exe: exe.clone(), + + dispatch_join_handle: OnceCell::new(), + flush_join_handle: OnceCell::new(), + disconnect_status: OnceCell::new(), message_sender_handle: Some(messenger_tx.handle()), scenegraph: scenegraph.clone(), @@ -123,32 +108,57 @@ impl Client { input::create_interface(&client); startup::create_interface(&client); - let _ = client.join_handle.set(tokio::spawn({ - let client = client.clone(); - async move { - let dispatch_loop = async move { - loop { - messenger_rx.dispatch(&*scenegraph).await? + let pid_printable = pid + .map(|pid| pid.to_string()) + .unwrap_or_else(|| "??".to_string()); + let exe_printable = exe + .and_then(|exe| { + exe.file_name() + .and_then(|exe| exe.to_str()) + .map(|exe| exe.to_string()) + }) + .unwrap_or_else(|| "??".to_string()); + let _ = client.dispatch_join_handle.get_or_try_init(|| { + tokio::task::Builder::new() + .name(&format!( + "client dispatch pid={} exe={}", + &pid_printable, &exe_printable, + )) + .spawn({ + let client = client.clone(); + async move { + loop { + match messenger_rx.dispatch(&*scenegraph).await { + Err(e) => { + client.disconnect(Err(e.into())); + } + _ => (), + } + } } - }; - let flush_loop = async { - loop { - messenger_tx.flush().await? + }) + }); + let _ = client.flush_join_handle.get_or_try_init(|| { + tokio::task::Builder::new() + .name(&format!( + "client flush pid={} exe={}", + &pid_printable, &exe_printable, + )) + .spawn({ + let client = client.clone(); + async move { + loop { + match messenger_tx.flush().await { + Err(e) => { + client.disconnect(Err(e.into())); + } + _ => (), + } + } } - }; + }) + }); - let result: Result<(), Report> = tokio::select! { - _ = client.stop_notifier.notified() => Ok(()), - e = dispatch_loop => e, - e = flush_loop => e, - }; - if let Err(e) = &result { - warn!(error = e.root_cause(), "Client disconnected with error!"); - } - client.disconnect().await; - result - } - })); client } @@ -159,24 +169,30 @@ impl Client { .ok_or_else(|| eyre!("{} not found", name)) } - pub async fn disconnect(&self) { - self.stop_notifier.notify_one(); - if let Some(event_loop) = self.event_loop.upgrade() { - event_loop.clients.lock().await.remove(self.index); + pub fn disconnect(&self, reason: Result<()>) { + let _ = self.disconnect_status.set(reason); + if let Some(dispatch_join_handle) = self.dispatch_join_handle.get() { + dispatch_join_handle.abort(); } + if let Some(flush_join_handle) = self.flush_join_handle.get() { + flush_join_handle.abort(); + } + CLIENTS.remove(self); } } impl Drop for Client { fn drop(&mut self) { - self.stop_notifier.notify_one(); - CLIENTS.remove(self); info!( - index = self.index, pid = self.pid, exe = self .exe .as_ref() .and_then(|exe| exe.to_str().map(|s| s.to_string())), + disconnect_status = match self.disconnect_status.take() { + Some(Ok(_)) => "Graceful disconnect".to_string(), + Some(Err(e)) => format!("Error: {}", e.root_cause()), + None => "Unknown".to_string(), + }, "Client disconnected" ); } diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index b94486a..8a331f6 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -1,62 +1,49 @@ use super::client::Client; use color_eyre::eyre::Result; -use slab::Slab; +use once_cell::sync::OnceCell; use stardust_xr::server; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::net::UnixListener; -use tokio::sync::{Mutex, Notify}; use tokio::task::JoinHandle; pub static FRAME: AtomicU64 = AtomicU64::new(0); pub struct EventLoop { pub socket_path: PathBuf, - stop_notifier: Arc, - pub clients: Mutex>>, + join_handle: OnceCell>, } impl EventLoop { - pub fn new() -> Result<(Arc, JoinHandle>)> { + pub fn new() -> Result> { let socket_path = server::get_free_socket_path() .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?; let socket = UnixListener::bind(socket_path.clone())?; let event_loop = Arc::new(EventLoop { socket_path, - stop_notifier: Default::default(), - clients: Mutex::new(Slab::new()), + join_handle: OnceCell::new(), }); - let event_loop_join_handle = tokio::spawn({ - let event_loop = event_loop.clone(); - async move { EventLoop::event_loop(socket, event_loop).await } - }); + let join_handle = tokio::task::Builder::new() + .name("event loop") + .spawn(async move { + loop { + let Ok((socket, _)) = socket.accept().await else { continue }; + Client::from_connection(socket); + } + })?; + let _ = event_loop.join_handle.set(join_handle); - Ok((event_loop, event_loop_join_handle)) - } - - async fn event_loop(socket: UnixListener, event_loop: Arc) -> Result<()> { - let event_loop_async = async { - loop { - let (socket, _) = socket.accept().await?; - let mut clients = event_loop.clients.lock().await; - let vacant_client = clients.vacant_entry(); - let idx = vacant_client.key(); - vacant_client.insert(Client::from_connection(idx, &event_loop, socket)); - } - }; - - tokio::select! { - _ = event_loop.stop_notifier.notified() => Ok(()), - e = event_loop_async => e, - } + Ok(event_loop) } } impl Drop for EventLoop { fn drop(&mut self) { - self.stop_notifier.notify_one(); + if let Some(join_handle) = self.join_handle.take() { + join_handle.abort(); + } } } diff --git a/src/core/registry.rs b/src/core/registry.rs index db9743d..d0f0c02 100644 --- a/src/core/registry.rs +++ b/src/core/registry.rs @@ -42,3 +42,35 @@ impl Registry { self.0.lock().clear(); } } + +pub struct OwnedRegistry(Lazy>>>); + +impl OwnedRegistry { + pub const fn new() -> Self { + OwnedRegistry(Lazy::new(|| Mutex::new(FxHashMap::default()))) + } + pub fn add(&self, t: T) -> Arc + where + T: Sized, + { + let t_arc = Arc::new(t); + self.add_raw(t_arc.clone()); + t_arc + } + pub fn add_raw(&self, t: Arc) { + self.0 + .lock() + .insert(Arc::as_ptr(&t) as *const () as usize, t); + } + pub fn get_vec(&self) -> Vec> { + self.0.lock().values().cloned().collect::>() + } + pub fn remove(&self, t: &T) { + self.0 + .lock() + .remove(&(ptr::addr_of!(*t) as *const () as usize)); + } + pub fn clear(&self) { + self.0.lock().clear(); + } +} diff --git a/src/main.rs b/src/main.rs index 77d7948..c1f7d98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,9 +42,8 @@ struct CliArgs { } fn main() -> Result<()> { - if !cfg!(feature = "profile") { - tracing_subscriber::fmt::init(); - } + #[cfg(not(feature = "profile"))] + tracing_subscriber::fmt::init(); #[cfg(feature = "profile")] console_subscriber::init(); let project_dirs = ProjectDirs::from("", "", "stardust").unwrap(); @@ -163,8 +162,7 @@ async fn event_loop( let _ = handle_sender.send(Handle::current()); // console_subscriber::init(); - let (event_loop, event_loop_join_handle) = - EventLoop::new().expect("Couldn't create server socket"); + let event_loop = EventLoop::new().expect("Couldn't create server socket"); info!("Init event loop"); info!( "Stardust socket created at {}", @@ -175,7 +173,6 @@ async fn event_loop( biased; _ = tokio::signal::ctrl_c() => Ok(()), _ = stop_rx => Ok(()), - e = event_loop_join_handle => e?, }; info!("Cleanly shut down event loop"); diff --git a/src/nodes/input/mod.rs b/src/nodes/input/mod.rs index 170c896..e69dbb4 100644 --- a/src/nodes/input/mod.rs +++ b/src/nodes/input/mod.rs @@ -192,23 +192,25 @@ impl InputHandler { let handler = Arc::downgrade(&distance_link.handler); if let Ok(data) = node.execute_remote_method("input", data) { - tokio::spawn(async move { - if let Ok(data) = data.await { - if frame == FRAME.load(Ordering::Relaxed) { - let capture = flexbuffers::Reader::get_root(data.as_slice()) - .and_then(|data| data.get_bool()) - .unwrap_or(false); + let _ = tokio::task::Builder::new() + .name("input capture") + .spawn(async move { + if let Ok(data) = data.await { + if frame == FRAME.load(Ordering::Relaxed) { + let capture = flexbuffers::Reader::get_root(data.as_slice()) + .and_then(|data| data.get_bool()) + .unwrap_or(false); - if let Some(method) = method.upgrade() { - if let Some(handler) = handler.upgrade() { - if capture { - method.captures.add_raw(&handler); + if let Some(method) = method.upgrade() { + if let Some(handler) = handler.upgrade() { + if capture { + method.captures.add_raw(&handler); + } } } } } - } - }); + }); } } } diff --git a/src/wayland/mod.rs b/src/wayland/mod.rs index 718ecf1..1a5f39f 100644 --- a/src/wayland/mod.rs +++ b/src/wayland/mod.rs @@ -123,29 +123,31 @@ impl Wayland { let dh1 = display.lock().handle(); let mut dh2 = dh1.clone(); - Ok(tokio::task::spawn(async move { - let _socket = socket; // Keep the socket alive - loop { - tokio::select! { - e = global_destroy_queue.recv() => { // New global to destroy - dh1.remove_global::(e.unwrap()); - } - acc = listen_async.accept() => { // New client connected - let (stream, _) = acc?; - let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?; + Ok(tokio::task::Builder::new() + .name("wayland loop") + .spawn(async move { + let _socket = socket; // Keep the socket alive + loop { + tokio::select! { + e = global_destroy_queue.recv() => { // New global to destroy + dh1.remove_global::(e.unwrap()); + } + acc = listen_async.accept() => { // New client connected + let (stream, _) = acc?; + let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?; - state.lock().new_client(client.id(), &dh2); - } - e = dispatch_poll_listener.readable() => { // Dispatch - let mut guard = e?; - let mut display = display.lock(); - display.dispatch_clients(&mut *state.lock())?; - display.flush_clients()?; - guard.clear_ready(); + state.lock().new_client(client.id(), &dh2); + } + e = dispatch_poll_listener.readable() => { // Dispatch + let mut guard = e?; + let mut display = display.lock(); + display.dispatch_clients(&mut *state.lock())?; + display.flush_clients()?; + guard.clear_ready(); + } } } - } - })) + })?) } pub fn frame(&mut self, sk: &StereoKitDraw) { diff --git a/src/wayland/seat.rs b/src/wayland/seat.rs index ad5b8b0..622575a 100644 --- a/src/wayland/seat.rs +++ b/src/wayland/seat.rs @@ -148,7 +148,9 @@ impl SeatDataInner { impl Drop for SeatDataInner { fn drop(&mut self) { let id = self.global_id.take().unwrap(); - tokio::spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await }); + let _ = tokio::task::Builder::new() + .name("global destroy queue garbage collection") + .spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await }); } }