1
0
Fork 0
mirror of https://github.com/Zedfrigg/ironbar.git synced 2025-10-06 04:31:55 +02:00

Compare commits

..

No commits in common. "a106f41b0af6ed9942ef03aec126882bedf872ac" and "4594271c42d2127e89978380da89393147b4d338" have entirely different histories.

3 changed files with 124 additions and 277 deletions

View file

@ -1,18 +1,13 @@
use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; use crate::clients::networkmanager::dbus::{DeviceState, DeviceType};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ClientToModuleEvent { pub enum Event {
DeviceChanged { DeviceAdded {
number: u32, interface: String,
},
DeviceStateChanged {
interface: String,
r#type: DeviceType, r#type: DeviceType,
new_state: DeviceState, state: DeviceState,
},
DeviceRemoved {
number: u32,
}, },
} }
#[derive(Debug, Clone)]
pub enum ModuleToClientEvent {
NewController,
}

View file

@ -1,17 +1,15 @@
use color_eyre::Result; use color_eyre::Result;
use color_eyre::eyre::Ok; use color_eyre::eyre::Ok;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use std::collections::{HashMap, HashSet}; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{RwLock, broadcast}; use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::debug;
use zbus::Connection; use zbus::Connection;
use zbus::zvariant::ObjectPath; use zbus::zvariant::{ObjectPath, Str};
use crate::clients::ClientResult; use crate::clients::ClientResult;
use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy}; use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy};
use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::clients::networkmanager::event::Event;
use crate::{register_fallible_client, spawn}; use crate::{register_fallible_client, spawn};
pub mod dbus; pub mod dbus;
@ -19,221 +17,102 @@ pub mod event;
#[derive(Debug)] #[derive(Debug)]
pub struct Client { pub struct Client {
inner: &'static ClientInner, tx: broadcast::Sender<Event>,
} }
impl Client { impl Client {
fn new() -> Client { fn new() -> Result<Client> {
let inner = Box::leak(Box::new(ClientInner::new())); let (tx, _) = broadcast::channel(64);
Client { inner } Ok(Client { tx })
} }
fn run(&self) -> Result<()> { fn run(&self) -> Result<()> {
self.inner.run() let tx = self.tx.clone();
} spawn(async move {
pub fn subscribe(&self) -> broadcast::Receiver<ClientToModuleEvent> {
self.inner.subscribe()
}
pub fn get_sender(&self) -> broadcast::Sender<ModuleToClientEvent> {
self.inner.get_sender()
}
}
#[derive(Debug)]
struct ClientInner {
controller_sender: broadcast::Sender<ClientToModuleEvent>,
sender: broadcast::Sender<ModuleToClientEvent>,
device_watchers: RwLock<HashMap<ObjectPath<'static>, DeviceWatcher>>,
dbus_connection: RwLock<Option<Connection>>,
}
#[derive(Clone, Debug)]
struct DeviceWatcher {
state_watcher: Arc<JoinHandle<Result<()>>>,
}
impl ClientInner {
fn new() -> ClientInner {
let (controller_sender, _) = broadcast::channel(64);
let (sender, _) = broadcast::channel(8);
let device_watchers = RwLock::new(HashMap::new());
let dbus_connection = RwLock::new(None);
ClientInner {
controller_sender,
sender,
device_watchers,
dbus_connection,
}
}
fn run(&'static self) -> Result<()> {
debug!("Client running");
spawn(self.watch_devices_list());
let receiver = self.sender.subscribe();
spawn(self.handle_received_events(receiver));
Ok(())
}
fn subscribe(&self) -> broadcast::Receiver<ClientToModuleEvent> {
self.controller_sender.subscribe()
}
fn get_sender(&self) -> broadcast::Sender<ModuleToClientEvent> {
self.sender.clone()
}
async fn watch_devices_list(&'static self) -> Result<()> {
debug!("D-Bus devices list watcher starting");
let root = DbusProxy::new(&self.dbus_connection().await?).await?;
let mut devices_changes = root.receive_all_devices_changed().await;
while let Some(devices_change) = devices_changes.next().await {
// The new list of devices from dbus, not to be confused with the added devices below
let new_device_paths = devices_change
.get()
.await?
.iter()
.map(ObjectPath::to_owned)
.collect::<HashSet<_>>();
let mut watchers = self.device_watchers.write().await;
let device_paths = watchers.keys().cloned().collect::<HashSet<_>>();
let added_device_paths = new_device_paths.difference(&device_paths);
for added_device_path in added_device_paths {
debug_assert!(!watchers.contains_key(added_device_path));
let watcher = self.watch_device(added_device_path.clone()).await?;
watchers.insert(added_device_path.clone(), watcher);
}
let removed_device_paths = device_paths.difference(&new_device_paths);
for removed_device_path in removed_device_paths {
let watcher = watchers
.get(removed_device_path)
.expect("Device to be removed should be present in watchers");
watcher.state_watcher.abort();
watchers.remove(removed_device_path);
let number = get_number_from_dbus_path(removed_device_path);
self.controller_sender
.send(ClientToModuleEvent::DeviceRemoved { number })?;
debug!("D-bus device watchers for {} stopped", removed_device_path);
}
}
Ok(())
}
async fn handle_received_events(
&'static self,
mut receiver: broadcast::Receiver<ModuleToClientEvent>,
) -> Result<()> {
while let Result::Ok(event) = receiver.recv().await {
match event {
ModuleToClientEvent::NewController => {
debug!("Client received NewController event");
for device_path in self.device_watchers.read().await.keys() {
let dbus_connection = &self.dbus_connection().await?;
let device = DeviceDbusProxy::new(dbus_connection, device_path).await?;
let number = get_number_from_dbus_path(device_path);
let r#type = device.device_type().await?;
let new_state = device.state().await?;
self.controller_sender
.send(ClientToModuleEvent::DeviceChanged {
number,
r#type,
new_state,
})?;
}
}
}
}
Ok(())
}
async fn watch_device(&'static self, path: ObjectPath<'_>) -> Result<DeviceWatcher> {
let dbus_connection = &self.dbus_connection().await?;
let proxy = DeviceDbusProxy::new(dbus_connection, path.to_owned()).await?;
let number = get_number_from_dbus_path(&path);
let r#type = proxy.device_type().await?;
let new_state = proxy.state().await?;
// Notify modules that the device exists even if its properties don't change
self.controller_sender
.send(ClientToModuleEvent::DeviceChanged {
number,
r#type: r#type.clone(),
new_state,
})?;
let state_watcher = Arc::new(spawn(self.watch_device_state(proxy)));
Ok(DeviceWatcher { state_watcher })
}
async fn watch_device_state(&'static self, proxy: DeviceDbusProxy<'_>) -> Result<()> {
let path = proxy.inner().path();
debug!("D-Bus device state watcher for {} starting", path);
let number = get_number_from_dbus_path(path);
let r#type = proxy.device_type().await?;
let mut changes = proxy.receive_state_changed().await;
while let Some(change) = changes.next().await {
let new_state = change.get().await?;
self.controller_sender
.send(ClientToModuleEvent::DeviceChanged {
number,
r#type: r#type.clone(),
new_state,
})?;
}
Ok(())
}
async fn dbus_connection(&self) -> Result<Connection> {
let dbus_connection_guard = self.dbus_connection.read().await;
if let Some(dbus_connection) = &*dbus_connection_guard {
Ok(dbus_connection.clone())
} else {
// Yes it's a bit awkward to first obtain a read lock and then a write lock but it
// needs to happen only once, and after that all read lock acquisitions will be
// instant
drop(dbus_connection_guard);
let dbus_connection = Connection::system().await?; let dbus_connection = Connection::system().await?;
*self.dbus_connection.write().await = Some(dbus_connection.clone()); let root = DbusProxy::new(&dbus_connection).await?;
Ok(dbus_connection)
} let mut devices = HashSet::new();
let mut devices_changes = root.receive_all_devices_changed().await;
while let Some(devices_change) = devices_changes.next().await {
// The new list of devices from dbus, not to be confused with the added devices below
let new_devices = HashSet::from_iter(devices_change.get().await?);
let added_devices = new_devices.difference(&devices);
for added_device in added_devices {
spawn(watch_device(added_device.to_owned(), tx.clone()));
}
let _removed_devices = devices.difference(&new_devices);
// TODO: Cook up some way to notify closures for removed devices to exit
devices = new_devices;
}
Ok(())
});
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.tx.subscribe()
} }
} }
pub fn create_client() -> ClientResult<Client> { pub fn create_client() -> ClientResult<Client> {
let client = Arc::new(Client::new()); let client = Arc::new(Client::new()?);
client.run()?; client.run()?;
Ok(client) Ok(client)
} }
fn get_number_from_dbus_path(path: &ObjectPath) -> u32 { async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender<Event>) -> Result<()> {
let (_, number_str) = path let dbus_connection = Connection::system().await?;
.rsplit_once('/') let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?;
.expect("Path must have at least two segments to contain an object number");
number_str let interface = device.interface().await?;
.parse() tx.send(Event::DeviceAdded {
.expect("Last segment was not a positive integer") interface: interface.to_string(),
})?;
spawn(watch_device_state(
device_path.to_owned(),
interface.to_owned(),
tx.clone(),
));
Ok(())
}
async fn watch_device_state(
device_path: ObjectPath<'_>,
interface: Str<'_>,
tx: broadcast::Sender<Event>,
) -> Result<()> {
let dbus_connection = Connection::system().await?;
let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?;
let r#type = device.device_type().await?;
// Send an event communicating the initial state
let state = device.state().await?;
tx.send(Event::DeviceStateChanged {
interface: interface.to_string(),
r#type: r#type.clone(),
state,
})?;
let mut state_changes = device.receive_state_changed().await;
while let Some(state_change) = state_changes.next().await {
let state = state_change.get().await?;
tx.send(Event::DeviceStateChanged {
interface: interface.to_string(),
r#type: r#type.clone(),
state,
})?;
}
Ok(())
} }
register_fallible_client!(Client, network_manager); register_fallible_client!(Client, network_manager);

