diff --git a/src/clients/networkmanager/event.rs b/src/clients/networkmanager/event.rs index 4963f6e..2fdca55 100644 --- a/src/clients/networkmanager/event.rs +++ b/src/clients/networkmanager/event.rs @@ -1,13 +1,18 @@ use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; #[derive(Debug, Clone)] -pub enum Event { - DeviceAdded { - interface: String, - }, - DeviceStateChanged { - interface: String, +pub enum ClientToModuleEvent { + DeviceChanged { + number: u32, r#type: DeviceType, - state: DeviceState, + new_state: DeviceState, + }, + DeviceRemoved { + number: u32, }, } + +#[derive(Debug, Clone)] +pub enum ModuleToClientEvent { + NewController, +} diff --git a/src/clients/networkmanager/mod.rs b/src/clients/networkmanager/mod.rs index 94cb82d..7459531 100644 --- a/src/clients/networkmanager/mod.rs +++ b/src/clients/networkmanager/mod.rs @@ -1,15 +1,17 @@ use color_eyre::Result; use color_eyre::eyre::Ok; use futures_lite::StreamExt; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{RwLock, broadcast}; +use tokio::task::JoinHandle; +use tracing::debug; use zbus::Connection; -use zbus::zvariant::{ObjectPath, Str}; +use zbus::zvariant::ObjectPath; use crate::clients::ClientResult; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; -use crate::clients::networkmanager::event::Event; +use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::{register_fallible_client, spawn}; pub mod dbus; @@ -17,102 +19,221 @@ pub mod event; #[derive(Debug)] pub struct Client { - tx: broadcast::Sender, + inner: &'static ClientInner, } impl Client { - fn new() -> Result { - let (tx, _) = broadcast::channel(64); - Ok(Client { tx }) + fn new() -> Client { + let inner = Box::leak(Box::new(ClientInner::new())); + Client { inner } } fn run(&self) -> Result<()> { - let tx = self.tx.clone(); - spawn(async move { - let dbus_connection = Connection::system().await?; - let root = DbusProxy::new(&dbus_connection).await?; + self.inner.run() + } - let mut devices = HashSet::new(); + pub fn subscribe(&self) -> broadcast::Receiver { + self.inner.subscribe() + } - let mut devices_changes = root.receive_all_devices_changed().await; - while let Some(devices_change) = devices_changes.next().await { - // The new list of devices from dbus, not to be confused with the added devices below - let new_devices = HashSet::from_iter(devices_change.get().await?); + pub fn get_sender(&self) -> broadcast::Sender { + self.inner.get_sender() + } +} - let added_devices = new_devices.difference(&devices); - for added_device in added_devices { - spawn(watch_device(added_device.to_owned(), tx.clone())); - } +#[derive(Debug)] +struct ClientInner { + controller_sender: broadcast::Sender, + sender: broadcast::Sender, + device_watchers: RwLock, DeviceWatcher>>, + dbus_connection: RwLock>, +} - let _removed_devices = devices.difference(&new_devices); - // TODO: Cook up some way to notify closures for removed devices to exit +#[derive(Clone, Debug)] +struct DeviceWatcher { + state_watcher: Arc>>, +} - devices = new_devices; - } +impl ClientInner { + fn new() -> ClientInner { + let (controller_sender, _) = broadcast::channel(64); + let (sender, _) = broadcast::channel(8); + let device_watchers = RwLock::new(HashMap::new()); + let dbus_connection = RwLock::new(None); + ClientInner { + controller_sender, + sender, + device_watchers, + dbus_connection, + } + } - Ok(()) - }); + fn run(&'static self) -> Result<()> { + debug!("Client running"); + + spawn(self.watch_devices_list()); + + let receiver = self.sender.subscribe(); + spawn(self.handle_received_events(receiver)); Ok(()) } - pub fn subscribe(&self) -> broadcast::Receiver { - self.tx.subscribe() + fn subscribe(&self) -> broadcast::Receiver { + self.controller_sender.subscribe() + } + + fn get_sender(&self) -> broadcast::Sender { + self.sender.clone() + } + + async fn watch_devices_list(&'static self) -> Result<()> { + debug!("D-Bus devices list watcher starting"); + + let root = DbusProxy::new(&self.dbus_connection().await?).await?; + + let mut devices_changes = root.receive_all_devices_changed().await; + while let Some(devices_change) = devices_changes.next().await { + // The new list of devices from dbus, not to be confused with the added devices below + let new_device_paths = devices_change + .get() + .await? + .iter() + .map(ObjectPath::to_owned) + .collect::>(); + + let mut watchers = self.device_watchers.write().await; + let device_paths = watchers.keys().cloned().collect::>(); + + let added_device_paths = new_device_paths.difference(&device_paths); + for added_device_path in added_device_paths { + debug_assert!(!watchers.contains_key(added_device_path)); + + let watcher = self.watch_device(added_device_path.clone()).await?; + watchers.insert(added_device_path.clone(), watcher); + } + + let removed_device_paths = device_paths.difference(&new_device_paths); + for removed_device_path in removed_device_paths { + let watcher = watchers + .get(removed_device_path) + .expect("Device to be removed should be present in watchers"); + watcher.state_watcher.abort(); + watchers.remove(removed_device_path); + + let number = get_number_from_dbus_path(removed_device_path); + self.controller_sender + .send(ClientToModuleEvent::DeviceRemoved { number })?; + + debug!("D-bus device watchers for {} stopped", removed_device_path); + } + } + + Ok(()) + } + + async fn handle_received_events( + &'static self, + mut receiver: broadcast::Receiver, + ) -> Result<()> { + while let Result::Ok(event) = receiver.recv().await { + match event { + ModuleToClientEvent::NewController => { + debug!("Client received NewController event"); + + for device_path in self.device_watchers.read().await.keys() { + let dbus_connection = &self.dbus_connection().await?; + let device = DeviceDbusProxy::new(dbus_connection, device_path).await?; + + let number = get_number_from_dbus_path(device_path); + let r#type = device.device_type().await?; + let new_state = device.state().await?; + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + number, + r#type, + new_state, + })?; + } + } + } + } + + Ok(()) + } + + async fn watch_device(&'static self, path: ObjectPath<'_>) -> Result { + let dbus_connection = &self.dbus_connection().await?; + let proxy = DeviceDbusProxy::new(dbus_connection, path.to_owned()).await?; + + let number = get_number_from_dbus_path(&path); + let r#type = proxy.device_type().await?; + let new_state = proxy.state().await?; + + // Notify modules that the device exists even if its properties don't change + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + number, + r#type: r#type.clone(), + new_state, + })?; + + let state_watcher = Arc::new(spawn(self.watch_device_state(proxy))); + + Ok(DeviceWatcher { state_watcher }) + } + + async fn watch_device_state(&'static self, proxy: DeviceDbusProxy<'_>) -> Result<()> { + let path = proxy.inner().path(); + + debug!("D-Bus device state watcher for {} starting", path); + + let number = get_number_from_dbus_path(path); + let r#type = proxy.device_type().await?; + + let mut changes = proxy.receive_state_changed().await; + while let Some(change) = changes.next().await { + let new_state = change.get().await?; + self.controller_sender + .send(ClientToModuleEvent::DeviceChanged { + number, + r#type: r#type.clone(), + new_state, + })?; + } + + Ok(()) + } + + async fn dbus_connection(&self) -> Result { + let dbus_connection_guard = self.dbus_connection.read().await; + if let Some(dbus_connection) = &*dbus_connection_guard { + Ok(dbus_connection.clone()) + } else { + // Yes it's a bit awkward to first obtain a read lock and then a write lock but it + // needs to happen only once, and after that all read lock acquisitions will be + // instant + drop(dbus_connection_guard); + let dbus_connection = Connection::system().await?; + *self.dbus_connection.write().await = Some(dbus_connection.clone()); + Ok(dbus_connection) + } } } pub fn create_client() -> ClientResult { - let client = Arc::new(Client::new()?); + let client = Arc::new(Client::new()); client.run()?; Ok(client) } -async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender) -> Result<()> { - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?; - - let interface = device.interface().await?; - tx.send(Event::DeviceAdded { - interface: interface.to_string(), - })?; - - spawn(watch_device_state( - device_path.to_owned(), - interface.to_owned(), - tx.clone(), - )); - - Ok(()) -} - -async fn watch_device_state( - device_path: ObjectPath<'_>, - interface: Str<'_>, - tx: broadcast::Sender, -) -> Result<()> { - let dbus_connection = Connection::system().await?; - let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?; - let r#type = device.device_type().await?; - - // Send an event communicating the initial state - let state = device.state().await?; - tx.send(Event::DeviceStateChanged { - interface: interface.to_string(), - r#type: r#type.clone(), - state, - })?; - - let mut state_changes = device.receive_state_changed().await; - while let Some(state_change) = state_changes.next().await { - let state = state_change.get().await?; - tx.send(Event::DeviceStateChanged { - interface: interface.to_string(), - r#type: r#type.clone(), - state, - })?; - } - - Ok(()) +fn get_number_from_dbus_path(path: &ObjectPath) -> u32 { + let (_, number_str) = path + .rsplit_once('/') + .expect("Path must have at least two segments to contain an object number"); + number_str + .parse() + .expect("Last segment was not a positive integer") } register_fallible_client!(Client, network_manager); diff --git a/src/modules/networkmanager.rs b/src/modules/networkmanager.rs index ea8b8d9..62518e4 100644 --- a/src/modules/networkmanager.rs +++ b/src/modules/networkmanager.rs @@ -1,6 +1,6 @@ use crate::clients::networkmanager::Client; use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; -use crate::clients::networkmanager::event::Event; +use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::config::CommonConfig; use crate::gtk_helpers::IronbarGtkExt; use crate::image::Provider; @@ -13,6 +13,7 @@ use gtk::{Image, Orientation}; use serde::Deserialize; use std::collections::HashMap; use tokio::sync::{broadcast, mpsc}; +use tracing::debug; #[derive(Debug, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -29,7 +30,7 @@ const fn default_icon_size() -> i32 { } impl Module for NetworkManagerModule { - type SendMessage = Event; + type SendMessage = ClientToModuleEvent; type ReceiveMessage = (); module_impl!("network_manager"); @@ -37,19 +38,24 @@ impl Module for NetworkManagerModule { fn spawn_controller( &self, _info: &ModuleInfo, - context: &WidgetContext, - _rx: mpsc::Receiver<()>, + context: &WidgetContext, + _widget_receiver: mpsc::Receiver<()>, ) -> Result<()> { let client = context.try_client::()?; // Should we be using context.tx with ModuleUpdateEvent::Update instead? - let tx = context.update_tx.clone(); - // Must be done here synchronously to avoid race condition - let mut client_rx = client.subscribe(); - spawn(async move { - while let Result::Ok(event) = client_rx.recv().await { - tx.send(event)?; - } + let widget_sender = context.update_tx.clone(); + // Must be done here otherwise we miss the response to our `NewController` event + let mut client_receiver = client.subscribe(); + + client + .get_sender() + .send(ModuleToClientEvent::NewController)?; + + spawn(async move { + while let Result::Ok(event) = client_receiver.recv().await { + widget_sender.send(event)?; + } Ok(()) }); @@ -58,16 +64,17 @@ impl Module for NetworkManagerModule { fn into_widget( self, - context: WidgetContext, + context: WidgetContext, _info: &ModuleInfo, ) -> Result> { + // Must be done here otherwise we miss the response to our `NewController` event + let receiver = context.subscribe(); + let container = gtk::Box::new(Orientation::Horizontal, 0); - // Must be done here synchronously to avoid race condition - let rx = context.subscribe(); // We cannot use recv_glib_async here because the lifetimes don't work out spawn_future_local(handle_update_events( - rx, + receiver, container.clone(), self.icon_size, context.ironbar.image_provider(), @@ -78,31 +85,37 @@ impl Module for NetworkManagerModule { } async fn handle_update_events( - mut rx: broadcast::Receiver, + mut widget_receiver: broadcast::Receiver, container: gtk::Box, icon_size: i32, image_provider: Provider, -) { - let mut icons = HashMap::::new(); +) -> Result<()> { + // TODO: Ensure the visible icons are always in the same order + let mut icons = HashMap::::new(); - while let Result::Ok(event) = rx.recv().await { + while let Result::Ok(event) = widget_receiver.recv().await { match event { - Event::DeviceAdded { interface, .. } => { - let icon = Image::new(); - icon.add_class("icon"); - container.add(&icon); - icons.insert(interface, icon); - } - Event::DeviceStateChanged { - interface, + ClientToModuleEvent::DeviceChanged { + number, r#type, - state, + new_state, } => { - let icon = icons - .get(&interface) - .expect("the icon for the interface to be present"); + debug!( + "Module widget received DeviceChanged event for number {}", + number + ); + + let icon: &_ = icons.entry(number).or_insert_with(|| { + debug!("Adding icon for device {}", number); + + let icon = Image::new(); + icon.add_class("icon"); + container.add(&icon); + icon + }); + // TODO: Make this configurable at runtime - let icon_name = get_icon_for_device_state(&r#type, &state); + let icon_name = get_icon_for_device_state(&r#type, &new_state); match icon_name { Some(icon_name) => { image_provider @@ -115,8 +128,22 @@ async fn handle_update_events( } } } - }; + ClientToModuleEvent::DeviceRemoved { number } => { + debug!( + "Module widget received DeviceRemoved event for number {}", + number + ); + + let icon = icons + .get(&number) + .expect("The icon for {} was about to be removed but was not present"); + container.remove(icon); + icons.remove(&number); + } + } } + + Ok(()) } fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> {