refactor(client): use new queued client messenger

This commit is contained in:
Nova
2022-09-14 19:17:02 -04:00
parent 4d897f4fcd
commit fc7757e9fb
2 changed files with 21 additions and 8 deletions

View File

@@ -58,7 +58,10 @@ impl Client {
stop_notifier: Default::default(), stop_notifier: Default::default(),
join_handle: OnceCell::new(), join_handle: OnceCell::new(),
messenger: Some(Messenger::new(connection)), messenger: Some(Messenger::new(
tokio::runtime::Handle::current(),
connection,
)),
scenegraph: Default::default(), scenegraph: Default::default(),
root: OnceCell::new(), root: OnceCell::new(),
base_resource_prefixes: Default::default(), base_resource_prefixes: Default::default(),
@@ -81,10 +84,16 @@ impl Client {
client.dispatch().await? client.dispatch().await?
} }
}; };
let flush_loop = async {
loop {
client.flush().await?
}
};
let result = tokio::select! { let result = tokio::select! {
_ = client.stop_notifier.notified() => Ok(()), _ = client.stop_notifier.notified() => Ok(()),
e = dispatch_loop => e, e = dispatch_loop => e,
e = flush_loop => e,
}; };
client.disconnect().await; client.disconnect().await;
result result
@@ -100,6 +109,13 @@ impl Client {
} }
} }
pub async fn flush(&self) -> Result<(), std::io::Error> {
match &self.messenger {
Some(messenger) => messenger.flush().await,
None => Err(std::io::Error::from(std::io::ErrorKind::Unsupported)),
}
}
pub async fn disconnect(&self) { pub async fn disconnect(&self) {
self.stop_notifier.notify_one(); self.stop_notifier.notify_one();
if let Some(event_loop) = self.event_loop.upgrade() { if let Some(event_loop) = self.event_loop.upgrade() {

View File

@@ -175,13 +175,10 @@ impl Node {
let path = self.path.clone(); let path = self.path.clone();
let method = method.to_string(); let method = method.to_string();
let data = data.to_vec(); let data = data.to_vec();
tokio::spawn(async move {
if let Some(messenger) = client.messenger.as_ref() { if let Some(messenger) = client.messenger.as_ref() {
let _ = messenger let _ = messenger.send_remote_signal(path.as_str(), method.as_str(), data.as_slice());
.send_remote_signal(path.as_str(), method.as_str(), data.as_slice()) }
.await;
}
});
Ok(()) Ok(())
} }
pub async fn execute_remote_method(&self, method: &str, data: Vec<u8>) -> Result<Vec<u8>> { pub async fn execute_remote_method(&self, method: &str, data: Vec<u8>) -> Result<Vec<u8>> {