From 4c70ded2b0b9da7d6eaeae147fa9336be36231df Mon Sep 17 00:00:00 2001 From: Nova Date: Thu, 15 May 2025 00:59:59 -0700 Subject: [PATCH] fix: io safety error --- src/wayland/mod.rs | 172 +++++++++++++++++++++------------------------ 1 file changed, 82 insertions(+), 90 deletions(-) diff --git a/src/wayland/mod.rs b/src/wayland/mod.rs index 656709a..f474075 100644 --- a/src/wayland/mod.rs +++ b/src/wayland/mod.rs @@ -13,29 +13,29 @@ use self::{state::WaylandState, surface::CORE_SURFACES}; use crate::{core::task, wayland::state::ClientState}; use color_eyre::eyre::{Result, ensure}; use parking_lot::Mutex; -use smithay::backend::allocator::dmabuf::Dmabuf; -use smithay::backend::egl::EGLContext; -use smithay::backend::renderer::gles::GlesRenderer; -use smithay::backend::renderer::{ImportDma, Renderer}; -use smithay::output::Output; -use smithay::reexports::wayland_server::DisplayHandle; -use smithay::reexports::wayland_server::backend::ClientId; -use smithay::reexports::wayland_server::{Display, ListeningSocket}; -use smithay::wayland::dmabuf; -use std::ffi::OsStr; -use std::os::fd::{IntoRawFd, OwnedFd}; -use std::os::unix::prelude::AsRawFd; -use std::sync::OnceLock; +use smithay::{ + backend::{ + allocator::dmabuf::Dmabuf, + egl::EGLContext, + renderer::{ImportDma, Renderer, gles::GlesRenderer}, + }, + output::Output, + reexports::wayland_server::{Display, DisplayHandle, ListeningSocket}, + wayland::dmabuf, +}; use std::{ - ffi::c_void, - os::unix::{net::UnixListener, prelude::FromRawFd}, - sync::Arc, + ffi::{OsStr, c_void}, + os::fd::AsFd, + sync::{Arc, OnceLock}, }; use stereokit_rust::system::{Backend, BackendGraphics}; -use tokio::io::unix::AsyncFdReadyGuard; -use tokio::sync::mpsc::UnboundedReceiver; use tokio::{ - io::unix::AsyncFd, net::UnixListener as AsyncUnixListener, sync::mpsc, task::JoinHandle, + io::unix::AsyncFd, + sync::{ + Notify, + mpsc::{self, UnboundedReceiver}, + }, + task::AbortHandle, }; use tracing::{debug_span, info, instrument}; @@ -61,39 +61,10 @@ fn get_sk_egl() -> Result { }) } -pub struct DisplayWrapper(Mutex>, DisplayHandle); -impl DisplayWrapper { - pub fn handle(&self) -> DisplayHandle { - self.1.clone() - } - pub fn dispatch_clients(&self, state: &mut WaylandState) -> Result { - self.0.lock().dispatch_clients(state) - } - pub fn flush_clients(&self, client: Option) { - if let Some(mut lock) = self.0.try_lock() { - let _ = lock.backend().flush(client); - } - } - pub fn poll_fd(&self) -> Result { - self.0.lock().backend().poll_fd().try_clone_to_owned() - } -} - -struct UnownedFd(Option>); -impl UnownedFd { - async fn readable(&self) -> std::io::Result> { - self.0.as_ref().unwrap().readable().await - } -} -impl Drop for UnownedFd { - fn drop(&mut self) { - let _ = self.0.take().unwrap().into_inner().into_raw_fd(); - } -} - pub struct Wayland { - display: Arc, - join_handle: JoinHandle>, + flush_notify: Arc, + client_listener: AbortHandle, + client_dispatcher: AbortHandle, renderer: GlesRenderer, output: Output, dmabuf_rx: UnboundedReceiver<(Dmabuf, Option)>, @@ -113,7 +84,6 @@ impl Wayland { let display_handle = display.handle(); let (dmabuf_tx, dmabuf_rx) = mpsc::unbounded_channel(); - let display = Arc::new(DisplayWrapper(Mutex::new(display), display_handle.clone())); let wayland_state = WaylandState::new(display_handle.clone(), &renderer, dmabuf_tx); let output = wayland_state.lock().output.clone(); @@ -128,57 +98,78 @@ impl Wayland { } info!(socket_name, "Wayland active"); - let join_handle = Wayland::start_loop(display.clone(), socket, wayland_state)?; + let flush_notify = Arc::new(Notify::new()); + let client_listener = task::new( + || "Wayland client listener loop", + Wayland::client_listener_loop(display_handle, socket, wayland_state.clone()), + )? + .abort_handle(); + let client_dispatcher = task::new( + || "Wayland dispatch client loop", + Wayland::dispatch_client_loop(display, flush_notify.clone(), wayland_state), + )? + .abort_handle(); Ok(Wayland { - display, - join_handle, + flush_notify, + client_listener, + client_dispatcher, renderer, output, dmabuf_rx, }) } - fn start_loop( - display: Arc, + async fn client_listener_loop( + mut display_handle: DisplayHandle, socket: ListeningSocket, state: Arc>, - ) -> Result>> { - let listen_async = - AsyncUnixListener::from_std(unsafe { UnixListener::from_raw_fd(socket.as_raw_fd()) })?; + ) -> Result<()> { + let async_fd = AsyncFd::new(socket.as_fd())?; + loop { + let mut guard = async_fd.readable().await?; + let Ok(Some(stream)) = socket.accept() else { + guard.clear_ready(); + continue; + }; - let dispatch_poll_fd = display.poll_fd()?; - let dispatch_poll_listener = UnownedFd(Some(AsyncFd::new(dispatch_poll_fd)?)); + let stream = tokio::net::UnixStream::from_std(stream)?; + let pid = stream.peer_cred().ok().and_then(|c| c.pid()); - let dh1 = display.handle(); - let mut dh2 = dh1.clone(); + // New client connected + let client_state = Arc::new(ClientState { + pid, + id: OnceLock::new(), + compositor_state: Default::default(), + seat: state.lock().seat.clone(), + }); + let _client = display_handle.insert_client(stream.into_std()?, client_state.clone())?; + } + } - task::new(|| "wayland loop", async move { - let _socket = socket; // Keep the socket alive - loop { - tokio::select! { - acc = listen_async.accept() => { // New client connected - let (stream, _) = acc?; - let client_state = Arc::new(ClientState { - pid: stream.peer_cred().ok().and_then(|c| c.pid()), - id: OnceLock::new(), - compositor_state: Default::default(), - seat: state.lock().seat.clone(), - }); - let _client = dh2.insert_client(stream.into_std()?, client_state.clone())?; - } - e = dispatch_poll_listener.readable() => { // Dispatch - let mut guard = e?; - debug_span!("Dispatch wayland event").in_scope(|| -> Result<(), color_eyre::Report> { - display.dispatch_clients(&mut state.lock())?; - display.flush_clients(None); - Ok(()) - })?; - guard.clear_ready(); - } + async fn dispatch_client_loop( + mut display: Display, + flush_notify: Arc, + state: Arc>, + ) -> std::io::Result<()> { + loop { + let poll_fd = display.backend().poll_fd(); + let async_fd = AsyncFd::new(poll_fd)?; + tokio::select! { + biased; + _ = async_fd.readable() => { + drop(async_fd); + let _span = debug_span!("Dispatch wayland event"); + let _span = _span.enter(); + let _ = display.dispatch_clients(&mut *state.lock()); + let _ = display.flush_clients(); } + _ = flush_notify.notified() => { + drop(async_fd); + let _ = display.flush_clients(); + }, } - }) + } } #[instrument(level = "debug", name = "Wayland frame", skip(self))] @@ -195,7 +186,7 @@ impl Wayland { } let _ = self.renderer.cleanup_texture_cache(); - self.display.flush_clients(None); + self.flush_notify.notify_waiters(); } pub fn frame_event(&self) { @@ -212,6 +203,7 @@ impl Wayland { } impl Drop for Wayland { fn drop(&mut self) { - self.join_handle.abort(); + self.client_listener.abort(); + self.client_dispatcher.abort(); } }