mirror of
https://github.com/Zedfrigg/ironbar.git
synced 2025-10-05 20:31:53 +02:00
Compare commits
12 commits
4594271c42
...
a106f41b0a
Author | SHA1 | Date | |
---|---|---|---|
a106f41b0a |
|||
db88e12b8e |
|||
af49acb40b |
|||
d752e88abf |
|||
f83c9e6852 |
|||
01de9da7e0 |
|||
13c2520c76 |
|||
5385c7e705 |
|||
3ffb668e6b |
|||
4c516a1c2a |
|||
ec00b2ce69 |
|||
226b32ce6a |
3 changed files with 269 additions and 116 deletions
|
@ -1,13 +1,18 @@
|
|||
use crate::clients::networkmanager::dbus::{DeviceState, DeviceType};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Event {
|
||||
DeviceAdded {
|
||||
interface: String,
|
||||
},
|
||||
DeviceStateChanged {
|
||||
interface: String,
|
||||
pub enum ClientToModuleEvent {
|
||||
DeviceChanged {
|
||||
number: u32,
|
||||
r#type: DeviceType,
|
||||
state: DeviceState,
|
||||
new_state: DeviceState,
|
||||
},
|
||||
DeviceRemoved {
|
||||
number: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ModuleToClientEvent {
|
||||
NewController,
|
||||
}
|
||||
|
|
|
@ -1,15 +1,17 @@
|
|||
use color_eyre::Result;
|
||||
use color_eyre::eyre::Ok;
|
||||
use futures_lite::StreamExt;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{RwLock, broadcast};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::debug;
|
||||
use zbus::Connection;
|
||||
use zbus::zvariant::{ObjectPath, Str};
|
||||
use zbus::zvariant::ObjectPath;
|
||||
|
||||
use crate::clients::ClientResult;
|
||||
use crate::clients::networkmanager::dbus::{DbusProxy, DeviceDbusProxy};
|
||||
use crate::clients::networkmanager::event::Event;
|
||||
use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent};
|
||||
use crate::{register_fallible_client, spawn};
|
||||
|
||||
pub mod dbus;
|
||||
|
@ -17,102 +19,221 @@ pub mod event;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Client {
|
||||
tx: broadcast::Sender<Event>,
|
||||
inner: &'static ClientInner,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
fn new() -> Result<Client> {
|
||||
let (tx, _) = broadcast::channel(64);
|
||||
Ok(Client { tx })
|
||||
fn new() -> Client {
|
||||
let inner = Box::leak(Box::new(ClientInner::new()));
|
||||
Client { inner }
|
||||
}
|
||||
|
||||
fn run(&self) -> Result<()> {
|
||||
let tx = self.tx.clone();
|
||||
spawn(async move {
|
||||
let dbus_connection = Connection::system().await?;
|
||||
let root = DbusProxy::new(&dbus_connection).await?;
|
||||
self.inner.run()
|
||||
}
|
||||
|
||||
let mut devices = HashSet::new();
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<ClientToModuleEvent> {
|
||||
self.inner.subscribe()
|
||||
}
|
||||
|
||||
pub fn get_sender(&self) -> broadcast::Sender<ModuleToClientEvent> {
|
||||
self.inner.get_sender()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ClientInner {
|
||||
controller_sender: broadcast::Sender<ClientToModuleEvent>,
|
||||
sender: broadcast::Sender<ModuleToClientEvent>,
|
||||
device_watchers: RwLock<HashMap<ObjectPath<'static>, DeviceWatcher>>,
|
||||
dbus_connection: RwLock<Option<Connection>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct DeviceWatcher {
|
||||
state_watcher: Arc<JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
impl ClientInner {
|
||||
fn new() -> ClientInner {
|
||||
let (controller_sender, _) = broadcast::channel(64);
|
||||
let (sender, _) = broadcast::channel(8);
|
||||
let device_watchers = RwLock::new(HashMap::new());
|
||||
let dbus_connection = RwLock::new(None);
|
||||
ClientInner {
|
||||
controller_sender,
|
||||
sender,
|
||||
device_watchers,
|
||||
dbus_connection,
|
||||
}
|
||||
}
|
||||
|
||||
fn run(&'static self) -> Result<()> {
|
||||
debug!("Client running");
|
||||
|
||||
spawn(self.watch_devices_list());
|
||||
|
||||
let receiver = self.sender.subscribe();
|
||||
spawn(self.handle_received_events(receiver));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe(&self) -> broadcast::Receiver<ClientToModuleEvent> {
|
||||
self.controller_sender.subscribe()
|
||||
}
|
||||
|
||||
fn get_sender(&self) -> broadcast::Sender<ModuleToClientEvent> {
|
||||
self.sender.clone()
|
||||
}
|
||||
|
||||
async fn watch_devices_list(&'static self) -> Result<()> {
|
||||
debug!("D-Bus devices list watcher starting");
|
||||
|
||||
let root = DbusProxy::new(&self.dbus_connection().await?).await?;
|
||||
|
||||
let mut devices_changes = root.receive_all_devices_changed().await;
|
||||
while let Some(devices_change) = devices_changes.next().await {
|
||||
// The new list of devices from dbus, not to be confused with the added devices below
|
||||
let new_devices = HashSet::from_iter(devices_change.get().await?);
|
||||
let new_device_paths = devices_change
|
||||
.get()
|
||||
.await?
|
||||
.iter()
|
||||
.map(ObjectPath::to_owned)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let added_devices = new_devices.difference(&devices);
|
||||
for added_device in added_devices {
|
||||
spawn(watch_device(added_device.to_owned(), tx.clone()));
|
||||
let mut watchers = self.device_watchers.write().await;
|
||||
let device_paths = watchers.keys().cloned().collect::<HashSet<_>>();
|
||||
|
||||
let added_device_paths = new_device_paths.difference(&device_paths);
|
||||
for added_device_path in added_device_paths {
|
||||
debug_assert!(!watchers.contains_key(added_device_path));
|
||||
|
||||
let watcher = self.watch_device(added_device_path.clone()).await?;
|
||||
watchers.insert(added_device_path.clone(), watcher);
|
||||
}
|
||||
|
||||
let _removed_devices = devices.difference(&new_devices);
|
||||
// TODO: Cook up some way to notify closures for removed devices to exit
|
||||
let removed_device_paths = device_paths.difference(&new_device_paths);
|
||||
for removed_device_path in removed_device_paths {
|
||||
let watcher = watchers
|
||||
.get(removed_device_path)
|
||||
.expect("Device to be removed should be present in watchers");
|
||||
watcher.state_watcher.abort();
|
||||
watchers.remove(removed_device_path);
|
||||
|
||||
devices = new_devices;
|
||||
let number = get_number_from_dbus_path(removed_device_path);
|
||||
self.controller_sender
|
||||
.send(ClientToModuleEvent::DeviceRemoved { number })?;
|
||||
|
||||
debug!("D-bus device watchers for {} stopped", removed_device_path);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_received_events(
|
||||
&'static self,
|
||||
mut receiver: broadcast::Receiver<ModuleToClientEvent>,
|
||||
) -> Result<()> {
|
||||
while let Result::Ok(event) = receiver.recv().await {
|
||||
match event {
|
||||
ModuleToClientEvent::NewController => {
|
||||
debug!("Client received NewController event");
|
||||
|
||||
for device_path in self.device_watchers.read().await.keys() {
|
||||
let dbus_connection = &self.dbus_connection().await?;
|
||||
let device = DeviceDbusProxy::new(dbus_connection, device_path).await?;
|
||||
|
||||
let number = get_number_from_dbus_path(device_path);
|
||||
let r#type = device.device_type().await?;
|
||||
let new_state = device.state().await?;
|
||||
self.controller_sender
|
||||
.send(ClientToModuleEvent::DeviceChanged {
|
||||
number,
|
||||
r#type,
|
||||
new_state,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
|
||||
self.tx.subscribe()
|
||||
async fn watch_device(&'static self, path: ObjectPath<'_>) -> Result<DeviceWatcher> {
|
||||
let dbus_connection = &self.dbus_connection().await?;
|
||||
let proxy = DeviceDbusProxy::new(dbus_connection, path.to_owned()).await?;
|
||||
|
||||
let number = get_number_from_dbus_path(&path);
|
||||
let r#type = proxy.device_type().await?;
|
||||
let new_state = proxy.state().await?;
|
||||
|
||||
// Notify modules that the device exists even if its properties don't change
|
||||
self.controller_sender
|
||||
.send(ClientToModuleEvent::DeviceChanged {
|
||||
number,
|
||||
r#type: r#type.clone(),
|
||||
new_state,
|
||||
})?;
|
||||
|
||||
let state_watcher = Arc::new(spawn(self.watch_device_state(proxy)));
|
||||
|
||||
Ok(DeviceWatcher { state_watcher })
|
||||
}
|
||||
|
||||
async fn watch_device_state(&'static self, proxy: DeviceDbusProxy<'_>) -> Result<()> {
|
||||
let path = proxy.inner().path();
|
||||
|
||||
debug!("D-Bus device state watcher for {} starting", path);
|
||||
|
||||
let number = get_number_from_dbus_path(path);
|
||||
let r#type = proxy.device_type().await?;
|
||||
|
||||
let mut changes = proxy.receive_state_changed().await;
|
||||
while let Some(change) = changes.next().await {
|
||||
let new_state = change.get().await?;
|
||||
self.controller_sender
|
||||
.send(ClientToModuleEvent::DeviceChanged {
|
||||
number,
|
||||
r#type: r#type.clone(),
|
||||
new_state,
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn dbus_connection(&self) -> Result<Connection> {
|
||||
let dbus_connection_guard = self.dbus_connection.read().await;
|
||||
if let Some(dbus_connection) = &*dbus_connection_guard {
|
||||
Ok(dbus_connection.clone())
|
||||
} else {
|
||||
// Yes it's a bit awkward to first obtain a read lock and then a write lock but it
|
||||
// needs to happen only once, and after that all read lock acquisitions will be
|
||||
// instant
|
||||
drop(dbus_connection_guard);
|
||||
let dbus_connection = Connection::system().await?;
|
||||
*self.dbus_connection.write().await = Some(dbus_connection.clone());
|
||||
Ok(dbus_connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_client() -> ClientResult<Client> {
|
||||
let client = Arc::new(Client::new()?);
|
||||
let client = Arc::new(Client::new());
|
||||
client.run()?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn watch_device(device_path: ObjectPath<'_>, tx: broadcast::Sender<Event>) -> Result<()> {
|
||||
let dbus_connection = Connection::system().await?;
|
||||
let device = DeviceDbusProxy::new(&dbus_connection, device_path.to_owned()).await?;
|
||||
|
||||
let interface = device.interface().await?;
|
||||
tx.send(Event::DeviceAdded {
|
||||
interface: interface.to_string(),
|
||||
})?;
|
||||
|
||||
spawn(watch_device_state(
|
||||
device_path.to_owned(),
|
||||
interface.to_owned(),
|
||||
tx.clone(),
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_device_state(
|
||||
device_path: ObjectPath<'_>,
|
||||
interface: Str<'_>,
|
||||
tx: broadcast::Sender<Event>,
|
||||
) -> Result<()> {
|
||||
let dbus_connection = Connection::system().await?;
|
||||
let device = DeviceDbusProxy::new(&dbus_connection, &device_path).await?;
|
||||
let r#type = device.device_type().await?;
|
||||
|
||||
// Send an event communicating the initial state
|
||||
let state = device.state().await?;
|
||||
tx.send(Event::DeviceStateChanged {
|
||||
interface: interface.to_string(),
|
||||
r#type: r#type.clone(),
|
||||
state,
|
||||
})?;
|
||||
|
||||
let mut state_changes = device.receive_state_changed().await;
|
||||
while let Some(state_change) = state_changes.next().await {
|
||||
let state = state_change.get().await?;
|
||||
tx.send(Event::DeviceStateChanged {
|
||||
interface: interface.to_string(),
|
||||
r#type: r#type.clone(),
|
||||
state,
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
fn get_number_from_dbus_path(path: &ObjectPath) -> u32 {
|
||||
let (_, number_str) = path
|
||||
.rsplit_once('/')
|
||||
.expect("Path must have at least two segments to contain an object number");
|
||||
number_str
|
||||
.parse()
|
||||
.expect("Last segment was not a positive integer")
|
||||
}
|
||||
|
||||
register_fallible_client!(Client, network_manager);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::clients::networkmanager::Client;
|
||||
use crate::clients::networkmanager::dbus::{DeviceState, DeviceType};
|
||||
use crate::clients::networkmanager::event::Event;
|
||||
use crate::clients::networkmanager::event::{ClientToModuleEvent, ModuleToClientEvent};
|
||||
use crate::config::CommonConfig;
|
||||
use crate::gtk_helpers::IronbarGtkExt;
|
||||
use crate::image::Provider;
|
||||
|
@ -13,6 +13,7 @@ use gtk::{Image, Orientation};
|
|||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
|
||||
|
@ -29,7 +30,7 @@ const fn default_icon_size() -> i32 {
|
|||
}
|
||||
|
||||
impl Module<gtk::Box> for NetworkManagerModule {
|
||||
type SendMessage = Event;
|
||||
type SendMessage = ClientToModuleEvent;
|
||||
type ReceiveMessage = ();
|
||||
|
||||
module_impl!("network_manager");
|
||||
|
@ -37,19 +38,24 @@ impl Module<gtk::Box> for NetworkManagerModule {
|
|||
fn spawn_controller(
|
||||
&self,
|
||||
_info: &ModuleInfo,
|
||||
context: &WidgetContext<Event, ()>,
|
||||
_rx: mpsc::Receiver<()>,
|
||||
context: &WidgetContext<ClientToModuleEvent, ()>,
|
||||
_widget_receiver: mpsc::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
let client = context.try_client::<Client>()?;
|
||||
// Should we be using context.tx with ModuleUpdateEvent::Update instead?
|
||||
let tx = context.update_tx.clone();
|
||||
// Must be done here synchronously to avoid race condition
|
||||
let mut client_rx = client.subscribe();
|
||||
spawn(async move {
|
||||
while let Result::Ok(event) = client_rx.recv().await {
|
||||
tx.send(event)?;
|
||||
}
|
||||
let widget_sender = context.update_tx.clone();
|
||||
|
||||
// Must be done here otherwise we miss the response to our `NewController` event
|
||||
let mut client_receiver = client.subscribe();
|
||||
|
||||
client
|
||||
.get_sender()
|
||||
.send(ModuleToClientEvent::NewController)?;
|
||||
|
||||
spawn(async move {
|
||||
while let Result::Ok(event) = client_receiver.recv().await {
|
||||
widget_sender.send(event)?;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
|
@ -58,16 +64,17 @@ impl Module<gtk::Box> for NetworkManagerModule {
|
|||
|
||||
fn into_widget(
|
||||
self,
|
||||
context: WidgetContext<Event, ()>,
|
||||
context: WidgetContext<ClientToModuleEvent, ()>,
|
||||
_info: &ModuleInfo,
|
||||
) -> Result<ModuleParts<gtk::Box>> {
|
||||
// Must be done here otherwise we miss the response to our `NewController` event
|
||||
let receiver = context.subscribe();
|
||||
|
||||
let container = gtk::Box::new(Orientation::Horizontal, 0);
|
||||
|
||||
// Must be done here synchronously to avoid race condition
|
||||
let rx = context.subscribe();
|
||||
// We cannot use recv_glib_async here because the lifetimes don't work out
|
||||
spawn_future_local(handle_update_events(
|
||||
rx,
|
||||
receiver,
|
||||
container.clone(),
|
||||
self.icon_size,
|
||||
context.ironbar.image_provider(),
|
||||
|
@ -78,31 +85,37 @@ impl Module<gtk::Box> for NetworkManagerModule {
|
|||
}
|
||||
|
||||
async fn handle_update_events(
|
||||
mut rx: broadcast::Receiver<Event>,
|
||||
mut widget_receiver: broadcast::Receiver<ClientToModuleEvent>,
|
||||
container: gtk::Box,
|
||||
icon_size: i32,
|
||||
image_provider: Provider,
|
||||
) {
|
||||
let mut icons = HashMap::<String, Image>::new();
|
||||
) -> Result<()> {
|
||||
// TODO: Ensure the visible icons are always in the same order
|
||||
let mut icons = HashMap::<u32, Image>::new();
|
||||
|
||||
while let Result::Ok(event) = rx.recv().await {
|
||||
while let Result::Ok(event) = widget_receiver.recv().await {
|
||||
match event {
|
||||
Event::DeviceAdded { interface, .. } => {
|
||||
ClientToModuleEvent::DeviceChanged {
|
||||
number,
|
||||
r#type,
|
||||
new_state,
|
||||
} => {
|
||||
debug!(
|
||||
"Module widget received DeviceChanged event for number {}",
|
||||
number
|
||||
);
|
||||
|
||||
let icon: &_ = icons.entry(number).or_insert_with(|| {
|
||||
debug!("Adding icon for device {}", number);
|
||||
|
||||
let icon = Image::new();
|
||||
icon.add_class("icon");
|
||||
container.add(&icon);
|
||||
icons.insert(interface, icon);
|
||||
}
|
||||
Event::DeviceStateChanged {
|
||||
interface,
|
||||
r#type,
|
||||
state,
|
||||
} => {
|
||||
let icon = icons
|
||||
.get(&interface)
|
||||
.expect("the icon for the interface to be present");
|
||||
icon
|
||||
});
|
||||
|
||||
// TODO: Make this configurable at runtime
|
||||
let icon_name = get_icon_for_device_state(&r#type, &state);
|
||||
let icon_name = get_icon_for_device_state(&r#type, &new_state);
|
||||
match icon_name {
|
||||
Some(icon_name) => {
|
||||
image_provider
|
||||
|
@ -115,9 +128,23 @@ async fn handle_update_events(
|
|||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
ClientToModuleEvent::DeviceRemoved { number } => {
|
||||
debug!(
|
||||
"Module widget received DeviceRemoved event for number {}",
|
||||
number
|
||||
);
|
||||
|
||||
let icon = icons
|
||||
.get(&number)
|
||||
.expect("The icon for {} was about to be removed but was not present");
|
||||
container.remove(icon);
|
||||
icons.remove(&number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_icon_for_device_state(r#type: &DeviceType, state: &DeviceState) -> Option<&'static str> {
|
||||
match r#type {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue