1
0
Fork 0
mirror of https://github.com/Zedfrigg/ironbar.git synced 2025-09-18 04:36:57 +02:00

refactor(networkmanager): break Client::run up into multiple functions

Also replace its shared state lifetime and synchonisation mechanisms with Arc<Mutex<_>>.
This commit is contained in:
Reinout Meliesie 2025-09-02 22:20:54 +02:00
commit ec00b2ce69
Signed by: zedfrigg
GPG key ID: 3AFCC06481308BC6

View file

@ -3,7 +3,7 @@ use color_eyre::eyre::Ok;
use futures_lite::StreamExt;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use tokio::sync::{Mutex, broadcast};
use zbus::Connection;
use zbus::zvariant::{ObjectPath, Str};
@ -32,79 +32,19 @@ impl Client {
}
fn run(&self) -> Result<()> {
let devices: &'static _ = Box::leak(Box::new(RwLock::new(HashSet::<ObjectPath>::new())));
let devices = Arc::new(Mutex::new(HashSet::<ObjectPath>::new()));
{
let controller_sender = self.controller_sender.clone();
spawn(async move {
let dbus_connection = Connection::system().await?;
let root = DbusProxy::new(&dbus_connection).await?;
spawn(watch_devices_list(
devices.clone(),
self.controller_sender.clone(),
));
let mut devices_changes = root.receive_all_devices_changed().await;
while let Some(devices_change) = devices_changes.next().await {
// The new list of devices from dbus, not to be confused with the added devices below
let new_devices = devices_change
.get()
.await?
.iter()
.map(ObjectPath::to_owned)
.collect::<HashSet<_>>();
// We create a local clone here to avoid holding the lock for too long
let devices_snapshot = devices.read().await.clone();
let added_devices = new_devices.difference(&devices_snapshot);
for added_device in added_devices {
spawn(watch_device(
added_device.to_owned(),
controller_sender.clone(),
));
}
let _removed_devices = devices_snapshot.difference(&new_devices);
// TODO: Cook up some way to notify closures for removed devices to exit
*devices.write().await = new_devices;
}
Ok(())
});
}
{
let controller_sender = self.controller_sender.clone();
let mut receiver = self.sender.subscribe();
spawn(async move {
while let Result::Ok(event) = receiver.recv().await {
match event {
ModuleToClientEvent::NewController => {
// We create a local clone here to avoid holding the lock for too long
let devices_snapshot = devices.read().await.clone();
for device_path in devices_snapshot {
let dbus_connection = Connection::system().await?;
let device =
DeviceDbusProxy::new(&dbus_connection, device_path).await?;
// TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well
let interface = device.interface().await?.to_string();
let r#type = device.device_type().await?;
let state = device.state().await?;
controller_sender.send(
ClientToModuleEvent::DeviceStateChanged {
interface,
r#type,
state,
},
)?;
}
}
}
}
Ok(())
});
}
let receiver = self.sender.subscribe();
spawn(handle_received_events(
receiver,
devices.clone(),
self.controller_sender.clone(),
));
Ok(())
}
@ -124,6 +64,77 @@ pub fn create_client() -> ClientResult<Client> {
Ok(client)
}
async fn watch_devices_list(
devices: Arc<Mutex<HashSet<ObjectPath<'_>>>>,
controller_sender: broadcast::Sender<ClientToModuleEvent>,
) -> Result<()> {
let dbus_connection = Connection::system().await?;
let root = DbusProxy::new(&dbus_connection).await?;
let mut devices_changes = root.receive_all_devices_changed().await;
while let Some(devices_change) = devices_changes.next().await {
// The new list of devices from dbus, not to be confused with the added devices below
let new_devices = devices_change
.get()
.await?
.iter()
.map(ObjectPath::to_owned)
.collect::<HashSet<_>>();
// Atomic read-then-write of `devices`
let mut devices_locked = devices.lock().await;
let devices_snapshot = devices_locked.clone();
(*devices_locked).clone_from(&new_devices);
drop(devices_locked);
let added_devices = new_devices.difference(&devices_snapshot);
for added_device in added_devices {
spawn(watch_device(
added_device.to_owned(),
controller_sender.clone(),
));
}
let _removed_devices = devices_snapshot.difference(&new_devices);
// TODO: Cook up some way to notify closures for removed devices to exit
}
Ok(())
}
async fn handle_received_events(
mut receiver: broadcast::Receiver<ModuleToClientEvent>,
devices: Arc<Mutex<HashSet<ObjectPath<'_>>>>,
controller_sender: broadcast::Sender<ClientToModuleEvent>,
) -> Result<()> {
while let Result::Ok(event) = receiver.recv().await {
match event {
ModuleToClientEvent::NewController => {
let dbus_connection = Connection::system().await?;
// We create a local clone here to avoid holding the lock for too long
let devices_snapshot = devices.lock().await.clone();
for device_path in devices_snapshot {
let device = DeviceDbusProxy::new(&dbus_connection, device_path).await?;
// TODO: Create DeviceDbusProxy -> DeviceStateChanged function and use it in the watcher as well
let interface = device.interface().await?.to_string();
let r#type = device.device_type().await?;
let state = device.state().await?;
controller_sender.send(ClientToModuleEvent::DeviceStateChanged {
interface,
r#type,
state,
})?;
}
}
}
}
Ok(())
}
async fn watch_device(
device_path: ObjectPath<'_>,
controller_sender: broadcast::Sender<ClientToModuleEvent>,