diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index 2fdca55..4963f6e 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -1,18 +1,13 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; #[derive(Debug, Clone)] -pub enum ClientToModuleEvent { - DeviceChanged { - number: u32, +pub enum Event { + DeviceAdded { + interface: String, + }, + DeviceStateChanged { + interface: String, r#type: DeviceType, - new_state: DeviceState, - }, - DeviceRemoved { - number: u32, + state: DeviceState, }, } - -#[derive(Debug, Clone)] -pub enum ModuleToClientEvent { - NewController, -} diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 7459531..94cb82d 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -1,17 +1,15 @@ use color_eyre::Result; use color_eyre::eyre::Ok; use futures_lite::StreamExt; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{RwLock, broadcast}; -use tokio::task::JoinHandle; -use tracing::debug; +use tokio::sync::broadcast; use zbus::Connection; -use zbus::zvariant::ObjectPath; +use zbus::zvariant::{ObjectPath, Str}; use crate::clients::ClientResult; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; -use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; +use crate::clients::networkmanager::event::Event; use crate::{register_fallible_client, spawn}; pub mod dbus; @@ -19,221 +17,102 @@ pub mod event; #[derive(Debug)] pub struct Client { - inner: &'static ClientInner, + tx: broadcast::Sender, } impl Client { - fn new() -> Client { - let inner = Box::leak(Box::new(ClientInner::new())); - Client { inner } + fn new() -> Result { + let (tx, _) = broadcast::channel(64); + Ok(Client { tx }) } fn run(&self) -> Result<()> { - self.inner.run() - } - - pub fn subscribe(&self) -> broadcast::Receiver { - self.inner.subscribe() - } - - pub fn get_sender(&self) -> broadcast::Sender { - self.inner.get_sender() - } -} - -#[derive(Debug)] -struct ClientInner { - controller_sender: broadcast::Sender, - sender: broadcast::Sender, - device_watchers: RwLock, DeviceWatcher>>, - dbus_connection: RwLock>, -} - -#[derive(Clone, Debug)] -struct DeviceWatcher { - state_watcher: Arc>>, -} - -impl ClientInner { - fn new() -> ClientInner { - let (controller_sender, _) = broadcast::channel(64); - let (sender, _) = broadcast::channel(8); - let device_watchers = RwLock::new(HashMap::new()); - let dbus_connection = RwLock::new(None); - ClientInner { - controller_sender, - sender, - device_watchers, - dbus_connection, - } - } - - fn run(&'static self) -> Result<()> { - debug!("Client running"); - - spawn(self.watch_devices_list()); - - let receiver = self.sender.subscribe(); - spawn(self.handle_received_events(receiver)); - - Ok(()) - } - - fn subscribe(&self) -> broadcast::Receiver { - self.controller_sender.subscribe() - } - - fn get_sender(&self) -> broadcast::Sender { - self.sender.clone() - } - - async fn watch_devices_list(&'static self) -> Result<()> { - debug!("D-Bus devices list watcher starting"); - - let root = DbusProxy::new(&self.dbus_connection().await?).await?; - - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_device_paths = devices_change - .get() - .await? - .iter() - .map(ObjectPath::to_owned) - .collect::>(); - - let mut watchers = self.device_watchers.write().await; - let device_paths = watchers.keys().cloned().collect::>(); - - let added_device_paths = new_device_paths.difference(&device_paths); - for added_device_path in added_device_paths { - debug_assert!(!watchers.contains_key(added_device_path)); - - let watcher = self.watch_device(added_device_path.clone()).await?; - watchers.insert(added_device_path.clone(), watcher); - } - - let removed_device_paths = device_paths.difference(&new_device_paths); - for removed_device_path in removed_device_paths { - let watcher = watchers - .get(removed_device_path) - .expect("Device to be removed should be present in watchers"); - watcher.state_watcher.abort(); - watchers.remove(removed_device_path); - - let number = get_number_from_dbus_path(removed_device_path); - self.controller_sender - .send(ClientToModuleEvent::DeviceRemoved { number })?; - - debug!("D-bus device watchers for {} stopped", removed_device_path); - } - } - - Ok(()) - } - - async fn handle_received_events( - &'static self, - mut receiver: broadcast::Receiver, - ) -> Result<()> { - while let Result::Ok(event) = receiver.recv().await { - match event { - ModuleToClientEvent::NewController => { - debug!("Client received NewController event"); - - for device_path in self.device_watchers.read().await.keys() { - let dbus_connection = &self.dbus_connection().await?; - let device = DeviceDbusProxy::new(dbus_connection, device_path).await?; - - let number = get_number_from_dbus_path(device_path); - let r#type = device.device_type().await?; - let new_state = device.state().await?; - self.controller_sender - .send(ClientToModuleEvent::DeviceChanged { - number, - r#type, - new_state, - })?; - } - } - } - } - - Ok(()) - } - - async fn watch_device(&'static self, path: ObjectPath<'_>) -> Result { - let dbus_connection = &self.dbus_connection().await?; - let proxy = DeviceDbusProxy::new(dbus_connection, path.to_owned()).await?; - - let number = get_number_from_dbus_path(&path); - let r#type = proxy.device_type().await?; - let new_state = proxy.state().await?; - - // Notify modules that the device exists even if its properties don't change - self.controller_sender - .send(ClientToModuleEvent::DeviceChanged { - number, - r#type: r#type.clone(), - new_state, - })?; - - let state_watcher = Arc::new(spawn(self.watch_device_state(proxy))); - - Ok(DeviceWatcher { state_watcher }) - } - - async fn watch_device_state(&'static self, proxy: DeviceDbusProxy<'_>) -> Result<()> { - let path = proxy.inner().path(); - - debug!("D-Bus device state watcher for {} starting", path); - - let number = get_number_from_dbus_path(path); - let r#type = proxy.device_type().await?; - - let mut changes = proxy.receive_state_changed().await; - while let Some(change) = changes.next().await { - let new_state = change.get().await?; - self.controller_sender - .send(ClientToModuleEvent::DeviceChanged { - number, - r#type: r#type.clone(), - new_state, - })?; - } - - Ok(()) - } - - async fn dbus_connection(&self) -> Result { - let dbus_connection_guard = self.dbus_connection.read().await; - if let Some(dbus_connection) = &*dbus_connection_guard { - Ok(dbus_connection.clone()) - } else { - // Yes it's a bit awkward to first obtain a read lock and then a write lock but it - // needs to happen only once, and after that all read lock acquisitions will be - // instant - drop(dbus_connection_guard); + let tx = self.tx.clone(); + spawn(async move { let dbus_connection = Connection::system().await?; - *self.dbus_connection.write().await = Some(dbus_connection.clone()); - Ok(dbus_connection) - } + let root = DbusProxy::new(&dbus_connection).await?; + + let mut devices = HashSet::new(); + + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_devices = HashSet::from_iter(devices_change.get().await?); + + let added_devices = new_devices.difference(&devices); + for added_device in added_devices { + spawn(watch_device(added_device.to_owned(), tx.clone())); + } + + let _removed_devices = devices.difference(&new_devices); + // TODO: Cook up some way to notify closures for removed devices to exit + + devices = new_devices; + } + + Ok(()) + }); + + Ok(()) + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() } } pub fn create_client() -> ClientResult { - let client = Arc::new(Client::new()); + let client = Arc::new(Client::new()?); client.run()?; Ok(client) } -fn get_number_from_dbus_path(path: &ObjectPath) -> u32 { - let (_, number_str) = path - .rsplit_once('/') - .expect("Path must have at least two segments to contain an object number"); - number_str - .parse() - .expect("Last segment was not a positive integer") +async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; + + let interface = device.interface().await?; + tx.send(Event::DeviceAdded { + interface: interface.to_string(), + })?; + + spawn(watch_device_state( + device_path.to_owned(), + interface.to_owned(), + tx.clone(), + )); + + Ok(()) +} + +async fn watch_device_state( + device_path: ObjectPath<'_>, + interface: Str<'_>, + tx: broadcast::Sender, +) -> Result<()> { + let dbus_connection = Connection::system().await?; + let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?; + let r#type = device.device_type().await?; + + // Send an event communicating the initial state + let state = device.state().await?; + tx.send(Event::DeviceStateChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + state, + })?; + + let mut state_changes = device.receive_state_changed().await; + while let Some(state_change) = state_changes.next().await { + let state = state_change.get().await?; + tx.send(Event::DeviceStateChanged { + interface: interface.to_string(), + r#type: r#type.clone(), + state, + })?; + } + + Ok(()) } register_fallible_client!(Client, network_manager); diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index 62518e4..ea8b8d9 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -1,6 +1,6 @@ use crate::clients::networkmanager::Client; use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; -use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; +use crate::clients::networkmanager::event::Event; use crate::config::CommonConfig; use crate::gtk_helpers::IronbarGtkExt; use crate::image::Provider; @@ -13,7 +13,6 @@ use gtk::{Image, Orientation}; use serde::Deserialize; use std::collections::HashMap; use tokio::sync::{broadcast, mpsc}; -use tracing::debug; #[derive(Debug, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -30,7 +29,7 @@ const fn default_icon_size() -> i32 { } impl Module for NetworkManagerModule { - type SendMessage = ClientToModuleEvent; + type SendMessage = Event; type ReceiveMessage = (); module_impl!("network_manager"); @@ -38,24 +37,19 @@ impl Module for NetworkManagerModule { fn spawn_controller( &self, _info: &ModuleInfo, - context: &WidgetContext, - _widget_receiver: mpsc::Receiver<()>, + context: &WidgetContext, + _rx: mpsc::Receiver<()>, ) -> Result<()> { let client = context.try_client::()?; // Should we be using context.tx with ModuleUpdateEvent::Update instead? - let widget_sender = context.update_tx.clone(); - - // Must be done here otherwise we miss the response to our `NewController` event - let mut client_receiver = client.subscribe(); - - client - .get_sender() - .send(ModuleToClientEvent::NewController)?; - + let tx = context.update_tx.clone(); + // Must be done here synchronously to avoid race condition + let mut client_rx = client.subscribe(); spawn(async move { - while let Result::Ok(event) = client_receiver.recv().await { - widget_sender.send(event)?; + while let Result::Ok(event) = client_rx.recv().await { + tx.send(event)?; } + Ok(()) }); @@ -64,17 +58,16 @@ impl Module for NetworkManagerModule { fn into_widget( self, - context: WidgetContext, + context: WidgetContext, _info: &ModuleInfo, ) -> Result> { - // Must be done here otherwise we miss the response to our `NewController` event - let receiver = context.subscribe(); - let container = gtk::Box::new(Orientation::Horizontal, 0); + // Must be done here synchronously to avoid race condition + let rx = context.subscribe(); // We cannot use recv_glib_async here because the lifetimes don't work out spawn_future_local(handle_update_events( - receiver, + rx, container.clone(), self.icon_size, context.ironbar.image_provider(), @@ -85,37 +78,31 @@ impl Module for NetworkManagerModule { } async fn handle_update_events( - mut widget_receiver: broadcast::Receiver, + mut rx: broadcast::Receiver, container: gtk::Box, icon_size: i32, image_provider: Provider, -) -> Result<()> { - // TODO: Ensure the visible icons are always in the same order - let mut icons = HashMap::::new(); +) { + let mut icons = HashMap::::new(); - while let Result::Ok(event) = widget_receiver.recv().await { + while let Result::Ok(event) = rx.recv().await { match event { - ClientToModuleEvent::DeviceChanged { - number, + Event::DeviceAdded { interface, .. } => { + let icon = Image::new(); + icon.add_class("icon"); + container.add(&icon); + icons.insert(interface, icon); + } + Event::DeviceStateChanged { + interface, r#type, - new_state, + state, } => { - debug!( - "Module widget received DeviceChanged event for number {}", - number - ); - - let icon: &_ = icons.entry(number).or_insert_with(|| { - debug!("Adding icon for device {}", number); - - let icon = Image::new(); - icon.add_class("icon"); - container.add(&icon); - icon - }); - + let icon = icons + .get(&interface) + .expect("the icon for the interface to be present"); // TODO: Make this configurable at runtime - let icon_name = get_icon_for_device_state(&r#type, &new_state); + let icon_name = get_icon_for_device_state(&r#type, &state); match icon_name { Some(icon_name) => { image_provider @@ -128,22 +115,8 @@ async fn handle_update_events( } } } - ClientToModuleEvent::DeviceRemoved { number } => { - debug!( - "Module widget received DeviceRemoved event for number {}", - number - ); - - let icon = icons - .get(&number) - .expect("The icon for {} was about to be removed but was not present"); - container.remove(icon); - icons.remove(&number); - } - } + }; } - - Ok(()) } fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> {