From 226b32ce6a674c32ee94da0361e82a98d9c01ecb Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Tue, 2 Sep 2025 20:42:26 +0200 Subject: [PATCH 01/12] fix(networkmanager): support late module initialisation For example when a second monitor is connected while Ironbar is already running. --- src/clients/networkmanager/event.rs | 13 ++- src/clients/networkmanager/mod.rs | 123 ++++++++++++++++++++-------- src/modules/networkmanager.rs | 65 ++++++++------- 3 files changed, 135 insertions(+), 66 deletions(-) diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index 4963f6e..034d6e6 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -1,13 +1,18 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; #[derive(Debug, Clone)] -pub enum Event { - DeviceAdded { - interface: String, - }, +pub enum ClientToModuleEvent { DeviceStateChanged { interface: String, r#type: DeviceType, state: DeviceState, }, + DeviceRemoved { + interface: String, + }, +} + +#[derive(Debug, Clone)] +pub enum ModuleToClientEvent { + NewController, } diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 94cb82d..52af9e7 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -3,13 +3,13 @@ use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{RwLock, broadcast}; use zbus::Connection; use zbus::zvariant::{ObjectPath, Str}; use crate::clients::ClientResult; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; -use crate::clients::networkmanager::event::Event; +use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::{register_fallible_client, spawn}; pub mod dbus; @@ -17,47 +17,104 @@ pub mod event; #[derive(Debug)] pub struct Client { - tx: broadcast::Sender, + controller_sender: broadcast::Sender, + sender: broadcast::Sender, } impl Client { fn new() -> Result { - let (tx, _) = broadcast::channel(64); - Ok(Client { tx }) + let (controller_sender, _) = broadcast::channel(64); + let (sender, _) = broadcast::channel(8); + Ok(Client { + controller_sender, + sender, + }) } fn run(&self) -> Result<()> { - let tx = self.tx.clone(); - spawn(async move { - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; + let devices: &'static _ = Box::leak(Box::new(RwLock::new(HashSet::::new()))); - let mut devices = HashSet::new(); + { + let controller_sender = self.controller_sender.clone(); + spawn(async move { + let dbus_connection = Connection::system().await?; + let root = DbusProxy::new(&dbus_connection).await?; - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = HashSet::from_iter(devices_change.get().await?); + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = devices_change + .get() + .await? + .iter() + .map(ObjectPath::to_owned) + .collect::>(); - let added_devices = new_devices.difference(&devices); - for added_device in added_devices { - spawn(watch_device(added_device.to_owned(), tx.clone())); + // We create a local clone here to avoid holding the lock for too long + let devices_snapshot = devices.read().await.clone(); + + let added_devices = new_devices.difference(&devices_snapshot); + for added_device in added_devices { + spawn(watch_device( + added_device.to_owned(), + controller_sender.clone(), + )); + } + + let _removed_devices = devices_snapshot.difference(&new_devices); + // TODO: Cook up some way to notify closures for removed devices to exit + + *devices.write().await = new_devices; } - let _removed_devices = devices.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit + Ok(()) + }); + } - devices = new_devices; - } + { + let controller_sender = self.controller_sender.clone(); + let mut receiver = self.sender.subscribe(); + spawn(async move { + while let Result::Ok(event) = receiver.recv().await { + match event { + ModuleToClientEvent::NewController => { + // We create a local clone here to avoid holding the lock for too long + let devices_snapshot = devices.read().await.clone(); - Ok(()) - }); + for device_path in devices_snapshot { + let dbus_connection = Connection::system().await?; + let device = + DeviceDbusProxy::new(&dbus_connection, device_path).await?; + + // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well + let interface = device.interface().await?.to_string(); + let r#type = device.device_type().await?; + let state = device.state().await?; + controller_sender.send( + ClientToModuleEvent::DeviceStateChanged { + interface, + r#type, + state, + }, + )?; + } + } + } + } + + Ok(()) + }); + } Ok(()) } - pub fn subscribe(&self) -> broadcast::Receiver { - self.tx.subscribe() + pub fn subscribe(&self) -> broadcast::Receiver { + self.controller_sender.subscribe() + } + + pub fn get_sender(&self) -> broadcast::Sender { + self.sender.clone() } } @@ -67,19 +124,19 @@ pub fn create_client() -> ClientResult { Ok(client) } -async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) -> Result<()> { +async fn watch_device( + device_path: ObjectPath<'_>, + controller_sender: broadcast::Sender, +) -> Result<()> { let dbus_connection = Connection::system().await?; let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; let interface = device.interface().await?; - tx.send(Event::DeviceAdded { - interface: interface.to_string(), - })?; spawn(watch_device_state( device_path.to_owned(), interface.to_owned(), - tx.clone(), + controller_sender.clone(), )); Ok(()) @@ -88,7 +145,7 @@ async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) async fn watch_device_state( device_path: ObjectPath<'_>, interface: Str<'_>, - tx: broadcast::Sender, + controller_sender: broadcast::Sender, ) -> Result<()> { let dbus_connection = Connection::system().await?; let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?; @@ -96,7 +153,7 @@ async fn watch_device_state( // Send an event communicating the initial state let state = device.state().await?; - tx.send(Event::DeviceStateChanged { + controller_sender.send(ClientToModuleEvent::DeviceStateChanged { interface: interface.to_string(), r#type: r#type.clone(), state, @@ -105,7 +162,7 @@ async fn watch_device_state( let mut state_changes = device.receive_state_changed().await; while let Some(state_change) = state_changes.next().await { let state = state_change.get().await?; - tx.send(Event::DeviceStateChanged { + controller_sender.send(ClientToModuleEvent::DeviceStateChanged { interface: interface.to_string(), r#type: r#type.clone(), state, diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index ea8b8d9..4f464f0 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -1,6 +1,6 @@ use crate::clients::networkmanager::Client; use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; -use crate::clients::networkmanager::event::Event; +use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::config::CommonConfig; use crate::gtk_helpers::IronbarGtkExt; use crate::image::Provider; @@ -29,7 +29,7 @@ const fn default_icon_size() -> i32 { } impl Module for NetworkManagerModule { - type SendMessage = Event; + type SendMessage = ClientToModuleEvent; type ReceiveMessage = (); module_impl!("network_manager"); @@ -37,19 +37,24 @@ impl Module for NetworkManagerModule { fn spawn_controller( &self, _info: &ModuleInfo, - context: &WidgetContext, - _rx: mpsc::Receiver<()>, + context: &WidgetContext, + _widget_receiver: mpsc::Receiver<()>, ) -> Result<()> { let client = context.try_client::()?; // Should we be using context.tx with ModuleUpdateEvent::Update instead? - let tx = context.update_tx.clone(); - // Must be done here synchronously to avoid race condition - let mut client_rx = client.subscribe(); - spawn(async move { - while let Result::Ok(event) = client_rx.recv().await { - tx.send(event)?; - } + let widget_sender = context.update_tx.clone(); + // Must be done here otherwise we miss the response to our `NewController` event + let mut client_receiver = client.subscribe(); + + client + .get_sender() + .send(ModuleToClientEvent::NewController)?; + + spawn(async move { + while let Result::Ok(event) = client_receiver.recv().await { + widget_sender.send(event)?; + } Ok(()) }); @@ -58,16 +63,17 @@ impl Module for NetworkManagerModule { fn into_widget( self, - context: WidgetContext, + context: WidgetContext, _info: &ModuleInfo, ) -> Result> { + // Must be done here otherwise we miss the response to our `NewController` event + let receiver = context.subscribe(); + let container = gtk::Box::new(Orientation::Horizontal, 0); - // Must be done here synchronously to avoid race condition - let rx = context.subscribe(); // We cannot use recv_glib_async here because the lifetimes don't work out spawn_future_local(handle_update_events( - rx, + receiver, container.clone(), self.icon_size, context.ironbar.image_provider(), @@ -78,29 +84,27 @@ impl Module for NetworkManagerModule { } async fn handle_update_events( - mut rx: broadcast::Receiver, + mut receiver: broadcast::Receiver, container: gtk::Box, icon_size: i32, image_provider: Provider, -) { +) -> Result<()> { let mut icons = HashMap::::new(); - while let Result::Ok(event) = rx.recv().await { + while let Result::Ok(event) = receiver.recv().await { match event { - Event::DeviceAdded { interface, .. } => { - let icon = Image::new(); - icon.add_class("icon"); - container.add(&icon); - icons.insert(interface, icon); - } - Event::DeviceStateChanged { + ClientToModuleEvent::DeviceStateChanged { interface, r#type, state, } => { - let icon = icons - .get(&interface) - .expect("the icon for the interface to be present"); + let icon: &_ = icons.entry(interface).or_insert_with(|| { + let icon = Image::new(); + icon.add_class("icon"); + container.add(&icon); + icon + }); + // TODO: Make this configurable at runtime let icon_name = get_icon_for_device_state(&r#type, &state); match icon_name { @@ -115,8 +119,11 @@ async fn handle_update_events( } } } - }; + ClientToModuleEvent::DeviceRemoved { .. } => {} + } } + + Ok(()) } fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> { From ec00b2ce690fce9d1b20b1f50416f11600492641 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Tue, 2 Sep 2025 22:20:54 +0200 Subject: [PATCH 02/12] refactor(networkmanager): break Client::run up into multiple functions Also replace its shared state lifetime and synchonisation mechanisms with Arc>. --- src/clients/networkmanager/mod.rs | 155 ++++++++++++++++-------------- 1 file changed, 83 insertions(+), 72 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 52af9e7..27fded8 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -3,7 +3,7 @@ use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{RwLock, broadcast}; +use tokio::sync::{Mutex, broadcast}; use zbus::Connection; use zbus::zvariant::{ObjectPath, Str}; @@ -32,79 +32,19 @@ impl Client { } fn run(&self) -> Result<()> { - let devices: &'static _ = Box::leak(Box::new(RwLock::new(HashSet::::new()))); + let devices = Arc::new(Mutex::new(HashSet::::new())); - { - let controller_sender = self.controller_sender.clone(); - spawn(async move { - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; + spawn(watch_devices_list( + devices.clone(), + self.controller_sender.clone(), + )); - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = devices_change - .get() - .await? - .iter() - .map(ObjectPath::to_owned) - .collect::>(); - - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.read().await.clone(); - - let added_devices = new_devices.difference(&devices_snapshot); - for added_device in added_devices { - spawn(watch_device( - added_device.to_owned(), - controller_sender.clone(), - )); - } - - let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit - - *devices.write().await = new_devices; - } - - Ok(()) - }); - } - - { - let controller_sender = self.controller_sender.clone(); - let mut receiver = self.sender.subscribe(); - spawn(async move { - while let Result::Ok(event) = receiver.recv().await { - match event { - ModuleToClientEvent::NewController => { - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.read().await.clone(); - - for device_path in devices_snapshot { - let dbus_connection = Connection::system().await?; - let device = - DeviceDbusProxy::new(&dbus_connection, device_path).await?; - - // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well - let interface = device.interface().await?.to_string(); - let r#type = device.device_type().await?; - let state = device.state().await?; - controller_sender.send( - ClientToModuleEvent::DeviceStateChanged { - interface, - r#type, - state, - }, - )?; - } - } - } - } - - Ok(()) - }); - } + let receiver = self.sender.subscribe(); + spawn(handle_received_events( + receiver, + devices.clone(), + self.controller_sender.clone(), + )); Ok(()) } @@ -124,6 +64,77 @@ pub fn create_client() -> ClientResult { Ok(client) } +async fn watch_devices_list( + devices: Arc>>>, + controller_sender: broadcast::Sender, +) -> Result<()> { + let dbus_connection = Connection::system().await?; + let root = DbusProxy::new(&dbus_connection).await?; + + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = devices_change + .get() + .await? + .iter() + .map(ObjectPath::to_owned) + .collect::>(); + + // Atomic read-then-write of `devices` + let mut devices_locked = devices.lock().await; + let devices_snapshot = devices_locked.clone(); + (*devices_locked).clone_from(&new_devices); + drop(devices_locked); + + let added_devices = new_devices.difference(&devices_snapshot); + for added_device in added_devices { + spawn(watch_device( + added_device.to_owned(), + controller_sender.clone(), + )); + } + + let _removed_devices = devices_snapshot.difference(&new_devices); + // TODO: Cook up some way to notify closures for removed devices to exit + } + + Ok(()) +} + +async fn handle_received_events( + mut receiver: broadcast::Receiver, + devices: Arc>>>, + controller_sender: broadcast::Sender, +) -> Result<()> { + while let Result::Ok(event) = receiver.recv().await { + match event { + ModuleToClientEvent::NewController => { + let dbus_connection = Connection::system().await?; + + // We create a local clone here to avoid holding the lock for too long + let devices_snapshot = devices.lock().await.clone(); + + for device_path in devices_snapshot { + let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; + + // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well + let interface = device.interface().await?.to_string(); + let r#type = device.device_type().await?; + let state = device.state().await?; + controller_sender.send(ClientToModuleEvent::DeviceStateChanged { + interface, + r#type, + state, + })?; + } + } + } + } + + Ok(()) +} + async fn watch_device( device_path: ObjectPath<'_>, controller_sender: broadcast::Sender, From 4c516a1c2a6b9d417b87d36cc0080fce453a5153 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Tue, 2 Sep 2025 22:44:14 +0200 Subject: [PATCH 03/12] refactor(networkmanager): rename DeviceStateChanged event to DeviceChanged Also add a little TODO about icon order. --- src/clients/networkmanager/event.rs | 4 ++-- src/clients/networkmanager/mod.rs | 19 +++++++++---------- src/modules/networkmanager.rs | 7 ++++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index 034d6e6..85293f5 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -2,10 +2,10 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; #[derive(Debug, Clone)] pub enum ClientToModuleEvent { - DeviceStateChanged { + DeviceChanged { interface: String, r#type: DeviceType, - state: DeviceState, + new_state: DeviceState, }, DeviceRemoved { interface: String, diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 27fded8..d3b02fe 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -118,14 +118,13 @@ async fn handle_received_events( for device_path in devices_snapshot { let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; - // TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well let interface = device.interface().await?.to_string(); let r#type = device.device_type().await?; - let state = device.state().await?; - controller_sender.send(ClientToModuleEvent::DeviceStateChanged { + let new_state = device.state().await?; + controller_sender.send(ClientToModuleEvent::DeviceChanged { interface, r#type, - state, + new_state, })?; } } @@ -163,20 +162,20 @@ async fn watch_device_state( let r#type = device.device_type().await?; // Send an event communicating the initial state - let state = device.state().await?; - controller_sender.send(ClientToModuleEvent::DeviceStateChanged { + let new_state = device.state().await?; + controller_sender.send(ClientToModuleEvent::DeviceChanged { interface: interface.to_string(), r#type: r#type.clone(), - state, + new_state, })?; let mut state_changes = device.receive_state_changed().await; while let Some(state_change) = state_changes.next().await { - let state = state_change.get().await?; - controller_sender.send(ClientToModuleEvent::DeviceStateChanged { + let new_state = state_change.get().await?; + controller_sender.send(ClientToModuleEvent::DeviceChanged { interface: interface.to_string(), r#type: r#type.clone(), - state, + new_state, })?; } diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 4f464f0..01e99ca 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -89,14 +89,15 @@ async fn handle_update_events( icon_size: i32, image_provider: Provider, ) -> Result<()> { + // TODO: Ensure the visible icons are always in the same order let mut icons = HashMap::::new(); while let Result::Ok(event) = receiver.recv().await { match event { - ClientToModuleEvent::DeviceStateChanged { + ClientToModuleEvent::DeviceChanged { interface, r#type, - state, + new_state, } => { let icon: &_ = icons.entry(interface).or_insert_with(|| { let icon = Image::new(); @@ -106,7 +107,7 @@ async fn handle_update_events( }); // TODO: Make this configurable at runtime - let icon_name = get_icon_for_device_state(&r#type, &state); + let icon_name = get_icon_for_device_state(&r#type, &new_state); match icon_name { Some(icon_name) => { image_provider From 3ffb668e6b82efee0bc3b88321364cb824e72c8f Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Tue, 2 Sep 2025 23:23:44 +0200 Subject: [PATCH 04/12] refactor(networkmanager): pass device proxy directly to device state watcher Also clarify what receiver we're dealing with in handle_update_events. --- src/clients/networkmanager/mod.rs | 16 ++++------------ src/modules/networkmanager.rs | 4 ++-- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index d3b02fe..2490b0a 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -5,7 +5,7 @@ use std::collections::HashSet; use std::sync::Arc; use tokio::sync::{Mutex, broadcast}; use zbus::Connection; -use zbus::zvariant::{ObjectPath, Str}; +use zbus::zvariant::ObjectPath; use crate::clients::ClientResult; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; @@ -141,24 +141,16 @@ async fn watch_device( let dbus_connection = Connection::system().await?; let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; - let interface = device.interface().await?; - - spawn(watch_device_state( - device_path.to_owned(), - interface.to_owned(), - controller_sender.clone(), - )); + spawn(watch_device_state(device, controller_sender)); Ok(()) } async fn watch_device_state( - device_path: ObjectPath<'_>, - interface: Str<'_>, + device: DeviceDbusProxy<'_>, controller_sender: broadcast::Sender, ) -> Result<()> { - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?; + let interface = device.interface().await?; let r#type = device.device_type().await?; // Send an event communicating the initial state diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 01e99ca..2ee5e9c 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -84,7 +84,7 @@ impl Module for NetworkManagerModule { } async fn handle_update_events( - mut receiver: broadcast::Receiver, + mut widget_receiver: broadcast::Receiver, container: gtk::Box, icon_size: i32, image_provider: Provider, @@ -92,7 +92,7 @@ async fn handle_update_events( // TODO: Ensure the visible icons are always in the same order let mut icons = HashMap::::new(); - while let Result::Ok(event) = receiver.recv().await { + while let Result::Ok(event) = widget_receiver.recv().await { match event { ClientToModuleEvent::DeviceChanged { interface, From 5385c7e7056780e8ca7acd7e9cb71e70cd077d36 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Wed, 3 Sep 2025 11:29:40 +0200 Subject: [PATCH 05/12] refactor(networkmanager): Replace Mutex with RwLock for shared state in run(), add debug logging --- src/clients/networkmanager/mod.rs | 32 ++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 2490b0a..b6eb1ab 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -3,7 +3,8 @@ use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{Mutex, broadcast}; +use tokio::sync::{RwLock, broadcast}; +use tracing::debug; use zbus::Connection; use zbus::zvariant::ObjectPath; @@ -32,7 +33,9 @@ impl Client { } fn run(&self) -> Result<()> { - let devices = Arc::new(Mutex::new(HashSet::::new())); + debug!("Client running"); + + let devices = Arc::new(RwLock::new(HashSet::::new())); spawn(watch_devices_list( devices.clone(), @@ -65,9 +68,11 @@ pub fn create_client() -> ClientResult { } async fn watch_devices_list( - devices: Arc>>>, + devices: Arc>>>, controller_sender: broadcast::Sender, ) -> Result<()> { + debug!("D-Bus devices list watcher starting"); + let dbus_connection = Connection::system().await?; let root = DbusProxy::new(&dbus_connection).await?; @@ -82,7 +87,7 @@ async fn watch_devices_list( .collect::>(); // Atomic read-then-write of `devices` - let mut devices_locked = devices.lock().await; + let mut devices_locked = devices.write().await; let devices_snapshot = devices_locked.clone(); (*devices_locked).clone_from(&new_devices); drop(devices_locked); @@ -96,7 +101,8 @@ async fn watch_devices_list( } let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit + // TODO: Store join handles for watchers and abort them when their device is removed + // TODO: Inform module of removed devices } Ok(()) @@ -104,16 +110,18 @@ async fn watch_devices_list( async fn handle_received_events( mut receiver: broadcast::Receiver, - devices: Arc>>>, + devices: Arc>>>, controller_sender: broadcast::Sender, ) -> Result<()> { while let Result::Ok(event) = receiver.recv().await { match event { ModuleToClientEvent::NewController => { + debug!("Client received NewController event"); + let dbus_connection = Connection::system().await?; // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.lock().await.clone(); + let devices_snapshot = devices.read().await.clone(); for device_path in devices_snapshot { let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; @@ -135,11 +143,11 @@ async fn handle_received_events( } async fn watch_device( - device_path: ObjectPath<'_>, + path: ObjectPath<'_>, controller_sender: broadcast::Sender, ) -> Result<()> { let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; + let device = DeviceDbusProxy::new(&dbus_connection, path.to_owned()).await?; spawn(watch_device_state(device, controller_sender)); @@ -150,6 +158,10 @@ async fn watch_device_state( device: DeviceDbusProxy<'_>, controller_sender: broadcast::Sender, ) -> Result<()> { + let path = device.inner().path(); + + debug!("D-Bus device state watcher for {} starting", path); + let interface = device.interface().await?; let r#type = device.device_type().await?; @@ -171,6 +183,8 @@ async fn watch_device_state( })?; } + debug!("D-Bus device state watcher for {} ended", path); + Ok(()) } From 13c2520c769360b873e700a7090eaab41176a07d Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Wed, 3 Sep 2025 12:41:20 +0200 Subject: [PATCH 06/12] refactor(networkmanager): use inner client with static lifetime, make its functions methods --- src/clients/networkmanager/mod.rs | 292 +++++++++++++++--------------- 1 file changed, 150 insertions(+), 142 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index b6eb1ab..4825d0b 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -18,36 +18,54 @@ pub mod event; #[derive(Debug)] pub struct Client { - controller_sender: broadcast::Sender, - sender: broadcast::Sender, + inner: &'static ClientInner, } impl Client { - fn new() -> Result { - let (controller_sender, _) = broadcast::channel(64); - let (sender, _) = broadcast::channel(8); - Ok(Client { - controller_sender, - sender, - }) + fn new() -> Client { + let inner = Box::leak(Box::new(ClientInner::new())); + Client { inner } } fn run(&self) -> Result<()> { + self.inner.run() + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.inner.subscribe() + } + + pub fn get_sender(&self) -> broadcast::Sender { + self.inner.get_sender() + } +} + +#[derive(Debug)] +struct ClientInner { + controller_sender: broadcast::Sender, + sender: broadcast::Sender, + devices: RwLock>>, +} + +impl ClientInner { + fn new() -> ClientInner { + let (controller_sender, _) = broadcast::channel(64); + let (sender, _) = broadcast::channel(8); + let devices = RwLock::new(HashSet::::new()); + ClientInner { + controller_sender, + sender, + devices, + } + } + + fn run(&'static self) -> Result<()> { debug!("Client running"); - let devices = Arc::new(RwLock::new(HashSet::::new())); - - spawn(watch_devices_list( - devices.clone(), - self.controller_sender.clone(), - )); + spawn(self.watch_devices_list()); let receiver = self.sender.subscribe(); - spawn(handle_received_events( - receiver, - devices.clone(), - self.controller_sender.clone(), - )); + spawn(self.handle_received_events(receiver)); Ok(()) } @@ -59,133 +77,123 @@ impl Client { pub fn get_sender(&self) -> broadcast::Sender { self.sender.clone() } + + async fn watch_devices_list(&'static self) -> Result<()> { + debug!("D-Bus devices list watcher starting"); + + let dbus_connection = Connection::system().await?; + let root = DbusProxy::new(&dbus_connection).await?; + + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = devices_change + .get() + .await? + .iter() + .map(ObjectPath::to_owned) + .collect::>(); + + // Atomic read-then-write of `devices` + let mut devices_locked = self.devices.write().await; + let devices_snapshot = devices_locked.clone(); + (*devices_locked).clone_from(&new_devices); + drop(devices_locked); + + let added_devices = new_devices.difference(&devices_snapshot); + for added_device in added_devices { + spawn(self.watch_device(added_device.to_owned())); + } + + let _removed_devices = devices_snapshot.difference(&new_devices); + // TODO: Store join handles for watchers and abort them when their device is removed + // TODO: Inform module of removed devices + } + + Ok(()) + } + + async fn handle_received_events( + &'static self, + mut receiver: broadcast::Receiver, + ) -> Result<()> { + while let Result::Ok(event) = receiver.recv().await { + match event { + ModuleToClientEvent::NewController => { + debug!("Client received NewController event"); + + let dbus_connection = Connection::system().await?; + + // We create a local clone here to avoid holding the lock for too long + let devices_snapshot = self.devices.read().await.clone(); + + for device_path in devices_snapshot { + let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; + + let interface = device.interface().await?.to_string(); + let r#type = device.device_type().await?; + let new_state = device.state().await?; + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + interface, + r#type, + new_state, + })?; + } + } + } + } + + Ok(()) + } + + async fn watch_device(&'static self, path: ObjectPath<'static>) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, path).await?; + + spawn(self.watch_device_state(device)); + + Ok(()) + } + + async fn watch_device_state(&'static self, device: DeviceDbusProxy<'_>) -> Result<()> { + let path = device.inner().path(); + + debug!("D-Bus device state watcher for {} starting", path); + + let interface = device.interface().await?; + let r#type = device.device_type().await?; + + // Send an event communicating the initial state + let new_state = device.state().await?; + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + new_state, + })?; + + let mut state_changes = device.receive_state_changed().await; + while let Some(state_change) = state_changes.next().await { + let new_state = state_change.get().await?; + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + new_state, + })?; + } + + debug!("D-Bus device state watcher for {} ended", path); + + Ok(()) + } } pub fn create_client() -> ClientResult { - let client = Arc::new(Client::new()?); + let client = Arc::new(Client::new()); client.run()?; Ok(client) } -async fn watch_devices_list( - devices: Arc>>>, - controller_sender: broadcast::Sender, -) -> Result<()> { - debug!("D-Bus devices list watcher starting"); - - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; - - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = devices_change - .get() - .await? - .iter() - .map(ObjectPath::to_owned) - .collect::>(); - - // Atomic read-then-write of `devices` - let mut devices_locked = devices.write().await; - let devices_snapshot = devices_locked.clone(); - (*devices_locked).clone_from(&new_devices); - drop(devices_locked); - - let added_devices = new_devices.difference(&devices_snapshot); - for added_device in added_devices { - spawn(watch_device( - added_device.to_owned(), - controller_sender.clone(), - )); - } - - let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Store join handles for watchers and abort them when their device is removed - // TODO: Inform module of removed devices - } - - Ok(()) -} - -async fn handle_received_events( - mut receiver: broadcast::Receiver, - devices: Arc>>>, - controller_sender: broadcast::Sender, -) -> Result<()> { - while let Result::Ok(event) = receiver.recv().await { - match event { - ModuleToClientEvent::NewController => { - debug!("Client received NewController event"); - - let dbus_connection = Connection::system().await?; - - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = devices.read().await.clone(); - - for device_path in devices_snapshot { - let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; - - let interface = device.interface().await?.to_string(); - let r#type = device.device_type().await?; - let new_state = device.state().await?; - controller_sender.send(ClientToModuleEvent::DeviceChanged { - interface, - r#type, - new_state, - })?; - } - } - } - } - - Ok(()) -} - -async fn watch_device( - path: ObjectPath<'_>, - controller_sender: broadcast::Sender, -) -> Result<()> { - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, path.to_owned()).await?; - - spawn(watch_device_state(device, controller_sender)); - - Ok(()) -} - -async fn watch_device_state( - device: DeviceDbusProxy<'_>, - controller_sender: broadcast::Sender, -) -> Result<()> { - let path = device.inner().path(); - - debug!("D-Bus device state watcher for {} starting", path); - - let interface = device.interface().await?; - let r#type = device.device_type().await?; - - // Send an event communicating the initial state - let new_state = device.state().await?; - controller_sender.send(ClientToModuleEvent::DeviceChanged { - interface: interface.to_string(), - r#type: r#type.clone(), - new_state, - })?; - - let mut state_changes = device.receive_state_changed().await; - while let Some(state_change) = state_changes.next().await { - let new_state = state_change.get().await?; - controller_sender.send(ClientToModuleEvent::DeviceChanged { - interface: interface.to_string(), - r#type: r#type.clone(), - new_state, - })?; - } - - debug!("D-Bus device state watcher for {} ended", path); - - Ok(()) -} - register_fallible_client!(Client, network_manager); From 01de9da7e0ee0bc6675004082c8d9b1fcfaf62b7 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Wed, 3 Sep 2025 18:11:56 +0200 Subject: [PATCH 07/12] refactor(networkmanager): store property watcher join handles & stop them when no longer needed Also optimise dbus connection and proxy creation. --- src/clients/networkmanager/mod.rs | 50 +++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 4825d0b..b085a12 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -1,9 +1,10 @@ use color_eyre::Result; use color_eyre::eyre::Ok; use futures_lite::StreamExt; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::sync::{RwLock, broadcast}; +use tokio::task::JoinHandle; use tracing::debug; use zbus::Connection; use zbus::zvariant::ObjectPath; @@ -45,17 +46,27 @@ struct ClientInner { controller_sender: broadcast::Sender, sender: broadcast::Sender, devices: RwLock>>, + watchers: RwLock, Device>>, + // TODO: Maybe find some way to late-init a dbus connection here + // so we can just clone it when we need it instead of awaiting it every time +} + +#[derive(Debug)] +struct Device { + state_watcher: JoinHandle>, } impl ClientInner { fn new() -> ClientInner { let (controller_sender, _) = broadcast::channel(64); let (sender, _) = broadcast::channel(8); - let devices = RwLock::new(HashSet::::new()); + let devices = RwLock::new(HashSet::new()); + let watchers = RwLock::new(HashMap::new()); ClientInner { controller_sender, sender, devices, + watchers, } } @@ -70,11 +81,11 @@ impl ClientInner { Ok(()) } - pub fn subscribe(&self) -> broadcast::Receiver { + fn subscribe(&self) -> broadcast::Receiver { self.controller_sender.subscribe() } - pub fn get_sender(&self) -> broadcast::Sender { + fn get_sender(&self) -> broadcast::Sender { self.sender.clone() } @@ -94,6 +105,8 @@ impl ClientInner { .map(ObjectPath::to_owned) .collect::>(); + // TODO: Use `self.watchers` instead of `self.devices`, which requires creating all property watchers straightaway + // Atomic read-then-write of `devices` let mut devices_locked = self.devices.write().await; let devices_snapshot = devices_locked.clone(); @@ -105,9 +118,16 @@ impl ClientInner { spawn(self.watch_device(added_device.to_owned())); } - let _removed_devices = devices_snapshot.difference(&new_devices); - // TODO: Store join handles for watchers and abort them when their device is removed // TODO: Inform module of removed devices + let removed_devices = devices_snapshot.difference(&new_devices); + for removed_device in removed_devices { + let mut watchers = self.watchers.write().await; + let device = watchers.get(removed_device).unwrap(); + device.state_watcher.abort(); + watchers.remove(removed_device); + + debug!("D-bus device state watcher for {} stopped", removed_device); + } } Ok(()) @@ -117,13 +137,13 @@ impl ClientInner { &'static self, mut receiver: broadcast::Receiver, ) -> Result<()> { + let dbus_connection = Connection::system().await?; + while let Result::Ok(event) = receiver.recv().await { match event { ModuleToClientEvent::NewController => { debug!("Client received NewController event"); - let dbus_connection = Connection::system().await?; - // We create a local clone here to avoid holding the lock for too long let devices_snapshot = self.devices.read().await.clone(); @@ -148,16 +168,20 @@ impl ClientInner { } async fn watch_device(&'static self, path: ObjectPath<'static>) -> Result<()> { - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, path).await?; + debug_assert!(!self.watchers.read().await.contains_key(&path)); - spawn(self.watch_device_state(device)); + let state_watcher = spawn(self.watch_device_state(path.clone())); + self.watchers + .write() + .await + .insert(path, Device { state_watcher }); Ok(()) } - async fn watch_device_state(&'static self, device: DeviceDbusProxy<'_>) -> Result<()> { - let path = device.inner().path(); + async fn watch_device_state(&'static self, path: ObjectPath<'_>) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, path.clone()).await?; debug!("D-Bus device state watcher for {} starting", path); From f83c9e6852971b024eb7039d2586f4684b45dd82 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 4 Sep 2025 10:59:48 +0200 Subject: [PATCH 08/12] refactor(networkmanager): merge devices and watchers fields in ClientInner --- src/clients/networkmanager/mod.rs | 71 ++++++++++++------------------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index b085a12..da7e5bf 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -45,28 +45,25 @@ impl Client { struct ClientInner { controller_sender: broadcast::Sender, sender: broadcast::Sender, - devices: RwLock>>, - watchers: RwLock, Device>>, + device_watchers: RwLock, DeviceWatcher>>, // TODO: Maybe find some way to late-init a dbus connection here // so we can just clone it when we need it instead of awaiting it every time } -#[derive(Debug)] -struct Device { - state_watcher: JoinHandle>, +#[derive(Clone, Debug)] +struct DeviceWatcher { + state_watcher: Arc>>, } impl ClientInner { fn new() -> ClientInner { let (controller_sender, _) = broadcast::channel(64); let (sender, _) = broadcast::channel(8); - let devices = RwLock::new(HashSet::new()); - let watchers = RwLock::new(HashMap::new()); + let device_watchers = RwLock::new(HashMap::new()); ClientInner { controller_sender, sender, - devices, - watchers, + device_watchers, } } @@ -98,35 +95,34 @@ impl ClientInner { let mut devices_changes = root.receive_all_devices_changed().await; while let Some(devices_change) = devices_changes.next().await { // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = devices_change + let new_device_paths = devices_change .get() .await? .iter() .map(ObjectPath::to_owned) .collect::>(); - // TODO: Use `self.watchers` instead of `self.devices`, which requires creating all property watchers straightaway + let mut watchers = self.device_watchers.write().await; + let device_paths = watchers.keys().cloned().collect::>(); - // Atomic read-then-write of `devices` - let mut devices_locked = self.devices.write().await; - let devices_snapshot = devices_locked.clone(); - (*devices_locked).clone_from(&new_devices); - drop(devices_locked); + let added_device_paths = new_device_paths.difference(&device_paths); + for added_device_path in added_device_paths { + debug_assert!(!watchers.contains_key(added_device_path)); - let added_devices = new_devices.difference(&devices_snapshot); - for added_device in added_devices { - spawn(self.watch_device(added_device.to_owned())); + let watcher = self.watch_device(added_device_path.clone()); + watchers.insert(added_device_path.clone(), watcher); } // TODO: Inform module of removed devices - let removed_devices = devices_snapshot.difference(&new_devices); - for removed_device in removed_devices { - let mut watchers = self.watchers.write().await; - let device = watchers.get(removed_device).unwrap(); - device.state_watcher.abort(); - watchers.remove(removed_device); + let removed_device_paths = device_paths.difference(&new_device_paths); + for removed_device_path in removed_device_paths { + let watcher = watchers + .get(removed_device_path) + .expect("Device to be removed should be present in watchers"); + watcher.state_watcher.abort(); + watchers.remove(removed_device_path); - debug!("D-bus device state watcher for {} stopped", removed_device); + debug!("D-bus device watchers for {} stopped", removed_device_path); } } @@ -144,10 +140,7 @@ impl ClientInner { ModuleToClientEvent::NewController => { debug!("Client received NewController event"); - // We create a local clone here to avoid holding the lock for too long - let devices_snapshot = self.devices.read().await.clone(); - - for device_path in devices_snapshot { + for device_path in self.device_watchers.read().await.keys() { let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; let interface = device.interface().await?.to_string(); @@ -167,24 +160,18 @@ impl ClientInner { Ok(()) } - async fn watch_device(&'static self, path: ObjectPath<'static>) -> Result<()> { - debug_assert!(!self.watchers.read().await.contains_key(&path)); + fn watch_device(&'static self, path: ObjectPath<'static>) -> DeviceWatcher { + let state_watcher = Arc::new(spawn(self.watch_device_state(path))); - let state_watcher = spawn(self.watch_device_state(path.clone())); - self.watchers - .write() - .await - .insert(path, Device { state_watcher }); - - Ok(()) + DeviceWatcher { state_watcher } } async fn watch_device_state(&'static self, path: ObjectPath<'_>) -> Result<()> { + debug!("D-Bus device state watcher for {} starting", path); + let dbus_connection = Connection::system().await?; let device = DeviceDbusProxy::new(&dbus_connection, path.clone()).await?; - debug!("D-Bus device state watcher for {} starting", path); - let interface = device.interface().await?; let r#type = device.device_type().await?; @@ -208,8 +195,6 @@ impl ClientInner { })?; } - debug!("D-Bus device state watcher for {} ended", path); - Ok(()) } } From d752e88abf3e338fdfefbfd9ca2d9b0f7e54c0ce Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 4 Sep 2025 13:25:48 +0200 Subject: [PATCH 09/12] refactor(networkmanager): make dbus connection a ClientInner field Should be more efficient as the connection will now only be created once. --- src/clients/networkmanager/mod.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index da7e5bf..4ee6a6f 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -46,8 +46,7 @@ struct ClientInner { controller_sender: broadcast::Sender, sender: broadcast::Sender, device_watchers: RwLock, DeviceWatcher>>, - // TODO: Maybe find some way to late-init a dbus connection here - // so we can just clone it when we need it instead of awaiting it every time + dbus_connection: RwLock>, } #[derive(Clone, Debug)] @@ -60,10 +59,12 @@ impl ClientInner { let (controller_sender, _) = broadcast::channel(64); let (sender, _) = broadcast::channel(8); let device_watchers = RwLock::new(HashMap::new()); + let dbus_connection = RwLock::new(None); ClientInner { controller_sender, sender, device_watchers, + dbus_connection, } } @@ -89,8 +90,7 @@ impl ClientInner { async fn watch_devices_list(&'static self) -> Result<()> { debug!("D-Bus devices list watcher starting"); - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; + let root = DbusProxy::new(&self.dbus_connection().await?).await?; let mut devices_changes = root.receive_all_devices_changed().await; while let Some(devices_change) = devices_changes.next().await { @@ -133,15 +133,14 @@ impl ClientInner { &'static self, mut receiver: broadcast::Receiver, ) -> Result<()> { - let dbus_connection = Connection::system().await?; - while let Result::Ok(event) = receiver.recv().await { match event { ModuleToClientEvent::NewController => { debug!("Client received NewController event"); for device_path in self.device_watchers.read().await.keys() { - let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?; + let dbus_connection = &self.dbus_connection().await?; + let device = DeviceDbusProxy::new(dbus_connection, device_path).await?; let interface = device.interface().await?.to_string(); let r#type = device.device_type().await?; @@ -197,6 +196,21 @@ impl ClientInner { Ok(()) } + + async fn dbus_connection(&self) -> Result { + let dbus_connection_guard = self.dbus_connection.read().await; + if let Some(dbus_connection) = &*dbus_connection_guard { + Ok(dbus_connection.clone()) + } else { + // Yes it's a bit awkward to first obtain a read lock and then a write lock but it + // needs to happen only once, and after that all read lock acquisitions will be + // instant + drop(dbus_connection_guard); + let dbus_connection = Connection::system().await?; + *self.dbus_connection.write().await = Some(dbus_connection.clone()); + Ok(dbus_connection) + } + } } pub fn create_client() -> ClientResult { From af49acb40bb393ecfe1619268c8c3a1b2a61013f Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 4 Sep 2025 14:25:20 +0200 Subject: [PATCH 10/12] fix(networkmanager): remove icons for removed devices --- src/clients/networkmanager/mod.rs | 8 +++++++- src/modules/networkmanager.rs | 15 +++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 4ee6a6f..350a68f 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -113,7 +113,6 @@ impl ClientInner { watchers.insert(added_device_path.clone(), watcher); } - // TODO: Inform module of removed devices let removed_device_paths = device_paths.difference(&new_device_paths); for removed_device_path in removed_device_paths { let watcher = watchers @@ -122,6 +121,13 @@ impl ClientInner { watcher.state_watcher.abort(); watchers.remove(removed_device_path); + // TODO: Replace the identifier sent to modules with the dbus device number (last segment of its path) + let dbus_connection = &self.dbus_connection().await?; + let device = DeviceDbusProxy::new(dbus_connection, removed_device_path).await?; + let interface = device.interface().await?.to_string(); + self.controller_sender + .send(ClientToModuleEvent::DeviceRemoved { interface })?; + debug!("D-bus device watchers for {} stopped", removed_device_path); } } diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 2ee5e9c..34225b7 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -13,6 +13,7 @@ use gtk::{Image, Orientation}; use serde::Deserialize; use std::collections::HashMap; use tokio::sync::{broadcast, mpsc}; +use tracing::debug; #[derive(Debug, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -99,7 +100,9 @@ async fn handle_update_events( r#type, new_state, } => { - let icon: &_ = icons.entry(interface).or_insert_with(|| { + let icon: &_ = icons.entry(interface.clone()).or_insert_with(|| { + debug!("Adding icon for {}", interface); + let icon = Image::new(); icon.add_class("icon"); container.add(&icon); @@ -120,7 +123,15 @@ async fn handle_update_events( } } } - ClientToModuleEvent::DeviceRemoved { .. } => {} + ClientToModuleEvent::DeviceRemoved { interface } => { + let icon = icons + .get(interface.as_str()) + .expect("The icon for {} was about to be removed but was not present"); + container.remove(icon); + icons.remove(interface.as_str()); + + debug!("Removed icon for {}", interface); + } } } From db88e12b8e3ba01c520d5aced852e8682f294616 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 4 Sep 2025 14:54:23 +0200 Subject: [PATCH 11/12] refactor(networkmanager): identify devices by their number outside of the client --- src/clients/networkmanager/event.rs | 4 ++-- src/clients/networkmanager/mod.rs | 26 ++++++++++++++++---------- src/modules/networkmanager.rs | 16 ++++++++-------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index 85293f5..2fdca55 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -3,12 +3,12 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; #[derive(Debug, Clone)] pub enum ClientToModuleEvent { DeviceChanged { - interface: String, + number: u32, r#type: DeviceType, new_state: DeviceState, }, DeviceRemoved { - interface: String, + number: u32, }, } diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 350a68f..a0758af 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -121,12 +121,9 @@ impl ClientInner { watcher.state_watcher.abort(); watchers.remove(removed_device_path); - // TODO: Replace the identifier sent to modules with the dbus device number (last segment of its path) - let dbus_connection = &self.dbus_connection().await?; - let device = DeviceDbusProxy::new(dbus_connection, removed_device_path).await?; - let interface = device.interface().await?.to_string(); + let number = get_number_from_dbus_path(removed_device_path); self.controller_sender - .send(ClientToModuleEvent::DeviceRemoved { interface })?; + .send(ClientToModuleEvent::DeviceRemoved { number })?; debug!("D-bus device watchers for {} stopped", removed_device_path); } @@ -148,12 +145,12 @@ impl ClientInner { let dbus_connection = &self.dbus_connection().await?; let device = DeviceDbusProxy::new(dbus_connection, device_path).await?; - let interface = device.interface().await?.to_string(); + let number = get_number_from_dbus_path(device_path); let r#type = device.device_type().await?; let new_state = device.state().await?; self.controller_sender .send(ClientToModuleEvent::DeviceChanged { - interface, + number, r#type, new_state, })?; @@ -177,14 +174,14 @@ impl ClientInner { let dbus_connection = Connection::system().await?; let device = DeviceDbusProxy::new(&dbus_connection, path.clone()).await?; - let interface = device.interface().await?; + let number = get_number_from_dbus_path(&path); let r#type = device.device_type().await?; // Send an event communicating the initial state let new_state = device.state().await?; self.controller_sender .send(ClientToModuleEvent::DeviceChanged { - interface: interface.to_string(), + number, r#type: r#type.clone(), new_state, })?; @@ -194,7 +191,7 @@ impl ClientInner { let new_state = state_change.get().await?; self.controller_sender .send(ClientToModuleEvent::DeviceChanged { - interface: interface.to_string(), + number, r#type: r#type.clone(), new_state, })?; @@ -225,4 +222,13 @@ pub fn create_client() -> ClientResult { Ok(client) } +fn get_number_from_dbus_path(path: &ObjectPath) -> u32 { + let (_, number_str) = path + .rsplit_once('/') + .expect("Path must have at least two segments to contain an object number"); + number_str + .parse() + .expect("Last segment was not a positive integer") +} + register_fallible_client!(Client, network_manager); diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 34225b7..238f558 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -91,17 +91,17 @@ async fn handle_update_events( image_provider: Provider, ) -> Result<()> { // TODO: Ensure the visible icons are always in the same order - let mut icons = HashMap::::new(); + let mut icons = HashMap::::new(); while let Result::Ok(event) = widget_receiver.recv().await { match event { ClientToModuleEvent::DeviceChanged { - interface, + number, r#type, new_state, } => { - let icon: &_ = icons.entry(interface.clone()).or_insert_with(|| { - debug!("Adding icon for {}", interface); + let icon: &_ = icons.entry(number).or_insert_with(|| { + debug!("Adding icon for device {}", number); let icon = Image::new(); icon.add_class("icon"); @@ -123,14 +123,14 @@ async fn handle_update_events( } } } - ClientToModuleEvent::DeviceRemoved { interface } => { + ClientToModuleEvent::DeviceRemoved { number } => { let icon = icons - .get(interface.as_str()) + .get(&number) .expect("The icon for {} was about to be removed but was not present"); container.remove(icon); - icons.remove(interface.as_str()); + icons.remove(&number); - debug!("Removed icon for {}", interface); + debug!("Removed icon for device {}", number); } } } From a106f41b0af6ed9942ef03aec126882bedf872ac Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 4 Sep 2025 20:41:55 +0200 Subject: [PATCH 12/12] fix(networkmanager): notify upon new device from watch_device() Additionally: - Pass the device proxy to watch_device_state() now that we've had to create one anyway - Improve event received logging in module widget --- src/clients/networkmanager/mod.rs | 41 +++++++++++++++++-------------- src/modules/networkmanager.rs | 12 +++++++-- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index a0758af..7459531 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -109,7 +109,7 @@ impl ClientInner { for added_device_path in added_device_paths { debug_assert!(!watchers.contains_key(added_device_path)); - let watcher = self.watch_device(added_device_path.clone()); + let watcher = self.watch_device(added_device_path.clone()).await?; watchers.insert(added_device_path.clone(), watcher); } @@ -162,23 +162,15 @@ impl ClientInner { Ok(()) } - fn watch_device(&'static self, path: ObjectPath<'static>) -> DeviceWatcher { - let state_watcher = Arc::new(spawn(self.watch_device_state(path))); - - DeviceWatcher { state_watcher } - } - - async fn watch_device_state(&'static self, path: ObjectPath<'_>) -> Result<()> { - debug!("D-Bus device state watcher for {} starting", path); - - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, path.clone()).await?; + async fn watch_device(&'static self, path: ObjectPath<'_>) -> Result { + let dbus_connection = &self.dbus_connection().await?; + let proxy = DeviceDbusProxy::new(dbus_connection, path.to_owned()).await?; let number = get_number_from_dbus_path(&path); - let r#type = device.device_type().await?; + let r#type = proxy.device_type().await?; + let new_state = proxy.state().await?; - // Send an event communicating the initial state - let new_state = device.state().await?; + // Notify modules that the device exists even if its properties don't change self.controller_sender .send(ClientToModuleEvent::DeviceChanged { number, @@ -186,9 +178,22 @@ impl ClientInner { new_state, })?; - let mut state_changes = device.receive_state_changed().await; - while let Some(state_change) = state_changes.next().await { - let new_state = state_change.get().await?; + let state_watcher = Arc::new(spawn(self.watch_device_state(proxy))); + + Ok(DeviceWatcher { state_watcher }) + } + + async fn watch_device_state(&'static self, proxy: DeviceDbusProxy<'_>) -> Result<()> { + let path = proxy.inner().path(); + + debug!("D-Bus device state watcher for {} starting", path); + + let number = get_number_from_dbus_path(path); + let r#type = proxy.device_type().await?; + + let mut changes = proxy.receive_state_changed().await; + while let Some(change) = changes.next().await { + let new_state = change.get().await?; self.controller_sender .send(ClientToModuleEvent::DeviceChanged { number, diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 238f558..62518e4 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -100,6 +100,11 @@ async fn handle_update_events( r#type, new_state, } => { + debug!( + "Module widget received DeviceChanged event for number {}", + number + ); + let icon: &_ = icons.entry(number).or_insert_with(|| { debug!("Adding icon for device {}", number); @@ -124,13 +129,16 @@ async fn handle_update_events( } } ClientToModuleEvent::DeviceRemoved { number } => { + debug!( + "Module widget received DeviceRemoved event for number {}", + number + ); + let icon = icons .get(&number) .expect("The icon for {} was about to be removed but was not present"); container.remove(icon); icons.remove(&number); - - debug!("Removed icon for device {}", number); } } }