diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 52af9e7..27fded8 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -3,7 +3,7 @@ use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{RwLock, broadcast}; +use tokio::sync::{Mutex, broadcast}; use zbus::Connection; use zbus::zvariant::{ObjectPath, Str}; @@ -32,79 +32,19 @@ impl Client { } fn run(&self) -> Result<()> { - let devices: &'static _ = Box::leak(Box::new(RwLock::new(HashSet::::new()))); + let devices = Arc::new(Mutex::new(HashSet::::new())); - { - let controller_sender = self.controller_sender.clone(); - spawn(async move { - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; + spawn(watch_devices_list( + devices.clone(), + self.controller_sender.clone(), + )); - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = devices_change - .get() - .await? - .iter() - .map(ObjectPath::to_owned) - .collect::>(); - - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.read().await.clone(); - - let added_devices = new_devices.difference(&devices_snapshot); - for added_device in added_devices { - spawn(watch_device( - added_device.to_owned(), - controller_sender.clone(), - )); - } - - let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit - - *devices.write().await = new_devices; - } - - Ok(()) - }); - } - - { - let controller_sender = self.controller_sender.clone(); - let mut receiver = self.sender.subscribe(); - spawn(async move { - while let Result::Ok(event) = receiver.recv().await { - match event { - ModuleToClientEvent::NewController => { - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.read().await.clone(); - - for device_path in devices_snapshot { - let dbus_connection = Connection::system().await?; - let device = - DeviceDbusProxy::new(&dbus_connection, device_path).await?; - - // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well - let interface = device.interface().await?.to_string(); - let r#type = device.device_type().await?; - let state = device.state().await?; - controller_sender.send( - ClientToModuleEvent::DeviceStateChanged { - interface, - r#type, - state, - }, - )?; - } - } - } - } - - Ok(()) - }); - } + let receiver = self.sender.subscribe(); + spawn(handle_received_events( + receiver, + devices.clone(), + self.controller_sender.clone(), + )); Ok(()) } @@ -124,6 +64,77 @@ pub fn create_client() -> ClientResult { Ok(client) } +async fn watch_devices_list( + devices: Arc>>>, + controller_sender: broadcast::Sender, +) -> Result<()> { + let dbus_connection = Connection::system().await?; + let root = DbusProxy::new(&dbus_connection).await?; + + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = devices_change + .get() + .await? + .iter() + .map(ObjectPath::to_owned) + .collect::>(); + + // Atomic read-then-write of `devices` + let mut devices_locked = devices.lock().await; + let devices_snapshot = devices_locked.clone(); + (*devices_locked).clone_from(&new_devices); + drop(devices_locked); + + let added_devices = new_devices.difference(&devices_snapshot); + for added_device in added_devices { + spawn(watch_device( + added_device.to_owned(), + controller_sender.clone(), + )); + } + + let _removed_devices = devices_snapshot.difference(&new_devices); + // TODO: Cook up some way to notify closures for removed devices to exit + } + + Ok(()) +} + +async fn handle_received_events( + mut receiver: broadcast::Receiver, + devices: Arc>>>, + controller_sender: broadcast::Sender, +) -> Result<()> { + while let Result::Ok(event) = receiver.recv().await { + match event { + ModuleToClientEvent::NewController => { + let dbus_connection = Connection::system().await?; + + // We create a local clone here to avoid holding the lock for too long + let devices_snapshot = devices.lock().await.clone(); + + for device_path in devices_snapshot { + let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; + + // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well + let interface = device.interface().await?.to_string(); + let r#type = device.device_type().await?; + let state = device.state().await?; + controller_sender.send(ClientToModuleEvent::DeviceStateChanged { + interface, + r#type, + state, + })?; + } + } + } + } + + Ok(()) +} + async fn watch_device( device_path: ObjectPath<'_>, controller_sender: broadcast::Sender,