View file

@ -1,6 +1,6 @@
use crate::clients::networkmanager::Client; use crate::clients::networkmanager::Client;
use crate::clients::networkmanager::dbus::{DeviceState, DeviceType}; use crate::clients::networkmanager::dbus::{DeviceState, DeviceType};
use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent}; use crate::clients::networkmanager::event::Event;
use crate::config::CommonConfig; use crate::config::CommonConfig;
use crate::gtk_helpers::IronbarGtkExt; use crate::gtk_helpers::IronbarGtkExt;
use crate::image::Provider; use crate::image::Provider;
@ -13,7 +13,6 @@ use gtk::{Image, Orientation};
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tracing::debug;
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
@ -30,7 +29,7 @@ const fn default_icon_size() -> i32 {
} }
impl Module<gtk::Box> for NetworkManagerModule { impl Module<gtk::Box> for NetworkManagerModule {
type SendMessage = ClientToModuleEvent; type SendMessage = Event;
type ReceiveMessage = (); type ReceiveMessage = ();
module_impl!("network_manager"); module_impl!("network_manager");
@ -38,24 +37,19 @@ impl Module<gtk::Box> for NetworkManagerModule {
fn spawn_controller( fn spawn_controller(
&self, &self,
_info: &ModuleInfo, _info: &ModuleInfo,
context: &WidgetContext<ClientToModuleEvent, ()>, context: &WidgetContext<Event, ()>,
_widget_receiver: mpsc::Receiver<()>, _rx: mpsc::Receiver<()>,
) -> Result<()> { ) -> Result<()> {
let client = context.try_client::<Client>()?; let client = context.try_client::<Client>()?;
// Should we be using context.tx with ModuleUpdateEvent::Update instead? // Should we be using context.tx with ModuleUpdateEvent::Update instead?
let widget_sender = context.update_tx.clone(); let tx = context.update_tx.clone();
// Must be done here synchronously to avoid race condition
// Must be done here otherwise we miss the response to our `NewController` event let mut client_rx = client.subscribe();
let mut client_receiver = client.subscribe();
client
.get_sender()
.send(ModuleToClientEvent::NewController)?;
spawn(async move { spawn(async move {
while let Result::Ok(event) = client_receiver.recv().await { while let Result::Ok(event) = client_rx.recv().await {
widget_sender.send(event)?; tx.send(event)?;
} }
Ok(()) Ok(())
}); });
@ -64,17 +58,16 @@ impl Module<gtk::Box> for NetworkManagerModule {
fn into_widget( fn into_widget(
self, self,
context: WidgetContext<ClientToModuleEvent, ()>, context: WidgetContext<Event, ()>,
_info: &ModuleInfo, _info: &ModuleInfo,
) -> Result<ModuleParts<gtk::Box>> { ) -> Result<ModuleParts<gtk::Box>> {
// Must be done here otherwise we miss the response to our `NewController` event
let receiver = context.subscribe();
let container = gtk::Box::new(Orientation::Horizontal, 0); let container = gtk::Box::new(Orientation::Horizontal, 0);
// Must be done here synchronously to avoid race condition
let rx = context.subscribe();
// We cannot use recv_glib_async here because the lifetimes don't work out // We cannot use recv_glib_async here because the lifetimes don't work out
spawn_future_local(handle_update_events( spawn_future_local(handle_update_events(
receiver, rx,
container.clone(), container.clone(),
self.icon_size, self.icon_size,
context.ironbar.image_provider(), context.ironbar.image_provider(),
@ -85,37 +78,31 @@ impl Module<gtk::Box> for NetworkManagerModule {
} }
async fn handle_update_events( async fn handle_update_events(
mut widget_receiver: broadcast::Receiver<ClientToModuleEvent>, mut rx: broadcast::Receiver<Event>,
container: gtk::Box, container: gtk::Box,
icon_size: i32, icon_size: i32,
image_provider: Provider, image_provider: Provider,
) -> Result<()> { ) {
// TODO: Ensure the visible icons are always in the same order let mut icons = HashMap::<String, Image>::new();
let mut icons = HashMap::<u32, Image>::new();
while let Result::Ok(event) = widget_receiver.recv().await { while let Result::Ok(event) = rx.recv().await {
match event { match event {
ClientToModuleEvent::DeviceChanged { Event::DeviceAdded { interface, .. } => {
number, let icon = Image::new();
icon.add_class("icon");
container.add(&icon);
icons.insert(interface, icon);
}
Event::DeviceStateChanged {
interface,
r#type, r#type,
new_state, state,
} => { } => {
debug!( let icon = icons
"Module widget received DeviceChanged event for number {}", .get(&interface)
number .expect("the icon for the interface to be present");
);
let icon: &_ = icons.entry(number).or_insert_with(|| {
debug!("Adding icon for device {}", number);
let icon = Image::new();
icon.add_class("icon");
container.add(&icon);
icon
});
// TODO: Make this configurable at runtime // TODO: Make this configurable at runtime
let icon_name = get_icon_for_device_state(&r#type, &new_state); let icon_name = get_icon_for_device_state(&r#type, &state);
match icon_name { match icon_name {
Some(icon_name) => { Some(icon_name) => {
image_provider image_provider
@ -128,22 +115,8 @@ async fn handle_update_events(
} }
} }
} }
ClientToModuleEvent::DeviceRemoved { number } => { };
debug!(
"Module widget received DeviceRemoved event for number {}",
number
);
let icon = icons
.get(&number)
.expect("The icon for {} was about to be removed but was not present");
container.remove(icon);
icons.remove(&number);
}
}
} }
Ok(())
} }
fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> { fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> {