From 5385c7e7056780e8ca7acd7e9cb71e70cd077d36 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Wed, 3 Sep 2025 11:29:40 +0200 Subject: [PATCH] refactor(networkmanager): Replace Mutex with RwLock for shared state in run(), add debug logging --- src/clients/networkmanager/mod.rs | 32 ++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 2490b0a..b6eb1ab 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -3,7 +3,8 @@ use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{Mutex, broadcast}; +use tokio::sync::{RwLock, broadcast}; +use tracing::debug; use zbus::Connection; use zbus::zvariant::ObjectPath; @@ -32,7 +33,9 @@ impl Client { } fn run(&self) -> Result<()> { - let devices = Arc::new(Mutex::new(HashSet::::new())); + debug!("Client running"); + + let devices = Arc::new(RwLock::new(HashSet::::new())); spawn(watch_devices_list( devices.clone(), @@ -65,9 +68,11 @@ pub fn create_client() -> ClientResult { } async fn watch_devices_list( - devices: Arc>>>, + devices: Arc>>>, controller_sender: broadcast::Sender, ) -> Result<()> { + debug!("D-Bus devices list watcher starting"); + let dbus_connection = Connection::system().await?; let root = DbusProxy::new(&dbus_connection).await?; @@ -82,7 +87,7 @@ async fn watch_devices_list( .collect::>(); // Atomic read-then-write of `devices` - let mut devices_locked = devices.lock().await; + let mut devices_locked = devices.write().await; let devices_snapshot = devices_locked.clone(); (*devices_locked).clone_from(&new_devices); drop(devices_locked); @@ -96,7 +101,8 @@ async fn watch_devices_list( } let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit + // TODO: Store join handles for watchers and abort them when their device is removed + // TODO: Inform module of removed devices } Ok(()) @@ -104,16 +110,18 @@ async fn watch_devices_list( async fn handle_received_events( mut receiver: broadcast::Receiver, - devices: Arc>>>, + devices: Arc>>>, controller_sender: broadcast::Sender, ) -> Result<()> { while let Result::Ok(event) = receiver.recv().await { match event { ModuleToClientEvent::NewController => { + debug!("Client received NewController event"); + let dbus_connection = Connection::system().await?; // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.lock().await.clone(); + let devices_snapshot = devices.read().await.clone(); for device_path in devices_snapshot { let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; @@ -135,11 +143,11 @@ async fn handle_received_events( } async fn watch_device( - device_path: ObjectPath<'_>, + path: ObjectPath<'_>, controller_sender: broadcast::Sender, ) -> Result<()> { let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; + let device = DeviceDbusProxy::new(&dbus_connection, path.to_owned()).await?; spawn(watch_device_state(device, controller_sender)); @@ -150,6 +158,10 @@ async fn watch_device_state( device: DeviceDbusProxy<'_>, controller_sender: broadcast::Sender, ) -> Result<()> { + let path = device.inner().path(); + + debug!("D-Bus device state watcher for {} starting", path); + let interface = device.interface().await?; let r#type = device.device_type().await?; @@ -171,6 +183,8 @@ async fn watch_device_state( })?; } + debug!("D-Bus device state watcher for {} ended", path); + Ok(()) }