From 9ca5f4baa4461c3028bbd211056e9d2f1cb8d2cf Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Thu, 14 Aug 2025 18:14:31 +0200 Subject: [PATCH 1/4] WIP failed attempt at StreamMap-based implementation --- Cargo.lock | 14 +- Cargo.toml | 3 +- src/clients/networkmanager/dbus.rs | 16 +- src/clients/networkmanager/event.rs | 2 + src/clients/networkmanager/mod.rs | 294 ++++++++++------------------ src/clients/networkmanager/state.rs | 8 +- src/modules/networkmanager.rs | 107 ++-------- 7 files changed, 148 insertions(+), 296 deletions(-) create mode 100644 src/clients/networkmanager/event.rs diff --git a/Cargo.lock b/Cargo.lock index e6b8421..ec5356b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -733,7 +733,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -1869,6 +1869,7 @@ dependencies = [ "sysinfo", "system-tray", "tokio", + "tokio-stream", "tracing", "tracing-appender", "tracing-error", @@ -3524,6 +3525,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.13" diff --git a/Cargo.toml b/Cargo.toml index 50f0e85..fd295b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,7 @@ music = ["dep:regex"] "music+mpris" = ["music", "mpris"] "music+mpd" = ["music", "mpd-utils"] -network_manager = ["futures-lite", "futures-signals", "zbus"] +network_manager = ["futures-lite", "futures-signals", "tokio-stream", "zbus"] notifications = ["zbus"] @@ -172,6 +172,7 @@ regex = { version = "1.11.1", default-features = false, features = [ # network_manager futures-signals = { version = "0.3.34", optional = true } +tokio-stream = { version = "0.1.17", optional = true } # sys_info sysinfo = { version = "0.35.2", optional = true } diff --git a/src/clients/networkmanager/dbus.rs b/src/clients/networkmanager/dbus.rs index b3dd915..0370bba 100644 --- a/src/clients/networkmanager/dbus.rs +++ b/src/clients/networkmanager/dbus.rs @@ -9,10 +9,10 @@ use zbus::zvariant::{ObjectPath, OwnedValue, Str}; )] pub(super) trait Dbus { #[zbus(property)] - fn active_connections(&self) -> Result>; + fn active_connections(&self) -> Result>>; #[zbus(property)] - fn devices(&self) -> Result>; + fn devices(&self) -> Result>>; // #[zbus(property)] // fn networking_enabled(&self) -> Result; @@ -42,13 +42,13 @@ pub(super) trait ActiveConnectionDbus { // fn default6(&self) -> Result; #[zbus(property)] - fn devices(&self) -> Result>; + fn devices(&self) -> Result>>; // #[zbus(property)] // fn id(&self) -> Result; #[zbus(property)] - fn type_(&self) -> Result; + fn type_(&self) -> Result>; // #[zbus(property)] // fn uuid(&self) -> Result; @@ -69,7 +69,7 @@ pub(super) trait DeviceDbus { fn state(&self) -> Result; } -#[derive(Clone, Debug, OwnedValue, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, OwnedValue, PartialEq)] #[repr(u32)] pub(super) enum DeviceType { Unknown = 0, @@ -131,3 +131,9 @@ impl DeviceState { ) } } + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub(super) struct Device<'l> { + pub object_path: ObjectPath<'l>, + pub type_: DeviceType, +} diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs new file mode 100644 index 0000000..eea3902 --- /dev/null +++ b/src/clients/networkmanager/event.rs @@ -0,0 +1,2 @@ +#[derive(Debug, Clone)] +pub enum Event {} diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 5f75b08..d1b47dd 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -1,226 +1,130 @@ -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - use color_eyre::Result; +use color_eyre::eyre::Error; use futures_lite::StreamExt; -use futures_signals::signal::{Mutable, MutableSignalCloned}; -use tracing::error; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; +use tokio::join; +use tokio::sync::{RwLock, broadcast}; +use tokio::time::sleep; +use tokio_stream::StreamMap; use zbus::Connection; +use zbus::proxy::PropertyStream; use zbus::zvariant::ObjectPath; -use crate::clients::networkmanager::dbus::{ActiveConnectionDbusProxy, DbusProxy, DeviceDbusProxy}; -use crate::clients::networkmanager::state::{ - CellularState, State, VpnState, WifiState, WiredState, determine_cellular_state, - determine_vpn_state, determine_wifi_state, determine_wired_state, -}; -use crate::{ - read_lock, register_fallible_client, spawn_blocking, spawn_blocking_result, write_lock, -}; +use crate::clients::networkmanager::dbus::{DbusProxy, Device, DeviceDbusProxy, DeviceState}; +use crate::clients::networkmanager::event::Event; +use crate::{register_fallible_client, spawn}; mod dbus; -pub mod state; - -type PathMap<'l, ValueType> = HashMap, ValueType>; +pub mod event; #[derive(Debug)] -pub struct Client(Arc>); - -#[derive(Debug)] -struct ClientInner<'l> { - state: Mutable, - root_object: &'l DbusProxy<'l>, - active_connections: RwLock>>, - devices: RwLock>>, - dbus_connection: Connection, +pub struct Client { + tx: broadcast::Sender, } impl Client { async fn new() -> Result { - let state = Mutable::new(State { - wired: WiredState::Unknown, - wifi: WifiState::Unknown, - cellular: CellularState::Unknown, - vpn: VpnState::Unknown, - }); - let dbus_connection = Connection::system().await?; - let root_object = { - let root_object = DbusProxy::new(&dbus_connection).await?; - // Workaround for the fact that zbus (unnecessarily) requires a static lifetime here - Box::leak(Box::new(root_object)) - }; - - Ok(Client(Arc::new(ClientInner { - state, - root_object, - active_connections: RwLock::new(HashMap::new()), - devices: RwLock::new(HashMap::new()), - dbus_connection, - }))) + let (tx, _) = broadcast::channel(64); + Ok(Client { tx }) } async fn run(&self) -> Result<()> { - // TODO: Reimplement DBus watching without these write-only macros + let dbus_connection = Connection::system().await?; + let root_object = DbusProxy::new(&dbus_connection).await?; - let mut active_connections_stream = self.0.root_object.receive_active_connections_changed().await; - while let Some(change) = active_connections_stream.next().await { + let device_state_changes = + RwLock::new(StreamMap::>::new()); - } + let _ = join!( + // Handles the addition and removal of device objects + async { + let mut devices_changes = root_object.receive_devices_changed().await; + while let Some(change) = devices_changes.next().await { + println!("here?"); - // ActiveConnectionDbusProxy::builder(&self.0.dbus_connection) + let devices = HashSet::from_iter( + device_state_changes + .read() + .await + .keys() + .map(|device| &device.object_path) + .cloned(), + ); - // macro_rules! update_state_for_device_change { - // ($client:ident) => { - // $client.state.set(State { - // wired: determine_wired_state(&read_lock!($client.devices)).await?, - // wifi: determine_wifi_state(&read_lock!($client.devices)).await?, - // cellular: determine_cellular_state(&read_lock!($client.devices)).await?, - // vpn: $client.state.get_cloned().vpn, - // }); - // }; - // } - // - // macro_rules! initialise_path_map { - // ( - // $client:expr, - // $path_map:ident, - // $proxy_type:ident - // $(, |$new_path:ident| $property_watcher:expr)* - // ) => { - // let new_paths = $client.root_object.$path_map().await?; - // let mut path_map = HashMap::new(); - // for new_path in new_paths { - // let new_proxy = $proxy_type::builder(&$client.dbus_connection) - // .path(new_path.clone())? - // .build().await?; - // path_map.insert(new_path.clone(), new_proxy); - // $({ - // let $new_path = &new_path; - // $property_watcher; - // })* - // } - // *write_lock!($client.$path_map) = path_map; - // }; - // } - // - // macro_rules! spawn_path_list_watcher { - // ( - // $client:expr, - // $property:ident, - // $property_changes:ident, - // $proxy_type:ident, - // |$state_client:ident| $state_update:expr - // $(, |$property_client:ident, $new_path:ident| $property_watcher:expr)* - // ) => { - // let client = $client.clone(); - // - // let changes = client.root_object.$property_changes(); - // for _ in changes { - // let mut new_path_map = HashMap::new(); - // { - // let new_paths = client.root_object.$property()?; - // let path_map = read_lock!(client.$property); - // for new_path in new_paths { - // if path_map.contains_key(&new_path) { - // let proxy = path_map - // .get(&new_path) - // .expect("Should contain the key, guarded by runtime check"); - // new_path_map.insert(new_path, proxy.to_owned()); - // } else { - // let new_proxy = $proxy_type::builder(&client.dbus_connection) - // .path(new_path.clone())? - // .build()?; - // new_path_map.insert(new_path.clone(), new_proxy); - // $({ - // let $property_client = &client; - // let $new_path = &new_path; - // $property_watcher; - // })* - // } - // } - // } - // *write_lock!(client.$property) = new_path_map; - // let $state_client = &client; - // $state_update; - // } - // } - // } - // - // macro_rules! spawn_property_watcher { - // ( - // $client:expr, - // $path:expr, - // $property_changes:ident, - // $containing_list:ident, - // |$inner_client:ident| $state_update:expr - // ) => { - // let client = $client.clone(); - // let path = $path.clone(); - // - // let changes = read_lock!(client.$containing_list) - // .get(&path) - // .expect("Should contain the key upon watcher start") - // .$property_changes().await; - // for _ in changes { - // if !read_lock!(client.$containing_list).contains_key(&path) { - // break; - // } - // let $inner_client = &client; - // $state_update; - // } - // }; - // } - // - // initialise_path_map!(self.0, active_connections, ActiveConnectionDbusProxy); - // initialise_path_map!(self.0, devices, DeviceDbusProxy, |path| { - // spawn_property_watcher!(self.0, path, receive_state_changed, devices, |client| { - // update_state_for_device_change!(client); - // }); - // }); - // self.0.state.set(State { - // wired: determine_wired_state(&read_lock!(self.0.devices))?, - // wifi: determine_wifi_state(&read_lock!(self.0.devices))?, - // cellular: determine_cellular_state(&read_lock!(self.0.devices))?, - // vpn: determine_vpn_state(&read_lock!(self.0.active_connections))?, - // }); - // - // spawn_path_list_watcher!( - // self.0, - // active_connections, - // receive_active_connections_changed, - // ActiveConnectionDbusProxy, - // |client| { - // client.state.set(State { - // wired: client.state.get_cloned().wired, - // wifi: client.state.get_cloned().wifi, - // cellular: client.state.get_cloned().cellular, - // vpn: determine_vpn_state(&read_lock!(client.active_connections))?, - // }); - // } - // ); - // spawn_path_list_watcher!( - // self.0, - // devices, - // receive_devices_changed, - // DeviceDbusProxy, - // |client| { - // update_state_for_device_change!(client); - // }, - // |client, path| { - // spawn_property_watcher!(client, path, receive_state_changed, devices, |client| { - // update_state_for_device_change!(client); - // }); - // } - // ); + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices_vec = change.get().await?; + let new_devices = HashSet::::from_iter(new_devices_vec); + println!("Existing devices: {:?}", devices); + println!("New devices: {:?}", new_devices); + + let added_devices = new_devices.difference(&devices); + println!("Added devices: {:?}", added_devices); + for added_device in added_devices { + let device_proxy = + DeviceDbusProxy::new(&dbus_connection, added_device).await?; + let device_type = device_proxy.device_type().await?; + let device_state_stream = device_proxy.receive_state_changed().await; + device_state_changes.write().await.insert( + Device { + object_path: added_device.clone(), + type_: device_type.clone(), // TODO: Remove clone when removing println below + }, + device_state_stream, + ); + println!("Device added: {} type {:?}", added_device, device_type); + } + + let removed_devices = devices.difference(&new_devices); + println!("Removed devices: {:?}", removed_devices); + for removed_device in removed_devices { + let device_proxy = + DeviceDbusProxy::new(&dbus_connection, removed_device).await?; + let device_type = device_proxy.device_type().await?; + device_state_changes.write().await.remove(&Device { + object_path: removed_device.clone(), + type_: device_type.clone(), // TODO: Remove clone when removing println below + }); + println!("Device removed: {} type {:?}", removed_device, device_type); + } + } + Ok::<(), Error>(()) + }, + // Handles changes to device properties + async { + sleep(Duration::from_secs(5)).await; + + /* + Okay so this causes a deadlock, and we should rewrite all of this with spawn() anyway cause join!() is not multithreaded apparently. + In order to not leak memory we could have closures for objects that don't exist anymore check this manually and return. + */ + while let Some((device, property)) = device_state_changes.write().await.next().await + { + let property = property.get().await?; + println!( + "Device state changed: {} to {:?}", + device.object_path, property + ); + } + + println!("Prop loop ended"); + + Ok::<(), Error>(()) + }, + ); Ok(()) } - pub fn subscribe(&self) -> MutableSignalCloned { - self.0.state.signal_cloned() + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() } } pub async fn create_client() -> Result> { + // TODO: Use spawn here after all, otherwise we block on creation + let client = Arc::new(Client::new().await?); client.run().await?; Ok(client) diff --git a/src/clients/networkmanager/state.rs b/src/clients/networkmanager/state.rs index 079bbfa..9e5c4e4 100644 --- a/src/clients/networkmanager/state.rs +++ b/src/clients/networkmanager/state.rs @@ -1,9 +1,11 @@ -use color_eyre::Result; - -use crate::clients::networkmanager::PathMap; use crate::clients::networkmanager::dbus::{ ActiveConnectionDbusProxy, DeviceDbusProxy, DeviceState, DeviceType, }; +use color_eyre::Result; +use std::collections::HashMap; +use zbus::zvariant::ObjectPath; + +type PathMap<'l, ValueType> = HashMap, ValueType>; #[derive(Clone, Debug)] pub struct State { diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 5914f26..301f0f3 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -1,6 +1,5 @@ use color_eyre::Result; use futures_lite::StreamExt; -use futures_signals::signal::SignalExt; use gtk::prelude::{ContainerExt, WidgetExt}; use gtk::{Box as GtkBox, Image, Orientation}; use serde::Deserialize; @@ -8,9 +7,7 @@ use tokio::sync::mpsc::Receiver; use crate::channels::{AsyncSenderExt, BroadcastReceiverExt}; use crate::clients::networkmanager::Client; -use crate::clients::networkmanager::state::{ - CellularState, State, VpnState, WifiState, WiredState, -}; +use crate::clients::networkmanager::event::Event; use crate::config::CommonConfig; use crate::gtk_helpers::IronbarGtkExt; use crate::modules::{Module, ModuleInfo, ModuleParts, ModuleUpdateEvent, WidgetContext}; @@ -31,7 +28,7 @@ const fn default_icon_size() -> i32 { } impl Module for NetworkManagerModule { - type SendMessage = State; + type SendMessage = Event; type ReceiveMessage = (); module_impl!("network_manager"); @@ -39,27 +36,27 @@ impl Module for NetworkManagerModule { fn spawn_controller( &self, _info: &ModuleInfo, - context: &WidgetContext, + context: &WidgetContext, _rx: Receiver<()>, ) -> Result<()> { let client = context.try_client::()?; - let mut client_signal = client.subscribe().to_stream(); - let widget_transmitter = context.tx.clone(); + // let mut client_signal = client.subscribe().to_stream(); + // let widget_transmitter = context.tx.clone(); - spawn(async move { - while let Some(state) = client_signal.next().await { - widget_transmitter - .send_expect(ModuleUpdateEvent::Update(state)) - .await; - } - }); + // spawn(async move { + // while let Some(state) = client_signal.next().await { + // widget_transmitter + // .send_expect(ModuleUpdateEvent::Update(state)) + // .await; + // } + // }); Ok(()) } fn into_widget( self, - context: WidgetContext, + context: WidgetContext, _info: &ModuleInfo, ) -> Result> { let container = GtkBox::new(Orientation::Horizontal, 0); @@ -88,81 +85,9 @@ impl Module for NetworkManagerModule { vpn_icon.add_class("vpn-icon"); container.add(&vpn_icon); - context.subscribe().recv_glib_async((), move |(), state| { - // TODO: Make this whole section less boneheaded - - let wired_icon_name = match state.wired { - WiredState::Connected => "icon:network-wired-symbolic", - WiredState::Disconnected => "icon:network-wired-disconnected-symbolic", - WiredState::NotPresent | WiredState::Unknown => "", - }; - let wifi_icon_name = match state.wifi { - WifiState::Connected(_) => "icon:network-wireless-connected-symbolic", - WifiState::Disconnected => "icon:network-wireless-offline-symbolic", - WifiState::Disabled => "icon:network-wireless-hardware-disabled-symbolic", - WifiState::NotPresent | WifiState::Unknown => "", - }; - let cellular_icon_name = match state.cellular { - CellularState::Connected => "icon:network-cellular-connected-symbolic", - CellularState::Disconnected => "icon:network-cellular-offline-symbolic", - CellularState::Disabled => "icon:network-cellular-hardware-disabled-symbolic", - CellularState::NotPresent | CellularState::Unknown => "", - }; - let vpn_icon_name = match state.vpn { - VpnState::Connected(_) => "icon:network-vpn-symbolic", - VpnState::Disconnected | VpnState::Unknown => "", - }; - - let wired_icon = wired_icon.clone(); - let wifi_icon = wifi_icon.clone(); - let cellular_icon = cellular_icon.clone(); - let vpn_icon = vpn_icon.clone(); - - let image_provider = context.ironbar.image_provider(); - - async move { - if wired_icon_name.is_empty() { - wired_icon.hide(); - } else { - image_provider - .load_into_image_silent(wired_icon_name, self.icon_size, false, &wired_icon) - .await; - wired_icon.show(); - } - - if wifi_icon_name.is_empty() { - wifi_icon.hide(); - } else { - image_provider - .load_into_image_silent(wifi_icon_name, self.icon_size, false, &wifi_icon) - .await; - wifi_icon.show(); - } - - if cellular_icon_name.is_empty() { - cellular_icon.hide(); - } else { - image_provider - .load_into_image_silent( - cellular_icon_name, - self.icon_size, - false, - &cellular_icon, - ) - .await; - cellular_icon.show(); - } - - if vpn_icon_name.is_empty() { - vpn_icon.hide(); - } else { - image_provider - .load_into_image_silent(vpn_icon_name, self.icon_size, false, &vpn_icon) - .await; - vpn_icon.show(); - } - } - }); + context + .subscribe() + .recv_glib_async((), move |(), event| async {}); Ok(ModuleParts::new(container, None)) } From 1836ab29438ad3042c22c62cfdf5c1e82ac28231 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Sat, 16 Aug 2025 15:31:06 +0200 Subject: [PATCH 2/4] refactor(networkmanager): replace state-based w/ event-based approach --- Cargo.toml | 3 + src/clients/networkmanager/dbus.rs | 65 ++-------- src/clients/networkmanager/event.rs | 14 ++- src/clients/networkmanager/mod.rs | 164 ++++++++++++------------- src/macros.rs | 11 -- src/modules/networkmanager.rs | 178 ++++++++++++++++++++-------- 6 files changed, 230 insertions(+), 205 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fd295b6..1db0add 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,9 @@ repository = "https://github.com/jakestanger/ironbar" categories = ["gui"] keywords = ["gtk", "bar", "wayland", "wlroots", "gtk-layer-shell"] +[profile.release] +lto = true + [features] default = [ "bindmode+all", diff --git a/src/clients/networkmanager/dbus.rs b/src/clients/networkmanager/dbus.rs index 0370bba..1c589d4 100644 --- a/src/clients/networkmanager/dbus.rs +++ b/src/clients/networkmanager/dbus.rs @@ -12,46 +12,10 @@ pub(super) trait Dbus { fn active_connections(&self) -> Result>>; #[zbus(property)] - fn devices(&self) -> Result>>; - - // #[zbus(property)] - // fn networking_enabled(&self) -> Result; - - // #[zbus(property)] - // fn primary_connection(&self) -> Result; - - // #[zbus(property)] - // fn primary_connection_type(&self) -> Result; - - // #[zbus(property)] - // fn wireless_enabled(&self) -> Result; -} - -#[proxy( - default_service = "org.freedesktop.NetworkManager", - interface = "org.freedesktop.NetworkManager.Connection.Active" -)] -pub(super) trait ActiveConnectionDbus { - // #[zbus(property)] - // fn connection(&self) -> Result; - - // #[zbus(property)] - // fn default(&self) -> Result; - - // #[zbus(property)] - // fn default6(&self) -> Result; + fn all_devices(&self) -> Result>>; #[zbus(property)] fn devices(&self) -> Result>>; - - // #[zbus(property)] - // fn id(&self) -> Result; - - #[zbus(property)] - fn type_(&self) -> Result>; - - // #[zbus(property)] - // fn uuid(&self) -> Result; } #[proxy( @@ -59,19 +23,20 @@ pub(super) trait ActiveConnectionDbus { interface = "org.freedesktop.NetworkManager.Device" )] pub(super) trait DeviceDbus { - // #[zbus(property)] - // fn active_connection(&self) -> Result; - #[zbus(property)] fn device_type(&self) -> Result; + #[zbus(property)] + fn interface(&self) -> Result>; + #[zbus(property)] fn state(&self) -> Result; } +// For reference: https://gitlab.freedesktop.org/NetworkManager/NetworkManager/-/blob/e1a7d5ac062f4f23ce3a6b33c62e856056161ad8/src/libnm-core-public/nm-dbus-interface.h#L212-L253 #[derive(Clone, Debug, Eq, Hash, OwnedValue, PartialEq)] #[repr(u32)] -pub(super) enum DeviceType { +pub enum DeviceType { Unknown = 0, Ethernet = 1, Wifi = 2, @@ -105,9 +70,10 @@ pub(super) enum DeviceType { Hsr = 33, } +// For reference: https://gitlab.freedesktop.org/NetworkManager/NetworkManager/-/blob/e1a7d5ac062f4f23ce3a6b33c62e856056161ad8/src/libnm-core-public/nm-dbus-interface.h#L501-L538 #[derive(Clone, Debug, OwnedValue, PartialEq)] #[repr(u32)] -pub(super) enum DeviceState { +pub enum DeviceState { Unknown = 0, Unmanaged = 10, Unavailable = 20, @@ -122,18 +88,3 @@ pub(super) enum DeviceState { Deactivating = 110, Failed = 120, } - -impl DeviceState { - pub(super) fn is_enabled(&self) -> bool { - !matches!( - self, - DeviceState::Unknown | DeviceState::Unmanaged | DeviceState::Unavailable, - ) - } -} - -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub(super) struct Device<'l> { - pub object_path: ObjectPath<'l>, - pub type_: DeviceType, -} diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index eea3902..b85307c 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -1,2 +1,14 @@ +use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; + #[derive(Debug, Clone)] -pub enum Event {} +pub enum Event { + DeviceAdded { + interface: String, + r#type: DeviceType, + }, + DeviceStateChanged { + interface: String, + r#type: DeviceType, + state: DeviceState, + }, +} diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index d1b47dd..c6faba7 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -1,22 +1,17 @@ use color_eyre::Result; -use color_eyre::eyre::Error; +use color_eyre::eyre::Ok; use futures_lite::StreamExt; use std::collections::HashSet; use std::sync::Arc; -use std::time::Duration; -use tokio::join; -use tokio::sync::{RwLock, broadcast}; -use tokio::time::sleep; -use tokio_stream::StreamMap; +use tokio::sync::broadcast; use zbus::Connection; -use zbus::proxy::PropertyStream; -use zbus::zvariant::ObjectPath; +use zbus::zvariant::{ObjectPath, Str}; -use crate::clients::networkmanager::dbus::{DbusProxy, Device, DeviceDbusProxy, DeviceState}; +use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; use crate::clients::networkmanager::event::Event; use crate::{register_fallible_client, spawn}; -mod dbus; +pub mod dbus; pub mod event; #[derive(Debug)] @@ -31,103 +26,98 @@ impl Client { } async fn run(&self) -> Result<()> { - let dbus_connection = Connection::system().await?; - let root_object = DbusProxy::new(&dbus_connection).await?; + // TODO: Use glib::clone!() - let device_state_changes = - RwLock::new(StreamMap::>::new()); + let tx = self.tx.clone(); + spawn(async move { + let dbus_connection = Connection::system().await?; + let root = DbusProxy::new(&dbus_connection).await?; - let _ = join!( - // Handles the addition and removal of device objects - async { - let mut devices_changes = root_object.receive_devices_changed().await; - while let Some(change) = devices_changes.next().await { - println!("here?"); + // All device types get added to this, but not all types emit events + let mut devices = HashSet::new(); - let devices = HashSet::from_iter( - device_state_changes - .read() - .await - .keys() - .map(|device| &device.object_path) - .cloned(), - ); + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = HashSet::from_iter(devices_change.get().await?); - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices_vec = change.get().await?; - let new_devices = HashSet::::from_iter(new_devices_vec); - println!("Existing devices: {:?}", devices); - println!("New devices: {:?}", new_devices); - - let added_devices = new_devices.difference(&devices); - println!("Added devices: {:?}", added_devices); - for added_device in added_devices { - let device_proxy = - DeviceDbusProxy::new(&dbus_connection, added_device).await?; - let device_type = device_proxy.device_type().await?; - let device_state_stream = device_proxy.receive_state_changed().await; - device_state_changes.write().await.insert( - Device { - object_path: added_device.clone(), - type_: device_type.clone(), // TODO: Remove clone when removing println below - }, - device_state_stream, - ); - println!("Device added: {} type {:?}", added_device, device_type); - } - - let removed_devices = devices.difference(&new_devices); - println!("Removed devices: {:?}", removed_devices); - for removed_device in removed_devices { - let device_proxy = - DeviceDbusProxy::new(&dbus_connection, removed_device).await?; - let device_type = device_proxy.device_type().await?; - device_state_changes.write().await.remove(&Device { - object_path: removed_device.clone(), - type_: device_type.clone(), // TODO: Remove clone when removing println below - }); - println!("Device removed: {} type {:?}", removed_device, device_type); - } - } - Ok::<(), Error>(()) - }, - // Handles changes to device properties - async { - sleep(Duration::from_secs(5)).await; - - /* - Okay so this causes a deadlock, and we should rewrite all of this with spawn() anyway cause join!() is not multithreaded apparently. - In order to not leak memory we could have closures for objects that don't exist anymore check this manually and return. - */ - while let Some((device, property)) = device_state_changes.write().await.next().await - { - let property = property.get().await?; - println!( - "Device state changed: {} to {:?}", - device.object_path, property - ); + let added_devices = new_devices.difference(&devices); + for added_device in added_devices { + spawn(watch_device(added_device.to_owned(), tx.clone())); } - println!("Prop loop ended"); + let removed_devices = devices.difference(&new_devices); + // TODO: Cook up some way to notify closures for removed devices to exit - Ok::<(), Error>(()) - }, - ); + devices = new_devices; + } + + Ok(()) + }); Ok(()) } pub fn subscribe(&self) -> broadcast::Receiver { + // Maybe we should pass a direct receiver so that the UI module also gets the events from before it was started self.tx.subscribe() } } pub async fn create_client() -> Result> { - // TODO: Use spawn here after all, otherwise we block on creation - let client = Arc::new(Client::new().await?); client.run().await?; Ok(client) } +async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; + + let interface = device.interface().await?; + let device_type = device.device_type().await?; + tx.send(Event::DeviceAdded { + interface: interface.to_string(), + r#type: device_type, + })?; + + spawn(watch_device_state( + device_path.to_owned(), + interface.to_owned(), + tx.clone(), + )); + + Ok(()) +} + +async fn watch_device_state( + device_path: ObjectPath<'_>, + interface: Str<'_>, + tx: broadcast::Sender, +) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?; + let r#type = device.device_type().await?; + + // Send an event communicating the initial state + let state = device.state().await?; + tx.send(Event::DeviceStateChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + state, + })?; + + let mut state_changes = device.receive_state_changed().await; + while let Some(state_change) = state_changes.next().await { + let state = state_change.get().await?; + tx.send(Event::DeviceStateChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + state, + })?; + } + + Ok(()) +} + register_fallible_client!(Client, network_manager); diff --git a/src/macros.rs b/src/macros.rs index 2e5ff10..1ae00a3 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -255,14 +255,3 @@ macro_rules! rc_mut { std::rc::Rc::new(std::cell::RefCell::new($val)) }; } - -#[macro_export] -macro_rules! spawn_blocking_result { - ($body:block) => { - spawn_blocking(move || { - if let Err(error) = (|| -> Result<()> { $body })() { - error!("Error in fallible spawned closure: {}", error); - } - }); - }; -} diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 301f0f3..ba1b116 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -1,17 +1,18 @@ -use color_eyre::Result; -use futures_lite::StreamExt; -use gtk::prelude::{ContainerExt, WidgetExt}; -use gtk::{Box as GtkBox, Image, Orientation}; -use serde::Deserialize; -use tokio::sync::mpsc::Receiver; - -use crate::channels::{AsyncSenderExt, BroadcastReceiverExt}; use crate::clients::networkmanager::Client; +use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; use crate::clients::networkmanager::event::Event; use crate::config::CommonConfig; use crate::gtk_helpers::IronbarGtkExt; -use crate::modules::{Module, ModuleInfo, ModuleParts, ModuleUpdateEvent, WidgetContext}; +use crate::image::Provider; +use crate::modules::{Module, ModuleInfo, ModuleParts, WidgetContext}; use crate::{module_impl, spawn}; +use color_eyre::{Result, eyre::Ok}; +use glib::spawn_future_local; +use gtk::prelude::{ContainerExt, WidgetExt}; +use gtk::{Image, Orientation}; +use serde::Deserialize; +use std::collections::HashMap; +use tokio::sync::{broadcast, mpsc}; #[derive(Debug, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -27,7 +28,7 @@ const fn default_icon_size() -> i32 { 24 } -impl Module for NetworkManagerModule { +impl Module for NetworkManagerModule { type SendMessage = Event; type ReceiveMessage = (); @@ -37,19 +38,19 @@ impl Module for NetworkManagerModule { &self, _info: &ModuleInfo, context: &WidgetContext, - _rx: Receiver<()>, + _rx: mpsc::Receiver<()>, ) -> Result<()> { let client = context.try_client::()?; - // let mut client_signal = client.subscribe().to_stream(); - // let widget_transmitter = context.tx.clone(); + // Should we be using context.tx with ModuleUpdateEvent::Update instead? + let tx = context.update_tx.clone(); + spawn(async move { + let mut client_rx = client.subscribe(); + while let Result::Ok(event) = client_rx.recv().await { + tx.send(event)?; + } - // spawn(async move { - // while let Some(state) = client_signal.next().await { - // widget_transmitter - // .send_expect(ModuleUpdateEvent::Update(state)) - // .await; - // } - // }); + Ok(()) + }); Ok(()) } @@ -58,37 +59,116 @@ impl Module for NetworkManagerModule { self, context: WidgetContext, _info: &ModuleInfo, - ) -> Result> { - let container = GtkBox::new(Orientation::Horizontal, 0); + ) -> Result> { + let container = gtk::Box::new(Orientation::Horizontal, 0); - // Wired icon - let wired_icon = Image::new(); - wired_icon.add_class("icon"); - wired_icon.add_class("wired-icon"); - container.add(&wired_icon); - - // Wifi icon - let wifi_icon = Image::new(); - wifi_icon.add_class("icon"); - wifi_icon.add_class("wifi-icon"); - container.add(&wifi_icon); - - // Cellular icon - let cellular_icon = Image::new(); - cellular_icon.add_class("icon"); - cellular_icon.add_class("cellular-icon"); - container.add(&cellular_icon); - - // VPN icon - let vpn_icon = Image::new(); - vpn_icon.add_class("icon"); - vpn_icon.add_class("vpn-icon"); - container.add(&vpn_icon); - - context - .subscribe() - .recv_glib_async((), move |(), event| async {}); + // TODO: Check if passing the widget context in its entirety here is possible + // We cannot use recv_glib_async() here because the lifetimes don't work out + spawn_future_local(handle_update_events( + context.subscribe(), + container.clone(), + self.icon_size, + context.ironbar.image_provider(), + )); Ok(ModuleParts::new(container, None)) } } + +async fn handle_update_events( + mut rx: broadcast::Receiver, + container: gtk::Box, + icon_size: i32, + image_provider: Provider, +) { + let mut icons = HashMap::::new(); + + while let Result::Ok(event) = rx.recv().await { + println!("NM UI event: {:?}", event); + + match event { + Event::DeviceAdded { interface, r#type } => { + if !is_supported_device_type(&r#type) { + continue; + } + let icon = Image::new(); + icon.add_class("icon"); + container.add(&icon); + icons.insert(interface, icon); + } + Event::DeviceStateChanged { + interface, + r#type, + state, + } => { + if !is_supported_device_type(&r#type) { + continue; + } + let icon = icons + .get(&interface) + .expect("the icon for the interface to be present"); + let icon_name = get_icon_for_device_state(&r#type, &state); + match icon_name { + Some(icon_name) => { + image_provider + .load_into_image_silent(icon_name, icon_size, false, icon) + .await; + icon.show(); + } + None => { + icon.hide(); + } + } + } + }; + } +} + +fn is_supported_device_type(r#type: &DeviceType) -> bool { + matches!( + r#type, + DeviceType::Ethernet | DeviceType::Wifi | DeviceType::Tun | DeviceType::Wireguard + ) +} + +fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> { + match r#type { + DeviceType::Ethernet => match state { + DeviceState::Unavailable => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Disconnected => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Prepare => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Config => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::NeedAuth => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::IpConfig => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::IpCheck => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Secondaries => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Activated => Some("icon:network-wired-symbolic"), + DeviceState::Deactivating => Some("icon:network-wired-disconnected-symbolic"), + DeviceState::Failed => Some("icon:network-wired-disconnected-symbolic"), + _ => None, + }, + DeviceType::Wifi => match state { + DeviceState::Unavailable => Some("icon:network-wireless-hardware-disabled-symbolic"), + DeviceState::Disconnected => Some("icon:network-wireless-offline-symbolic"), + DeviceState::Prepare => Some("icon:network-wireless-offline-symbolic"), + DeviceState::Config => Some("icon:network-wireless-offline-symbolic"), + DeviceState::NeedAuth => Some("icon:network-wireless-offline-symbolic"), + DeviceState::IpConfig => Some("icon:network-wireless-offline-symbolic"), + DeviceState::IpCheck => Some("icon:network-wireless-offline-symbolic"), + DeviceState::Secondaries => Some("icon:network-wireless-offline-symbolic"), + DeviceState::Activated => Some("icon:network-wireless-connected-symbolic"), + DeviceState::Deactivating => Some("icon:network-wireless-offline-symbolic"), + DeviceState::Failed => Some("icon:network-wireless-offline-symbolic"), + _ => None, + }, + DeviceType::Tun => match state { + DeviceState::Activated => Some("icon:network-vpn-symbolic"), + _ => None, + }, + DeviceType::Wireguard => match state { + DeviceState::Activated => Some("icon:network-vpn-symbolic"), + _ => None, + }, + _ => panic!("Device type should be a supported one"), + } +} From 8bb95e2e3f3e11c7dc91a54e8c615176b7024714 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Sun, 17 Aug 2025 20:57:40 +0200 Subject: [PATCH 3/4] fix(networkmanager): prevent race condition, support all device types --- src/clients/networkmanager/mod.rs | 1 - src/modules/networkmanager.rs | 29 ++++++++--------------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index c6faba7..52c1329 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -33,7 +33,6 @@ impl Client { let dbus_connection = Connection::system().await?; let root = DbusProxy::new(&dbus_connection).await?; - // All device types get added to this, but not all types emit events let mut devices = HashSet::new(); let mut devices_changes = root.receive_all_devices_changed().await; diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index ba1b116..a19f22a 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -43,8 +43,9 @@ impl Module for NetworkManagerModule { let client = context.try_client::()?; // Should we be using context.tx with ModuleUpdateEvent::Update instead? let tx = context.update_tx.clone(); + // Must be done here synchronously to avoid race condition + let mut client_rx = client.subscribe(); spawn(async move { - let mut client_rx = client.subscribe(); while let Result::Ok(event) = client_rx.recv().await { tx.send(event)?; } @@ -62,10 +63,11 @@ impl Module for NetworkManagerModule { ) -> Result> { let container = gtk::Box::new(Orientation::Horizontal, 0); - // TODO: Check if passing the widget context in its entirety here is possible - // We cannot use recv_glib_async() here because the lifetimes don't work out + // Must be done here synchronously to avoid race condition + let rx = context.subscribe(); + // We cannot use recv_glib_async here because the lifetimes don't work out spawn_future_local(handle_update_events( - context.subscribe(), + rx, container.clone(), self.icon_size, context.ironbar.image_provider(), @@ -84,13 +86,8 @@ async fn handle_update_events( let mut icons = HashMap::::new(); while let Result::Ok(event) = rx.recv().await { - println!("NM UI event: {:?}", event); - match event { - Event::DeviceAdded { interface, r#type } => { - if !is_supported_device_type(&r#type) { - continue; - } + Event::DeviceAdded { interface, .. } => { let icon = Image::new(); icon.add_class("icon"); container.add(&icon); @@ -101,9 +98,6 @@ async fn handle_update_events( r#type, state, } => { - if !is_supported_device_type(&r#type) { - continue; - } let icon = icons .get(&interface) .expect("the icon for the interface to be present"); @@ -124,13 +118,6 @@ async fn handle_update_events( } } -fn is_supported_device_type(r#type: &DeviceType) -> bool { - matches!( - r#type, - DeviceType::Ethernet | DeviceType::Wifi | DeviceType::Tun | DeviceType::Wireguard - ) -} - fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> { match r#type { DeviceType::Ethernet => match state { @@ -169,6 +156,6 @@ fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option DeviceState::Activated => Some("icon:network-vpn-symbolic"), _ => None, }, - _ => panic!("Device type should be a supported one"), + _ => None, } } From e6f610687c87a9cab269e447f06a7b32f71a7c44 Mon Sep 17 00:00:00 2001 From: Reinout Meliesie Date: Sun, 17 Aug 2025 21:15:10 +0200 Subject: [PATCH 4/4] refactor(networkmanager): remove unnecessary async & unused fields --- src/clients/mod.rs | 2 +- src/clients/networkmanager/dbus.rs | 6 ------ src/clients/networkmanager/event.rs | 1 - src/clients/networkmanager/mod.rs | 16 ++++++---------- src/modules/networkmanager.rs | 1 + 5 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/clients/mod.rs b/src/clients/mod.rs index c4fce0d..cbacaa3 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -190,7 +190,7 @@ impl Clients { if let Some(client) = &self.network_manager { Ok(client.clone()) } else { - let client = await_sync(async move { networkmanager::create_client().await })?; + let client = networkmanager::create_client()?; self.network_manager = Some(client.clone()); Ok(client) } diff --git a/src/clients/networkmanager/dbus.rs b/src/clients/networkmanager/dbus.rs index 1c589d4..0f1e36a 100644 --- a/src/clients/networkmanager/dbus.rs +++ b/src/clients/networkmanager/dbus.rs @@ -8,14 +8,8 @@ use zbus::zvariant::{ObjectPath, OwnedValue, Str}; default_path = "/org/freedesktop/NetworkManager" )] pub(super) trait Dbus { - #[zbus(property)] - fn active_connections(&self) -> Result>>; - #[zbus(property)] fn all_devices(&self) -> Result>>; - - #[zbus(property)] - fn devices(&self) -> Result>>; } #[proxy( diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index b85307c..4963f6e 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -4,7 +4,6 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; pub enum Event { DeviceAdded { interface: String, - r#type: DeviceType, }, DeviceStateChanged { interface: String, diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 52c1329..1a25183 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -7,6 +7,7 @@ use tokio::sync::broadcast; use zbus::Connection; use zbus::zvariant::{ObjectPath, Str}; +use crate::clients::ClientResult; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; use crate::clients::networkmanager::event::Event; use crate::{register_fallible_client, spawn}; @@ -20,14 +21,12 @@ pub struct Client { } impl Client { - async fn new() -> Result { + fn new() -> Result { let (tx, _) = broadcast::channel(64); Ok(Client { tx }) } - async fn run(&self) -> Result<()> { - // TODO: Use glib::clone!() - + fn run(&self) -> Result<()> { let tx = self.tx.clone(); spawn(async move { let dbus_connection = Connection::system().await?; @@ -58,14 +57,13 @@ impl Client { } pub fn subscribe(&self) -> broadcast::Receiver { - // Maybe we should pass a direct receiver so that the UI module also gets the events from before it was started self.tx.subscribe() } } -pub async fn create_client() -> Result> { - let client = Arc::new(Client::new().await?); - client.run().await?; +pub fn create_client() -> ClientResult { + let client = Arc::new(Client::new()?); + client.run()?; Ok(client) } @@ -74,10 +72,8 @@ async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; let interface = device.interface().await?; - let device_type = device.device_type().await?; tx.send(Event::DeviceAdded { interface: interface.to_string(), - r#type: device_type, })?; spawn(watch_device_state( diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index a19f22a..8ca128b 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -101,6 +101,7 @@ async fn handle_update_events( let icon = icons .get(&interface) .expect("the icon for the interface to be present"); + // TODO: Make this configurable at runtime let icon_name = get_icon_for_device_state(&r#type, &state); match icon_name { Some(icon_name) => {