fix(event loop, client): better async
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use super::{eventloop::EventLoop, scenegraph::Scenegraph};
|
||||
use super::scenegraph::Scenegraph;
|
||||
use crate::{
|
||||
core::registry::Registry,
|
||||
core::registry::OwnedRegistry,
|
||||
nodes::{
|
||||
data, drawable, fields, hmd, input, items,
|
||||
root::Root,
|
||||
@@ -9,35 +9,26 @@ use crate::{
|
||||
Node,
|
||||
},
|
||||
};
|
||||
use color_eyre::{
|
||||
eyre::{eyre, Result},
|
||||
Report,
|
||||
};
|
||||
use color_eyre::eyre::{eyre, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::Mutex;
|
||||
use rustc_hash::FxHashMap;
|
||||
use stardust_xr::messenger::{self, MessageSenderHandle};
|
||||
use std::{
|
||||
fs,
|
||||
iter::FromIterator,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
use tokio::{net::UnixStream, sync::Notify, task::JoinHandle};
|
||||
use tracing::{info, warn};
|
||||
use std::{fs, iter::FromIterator, path::PathBuf, sync::Arc};
|
||||
use tokio::{net::UnixStream, task::JoinHandle};
|
||||
use tracing::info;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref CLIENTS: Registry<Client> = Registry::new();
|
||||
pub static ref CLIENTS: OwnedRegistry<Client> = OwnedRegistry::new();
|
||||
pub static ref INTERNAL_CLIENT: Arc<Client> = CLIENTS.add(Client {
|
||||
event_loop: Weak::new(),
|
||||
index: 0,
|
||||
pid: None,
|
||||
// env: None,
|
||||
exe: None,
|
||||
|
||||
stop_notifier: Default::default(),
|
||||
join_handle: OnceCell::new(),
|
||||
dispatch_join_handle: OnceCell::new(),
|
||||
flush_join_handle: OnceCell::new(),
|
||||
disconnect_status: OnceCell::new(),
|
||||
|
||||
message_sender_handle: None,
|
||||
scenegraph: Default::default(),
|
||||
@@ -61,13 +52,12 @@ pub fn startup_settings(env: &FxHashMap<String, String>) -> Option<StartupSettin
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
event_loop: Weak<EventLoop>,
|
||||
index: usize,
|
||||
pid: Option<i32>,
|
||||
// env: Option<FxHashMap<String, String>>,
|
||||
exe: Option<PathBuf>,
|
||||
stop_notifier: Arc<Notify>,
|
||||
join_handle: OnceCell<JoinHandle<Result<()>>>,
|
||||
dispatch_join_handle: OnceCell<JoinHandle<Result<()>>>,
|
||||
flush_join_handle: OnceCell<JoinHandle<Result<()>>>,
|
||||
disconnect_status: OnceCell<Result<()>>,
|
||||
|
||||
pub message_sender_handle: Option<MessageSenderHandle>,
|
||||
pub scenegraph: Arc<Scenegraph>,
|
||||
@@ -76,16 +66,11 @@ pub struct Client {
|
||||
pub startup_settings: Option<StartupSettings>,
|
||||
}
|
||||
impl Client {
|
||||
pub fn from_connection(
|
||||
index: usize,
|
||||
event_loop: &Arc<EventLoop>,
|
||||
connection: UnixStream,
|
||||
) -> Arc<Self> {
|
||||
pub fn from_connection(connection: UnixStream) -> Arc<Self> {
|
||||
let pid = connection.peer_cred().ok().and_then(|c| c.pid());
|
||||
let env = pid.and_then(|pid| get_env(pid).ok());
|
||||
let exe = pid.and_then(|pid| fs::read_link(format!("/proc/{}/exe", pid)).ok());
|
||||
info!(
|
||||
index = index,
|
||||
pid,
|
||||
exe = exe
|
||||
.as_ref()
|
||||
@@ -98,13 +83,13 @@ impl Client {
|
||||
let startup_settings = env.as_ref().and_then(|env| startup_settings(env));
|
||||
|
||||
let client = CLIENTS.add(Client {
|
||||
event_loop: Arc::downgrade(event_loop),
|
||||
index,
|
||||
pid,
|
||||
// env,
|
||||
exe,
|
||||
stop_notifier: Default::default(),
|
||||
join_handle: OnceCell::new(),
|
||||
exe: exe.clone(),
|
||||
|
||||
dispatch_join_handle: OnceCell::new(),
|
||||
flush_join_handle: OnceCell::new(),
|
||||
disconnect_status: OnceCell::new(),
|
||||
|
||||
message_sender_handle: Some(messenger_tx.handle()),
|
||||
scenegraph: scenegraph.clone(),
|
||||
@@ -123,32 +108,57 @@ impl Client {
|
||||
input::create_interface(&client);
|
||||
startup::create_interface(&client);
|
||||
|
||||
let _ = client.join_handle.set(tokio::spawn({
|
||||
let client = client.clone();
|
||||
async move {
|
||||
let dispatch_loop = async move {
|
||||
loop {
|
||||
messenger_rx.dispatch(&*scenegraph).await?
|
||||
let pid_printable = pid
|
||||
.map(|pid| pid.to_string())
|
||||
.unwrap_or_else(|| "??".to_string());
|
||||
let exe_printable = exe
|
||||
.and_then(|exe| {
|
||||
exe.file_name()
|
||||
.and_then(|exe| exe.to_str())
|
||||
.map(|exe| exe.to_string())
|
||||
})
|
||||
.unwrap_or_else(|| "??".to_string());
|
||||
let _ = client.dispatch_join_handle.get_or_try_init(|| {
|
||||
tokio::task::Builder::new()
|
||||
.name(&format!(
|
||||
"client dispatch pid={} exe={}",
|
||||
&pid_printable, &exe_printable,
|
||||
))
|
||||
.spawn({
|
||||
let client = client.clone();
|
||||
async move {
|
||||
loop {
|
||||
match messenger_rx.dispatch(&*scenegraph).await {
|
||||
Err(e) => {
|
||||
client.disconnect(Err(e.into()));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let flush_loop = async {
|
||||
loop {
|
||||
messenger_tx.flush().await?
|
||||
})
|
||||
});
|
||||
let _ = client.flush_join_handle.get_or_try_init(|| {
|
||||
tokio::task::Builder::new()
|
||||
.name(&format!(
|
||||
"client flush pid={} exe={}",
|
||||
&pid_printable, &exe_printable,
|
||||
))
|
||||
.spawn({
|
||||
let client = client.clone();
|
||||
async move {
|
||||
loop {
|
||||
match messenger_tx.flush().await {
|
||||
Err(e) => {
|
||||
client.disconnect(Err(e.into()));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
})
|
||||
});
|
||||
|
||||
let result: Result<(), Report> = tokio::select! {
|
||||
_ = client.stop_notifier.notified() => Ok(()),
|
||||
e = dispatch_loop => e,
|
||||
e = flush_loop => e,
|
||||
};
|
||||
if let Err(e) = &result {
|
||||
warn!(error = e.root_cause(), "Client disconnected with error!");
|
||||
}
|
||||
client.disconnect().await;
|
||||
result
|
||||
}
|
||||
}));
|
||||
client
|
||||
}
|
||||
|
||||
@@ -159,24 +169,30 @@ impl Client {
|
||||
.ok_or_else(|| eyre!("{} not found", name))
|
||||
}
|
||||
|
||||
pub async fn disconnect(&self) {
|
||||
self.stop_notifier.notify_one();
|
||||
if let Some(event_loop) = self.event_loop.upgrade() {
|
||||
event_loop.clients.lock().await.remove(self.index);
|
||||
pub fn disconnect(&self, reason: Result<()>) {
|
||||
let _ = self.disconnect_status.set(reason);
|
||||
if let Some(dispatch_join_handle) = self.dispatch_join_handle.get() {
|
||||
dispatch_join_handle.abort();
|
||||
}
|
||||
if let Some(flush_join_handle) = self.flush_join_handle.get() {
|
||||
flush_join_handle.abort();
|
||||
}
|
||||
CLIENTS.remove(self);
|
||||
}
|
||||
}
|
||||
impl Drop for Client {
|
||||
fn drop(&mut self) {
|
||||
self.stop_notifier.notify_one();
|
||||
CLIENTS.remove(self);
|
||||
info!(
|
||||
index = self.index,
|
||||
pid = self.pid,
|
||||
exe = self
|
||||
.exe
|
||||
.as_ref()
|
||||
.and_then(|exe| exe.to_str().map(|s| s.to_string())),
|
||||
disconnect_status = match self.disconnect_status.take() {
|
||||
Some(Ok(_)) => "Graceful disconnect".to_string(),
|
||||
Some(Err(e)) => format!("Error: {}", e.root_cause()),
|
||||
None => "Unknown".to_string(),
|
||||
},
|
||||
"Client disconnected"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,62 +1,49 @@
|
||||
use super::client::Client;
|
||||
use color_eyre::eyre::Result;
|
||||
use slab::Slab;
|
||||
use once_cell::sync::OnceCell;
|
||||
use stardust_xr::server;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::sync::{Mutex, Notify};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub static FRAME: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
pub struct EventLoop {
|
||||
pub socket_path: PathBuf,
|
||||
stop_notifier: Arc<Notify>,
|
||||
pub clients: Mutex<Slab<Arc<Client>>>,
|
||||
join_handle: OnceCell<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl EventLoop {
|
||||
pub fn new() -> Result<(Arc<Self>, JoinHandle<Result<()>>)> {
|
||||
pub fn new() -> Result<Arc<Self>> {
|
||||
let socket_path = server::get_free_socket_path()
|
||||
.ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?;
|
||||
let socket = UnixListener::bind(socket_path.clone())?;
|
||||
|
||||
let event_loop = Arc::new(EventLoop {
|
||||
socket_path,
|
||||
stop_notifier: Default::default(),
|
||||
clients: Mutex::new(Slab::new()),
|
||||
join_handle: OnceCell::new(),
|
||||
});
|
||||
|
||||
let event_loop_join_handle = tokio::spawn({
|
||||
let event_loop = event_loop.clone();
|
||||
async move { EventLoop::event_loop(socket, event_loop).await }
|
||||
});
|
||||
let join_handle = tokio::task::Builder::new()
|
||||
.name("event loop")
|
||||
.spawn(async move {
|
||||
loop {
|
||||
let Ok((socket, _)) = socket.accept().await else { continue };
|
||||
Client::from_connection(socket);
|
||||
}
|
||||
})?;
|
||||
let _ = event_loop.join_handle.set(join_handle);
|
||||
|
||||
Ok((event_loop, event_loop_join_handle))
|
||||
}
|
||||
|
||||
async fn event_loop(socket: UnixListener, event_loop: Arc<EventLoop>) -> Result<()> {
|
||||
let event_loop_async = async {
|
||||
loop {
|
||||
let (socket, _) = socket.accept().await?;
|
||||
let mut clients = event_loop.clients.lock().await;
|
||||
let vacant_client = clients.vacant_entry();
|
||||
let idx = vacant_client.key();
|
||||
vacant_client.insert(Client::from_connection(idx, &event_loop, socket));
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = event_loop.stop_notifier.notified() => Ok(()),
|
||||
e = event_loop_async => e,
|
||||
}
|
||||
Ok(event_loop)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EventLoop {
|
||||
fn drop(&mut self) {
|
||||
self.stop_notifier.notify_one();
|
||||
if let Some(join_handle) = self.join_handle.take() {
|
||||
join_handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,3 +42,35 @@ impl<T: Send + Sync + ?Sized> Registry<T> {
|
||||
self.0.lock().clear();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OwnedRegistry<T: Send + Sync + ?Sized>(Lazy<Mutex<FxHashMap<usize, Arc<T>>>>);
|
||||
|
||||
impl<T: Send + Sync + ?Sized> OwnedRegistry<T> {
|
||||
pub const fn new() -> Self {
|
||||
OwnedRegistry(Lazy::new(|| Mutex::new(FxHashMap::default())))
|
||||
}
|
||||
pub fn add(&self, t: T) -> Arc<T>
|
||||
where
|
||||
T: Sized,
|
||||
{
|
||||
let t_arc = Arc::new(t);
|
||||
self.add_raw(t_arc.clone());
|
||||
t_arc
|
||||
}
|
||||
pub fn add_raw(&self, t: Arc<T>) {
|
||||
self.0
|
||||
.lock()
|
||||
.insert(Arc::as_ptr(&t) as *const () as usize, t);
|
||||
}
|
||||
pub fn get_vec(&self) -> Vec<Arc<T>> {
|
||||
self.0.lock().values().cloned().collect::<Vec<_>>()
|
||||
}
|
||||
pub fn remove(&self, t: &T) {
|
||||
self.0
|
||||
.lock()
|
||||
.remove(&(ptr::addr_of!(*t) as *const () as usize));
|
||||
}
|
||||
pub fn clear(&self) {
|
||||
self.0.lock().clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,9 +42,8 @@ struct CliArgs {
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
if !cfg!(feature = "profile") {
|
||||
tracing_subscriber::fmt::init();
|
||||
}
|
||||
#[cfg(not(feature = "profile"))]
|
||||
tracing_subscriber::fmt::init();
|
||||
#[cfg(feature = "profile")]
|
||||
console_subscriber::init();
|
||||
let project_dirs = ProjectDirs::from("", "", "stardust").unwrap();
|
||||
@@ -163,8 +162,7 @@ async fn event_loop(
|
||||
let _ = handle_sender.send(Handle::current());
|
||||
// console_subscriber::init();
|
||||
|
||||
let (event_loop, event_loop_join_handle) =
|
||||
EventLoop::new().expect("Couldn't create server socket");
|
||||
let event_loop = EventLoop::new().expect("Couldn't create server socket");
|
||||
info!("Init event loop");
|
||||
info!(
|
||||
"Stardust socket created at {}",
|
||||
@@ -175,7 +173,6 @@ async fn event_loop(
|
||||
biased;
|
||||
_ = tokio::signal::ctrl_c() => Ok(()),
|
||||
_ = stop_rx => Ok(()),
|
||||
e = event_loop_join_handle => e?,
|
||||
};
|
||||
|
||||
info!("Cleanly shut down event loop");
|
||||
|
||||
@@ -192,23 +192,25 @@ impl InputHandler {
|
||||
let handler = Arc::downgrade(&distance_link.handler);
|
||||
|
||||
if let Ok(data) = node.execute_remote_method("input", data) {
|
||||
tokio::spawn(async move {
|
||||
if let Ok(data) = data.await {
|
||||
if frame == FRAME.load(Ordering::Relaxed) {
|
||||
let capture = flexbuffers::Reader::get_root(data.as_slice())
|
||||
.and_then(|data| data.get_bool())
|
||||
.unwrap_or(false);
|
||||
let _ = tokio::task::Builder::new()
|
||||
.name("input capture")
|
||||
.spawn(async move {
|
||||
if let Ok(data) = data.await {
|
||||
if frame == FRAME.load(Ordering::Relaxed) {
|
||||
let capture = flexbuffers::Reader::get_root(data.as_slice())
|
||||
.and_then(|data| data.get_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Some(method) = method.upgrade() {
|
||||
if let Some(handler) = handler.upgrade() {
|
||||
if capture {
|
||||
method.captures.add_raw(&handler);
|
||||
if let Some(method) = method.upgrade() {
|
||||
if let Some(handler) = handler.upgrade() {
|
||||
if capture {
|
||||
method.captures.add_raw(&handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,29 +123,31 @@ impl Wayland {
|
||||
let dh1 = display.lock().handle();
|
||||
let mut dh2 = dh1.clone();
|
||||
|
||||
Ok(tokio::task::spawn(async move {
|
||||
let _socket = socket; // Keep the socket alive
|
||||
loop {
|
||||
tokio::select! {
|
||||
e = global_destroy_queue.recv() => { // New global to destroy
|
||||
dh1.remove_global::<WaylandState>(e.unwrap());
|
||||
}
|
||||
acc = listen_async.accept() => { // New client connected
|
||||
let (stream, _) = acc?;
|
||||
let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?;
|
||||
Ok(tokio::task::Builder::new()
|
||||
.name("wayland loop")
|
||||
.spawn(async move {
|
||||
let _socket = socket; // Keep the socket alive
|
||||
loop {
|
||||
tokio::select! {
|
||||
e = global_destroy_queue.recv() => { // New global to destroy
|
||||
dh1.remove_global::<WaylandState>(e.unwrap());
|
||||
}
|
||||
acc = listen_async.accept() => { // New client connected
|
||||
let (stream, _) = acc?;
|
||||
let client = dh2.insert_client(stream.into_std()?, Arc::new(ClientState))?;
|
||||
|
||||
state.lock().new_client(client.id(), &dh2);
|
||||
}
|
||||
e = dispatch_poll_listener.readable() => { // Dispatch
|
||||
let mut guard = e?;
|
||||
let mut display = display.lock();
|
||||
display.dispatch_clients(&mut *state.lock())?;
|
||||
display.flush_clients()?;
|
||||
guard.clear_ready();
|
||||
state.lock().new_client(client.id(), &dh2);
|
||||
}
|
||||
e = dispatch_poll_listener.readable() => { // Dispatch
|
||||
let mut guard = e?;
|
||||
let mut display = display.lock();
|
||||
display.dispatch_clients(&mut *state.lock())?;
|
||||
display.flush_clients()?;
|
||||
guard.clear_ready();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
})?)
|
||||
}
|
||||
|
||||
pub fn frame(&mut self, sk: &StereoKitDraw) {
|
||||
|
||||
@@ -148,7 +148,9 @@ impl SeatDataInner {
|
||||
impl Drop for SeatDataInner {
|
||||
fn drop(&mut self) {
|
||||
let id = self.global_id.take().unwrap();
|
||||
tokio::spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await });
|
||||
let _ = tokio::task::Builder::new()
|
||||
.name("global destroy queue garbage collection")
|
||||
.spawn(async move { GLOBAL_DESTROY_QUEUE.get().unwrap().send(id).await });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user