fix: io safety error

This commit is contained in:
Nova
2025-05-15 00:59:59 -07:00
parent 7f7a8b5264
commit 4c70ded2b0

View File

@@ -13,29 +13,29 @@ use self::{state::WaylandState, surface::CORE_SURFACES};
use crate::{core::task, wayland::state::ClientState}; use crate::{core::task, wayland::state::ClientState};
use color_eyre::eyre::{Result, ensure}; use color_eyre::eyre::{Result, ensure};
use parking_lot::Mutex; use parking_lot::Mutex;
use smithay::backend::allocator::dmabuf::Dmabuf; use smithay::{
use smithay::backend::egl::EGLContext; backend::{
use smithay::backend::renderer::gles::GlesRenderer; allocator::dmabuf::Dmabuf,
use smithay::backend::renderer::{ImportDma, Renderer}; egl::EGLContext,
use smithay::output::Output; renderer::{ImportDma, Renderer, gles::GlesRenderer},
use smithay::reexports::wayland_server::DisplayHandle; },
use smithay::reexports::wayland_server::backend::ClientId; output::Output,
use smithay::reexports::wayland_server::{Display, ListeningSocket}; reexports::wayland_server::{Display, DisplayHandle, ListeningSocket},
use smithay::wayland::dmabuf; wayland::dmabuf,
use std::ffi::OsStr; };
use std::os::fd::{IntoRawFd, OwnedFd};
use std::os::unix::prelude::AsRawFd;
use std::sync::OnceLock;
use std::{ use std::{
ffi::c_void, ffi::{OsStr, c_void},
os::unix::{net::UnixListener, prelude::FromRawFd}, os::fd::AsFd,
sync::Arc, sync::{Arc, OnceLock},
}; };
use stereokit_rust::system::{Backend, BackendGraphics}; use stereokit_rust::system::{Backend, BackendGraphics};
use tokio::io::unix::AsyncFdReadyGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::{ use tokio::{
io::unix::AsyncFd, net::UnixListener as AsyncUnixListener, sync::mpsc, task::JoinHandle, io::unix::AsyncFd,
sync::{
Notify,
mpsc::{self, UnboundedReceiver},
},
task::AbortHandle,
}; };
use tracing::{debug_span, info, instrument}; use tracing::{debug_span, info, instrument};
@@ -61,39 +61,10 @@ fn get_sk_egl() -> Result<EGLRawHandles> {
}) })
} }
pub struct DisplayWrapper(Mutex<Display<WaylandState>>, DisplayHandle);
impl DisplayWrapper {
pub fn handle(&self) -> DisplayHandle {
self.1.clone()
}
pub fn dispatch_clients(&self, state: &mut WaylandState) -> Result<usize, std::io::Error> {
self.0.lock().dispatch_clients(state)
}
pub fn flush_clients(&self, client: Option<ClientId>) {
if let Some(mut lock) = self.0.try_lock() {
let _ = lock.backend().flush(client);
}
}
pub fn poll_fd(&self) -> Result<OwnedFd, std::io::Error> {
self.0.lock().backend().poll_fd().try_clone_to_owned()
}
}
struct UnownedFd(Option<AsyncFd<OwnedFd>>);
impl UnownedFd {
async fn readable(&self) -> std::io::Result<AsyncFdReadyGuard<'_, OwnedFd>> {
self.0.as_ref().unwrap().readable().await
}
}
impl Drop for UnownedFd {
fn drop(&mut self) {
let _ = self.0.take().unwrap().into_inner().into_raw_fd();
}
}
pub struct Wayland { pub struct Wayland {
display: Arc<DisplayWrapper>, flush_notify: Arc<Notify>,
join_handle: JoinHandle<Result<()>>, client_listener: AbortHandle,
client_dispatcher: AbortHandle,
renderer: GlesRenderer, renderer: GlesRenderer,
output: Output, output: Output,
dmabuf_rx: UnboundedReceiver<(Dmabuf, Option<dmabuf::ImportNotifier>)>, dmabuf_rx: UnboundedReceiver<(Dmabuf, Option<dmabuf::ImportNotifier>)>,
@@ -113,7 +84,6 @@ impl Wayland {
let display_handle = display.handle(); let display_handle = display.handle();
let (dmabuf_tx, dmabuf_rx) = mpsc::unbounded_channel(); let (dmabuf_tx, dmabuf_rx) = mpsc::unbounded_channel();
let display = Arc::new(DisplayWrapper(Mutex::new(display), display_handle.clone()));
let wayland_state = WaylandState::new(display_handle.clone(), &renderer, dmabuf_tx); let wayland_state = WaylandState::new(display_handle.clone(), &renderer, dmabuf_tx);
let output = wayland_state.lock().output.clone(); let output = wayland_state.lock().output.clone();
@@ -128,57 +98,78 @@ impl Wayland {
} }
info!(socket_name, "Wayland active"); info!(socket_name, "Wayland active");
let join_handle = Wayland::start_loop(display.clone(), socket, wayland_state)?; let flush_notify = Arc::new(Notify::new());
let client_listener = task::new(
|| "Wayland client listener loop",
Wayland::client_listener_loop(display_handle, socket, wayland_state.clone()),
)?
.abort_handle();
let client_dispatcher = task::new(
|| "Wayland dispatch client loop",
Wayland::dispatch_client_loop(display, flush_notify.clone(), wayland_state),
)?
.abort_handle();
Ok(Wayland { Ok(Wayland {
display, flush_notify,
join_handle, client_listener,
client_dispatcher,
renderer, renderer,
output, output,
dmabuf_rx, dmabuf_rx,
}) })
} }
fn start_loop( async fn client_listener_loop(
display: Arc<DisplayWrapper>, mut display_handle: DisplayHandle,
socket: ListeningSocket, socket: ListeningSocket,
state: Arc<Mutex<WaylandState>>, state: Arc<Mutex<WaylandState>>,
) -> Result<JoinHandle<Result<()>>> { ) -> Result<()> {
let listen_async = let async_fd = AsyncFd::new(socket.as_fd())?;
AsyncUnixListener::from_std(unsafe { UnixListener::from_raw_fd(socket.as_raw_fd()) })?; loop {
let mut guard = async_fd.readable().await?;
let Ok(Some(stream)) = socket.accept() else {
guard.clear_ready();
continue;
};
let dispatch_poll_fd = display.poll_fd()?; let stream = tokio::net::UnixStream::from_std(stream)?;
let dispatch_poll_listener = UnownedFd(Some(AsyncFd::new(dispatch_poll_fd)?)); let pid = stream.peer_cred().ok().and_then(|c| c.pid());
let dh1 = display.handle(); // New client connected
let mut dh2 = dh1.clone(); let client_state = Arc::new(ClientState {
pid,
id: OnceLock::new(),
compositor_state: Default::default(),
seat: state.lock().seat.clone(),
});
let _client = display_handle.insert_client(stream.into_std()?, client_state.clone())?;
}
}
task::new(|| "wayland loop", async move { async fn dispatch_client_loop(
let _socket = socket; // Keep the socket alive mut display: Display<WaylandState>,
loop { flush_notify: Arc<Notify>,
tokio::select! { state: Arc<Mutex<WaylandState>>,
acc = listen_async.accept() => { // New client connected ) -> std::io::Result<()> {
let (stream, _) = acc?; loop {
let client_state = Arc::new(ClientState { let poll_fd = display.backend().poll_fd();
pid: stream.peer_cred().ok().and_then(|c| c.pid()), let async_fd = AsyncFd::new(poll_fd)?;
id: OnceLock::new(), tokio::select! {
compositor_state: Default::default(), biased;
seat: state.lock().seat.clone(), _ = async_fd.readable() => {
}); drop(async_fd);
let _client = dh2.insert_client(stream.into_std()?, client_state.clone())?; let _span = debug_span!("Dispatch wayland event");
} let _span = _span.enter();
e = dispatch_poll_listener.readable() => { // Dispatch let _ = display.dispatch_clients(&mut *state.lock());
let mut guard = e?; let _ = display.flush_clients();
debug_span!("Dispatch wayland event").in_scope(|| -> Result<(), color_eyre::Report> {
display.dispatch_clients(&mut state.lock())?;
display.flush_clients(None);
Ok(())
})?;
guard.clear_ready();
}
} }
_ = flush_notify.notified() => {
drop(async_fd);
let _ = display.flush_clients();
},
} }
}) }
} }
#[instrument(level = "debug", name = "Wayland frame", skip(self))] #[instrument(level = "debug", name = "Wayland frame", skip(self))]
@@ -195,7 +186,7 @@ impl Wayland {
} }
let _ = self.renderer.cleanup_texture_cache(); let _ = self.renderer.cleanup_texture_cache();
self.display.flush_clients(None); self.flush_notify.notify_waiters();
} }
pub fn frame_event(&self) { pub fn frame_event(&self) {
@@ -212,6 +203,7 @@ impl Wayland {
} }
impl Drop for Wayland { impl Drop for Wayland {
fn drop(&mut self) { fn drop(&mut self) {
self.join_handle.abort(); self.client_listener.abort();
self.client_dispatcher.abort();
} }
} }