diff --git a/Cargo.toml b/Cargo.toml index d4ba83e..6743570 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,6 @@ edition = "2018" name = "stardust-xr" version = "0.9.0" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] anyhow = "1.0.57" clap = { version = "3.1.6", features = ["derive"] } @@ -15,7 +13,6 @@ flexbuffers = "2.0.0" glam = {version = "0.20.5", features = ["mint"]} lazy_static = "1.4.0" mint = "0.5.9" -mio = {version = "0.8.3", features = ["net", "os-poll", "os-ext"]} nanoid = "0.4.0" once_cell = "1.12.0" parking_lot = "0.12.1" @@ -23,12 +20,13 @@ portable-atomic = {version = "0.3.0", features = ["float", "std"]} rccell = "0.1.3" rustc-hash = "1.1.0" slab = "0.4.6" +tokio = { version = "1", features = ["full"] } thiserror = "1.0.31" [dependencies.libstardustxr] path = "../libstardustxr-rs" -[dependencies.stereokit-rs] +[dependencies.stereokit] path = "../stereokit-rs" default-features = false features = ["linux-egl"] diff --git a/src/core/client.rs b/src/core/client.rs index 128239d..4b08bfd 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -1,3 +1,4 @@ +use super::eventloop::EventLoop; use super::scenegraph::Scenegraph; use crate::nodes::data; use crate::nodes::field; @@ -5,32 +6,52 @@ use crate::nodes::input; use crate::nodes::item; use crate::nodes::root::Root; use crate::nodes::spatial; +use anyhow::Result; use lazy_static::lazy_static; use libstardustxr::messenger::Messenger; -use mio::net::UnixStream; use once_cell::sync::OnceCell; -use std::sync::Arc; +use std::sync::{Arc, Weak}; +use tokio::net::UnixStream; +use tokio::sync::Notify; +use tokio::task::JoinHandle; lazy_static! { - pub static ref INTERNAL_CLIENT: Arc = Client::new_local(); + pub static ref INTERNAL_CLIENT: Arc = Arc::new(Client { + event_loop: Weak::new(), + index: 0, + + stop_notifier: Default::default(), + join_handle: OnceCell::new(), + + messenger: None, + scenegraph: Default::default(), + root: OnceCell::new(), + }); } pub struct Client { + event_loop: Weak, + index: usize, + stop_notifier: Arc, + join_handle: OnceCell>>, + pub messenger: Option, pub scenegraph: Scenegraph, pub root: OnceCell>, } impl Client { - pub fn new_local() -> Arc { - Arc::new(Client { - messenger: None, - scenegraph: Default::default(), - root: OnceCell::new(), - }) - } - pub fn from_connection(connection: UnixStream) -> Arc { + pub fn from_connection( + index: usize, + event_loop: &Arc, + connection: UnixStream, + ) -> Arc { println!("New client connected"); let client = Arc::new(Client { + event_loop: Arc::downgrade(event_loop), + index, + stop_notifier: Default::default(), + join_handle: OnceCell::new(), + messenger: Some(Messenger::new(connection)), scenegraph: Default::default(), root: OnceCell::new(), @@ -42,18 +63,44 @@ impl Client { data::create_interface(&client); item::create_interface(&client); input::create_interface(&client); + + let _ = client.join_handle.set(tokio::spawn({ + let client = client.clone(); + async move { + let dispatch_loop = async { + loop { + client.dispatch().await? + } + }; + + let result = tokio::select! { + _ = client.stop_notifier.notified() => Ok(()), + e = dispatch_loop => e, + }; + client.disconnect().await; + result + } + })); client } - pub fn dispatch(&self) -> Result<(), std::io::Error> { - if let Some(messenger) = &self.messenger { - messenger.dispatch(&self.scenegraph) - } else { - Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) + + pub async fn dispatch(&self) -> Result<(), std::io::Error> { + match &self.messenger { + Some(messenger) => messenger.dispatch(&self.scenegraph).await, + None => Err(std::io::Error::from(std::io::ErrorKind::Unsupported)), + } + } + + pub async fn disconnect(&self) { + self.stop_notifier.notify_one(); + if let Some(event_loop) = self.event_loop.upgrade() { + event_loop.clients.lock().await.remove(self.index); } } } impl Drop for Client { fn drop(&mut self) { + self.stop_notifier.notify_one(); println!("Client disconnected"); } } diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index 0a3ab95..fca80cd 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -1,143 +1,64 @@ use super::client::Client; use anyhow::Result; use libstardustxr::server; -use mio::net::UnixListener; -use mio::unix::pipe; -use mio::{Events, Interest, Poll, Token}; use slab::Slab; -use std::io::Write; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use std::thread::{self, JoinHandle}; +use tokio::net::UnixListener; +use tokio::sync::{Mutex, Notify, OnceCell}; +use tokio::task::JoinHandle; pub static FRAME: AtomicU64 = AtomicU64::new(0); pub struct EventLoop { pub socket_path: String, - join_handle: Option>>, - stop_write: pipe::Sender, + stop_notifier: Arc, + pub clients: Mutex>>>, } impl EventLoop { - pub fn new(timeout: Option) -> Result { + pub fn new() -> Result<(Arc, JoinHandle>)> { let socket_path = server::get_free_socket_path() .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?; - let (sender, receiver) = pipe::new()?; - let socket_path_captured = socket_path.clone(); - let join_handle = thread::Builder::new() - .name("event_loop".to_owned()) - .spawn(move || EventLoop::run_loop(timeout, socket_path_captured, receiver)) - .ok(); - Ok(EventLoop { + let socket = UnixListener::bind(socket_path.clone())?; + + let event_loop = Arc::new(EventLoop { socket_path, - join_handle, - stop_write: sender, - }) + stop_notifier: Default::default(), + clients: Mutex::new(Slab::new()), + }); + + let event_loop_join_handle = tokio::spawn({ + let event_loop = event_loop.clone(); + async move { EventLoop::event_loop(socket, event_loop).await } + }); + + Ok((event_loop, event_loop_join_handle)) } - fn run_loop( - timeout: Option, - socket_path: String, - stop_receiver: pipe::Receiver, - ) -> Result<()> { - let mut stop_receiver = stop_receiver; - let mut socket = UnixListener::bind(socket_path)?; - let mut clients: Slab>> = Slab::new(); - let mut poll = Poll::new()?; - let mut events = Events::with_capacity(1024); - const LISTENER: Token = Token(usize::MAX - 1); - poll.registry() - .register(&mut socket, LISTENER, Interest::READABLE)?; - const STOP: Token = Token(usize::MAX); - poll.registry() - .register(&mut stop_receiver, STOP, Interest::READABLE)?; - 'event_loop: loop { - match poll.poll(&mut events, timeout) { - Err(e) => { - if e.kind() == std::io::ErrorKind::Interrupted { - continue 'event_loop; - } - } - _ => {} - } - for event in &events { - match event.token() { - LISTENER => EventLoop::accept_client(&socket, &mut clients, &poll)?, - STOP => break 'event_loop, - token => EventLoop::handle_client_message(token.0, &mut clients)?, - } - } - } - - clients.clear(); // for better log messages - println!("Event loop gracefully finished"); - Ok(()) - } - - fn accept_client( - socket: &UnixListener, - clients: &mut Slab>>, - poll: &Poll, - ) -> Result<()> { - loop { - match socket.accept() { - Ok((mut socket, _)) => { - let client_number = clients.insert(None); - poll.registry().register( - &mut socket, - Token(client_number), - Interest::READABLE, - )?; - let client = Client::from_connection(socket); - *clients.get_mut(client_number).unwrap() = Some(client); - } - Err(e) => match e.kind() { - std::io::ErrorKind::WouldBlock => break, - _ => return Err(e.into()), - }, - } - } - Ok(()) - } - - fn handle_client_message( - client_id: usize, - clients: &mut Slab>>, - ) -> Result<()> { - let client = clients.get(client_id).and_then(|client| client.as_ref()); - if let Some(client) = client { + async fn event_loop(socket: UnixListener, event_loop: Arc) -> Result<()> { + let event_loop_async = async { loop { - let dispatch_result = client.dispatch(); - match dispatch_result { - Ok(_) => continue, - Err(e) => match e.kind() { - std::io::ErrorKind::WouldBlock => break, - std::io::ErrorKind::Interrupted => continue, - _ => { - clients.remove(client_id); - break; - } - }, - } + let (socket, _) = socket.accept().await?; + let mut clients = event_loop.clients.lock().await; + let idx = clients.insert(OnceCell::new()); + let _ = clients.get(idx).unwrap().set(Client::from_connection( + idx, + &event_loop, + socket, + )); } + }; + + tokio::select! { + _ = event_loop.stop_notifier.notified() => Ok(()), + e = event_loop_async => e, } - Ok(()) } } impl Drop for EventLoop { fn drop(&mut self) { - let buf: [u8; 1] = [1; 1]; - let _ = self.stop_write.write(buf.as_slice()); - if let Some(handle) = self.join_handle.take() { - match handle.join() { - Ok(r) => { - if let Err(e) = r { - eprintln!("Event loop error: {}", e); - } - } - Err(e) => eprintln!("Event loop failed to rejoin with error: {:#?}", e), - } - } + self.stop_notifier.notify_one(); } } diff --git a/src/main.rs b/src/main.rs index 40aec9d..12012b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,15 @@ mod core; mod nodes; use self::core::eventloop::EventLoop; -use anyhow::{ensure, Result}; +use anyhow::Result; use clap::Parser; -use stereokit_rs as sk; -use stereokit_rs::enums::DisplayMode; -use stereokit_rs::functions::*; +use once_cell::sync::Lazy; +use parking_lot::Mutex; +use std::sync::Arc; +use stereokit::{lifecycle::DisplayMode, Settings}; +use tokio::runtime::Handle; + +static TOKIO_HANDLE: Lazy>> = Lazy::new(Default::default); #[derive(Parser)] #[clap(author, version, about, long_about = None)] @@ -14,32 +18,64 @@ struct CliArgs { /// Force flatscreen mode and use the mouse pointer as a 3D pointer #[clap(short, action)] flatscreen: bool, + + /// Run Stardust XR as an overlay + #[clap(short, action)] + overlay: bool, } fn main() -> Result<()> { - let cli_args = CliArgs::parse(); - ctrlc::set_handler(sk_quit).expect("Error setting Ctrl-C handler"); + let cli_args = Arc::new(CliArgs::parse()); - let mut init_settings = SKSettings::default().app_name("Stardust XR"); + let mut init_settings = Settings::default() + .app_name("Stardust XR") + .overlay_app(cli_args.overlay) + .overlay_priority(u32::MAX); if cli_args.flatscreen { init_settings = init_settings.display_preference(DisplayMode::Flatscreen); } - ensure!(init_settings.init(), "StereoKit failed to initialize"); + let stereokit = init_settings + .init() + .expect("StereoKit failed to initialize"); - let event_loop = EventLoop::new(None).expect("Couldn't create server socket"); - println!("Stardust socket created at {}", event_loop.socket_path); + let event_thread = std::thread::Builder::new() + .name("event_loop".to_owned()) + .spawn(event_loop)?; - let mut previous_time = 0_f64; - sk_run( - &mut Box::new(&mut move || { - let current_time = unsafe { sk::sys::time_get() }; - nodes::root::Root::logic_step(current_time - previous_time); - previous_time = current_time; - }), - &mut Box::new(&mut || { - println!("Shutting down..."); - }), + stereokit.run( + |_draw_ctx| { + nodes::root::Root::logic_step(stereokit.time_elapsed()); + }, + || { + println!("Shut down StereoKit"); + }, ); - + event_thread + .join() + .expect("Failed to cleanly shut down event loop")?; + println!("Cleanly shut down Stardust"); Ok(()) } + +#[tokio::main] +async fn event_loop() -> anyhow::Result<()> { + TOKIO_HANDLE.lock().replace(Handle::current()); + + let (event_loop, event_loop_join_handle) = + EventLoop::new().expect("Couldn't create server socket"); + println!("Init event loop"); + println!("Stardust socket created at {}", event_loop.socket_path); + + let result = tokio::select! { + biased; + _ = tokio::signal::ctrl_c() => Ok(()), + // e = task => e?, + e = event_loop_join_handle => e?, + }; + + unsafe { + stereokit::sys::sk_quit(); + } + + result +} diff --git a/src/nodes/core.rs b/src/nodes/core.rs index 17aa5b1..baff62c 100644 --- a/src/nodes/core.rs +++ b/src/nodes/core.rs @@ -5,6 +5,7 @@ use super::item::{Item, ItemAcceptor, ItemUI}; use super::spatial::Spatial; use crate::core::client::Client; use crate::core::registry::Registry; +use crate::TOKIO_HANDLE; use anyhow::{anyhow, Result}; use libstardustxr::scenegraph::ScenegraphError; use nanoid::nanoid; @@ -156,7 +157,8 @@ impl Node { } } pub fn send_remote_signal(&self, method: &str, data: &[u8]) -> Result<()> { - self.aliases + let _ = self + .aliases .get_valid_contents() .iter() .filter(|alias| alias.remote_signals.iter().any(|e| e == &method)) @@ -167,25 +169,28 @@ impl Node { .unwrap() .send_remote_signal(method, data); }); - self.get_client() - .messenger - .as_ref() - .ok_or_else(|| anyhow!("Node's client has no messenger"))? - .send_remote_signal(self.path.as_str(), method, data) - .map_err(|_| anyhow!("Unable to write in messenger")) + let client = self.get_client(); + let path = self.path.clone(); + let method = method.to_string(); + let data = data.to_vec(); + TOKIO_HANDLE.lock().as_ref().unwrap().spawn(async move { + if let Some(messenger) = client.messenger.as_ref() { + let _ = messenger + .send_remote_signal(path.as_str(), method.as_str(), data.as_slice()) + .await; + } + }); + Ok(()) } - pub fn execute_remote_method( - &self, - method: &str, - data: &[u8], - callback: Box, - ) -> Result<()> { - self.get_client() - .messenger - .as_ref() - .ok_or_else(|| anyhow!("Node's client has no messenger"))? - .execute_remote_method(self.path.as_str(), method, data, callback) - .map_err(|_| anyhow!("Unable to write in messenger")) + pub async fn execute_remote_method(&self, method: &str, data: Vec) -> Result> { + match self.get_client().messenger.as_ref() { + None => Err(anyhow!("Messenger does not exist for this node's client")), + Some(messenger) => { + messenger + .execute_remote_method(self.path.as_str(), method, &data) + .await + } + } } } diff --git a/src/nodes/input.rs b/src/nodes/input.rs index f2adaf4..04f6577 100644 --- a/src/nodes/input.rs +++ b/src/nodes/input.rs @@ -156,23 +156,23 @@ impl InputHandler { return; } - let serialized_data = distance_link.serialize(); + match distance_link.serialize() { + None => InputHandler::next_input(old_frame, distance_links), + Some(data) => { + let node = self.node.upgrade().unwrap(); - if let Some(data) = serialized_data { - let _ = self.node.upgrade().unwrap().execute_remote_method( - "input", - &data, - Box::new(move |data| { - let capture = flexbuffers::Reader::get_root(data) - .and_then(|data| data.get_bool()) - .unwrap_or(false); - if !distance_links.is_empty() && !capture { - InputHandler::next_input(old_frame, distance_links); + tokio::spawn(async move { + let data = node.execute_remote_method("input", data).await; + if let Ok(data) = data { + let capture = flexbuffers::Reader::get_root(data.as_slice()) + .and_then(|data| data.get_bool()) + .unwrap_or(false); + if !distance_links.is_empty() && !capture { + InputHandler::next_input(old_frame, distance_links); + } } - }), - ); - } else { - InputHandler::next_input(old_frame, distance_links); + }); + } } }