From fc7757e9fbbeb14115643e8f25df3b7d6eb9dbd1 Mon Sep 17 00:00:00 2001 From: Nova Date: Wed, 14 Sep 2022 19:17:02 -0400 Subject: [PATCH] refactor(client): use new queued client messenger --- src/core/client.rs | 18 +++++++++++++++++- src/nodes/core.rs | 11 ++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/core/client.rs b/src/core/client.rs index 16d519d..125c7bb 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -58,7 +58,10 @@ impl Client { stop_notifier: Default::default(), join_handle: OnceCell::new(), - messenger: Some(Messenger::new(connection)), + messenger: Some(Messenger::new( + tokio::runtime::Handle::current(), + connection, + )), scenegraph: Default::default(), root: OnceCell::new(), base_resource_prefixes: Default::default(), @@ -81,10 +84,16 @@ impl Client { client.dispatch().await? } }; + let flush_loop = async { + loop { + client.flush().await? + } + }; let result = tokio::select! { _ = client.stop_notifier.notified() => Ok(()), e = dispatch_loop => e, + e = flush_loop => e, }; client.disconnect().await; result @@ -100,6 +109,13 @@ impl Client { } } + pub async fn flush(&self) -> Result<(), std::io::Error> { + match &self.messenger { + Some(messenger) => messenger.flush().await, + None => Err(std::io::Error::from(std::io::ErrorKind::Unsupported)), + } + } + pub async fn disconnect(&self) { self.stop_notifier.notify_one(); if let Some(event_loop) = self.event_loop.upgrade() { diff --git a/src/nodes/core.rs b/src/nodes/core.rs index bbcb3de..b764337 100644 --- a/src/nodes/core.rs +++ b/src/nodes/core.rs @@ -175,13 +175,10 @@ impl Node { let path = self.path.clone(); let method = method.to_string(); let data = data.to_vec(); - tokio::spawn(async move { - if let Some(messenger) = client.messenger.as_ref() { - let _ = messenger - .send_remote_signal(path.as_str(), method.as_str(), data.as_slice()) - .await; - } - }); + + if let Some(messenger) = client.messenger.as_ref() { + let _ = messenger.send_remote_signal(path.as_str(), method.as_str(), data.as_slice()); + } Ok(()) } pub async fn execute_remote_method(&self, method: &str, data: Vec) -> Result> {