From d7638e3c2d842b55b19835700f26d1b727de395c Mon Sep 17 00:00:00 2001 From: Nova Date: Tue, 12 Jul 2022 22:47:47 -0400 Subject: [PATCH] fix(event loop): print error on drop/join --- src/core/eventloop.rs | 152 +++++++++++++++++++++++++----------------- 1 file changed, 92 insertions(+), 60 deletions(-) diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index d59d15f..549b18c 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -22,66 +22,11 @@ impl EventLoop { pub fn new(timeout: Option) -> Result { let socket_path = server::get_free_socket_path() .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::Other))?; - let mut socket = UnixListener::bind(socket_path.clone())?; - let (sender, mut receiver) = pipe::new()?; + let (sender, receiver) = pipe::new()?; + let socket_path_captured = socket_path.clone(); let join_handle = thread::Builder::new() .name("event_loop".to_owned()) - .spawn(move || -> Result<()> { - let mut clients: Slab>> = Slab::new(); - let mut poll = Poll::new()?; - let mut events = Events::with_capacity(1024); - const LISTENER: Token = Token(usize::MAX - 1); - poll.registry() - .register(&mut socket, LISTENER, Interest::READABLE)?; - const STOP: Token = Token(usize::MAX); - poll.registry() - .register(&mut receiver, STOP, Interest::READABLE)?; - loop { - poll.poll(&mut events, timeout)?; - for event in &events { - match event.token() { - LISTENER => loop { - match socket.accept() { - Ok((mut socket, _)) => { - let client_number = clients.insert(None); - poll.registry().register( - &mut socket, - Token(client_number), - Interest::READABLE, - )?; - let client = Client::from_connection(socket); - *clients.get_mut(client_number).unwrap() = Some(client); - } - Err(e) => match e.kind() { - std::io::ErrorKind::WouldBlock => break, - _ => return Err(e.into()), - }, - } - }, - STOP => return Ok(()), - token => { - let client = - clients.get(token.0).and_then(|client| client.as_ref()); - if let Some(client) = client { - loop { - let dispatch_result = client.dispatch(); - match dispatch_result { - Ok(_) => continue, - Err(e) => match e.kind() { - std::io::ErrorKind::WouldBlock => break, - _ => { - clients.remove(token.0); - break; - } - }, - } - } - } - } - } - } - } - }) + .spawn(move || EventLoop::run_loop(timeout, socket_path_captured, receiver)) .ok(); Ok(EventLoop { socket_path, @@ -89,6 +34,87 @@ impl EventLoop { stop_write: sender, }) } + + fn run_loop( + timeout: Option, + socket_path: String, + stop_receiver: pipe::Receiver, + ) -> Result<()> { + let mut stop_receiver = stop_receiver; + let mut socket = UnixListener::bind(socket_path)?; + let mut clients: Slab>> = Slab::new(); + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + const LISTENER: Token = Token(usize::MAX - 1); + poll.registry() + .register(&mut socket, LISTENER, Interest::READABLE)?; + const STOP: Token = Token(usize::MAX); + poll.registry() + .register(&mut stop_receiver, STOP, Interest::READABLE)?; + 'event_loop: loop { + poll.poll(&mut events, timeout)?; + for event in &events { + match event.token() { + LISTENER => EventLoop::accept_client(&socket, &mut clients, &poll)?, + STOP => break 'event_loop, + token => EventLoop::handle_client_message(token.0, &mut clients)?, + } + } + } + + println!("Event loop gracefully finished"); + Ok(()) + } + + fn accept_client( + socket: &UnixListener, + clients: &mut Slab>>, + poll: &Poll, + ) -> Result<()> { + loop { + match socket.accept() { + Ok((mut socket, _)) => { + let client_number = clients.insert(None); + poll.registry().register( + &mut socket, + Token(client_number), + Interest::READABLE, + )?; + let client = Client::from_connection(socket); + *clients.get_mut(client_number).unwrap() = Some(client); + } + Err(e) => match e.kind() { + std::io::ErrorKind::WouldBlock => break, + _ => return Err(e.into()), + }, + } + } + Ok(()) + } + + fn handle_client_message( + client_id: usize, + clients: &mut Slab>>, + ) -> Result<()> { + let client = clients.get(client_id).and_then(|client| client.as_ref()); + if let Some(client) = client { + loop { + let dispatch_result = client.dispatch(); + match dispatch_result { + Ok(_) => continue, + Err(e) => match e.kind() { + std::io::ErrorKind::WouldBlock => break, + std::io::ErrorKind::Interrupted => continue, + _ => { + clients.remove(client_id); + break; + } + }, + } + } + } + Ok(()) + } } impl Drop for EventLoop { @@ -96,8 +122,14 @@ impl Drop for EventLoop { let buf: [u8; 1] = [1; 1]; let _ = self.stop_write.write(buf.as_slice()); if let Some(handle) = self.join_handle.take() { - let _ = handle.join(); - // handle.join().unwrap().unwrap(); + match handle.join() { + Ok(r) => { + if let Err(e) = r { + eprintln!("Event loop error: {}", e); + } + } + Err(e) => eprintln!("Event loop failed to rejoin with error: {:#?}", e), + } } } }